From f98364f40eda9613cfec59fd35836fa7cdabe9ed Mon Sep 17 00:00:00 2001 From: Eroax Date: Fri, 9 Feb 2024 11:43:17 +0000 Subject: [PATCH] Add in General EventSub Nodes (#65) Adds in these nodes: - Twitch Add EventSub Subscription - Twitch EventSub Event Co-authored-by: Eroax Reviewed-on: https://codeberg.org/StreamGraph/StreamGraph/pulls/65 Co-authored-by: Eroax Co-committed-by: Eroax --- addons/no_twitch/demo/test_button.gd | 2 +- addons/no_twitch/eventsub_socket.gd | 51 +++++++++++-- addons/no_twitch/twitch_connection.gd | 38 +++++++--- classes/connections/connections.gd | 7 +- .../twitch_add_eventsub_subscription.gd | 76 +++++++++++++++++++ .../nodes/twitch/twitch_eventsub_event.gd | 54 +++++++++++++ 6 files changed, 207 insertions(+), 21 deletions(-) create mode 100644 classes/deck/nodes/twitch/twitch_add_eventsub_subscription.gd create mode 100644 classes/deck/nodes/twitch/twitch_eventsub_event.gd diff --git a/addons/no_twitch/demo/test_button.gd b/addons/no_twitch/demo/test_button.gd index 61a1962..ebc9937 100644 --- a/addons/no_twitch/demo/test_button.gd +++ b/addons/no_twitch/demo/test_button.gd @@ -5,5 +5,5 @@ extends Button func _pressed(): - OS.shell_open(twitch_connection.authenticate_with_twitch(["channel:read:redemptions", "chat:read", "chat:edit"])) + OS.shell_open(twitch_connection.authenticate_with_twitch()) diff --git a/addons/no_twitch/eventsub_socket.gd b/addons/no_twitch/eventsub_socket.gd index 682db60..d2a9c5f 100644 --- a/addons/no_twitch/eventsub_socket.gd +++ b/addons/no_twitch/eventsub_socket.gd @@ -11,29 +11,58 @@ signal notif_received(data) signal welcome_received() +var keepalive_timer := 0 +var timeout_time : int -func _init(owner): +var subscribed_events : Array[Twitch_Connection.EventSub_Subscription] + + +func _init(owner, timeout : int): connection = owner + timeout_time = timeout packet_received.connect(data_received) -func connect_to_eventsub(events : Array[Twitch_Connection.EventSub_Subscription] = []): +## Overrides the default poll function for [Websocket_Client] to add functionality for a keepalive timer and reconnecting when the connection is lost. +func poll_socket(): + + super() + + keepalive_timer += connection.get_process_delta_time() + if keepalive_timer >= timeout_time: + + socket_closed.emit() + close() + connect_to_eventsub(subscribed_events) + + + +## Handles setting up the connection to EventSub with an Array of the Events that should be subscribed to. +func connect_to_eventsub(events : Array[Twitch_Connection.EventSub_Subscription]): connect_to_url(eventsub_url) await welcome_received - if events.is_empty(): - - return - + subscribed_events = events + + return await subscribe_to_events(events) + + +## Utility function for subscribing to multiple Twitch EventSub events at once. +func subscribe_to_events(events : Array[Twitch_Connection.EventSub_Subscription]): var responses : Array[Twitch_Connection.HTTPResponse] for all in events: - responses.append(connection.add_eventsub_subscription(all)) + responses.append(await connection.add_eventsub_subscription(all)) + + + if responses.size() == 1: + + return responses[0] return responses @@ -53,6 +82,7 @@ func data_received(packet : PackedByteArray): "session_ping": + print("Ping Received") send_pong(info) @@ -61,6 +91,13 @@ func data_received(packet : PackedByteArray): notif_received.emit(info) + "session_keepalive": + + keepalive_timer = 0 + + + + print(info) func send_pong(pong): diff --git a/addons/no_twitch/twitch_connection.gd b/addons/no_twitch/twitch_connection.gd index 9a1a675..c1ad058 100644 --- a/addons/no_twitch/twitch_connection.gd +++ b/addons/no_twitch/twitch_connection.gd @@ -16,6 +16,7 @@ signal chat_received_rich(chat_dict) @export var redirect_port := "8000" var auth_url := "https://id.twitch.tv/oauth2/authorize?response_type=token&" +var auth_scopes : Array[String] = ["chat:read", "chat:edit", "channel:read:redemptions"] var auth_server : TCPServer @@ -50,7 +51,7 @@ func cache_user_data(): var resp = request_user_info() resp.response_received.connect(func(data): - user_info = data + user_info = data.data print("User Info Cached") ) @@ -59,9 +60,10 @@ func cache_user_data(): ## Handles the basic Twitch Authentication process to request and then later receive a Token (using [method check_auth_peer]). ## Returns the authentication URL. -func authenticate_with_twitch(client_id = client_id, scopes : Array[String] = ["chat:read", "chat:edit"]) -> String: +func authenticate_with_twitch(client_id = client_id, scopes : Array[String] = auth_scopes) -> String: auth_server = TCPServer.new() - var url := create_auth_url(scopes) + auth_scopes = scopes + var url := create_auth_url() auth_server.listen(int(redirect_port)) return url @@ -96,13 +98,23 @@ func send_chat(msg : String, channel : String = ""): ## Sets up an EventSub connection to allow subscribing to EventSub events. Ex. Alerts, Channel Point Redemptions etc. func setup_eventsub_connection(events : Array[EventSub_Subscription] = [], timeout_duration : int = 20): - eventsub_socket = eventsub_socket_class.new(self) + eventsub_socket = eventsub_socket_class.new(self, timeout_duration) var ret = await eventsub_socket.connect_to_eventsub(events) return ret func add_eventsub_subscription(sub_type : EventSub_Subscription): + if eventsub_socket == null: + + return await setup_eventsub_connection([sub_type]) + + + if sub_type.session_id == "": + + sub_type.session_id = eventsub_socket.session_id + + return twitch_request("https://api.twitch.tv/helix/eventsub/subscriptions", HTTPClient.METHOD_POST, str(sub_type.return_request_dictionary())) @@ -114,7 +126,7 @@ func subscribe_to_channel_points(user_id : String): return - return add_eventsub_subscription(EventSub_Subscription.new("channel.channel_points_custom_reward_redemption.add", {"broadcaster_user_id" : user_id}, eventsub_socket.session_id)) + return await add_eventsub_subscription(EventSub_Subscription.new("channel.channel_points_custom_reward_redemption.add", {"broadcaster_user_id" : user_id}, eventsub_socket.session_id)) func _process(delta): @@ -139,11 +151,11 @@ func _process(delta): ## [param scopes], Twitch Client ID ([param id]) and [param redirect_uri]. ## [param id] defaults to [member client_id] and both [param redirect] and ## [param redirect_port] -func create_auth_url(scopes : Array[String], port := redirect_port, id : String = client_id, redirect : String = redirect_uri) -> String: +func create_auth_url(port := redirect_port, id : String = client_id, redirect : String = redirect_uri) -> String: var str_scopes : String - for all in scopes: + for all in auth_scopes: str_scopes += " " + all str_scopes = str_scopes.strip_edges() @@ -264,8 +276,6 @@ func check_auth_peer(peer : StreamPeerTCP): func check_chat_socket(dict, rich = false): - prints(dict.user, dict.message) - if rich: chat_received_rich.emit(dict) @@ -292,12 +302,16 @@ class HTTPResponse: var info = JSON.parse_string(body.get_string_from_utf8()) + info["result"] = result + info["code"] = response_code + info["headers"] = headers + if info.has("error"): - push_error("NoTwitch Twitch API Error: " + info.error + " " + info.message) + push_error("NoTwitch Twitch API Error: " + str(info)) return - info = info.data[0] + info["data"] = info.data[0] print("Response Received") inf_data = info @@ -322,7 +336,7 @@ class EventSub_Subscription: var method = "websocket" - func _init(sub_type : String, cond : Dictionary, sess_id : String, vers : String = "1"): + func _init(sub_type : String, cond : Dictionary, sess_id : String = "", vers : String = "1"): subscription_type = sub_type version = vers diff --git a/classes/connections/connections.gd b/classes/connections/connections.gd index e1d2606..bf0d70b 100644 --- a/classes/connections/connections.gd +++ b/classes/connections/connections.gd @@ -4,7 +4,12 @@ class_name Connections static var obs_websocket -static var twitch +static var twitch + +static func _twitch_eventsub_event_received(event_data : Dictionary): + + DeckHolder.send_event(&"twitch_eventsub", event_data) + static func _twitch_chat_received(msg_dict : Dictionary): diff --git a/classes/deck/nodes/twitch/twitch_add_eventsub_subscription.gd b/classes/deck/nodes/twitch/twitch_add_eventsub_subscription.gd new file mode 100644 index 0000000..04b1732 --- /dev/null +++ b/classes/deck/nodes/twitch/twitch_add_eventsub_subscription.gd @@ -0,0 +1,76 @@ +# (c) 2023-present Eroax +# (c) 2023-present Yagich +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) +extends DeckNode + + +var subscription_data : Twitch_Connection.EventSub_Subscription + +func _init(): + name = "Twitch Add EventSub Subscription" + node_type = name.to_snake_case() + description = "Adds a subscription to a specific Twitch EventSub Event with the given dictionary 'condition' for the data needed." + + props_to_serialize = [] + + #Event Name + add_input_port(DeckType.Types.STRING, "Event Name", "field") + #Subscription Data + add_input_port(DeckType.Types.DICTIONARY, "Subscription Data") + #Trigger + add_input_port(DeckType.Types.ANY, "Add Subscription", "button") + + + + +func _receive(to_input_port, data: Variant, extra_data: Array = []): + + if to_input_port != 2: + + return + + + var input_data = await resolve_input_port_value_async(1) + + + if input_data == null or !"condition" in input_data.keys(): + DeckHolder.logger.log_node(name + ": Incorrect Subscription Data Connected, please supply a Dictionary with condition and if needed, version. Last supplied Data was: " + str(input_data), Logger.LogType.ERROR) + return + + + var sub_type = await resolve_input_port_value_async(0) + +# Creates an instance of Twitch_Connection.EventSub_Subscription to store the data with all the given inputs. + subscription_data = Twitch_Connection.EventSub_Subscription.new(sub_type, input_data.condition) +# Checks if the data has a version field, if so sets it on the EventSub_Subscription + if input_data.has("version"): + + subscription_data.version = input_data.version + + +# Calls the connection to add the Subscription + var req = await Connections.twitch.add_eventsub_subscription(subscription_data) + req.response_received.connect(eventsub_subscription_response) + + +## Handles checking the [Twitch_Connection.HTTPResponse] returned by [method Twitch_Connection.add_eventsub_subscription] to ensure that it succeeded. +func eventsub_subscription_response(data): + + match data.code: + + 202: + + var succ_string = name + ": EventSub Subscription Added for " + subscription_data.subscription_type + " successfully" + DeckHolder.logger.log_node(succ_string, Logger.LogType.INFO) + Connections.twitch.eventsub_socket.notif_received.connect(Connections._twitch_eventsub_event_received) + + + _: + + var error_string = name + ": Error" + data.code + " Received from Twitch when Subscribing to " + subscription_data.sub_type + " with " + str(subscription_data.return_request_dictionary) + + DeckHolder.logger.log_node(error_string, Logger.LogType.ERROR) + + + + diff --git a/classes/deck/nodes/twitch/twitch_eventsub_event.gd b/classes/deck/nodes/twitch/twitch_eventsub_event.gd new file mode 100644 index 0000000..671ec65 --- /dev/null +++ b/classes/deck/nodes/twitch/twitch_eventsub_event.gd @@ -0,0 +1,54 @@ +# (c) 2023-present Eroax +# (c) 2023-present Yagich +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) +extends DeckNode + +var cached_event_data : Dictionary + +func _init(): + name = "Twitch EventSub Event" + node_type = name.to_snake_case() + description = "Listens for a specific Event from Twitch EventSub" + + props_to_serialize = [] + +# Adds a port that allows specifying what type of event to listen for. + add_input_port(DeckType.Types.STRING, "Event Name", "field") +# Adds a port that outputs when the Event has been received + add_output_port(DeckType.Types.ANY, "Event Received") +# Adds a port that outputs the data received when the Event has been received. + add_output_port(DeckType.Types.DICTIONARY, "Event Data") + + + + +func _event_received(event_name: StringName, event_data: Dictionary = {}): + + if event_name != &"twitch_eventsub": + + return + + + var port_0 = await resolve_input_port_value_async(0) + print("Event Name ", event_data) + if port_0 == null or port_0 != event_data.payload.subscription.type: + + return + + +# Sends to indicate that the specified event has happened. + send(0, null) +# Sends the data along as well as the fact that the event happened. While also caching the event data for later access + cached_event_data = event_data + send(1, event_data) + + +func _value_request(port): + + if port != 1: + + return + + + return cached_event_data +