PubSub system (#346)

* Add websocketpp dependency

* Initial pubsub commit

Renamed selection min and max variables to selectionMin and selectionMax
to bypass windows min/max macros being stupid.

TwitchAccount is now initialized with its User ID. It cannot be changed
after it has been initialized.

* Update openssl folder

* Update installation instructions

* Split up websocketpp dependency to its own code only and openssl.pri

* Add missing include to asio steady_timer

* Update dependencies for linux
This commit is contained in:
pajlada 2018-04-15 15:09:31 +02:00 committed by GitHub
parent d5097e71a3
commit 23cf8cc484
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
33 changed files with 1502 additions and 68 deletions

4
.gitmodules vendored
View file

@ -13,3 +13,7 @@
[submodule "lib/rapidjson"] [submodule "lib/rapidjson"]
path = lib/rapidjson path = lib/rapidjson
url = https://github.com/Tencent/rapidjson.git url = https://github.com/Tencent/rapidjson.git
[submodule "lib/websocketpp"]
path = lib/websocketpp
url = https://github.com/zaphoyd/websocketpp.git
branch = develop

View file

@ -16,10 +16,36 @@ Before building run `git submodule update --init --recursive` to get required su
#### Using Qt Creator #### Using Qt Creator
##### Visual Studio 2017 ##### Visual Studio 2017
Install Visual Studio 2017 and select "Desktop development with C++" and "Universal Windows Platform development. Install Visual Studio 2017 and select "Desktop development with C++" and "Universal Windows Platform development.
download the [boost library](https://sourceforge.net/projects/boost/files/boost/1.63.0/boost_1_63_0.zip/download) and extract it to `C:\local\boost`
##### open-ssl ###### Boost
1. download binaries for OpenSSL >= 1.0.2 or compile it from source. [example download](https://indy.fulgan.com/SSL/) Visual Studio 2017 64-bit: https://dl.bintray.com/boostorg/release/1.66.0/binaries/boost_1_66_0-msvc-14.1-64.exe
2. Place `libeay32.dll` and `ssleay32.dll` from OpenSSL in a directory in PATH. When prompted, install boost to C:\local\boost
When the installation is finished, go to C:\local\boost and rename the "lib64-msvc-14.1" folder to "lib"
###### OpenSSL
For our websocket library, we need OpenSSL 1.1
Download OpenSSL development library: https://slproweb.com/download/Win64OpenSSL-1_1_0h.exe
When prompted, install openssl to C:\local\openssl
When prompted, copy the OpenSSL DLLs to "The OpenSSL binaries (/bin) directory"
For Qt SSL, we need OpenSSL 1.0
Download OpenSSL light: https://slproweb.com/download/Win64OpenSSL_Light-1_0_2o.exe
When prompted, install it anywhere
When prompted, copy the OpenSSL DLLS to "The OpenSSL binaries (/bin) directory"
Copy the OpenSSL 1.0 files from its /bin folder to C:/local/bin (You will need to create the folder)
Then copy the OpenSSL 1.1 files from its /bin folder to C:/local/bin (Overwrite any duplicate files)
Add C:/local/bin to your path folder (Follow guide here if you don't know how to do it: https://www.computerhope.com/issues/ch000549.htm#windows8 )
###### Qt
Download Qt: https://www.qt.io/download
Select "Open source" at the bottom of this page
Then select "Download"
When prompted which components to install:
- Under the latest Qt version:
- Select MSVC 2017 64-bit (or MSVC 2015 64-bit if you still use Visual Studio 2015)
- Optionally, enable Qt WebEngine
- Under Tools:
- Select Qt Creator, and Qt Creator CDB Debugger Support
#### Using MSYS2 #### Using MSYS2
Building using MSYS2 can be quite easier process. Check out MSYS2 at [msys2.org](http://www.msys2.org/). Building using MSYS2 can be quite easier process. Check out MSYS2 at [msys2.org](http://www.msys2.org/).

View file

@ -34,6 +34,8 @@ include(dependencies/humanize.pri)
include(dependencies/fmt.pri) include(dependencies/fmt.pri)
DEFINES += IRC_NAMESPACE=Communi DEFINES += IRC_NAMESPACE=Communi
include(dependencies/libcommuni.pri) include(dependencies/libcommuni.pri)
include(dependencies/websocketpp.pri)
include(dependencies/openssl.pri)
include(dependencies/boost.pri) include(dependencies/boost.pri)
# Optional feature: QtWebEngine # Optional feature: QtWebEngine
@ -172,7 +174,11 @@ SOURCES += \
src/widgets/helper/signallabel.cpp \ src/widgets/helper/signallabel.cpp \
src/widgets/helper/debugpopup.cpp \ src/widgets/helper/debugpopup.cpp \
src/util/debugcount.cpp \ src/util/debugcount.cpp \
src/singletons/nativemessagingmanager.cpp src/singletons/nativemessagingmanager.cpp \
src/singletons/pubsubmanager.cpp \
src/util/rapidjson-helpers.cpp \
src/singletons/helper/pubsubhelpers.cpp \
src/singletons/helper/pubsubactions.cpp
HEADERS += \ HEADERS += \
src/precompiled_header.hpp \ src/precompiled_header.hpp \
@ -289,7 +295,11 @@ HEADERS += \
src/widgets/helper/debugpopup.hpp \ src/widgets/helper/debugpopup.hpp \
src/version.hpp \ src/version.hpp \
src/singletons/settingsmanager.hpp \ src/singletons/settingsmanager.hpp \
src/singletons/nativemessagingmanager.hpp src/singletons/nativemessagingmanager.hpp \
src/singletons/pubsubmanager.hpp \
src/util/rapidjson-helpers.hpp \
src/singletons/helper/pubsubhelpers.hpp \
src/singletons/helper/pubsubactions.hpp
RESOURCES += \ RESOURCES += \
resources/resources.qrc resources/resources.qrc

View file

@ -16,4 +16,6 @@ win32 {
} }
LIBS += -L$$BOOST_DIRECTORY\\$$BOOST_LIB_SUFFIX LIBS += -L$$BOOST_DIRECTORY\\$$BOOST_LIB_SUFFIX
} else {
LIBS += -lboost_system
} }

12
dependencies/openssl.pri vendored Normal file
View file

@ -0,0 +1,12 @@
win32 {
INCLUDEPATH += C:/local/openssl/include
LIBS += -LC:\local\openssl\lib
LIBS += -llibssl
LIBS += -llibcrypto
} else {
PKGCONFIG += openssl
LIBS += -lssl -lcrypto
}

1
dependencies/websocketpp.pri vendored Normal file
View file

@ -0,0 +1 @@
INCLUDEPATH += $$PWD/../lib/websocketpp

@ -1 +1 @@
Subproject commit 2fa3adf42da988dc2a34b9b625654aa08e906d4f Subproject commit ad31b38866d80a17ced902476ed06da69edce3a0

@ -1 +1 @@
Subproject commit b8e560a3b8eaeb1607927b87306484a27b315e76 Subproject commit 3f6645c615ff7bf412c05fe322e589cbdd34ff9b

1
lib/websocketpp Submodule

@ -0,0 +1 @@
Subproject commit 19cad9925f83d15d7487c16f0491f4741ec9f674

View file

@ -5,6 +5,7 @@
#include "singletons/emotemanager.hpp" #include "singletons/emotemanager.hpp"
#include "singletons/loggingmanager.hpp" #include "singletons/loggingmanager.hpp"
#include "singletons/nativemessagingmanager.hpp" #include "singletons/nativemessagingmanager.hpp"
#include "singletons/pubsubmanager.hpp"
#include "singletons/settingsmanager.hpp" #include "singletons/settingsmanager.hpp"
#include "singletons/thememanager.hpp" #include "singletons/thememanager.hpp"
#include "singletons/windowmanager.hpp" #include "singletons/windowmanager.hpp"
@ -38,6 +39,63 @@ Application::Application()
singletons::SettingManager::getInstance().updateWordTypeMask(); singletons::SettingManager::getInstance().updateWordTypeMask();
singletons::NativeMessagingManager::getInstance().openGuiMessageQueue(); singletons::NativeMessagingManager::getInstance().openGuiMessageQueue();
auto &pubsub = singletons::PubSubManager::getInstance();
pubsub.sig.whisper.sent.connect([](const auto &msg) {
debug::Log("WHISPER SENT LOL"); //
});
pubsub.sig.whisper.received.connect([](const auto &msg) {
debug::Log("WHISPER RECEIVED LOL"); //
});
pubsub.sig.moderation.chatCleared.connect([&](const auto &action) {
debug::Log("Chat cleared by {}", action.source.name); //
});
pubsub.sig.moderation.modeChanged.connect([&](const auto &action) {
debug::Log("Mode {} was turned {} by {} (duration {})", (int &)action.mode,
(bool &)action.state, action.source.name, action.args.duration);
});
pubsub.sig.moderation.moderationStateChanged.connect([&](const auto &action) {
debug::Log("User {} was {} by {}", action.target.id, action.modded ? "modded" : "unmodded",
action.source.name);
});
pubsub.sig.moderation.userTimedOut.connect([&](const auto &action) {
debug::Log("User {}({}) was timed out by {} for {} seconds with reason: '{}'",
action.target.name, action.target.id, action.source.name, action.duration,
action.reason);
});
pubsub.sig.moderation.userBanned.connect([&](const auto &action) {
debug::Log("User {}({}) was banned by {} with reason: '{}'", action.target.name,
action.target.id, action.source.name, action.reason);
});
pubsub.sig.moderation.userUnbanned.connect([&](const auto &action) {
debug::Log(
"User {}({}) was unbanned by {}. User was previously {}", action.target.name,
action.target.id, action.source.name,
action.previousState == singletons::UnbanAction::Banned ? "banned" : "timed out");
});
auto &accountManager = singletons::AccountManager::getInstance();
pubsub.Start();
auto RequestModerationActions = [&]() {
pubsub.UnlistenAllModerationActions();
// TODO(pajlada): Unlisten to all authed topics instead of only moderation topics
// pubsub.UnlistenAllAuthedTopics();
pubsub.ListenToWhispers(singletons::AccountManager::getInstance().Twitch.getCurrent()); //
};
accountManager.Twitch.userChanged.connect(RequestModerationActions);
RequestModerationActions();
} }
Application::~Application() Application::~Application()

View file

@ -203,12 +203,12 @@ void MessageLayoutContainer::paintSelection(QPainter &painter, int messageIndex,
QColor selectionColor = themeManager.messages.selection; QColor selectionColor = themeManager.messages.selection;
// don't draw anything // don't draw anything
if (selection.min.messageIndex > messageIndex || selection.max.messageIndex < messageIndex) { if (selection.selectionMin.messageIndex > messageIndex || selection.selectionMax.messageIndex < messageIndex) {
return; return;
} }
// fully selected // fully selected
if (selection.min.messageIndex < messageIndex && selection.max.messageIndex > messageIndex) { if (selection.selectionMin.messageIndex < messageIndex && selection.selectionMax.messageIndex > messageIndex) {
for (Line &line : this->lines) { for (Line &line : this->lines) {
QRect rect = line.rect; QRect rect = line.rect;
@ -226,7 +226,7 @@ void MessageLayoutContainer::paintSelection(QPainter &painter, int messageIndex,
int index = 0; int index = 0;
// start in this message // start in this message
if (selection.min.messageIndex == messageIndex) { if (selection.selectionMin.messageIndex == messageIndex) {
for (; lineIndex < this->lines.size(); lineIndex++) { for (; lineIndex < this->lines.size(); lineIndex++) {
Line &line = this->lines[lineIndex]; Line &line = this->lines[lineIndex];
index = line.startCharIndex; index = line.startCharIndex;
@ -236,27 +236,27 @@ void MessageLayoutContainer::paintSelection(QPainter &painter, int messageIndex,
int x = this->elements[line.startIndex]->getRect().left(); int x = this->elements[line.startIndex]->getRect().left();
int r = this->elements[line.endIndex - 1]->getRect().right(); int r = this->elements[line.endIndex - 1]->getRect().right();
if (line.endCharIndex < selection.min.charIndex) { if (line.endCharIndex < selection.selectionMin.charIndex) {
continue; continue;
} }
for (int i = line.startIndex; i < line.endIndex; i++) { for (int i = line.startIndex; i < line.endIndex; i++) {
int c = this->elements[i]->getSelectionIndexCount(); int c = this->elements[i]->getSelectionIndexCount();
if (index + c > selection.min.charIndex) { if (index + c > selection.selectionMin.charIndex) {
x = this->elements[i]->getXFromIndex(selection.min.charIndex - index); x = this->elements[i]->getXFromIndex(selection.selectionMin.charIndex - index);
// ends in same line // ends in same line
if (selection.max.messageIndex == messageIndex && if (selection.selectionMax.messageIndex == messageIndex &&
line.endCharIndex > /*=*/selection.max.charIndex) // line.endCharIndex > /*=*/selection.selectionMax.charIndex) //
{ {
returnAfter = true; returnAfter = true;
index = line.startCharIndex; index = line.startCharIndex;
for (int i = line.startIndex; i < line.endIndex; i++) { for (int i = line.startIndex; i < line.endIndex; i++) {
int c = this->elements[i]->getSelectionIndexCount(); int c = this->elements[i]->getSelectionIndexCount();
if (index + c > selection.max.charIndex) { if (index + c > selection.selectionMax.charIndex) {
r = this->elements[i]->getXFromIndex(selection.max.charIndex - r = this->elements[i]->getXFromIndex(selection.selectionMax.charIndex -
index); index);
break; break;
} }
@ -265,7 +265,7 @@ void MessageLayoutContainer::paintSelection(QPainter &painter, int messageIndex,
} }
// ends in same line end // ends in same line end
if (selection.max.messageIndex != messageIndex) { if (selection.selectionMax.messageIndex != messageIndex) {
int lineIndex2 = lineIndex + 1; int lineIndex2 = lineIndex + 1;
for (; lineIndex2 < this->lines.size(); lineIndex2++) { for (; lineIndex2 < this->lines.size(); lineIndex2++) {
Line &line = this->lines[lineIndex2]; Line &line = this->lines[lineIndex2];
@ -314,7 +314,7 @@ void MessageLayoutContainer::paintSelection(QPainter &painter, int messageIndex,
index = line.startCharIndex; index = line.startCharIndex;
// just draw the garbage // just draw the garbage
if (line.endCharIndex < /*=*/selection.max.charIndex) { if (line.endCharIndex < /*=*/selection.selectionMax.charIndex) {
QRect rect = line.rect; QRect rect = line.rect;
rect.setTop(std::max(0, rect.top()) + yOffset); rect.setTop(std::max(0, rect.top()) + yOffset);
@ -331,8 +331,8 @@ void MessageLayoutContainer::paintSelection(QPainter &painter, int messageIndex,
for (int i = line.startIndex; i < line.endIndex; i++) { for (int i = line.startIndex; i < line.endIndex; i++) {
int c = this->elements[i]->getSelectionIndexCount(); int c = this->elements[i]->getSelectionIndexCount();
if (index + c > selection.max.charIndex) { if (index + c > selection.selectionMax.charIndex) {
r = this->elements[i]->getXFromIndex(selection.max.charIndex - index); r = this->elements[i]->getXFromIndex(selection.selectionMax.charIndex - index);
break; break;
} }

View file

@ -47,19 +47,19 @@ struct SelectionItem {
struct Selection { struct Selection {
SelectionItem start; SelectionItem start;
SelectionItem end; SelectionItem end;
SelectionItem min; SelectionItem selectionMin;
SelectionItem max; SelectionItem selectionMax;
Selection() = default; Selection() = default;
Selection(const SelectionItem &start, const SelectionItem &end) Selection(const SelectionItem &start, const SelectionItem &end)
: start(start) : start(start)
, end(end) , end(end)
, min(start) , selectionMin(start)
, max(end) , selectionMax(end)
{ {
if (min > max) { if (selectionMin > selectionMax) {
std::swap(this->min, this->max); std::swap(this->selectionMin, this->selectionMax);
} }
} }
@ -70,7 +70,7 @@ struct Selection {
bool isSingleMessage() const bool isSingleMessage() const
{ {
return this->min.messageIndex == this->max.messageIndex; return this->selectionMin.messageIndex == this->selectionMax.messageIndex;
} }
}; };

View file

@ -6,10 +6,11 @@ namespace providers {
namespace twitch { namespace twitch {
TwitchAccount::TwitchAccount(const QString &_username, const QString &_oauthToken, TwitchAccount::TwitchAccount(const QString &_username, const QString &_oauthToken,
const QString &_oauthClient) const QString &_oauthClient, const QString &_userID)
: oauthClient(_oauthClient) : oauthClient(_oauthClient)
, oauthToken(_oauthToken) , oauthToken(_oauthToken)
, userName(_username) , userName(_username)
, userId(_userID)
, _isAnon(_username == ANONYMOUS_USERNAME) , _isAnon(_username == ANONYMOUS_USERNAME)
{ {
} }
@ -34,11 +35,6 @@ const QString &TwitchAccount::getUserId() const
return this->userId; return this->userId;
} }
void TwitchAccount::setUserId(const QString &id)
{
this->userId = id;
}
bool TwitchAccount::setOAuthClient(const QString &newClientID) bool TwitchAccount::setOAuthClient(const QString &newClientID)
{ {
if (this->oauthClient.compare(newClientID) == 0) { if (this->oauthClient.compare(newClientID) == 0) {

View file

@ -10,14 +10,14 @@ namespace twitch {
class TwitchAccount class TwitchAccount
{ {
public: public:
TwitchAccount(const QString &username, const QString &oauthToken, const QString &oauthClient); TwitchAccount(const QString &username, const QString &oauthToken, const QString &oauthClient,
const QString &_userID);
const QString &getUserName() const; const QString &getUserName() const;
const QString &getOAuthToken() const; const QString &getOAuthToken() const;
const QString &getOAuthClient() const; const QString &getOAuthClient() const;
const QString &getUserId() const; const QString &getUserId() const;
void setUserId(const QString &id);
// Attempts to update the users OAuth Client ID // Attempts to update the users OAuth Client ID
// Returns true if the value has changed, otherwise false // Returns true if the value has changed, otherwise false
@ -34,8 +34,8 @@ public:
private: private:
QString oauthClient; QString oauthClient;
QString oauthToken; QString oauthToken;
QString userId;
QString userName; QString userName;
QString userId;
const bool _isAnon; const bool _isAnon;
}; };

View file

@ -10,7 +10,7 @@ namespace twitch {
TwitchAccountManager::TwitchAccountManager() TwitchAccountManager::TwitchAccountManager()
{ {
this->anonymousUser.reset(new TwitchAccount(ANONYMOUS_USERNAME, "", "")); this->anonymousUser.reset(new TwitchAccount(ANONYMOUS_USERNAME, "", "", ""));
this->currentUsername.connect([this](const auto &newValue, auto) { this->currentUsername.connect([this](const auto &newValue, auto) {
QString newUsername(QString::fromStdString(newValue)); QString newUsername(QString::fromStdString(newValue));
@ -175,11 +175,8 @@ TwitchAccountManager::AddUserResponse TwitchAccountManager::addUser(
} }
} }
auto newUser = auto newUser = std::make_shared<TwitchAccount>(userData.username, userData.oauthToken,
std::make_shared<TwitchAccount>(userData.username, userData.oauthToken, userData.clientID); userData.clientID, userData.userID);
// Set users User ID without the uid prefix
newUser->setUserId(userData.userID);
std::lock_guard<std::mutex> lock(this->mutex); std::lock_guard<std::mutex> lock(this->mutex);

View file

@ -3,9 +3,12 @@
#include "debug/log.hpp" #include "debug/log.hpp"
#include "messages/message.hpp" #include "messages/message.hpp"
#include "providers/twitch/twitchmessagebuilder.hpp" #include "providers/twitch/twitchmessagebuilder.hpp"
#include "singletons/accountmanager.hpp"
#include "singletons/emotemanager.hpp" #include "singletons/emotemanager.hpp"
#include "singletons/ircmanager.hpp" #include "singletons/ircmanager.hpp"
#include "singletons/pubsubmanager.hpp"
#include "singletons/settingsmanager.hpp" #include "singletons/settingsmanager.hpp"
#include "util/posttothread.hpp"
#include "util/urlfetch.hpp" #include "util/urlfetch.hpp"
#include <IrcConnection> #include <IrcConnection>
@ -40,6 +43,32 @@ TwitchChannel::TwitchChannel(const QString &channelName, Communi::IrcConnection
this->refreshLiveStatus(); // this->refreshLiveStatus(); //
}); });
this->managedConnect(singletons::AccountManager::getInstance().Twitch.userChanged,
[this]() { this->setMod(false); });
auto refreshPubSubState = [this]() {
const auto &x = this;
if (!this->hasModRights()) {
return;
}
if (this->roomID.isEmpty()) {
return;
}
auto account = singletons::AccountManager::getInstance().Twitch.getCurrent();
if (account && !account->getUserId().isEmpty()) {
singletons::PubSubManager::getInstance().ListenToChannelModerationActions(this->roomID,
account);
}
};
this->userStateChanged.connect(refreshPubSubState);
this->roomIDchanged.connect(refreshPubSubState);
this->managedConnect(singletons::AccountManager::getInstance().Twitch.userChanged,
refreshPubSubState);
refreshPubSubState();
this->fetchMessages.connect([this] { this->fetchMessages.connect([this] {
this->fetchRecentMessages(); // this->fetchRecentMessages(); //
}); });

View file

@ -8,6 +8,8 @@
#include "singletons/ircmanager.hpp" #include "singletons/ircmanager.hpp"
#include "util/concurrentmap.hpp" #include "util/concurrentmap.hpp"
#include <pajlada/signals/signalholder.hpp>
#include <mutex> #include <mutex>
namespace chatterino { namespace chatterino {
@ -16,7 +18,7 @@ namespace twitch {
class TwitchServer; class TwitchServer;
class TwitchChannel final : public Channel class TwitchChannel final : public Channel, pajlada::Signals::SignalHolder
{ {
QTimer *liveStatusTimer; QTimer *liveStatusTimer;
QTimer *chattersListTimer; QTimer *chattersListTimer;
@ -31,6 +33,11 @@ public:
QString uptime; QString uptime;
}; };
struct UserState {
bool mod;
bool broadcaster;
};
~TwitchChannel() final; ~TwitchChannel() final;
void reloadChannelEmotes(); void reloadChannelEmotes();
@ -88,6 +95,9 @@ private:
mutable std::mutex streamStatusMutex; mutable std::mutex streamStatusMutex;
StreamStatus streamStatus; StreamStatus streamStatus;
mutable std::mutex userStateMutex;
UserState userState;
void fetchRecentMessages(); void fetchRecentMessages();
bool mod; bool mod;

View file

@ -606,7 +606,7 @@ void TwitchMessageBuilder::appendTwitchBadges()
} }
} else if (badge.startsWith("subscriber/")) { } else if (badge.startsWith("subscriber/")) {
if (channelResources.loaded == false) { if (channelResources.loaded == false) {
qDebug() << "Channel resources are not loaded, can't add the subscriber badge"; // qDebug() << "Channel resources are not loaded, can't add the subscriber badge";
continue; continue;
} }

View file

@ -1,11 +1,11 @@
#pragma once #pragma once
#include <memory>
#include "providers/irc/abstractircserver.hpp" #include "providers/irc/abstractircserver.hpp"
#include "providers/twitch/twitchaccount.hpp" #include "providers/twitch/twitchaccount.hpp"
#include "providers/twitch/twitchchannel.hpp" #include "providers/twitch/twitchchannel.hpp"
#include <memory>
namespace chatterino { namespace chatterino {
namespace providers { namespace providers {
namespace twitch { namespace twitch {

View file

@ -17,10 +17,6 @@ inline QString getEnvString(const char *target)
} // namespace } // namespace
AccountManager::AccountManager()
{
}
AccountManager &AccountManager::getInstance() AccountManager &AccountManager::getInstance()
{ {
static AccountManager instance; static AccountManager instance;

View file

@ -7,7 +7,7 @@ namespace singletons {
class AccountManager class AccountManager
{ {
AccountManager(); AccountManager() = default;
public: public:
static AccountManager &getInstance(); static AccountManager &getInstance();

View file

@ -114,7 +114,6 @@ void EmoteManager::reloadBTTVChannelEmotes(const QString &channelName,
req.setCaller(QThread::currentThread()); req.setCaller(QThread::currentThread());
req.setTimeout(3000); req.setTimeout(3000);
req.getJSON([this, channelName, _map](QJsonObject &rootNode) { req.getJSON([this, channelName, _map](QJsonObject &rootNode) {
debug::Log("Got bttv channel emotes for {}", channelName);
auto map = _map.lock(); auto map = _map.lock();
if (_map.expired()) { if (_map.expired()) {
@ -470,7 +469,6 @@ void EmoteManager::loadBTTVEmotes()
req.setTimeout(30000); req.setTimeout(30000);
req.setUseQuickLoadCache(true); req.setUseQuickLoadCache(true);
req.getJSON([this](QJsonObject &root) { req.getJSON([this](QJsonObject &root) {
debug::Log("Got global bttv emotes");
auto emotes = root.value("emotes").toArray(); auto emotes = root.value("emotes").toArray();
QString urlTemplate = "https:" + root.value("urlTemplate").toString(); QString urlTemplate = "https:" + root.value("urlTemplate").toString();
@ -504,8 +502,6 @@ void EmoteManager::loadFFZEmotes()
req.setCaller(QThread::currentThread()); req.setCaller(QThread::currentThread());
req.setTimeout(30000); req.setTimeout(30000);
req.getJSON([this](QJsonObject &root) { req.getJSON([this](QJsonObject &root) {
debug::Log("Got global ffz emotes");
auto sets = root.value("sets").toObject(); auto sets = root.value("sets").toObject();
std::vector<std::string> codes; std::vector<std::string> codes;

View file

@ -0,0 +1,15 @@
#include "singletons/helper/pubsubactions.hpp"
#include "singletons/helper/pubsubhelpers.hpp"
namespace chatterino {
namespace singletons {
PubSubAction::PubSubAction(const rapidjson::Value &data)
: timestamp(std::chrono::steady_clock::now())
{
getCreatedByUser(data, this->source);
}
} // namespace singletons
} // namespace chatterino

View file

@ -0,0 +1,109 @@
#pragma once
#include <rapidjson/document.h>
#include <QString>
#include <chrono>
#include <cinttypes>
namespace chatterino {
namespace singletons {
struct ActionUser {
QString id;
QString name;
};
struct PubSubAction {
PubSubAction(const rapidjson::Value &data);
ActionUser source;
std::chrono::steady_clock::time_point timestamp;
};
// Used when a chat mode (i.e. slowmode, subscribers only mode) is enabled or disabled
struct ModeChangedAction : PubSubAction {
ModeChangedAction(const rapidjson::Value &data)
: PubSubAction(data)
{
}
enum Mode {
Unknown,
Slow,
R9K,
SubscribersOnly,
EmoteOnly,
} mode;
// Whether the mode was turned on or off
enum State {
Off,
On,
} state;
union {
uint32_t duration;
} args;
};
struct TimeoutAction : PubSubAction {
TimeoutAction(const rapidjson::Value &data)
: PubSubAction(data)
{
}
ActionUser target;
QString reason;
uint32_t duration;
};
struct BanAction : PubSubAction {
BanAction(const rapidjson::Value &data)
: PubSubAction(data)
{
}
ActionUser target;
QString reason;
};
struct UnbanAction : PubSubAction {
UnbanAction(const rapidjson::Value &data)
: PubSubAction(data)
{
}
ActionUser target;
enum {
Banned,
TimedOut,
} previousState;
};
struct ClearChatAction : PubSubAction {
ClearChatAction(const rapidjson::Value &data)
: PubSubAction(data)
{
}
};
struct ModerationStateAction : PubSubAction {
ModerationStateAction(const rapidjson::Value &data)
: PubSubAction(data)
{
}
ActionUser target;
// true = modded
// false = unmodded
bool modded;
};
} // namespace singletons
} // namespace chatterino

View file

@ -0,0 +1,91 @@
#include "singletons/helper/pubsubhelpers.hpp"
#include "singletons/accountmanager.hpp"
#include "singletons/helper/pubsubactions.hpp"
#include "util/rapidjson-helpers.hpp"
namespace chatterino {
namespace singletons {
const rapidjson::Value &getArgs(const rapidjson::Value &data)
{
if (!data.HasMember("args")) {
throw std::runtime_error("Missing member args");
}
const auto &args = data["args"];
if (!args.IsArray()) {
throw std::runtime_error("args must be an array");
}
return args;
}
bool getCreatedByUser(const rapidjson::Value &data, ActionUser &user)
{
return rj::getSafe(data, "created_by", user.name) &&
rj::getSafe(data, "created_by_user_id", user.id);
}
bool getTargetUser(const rapidjson::Value &data, ActionUser &user)
{
return rj::getSafe(data, "target_user_id", user.id);
}
std::string Stringify(const rapidjson::Value &v)
{
return pajlada::Settings::SettingManager::stringify(v);
}
rapidjson::Document CreateListenMessage(const std::vector<std::string> &topicsVec,
std::shared_ptr<providers::twitch::TwitchAccount> account)
{
rapidjson::Document msg(rapidjson::kObjectType);
auto &a = msg.GetAllocator();
rj::set(msg, "type", "LISTEN");
rapidjson::Value data(rapidjson::kObjectType);
if (account) {
rj::set(data, "auth_token", account->getOAuthToken(), a);
}
rapidjson::Value topics(rapidjson::kArrayType);
for (const auto &topic : topicsVec) {
rj::add(topics, topic, a);
}
rj::set(data, "topics", topics, a);
rj::set(msg, "data", data);
return msg;
}
rapidjson::Document CreateUnlistenMessage(const std::vector<std::string> &topicsVec)
{
rapidjson::Document msg(rapidjson::kObjectType);
auto &a = msg.GetAllocator();
rj::set(msg, "type", "UNLISTEN");
auto &accountManager = AccountManager::getInstance();
rapidjson::Value data(rapidjson::kObjectType);
rapidjson::Value topics(rapidjson::kArrayType);
for (const auto &topic : topicsVec) {
rj::add(topics, topic, a);
}
rj::set(data, "topics", topics, a);
rj::set(msg, "data", data);
return msg;
}
} // namespace singletons
} // namespace chatterino

View file

@ -0,0 +1,63 @@
#pragma once
#include "debug/log.hpp"
#include "providers/twitch/twitchaccount.hpp"
#include "util/rapidjson-helpers.hpp"
#include <boost/asio.hpp>
#include <boost/asio/steady_timer.hpp>
#include <memory>
namespace chatterino {
namespace singletons {
struct ActionUser;
const rapidjson::Value &getArgs(const rapidjson::Value &data);
bool getCreatedByUser(const rapidjson::Value &data, ActionUser &user);
bool getTargetUser(const rapidjson::Value &data, ActionUser &user);
std::string Stringify(const rapidjson::Value &v);
rapidjson::Document CreateListenMessage(const std::vector<std::string> &topicsVec,
std::shared_ptr<providers::twitch::TwitchAccount> account);
rapidjson::Document CreateUnlistenMessage(const std::vector<std::string> &topicsVec);
// Create timer using given ioService
template <typename Duration, typename Callback>
void RunAfter(boost::asio::io_service &ioService, Duration duration, Callback cb)
{
auto timer = std::make_shared<boost::asio::steady_timer>(ioService);
timer->expires_from_now(duration);
timer->async_wait([timer, cb](const boost::system::error_code &ec) {
if (ec) {
debug::Log("Error in RunAfter: {}", ec.message());
return;
}
cb(timer);
});
}
// Use provided timer
template <typename Duration, typename Callback>
void RunAfter(std::shared_ptr<boost::asio::steady_timer> timer, Duration duration, Callback cb)
{
timer->expires_from_now(duration);
timer->async_wait([timer, cb](const boost::system::error_code &ec) {
if (ec) {
debug::Log("Error in RunAfter: {}", ec.message());
return;
}
cb(timer);
});
}
} // namespace singletons
} // namespace chatterino

View file

@ -0,0 +1,746 @@
#include "singletons/pubsubmanager.hpp"
#include "debug/log.hpp"
#include "singletons/accountmanager.hpp"
#include "singletons/helper/pubsubactions.hpp"
#include "singletons/helper/pubsubhelpers.hpp"
#include "util/rapidjson-helpers.hpp"
#include <rapidjson/error/en.h>
#include <exception>
#include <thread>
#define TWITCH_PUBSUB_URL "wss://pubsub-edge.twitch.tv"
using websocketpp::lib::bind;
using websocketpp::lib::placeholders::_1;
using websocketpp::lib::placeholders::_2;
namespace chatterino {
namespace singletons {
static const char *pingPayload = "{\"type\":\"PING\"}";
static std::map<std::string, std::string> sentMessages;
PubSubClient::PubSubClient(WebsocketClient &_websocketClient, WebsocketHandle _handle)
: websocketClient(_websocketClient)
, handle(_handle)
{
}
void PubSubClient::Start()
{
assert(!this->started);
this->started = true;
this->Ping();
}
void PubSubClient::Stop()
{
assert(this->started);
this->started = false;
}
bool PubSubClient::Listen(rapidjson::Document &message)
{
int numRequestedListens = message["data"]["topics"].Size();
if (this->numListens + numRequestedListens > MAX_PUBSUB_LISTENS) {
// This PubSubClient is already at its peak listens
return false;
}
this->numListens += numRequestedListens;
for (const auto &topic : message["data"]["topics"].GetArray()) {
this->listeners.emplace_back(Listener{topic.GetString(), false, false, false});
}
auto uuid = CreateUUID();
rj::set(message, "nonce", uuid);
std::string payload = Stringify(message);
sentMessages[uuid.toStdString()] = payload;
this->Send(payload.c_str());
return true;
}
void PubSubClient::UnlistenPrefix(const std::string &prefix)
{
std::vector<std::string> topics;
for (auto it = this->listeners.begin(); it != this->listeners.end();) {
const auto &listener = *it;
if (listener.topic.find(prefix) == 0) {
topics.push_back(listener.topic);
it = this->listeners.erase(it);
} else {
++it;
}
}
if (topics.empty()) {
return;
}
auto message = CreateUnlistenMessage(topics);
auto uuid = CreateUUID();
rj::set(message, "nonce", CreateUUID());
std::string payload = Stringify(message);
sentMessages[uuid.toStdString()] = payload;
this->Send(payload.c_str());
}
void PubSubClient::HandlePong()
{
assert(this->awaitingPong);
debug::Log("Got pong!");
this->awaitingPong = false;
}
bool PubSubClient::isListeningToTopic(const std::string &payload)
{
for (const auto &listener : this->listeners) {
if (listener.topic == payload) {
return true;
}
}
return false;
}
void PubSubClient::Ping()
{
assert(this->started);
if (!this->Send(pingPayload)) {
return;
}
this->awaitingPong = true;
auto self = this->shared_from_this();
RunAfter(this->websocketClient.get_io_service(), std::chrono::seconds(15), [self](auto timer) {
if (!self->started) {
return;
}
if (self->awaitingPong) {
debug::Log("No pong respnose, disconnect!");
// TODO(pajlada): Label this connection as "disconnect me"
}
});
RunAfter(this->websocketClient.get_io_service(), std::chrono::minutes(5), [self](auto timer) {
if (!self->started) {
return;
}
self->Ping(); //
});
}
bool PubSubClient::Send(const char *payload)
{
WebsocketErrorCode ec;
this->websocketClient.send(this->handle, payload, websocketpp::frame::opcode::text, ec);
if (ec) {
debug::Log("Error sending message {}: {}", payload, ec.message());
// TODO(pajlada): Check which error code happened and maybe gracefully handle it
return false;
}
return true;
}
PubSubManager::PubSubManager()
{
this->moderationActionHandlers["clear"] = [this](const auto &data) {
ClearChatAction action(data);
this->sig.moderation.chatCleared.invoke(action);
};
this->moderationActionHandlers["slowoff"] = [this](const auto &data) {
ModeChangedAction action(data);
action.mode = ModeChangedAction::Mode::Slow;
action.state = ModeChangedAction::State::Off;
this->sig.moderation.modeChanged.invoke(action);
};
this->moderationActionHandlers["slow"] = [this](const auto &data) {
ModeChangedAction action(data);
action.mode = ModeChangedAction::Mode::Slow;
action.state = ModeChangedAction::State::On;
if (!data.HasMember("args")) {
debug::Log("Missing required args member");
return;
}
const auto &args = data["args"];
if (!args.IsArray()) {
debug::Log("args member must be an array");
return;
}
if (args.Size() == 0) {
debug::Log("Missing duration argument in slowmode on");
return;
}
const auto &durationArg = args[0];
if (!durationArg.IsString()) {
debug::Log("Duration arg must be a string");
return;
}
bool ok;
action.args.duration = QString(durationArg.GetString()).toUInt(&ok, 10);
this->sig.moderation.modeChanged.invoke(action);
};
this->moderationActionHandlers["r9kbetaoff"] = [this](const auto &data) {
ModeChangedAction action(data);
action.mode = ModeChangedAction::Mode::R9K;
action.state = ModeChangedAction::State::Off;
this->sig.moderation.modeChanged.invoke(action);
};
this->moderationActionHandlers["r9kbeta"] = [this](const auto &data) {
ModeChangedAction action(data);
action.mode = ModeChangedAction::Mode::R9K;
action.state = ModeChangedAction::State::On;
this->sig.moderation.modeChanged.invoke(action);
};
this->moderationActionHandlers["subscribersoff"] = [this](const auto &data) {
ModeChangedAction action(data);
action.mode = ModeChangedAction::Mode::SubscribersOnly;
action.state = ModeChangedAction::State::Off;
this->sig.moderation.modeChanged.invoke(action);
};
this->moderationActionHandlers["subscribers"] = [this](const auto &data) {
ModeChangedAction action(data);
action.mode = ModeChangedAction::Mode::SubscribersOnly;
action.state = ModeChangedAction::State::On;
this->sig.moderation.modeChanged.invoke(action);
};
this->moderationActionHandlers["emoteonlyoff"] = [this](const auto &data) {
ModeChangedAction action(data);
action.mode = ModeChangedAction::Mode::EmoteOnly;
action.state = ModeChangedAction::State::Off;
this->sig.moderation.modeChanged.invoke(action);
};
this->moderationActionHandlers["emoteonly"] = [this](const auto &data) {
ModeChangedAction action(data);
action.mode = ModeChangedAction::Mode::EmoteOnly;
action.state = ModeChangedAction::State::On;
this->sig.moderation.modeChanged.invoke(action);
};
this->moderationActionHandlers["unmod"] = [this](const auto &data) {
ModerationStateAction action(data);
getTargetUser(data, action.target);
action.modded = false;
this->sig.moderation.moderationStateChanged.invoke(action);
};
this->moderationActionHandlers["mod"] = [this](const auto &data) {
ModerationStateAction action(data);
getTargetUser(data, action.target);
action.modded = true;
this->sig.moderation.moderationStateChanged.invoke(action);
};
this->moderationActionHandlers["timeout"] = [this](const auto &data) {
TimeoutAction action(data);
getCreatedByUser(data, action.source);
getTargetUser(data, action.target);
try {
const auto &args = getArgs(data);
if (args.Size() < 2) {
return;
}
if (!rj::getSafe(args[0], action.target.name)) {
return;
}
QString durationString;
if (!rj::getSafe(args[1], durationString)) {
return;
}
bool ok;
action.duration = durationString.toUInt(&ok, 10);
if (args.Size() >= 3) {
if (!rj::getSafe(args[2], action.reason)) {
return;
}
}
this->sig.moderation.userTimedOut.invoke(action);
} catch (const std::runtime_error &ex) {
debug::Log("Error parsing moderation action: {}", ex.what());
}
};
this->moderationActionHandlers["ban"] = [this](const auto &data) {
BanAction action(data);
getCreatedByUser(data, action.source);
getTargetUser(data, action.target);
try {
const auto &args = getArgs(data);
if (args.Size() < 1) {
return;
}
if (!rj::getSafe(args[0], action.target.name)) {
return;
}
if (args.Size() >= 2) {
if (!rj::getSafe(args[1], action.reason)) {
return;
}
}
this->sig.moderation.userBanned.invoke(action);
} catch (const std::runtime_error &ex) {
debug::Log("Error parsing moderation action: {}", ex.what());
}
};
this->moderationActionHandlers["unban"] = [this](const auto &data) {
UnbanAction action(data);
getCreatedByUser(data, action.source);
getTargetUser(data, action.target);
action.previousState = UnbanAction::Banned;
try {
const auto &args = getArgs(data);
if (args.Size() < 1) {
return;
}
if (!rj::getSafe(args[0], action.target.name)) {
return;
}
this->sig.moderation.userUnbanned.invoke(action);
} catch (const std::runtime_error &ex) {
debug::Log("Error parsing moderation action: {}", ex.what());
}
};
this->moderationActionHandlers["untimeout"] = [this](const auto &data) {
UnbanAction action(data);
getCreatedByUser(data, action.source);
getTargetUser(data, action.target);
action.previousState = UnbanAction::TimedOut;
try {
const auto &args = getArgs(data);
if (args.Size() < 1) {
return;
}
if (!rj::getSafe(args[0], action.target.name)) {
return;
}
this->sig.moderation.userUnbanned.invoke(action);
} catch (const std::runtime_error &ex) {
debug::Log("Error parsing moderation action: {}", ex.what());
}
};
this->websocketClient.set_access_channels(websocketpp::log::alevel::all);
this->websocketClient.clear_access_channels(websocketpp::log::alevel::frame_payload);
this->websocketClient.init_asio();
// SSL Handshake
this->websocketClient.set_tls_init_handler(bind(&PubSubManager::OnTLSInit, this, ::_1));
this->websocketClient.set_message_handler(bind(&PubSubManager::OnMessage, this, ::_1, ::_2));
this->websocketClient.set_open_handler(bind(&PubSubManager::OnConnectionOpen, this, ::_1));
this->websocketClient.set_close_handler(bind(&PubSubManager::OnConnectionClose, this, ::_1));
// Add an initial client
this->AddClient();
}
void PubSubManager::AddClient()
{
websocketpp::lib::error_code ec;
auto con = this->websocketClient.get_connection(TWITCH_PUBSUB_URL, ec);
if (ec) {
debug::Log("Unable to establish connection: {}", ec.message());
return;
}
this->websocketClient.connect(con);
}
PubSubManager &PubSubManager::getInstance()
{
static PubSubManager instance;
return instance;
}
void PubSubManager::Start()
{
this->mainThread.reset(new std::thread(std::bind(&PubSubManager::RunThread, this)));
}
void PubSubManager::ListenToWhispers(std::shared_ptr<providers::twitch::TwitchAccount> account)
{
assert(account != nullptr);
std::string userID = account->getUserId().toStdString();
debug::Log("Connection open!");
websocketpp::lib::error_code ec;
std::vector<std::string> topics({"whispers." + userID});
this->Listen(std::move(CreateListenMessage(topics, account)));
if (ec) {
debug::Log("Unable to send message to websocket server: {}", ec.message());
return;
}
}
void PubSubManager::UnlistenAllModerationActions()
{
for (const auto &p : this->clients) {
const auto &client = p.second;
client->UnlistenPrefix("chat_moderator_actions.");
}
}
void PubSubManager::ListenToChannelModerationActions(
const QString &channelID, std::shared_ptr<providers::twitch::TwitchAccount> account)
{
assert(!channelID.isEmpty());
assert(account != nullptr);
QString userID = account->getUserId();
assert(!userID.isEmpty());
std::string topic(fS("chat_moderator_actions.{}.{}", userID, channelID));
if (this->isListeningToTopic(topic)) {
debug::Log("We are already listening to topic {}", topic);
return;
}
debug::Log("Listen to topic {}", topic);
this->listenToTopic(topic, account);
}
void PubSubManager::listenToTopic(const std::string &topic,
std::shared_ptr<providers::twitch::TwitchAccount> account)
{
auto message = CreateListenMessage({topic}, account);
this->Listen(std::move(message));
}
void PubSubManager::Listen(rapidjson::Document &&msg)
{
if (this->TryListen(msg)) {
debug::Log("Successfully listened!");
return;
}
debug::Log("Added to the back of the queue");
this->requests.emplace_back(std::make_unique<rapidjson::Document>(std::move(msg)));
}
bool PubSubManager::TryListen(rapidjson::Document &msg)
{
debug::Log("TryListen with {} clients", this->clients.size());
for (const auto &p : this->clients) {
const auto &client = p.second;
if (client->Listen(msg)) {
return true;
}
}
return false;
}
bool PubSubManager::isListeningToTopic(const std::string &topic)
{
for (const auto &p : this->clients) {
const auto &client = p.second;
if (client->isListeningToTopic(topic)) {
return true;
}
}
return false;
}
void PubSubManager::OnMessage(websocketpp::connection_hdl hdl, WebsocketMessagePtr websocketMessage)
{
const std::string &payload = websocketMessage->get_payload();
rapidjson::Document msg;
rapidjson::ParseResult res = msg.Parse(payload.c_str());
if (!res) {
debug::Log("Error parsing message '{}' from PubSub: {}", payload,
rapidjson::GetParseError_En(res.Code()));
return;
}
if (!msg.IsObject()) {
debug::Log("Error parsing message '{}' from PubSub. Root object is not an object", payload);
return;
}
std::string type;
if (!rj::getSafe(msg, "type", type)) {
debug::Log("Missing required string member `type` in message root");
return;
}
if (type == "RESPONSE") {
this->HandleListenResponse(msg);
} else if (type == "MESSAGE") {
if (!msg.HasMember("data")) {
debug::Log("Missing required object member `data` in message root");
return;
}
const auto &data = msg["data"];
if (!data.IsObject()) {
debug::Log("Member `data` must be an object");
return;
}
this->HandleMessageResponse(data);
} else if (type == "PONG") {
auto clientIt = this->clients.find(hdl);
// If this assert goes off, there's something wrong with the connection creation/preserving
// code KKona
assert(clientIt != this->clients.end());
auto &client = *clientIt;
client.second->HandlePong();
} else {
debug::Log("Unknown message type: {}", type);
}
}
void PubSubManager::OnConnectionOpen(WebsocketHandle hdl)
{
auto client = std::make_shared<PubSubClient>(this->websocketClient, hdl);
// We separate the starting from the constructor because we will want to use shared_from_this
client->Start();
this->clients.emplace(hdl, client);
this->connected.invoke();
}
void PubSubManager::OnConnectionClose(WebsocketHandle hdl)
{
auto clientIt = this->clients.find(hdl);
// If this assert goes off, there's something wrong with the connection creation/preserving
// code KKona
assert(clientIt != this->clients.end());
auto &client = clientIt->second;
client->Stop();
this->clients.erase(clientIt);
this->connected.invoke();
}
PubSubManager::WebsocketContextPtr PubSubManager::OnTLSInit(websocketpp::connection_hdl hdl)
{
WebsocketContextPtr ctx(new boost::asio::ssl::context(boost::asio::ssl::context::tlsv1));
try {
ctx->set_options(boost::asio::ssl::context::default_workarounds |
boost::asio::ssl::context::no_sslv2 |
boost::asio::ssl::context::single_dh_use);
} catch (const std::exception &e) {
debug::Log("Exception caught in OnTLSInit: {}", e.what());
}
return ctx;
}
void PubSubManager::HandleListenResponse(const rapidjson::Document &msg)
{
std::string error;
if (rj::getSafe(msg, "error", error)) {
std::string nonce;
rj::getSafe(msg, "nonce", nonce);
const auto &xd = sentMessages;
const auto &payload = sentMessages[nonce];
if (error.empty()) {
debug::Log("Successfully listened to nonce {}", nonce);
// Nothing went wrong
return;
}
debug::Log("PubSub error: {} on nonce {}", error, nonce);
return;
}
}
void PubSubManager::HandleMessageResponse(const rapidjson::Value &outerData)
{
std::string topic;
if (!rj::getSafe(outerData, "topic", topic)) {
debug::Log("Missing required string member `topic` in outerData");
return;
}
std::string payload;
if (!rj::getSafe(outerData, "message", payload)) {
debug::Log("Expected string message in outerData");
return;
}
rapidjson::Document msg;
rapidjson::ParseResult res = msg.Parse(payload.c_str());
if (!res) {
debug::Log("Error parsing message '{}' from PubSub: {}", payload,
rapidjson::GetParseError_En(res.Code()));
return;
}
if (topic.find("whispers.") == 0) {
std::string whisperType;
if (!rj::getSafe(msg, "type", whisperType)) {
debug::Log("Bad whisper data");
return;
}
if (whisperType == "whisper_received") {
this->sig.whisper.received.invoke(msg);
} else if (whisperType == "whisper_sent") {
this->sig.whisper.sent.invoke(msg);
} else if (whisperType == "thread") {
// Handle thread?
} else {
debug::Log("Invalid whisper type: {}", whisperType);
assert(false);
return;
}
} else if (topic.find("chat_moderator_actions.") == 0) {
const auto &data = msg["data"];
std::string moderationAction;
if (!rj::getSafe(data, "moderation_action", moderationAction)) {
debug::Log("Missing moderation action in data: {}", Stringify(data));
return;
}
auto handlerIt = this->moderationActionHandlers.find(moderationAction);
if (handlerIt == this->moderationActionHandlers.end()) {
debug::Log("No handler found for moderation action {}", moderationAction);
return;
}
// Invoke handler function
handlerIt->second(data);
} else {
debug::Log("Unknown topic: {}", topic);
return;
}
}
void PubSubManager::RunThread()
{
debug::Log("Start pubsub manager thread");
this->websocketClient.run();
debug::Log("Done with pubsub manager thread");
}
} // namespace singletons
} // namespace chatterino

View file

@ -0,0 +1,158 @@
#pragma once
#include "providers/twitch/twitchaccount.hpp"
#include "providers/twitch/twitchserver.hpp"
#include "singletons/helper/pubsubactions.hpp"
#include <rapidjson/document.h>
#include <QString>
#include <pajlada/signals/signal.hpp>
#include <websocketpp/client.hpp>
#include <websocketpp/config/asio_client.hpp>
#include <atomic>
#include <chrono>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
namespace chatterino {
namespace singletons {
using WebsocketClient = websocketpp::client<websocketpp::config::asio_tls_client>;
using WebsocketHandle = websocketpp::connection_hdl;
using WebsocketErrorCode = websocketpp::lib::error_code;
#define MAX_PUBSUB_LISTENS 50
#define MAX_PUBSUB_CONNECTIONS 10
struct Listener {
std::string topic;
bool authed;
bool persistent;
bool confirmed = false;
};
class PubSubClient : public std::enable_shared_from_this<PubSubClient>
{
WebsocketClient &websocketClient;
WebsocketHandle handle;
uint16_t numListens = 0;
std::vector<Listener> listeners;
std::atomic<bool> awaitingPong{false};
std::atomic<bool> started{false};
public:
PubSubClient(WebsocketClient &_websocketClient, WebsocketHandle _handle);
void Start();
void Stop();
bool Listen(rapidjson::Document &message);
void UnlistenPrefix(const std::string &prefix);
void HandlePong();
bool isListeningToTopic(const std::string &topic);
private:
void Ping();
bool Send(const char *payload);
};
class PubSubManager
{
PubSubManager();
using WebsocketMessagePtr = websocketpp::config::asio_tls_client::message_type::ptr;
using WebsocketContextPtr = websocketpp::lib::shared_ptr<boost::asio::ssl::context>;
template <typename T>
using Signal = pajlada::Signals::Signal<T>; // type-id is vector<T, Alloc<T>>
WebsocketClient websocketClient;
std::unique_ptr<std::thread> mainThread;
public:
enum class State {
Connected,
Disconnected,
};
static PubSubManager &getInstance();
void Start();
bool IsConnected() const
{
return this->state == State::Connected;
}
pajlada::Signals::NoArgSignal connected;
struct {
struct {
Signal<ClearChatAction> chatCleared;
Signal<ModeChangedAction> modeChanged;
Signal<ModerationStateAction> moderationStateChanged;
Signal<TimeoutAction> userTimedOut;
Signal<BanAction> userBanned;
Signal<UnbanAction> userUnbanned;
} moderation;
struct {
// Parsing should be done in PubSubManager as well,
// but for now we just send the raw data
Signal<const rapidjson::Value &> received;
Signal<const rapidjson::Value &> sent;
} whisper;
} sig;
void ListenToWhispers(std::shared_ptr<providers::twitch::TwitchAccount> account);
void UnlistenAllModerationActions();
void ListenToChannelModerationActions(
const QString &channelID, std::shared_ptr<providers::twitch::TwitchAccount> account);
std::vector<std::unique_ptr<rapidjson::Document>> requests;
private:
void listenToTopic(const std::string &topic,
std::shared_ptr<providers::twitch::TwitchAccount> account);
void Listen(rapidjson::Document &&msg);
bool TryListen(rapidjson::Document &msg);
bool isListeningToTopic(const std::string &topic);
void AddClient();
State state = State::Connected;
std::map<WebsocketHandle, std::shared_ptr<PubSubClient>, std::owner_less<WebsocketHandle>>
clients;
std::unordered_map<std::string, std::function<void(const rapidjson::Value &)>>
moderationActionHandlers;
void OnMessage(websocketpp::connection_hdl hdl, WebsocketMessagePtr msg);
void OnConnectionOpen(websocketpp::connection_hdl hdl);
void OnConnectionClose(websocketpp::connection_hdl hdl);
WebsocketContextPtr OnTLSInit(websocketpp::connection_hdl hdl);
void HandleListenResponse(const rapidjson::Document &msg);
void HandleMessageResponse(const rapidjson::Value &data);
void RunThread();
};
} // namespace singletons
} // namespace chatterino

View file

@ -312,8 +312,6 @@ ResourceManager::BadgeVersion::BadgeVersion(QJsonObject &&root)
void ResourceManager::loadChannelData(const QString &roomID, bool bypassCache) void ResourceManager::loadChannelData(const QString &roomID, bool bypassCache)
{ {
qDebug() << "Load channel data for" << roomID;
QString url = "https://badges.twitch.tv/v1/badges/channels/" + roomID + "/display?language=en"; QString url = "https://badges.twitch.tv/v1/badges/channels/" + roomID + "/display?language=en";
util::NetworkRequest req(url); util::NetworkRequest req(url);
@ -393,7 +391,6 @@ void ResourceManager::loadDynamicTwitchBadges()
req.setCaller(QThread::currentThread()); req.setCaller(QThread::currentThread());
req.getJSON([this](QJsonObject &root) { req.getJSON([this](QJsonObject &root) {
QJsonObject sets = root.value("badge_sets").toObject(); QJsonObject sets = root.value("badge_sets").toObject();
qDebug() << "badges fetched";
for (QJsonObject::iterator it = sets.begin(); it != sets.end(); ++it) { for (QJsonObject::iterator it = sets.begin(); it != sets.end(); ++it) {
QJsonObject versions = it.value().toObject().value("versions").toObject(); QJsonObject versions = it.value().toObject().value("versions").toObject();
@ -424,7 +421,6 @@ void ResourceManager::loadChatterinoBadges()
req.getJSON([this](QJsonObject &root) { req.getJSON([this](QJsonObject &root) {
QJsonArray badgeVariants = root.value("badges").toArray(); QJsonArray badgeVariants = root.value("badges").toArray();
qDebug() << "chatbadges fetched";
for (QJsonArray::iterator it = badgeVariants.begin(); it != badgeVariants.end(); ++it) { for (QJsonArray::iterator it = badgeVariants.begin(); it != badgeVariants.end(); ++it) {
QJsonObject badgeVariant = it->toObject(); QJsonObject badgeVariant = it->toObject();
const std::string badgeVariantTooltip = const std::string badgeVariantTooltip =

View file

@ -133,7 +133,7 @@ public:
if (cachedFile.open(QIODevice::ReadOnly)) { if (cachedFile.open(QIODevice::ReadOnly)) {
QByteArray bytes = cachedFile.readAll(); QByteArray bytes = cachedFile.readAll();
qDebug() << "loaded cached resource" << this->data.request.url(); // qDebug() << "Loaded cached resource" << this->data.request.url();
onFinished(bytes); onFinished(bytes);

View file

@ -0,0 +1,19 @@
#include "util/rapidjson-helpers.hpp"
namespace chatterino {
namespace rj {
void addMember(rapidjson::Value &obj, const char *key, rapidjson::Value &&value,
rapidjson::Document::AllocatorType &a)
{
obj.AddMember(rapidjson::Value(key, a).Move(), value, a);
}
void addMember(rapidjson::Value &obj, const char *key, rapidjson::Value &value,
rapidjson::Document::AllocatorType &a)
{
obj.AddMember(rapidjson::Value(key, a).Move(), value.Move(), a);
}
} // namespace rj
} // namespace chatterino

View file

@ -0,0 +1,97 @@
#pragma once
#include "util/serialize-custom.hpp"
#include <rapidjson/document.h>
#include <pajlada/settings/serialize.hpp>
#include <cassert>
namespace chatterino {
namespace rj {
void addMember(rapidjson::Value &obj, const char *key, rapidjson::Value &&value,
rapidjson::Document::AllocatorType &a);
void addMember(rapidjson::Value &obj, const char *key, rapidjson::Value &value,
rapidjson::Document::AllocatorType &a);
template <typename Type>
void set(rapidjson::Value &obj, const char *key, const Type &value,
rapidjson::Document::AllocatorType &a)
{
assert(obj.IsObject());
addMember(obj, key, pajlada::Settings::Serialize<Type>::get(value, a), a);
}
template <>
inline void set(rapidjson::Value &obj, const char *key, const rapidjson::Value &value,
rapidjson::Document::AllocatorType &a)
{
assert(obj.IsObject());
addMember(obj, key, const_cast<rapidjson::Value &>(value), a);
}
template <typename Type>
void set(rapidjson::Document &obj, const char *key, const Type &value)
{
assert(obj.IsObject());
auto &a = obj.GetAllocator();
addMember(obj, key, pajlada::Settings::Serialize<Type>::get(value, a), a);
}
template <>
inline void set(rapidjson::Document &obj, const char *key, const rapidjson::Value &value)
{
assert(obj.IsObject());
auto &a = obj.GetAllocator();
addMember(obj, key, const_cast<rapidjson::Value &>(value), a);
}
template <typename Type>
void add(rapidjson::Value &arr, const Type &value, rapidjson::Document::AllocatorType &a)
{
assert(arr.IsArray());
arr.PushBack(pajlada::Settings::Serialize<Type>::get(value, a), a);
}
template <typename Type>
bool getSafe(const rapidjson::Value &obj, const char *key, Type &out)
{
if (!obj.IsObject()) {
return false;
}
if (!obj.HasMember(key)) {
return false;
}
try {
out = pajlada::Settings::Deserialize<Type>::get(obj[key]);
} catch (const std::runtime_error &) {
return false;
}
return true;
}
template <typename Type>
bool getSafe(const rapidjson::Value &value, Type &out)
{
try {
out = pajlada::Settings::Deserialize<Type>::get(value);
} catch (const std::runtime_error &) {
return false;
}
return true;
}
} // namespace rj
} // namespace chatterino

View file

@ -279,11 +279,13 @@ QString ChannelView::getSelectedText()
} }
qDebug() << "xd >>>>"; qDebug() << "xd >>>>";
for (int msg = selection.min.messageIndex; msg <= selection.max.messageIndex; msg++) { for (int msg = selection.selectionMin.messageIndex; msg <= selection.selectionMax.messageIndex;
msg++) {
MessageLayoutPtr layout = messagesSnapshot[msg]; MessageLayoutPtr layout = messagesSnapshot[msg];
int from = msg == selection.min.messageIndex ? selection.min.charIndex : 0; int from =
int to = msg == selection.max.messageIndex ? selection.max.charIndex msg == selection.selectionMin.messageIndex ? selection.selectionMin.charIndex : 0;
: layout->getLastCharacterIndex() + 1; int to = msg == selection.selectionMax.messageIndex ? selection.selectionMax.charIndex
: layout->getLastCharacterIndex() + 1;
qDebug() << "from:" << from << ", to:" << to; qDebug() << "from:" << from << ", to:" << to;
@ -405,8 +407,8 @@ void ChannelView::setChannel(ChannelPtr newChannel)
// on message removed // on message removed
this->messageRemovedConnection = this->messageRemovedConnection =
newChannel->messageRemovedFromStart.connect([this](MessagePtr &) { newChannel->messageRemovedFromStart.connect([this](MessagePtr &) {
this->selection.min.messageIndex--; this->selection.selectionMin.messageIndex--;
this->selection.max.messageIndex--; this->selection.selectionMax.messageIndex--;
this->selection.start.messageIndex--; this->selection.start.messageIndex--;
this->selection.end.messageIndex--; this->selection.end.messageIndex--;