Fix PubSub client creation/pending topic resolving (#3037)

Co-authored-by: Rasmus Karlsson <rasmus.karlsson@pajlada.com>
Co-authored-by: Felanbird <41973452+Felanbird@users.noreply.github.com>
Co-authored-by: zneix <zneix@zneix.eu>
This commit is contained in:
pajlada 2021-07-25 17:13:04 +02:00 committed by GitHub
parent 33d1837f4f
commit 770b9f263b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 128 additions and 28 deletions

View file

@ -9,6 +9,7 @@
- Minor: Added informative messages for recent-messages API's errors. (#3029)
- Minor: Added section with helpful Chatterino-related links to the About page. (#3068)
- Bugfix: Fixed "smiley" emotes being unable to be "Tabbed" with autocompletion, introduced in v2.3.3. (#3010)
- Bugfix: Fixed PubSub not properly trying to resolve pending listens when the pending listens list was larger than 50. (#3037)
- Bugfix: Copy buttons in usercard now show properly in light mode (#3057)
- Bugfix: Fixed comma appended to username completion when not at the beginning of the message. (#3060)
- Bugfix: Fixed bug misplacing chat when zooming on Chrome with Chatterino Native Host extension (#1936)

View file

@ -3,6 +3,7 @@
#include "providers/twitch/PubsubActions.hpp"
#include "providers/twitch/PubsubHelpers.hpp"
#include "singletons/Settings.hpp"
#include "util/DebugCount.hpp"
#include "util/Helpers.hpp"
#include "util/RapidjsonHelpers.hpp"
@ -23,7 +24,8 @@ namespace chatterino {
static const char *pingPayload = "{\"type\":\"PING\"}";
static std::map<QString, QString> sentMessages;
static std::map<QString, RequestMessage> sentListens;
static std::map<QString, RequestMessage> sentUnlistens;
namespace detail {
@ -59,8 +61,9 @@ namespace detail {
// This PubSubClient is already at its peak listens
return false;
}
this->numListens_ += numRequestedListens;
DebugCount::increase("PubSub topic pending listens",
numRequestedListens);
for (const auto &topic : message["data"]["topics"].GetArray())
{
@ -68,12 +71,11 @@ namespace detail {
Listener{topic.GetString(), false, false, false});
}
auto uuid = generateUuid();
rj::set(message, "nonce", uuid);
auto nonce = generateUuid();
rj::set(message, "nonce", nonce);
QString payload = rj::stringify(message);
sentMessages[uuid] = payload;
sentListens[nonce] = RequestMessage{payload, numRequestedListens};
this->send(payload.toUtf8());
@ -103,14 +105,19 @@ namespace detail {
return;
}
int numRequestedUnlistens = topics.size();
this->numListens_ -= numRequestedUnlistens;
DebugCount::increase("PubSub topic pending unlistens",
numRequestedUnlistens);
auto message = createUnlistenMessage(topics);
auto uuid = generateUuid();
rj::set(message, "nonce", generateUuid());
auto nonce = generateUuid();
rj::set(message, "nonce", nonce);
QString payload = rj::stringify(message);
sentMessages[uuid] = payload;
sentUnlistens[nonce] = RequestMessage{payload, numRequestedUnlistens};
this->send(payload.toUtf8());
}
@ -865,6 +872,13 @@ PubSub::PubSub()
void PubSub::addClient()
{
if (this->addingClient)
{
return;
}
this->addingClient = true;
websocketpp::lib::error_code ec;
auto con = this->websocketClient.get_connection(TWITCH_PUBSUB_URL, ec);
@ -998,6 +1012,8 @@ void PubSub::listen(rapidjson::Document &&msg)
this->requests.emplace_back(
std::make_unique<rapidjson::Document>(std::move(msg)));
DebugCount::increase("PubSub topic backlog");
}
bool PubSub::tryListen(rapidjson::Document &msg)
@ -1066,7 +1082,7 @@ void PubSub::onMessage(websocketpp::connection_hdl hdl,
if (type == "RESPONSE")
{
this->handleListenResponse(msg);
this->handleResponse(msg);
}
else if (type == "MESSAGE")
{
@ -1107,6 +1123,9 @@ void PubSub::onMessage(websocketpp::connection_hdl hdl,
void PubSub::onConnectionOpen(WebsocketHandle hdl)
{
DebugCount::increase("PubSub connections");
this->addingClient = false;
auto client =
std::make_shared<detail::PubSubClient>(this->websocketClient, hdl);
@ -1123,6 +1142,7 @@ void PubSub::onConnectionOpen(WebsocketHandle hdl)
const auto &request = *it;
if (client->listen(*request))
{
DebugCount::decrease("PubSub topic backlog");
it = this->requests.erase(it);
}
else
@ -1130,10 +1150,16 @@ void PubSub::onConnectionOpen(WebsocketHandle hdl)
++it;
}
}
if (!this->requests.empty())
{
this->addClient();
}
}
void PubSub::onConnectionClose(WebsocketHandle hdl)
{
DebugCount::decrease("PubSub connections");
auto clientIt = this->clients.find(hdl);
// If this assert goes off, there's something wrong with the connection
@ -1169,26 +1195,63 @@ PubSub::WebsocketContextPtr PubSub::onTLSInit(websocketpp::connection_hdl hdl)
return ctx;
}
void PubSub::handleListenResponse(const rapidjson::Document &msg)
void PubSub::handleResponse(const rapidjson::Document &msg)
{
QString error;
if (rj::getSafe(msg, "error", error))
{
QString nonce;
rj::getSafe(msg, "nonce", nonce);
if (error.isEmpty())
{
qCDebug(chatterinoPubsub)
<< "Successfully listened to nonce" << nonce;
// Nothing went wrong
return;
}
qCDebug(chatterinoPubsub)
<< "PubSub error:" << error << "on nonce" << nonce;
if (!rj::getSafe(msg, "error", error))
return;
QString nonce;
rj::getSafe(msg, "nonce", nonce);
const bool failed = !error.isEmpty();
if (failed)
{
qCDebug(chatterinoPubsub)
<< QString("Error %1 on nonce %2").arg(error, nonce);
}
if (auto it = sentListens.find(nonce); it != sentListens.end())
{
this->handleListenResponse(it->second, failed);
return;
}
if (auto it = sentUnlistens.find(nonce); it != sentUnlistens.end())
{
this->handleUnlistenResponse(it->second, failed);
return;
}
qCDebug(chatterinoPubsub)
<< "Response on unused" << nonce << "client/topic listener mismatch?";
}
void PubSub::handleListenResponse(const RequestMessage &msg, bool failed)
{
DebugCount::decrease("PubSub topic pending listens", msg.topicCount);
if (failed)
{
DebugCount::increase("PubSub topic failed listens", msg.topicCount);
}
else
{
DebugCount::increase("PubSub topic listening", msg.topicCount);
}
}
void PubSub::handleUnlistenResponse(const RequestMessage &msg, bool failed)
{
DebugCount::decrease("PubSub topic pending unlistens", msg.topicCount);
if (failed)
{
DebugCount::increase("PubSub topic failed unlistens", msg.topicCount);
}
else
{
DebugCount::decrease("PubSub topic listening", msg.topicCount);
}
}

View file

@ -47,6 +47,11 @@ using WebsocketErrorCode = websocketpp::lib::error_code;
#define MAX_PUBSUB_LISTENS 50
#define MAX_PUBSUB_CONNECTIONS 10
struct RequestMessage {
QString payload;
int topicCount;
};
namespace detail {
struct Listener {
@ -172,6 +177,7 @@ private:
bool isListeningToTopic(const QString &topic);
void addClient();
std::atomic<bool> addingClient{false};
State state = State::Connected;
@ -192,7 +198,9 @@ private:
void onConnectionClose(websocketpp::connection_hdl hdl);
WebsocketContextPtr onTLSInit(websocketpp::connection_hdl hdl);
void handleListenResponse(const rapidjson::Document &msg);
void handleResponse(const rapidjson::Document &msg);
void handleListenResponse(const RequestMessage &msg, bool failed);
void handleUnlistenResponse(const RequestMessage &msg, bool failed);
void handleMessageResponse(const rapidjson::Value &data);
void runThread();

View file

@ -27,6 +27,20 @@ public:
reinterpret_cast<int64_t &>(it.value())++;
}
}
static void increase(const QString &name, const int64_t &amount)
{
auto counts = counts_.access();
auto it = counts->find(name);
if (it == counts->end())
{
counts->insert(name, amount);
}
else
{
reinterpret_cast<int64_t &>(it.value()) += amount;
}
}
static void decrease(const QString &name)
{
@ -42,6 +56,20 @@ public:
reinterpret_cast<int64_t &>(it.value())--;
}
}
static void decrease(const QString &name, const int64_t &amount)
{
auto counts = counts_.access();
auto it = counts->find(name);
if (it == counts->end())
{
counts->insert(name, -amount);
}
else
{
reinterpret_cast<int64_t &>(it.value()) -= amount;
}
}
static QString getDebugText()
{