mirror of
https://codeberg.org/StreamGraph/StreamGraph.git
synced 2024-11-13 19:49:55 +01:00
Add in General EventSub Nodes (#65)
Adds in these nodes: - Twitch Add EventSub Subscription - Twitch EventSub Event Co-authored-by: Eroax <eroaxe.business@gmail.com> Reviewed-on: https://codeberg.org/StreamGraph/StreamGraph/pulls/65 Co-authored-by: Eroax <eroax@noreply.codeberg.org> Co-committed-by: Eroax <eroax@noreply.codeberg.org>
This commit is contained in:
parent
51046034e4
commit
f98364f40e
6 changed files with 207 additions and 21 deletions
|
@ -5,5 +5,5 @@ extends Button
|
||||||
|
|
||||||
func _pressed():
|
func _pressed():
|
||||||
|
|
||||||
OS.shell_open(twitch_connection.authenticate_with_twitch(["channel:read:redemptions", "chat:read", "chat:edit"]))
|
OS.shell_open(twitch_connection.authenticate_with_twitch())
|
||||||
|
|
||||||
|
|
|
@ -11,29 +11,58 @@ signal notif_received(data)
|
||||||
|
|
||||||
signal welcome_received()
|
signal welcome_received()
|
||||||
|
|
||||||
|
var keepalive_timer := 0
|
||||||
|
var timeout_time : int
|
||||||
|
|
||||||
func _init(owner):
|
var subscribed_events : Array[Twitch_Connection.EventSub_Subscription]
|
||||||
|
|
||||||
|
|
||||||
|
func _init(owner, timeout : int):
|
||||||
|
|
||||||
connection = owner
|
connection = owner
|
||||||
|
timeout_time = timeout
|
||||||
|
|
||||||
packet_received.connect(data_received)
|
packet_received.connect(data_received)
|
||||||
|
|
||||||
|
|
||||||
func connect_to_eventsub(events : Array[Twitch_Connection.EventSub_Subscription] = []):
|
## Overrides the default poll function for [Websocket_Client] to add functionality for a keepalive timer and reconnecting when the connection is lost.
|
||||||
|
func poll_socket():
|
||||||
|
|
||||||
|
super()
|
||||||
|
|
||||||
|
keepalive_timer += connection.get_process_delta_time()
|
||||||
|
if keepalive_timer >= timeout_time:
|
||||||
|
|
||||||
|
socket_closed.emit()
|
||||||
|
close()
|
||||||
|
connect_to_eventsub(subscribed_events)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
## Handles setting up the connection to EventSub with an Array of the Events that should be subscribed to.
|
||||||
|
func connect_to_eventsub(events : Array[Twitch_Connection.EventSub_Subscription]):
|
||||||
|
|
||||||
connect_to_url(eventsub_url)
|
connect_to_url(eventsub_url)
|
||||||
await welcome_received
|
await welcome_received
|
||||||
|
|
||||||
if events.is_empty():
|
subscribed_events = events
|
||||||
|
|
||||||
return
|
return await subscribe_to_events(events)
|
||||||
|
|
||||||
|
|
||||||
|
## Utility function for subscribing to multiple Twitch EventSub events at once.
|
||||||
|
func subscribe_to_events(events : Array[Twitch_Connection.EventSub_Subscription]):
|
||||||
|
|
||||||
var responses : Array[Twitch_Connection.HTTPResponse]
|
var responses : Array[Twitch_Connection.HTTPResponse]
|
||||||
|
|
||||||
for all in events:
|
for all in events:
|
||||||
|
|
||||||
responses.append(connection.add_eventsub_subscription(all))
|
responses.append(await connection.add_eventsub_subscription(all))
|
||||||
|
|
||||||
|
|
||||||
|
if responses.size() == 1:
|
||||||
|
|
||||||
|
return responses[0]
|
||||||
|
|
||||||
|
|
||||||
return responses
|
return responses
|
||||||
|
@ -53,6 +82,7 @@ func data_received(packet : PackedByteArray):
|
||||||
|
|
||||||
"session_ping":
|
"session_ping":
|
||||||
|
|
||||||
|
print("Ping Received")
|
||||||
send_pong(info)
|
send_pong(info)
|
||||||
|
|
||||||
|
|
||||||
|
@ -61,6 +91,13 @@ func data_received(packet : PackedByteArray):
|
||||||
notif_received.emit(info)
|
notif_received.emit(info)
|
||||||
|
|
||||||
|
|
||||||
|
"session_keepalive":
|
||||||
|
|
||||||
|
keepalive_timer = 0
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
print(info)
|
||||||
|
|
||||||
|
|
||||||
func send_pong(pong):
|
func send_pong(pong):
|
||||||
|
|
|
@ -16,6 +16,7 @@ signal chat_received_rich(chat_dict)
|
||||||
@export var redirect_port := "8000"
|
@export var redirect_port := "8000"
|
||||||
|
|
||||||
var auth_url := "https://id.twitch.tv/oauth2/authorize?response_type=token&"
|
var auth_url := "https://id.twitch.tv/oauth2/authorize?response_type=token&"
|
||||||
|
var auth_scopes : Array[String] = ["chat:read", "chat:edit", "channel:read:redemptions"]
|
||||||
|
|
||||||
var auth_server : TCPServer
|
var auth_server : TCPServer
|
||||||
|
|
||||||
|
@ -50,7 +51,7 @@ func cache_user_data():
|
||||||
var resp = request_user_info()
|
var resp = request_user_info()
|
||||||
resp.response_received.connect(func(data):
|
resp.response_received.connect(func(data):
|
||||||
|
|
||||||
user_info = data
|
user_info = data.data
|
||||||
print("User Info Cached")
|
print("User Info Cached")
|
||||||
|
|
||||||
)
|
)
|
||||||
|
@ -59,9 +60,10 @@ func cache_user_data():
|
||||||
|
|
||||||
## Handles the basic Twitch Authentication process to request and then later receive a Token (using [method check_auth_peer]).
|
## Handles the basic Twitch Authentication process to request and then later receive a Token (using [method check_auth_peer]).
|
||||||
## Returns the authentication URL.
|
## Returns the authentication URL.
|
||||||
func authenticate_with_twitch(client_id = client_id, scopes : Array[String] = ["chat:read", "chat:edit"]) -> String:
|
func authenticate_with_twitch(client_id = client_id, scopes : Array[String] = auth_scopes) -> String:
|
||||||
auth_server = TCPServer.new()
|
auth_server = TCPServer.new()
|
||||||
var url := create_auth_url(scopes)
|
auth_scopes = scopes
|
||||||
|
var url := create_auth_url()
|
||||||
auth_server.listen(int(redirect_port))
|
auth_server.listen(int(redirect_port))
|
||||||
return url
|
return url
|
||||||
|
|
||||||
|
@ -96,13 +98,23 @@ func send_chat(msg : String, channel : String = ""):
|
||||||
## Sets up an EventSub connection to allow subscribing to EventSub events. Ex. Alerts, Channel Point Redemptions etc.
|
## Sets up an EventSub connection to allow subscribing to EventSub events. Ex. Alerts, Channel Point Redemptions etc.
|
||||||
func setup_eventsub_connection(events : Array[EventSub_Subscription] = [], timeout_duration : int = 20):
|
func setup_eventsub_connection(events : Array[EventSub_Subscription] = [], timeout_duration : int = 20):
|
||||||
|
|
||||||
eventsub_socket = eventsub_socket_class.new(self)
|
eventsub_socket = eventsub_socket_class.new(self, timeout_duration)
|
||||||
var ret = await eventsub_socket.connect_to_eventsub(events)
|
var ret = await eventsub_socket.connect_to_eventsub(events)
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
|
|
||||||
func add_eventsub_subscription(sub_type : EventSub_Subscription):
|
func add_eventsub_subscription(sub_type : EventSub_Subscription):
|
||||||
|
|
||||||
|
if eventsub_socket == null:
|
||||||
|
|
||||||
|
return await setup_eventsub_connection([sub_type])
|
||||||
|
|
||||||
|
|
||||||
|
if sub_type.session_id == "":
|
||||||
|
|
||||||
|
sub_type.session_id = eventsub_socket.session_id
|
||||||
|
|
||||||
|
|
||||||
return twitch_request("https://api.twitch.tv/helix/eventsub/subscriptions", HTTPClient.METHOD_POST, str(sub_type.return_request_dictionary()))
|
return twitch_request("https://api.twitch.tv/helix/eventsub/subscriptions", HTTPClient.METHOD_POST, str(sub_type.return_request_dictionary()))
|
||||||
|
|
||||||
|
|
||||||
|
@ -114,7 +126,7 @@ func subscribe_to_channel_points(user_id : String):
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
||||||
return add_eventsub_subscription(EventSub_Subscription.new("channel.channel_points_custom_reward_redemption.add", {"broadcaster_user_id" : user_id}, eventsub_socket.session_id))
|
return await add_eventsub_subscription(EventSub_Subscription.new("channel.channel_points_custom_reward_redemption.add", {"broadcaster_user_id" : user_id}, eventsub_socket.session_id))
|
||||||
|
|
||||||
|
|
||||||
func _process(delta):
|
func _process(delta):
|
||||||
|
@ -139,11 +151,11 @@ func _process(delta):
|
||||||
## [param scopes], Twitch Client ID ([param id]) and [param redirect_uri].
|
## [param scopes], Twitch Client ID ([param id]) and [param redirect_uri].
|
||||||
## [param id] defaults to [member client_id] and both [param redirect] and
|
## [param id] defaults to [member client_id] and both [param redirect] and
|
||||||
## [param redirect_port]
|
## [param redirect_port]
|
||||||
func create_auth_url(scopes : Array[String], port := redirect_port, id : String = client_id, redirect : String = redirect_uri) -> String:
|
func create_auth_url(port := redirect_port, id : String = client_id, redirect : String = redirect_uri) -> String:
|
||||||
|
|
||||||
var str_scopes : String
|
var str_scopes : String
|
||||||
|
|
||||||
for all in scopes:
|
for all in auth_scopes:
|
||||||
|
|
||||||
str_scopes += " " + all
|
str_scopes += " " + all
|
||||||
str_scopes = str_scopes.strip_edges()
|
str_scopes = str_scopes.strip_edges()
|
||||||
|
@ -264,8 +276,6 @@ func check_auth_peer(peer : StreamPeerTCP):
|
||||||
|
|
||||||
func check_chat_socket(dict, rich = false):
|
func check_chat_socket(dict, rich = false):
|
||||||
|
|
||||||
prints(dict.user, dict.message)
|
|
||||||
|
|
||||||
if rich:
|
if rich:
|
||||||
|
|
||||||
chat_received_rich.emit(dict)
|
chat_received_rich.emit(dict)
|
||||||
|
@ -292,12 +302,16 @@ class HTTPResponse:
|
||||||
|
|
||||||
var info = JSON.parse_string(body.get_string_from_utf8())
|
var info = JSON.parse_string(body.get_string_from_utf8())
|
||||||
|
|
||||||
|
info["result"] = result
|
||||||
|
info["code"] = response_code
|
||||||
|
info["headers"] = headers
|
||||||
|
|
||||||
if info.has("error"):
|
if info.has("error"):
|
||||||
|
|
||||||
push_error("NoTwitch Twitch API Error: " + info.error + " " + info.message)
|
push_error("NoTwitch Twitch API Error: " + str(info))
|
||||||
return
|
return
|
||||||
|
|
||||||
info = info.data[0]
|
info["data"] = info.data[0]
|
||||||
|
|
||||||
print("Response Received")
|
print("Response Received")
|
||||||
inf_data = info
|
inf_data = info
|
||||||
|
@ -322,7 +336,7 @@ class EventSub_Subscription:
|
||||||
|
|
||||||
var method = "websocket"
|
var method = "websocket"
|
||||||
|
|
||||||
func _init(sub_type : String, cond : Dictionary, sess_id : String, vers : String = "1"):
|
func _init(sub_type : String, cond : Dictionary, sess_id : String = "", vers : String = "1"):
|
||||||
|
|
||||||
subscription_type = sub_type
|
subscription_type = sub_type
|
||||||
version = vers
|
version = vers
|
||||||
|
|
|
@ -6,6 +6,11 @@ class_name Connections
|
||||||
static var obs_websocket
|
static var obs_websocket
|
||||||
static var twitch
|
static var twitch
|
||||||
|
|
||||||
|
static func _twitch_eventsub_event_received(event_data : Dictionary):
|
||||||
|
|
||||||
|
DeckHolder.send_event(&"twitch_eventsub", event_data)
|
||||||
|
|
||||||
|
|
||||||
static func _twitch_chat_received(msg_dict : Dictionary):
|
static func _twitch_chat_received(msg_dict : Dictionary):
|
||||||
|
|
||||||
DeckHolder.send_event(&"twitch_chat", msg_dict)
|
DeckHolder.send_event(&"twitch_chat", msg_dict)
|
||||||
|
|
|
@ -0,0 +1,76 @@
|
||||||
|
# (c) 2023-present Eroax
|
||||||
|
# (c) 2023-present Yagich
|
||||||
|
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
||||||
|
extends DeckNode
|
||||||
|
|
||||||
|
|
||||||
|
var subscription_data : Twitch_Connection.EventSub_Subscription
|
||||||
|
|
||||||
|
func _init():
|
||||||
|
name = "Twitch Add EventSub Subscription"
|
||||||
|
node_type = name.to_snake_case()
|
||||||
|
description = "Adds a subscription to a specific Twitch EventSub Event with the given dictionary 'condition' for the data needed."
|
||||||
|
|
||||||
|
props_to_serialize = []
|
||||||
|
|
||||||
|
#Event Name
|
||||||
|
add_input_port(DeckType.Types.STRING, "Event Name", "field")
|
||||||
|
#Subscription Data
|
||||||
|
add_input_port(DeckType.Types.DICTIONARY, "Subscription Data")
|
||||||
|
#Trigger
|
||||||
|
add_input_port(DeckType.Types.ANY, "Add Subscription", "button")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
func _receive(to_input_port, data: Variant, extra_data: Array = []):
|
||||||
|
|
||||||
|
if to_input_port != 2:
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
var input_data = await resolve_input_port_value_async(1)
|
||||||
|
|
||||||
|
|
||||||
|
if input_data == null or !"condition" in input_data.keys():
|
||||||
|
DeckHolder.logger.log_node(name + ": Incorrect Subscription Data Connected, please supply a Dictionary with condition and if needed, version. Last supplied Data was: " + str(input_data), Logger.LogType.ERROR)
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
var sub_type = await resolve_input_port_value_async(0)
|
||||||
|
|
||||||
|
# Creates an instance of Twitch_Connection.EventSub_Subscription to store the data with all the given inputs.
|
||||||
|
subscription_data = Twitch_Connection.EventSub_Subscription.new(sub_type, input_data.condition)
|
||||||
|
# Checks if the data has a version field, if so sets it on the EventSub_Subscription
|
||||||
|
if input_data.has("version"):
|
||||||
|
|
||||||
|
subscription_data.version = input_data.version
|
||||||
|
|
||||||
|
|
||||||
|
# Calls the connection to add the Subscription
|
||||||
|
var req = await Connections.twitch.add_eventsub_subscription(subscription_data)
|
||||||
|
req.response_received.connect(eventsub_subscription_response)
|
||||||
|
|
||||||
|
|
||||||
|
## Handles checking the [Twitch_Connection.HTTPResponse] returned by [method Twitch_Connection.add_eventsub_subscription] to ensure that it succeeded.
|
||||||
|
func eventsub_subscription_response(data):
|
||||||
|
|
||||||
|
match data.code:
|
||||||
|
|
||||||
|
202:
|
||||||
|
|
||||||
|
var succ_string = name + ": EventSub Subscription Added for " + subscription_data.subscription_type + " successfully"
|
||||||
|
DeckHolder.logger.log_node(succ_string, Logger.LogType.INFO)
|
||||||
|
Connections.twitch.eventsub_socket.notif_received.connect(Connections._twitch_eventsub_event_received)
|
||||||
|
|
||||||
|
|
||||||
|
_:
|
||||||
|
|
||||||
|
var error_string = name + ": Error" + data.code + " Received from Twitch when Subscribing to " + subscription_data.sub_type + " with " + str(subscription_data.return_request_dictionary)
|
||||||
|
|
||||||
|
DeckHolder.logger.log_node(error_string, Logger.LogType.ERROR)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
54
classes/deck/nodes/twitch/twitch_eventsub_event.gd
Normal file
54
classes/deck/nodes/twitch/twitch_eventsub_event.gd
Normal file
|
@ -0,0 +1,54 @@
|
||||||
|
# (c) 2023-present Eroax
|
||||||
|
# (c) 2023-present Yagich
|
||||||
|
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
||||||
|
extends DeckNode
|
||||||
|
|
||||||
|
var cached_event_data : Dictionary
|
||||||
|
|
||||||
|
func _init():
|
||||||
|
name = "Twitch EventSub Event"
|
||||||
|
node_type = name.to_snake_case()
|
||||||
|
description = "Listens for a specific Event from Twitch EventSub"
|
||||||
|
|
||||||
|
props_to_serialize = []
|
||||||
|
|
||||||
|
# Adds a port that allows specifying what type of event to listen for.
|
||||||
|
add_input_port(DeckType.Types.STRING, "Event Name", "field")
|
||||||
|
# Adds a port that outputs when the Event has been received
|
||||||
|
add_output_port(DeckType.Types.ANY, "Event Received")
|
||||||
|
# Adds a port that outputs the data received when the Event has been received.
|
||||||
|
add_output_port(DeckType.Types.DICTIONARY, "Event Data")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
func _event_received(event_name: StringName, event_data: Dictionary = {}):
|
||||||
|
|
||||||
|
if event_name != &"twitch_eventsub":
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
var port_0 = await resolve_input_port_value_async(0)
|
||||||
|
print("Event Name ", event_data)
|
||||||
|
if port_0 == null or port_0 != event_data.payload.subscription.type:
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
# Sends to indicate that the specified event has happened.
|
||||||
|
send(0, null)
|
||||||
|
# Sends the data along as well as the fact that the event happened. While also caching the event data for later access
|
||||||
|
cached_event_data = event_data
|
||||||
|
send(1, event_data)
|
||||||
|
|
||||||
|
|
||||||
|
func _value_request(port):
|
||||||
|
|
||||||
|
if port != 1:
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
return cached_event_data
|
||||||
|
|
Loading…
Reference in a new issue