From 770b9f263b947b7fc1b293bcd29878463e97d534 Mon Sep 17 00:00:00 2001 From: pajlada Date: Sun, 25 Jul 2021 17:13:04 +0200 Subject: [PATCH] Fix PubSub client creation/pending topic resolving (#3037) Co-authored-by: Rasmus Karlsson Co-authored-by: Felanbird <41973452+Felanbird@users.noreply.github.com> Co-authored-by: zneix --- CHANGELOG.md | 1 + src/providers/twitch/PubsubClient.cpp | 117 ++++++++++++++++++++------ src/providers/twitch/PubsubClient.hpp | 10 ++- src/util/DebugCount.hpp | 28 ++++++ 4 files changed, 128 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a1fb92700..30ee83b3d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ - Minor: Added informative messages for recent-messages API's errors. (#3029) - Minor: Added section with helpful Chatterino-related links to the About page. (#3068) - Bugfix: Fixed "smiley" emotes being unable to be "Tabbed" with autocompletion, introduced in v2.3.3. (#3010) +- Bugfix: Fixed PubSub not properly trying to resolve pending listens when the pending listens list was larger than 50. (#3037) - Bugfix: Copy buttons in usercard now show properly in light mode (#3057) - Bugfix: Fixed comma appended to username completion when not at the beginning of the message. (#3060) - Bugfix: Fixed bug misplacing chat when zooming on Chrome with Chatterino Native Host extension (#1936) diff --git a/src/providers/twitch/PubsubClient.cpp b/src/providers/twitch/PubsubClient.cpp index 1dfcd02e9..af131bbce 100644 --- a/src/providers/twitch/PubsubClient.cpp +++ b/src/providers/twitch/PubsubClient.cpp @@ -3,6 +3,7 @@ #include "providers/twitch/PubsubActions.hpp" #include "providers/twitch/PubsubHelpers.hpp" #include "singletons/Settings.hpp" +#include "util/DebugCount.hpp" #include "util/Helpers.hpp" #include "util/RapidjsonHelpers.hpp" @@ -23,7 +24,8 @@ namespace chatterino { static const char *pingPayload = "{\"type\":\"PING\"}"; -static std::map sentMessages; +static std::map sentListens; +static std::map sentUnlistens; namespace detail { @@ -59,8 +61,9 @@ namespace detail { // This PubSubClient is already at its peak listens return false; } - this->numListens_ += numRequestedListens; + DebugCount::increase("PubSub topic pending listens", + numRequestedListens); for (const auto &topic : message["data"]["topics"].GetArray()) { @@ -68,12 +71,11 @@ namespace detail { Listener{topic.GetString(), false, false, false}); } - auto uuid = generateUuid(); - - rj::set(message, "nonce", uuid); + auto nonce = generateUuid(); + rj::set(message, "nonce", nonce); QString payload = rj::stringify(message); - sentMessages[uuid] = payload; + sentListens[nonce] = RequestMessage{payload, numRequestedListens}; this->send(payload.toUtf8()); @@ -103,14 +105,19 @@ namespace detail { return; } + int numRequestedUnlistens = topics.size(); + + this->numListens_ -= numRequestedUnlistens; + DebugCount::increase("PubSub topic pending unlistens", + numRequestedUnlistens); + auto message = createUnlistenMessage(topics); - auto uuid = generateUuid(); - - rj::set(message, "nonce", generateUuid()); + auto nonce = generateUuid(); + rj::set(message, "nonce", nonce); QString payload = rj::stringify(message); - sentMessages[uuid] = payload; + sentUnlistens[nonce] = RequestMessage{payload, numRequestedUnlistens}; this->send(payload.toUtf8()); } @@ -865,6 +872,13 @@ PubSub::PubSub() void PubSub::addClient() { + if (this->addingClient) + { + return; + } + + this->addingClient = true; + websocketpp::lib::error_code ec; auto con = this->websocketClient.get_connection(TWITCH_PUBSUB_URL, ec); @@ -998,6 +1012,8 @@ void PubSub::listen(rapidjson::Document &&msg) this->requests.emplace_back( std::make_unique(std::move(msg))); + + DebugCount::increase("PubSub topic backlog"); } bool PubSub::tryListen(rapidjson::Document &msg) @@ -1066,7 +1082,7 @@ void PubSub::onMessage(websocketpp::connection_hdl hdl, if (type == "RESPONSE") { - this->handleListenResponse(msg); + this->handleResponse(msg); } else if (type == "MESSAGE") { @@ -1107,6 +1123,9 @@ void PubSub::onMessage(websocketpp::connection_hdl hdl, void PubSub::onConnectionOpen(WebsocketHandle hdl) { + DebugCount::increase("PubSub connections"); + this->addingClient = false; + auto client = std::make_shared(this->websocketClient, hdl); @@ -1123,6 +1142,7 @@ void PubSub::onConnectionOpen(WebsocketHandle hdl) const auto &request = *it; if (client->listen(*request)) { + DebugCount::decrease("PubSub topic backlog"); it = this->requests.erase(it); } else @@ -1130,10 +1150,16 @@ void PubSub::onConnectionOpen(WebsocketHandle hdl) ++it; } } + + if (!this->requests.empty()) + { + this->addClient(); + } } void PubSub::onConnectionClose(WebsocketHandle hdl) { + DebugCount::decrease("PubSub connections"); auto clientIt = this->clients.find(hdl); // If this assert goes off, there's something wrong with the connection @@ -1169,26 +1195,63 @@ PubSub::WebsocketContextPtr PubSub::onTLSInit(websocketpp::connection_hdl hdl) return ctx; } -void PubSub::handleListenResponse(const rapidjson::Document &msg) +void PubSub::handleResponse(const rapidjson::Document &msg) { QString error; - if (rj::getSafe(msg, "error", error)) - { - QString nonce; - rj::getSafe(msg, "nonce", nonce); - - if (error.isEmpty()) - { - qCDebug(chatterinoPubsub) - << "Successfully listened to nonce" << nonce; - // Nothing went wrong - return; - } - - qCDebug(chatterinoPubsub) - << "PubSub error:" << error << "on nonce" << nonce; + if (!rj::getSafe(msg, "error", error)) return; + + QString nonce; + rj::getSafe(msg, "nonce", nonce); + + const bool failed = !error.isEmpty(); + + if (failed) + { + qCDebug(chatterinoPubsub) + << QString("Error %1 on nonce %2").arg(error, nonce); + } + + if (auto it = sentListens.find(nonce); it != sentListens.end()) + { + this->handleListenResponse(it->second, failed); + return; + } + + if (auto it = sentUnlistens.find(nonce); it != sentUnlistens.end()) + { + this->handleUnlistenResponse(it->second, failed); + return; + } + + qCDebug(chatterinoPubsub) + << "Response on unused" << nonce << "client/topic listener mismatch?"; +} + +void PubSub::handleListenResponse(const RequestMessage &msg, bool failed) +{ + DebugCount::decrease("PubSub topic pending listens", msg.topicCount); + if (failed) + { + DebugCount::increase("PubSub topic failed listens", msg.topicCount); + } + else + { + DebugCount::increase("PubSub topic listening", msg.topicCount); + } +} + +void PubSub::handleUnlistenResponse(const RequestMessage &msg, bool failed) +{ + DebugCount::decrease("PubSub topic pending unlistens", msg.topicCount); + if (failed) + { + DebugCount::increase("PubSub topic failed unlistens", msg.topicCount); + } + else + { + DebugCount::decrease("PubSub topic listening", msg.topicCount); } } diff --git a/src/providers/twitch/PubsubClient.hpp b/src/providers/twitch/PubsubClient.hpp index 1e5a01d1f..06c8d50b8 100644 --- a/src/providers/twitch/PubsubClient.hpp +++ b/src/providers/twitch/PubsubClient.hpp @@ -47,6 +47,11 @@ using WebsocketErrorCode = websocketpp::lib::error_code; #define MAX_PUBSUB_LISTENS 50 #define MAX_PUBSUB_CONNECTIONS 10 +struct RequestMessage { + QString payload; + int topicCount; +}; + namespace detail { struct Listener { @@ -172,6 +177,7 @@ private: bool isListeningToTopic(const QString &topic); void addClient(); + std::atomic addingClient{false}; State state = State::Connected; @@ -192,7 +198,9 @@ private: void onConnectionClose(websocketpp::connection_hdl hdl); WebsocketContextPtr onTLSInit(websocketpp::connection_hdl hdl); - void handleListenResponse(const rapidjson::Document &msg); + void handleResponse(const rapidjson::Document &msg); + void handleListenResponse(const RequestMessage &msg, bool failed); + void handleUnlistenResponse(const RequestMessage &msg, bool failed); void handleMessageResponse(const rapidjson::Value &data); void runThread(); diff --git a/src/util/DebugCount.hpp b/src/util/DebugCount.hpp index 540a13642..ef53e6814 100644 --- a/src/util/DebugCount.hpp +++ b/src/util/DebugCount.hpp @@ -27,6 +27,20 @@ public: reinterpret_cast(it.value())++; } } + static void increase(const QString &name, const int64_t &amount) + { + auto counts = counts_.access(); + + auto it = counts->find(name); + if (it == counts->end()) + { + counts->insert(name, amount); + } + else + { + reinterpret_cast(it.value()) += amount; + } + } static void decrease(const QString &name) { @@ -42,6 +56,20 @@ public: reinterpret_cast(it.value())--; } } + static void decrease(const QString &name, const int64_t &amount) + { + auto counts = counts_.access(); + + auto it = counts->find(name); + if (it == counts->end()) + { + counts->insert(name, -amount); + } + else + { + reinterpret_cast(it.value()) -= amount; + } + } static QString getDebugText() {