From 1c827f6288e32e397418c7815f8123ab6c0acb52 Mon Sep 17 00:00:00 2001 From: nerix Date: Wed, 20 Nov 2024 22:29:47 +0100 Subject: [PATCH] chore: use condition variable to shutdown websocket pools (#5721) --- CHANGELOG.md | 1 + src/CMakeLists.txt | 2 + .../liveupdates/BasicPubSubManager.hpp | 48 +++++++--- src/providers/twitch/PubSubManager.cpp | 50 +++++++---- src/providers/twitch/PubSubManager.hpp | 3 + src/util/OnceFlag.cpp | 33 +++++++ src/util/OnceFlag.hpp | 41 +++++++++ tests/CMakeLists.txt | 1 + tests/src/OnceFlag.cpp | 88 +++++++++++++++++++ 9 files changed, 237 insertions(+), 30 deletions(-) create mode 100644 src/util/OnceFlag.cpp create mode 100644 src/util/OnceFlag.hpp create mode 100644 tests/src/OnceFlag.cpp diff --git a/CHANGELOG.md b/CHANGELOG.md index 9b3d80fd5..7428d23d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -131,6 +131,7 @@ - Dev: Unified parsing of historic and live IRC messages. (#5678) - Dev: 7TV's `entitlement.reset` is now explicitly ignored. (#5685) - Dev: Qt 6.8 and later now default to the GDI fontengine. (#5710) +- Dev: Moved to condition variables when shutting down worker threads. (#5721) ## 2.5.1 diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 04b863c79..61c19e011 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -514,6 +514,8 @@ set(SOURCE_FILES util/LayoutHelper.hpp util/LoadPixmap.cpp util/LoadPixmap.hpp + util/OnceFlag.cpp + util/OnceFlag.hpp util/RapidjsonHelpers.cpp util/RapidjsonHelpers.hpp util/RatelimitBucket.cpp diff --git a/src/providers/liveupdates/BasicPubSubManager.hpp b/src/providers/liveupdates/BasicPubSubManager.hpp index e8d95b528..299c84bc0 100644 --- a/src/providers/liveupdates/BasicPubSubManager.hpp +++ b/src/providers/liveupdates/BasicPubSubManager.hpp @@ -8,10 +8,12 @@ #include "providers/twitch/PubSubHelpers.hpp" #include "util/DebugCount.hpp" #include "util/ExponentialBackoff.hpp" +#include "util/OnceFlag.hpp" #include "util/RenameThread.hpp" #include #include +#include #include #include #include @@ -120,6 +122,11 @@ public: this->work_ = std::make_shared( this->websocketClient_.get_io_service()); this->mainThread_.reset(new std::thread([this] { + // make sure we set in any case, even exceptions + auto guard = qScopeGuard([&] { + this->stoppedFlag_.set(); + }); + runThread(); })); @@ -142,22 +149,34 @@ public: this->work_.reset(); - if (this->mainThread_->joinable()) + if (!this->mainThread_->joinable()) { - // NOTE: We spawn a new thread to join the websocket thread. - // There is a case where a new client was initiated but not added to the clients list. - // We just don't join the thread & let the operating system nuke the thread if joining fails - // within 1s. - auto joiner = std::async(std::launch::async, &std::thread::join, - this->mainThread_.get()); - if (joiner.wait_for(std::chrono::seconds(1)) == - std::future_status::timeout) - { - qCWarning(chatterinoLiveupdates) - << "Thread didn't join within 1 second, rip it out"; - this->websocketClient_.stop(); - } + return; } + + // NOTE: + // There is a case where a new client was initiated but not added to the clients list. + // We just don't join the thread & let the operating system nuke the thread if joining fails + // within 1s. + if (this->stoppedFlag_.waitFor(std::chrono::seconds{1})) + { + this->mainThread_->join(); + return; + } + + qCWarning(chatterinoLiveupdates) + << "Thread didn't finish within 1 second, force-stop the client"; + this->websocketClient_.stop(); + if (this->stoppedFlag_.waitFor(std::chrono::milliseconds{100})) + { + this->mainThread_->join(); + return; + } + + qCWarning(chatterinoLiveupdates) + << "Thread didn't finish after stopping, discard it"; + // detach the thread so the destructor doesn't attempt any joining + this->mainThread_->detach(); } protected: @@ -394,6 +413,7 @@ private: liveupdates::WebsocketClient websocketClient_; std::unique_ptr mainThread_; + OnceFlag stoppedFlag_; const QString host_; diff --git a/src/providers/twitch/PubSubManager.cpp b/src/providers/twitch/PubSubManager.cpp index 9a2f03848..ba41a7b50 100644 --- a/src/providers/twitch/PubSubManager.cpp +++ b/src/providers/twitch/PubSubManager.cpp @@ -15,10 +15,10 @@ #include "util/RenameThread.hpp" #include +#include #include #include -#include #include #include #include @@ -560,6 +560,11 @@ void PubSub::start() this->work = std::make_shared( this->websocketClient.get_io_service()); this->thread = std::make_unique([this] { + // make sure we set in any case, even exceptions + auto guard = qScopeGuard([&] { + this->stoppedFlag_.set(); + }); + runThread(); }); renameThread(*this->thread, "PubSub"); @@ -578,23 +583,36 @@ void PubSub::stop() this->work.reset(); - if (this->thread->joinable()) + if (!this->thread->joinable()) { - // NOTE: We spawn a new thread to join the websocket thread. - // There is a case where a new client was initiated but not added to the clients list. - // We just don't join the thread & let the operating system nuke the thread if joining fails - // within 1s. - // We could fix the underlying bug, but this is easier & we realistically won't use this exact code - // for super much longer. - auto joiner = std::async(std::launch::async, &std::thread::join, - this->thread.get()); - if (joiner.wait_for(1s) == std::future_status::timeout) - { - qCWarning(chatterinoPubSub) - << "Thread didn't join within 1 second, rip it out"; - this->websocketClient.stop(); - } + return; } + + // NOTE: + // There is a case where a new client was initiated but not added to the clients list. + // We just don't join the thread & let the operating system nuke the thread if joining fails + // within 1s. + // We could fix the underlying bug, but this is easier & we realistically won't use this exact code + // for super much longer. + if (this->stoppedFlag_.waitFor(std::chrono::seconds{1})) + { + this->thread->join(); + return; + } + + qCWarning(chatterinoLiveupdates) + << "Thread didn't finish within 1 second, force-stop the client"; + this->websocketClient.stop(); + if (this->stoppedFlag_.waitFor(std::chrono::milliseconds{100})) + { + this->thread->join(); + return; + } + + qCWarning(chatterinoLiveupdates) + << "Thread didn't finish after stopping, discard it"; + // detach the thread so the destructor doesn't attempt any joining + this->thread->detach(); } bool PubSub::listenToWhispers() diff --git a/src/providers/twitch/PubSubManager.hpp b/src/providers/twitch/PubSubManager.hpp index f14eabf77..bfa228719 100644 --- a/src/providers/twitch/PubSubManager.hpp +++ b/src/providers/twitch/PubSubManager.hpp @@ -3,6 +3,7 @@ #include "providers/twitch/PubSubClientOptions.hpp" #include "providers/twitch/PubSubWebsocket.hpp" #include "util/ExponentialBackoff.hpp" +#include "util/OnceFlag.hpp" #include #include @@ -267,6 +268,8 @@ private: const QString host_; const PubSubClientOptions clientOptions_; + OnceFlag stoppedFlag_; + bool stopping_{false}; #ifdef FRIEND_TEST diff --git a/src/util/OnceFlag.cpp b/src/util/OnceFlag.cpp new file mode 100644 index 000000000..cb42ea5ab --- /dev/null +++ b/src/util/OnceFlag.cpp @@ -0,0 +1,33 @@ +#include "util/OnceFlag.hpp" + +namespace chatterino { + +OnceFlag::OnceFlag() = default; +OnceFlag::~OnceFlag() = default; + +void OnceFlag::set() +{ + { + std::unique_lock guard(this->mutex); + this->flag = true; + } + this->condvar.notify_all(); +} + +bool OnceFlag::waitFor(std::chrono::milliseconds ms) +{ + std::unique_lock lock(this->mutex); + return this->condvar.wait_for(lock, ms, [this] { + return this->flag; + }); +} + +void OnceFlag::wait() +{ + std::unique_lock lock(this->mutex); + this->condvar.wait(lock, [this] { + return this->flag; + }); +} + +} // namespace chatterino diff --git a/src/util/OnceFlag.hpp b/src/util/OnceFlag.hpp new file mode 100644 index 000000000..4060dff29 --- /dev/null +++ b/src/util/OnceFlag.hpp @@ -0,0 +1,41 @@ +#pragma once + +#include +#include +#include + +namespace chatterino { + +/// @brief A flag that can only be set once which notifies waiters. +/// +/// This can be used to synchronize with other threads. Note that waiting +/// threads will be suspended. +class OnceFlag +{ +public: + OnceFlag(); + ~OnceFlag(); + + /// Set this flag and notify waiters + void set(); + + /// @brief Wait for at most `ms` until this flag is set. + /// + /// The calling thread will be suspended during the wait. + /// + /// @param ms The maximum time to wait for this flag + /// @returns `true` if this flag was set during the wait or before + bool waitFor(std::chrono::milliseconds ms); + + /// @brief Wait until this flag is set by another thread + /// + /// The calling thread will be suspended during the wait. + void wait(); + +private: + std::mutex mutex; + std::condition_variable condvar; + bool flag = false; +}; + +} // namespace chatterino diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 547f0e7c0..06349c0e0 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -51,6 +51,7 @@ set(test_SOURCES ${CMAKE_CURRENT_LIST_DIR}/src/Plugins.cpp ${CMAKE_CURRENT_LIST_DIR}/src/TwitchIrc.cpp ${CMAKE_CURRENT_LIST_DIR}/src/IgnoreController.cpp + ${CMAKE_CURRENT_LIST_DIR}/src/OnceFlag.cpp ${CMAKE_CURRENT_LIST_DIR}/src/lib/Snapshot.cpp ${CMAKE_CURRENT_LIST_DIR}/src/lib/Snapshot.hpp # Add your new file above this line! diff --git a/tests/src/OnceFlag.cpp b/tests/src/OnceFlag.cpp new file mode 100644 index 000000000..0b0dded29 --- /dev/null +++ b/tests/src/OnceFlag.cpp @@ -0,0 +1,88 @@ +#include "util/OnceFlag.hpp" + +#include "Test.hpp" + +#include + +using namespace chatterino; + +// this test shouldn't time out (no assert necessary) +TEST(OnceFlag, basic) +{ + OnceFlag startedFlag; + OnceFlag startedAckFlag; + OnceFlag stoppedFlag; + + std::thread t([&] { + startedFlag.set(); + startedAckFlag.wait(); + std::this_thread::sleep_for(std::chrono::milliseconds{50}); + stoppedFlag.set(); + }); + + startedFlag.wait(); + startedAckFlag.set(); + stoppedFlag.wait(); + + t.join(); +} + +TEST(OnceFlag, waitFor) +{ + OnceFlag startedFlag; + OnceFlag startedAckFlag; + OnceFlag stoppedFlag; + + std::thread t([&] { + startedFlag.set(); + startedAckFlag.wait(); + + std::this_thread::sleep_for(std::chrono::milliseconds{100}); + stoppedFlag.set(); + }); + + startedFlag.wait(); + startedAckFlag.set(); + + auto start = std::chrono::system_clock::now(); + ASSERT_TRUE(stoppedFlag.waitFor(std::chrono::milliseconds{200})); + auto stop = std::chrono::system_clock::now(); + + ASSERT_LT(stop - start, std::chrono::milliseconds{150}); + + start = std::chrono::system_clock::now(); + ASSERT_TRUE(stoppedFlag.waitFor(std::chrono::milliseconds{1000})); + stop = std::chrono::system_clock::now(); + + ASSERT_LT(stop - start, std::chrono::milliseconds{10}); + + start = std::chrono::system_clock::now(); + stoppedFlag.wait(); + stop = std::chrono::system_clock::now(); + + ASSERT_LT(stop - start, std::chrono::milliseconds{10}); + + t.join(); +} + +TEST(OnceFlag, waitForTimeout) +{ + OnceFlag startedFlag; + OnceFlag startedAckFlag; + OnceFlag stoppedFlag; + + std::thread t([&] { + startedFlag.set(); + startedAckFlag.wait(); + std::this_thread::sleep_for(std::chrono::milliseconds{100}); + stoppedFlag.set(); + }); + + startedFlag.wait(); + startedAckFlag.set(); + + ASSERT_FALSE(stoppedFlag.waitFor(std::chrono::milliseconds{25})); + stoppedFlag.wait(); + + t.join(); +}