Improve Twitch PubSub connection reliability (#3643)

Co-authored-by: Rasmus Karlsson <rasmus.karlsson@pajlada.com>
This commit is contained in:
nerix 2022-05-07 17:22:39 +02:00 committed by GitHub
parent 4aa5b04e37
commit f97780d84e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
64 changed files with 3094 additions and 2126 deletions

View file

@ -4,6 +4,9 @@ name: Test
on:
pull_request:
env:
TWITCH_PUBSUB_SERVER_IMAGE: ghcr.io/chatterino/twitch-pubsub-server-test:v1.0.3
jobs:
test:
runs-on: ${{ matrix.os }}
@ -77,6 +80,8 @@ jobs:
- name: Test (Ubuntu)
if: startsWith(matrix.os, 'ubuntu')
run: |
docker pull ${{ env.TWITCH_PUBSUB_SERVER_IMAGE }}
docker run --network=host --detach ${{ env.TWITCH_PUBSUB_SERVER_IMAGE }}
./bin/chatterino-test --platform minimal
working-directory: build-test
shell: bash

3
.gitmodules vendored
View file

@ -29,3 +29,6 @@
[submodule "cmake/sanitizers-cmake"]
path = cmake/sanitizers-cmake
url = https://github.com/arsenm/sanitizers-cmake
[submodule "lib/magic_enum"]
path = lib/magic_enum
url = https://github.com/Neargye/magic_enum

View file

@ -7,6 +7,7 @@
- Minor: Adjust large stream thumbnail to 16:9 (#3655)
- Minor: Fixed being unable to load Twitch Usercards from the `/mentions` tab. (#3623)
- Minor: Add information about the user's operating system in the About page. (#3663)
- Bugfix: Connection to Twitch PubSub now recovers more reliably. (#3643)
- Minor: Added chatter count for each category in viewer list. (#3683)
- Minor: Sorted usernames in /vips message to be case-insensitive. (#3696)
- Minor: Added option to open a user's chat in a new tab from the usercard profile picture context menu. (#3625)

View file

@ -112,6 +112,7 @@ endif ()
find_package(PajladaSerialize REQUIRED)
find_package(PajladaSignals REQUIRED)
find_package(LRUCache REQUIRED)
find_package(MagicEnum REQUIRED)
if (USE_SYSTEM_PAJLADA_SETTINGS)
find_package(PajladaSettings REQUIRED)

View file

@ -96,6 +96,7 @@ include(lib/signals.pri)
include(lib/settings.pri)
include(lib/serialize.pri)
include(lib/lrucache.pri)
include(lib/magic_enum.pri)
include(lib/winsdk.pri)
include(lib/rapidjson.pri)
include(lib/qtkeychain.pri)
@ -212,9 +213,16 @@ SOURCES += \
src/providers/twitch/api/Helix.cpp \
src/providers/twitch/ChannelPointReward.cpp \
src/providers/twitch/IrcMessageHandler.cpp \
src/providers/twitch/PubsubActions.cpp \
src/providers/twitch/PubsubClient.cpp \
src/providers/twitch/PubsubHelpers.cpp \
src/providers/twitch/PubSubActions.cpp \
src/providers/twitch/PubSubClient.cpp \
src/providers/twitch/PubSubManager.cpp \
src/providers/twitch/pubsubmessages/AutoMod.cpp \
src/providers/twitch/pubsubmessages/Base.cpp \
src/providers/twitch/pubsubmessages/ChannelPoints.cpp \
src/providers/twitch/pubsubmessages/ChatModeratorAction.cpp \
src/providers/twitch/pubsubmessages/Listen.cpp \
src/providers/twitch/pubsubmessages/Unlisten.cpp \
src/providers/twitch/pubsubmessages/Whisper.cpp \
src/providers/twitch/TwitchAccount.cpp \
src/providers/twitch/TwitchAccountManager.cpp \
src/providers/twitch/TwitchBadge.cpp \
@ -432,6 +440,7 @@ HEADERS += \
src/messages/search/LinkPredicate.hpp \
src/messages/search/MessageFlagsPredicate.hpp \
src/messages/search/MessagePredicate.hpp \
src/messages/search/RegexPredicate.hpp \
src/messages/search/SubstringPredicate.hpp \
src/messages/Selection.hpp \
src/messages/SharedMessageBuilder.hpp \
@ -458,9 +467,21 @@ HEADERS += \
src/providers/twitch/ChatterinoWebSocketppLogger.hpp \
src/providers/twitch/EmoteValue.hpp \
src/providers/twitch/IrcMessageHandler.hpp \
src/providers/twitch/PubsubActions.hpp \
src/providers/twitch/PubsubClient.hpp \
src/providers/twitch/PubsubHelpers.hpp \
src/providers/twitch/PubSubActions.hpp \
src/providers/twitch/PubSubClient.hpp \
src/providers/twitch/PubSubClientOptions.hpp \
src/providers/twitch/PubSubHelpers.hpp \
src/providers/twitch/PubSubManager.hpp \
src/providers/twitch/PubSubMessages.hpp \
src/providers/twitch/pubsubmessages/AutoMod.hpp \
src/providers/twitch/pubsubmessages/Base.hpp \
src/providers/twitch/pubsubmessages/ChannelPoints.hpp \
src/providers/twitch/pubsubmessages/ChatModeratorAction.hpp \
src/providers/twitch/pubsubmessages/Listen.hpp \
src/providers/twitch/pubsubmessages/Message.hpp \
src/providers/twitch/pubsubmessages/Unlisten.hpp \
src/providers/twitch/pubsubmessages/Whisper.hpp \
src/providers/twitch/PubSubWebsocket.hpp \
src/providers/twitch/TwitchAccount.hpp \
src/providers/twitch/TwitchAccountManager.hpp \
src/providers/twitch/TwitchBadge.hpp \

14
cmake/FindMagicEnum.cmake Normal file
View file

@ -0,0 +1,14 @@
include(FindPackageHandleStandardArgs)
find_path(MagicEnum_INCLUDE_DIR magic_enum.hpp HINTS ${CMAKE_SOURCE_DIR}/lib/magic_enum/include)
find_package_handle_standard_args(MagicEnum DEFAULT_MSG MagicEnum_INCLUDE_DIR)
if (MagicEnum_FOUND)
add_library(MagicEnum INTERFACE IMPORTED)
set_target_properties(MagicEnum PROPERTIES
INTERFACE_INCLUDE_DIRECTORIES "${MagicEnum_INCLUDE_DIR}"
)
endif ()
mark_as_advanced(MagicEnum_INCLUDE_DIR)

1
lib/magic_enum Submodule

@ -0,0 +1 @@
Subproject commit b2ac76235b2261305bdfe562eb5982c808d07e73

1
lib/magic_enum.pri Normal file
View file

@ -0,0 +1 @@
INCLUDEPATH += $$PWD/magic_enum/include/

View file

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2019 - 2022 Daniil Goncharov
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View file

@ -59,6 +59,7 @@
<file>licenses/emoji-data-source.txt</file>
<file>licenses/libcommuni_BSD3.txt</file>
<file>licenses/lrucache.txt</file>
<file>licenses/magic_enum.txt</file>
<file>licenses/openssl.txt</file>
<file>licenses/pajlada_settings.txt</file>
<file>licenses/pajlada_signals.txt</file>

View file

@ -17,7 +17,7 @@
#include "providers/ffz/FfzBadges.hpp"
#include "providers/ffz/FfzEmotes.hpp"
#include "providers/irc/Irc2.hpp"
#include "providers/twitch/PubsubClient.hpp"
#include "providers/twitch/PubSubManager.hpp"
#include "providers/twitch/TwitchIrcServer.hpp"
#include "providers/twitch/TwitchMessageBuilder.hpp"
#include "singletons/Emotes.hpp"
@ -31,6 +31,7 @@
#include "singletons/Toasts.hpp"
#include "singletons/Updates.hpp"
#include "singletons/WindowManager.hpp"
#include "util/Helpers.hpp"
#include "util/IsBigEndian.hpp"
#include "util/PostToThread.hpp"
#include "util/RapidjsonHelpers.hpp"
@ -137,7 +138,7 @@ void Application::initialize(Settings &settings, Paths &paths)
{
this->initNm(paths);
}
this->initPubsub();
this->initPubSub();
}
int Application::run(QApplication &qtApp)
@ -194,7 +195,7 @@ void Application::initNm(Paths &paths)
#endif
}
void Application::initPubsub()
void Application::initPubSub()
{
this->twitch->pubsub->signals_.moderation.chatCleared.connect(
[this](const auto &action) {
@ -331,8 +332,7 @@ void Application::initPubsub()
});
});
this->twitch->pubsub->signals_.moderation.automodMessage.connect(
[&](const auto &action) {
const auto handleAutoModMessage = [&](const auto &action) {
auto chan = this->twitch->getChannelOrEmptyByID(action.roomID);
if (chan->isEmpty())
@ -345,8 +345,93 @@ void Application::initPubsub()
chan->addMessage(p.first);
chan->addMessage(p.second);
});
};
this->twitch->pubsub->signals_.moderation.autoModMessageCaught.connect(
[&](const auto &msg, const QString &channelID) {
switch (msg.type)
{
case PubSubAutoModQueueMessage::Type::AutoModCaughtMessage: {
if (msg.status == "PENDING")
{
AutomodAction action(msg.data, channelID);
action.reason = QString("%1 level %2")
.arg(msg.contentCategory)
.arg(msg.contentLevel);
action.msgID = msg.messageID;
action.message = msg.messageText;
// this message also contains per-word automod data, which could be implemented
// extract sender data manually because Twitch loves not being consistent
QString senderDisplayName =
msg.senderUserDisplayName; // Might be transformed later
bool hasLocalizedName = false;
if (!msg.senderUserDisplayName.isEmpty())
{
// check for non-ascii display names
if (QString::compare(msg.senderUserDisplayName,
msg.senderUserLogin,
Qt::CaseInsensitive) != 0)
{
hasLocalizedName = true;
}
}
QColor senderColor = msg.senderUserChatColor;
QString senderColor_;
if (!senderColor.isValid() &&
getSettings()->colorizeNicknames)
{
// color may be not present if user is a grey-name
senderColor = getRandomColor(msg.senderUserID);
}
// handle username style based on prefered setting
switch (getSettings()->usernameDisplayMode.getValue())
{
case UsernameDisplayMode::Username: {
if (hasLocalizedName)
{
senderDisplayName = msg.senderUserLogin;
}
break;
}
case UsernameDisplayMode::LocalizedName: {
break;
}
case UsernameDisplayMode::
UsernameAndLocalizedName: {
if (hasLocalizedName)
{
senderDisplayName = QString("%1(%2)").arg(
msg.senderUserLogin,
msg.senderUserDisplayName);
}
break;
}
}
action.target =
ActionUser{msg.senderUserID, msg.senderUserLogin,
senderDisplayName, senderColor};
handleAutoModMessage(action);
}
// "ALLOWED" and "DENIED" statuses remain unimplemented
// They are versions of automod_message_(denied|approved) but for mods.
}
break;
case PubSubAutoModQueueMessage::Type::INVALID:
default: {
}
break;
}
});
this->twitch->pubsub->signals_.moderation.autoModMessageBlocked.connect(
handleAutoModMessage);
this->twitch->pubsub->signals_.moderation.automodUserMessage.connect(
[&](const auto &action) {
auto chan = this->twitch->getChannelOrEmptyByID(action.roomID);
@ -381,39 +466,44 @@ void Application::initPubsub()
this->twitch->pubsub->signals_.pointReward.redeemed.connect(
[&](auto &data) {
QString channelId;
if (rj::getSafe(data, "channel_id", channelId))
QString channelId = data.value("channel_id").toString();
if (channelId.isEmpty())
{
qCDebug(chatterinoApp)
<< "Couldn't find channel id of point reward";
return;
}
auto chan = this->twitch->getChannelOrEmptyByID(channelId);
auto reward = ChannelPointReward(data);
postToThread([chan, reward] {
if (auto channel =
dynamic_cast<TwitchChannel *>(chan.get()))
if (auto channel = dynamic_cast<TwitchChannel *>(chan.get()))
{
channel->addChannelPointReward(reward);
}
});
}
else
{
qCDebug(chatterinoApp)
<< "Couldn't find channel id of point reward";
}
});
this->twitch->pubsub->start();
auto RequestModerationActions = [=]() {
this->twitch->pubsub->unlistenAllModerationActions();
this->twitch->pubsub->setAccount(
getApp()->accounts->twitch.getCurrent());
// TODO(pajlada): Unlisten to all authed topics instead of only
// moderation topics this->twitch->pubsub->UnlistenAllAuthedTopics();
this->twitch->pubsub->listenToWhispers(
this->accounts->twitch.getCurrent());
this->twitch->pubsub->listenToWhispers();
};
this->accounts->twitch.currentUserChanged.connect(
[=] {
this->twitch->pubsub->unlistenAllModerationActions();
this->twitch->pubsub->unlistenWhispers();
},
boost::signals2::at_front);
this->accounts->twitch.currentUserChanged.connect(RequestModerationActions);
RequestModerationActions();

View file

@ -66,7 +66,7 @@ public:
private:
void addSingleton(Singleton *singleton);
void initPubsub();
void initPubSub();
void initNm(Paths &paths);
template <typename T,

View file

@ -210,12 +210,16 @@ set(SOURCE_FILES
providers/twitch/ChannelPointReward.hpp
providers/twitch/IrcMessageHandler.cpp
providers/twitch/IrcMessageHandler.hpp
providers/twitch/PubsubActions.cpp
providers/twitch/PubsubActions.hpp
providers/twitch/PubsubClient.cpp
providers/twitch/PubsubClient.hpp
providers/twitch/PubsubHelpers.cpp
providers/twitch/PubsubHelpers.hpp
providers/twitch/PubSubActions.cpp
providers/twitch/PubSubActions.hpp
providers/twitch/PubSubClient.cpp
providers/twitch/PubSubClient.hpp
providers/twitch/PubSubClientOptions.hpp
providers/twitch/PubSubHelpers.hpp
providers/twitch/PubSubManager.cpp
providers/twitch/PubSubManager.hpp
providers/twitch/PubSubMessages.hpp
providers/twitch/PubSubWebsocket.hpp
providers/twitch/TwitchAccount.cpp
providers/twitch/TwitchAccount.hpp
providers/twitch/TwitchAccountManager.cpp
@ -237,6 +241,22 @@ set(SOURCE_FILES
providers/twitch/TwitchUser.cpp
providers/twitch/TwitchUser.hpp
providers/twitch/pubsubmessages/AutoMod.cpp
providers/twitch/pubsubmessages/AutoMod.hpp
providers/twitch/pubsubmessages/Base.cpp
providers/twitch/pubsubmessages/Base.hpp
providers/twitch/pubsubmessages/ChannelPoints.cpp
providers/twitch/pubsubmessages/ChannelPoints.hpp
providers/twitch/pubsubmessages/ChatModeratorAction.cpp
providers/twitch/pubsubmessages/ChatModeratorAction.hpp
providers/twitch/pubsubmessages/Listen.cpp
providers/twitch/pubsubmessages/Listen.hpp
providers/twitch/pubsubmessages/Message.hpp
providers/twitch/pubsubmessages/Unlisten.cpp
providers/twitch/pubsubmessages/Unlisten.hpp
providers/twitch/pubsubmessages/Whisper.cpp
providers/twitch/pubsubmessages/Whisper.hpp
providers/twitch/api/Helix.cpp
providers/twitch/api/Helix.hpp
@ -510,6 +530,7 @@ target_link_libraries(${LIBRARY_PROJECT}
Threads::Threads
RapidJSON::RapidJSON
LRUCache
MagicEnum
)
if (BUILD_WITH_QTKEYCHAIN)
target_link_libraries(${LIBRARY_PROJECT}

View file

@ -28,7 +28,7 @@ Q_LOGGING_CATEGORY(chatterinoNotification, "chatterino.notification",
logThreshold);
Q_LOGGING_CATEGORY(chatterinoNuulsuploader, "chatterino.nuulsuploader",
logThreshold);
Q_LOGGING_CATEGORY(chatterinoPubsub, "chatterino.pubsub", logThreshold);
Q_LOGGING_CATEGORY(chatterinoPubSub, "chatterino.pubsub", logThreshold);
Q_LOGGING_CATEGORY(chatterinoStreamlink, "chatterino.streamlink", logThreshold);
Q_LOGGING_CATEGORY(chatterinoStreamerMode, "chatterino.streamermode",
logThreshold);

View file

@ -21,7 +21,7 @@ Q_DECLARE_LOGGING_CATEGORY(chatterinoMessage);
Q_DECLARE_LOGGING_CATEGORY(chatterinoNativeMessage);
Q_DECLARE_LOGGING_CATEGORY(chatterinoNotification);
Q_DECLARE_LOGGING_CATEGORY(chatterinoNuulsuploader);
Q_DECLARE_LOGGING_CATEGORY(chatterinoPubsub);
Q_DECLARE_LOGGING_CATEGORY(chatterinoPubSub);
Q_DECLARE_LOGGING_CATEGORY(chatterinoStreamlink);
Q_DECLARE_LOGGING_CATEGORY(chatterinoStreamerMode);
Q_DECLARE_LOGGING_CATEGORY(chatterinoTokenizer);

View file

@ -2,7 +2,7 @@
#include "Application.hpp"
#include "MessageElement.hpp"
#include "providers/twitch/PubsubActions.hpp"
#include "providers/twitch/PubSubActions.hpp"
#include "singletons/Theme.hpp"
#include "util/DebugCount.hpp"
#include "util/IrcHelpers.hpp"

View file

@ -7,7 +7,7 @@
#include "messages/Message.hpp"
#include "messages/MessageElement.hpp"
#include "providers/LinkResolver.hpp"
#include "providers/twitch/PubsubActions.hpp"
#include "providers/twitch/PubSubActions.hpp"
#include "singletons/Emotes.hpp"
#include "singletons/Resources.hpp"
#include "singletons/Theme.hpp"

View file

@ -1,97 +1,38 @@
#include "ChannelPointReward.hpp"
#include "common/QLogging.hpp"
#include "util/RapidjsonHelpers.hpp"
namespace chatterino {
QString parseRewardImage(const rapidjson::Value &obj, const char *key,
bool &result)
ChannelPointReward::ChannelPointReward(const QJsonObject &redemption)
{
QString url;
if (!(result = rj::getSafe(obj, key, url)))
{
qCDebug(chatterinoTwitch)
<< "No url value found for key in reward image object:" << key;
return "";
}
auto reward = redemption.value("reward").toObject();
return url;
}
ChannelPointReward::ChannelPointReward(rapidjson::Value &redemption)
{
rapidjson::Value user;
if (!(this->hasParsedSuccessfully =
rj::getSafeObject(redemption, "user", user)))
{
qCDebug(chatterinoTwitch) << "No user info found for redemption";
return;
}
rapidjson::Value reward;
if (!(this->hasParsedSuccessfully =
rj::getSafeObject(redemption, "reward", reward)))
{
qCDebug(chatterinoTwitch) << "No reward info found for redemption";
return;
}
if (!(this->hasParsedSuccessfully = rj::getSafe(reward, "id", this->id)))
{
qCDebug(chatterinoTwitch) << "No id found for reward";
return;
}
if (!(this->hasParsedSuccessfully =
rj::getSafe(reward, "channel_id", this->channelId)))
{
qCDebug(chatterinoTwitch) << "No channel_id found for reward";
return;
}
if (!(this->hasParsedSuccessfully =
rj::getSafe(reward, "title", this->title)))
{
qCDebug(chatterinoTwitch) << "No title found for reward";
return;
}
if (!(this->hasParsedSuccessfully =
rj::getSafe(reward, "cost", this->cost)))
{
qCDebug(chatterinoTwitch) << "No cost found for reward";
return;
}
if (!(this->hasParsedSuccessfully = rj::getSafe(
reward, "is_user_input_required", this->isUserInputRequired)))
{
qCDebug(chatterinoTwitch)
<< "No information if user input is required found for reward";
return;
}
this->id = reward.value("id").toString();
this->channelId = reward.value("channel_id").toString();
this->title = reward.value("title").toString();
this->cost = reward.value("cost").toInt();
this->isUserInputRequired = reward.value("is_user_input_required").toBool();
// We don't need to store user information for rewards with user input
// because we will get the user info from a corresponding IRC message
if (!this->isUserInputRequired)
{
this->parseUser(user);
auto user = redemption.value("user").toObject();
this->user.id = user.value("id").toString();
this->user.login = user.value("login").toString();
this->user.displayName = user.value("display_name").toString();
}
rapidjson::Value obj;
if (rj::getSafeObject(reward, "image", obj) && !obj.IsNull() &&
obj.IsObject())
auto imageValue = reward.value("image");
if (imageValue.isObject())
{
auto imageObject = imageValue.toObject();
this->image = ImageSet{
Image::fromUrl(
{parseRewardImage(obj, "url_1x", this->hasParsedSuccessfully)},
1),
Image::fromUrl(
{parseRewardImage(obj, "url_2x", this->hasParsedSuccessfully)},
0.5),
Image::fromUrl(
{parseRewardImage(obj, "url_4x", this->hasParsedSuccessfully)},
0.25),
Image::fromUrl({imageObject.value("url_1x").toString()}, 1),
Image::fromUrl({imageObject.value("url_2x").toString()}, 0.5),
Image::fromUrl({imageObject.value("url_4x").toString()}, 0.25),
};
}
else
@ -104,27 +45,4 @@ ChannelPointReward::ChannelPointReward(rapidjson::Value &redemption)
}
}
void ChannelPointReward::parseUser(rapidjson::Value &user)
{
if (!(this->hasParsedSuccessfully = rj::getSafe(user, "id", this->user.id)))
{
qCDebug(chatterinoTwitch) << "No id found for user in reward";
return;
}
if (!(this->hasParsedSuccessfully =
rj::getSafe(user, "login", this->user.login)))
{
qCDebug(chatterinoTwitch) << "No login name found for user in reward";
return;
}
if (!(this->hasParsedSuccessfully =
rj::getSafe(user, "display_name", this->user.displayName)))
{
qCDebug(chatterinoTwitch) << "No display name found for user in reward";
return;
}
}
} // namespace chatterino

View file

@ -4,7 +4,7 @@
#include "messages/Image.hpp"
#include "messages/ImageSet.hpp"
#include <rapidjson/document.h>
#include <QJsonObject>
#define TWITCH_CHANNEL_POINT_REWARD_URL(x) \
QString("https://static-cdn.jtvnw.net/custom-reward-images/default-%1") \
@ -12,14 +12,13 @@
namespace chatterino {
struct ChannelPointReward {
ChannelPointReward(rapidjson::Value &reward);
ChannelPointReward(const QJsonObject &redemption);
ChannelPointReward() = delete;
QString id;
QString channelId;
QString title;
int cost;
ImageSet image;
bool hasParsedSuccessfully = false;
bool isUserInputRequired = false;
struct {
@ -27,9 +26,6 @@ struct ChannelPointReward {
QString login;
QString displayName;
} user;
private:
void parseUser(rapidjson::Value &user);
};
} // namespace chatterino

View file

@ -0,0 +1,13 @@
#include "providers/twitch/PubSubActions.hpp"
namespace chatterino {
PubSubAction::PubSubAction(const QJsonObject &data, const QString &_roomID)
: timestamp(std::chrono::steady_clock::now())
, roomID(_roomID)
{
this->source.id = data.value("created_by_user_id").toString();
this->source.login = data.value("created_by").toString();
}
} // namespace chatterino

View file

@ -1,7 +1,7 @@
#pragma once
#include <rapidjson/document.h>
#include <QColor>
#include <QJsonObject>
#include <QString>
#include <chrono>
@ -15,10 +15,24 @@ struct ActionUser {
// displayName should be in format "login(localizedName)" for non-ascii usernames
QString displayName;
QColor color;
inline bool operator==(const ActionUser &rhs) const
{
return this->id == rhs.id && this->login == rhs.login &&
this->displayName == rhs.displayName && this->color == rhs.color;
}
};
inline QDebug operator<<(QDebug dbg, const ActionUser &user)
{
dbg.nospace() << "ActionUser(" << user.id << ", " << user.login << ", "
<< user.displayName << ", " << user.color << ")";
return dbg.maybeSpace();
}
struct PubSubAction {
PubSubAction(const rapidjson::Value &data, const QString &_roomID);
PubSubAction(const QJsonObject &data, const QString &_roomID);
ActionUser source;
std::chrono::steady_clock::time_point timestamp;

View file

@ -0,0 +1,212 @@
#include "providers/twitch/PubSubClient.hpp"
#include "common/QLogging.hpp"
#include "providers/twitch/PubSubActions.hpp"
#include "providers/twitch/PubSubHelpers.hpp"
#include "providers/twitch/PubSubMessages.hpp"
#include "providers/twitch/pubsubmessages/Unlisten.hpp"
#include "singletons/Settings.hpp"
#include "util/DebugCount.hpp"
#include "util/Helpers.hpp"
#include "util/RapidjsonHelpers.hpp"
#include <exception>
#include <thread>
namespace chatterino {
static const char *PING_PAYLOAD = R"({"type":"PING"})";
PubSubClient::PubSubClient(WebsocketClient &websocketClient,
WebsocketHandle handle,
const PubSubClientOptions &clientOptions)
: websocketClient_(websocketClient)
, handle_(handle)
, clientOptions_(clientOptions)
{
}
void PubSubClient::start()
{
assert(!this->started_);
this->started_ = true;
this->ping();
}
void PubSubClient::stop()
{
assert(this->started_);
this->started_ = false;
}
void PubSubClient::close(const std::string &reason,
websocketpp::close::status::value code)
{
WebsocketErrorCode ec;
auto conn = this->websocketClient_.get_con_from_hdl(this->handle_, ec);
if (ec)
{
qCDebug(chatterinoPubSub)
<< "Error getting con:" << ec.message().c_str();
return;
}
conn->close(code, reason, ec);
if (ec)
{
qCDebug(chatterinoPubSub) << "Error closing:" << ec.message().c_str();
return;
}
}
bool PubSubClient::listen(PubSubListenMessage msg)
{
int numRequestedListens = msg.topics.size();
if (this->numListens_ + numRequestedListens > PubSubClient::MAX_LISTENS)
{
// This PubSubClient is already at its peak listens
return false;
}
this->numListens_ += numRequestedListens;
DebugCount::increase("PubSub topic pending listens", numRequestedListens);
for (const auto &topic : msg.topics)
{
this->listeners_.emplace_back(Listener{topic, false, false, false});
}
qCDebug(chatterinoPubSub)
<< "Subscribing to" << numRequestedListens << "topics";
this->send(msg.toJson());
return true;
}
PubSubClient::UnlistenPrefixResponse PubSubClient::unlistenPrefix(
const QString &prefix)
{
std::vector<QString> topics;
for (auto it = this->listeners_.begin(); it != this->listeners_.end();)
{
const auto &listener = *it;
if (listener.topic.startsWith(prefix))
{
topics.push_back(listener.topic);
it = this->listeners_.erase(it);
}
else
{
++it;
}
}
if (topics.empty())
{
return {{}, ""};
}
auto numRequestedUnlistens = topics.size();
this->numListens_ -= numRequestedUnlistens;
DebugCount::increase("PubSub topic pending unlistens",
numRequestedUnlistens);
PubSubUnlistenMessage message(topics);
this->send(message.toJson());
return {message.topics, message.nonce};
}
void PubSubClient::handleListenResponse(const PubSubMessage &message)
{
}
void PubSubClient::handleUnlistenResponse(const PubSubMessage &message)
{
}
void PubSubClient::handlePong()
{
assert(this->awaitingPong_);
this->awaitingPong_ = false;
}
bool PubSubClient::isListeningToTopic(const QString &topic)
{
for (const auto &listener : this->listeners_)
{
if (listener.topic == topic)
{
return true;
}
}
return false;
}
std::vector<Listener> PubSubClient::getListeners() const
{
return this->listeners_;
}
void PubSubClient::ping()
{
assert(this->started_);
if (this->awaitingPong_)
{
qCDebug(chatterinoPubSub) << "No pong response, disconnect!";
this->close("Didn't respond to ping");
return;
}
if (!this->send(PING_PAYLOAD))
{
return;
}
this->awaitingPong_ = true;
auto self = this->shared_from_this();
runAfter(this->websocketClient_.get_io_service(),
this->clientOptions_.pingInterval_, [self](auto timer) {
if (!self->started_)
{
return;
}
self->ping();
});
}
bool PubSubClient::send(const char *payload)
{
WebsocketErrorCode ec;
this->websocketClient_.send(this->handle_, payload,
websocketpp::frame::opcode::text, ec);
if (ec)
{
qCDebug(chatterinoPubSub) << "Error sending message" << payload << ":"
<< ec.message().c_str();
// TODO(pajlada): Check which error code happened and maybe
// gracefully handle it
return false;
}
return true;
}
} // namespace chatterino

View file

@ -0,0 +1,74 @@
#pragma once
#include "providers/twitch/PubSubClientOptions.hpp"
#include "providers/twitch/PubSubMessages.hpp"
#include "providers/twitch/PubSubWebsocket.hpp"
#include <QString>
#include <pajlada/signals/signal.hpp>
#include <atomic>
#include <vector>
namespace chatterino {
struct TopicData {
QString topic;
bool authed{false};
bool persistent{false};
};
struct Listener : TopicData {
bool confirmed{false};
};
class PubSubClient : public std::enable_shared_from_this<PubSubClient>
{
public:
struct UnlistenPrefixResponse {
std::vector<QString> topics;
QString nonce;
};
// The max amount of topics we may listen to with a single connection
static constexpr std::vector<QString>::size_type MAX_LISTENS = 50;
PubSubClient(WebsocketClient &_websocketClient, WebsocketHandle _handle,
const PubSubClientOptions &clientOptions);
void start();
void stop();
void close(const std::string &reason,
websocketpp::close::status::value code =
websocketpp::close::status::normal);
bool listen(PubSubListenMessage msg);
UnlistenPrefixResponse unlistenPrefix(const QString &prefix);
void handleListenResponse(const PubSubMessage &message);
void handleUnlistenResponse(const PubSubMessage &message);
void handlePong();
bool isListeningToTopic(const QString &topic);
std::vector<Listener> getListeners() const;
private:
void ping();
bool send(const char *payload);
WebsocketClient &websocketClient_;
WebsocketHandle handle_;
uint16_t numListens_ = 0;
std::vector<Listener> listeners_;
std::atomic<bool> awaitingPong_{false};
std::atomic<bool> started_{false};
const PubSubClientOptions &clientOptions_;
};
} // namespace chatterino

View file

@ -0,0 +1,14 @@
#pragma once
#include <chrono>
namespace chatterino {
/**
* @brief Options to change the behaviour of the underlying websocket clients
**/
struct PubSubClientOptions {
std::chrono::seconds pingInterval_;
};
} // namespace chatterino

View file

@ -1,5 +1,6 @@
#pragma once
#include <QJsonObject>
#include <boost/asio.hpp>
#include <boost/asio/steady_timer.hpp>
#include <memory>
@ -11,19 +12,6 @@ namespace chatterino {
class TwitchAccount;
struct ActionUser;
const rapidjson::Value &getArgs(const rapidjson::Value &data);
const rapidjson::Value &getMsgID(const rapidjson::Value &data);
bool getCreatedByUser(const rapidjson::Value &data, ActionUser &user);
bool getTargetUser(const rapidjson::Value &data, ActionUser &user);
bool getTargetUserName(const rapidjson::Value &data, ActionUser &user);
rapidjson::Document createListenMessage(const std::vector<QString> &topicsVec,
std::shared_ptr<TwitchAccount> account);
rapidjson::Document createUnlistenMessage(
const std::vector<QString> &topicsVec);
// Create timer using given ioService
template <typename Duration, typename Callback>
void runAfter(boost::asio::io_service &ioService, Duration duration,
@ -35,7 +23,7 @@ void runAfter(boost::asio::io_service &ioService, Duration duration,
timer->async_wait([timer, cb](const boost::system::error_code &ec) {
if (ec)
{
qCDebug(chatterinoPubsub)
qCDebug(chatterinoPubSub)
<< "Error in runAfter:" << ec.message().c_str();
return;
}
@ -54,7 +42,7 @@ void runAfter(std::shared_ptr<boost::asio::steady_timer> timer,
timer->async_wait([timer, cb](const boost::system::error_code &ec) {
if (ec)
{
qCDebug(chatterinoPubsub)
qCDebug(chatterinoPubSub)
<< "Error in runAfter:" << ec.message().c_str();
return;
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,198 @@
#pragma once
#include "providers/twitch/ChatterinoWebSocketppLogger.hpp"
#include "providers/twitch/PubSubActions.hpp"
#include "providers/twitch/PubSubClient.hpp"
#include "providers/twitch/PubSubClientOptions.hpp"
#include "providers/twitch/PubSubMessages.hpp"
#include "providers/twitch/PubSubWebsocket.hpp"
#include "providers/twitch/TwitchAccount.hpp"
#include "util/ExponentialBackoff.hpp"
#include <QJsonObject>
#include <QString>
#include <pajlada/signals/signal.hpp>
#include <websocketpp/client.hpp>
#include <atomic>
#include <chrono>
#include <map>
#include <memory>
#include <thread>
#include <unordered_map>
#include <vector>
namespace chatterino {
class PubSub
{
using WebsocketMessagePtr =
websocketpp::config::asio_tls_client::message_type::ptr;
using WebsocketContextPtr =
websocketpp::lib::shared_ptr<boost::asio::ssl::context>;
template <typename T>
using Signal =
pajlada::Signals::Signal<T>; // type-id is vector<T, Alloc<T>>
struct NonceInfo {
std::weak_ptr<PubSubClient> client;
QString messageType; // e.g. LISTEN or UNLISTEN
std::vector<QString> topics;
std::vector<QString>::size_type topicCount;
};
WebsocketClient websocketClient;
std::unique_ptr<std::thread> mainThread;
// Account credentials
// Set from setAccount or setAccountData
QString token_;
QString userID_;
public:
// The max amount of connections we may open
static constexpr int maxConnections = 10;
PubSub(const QString &host,
std::chrono::seconds pingInterval = std::chrono::seconds(15));
void setAccount(std::shared_ptr<TwitchAccount> account)
{
this->token_ = account->getOAuthToken();
this->userID_ = account->getUserId();
}
void setAccountData(QString token, QString userID)
{
this->token_ = token;
this->userID_ = userID;
}
~PubSub() = delete;
enum class State {
Connected,
Disconnected,
};
void start();
void stop();
bool isConnected() const
{
return this->state == State::Connected;
}
struct {
struct {
Signal<ClearChatAction> chatCleared;
Signal<DeleteAction> messageDeleted;
Signal<ModeChangedAction> modeChanged;
Signal<ModerationStateAction> moderationStateChanged;
Signal<BanAction> userBanned;
Signal<UnbanAction> userUnbanned;
// Message caught by automod
// channelID
pajlada::Signals::Signal<PubSubAutoModQueueMessage, QString>
autoModMessageCaught;
// Message blocked by moderator
Signal<AutomodAction> autoModMessageBlocked;
Signal<AutomodUserAction> automodUserMessage;
Signal<AutomodInfoAction> automodInfoMessage;
} moderation;
struct {
// Parsing should be done in PubSubManager as well,
// but for now we just send the raw data
Signal<const PubSubWhisperMessage &> received;
Signal<const PubSubWhisperMessage &> sent;
} whisper;
struct {
Signal<const QJsonObject &> redeemed;
} pointReward;
} signals_;
void unlistenAllModerationActions();
void unlistenWhispers();
bool listenToWhispers();
void listenToChannelModerationActions(const QString &channelID);
void listenToAutomod(const QString &channelID);
void listenToChannelPointRewards(const QString &channelID);
std::vector<QString> requests;
struct {
std::atomic<uint32_t> connectionsClosed{0};
std::atomic<uint32_t> connectionsOpened{0};
std::atomic<uint32_t> connectionsFailed{0};
std::atomic<uint32_t> messagesReceived{0};
std::atomic<uint32_t> messagesFailedToParse{0};
std::atomic<uint32_t> failedListenResponses{0};
std::atomic<uint32_t> listenResponses{0};
std::atomic<uint32_t> unlistenResponses{0};
} diag;
void listenToTopic(const QString &topic);
private:
void listen(PubSubListenMessage msg);
bool tryListen(PubSubListenMessage msg);
bool isListeningToTopic(const QString &topic);
void addClient();
std::atomic<bool> addingClient{false};
ExponentialBackoff<5> connectBackoff{std::chrono::milliseconds(1000)};
State state = State::Connected;
std::map<WebsocketHandle, std::shared_ptr<PubSubClient>,
std::owner_less<WebsocketHandle>>
clients;
std::unordered_map<
QString, std::function<void(const QJsonObject &, const QString &)>>
moderationActionHandlers;
std::unordered_map<
QString, std::function<void(const QJsonObject &, const QString &)>>
channelTermsActionHandlers;
void onMessage(websocketpp::connection_hdl hdl, WebsocketMessagePtr msg);
void onConnectionOpen(websocketpp::connection_hdl hdl);
void onConnectionFail(websocketpp::connection_hdl hdl);
void onConnectionClose(websocketpp::connection_hdl hdl);
WebsocketContextPtr onTLSInit(websocketpp::connection_hdl hdl);
void handleResponse(const PubSubMessage &message);
void handleListenResponse(const NonceInfo &info, bool failed);
void handleUnlistenResponse(const NonceInfo &info, bool failed);
void handleMessageResponse(const PubSubMessageMessage &message);
// Register a nonce for a specific client
void registerNonce(QString nonce, NonceInfo nonceInfo);
// Find client associated with a nonce
boost::optional<NonceInfo> findNonceInfo(QString nonce);
std::unordered_map<QString, NonceInfo> nonces_;
void runThread();
std::shared_ptr<boost::asio::io_service::work> work{nullptr};
const QString host_;
const PubSubClientOptions clientOptions_;
bool stopping_{false};
};
} // namespace chatterino

View file

@ -0,0 +1,10 @@
#pragma once
#include "providers/twitch/pubsubmessages/AutoMod.hpp"
#include "providers/twitch/pubsubmessages/Base.hpp"
#include "providers/twitch/pubsubmessages/ChannelPoints.hpp"
#include "providers/twitch/pubsubmessages/ChatModeratorAction.hpp"
#include "providers/twitch/pubsubmessages/Listen.hpp"
#include "providers/twitch/pubsubmessages/Message.hpp"
#include "providers/twitch/pubsubmessages/Unlisten.hpp"
#include "providers/twitch/pubsubmessages/Whisper.hpp"

View file

@ -0,0 +1,32 @@
#pragma once
#include "providers/twitch/ChatterinoWebSocketppLogger.hpp"
#include <websocketpp/client.hpp>
#include <websocketpp/config/asio_client.hpp>
#include <websocketpp/extensions/permessage_deflate/disabled.hpp>
#include <websocketpp/logger/basic.hpp>
namespace chatterino {
struct chatterinoconfig : public websocketpp::config::asio_tls_client {
typedef websocketpp::log::chatterinowebsocketpplogger<
concurrency_type, websocketpp::log::elevel>
elog_type;
typedef websocketpp::log::chatterinowebsocketpplogger<
concurrency_type, websocketpp::log::alevel>
alog_type;
struct permessage_deflate_config {
};
typedef websocketpp::extensions::permessage_deflate::disabled<
permessage_deflate_config>
permessage_deflate_type;
};
using WebsocketClient = websocketpp::client<chatterinoconfig>;
using WebsocketHandle = websocketpp::connection_hdl;
using WebsocketErrorCode = websocketpp::lib::error_code;
} // namespace chatterino

View file

@ -1,14 +0,0 @@
#include "providers/twitch/PubsubActions.hpp"
#include "providers/twitch/PubsubHelpers.hpp"
namespace chatterino {
PubSubAction::PubSubAction(const rapidjson::Value &data, const QString &_roomID)
: timestamp(std::chrono::steady_clock::now())
, roomID(_roomID)
{
getCreatedByUser(data, this->source);
}
} // namespace chatterino

File diff suppressed because it is too large Load diff

View file

@ -1,209 +0,0 @@
#pragma once
#include "providers/twitch/ChatterinoWebSocketppLogger.hpp"
#include "providers/twitch/PubsubActions.hpp"
#include "providers/twitch/TwitchAccount.hpp"
#include "providers/twitch/TwitchIrcServer.hpp"
#include <rapidjson/document.h>
#include <QString>
#include <pajlada/signals/signal.hpp>
#include <websocketpp/client.hpp>
#include <websocketpp/config/asio_client.hpp>
#include <websocketpp/extensions/permessage_deflate/disabled.hpp>
#include <websocketpp/logger/basic.hpp>
#include <atomic>
#include <chrono>
#include <map>
#include <memory>
#include <set>
#include <thread>
#include <unordered_map>
#include <vector>
namespace chatterino {
struct chatterinoconfig : public websocketpp::config::asio_tls_client {
typedef websocketpp::log::chatterinowebsocketpplogger<
concurrency_type, websocketpp::log::elevel>
elog_type;
typedef websocketpp::log::chatterinowebsocketpplogger<
concurrency_type, websocketpp::log::alevel>
alog_type;
struct permessage_deflate_config {
};
typedef websocketpp::extensions::permessage_deflate::disabled<
permessage_deflate_config>
permessage_deflate_type;
};
using WebsocketClient = websocketpp::client<chatterinoconfig>;
using WebsocketHandle = websocketpp::connection_hdl;
using WebsocketErrorCode = websocketpp::lib::error_code;
#define MAX_PUBSUB_LISTENS 50
#define MAX_PUBSUB_CONNECTIONS 10
struct RequestMessage {
QString payload;
int topicCount;
};
namespace detail {
struct Listener {
QString topic;
bool authed;
bool persistent;
bool confirmed = false;
};
class PubSubClient : public std::enable_shared_from_this<PubSubClient>
{
public:
PubSubClient(WebsocketClient &_websocketClient,
WebsocketHandle _handle);
void start();
void stop();
bool listen(rapidjson::Document &message);
void unlistenPrefix(const QString &prefix);
void handlePong();
bool isListeningToTopic(const QString &topic);
private:
void ping();
bool send(const char *payload);
WebsocketClient &websocketClient_;
WebsocketHandle handle_;
uint16_t numListens_ = 0;
std::vector<Listener> listeners_;
std::atomic<bool> awaitingPong_{false};
std::atomic<bool> started_{false};
};
} // namespace detail
class PubSub
{
using WebsocketMessagePtr =
websocketpp::config::asio_tls_client::message_type::ptr;
using WebsocketContextPtr =
websocketpp::lib::shared_ptr<boost::asio::ssl::context>;
template <typename T>
using Signal =
pajlada::Signals::Signal<T>; // type-id is vector<T, Alloc<T>>
WebsocketClient websocketClient;
std::unique_ptr<std::thread> mainThread;
public:
PubSub();
~PubSub() = delete;
enum class State {
Connected,
Disconnected,
};
void start();
bool isConnected() const
{
return this->state == State::Connected;
}
pajlada::Signals::NoArgSignal connected;
struct {
struct {
Signal<ClearChatAction> chatCleared;
Signal<DeleteAction> messageDeleted;
Signal<ModeChangedAction> modeChanged;
Signal<ModerationStateAction> moderationStateChanged;
Signal<BanAction> userBanned;
Signal<UnbanAction> userUnbanned;
Signal<AutomodAction> automodMessage;
Signal<AutomodUserAction> automodUserMessage;
Signal<AutomodInfoAction> automodInfoMessage;
} moderation;
struct {
// Parsing should be done in PubSubManager as well,
// but for now we just send the raw data
Signal<const rapidjson::Value &> received;
Signal<const rapidjson::Value &> sent;
} whisper;
struct {
Signal<rapidjson::Value &> redeemed;
} pointReward;
} signals_;
void listenToWhispers(std::shared_ptr<TwitchAccount> account);
void unlistenAllModerationActions();
void listenToChannelModerationActions(
const QString &channelID, std::shared_ptr<TwitchAccount> account);
void listenToAutomod(const QString &channelID,
std::shared_ptr<TwitchAccount> account);
void listenToChannelPointRewards(const QString &channelID,
std::shared_ptr<TwitchAccount> account);
std::vector<std::unique_ptr<rapidjson::Document>> requests;
private:
void listenToTopic(const QString &topic,
std::shared_ptr<TwitchAccount> account);
void listen(rapidjson::Document &&msg);
bool tryListen(rapidjson::Document &msg);
bool isListeningToTopic(const QString &topic);
void addClient();
std::atomic<bool> addingClient{false};
State state = State::Connected;
std::map<WebsocketHandle, std::shared_ptr<detail::PubSubClient>,
std::owner_less<WebsocketHandle>>
clients;
std::unordered_map<
QString, std::function<void(const rapidjson::Value &, const QString &)>>
moderationActionHandlers;
std::unordered_map<
QString, std::function<void(const rapidjson::Value &, const QString &)>>
channelTermsActionHandlers;
void onMessage(websocketpp::connection_hdl hdl, WebsocketMessagePtr msg);
void onConnectionOpen(websocketpp::connection_hdl hdl);
void onConnectionClose(websocketpp::connection_hdl hdl);
WebsocketContextPtr onTLSInit(websocketpp::connection_hdl hdl);
void handleResponse(const rapidjson::Document &msg);
void handleListenResponse(const RequestMessage &msg, bool failed);
void handleUnlistenResponse(const RequestMessage &msg, bool failed);
void handleMessageResponse(const rapidjson::Value &data);
void runThread();
};
} // namespace chatterino

View file

@ -1,104 +0,0 @@
#include "providers/twitch/PubsubHelpers.hpp"
#include "providers/twitch/PubsubActions.hpp"
#include "providers/twitch/TwitchAccount.hpp"
#include "util/RapidjsonHelpers.hpp"
namespace chatterino {
const rapidjson::Value &getArgs(const rapidjson::Value &data)
{
if (!data.HasMember("args"))
{
throw std::runtime_error("Missing member args");
}
const auto &args = data["args"];
if (!args.IsArray())
{
throw std::runtime_error("args must be an array");
}
return args;
}
const rapidjson::Value &getMsgID(const rapidjson::Value &data)
{
if (!data.HasMember("msg_id"))
{
throw std::runtime_error("Missing member msg_id");
}
const auto &msgID = data["msg_id"];
return msgID;
}
bool getCreatedByUser(const rapidjson::Value &data, ActionUser &user)
{
return rj::getSafe(data, "created_by", user.login) &&
rj::getSafe(data, "created_by_user_id", user.id);
}
bool getTargetUser(const rapidjson::Value &data, ActionUser &user)
{
return rj::getSafe(data, "target_user_id", user.id);
}
bool getTargetUserName(const rapidjson::Value &data, ActionUser &user)
{
return rj::getSafe(data, "target_user_login", user.login);
}
rapidjson::Document createListenMessage(const std::vector<QString> &topicsVec,
std::shared_ptr<TwitchAccount> account)
{
rapidjson::Document msg(rapidjson::kObjectType);
auto &a = msg.GetAllocator();
rj::set(msg, "type", "LISTEN");
rapidjson::Value data(rapidjson::kObjectType);
if (account)
{
rj::set(data, "auth_token", account->getOAuthToken(), a);
}
rapidjson::Value topics(rapidjson::kArrayType);
for (const auto &topic : topicsVec)
{
rj::add(topics, topic, a);
}
rj::set(data, "topics", topics, a);
rj::set(msg, "data", data);
return msg;
}
rapidjson::Document createUnlistenMessage(const std::vector<QString> &topicsVec)
{
rapidjson::Document msg(rapidjson::kObjectType);
auto &a = msg.GetAllocator();
rj::set(msg, "type", "UNLISTEN");
rapidjson::Value data(rapidjson::kObjectType);
rapidjson::Value topics(rapidjson::kArrayType);
for (const auto &topic : topicsVec)
{
rj::add(topics, topic, a);
}
rj::set(data, "topics", topics, a);
rj::set(msg, "data", data);
return msg;
}
} // namespace chatterino

View file

@ -119,7 +119,7 @@ void TwitchAccountManager::reloadUsers()
qCDebug(chatterinoTwitch)
<< "It was the current user, so we need to "
"reconnect stuff!";
this->currentUserChanged.invoke();
this->currentUserChanged();
}
}
break;
@ -156,7 +156,7 @@ void TwitchAccountManager::load()
this->currentUser_ = this->anonymousUser_;
}
this->currentUserChanged.invoke();
this->currentUserChanged();
});
}

View file

@ -5,6 +5,8 @@
#include "providers/twitch/TwitchAccount.hpp"
#include "util/SharedPtrElementLess.hpp"
#include <boost/signals2.hpp>
#include <mutex>
#include <vector>
@ -48,7 +50,8 @@ public:
pajlada::Settings::Setting<QString> currentUsername{"/accounts/current",
""};
pajlada::Signals::NoArgSignal currentUserChanged;
// pajlada::Signals::NoArgSignal currentUserChanged;
boost::signals2::signal<void()> currentUserChanged;
pajlada::Signals::NoArgSignal userListUpdated;
SignalVector<std::shared_ptr<TwitchAccount>> accounts;

View file

@ -11,8 +11,9 @@
#include "providers/bttv/BttvEmotes.hpp"
#include "providers/bttv/LoadBttvChannelEmote.hpp"
#include "providers/twitch/IrcMessageHandler.hpp"
#include "providers/twitch/PubsubClient.hpp"
#include "providers/twitch/PubSubManager.hpp"
#include "providers/twitch/TwitchCommon.hpp"
#include "providers/twitch/TwitchIrcServer.hpp"
#include "providers/twitch/TwitchMessageBuilder.hpp"
#include "providers/twitch/api/Helix.hpp"
#include "singletons/Emotes.hpp"
@ -159,24 +160,20 @@ TwitchChannel::TwitchChannel(const QString &name)
{
qCDebug(chatterinoTwitch) << "[TwitchChannel" << name << "] Opened";
this->signalHolder_.managedConnect(
getApp()->accounts->twitch.currentUserChanged, [=] {
this->bSignals_.emplace_back(
getApp()->accounts->twitch.currentUserChanged.connect([=] {
this->setMod(false);
});
this->refreshPubSub();
}));
// pubsub
this->signalHolder_.managedConnect(
getApp()->accounts->twitch.currentUserChanged, [=] {
this->refreshPubsub();
});
this->refreshPubsub();
this->refreshPubSub();
this->userStateChanged.connect([this] {
this->refreshPubsub();
this->refreshPubSub();
});
// room id loaded -> refresh live status
this->roomIdChanged.connect([this]() {
this->refreshPubsub();
this->refreshPubSub();
this->refreshTitle();
this->refreshLiveStatus();
this->refreshBadges();
@ -281,11 +278,6 @@ void TwitchChannel::addChannelPointReward(const ChannelPointReward &reward)
{
assertInGuiThread();
if (!reward.hasParsedSuccessfully)
{
return;
}
if (!reward.isUserInputRequired)
{
MessageBuilder builder;
@ -295,7 +287,7 @@ void TwitchChannel::addChannelPointReward(const ChannelPointReward &reward)
return;
}
bool result;
bool result = false;
{
auto channelPointRewards = this->channelPointRewards_.access();
result = channelPointRewards->try_emplace(reward.id, reward).second;
@ -847,16 +839,21 @@ void TwitchChannel::loadRecentMessages()
.execute();
}
void TwitchChannel::refreshPubsub()
void TwitchChannel::refreshPubSub()
{
auto roomId = this->roomId();
if (roomId.isEmpty())
{
return;
}
auto account = getApp()->accounts->twitch.getCurrent();
getApp()->twitch->pubsub->listenToChannelModerationActions(roomId, account);
getApp()->twitch->pubsub->listenToAutomod(roomId, account);
getApp()->twitch->pubsub->listenToChannelPointRewards(roomId, account);
auto currentAccount = getApp()->accounts->twitch.getCurrent();
getApp()->twitch->pubsub->setAccount(currentAccount);
getApp()->twitch->pubsub->listenToChannelModerationActions(roomId);
getApp()->twitch->pubsub->listenToAutomod(roomId);
getApp()->twitch->pubsub->listenToChannelPointRewards(roomId);
}
void TwitchChannel::refreshChatters()

View file

@ -16,6 +16,7 @@
#include <QElapsedTimer>
#include <QRegularExpression>
#include <boost/optional.hpp>
#include <boost/signals2.hpp>
#include <pajlada/signals/signalholder.hpp>
#include <mutex>
@ -144,7 +145,7 @@ private:
// Methods
void refreshLiveStatus();
void parseLiveStatus(bool live, const HelixStream &stream);
void refreshPubsub();
void refreshPubSub();
void refreshChatters();
void refreshBadges();
void refreshCheerEmotes();
@ -199,6 +200,7 @@ private:
bool isClipCreationInProgress{false};
pajlada::Signals::SignalHolder signalHolder_;
std::vector<boost::signals2::scoped_connection> bSignals_;
friend class TwitchIrcServer;
friend class TwitchMessageBuilder;

View file

@ -11,7 +11,7 @@
#include "messages/Message.hpp"
#include "messages/MessageBuilder.hpp"
#include "providers/twitch/IrcMessageHandler.hpp"
#include "providers/twitch/PubsubClient.hpp"
#include "providers/twitch/PubSubManager.hpp"
#include "providers/twitch/TwitchAccount.hpp"
#include "providers/twitch/TwitchChannel.hpp"
#include "providers/twitch/TwitchHelpers.hpp"
@ -22,6 +22,8 @@
// using namespace Communi;
using namespace std::chrono_literals;
#define TWITCH_PUBSUB_URL "wss://pubsub-edge.twitch.tv"
namespace chatterino {
TwitchIrcServer::TwitchIrcServer()
@ -32,7 +34,7 @@ TwitchIrcServer::TwitchIrcServer()
{
this->initializeIrc();
this->pubsub = new PubSub;
this->pubsub = new PubSub(TWITCH_PUBSUB_URL);
// getSettings()->twitchSeperateWriteConnection.connect([this](auto, auto) {
// this->connect(); },
@ -45,6 +47,7 @@ void TwitchIrcServer::initialize(Settings &settings, Paths &paths)
getApp()->accounts->twitch.currentUserChanged.connect([this]() {
postToThread([this] {
this->connect();
this->pubsub->setAccount(getApp()->accounts->twitch.getCurrent());
});
});

View file

@ -4,7 +4,7 @@
#include "common/Outcome.hpp"
#include "messages/SharedMessageBuilder.hpp"
#include "providers/twitch/ChannelPointReward.hpp"
#include "providers/twitch/PubsubActions.hpp"
#include "providers/twitch/PubSubActions.hpp"
#include "providers/twitch/TwitchBadge.hpp"
#include <IrcMessage>

View file

@ -0,0 +1,40 @@
#include "providers/twitch/pubsubmessages/AutoMod.hpp"
namespace chatterino {
PubSubAutoModQueueMessage::PubSubAutoModQueueMessage(const QJsonObject &root)
: typeString(root.value("type").toString())
, data(root.value("data").toObject())
, status(this->data.value("status").toString())
{
auto oType = magic_enum::enum_cast<Type>(this->typeString.toStdString());
if (oType.has_value())
{
this->type = oType.value();
}
auto contentClassification =
data.value("content_classification").toObject();
this->contentCategory = contentClassification.value("category").toString();
this->contentLevel = contentClassification.value("level").toInt();
auto message = data.value("message").toObject();
this->messageID = message.value("id").toString();
auto messageContent = message.value("content").toObject();
this->messageText = messageContent.value("text").toString();
auto messageSender = message.value("sender").toObject();
this->senderUserID = messageSender.value("user_id").toString();
this->senderUserLogin = messageSender.value("login").toString();
this->senderUserDisplayName =
messageSender.value("display_name").toString();
this->senderUserChatColor =
QColor(messageSender.value("chat_color").toString());
}
} // namespace chatterino

View file

@ -0,0 +1,53 @@
#pragma once
#include <QColor>
#include <QJsonObject>
#include <QString>
#include <magic_enum.hpp>
namespace chatterino {
struct PubSubAutoModQueueMessage {
enum class Type {
AutoModCaughtMessage,
INVALID,
};
QString typeString;
Type type = Type::INVALID;
QJsonObject data;
QString status;
QString contentCategory;
int contentLevel;
QString messageID;
QString messageText;
QString senderUserID;
QString senderUserLogin;
QString senderUserDisplayName;
QColor senderUserChatColor;
PubSubAutoModQueueMessage(const QJsonObject &root);
};
} // namespace chatterino
template <>
constexpr magic_enum::customize::customize_t magic_enum::customize::enum_name<
chatterino::PubSubAutoModQueueMessage::Type>(
chatterino::PubSubAutoModQueueMessage::Type value) noexcept
{
switch (value)
{
case chatterino::PubSubAutoModQueueMessage::Type::AutoModCaughtMessage:
return "automod_caught_message";
default:
return default_tag;
}
}

View file

@ -0,0 +1,19 @@
#include "providers/twitch/pubsubmessages/Base.hpp"
namespace chatterino {
PubSubMessage::PubSubMessage(QJsonObject _object)
: object(std::move(_object))
, nonce(this->object.value("nonce").toString())
, error(this->object.value("error").toString())
, typeString(this->object.value("type").toString())
{
auto oType = magic_enum::enum_cast<Type>(this->typeString.toStdString());
if (oType.has_value())
{
this->type = oType.value();
}
}
} // namespace chatterino

View file

@ -0,0 +1,83 @@
#pragma once
#include <QJsonDocument>
#include <QJsonObject>
#include <QString>
#include <magic_enum.hpp>
#include <boost/optional.hpp>
namespace chatterino {
struct PubSubMessage {
enum class Type {
Pong,
Response,
Message,
INVALID,
};
QJsonObject object;
QString nonce;
QString error;
QString typeString;
Type type;
PubSubMessage(QJsonObject _object);
template <class InnerClass>
boost::optional<InnerClass> toInner();
};
template <class InnerClass>
boost::optional<InnerClass> PubSubMessage::toInner()
{
auto dataValue = this->object.value("data");
if (!dataValue.isObject())
{
return boost::none;
}
auto data = dataValue.toObject();
return InnerClass{this->nonce, data};
}
static boost::optional<PubSubMessage> parsePubSubBaseMessage(
const QString &blob)
{
QJsonDocument jsonDoc(QJsonDocument::fromJson(blob.toUtf8()));
if (jsonDoc.isNull())
{
return boost::none;
}
return PubSubMessage(jsonDoc.object());
}
} // namespace chatterino
template <>
constexpr magic_enum::customize::customize_t
magic_enum::customize::enum_name<chatterino::PubSubMessage::Type>(
chatterino::PubSubMessage::Type value) noexcept
{
switch (value)
{
case chatterino::PubSubMessage::Type::Pong:
return "PONG";
case chatterino::PubSubMessage::Type::Response:
return "RESPONSE";
case chatterino::PubSubMessage::Type::Message:
return "MESSAGE";
default:
return default_tag;
}
}

View file

@ -0,0 +1,17 @@
#include "providers/twitch/pubsubmessages/ChannelPoints.hpp"
namespace chatterino {
PubSubCommunityPointsChannelV1Message::PubSubCommunityPointsChannelV1Message(
const QJsonObject &root)
: typeString(root.value("type").toString())
, data(root.value("data").toObject())
{
auto oType = magic_enum::enum_cast<Type>(this->typeString.toStdString());
if (oType.has_value())
{
this->type = oType.value();
}
}
} // namespace chatterino

View file

@ -0,0 +1,40 @@
#pragma once
#include <QJsonObject>
#include <QString>
#include <magic_enum.hpp>
namespace chatterino {
struct PubSubCommunityPointsChannelV1Message {
enum class Type {
RewardRedeemed,
INVALID,
};
QString typeString;
Type type = Type::INVALID;
QJsonObject data;
PubSubCommunityPointsChannelV1Message(const QJsonObject &root);
};
} // namespace chatterino
template <>
constexpr magic_enum::customize::customize_t magic_enum::customize::enum_name<
chatterino::PubSubCommunityPointsChannelV1Message::Type>(
chatterino::PubSubCommunityPointsChannelV1Message::Type value) noexcept
{
switch (value)
{
case chatterino::PubSubCommunityPointsChannelV1Message::Type::
RewardRedeemed:
return "reward-redeemed";
default:
return default_tag;
}
}

View file

@ -0,0 +1,17 @@
#include "providers/twitch/pubsubmessages/ChatModeratorAction.hpp"
namespace chatterino {
PubSubChatModeratorActionMessage::PubSubChatModeratorActionMessage(
const QJsonObject &root)
: typeString(root.value("type").toString())
, data(root.value("data").toObject())
{
auto oType = magic_enum::enum_cast<Type>(this->typeString.toStdString());
if (oType.has_value())
{
this->type = oType.value();
}
}
} // namespace chatterino

View file

@ -0,0 +1,46 @@
#pragma once
#include <QJsonObject>
#include <QString>
#include <magic_enum.hpp>
namespace chatterino {
struct PubSubChatModeratorActionMessage {
enum class Type {
ModerationAction,
ChannelTermsAction,
INVALID,
};
QString typeString;
Type type = Type::INVALID;
QJsonObject data;
PubSubChatModeratorActionMessage(const QJsonObject &root);
};
} // namespace chatterino
template <>
constexpr magic_enum::customize::customize_t magic_enum::customize::enum_name<
chatterino::PubSubChatModeratorActionMessage::Type>(
chatterino::PubSubChatModeratorActionMessage::Type value) noexcept
{
switch (value)
{
case chatterino::PubSubChatModeratorActionMessage::Type::
ModerationAction:
return "moderation_action";
case chatterino::PubSubChatModeratorActionMessage::Type::
ChannelTermsAction:
return "channel_terms_action";
default:
return default_tag;
}
}

View file

@ -0,0 +1,50 @@
#include "providers/twitch/pubsubmessages/Listen.hpp"
#include "util/Helpers.hpp"
#include <QJsonArray>
#include <QJsonDocument>
#include <QJsonObject>
namespace chatterino {
PubSubListenMessage::PubSubListenMessage(std::vector<QString> _topics)
: topics(std::move(_topics))
, nonce(generateUuid())
{
}
void PubSubListenMessage::setToken(const QString &_token)
{
this->token = _token;
}
QByteArray PubSubListenMessage::toJson() const
{
QJsonObject root;
root["type"] = "LISTEN";
root["nonce"] = this->nonce;
{
QJsonObject data;
QJsonArray jsonTopics;
std::copy(this->topics.begin(), this->topics.end(),
std::back_inserter(jsonTopics));
data["topics"] = jsonTopics;
if (!this->token.isEmpty())
{
data["auth_token"] = this->token;
}
root["data"] = data;
}
return QJsonDocument(root).toJson();
}
} // namespace chatterino

View file

@ -0,0 +1,24 @@
#pragma once
#include <QString>
#include <vector>
namespace chatterino {
// PubSubListenMessage is an outgoing LISTEN message that is sent for the client to subscribe to a list of topics
struct PubSubListenMessage {
const std::vector<QString> topics;
const QString nonce;
QString token;
PubSubListenMessage(std::vector<QString> _topics);
void setToken(const QString &_token);
QByteArray toJson() const;
};
} // namespace chatterino

View file

@ -0,0 +1,60 @@
#pragma once
#include "common/QLogging.hpp"
#include <QJsonDocument>
#include <QJsonObject>
#include <QString>
#include <boost/optional.hpp>
namespace chatterino {
struct PubSubMessageMessage {
QString nonce;
QString topic;
QJsonObject messageObject;
PubSubMessageMessage(QString _nonce, const QJsonObject &data)
: nonce(std::move(_nonce))
, topic(data.value("topic").toString())
{
auto messagePayload = data.value("message").toString().toUtf8();
auto messageDoc = QJsonDocument::fromJson(messagePayload);
if (messageDoc.isNull())
{
qCWarning(chatterinoPubSub) << "PubSub message (type MESSAGE) "
"missing inner message payload";
return;
}
if (!messageDoc.isObject())
{
qCWarning(chatterinoPubSub)
<< "PubSub message (type MESSAGE) inner message payload is not "
"an object";
return;
}
this->messageObject = messageDoc.object();
}
template <class InnerClass>
boost::optional<InnerClass> toInner() const;
};
template <class InnerClass>
boost::optional<InnerClass> PubSubMessageMessage::toInner() const
{
if (this->messageObject.empty())
{
return boost::none;
}
return InnerClass{this->messageObject};
}
} // namespace chatterino

View file

@ -0,0 +1,40 @@
#include "providers/twitch/pubsubmessages/Unlisten.hpp"
#include "util/Helpers.hpp"
#include <QJsonArray>
#include <QJsonDocument>
#include <QJsonObject>
namespace chatterino {
PubSubUnlistenMessage::PubSubUnlistenMessage(std::vector<QString> _topics)
: topics(std::move(_topics))
, nonce(generateUuid())
{
}
QByteArray PubSubUnlistenMessage::toJson() const
{
QJsonObject root;
root["type"] = "UNLISTEN";
root["nonce"] = this->nonce;
{
QJsonObject data;
QJsonArray jsonTopics;
std::copy(this->topics.begin(), this->topics.end(),
std::back_inserter(jsonTopics));
data["topics"] = jsonTopics;
root["data"] = data;
}
return QJsonDocument(root).toJson();
}
} // namespace chatterino

View file

@ -0,0 +1,20 @@
#pragma once
#include <QString>
#include <vector>
namespace chatterino {
// PubSubUnlistenMessage is an outgoing UNLISTEN message that is sent for the client to unsubscribe from a list of topics
struct PubSubUnlistenMessage {
const std::vector<QString> topics;
const QString nonce;
PubSubUnlistenMessage(std::vector<QString> _topics);
QByteArray toJson() const;
};
} // namespace chatterino

View file

@ -0,0 +1,38 @@
#include "providers/twitch/pubsubmessages/Whisper.hpp"
namespace chatterino {
PubSubWhisperMessage::PubSubWhisperMessage(const QJsonObject &root)
: typeString(root.value("type").toString())
{
auto oType = magic_enum::enum_cast<Type>(this->typeString.toStdString());
if (oType.has_value())
{
this->type = oType.value();
}
// Parse information from data_object
auto data = root.value("data_object").toObject();
this->messageID = data.value("message_id").toString();
this->id = data.value("id").toInt();
this->threadID = data.value("thread_id").toString();
this->body = data.value("body").toString();
auto fromID = data.value("from_id");
if (fromID.isString())
{
this->fromUserID = fromID.toString();
}
else
{
this->fromUserID = QString::number(data.value("from_id").toInt());
}
auto tags = data.value("tags").toObject();
this->fromUserLogin = tags.value("login").toString();
this->fromUserDisplayName = tags.value("display_name").toString();
this->fromUserColor = QColor(tags.value("color").toString());
}
} // namespace chatterino

View file

@ -0,0 +1,55 @@
#pragma once
#include <QColor>
#include <QJsonObject>
#include <QString>
#include <magic_enum.hpp>
namespace chatterino {
struct PubSubWhisperMessage {
enum class Type {
WhisperReceived,
WhisperSent,
Thread,
INVALID,
};
QString typeString;
Type type = Type::INVALID;
QString messageID;
int id;
QString threadID;
QString body;
QString fromUserID;
QString fromUserLogin;
QString fromUserDisplayName;
QColor fromUserColor;
PubSubWhisperMessage(const QJsonObject &root);
};
} // namespace chatterino
template <>
constexpr magic_enum::customize::customize_t
magic_enum::customize::enum_name<chatterino::PubSubWhisperMessage::Type>(
chatterino::PubSubWhisperMessage::Type value) noexcept
{
switch (value)
{
case chatterino::PubSubWhisperMessage::Type::WhisperReceived:
return "whisper_received";
case chatterino::PubSubWhisperMessage::Type::WhisperSent:
return "whisper_sent";
case chatterino::PubSubWhisperMessage::Type::Thread:
return "thread";
default:
return default_tag;
}
}

View file

@ -28,7 +28,8 @@
#ifndef NDEBUG
# include <rapidjson/document.h>
# include "providers/twitch/PubsubClient.hpp"
# include "providers/twitch/PubSubManager.hpp"
# include "providers/twitch/PubSubMessages.hpp"
# include "util/SampleCheerMessages.hpp"
# include "util/SampleLinks.hpp"
#endif
@ -56,10 +57,10 @@ Window::Window(WindowType type)
this->addMenuBar();
#endif
this->signalHolder_.managedConnect(
getApp()->accounts->twitch.currentUserChanged, [this] {
this->bSignals_.emplace_back(
getApp()->accounts->twitch.currentUserChanged.connect([this] {
this->onAccountSelected();
});
}));
this->onAccountSelected();
if (type == WindowType::Main)
@ -284,17 +285,24 @@ void Window::addDebugStuff(HotkeyController::HotkeyMap &actions)
static bool alt = true;
if (alt)
{
doc.Parse(channelRewardMessage);
auto oMessage = parsePubSubBaseMessage(channelRewardMessage);
auto oInnerMessage =
oMessage->toInner<PubSubMessageMessage>()
->toInner<PubSubCommunityPointsChannelV1Message>();
app->twitch->addFakeMessage(channelRewardIRCMessage);
app->twitch->pubsub->signals_.pointReward.redeemed.invoke(
doc["data"]["message"]["data"]["redemption"]);
oInnerMessage->data.value("redemption").toObject());
alt = !alt;
}
else
{
doc.Parse(channelRewardMessage2);
auto oMessage = parsePubSubBaseMessage(channelRewardMessage2);
auto oInnerMessage =
oMessage->toInner<PubSubMessageMessage>()
->toInner<PubSubCommunityPointsChannelV1Message>();
app->twitch->pubsub->signals_.pointReward.redeemed.invoke(
doc["data"]["message"]["data"]["redemption"]);
oInnerMessage->data.value("redemption").toObject());
alt = !alt;
}
return "";

View file

@ -2,6 +2,7 @@
#include "widgets/BaseWindow.hpp"
#include <boost/signals2.hpp>
#include <pajlada/settings/setting.hpp>
#include <pajlada/signals/signal.hpp>
#include <pajlada/signals/signalholder.hpp>
@ -48,6 +49,7 @@ private:
std::shared_ptr<UpdateDialog> updateDialogHandle_;
pajlada::Signals::SignalHolder signalHolder_;
std::vector<boost::signals2::scoped_connection> bSignals_;
friend class Notebook;
};

View file

@ -167,6 +167,9 @@ AboutPage::AboutPage()
addLicense(form.getElement(), "lrucache",
"https://github.com/lamerman/cpp-lru-cache",
":/licenses/lrucache.txt");
addLicense(form.getElement(), "magic_enum",
"https://github.com/Neargye/magic_enum",
":/licenses/magic_enum.txt");
}
// Attributions

View file

@ -103,10 +103,10 @@ Split::Split(QWidget *parent)
this->input_->ui_.textEdit->installEventFilter(parent);
// update placeholder text on Twitch account change and channel change
this->signalHolder_.managedConnect(
getApp()->accounts->twitch.currentUserChanged, [this] {
this->bSignals_.emplace_back(
getApp()->accounts->twitch.currentUserChanged.connect([this] {
this->updateInputPlaceholder();
});
}));
this->signalHolder_.managedConnect(channelChanged, [this] {
this->updateInputPlaceholder();
});

View file

@ -10,6 +10,7 @@
#include <QShortcut>
#include <QVBoxLayout>
#include <QWidget>
#include <boost/signals2.hpp>
namespace chatterino {
@ -151,6 +152,7 @@ private:
pajlada::Signals::Connection indirectChannelChangedConnection_;
pajlada::Signals::SignalHolder signalHolder_;
std::vector<boost::signals2::scoped_connection> bSignals_;
public slots:
void addSibling();

View file

@ -204,10 +204,10 @@ SplitHeader::SplitHeader(Split *_split)
this->handleChannelChanged();
});
this->managedConnections_.managedConnect(
getApp()->accounts->twitch.currentUserChanged, [this] {
this->bSignals_.emplace_back(
getApp()->accounts->twitch.currentUserChanged.connect([this] {
this->updateModerationModeIcon();
});
}));
auto _ = [this](const auto &, const auto &) {
this->updateChannelText();

View file

@ -2,15 +2,16 @@
#include "widgets/BaseWidget.hpp"
#include <QElapsedTimer>
#include <QMenu>
#include <QPoint>
#include <memory>
#include <boost/signals2.hpp>
#include <pajlada/settings/setting.hpp>
#include <pajlada/signals/connection.hpp>
#include <pajlada/signals/signalholder.hpp>
#include <vector>
#include <QElapsedTimer>
#include <memory>
#include <vector>
namespace chatterino {
@ -85,6 +86,7 @@ private:
pajlada::Signals::NoArgSignal modeUpdateRequested_;
pajlada::Signals::SignalHolder managedConnections_;
pajlada::Signals::SignalHolder channelConnections_;
std::vector<boost::signals2::scoped_connection> bSignals_;
public slots:
void reloadChannelEmotes();

View file

@ -16,6 +16,7 @@ set(test_SOURCES
${CMAKE_CURRENT_LIST_DIR}/src/Hotkeys.cpp
${CMAKE_CURRENT_LIST_DIR}/src/UtilTwitch.cpp
${CMAKE_CURRENT_LIST_DIR}/src/IrcHelpers.cpp
${CMAKE_CURRENT_LIST_DIR}/src/TwitchPubSubClient.cpp
# Add your new file above this line!
)

View file

@ -0,0 +1,439 @@
#include "providers/twitch/PubSubManager.hpp"
#include "providers/twitch/PubSubActions.hpp"
#include <gtest/gtest.h>
using namespace chatterino;
using namespace std::chrono_literals;
/**
* Server behaves normally and responds to pings (COMPLETE)
* Server doesn't respond to pings, client should disconnect (COMPLETE)
* Server randomly disconnects us, we should reconnect (COMPLETE)
* Client listens to more than 50 topics, so it opens 2 connections (COMPLETE)
* Server sends RECONNECT message to us, we should reconnect (INCOMPLETE, leaving for now since if we just ignore it and Twitch disconnects us we should already handle it properly)
* Listen that required authentication, but authentication is missing (COMPLETE)
* Listen that required authentication, but authentication is wrong (COMPLETE)
* Incoming Whisper message (COMPLETE)
* Incoming AutoMod message
* Incoming ChannelPoints message
* Incoming ChatModeratorAction message (COMPLETE)
**/
#define RUN_PUBSUB_TESTS
#ifdef RUN_PUBSUB_TESTS
TEST(TwitchPubSubClient, ServerRespondsToPings)
{
auto pingInterval = std::chrono::seconds(1);
const QString host("wss://127.0.0.1:9050");
auto *pubSub = new PubSub(host, pingInterval);
pubSub->setAccountData("token", "123456");
pubSub->start();
std::this_thread::sleep_for(50ms);
ASSERT_EQ(pubSub->diag.connectionsOpened, 0);
ASSERT_EQ(pubSub->diag.connectionsClosed, 0);
ASSERT_EQ(pubSub->diag.connectionsFailed, 0);
ASSERT_EQ(pubSub->diag.messagesReceived, 0);
pubSub->listenToTopic("test");
std::this_thread::sleep_for(50ms);
ASSERT_EQ(pubSub->diag.connectionsOpened, 1);
ASSERT_EQ(pubSub->diag.connectionsClosed, 0);
ASSERT_EQ(pubSub->diag.connectionsFailed, 0);
ASSERT_EQ(pubSub->diag.messagesReceived, 2);
ASSERT_EQ(pubSub->diag.listenResponses, 1);
std::this_thread::sleep_for(2s);
ASSERT_EQ(pubSub->diag.connectionsOpened, 1);
ASSERT_EQ(pubSub->diag.connectionsClosed, 0);
ASSERT_EQ(pubSub->diag.connectionsFailed, 0);
ASSERT_EQ(pubSub->diag.messagesReceived, 4);
pubSub->stop();
ASSERT_EQ(pubSub->diag.connectionsOpened, 1);
ASSERT_EQ(pubSub->diag.connectionsClosed, 1);
ASSERT_EQ(pubSub->diag.connectionsFailed, 0);
ASSERT_EQ(pubSub->diag.messagesReceived, 4);
ASSERT_EQ(pubSub->diag.listenResponses, 1);
}
TEST(TwitchPubSubClient, ServerDoesntRespondToPings)
{
auto pingInterval = std::chrono::seconds(1);
const QString host("wss://127.0.0.1:9050/dont-respond-to-ping");
auto *pubSub = new PubSub(host, pingInterval);
pubSub->setAccountData("token", "123456");
pubSub->start();
pubSub->listenToTopic("test");
std::this_thread::sleep_for(750ms);
ASSERT_EQ(pubSub->diag.connectionsOpened, 1);
ASSERT_EQ(pubSub->diag.connectionsClosed, 0);
ASSERT_EQ(pubSub->diag.connectionsFailed, 0);
ASSERT_EQ(pubSub->diag.messagesReceived, 1);
std::this_thread::sleep_for(500ms);
ASSERT_EQ(pubSub->diag.connectionsOpened, 2);
ASSERT_EQ(pubSub->diag.connectionsClosed, 1);
ASSERT_EQ(pubSub->diag.connectionsFailed, 0);
ASSERT_EQ(pubSub->diag.messagesReceived, 2);
pubSub->stop();
ASSERT_EQ(pubSub->diag.connectionsOpened, 2);
ASSERT_EQ(pubSub->diag.connectionsClosed, 2);
ASSERT_EQ(pubSub->diag.connectionsFailed, 0);
ASSERT_EQ(pubSub->diag.messagesReceived, 2);
}
TEST(TwitchPubSubClient, DisconnectedAfter1s)
{
auto pingInterval = std::chrono::seconds(10);
const QString host("wss://127.0.0.1:9050/disconnect-client-after-1s");
auto *pubSub = new PubSub(host, pingInterval);
pubSub->setAccountData("token", "123456");
pubSub->start();
std::this_thread::sleep_for(50ms);
ASSERT_EQ(pubSub->diag.connectionsOpened, 0);
ASSERT_EQ(pubSub->diag.connectionsClosed, 0);
ASSERT_EQ(pubSub->diag.connectionsFailed, 0);
ASSERT_EQ(pubSub->diag.messagesReceived, 0);
ASSERT_EQ(pubSub->diag.listenResponses, 0);
pubSub->listenToTopic("test");
std::this_thread::sleep_for(500ms);
ASSERT_EQ(pubSub->diag.connectionsOpened, 1);
ASSERT_EQ(pubSub->diag.connectionsClosed, 0);
ASSERT_EQ(pubSub->diag.connectionsFailed, 0);
ASSERT_EQ(pubSub->diag.messagesReceived, 2); // Listen RESPONSE & Pong
ASSERT_EQ(pubSub->diag.listenResponses, 1);
std::this_thread::sleep_for(350ms);
ASSERT_EQ(pubSub->diag.connectionsOpened, 1);
ASSERT_EQ(pubSub->diag.connectionsClosed, 0);
ASSERT_EQ(pubSub->diag.connectionsFailed, 0);
ASSERT_EQ(pubSub->diag.messagesReceived, 2);
std::this_thread::sleep_for(600ms);
ASSERT_EQ(pubSub->diag.connectionsOpened, 2);
ASSERT_EQ(pubSub->diag.connectionsClosed, 1);
ASSERT_EQ(pubSub->diag.connectionsFailed, 0);
ASSERT_EQ(pubSub->diag.listenResponses, 2);
ASSERT_EQ(pubSub->diag.messagesReceived, 4); // new listen & new pong
pubSub->stop();
}
TEST(TwitchPubSubClient, ExceedTopicLimit)
{
auto pingInterval = std::chrono::seconds(1);
const QString host("wss://127.0.0.1:9050");
auto *pubSub = new PubSub(host, pingInterval);
pubSub->setAccountData("token", "123456");
pubSub->start();
ASSERT_EQ(pubSub->diag.connectionsOpened, 0);
ASSERT_EQ(pubSub->diag.connectionsClosed, 0);
ASSERT_EQ(pubSub->diag.connectionsFailed, 0);
ASSERT_EQ(pubSub->diag.messagesReceived, 0);
for (auto i = 0; i < PubSubClient::MAX_LISTENS; ++i)
{
pubSub->listenToTopic(QString("test-1.%1").arg(i));
}
std::this_thread::sleep_for(50ms);
ASSERT_EQ(pubSub->diag.connectionsOpened, 1);
ASSERT_EQ(pubSub->diag.connectionsClosed, 0);
ASSERT_EQ(pubSub->diag.connectionsFailed, 0);
for (auto i = 0; i < PubSubClient::MAX_LISTENS; ++i)
{
pubSub->listenToTopic(QString("test-2.%1").arg(i));
}
std::this_thread::sleep_for(50ms);
ASSERT_EQ(pubSub->diag.connectionsOpened, 2);
ASSERT_EQ(pubSub->diag.connectionsClosed, 0);
ASSERT_EQ(pubSub->diag.connectionsFailed, 0);
pubSub->stop();
ASSERT_EQ(pubSub->diag.connectionsOpened, 2);
ASSERT_EQ(pubSub->diag.connectionsClosed, 2);
ASSERT_EQ(pubSub->diag.connectionsFailed, 0);
}
TEST(TwitchPubSubClient, ExceedTopicLimitSingleStep)
{
auto pingInterval = std::chrono::seconds(1);
const QString host("wss://127.0.0.1:9050");
auto *pubSub = new PubSub(host, pingInterval);
pubSub->setAccountData("token", "123456");
pubSub->start();
ASSERT_EQ(pubSub->diag.connectionsOpened, 0);
ASSERT_EQ(pubSub->diag.connectionsClosed, 0);
ASSERT_EQ(pubSub->diag.connectionsFailed, 0);
ASSERT_EQ(pubSub->diag.messagesReceived, 0);
for (auto i = 0; i < PubSubClient::MAX_LISTENS * 2; ++i)
{
pubSub->listenToTopic("test");
}
std::this_thread::sleep_for(50ms);
ASSERT_EQ(pubSub->diag.connectionsOpened, 2);
ASSERT_EQ(pubSub->diag.connectionsClosed, 0);
ASSERT_EQ(pubSub->diag.connectionsFailed, 0);
pubSub->stop();
ASSERT_EQ(pubSub->diag.connectionsOpened, 2);
ASSERT_EQ(pubSub->diag.connectionsClosed, 2);
ASSERT_EQ(pubSub->diag.connectionsFailed, 0);
}
TEST(TwitchPubSubClient, ReceivedWhisper)
{
auto pingInterval = std::chrono::seconds(1);
const QString host("wss://127.0.0.1:9050/receive-whisper");
auto *pubSub = new PubSub(host, pingInterval);
pubSub->setAccountData("token", "123456");
pubSub->start();
boost::optional<PubSubWhisperMessage> oReceivedWhisper;
pubSub->signals_.whisper.received.connect(
[&oReceivedWhisper](const auto &whisperMessage) {
oReceivedWhisper = whisperMessage;
});
pubSub->listenToTopic("whispers.123456");
std::this_thread::sleep_for(50ms);
ASSERT_EQ(pubSub->diag.connectionsOpened, 1);
ASSERT_EQ(pubSub->diag.connectionsClosed, 0);
ASSERT_EQ(pubSub->diag.connectionsFailed, 0);
ASSERT_EQ(pubSub->diag.messagesReceived, 3);
ASSERT_EQ(pubSub->diag.listenResponses, 1);
ASSERT_TRUE(oReceivedWhisper);
auto receivedWhisper = *oReceivedWhisper;
ASSERT_EQ(receivedWhisper.body, QString("me Kappa"));
ASSERT_EQ(receivedWhisper.fromUserLogin, QString("pajbot"));
ASSERT_EQ(receivedWhisper.fromUserID, QString("82008718"));
pubSub->stop();
ASSERT_EQ(pubSub->diag.connectionsOpened, 1);
ASSERT_EQ(pubSub->diag.connectionsClosed, 1);
ASSERT_EQ(pubSub->diag.connectionsFailed, 0);
}
TEST(TwitchPubSubClient, ModeratorActionsUserBanned)
{
auto pingInterval = std::chrono::seconds(1);
const QString host("wss://127.0.0.1:9050/moderator-actions-user-banned");
auto *pubSub = new PubSub(host, pingInterval);
pubSub->setAccountData("token", "123456");
pubSub->start();
boost::optional<BanAction> oReceivedAction;
pubSub->signals_.moderation.userBanned.connect(
[&oReceivedAction](const auto &action) {
oReceivedAction = action;
});
ASSERT_EQ(pubSub->diag.listenResponses, 0);
pubSub->listenToTopic("chat_moderator_actions.123456.123456");
std::this_thread::sleep_for(50ms);
ASSERT_EQ(pubSub->diag.connectionsOpened, 1);
ASSERT_EQ(pubSub->diag.connectionsClosed, 0);
ASSERT_EQ(pubSub->diag.connectionsFailed, 0);
ASSERT_EQ(pubSub->diag.messagesReceived, 3);
ASSERT_EQ(pubSub->diag.listenResponses, 1);
ASSERT_TRUE(oReceivedAction);
auto receivedAction = *oReceivedAction;
ActionUser expectedTarget{"140114344", "1xelerate", "", QColor()};
ActionUser expectedSource{"117691339", "mm2pl", "", QColor()};
ASSERT_EQ(receivedAction.reason, QString());
ASSERT_EQ(receivedAction.duration, 0);
ASSERT_EQ(receivedAction.target, expectedTarget);
ASSERT_EQ(receivedAction.source, expectedSource);
pubSub->stop();
ASSERT_EQ(pubSub->diag.connectionsOpened, 1);
ASSERT_EQ(pubSub->diag.connectionsClosed, 1);
ASSERT_EQ(pubSub->diag.connectionsFailed, 0);
}
TEST(TwitchPubSubClient, MissingToken)
{
auto pingInterval = std::chrono::seconds(1);
// The token that's required is "xD"
const QString host("wss://127.0.0.1:9050/authentication-required");
auto *pubSub = new PubSub(host, pingInterval);
// pubSub->setAccountData("", "123456");
pubSub->start();
pubSub->listenToTopic("chat_moderator_actions.123456.123456");
std::this_thread::sleep_for(50ms);
ASSERT_EQ(pubSub->diag.connectionsOpened, 1);
ASSERT_EQ(pubSub->diag.connectionsClosed, 0);
ASSERT_EQ(pubSub->diag.connectionsFailed, 0);
ASSERT_EQ(pubSub->diag.messagesReceived, 2);
ASSERT_EQ(pubSub->diag.listenResponses, 0);
ASSERT_EQ(pubSub->diag.failedListenResponses, 1);
pubSub->stop();
ASSERT_EQ(pubSub->diag.connectionsOpened, 1);
ASSERT_EQ(pubSub->diag.connectionsClosed, 1);
ASSERT_EQ(pubSub->diag.connectionsFailed, 0);
}
TEST(TwitchPubSubClient, WrongToken)
{
auto pingInterval = std::chrono::seconds(1);
// The token that's required is "xD"
const QString host("wss://127.0.0.1:9050/authentication-required");
auto *pubSub = new PubSub(host, pingInterval);
pubSub->setAccountData("wrongtoken", "123456");
pubSub->start();
pubSub->listenToTopic("chat_moderator_actions.123456.123456");
std::this_thread::sleep_for(50ms);
ASSERT_EQ(pubSub->diag.connectionsOpened, 1);
ASSERT_EQ(pubSub->diag.connectionsClosed, 0);
ASSERT_EQ(pubSub->diag.connectionsFailed, 0);
ASSERT_EQ(pubSub->diag.messagesReceived, 2);
ASSERT_EQ(pubSub->diag.listenResponses, 0);
ASSERT_EQ(pubSub->diag.failedListenResponses, 1);
pubSub->stop();
ASSERT_EQ(pubSub->diag.connectionsOpened, 1);
ASSERT_EQ(pubSub->diag.connectionsClosed, 1);
ASSERT_EQ(pubSub->diag.connectionsFailed, 0);
}
TEST(TwitchPubSubClient, CorrectToken)
{
auto pingInterval = std::chrono::seconds(1);
// The token that's required is "xD"
const QString host("wss://127.0.0.1:9050/authentication-required");
auto *pubSub = new PubSub(host, pingInterval);
pubSub->setAccountData("xD", "123456");
pubSub->start();
pubSub->listenToTopic("chat_moderator_actions.123456.123456");
std::this_thread::sleep_for(50ms);
ASSERT_EQ(pubSub->diag.connectionsOpened, 1);
ASSERT_EQ(pubSub->diag.connectionsClosed, 0);
ASSERT_EQ(pubSub->diag.connectionsFailed, 0);
ASSERT_EQ(pubSub->diag.messagesReceived, 2);
ASSERT_EQ(pubSub->diag.listenResponses, 1);
ASSERT_EQ(pubSub->diag.failedListenResponses, 0);
pubSub->stop();
ASSERT_EQ(pubSub->diag.connectionsOpened, 1);
ASSERT_EQ(pubSub->diag.connectionsClosed, 1);
ASSERT_EQ(pubSub->diag.connectionsFailed, 0);
}
TEST(TwitchPubSubClient, AutoModMessageHeld)
{
auto pingInterval = std::chrono::seconds(1);
const QString host("wss://127.0.0.1:9050/automod-held");
auto *pubSub = new PubSub(host, pingInterval);
pubSub->setAccountData("xD", "123456");
pubSub->start();
boost::optional<PubSubAutoModQueueMessage> oReceived;
boost::optional<QString> oChannelID;
pubSub->signals_.moderation.autoModMessageCaught.connect(
[&](const auto &msg, const QString &channelID) {
oReceived = msg;
oChannelID = channelID;
});
pubSub->listenToTopic("automod-queue.117166826.117166826");
std::this_thread::sleep_for(50ms);
ASSERT_EQ(pubSub->diag.connectionsOpened, 1);
ASSERT_EQ(pubSub->diag.connectionsClosed, 0);
ASSERT_EQ(pubSub->diag.connectionsFailed, 0);
ASSERT_EQ(pubSub->diag.messagesReceived, 3);
ASSERT_EQ(pubSub->diag.listenResponses, 1);
ASSERT_EQ(pubSub->diag.failedListenResponses, 0);
ASSERT_TRUE(oReceived);
ASSERT_TRUE(oChannelID);
auto received = *oReceived;
auto channelID = *oChannelID;
ASSERT_EQ(channelID, "117166826");
ASSERT_EQ(received.messageText, "kurwa");
pubSub->stop();
ASSERT_EQ(pubSub->diag.connectionsOpened, 1);
ASSERT_EQ(pubSub->diag.connectionsClosed, 1);
ASSERT_EQ(pubSub->diag.connectionsFailed, 0);
}
#endif

View file

@ -22,10 +22,13 @@
using namespace chatterino;
#define SUPPORT_QT_NETWORK_TESTS
int main(int argc, char **argv)
{
::testing::InitGoogleTest(&argc, argv);
#ifdef SUPPORT_QT_NETWORK_TESTS
QApplication app(argc, argv);
chatterino::NetworkManager::init();
@ -39,4 +42,7 @@ int main(int argc, char **argv)
});
return app.exec();
#else
return RUN_ALL_TESTS();
#endif
}