diff --git a/.clang-tidy b/.clang-tidy index 7ec4b791f..b9cd999da 100644 --- a/.clang-tidy +++ b/.clang-tidy @@ -42,6 +42,8 @@ CheckOptions: value: .* - key: readability-identifier-naming.PrivateMemberSuffix value: _ + - key: readability-identifier-naming.ProtectedMemberSuffix + value: _ - key: readability-identifier-naming.UnionCase value: CamelCase - key: readability-identifier-naming.GlobalVariableCase diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 7da3a0f7b..f0e388640 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -6,7 +6,7 @@ on: workflow_dispatch: env: - TWITCH_PUBSUB_SERVER_IMAGE: ghcr.io/chatterino/twitch-pubsub-server-test:v1.0.3 + TWITCH_PUBSUB_SERVER_IMAGE: ghcr.io/chatterino/twitch-pubsub-server-test:v1.0.4 concurrency: group: test-${{ github.ref }} diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ea627e44f..8c5450d93 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -214,6 +214,10 @@ set(SOURCE_FILES providers/irc/IrcServer.cpp providers/irc/IrcServer.hpp + providers/liveupdates/BasicPubSubClient.hpp + providers/liveupdates/BasicPubSubManager.hpp + providers/liveupdates/BasicPubSubWebsocket.hpp + providers/seventv/SeventvBadges.cpp providers/seventv/SeventvBadges.hpp providers/seventv/SeventvEmotes.cpp diff --git a/src/common/QLogging.cpp b/src/common/QLogging.cpp index ede6a45bc..525f69b6c 100644 --- a/src/common/QLogging.cpp +++ b/src/common/QLogging.cpp @@ -22,6 +22,8 @@ Q_LOGGING_CATEGORY(chatterinoHTTP, "chatterino.http", logThreshold); Q_LOGGING_CATEGORY(chatterinoImage, "chatterino.image", logThreshold); Q_LOGGING_CATEGORY(chatterinoIrc, "chatterino.irc", logThreshold); Q_LOGGING_CATEGORY(chatterinoIvr, "chatterino.ivr", logThreshold); +Q_LOGGING_CATEGORY(chatterinoLiveupdates, "chatterino.liveupdates", + logThreshold); Q_LOGGING_CATEGORY(chatterinoMain, "chatterino.main", logThreshold); Q_LOGGING_CATEGORY(chatterinoMessage, "chatterino.message", logThreshold); Q_LOGGING_CATEGORY(chatterinoNativeMessage, "chatterino.nativemessage", diff --git a/src/common/QLogging.hpp b/src/common/QLogging.hpp index 4f27d0ea9..0739fbaec 100644 --- a/src/common/QLogging.hpp +++ b/src/common/QLogging.hpp @@ -18,6 +18,7 @@ Q_DECLARE_LOGGING_CATEGORY(chatterinoHTTP); Q_DECLARE_LOGGING_CATEGORY(chatterinoImage); Q_DECLARE_LOGGING_CATEGORY(chatterinoIrc); Q_DECLARE_LOGGING_CATEGORY(chatterinoIvr); +Q_DECLARE_LOGGING_CATEGORY(chatterinoLiveupdates); Q_DECLARE_LOGGING_CATEGORY(chatterinoMain); Q_DECLARE_LOGGING_CATEGORY(chatterinoMessage); Q_DECLARE_LOGGING_CATEGORY(chatterinoNativeMessage); diff --git a/src/providers/liveupdates/BasicPubSubClient.hpp b/src/providers/liveupdates/BasicPubSubClient.hpp new file mode 100644 index 000000000..d23ca6444 --- /dev/null +++ b/src/providers/liveupdates/BasicPubSubClient.hpp @@ -0,0 +1,177 @@ +#pragma once + +#include +#include +#include +#include + +#include "common/QLogging.hpp" +#include "providers/liveupdates/BasicPubSubWebsocket.hpp" +#include "singletons/Settings.hpp" +#include "util/DebugCount.hpp" +#include "util/Helpers.hpp" + +namespace chatterino { + +/** + * This class manages a single connection + * that has at most #maxSubscriptions subscriptions. + * + * You can safely overload the #onConnectionEstablished method + * and e.g. add additional heartbeat logic. + * + * You can use shared_from_this to get a shared_ptr of this client. + * + * @tparam Subscription see BasicPubSubManager + */ +template +class BasicPubSubClient + : public std::enable_shared_from_this> +{ +public: + // The maximum amount of subscriptions this connections can handle + const size_t maxSubscriptions; + + BasicPubSubClient(liveupdates::WebsocketClient &websocketClient, + liveupdates::WebsocketHandle handle, + size_t maxSubscriptions = 100) + : maxSubscriptions(maxSubscriptions) + , websocketClient_(websocketClient) + , handle_(std::move(handle)) + { + } + + virtual ~BasicPubSubClient() = default; + + BasicPubSubClient(const BasicPubSubClient &) = delete; + BasicPubSubClient(const BasicPubSubClient &&) = delete; + BasicPubSubClient &operator=(const BasicPubSubClient &) = delete; + BasicPubSubClient &operator=(const BasicPubSubClient &&) = delete; + +protected: + virtual void onConnectionEstablished() + { + } + + bool send(const char *payload) + { + liveupdates::WebsocketErrorCode ec; + this->websocketClient_.send(this->handle_, payload, + websocketpp::frame::opcode::text, ec); + + if (ec) + { + qCDebug(chatterinoLiveupdates) << "Error sending message" << payload + << ":" << ec.message().c_str(); + return false; + } + + return true; + } + + /** + * @return true if this client subscribed to this subscription + * and the current subscriptions don't exceed the maximum + * amount. + * It won't subscribe twice to the same subscription. + * Don't use this in place of subscription management + * in the BasicPubSubManager. + */ + bool subscribe(const Subscription &subscription) + { + if (this->subscriptions_.size() >= this->maxSubscriptions) + { + return false; + } + + if (!this->subscriptions_.emplace(subscription).second) + { + qCWarning(chatterinoLiveupdates) + << "Tried subscribing to" << subscription + << "but we're already subscribed!"; + return true; // true because the subscription already exists + } + + qCDebug(chatterinoLiveupdates) << "Subscribing to" << subscription; + DebugCount::increase("LiveUpdates subscriptions"); + + QByteArray encoded = subscription.encodeSubscribe(); + this->send(encoded); + + return true; + } + + /** + * @return true if this client previously subscribed + * and now unsubscribed from this subscription. + */ + bool unsubscribe(const Subscription &subscription) + { + if (this->subscriptions_.erase(subscription) <= 0) + { + return false; + } + + qCDebug(chatterinoLiveupdates) << "Unsubscribing from" << subscription; + DebugCount::decrease("LiveUpdates subscriptions"); + + QByteArray encoded = subscription.encodeUnsubscribe(); + this->send(encoded); + + return true; + } + + bool isStarted() const + { + return this->started_.load(std::memory_order_acquire); + } + + liveupdates::WebsocketClient &websocketClient_; + +private: + void start() + { + assert(!this->isStarted()); + this->started_.store(true, std::memory_order_release); + this->onConnectionEstablished(); + } + + void stop() + { + assert(this->isStarted()); + this->started_.store(false, std::memory_order_release); + } + + void close(const std::string &reason, + websocketpp::close::status::value code = + websocketpp::close::status::normal) + { + liveupdates::WebsocketErrorCode ec; + + auto conn = this->websocketClient_.get_con_from_hdl(this->handle_, ec); + if (ec) + { + qCDebug(chatterinoLiveupdates) + << "Error getting connection:" << ec.message().c_str(); + return; + } + + conn->close(code, reason, ec); + if (ec) + { + qCDebug(chatterinoLiveupdates) + << "Error closing:" << ec.message().c_str(); + return; + } + } + + liveupdates::WebsocketHandle handle_; + std::unordered_set subscriptions_; + + std::atomic started_{false}; + + template + friend class BasicPubSubManager; +}; + +} // namespace chatterino diff --git a/src/providers/liveupdates/BasicPubSubManager.hpp b/src/providers/liveupdates/BasicPubSubManager.hpp new file mode 100644 index 000000000..ced55070d --- /dev/null +++ b/src/providers/liveupdates/BasicPubSubManager.hpp @@ -0,0 +1,370 @@ +#pragma once + +#include "common/QLogging.hpp" +#include "providers/liveupdates/BasicPubSubClient.hpp" +#include "providers/liveupdates/BasicPubSubWebsocket.hpp" +#include "providers/twitch/PubSubHelpers.hpp" +#include "util/DebugCount.hpp" +#include "util/ExponentialBackoff.hpp" + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace chatterino { + +/** + * This class is the basis for connecting and interacting with + * simple PubSub servers over the Websocket protocol. + * It acts as a pool for connections (see BasicPubSubClient). + * + * You can customize the clients, by creating your custom + * client in ::createClient. + * + * You **must** implement #onMessage. The method gets called for every + * received message on every connection. + * If you want to get the connection this message was received on, + * use #findClient. + * + * You must expose your own subscribe and unsubscribe methods + * (e.g. [un-]subscribeTopic). + * This manager does not keep track of the subscriptions. + * + * @tparam Subscription + * The subscription has the following requirements: + * It must have the methods QByteArray encodeSubscribe(), + * and QByteArray encodeUnsubscribe(). + * It must have an overload for + * QDebug &operator<< (see tests/src/BasicPubSub.cpp), + * a specialization for std::hash, + * and and overload for operator== and operator!=. + * + * @see BasicPubSubClient + */ +template +class BasicPubSubManager +{ +public: + BasicPubSubManager(QString host) + : host_(std::move(host)) + { + this->websocketClient_.set_access_channels( + websocketpp::log::alevel::all); + this->websocketClient_.clear_access_channels( + websocketpp::log::alevel::frame_payload | + websocketpp::log::alevel::frame_header); + + this->websocketClient_.init_asio(); + + // SSL Handshake + this->websocketClient_.set_tls_init_handler([this](auto hdl) { + return this->onTLSInit(hdl); + }); + + this->websocketClient_.set_message_handler([this](auto hdl, auto msg) { + this->onMessage(hdl, msg); + }); + this->websocketClient_.set_open_handler([this](auto hdl) { + this->onConnectionOpen(hdl); + }); + this->websocketClient_.set_close_handler([this](auto hdl) { + this->onConnectionClose(hdl); + }); + this->websocketClient_.set_fail_handler([this](auto hdl) { + this->onConnectionFail(hdl); + }); + } + + virtual ~BasicPubSubManager() = default; + + BasicPubSubManager(const BasicPubSubManager &) = delete; + BasicPubSubManager(const BasicPubSubManager &&) = delete; + BasicPubSubManager &operator=(const BasicPubSubManager &) = delete; + BasicPubSubManager &operator=(const BasicPubSubManager &&) = delete; + + /** This is only used for testing. */ + struct { + std::atomic connectionsClosed{0}; + std::atomic connectionsOpened{0}; + std::atomic connectionsFailed{0}; + } diag; + + void start() + { + this->work_ = std::make_shared( + this->websocketClient_.get_io_service()); + this->mainThread_.reset(new std::thread([this] { + runThread(); + })); + } + + void stop() + { + this->stopping_ = true; + + for (const auto &client : this->clients_) + { + client.second->close("Shutting down"); + } + + this->work_.reset(); + + if (this->mainThread_->joinable()) + { + this->mainThread_->join(); + } + + assert(this->clients_.empty()); + } + +protected: + using WebsocketMessagePtr = + websocketpp::config::asio_tls_client::message_type::ptr; + using WebsocketContextPtr = + websocketpp::lib::shared_ptr; + + virtual void onMessage(websocketpp::connection_hdl hdl, + WebsocketMessagePtr msg) = 0; + + virtual std::shared_ptr> createClient( + liveupdates::WebsocketClient &client, websocketpp::connection_hdl hdl) + { + return std::make_shared>(client, hdl); + } + + /** + * @param hdl The handle of the client. + * @return The client managing this connection, empty shared_ptr otherwise. + */ + std::shared_ptr> findClient( + websocketpp::connection_hdl hdl) + { + auto clientIt = this->clients_.find(hdl); + + if (clientIt == this->clients_.end()) + { + return {}; + } + + return clientIt->second; + } + + void unsubscribe(const Subscription &subscription) + { + for (auto &client : this->clients_) + { + if (client.second->unsubscribe(subscription)) + { + return; + } + } + } + + void subscribe(const Subscription &subscription) + { + if (this->trySubscribe(subscription)) + { + return; + } + + this->addClient(); + this->pendingSubscriptions_.emplace_back(subscription); + DebugCount::increase("LiveUpdates subscription backlog"); + } + +private: + void onConnectionOpen(websocketpp::connection_hdl hdl) + { + DebugCount::increase("LiveUpdates connections"); + this->addingClient_ = false; + this->diag.connectionsOpened.fetch_add(1, std::memory_order_acq_rel); + + this->connectBackoff_.reset(); + + auto client = this->createClient(this->websocketClient_, hdl); + + // We separate the starting from the constructor because we will want to use + // shared_from_this + client->start(); + + this->clients_.emplace(hdl, client); + + auto pendingSubsToTake = (std::min)(this->pendingSubscriptions_.size(), + client->maxSubscriptions); + + qCDebug(chatterinoLiveupdates) + << "LiveUpdate connection opened, subscribing to" + << pendingSubsToTake << "subscriptions!"; + + while (pendingSubsToTake > 0 && !this->pendingSubscriptions_.empty()) + { + const auto last = std::move(this->pendingSubscriptions_.back()); + this->pendingSubscriptions_.pop_back(); + if (!client->subscribe(last)) + { + qCDebug(chatterinoLiveupdates) + << "Failed to subscribe to" << last << "on new client."; + // TODO: should we try to add a new client here? + return; + } + DebugCount::decrease("LiveUpdates subscription backlog"); + pendingSubsToTake--; + } + + if (!this->pendingSubscriptions_.empty()) + { + this->addClient(); + } + } + + void onConnectionFail(websocketpp::connection_hdl hdl) + { + DebugCount::increase("LiveUpdates failed connections"); + this->diag.connectionsFailed.fetch_add(1, std::memory_order_acq_rel); + + if (auto conn = this->websocketClient_.get_con_from_hdl(std::move(hdl))) + { + qCDebug(chatterinoLiveupdates) + << "LiveUpdates connection attempt failed (error: " + << conn->get_ec().message().c_str() << ")"; + } + else + { + qCDebug(chatterinoLiveupdates) + << "LiveUpdates connection attempt failed but we can't get the " + "connection from a handle."; + } + this->addingClient_ = false; + if (!this->pendingSubscriptions_.empty()) + { + runAfter(this->websocketClient_.get_io_service(), + this->connectBackoff_.next(), [this](auto /*timer*/) { + this->addClient(); + }); + } + } + + void onConnectionClose(websocketpp::connection_hdl hdl) + { + qCDebug(chatterinoLiveupdates) << "Connection closed"; + DebugCount::decrease("LiveUpdates connections"); + this->diag.connectionsClosed.fetch_add(1, std::memory_order_acq_rel); + + auto clientIt = this->clients_.find(hdl); + + // If this assert goes off, there's something wrong with the connection + // creation/preserving code KKona + assert(clientIt != this->clients_.end()); + + auto client = clientIt->second; + + this->clients_.erase(clientIt); + + client->stop(); + + if (!this->stopping_) + { + for (const auto &sub : client->subscriptions_) + { + this->subscribe(sub); + } + } + } + + WebsocketContextPtr onTLSInit(const websocketpp::connection_hdl & /*hdl*/) + { + WebsocketContextPtr ctx( + new boost::asio::ssl::context(boost::asio::ssl::context::tlsv12)); + + try + { + ctx->set_options(boost::asio::ssl::context::default_workarounds | + boost::asio::ssl::context::no_sslv2 | + boost::asio::ssl::context::single_dh_use); + } + catch (const std::exception &e) + { + qCDebug(chatterinoLiveupdates) + << "Exception caught in OnTLSInit:" << e.what(); + } + + return ctx; + } + + void runThread() + { + qCDebug(chatterinoLiveupdates) << "Start LiveUpdates manager thread"; + this->websocketClient_.run(); + qCDebug(chatterinoLiveupdates) + << "Done with LiveUpdates manager thread"; + } + + void addClient() + { + if (this->addingClient_) + { + return; + } + + qCDebug(chatterinoLiveupdates) << "Adding an additional client"; + + this->addingClient_ = true; + + websocketpp::lib::error_code ec; + auto con = this->websocketClient_.get_connection( + this->host_.toStdString(), ec); + + if (ec) + { + qCDebug(chatterinoLiveupdates) + << "Unable to establish connection:" << ec.message().c_str(); + return; + } + + this->websocketClient_.connect(con); + } + + bool trySubscribe(const Subscription &subscription) + { + for (auto &client : this->clients_) + { + if (client.second->subscribe(subscription)) + { + return true; + } + } + return false; + } + + std::map>, + std::owner_less> + clients_; + + std::vector pendingSubscriptions_; + std::atomic addingClient_{false}; + ExponentialBackoff<5> connectBackoff_{std::chrono::milliseconds(1000)}; + + std::shared_ptr work_{nullptr}; + + liveupdates::WebsocketClient websocketClient_; + std::unique_ptr mainThread_; + + const QString host_; + + bool stopping_{false}; +}; + +} // namespace chatterino diff --git a/src/providers/liveupdates/BasicPubSubWebsocket.hpp b/src/providers/liveupdates/BasicPubSubWebsocket.hpp new file mode 100644 index 000000000..bf30c30ca --- /dev/null +++ b/src/providers/liveupdates/BasicPubSubWebsocket.hpp @@ -0,0 +1,36 @@ +#pragma once + +#include "providers/twitch/ChatterinoWebSocketppLogger.hpp" + +#include +#include +#include +#include + +namespace chatterino { + +struct BasicPubSubConfig : public websocketpp::config::asio_tls_client { + // NOLINTBEGIN(modernize-use-using) + typedef websocketpp::log::chatterinowebsocketpplogger< + concurrency_type, websocketpp::log::elevel> + elog_type; + typedef websocketpp::log::chatterinowebsocketpplogger< + concurrency_type, websocketpp::log::alevel> + alog_type; + + struct PerMessageDeflateConfig { + }; + + typedef websocketpp::extensions::permessage_deflate::disabled< + PerMessageDeflateConfig> + permessage_deflate_type; + // NOLINTEND(modernize-use-using) +}; + +namespace liveupdates { + using WebsocketClient = websocketpp::client; + using WebsocketHandle = websocketpp::connection_hdl; + using WebsocketErrorCode = websocketpp::lib::error_code; +} // namespace liveupdates + +} // namespace chatterino diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 827fe67c2..e71dfd3ee 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -20,6 +20,7 @@ set(test_SOURCES ${CMAKE_CURRENT_LIST_DIR}/src/HighlightController.cpp ${CMAKE_CURRENT_LIST_DIR}/src/FormatTime.cpp ${CMAKE_CURRENT_LIST_DIR}/src/LimitedQueue.cpp + ${CMAKE_CURRENT_LIST_DIR}/src/BasicPubSub.cpp # Add your new file above this line! ) diff --git a/tests/src/BasicPubSub.cpp b/tests/src/BasicPubSub.cpp new file mode 100644 index 000000000..d4fc0143b --- /dev/null +++ b/tests/src/BasicPubSub.cpp @@ -0,0 +1,148 @@ +#include "providers/liveupdates/BasicPubSubClient.hpp" +#include "providers/liveupdates/BasicPubSubManager.hpp" + +#include +#include +#include +#include +#include + +#include +#include +#include + +using namespace chatterino; +using namespace std::chrono_literals; + +struct DummySubscription { + int type; + QString condition; + + bool operator==(const DummySubscription &rhs) const + { + return std::tie(this->condition, this->type) == + std::tie(rhs.condition, rhs.type); + } + bool operator!=(const DummySubscription &rhs) const + { + return !(rhs == *this); + } + + QByteArray encodeSubscribe() const + { + QJsonObject root; + root["op"] = "sub"; + root["type"] = this->type; + root["condition"] = this->condition; + return QJsonDocument(root).toJson(); + } + QByteArray encodeUnsubscribe() const + { + QJsonObject root; + root["op"] = "unsub"; + root["type"] = this->type; + root["condition"] = this->condition; + return QJsonDocument(root).toJson(); + } + + friend QDebug &operator<<(QDebug &dbg, + const DummySubscription &subscription) + { + dbg << "DummySubscription{ condition:" << subscription.condition + << "type:" << (int)subscription.type << '}'; + return dbg; + } +}; + +namespace std { +template <> +struct hash { + size_t operator()(const DummySubscription &sub) const + { + return (size_t)qHash(sub.condition, qHash(sub.type)); + } +}; +} // namespace std + +class MyManager : public BasicPubSubManager +{ +public: + MyManager(QString host) + : BasicPubSubManager(std::move(host)) + { + } + + std::atomic messagesReceived{0}; + + std::optional popMessage() + { + std::lock_guard guard(this->messageMtx_); + if (this->messageQueue_.empty()) + { + return std::nullopt; + } + QString front = this->messageQueue_.front(); + this->messageQueue_.pop_front(); + return front; + } + + void sub(const DummySubscription &sub) + { + // We don't track subscriptions in this test + this->subscribe(sub); + } + + void unsub(const DummySubscription &sub) + { + this->unsubscribe(sub); + } + +protected: + void onMessage( + websocketpp::connection_hdl /*hdl*/, + BasicPubSubManager::WebsocketMessagePtr msg) override + { + std::lock_guard guard(this->messageMtx_); + this->messagesReceived.fetch_add(1, std::memory_order_acq_rel); + this->messageQueue_.emplace_back( + QString::fromStdString(msg->get_payload())); + } + +private: + std::mutex messageMtx_; + std::deque messageQueue_; +}; + +TEST(BasicPubSub, SubscriptionCycle) +{ + const QString host("wss://127.0.0.1:9050/liveupdates/sub-unsub"); + auto *manager = new MyManager(host); + manager->start(); + + std::this_thread::sleep_for(50ms); + manager->sub({1, "foo"}); + std::this_thread::sleep_for(500ms); + + ASSERT_EQ(manager->diag.connectionsOpened, 1); + ASSERT_EQ(manager->diag.connectionsClosed, 0); + ASSERT_EQ(manager->diag.connectionsFailed, 0); + ASSERT_EQ(manager->messagesReceived, 1); + + ASSERT_EQ(manager->popMessage(), QString("ack-sub-1-foo")); + + manager->unsub({1, "foo"}); + std::this_thread::sleep_for(50ms); + + ASSERT_EQ(manager->diag.connectionsOpened, 1); + ASSERT_EQ(manager->diag.connectionsClosed, 0); + ASSERT_EQ(manager->diag.connectionsFailed, 0); + ASSERT_EQ(manager->messagesReceived, 2); + ASSERT_EQ(manager->popMessage(), QString("ack-unsub-1-foo")); + + manager->stop(); + + ASSERT_EQ(manager->diag.connectionsOpened, 1); + ASSERT_EQ(manager->diag.connectionsClosed, 1); + ASSERT_EQ(manager->diag.connectionsFailed, 0); + ASSERT_EQ(manager->messagesReceived, 2); +}