add an RPC client abstraction class (#109)

Reviewed-on: https://codeberg.org/StreamGraph/StreamGraph/pulls/109
Co-authored-by: Lera Elvoé <yagich@poto.cafe>
Co-committed-by: Lera Elvoé <yagich@poto.cafe>
This commit is contained in:
Lera Elvoé 2024-03-16 07:20:47 +00:00 committed by yagich
parent d2d55297a9
commit 804abfed64
6 changed files with 147 additions and 9 deletions

View file

@ -11,9 +11,12 @@ const SCOPES_DIR := "res://rpc_renderer/scopes/"
@export var default_port := 6907 @export var default_port := 6907
var clients: Dictionary # Dictionary[int -> id, Client]
var _ws: WebSocketServer = WebSocketServer.new() var _ws: WebSocketServer = WebSocketServer.new()
var scopes: Array[RPCScope] var scopes: Array[RPCScope]
var system_scope: RPCScopeSystem
var request_schema: Zodot var request_schema: Zodot
@ -29,16 +32,33 @@ func load_scopes() -> void:
current = d.get_next() current = d.get_next()
for scope in scopes: for scope in scopes:
if scope.name == "system":
system_scope = scope
scope.event.connect( scope.event.connect(
func(event: RPCEvent): func(event: RPCEvent):
send_frame(0, event) if event.to_peer != 0:
send_frame(event.to_peer, event)
return
for client_id in clients:
var client: Client = get_client(client_id)
if not client.subscriptions.has(event.scope):
continue
if event.type in (client.subscriptions[event.scope] as Array):
send_frame(client_id, event)
) )
scope.response.connect( scope.response.connect(
func(response: RPCRequestResponse): func(response: RPCRequestResponse):
send_frame(response.peer_id, response) send_frame(response.peer_id, response)
if response.event_counterpart != null: if response.event_counterpart != null:
send_frame(-response.peer_id, response.event_counterpart) var event := response.event_counterpart
for client_id in clients:
if client_id == response.peer_id:
continue
var client: Client = get_client(client_id)
if event.name in (client.subscriptions[event.scope] as Array):
send_frame(client_id, event)
) )
@ -90,6 +110,22 @@ func send_frame(peer_id: int, frame: RPCFrame) -> void:
_ws.send(peer_id, JSON.stringify(frame.to_dict(), "", false)) _ws.send(peer_id, JSON.stringify(frame.to_dict(), "", false))
func get_client(peer_id: int) -> Client:
return clients.get(peer_id)
func drop_client(peer_id: int, reason: String) -> void:
var disconnect_data := {
"disconnect": {
"message": "You have been disconnected: %s" % reason
}
}
_ws.send(peer_id, JSON.stringify(disconnect_data, "", false))
_ws.peers.erase(peer_id)
clients.erase(peer_id)
func _on_ws_message(peer_id: int, message: Variant) -> void: func _on_ws_message(peer_id: int, message: Variant) -> void:
if not message is String: if not message is String:
return return
@ -104,7 +140,15 @@ func _on_ws_message(peer_id: int, message: Variant) -> void:
send_frame(peer_id, RPCError.new(result.error)) send_frame(peer_id, RPCError.new(result.error))
return return
var req := RPCRequest.from_dict(result.data, peer_id) var req := RPCRequest.from_dict(result.data, get_client(peer_id))
if not get_client(peer_id).identified:
if not (req.scope == "system" and req.operation.type == "identify"):
drop_client(peer_id, "You must identify your client first.")
return
system_scope.identify(req)
return
var scope_idx := -1 var scope_idx := -1
for i in scopes.size(): for i in scopes.size():
var scope := scopes[i] var scope := scopes[i]
@ -121,8 +165,17 @@ func _on_ws_message(peer_id: int, message: Variant) -> void:
func _on_ws_client_connected(peer_id: int) -> void: func _on_ws_client_connected(peer_id: int) -> void:
pass var c := Client.new()
c.id = peer_id
clients[peer_id] = c
RPCSignalLayer.signals.client_connected.emit(c)
func _on_ws_client_disconnected(peer_id: int) -> void: func _on_ws_client_disconnected(peer_id: int) -> void:
pass clients.erase(peer_id)
class Client:
var id: int
var identified: bool = false
var subscriptions: Dictionary = {}

View file

@ -62,6 +62,10 @@ static func _on_node_renamed(new_name: String, node: DeckNode) -> void:
class Signals: class Signals:
#region system
signal client_connected(client: RPCRenderer.Client)
#endregion
#region deck holder #region deck holder
signal deck_added(deck_id: String) signal deck_added(deck_id: String)
signal deck_closed(deck_id: String) signal deck_closed(deck_id: String)

View file

@ -18,7 +18,7 @@ func _init() -> void:
"group_nodes": {"callable": group_nodes, "event_name": "nodes_grouped"}, "group_nodes": {"callable": group_nodes, "event_name": "nodes_grouped"},
"set_variable": {"callable": set_variable, "event_name": ""}, "set_variable": {"callable": set_variable, "event_name": "variables_updated"},
"get_variable": {"callable": get_variable, "event_name": ""}, "get_variable": {"callable": get_variable, "event_name": ""},
"get_variable_list": {"callable": get_variable_list, "event_name": ""}, "get_variable_list": {"callable": get_variable_list, "event_name": ""},

View file

@ -0,0 +1,77 @@
# (c) 2023-present Eroax
# (c) 2023-present Yagich
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
extends RPCScope
class_name RPCScopeSystem
func _init() -> void:
name = "system"
operation_types = {
"identify": {"callable": identify, "event_name": ""},
"add_subscription": {"callable": add_subscription, "event_name": ""},
"get_subscriptions": {"callable": get_subscriptions, "event_name": ""},
"remove_subscriptions": {"callable": remove_subscriptions, "event_name": ""},
}
RPCSignalLayer.signals.client_connected.connect(_on_client_connected)
func identify(r: RPCRequest) -> void:
var subscriptions: Dictionary = r.operation.payload.get("subscriptions", {})
if subscriptions.is_empty():
subscriptions = {
"deck": [
"node_added", "node_removed", "nodes_connected", "nodes_disconnected",
"nodes_grouped", "variables_updated"
],
"deck_holder": [
"new_deck", "deck_closed"
]
}
r.client.subscriptions = subscriptions
r.client.identified = true
var resp := create_response(r, {"identified": true})
response.emit(resp)
func add_subscription(r: RPCRequest) -> void:
var new_subscriptions: Dictionary = r.operation.payload.subscriptions
for subscription_scope: String in new_subscriptions:
var event_names: Array = new_subscriptions[subscription_scope]
for event: String in event_names:
if event not in r.client.subscriptions[subscription_scope]:
(r.client.subscriptions[subscription_scope] as Array).append(event)
response.emit(create_generic_success(r))
func get_subscriptions(r: RPCRequest) -> void:
var resp := create_response(r, {"subscriptions": r.client.subscriptions})
response.emit(resp)
func remove_subscriptions(r: RPCRequest) -> void:
var to_remove: Dictionary = r.operation.payload.subscriptions
var scopes_to_remove: Array[String]
for subscription_scope: String in to_remove:
if subscription_scope not in r.client.subscriptions:
continue
var event_names: Array = to_remove[subscription_scope]
for event: String in event_names:
(r.client.subscriptions[subscription_scope] as Array).erase(event)
if (r.client.subscriptions[subscription_scope] as Array).is_empty():
scopes_to_remove.append(subscription_scope)
for scope in scopes_to_remove:
r.client.subscriptions.erase(scope)
response.emit(create_generic_success(r))
func _on_client_connected(client: RPCRenderer.Client) -> void:
var ev := create_event("identify", {})
ev.to_peer = client.id
event.emit(ev)

View file

@ -13,6 +13,8 @@ var scope: String
var data: Variant var data: Variant
var condition: Dictionary var condition: Dictionary
var to_peer: int = 0
func _init( func _init(
p_type: String, p_type: String,

View file

@ -18,8 +18,10 @@ var keep: Dictionary
## Which peer initiated this request. ## Which peer initiated this request.
var peer_id: int var peer_id: int
var client: RPCRenderer.Client
static func from_dict(d: Dictionary, p_peer_id: int) -> RPCRequest:
static func from_dict(d: Dictionary, p_client: RPCRenderer.Client) -> RPCRequest:
if not d.has("request"): if not d.has("request"):
return null return null
@ -28,7 +30,7 @@ static func from_dict(d: Dictionary, p_peer_id: int) -> RPCRequest:
r.scope = d.request.scope r.scope = d.request.scope
r.operation = RPCOperation.from_dict(d.request.operation) r.operation = RPCOperation.from_dict(d.request.operation)
r.keep = d.request.get("keep", {}) r.keep = d.request.get("keep", {})
r.client = p_client
r.peer_id = p_peer_id r.peer_id = p_client.id
return r return r