refactor: Twitch PubSub client (#5059)

* Remove unused `setAccountData` function

* Move PubSub out of TwitchIrcServer and into Application

* Add changelog entry

* fix: assert feedback

* Add PubSub::unlistenPrefix as per review suggestion

* Fix tests

* quit pubsub on exit

might conflict with exit removal, so can be reverted but this shows it's possible

* Don't manually call stop on clients, it's called when the connection is closed

* nit: rename `mainThread` to `thread`

* Join in a thread!!!!!!!!
This commit is contained in:
pajlada 2024-01-06 13:18:37 +01:00 committed by GitHub
parent e48d868e8c
commit 416806bb0a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 458 additions and 484 deletions

View file

@ -89,6 +89,7 @@
- Dev: Refactor `DebugCount` and add copy button to debug popup. (#4921)
- Dev: Refactor `common/Credentials`. (#4979)
- Dev: Refactor chat logger. (#5058)
- Dev: Refactor Twitch PubSub client. (#5059)
- Dev: Changed lifetime of context menus. (#4924)
- Dev: Renamed `tools` directory to `scripts`. (#5035)
- Dev: Refactor `ChannelView`, removing a bunch of clang-tidy warnings. (#4926)

View file

@ -104,6 +104,12 @@ public:
return nullptr;
}
PubSub *getTwitchPubSub() override
{
assert(false && "getTwitchPubSub was called without being initialized");
return nullptr;
}
Logging *getChatLogger() override
{
assert(!"getChatLogger was called without being initialized");

View file

@ -87,6 +87,8 @@ ISoundController *makeSoundController(Settings &settings)
}
}
const QString TWITCH_PUBSUB_URL = "wss://pubsub-edge.twitch.tv";
} // namespace
namespace chatterino {
@ -128,6 +130,7 @@ Application::Application(Settings &_settings, Paths &_paths, const Args &_args)
, userData(&this->emplace<UserDataController>())
, sound(&this->emplace<ISoundController>(makeSoundController(_settings)))
, twitchLiveController(&this->emplace<TwitchLiveController>())
, twitchPubSub(new PubSub(TWITCH_PUBSUB_URL))
, logging(new Logging(_settings))
#ifdef CHATTERINO_HAVE_PLUGINS
, plugins(&this->emplace<PluginController>())
@ -144,6 +147,11 @@ Application::Application(Settings &_settings, Paths &_paths, const Args &_args)
Application::~Application() = default;
void Application::fakeDtor()
{
this->twitchPubSub.reset();
}
void Application::initialize(Settings &settings, Paths &paths)
{
assert(isAppInitialized == false);
@ -314,6 +322,11 @@ ITwitchIrcServer *Application::getTwitch()
return this->twitch;
}
PubSub *Application::getTwitchPubSub()
{
return this->twitchPubSub.get();
}
Logging *Application::getChatLogger()
{
return this->logging.get();
@ -343,7 +356,7 @@ void Application::initPubSub()
{
// We can safely ignore these signal connections since the twitch object will always
// be destroyed before the Application
std::ignore = this->twitch->pubsub->signals_.moderation.chatCleared.connect(
std::ignore = this->twitchPubSub->moderation.chatCleared.connect(
[this](const auto &action) {
auto chan = this->twitch->getChannelOrEmptyByID(action.roomID);
if (chan->isEmpty())
@ -360,7 +373,7 @@ void Application::initPubSub()
});
});
std::ignore = this->twitch->pubsub->signals_.moderation.modeChanged.connect(
std::ignore = this->twitchPubSub->moderation.modeChanged.connect(
[this](const auto &action) {
auto chan = this->twitch->getChannelOrEmptyByID(action.roomID);
if (chan->isEmpty())
@ -386,29 +399,28 @@ void Application::initPubSub()
});
});
std::ignore =
this->twitch->pubsub->signals_.moderation.moderationStateChanged
.connect([this](const auto &action) {
auto chan = this->twitch->getChannelOrEmptyByID(action.roomID);
if (chan->isEmpty())
{
return;
}
std::ignore = this->twitchPubSub->moderation.moderationStateChanged.connect(
[this](const auto &action) {
auto chan = this->twitch->getChannelOrEmptyByID(action.roomID);
if (chan->isEmpty())
{
return;
}
QString text;
QString text;
text = QString("%1 %2 %3.")
.arg(action.source.login,
(action.modded ? "modded" : "unmodded"),
action.target.login);
text = QString("%1 %2 %3.")
.arg(action.source.login,
(action.modded ? "modded" : "unmodded"),
action.target.login);
auto msg = makeSystemMessage(text);
postToThread([chan, msg] {
chan->addMessage(msg);
});
auto msg = makeSystemMessage(text);
postToThread([chan, msg] {
chan->addMessage(msg);
});
});
std::ignore = this->twitch->pubsub->signals_.moderation.userBanned.connect(
std::ignore = this->twitchPubSub->moderation.userBanned.connect(
[&](const auto &action) {
auto chan = this->twitch->getChannelOrEmptyByID(action.roomID);
@ -423,67 +435,65 @@ void Application::initPubSub()
chan->addOrReplaceTimeout(msg.release());
});
});
std::ignore =
this->twitch->pubsub->signals_.moderation.messageDeleted.connect(
[&](const auto &action) {
auto chan = this->twitch->getChannelOrEmptyByID(action.roomID);
std::ignore = this->twitchPubSub->moderation.messageDeleted.connect(
[&](const auto &action) {
auto chan = this->twitch->getChannelOrEmptyByID(action.roomID);
if (chan->isEmpty() || getSettings()->hideDeletionActions)
if (chan->isEmpty() || getSettings()->hideDeletionActions)
{
return;
}
MessageBuilder msg;
TwitchMessageBuilder::deletionMessage(action, &msg);
msg->flags.set(MessageFlag::PubSub);
postToThread([chan, msg = msg.release()] {
auto replaced = false;
LimitedQueueSnapshot<MessagePtr> snapshot =
chan->getMessageSnapshot();
int snapshotLength = snapshot.size();
// without parens it doesn't build on windows
int end = (std::max)(0, snapshotLength - 200);
for (int i = snapshotLength - 1; i >= end; --i)
{
return;
auto &s = snapshot[i];
if (!s->flags.has(MessageFlag::PubSub) &&
s->timeoutUser == msg->timeoutUser)
{
chan->replaceMessage(s, msg);
replaced = true;
break;
}
}
MessageBuilder msg;
TwitchMessageBuilder::deletionMessage(action, &msg);
msg->flags.set(MessageFlag::PubSub);
postToThread([chan, msg = msg.release()] {
auto replaced = false;
LimitedQueueSnapshot<MessagePtr> snapshot =
chan->getMessageSnapshot();
int snapshotLength = snapshot.size();
// without parens it doesn't build on windows
int end = (std::max)(0, snapshotLength - 200);
for (int i = snapshotLength - 1; i >= end; --i)
{
auto &s = snapshot[i];
if (!s->flags.has(MessageFlag::PubSub) &&
s->timeoutUser == msg->timeoutUser)
{
chan->replaceMessage(s, msg);
replaced = true;
break;
}
}
if (!replaced)
{
chan->addMessage(msg);
}
});
});
std::ignore =
this->twitch->pubsub->signals_.moderation.userUnbanned.connect(
[&](const auto &action) {
auto chan = this->twitch->getChannelOrEmptyByID(action.roomID);
if (chan->isEmpty())
if (!replaced)
{
return;
}
auto msg = MessageBuilder(action).release();
postToThread([chan, msg] {
chan->addMessage(msg);
});
}
});
});
std::ignore = this->twitchPubSub->moderation.userUnbanned.connect(
[&](const auto &action) {
auto chan = this->twitch->getChannelOrEmptyByID(action.roomID);
if (chan->isEmpty())
{
return;
}
auto msg = MessageBuilder(action).release();
postToThread([chan, msg] {
chan->addMessage(msg);
});
});
std::ignore =
this->twitch->pubsub->signals_.moderation.suspiciousMessageReceived
.connect([&](const auto &action) {
this->twitchPubSub->moderation.suspiciousMessageReceived.connect(
[&](const auto &action) {
if (action.treatment ==
PubSubLowTrustUsersMessage::Treatment::INVALID)
{
@ -525,8 +535,8 @@ void Application::initPubSub()
});
std::ignore =
this->twitch->pubsub->signals_.moderation.suspiciousTreatmentUpdated
.connect([&](const auto &action) {
this->twitchPubSub->moderation.suspiciousTreatmentUpdated.connect(
[&](const auto &action) {
if (action.treatment ==
PubSubLowTrustUsersMessage::Treatment::INVALID)
{
@ -562,170 +572,162 @@ void Application::initPubSub()
});
});
std::ignore =
this->twitch->pubsub->signals_.moderation.autoModMessageCaught.connect(
[&](const auto &msg, const QString &channelID) {
auto chan = this->twitch->getChannelOrEmptyByID(channelID);
if (chan->isEmpty())
{
return;
}
std::ignore = this->twitchPubSub->moderation.autoModMessageCaught.connect(
[&](const auto &msg, const QString &channelID) {
auto chan = this->twitch->getChannelOrEmptyByID(channelID);
if (chan->isEmpty())
{
return;
}
switch (msg.type)
{
case PubSubAutoModQueueMessage::Type::
AutoModCaughtMessage: {
if (msg.status == "PENDING")
switch (msg.type)
{
case PubSubAutoModQueueMessage::Type::AutoModCaughtMessage: {
if (msg.status == "PENDING")
{
AutomodAction action(msg.data, channelID);
action.reason = QString("%1 level %2")
.arg(msg.contentCategory)
.arg(msg.contentLevel);
action.msgID = msg.messageID;
action.message = msg.messageText;
// this message also contains per-word automod data, which could be implemented
// extract sender data manually because Twitch loves not being consistent
QString senderDisplayName =
msg.senderUserDisplayName; // Might be transformed later
bool hasLocalizedName = false;
if (!msg.senderUserDisplayName.isEmpty())
{
AutomodAction action(msg.data, channelID);
action.reason = QString("%1 level %2")
.arg(msg.contentCategory)
.arg(msg.contentLevel);
action.msgID = msg.messageID;
action.message = msg.messageText;
// this message also contains per-word automod data, which could be implemented
// extract sender data manually because Twitch loves not being consistent
QString senderDisplayName =
msg.senderUserDisplayName; // Might be transformed later
bool hasLocalizedName = false;
if (!msg.senderUserDisplayName.isEmpty())
// check for non-ascii display names
if (QString::compare(msg.senderUserDisplayName,
msg.senderUserLogin,
Qt::CaseInsensitive) != 0)
{
// check for non-ascii display names
if (QString::compare(msg.senderUserDisplayName,
msg.senderUserLogin,
Qt::CaseInsensitive) != 0)
{
hasLocalizedName = true;
}
hasLocalizedName = true;
}
QColor senderColor = msg.senderUserChatColor;
QString senderColor_;
if (!senderColor.isValid() &&
getSettings()->colorizeNicknames)
{
// color may be not present if user is a grey-name
senderColor = getRandomColor(msg.senderUserID);
}
// handle username style based on prefered setting
switch (
getSettings()->usernameDisplayMode.getValue())
{
case UsernameDisplayMode::Username: {
if (hasLocalizedName)
{
senderDisplayName = msg.senderUserLogin;
}
break;
}
case UsernameDisplayMode::LocalizedName: {
break;
}
case UsernameDisplayMode::
UsernameAndLocalizedName: {
if (hasLocalizedName)
{
senderDisplayName =
QString("%1(%2)").arg(
msg.senderUserLogin,
msg.senderUserDisplayName);
}
break;
}
}
action.target = ActionUser{
msg.senderUserID, msg.senderUserLogin,
senderDisplayName, senderColor};
postToThread([chan, action] {
const auto p =
TwitchMessageBuilder::makeAutomodMessage(
action, chan->getName());
chan->addMessage(p.first);
chan->addMessage(p.second);
getApp()->twitch->automodChannel->addMessage(
p.first);
getApp()->twitch->automodChannel->addMessage(
p.second);
});
}
// "ALLOWED" and "DENIED" statuses remain unimplemented
// They are versions of automod_message_(denied|approved) but for mods.
QColor senderColor = msg.senderUserChatColor;
QString senderColor_;
if (!senderColor.isValid() &&
getSettings()->colorizeNicknames)
{
// color may be not present if user is a grey-name
senderColor = getRandomColor(msg.senderUserID);
}
// handle username style based on prefered setting
switch (getSettings()->usernameDisplayMode.getValue())
{
case UsernameDisplayMode::Username: {
if (hasLocalizedName)
{
senderDisplayName = msg.senderUserLogin;
}
break;
}
case UsernameDisplayMode::LocalizedName: {
break;
}
case UsernameDisplayMode::
UsernameAndLocalizedName: {
if (hasLocalizedName)
{
senderDisplayName = QString("%1(%2)").arg(
msg.senderUserLogin,
msg.senderUserDisplayName);
}
break;
}
}
action.target =
ActionUser{msg.senderUserID, msg.senderUserLogin,
senderDisplayName, senderColor};
postToThread([chan, action] {
const auto p =
TwitchMessageBuilder::makeAutomodMessage(
action, chan->getName());
chan->addMessage(p.first);
chan->addMessage(p.second);
getApp()->twitch->automodChannel->addMessage(
p.first);
getApp()->twitch->automodChannel->addMessage(
p.second);
});
}
break;
case PubSubAutoModQueueMessage::Type::INVALID:
default: {
}
break;
// "ALLOWED" and "DENIED" statuses remain unimplemented
// They are versions of automod_message_(denied|approved) but for mods.
}
break;
case PubSubAutoModQueueMessage::Type::INVALID:
default: {
}
break;
}
});
std::ignore = this->twitchPubSub->moderation.autoModMessageBlocked.connect(
[&](const auto &action) {
auto chan = this->twitch->getChannelOrEmptyByID(action.roomID);
if (chan->isEmpty())
{
return;
}
postToThread([chan, action] {
const auto p = TwitchMessageBuilder::makeAutomodMessage(
action, chan->getName());
chan->addMessage(p.first);
chan->addMessage(p.second);
});
});
std::ignore = this->twitchPubSub->moderation.automodUserMessage.connect(
[&](const auto &action) {
// This condition has been set up to execute isInStreamerMode() as the last thing
// as it could end up being expensive.
if (getSettings()->streamerModeHideModActions && isInStreamerMode())
{
return;
}
auto chan = this->twitch->getChannelOrEmptyByID(action.roomID);
if (chan->isEmpty())
{
return;
}
auto msg = MessageBuilder(action).release();
postToThread([chan, msg] {
chan->addMessage(msg);
});
chan->deleteMessage(msg->id);
});
std::ignore = this->twitchPubSub->moderation.automodInfoMessage.connect(
[&](const auto &action) {
auto chan = this->twitch->getChannelOrEmptyByID(action.roomID);
if (chan->isEmpty())
{
return;
}
postToThread([chan, action] {
const auto p =
TwitchMessageBuilder::makeAutomodInfoMessage(action);
chan->addMessage(p);
});
});
std::ignore =
this->twitch->pubsub->signals_.moderation.autoModMessageBlocked.connect(
[&](const auto &action) {
auto chan = this->twitch->getChannelOrEmptyByID(action.roomID);
if (chan->isEmpty())
{
return;
}
postToThread([chan, action] {
const auto p = TwitchMessageBuilder::makeAutomodMessage(
action, chan->getName());
chan->addMessage(p.first);
chan->addMessage(p.second);
});
});
std::ignore =
this->twitch->pubsub->signals_.moderation.automodUserMessage.connect(
[&](const auto &action) {
// This condition has been set up to execute isInStreamerMode() as the last thing
// as it could end up being expensive.
if (getSettings()->streamerModeHideModActions &&
isInStreamerMode())
{
return;
}
auto chan = this->twitch->getChannelOrEmptyByID(action.roomID);
if (chan->isEmpty())
{
return;
}
auto msg = MessageBuilder(action).release();
postToThread([chan, msg] {
chan->addMessage(msg);
});
chan->deleteMessage(msg->id);
});
std::ignore =
this->twitch->pubsub->signals_.moderation.automodInfoMessage.connect(
[&](const auto &action) {
auto chan = this->twitch->getChannelOrEmptyByID(action.roomID);
if (chan->isEmpty())
{
return;
}
postToThread([chan, action] {
const auto p =
TwitchMessageBuilder::makeAutomodInfoMessage(action);
chan->addMessage(p);
});
});
std::ignore = this->twitch->pubsub->signals_.pointReward.redeemed.connect(
[&](auto &data) {
this->twitchPubSub->pointReward.redeemed.connect([&](auto &data) {
QString channelId = data.value("channel_id").toString();
if (channelId.isEmpty())
{
@ -746,29 +748,19 @@ void Application::initPubSub()
});
});
this->twitch->pubsub->start();
auto RequestModerationActions = [this]() {
this->twitch->pubsub->setAccount(
getApp()->accounts->twitch.getCurrent());
// TODO(pajlada): Unlisten to all authed topics instead of only
// moderation topics this->twitch->pubsub->UnlistenAllAuthedTopics();
this->twitch->pubsub->listenToWhispers();
};
this->twitchPubSub->start();
this->twitchPubSub->setAccount(this->accounts->twitch.getCurrent());
this->accounts->twitch.currentUserChanged.connect(
[this] {
this->twitch->pubsub->unlistenAllModerationActions();
this->twitch->pubsub->unlistenAutomod();
this->twitch->pubsub->unlistenLowTrustUsers();
this->twitch->pubsub->unlistenWhispers();
this->twitchPubSub->unlistenChannelModerationActions();
this->twitchPubSub->unlistenAutomod();
this->twitchPubSub->unlistenLowTrustUsers();
this->twitchPubSub->unlistenChannelPointRewards();
this->twitchPubSub->setAccount(this->accounts->twitch.getCurrent());
},
boost::signals2::at_front);
this->accounts->twitch.currentUserChanged.connect(RequestModerationActions);
RequestModerationActions();
}
void Application::initBttvLiveUpdates()

View file

@ -68,6 +68,7 @@ public:
virtual HighlightController *getHighlights() = 0;
virtual NotificationController *getNotifications() = 0;
virtual ITwitchIrcServer *getTwitch() = 0;
virtual PubSub *getTwitchPubSub() = 0;
virtual Logging *getChatLogger() = 0;
virtual ChatterinoBadges *getChatterinoBadges() = 0;
virtual FfzBadges *getFfzBadges() = 0;
@ -97,6 +98,12 @@ public:
Application &operator=(const Application &) = delete;
Application &operator=(Application &&) = delete;
/**
* In the interim, before we remove _exit(0); from RunGui.cpp,
* this will destroy things we know can be destroyed
*/
void fakeDtor();
void initialize(Settings &settings, Paths &paths);
void load();
void save();
@ -128,6 +135,7 @@ public:
private:
TwitchLiveController *const twitchLiveController{};
std::unique_ptr<PubSub> twitchPubSub;
const std::unique_ptr<Logging> logging;
public:
@ -181,6 +189,7 @@ public:
return this->highlights;
}
ITwitchIrcServer *getTwitch() override;
PubSub *getTwitchPubSub() override;
Logging *getChatLogger() override;
ChatterinoBadges *getChatterinoBadges() override
{

View file

@ -288,6 +288,8 @@ void runGui(QApplication &a, Paths &paths, Settings &settings, const Args &args)
flushClipboard();
#endif
app.fakeDtor();
_exit(0);
}
} // namespace chatterino

View file

@ -22,6 +22,8 @@ PubSubClient::PubSubClient(WebsocketClient &websocketClient,
const PubSubClientOptions &clientOptions)
: websocketClient_(websocketClient)
, handle_(handle)
, heartbeatTimer_(std::make_shared<boost::asio::steady_timer>(
this->websocketClient_.get_io_service()))
, clientOptions_(clientOptions)
{
}
@ -40,6 +42,7 @@ void PubSubClient::stop()
assert(this->started_);
this->started_ = false;
this->heartbeatTimer_->cancel();
}
void PubSubClient::close(const std::string &reason,
@ -187,8 +190,8 @@ void PubSubClient::ping()
auto self = this->shared_from_this();
runAfter(this->websocketClient_.get_io_service(),
this->clientOptions_.pingInterval_, [self](auto timer) {
runAfter(this->heartbeatTimer_, this->clientOptions_.pingInterval_,
[self](auto timer) {
if (!self->started_)
{
return;

View file

@ -70,6 +70,7 @@ private:
std::atomic<bool> awaitingPong_{false};
std::atomic<bool> started_{false};
std::shared_ptr<boost::asio::steady_timer> heartbeatTimer_;
const PubSubClientOptions &clientOptions_;
};

View file

@ -17,12 +17,14 @@
#include <algorithm>
#include <exception>
#include <future>
#include <iostream>
#include <thread>
using websocketpp::lib::bind;
using websocketpp::lib::placeholders::_1;
using websocketpp::lib::placeholders::_2;
using namespace std::chrono_literals;
namespace chatterino {
@ -36,7 +38,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval)
const auto &roomID) {
ClearChatAction action(data, roomID);
this->signals_.moderation.chatCleared.invoke(action);
this->moderation.chatCleared.invoke(action);
};
this->moderationActionHandlers["slowoff"] = [this](const auto &data,
@ -46,7 +48,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval)
action.mode = ModeChangedAction::Mode::Slow;
action.state = ModeChangedAction::State::Off;
this->signals_.moderation.modeChanged.invoke(action);
this->moderation.modeChanged.invoke(action);
};
this->moderationActionHandlers["slow"] = [this](const auto &data,
@ -69,7 +71,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval)
action.duration = args.at(0).toString().toUInt(&ok, 10);
this->signals_.moderation.modeChanged.invoke(action);
this->moderation.modeChanged.invoke(action);
};
this->moderationActionHandlers["r9kbetaoff"] = [this](const auto &data,
@ -79,7 +81,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval)
action.mode = ModeChangedAction::Mode::R9K;
action.state = ModeChangedAction::State::Off;
this->signals_.moderation.modeChanged.invoke(action);
this->moderation.modeChanged.invoke(action);
};
this->moderationActionHandlers["r9kbeta"] = [this](const auto &data,
@ -89,7 +91,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval)
action.mode = ModeChangedAction::Mode::R9K;
action.state = ModeChangedAction::State::On;
this->signals_.moderation.modeChanged.invoke(action);
this->moderation.modeChanged.invoke(action);
};
this->moderationActionHandlers["subscribersoff"] =
@ -99,7 +101,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval)
action.mode = ModeChangedAction::Mode::SubscribersOnly;
action.state = ModeChangedAction::State::Off;
this->signals_.moderation.modeChanged.invoke(action);
this->moderation.modeChanged.invoke(action);
};
this->moderationActionHandlers["subscribers"] = [this](const auto &data,
@ -109,7 +111,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval)
action.mode = ModeChangedAction::Mode::SubscribersOnly;
action.state = ModeChangedAction::State::On;
this->signals_.moderation.modeChanged.invoke(action);
this->moderation.modeChanged.invoke(action);
};
this->moderationActionHandlers["emoteonlyoff"] =
@ -119,7 +121,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval)
action.mode = ModeChangedAction::Mode::EmoteOnly;
action.state = ModeChangedAction::State::Off;
this->signals_.moderation.modeChanged.invoke(action);
this->moderation.modeChanged.invoke(action);
};
this->moderationActionHandlers["emoteonly"] = [this](const auto &data,
@ -129,7 +131,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval)
action.mode = ModeChangedAction::Mode::EmoteOnly;
action.state = ModeChangedAction::State::On;
this->signals_.moderation.modeChanged.invoke(action);
this->moderation.modeChanged.invoke(action);
};
this->moderationActionHandlers["unmod"] = [this](const auto &data,
@ -149,7 +151,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval)
action.modded = false;
this->signals_.moderation.moderationStateChanged.invoke(action);
this->moderation.moderationStateChanged.invoke(action);
};
this->moderationActionHandlers["mod"] = [this](const auto &data,
@ -167,7 +169,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval)
action.target.id = data.value("target_user_id").toString();
action.target.login = data.value("target_user_login").toString();
this->signals_.moderation.moderationStateChanged.invoke(action);
this->moderation.moderationStateChanged.invoke(action);
};
this->moderationActionHandlers["timeout"] = [this](const auto &data,
@ -191,7 +193,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval)
action.duration = args[1].toString().toUInt(&ok, 10);
action.reason = args[2].toString(); // May be omitted
this->signals_.moderation.userBanned.invoke(action);
this->moderation.userBanned.invoke(action);
};
this->moderationActionHandlers["delete"] = [this](const auto &data,
@ -214,7 +216,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval)
action.messageText = args[1].toString();
action.messageId = args[2].toString();
this->signals_.moderation.messageDeleted.invoke(action);
this->moderation.messageDeleted.invoke(action);
};
this->moderationActionHandlers["ban"] = [this](const auto &data,
@ -236,7 +238,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval)
action.target.login = args[0].toString();
action.reason = args[1].toString(); // May be omitted
this->signals_.moderation.userBanned.invoke(action);
this->moderation.userBanned.invoke(action);
};
this->moderationActionHandlers["unban"] = [this](const auto &data,
@ -259,7 +261,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval)
action.target.login = args[0].toString();
this->signals_.moderation.userUnbanned.invoke(action);
this->moderation.userUnbanned.invoke(action);
};
this->moderationActionHandlers["untimeout"] = [this](const auto &data,
@ -282,7 +284,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval)
action.target.login = args[0].toString();
this->signals_.moderation.userUnbanned.invoke(action);
this->moderation.userUnbanned.invoke(action);
};
/*
@ -315,7 +317,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval)
action.message = args[1].toString(); // May be omitted
action.reason = args[2].toString(); // May be omitted
this->signals_.moderation.autoModMessageBlocked.invoke(action);
this->moderation.autoModMessageBlocked.invoke(action);
};
*/
@ -323,21 +325,21 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval)
[this](const auto &data, const auto &roomID) {
AutomodInfoAction action(data, roomID);
action.type = AutomodInfoAction::OnHold;
this->signals_.moderation.automodInfoMessage.invoke(action);
this->moderation.automodInfoMessage.invoke(action);
};
this->moderationActionHandlers["automod_message_denied"] =
[this](const auto &data, const auto &roomID) {
AutomodInfoAction action(data, roomID);
action.type = AutomodInfoAction::Denied;
this->signals_.moderation.automodInfoMessage.invoke(action);
this->moderation.automodInfoMessage.invoke(action);
};
this->moderationActionHandlers["automod_message_approved"] =
[this](const auto &data, const auto &roomID) {
AutomodInfoAction action(data, roomID);
action.type = AutomodInfoAction::Approved;
this->signals_.moderation.automodInfoMessage.invoke(action);
this->moderation.automodInfoMessage.invoke(action);
};
this->channelTermsActionHandlers["add_permitted_term"] =
@ -351,7 +353,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval)
action.message = data.value("text").toString();
action.source.login = data.value("requester_login").toString();
this->signals_.moderation.automodUserMessage.invoke(action);
this->moderation.automodUserMessage.invoke(action);
};
this->channelTermsActionHandlers["add_blocked_term"] =
@ -365,7 +367,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval)
action.message = data.value("text").toString();
action.source.login = data.value("requester_login").toString();
this->signals_.moderation.automodUserMessage.invoke(action);
this->moderation.automodUserMessage.invoke(action);
};
this->moderationActionHandlers["delete_permitted_term"] =
@ -385,7 +387,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval)
action.message = args[0].toString();
this->signals_.moderation.automodUserMessage.invoke(action);
this->moderation.automodUserMessage.invoke(action);
};
this->channelTermsActionHandlers["delete_permitted_term"] =
@ -399,7 +401,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval)
action.message = data.value("text").toString();
action.source.login = data.value("requester_login").toString();
this->signals_.moderation.automodUserMessage.invoke(action);
this->moderation.automodUserMessage.invoke(action);
};
this->moderationActionHandlers["delete_blocked_term"] =
@ -420,7 +422,7 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval)
action.message = args[0].toString();
this->signals_.moderation.automodUserMessage.invoke(action);
this->moderation.automodUserMessage.invoke(action);
};
this->channelTermsActionHandlers["delete_blocked_term"] =
[this](const auto &data, const auto &roomID) {
@ -434,21 +436,9 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval)
action.message = data.value("text").toString();
action.source.login = data.value("requester_login").toString();
this->signals_.moderation.automodUserMessage.invoke(action);
this->moderation.automodUserMessage.invoke(action);
};
// We don't get this one anymore or anything similiar
// We need some new topic so we can listen
//
//this->moderationActionHandlers["modified_automod_properties"] =
// [this](const auto &data, const auto &roomID) {
// // The automod settings got modified
// AutomodUserAction action(data, roomID);
// getCreatedByUser(data, action.source);
// action.type = AutomodUserAction::Properties;
// this->signals_.moderation.automodUserMessage.invoke(action);
// };
this->moderationActionHandlers["denied_automod_message"] =
[](const auto &data, const auto &roomID) {
// This message got denied by a moderator
@ -482,18 +472,17 @@ PubSub::PubSub(const QString &host, std::chrono::seconds pingInterval)
bind(&PubSub::onConnectionFail, this, ::_1));
}
PubSub::~PubSub()
{
this->stop();
}
void PubSub::setAccount(std::shared_ptr<TwitchAccount> account)
{
this->token_ = account->getOAuthToken();
this->userID_ = account->getUserId();
}
void PubSub::setAccountData(QString token, QString userID)
{
this->token_ = token;
this->userID_ = userID;
}
void PubSub::addClient()
{
if (this->addingClient)
@ -525,104 +514,43 @@ void PubSub::start()
{
this->work = std::make_shared<boost::asio::io_service::work>(
this->websocketClient.get_io_service());
this->mainThread.reset(
new std::thread(std::bind(&PubSub::runThread, this)));
this->thread.reset(new std::thread(std::bind(&PubSub::runThread, this)));
}
void PubSub::stop()
{
this->stopping_ = true;
for (const auto &client : this->clients)
for (const auto &[hdl, client] : this->clients)
{
client.second->close("Shutting down");
(void)hdl;
client->close("Shutting down");
}
this->work.reset();
if (this->mainThread->joinable())
if (this->thread->joinable())
{
this->mainThread->join();
// NOTE: We spawn a new thread to join the websocket thread.
// There is a case where a new client was initiated but not added to the clients list.
// We just don't join the thread & let the operating system nuke the thread if joining fails
// within 1s.
// We could fix the underlying bug, but this is easier & we realistically won't use this exact code
// for super much longer.
auto joiner = std::async(std::launch::async, &std::thread::join,
this->thread.get());
if (joiner.wait_for(1s) == std::future_status::timeout)
{
qCWarning(chatterinoPubSub)
<< "Thread didn't join within 1 second, rip it out";
this->websocketClient.stop();
}
}
assert(this->clients.empty());
}
void PubSub::unlistenAllModerationActions()
{
for (const auto &p : this->clients)
{
const auto &client = p.second;
if (const auto &[topics, nonce] =
client->unlistenPrefix("chat_moderator_actions.");
!topics.empty())
{
this->registerNonce(nonce, {
client,
"UNLISTEN",
topics,
topics.size(),
});
}
}
}
void PubSub::unlistenAutomod()
{
for (const auto &p : this->clients)
{
const auto &client = p.second;
if (const auto &[topics, nonce] =
client->unlistenPrefix("automod-queue.");
!topics.empty())
{
this->registerNonce(nonce, {
client,
"UNLISTEN",
topics,
topics.size(),
});
}
}
}
void PubSub::unlistenLowTrustUsers()
{
for (const auto &p : this->clients)
{
const auto &client = p.second;
if (const auto &[topics, nonce] =
client->unlistenPrefix("low-trust-users.");
!topics.empty())
{
this->registerNonce(nonce, {
client,
"UNLISTEN",
topics,
topics.size(),
});
}
}
}
void PubSub::unlistenWhispers()
{
for (const auto &p : this->clients)
{
const auto &client = p.second;
if (const auto &[topics, nonce] = client->unlistenPrefix("whispers.");
!topics.empty())
{
this->registerNonce(nonce, {
client,
"UNLISTEN",
topics,
topics.size(),
});
}
}
}
bool PubSub::listenToWhispers()
{
if (this->userID_.isEmpty())
@ -642,6 +570,11 @@ bool PubSub::listenToWhispers()
return true;
}
void PubSub::unlistenWhispers()
{
this->unlistenPrefix("whispers.");
}
void PubSub::listenToChannelModerationActions(const QString &channelID)
{
if (this->userID_.isEmpty())
@ -666,6 +599,11 @@ void PubSub::listenToChannelModerationActions(const QString &channelID)
this->listenToTopic(topic);
}
void PubSub::unlistenChannelModerationActions()
{
this->unlistenPrefix("chat_moderator_actions.");
}
void PubSub::listenToAutomod(const QString &channelID)
{
if (this->userID_.isEmpty())
@ -690,6 +628,11 @@ void PubSub::listenToAutomod(const QString &channelID)
this->listenToTopic(topic);
}
void PubSub::unlistenAutomod()
{
this->unlistenPrefix("automod-queue.");
}
void PubSub::listenToLowTrustUsers(const QString &channelID)
{
if (this->userID_.isEmpty())
@ -714,6 +657,11 @@ void PubSub::listenToLowTrustUsers(const QString &channelID)
this->listenToTopic(topic);
}
void PubSub::unlistenLowTrustUsers()
{
this->unlistenPrefix("low-trust-users.");
}
void PubSub::listenToChannelPointRewards(const QString &channelID)
{
static const QString topicFormat("community-points-channel-v1.%1");
@ -730,6 +678,30 @@ void PubSub::listenToChannelPointRewards(const QString &channelID)
this->listenToTopic(topic);
}
void PubSub::unlistenChannelPointRewards()
{
this->unlistenPrefix("community-points-channel-v1.");
}
void PubSub::unlistenPrefix(const QString &prefix)
{
for (const auto &p : this->clients)
{
const auto &client = p.second;
if (const auto &[topics, nonce] = client->unlistenPrefix(prefix);
!topics.empty())
{
NonceInfo nonceInfo{
client,
"UNLISTEN",
topics,
topics.size(),
};
this->registerNonce(nonce, nonceInfo);
}
}
}
void PubSub::listen(PubSubListenMessage msg)
{
if (this->tryListen(msg))
@ -1083,11 +1055,11 @@ void PubSub::handleMessageResponse(const PubSubMessageMessage &message)
switch (whisperMessage.type)
{
case PubSubWhisperMessage::Type::WhisperReceived: {
this->signals_.whisper.received.invoke(whisperMessage);
this->whisper.received.invoke(whisperMessage);
}
break;
case PubSubWhisperMessage::Type::WhisperSent: {
this->signals_.whisper.sent.invoke(whisperMessage);
this->whisper.sent.invoke(whisperMessage);
}
break;
case PubSubWhisperMessage::Type::Thread: {
@ -1182,7 +1154,7 @@ void PubSub::handleMessageResponse(const PubSubMessageMessage &message)
case PubSubCommunityPointsChannelV1Message::Type::RewardRedeemed: {
auto redemption =
innerMessage.data.value("redemption").toObject();
this->signals_.pointReward.redeemed.invoke(redemption);
this->pointReward.redeemed.invoke(redemption);
}
break;
@ -1210,8 +1182,7 @@ void PubSub::handleMessageResponse(const PubSubMessageMessage &message)
// Channel ID where the moderator actions are coming from
auto channelID = topicParts[2];
this->signals_.moderation.autoModMessageCaught.invoke(innerMessage,
channelID);
this->moderation.autoModMessageCaught.invoke(innerMessage, channelID);
}
else if (topic.startsWith("low-trust-users."))
{
@ -1226,13 +1197,12 @@ void PubSub::handleMessageResponse(const PubSubMessageMessage &message)
switch (innerMessage.type)
{
case PubSubLowTrustUsersMessage::Type::UserMessage: {
this->signals_.moderation.suspiciousMessageReceived.invoke(
innerMessage);
this->moderation.suspiciousMessageReceived.invoke(innerMessage);
}
break;
case PubSubLowTrustUsersMessage::Type::TreatmentUpdate: {
this->signals_.moderation.suspiciousTreatmentUpdated.invoke(
this->moderation.suspiciousTreatmentUpdated.invoke(
innerMessage);
}
break;

View file

@ -41,6 +41,13 @@ struct PubSubListenMessage;
struct PubSubMessage;
struct PubSubMessageMessage;
/**
* This handles the Twitch PubSub connection
*
* Known issues:
* - Upon closing a channel, we don't unsubscribe to its pubsub connections
* - Stop is never called, meaning we never do a clean shutdown
*/
class PubSub
{
using WebsocketMessagePtr =
@ -60,75 +67,62 @@ class PubSub
};
WebsocketClient websocketClient;
std::unique_ptr<std::thread> mainThread;
std::unique_ptr<std::thread> thread;
// Account credentials
// Set from setAccount or setAccountData
// Set from setAccount
QString token_;
QString userID_;
public:
PubSub(const QString &host,
std::chrono::seconds pingInterval = std::chrono::seconds(15));
~PubSub();
PubSub(const PubSub &) = delete;
PubSub(PubSub &&) = delete;
PubSub &operator=(const PubSub &) = delete;
PubSub &operator=(PubSub &&) = delete;
void setAccount(std::shared_ptr<TwitchAccount> account);
void setAccountData(QString token, QString userID);
enum class State {
Connected,
Disconnected,
};
void start();
void stop();
bool isConnected() const
{
return this->state == State::Connected;
}
struct {
Signal<ClearChatAction> chatCleared;
Signal<DeleteAction> messageDeleted;
Signal<ModeChangedAction> modeChanged;
Signal<ModerationStateAction> moderationStateChanged;
Signal<BanAction> userBanned;
Signal<UnbanAction> userUnbanned;
Signal<PubSubLowTrustUsersMessage> suspiciousMessageReceived;
Signal<PubSubLowTrustUsersMessage> suspiciousTreatmentUpdated;
// Message caught by automod
// channelID
pajlada::Signals::Signal<PubSubAutoModQueueMessage, QString>
autoModMessageCaught;
// Message blocked by moderator
Signal<AutomodAction> autoModMessageBlocked;
Signal<AutomodUserAction> automodUserMessage;
Signal<AutomodInfoAction> automodInfoMessage;
} moderation;
struct {
struct {
Signal<ClearChatAction> chatCleared;
Signal<DeleteAction> messageDeleted;
Signal<ModeChangedAction> modeChanged;
Signal<ModerationStateAction> moderationStateChanged;
// Parsing should be done in PubSubManager as well,
// but for now we just send the raw data
Signal<const PubSubWhisperMessage &> received;
Signal<const PubSubWhisperMessage &> sent;
} whisper;
Signal<BanAction> userBanned;
Signal<UnbanAction> userUnbanned;
Signal<PubSubLowTrustUsersMessage> suspiciousMessageReceived;
Signal<PubSubLowTrustUsersMessage> suspiciousTreatmentUpdated;
// Message caught by automod
// channelID
pajlada::Signals::Signal<PubSubAutoModQueueMessage, QString>
autoModMessageCaught;
// Message blocked by moderator
Signal<AutomodAction> autoModMessageBlocked;
Signal<AutomodUserAction> automodUserMessage;
Signal<AutomodInfoAction> automodInfoMessage;
} moderation;
struct {
// Parsing should be done in PubSubManager as well,
// but for now we just send the raw data
Signal<const PubSubWhisperMessage &> received;
Signal<const PubSubWhisperMessage &> sent;
} whisper;
struct {
Signal<const QJsonObject &> redeemed;
} pointReward;
} signals_;
void unlistenAllModerationActions();
void unlistenAutomod();
void unlistenLowTrustUsers();
void unlistenWhispers();
struct {
Signal<const QJsonObject &> redeemed;
} pointReward;
/**
* Listen to incoming whispers for the currently logged in user.
@ -137,6 +131,7 @@ public:
* PubSub topic: whispers.{currentUserID}
*/
bool listenToWhispers();
void unlistenWhispers();
/**
* Listen to moderation actions in the given channel.
@ -150,6 +145,7 @@ public:
* PubSub topic: chat_moderator_actions.{currentUserID}.{channelID}
*/
void listenToChannelModerationActions(const QString &channelID);
void unlistenChannelModerationActions();
/**
* Listen to Automod events in the given channel.
@ -160,6 +156,7 @@ public:
* PubSub topic: automod-queue.{currentUserID}.{channelID}
*/
void listenToAutomod(const QString &channelID);
void unlistenAutomod();
/**
* Listen to Low Trust events in the given channel.
@ -170,6 +167,7 @@ public:
* PubSub topic: low-trust-users.{currentUserID}.{channelID}
*/
void listenToLowTrustUsers(const QString &channelID);
void unlistenLowTrustUsers();
/**
* Listen to incoming channel point redemptions in the given channel.
@ -178,8 +176,7 @@ public:
* PubSub topic: community-points-channel-v1.{channelID}
*/
void listenToChannelPointRewards(const QString &channelID);
std::vector<QString> requests;
void unlistenChannelPointRewards();
struct {
std::atomic<uint32_t> connectionsClosed{0};
@ -192,20 +189,26 @@ public:
std::atomic<uint32_t> unlistenResponses{0};
} diag;
private:
/**
* Unlistens to all topics matching the prefix in all clients
*/
void unlistenPrefix(const QString &prefix);
void listenToTopic(const QString &topic);
private:
void listen(PubSubListenMessage msg);
bool tryListen(PubSubListenMessage msg);
bool isListeningToTopic(const QString &topic);
void addClient();
std::vector<QString> requests;
std::atomic<bool> addingClient{false};
ExponentialBackoff<5> connectBackoff{std::chrono::milliseconds(1000)};
State state = State::Connected;
std::map<WebsocketHandle, std::shared_ptr<PubSubClient>,
std::owner_less<WebsocketHandle>>
clients;

View file

@ -1250,15 +1250,15 @@ void TwitchChannel::refreshPubSub()
auto currentAccount = getApp()->accounts->twitch.getCurrent();
getApp()->twitch->pubsub->setAccount(currentAccount);
getIApp()->getTwitchPubSub()->setAccount(currentAccount);
getApp()->twitch->pubsub->listenToChannelModerationActions(roomId);
getIApp()->getTwitchPubSub()->listenToChannelModerationActions(roomId);
if (this->hasModRights())
{
getApp()->twitch->pubsub->listenToAutomod(roomId);
getApp()->twitch->pubsub->listenToLowTrustUsers(roomId);
getIApp()->getTwitchPubSub()->listenToAutomod(roomId);
getIApp()->getTwitchPubSub()->listenToLowTrustUsers(roomId);
}
getApp()->twitch->pubsub->listenToChannelPointRewards(roomId);
getIApp()->getTwitchPubSub()->listenToChannelPointRewards(roomId);
}
void TwitchChannel::refreshChatters()

View file

@ -1,4 +1,4 @@
#include "TwitchIrcServer.hpp"
#include "providers/twitch/TwitchIrcServer.hpp"
#include "Application.hpp"
#include "common/Channel.hpp"
@ -13,7 +13,6 @@
#include "providers/twitch/api/Helix.hpp"
#include "providers/twitch/ChannelPointReward.hpp"
#include "providers/twitch/IrcMessageHandler.hpp"
#include "providers/twitch/PubSubManager.hpp"
#include "providers/twitch/TwitchAccount.hpp"
#include "providers/twitch/TwitchChannel.hpp"
#include "singletons/Settings.hpp"
@ -29,7 +28,6 @@ using namespace std::chrono_literals;
namespace {
const QString TWITCH_PUBSUB_URL = "wss://pubsub-edge.twitch.tv";
const QString BTTV_LIVE_UPDATES_URL = "wss://sockets.betterttv.net/ws";
const QString SEVENTV_EVENTAPI_URL = "wss://events.7tv.io/v3";
@ -43,7 +41,6 @@ TwitchIrcServer::TwitchIrcServer()
, liveChannel(new Channel("/live", Channel::Type::TwitchLive))
, automodChannel(new Channel("/automod", Channel::Type::TwitchAutomod))
, watchingChannel(Channel::getEmpty(), Channel::Type::TwitchWatching)
, pubsub(new PubSub(TWITCH_PUBSUB_URL))
{
this->initializeIrc();
@ -72,7 +69,6 @@ void TwitchIrcServer::initialize(Settings &settings, Paths &paths)
getApp()->accounts->twitch.currentUserChanged.connect([this]() {
postToThread([this] {
this->connect();
this->pubsub->setAccount(getApp()->accounts->twitch.getCurrent());
});
});

View file

@ -18,7 +18,6 @@ namespace chatterino {
class Settings;
class Paths;
class PubSub;
class TwitchChannel;
class BttvLiveUpdates;
class SeventvEventAPI;
@ -80,8 +79,6 @@ public:
const ChannelPtr automodChannel;
IndirectChannel watchingChannel;
// NOTE: We currently leak this
PubSub *pubsub;
std::unique_ptr<BttvLiveUpdates> bttvLiveUpdates;
std::unique_ptr<SeventvEventAPI> seventvEventAPI;

View file

@ -281,7 +281,7 @@ void Window::addDebugStuff(HotkeyController::HotkeyMap &actions)
->toInner<PubSubCommunityPointsChannelV1Message>();
app->twitch->addFakeMessage(getSampleChannelRewardIRCMessage());
app->twitch->pubsub->signals_.pointReward.redeemed.invoke(
getIApp()->getTwitchPubSub()->pointReward.redeemed.invoke(
oInnerMessage->data.value("redemption").toObject());
alt = !alt;
}
@ -292,7 +292,7 @@ void Window::addDebugStuff(HotkeyController::HotkeyMap &actions)
auto oInnerMessage =
oMessage->toInner<PubSubMessageMessage>()
->toInner<PubSubCommunityPointsChannelV1Message>();
app->twitch->pubsub->signals_.pointReward.redeemed.invoke(
getIApp()->getTwitchPubSub()->pointReward.redeemed.invoke(
oInnerMessage->data.value("redemption").toObject());
alt = !alt;
}

View file

@ -3,6 +3,7 @@
#include "providers/twitch/PubSubManager.hpp"
#include "providers/twitch/pubsubmessages/AutoMod.hpp"
#include "providers/twitch/pubsubmessages/Whisper.hpp"
#include "providers/twitch/TwitchAccount.hpp"
#include "TestHelpers.hpp"
#include <gtest/gtest.h>
@ -35,9 +36,13 @@ using namespace std::chrono_literals;
class FTest : public PubSub
{
public:
explicit FTest(const char *path, std::chrono::seconds pingInterval)
explicit FTest(const char *path, std::chrono::seconds pingInterval,
QString token = "token")
: PubSub(QString("wss://127.0.0.1:9050%1").arg(path), pingInterval)
{
auto account = std::make_shared<TwitchAccount>("testaccount_420", token,
"clientid", "123456");
this->setAccount(account);
}
};
@ -45,7 +50,6 @@ TEST(TwitchPubSubClient, ServerRespondsToPings)
{
FTest pubSub("", 1s);
pubSub.setAccountData("token", "123456");
pubSub.start();
std::this_thread::sleep_for(50ms);
@ -55,7 +59,7 @@ TEST(TwitchPubSubClient, ServerRespondsToPings)
ASSERT_EQ(pubSub.diag.connectionsFailed, 0);
ASSERT_EQ(pubSub.diag.messagesReceived, 0);
pubSub.listenToTopic("test");
pubSub.listenToChannelModerationActions("123456");
std::this_thread::sleep_for(150ms);
@ -85,9 +89,8 @@ TEST(TwitchPubSubClient, ServerDoesntRespondToPings)
{
FTest pubSub("/dont-respond-to-ping", 1s);
pubSub.setAccountData("token", "123456");
pubSub.start();
pubSub.listenToTopic("test");
pubSub.listenToChannelModerationActions("123456");
std::this_thread::sleep_for(750ms);
@ -115,7 +118,6 @@ TEST(TwitchPubSubClient, DisconnectedAfter1s)
{
FTest pubSub("/disconnect-client-after-1s", 10s);
pubSub.setAccountData("token", "123456");
pubSub.start();
std::this_thread::sleep_for(50ms);
@ -126,7 +128,7 @@ TEST(TwitchPubSubClient, DisconnectedAfter1s)
ASSERT_EQ(pubSub.diag.messagesReceived, 0);
ASSERT_EQ(pubSub.diag.listenResponses, 0);
pubSub.listenToTopic("test");
pubSub.listenToChannelModerationActions("123456");
std::this_thread::sleep_for(500ms);
@ -158,7 +160,6 @@ TEST(TwitchPubSubClient, ExceedTopicLimit)
{
FTest pubSub("", 1s);
pubSub.setAccountData("token", "123456");
pubSub.start();
ASSERT_EQ(pubSub.diag.connectionsOpened, 0);
@ -168,7 +169,7 @@ TEST(TwitchPubSubClient, ExceedTopicLimit)
for (auto i = 0; i < PubSubClient::MAX_LISTENS; ++i)
{
pubSub.listenToTopic(QString("test-1.%1").arg(i));
pubSub.listenToChannelModerationActions(QString("1%1").arg(i));
}
std::this_thread::sleep_for(50ms);
@ -179,7 +180,7 @@ TEST(TwitchPubSubClient, ExceedTopicLimit)
for (auto i = 0; i < PubSubClient::MAX_LISTENS; ++i)
{
pubSub.listenToTopic(QString("test-2.%1").arg(i));
pubSub.listenToChannelModerationActions(QString("2%1").arg(i));
}
std::this_thread::sleep_for(50ms);
@ -199,7 +200,6 @@ TEST(TwitchPubSubClient, ExceedTopicLimitSingleStep)
{
FTest pubSub("", 1s);
pubSub.setAccountData("token", "123456");
pubSub.start();
ASSERT_EQ(pubSub.diag.connectionsOpened, 0);
@ -209,7 +209,7 @@ TEST(TwitchPubSubClient, ExceedTopicLimitSingleStep)
for (auto i = 0; i < PubSubClient::MAX_LISTENS * 2; ++i)
{
pubSub.listenToTopic("test");
pubSub.listenToChannelModerationActions("123456");
}
std::this_thread::sleep_for(150ms);
@ -229,17 +229,16 @@ TEST(TwitchPubSubClient, ReceivedWhisper)
{
FTest pubSub("/receive-whisper", 1s);
pubSub.setAccountData("token", "123456");
pubSub.start();
ReceivedMessage<PubSubWhisperMessage> aReceivedWhisper;
pubSub.signals_.whisper.received.connect(
std::ignore = pubSub.whisper.received.connect(
[&aReceivedWhisper](const auto &whisperMessage) {
aReceivedWhisper = whisperMessage;
});
pubSub.listenToTopic("whispers.123456");
pubSub.listenToWhispers();
std::this_thread::sleep_for(150ms);
@ -266,19 +265,18 @@ TEST(TwitchPubSubClient, ModeratorActionsUserBanned)
{
FTest pubSub("/moderator-actions-user-banned", 1s);
pubSub.setAccountData("token", "123456");
pubSub.start();
ReceivedMessage<BanAction> received;
pubSub.signals_.moderation.userBanned.connect(
[&received](const auto &action) {
std::ignore =
pubSub.moderation.userBanned.connect([&received](const auto &action) {
received = action;
});
ASSERT_EQ(pubSub.diag.listenResponses, 0);
pubSub.listenToTopic("chat_moderator_actions.123456.123456");
pubSub.listenToChannelModerationActions("123456");
std::this_thread::sleep_for(50ms);
@ -308,12 +306,11 @@ TEST(TwitchPubSubClient, ModeratorActionsUserBanned)
TEST(TwitchPubSubClient, MissingToken)
{
// The token that's required is "xD"
FTest pubSub("/authentication-required", 1s);
FTest pubSub("/authentication-required", 1s, "");
// pubSub.setAccountData("", "123456");
pubSub.start();
pubSub.listenToTopic("chat_moderator_actions.123456.123456");
pubSub.listenToChannelModerationActions("123456");
std::this_thread::sleep_for(150ms);
@ -336,10 +333,9 @@ TEST(TwitchPubSubClient, WrongToken)
// The token that's required is "xD"
FTest pubSub("/authentication-required", 1s);
pubSub.setAccountData("wrongtoken", "123456");
pubSub.start();
pubSub.listenToTopic("chat_moderator_actions.123456.123456");
pubSub.listenToChannelModerationActions("123456");
std::this_thread::sleep_for(50ms);
@ -360,12 +356,11 @@ TEST(TwitchPubSubClient, WrongToken)
TEST(TwitchPubSubClient, CorrectToken)
{
// The token that's required is "xD"
FTest pubSub("/authentication-required", 1s);
FTest pubSub("/authentication-required", 1s, "xD");
pubSub.setAccountData("xD", "123456");
pubSub.start();
pubSub.listenToTopic("chat_moderator_actions.123456.123456");
pubSub.listenToChannelModerationActions("123456");
std::this_thread::sleep_for(50ms);
@ -387,19 +382,18 @@ TEST(TwitchPubSubClient, AutoModMessageHeld)
{
FTest pubSub("/automod-held", 1s);
pubSub.setAccountData("xD", "123456");
pubSub.start();
ReceivedMessage<PubSubAutoModQueueMessage> received;
ReceivedMessage<QString> channelID;
pubSub.signals_.moderation.autoModMessageCaught.connect(
std::ignore = pubSub.moderation.autoModMessageCaught.connect(
[&](const auto &msg, const QString &incomingChannelID) {
received = msg;
channelID = incomingChannelID;
});
pubSub.listenToTopic("automod-queue.117166826.117166826");
pubSub.listenToAutomod("117166826");
std::this_thread::sleep_for(50ms);