#include "singletons/pubsubmanager.hpp"

#include "debug/log.hpp"
#include "singletons/accountmanager.hpp"
#include "singletons/helper/pubsubactions.hpp"
#include "singletons/helper/pubsubhelpers.hpp"
#include "util/rapidjson-helpers.hpp"

// NOTE(review): the targets of the angle-bracket includes were missing from the text this file was
// restored from. The headers below are the ones required by the names used in this file
// (rapidjson::GetParseError_En, std::thread, std::exception, ...) - confirm against history.
#include <rapidjson/error/en.h>

#include <cassert>
#include <chrono>
#include <exception>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

#define TWITCH_PUBSUB_URL "wss://pubsub-edge.twitch.tv"

using websocketpp::lib::bind;
using websocketpp::lib::placeholders::_1;
using websocketpp::lib::placeholders::_2;

namespace chatterino {
namespace singletons {

// Payload sent to the server by PubSubClient::Ping
static const char *pingPayload = "{\"type\":\"PING\"}";

// Every LISTEN/UNLISTEN payload that has been sent, keyed by the nonce embedded in it.
// NOTE(review): this map is accessed without any locking in this file, and entries are never
// removed - confirm that all writers run on the same thread.
static std::map<std::string, std::string> sentMessages;

// Wraps a single websocket connection, identified by `_handle`, on the shared websocket client.
PubSubClient::PubSubClient(WebsocketClient &_websocketClient, WebsocketHandle _handle)
    : websocketClient(_websocketClient)
    , handle(_handle)
{
}

// Marks the client as started and sends the first ping.
// Must be called on an object owned by a shared_ptr, because Ping uses shared_from_this.
void PubSubClient::Start()
{
    assert(!this->started);

    this->started = true;

    this->Ping();
}

// Marks the client as stopped. Timers that are already scheduled notice this and do nothing.
void PubSubClient::Stop()
{
    assert(this->started);

    this->started = false;
}

// Tries to send the given LISTEN message over this connection.
//
// `message` must contain an array at data.topics. A fresh nonce is written into `message` before
// it is sent. Returns false, without sending anything, if the topics would push this client over
// MAX_PUBSUB_LISTENS; returns true otherwise.
bool PubSubClient::Listen(rapidjson::Document &message)
{
    int numRequestedListens = message["data"]["topics"].Size();

    if (this->numListens + numRequestedListens > MAX_PUBSUB_LISTENS) {
        // This PubSubClient is already at its peak listens
        return false;
    }

    this->numListens += numRequestedListens;

    for (const auto &topic : message["data"]["topics"].GetArray()) {
        this->listeners.emplace_back(Listener{topic.GetString(), false, false, false});
    }

    auto uuid = CreateUUID();

    rj::set(message, "nonce", uuid);

    std::string payload = Stringify(message);
    sentMessages[uuid.toStdString()] = payload;

    this->Send(payload.c_str());

    return true;
}

// Stops listening to every topic of this client that starts with `prefix`.
// Does nothing if no topic matches.
void PubSubClient::UnlistenPrefix(const std::string &prefix)
{
    std::vector<std::string> topics;

    for (auto it = this->listeners.begin(); it != this->listeners.end();) {
        const auto &listener = *it;

        // rfind(prefix, 0) only ever tests position 0, i.e. it is a "starts with" check
        if (listener.topic.rfind(prefix, 0) == 0) {
            topics.push_back(listener.topic);
            it = this->listeners.erase(it);
        } else {
            ++it;
        }
    }

    if (topics.empty()) {
        return;
    }

    // Give back the capacity that Listen reserved for these topics, otherwise this client would
    // keep counting them against MAX_PUBSUB_LISTENS forever
    this->numListens -= static_cast<int>(topics.size());

    auto message = CreateUnlistenMessage(topics);

    // The nonce written into the message must be the same one used as the sentMessages key
    auto uuid = CreateUUID();

    rj::set(message, "nonce", uuid);

    std::string payload = Stringify(message);
    sentMessages[uuid.toStdString()] = payload;

    this->Send(payload.c_str());
}

// Called when a PONG message arrived on this connection.
void PubSubClient::HandlePong()
{
    assert(this->awaitingPong);

    debug::Log("Got pong!");

    this->awaitingPong = false;
}

// Returns true if this client has a listener whose topic equals `payload` exactly.
bool PubSubClient::isListeningToTopic(const std::string &payload)
{
    for (const auto &listener : this->listeners) {
        if (listener.topic == payload) {
            return true;
        }
    }

    return false;
}

// Sends a PING and schedules two timers: one that checks after 15 seconds whether the PONG
// arrived, and one that sends the next PING after 5 minutes.
void PubSubClient::Ping()
{
    assert(this->started);

    if (!this->Send(pingPayload)) {
        return;
    }

    this->awaitingPong = true;

    // The timers hold a shared_ptr so that this object is still alive when they fire
    auto self = this->shared_from_this();

    RunAfter(this->websocketClient.get_io_service(), std::chrono::seconds(15),
             [self](auto timer) {
                 if (!self->started) {
                     return;
                 }

                 if (self->awaitingPong) {
                     debug::Log("No pong response, disconnect!");
                     // TODO(pajlada): Label this connection as "disconnect me"
                 }
             });

    RunAfter(this->websocketClient.get_io_service(), std::chrono::minutes(5),
             [self](auto timer) {
                 if (!self->started) {
                     return;
                 }

                 self->Ping();
             });
}

// Sends `payload` as a text frame over this connection.
// Returns false (after logging) if the websocket client reported an error.
bool PubSubClient::Send(const char *payload)
{
    WebsocketErrorCode ec;
    this->websocketClient.send(this->handle, payload, websocketpp::frame::opcode::text, ec);

    if (ec) {
        debug::Log("Error sending message {}: {}", payload, ec.message());
        // TODO(pajlada): Check which error code happened and maybe gracefully handle it

        return false;
    }

    return true;
}

// Registers one handler per known moderation action, configures the websocket client and opens
// the first connection.
PubSubManager::PubSubManager()
{
    this->moderationActionHandlers["clear"] = [this](const auto &data) {
        ClearChatAction action(data);

        this->sig.moderation.chatCleared.invoke(action);
    };

    // Registers a handler for a chat mode toggle that carries no arguments.
    const auto registerModeHandler = [this](const char *name, ModeChangedAction::Mode mode,
                                            ModeChangedAction::State state) {
        this->moderationActionHandlers[name] = [this, mode, state](const auto &data) {
            ModeChangedAction action(data);

            action.mode = mode;
            action.state = state;

            this->sig.moderation.modeChanged.invoke(action);
        };
    };

    registerModeHandler("slowoff", ModeChangedAction::Mode::Slow, ModeChangedAction::State::Off);
    registerModeHandler("r9kbetaoff", ModeChangedAction::Mode::R9K, ModeChangedAction::State::Off);
    registerModeHandler("r9kbeta", ModeChangedAction::Mode::R9K, ModeChangedAction::State::On);
    registerModeHandler("subscribersoff", ModeChangedAction::Mode::SubscribersOnly,
                        ModeChangedAction::State::Off);
    registerModeHandler("subscribers", ModeChangedAction::Mode::SubscribersOnly,
                        ModeChangedAction::State::On);
    registerModeHandler("emoteonlyoff", ModeChangedAction::Mode::EmoteOnly,
                        ModeChangedAction::State::Off);
    registerModeHandler("emoteonly", ModeChangedAction::Mode::EmoteOnly,
                        ModeChangedAction::State::On);

    // Slow mode on is the only mode change with an argument: the duration as a string in args[0]
    this->moderationActionHandlers["slow"] = [this](const auto &data) {
        ModeChangedAction action(data);

        action.mode = ModeChangedAction::Mode::Slow;
        action.state = ModeChangedAction::State::On;

        if (!data.HasMember("args")) {
            debug::Log("Missing required args member");
            return;
        }

        const auto &args = data["args"];

        if (!args.IsArray()) {
            debug::Log("args member must be an array");
            return;
        }

        if (args.Size() == 0) {
            debug::Log("Missing duration argument in slowmode on");
            return;
        }

        const auto &durationArg = args[0];

        if (!durationArg.IsString()) {
            debug::Log("Duration arg must be a string");
            return;
        }

        // toUInt yields 0 if the string is not a valid base 10 number
        action.args.duration = QString(durationArg.GetString()).toUInt();

        this->sig.moderation.modeChanged.invoke(action);
    };

    this->moderationActionHandlers["unmod"] = [this](const auto &data) {
        ModerationStateAction action(data);

        getTargetUser(data, action.target);

        action.modded = false;

        this->sig.moderation.moderationStateChanged.invoke(action);
    };

    this->moderationActionHandlers["mod"] = [this](const auto &data) {
        ModerationStateAction action(data);

        getTargetUser(data, action.target);

        action.modded = true;

        this->sig.moderation.moderationStateChanged.invoke(action);
    };

    // args: [target name, duration, (optional) reason]
    this->moderationActionHandlers["timeout"] = [this](const auto &data) {
        TimeoutAction action(data);

        getCreatedByUser(data, action.source);
        getTargetUser(data, action.target);

        try {
            // presumably getArgs is what throws the runtime_error caught below
            const auto &args = getArgs(data);

            if (args.Size() < 2) {
                return;
            }

            if (!rj::getSafe(args[0], action.target.name)) {
                return;
            }

            QString durationString;
            if (!rj::getSafe(args[1], durationString)) {
                return;
            }

            // toUInt yields 0 if the string is not a valid base 10 number
            action.duration = durationString.toUInt();

            if (args.Size() >= 3) {
                if (!rj::getSafe(args[2], action.reason)) {
                    return;
                }
            }

            this->sig.moderation.userTimedOut.invoke(action);
        } catch (const std::runtime_error &ex) {
            debug::Log("Error parsing moderation action: {}", ex.what());
        }
    };

    // args: [target name, (optional) reason]
    this->moderationActionHandlers["ban"] = [this](const auto &data) {
        BanAction action(data);

        getCreatedByUser(data, action.source);
        getTargetUser(data, action.target);

        try {
            const auto &args = getArgs(data);

            if (args.Size() < 1) {
                return;
            }

            if (!rj::getSafe(args[0], action.target.name)) {
                return;
            }

            if (args.Size() >= 2) {
                if (!rj::getSafe(args[1], action.reason)) {
                    return;
                }
            }

            this->sig.moderation.userBanned.invoke(action);
        } catch (const std::runtime_error &ex) {
            debug::Log("Error parsing moderation action: {}", ex.what());
        }
    };

    // args: [target name]
    this->moderationActionHandlers["unban"] = [this](const auto &data) {
        UnbanAction action(data);

        getCreatedByUser(data, action.source);
        getTargetUser(data, action.target);

        action.previousState = UnbanAction::Banned;

        try {
            const auto &args = getArgs(data);

            if (args.Size() < 1) {
                return;
            }

            if (!rj::getSafe(args[0], action.target.name)) {
                return;
            }

            this->sig.moderation.userUnbanned.invoke(action);
        } catch (const std::runtime_error &ex) {
            debug::Log("Error parsing moderation action: {}", ex.what());
        }
    };

    // args: [target name]
    this->moderationActionHandlers["untimeout"] = [this](const auto &data) {
        UnbanAction action(data);

        getCreatedByUser(data, action.source);
        getTargetUser(data, action.target);

        action.previousState = UnbanAction::TimedOut;

        try {
            const auto &args = getArgs(data);

            if (args.Size() < 1) {
                return;
            }

            if (!rj::getSafe(args[0], action.target.name)) {
                return;
            }

            this->sig.moderation.userUnbanned.invoke(action);
        } catch (const std::runtime_error &ex) {
            debug::Log("Error parsing moderation action: {}", ex.what());
        }
    };

    // Log everything except the frame payloads
    this->websocketClient.set_access_channels(websocketpp::log::alevel::all);
    this->websocketClient.clear_access_channels(websocketpp::log::alevel::frame_payload);

    this->websocketClient.init_asio();

    // SSL Handshake
    this->websocketClient.set_tls_init_handler(bind(&PubSubManager::OnTLSInit, this, ::_1));

    this->websocketClient.set_message_handler(
        bind(&PubSubManager::OnMessage, this, ::_1, ::_2));
    this->websocketClient.set_open_handler(bind(&PubSubManager::OnConnectionOpen, this, ::_1));
    this->websocketClient.set_close_handler(bind(&PubSubManager::OnConnectionClose, this, ::_1));

    // Add an initial client
    this->AddClient();
}

// Opens one more connection to TWITCH_PUBSUB_URL.
// The matching PubSubClient is created in OnConnectionOpen once the connection is established.
void PubSubManager::AddClient()
{
    websocketpp::lib::error_code ec;
    auto con = this->websocketClient.get_connection(TWITCH_PUBSUB_URL, ec);

    if (ec) {
        debug::Log("Unable to establish connection: {}", ec.message());
        return;
    }

    this->websocketClient.connect(con);
}

// Returns the process-wide instance, creating it on first use.
PubSubManager &PubSubManager::getInstance()
{
    static PubSubManager instance;

    return instance;
}

// Starts the thread that runs the websocket client (see RunThread).
void PubSubManager::Start()
{
    this->mainThread.reset(new std::thread([this] { this->RunThread(); }));
}

// Listens to the whispers topic of the given account. `account` must not be null.
// NOTE(review): the template argument of std::shared_ptr was missing from the text this file
// was restored from - confirm it against the declaration in pubsubmanager.hpp.
void PubSubManager::ListenToWhispers(std::shared_ptr<providers::twitch::TwitchAccount> account)
{
    assert(account != nullptr);

    std::string userID = account->getUserId().toStdString();

    debug::Log("Connection open!");

    std::vector<std::string> topics({"whispers." + userID});

    this->Listen(CreateListenMessage(topics, account));
}

// Makes every client stop listening to all moderation action topics.
void PubSubManager::UnlistenAllModerationActions()
{
    for (const auto &p : this->clients) {
        const auto &client = p.second;
        client->UnlistenPrefix("chat_moderator_actions.");
    }
}

// Listens to the moderation actions of the channel `channelID` on behalf of `account`.
// Does nothing if some client is already listening to that topic.
// `channelID` must not be empty, `account` must not be null and must have a user ID.
// NOTE(review): see ListenToWhispers regarding the std::shared_ptr template argument.
void PubSubManager::ListenToChannelModerationActions(
    const QString &channelID, std::shared_ptr<providers::twitch::TwitchAccount> account)
{
    assert(!channelID.isEmpty());
    assert(account != nullptr);

    QString userID = account->getUserId();
    assert(!userID.isEmpty());

    std::string topic(fS("chat_moderator_actions.{}.{}", userID, channelID));

    if (this->isListeningToTopic(topic)) {
        debug::Log("We are already listening to topic {}", topic);
        return;
    }

    debug::Log("Listen to topic {}", topic);

    this->listenToTopic(topic, account);
}

// Builds a LISTEN message for a single topic and hands it to Listen.
// NOTE(review): see ListenToWhispers regarding the std::shared_ptr template argument.
void PubSubManager::listenToTopic(const std::string &topic,
                                  std::shared_ptr<providers::twitch::TwitchAccount> account)
{
    auto message = CreateListenMessage({topic}, account);

    this->Listen(std::move(message));
}

// Sends `msg` through the first client that accepts it. If no client does, the message is
// appended to `requests`.
// NOTE(review): nothing in this file takes messages back out of `requests`.
void PubSubManager::Listen(rapidjson::Document &&msg)
{
    if (this->TryListen(msg)) {
        debug::Log("Successfully listened!");
        return;
    }

    debug::Log("Added to the back of the queue");
    this->requests.emplace_back(std::make_unique<rapidjson::Document>(std::move(msg)));
}

// Offers `msg` to each client in turn. Returns true as soon as one of them accepts it.
bool PubSubManager::TryListen(rapidjson::Document &msg)
{
    debug::Log("TryListen with {} clients", this->clients.size());

    for (const auto &p : this->clients) {
        const auto &client = p.second;
        if (client->Listen(msg)) {
            return true;
        }
    }

    return false;
}

// Returns true if any client is listening to exactly `topic`.
bool PubSubManager::isListeningToTopic(const std::string &topic)
{
    for (const auto &p : this->clients) {
        const auto &client = p.second;
        if (client->isListeningToTopic(topic)) {
            return true;
        }
    }

    return false;
}

// Message handler of the websocket client. Parses the payload as JSON and dispatches on the
// string member `type` of the root object (RESPONSE, MESSAGE or PONG).
void PubSubManager::OnMessage(websocketpp::connection_hdl hdl,
                              WebsocketMessagePtr websocketMessage)
{
    const std::string &payload = websocketMessage->get_payload();

    rapidjson::Document msg;

    rapidjson::ParseResult res = msg.Parse(payload.c_str());

    if (!res) {
        debug::Log("Error parsing message '{}' from PubSub: {}", payload,
                   rapidjson::GetParseError_En(res.Code()));
        return;
    }

    if (!msg.IsObject()) {
        debug::Log("Error parsing message '{}' from PubSub. Root object is not an object",
                   payload);
        return;
    }

    std::string type;

    if (!rj::getSafe(msg, "type", type)) {
        debug::Log("Missing required string member `type` in message root");
        return;
    }

    if (type == "RESPONSE") {
        this->HandleListenResponse(msg);
    } else if (type == "MESSAGE") {
        if (!msg.HasMember("data")) {
            debug::Log("Missing required object member `data` in message root");
            return;
        }

        const auto &data = msg["data"];

        if (!data.IsObject()) {
            debug::Log("Member `data` must be an object");
            return;
        }

        this->HandleMessageResponse(data);
    } else if (type == "PONG") {
        auto clientIt = this->clients.find(hdl);

        // If this assert goes off, there's something wrong with the connection creation/preserving
        // code KKona
        assert(clientIt != this->clients.end());

        // Builds without asserts must not dereference the end iterator
        if (clientIt == this->clients.end()) {
            debug::Log("Received PONG for an unknown connection");
            return;
        }

        clientIt->second->HandlePong();
    } else {
        debug::Log("Unknown message type: {}", type);
    }
}

// Open handler of the websocket client. Creates and starts the client for the new connection.
void PubSubManager::OnConnectionOpen(WebsocketHandle hdl)
{
    auto client = std::make_shared<PubSubClient>(this->websocketClient, hdl);

    // We separate the starting from the constructor because we will want to use shared_from_this
    client->Start();

    this->clients.emplace(hdl, client);

    this->connected.invoke();
}

// Close handler of the websocket client. Stops and forgets the client of the closed connection.
void PubSubManager::OnConnectionClose(WebsocketHandle hdl)
{
    auto clientIt = this->clients.find(hdl);

    // If this assert goes off, there's something wrong with the connection creation/preserving
    // code KKona
    assert(clientIt != this->clients.end());

    // Builds without asserts must not dereference the end iterator
    if (clientIt == this->clients.end()) {
        debug::Log("Connection closed for an unknown client");
        return;
    }

    auto &client = clientIt->second;

    client->Stop();

    this->clients.erase(clientIt);

    // NOTE(review): this invokes `connected` although the connection was closed - confirm that
    // this is intended
    this->connected.invoke();
}

// TLS init handler of the websocket client. Creates the SSL context for a new connection.
PubSubManager::WebsocketContextPtr PubSubManager::OnTLSInit(websocketpp::connection_hdl hdl)
{
    // TLS 1.0 is deprecated (RFC 8996), so ask for TLS 1.2
    WebsocketContextPtr ctx(new boost::asio::ssl::context(boost::asio::ssl::context::tlsv12));

    try {
        ctx->set_options(boost::asio::ssl::context::default_workarounds |
                         boost::asio::ssl::context::no_sslv2 |
                         boost::asio::ssl::context::single_dh_use);
    } catch (const std::exception &e) {
        debug::Log("Exception caught in OnTLSInit: {}", e.what());
    }

    return ctx;
}

// Handles a message of type RESPONSE. Only logs whether the request identified by the nonce
// succeeded; an empty `error` string means success. Messages without a string member `error`
// are ignored.
void PubSubManager::HandleListenResponse(const rapidjson::Document &msg)
{
    std::string error;

    if (!rj::getSafe(msg, "error", error)) {
        return;
    }

    std::string nonce;
    rj::getSafe(msg, "nonce", nonce);

    if (error.empty()) {
        debug::Log("Successfully listened to nonce {}", nonce);
        // Nothing went wrong
        return;
    }

    debug::Log("PubSub error: {} on nonce {}", error, nonce);
}

// Handles the `data` object of a message of type MESSAGE.
//
// `outerData` must be a JSON object. Its string member `message` is itself JSON; it is parsed
// and dispatched based on the prefix of the string member `topic`.
void PubSubManager::HandleMessageResponse(const rapidjson::Value &outerData)
{
    std::string topic;

    if (!rj::getSafe(outerData, "topic", topic)) {
        debug::Log("Missing required string member `topic` in outerData");
        return;
    }

    std::string payload;

    if (!rj::getSafe(outerData, "message", payload)) {
        debug::Log("Expected string message in outerData");
        return;
    }

    rapidjson::Document msg;

    rapidjson::ParseResult res = msg.Parse(payload.c_str());

    if (!res) {
        debug::Log("Error parsing message '{}' from PubSub: {}", payload,
                   rapidjson::GetParseError_En(res.Code()));
        return;
    }

    // The payload comes from the network. Looking up members of a value that is not an object
    // trips an assert inside rapidjson.
    if (!msg.IsObject()) {
        debug::Log("Error parsing message '{}' from PubSub. Root object is not an object",
                   payload);
        return;
    }

    // rfind(prefix, 0) only ever tests position 0, i.e. it is a "starts with" check
    if (topic.rfind("whispers.", 0) == 0) {
        std::string whisperType;

        if (!rj::getSafe(msg, "type", whisperType)) {
            debug::Log("Bad whisper data");
            return;
        }

        if (whisperType == "whisper_received") {
            this->sig.whisper.received.invoke(msg);
        } else if (whisperType == "whisper_sent") {
            this->sig.whisper.sent.invoke(msg);
        } else if (whisperType == "thread") {
            // Handle thread?
        } else {
            // The type is chosen by the server, so an unknown one is logged instead of asserted
            debug::Log("Invalid whisper type: {}", whisperType);
            return;
        }
    } else if (topic.rfind("chat_moderator_actions.", 0) == 0) {
        if (!msg.HasMember("data")) {
            debug::Log("Missing required object member `data` in message root");
            return;
        }

        const auto &data = msg["data"];

        if (!data.IsObject()) {
            debug::Log("Member `data` must be an object");
            return;
        }

        std::string moderationAction;

        if (!rj::getSafe(data, "moderation_action", moderationAction)) {
            debug::Log("Missing moderation action in data: {}", Stringify(data));
            return;
        }

        auto handlerIt = this->moderationActionHandlers.find(moderationAction);

        if (handlerIt == this->moderationActionHandlers.end()) {
            debug::Log("No handler found for moderation action {}", moderationAction);
            return;
        }

        // Invoke handler function
        handlerIt->second(data);
    } else {
        debug::Log("Unknown topic: {}", topic);
        return;
    }
}

// Body of the thread created in Start. Blocks in the websocket client's run() until it returns.
void PubSubManager::RunThread()
{
    debug::Log("Start pubsub manager thread");
    this->websocketClient.run();
    debug::Log("Done with pubsub manager thread");
}

}  // namespace singletons
}  // namespace chatterino