diff --git a/src/providers/twitch/EventSub.cpp b/src/providers/twitch/EventSub.cpp index 64d116d77..23c43ddd2 100644 --- a/src/providers/twitch/EventSub.cpp +++ b/src/providers/twitch/EventSub.cpp @@ -1,11 +1,9 @@ #include "providers/twitch/EventSub.hpp" #include "Application.hpp" +#include "common/QLogging.hpp" +#include "common/Version.hpp" #include "controllers/accounts/AccountController.hpp" -#include "eventsub/listener.hpp" -#include "eventsub/payloads/channel-ban-v1.hpp" -#include "eventsub/payloads/session-welcome.hpp" -#include "eventsub/session.hpp" #include "messages/Message.hpp" #include "messages/MessageBuilder.hpp" #include "providers/twitch/api/Helix.hpp" @@ -15,71 +13,35 @@ #include "providers/twitch/TwitchIrcServer.hpp" #include "util/PostToThread.hpp" -#include -#include -#include -#include #include -#include -#include -#include -#include -#include -#include -#include -#include +#include #include +#include +#include -#include #include -#include #include -#include -namespace beast = boost::beast; // from -namespace http = beast::http; // from -namespace websocket = beast::websocket; // from -namespace ssl = boost::asio::ssl; // from - -using boost::asio::awaitable; -using boost::asio::co_spawn; -using boost::asio::detached; -using boost::asio::use_awaitable; -using boost::asio::ip::tcp; -// using namespace boost::asio::experimental::awaitable_operators; using namespace std::literals::chrono_literals; -using WebSocketStream = websocket::stream>; +namespace { + +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +const auto &LOG = chatterinoTwitchEventSub; + +} // namespace namespace chatterino { -using namespace eventsub; - -// Report a failure -void fail(beast::error_code ec, char const *what) -{ - std::cerr << what << ": " << ec.message() << "\n"; -} - -awaitable session(WebSocketStream &ws, std::unique_ptr listener) -{ - // start reader - std::cout << "start reader\n"; - co_await (sessionReader(ws, std::move(listener))); - // co_spawn(ws.get_executor(), sessionReader(ws), detached); - std::cout << "reader stopped\n"; - - co_return; -} - -class MyListener final : public Listener +class MyListener final : public eventsub::Listener { public: - void onSessionWelcome(messages::Metadata metadata, - payload::session_welcome::Payload payload) override + void onSessionWelcome( + eventsub::messages::Metadata metadata, + eventsub::payload::session_welcome::Payload payload) override { (void)metadata; - std::cout << "ON session welcome " << payload.id << " XD\n"; + qCDebug(LOG) << "On session welcome:" << payload.id.c_str(); auto sessionID = QString::fromStdString(payload.id); @@ -145,30 +107,23 @@ public: }); } - void onNotification(messages::Metadata metadata, + void onNotification(eventsub::messages::Metadata metadata, const boost::json::value &jv) override { (void)metadata; - std::cout << "on notification: " << jv << '\n'; + auto jsonString = boost::json::serialize(jv); + qCDebug(LOG) << "on notification: " << jsonString.c_str(); } - void onChannelBan(messages::Metadata metadata, - payload::channel_ban::v1::Payload payload) override + void onChannelBan( + eventsub::messages::Metadata metadata, + eventsub::payload::channel_ban::v1::Payload payload) override { (void)metadata; - std::cout << "Channel ban occured in " - << payload.event.broadcasterUserLogin << "'s channel:" - << " isPermanent=" << payload.event.isPermanent - << " reason=" << payload.event.reason - << " userLogin=" << payload.event.userLogin - << " moderatorLogin=" << payload.event.moderatorUserLogin - << " bannedAt=" << payload.event.bannedAt << '\n'; auto roomID = QString::fromStdString(payload.event.broadcasterUserID); - BanAction action{ - // - }; + BanAction action{}; action.timestamp = std::chrono::steady_clock::now(); action.roomID = roomID; @@ -196,8 +151,6 @@ public: timeoutDuration) .count(); action.duration = timeoutDurationInSeconds; - qDebug() << "TIMEOUT DURATION IN SECONDS: " - << timeoutDurationInSeconds; } auto chan = getApp()->twitch->getChannelOrEmptyByID(roomID); @@ -209,172 +162,70 @@ public: }); } - void onStreamOnline(messages::Metadata metadata, - payload::stream_online::v1::Payload payload) override + void onStreamOnline( + eventsub::messages::Metadata metadata, + eventsub::payload::stream_online::v1::Payload payload) override { (void)metadata; - (void)payload; - std::cout << "ON STREAM ONLINE XD\n"; + qCDebug(LOG) << "On stream online event for channel" + << payload.event.broadcasterUserLogin.c_str(); } - void onStreamOffline(messages::Metadata metadata, - payload::stream_offline::v1::Payload payload) override + void onStreamOffline( + eventsub::messages::Metadata metadata, + eventsub::payload::stream_offline::v1::Payload payload) override { (void)metadata; - (void)payload; - std::cout << "ON STREAM OFFLINE XD\n"; + qCDebug(LOG) << "On stream offline event for channel" + << payload.event.broadcasterUserLogin.c_str(); } void onChannelChatNotification( - messages::Metadata metadata, - payload::channel_chat_notification::v1::Payload payload) override + eventsub::messages::Metadata metadata, + eventsub::payload::channel_chat_notification::v1::Payload payload) + override { (void)metadata; - (void)payload; - std::cout << "Received channel.chat.notification v1\n"; + qCDebug(LOG) << "On channel chat notification for" + << payload.event.broadcasterUserLogin.c_str(); } - void onChannelUpdate(messages::Metadata metadata, - payload::channel_update::v1::Payload payload) override + void onChannelUpdate( + eventsub::messages::Metadata metadata, + eventsub::payload::channel_update::v1::Payload payload) override { (void)metadata; - (void)payload; - std::cout << "Channel update event!\n"; + qCDebug(LOG) << "On channel update for" + << payload.event.broadcasterUserLogin.c_str(); } - - // Add your new subscription types above this line }; -awaitable connectToClient(boost::asio::io_context &ioContext, - const std::string host, const std::string port, - const std::string path, - boost::asio::ssl::context &sslContext) -{ - auto tcpResolver = tcp::resolver(ioContext); - - for (;;) - { - // TODO: wait on (AND INCREMENT) backoff timer - - boost::system::error_code resolveError; - auto target = co_await tcpResolver.async_resolve( - host, port, - boost::asio::redirect_error(boost::asio::use_awaitable, - resolveError)); - - std::cout << "Connecting to " << host << ":" << port << "\n"; - if (resolveError) - { - fail(resolveError, "resolve"); - continue; - } - - WebSocketStream ws(ioContext, sslContext); - - // Make the connection on the IP address we get from a lookup - // TODO: Check connectError - boost::system::error_code connectError; - auto endpoint = co_await beast::get_lowest_layer(ws).async_connect( - target, boost::asio::redirect_error(boost::asio::use_awaitable, - connectError)); - - std::string hostHeader{host}; - - // Set SNI Hostname (many hosts need this to handshake successfully) - if (!SSL_set_tlsext_host_name(ws.next_layer().native_handle(), - host.data())) - { - auto ec = beast::error_code(static_cast(::ERR_get_error()), - boost::asio::error::get_ssl_category()); - fail(ec, "connect"); - continue; - } - - // Update the host string. This will provide the value of the - // Host HTTP header during the WebSocket handshake. - // See https://tools.ietf.org/html/rfc7230#section-5.4 - hostHeader += ':' + std::to_string(endpoint.port()); - - // Set a timeout on the operation - beast::get_lowest_layer(ws).expires_after(std::chrono::seconds(30)); - - // Set a decorator to change the User-Agent of the handshake - ws.set_option( - websocket::stream_base::decorator([](websocket::request_type &req) { - req.set(http::field::user_agent, - std::string(BOOST_BEAST_VERSION_STRING) + - " websocket-client-coro"); - })); - - // Perform the SSL handshake - boost::system::error_code sslHandshakeError; - co_await ws.next_layer().async_handshake( - ssl::stream_base::client, - boost::asio::redirect_error(boost::asio::use_awaitable, - sslHandshakeError)); - if (sslHandshakeError) - { - fail(sslHandshakeError, "ssl_handshake"); - continue; - } - - // Turn off the timeout on the tcp_stream, because - // the websocket stream has its own timeout system. - beast::get_lowest_layer(ws).expires_never(); - - // Set suggested timeout settings for the websocket - ws.set_option(websocket::stream_base::timeout::suggested( - beast::role_type::client)); - - // Perform the websocket handshake - boost::system::error_code wsHandshakeError; - co_await ws.async_handshake( - hostHeader, path, - boost::asio::redirect_error(boost::asio::use_awaitable, - wsHandshakeError)); - if (wsHandshakeError) - { - fail(wsHandshakeError, "handshake"); - continue; - } - - std::unique_ptr listener = std::make_unique(); - co_await session(ws, std::move(listener)); - - // Close the WebSocket connection - boost::system::error_code closeError; - co_await ws.async_close(websocket::close_code::normal, - boost::asio::redirect_error( - boost::asio::use_awaitable, closeError)); - if (closeError) - { - fail(closeError, "close"); - } - else - { - // If we get here then the connection is closed gracefully - std::cout << "Closed connection gracefully\n"; - } - - // TODO: reset backoff timer - } -} - void EventSub::start() { + const auto userAgent = QStringLiteral("chatterino/%1 (%2)") + .arg(Version::instance().version(), + Version::instance().commitHash()) + .toUtf8() + .toStdString(); + // for use with twitch CLI: twitch event websocket start-server --ssl --port 3012 - // const auto *const host = "localhost"; - // const auto *const port = "3012"; - // const auto *const path = "/ws"; + // std::string host{"localhost"}; + // std::string port{"3012"}; + // std::string path{"/ws"}; + + // for use with websocat: websocat -s 8080 --pkcs12-der certificate.p12 + // std::string host{"localhost"}; + // std::string port{"8080"}; + // std::string path; // for use with real Twitch eventsub std::string host{"eventsub.wss.twitch.tv"}; - std::string port("443"); - std::string path("/ws"); + std::string port{"443"}; + std::string path{"/ws"}; - try - { - this->mainThread = std::make_unique([=] { + this->mainThread = std::make_unique([=] { + try + { boost::asio::io_context ctx(1); boost::asio::ssl::context sslContext{ @@ -382,15 +233,17 @@ void EventSub::start() // TODO: Load certificates into SSL context - co_spawn(ctx, connectToClient(ctx, host, port, path, sslContext), - detached); + std::make_shared(ctx, sslContext, + std::make_unique()) + ->run(host, port, path, userAgent); + ctx.run(); - }); - } - catch (std::exception &e) - { - std::cerr << "Exception: " << e.what() << "\n"; - } + } + catch (std::exception &e) + { + qCWarning(LOG) << "Error in EventSub run thread" << e.what(); + } + }); } } // namespace chatterino