feat: Basic PubSub Manager for Live Emote Updates (#4087)

Co-authored-by: Rasmus Karlsson <rasmus.karlsson@pajlada.com>
This commit is contained in:
nerix 2022-10-29 14:01:01 +02:00 committed by GitHub
parent dd39bd66a0
commit ff684fc7ed
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 742 additions and 1 deletions

View file

@ -42,6 +42,8 @@ CheckOptions:
value: .*
- key: readability-identifier-naming.PrivateMemberSuffix
value: _
- key: readability-identifier-naming.ProtectedMemberSuffix
value: _
- key: readability-identifier-naming.UnionCase
value: CamelCase
- key: readability-identifier-naming.GlobalVariableCase

View file

@ -6,7 +6,7 @@ on:
workflow_dispatch:
env:
TWITCH_PUBSUB_SERVER_IMAGE: ghcr.io/chatterino/twitch-pubsub-server-test:v1.0.3
TWITCH_PUBSUB_SERVER_IMAGE: ghcr.io/chatterino/twitch-pubsub-server-test:v1.0.4
concurrency:
group: test-${{ github.ref }}

View file

@ -214,6 +214,10 @@ set(SOURCE_FILES
providers/irc/IrcServer.cpp
providers/irc/IrcServer.hpp
providers/liveupdates/BasicPubSubClient.hpp
providers/liveupdates/BasicPubSubManager.hpp
providers/liveupdates/BasicPubSubWebsocket.hpp
providers/seventv/SeventvBadges.cpp
providers/seventv/SeventvBadges.hpp
providers/seventv/SeventvEmotes.cpp

View file

@ -22,6 +22,8 @@ Q_LOGGING_CATEGORY(chatterinoHTTP, "chatterino.http", logThreshold);
Q_LOGGING_CATEGORY(chatterinoImage, "chatterino.image", logThreshold);
Q_LOGGING_CATEGORY(chatterinoIrc, "chatterino.irc", logThreshold);
Q_LOGGING_CATEGORY(chatterinoIvr, "chatterino.ivr", logThreshold);
Q_LOGGING_CATEGORY(chatterinoLiveupdates, "chatterino.liveupdates",
logThreshold);
Q_LOGGING_CATEGORY(chatterinoMain, "chatterino.main", logThreshold);
Q_LOGGING_CATEGORY(chatterinoMessage, "chatterino.message", logThreshold);
Q_LOGGING_CATEGORY(chatterinoNativeMessage, "chatterino.nativemessage",

View file

@ -18,6 +18,7 @@ Q_DECLARE_LOGGING_CATEGORY(chatterinoHTTP);
Q_DECLARE_LOGGING_CATEGORY(chatterinoImage);
Q_DECLARE_LOGGING_CATEGORY(chatterinoIrc);
Q_DECLARE_LOGGING_CATEGORY(chatterinoIvr);
Q_DECLARE_LOGGING_CATEGORY(chatterinoLiveupdates);
Q_DECLARE_LOGGING_CATEGORY(chatterinoMain);
Q_DECLARE_LOGGING_CATEGORY(chatterinoMessage);
Q_DECLARE_LOGGING_CATEGORY(chatterinoNativeMessage);

View file

@ -0,0 +1,177 @@
#pragma once
#include <atomic>
#include <chrono>
#include <pajlada/signals/signal.hpp>
#include <unordered_set>
#include "common/QLogging.hpp"
#include "providers/liveupdates/BasicPubSubWebsocket.hpp"
#include "singletons/Settings.hpp"
#include "util/DebugCount.hpp"
#include "util/Helpers.hpp"
namespace chatterino {
/**
* This class manages a single connection
* that has at most #maxSubscriptions subscriptions.
*
* You can safely overload the #onConnectionEstablished method
* and e.g. add additional heartbeat logic.
*
* You can use shared_from_this to get a shared_ptr of this client.
*
* @tparam Subscription see BasicPubSubManager
*/
template <typename Subscription>
class BasicPubSubClient
: public std::enable_shared_from_this<BasicPubSubClient<Subscription>>
{
public:
// The maximum amount of subscriptions this connections can handle
const size_t maxSubscriptions;
BasicPubSubClient(liveupdates::WebsocketClient &websocketClient,
liveupdates::WebsocketHandle handle,
size_t maxSubscriptions = 100)
: maxSubscriptions(maxSubscriptions)
, websocketClient_(websocketClient)
, handle_(std::move(handle))
{
}
virtual ~BasicPubSubClient() = default;
BasicPubSubClient(const BasicPubSubClient &) = delete;
BasicPubSubClient(const BasicPubSubClient &&) = delete;
BasicPubSubClient &operator=(const BasicPubSubClient &) = delete;
BasicPubSubClient &operator=(const BasicPubSubClient &&) = delete;
protected:
virtual void onConnectionEstablished()
{
}
bool send(const char *payload)
{
liveupdates::WebsocketErrorCode ec;
this->websocketClient_.send(this->handle_, payload,
websocketpp::frame::opcode::text, ec);
if (ec)
{
qCDebug(chatterinoLiveupdates) << "Error sending message" << payload
<< ":" << ec.message().c_str();
return false;
}
return true;
}
/**
* @return true if this client subscribed to this subscription
* and the current subscriptions don't exceed the maximum
* amount.
* It won't subscribe twice to the same subscription.
* Don't use this in place of subscription management
* in the BasicPubSubManager.
*/
bool subscribe(const Subscription &subscription)
{
if (this->subscriptions_.size() >= this->maxSubscriptions)
{
return false;
}
if (!this->subscriptions_.emplace(subscription).second)
{
qCWarning(chatterinoLiveupdates)
<< "Tried subscribing to" << subscription
<< "but we're already subscribed!";
return true; // true because the subscription already exists
}
qCDebug(chatterinoLiveupdates) << "Subscribing to" << subscription;
DebugCount::increase("LiveUpdates subscriptions");
QByteArray encoded = subscription.encodeSubscribe();
this->send(encoded);
return true;
}
/**
* @return true if this client previously subscribed
* and now unsubscribed from this subscription.
*/
bool unsubscribe(const Subscription &subscription)
{
if (this->subscriptions_.erase(subscription) <= 0)
{
return false;
}
qCDebug(chatterinoLiveupdates) << "Unsubscribing from" << subscription;
DebugCount::decrease("LiveUpdates subscriptions");
QByteArray encoded = subscription.encodeUnsubscribe();
this->send(encoded);
return true;
}
bool isStarted() const
{
return this->started_.load(std::memory_order_acquire);
}
liveupdates::WebsocketClient &websocketClient_;
private:
void start()
{
assert(!this->isStarted());
this->started_.store(true, std::memory_order_release);
this->onConnectionEstablished();
}
void stop()
{
assert(this->isStarted());
this->started_.store(false, std::memory_order_release);
}
void close(const std::string &reason,
websocketpp::close::status::value code =
websocketpp::close::status::normal)
{
liveupdates::WebsocketErrorCode ec;
auto conn = this->websocketClient_.get_con_from_hdl(this->handle_, ec);
if (ec)
{
qCDebug(chatterinoLiveupdates)
<< "Error getting connection:" << ec.message().c_str();
return;
}
conn->close(code, reason, ec);
if (ec)
{
qCDebug(chatterinoLiveupdates)
<< "Error closing:" << ec.message().c_str();
return;
}
}
liveupdates::WebsocketHandle handle_;
std::unordered_set<Subscription> subscriptions_;
std::atomic<bool> started_{false};
template <typename ManagerSubscription>
friend class BasicPubSubManager;
};
} // namespace chatterino

View file

@ -0,0 +1,370 @@
#pragma once
#include "common/QLogging.hpp"
#include "providers/liveupdates/BasicPubSubClient.hpp"
#include "providers/liveupdates/BasicPubSubWebsocket.hpp"
#include "providers/twitch/PubSubHelpers.hpp"
#include "util/DebugCount.hpp"
#include "util/ExponentialBackoff.hpp"
#include <QJsonObject>
#include <QString>
#include <pajlada/signals/signal.hpp>
#include <websocketpp/client.hpp>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <exception>
#include <map>
#include <memory>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
namespace chatterino {
/**
* This class is the basis for connecting and interacting with
* simple PubSub servers over the Websocket protocol.
* It acts as a pool for connections (see BasicPubSubClient).
*
* You can customize the clients, by creating your custom
* client in ::createClient.
*
* You **must** implement #onMessage. The method gets called for every
* received message on every connection.
* If you want to get the connection this message was received on,
* use #findClient.
*
* You must expose your own subscribe and unsubscribe methods
* (e.g. [un-]subscribeTopic).
* This manager does not keep track of the subscriptions.
*
* @tparam Subscription
* The subscription has the following requirements:
* It must have the methods QByteArray encodeSubscribe(),
* and QByteArray encodeUnsubscribe().
* It must have an overload for
* QDebug &operator<< (see tests/src/BasicPubSub.cpp),
* a specialization for std::hash,
* and and overload for operator== and operator!=.
*
* @see BasicPubSubClient
*/
template <typename Subscription>
class BasicPubSubManager
{
public:
BasicPubSubManager(QString host)
: host_(std::move(host))
{
this->websocketClient_.set_access_channels(
websocketpp::log::alevel::all);
this->websocketClient_.clear_access_channels(
websocketpp::log::alevel::frame_payload |
websocketpp::log::alevel::frame_header);
this->websocketClient_.init_asio();
// SSL Handshake
this->websocketClient_.set_tls_init_handler([this](auto hdl) {
return this->onTLSInit(hdl);
});
this->websocketClient_.set_message_handler([this](auto hdl, auto msg) {
this->onMessage(hdl, msg);
});
this->websocketClient_.set_open_handler([this](auto hdl) {
this->onConnectionOpen(hdl);
});
this->websocketClient_.set_close_handler([this](auto hdl) {
this->onConnectionClose(hdl);
});
this->websocketClient_.set_fail_handler([this](auto hdl) {
this->onConnectionFail(hdl);
});
}
virtual ~BasicPubSubManager() = default;
BasicPubSubManager(const BasicPubSubManager &) = delete;
BasicPubSubManager(const BasicPubSubManager &&) = delete;
BasicPubSubManager &operator=(const BasicPubSubManager &) = delete;
BasicPubSubManager &operator=(const BasicPubSubManager &&) = delete;
/** This is only used for testing. */
struct {
std::atomic<uint32_t> connectionsClosed{0};
std::atomic<uint32_t> connectionsOpened{0};
std::atomic<uint32_t> connectionsFailed{0};
} diag;
void start()
{
this->work_ = std::make_shared<boost::asio::io_service::work>(
this->websocketClient_.get_io_service());
this->mainThread_.reset(new std::thread([this] {
runThread();
}));
}
void stop()
{
this->stopping_ = true;
for (const auto &client : this->clients_)
{
client.second->close("Shutting down");
}
this->work_.reset();
if (this->mainThread_->joinable())
{
this->mainThread_->join();
}
assert(this->clients_.empty());
}
protected:
using WebsocketMessagePtr =
websocketpp::config::asio_tls_client::message_type::ptr;
using WebsocketContextPtr =
websocketpp::lib::shared_ptr<boost::asio::ssl::context>;
virtual void onMessage(websocketpp::connection_hdl hdl,
WebsocketMessagePtr msg) = 0;
virtual std::shared_ptr<BasicPubSubClient<Subscription>> createClient(
liveupdates::WebsocketClient &client, websocketpp::connection_hdl hdl)
{
return std::make_shared<BasicPubSubClient<Subscription>>(client, hdl);
}
/**
* @param hdl The handle of the client.
* @return The client managing this connection, empty shared_ptr otherwise.
*/
std::shared_ptr<BasicPubSubClient<Subscription>> findClient(
websocketpp::connection_hdl hdl)
{
auto clientIt = this->clients_.find(hdl);
if (clientIt == this->clients_.end())
{
return {};
}
return clientIt->second;
}
void unsubscribe(const Subscription &subscription)
{
for (auto &client : this->clients_)
{
if (client.second->unsubscribe(subscription))
{
return;
}
}
}
void subscribe(const Subscription &subscription)
{
if (this->trySubscribe(subscription))
{
return;
}
this->addClient();
this->pendingSubscriptions_.emplace_back(subscription);
DebugCount::increase("LiveUpdates subscription backlog");
}
private:
void onConnectionOpen(websocketpp::connection_hdl hdl)
{
DebugCount::increase("LiveUpdates connections");
this->addingClient_ = false;
this->diag.connectionsOpened.fetch_add(1, std::memory_order_acq_rel);
this->connectBackoff_.reset();
auto client = this->createClient(this->websocketClient_, hdl);
// We separate the starting from the constructor because we will want to use
// shared_from_this
client->start();
this->clients_.emplace(hdl, client);
auto pendingSubsToTake = (std::min)(this->pendingSubscriptions_.size(),
client->maxSubscriptions);
qCDebug(chatterinoLiveupdates)
<< "LiveUpdate connection opened, subscribing to"
<< pendingSubsToTake << "subscriptions!";
while (pendingSubsToTake > 0 && !this->pendingSubscriptions_.empty())
{
const auto last = std::move(this->pendingSubscriptions_.back());
this->pendingSubscriptions_.pop_back();
if (!client->subscribe(last))
{
qCDebug(chatterinoLiveupdates)
<< "Failed to subscribe to" << last << "on new client.";
// TODO: should we try to add a new client here?
return;
}
DebugCount::decrease("LiveUpdates subscription backlog");
pendingSubsToTake--;
}
if (!this->pendingSubscriptions_.empty())
{
this->addClient();
}
}
void onConnectionFail(websocketpp::connection_hdl hdl)
{
DebugCount::increase("LiveUpdates failed connections");
this->diag.connectionsFailed.fetch_add(1, std::memory_order_acq_rel);
if (auto conn = this->websocketClient_.get_con_from_hdl(std::move(hdl)))
{
qCDebug(chatterinoLiveupdates)
<< "LiveUpdates connection attempt failed (error: "
<< conn->get_ec().message().c_str() << ")";
}
else
{
qCDebug(chatterinoLiveupdates)
<< "LiveUpdates connection attempt failed but we can't get the "
"connection from a handle.";
}
this->addingClient_ = false;
if (!this->pendingSubscriptions_.empty())
{
runAfter(this->websocketClient_.get_io_service(),
this->connectBackoff_.next(), [this](auto /*timer*/) {
this->addClient();
});
}
}
void onConnectionClose(websocketpp::connection_hdl hdl)
{
qCDebug(chatterinoLiveupdates) << "Connection closed";
DebugCount::decrease("LiveUpdates connections");
this->diag.connectionsClosed.fetch_add(1, std::memory_order_acq_rel);
auto clientIt = this->clients_.find(hdl);
// If this assert goes off, there's something wrong with the connection
// creation/preserving code KKona
assert(clientIt != this->clients_.end());
auto client = clientIt->second;
this->clients_.erase(clientIt);
client->stop();
if (!this->stopping_)
{
for (const auto &sub : client->subscriptions_)
{
this->subscribe(sub);
}
}
}
WebsocketContextPtr onTLSInit(const websocketpp::connection_hdl & /*hdl*/)
{
WebsocketContextPtr ctx(
new boost::asio::ssl::context(boost::asio::ssl::context::tlsv12));
try
{
ctx->set_options(boost::asio::ssl::context::default_workarounds |
boost::asio::ssl::context::no_sslv2 |
boost::asio::ssl::context::single_dh_use);
}
catch (const std::exception &e)
{
qCDebug(chatterinoLiveupdates)
<< "Exception caught in OnTLSInit:" << e.what();
}
return ctx;
}
void runThread()
{
qCDebug(chatterinoLiveupdates) << "Start LiveUpdates manager thread";
this->websocketClient_.run();
qCDebug(chatterinoLiveupdates)
<< "Done with LiveUpdates manager thread";
}
void addClient()
{
if (this->addingClient_)
{
return;
}
qCDebug(chatterinoLiveupdates) << "Adding an additional client";
this->addingClient_ = true;
websocketpp::lib::error_code ec;
auto con = this->websocketClient_.get_connection(
this->host_.toStdString(), ec);
if (ec)
{
qCDebug(chatterinoLiveupdates)
<< "Unable to establish connection:" << ec.message().c_str();
return;
}
this->websocketClient_.connect(con);
}
bool trySubscribe(const Subscription &subscription)
{
for (auto &client : this->clients_)
{
if (client.second->subscribe(subscription))
{
return true;
}
}
return false;
}
std::map<liveupdates::WebsocketHandle,
std::shared_ptr<BasicPubSubClient<Subscription>>,
std::owner_less<liveupdates::WebsocketHandle>>
clients_;
std::vector<Subscription> pendingSubscriptions_;
std::atomic<bool> addingClient_{false};
ExponentialBackoff<5> connectBackoff_{std::chrono::milliseconds(1000)};
std::shared_ptr<boost::asio::io_service::work> work_{nullptr};
liveupdates::WebsocketClient websocketClient_;
std::unique_ptr<std::thread> mainThread_;
const QString host_;
bool stopping_{false};
};
} // namespace chatterino

View file

@ -0,0 +1,36 @@
#pragma once
#include "providers/twitch/ChatterinoWebSocketppLogger.hpp"
#include <websocketpp/client.hpp>
#include <websocketpp/config/asio_client.hpp>
#include <websocketpp/extensions/permessage_deflate/disabled.hpp>
#include <websocketpp/logger/basic.hpp>
namespace chatterino {
struct BasicPubSubConfig : public websocketpp::config::asio_tls_client {
// NOLINTBEGIN(modernize-use-using)
typedef websocketpp::log::chatterinowebsocketpplogger<
concurrency_type, websocketpp::log::elevel>
elog_type;
typedef websocketpp::log::chatterinowebsocketpplogger<
concurrency_type, websocketpp::log::alevel>
alog_type;
struct PerMessageDeflateConfig {
};
typedef websocketpp::extensions::permessage_deflate::disabled<
PerMessageDeflateConfig>
permessage_deflate_type;
// NOLINTEND(modernize-use-using)
};
namespace liveupdates {
using WebsocketClient = websocketpp::client<chatterino::BasicPubSubConfig>;
using WebsocketHandle = websocketpp::connection_hdl;
using WebsocketErrorCode = websocketpp::lib::error_code;
} // namespace liveupdates
} // namespace chatterino

View file

@ -20,6 +20,7 @@ set(test_SOURCES
${CMAKE_CURRENT_LIST_DIR}/src/HighlightController.cpp
${CMAKE_CURRENT_LIST_DIR}/src/FormatTime.cpp
${CMAKE_CURRENT_LIST_DIR}/src/LimitedQueue.cpp
${CMAKE_CURRENT_LIST_DIR}/src/BasicPubSub.cpp
# Add your new file above this line!
)

148
tests/src/BasicPubSub.cpp Normal file
View file

@ -0,0 +1,148 @@
#include "providers/liveupdates/BasicPubSubClient.hpp"
#include "providers/liveupdates/BasicPubSubManager.hpp"
#include <gtest/gtest.h>
#include <QByteArray>
#include <QJsonDocument>
#include <QJsonObject>
#include <QString>
#include <deque>
#include <mutex>
#include <optional>
using namespace chatterino;
using namespace std::chrono_literals;
struct DummySubscription {
int type;
QString condition;
bool operator==(const DummySubscription &rhs) const
{
return std::tie(this->condition, this->type) ==
std::tie(rhs.condition, rhs.type);
}
bool operator!=(const DummySubscription &rhs) const
{
return !(rhs == *this);
}
QByteArray encodeSubscribe() const
{
QJsonObject root;
root["op"] = "sub";
root["type"] = this->type;
root["condition"] = this->condition;
return QJsonDocument(root).toJson();
}
QByteArray encodeUnsubscribe() const
{
QJsonObject root;
root["op"] = "unsub";
root["type"] = this->type;
root["condition"] = this->condition;
return QJsonDocument(root).toJson();
}
friend QDebug &operator<<(QDebug &dbg,
const DummySubscription &subscription)
{
dbg << "DummySubscription{ condition:" << subscription.condition
<< "type:" << (int)subscription.type << '}';
return dbg;
}
};
namespace std {
template <>
struct hash<DummySubscription> {
size_t operator()(const DummySubscription &sub) const
{
return (size_t)qHash(sub.condition, qHash(sub.type));
}
};
} // namespace std
class MyManager : public BasicPubSubManager<DummySubscription>
{
public:
MyManager(QString host)
: BasicPubSubManager(std::move(host))
{
}
std::atomic<int32_t> messagesReceived{0};
std::optional<QString> popMessage()
{
std::lock_guard<std::mutex> guard(this->messageMtx_);
if (this->messageQueue_.empty())
{
return std::nullopt;
}
QString front = this->messageQueue_.front();
this->messageQueue_.pop_front();
return front;
}
void sub(const DummySubscription &sub)
{
// We don't track subscriptions in this test
this->subscribe(sub);
}
void unsub(const DummySubscription &sub)
{
this->unsubscribe(sub);
}
protected:
void onMessage(
websocketpp::connection_hdl /*hdl*/,
BasicPubSubManager<DummySubscription>::WebsocketMessagePtr msg) override
{
std::lock_guard<std::mutex> guard(this->messageMtx_);
this->messagesReceived.fetch_add(1, std::memory_order_acq_rel);
this->messageQueue_.emplace_back(
QString::fromStdString(msg->get_payload()));
}
private:
std::mutex messageMtx_;
std::deque<QString> messageQueue_;
};
TEST(BasicPubSub, SubscriptionCycle)
{
const QString host("wss://127.0.0.1:9050/liveupdates/sub-unsub");
auto *manager = new MyManager(host);
manager->start();
std::this_thread::sleep_for(50ms);
manager->sub({1, "foo"});
std::this_thread::sleep_for(500ms);
ASSERT_EQ(manager->diag.connectionsOpened, 1);
ASSERT_EQ(manager->diag.connectionsClosed, 0);
ASSERT_EQ(manager->diag.connectionsFailed, 0);
ASSERT_EQ(manager->messagesReceived, 1);
ASSERT_EQ(manager->popMessage(), QString("ack-sub-1-foo"));
manager->unsub({1, "foo"});
std::this_thread::sleep_for(50ms);
ASSERT_EQ(manager->diag.connectionsOpened, 1);
ASSERT_EQ(manager->diag.connectionsClosed, 0);
ASSERT_EQ(manager->diag.connectionsFailed, 0);
ASSERT_EQ(manager->messagesReceived, 2);
ASSERT_EQ(manager->popMessage(), QString("ack-unsub-1-foo"));
manager->stop();
ASSERT_EQ(manager->diag.connectionsOpened, 1);
ASSERT_EQ(manager->diag.connectionsClosed, 1);
ASSERT_EQ(manager->diag.connectionsFailed, 0);
ASSERT_EQ(manager->messagesReceived, 2);
}