// chatterino::PubSub - class declaration (member functions are only declared
// here, apart from the small inline setters/getters).
//
// NOTE(review): this text appears to have lost its line breaks and everything
// that was written inside angle brackets (bare `#include` directives,
// `template using Signal`, `std::weak_ptr client`, `std::atomic ...{0}`,
// `std::map, std::owner_less> clients`). Restore those from the original
// header before building; the notes below do not guess at the missing parts.
//
// What the declaration shows:
//  - Construction: PubSub(host, pingInterval); pingInterval defaults to
//    std::chrono::seconds(15). The destructor is declared `= delete`, so an
//    object of this type can never be destroyed. Presumably instances are
//    created once and kept for the whole process lifetime - TODO confirm
//    against the code that creates them.
//  - Credentials: setAccount() copies account->getOAuthToken() and
//    account->getUserId() into token_ / userID_ and dereferences `account`
//    without a null check; setAccountData() stores the two strings directly.
//  - State: `state` is initialised to State::Connected, so isConnected()
//    returns true until some code assigns another value (no such assignment
//    is visible in this header).
//  - Public members: start()/stop(); the `signals_` aggregate with three
//    groups (`moderation`, `whisper`, `pointReward`); the listenTo*/unlisten*
//    functions; listenToTopic(); the `requests` vector; and the `diag`
//    aggregate, whose eight counters are each brace-initialised to 0.
//  - Private members: listen()/tryListen()/isListeningToTopic()/addClient();
//    `addingClient` (initialised to false); `connectBackoff`, an
//    ExponentialBackoff<5> constructed from std::chrono::milliseconds(1000);
//    the `clients` map; two QString-keyed handler maps
//    (moderationActionHandlers, channelTermsActionHandlers); the websocket
//    callbacks onMessage/onConnectionOpen/onConnectionFail/onConnectionClose/
//    onTLSInit; the handle*Response functions; registerNonce()/findNonceInfo()
//    together with `nonces_`; runThread(); `work` (initialised to nullptr);
//    the const members host_ and clientOptions_; and `stopping_` (initialised
//    to false).
//  - NonceInfo groups a `client`, a `messageType` (per the original comment,
//    e.g. LISTEN or UNLISTEN), `topics` and `topicCount`.
#pragma once #include "providers/twitch/ChatterinoWebSocketppLogger.hpp" #include "providers/twitch/PubSubActions.hpp" #include "providers/twitch/PubSubClient.hpp" #include "providers/twitch/PubSubClientOptions.hpp" #include "providers/twitch/PubSubMessages.hpp" #include "providers/twitch/PubSubWebsocket.hpp" #include "providers/twitch/TwitchAccount.hpp" #include "util/ExponentialBackoff.hpp" #include #include #include #include #include #include #include #include #include #include #include namespace chatterino { class PubSub { using WebsocketMessagePtr = websocketpp::config::asio_tls_client::message_type::ptr; using WebsocketContextPtr = websocketpp::lib::shared_ptr; template using Signal = pajlada::Signals::Signal; // type-id is vector> struct NonceInfo { std::weak_ptr client; QString messageType; // e.g. LISTEN or UNLISTEN std::vector topics; std::vector::size_type topicCount; }; WebsocketClient websocketClient; std::unique_ptr mainThread; // Account credentials // Set from setAccount or setAccountData QString token_; QString userID_; public: // The max amount of connections we may open static constexpr int maxConnections = 10; PubSub(const QString &host, std::chrono::seconds pingInterval = std::chrono::seconds(15)); void setAccount(std::shared_ptr account) { this->token_ = account->getOAuthToken(); this->userID_ = account->getUserId(); } void setAccountData(QString token, QString userID) { this->token_ = token; this->userID_ = userID; } ~PubSub() = delete; enum class State { Connected, Disconnected, }; void start(); void stop(); bool isConnected() const { return this->state == State::Connected; } struct { struct { Signal chatCleared; Signal messageDeleted; Signal modeChanged; Signal moderationStateChanged; Signal userBanned; Signal userUnbanned; // Message caught by automod // channelID pajlada::Signals::Signal autoModMessageCaught; // Message blocked by moderator Signal autoModMessageBlocked; Signal automodUserMessage; Signal automodInfoMessage; } moderation; 
struct { // Parsing should be done in PubSubManager as well, // but for now we just send the raw data Signal received; Signal sent; } whisper; struct { Signal redeemed; } pointReward; } signals_; void unlistenAllModerationActions(); void unlistenWhispers(); bool listenToWhispers(); void listenToChannelModerationActions(const QString &channelID); void listenToAutomod(const QString &channelID); void listenToChannelPointRewards(const QString &channelID); std::vector requests; struct { std::atomic connectionsClosed{0}; std::atomic connectionsOpened{0}; std::atomic connectionsFailed{0}; std::atomic messagesReceived{0}; std::atomic messagesFailedToParse{0}; std::atomic failedListenResponses{0}; std::atomic listenResponses{0}; std::atomic unlistenResponses{0}; } diag; void listenToTopic(const QString &topic); private: void listen(PubSubListenMessage msg); bool tryListen(PubSubListenMessage msg); bool isListeningToTopic(const QString &topic); void addClient(); std::atomic addingClient{false}; ExponentialBackoff<5> connectBackoff{std::chrono::milliseconds(1000)}; State state = State::Connected; std::map, std::owner_less> clients; std::unordered_map< QString, std::function> moderationActionHandlers; std::unordered_map< QString, std::function> channelTermsActionHandlers; void onMessage(websocketpp::connection_hdl hdl, WebsocketMessagePtr msg); void onConnectionOpen(websocketpp::connection_hdl hdl); void onConnectionFail(websocketpp::connection_hdl hdl); void onConnectionClose(websocketpp::connection_hdl hdl); WebsocketContextPtr onTLSInit(websocketpp::connection_hdl hdl); void handleResponse(const PubSubMessage &message); void handleListenResponse(const NonceInfo &info, bool failed); void handleUnlistenResponse(const NonceInfo &info, bool failed); void handleMessageResponse(const PubSubMessageMessage &message); // Register a nonce for a specific client void registerNonce(QString nonce, NonceInfo nonceInfo); // Find client associated with a nonce boost::optional 
findNonceInfo(QString nonce); std::unordered_map nonces_; void runThread(); std::shared_ptr work{nullptr}; const QString host_; const PubSubClientOptions clientOptions_; bool stopping_{false}; }; } // namespace chatterino