Modify pubsub functions to follow the Chatterino function style

This commit is contained in:
Rasmus Karlsson 2018-04-28 16:07:18 +02:00
parent 0f22d9d002
commit 01b2230bcf
6 changed files with 86 additions and 86 deletions

View file

@ -152,14 +152,14 @@ void Application::initialize()
util::postToThread([chan, msg] { chan->addMessage(msg); }); util::postToThread([chan, msg] { chan->addMessage(msg); });
}); });
this->twitch.pubsub->Start(); this->twitch.pubsub->start();
auto RequestModerationActions = [=]() { auto RequestModerationActions = [=]() {
this->twitch.pubsub->UnlistenAllModerationActions(); this->twitch.pubsub->unlistenAllModerationActions();
// TODO(pajlada): Unlisten to all authed topics instead of only moderation topics // TODO(pajlada): Unlisten to all authed topics instead of only moderation topics
// this->twitch.pubsub->UnlistenAllAuthedTopics(); // this->twitch.pubsub->UnlistenAllAuthedTopics();
this->twitch.pubsub->ListenToWhispers(this->accounts->Twitch.getCurrent()); // this->twitch.pubsub->listenToWhispers(this->accounts->Twitch.getCurrent()); //
}; };
this->accounts->Twitch.userChanged.connect(RequestModerationActions); this->accounts->Twitch.userChanged.connect(RequestModerationActions);

View file

@ -33,23 +33,23 @@ PubSubClient::PubSubClient(WebsocketClient &_websocketClient, WebsocketHandle _h
{ {
} }
void PubSubClient::Start() void PubSubClient::start()
{ {
assert(!this->started); assert(!this->started);
this->started = true; this->started = true;
this->Ping(); this->ping();
} }
void PubSubClient::Stop() void PubSubClient::stop()
{ {
assert(this->started); assert(this->started);
this->started = false; this->started = false;
} }
bool PubSubClient::Listen(rapidjson::Document &message) bool PubSubClient::listen(rapidjson::Document &message)
{ {
int numRequestedListens = message["data"]["topics"].Size(); int numRequestedListens = message["data"]["topics"].Size();
@ -68,15 +68,15 @@ bool PubSubClient::Listen(rapidjson::Document &message)
rj::set(message, "nonce", uuid); rj::set(message, "nonce", uuid);
std::string payload = Stringify(message); std::string payload = stringify(message);
sentMessages[uuid.toStdString()] = payload; sentMessages[uuid.toStdString()] = payload;
this->Send(payload.c_str()); this->send(payload.c_str());
return true; return true;
} }
void PubSubClient::UnlistenPrefix(const std::string &prefix) void PubSubClient::unlistenPrefix(const std::string &prefix)
{ {
std::vector<std::string> topics; std::vector<std::string> topics;
@ -94,19 +94,19 @@ void PubSubClient::UnlistenPrefix(const std::string &prefix)
return; return;
} }
auto message = CreateUnlistenMessage(topics); auto message = createUnlistenMessage(topics);
auto uuid = CreateUUID(); auto uuid = CreateUUID();
rj::set(message, "nonce", CreateUUID()); rj::set(message, "nonce", CreateUUID());
std::string payload = Stringify(message); std::string payload = stringify(message);
sentMessages[uuid.toStdString()] = payload; sentMessages[uuid.toStdString()] = payload;
this->Send(payload.c_str()); this->send(payload.c_str());
} }
void PubSubClient::HandlePong() void PubSubClient::handlePong()
{ {
assert(this->awaitingPong); assert(this->awaitingPong);
@ -126,11 +126,11 @@ bool PubSubClient::isListeningToTopic(const std::string &payload)
return false; return false;
} }
void PubSubClient::Ping() void PubSubClient::ping()
{ {
assert(this->started); assert(this->started);
if (!this->Send(pingPayload)) { if (!this->send(pingPayload)) {
return; return;
} }
@ -138,7 +138,7 @@ void PubSubClient::Ping()
auto self = this->shared_from_this(); auto self = this->shared_from_this();
RunAfter(this->websocketClient.get_io_service(), std::chrono::seconds(15), [self](auto timer) { runAfter(this->websocketClient.get_io_service(), std::chrono::seconds(15), [self](auto timer) {
if (!self->started) { if (!self->started) {
return; return;
} }
@ -149,16 +149,16 @@ void PubSubClient::Ping()
} }
}); });
RunAfter(this->websocketClient.get_io_service(), std::chrono::minutes(5), [self](auto timer) { runAfter(this->websocketClient.get_io_service(), std::chrono::minutes(5), [self](auto timer) {
if (!self->started) { if (!self->started) {
return; return;
} }
self->Ping(); // self->ping(); //
}); });
} }
bool PubSubClient::Send(const char *payload) bool PubSubClient::send(const char *payload)
{ {
WebsocketErrorCode ec; WebsocketErrorCode ec;
this->websocketClient.send(this->handle, payload, websocketpp::frame::opcode::text, ec); this->websocketClient.send(this->handle, payload, websocketpp::frame::opcode::text, ec);
@ -425,17 +425,17 @@ PubSub::PubSub()
this->websocketClient.init_asio(); this->websocketClient.init_asio();
// SSL Handshake // SSL Handshake
this->websocketClient.set_tls_init_handler(bind(&PubSub::OnTLSInit, this, ::_1)); this->websocketClient.set_tls_init_handler(bind(&PubSub::onTLSInit, this, ::_1));
this->websocketClient.set_message_handler(bind(&PubSub::OnMessage, this, ::_1, ::_2)); this->websocketClient.set_message_handler(bind(&PubSub::onMessage, this, ::_1, ::_2));
this->websocketClient.set_open_handler(bind(&PubSub::OnConnectionOpen, this, ::_1)); this->websocketClient.set_open_handler(bind(&PubSub::onConnectionOpen, this, ::_1));
this->websocketClient.set_close_handler(bind(&PubSub::OnConnectionClose, this, ::_1)); this->websocketClient.set_close_handler(bind(&PubSub::onConnectionClose, this, ::_1));
// Add an initial client // Add an initial client
this->AddClient(); this->addClient();
} }
void PubSub::AddClient() void PubSub::addClient()
{ {
websocketpp::lib::error_code ec; websocketpp::lib::error_code ec;
auto con = this->websocketClient.get_connection(TWITCH_PUBSUB_URL, ec); auto con = this->websocketClient.get_connection(TWITCH_PUBSUB_URL, ec);
@ -448,12 +448,12 @@ void PubSub::AddClient()
this->websocketClient.connect(con); this->websocketClient.connect(con);
} }
void PubSub::Start() void PubSub::start()
{ {
this->mainThread.reset(new std::thread(std::bind(&PubSub::RunThread, this))); this->mainThread.reset(new std::thread(std::bind(&PubSub::runThread, this)));
} }
void PubSub::ListenToWhispers(std::shared_ptr<providers::twitch::TwitchAccount> account) void PubSub::listenToWhispers(std::shared_ptr<providers::twitch::TwitchAccount> account)
{ {
assert(account != nullptr); assert(account != nullptr);
@ -464,7 +464,7 @@ void PubSub::ListenToWhispers(std::shared_ptr<providers::twitch::TwitchAccount>
std::vector<std::string> topics({"whispers." + userID}); std::vector<std::string> topics({"whispers." + userID});
this->Listen(std::move(CreateListenMessage(topics, account))); this->listen(std::move(createListenMessage(topics, account)));
if (ec) { if (ec) {
debug::Log("Unable to send message to websocket server: {}", ec.message()); debug::Log("Unable to send message to websocket server: {}", ec.message());
@ -472,15 +472,15 @@ void PubSub::ListenToWhispers(std::shared_ptr<providers::twitch::TwitchAccount>
} }
} }
void PubSub::UnlistenAllModerationActions() void PubSub::unlistenAllModerationActions()
{ {
for (const auto &p : this->clients) { for (const auto &p : this->clients) {
const auto &client = p.second; const auto &client = p.second;
client->UnlistenPrefix("chat_moderator_actions."); client->unlistenPrefix("chat_moderator_actions.");
} }
} }
void PubSub::ListenToChannelModerationActions( void PubSub::listenToChannelModerationActions(
const QString &channelID, std::shared_ptr<providers::twitch::TwitchAccount> account) const QString &channelID, std::shared_ptr<providers::twitch::TwitchAccount> account)
{ {
assert(!channelID.isEmpty()); assert(!channelID.isEmpty());
@ -503,14 +503,14 @@ void PubSub::ListenToChannelModerationActions(
void PubSub::listenToTopic(const std::string &topic, void PubSub::listenToTopic(const std::string &topic,
std::shared_ptr<providers::twitch::TwitchAccount> account) std::shared_ptr<providers::twitch::TwitchAccount> account)
{ {
auto message = CreateListenMessage({topic}, account); auto message = createListenMessage({topic}, account);
this->Listen(std::move(message)); this->listen(std::move(message));
} }
void PubSub::Listen(rapidjson::Document &&msg) void PubSub::listen(rapidjson::Document &&msg)
{ {
if (this->TryListen(msg)) { if (this->tryListen(msg)) {
debug::Log("Successfully listened!"); debug::Log("Successfully listened!");
return; return;
} }
@ -519,12 +519,12 @@ void PubSub::Listen(rapidjson::Document &&msg)
this->requests.emplace_back(std::make_unique<rapidjson::Document>(std::move(msg))); this->requests.emplace_back(std::make_unique<rapidjson::Document>(std::move(msg)));
} }
bool PubSub::TryListen(rapidjson::Document &msg) bool PubSub::tryListen(rapidjson::Document &msg)
{ {
debug::Log("TryListen with {} clients", this->clients.size()); debug::Log("tryListen with {} clients", this->clients.size());
for (const auto &p : this->clients) { for (const auto &p : this->clients) {
const auto &client = p.second; const auto &client = p.second;
if (client->Listen(msg)) { if (client->listen(msg)) {
return true; return true;
} }
} }
@ -544,7 +544,7 @@ bool PubSub::isListeningToTopic(const std::string &topic)
return false; return false;
} }
void PubSub::OnMessage(websocketpp::connection_hdl hdl, WebsocketMessagePtr websocketMessage) void PubSub::onMessage(websocketpp::connection_hdl hdl, WebsocketMessagePtr websocketMessage)
{ {
const std::string &payload = websocketMessage->get_payload(); const std::string &payload = websocketMessage->get_payload();
@ -571,7 +571,7 @@ void PubSub::OnMessage(websocketpp::connection_hdl hdl, WebsocketMessagePtr webs
} }
if (type == "RESPONSE") { if (type == "RESPONSE") {
this->HandleListenResponse(msg); this->handleListenResponse(msg);
} else if (type == "MESSAGE") { } else if (type == "MESSAGE") {
if (!msg.HasMember("data")) { if (!msg.HasMember("data")) {
debug::Log("Missing required object member `data` in message root"); debug::Log("Missing required object member `data` in message root");
@ -585,7 +585,7 @@ void PubSub::OnMessage(websocketpp::connection_hdl hdl, WebsocketMessagePtr webs
return; return;
} }
this->HandleMessageResponse(data); this->handleMessageResponse(data);
} else if (type == "PONG") { } else if (type == "PONG") {
auto clientIt = this->clients.find(hdl); auto clientIt = this->clients.find(hdl);
@ -595,25 +595,25 @@ void PubSub::OnMessage(websocketpp::connection_hdl hdl, WebsocketMessagePtr webs
auto &client = *clientIt; auto &client = *clientIt;
client.second->HandlePong(); client.second->handlePong();
} else { } else {
debug::Log("Unknown message type: {}", type); debug::Log("Unknown message type: {}", type);
} }
} }
void PubSub::OnConnectionOpen(WebsocketHandle hdl) void PubSub::onConnectionOpen(WebsocketHandle hdl)
{ {
auto client = std::make_shared<detail::PubSubClient>(this->websocketClient, hdl); auto client = std::make_shared<detail::PubSubClient>(this->websocketClient, hdl);
// We separate the starting from the constructor because we will want to use shared_from_this // We separate the starting from the constructor because we will want to use shared_from_this
client->Start(); client->start();
this->clients.emplace(hdl, client); this->clients.emplace(hdl, client);
this->connected.invoke(); this->connected.invoke();
} }
void PubSub::OnConnectionClose(WebsocketHandle hdl) void PubSub::onConnectionClose(WebsocketHandle hdl)
{ {
auto clientIt = this->clients.find(hdl); auto clientIt = this->clients.find(hdl);
@ -623,14 +623,14 @@ void PubSub::OnConnectionClose(WebsocketHandle hdl)
auto &client = clientIt->second; auto &client = clientIt->second;
client->Stop(); client->stop();
this->clients.erase(clientIt); this->clients.erase(clientIt);
this->connected.invoke(); this->connected.invoke();
} }
PubSub::WebsocketContextPtr PubSub::OnTLSInit(websocketpp::connection_hdl hdl) PubSub::WebsocketContextPtr PubSub::onTLSInit(websocketpp::connection_hdl hdl)
{ {
WebsocketContextPtr ctx(new boost::asio::ssl::context(boost::asio::ssl::context::tlsv1)); WebsocketContextPtr ctx(new boost::asio::ssl::context(boost::asio::ssl::context::tlsv1));
@ -645,7 +645,7 @@ PubSub::WebsocketContextPtr PubSub::OnTLSInit(websocketpp::connection_hdl hdl)
return ctx; return ctx;
} }
void PubSub::HandleListenResponse(const rapidjson::Document &msg) void PubSub::handleListenResponse(const rapidjson::Document &msg)
{ {
std::string error; std::string error;
@ -666,7 +666,7 @@ void PubSub::HandleListenResponse(const rapidjson::Document &msg)
} }
} }
void PubSub::HandleMessageResponse(const rapidjson::Value &outerData) void PubSub::handleMessageResponse(const rapidjson::Value &outerData)
{ {
QString topic; QString topic;
@ -719,7 +719,7 @@ void PubSub::HandleMessageResponse(const rapidjson::Value &outerData)
std::string moderationAction; std::string moderationAction;
if (!rj::getSafe(data, "moderation_action", moderationAction)) { if (!rj::getSafe(data, "moderation_action", moderationAction)) {
debug::Log("Missing moderation action in data: {}", Stringify(data)); debug::Log("Missing moderation action in data: {}", stringify(data));
return; return;
} }
@ -738,7 +738,7 @@ void PubSub::HandleMessageResponse(const rapidjson::Value &outerData)
} }
} }
void PubSub::RunThread() void PubSub::runThread()
{ {
debug::Log("Start pubsub manager thread"); debug::Log("Start pubsub manager thread");
this->websocketClient.run(); this->websocketClient.run();

View file

@ -54,19 +54,19 @@ class PubSubClient : public std::enable_shared_from_this<PubSubClient>
public: public:
PubSubClient(WebsocketClient &_websocketClient, WebsocketHandle _handle); PubSubClient(WebsocketClient &_websocketClient, WebsocketHandle _handle);
void Start(); void start();
void Stop(); void stop();
bool Listen(rapidjson::Document &message); bool listen(rapidjson::Document &message);
void UnlistenPrefix(const std::string &prefix); void unlistenPrefix(const std::string &prefix);
void HandlePong(); void handlePong();
bool isListeningToTopic(const std::string &topic); bool isListeningToTopic(const std::string &topic);
private: private:
void Ping(); void ping();
bool Send(const char *payload); bool send(const char *payload);
}; };
} // namespace detail } // namespace detail
@ -92,9 +92,9 @@ public:
Disconnected, Disconnected,
}; };
void Start(); void start();
bool IsConnected() const bool isConnected() const
{ {
return this->state == State::Connected; return this->state == State::Connected;
} }
@ -119,11 +119,11 @@ public:
} whisper; } whisper;
} sig; } sig;
void ListenToWhispers(std::shared_ptr<providers::twitch::TwitchAccount> account); void listenToWhispers(std::shared_ptr<providers::twitch::TwitchAccount> account);
void UnlistenAllModerationActions(); void unlistenAllModerationActions();
void ListenToChannelModerationActions( void listenToChannelModerationActions(
const QString &channelID, std::shared_ptr<providers::twitch::TwitchAccount> account); const QString &channelID, std::shared_ptr<providers::twitch::TwitchAccount> account);
std::vector<std::unique_ptr<rapidjson::Document>> requests; std::vector<std::unique_ptr<rapidjson::Document>> requests;
@ -132,12 +132,12 @@ private:
void listenToTopic(const std::string &topic, void listenToTopic(const std::string &topic,
std::shared_ptr<providers::twitch::TwitchAccount> account); std::shared_ptr<providers::twitch::TwitchAccount> account);
void Listen(rapidjson::Document &&msg); void listen(rapidjson::Document &&msg);
bool TryListen(rapidjson::Document &msg); bool tryListen(rapidjson::Document &msg);
bool isListeningToTopic(const std::string &topic); bool isListeningToTopic(const std::string &topic);
void AddClient(); void addClient();
State state = State::Connected; State state = State::Connected;
@ -148,15 +148,15 @@ private:
std::unordered_map<std::string, std::function<void(const rapidjson::Value &, const QString &)>> std::unordered_map<std::string, std::function<void(const rapidjson::Value &, const QString &)>>
moderationActionHandlers; moderationActionHandlers;
void OnMessage(websocketpp::connection_hdl hdl, WebsocketMessagePtr msg); void onMessage(websocketpp::connection_hdl hdl, WebsocketMessagePtr msg);
void OnConnectionOpen(websocketpp::connection_hdl hdl); void onConnectionOpen(websocketpp::connection_hdl hdl);
void OnConnectionClose(websocketpp::connection_hdl hdl); void onConnectionClose(websocketpp::connection_hdl hdl);
WebsocketContextPtr OnTLSInit(websocketpp::connection_hdl hdl); WebsocketContextPtr onTLSInit(websocketpp::connection_hdl hdl);
void HandleListenResponse(const rapidjson::Document &msg); void handleListenResponse(const rapidjson::Document &msg);
void HandleMessageResponse(const rapidjson::Value &data); void handleMessageResponse(const rapidjson::Value &data);
void RunThread(); void runThread();
}; };
} // namespace twitch } // namespace twitch

View file

@ -34,12 +34,12 @@ bool getTargetUser(const rapidjson::Value &data, ActionUser &user)
return rj::getSafe(data, "target_user_id", user.id); return rj::getSafe(data, "target_user_id", user.id);
} }
std::string Stringify(const rapidjson::Value &v) std::string stringify(const rapidjson::Value &v)
{ {
return pajlada::Settings::SettingManager::stringify(v); return pajlada::Settings::SettingManager::stringify(v);
} }
rapidjson::Document CreateListenMessage(const std::vector<std::string> &topicsVec, rapidjson::Document createListenMessage(const std::vector<std::string> &topicsVec,
std::shared_ptr<providers::twitch::TwitchAccount> account) std::shared_ptr<providers::twitch::TwitchAccount> account)
{ {
rapidjson::Document msg(rapidjson::kObjectType); rapidjson::Document msg(rapidjson::kObjectType);
@ -65,7 +65,7 @@ rapidjson::Document CreateListenMessage(const std::vector<std::string> &topicsVe
return msg; return msg;
} }
rapidjson::Document CreateUnlistenMessage(const std::vector<std::string> &topicsVec) rapidjson::Document createUnlistenMessage(const std::vector<std::string> &topicsVec)
{ {
rapidjson::Document msg(rapidjson::kObjectType); rapidjson::Document msg(rapidjson::kObjectType);
auto &a = msg.GetAllocator(); auto &a = msg.GetAllocator();

View file

@ -21,22 +21,22 @@ bool getCreatedByUser(const rapidjson::Value &data, ActionUser &user);
bool getTargetUser(const rapidjson::Value &data, ActionUser &user); bool getTargetUser(const rapidjson::Value &data, ActionUser &user);
std::string Stringify(const rapidjson::Value &v); std::string stringify(const rapidjson::Value &v);
rapidjson::Document CreateListenMessage(const std::vector<std::string> &topicsVec, rapidjson::Document createListenMessage(const std::vector<std::string> &topicsVec,
std::shared_ptr<providers::twitch::TwitchAccount> account); std::shared_ptr<providers::twitch::TwitchAccount> account);
rapidjson::Document CreateUnlistenMessage(const std::vector<std::string> &topicsVec); rapidjson::Document createUnlistenMessage(const std::vector<std::string> &topicsVec);
// Create timer using given ioService // Create timer using given ioService
template <typename Duration, typename Callback> template <typename Duration, typename Callback>
void RunAfter(boost::asio::io_service &ioService, Duration duration, Callback cb) void runAfter(boost::asio::io_service &ioService, Duration duration, Callback cb)
{ {
auto timer = std::make_shared<boost::asio::steady_timer>(ioService); auto timer = std::make_shared<boost::asio::steady_timer>(ioService);
timer->expires_from_now(duration); timer->expires_from_now(duration);
timer->async_wait([timer, cb](const boost::system::error_code &ec) { timer->async_wait([timer, cb](const boost::system::error_code &ec) {
if (ec) { if (ec) {
debug::Log("Error in RunAfter: {}", ec.message()); debug::Log("Error in runAfter: {}", ec.message());
return; return;
} }
@ -46,13 +46,13 @@ void RunAfter(boost::asio::io_service &ioService, Duration duration, Callback cb
// Use provided timer // Use provided timer
template <typename Duration, typename Callback> template <typename Duration, typename Callback>
void RunAfter(std::shared_ptr<boost::asio::steady_timer> timer, Duration duration, Callback cb) void runAfter(std::shared_ptr<boost::asio::steady_timer> timer, Duration duration, Callback cb)
{ {
timer->expires_from_now(duration); timer->expires_from_now(duration);
timer->async_wait([timer, cb](const boost::system::error_code &ec) { timer->async_wait([timer, cb](const boost::system::error_code &ec) {
if (ec) { if (ec) {
debug::Log("Error in RunAfter: {}", ec.message()); debug::Log("Error in runAfter: {}", ec.message());
return; return;
} }

View file

@ -58,7 +58,7 @@ TwitchChannel::TwitchChannel(const QString &channelName, Communi::IrcConnection
auto account = app->accounts->Twitch.getCurrent(); auto account = app->accounts->Twitch.getCurrent();
if (account && !account->getUserId().isEmpty()) { if (account && !account->getUserId().isEmpty()) {
app->twitch.pubsub->ListenToChannelModerationActions(this->roomID, account); app->twitch.pubsub->listenToChannelModerationActions(this->roomID, account);
} }
}; };