diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index ce9910951..e88f25334 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -4,6 +4,9 @@ name: Test
on:
pull_request:
+env:
+ TWITCH_PUBSUB_SERVER_IMAGE: ghcr.io/chatterino/twitch-pubsub-server-test:v1.0.3
+
jobs:
test:
runs-on: ${{ matrix.os }}
@@ -77,6 +80,8 @@ jobs:
- name: Test (Ubuntu)
if: startsWith(matrix.os, 'ubuntu')
run: |
+ docker pull ${{ env.TWITCH_PUBSUB_SERVER_IMAGE }}
+ docker run --network=host --detach ${{ env.TWITCH_PUBSUB_SERVER_IMAGE }}
./bin/chatterino-test --platform minimal
working-directory: build-test
shell: bash
diff --git a/.gitmodules b/.gitmodules
index adef9ede4..3694e06f0 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -29,3 +29,6 @@
[submodule "cmake/sanitizers-cmake"]
path = cmake/sanitizers-cmake
url = https://github.com/arsenm/sanitizers-cmake
+[submodule "lib/magic_enum"]
+ path = lib/magic_enum
+ url = https://github.com/Neargye/magic_enum
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c87ad5517..540e2ff3c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,7 @@
- Minor: Adjust large stream thumbnail to 16:9 (#3655)
- Minor: Fixed being unable to load Twitch Usercards from the `/mentions` tab. (#3623)
- Minor: Add information about the user's operating system in the About page. (#3663)
+- Bugfix: Connection to Twitch PubSub now recovers more reliably. (#3643)
- Minor: Added chatter count for each category in viewer list. (#3683)
- Minor: Sorted usernames in /vips message to be case-insensitive. (#3696)
- Minor: Added option to open a user's chat in a new tab from the usercard profile picture context menu. (#3625)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index f29563160..b2cd73c37 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -112,6 +112,7 @@ endif ()
find_package(PajladaSerialize REQUIRED)
find_package(PajladaSignals REQUIRED)
find_package(LRUCache REQUIRED)
+find_package(MagicEnum REQUIRED)
if (USE_SYSTEM_PAJLADA_SETTINGS)
find_package(PajladaSettings REQUIRED)
diff --git a/chatterino.pro b/chatterino.pro
index 97089e66b..5cccdc935 100644
--- a/chatterino.pro
+++ b/chatterino.pro
@@ -96,6 +96,7 @@ include(lib/signals.pri)
include(lib/settings.pri)
include(lib/serialize.pri)
include(lib/lrucache.pri)
+include(lib/magic_enum.pri)
include(lib/winsdk.pri)
include(lib/rapidjson.pri)
include(lib/qtkeychain.pri)
@@ -212,9 +213,16 @@ SOURCES += \
src/providers/twitch/api/Helix.cpp \
src/providers/twitch/ChannelPointReward.cpp \
src/providers/twitch/IrcMessageHandler.cpp \
- src/providers/twitch/PubsubActions.cpp \
- src/providers/twitch/PubsubClient.cpp \
- src/providers/twitch/PubsubHelpers.cpp \
+ src/providers/twitch/PubSubActions.cpp \
+ src/providers/twitch/PubSubClient.cpp \
+ src/providers/twitch/PubSubManager.cpp \
+ src/providers/twitch/pubsubmessages/AutoMod.cpp \
+ src/providers/twitch/pubsubmessages/Base.cpp \
+ src/providers/twitch/pubsubmessages/ChannelPoints.cpp \
+ src/providers/twitch/pubsubmessages/ChatModeratorAction.cpp \
+ src/providers/twitch/pubsubmessages/Listen.cpp \
+ src/providers/twitch/pubsubmessages/Unlisten.cpp \
+ src/providers/twitch/pubsubmessages/Whisper.cpp \
src/providers/twitch/TwitchAccount.cpp \
src/providers/twitch/TwitchAccountManager.cpp \
src/providers/twitch/TwitchBadge.cpp \
@@ -432,6 +440,7 @@ HEADERS += \
src/messages/search/LinkPredicate.hpp \
src/messages/search/MessageFlagsPredicate.hpp \
src/messages/search/MessagePredicate.hpp \
+ src/messages/search/RegexPredicate.hpp \
src/messages/search/SubstringPredicate.hpp \
src/messages/Selection.hpp \
src/messages/SharedMessageBuilder.hpp \
@@ -458,9 +467,21 @@ HEADERS += \
src/providers/twitch/ChatterinoWebSocketppLogger.hpp \
src/providers/twitch/EmoteValue.hpp \
src/providers/twitch/IrcMessageHandler.hpp \
- src/providers/twitch/PubsubActions.hpp \
- src/providers/twitch/PubsubClient.hpp \
- src/providers/twitch/PubsubHelpers.hpp \
+ src/providers/twitch/PubSubActions.hpp \
+ src/providers/twitch/PubSubClient.hpp \
+ src/providers/twitch/PubSubClientOptions.hpp \
+ src/providers/twitch/PubSubHelpers.hpp \
+ src/providers/twitch/PubSubManager.hpp \
+ src/providers/twitch/PubSubMessages.hpp \
+ src/providers/twitch/pubsubmessages/AutoMod.hpp \
+ src/providers/twitch/pubsubmessages/Base.hpp \
+ src/providers/twitch/pubsubmessages/ChannelPoints.hpp \
+ src/providers/twitch/pubsubmessages/ChatModeratorAction.hpp \
+ src/providers/twitch/pubsubmessages/Listen.hpp \
+ src/providers/twitch/pubsubmessages/Message.hpp \
+ src/providers/twitch/pubsubmessages/Unlisten.hpp \
+ src/providers/twitch/pubsubmessages/Whisper.hpp \
+ src/providers/twitch/PubSubWebsocket.hpp \
src/providers/twitch/TwitchAccount.hpp \
src/providers/twitch/TwitchAccountManager.hpp \
src/providers/twitch/TwitchBadge.hpp \
diff --git a/cmake/FindMagicEnum.cmake b/cmake/FindMagicEnum.cmake
new file mode 100644
index 000000000..0a77bd279
--- /dev/null
+++ b/cmake/FindMagicEnum.cmake
@@ -0,0 +1,14 @@
+include(FindPackageHandleStandardArgs)
+
+find_path(MagicEnum_INCLUDE_DIR magic_enum.hpp HINTS ${CMAKE_SOURCE_DIR}/lib/magic_enum/include)
+
+find_package_handle_standard_args(MagicEnum DEFAULT_MSG MagicEnum_INCLUDE_DIR)
+
+if (MagicEnum_FOUND)
+ add_library(MagicEnum INTERFACE IMPORTED)
+ set_target_properties(MagicEnum PROPERTIES
+ INTERFACE_INCLUDE_DIRECTORIES "${MagicEnum_INCLUDE_DIR}"
+ )
+endif ()
+
+mark_as_advanced(MagicEnum_INCLUDE_DIR)
diff --git a/lib/magic_enum b/lib/magic_enum
new file mode 160000
index 000000000..b2ac76235
--- /dev/null
+++ b/lib/magic_enum
@@ -0,0 +1 @@
+Subproject commit b2ac76235b2261305bdfe562eb5982c808d07e73
diff --git a/lib/magic_enum.pri b/lib/magic_enum.pri
new file mode 100644
index 000000000..15f1f21c2
--- /dev/null
+++ b/lib/magic_enum.pri
@@ -0,0 +1 @@
+INCLUDEPATH += $$PWD/magic_enum/include/
diff --git a/resources/licenses/magic_enum.txt b/resources/licenses/magic_enum.txt
new file mode 100644
index 000000000..05b298b75
--- /dev/null
+++ b/resources/licenses/magic_enum.txt
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2019 - 2022 Daniil Goncharov
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/resources/resources_autogenerated.qrc b/resources/resources_autogenerated.qrc
index e83762905..550bf3437 100644
--- a/resources/resources_autogenerated.qrc
+++ b/resources/resources_autogenerated.qrc
@@ -59,6 +59,7 @@
licenses/emoji-data-source.txt
licenses/libcommuni_BSD3.txt
licenses/lrucache.txt
+ licenses/magic_enum.txt
licenses/openssl.txt
licenses/pajlada_settings.txt
licenses/pajlada_signals.txt
diff --git a/src/Application.cpp b/src/Application.cpp
index 357e39932..3be36ae3a 100644
--- a/src/Application.cpp
+++ b/src/Application.cpp
@@ -17,7 +17,7 @@
#include "providers/ffz/FfzBadges.hpp"
#include "providers/ffz/FfzEmotes.hpp"
#include "providers/irc/Irc2.hpp"
-#include "providers/twitch/PubsubClient.hpp"
+#include "providers/twitch/PubSubManager.hpp"
#include "providers/twitch/TwitchIrcServer.hpp"
#include "providers/twitch/TwitchMessageBuilder.hpp"
#include "singletons/Emotes.hpp"
@@ -31,6 +31,7 @@
#include "singletons/Toasts.hpp"
#include "singletons/Updates.hpp"
#include "singletons/WindowManager.hpp"
+#include "util/Helpers.hpp"
#include "util/IsBigEndian.hpp"
#include "util/PostToThread.hpp"
#include "util/RapidjsonHelpers.hpp"
@@ -137,7 +138,7 @@ void Application::initialize(Settings &settings, Paths &paths)
{
this->initNm(paths);
}
- this->initPubsub();
+ this->initPubSub();
}
int Application::run(QApplication &qtApp)
@@ -194,7 +195,7 @@ void Application::initNm(Paths &paths)
#endif
}
-void Application::initPubsub()
+void Application::initPubSub()
{
this->twitch->pubsub->signals_.moderation.chatCleared.connect(
[this](const auto &action) {
@@ -331,21 +332,105 @@ void Application::initPubsub()
});
});
- this->twitch->pubsub->signals_.moderation.automodMessage.connect(
- [&](const auto &action) {
- auto chan = this->twitch->getChannelOrEmptyByID(action.roomID);
+ const auto handleAutoModMessage = [&](const auto &action) {
+ auto chan = this->twitch->getChannelOrEmptyByID(action.roomID);
- if (chan->isEmpty())
- {
- return;
- }
+ if (chan->isEmpty())
+ {
+ return;
+ }
- postToThread([chan, action] {
- const auto p = makeAutomodMessage(action);
- chan->addMessage(p.first);
- chan->addMessage(p.second);
- });
+ postToThread([chan, action] {
+ const auto p = makeAutomodMessage(action);
+ chan->addMessage(p.first);
+ chan->addMessage(p.second);
});
+ };
+
+ this->twitch->pubsub->signals_.moderation.autoModMessageCaught.connect(
+ [&](const auto &msg, const QString &channelID) {
+ switch (msg.type)
+ {
+ case PubSubAutoModQueueMessage::Type::AutoModCaughtMessage: {
+ if (msg.status == "PENDING")
+ {
+ AutomodAction action(msg.data, channelID);
+ action.reason = QString("%1 level %2")
+ .arg(msg.contentCategory)
+ .arg(msg.contentLevel);
+
+ action.msgID = msg.messageID;
+ action.message = msg.messageText;
+
+ // this message also contains per-word automod data, which could be implemented
+
+ // extract sender data manually because Twitch loves not being consistent
+ QString senderDisplayName =
+ msg.senderUserDisplayName; // Might be transformed later
+ bool hasLocalizedName = false;
+ if (!msg.senderUserDisplayName.isEmpty())
+ {
+ // check for non-ascii display names
+ if (QString::compare(msg.senderUserDisplayName,
+ msg.senderUserLogin,
+ Qt::CaseInsensitive) != 0)
+ {
+ hasLocalizedName = true;
+ }
+ }
+ QColor senderColor = msg.senderUserChatColor;
+ QString senderColor_;
+ if (!senderColor.isValid() &&
+ getSettings()->colorizeNicknames)
+ {
+ // color may be not present if user is a grey-name
+ senderColor = getRandomColor(msg.senderUserID);
+ }
+
+ // handle username style based on prefered setting
+ switch (getSettings()->usernameDisplayMode.getValue())
+ {
+ case UsernameDisplayMode::Username: {
+ if (hasLocalizedName)
+ {
+ senderDisplayName = msg.senderUserLogin;
+ }
+ break;
+ }
+ case UsernameDisplayMode::LocalizedName: {
+ break;
+ }
+ case UsernameDisplayMode::
+ UsernameAndLocalizedName: {
+ if (hasLocalizedName)
+ {
+ senderDisplayName = QString("%1(%2)").arg(
+ msg.senderUserLogin,
+ msg.senderUserDisplayName);
+ }
+ break;
+ }
+ }
+
+ action.target =
+ ActionUser{msg.senderUserID, msg.senderUserLogin,
+ senderDisplayName, senderColor};
+ handleAutoModMessage(action);
+ }
+ // "ALLOWED" and "DENIED" statuses remain unimplemented
+ // They are versions of automod_message_(denied|approved) but for mods.
+ }
+ break;
+
+ case PubSubAutoModQueueMessage::Type::INVALID:
+ default: {
+ }
+ break;
+ }
+ });
+
+ this->twitch->pubsub->signals_.moderation.autoModMessageBlocked.connect(
+ handleAutoModMessage);
this->twitch->pubsub->signals_.moderation.automodUserMessage.connect(
[&](const auto &action) {
@@ -381,39 +466,44 @@ void Application::initPubsub()
this->twitch->pubsub->signals_.pointReward.redeemed.connect(
[&](auto &data) {
- QString channelId;
- if (rj::getSafe(data, "channel_id", channelId))
- {
- auto chan = this->twitch->getChannelOrEmptyByID(channelId);
-
- auto reward = ChannelPointReward(data);
-
- postToThread([chan, reward] {
- if (auto channel =
- dynamic_cast(chan.get()))
- {
- channel->addChannelPointReward(reward);
- }
- });
- }
- else
+ QString channelId = data.value("channel_id").toString();
+ if (channelId.isEmpty())
{
qCDebug(chatterinoApp)
<< "Couldn't find channel id of point reward";
+ return;
}
+
+ auto chan = this->twitch->getChannelOrEmptyByID(channelId);
+
+ auto reward = ChannelPointReward(data);
+
+ postToThread([chan, reward] {
+ if (auto channel = dynamic_cast(chan.get()))
+ {
+ channel->addChannelPointReward(reward);
+ }
+ });
});
this->twitch->pubsub->start();
auto RequestModerationActions = [=]() {
- this->twitch->pubsub->unlistenAllModerationActions();
+ this->twitch->pubsub->setAccount(
+ getApp()->accounts->twitch.getCurrent());
// TODO(pajlada): Unlisten to all authed topics instead of only
// moderation topics this->twitch->pubsub->UnlistenAllAuthedTopics();
- this->twitch->pubsub->listenToWhispers(
- this->accounts->twitch.getCurrent());
+ this->twitch->pubsub->listenToWhispers();
};
+ this->accounts->twitch.currentUserChanged.connect(
+ [=] {
+ this->twitch->pubsub->unlistenAllModerationActions();
+ this->twitch->pubsub->unlistenWhispers();
+ },
+ boost::signals2::at_front);
+
this->accounts->twitch.currentUserChanged.connect(RequestModerationActions);
RequestModerationActions();
diff --git a/src/Application.hpp b/src/Application.hpp
index 5091322af..846b8231d 100644
--- a/src/Application.hpp
+++ b/src/Application.hpp
@@ -66,7 +66,7 @@ public:
private:
void addSingleton(Singleton *singleton);
- void initPubsub();
+ void initPubSub();
void initNm(Paths &paths);
template hasParsedSuccessfully =
- rj::getSafeObject(redemption, "user", user)))
- {
- qCDebug(chatterinoTwitch) << "No user info found for redemption";
- return;
- }
-
- rapidjson::Value reward;
- if (!(this->hasParsedSuccessfully =
- rj::getSafeObject(redemption, "reward", reward)))
- {
- qCDebug(chatterinoTwitch) << "No reward info found for redemption";
- return;
- }
-
- if (!(this->hasParsedSuccessfully = rj::getSafe(reward, "id", this->id)))
- {
- qCDebug(chatterinoTwitch) << "No id found for reward";
- return;
- }
-
- if (!(this->hasParsedSuccessfully =
- rj::getSafe(reward, "channel_id", this->channelId)))
- {
- qCDebug(chatterinoTwitch) << "No channel_id found for reward";
- return;
- }
-
- if (!(this->hasParsedSuccessfully =
- rj::getSafe(reward, "title", this->title)))
- {
- qCDebug(chatterinoTwitch) << "No title found for reward";
- return;
- }
-
- if (!(this->hasParsedSuccessfully =
- rj::getSafe(reward, "cost", this->cost)))
- {
- qCDebug(chatterinoTwitch) << "No cost found for reward";
- return;
- }
-
- if (!(this->hasParsedSuccessfully = rj::getSafe(
- reward, "is_user_input_required", this->isUserInputRequired)))
- {
- qCDebug(chatterinoTwitch)
- << "No information if user input is required found for reward";
- return;
- }
+ this->id = reward.value("id").toString();
+ this->channelId = reward.value("channel_id").toString();
+ this->title = reward.value("title").toString();
+ this->cost = reward.value("cost").toInt();
+ this->isUserInputRequired = reward.value("is_user_input_required").toBool();
// We don't need to store user information for rewards with user input
// because we will get the user info from a corresponding IRC message
if (!this->isUserInputRequired)
{
- this->parseUser(user);
+ auto user = redemption.value("user").toObject();
+
+ this->user.id = user.value("id").toString();
+ this->user.login = user.value("login").toString();
+ this->user.displayName = user.value("display_name").toString();
}
- rapidjson::Value obj;
- if (rj::getSafeObject(reward, "image", obj) && !obj.IsNull() &&
- obj.IsObject())
+ auto imageValue = reward.value("image");
+
+ if (imageValue.isObject())
{
+ auto imageObject = imageValue.toObject();
this->image = ImageSet{
- Image::fromUrl(
- {parseRewardImage(obj, "url_1x", this->hasParsedSuccessfully)},
- 1),
- Image::fromUrl(
- {parseRewardImage(obj, "url_2x", this->hasParsedSuccessfully)},
- 0.5),
- Image::fromUrl(
- {parseRewardImage(obj, "url_4x", this->hasParsedSuccessfully)},
- 0.25),
+ Image::fromUrl({imageObject.value("url_1x").toString()}, 1),
+ Image::fromUrl({imageObject.value("url_2x").toString()}, 0.5),
+ Image::fromUrl({imageObject.value("url_4x").toString()}, 0.25),
};
}
else
@@ -104,27 +45,4 @@ ChannelPointReward::ChannelPointReward(rapidjson::Value &redemption)
}
}
-void ChannelPointReward::parseUser(rapidjson::Value &user)
-{
- if (!(this->hasParsedSuccessfully = rj::getSafe(user, "id", this->user.id)))
- {
- qCDebug(chatterinoTwitch) << "No id found for user in reward";
- return;
- }
-
- if (!(this->hasParsedSuccessfully =
- rj::getSafe(user, "login", this->user.login)))
- {
- qCDebug(chatterinoTwitch) << "No login name found for user in reward";
- return;
- }
-
- if (!(this->hasParsedSuccessfully =
- rj::getSafe(user, "display_name", this->user.displayName)))
- {
- qCDebug(chatterinoTwitch) << "No display name found for user in reward";
- return;
- }
-}
-
} // namespace chatterino
diff --git a/src/providers/twitch/ChannelPointReward.hpp b/src/providers/twitch/ChannelPointReward.hpp
index 65885bcda..fad2ed375 100644
--- a/src/providers/twitch/ChannelPointReward.hpp
+++ b/src/providers/twitch/ChannelPointReward.hpp
@@ -4,7 +4,7 @@
#include "messages/Image.hpp"
#include "messages/ImageSet.hpp"
-#include
+#include
#define TWITCH_CHANNEL_POINT_REWARD_URL(x) \
QString("https://static-cdn.jtvnw.net/custom-reward-images/default-%1") \
@@ -12,14 +12,13 @@
namespace chatterino {
struct ChannelPointReward {
- ChannelPointReward(rapidjson::Value &reward);
+ ChannelPointReward(const QJsonObject &redemption);
ChannelPointReward() = delete;
QString id;
QString channelId;
QString title;
int cost;
ImageSet image;
- bool hasParsedSuccessfully = false;
bool isUserInputRequired = false;
struct {
@@ -27,9 +26,6 @@ struct ChannelPointReward {
QString login;
QString displayName;
} user;
-
-private:
- void parseUser(rapidjson::Value &user);
};
} // namespace chatterino
diff --git a/src/providers/twitch/PubSubActions.cpp b/src/providers/twitch/PubSubActions.cpp
new file mode 100644
index 000000000..eb1e6ffbf
--- /dev/null
+++ b/src/providers/twitch/PubSubActions.cpp
@@ -0,0 +1,13 @@
+#include "providers/twitch/PubSubActions.hpp"
+
+namespace chatterino {
+
+PubSubAction::PubSubAction(const QJsonObject &data, const QString &_roomID)
+ : timestamp(std::chrono::steady_clock::now())
+ , roomID(_roomID)
+{
+ this->source.id = data.value("created_by_user_id").toString();
+ this->source.login = data.value("created_by").toString();
+}
+
+} // namespace chatterino
diff --git a/src/providers/twitch/PubsubActions.hpp b/src/providers/twitch/PubSubActions.hpp
similarity index 83%
rename from src/providers/twitch/PubsubActions.hpp
rename to src/providers/twitch/PubSubActions.hpp
index abe106df1..d6361738b 100644
--- a/src/providers/twitch/PubsubActions.hpp
+++ b/src/providers/twitch/PubSubActions.hpp
@@ -1,7 +1,7 @@
#pragma once
-#include
#include
+#include
#include
#include
@@ -15,10 +15,24 @@ struct ActionUser {
// displayName should be in format "login(localizedName)" for non-ascii usernames
QString displayName;
QColor color;
+
+ inline bool operator==(const ActionUser &rhs) const
+ {
+ return this->id == rhs.id && this->login == rhs.login &&
+ this->displayName == rhs.displayName && this->color == rhs.color;
+ }
};
+inline QDebug operator<<(QDebug dbg, const ActionUser &user)
+{
+ dbg.nospace() << "ActionUser(" << user.id << ", " << user.login << ", "
+ << user.displayName << ", " << user.color << ")";
+
+ return dbg.maybeSpace();
+}
+
struct PubSubAction {
- PubSubAction(const rapidjson::Value &data, const QString &_roomID);
+ PubSubAction(const QJsonObject &data, const QString &_roomID);
ActionUser source;
std::chrono::steady_clock::time_point timestamp;
diff --git a/src/providers/twitch/PubSubClient.cpp b/src/providers/twitch/PubSubClient.cpp
new file mode 100644
index 000000000..c35d9a418
--- /dev/null
+++ b/src/providers/twitch/PubSubClient.cpp
@@ -0,0 +1,212 @@
+#include "providers/twitch/PubSubClient.hpp"
+
+#include "common/QLogging.hpp"
+#include "providers/twitch/PubSubActions.hpp"
+#include "providers/twitch/PubSubHelpers.hpp"
+#include "providers/twitch/PubSubMessages.hpp"
+#include "providers/twitch/pubsubmessages/Unlisten.hpp"
+#include "singletons/Settings.hpp"
+#include "util/DebugCount.hpp"
+#include "util/Helpers.hpp"
+#include "util/RapidjsonHelpers.hpp"
+
+#include
+#include
+
+namespace chatterino {
+
+static const char *PING_PAYLOAD = R"({"type":"PING"})";
+
+PubSubClient::PubSubClient(WebsocketClient &websocketClient,
+ WebsocketHandle handle,
+ const PubSubClientOptions &clientOptions)
+ : websocketClient_(websocketClient)
+ , handle_(handle)
+ , clientOptions_(clientOptions)
+{
+}
+
+void PubSubClient::start()
+{
+ assert(!this->started_);
+
+ this->started_ = true;
+
+ this->ping();
+}
+
+void PubSubClient::stop()
+{
+ assert(this->started_);
+
+ this->started_ = false;
+}
+
+void PubSubClient::close(const std::string &reason,
+ websocketpp::close::status::value code)
+{
+ WebsocketErrorCode ec;
+
+ auto conn = this->websocketClient_.get_con_from_hdl(this->handle_, ec);
+ if (ec)
+ {
+ qCDebug(chatterinoPubSub)
+ << "Error getting con:" << ec.message().c_str();
+ return;
+ }
+
+ conn->close(code, reason, ec);
+ if (ec)
+ {
+ qCDebug(chatterinoPubSub) << "Error closing:" << ec.message().c_str();
+ return;
+ }
+}
+
+bool PubSubClient::listen(PubSubListenMessage msg)
+{
+ int numRequestedListens = msg.topics.size();
+
+ if (this->numListens_ + numRequestedListens > PubSubClient::MAX_LISTENS)
+ {
+ // This PubSubClient is already at its peak listens
+ return false;
+ }
+ this->numListens_ += numRequestedListens;
+ DebugCount::increase("PubSub topic pending listens", numRequestedListens);
+
+ for (const auto &topic : msg.topics)
+ {
+ this->listeners_.emplace_back(Listener{topic, false, false, false});
+ }
+
+ qCDebug(chatterinoPubSub)
+ << "Subscribing to" << numRequestedListens << "topics";
+
+ this->send(msg.toJson());
+
+ return true;
+}
+
+PubSubClient::UnlistenPrefixResponse PubSubClient::unlistenPrefix(
+ const QString &prefix)
+{
+ std::vector topics;
+
+ for (auto it = this->listeners_.begin(); it != this->listeners_.end();)
+ {
+ const auto &listener = *it;
+ if (listener.topic.startsWith(prefix))
+ {
+ topics.push_back(listener.topic);
+ it = this->listeners_.erase(it);
+ }
+ else
+ {
+ ++it;
+ }
+ }
+
+ if (topics.empty())
+ {
+ return {{}, ""};
+ }
+
+ auto numRequestedUnlistens = topics.size();
+
+ this->numListens_ -= numRequestedUnlistens;
+ DebugCount::increase("PubSub topic pending unlistens",
+ numRequestedUnlistens);
+
+ PubSubUnlistenMessage message(topics);
+
+ this->send(message.toJson());
+
+ return {message.topics, message.nonce};
+}
+
+void PubSubClient::handleListenResponse(const PubSubMessage &message)
+{
+}
+
+void PubSubClient::handleUnlistenResponse(const PubSubMessage &message)
+{
+}
+
+void PubSubClient::handlePong()
+{
+ assert(this->awaitingPong_);
+
+ this->awaitingPong_ = false;
+}
+
+bool PubSubClient::isListeningToTopic(const QString &topic)
+{
+ for (const auto &listener : this->listeners_)
+ {
+ if (listener.topic == topic)
+ {
+ return true;
+ }
+ }
+
+ return false;
+}
+
+std::vector PubSubClient::getListeners() const
+{
+ return this->listeners_;
+}
+
+void PubSubClient::ping()
+{
+ assert(this->started_);
+
+ if (this->awaitingPong_)
+ {
+ qCDebug(chatterinoPubSub) << "No pong response, disconnect!";
+ this->close("Didn't respond to ping");
+
+ return;
+ }
+
+ if (!this->send(PING_PAYLOAD))
+ {
+ return;
+ }
+
+ this->awaitingPong_ = true;
+
+ auto self = this->shared_from_this();
+
+ runAfter(this->websocketClient_.get_io_service(),
+ this->clientOptions_.pingInterval_, [self](auto timer) {
+ if (!self->started_)
+ {
+ return;
+ }
+
+ self->ping();
+ });
+}
+
+bool PubSubClient::send(const char *payload)
+{
+ WebsocketErrorCode ec;
+ this->websocketClient_.send(this->handle_, payload,
+ websocketpp::frame::opcode::text, ec);
+
+ if (ec)
+ {
+ qCDebug(chatterinoPubSub) << "Error sending message" << payload << ":"
+ << ec.message().c_str();
+ // TODO(pajlada): Check which error code happened and maybe
+ // gracefully handle it
+
+ return false;
+ }
+
+ return true;
+}
+
+} // namespace chatterino
diff --git a/src/providers/twitch/PubSubClient.hpp b/src/providers/twitch/PubSubClient.hpp
new file mode 100644
index 000000000..848328617
--- /dev/null
+++ b/src/providers/twitch/PubSubClient.hpp
@@ -0,0 +1,74 @@
+#pragma once
+
+#include "providers/twitch/PubSubClientOptions.hpp"
+#include "providers/twitch/PubSubMessages.hpp"
+#include "providers/twitch/PubSubWebsocket.hpp"
+
+#include
+#include
+
+#include
+#include
+
+namespace chatterino {
+
+struct TopicData {
+ QString topic;
+ bool authed{false};
+ bool persistent{false};
+};
+
+struct Listener : TopicData {
+ bool confirmed{false};
+};
+
+class PubSubClient : public std::enable_shared_from_this
+{
+public:
+ struct UnlistenPrefixResponse {
+ std::vector topics;
+ QString nonce;
+ };
+
+ // The max amount of topics we may listen to with a single connection
+ static constexpr std::vector::size_type MAX_LISTENS = 50;
+
+ PubSubClient(WebsocketClient &_websocketClient, WebsocketHandle _handle,
+ const PubSubClientOptions &clientOptions);
+
+ void start();
+ void stop();
+
+ void close(const std::string &reason,
+ websocketpp::close::status::value code =
+ websocketpp::close::status::normal);
+
+ bool listen(PubSubListenMessage msg);
+ UnlistenPrefixResponse unlistenPrefix(const QString &prefix);
+
+ void handleListenResponse(const PubSubMessage &message);
+ void handleUnlistenResponse(const PubSubMessage &message);
+
+ void handlePong();
+
+ bool isListeningToTopic(const QString &topic);
+
+ std::vector getListeners() const;
+
+private:
+ void ping();
+ bool send(const char *payload);
+
+ WebsocketClient &websocketClient_;
+ WebsocketHandle handle_;
+ uint16_t numListens_ = 0;
+
+ std::vector listeners_;
+
+ std::atomic awaitingPong_{false};
+ std::atomic started_{false};
+
+ const PubSubClientOptions &clientOptions_;
+};
+
+} // namespace chatterino
diff --git a/src/providers/twitch/PubSubClientOptions.hpp b/src/providers/twitch/PubSubClientOptions.hpp
new file mode 100644
index 000000000..2bd576efd
--- /dev/null
+++ b/src/providers/twitch/PubSubClientOptions.hpp
@@ -0,0 +1,14 @@
+#pragma once
+
+#include
+
+namespace chatterino {
+
+/**
+ * @brief Options to change the behaviour of the underlying websocket clients
+ **/
+struct PubSubClientOptions {
+ std::chrono::seconds pingInterval_;
+};
+
+} // namespace chatterino
diff --git a/src/providers/twitch/PubsubHelpers.hpp b/src/providers/twitch/PubSubHelpers.hpp
similarity index 64%
rename from src/providers/twitch/PubsubHelpers.hpp
rename to src/providers/twitch/PubSubHelpers.hpp
index 3896f07ff..d1e616c6a 100644
--- a/src/providers/twitch/PubsubHelpers.hpp
+++ b/src/providers/twitch/PubSubHelpers.hpp
@@ -1,5 +1,6 @@
#pragma once
+#include
#include
#include
#include
@@ -11,19 +12,6 @@ namespace chatterino {
class TwitchAccount;
struct ActionUser;
-const rapidjson::Value &getArgs(const rapidjson::Value &data);
-const rapidjson::Value &getMsgID(const rapidjson::Value &data);
-
-bool getCreatedByUser(const rapidjson::Value &data, ActionUser &user);
-
-bool getTargetUser(const rapidjson::Value &data, ActionUser &user);
-bool getTargetUserName(const rapidjson::Value &data, ActionUser &user);
-
-rapidjson::Document createListenMessage(const std::vector &topicsVec,
- std::shared_ptr account);
-rapidjson::Document createUnlistenMessage(
- const std::vector &topicsVec);
-
// Create timer using given ioService
template
void runAfter(boost::asio::io_service &ioService, Duration duration,
@@ -35,7 +23,7 @@ void runAfter(boost::asio::io_service &ioService, Duration duration,
timer->async_wait([timer, cb](const boost::system::error_code &ec) {
if (ec)
{
- qCDebug(chatterinoPubsub)
+ qCDebug(chatterinoPubSub)
<< "Error in runAfter:" << ec.message().c_str();
return;
}
@@ -54,7 +42,7 @@ void runAfter(std::shared_ptr timer,
timer->async_wait([timer, cb](const boost::system::error_code &ec) {
if (ec)
{
- qCDebug(chatterinoPubsub)
+ qCDebug(chatterinoPubSub)
<< "Error in runAfter:" << ec.message().c_str();
return;
}
diff --git a/src/providers/twitch/PubSubManager.cpp b/src/providers/twitch/PubSubManager.cpp
new file mode 100644
index 000000000..ada771860
--- /dev/null
+++ b/src/providers/twitch/PubSubManager.cpp
@@ -0,0 +1,1153 @@
+#include "providers/twitch/PubSubManager.hpp"
+
+#include "common/QLogging.hpp"
+#include "providers/twitch/PubSubActions.hpp"
+#include "providers/twitch/PubSubHelpers.hpp"
+#include "providers/twitch/PubSubMessages.hpp"
+#include "util/DebugCount.hpp"
+#include "util/Helpers.hpp"
+#include "util/RapidjsonHelpers.hpp"
+
+#include
+#include
+#include
+#include
+
+using websocketpp::lib::bind;
+using websocketpp::lib::placeholders::_1;
+using websocketpp::lib::placeholders::_2;
+
+namespace chatterino {
+
+PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval)
+ : host_(host)
+ , clientOptions_({
+ pingInterval,
+ })
+{
+ this->moderationActionHandlers["clear"] = [this](const auto &data,
+ const auto &roomID) {
+ ClearChatAction action(data, roomID);
+
+ this->signals_.moderation.chatCleared.invoke(action);
+ };
+
+ this->moderationActionHandlers["slowoff"] = [this](const auto &data,
+ const auto &roomID) {
+ ModeChangedAction action(data, roomID);
+
+ action.mode = ModeChangedAction::Mode::Slow;
+ action.state = ModeChangedAction::State::Off;
+
+ this->signals_.moderation.modeChanged.invoke(action);
+ };
+
+ this->moderationActionHandlers["slow"] = [this](const auto &data,
+ const auto &roomID) {
+ ModeChangedAction action(data, roomID);
+
+ action.mode = ModeChangedAction::Mode::Slow;
+ action.state = ModeChangedAction::State::On;
+
+ const auto args = data.value("args").toArray();
+
+ if (args.empty())
+ {
+ qCDebug(chatterinoPubSub)
+ << "Missing duration argument in slowmode on";
+ return;
+ }
+
+ bool ok;
+
+ action.duration = args.at(0).toString().toUInt(&ok, 10);
+
+ this->signals_.moderation.modeChanged.invoke(action);
+ };
+
+ this->moderationActionHandlers["r9kbetaoff"] = [this](const auto &data,
+ const auto &roomID) {
+ ModeChangedAction action(data, roomID);
+
+ action.mode = ModeChangedAction::Mode::R9K;
+ action.state = ModeChangedAction::State::Off;
+
+ this->signals_.moderation.modeChanged.invoke(action);
+ };
+
+ this->moderationActionHandlers["r9kbeta"] = [this](const auto &data,
+ const auto &roomID) {
+ ModeChangedAction action(data, roomID);
+
+ action.mode = ModeChangedAction::Mode::R9K;
+ action.state = ModeChangedAction::State::On;
+
+ this->signals_.moderation.modeChanged.invoke(action);
+ };
+
+ this->moderationActionHandlers["subscribersoff"] =
+ [this](const auto &data, const auto &roomID) {
+ ModeChangedAction action(data, roomID);
+
+ action.mode = ModeChangedAction::Mode::SubscribersOnly;
+ action.state = ModeChangedAction::State::Off;
+
+ this->signals_.moderation.modeChanged.invoke(action);
+ };
+
+ this->moderationActionHandlers["subscribers"] = [this](const auto &data,
+ const auto &roomID) {
+ ModeChangedAction action(data, roomID);
+
+ action.mode = ModeChangedAction::Mode::SubscribersOnly;
+ action.state = ModeChangedAction::State::On;
+
+ this->signals_.moderation.modeChanged.invoke(action);
+ };
+
+ this->moderationActionHandlers["emoteonlyoff"] =
+ [this](const auto &data, const auto &roomID) {
+ ModeChangedAction action(data, roomID);
+
+ action.mode = ModeChangedAction::Mode::EmoteOnly;
+ action.state = ModeChangedAction::State::Off;
+
+ this->signals_.moderation.modeChanged.invoke(action);
+ };
+
+ this->moderationActionHandlers["emoteonly"] = [this](const auto &data,
+ const auto &roomID) {
+ ModeChangedAction action(data, roomID);
+
+ action.mode = ModeChangedAction::Mode::EmoteOnly;
+ action.state = ModeChangedAction::State::On;
+
+ this->signals_.moderation.modeChanged.invoke(action);
+ };
+
+ this->moderationActionHandlers["unmod"] = [this](const auto &data,
+ const auto &roomID) {
+ ModerationStateAction action(data, roomID);
+
+ action.target.id = data.value("target_user_id").toString();
+
+ const auto args = data.value("args").toArray();
+
+ if (args.isEmpty())
+ {
+ return;
+ }
+
+ action.target.login = args[0].toString();
+
+ action.modded = false;
+
+ this->signals_.moderation.moderationStateChanged.invoke(action);
+ };
+
+ this->moderationActionHandlers["mod"] = [this](const auto &data,
+ const auto &roomID) {
+ ModerationStateAction action(data, roomID);
+ action.modded = true;
+
+ auto innerType = data.value("type").toString();
+ if (innerType == "chat_login_moderation")
+ {
+ // Don't display the old message type
+ return;
+ }
+
+ action.target.id = data.value("target_user_id").toString();
+ action.target.login = data.value("target_user_login").toString();
+
+ this->signals_.moderation.moderationStateChanged.invoke(action);
+ };
+
+ this->moderationActionHandlers["timeout"] = [this](const auto &data,
+ const auto &roomID) {
+ BanAction action(data, roomID);
+
+ action.source.id = data.value("created_by_user_id").toString();
+ action.source.login = data.value("created_by").toString();
+
+ action.target.id = data.value("target_user_id").toString();
+
+ const auto args = data.value("args").toArray();
+
+ if (args.size() < 2)
+ {
+ return;
+ }
+
+ action.target.login = args[0].toString();
+ bool ok;
+ action.duration = args[1].toString().toUInt(&ok, 10);
+ action.reason = args[2].toString(); // May be omitted
+
+ this->signals_.moderation.userBanned.invoke(action);
+ };
+
+ this->moderationActionHandlers["delete"] = [this](const auto &data,
+ const auto &roomID) {
+ DeleteAction action(data, roomID);
+
+ action.source.id = data.value("created_by_user_id").toString();
+ action.source.login = data.value("created_by").toString();
+
+ action.target.id = data.value("target_user_id").toString();
+
+ const auto args = data.value("args").toArray();
+
+ if (args.size() < 3)
+ {
+ return;
+ }
+
+ action.target.login = args[0].toString();
+ bool ok;
+ action.messageText = args[1].toString();
+ action.messageId = args[2].toString();
+
+ this->signals_.moderation.messageDeleted.invoke(action);
+ };
+
+ this->moderationActionHandlers["ban"] = [this](const auto &data,
+ const auto &roomID) {
+ BanAction action(data, roomID);
+
+ action.source.id = data.value("created_by_user_id").toString();
+ action.source.login = data.value("created_by").toString();
+
+ action.target.id = data.value("target_user_id").toString();
+
+ const auto args = data.value("args").toArray();
+
+ if (args.isEmpty())
+ {
+ return;
+ }
+
+ action.target.login = args[0].toString();
+ action.reason = args[1].toString(); // May be omitted
+
+ this->signals_.moderation.userBanned.invoke(action);
+ };
+
+ this->moderationActionHandlers["unban"] = [this](const auto &data,
+ const auto &roomID) {
+ UnbanAction action(data, roomID);
+
+ action.source.id = data.value("created_by_user_id").toString();
+ action.source.login = data.value("created_by").toString();
+
+ action.target.id = data.value("target_user_id").toString();
+
+ action.previousState = UnbanAction::Banned;
+
+ const auto args = data.value("args").toArray();
+
+ if (args.isEmpty())
+ {
+ return;
+ }
+
+ action.target.login = args[0].toString();
+
+ this->signals_.moderation.userUnbanned.invoke(action);
+ };
+
+ this->moderationActionHandlers["untimeout"] = [this](const auto &data,
+ const auto &roomID) {
+ UnbanAction action(data, roomID);
+
+ action.source.id = data.value("created_by_user_id").toString();
+ action.source.login = data.value("created_by").toString();
+
+ action.target.id = data.value("target_user_id").toString();
+
+ action.previousState = UnbanAction::TimedOut;
+
+ const auto args = data.value("args").toArray();
+
+ if (args.isEmpty())
+ {
+ return;
+ }
+
+ action.target.login = args[0].toString();
+
+ this->signals_.moderation.userUnbanned.invoke(action);
+ };
+
+ this->moderationActionHandlers["automod_rejected"] =
+ [this](const auto &data, const auto &roomID) {
+ AutomodAction action(data, roomID);
+
+ action.source.id = data.value("created_by_user_id").toString();
+ action.source.login = data.value("created_by").toString();
+
+ action.target.id = data.value("target_user_id").toString();
+
+ const auto args = data.value("args").toArray();
+
+ if (args.isEmpty())
+ {
+ return;
+ }
+
+ action.msgID = data.value("msg_id").toString();
+
+ if (action.msgID.isEmpty())
+ {
+ // Missing required msg_id parameter
+ return;
+ }
+
+ action.target.login = args[0].toString();
+ action.message = args[1].toString(); // May be omitted
+ action.reason = args[2].toString(); // May be omitted
+
+ this->signals_.moderation.autoModMessageBlocked.invoke(action);
+ };
+
+ this->moderationActionHandlers["automod_message_rejected"] =
+ [this](const auto &data, const auto &roomID) {
+ AutomodInfoAction action(data, roomID);
+ action.type = AutomodInfoAction::OnHold;
+ this->signals_.moderation.automodInfoMessage.invoke(action);
+ };
+
+ this->moderationActionHandlers["automod_message_denied"] =
+ [this](const auto &data, const auto &roomID) {
+ AutomodInfoAction action(data, roomID);
+ action.type = AutomodInfoAction::Denied;
+ this->signals_.moderation.automodInfoMessage.invoke(action);
+ };
+
+ this->moderationActionHandlers["automod_message_approved"] =
+ [this](const auto &data, const auto &roomID) {
+ AutomodInfoAction action(data, roomID);
+ action.type = AutomodInfoAction::Approved;
+ this->signals_.moderation.automodInfoMessage.invoke(action);
+ };
+
+ this->channelTermsActionHandlers["add_permitted_term"] =
+ [this](const auto &data, const auto &roomID) {
+ // This term got a pass through automod
+ AutomodUserAction action(data, roomID);
+ action.source.id = data.value("created_by_user_id").toString();
+ action.source.login = data.value("created_by").toString();
+
+ action.type = AutomodUserAction::AddPermitted;
+ action.message = data.value("text").toString();
+ action.source.login = data.value("requester_login").toString();
+
+ this->signals_.moderation.automodUserMessage.invoke(action);
+ };
+
+ this->channelTermsActionHandlers["add_blocked_term"] =
+ [this](const auto &data, const auto &roomID) {
+ // A term has been added
+ AutomodUserAction action(data, roomID);
+ action.source.id = data.value("created_by_user_id").toString();
+ action.source.login = data.value("created_by").toString();
+
+ action.type = AutomodUserAction::AddBlocked;
+ action.message = data.value("text").toString();
+ action.source.login = data.value("requester_login").toString();
+
+ this->signals_.moderation.automodUserMessage.invoke(action);
+ };
+
+ this->moderationActionHandlers["delete_permitted_term"] =
+ [this](const auto &data, const auto &roomID) {
+ // This term got deleted
+ AutomodUserAction action(data, roomID);
+ action.source.id = data.value("created_by_user_id").toString();
+ action.source.login = data.value("created_by").toString();
+
+ const auto args = data.value("args").toArray();
+ action.type = AutomodUserAction::RemovePermitted;
+
+ if (args.isEmpty())
+ {
+ return;
+ }
+
+ action.message = args[0].toString();
+
+ this->signals_.moderation.automodUserMessage.invoke(action);
+ };
+
+ this->channelTermsActionHandlers["delete_permitted_term"] =
+ [this](const auto &data, const auto &roomID) {
+ // This term got deleted
+ AutomodUserAction action(data, roomID);
+ action.source.id = data.value("created_by_user_id").toString();
+ action.source.login = data.value("created_by").toString();
+
+ action.type = AutomodUserAction::RemovePermitted;
+ action.message = data.value("text").toString();
+ action.source.login = data.value("requester_login").toString();
+
+ this->signals_.moderation.automodUserMessage.invoke(action);
+ };
+
+ this->moderationActionHandlers["delete_blocked_term"] =
+ [this](const auto &data, const auto &roomID) {
+ // This term got deleted
+ AutomodUserAction action(data, roomID);
+
+ action.source.id = data.value("created_by_user_id").toString();
+ action.source.login = data.value("created_by").toString();
+
+ const auto args = data.value("args").toArray();
+ action.type = AutomodUserAction::RemoveBlocked;
+
+ if (args.isEmpty())
+ {
+ return;
+ }
+
+ action.message = args[0].toString();
+
+ this->signals_.moderation.automodUserMessage.invoke(action);
+ };
+ this->channelTermsActionHandlers["delete_blocked_term"] =
+ [this](const auto &data, const auto &roomID) {
+ // This term got deleted
+ AutomodUserAction action(data, roomID);
+
+ action.source.id = data.value("created_by_user_id").toString();
+ action.source.login = data.value("created_by").toString();
+
+ action.type = AutomodUserAction::RemoveBlocked;
+ action.message = data.value("text").toString();
+ action.source.login = data.value("requester_login").toString();
+
+ this->signals_.moderation.automodUserMessage.invoke(action);
+ };
+
+ // We don't get this one anymore or anything similiar
+ // We need some new topic so we can listen
+ //
+ //this->moderationActionHandlers["modified_automod_properties"] =
+ // [this](const auto &data, const auto &roomID) {
+ // // The automod settings got modified
+ // AutomodUserAction action(data, roomID);
+ // getCreatedByUser(data, action.source);
+ // action.type = AutomodUserAction::Properties;
+ // this->signals_.moderation.automodUserMessage.invoke(action);
+ // };
+
+ this->moderationActionHandlers["denied_automod_message"] =
+ [](const auto &data, const auto &roomID) {
+ // This message got denied by a moderator
+ // qCDebug(chatterinoPubSub) << rj::stringify(data);
+ };
+
+ this->moderationActionHandlers["approved_automod_message"] =
+ [](const auto &data, const auto &roomID) {
+ // This message got approved by a moderator
+ // qCDebug(chatterinoPubSub) << rj::stringify(data);
+ };
+
+ this->websocketClient.set_access_channels(websocketpp::log::alevel::all);
+ this->websocketClient.clear_access_channels(
+ websocketpp::log::alevel::frame_payload |
+ websocketpp::log::alevel::frame_header);
+
+ this->websocketClient.init_asio();
+
+ // SSL Handshake
+ this->websocketClient.set_tls_init_handler(
+ bind(&PubSub::onTLSInit, this, ::_1));
+
+ this->websocketClient.set_message_handler(
+ bind(&PubSub::onMessage, this, ::_1, ::_2));
+ this->websocketClient.set_open_handler(
+ bind(&PubSub::onConnectionOpen, this, ::_1));
+ this->websocketClient.set_close_handler(
+ bind(&PubSub::onConnectionClose, this, ::_1));
+ this->websocketClient.set_fail_handler(
+ bind(&PubSub::onConnectionFail, this, ::_1));
+}
+
+void PubSub::addClient()
+{
+ if (this->addingClient)
+ {
+ return;
+ }
+
+ qCDebug(chatterinoPubSub) << "Adding an additional client";
+
+ this->addingClient = true;
+
+ websocketpp::lib::error_code ec;
+ auto con =
+ this->websocketClient.get_connection(this->host_.toStdString(), ec);
+
+ if (ec)
+ {
+ qCDebug(chatterinoPubSub)
+ << "Unable to establish connection:" << ec.message().c_str();
+ return;
+ }
+
+ this->websocketClient.connect(con);
+}
+
+void PubSub::start()
+{
+ this->work = std::make_shared(
+ this->websocketClient.get_io_service());
+ this->mainThread.reset(
+ new std::thread(std::bind(&PubSub::runThread, this)));
+}
+
+void PubSub::stop()
+{
+ this->stopping_ = true;
+
+ for (const auto &client : this->clients)
+ {
+ client.second->close("Shutting down");
+ }
+
+ this->work.reset();
+
+ if (this->mainThread->joinable())
+ {
+ this->mainThread->join();
+ }
+
+ assert(this->clients.empty());
+}
+
+void PubSub::unlistenAllModerationActions()
+{
+ for (const auto &p : this->clients)
+ {
+ const auto &client = p.second;
+ if (const auto &[topics, nonce] =
+ client->unlistenPrefix("chat_moderator_actions.");
+ !topics.empty())
+ {
+ this->registerNonce(nonce, {
+ client,
+ "UNLISTEN",
+ topics,
+ topics.size(),
+ });
+ }
+ }
+}
+
+void PubSub::unlistenWhispers()
+{
+ for (const auto &p : this->clients)
+ {
+ const auto &client = p.second;
+ if (const auto &[topics, nonce] = client->unlistenPrefix("whispers.");
+ !topics.empty())
+ {
+ this->registerNonce(nonce, {
+ client,
+ "UNLISTEN",
+ topics,
+ topics.size(),
+ });
+ }
+ }
+}
+
+bool PubSub::listenToWhispers()
+{
+ if (this->userID_.isEmpty())
+ {
+ qCDebug(chatterinoPubSub)
+ << "Unable to listen to whispers topic, no user logged in";
+ return false;
+ }
+
+ static const QString topicFormat("whispers.%1");
+ auto topic = topicFormat.arg(this->userID_);
+
+ qCDebug(chatterinoPubSub) << "Listen to whispers" << topic;
+
+ this->listenToTopic(topic);
+
+ return true;
+}
+
+void PubSub::listenToChannelModerationActions(const QString &channelID)
+{
+ if (this->userID_.isEmpty())
+ {
+ qCDebug(chatterinoPubSub) << "Unable to listen to moderation actions "
+ "topic, no user logged in";
+ return;
+ }
+
+ static const QString topicFormat("chat_moderator_actions.%1.%2");
+ assert(!channelID.isEmpty());
+
+ auto topic = topicFormat.arg(this->userID_, channelID);
+
+ if (this->isListeningToTopic(topic))
+ {
+ return;
+ }
+
+ qCDebug(chatterinoPubSub) << "Listen to topic" << topic;
+
+ this->listenToTopic(topic);
+}
+
+void PubSub::listenToAutomod(const QString &channelID)
+{
+ if (this->userID_.isEmpty())
+ {
+ qCDebug(chatterinoPubSub)
+ << "Unable to listen to automod topic, no user logged in";
+ return;
+ }
+
+ static const QString topicFormat("automod-queue.%1.%2");
+ assert(!channelID.isEmpty());
+
+ auto topic = topicFormat.arg(this->userID_, channelID);
+
+ if (this->isListeningToTopic(topic))
+ {
+ return;
+ }
+
+ qCDebug(chatterinoPubSub) << "Listen to topic" << topic;
+
+ this->listenToTopic(topic);
+}
+
+void PubSub::listenToChannelPointRewards(const QString &channelID)
+{
+ static const QString topicFormat("community-points-channel-v1.%1");
+ assert(!channelID.isEmpty());
+
+ auto topic = topicFormat.arg(channelID);
+
+ if (this->isListeningToTopic(topic))
+ {
+ return;
+ }
+ qCDebug(chatterinoPubSub) << "Listen to topic" << topic;
+
+ this->listenToTopic(topic);
+}
+
+void PubSub::listen(PubSubListenMessage msg)
+{
+ if (this->tryListen(msg))
+ {
+ return;
+ }
+
+ this->addClient();
+
+ std::copy(msg.topics.begin(), msg.topics.end(),
+ std::back_inserter(this->requests));
+
+ DebugCount::increase("PubSub topic backlog", msg.topics.size());
+}
+
+bool PubSub::tryListen(PubSubListenMessage msg)
+{
+ for (const auto &p : this->clients)
+ {
+ const auto &client = p.second;
+ if (auto success = client->listen(msg); success)
+ {
+ this->registerNonce(msg.nonce, {
+ client,
+ "LISTEN",
+ msg.topics,
+ msg.topics.size(),
+ });
+ return true;
+ }
+ }
+
+ return false;
+}
+
+void PubSub::registerNonce(QString nonce, NonceInfo info)
+{
+ this->nonces_[nonce] = std::move(info);
+}
+
+boost::optional PubSub::findNonceInfo(QString nonce)
+{
+ // TODO: This should also DELETE the nonceinfo from the map
+ auto it = this->nonces_.find(nonce);
+
+ if (it == this->nonces_.end())
+ {
+ return boost::none;
+ }
+
+ return it->second;
+}
+
+bool PubSub::isListeningToTopic(const QString &topic)
+{
+ for (const auto &p : this->clients)
+ {
+ const auto &client = p.second;
+ if (client->isListeningToTopic(topic))
+ {
+ return true;
+ }
+ }
+
+ return false;
+}
+
+void PubSub::onMessage(websocketpp::connection_hdl hdl,
+ WebsocketMessagePtr websocketMessage)
+{
+ this->diag.messagesReceived += 1;
+
+ const auto &payload =
+ QString::fromStdString(websocketMessage->get_payload());
+
+ auto oMessage = parsePubSubBaseMessage(payload);
+
+ if (!oMessage)
+ {
+ qCDebug(chatterinoPubSub)
+ << "Unable to parse incoming pubsub message" << payload;
+ this->diag.messagesFailedToParse += 1;
+ return;
+ }
+
+ auto message = *oMessage;
+
+ switch (message.type)
+ {
+ case PubSubMessage::Type::Pong: {
+ auto clientIt = this->clients.find(hdl);
+
+ // If this assert goes off, there's something wrong with the connection
+ // creation/preserving code KKona
+ assert(clientIt != this->clients.end());
+
+ auto &client = *clientIt;
+
+ client.second->handlePong();
+ }
+ break;
+
+ case PubSubMessage::Type::Response: {
+ this->handleResponse(message);
+ }
+ break;
+
+ case PubSubMessage::Type::Message: {
+ auto oMessageMessage = message.toInner();
+ if (!oMessageMessage)
+ {
+ qCDebug(chatterinoPubSub) << "Malformed MESSAGE:" << payload;
+ return;
+ }
+
+ this->handleMessageResponse(*oMessageMessage);
+ }
+ break;
+
+ case PubSubMessage::Type::INVALID:
+ default: {
+ qCDebug(chatterinoPubSub)
+ << "Unknown message type:" << message.typeString;
+ }
+ break;
+ }
+}
+
+void PubSub::onConnectionOpen(WebsocketHandle hdl)
+{
+ this->diag.connectionsOpened += 1;
+
+ DebugCount::increase("PubSub connections");
+ this->addingClient = false;
+
+ this->connectBackoff.reset();
+
+ auto client = std::make_shared(this->websocketClient, hdl,
+ this->clientOptions_);
+
+ // We separate the starting from the constructor because we will want to use
+ // shared_from_this
+ client->start();
+
+ this->clients.emplace(hdl, client);
+
+ qCDebug(chatterinoPubSub) << "PubSub connection opened!";
+
+ const auto topicsToTake =
+ (std::min)(this->requests.size(), PubSubClient::MAX_LISTENS);
+
+ std::vector newTopics(
+ std::make_move_iterator(this->requests.begin()),
+ std::make_move_iterator(this->requests.begin() + topicsToTake));
+
+ this->requests.erase(this->requests.begin(),
+ this->requests.begin() + topicsToTake);
+
+ PubSubListenMessage msg(newTopics);
+ msg.setToken(this->token_);
+
+ if (auto success = client->listen(msg); !success)
+ {
+ qCWarning(chatterinoPubSub) << "Failed to listen to " << topicsToTake
+ << "new topics on new client";
+ return;
+ }
+ DebugCount::decrease("PubSub topic backlog", msg.topics.size());
+
+ this->registerNonce(msg.nonce, {
+ client,
+ "LISTEN",
+ msg.topics,
+ topicsToTake,
+ });
+
+ if (!this->requests.empty())
+ {
+ this->addClient();
+ }
+}
+
+void PubSub::onConnectionFail(WebsocketHandle hdl)
+{
+ this->diag.connectionsFailed += 1;
+
+ DebugCount::increase("PubSub failed connections");
+ if (auto conn = this->websocketClient.get_con_from_hdl(std::move(hdl)))
+ {
+ qCDebug(chatterinoPubSub) << "PubSub connection attempt failed (error: "
+ << conn->get_ec().message().c_str() << ")";
+ }
+ else
+ {
+ qCDebug(chatterinoPubSub)
+ << "PubSub connection attempt failed but we can't "
+ "get the connection from a handle.";
+ }
+
+ this->addingClient = false;
+ if (!this->requests.empty())
+ {
+ runAfter(this->websocketClient.get_io_service(),
+ this->connectBackoff.next(), [this](auto timer) {
+ this->addClient(); //
+ });
+ }
+}
+
+void PubSub::onConnectionClose(WebsocketHandle hdl)
+{
+ qCDebug(chatterinoPubSub) << "Connection closed";
+ this->diag.connectionsClosed += 1;
+
+ DebugCount::decrease("PubSub connections");
+ auto clientIt = this->clients.find(hdl);
+
+ // If this assert goes off, there's something wrong with the connection
+ // creation/preserving code KKona
+ assert(clientIt != this->clients.end());
+
+ auto client = clientIt->second;
+
+ this->clients.erase(clientIt);
+
+ client->stop();
+
+ if (!this->stopping_)
+ {
+ auto clientListeners = client->getListeners();
+ for (const auto &listener : clientListeners)
+ {
+ this->listenToTopic(listener.topic);
+ }
+ }
+}
+
+PubSub::WebsocketContextPtr PubSub::onTLSInit(websocketpp::connection_hdl hdl)
+{
+ WebsocketContextPtr ctx(
+ new boost::asio::ssl::context(boost::asio::ssl::context::tlsv12));
+
+ try
+ {
+ ctx->set_options(boost::asio::ssl::context::default_workarounds |
+ boost::asio::ssl::context::no_sslv2 |
+ boost::asio::ssl::context::single_dh_use);
+ }
+ catch (const std::exception &e)
+ {
+ qCDebug(chatterinoPubSub)
+ << "Exception caught in OnTLSInit:" << e.what();
+ }
+
+ return ctx;
+}
+
+void PubSub::handleResponse(const PubSubMessage &message)
+{
+ const bool failed = !message.error.isEmpty();
+
+ if (failed)
+ {
+ qCDebug(chatterinoPubSub)
+ << "Error" << message.error << "on nonce" << message.nonce;
+ }
+
+ if (message.nonce.isEmpty())
+ {
+ // Can't do any specific handling since no nonce was specified
+ return;
+ }
+
+ if (auto oInfo = this->findNonceInfo(message.nonce); oInfo)
+ {
+ const auto info = *oInfo;
+ auto client = info.client.lock();
+ if (!client)
+ {
+ qCDebug(chatterinoPubSub) << "Client associated with nonce"
+ << message.nonce << "is no longer alive";
+ return;
+ }
+ if (info.messageType == "LISTEN")
+ {
+ client->handleListenResponse(message);
+ this->handleListenResponse(info, failed);
+ }
+ else if (info.messageType == "UNLISTEN")
+ {
+ client->handleUnlistenResponse(message);
+ this->handleUnlistenResponse(info, failed);
+ }
+ else
+ {
+ qCDebug(chatterinoPubSub)
+ << "Unhandled nonce message type" << info.messageType;
+ }
+
+ return;
+ }
+
+ qCDebug(chatterinoPubSub) << "Response on unused" << message.nonce
+ << "client/topic listener mismatch?";
+}
+
+void PubSub::handleListenResponse(const NonceInfo &info, bool failed)
+{
+ DebugCount::decrease("PubSub topic pending listens", info.topicCount);
+ if (failed)
+ {
+ this->diag.failedListenResponses++;
+ DebugCount::increase("PubSub topic failed listens", info.topicCount);
+ }
+ else
+ {
+ this->diag.listenResponses++;
+ DebugCount::increase("PubSub topic listening", info.topicCount);
+ }
+}
+
+void PubSub::handleUnlistenResponse(const NonceInfo &info, bool failed)
+{
+ this->diag.unlistenResponses++;
+ DebugCount::decrease("PubSub topic pending unlistens", info.topicCount);
+ if (failed)
+ {
+ qCDebug(chatterinoPubSub) << "Failed unlistening to" << info.topics;
+ DebugCount::increase("PubSub topic failed unlistens", info.topicCount);
+ }
+ else
+ {
+ qCDebug(chatterinoPubSub) << "Successful unlistened to" << info.topics;
+ DebugCount::decrease("PubSub topic listening", info.topicCount);
+ }
+}
+
+void PubSub::handleMessageResponse(const PubSubMessageMessage &message)
+{
+ QString topic = message.topic;
+
+ if (topic.startsWith("whispers."))
+ {
+ auto oInnerMessage = message.toInner();
+ if (!oInnerMessage)
+ {
+ return;
+ }
+ auto whisperMessage = *oInnerMessage;
+
+ switch (whisperMessage.type)
+ {
+ case PubSubWhisperMessage::Type::WhisperReceived: {
+ this->signals_.whisper.received.invoke(whisperMessage);
+ }
+ break;
+ case PubSubWhisperMessage::Type::WhisperSent: {
+ this->signals_.whisper.sent.invoke(whisperMessage);
+ }
+ break;
+ case PubSubWhisperMessage::Type::Thread: {
+ // Handle thread?
+ }
+ break;
+
+ case PubSubWhisperMessage::Type::INVALID:
+ default: {
+ qCDebug(chatterinoPubSub)
+ << "Invalid whisper type:" << whisperMessage.typeString;
+ }
+ break;
+ }
+ }
+ else if (topic.startsWith("chat_moderator_actions."))
+ {
+ auto oInnerMessage =
+ message.toInner();
+ if (!oInnerMessage)
+ {
+ return;
+ }
+
+ auto innerMessage = *oInnerMessage;
+ auto topicParts = topic.split(".");
+ assert(topicParts.length() == 3);
+
+ // Channel ID where the moderator actions are coming from
+ auto channelID = topicParts[2];
+
+ switch (innerMessage.type)
+ {
+ case PubSubChatModeratorActionMessage::Type::ModerationAction: {
+ QString moderationAction =
+ innerMessage.data.value("moderation_action").toString();
+
+ auto handlerIt =
+ this->moderationActionHandlers.find(moderationAction);
+
+ if (handlerIt == this->moderationActionHandlers.end())
+ {
+ qCDebug(chatterinoPubSub)
+ << "No handler found for moderation action"
+ << moderationAction;
+ return;
+ }
+ // Invoke handler function
+ handlerIt->second(innerMessage.data, channelID);
+ }
+ break;
+ case PubSubChatModeratorActionMessage::Type::ChannelTermsAction: {
+ QString channelTermsAction =
+ innerMessage.data.value("type").toString();
+
+ auto handlerIt =
+ this->channelTermsActionHandlers.find(channelTermsAction);
+
+ if (handlerIt == this->channelTermsActionHandlers.end())
+ {
+ qCDebug(chatterinoPubSub)
+ << "No handler found for channel terms action"
+ << channelTermsAction;
+ return;
+ }
+ // Invoke handler function
+ handlerIt->second(innerMessage.data, channelID);
+ }
+ break;
+
+ case PubSubChatModeratorActionMessage::Type::INVALID:
+ default: {
+ qCDebug(chatterinoPubSub)
+ << "Invalid whisper type:" << innerMessage.typeString;
+ }
+ break;
+ }
+ }
+ else if (topic.startsWith("community-points-channel-v1."))
+ {
+ auto oInnerMessage =
+ message.toInner();
+ if (!oInnerMessage)
+ {
+ return;
+ }
+
+ auto innerMessage = *oInnerMessage;
+
+ switch (innerMessage.type)
+ {
+ case PubSubCommunityPointsChannelV1Message::Type::RewardRedeemed: {
+ auto redemption =
+ innerMessage.data.value("redemption").toObject();
+ this->signals_.pointReward.redeemed.invoke(redemption);
+ }
+ break;
+
+ case PubSubCommunityPointsChannelV1Message::Type::INVALID:
+ default: {
+ qCDebug(chatterinoPubSub)
+ << "Invalid point event type:" << innerMessage.typeString;
+ }
+ break;
+ }
+ }
+ else if (topic.startsWith("automod-queue."))
+ {
+ auto oInnerMessage = message.toInner();
+ if (!oInnerMessage)
+ {
+ return;
+ }
+
+ auto innerMessage = *oInnerMessage;
+
+ auto topicParts = topic.split(".");
+ assert(topicParts.length() == 3);
+
+ // Channel ID where the moderator actions are coming from
+ auto channelID = topicParts[2];
+
+ this->signals_.moderation.autoModMessageCaught.invoke(innerMessage,
+ channelID);
+ }
+ else
+ {
+ qCDebug(chatterinoPubSub) << "Unknown topic:" << topic;
+ return;
+ }
+}
+
+void PubSub::runThread()
+{
+ qCDebug(chatterinoPubSub) << "Start pubsub manager thread";
+ this->websocketClient.run();
+ qCDebug(chatterinoPubSub) << "Done with pubsub manager thread";
+}
+
+void PubSub::listenToTopic(const QString &topic)
+{
+ PubSubListenMessage msg({topic});
+ msg.setToken(this->token_);
+
+ this->listen(std::move(msg));
+}
+
+} // namespace chatterino
diff --git a/src/providers/twitch/PubSubManager.hpp b/src/providers/twitch/PubSubManager.hpp
new file mode 100644
index 000000000..d078e519c
--- /dev/null
+++ b/src/providers/twitch/PubSubManager.hpp
@@ -0,0 +1,198 @@
+#pragma once
+
+#include "providers/twitch/ChatterinoWebSocketppLogger.hpp"
+#include "providers/twitch/PubSubActions.hpp"
+#include "providers/twitch/PubSubClient.hpp"
+#include "providers/twitch/PubSubClientOptions.hpp"
+#include "providers/twitch/PubSubMessages.hpp"
+#include "providers/twitch/PubSubWebsocket.hpp"
+#include "providers/twitch/TwitchAccount.hpp"
+#include "util/ExponentialBackoff.hpp"
+
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include