From 416806bb0ae5be8451f8a7c26024add0d82fce92 Mon Sep 17 00:00:00 2001 From: pajlada Date: Sat, 6 Jan 2024 13:18:37 +0100 Subject: [PATCH] refactor: Twitch PubSub client (#5059) * Remove unused `setAccountData` function * Move PubSub out of TwitchIrcServer and into Application * Add changelog entry * fix: assert feedback * Add PubSub::unlistenPrefix as per review suggestion * Fix tests * quit pubsub on exit might conflict with exit removal, so can be reverted but this shows it's possible * Don't manually call stop on clients, it's called when the connection is closed * nit: rename `mainThread` to `thread` * Join in a thread!!!!!!!! --- CHANGELOG.md | 1 + mocks/include/mocks/EmptyApplication.hpp | 6 + src/Application.cpp | 486 +++++++++++------------ src/Application.hpp | 9 + src/RunGui.cpp | 2 + src/providers/twitch/PubSubClient.cpp | 7 +- src/providers/twitch/PubSubClient.hpp | 1 + src/providers/twitch/PubSubManager.cpp | 236 +++++------ src/providers/twitch/PubSubManager.hpp | 117 +++--- src/providers/twitch/TwitchChannel.cpp | 10 +- src/providers/twitch/TwitchIrcServer.cpp | 6 +- src/providers/twitch/TwitchIrcServer.hpp | 3 - src/widgets/Window.cpp | 4 +- tests/src/TwitchPubSubClient.cpp | 54 ++- 14 files changed, 458 insertions(+), 484 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 63dfefacc..f00312f7d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -89,6 +89,7 @@ - Dev: Refactor `DebugCount` and add copy button to debug popup. (#4921) - Dev: Refactor `common/Credentials`. (#4979) - Dev: Refactor chat logger. (#5058) +- Dev: Refactor Twitch PubSub client. (#5059) - Dev: Changed lifetime of context menus. (#4924) - Dev: Renamed `tools` directory to `scripts`. (#5035) - Dev: Refactor `ChannelView`, removing a bunch of clang-tidy warnings. (#4926) diff --git a/mocks/include/mocks/EmptyApplication.hpp b/mocks/include/mocks/EmptyApplication.hpp index 327304d77..84cadffc0 100644 --- a/mocks/include/mocks/EmptyApplication.hpp +++ b/mocks/include/mocks/EmptyApplication.hpp @@ -104,6 +104,12 @@ public: return nullptr; } + PubSub *getTwitchPubSub() override + { + assert(false && "getTwitchPubSub was called without being initialized"); + return nullptr; + } + Logging *getChatLogger() override { assert(!"getChatLogger was called without being initialized"); diff --git a/src/Application.cpp b/src/Application.cpp index 689a2e4f3..81b604cf0 100644 --- a/src/Application.cpp +++ b/src/Application.cpp @@ -87,6 +87,8 @@ ISoundController *makeSoundController(Settings &settings) } } +const QString TWITCH_PUBSUB_URL = "wss://pubsub-edge.twitch.tv"; + } // namespace namespace chatterino { @@ -128,6 +130,7 @@ Application::Application(Settings &_settings, Paths &_paths, const Args &_args) , userData(&this->emplace()) , sound(&this->emplace(makeSoundController(_settings))) , twitchLiveController(&this->emplace()) + , twitchPubSub(new PubSub(TWITCH_PUBSUB_URL)) , logging(new Logging(_settings)) #ifdef CHATTERINO_HAVE_PLUGINS , plugins(&this->emplace()) @@ -144,6 +147,11 @@ Application::Application(Settings &_settings, Paths &_paths, const Args &_args) Application::~Application() = default; +void Application::fakeDtor() +{ + this->twitchPubSub.reset(); +} + void Application::initialize(Settings &settings, Paths &paths) { assert(isAppInitialized == false); @@ -314,6 +322,11 @@ ITwitchIrcServer *Application::getTwitch() return this->twitch; } +PubSub *Application::getTwitchPubSub() +{ + return this->twitchPubSub.get(); +} + Logging *Application::getChatLogger() { return this->logging.get(); @@ -343,7 +356,7 @@ void Application::initPubSub() { // We can safely ignore these signal connections since the twitch object will always // be destroyed before the Application - std::ignore = this->twitch->pubsub->signals_.moderation.chatCleared.connect( + std::ignore = this->twitchPubSub->moderation.chatCleared.connect( [this](const auto &action) { auto chan = this->twitch->getChannelOrEmptyByID(action.roomID); if (chan->isEmpty()) @@ -360,7 +373,7 @@ void Application::initPubSub() }); }); - std::ignore = this->twitch->pubsub->signals_.moderation.modeChanged.connect( + std::ignore = this->twitchPubSub->moderation.modeChanged.connect( [this](const auto &action) { auto chan = this->twitch->getChannelOrEmptyByID(action.roomID); if (chan->isEmpty()) @@ -386,29 +399,28 @@ void Application::initPubSub() }); }); - std::ignore = - this->twitch->pubsub->signals_.moderation.moderationStateChanged - .connect([this](const auto &action) { - auto chan = this->twitch->getChannelOrEmptyByID(action.roomID); - if (chan->isEmpty()) - { - return; - } + std::ignore = this->twitchPubSub->moderation.moderationStateChanged.connect( + [this](const auto &action) { + auto chan = this->twitch->getChannelOrEmptyByID(action.roomID); + if (chan->isEmpty()) + { + return; + } - QString text; + QString text; - text = QString("%1 %2 %3.") - .arg(action.source.login, - (action.modded ? "modded" : "unmodded"), - action.target.login); + text = QString("%1 %2 %3.") + .arg(action.source.login, + (action.modded ? "modded" : "unmodded"), + action.target.login); - auto msg = makeSystemMessage(text); - postToThread([chan, msg] { - chan->addMessage(msg); - }); + auto msg = makeSystemMessage(text); + postToThread([chan, msg] { + chan->addMessage(msg); }); + }); - std::ignore = this->twitch->pubsub->signals_.moderation.userBanned.connect( + std::ignore = this->twitchPubSub->moderation.userBanned.connect( [&](const auto &action) { auto chan = this->twitch->getChannelOrEmptyByID(action.roomID); @@ -423,67 +435,65 @@ void Application::initPubSub() chan->addOrReplaceTimeout(msg.release()); }); }); - std::ignore = - this->twitch->pubsub->signals_.moderation.messageDeleted.connect( - [&](const auto &action) { - auto chan = this->twitch->getChannelOrEmptyByID(action.roomID); + std::ignore = this->twitchPubSub->moderation.messageDeleted.connect( + [&](const auto &action) { + auto chan = this->twitch->getChannelOrEmptyByID(action.roomID); - if (chan->isEmpty() || getSettings()->hideDeletionActions) + if (chan->isEmpty() || getSettings()->hideDeletionActions) + { + return; + } + + MessageBuilder msg; + TwitchMessageBuilder::deletionMessage(action, &msg); + msg->flags.set(MessageFlag::PubSub); + + postToThread([chan, msg = msg.release()] { + auto replaced = false; + LimitedQueueSnapshot snapshot = + chan->getMessageSnapshot(); + int snapshotLength = snapshot.size(); + + // without parens it doesn't build on windows + int end = (std::max)(0, snapshotLength - 200); + + for (int i = snapshotLength - 1; i >= end; --i) { - return; + auto &s = snapshot[i]; + if (!s->flags.has(MessageFlag::PubSub) && + s->timeoutUser == msg->timeoutUser) + { + chan->replaceMessage(s, msg); + replaced = true; + break; + } } - - MessageBuilder msg; - TwitchMessageBuilder::deletionMessage(action, &msg); - msg->flags.set(MessageFlag::PubSub); - - postToThread([chan, msg = msg.release()] { - auto replaced = false; - LimitedQueueSnapshot snapshot = - chan->getMessageSnapshot(); - int snapshotLength = snapshot.size(); - - // without parens it doesn't build on windows - int end = (std::max)(0, snapshotLength - 200); - - for (int i = snapshotLength - 1; i >= end; --i) - { - auto &s = snapshot[i]; - if (!s->flags.has(MessageFlag::PubSub) && - s->timeoutUser == msg->timeoutUser) - { - chan->replaceMessage(s, msg); - replaced = true; - break; - } - } - if (!replaced) - { - chan->addMessage(msg); - } - }); - }); - - std::ignore = - this->twitch->pubsub->signals_.moderation.userUnbanned.connect( - [&](const auto &action) { - auto chan = this->twitch->getChannelOrEmptyByID(action.roomID); - - if (chan->isEmpty()) + if (!replaced) { - return; - } - - auto msg = MessageBuilder(action).release(); - - postToThread([chan, msg] { chan->addMessage(msg); - }); + } }); + }); + + std::ignore = this->twitchPubSub->moderation.userUnbanned.connect( + [&](const auto &action) { + auto chan = this->twitch->getChannelOrEmptyByID(action.roomID); + + if (chan->isEmpty()) + { + return; + } + + auto msg = MessageBuilder(action).release(); + + postToThread([chan, msg] { + chan->addMessage(msg); + }); + }); std::ignore = - this->twitch->pubsub->signals_.moderation.suspiciousMessageReceived - .connect([&](const auto &action) { + this->twitchPubSub->moderation.suspiciousMessageReceived.connect( + [&](const auto &action) { if (action.treatment == PubSubLowTrustUsersMessage::Treatment::INVALID) { @@ -525,8 +535,8 @@ void Application::initPubSub() }); std::ignore = - this->twitch->pubsub->signals_.moderation.suspiciousTreatmentUpdated - .connect([&](const auto &action) { + this->twitchPubSub->moderation.suspiciousTreatmentUpdated.connect( + [&](const auto &action) { if (action.treatment == PubSubLowTrustUsersMessage::Treatment::INVALID) { @@ -562,170 +572,162 @@ void Application::initPubSub() }); }); - std::ignore = - this->twitch->pubsub->signals_.moderation.autoModMessageCaught.connect( - [&](const auto &msg, const QString &channelID) { - auto chan = this->twitch->getChannelOrEmptyByID(channelID); - if (chan->isEmpty()) - { - return; - } + std::ignore = this->twitchPubSub->moderation.autoModMessageCaught.connect( + [&](const auto &msg, const QString &channelID) { + auto chan = this->twitch->getChannelOrEmptyByID(channelID); + if (chan->isEmpty()) + { + return; + } - switch (msg.type) - { - case PubSubAutoModQueueMessage::Type:: - AutoModCaughtMessage: { - if (msg.status == "PENDING") + switch (msg.type) + { + case PubSubAutoModQueueMessage::Type::AutoModCaughtMessage: { + if (msg.status == "PENDING") + { + AutomodAction action(msg.data, channelID); + action.reason = QString("%1 level %2") + .arg(msg.contentCategory) + .arg(msg.contentLevel); + + action.msgID = msg.messageID; + action.message = msg.messageText; + + // this message also contains per-word automod data, which could be implemented + + // extract sender data manually because Twitch loves not being consistent + QString senderDisplayName = + msg.senderUserDisplayName; // Might be transformed later + bool hasLocalizedName = false; + if (!msg.senderUserDisplayName.isEmpty()) { - AutomodAction action(msg.data, channelID); - action.reason = QString("%1 level %2") - .arg(msg.contentCategory) - .arg(msg.contentLevel); - - action.msgID = msg.messageID; - action.message = msg.messageText; - - // this message also contains per-word automod data, which could be implemented - - // extract sender data manually because Twitch loves not being consistent - QString senderDisplayName = - msg.senderUserDisplayName; // Might be transformed later - bool hasLocalizedName = false; - if (!msg.senderUserDisplayName.isEmpty()) + // check for non-ascii display names + if (QString::compare(msg.senderUserDisplayName, + msg.senderUserLogin, + Qt::CaseInsensitive) != 0) { - // check for non-ascii display names - if (QString::compare(msg.senderUserDisplayName, - msg.senderUserLogin, - Qt::CaseInsensitive) != 0) - { - hasLocalizedName = true; - } + hasLocalizedName = true; } - QColor senderColor = msg.senderUserChatColor; - QString senderColor_; - if (!senderColor.isValid() && - getSettings()->colorizeNicknames) - { - // color may be not present if user is a grey-name - senderColor = getRandomColor(msg.senderUserID); - } - - // handle username style based on prefered setting - switch ( - getSettings()->usernameDisplayMode.getValue()) - { - case UsernameDisplayMode::Username: { - if (hasLocalizedName) - { - senderDisplayName = msg.senderUserLogin; - } - break; - } - case UsernameDisplayMode::LocalizedName: { - break; - } - case UsernameDisplayMode:: - UsernameAndLocalizedName: { - if (hasLocalizedName) - { - senderDisplayName = - QString("%1(%2)").arg( - msg.senderUserLogin, - msg.senderUserDisplayName); - } - break; - } - } - - action.target = ActionUser{ - msg.senderUserID, msg.senderUserLogin, - senderDisplayName, senderColor}; - postToThread([chan, action] { - const auto p = - TwitchMessageBuilder::makeAutomodMessage( - action, chan->getName()); - chan->addMessage(p.first); - chan->addMessage(p.second); - - getApp()->twitch->automodChannel->addMessage( - p.first); - getApp()->twitch->automodChannel->addMessage( - p.second); - }); } - // "ALLOWED" and "DENIED" statuses remain unimplemented - // They are versions of automod_message_(denied|approved) but for mods. + QColor senderColor = msg.senderUserChatColor; + QString senderColor_; + if (!senderColor.isValid() && + getSettings()->colorizeNicknames) + { + // color may be not present if user is a grey-name + senderColor = getRandomColor(msg.senderUserID); + } + + // handle username style based on prefered setting + switch (getSettings()->usernameDisplayMode.getValue()) + { + case UsernameDisplayMode::Username: { + if (hasLocalizedName) + { + senderDisplayName = msg.senderUserLogin; + } + break; + } + case UsernameDisplayMode::LocalizedName: { + break; + } + case UsernameDisplayMode:: + UsernameAndLocalizedName: { + if (hasLocalizedName) + { + senderDisplayName = QString("%1(%2)").arg( + msg.senderUserLogin, + msg.senderUserDisplayName); + } + break; + } + } + + action.target = + ActionUser{msg.senderUserID, msg.senderUserLogin, + senderDisplayName, senderColor}; + postToThread([chan, action] { + const auto p = + TwitchMessageBuilder::makeAutomodMessage( + action, chan->getName()); + chan->addMessage(p.first); + chan->addMessage(p.second); + + getApp()->twitch->automodChannel->addMessage( + p.first); + getApp()->twitch->automodChannel->addMessage( + p.second); + }); } - break; - - case PubSubAutoModQueueMessage::Type::INVALID: - default: { - } - break; + // "ALLOWED" and "DENIED" statuses remain unimplemented + // They are versions of automod_message_(denied|approved) but for mods. } + break; + + case PubSubAutoModQueueMessage::Type::INVALID: + default: { + } + break; + } + }); + + std::ignore = this->twitchPubSub->moderation.autoModMessageBlocked.connect( + [&](const auto &action) { + auto chan = this->twitch->getChannelOrEmptyByID(action.roomID); + if (chan->isEmpty()) + { + return; + } + + postToThread([chan, action] { + const auto p = TwitchMessageBuilder::makeAutomodMessage( + action, chan->getName()); + chan->addMessage(p.first); + chan->addMessage(p.second); }); + }); + + std::ignore = this->twitchPubSub->moderation.automodUserMessage.connect( + [&](const auto &action) { + // This condition has been set up to execute isInStreamerMode() as the last thing + // as it could end up being expensive. + if (getSettings()->streamerModeHideModActions && isInStreamerMode()) + { + return; + } + auto chan = this->twitch->getChannelOrEmptyByID(action.roomID); + + if (chan->isEmpty()) + { + return; + } + + auto msg = MessageBuilder(action).release(); + + postToThread([chan, msg] { + chan->addMessage(msg); + }); + chan->deleteMessage(msg->id); + }); + + std::ignore = this->twitchPubSub->moderation.automodInfoMessage.connect( + [&](const auto &action) { + auto chan = this->twitch->getChannelOrEmptyByID(action.roomID); + + if (chan->isEmpty()) + { + return; + } + + postToThread([chan, action] { + const auto p = + TwitchMessageBuilder::makeAutomodInfoMessage(action); + chan->addMessage(p); + }); + }); std::ignore = - this->twitch->pubsub->signals_.moderation.autoModMessageBlocked.connect( - [&](const auto &action) { - auto chan = this->twitch->getChannelOrEmptyByID(action.roomID); - if (chan->isEmpty()) - { - return; - } - - postToThread([chan, action] { - const auto p = TwitchMessageBuilder::makeAutomodMessage( - action, chan->getName()); - chan->addMessage(p.first); - chan->addMessage(p.second); - }); - }); - - std::ignore = - this->twitch->pubsub->signals_.moderation.automodUserMessage.connect( - [&](const auto &action) { - // This condition has been set up to execute isInStreamerMode() as the last thing - // as it could end up being expensive. - if (getSettings()->streamerModeHideModActions && - isInStreamerMode()) - { - return; - } - auto chan = this->twitch->getChannelOrEmptyByID(action.roomID); - - if (chan->isEmpty()) - { - return; - } - - auto msg = MessageBuilder(action).release(); - - postToThread([chan, msg] { - chan->addMessage(msg); - }); - chan->deleteMessage(msg->id); - }); - - std::ignore = - this->twitch->pubsub->signals_.moderation.automodInfoMessage.connect( - [&](const auto &action) { - auto chan = this->twitch->getChannelOrEmptyByID(action.roomID); - - if (chan->isEmpty()) - { - return; - } - - postToThread([chan, action] { - const auto p = - TwitchMessageBuilder::makeAutomodInfoMessage(action); - chan->addMessage(p); - }); - }); - - std::ignore = this->twitch->pubsub->signals_.pointReward.redeemed.connect( - [&](auto &data) { + this->twitchPubSub->pointReward.redeemed.connect([&](auto &data) { QString channelId = data.value("channel_id").toString(); if (channelId.isEmpty()) { @@ -746,29 +748,19 @@ void Application::initPubSub() }); }); - this->twitch->pubsub->start(); - - auto RequestModerationActions = [this]() { - this->twitch->pubsub->setAccount( - getApp()->accounts->twitch.getCurrent()); - // TODO(pajlada): Unlisten to all authed topics instead of only - // moderation topics this->twitch->pubsub->UnlistenAllAuthedTopics(); - - this->twitch->pubsub->listenToWhispers(); - }; + this->twitchPubSub->start(); + this->twitchPubSub->setAccount(this->accounts->twitch.getCurrent()); this->accounts->twitch.currentUserChanged.connect( [this] { - this->twitch->pubsub->unlistenAllModerationActions(); - this->twitch->pubsub->unlistenAutomod(); - this->twitch->pubsub->unlistenLowTrustUsers(); - this->twitch->pubsub->unlistenWhispers(); + this->twitchPubSub->unlistenChannelModerationActions(); + this->twitchPubSub->unlistenAutomod(); + this->twitchPubSub->unlistenLowTrustUsers(); + this->twitchPubSub->unlistenChannelPointRewards(); + + this->twitchPubSub->setAccount(this->accounts->twitch.getCurrent()); }, boost::signals2::at_front); - - this->accounts->twitch.currentUserChanged.connect(RequestModerationActions); - - RequestModerationActions(); } void Application::initBttvLiveUpdates() diff --git a/src/Application.hpp b/src/Application.hpp index 249f2acd0..fb84d7f50 100644 --- a/src/Application.hpp +++ b/src/Application.hpp @@ -68,6 +68,7 @@ public: virtual HighlightController *getHighlights() = 0; virtual NotificationController *getNotifications() = 0; virtual ITwitchIrcServer *getTwitch() = 0; + virtual PubSub *getTwitchPubSub() = 0; virtual Logging *getChatLogger() = 0; virtual ChatterinoBadges *getChatterinoBadges() = 0; virtual FfzBadges *getFfzBadges() = 0; @@ -97,6 +98,12 @@ public: Application &operator=(const Application &) = delete; Application &operator=(Application &&) = delete; + /** + * In the interim, before we remove _exit(0); from RunGui.cpp, + * this will destroy things we know can be destroyed + */ + void fakeDtor(); + void initialize(Settings &settings, Paths &paths); void load(); void save(); @@ -128,6 +135,7 @@ public: private: TwitchLiveController *const twitchLiveController{}; + std::unique_ptr twitchPubSub; const std::unique_ptr logging; public: @@ -181,6 +189,7 @@ public: return this->highlights; } ITwitchIrcServer *getTwitch() override; + PubSub *getTwitchPubSub() override; Logging *getChatLogger() override; ChatterinoBadges *getChatterinoBadges() override { diff --git a/src/RunGui.cpp b/src/RunGui.cpp index 39bed5594..994115961 100644 --- a/src/RunGui.cpp +++ b/src/RunGui.cpp @@ -288,6 +288,8 @@ void runGui(QApplication &a, Paths &paths, Settings &settings, const Args &args) flushClipboard(); #endif + app.fakeDtor(); + _exit(0); } } // namespace chatterino diff --git a/src/providers/twitch/PubSubClient.cpp b/src/providers/twitch/PubSubClient.cpp index 79232ef9c..cd68f4d91 100644 --- a/src/providers/twitch/PubSubClient.cpp +++ b/src/providers/twitch/PubSubClient.cpp @@ -22,6 +22,8 @@ PubSubClient::PubSubClient(WebsocketClient &websocketClient, const PubSubClientOptions &clientOptions) : websocketClient_(websocketClient) , handle_(handle) + , heartbeatTimer_(std::make_shared( + this->websocketClient_.get_io_service())) , clientOptions_(clientOptions) { } @@ -40,6 +42,7 @@ void PubSubClient::stop() assert(this->started_); this->started_ = false; + this->heartbeatTimer_->cancel(); } void PubSubClient::close(const std::string &reason, @@ -187,8 +190,8 @@ void PubSubClient::ping() auto self = this->shared_from_this(); - runAfter(this->websocketClient_.get_io_service(), - this->clientOptions_.pingInterval_, [self](auto timer) { + runAfter(this->heartbeatTimer_, this->clientOptions_.pingInterval_, + [self](auto timer) { if (!self->started_) { return; diff --git a/src/providers/twitch/PubSubClient.hpp b/src/providers/twitch/PubSubClient.hpp index ee0e40c1a..1b4ed0d26 100644 --- a/src/providers/twitch/PubSubClient.hpp +++ b/src/providers/twitch/PubSubClient.hpp @@ -70,6 +70,7 @@ private: std::atomic awaitingPong_{false}; std::atomic started_{false}; + std::shared_ptr heartbeatTimer_; const PubSubClientOptions &clientOptions_; }; diff --git a/src/providers/twitch/PubSubManager.cpp b/src/providers/twitch/PubSubManager.cpp index 1dbe41392..7f75fd8c2 100644 --- a/src/providers/twitch/PubSubManager.cpp +++ b/src/providers/twitch/PubSubManager.cpp @@ -17,12 +17,14 @@ #include #include +#include #include #include using websocketpp::lib::bind; using websocketpp::lib::placeholders::_1; using websocketpp::lib::placeholders::_2; +using namespace std::chrono_literals; namespace chatterino { @@ -36,7 +38,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval) const auto &roomID) { ClearChatAction action(data, roomID); - this->signals_.moderation.chatCleared.invoke(action); + this->moderation.chatCleared.invoke(action); }; this->moderationActionHandlers["slowoff"] = [this](const auto &data, @@ -46,7 +48,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval) action.mode = ModeChangedAction::Mode::Slow; action.state = ModeChangedAction::State::Off; - this->signals_.moderation.modeChanged.invoke(action); + this->moderation.modeChanged.invoke(action); }; this->moderationActionHandlers["slow"] = [this](const auto &data, @@ -69,7 +71,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval) action.duration = args.at(0).toString().toUInt(&ok, 10); - this->signals_.moderation.modeChanged.invoke(action); + this->moderation.modeChanged.invoke(action); }; this->moderationActionHandlers["r9kbetaoff"] = [this](const auto &data, @@ -79,7 +81,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval) action.mode = ModeChangedAction::Mode::R9K; action.state = ModeChangedAction::State::Off; - this->signals_.moderation.modeChanged.invoke(action); + this->moderation.modeChanged.invoke(action); }; this->moderationActionHandlers["r9kbeta"] = [this](const auto &data, @@ -89,7 +91,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval) action.mode = ModeChangedAction::Mode::R9K; action.state = ModeChangedAction::State::On; - this->signals_.moderation.modeChanged.invoke(action); + this->moderation.modeChanged.invoke(action); }; this->moderationActionHandlers["subscribersoff"] = @@ -99,7 +101,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval) action.mode = ModeChangedAction::Mode::SubscribersOnly; action.state = ModeChangedAction::State::Off; - this->signals_.moderation.modeChanged.invoke(action); + this->moderation.modeChanged.invoke(action); }; this->moderationActionHandlers["subscribers"] = [this](const auto &data, @@ -109,7 +111,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval) action.mode = ModeChangedAction::Mode::SubscribersOnly; action.state = ModeChangedAction::State::On; - this->signals_.moderation.modeChanged.invoke(action); + this->moderation.modeChanged.invoke(action); }; this->moderationActionHandlers["emoteonlyoff"] = @@ -119,7 +121,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval) action.mode = ModeChangedAction::Mode::EmoteOnly; action.state = ModeChangedAction::State::Off; - this->signals_.moderation.modeChanged.invoke(action); + this->moderation.modeChanged.invoke(action); }; this->moderationActionHandlers["emoteonly"] = [this](const auto &data, @@ -129,7 +131,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval) action.mode = ModeChangedAction::Mode::EmoteOnly; action.state = ModeChangedAction::State::On; - this->signals_.moderation.modeChanged.invoke(action); + this->moderation.modeChanged.invoke(action); }; this->moderationActionHandlers["unmod"] = [this](const auto &data, @@ -149,7 +151,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval) action.modded = false; - this->signals_.moderation.moderationStateChanged.invoke(action); + this->moderation.moderationStateChanged.invoke(action); }; this->moderationActionHandlers["mod"] = [this](const auto &data, @@ -167,7 +169,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval) action.target.id = data.value("target_user_id").toString(); action.target.login = data.value("target_user_login").toString(); - this->signals_.moderation.moderationStateChanged.invoke(action); + this->moderation.moderationStateChanged.invoke(action); }; this->moderationActionHandlers["timeout"] = [this](const auto &data, @@ -191,7 +193,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval) action.duration = args[1].toString().toUInt(&ok, 10); action.reason = args[2].toString(); // May be omitted - this->signals_.moderation.userBanned.invoke(action); + this->moderation.userBanned.invoke(action); }; this->moderationActionHandlers["delete"] = [this](const auto &data, @@ -214,7 +216,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval) action.messageText = args[1].toString(); action.messageId = args[2].toString(); - this->signals_.moderation.messageDeleted.invoke(action); + this->moderation.messageDeleted.invoke(action); }; this->moderationActionHandlers["ban"] = [this](const auto &data, @@ -236,7 +238,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval) action.target.login = args[0].toString(); action.reason = args[1].toString(); // May be omitted - this->signals_.moderation.userBanned.invoke(action); + this->moderation.userBanned.invoke(action); }; this->moderationActionHandlers["unban"] = [this](const auto &data, @@ -259,7 +261,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval) action.target.login = args[0].toString(); - this->signals_.moderation.userUnbanned.invoke(action); + this->moderation.userUnbanned.invoke(action); }; this->moderationActionHandlers["untimeout"] = [this](const auto &data, @@ -282,7 +284,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval) action.target.login = args[0].toString(); - this->signals_.moderation.userUnbanned.invoke(action); + this->moderation.userUnbanned.invoke(action); }; /* @@ -315,7 +317,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval) action.message = args[1].toString(); // May be omitted action.reason = args[2].toString(); // May be omitted - this->signals_.moderation.autoModMessageBlocked.invoke(action); + this->moderation.autoModMessageBlocked.invoke(action); }; */ @@ -323,21 +325,21 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval) [this](const auto &data, const auto &roomID) { AutomodInfoAction action(data, roomID); action.type = AutomodInfoAction::OnHold; - this->signals_.moderation.automodInfoMessage.invoke(action); + this->moderation.automodInfoMessage.invoke(action); }; this->moderationActionHandlers["automod_message_denied"] = [this](const auto &data, const auto &roomID) { AutomodInfoAction action(data, roomID); action.type = AutomodInfoAction::Denied; - this->signals_.moderation.automodInfoMessage.invoke(action); + this->moderation.automodInfoMessage.invoke(action); }; this->moderationActionHandlers["automod_message_approved"] = [this](const auto &data, const auto &roomID) { AutomodInfoAction action(data, roomID); action.type = AutomodInfoAction::Approved; - this->signals_.moderation.automodInfoMessage.invoke(action); + this->moderation.automodInfoMessage.invoke(action); }; this->channelTermsActionHandlers["add_permitted_term"] = @@ -351,7 +353,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval) action.message = data.value("text").toString(); action.source.login = data.value("requester_login").toString(); - this->signals_.moderation.automodUserMessage.invoke(action); + this->moderation.automodUserMessage.invoke(action); }; this->channelTermsActionHandlers["add_blocked_term"] = @@ -365,7 +367,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval) action.message = data.value("text").toString(); action.source.login = data.value("requester_login").toString(); - this->signals_.moderation.automodUserMessage.invoke(action); + this->moderation.automodUserMessage.invoke(action); }; this->moderationActionHandlers["delete_permitted_term"] = @@ -385,7 +387,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval) action.message = args[0].toString(); - this->signals_.moderation.automodUserMessage.invoke(action); + this->moderation.automodUserMessage.invoke(action); }; this->channelTermsActionHandlers["delete_permitted_term"] = @@ -399,7 +401,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval) action.message = data.value("text").toString(); action.source.login = data.value("requester_login").toString(); - this->signals_.moderation.automodUserMessage.invoke(action); + this->moderation.automodUserMessage.invoke(action); }; this->moderationActionHandlers["delete_blocked_term"] = @@ -420,7 +422,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval) action.message = args[0].toString(); - this->signals_.moderation.automodUserMessage.invoke(action); + this->moderation.automodUserMessage.invoke(action); }; this->channelTermsActionHandlers["delete_blocked_term"] = [this](const auto &data, const auto &roomID) { @@ -434,21 +436,9 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval) action.message = data.value("text").toString(); action.source.login = data.value("requester_login").toString(); - this->signals_.moderation.automodUserMessage.invoke(action); + this->moderation.automodUserMessage.invoke(action); }; - // We don't get this one anymore or anything similiar - // We need some new topic so we can listen - // - //this->moderationActionHandlers["modified_automod_properties"] = - // [this](const auto &data, const auto &roomID) { - // // The automod settings got modified - // AutomodUserAction action(data, roomID); - // getCreatedByUser(data, action.source); - // action.type = AutomodUserAction::Properties; - // this->signals_.moderation.automodUserMessage.invoke(action); - // }; - this->moderationActionHandlers["denied_automod_message"] = [](const auto &data, const auto &roomID) { // This message got denied by a moderator @@ -482,18 +472,17 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval) bind(&PubSub::onConnectionFail, this, ::_1)); } +PubSub::~PubSub() +{ + this->stop(); +} + void PubSub::setAccount(std::shared_ptr account) { this->token_ = account->getOAuthToken(); this->userID_ = account->getUserId(); } -void PubSub::setAccountData(QString token, QString userID) -{ - this->token_ = token; - this->userID_ = userID; -} - void PubSub::addClient() { if (this->addingClient) @@ -525,104 +514,43 @@ void PubSub::start() { this->work = std::make_shared( this->websocketClient.get_io_service()); - this->mainThread.reset( - new std::thread(std::bind(&PubSub::runThread, this))); + this->thread.reset(new std::thread(std::bind(&PubSub::runThread, this))); } void PubSub::stop() { this->stopping_ = true; - for (const auto &client : this->clients) + for (const auto &[hdl, client] : this->clients) { - client.second->close("Shutting down"); + (void)hdl; + + client->close("Shutting down"); } this->work.reset(); - if (this->mainThread->joinable()) + if (this->thread->joinable()) { - this->mainThread->join(); + // NOTE: We spawn a new thread to join the websocket thread. + // There is a case where a new client was initiated but not added to the clients list. + // We just don't join the thread & let the operating system nuke the thread if joining fails + // within 1s. + // We could fix the underlying bug, but this is easier & we realistically won't use this exact code + // for super much longer. + auto joiner = std::async(std::launch::async, &std::thread::join, + this->thread.get()); + if (joiner.wait_for(1s) == std::future_status::timeout) + { + qCWarning(chatterinoPubSub) + << "Thread didn't join within 1 second, rip it out"; + this->websocketClient.stop(); + } } assert(this->clients.empty()); } -void PubSub::unlistenAllModerationActions() -{ - for (const auto &p : this->clients) - { - const auto &client = p.second; - if (const auto &[topics, nonce] = - client->unlistenPrefix("chat_moderator_actions."); - !topics.empty()) - { - this->registerNonce(nonce, { - client, - "UNLISTEN", - topics, - topics.size(), - }); - } - } -} - -void PubSub::unlistenAutomod() -{ - for (const auto &p : this->clients) - { - const auto &client = p.second; - if (const auto &[topics, nonce] = - client->unlistenPrefix("automod-queue."); - !topics.empty()) - { - this->registerNonce(nonce, { - client, - "UNLISTEN", - topics, - topics.size(), - }); - } - } -} - -void PubSub::unlistenLowTrustUsers() -{ - for (const auto &p : this->clients) - { - const auto &client = p.second; - if (const auto &[topics, nonce] = - client->unlistenPrefix("low-trust-users."); - !topics.empty()) - { - this->registerNonce(nonce, { - client, - "UNLISTEN", - topics, - topics.size(), - }); - } - } -} - -void PubSub::unlistenWhispers() -{ - for (const auto &p : this->clients) - { - const auto &client = p.second; - if (const auto &[topics, nonce] = client->unlistenPrefix("whispers."); - !topics.empty()) - { - this->registerNonce(nonce, { - client, - "UNLISTEN", - topics, - topics.size(), - }); - } - } -} - bool PubSub::listenToWhispers() { if (this->userID_.isEmpty()) @@ -642,6 +570,11 @@ bool PubSub::listenToWhispers() return true; } +void PubSub::unlistenWhispers() +{ + this->unlistenPrefix("whispers."); +} + void PubSub::listenToChannelModerationActions(const QString &channelID) { if (this->userID_.isEmpty()) @@ -666,6 +599,11 @@ void PubSub::listenToChannelModerationActions(const QString &channelID) this->listenToTopic(topic); } +void PubSub::unlistenChannelModerationActions() +{ + this->unlistenPrefix("chat_moderator_actions."); +} + void PubSub::listenToAutomod(const QString &channelID) { if (this->userID_.isEmpty()) @@ -690,6 +628,11 @@ void PubSub::listenToAutomod(const QString &channelID) this->listenToTopic(topic); } +void PubSub::unlistenAutomod() +{ + this->unlistenPrefix("automod-queue."); +} + void PubSub::listenToLowTrustUsers(const QString &channelID) { if (this->userID_.isEmpty()) @@ -714,6 +657,11 @@ void PubSub::listenToLowTrustUsers(const QString &channelID) this->listenToTopic(topic); } +void PubSub::unlistenLowTrustUsers() +{ + this->unlistenPrefix("low-trust-users."); +} + void PubSub::listenToChannelPointRewards(const QString &channelID) { static const QString topicFormat("community-points-channel-v1.%1"); @@ -730,6 +678,30 @@ void PubSub::listenToChannelPointRewards(const QString &channelID) this->listenToTopic(topic); } +void PubSub::unlistenChannelPointRewards() +{ + this->unlistenPrefix("community-points-channel-v1."); +} + +void PubSub::unlistenPrefix(const QString &prefix) +{ + for (const auto &p : this->clients) + { + const auto &client = p.second; + if (const auto &[topics, nonce] = client->unlistenPrefix(prefix); + !topics.empty()) + { + NonceInfo nonceInfo{ + client, + "UNLISTEN", + topics, + topics.size(), + }; + this->registerNonce(nonce, nonceInfo); + } + } +} + void PubSub::listen(PubSubListenMessage msg) { if (this->tryListen(msg)) @@ -1083,11 +1055,11 @@ void PubSub::handleMessageResponse(const PubSubMessageMessage &message) switch (whisperMessage.type) { case PubSubWhisperMessage::Type::WhisperReceived: { - this->signals_.whisper.received.invoke(whisperMessage); + this->whisper.received.invoke(whisperMessage); } break; case PubSubWhisperMessage::Type::WhisperSent: { - this->signals_.whisper.sent.invoke(whisperMessage); + this->whisper.sent.invoke(whisperMessage); } break; case PubSubWhisperMessage::Type::Thread: { @@ -1182,7 +1154,7 @@ void PubSub::handleMessageResponse(const PubSubMessageMessage &message) case PubSubCommunityPointsChannelV1Message::Type::RewardRedeemed: { auto redemption = innerMessage.data.value("redemption").toObject(); - this->signals_.pointReward.redeemed.invoke(redemption); + this->pointReward.redeemed.invoke(redemption); } break; @@ -1210,8 +1182,7 @@ void PubSub::handleMessageResponse(const PubSubMessageMessage &message) // Channel ID where the moderator actions are coming from auto channelID = topicParts[2]; - this->signals_.moderation.autoModMessageCaught.invoke(innerMessage, - channelID); + this->moderation.autoModMessageCaught.invoke(innerMessage, channelID); } else if (topic.startsWith("low-trust-users.")) { @@ -1226,13 +1197,12 @@ void PubSub::handleMessageResponse(const PubSubMessageMessage &message) switch (innerMessage.type) { case PubSubLowTrustUsersMessage::Type::UserMessage: { - this->signals_.moderation.suspiciousMessageReceived.invoke( - innerMessage); + this->moderation.suspiciousMessageReceived.invoke(innerMessage); } break; case PubSubLowTrustUsersMessage::Type::TreatmentUpdate: { - this->signals_.moderation.suspiciousTreatmentUpdated.invoke( + this->moderation.suspiciousTreatmentUpdated.invoke( innerMessage); } break; diff --git a/src/providers/twitch/PubSubManager.hpp b/src/providers/twitch/PubSubManager.hpp index f0701101f..6ddd98369 100644 --- a/src/providers/twitch/PubSubManager.hpp +++ b/src/providers/twitch/PubSubManager.hpp @@ -41,6 +41,13 @@ struct PubSubListenMessage; struct PubSubMessage; struct PubSubMessageMessage; +/** + * This handles the Twitch PubSub connection + * + * Known issues: + * - Upon closing a channel, we don't unsubscribe to its pubsub connections + * - Stop is never called, meaning we never do a clean shutdown + */ class PubSub { using WebsocketMessagePtr = @@ -60,75 +67,62 @@ class PubSub }; WebsocketClient websocketClient; - std::unique_ptr mainThread; + std::unique_ptr thread; // Account credentials - // Set from setAccount or setAccountData + // Set from setAccount QString token_; QString userID_; public: PubSub(const QString &host, std::chrono::seconds pingInterval = std::chrono::seconds(15)); + ~PubSub(); + + PubSub(const PubSub &) = delete; + PubSub(PubSub &&) = delete; + PubSub &operator=(const PubSub &) = delete; + PubSub &operator=(PubSub &&) = delete; void setAccount(std::shared_ptr account); - void setAccountData(QString token, QString userID); - - enum class State { - Connected, - Disconnected, - }; - void start(); void stop(); - bool isConnected() const - { - return this->state == State::Connected; - } + struct { + Signal chatCleared; + Signal messageDeleted; + Signal modeChanged; + Signal moderationStateChanged; + + Signal userBanned; + Signal userUnbanned; + + Signal suspiciousMessageReceived; + Signal suspiciousTreatmentUpdated; + + // Message caught by automod + // channelID + pajlada::Signals::Signal + autoModMessageCaught; + + // Message blocked by moderator + Signal autoModMessageBlocked; + + Signal automodUserMessage; + Signal automodInfoMessage; + } moderation; struct { - struct { - Signal chatCleared; - Signal messageDeleted; - Signal modeChanged; - Signal moderationStateChanged; + // Parsing should be done in PubSubManager as well, + // but for now we just send the raw data + Signal received; + Signal sent; + } whisper; - Signal userBanned; - Signal userUnbanned; - - Signal suspiciousMessageReceived; - Signal suspiciousTreatmentUpdated; - - // Message caught by automod - // channelID - pajlada::Signals::Signal - autoModMessageCaught; - - // Message blocked by moderator - Signal autoModMessageBlocked; - - Signal automodUserMessage; - Signal automodInfoMessage; - } moderation; - - struct { - // Parsing should be done in PubSubManager as well, - // but for now we just send the raw data - Signal received; - Signal sent; - } whisper; - - struct { - Signal redeemed; - } pointReward; - } signals_; - - void unlistenAllModerationActions(); - void unlistenAutomod(); - void unlistenLowTrustUsers(); - void unlistenWhispers(); + struct { + Signal redeemed; + } pointReward; /** * Listen to incoming whispers for the currently logged in user. @@ -137,6 +131,7 @@ public: * PubSub topic: whispers.{currentUserID} */ bool listenToWhispers(); + void unlistenWhispers(); /** * Listen to moderation actions in the given channel. @@ -150,6 +145,7 @@ public: * PubSub topic: chat_moderator_actions.{currentUserID}.{channelID} */ void listenToChannelModerationActions(const QString &channelID); + void unlistenChannelModerationActions(); /** * Listen to Automod events in the given channel. @@ -160,6 +156,7 @@ public: * PubSub topic: automod-queue.{currentUserID}.{channelID} */ void listenToAutomod(const QString &channelID); + void unlistenAutomod(); /** * Listen to Low Trust events in the given channel. @@ -170,6 +167,7 @@ public: * PubSub topic: low-trust-users.{currentUserID}.{channelID} */ void listenToLowTrustUsers(const QString &channelID); + void unlistenLowTrustUsers(); /** * Listen to incoming channel point redemptions in the given channel. @@ -178,8 +176,7 @@ public: * PubSub topic: community-points-channel-v1.{channelID} */ void listenToChannelPointRewards(const QString &channelID); - - std::vector requests; + void unlistenChannelPointRewards(); struct { std::atomic connectionsClosed{0}; @@ -192,20 +189,26 @@ public: std::atomic unlistenResponses{0}; } diag; +private: + /** + * Unlistens to all topics matching the prefix in all clients + */ + void unlistenPrefix(const QString &prefix); + void listenToTopic(const QString &topic); -private: void listen(PubSubListenMessage msg); bool tryListen(PubSubListenMessage msg); bool isListeningToTopic(const QString &topic); void addClient(); + + std::vector requests; + std::atomic addingClient{false}; ExponentialBackoff<5> connectBackoff{std::chrono::milliseconds(1000)}; - State state = State::Connected; - std::map, std::owner_less> clients; diff --git a/src/providers/twitch/TwitchChannel.cpp b/src/providers/twitch/TwitchChannel.cpp index 8ee4ebbaf..2db85bd93 100644 --- a/src/providers/twitch/TwitchChannel.cpp +++ b/src/providers/twitch/TwitchChannel.cpp @@ -1250,15 +1250,15 @@ void TwitchChannel::refreshPubSub() auto currentAccount = getApp()->accounts->twitch.getCurrent(); - getApp()->twitch->pubsub->setAccount(currentAccount); + getIApp()->getTwitchPubSub()->setAccount(currentAccount); - getApp()->twitch->pubsub->listenToChannelModerationActions(roomId); + getIApp()->getTwitchPubSub()->listenToChannelModerationActions(roomId); if (this->hasModRights()) { - getApp()->twitch->pubsub->listenToAutomod(roomId); - getApp()->twitch->pubsub->listenToLowTrustUsers(roomId); + getIApp()->getTwitchPubSub()->listenToAutomod(roomId); + getIApp()->getTwitchPubSub()->listenToLowTrustUsers(roomId); } - getApp()->twitch->pubsub->listenToChannelPointRewards(roomId); + getIApp()->getTwitchPubSub()->listenToChannelPointRewards(roomId); } void TwitchChannel::refreshChatters() diff --git a/src/providers/twitch/TwitchIrcServer.cpp b/src/providers/twitch/TwitchIrcServer.cpp index 9602ce976..19f52f466 100644 --- a/src/providers/twitch/TwitchIrcServer.cpp +++ b/src/providers/twitch/TwitchIrcServer.cpp @@ -1,4 +1,4 @@ -#include "TwitchIrcServer.hpp" +#include "providers/twitch/TwitchIrcServer.hpp" #include "Application.hpp" #include "common/Channel.hpp" @@ -13,7 +13,6 @@ #include "providers/twitch/api/Helix.hpp" #include "providers/twitch/ChannelPointReward.hpp" #include "providers/twitch/IrcMessageHandler.hpp" -#include "providers/twitch/PubSubManager.hpp" #include "providers/twitch/TwitchAccount.hpp" #include "providers/twitch/TwitchChannel.hpp" #include "singletons/Settings.hpp" @@ -29,7 +28,6 @@ using namespace std::chrono_literals; namespace { -const QString TWITCH_PUBSUB_URL = "wss://pubsub-edge.twitch.tv"; const QString BTTV_LIVE_UPDATES_URL = "wss://sockets.betterttv.net/ws"; const QString SEVENTV_EVENTAPI_URL = "wss://events.7tv.io/v3"; @@ -43,7 +41,6 @@ TwitchIrcServer::TwitchIrcServer() , liveChannel(new Channel("/live", Channel::Type::TwitchLive)) , automodChannel(new Channel("/automod", Channel::Type::TwitchAutomod)) , watchingChannel(Channel::getEmpty(), Channel::Type::TwitchWatching) - , pubsub(new PubSub(TWITCH_PUBSUB_URL)) { this->initializeIrc(); @@ -72,7 +69,6 @@ void TwitchIrcServer::initialize(Settings &settings, Paths &paths) getApp()->accounts->twitch.currentUserChanged.connect([this]() { postToThread([this] { this->connect(); - this->pubsub->setAccount(getApp()->accounts->twitch.getCurrent()); }); }); diff --git a/src/providers/twitch/TwitchIrcServer.hpp b/src/providers/twitch/TwitchIrcServer.hpp index f7f047de7..8ddb713a4 100644 --- a/src/providers/twitch/TwitchIrcServer.hpp +++ b/src/providers/twitch/TwitchIrcServer.hpp @@ -18,7 +18,6 @@ namespace chatterino { class Settings; class Paths; -class PubSub; class TwitchChannel; class BttvLiveUpdates; class SeventvEventAPI; @@ -80,8 +79,6 @@ public: const ChannelPtr automodChannel; IndirectChannel watchingChannel; - // NOTE: We currently leak this - PubSub *pubsub; std::unique_ptr bttvLiveUpdates; std::unique_ptr seventvEventAPI; diff --git a/src/widgets/Window.cpp b/src/widgets/Window.cpp index fa4b6c8a6..3df75dca5 100644 --- a/src/widgets/Window.cpp +++ b/src/widgets/Window.cpp @@ -281,7 +281,7 @@ void Window::addDebugStuff(HotkeyController::HotkeyMap &actions) ->toInner(); app->twitch->addFakeMessage(getSampleChannelRewardIRCMessage()); - app->twitch->pubsub->signals_.pointReward.redeemed.invoke( + getIApp()->getTwitchPubSub()->pointReward.redeemed.invoke( oInnerMessage->data.value("redemption").toObject()); alt = !alt; } @@ -292,7 +292,7 @@ void Window::addDebugStuff(HotkeyController::HotkeyMap &actions) auto oInnerMessage = oMessage->toInner() ->toInner(); - app->twitch->pubsub->signals_.pointReward.redeemed.invoke( + getIApp()->getTwitchPubSub()->pointReward.redeemed.invoke( oInnerMessage->data.value("redemption").toObject()); alt = !alt; } diff --git a/tests/src/TwitchPubSubClient.cpp b/tests/src/TwitchPubSubClient.cpp index 6e1a12897..30e02e567 100644 --- a/tests/src/TwitchPubSubClient.cpp +++ b/tests/src/TwitchPubSubClient.cpp @@ -3,6 +3,7 @@ #include "providers/twitch/PubSubManager.hpp" #include "providers/twitch/pubsubmessages/AutoMod.hpp" #include "providers/twitch/pubsubmessages/Whisper.hpp" +#include "providers/twitch/TwitchAccount.hpp" #include "TestHelpers.hpp" #include @@ -35,9 +36,13 @@ using namespace std::chrono_literals; class FTest : public PubSub { public: - explicit FTest(const char *path, std::chrono::seconds pingInterval) + explicit FTest(const char *path, std::chrono::seconds pingInterval, + QString token = "token") : PubSub(QString("wss://127.0.0.1:9050%1").arg(path), pingInterval) { + auto account = std::make_shared("testaccount_420", token, + "clientid", "123456"); + this->setAccount(account); } }; @@ -45,7 +50,6 @@ TEST(TwitchPubSubClient, ServerRespondsToPings) { FTest pubSub("", 1s); - pubSub.setAccountData("token", "123456"); pubSub.start(); std::this_thread::sleep_for(50ms); @@ -55,7 +59,7 @@ TEST(TwitchPubSubClient, ServerRespondsToPings) ASSERT_EQ(pubSub.diag.connectionsFailed, 0); ASSERT_EQ(pubSub.diag.messagesReceived, 0); - pubSub.listenToTopic("test"); + pubSub.listenToChannelModerationActions("123456"); std::this_thread::sleep_for(150ms); @@ -85,9 +89,8 @@ TEST(TwitchPubSubClient, ServerDoesntRespondToPings) { FTest pubSub("/dont-respond-to-ping", 1s); - pubSub.setAccountData("token", "123456"); pubSub.start(); - pubSub.listenToTopic("test"); + pubSub.listenToChannelModerationActions("123456"); std::this_thread::sleep_for(750ms); @@ -115,7 +118,6 @@ TEST(TwitchPubSubClient, DisconnectedAfter1s) { FTest pubSub("/disconnect-client-after-1s", 10s); - pubSub.setAccountData("token", "123456"); pubSub.start(); std::this_thread::sleep_for(50ms); @@ -126,7 +128,7 @@ TEST(TwitchPubSubClient, DisconnectedAfter1s) ASSERT_EQ(pubSub.diag.messagesReceived, 0); ASSERT_EQ(pubSub.diag.listenResponses, 0); - pubSub.listenToTopic("test"); + pubSub.listenToChannelModerationActions("123456"); std::this_thread::sleep_for(500ms); @@ -158,7 +160,6 @@ TEST(TwitchPubSubClient, ExceedTopicLimit) { FTest pubSub("", 1s); - pubSub.setAccountData("token", "123456"); pubSub.start(); ASSERT_EQ(pubSub.diag.connectionsOpened, 0); @@ -168,7 +169,7 @@ TEST(TwitchPubSubClient, ExceedTopicLimit) for (auto i = 0; i < PubSubClient::MAX_LISTENS; ++i) { - pubSub.listenToTopic(QString("test-1.%1").arg(i)); + pubSub.listenToChannelModerationActions(QString("1%1").arg(i)); } std::this_thread::sleep_for(50ms); @@ -179,7 +180,7 @@ TEST(TwitchPubSubClient, ExceedTopicLimit) for (auto i = 0; i < PubSubClient::MAX_LISTENS; ++i) { - pubSub.listenToTopic(QString("test-2.%1").arg(i)); + pubSub.listenToChannelModerationActions(QString("2%1").arg(i)); } std::this_thread::sleep_for(50ms); @@ -199,7 +200,6 @@ TEST(TwitchPubSubClient, ExceedTopicLimitSingleStep) { FTest pubSub("", 1s); - pubSub.setAccountData("token", "123456"); pubSub.start(); ASSERT_EQ(pubSub.diag.connectionsOpened, 0); @@ -209,7 +209,7 @@ TEST(TwitchPubSubClient, ExceedTopicLimitSingleStep) for (auto i = 0; i < PubSubClient::MAX_LISTENS * 2; ++i) { - pubSub.listenToTopic("test"); + pubSub.listenToChannelModerationActions("123456"); } std::this_thread::sleep_for(150ms); @@ -229,17 +229,16 @@ TEST(TwitchPubSubClient, ReceivedWhisper) { FTest pubSub("/receive-whisper", 1s); - pubSub.setAccountData("token", "123456"); pubSub.start(); ReceivedMessage aReceivedWhisper; - pubSub.signals_.whisper.received.connect( + std::ignore = pubSub.whisper.received.connect( [&aReceivedWhisper](const auto &whisperMessage) { aReceivedWhisper = whisperMessage; }); - pubSub.listenToTopic("whispers.123456"); + pubSub.listenToWhispers(); std::this_thread::sleep_for(150ms); @@ -266,19 +265,18 @@ TEST(TwitchPubSubClient, ModeratorActionsUserBanned) { FTest pubSub("/moderator-actions-user-banned", 1s); - pubSub.setAccountData("token", "123456"); pubSub.start(); ReceivedMessage received; - pubSub.signals_.moderation.userBanned.connect( - [&received](const auto &action) { + std::ignore = + pubSub.moderation.userBanned.connect([&received](const auto &action) { received = action; }); ASSERT_EQ(pubSub.diag.listenResponses, 0); - pubSub.listenToTopic("chat_moderator_actions.123456.123456"); + pubSub.listenToChannelModerationActions("123456"); std::this_thread::sleep_for(50ms); @@ -308,12 +306,11 @@ TEST(TwitchPubSubClient, ModeratorActionsUserBanned) TEST(TwitchPubSubClient, MissingToken) { // The token that's required is "xD" - FTest pubSub("/authentication-required", 1s); + FTest pubSub("/authentication-required", 1s, ""); - // pubSub.setAccountData("", "123456"); pubSub.start(); - pubSub.listenToTopic("chat_moderator_actions.123456.123456"); + pubSub.listenToChannelModerationActions("123456"); std::this_thread::sleep_for(150ms); @@ -336,10 +333,9 @@ TEST(TwitchPubSubClient, WrongToken) // The token that's required is "xD" FTest pubSub("/authentication-required", 1s); - pubSub.setAccountData("wrongtoken", "123456"); pubSub.start(); - pubSub.listenToTopic("chat_moderator_actions.123456.123456"); + pubSub.listenToChannelModerationActions("123456"); std::this_thread::sleep_for(50ms); @@ -360,12 +356,11 @@ TEST(TwitchPubSubClient, WrongToken) TEST(TwitchPubSubClient, CorrectToken) { // The token that's required is "xD" - FTest pubSub("/authentication-required", 1s); + FTest pubSub("/authentication-required", 1s, "xD"); - pubSub.setAccountData("xD", "123456"); pubSub.start(); - pubSub.listenToTopic("chat_moderator_actions.123456.123456"); + pubSub.listenToChannelModerationActions("123456"); std::this_thread::sleep_for(50ms); @@ -387,19 +382,18 @@ TEST(TwitchPubSubClient, AutoModMessageHeld) { FTest pubSub("/automod-held", 1s); - pubSub.setAccountData("xD", "123456"); pubSub.start(); ReceivedMessage received; ReceivedMessage channelID; - pubSub.signals_.moderation.autoModMessageCaught.connect( + std::ignore = pubSub.moderation.autoModMessageCaught.connect( [&](const auto &msg, const QString &incomingChannelID) { received = msg; channelID = incomingChannelID; }); - pubSub.listenToTopic("automod-queue.117166826.117166826"); + pubSub.listenToAutomod("117166826"); std::this_thread::sleep_for(50ms);