From 30fee9ac8581a7363b663d86773454a394c5cbd1 Mon Sep 17 00:00:00 2001 From: Rasmus Karlsson Date: Sat, 18 Nov 2023 11:20:09 +0100 Subject: [PATCH] Add eventsub class that uses the eventsub library this isn't a submodule yet xd --- CMakeLists.txt | 4 +- src/CMakeLists.txt | 13 + .../commands/builtin/chatterino/Debugging.cpp | 71 +--- src/providers/twitch/EventSub.cpp | 349 ++++++++++++++++++ src/providers/twitch/EventSub.hpp | 17 + 5 files changed, 385 insertions(+), 69 deletions(-) create mode 100644 src/providers/twitch/EventSub.cpp create mode 100644 src/providers/twitch/EventSub.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index f6b8281e1..78a943c1a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -123,7 +123,7 @@ endif () find_package(Sanitizers QUIET) # Find boost on the system -find_package(Boost REQUIRED OPTIONAL_COMPONENTS headers) +find_package(Boost REQUIRED OPTIONAL_COMPONENTS headers json) # Find OpenSSL on the system find_package(OpenSSL REQUIRED) @@ -213,6 +213,8 @@ if (BUILD_WITH_CRASHPAD) add_subdirectory("${CMAKE_SOURCE_DIR}/tools/crash-handler") endif() +add_subdirectory("${CMAKE_SOURCE_DIR}/../beast-websocket-client" eventsub EXCLUDE_FROM_ALL) + # Used to provide a date of build in the About page (for nightly builds). Getting the actual time of # compilation in CMake is a more involved, as documented in https://stackoverflow.com/q/24292898. # For CI runs, however, the date of build file generation should be consistent with the date of diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index c22beef6c..910c65dae 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -372,6 +372,8 @@ set(SOURCE_FILES providers/twitch/ChannelPointReward.cpp providers/twitch/ChannelPointReward.hpp + providers/twitch/EventSub.cpp + providers/twitch/EventSub.hpp providers/twitch/IrcMessageHandler.cpp providers/twitch/IrcMessageHandler.hpp providers/twitch/PubSubActions.cpp @@ -765,6 +767,12 @@ target_link_libraries(${LIBRARY_PROJECT} LRUCache MagicEnum ) + +target_link_libraries(${LIBRARY_PROJECT} + PUBLIC + eventsub + ) + if (CHATTERINO_PLUGINS) target_link_libraries(${LIBRARY_PROJECT} PUBLIC lua) endif() @@ -814,6 +822,11 @@ if (BUILD_APP) target_link_libraries(${EXECUTABLE_PROJECT} PUBLIC ${LIBRARY_PROJECT}) + target_link_libraries(${EXECUTABLE_PROJECT} + PUBLIC + eventsub + ) + set_target_directory_hierarchy(${EXECUTABLE_PROJECT}) if (WIN32) diff --git a/src/controllers/commands/builtin/chatterino/Debugging.cpp b/src/controllers/commands/builtin/chatterino/Debugging.cpp index 0505cdf8c..0af5fc36b 100644 --- a/src/controllers/commands/builtin/chatterino/Debugging.cpp +++ b/src/controllers/commands/builtin/chatterino/Debugging.cpp @@ -10,6 +10,7 @@ #include "messages/MessageBuilder.hpp" #include "messages/MessageElement.hpp" #include "providers/twitch/api/Helix.hpp" +#include "providers/twitch/EventSub.hpp" #include "providers/twitch/TwitchAccount.hpp" #include "providers/twitch/TwitchChannel.hpp" #include "providers/twitch/TwitchIrcServer.hpp" @@ -142,75 +143,9 @@ QString forceImageUnload(const CommandContext &ctx) QString debugEventSub(const CommandContext &ctx) { - if (ctx.words.size() < 2) - { - ctx.channel->addMessage(makeSystemMessage("missing session ID")); - return {}; - } + static EventSub eventSub; - const auto &sessionID = ctx.words[1]; - - const auto currentUser = getApp()->accounts->twitch.getCurrent(); - - if (currentUser->isAnon()) - { - ctx.channel->addMessage( - makeSystemMessage("you must be logged in to use this command")); - return {}; - } - - auto sourceUserID = currentUser->getUserId(); - - getApp()->twitch->forEachChannelAndSpecialChannels( - [sessionID, sourceUserID](const ChannelPtr &channel) { - if (channel->getType() == Channel::Type::Twitch) - { - auto *twitchChannel = - dynamic_cast(channel.get()); - - auto roomID = twitchChannel->roomId(); - - if (channel->isBroadcaster()) - { - QJsonObject condition; - condition.insert("broadcaster_user_id", roomID); - - getHelix()->createEventSubSubscription( - "channel.ban", "1", sessionID, condition, - [roomID](const auto &response) { - qDebug() << "Successfully subscribed to " - "channel.ban in" - << roomID << ":" << response; - }, - [roomID](auto error, const auto &message) { - (void)error; - qDebug() << "Failed subscription to channel.ban in" - << roomID << ":" << message; - }); - } - - { - QJsonObject condition; - condition.insert("broadcaster_user_id", roomID); - condition.insert("user_id", sourceUserID); - - getHelix()->createEventSubSubscription( - "channel.chat.notification", "beta", sessionID, - condition, - [roomID](const auto &response) { - qDebug() << "Successfully subscribed to " - "channel.chat.notification in " - << roomID << ":" << response; - }, - [roomID](auto error, const auto &message) { - (void)error; - qDebug() << "Failed subscription to " - "channel.chat.notification in" - << roomID << ":" << message; - }); - } - } - }); + eventSub.start(); return ""; } diff --git a/src/providers/twitch/EventSub.cpp b/src/providers/twitch/EventSub.cpp new file mode 100644 index 000000000..d971c255d --- /dev/null +++ b/src/providers/twitch/EventSub.cpp @@ -0,0 +1,349 @@ +#include "providers/twitch/EventSub.hpp" + +#include "Application.hpp" +#include "controllers/accounts/AccountController.hpp" +#include "eventsub/listener.hpp" +#include "eventsub/payloads/channel-ban-v1.hpp" +#include "eventsub/payloads/session-welcome.hpp" +#include "eventsub/session.hpp" +#include "providers/twitch/api/Helix.hpp" +#include "providers/twitch/TwitchAccount.hpp" +#include "providers/twitch/TwitchChannel.hpp" +#include "providers/twitch/TwitchIrcServer.hpp" +#include "util/PostToThread.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace beast = boost::beast; // from +namespace http = beast::http; // from +namespace websocket = beast::websocket; // from +namespace ssl = boost::asio::ssl; // from + +using boost::asio::awaitable; +using boost::asio::co_spawn; +using boost::asio::detached; +using boost::asio::use_awaitable; +using boost::asio::ip::tcp; +// using namespace boost::asio::experimental::awaitable_operators; +using namespace std::literals::chrono_literals; + +using WebSocketStream = websocket::stream>; + +namespace chatterino { + +using namespace eventsub; + +// Report a failure +void fail(beast::error_code ec, char const *what) +{ + std::cerr << what << ": " << ec.message() << "\n"; +} + +awaitable session(WebSocketStream &ws, std::unique_ptr listener) +{ + // start reader + std::cout << "start reader\n"; + co_await (sessionReader(ws, std::move(listener))); + // co_spawn(ws.get_executor(), sessionReader(ws), detached); + std::cout << "reader stopped\n"; + + co_return; +} + +class MyListener final : public Listener +{ +public: + void onSessionWelcome(messages::Metadata metadata, + payload::session_welcome::Payload payload) override + { + (void)metadata; + std::cout << "ON session welcome " << payload.id << " XD\n"; + + auto sessionID = QString::fromStdString(payload.id); + + const auto currentUser = getApp()->accounts->twitch.getCurrent(); + + if (currentUser->isAnon()) + { + return; + } + + auto sourceUserID = currentUser->getUserId(); + + getApp()->twitch->forEachChannelAndSpecialChannels( + [sessionID, sourceUserID](const ChannelPtr &channel) { + if (channel->getType() == Channel::Type::Twitch) + { + auto *twitchChannel = + dynamic_cast(channel.get()); + + auto roomID = twitchChannel->roomId(); + + if (channel->isBroadcaster()) + { + QJsonObject condition; + condition.insert("broadcaster_user_id", roomID); + + getHelix()->createEventSubSubscription( + "channel.ban", "1", sessionID, condition, + [roomID](const auto &response) { + qDebug() << "Successfully subscribed to " + "channel.ban in" + << roomID << ":" << response; + }, + [roomID](auto error, const auto &message) { + (void)error; + qDebug() + << "Failed subscription to channel.ban in" + << roomID << ":" << message; + }); + } + + { + QJsonObject condition; + condition.insert("broadcaster_user_id", roomID); + condition.insert("user_id", sourceUserID); + + getHelix()->createEventSubSubscription( + "channel.chat.notification", "beta", sessionID, + condition, + [roomID](const auto &response) { + qDebug() << "Successfully subscribed to " + "channel.chat.notification in " + << roomID << ":" << response; + }, + [roomID](auto error, const auto &message) { + (void)error; + qDebug() << "Failed subscription to " + "channel.chat.notification in" + << roomID << ":" << message; + }); + } + } + }); + } + + void onNotification(messages::Metadata metadata, + const boost::json::value &jv) override + { + (void)metadata; + std::cout << "on notification: " << jv << '\n'; + } + + void onChannelBan(messages::Metadata metadata, + payload::channel_ban::v1::Payload payload) override + { + (void)metadata; + std::cout << "Channel ban occured in " + << payload.event.broadcasterUserLogin << "'s channel:" + << " isPermanent=" << payload.event.isPermanent + << " reason=" << payload.event.reason + << " userLogin=" << payload.event.userLogin + << " moderatorLogin=" << payload.event.moderatorUserLogin + << '\n'; + } + + void onStreamOnline(messages::Metadata metadata, + payload::stream_online::v1::Payload payload) override + { + (void)metadata; + (void)payload; + std::cout << "ON STREAM ONLINE XD\n"; + } + + void onStreamOffline(messages::Metadata metadata, + payload::stream_offline::v1::Payload payload) override + { + (void)metadata; + (void)payload; + std::cout << "ON STREAM OFFLINE XD\n"; + } + + void onChannelChatNotification( + messages::Metadata metadata, + payload::channel_chat_notification::beta::Payload payload) override + { + (void)metadata; + (void)payload; + std::cout << "Received channel.chat.notification beta\n"; + } + + void onChannelUpdate(messages::Metadata metadata, + payload::channel_update::v1::Payload payload) override + { + (void)metadata; + (void)payload; + std::cout << "Channel update event!\n"; + } + + // Add your new subscription types above this line +}; + +awaitable connectToClient(boost::asio::io_context &ioContext, + const std::string host, const std::string port, + const std::string path, + boost::asio::ssl::context &sslContext) +{ + auto tcpResolver = tcp::resolver(ioContext); + + for (;;) + { + // TODO: wait on (AND INCREMENT) backoff timer + + boost::system::error_code resolveError; + auto target = co_await tcpResolver.async_resolve( + host, port, + boost::asio::redirect_error(boost::asio::use_awaitable, + resolveError)); + + std::cout << "Connecting to " << host << ":" << port << "\n"; + if (resolveError) + { + fail(resolveError, "resolve"); + continue; + } + + WebSocketStream ws(ioContext, sslContext); + + // Make the connection on the IP address we get from a lookup + // TODO: Check connectError + boost::system::error_code connectError; + auto endpoint = co_await beast::get_lowest_layer(ws).async_connect( + target, boost::asio::redirect_error(boost::asio::use_awaitable, + connectError)); + + std::string hostHeader{host}; + + // Set SNI Hostname (many hosts need this to handshake successfully) + if (!SSL_set_tlsext_host_name(ws.next_layer().native_handle(), + host.data())) + { + auto ec = beast::error_code(static_cast(::ERR_get_error()), + boost::asio::error::get_ssl_category()); + fail(ec, "connect"); + continue; + } + + // Update the host string. This will provide the value of the + // Host HTTP header during the WebSocket handshake. + // See https://tools.ietf.org/html/rfc7230#section-5.4 + hostHeader += ':' + std::to_string(endpoint.port()); + + // Set a timeout on the operation + beast::get_lowest_layer(ws).expires_after(std::chrono::seconds(30)); + + // Set a decorator to change the User-Agent of the handshake + ws.set_option( + websocket::stream_base::decorator([](websocket::request_type &req) { + req.set(http::field::user_agent, + std::string(BOOST_BEAST_VERSION_STRING) + + " websocket-client-coro"); + })); + + // Perform the SSL handshake + boost::system::error_code sslHandshakeError; + co_await ws.next_layer().async_handshake( + ssl::stream_base::client, + boost::asio::redirect_error(boost::asio::use_awaitable, + sslHandshakeError)); + if (sslHandshakeError) + { + fail(sslHandshakeError, "ssl_handshake"); + continue; + } + + // Turn off the timeout on the tcp_stream, because + // the websocket stream has its own timeout system. + beast::get_lowest_layer(ws).expires_never(); + + // Set suggested timeout settings for the websocket + ws.set_option(websocket::stream_base::timeout::suggested( + beast::role_type::client)); + + // Perform the websocket handshake + boost::system::error_code wsHandshakeError; + co_await ws.async_handshake( + hostHeader, path, + boost::asio::redirect_error(boost::asio::use_awaitable, + wsHandshakeError)); + if (wsHandshakeError) + { + fail(wsHandshakeError, "handshake"); + continue; + } + + std::unique_ptr listener = std::make_unique(); + co_await session(ws, std::move(listener)); + + // Close the WebSocket connection + boost::system::error_code closeError; + co_await ws.async_close(websocket::close_code::normal, + boost::asio::redirect_error( + boost::asio::use_awaitable, closeError)); + if (closeError) + { + fail(closeError, "close"); + } + else + { + // If we get here then the connection is closed gracefully + std::cout << "Closed connection gracefully\n"; + } + + // TODO: reset backoff timer + } +} + +void EventSub::start() +{ + // for use with twitch CLI: twitch event websocket start-server --ssl --port 3012 + // const auto *const host = "localhost"; + // const auto *const port = "3012"; + // const auto *const path = "/ws"; + + // for use with real Twitch eventsub + std::string host{"eventsub.wss.twitch.tv"}; + std::string port("443"); + std::string path("/ws"); + + try + { + this->mainThread = std::make_unique([=] { + boost::asio::io_context ctx(1); + + boost::asio::ssl::context sslContext{ + boost::asio::ssl::context::tlsv12_client}; + + // TODO: Load certificates into SSL context + + co_spawn(ctx, connectToClient(ctx, host, port, path, sslContext), + detached); + ctx.run(); + }); + } + catch (std::exception &e) + { + std::cerr << "Exception: " << e.what() << "\n"; + } +} + +} // namespace chatterino diff --git a/src/providers/twitch/EventSub.hpp b/src/providers/twitch/EventSub.hpp new file mode 100644 index 000000000..6fde6c54b --- /dev/null +++ b/src/providers/twitch/EventSub.hpp @@ -0,0 +1,17 @@ +#pragma once + +#include +#include + +namespace chatterino { + +class EventSub +{ +public: + void start(); + +private: + std::unique_ptr mainThread; +}; + +} // namespace chatterino