From 804abfed6481040fc2f8d6d3621286033770b81f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lera=20Elvo=C3=A9?= Date: Sat, 16 Mar 2024 07:20:47 +0000 Subject: [PATCH] add an RPC client abstraction class (#109) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reviewed-on: https://codeberg.org/StreamGraph/StreamGraph/pulls/109 Co-authored-by: Lera ElvoƩ Co-committed-by: Lera ElvoƩ --- rpc_renderer/rpc_renderer.gd | 63 +++++++++++++++++++++-- rpc_renderer/rpc_signal_layer.gd | 4 ++ rpc_renderer/scopes/scope_deck.gd | 2 +- rpc_renderer/scopes/scope_system.gd | 77 +++++++++++++++++++++++++++++ rpc_renderer/types/rpc_event.gd | 2 + rpc_renderer/types/rpc_request.gd | 8 +-- 6 files changed, 147 insertions(+), 9 deletions(-) create mode 100644 rpc_renderer/scopes/scope_system.gd diff --git a/rpc_renderer/rpc_renderer.gd b/rpc_renderer/rpc_renderer.gd index 5e40798..8508d47 100644 --- a/rpc_renderer/rpc_renderer.gd +++ b/rpc_renderer/rpc_renderer.gd @@ -11,9 +11,12 @@ const SCOPES_DIR := "res://rpc_renderer/scopes/" @export var default_port := 6907 +var clients: Dictionary # Dictionary[int -> id, Client] + var _ws: WebSocketServer = WebSocketServer.new() var scopes: Array[RPCScope] +var system_scope: RPCScopeSystem var request_schema: Zodot @@ -29,16 +32,33 @@ func load_scopes() -> void: current = d.get_next() for scope in scopes: + if scope.name == "system": + system_scope = scope + scope.event.connect( func(event: RPCEvent): - send_frame(0, event) + if event.to_peer != 0: + send_frame(event.to_peer, event) + return + for client_id in clients: + var client: Client = get_client(client_id) + if not client.subscriptions.has(event.scope): + continue + if event.type in (client.subscriptions[event.scope] as Array): + send_frame(client_id, event) ) scope.response.connect( func(response: RPCRequestResponse): send_frame(response.peer_id, response) if response.event_counterpart != null: - send_frame(-response.peer_id, response.event_counterpart) + var event := response.event_counterpart + for client_id in clients: + if client_id == response.peer_id: + continue + var client: Client = get_client(client_id) + if event.name in (client.subscriptions[event.scope] as Array): + send_frame(client_id, event) ) @@ -90,6 +110,22 @@ func send_frame(peer_id: int, frame: RPCFrame) -> void: _ws.send(peer_id, JSON.stringify(frame.to_dict(), "", false)) +func get_client(peer_id: int) -> Client: + return clients.get(peer_id) + + +func drop_client(peer_id: int, reason: String) -> void: + var disconnect_data := { + "disconnect": { + "message": "You have been disconnected: %s" % reason + } + } + + _ws.send(peer_id, JSON.stringify(disconnect_data, "", false)) + _ws.peers.erase(peer_id) + clients.erase(peer_id) + + func _on_ws_message(peer_id: int, message: Variant) -> void: if not message is String: return @@ -104,7 +140,15 @@ func _on_ws_message(peer_id: int, message: Variant) -> void: send_frame(peer_id, RPCError.new(result.error)) return - var req := RPCRequest.from_dict(result.data, peer_id) + var req := RPCRequest.from_dict(result.data, get_client(peer_id)) + if not get_client(peer_id).identified: + if not (req.scope == "system" and req.operation.type == "identify"): + drop_client(peer_id, "You must identify your client first.") + return + + system_scope.identify(req) + return + var scope_idx := -1 for i in scopes.size(): var scope := scopes[i] @@ -121,8 +165,17 @@ func _on_ws_message(peer_id: int, message: Variant) -> void: func _on_ws_client_connected(peer_id: int) -> void: - pass + var c := Client.new() + c.id = peer_id + clients[peer_id] = c + RPCSignalLayer.signals.client_connected.emit(c) func _on_ws_client_disconnected(peer_id: int) -> void: - pass + clients.erase(peer_id) + + +class Client: + var id: int + var identified: bool = false + var subscriptions: Dictionary = {} diff --git a/rpc_renderer/rpc_signal_layer.gd b/rpc_renderer/rpc_signal_layer.gd index f5d0dac..e5844d5 100644 --- a/rpc_renderer/rpc_signal_layer.gd +++ b/rpc_renderer/rpc_signal_layer.gd @@ -62,6 +62,10 @@ static func _on_node_renamed(new_name: String, node: DeckNode) -> void: class Signals: + #region system + signal client_connected(client: RPCRenderer.Client) + #endregion + #region deck holder signal deck_added(deck_id: String) signal deck_closed(deck_id: String) diff --git a/rpc_renderer/scopes/scope_deck.gd b/rpc_renderer/scopes/scope_deck.gd index cdaace5..4226f2b 100644 --- a/rpc_renderer/scopes/scope_deck.gd +++ b/rpc_renderer/scopes/scope_deck.gd @@ -18,7 +18,7 @@ func _init() -> void: "group_nodes": {"callable": group_nodes, "event_name": "nodes_grouped"}, - "set_variable": {"callable": set_variable, "event_name": ""}, + "set_variable": {"callable": set_variable, "event_name": "variables_updated"}, "get_variable": {"callable": get_variable, "event_name": ""}, "get_variable_list": {"callable": get_variable_list, "event_name": ""}, diff --git a/rpc_renderer/scopes/scope_system.gd b/rpc_renderer/scopes/scope_system.gd new file mode 100644 index 0000000..2f85fbd --- /dev/null +++ b/rpc_renderer/scopes/scope_system.gd @@ -0,0 +1,77 @@ +# (c) 2023-present Eroax +# (c) 2023-present Yagich +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) +extends RPCScope +class_name RPCScopeSystem + + +func _init() -> void: + name = "system" + operation_types = { + "identify": {"callable": identify, "event_name": ""}, + "add_subscription": {"callable": add_subscription, "event_name": ""}, + "get_subscriptions": {"callable": get_subscriptions, "event_name": ""}, + "remove_subscriptions": {"callable": remove_subscriptions, "event_name": ""}, + } + + RPCSignalLayer.signals.client_connected.connect(_on_client_connected) + + +func identify(r: RPCRequest) -> void: + var subscriptions: Dictionary = r.operation.payload.get("subscriptions", {}) + if subscriptions.is_empty(): + subscriptions = { + "deck": [ + "node_added", "node_removed", "nodes_connected", "nodes_disconnected", + "nodes_grouped", "variables_updated" + ], + "deck_holder": [ + "new_deck", "deck_closed" + ] + } + r.client.subscriptions = subscriptions + r.client.identified = true + + var resp := create_response(r, {"identified": true}) + response.emit(resp) + + +func add_subscription(r: RPCRequest) -> void: + var new_subscriptions: Dictionary = r.operation.payload.subscriptions + for subscription_scope: String in new_subscriptions: + var event_names: Array = new_subscriptions[subscription_scope] + for event: String in event_names: + if event not in r.client.subscriptions[subscription_scope]: + (r.client.subscriptions[subscription_scope] as Array).append(event) + + response.emit(create_generic_success(r)) + + +func get_subscriptions(r: RPCRequest) -> void: + var resp := create_response(r, {"subscriptions": r.client.subscriptions}) + response.emit(resp) + + +func remove_subscriptions(r: RPCRequest) -> void: + var to_remove: Dictionary = r.operation.payload.subscriptions + + var scopes_to_remove: Array[String] + for subscription_scope: String in to_remove: + if subscription_scope not in r.client.subscriptions: + continue + var event_names: Array = to_remove[subscription_scope] + for event: String in event_names: + (r.client.subscriptions[subscription_scope] as Array).erase(event) + if (r.client.subscriptions[subscription_scope] as Array).is_empty(): + scopes_to_remove.append(subscription_scope) + + for scope in scopes_to_remove: + r.client.subscriptions.erase(scope) + + response.emit(create_generic_success(r)) + + +func _on_client_connected(client: RPCRenderer.Client) -> void: + var ev := create_event("identify", {}) + ev.to_peer = client.id + event.emit(ev) diff --git a/rpc_renderer/types/rpc_event.gd b/rpc_renderer/types/rpc_event.gd index 7d5a1e2..062667c 100644 --- a/rpc_renderer/types/rpc_event.gd +++ b/rpc_renderer/types/rpc_event.gd @@ -13,6 +13,8 @@ var scope: String var data: Variant var condition: Dictionary +var to_peer: int = 0 + func _init( p_type: String, diff --git a/rpc_renderer/types/rpc_request.gd b/rpc_renderer/types/rpc_request.gd index de4ed7c..f5a272e 100644 --- a/rpc_renderer/types/rpc_request.gd +++ b/rpc_renderer/types/rpc_request.gd @@ -18,8 +18,10 @@ var keep: Dictionary ## Which peer initiated this request. var peer_id: int +var client: RPCRenderer.Client -static func from_dict(d: Dictionary, p_peer_id: int) -> RPCRequest: + +static func from_dict(d: Dictionary, p_client: RPCRenderer.Client) -> RPCRequest: if not d.has("request"): return null @@ -28,7 +30,7 @@ static func from_dict(d: Dictionary, p_peer_id: int) -> RPCRequest: r.scope = d.request.scope r.operation = RPCOperation.from_dict(d.request.operation) r.keep = d.request.get("keep", {}) - - r.peer_id = p_peer_id + r.client = p_client + r.peer_id = p_client.id return r