chore: use condition variable to shutdown websocket pools (#5721)

This commit is contained in:
nerix 2024-11-20 22:29:47 +01:00 committed by GitHub
parent 19f449866e
commit 1c827f6288
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 237 additions and 30 deletions

View file

@ -131,6 +131,7 @@
- Dev: Unified parsing of historic and live IRC messages. (#5678)
- Dev: 7TV's `entitlement.reset` is now explicitly ignored. (#5685)
- Dev: Qt 6.8 and later now default to the GDI fontengine. (#5710)
- Dev: Moved to condition variables when shutting down worker threads. (#5721)
## 2.5.1

View file

@ -514,6 +514,8 @@ set(SOURCE_FILES
util/LayoutHelper.hpp
util/LoadPixmap.cpp
util/LoadPixmap.hpp
util/OnceFlag.cpp
util/OnceFlag.hpp
util/RapidjsonHelpers.cpp
util/RapidjsonHelpers.hpp
util/RatelimitBucket.cpp

View file

@ -8,10 +8,12 @@
#include "providers/twitch/PubSubHelpers.hpp"
#include "util/DebugCount.hpp"
#include "util/ExponentialBackoff.hpp"
#include "util/OnceFlag.hpp"
#include "util/RenameThread.hpp"
#include <pajlada/signals/signal.hpp>
#include <QJsonObject>
#include <QScopeGuard>
#include <QString>
#include <QStringBuilder>
#include <websocketpp/client.hpp>
@ -120,6 +122,11 @@ public:
this->work_ = std::make_shared<boost::asio::io_service::work>(
this->websocketClient_.get_io_service());
this->mainThread_.reset(new std::thread([this] {
// make sure we set in any case, even exceptions
auto guard = qScopeGuard([&] {
this->stoppedFlag_.set();
});
runThread();
}));
@ -142,22 +149,34 @@ public:
this->work_.reset();
if (this->mainThread_->joinable())
if (!this->mainThread_->joinable())
{
// NOTE: We spawn a new thread to join the websocket thread.
// There is a case where a new client was initiated but not added to the clients list.
// We just don't join the thread & let the operating system nuke the thread if joining fails
// within 1s.
auto joiner = std::async(std::launch::async, &std::thread::join,
this->mainThread_.get());
if (joiner.wait_for(std::chrono::seconds(1)) ==
std::future_status::timeout)
{
qCWarning(chatterinoLiveupdates)
<< "Thread didn't join within 1 second, rip it out";
this->websocketClient_.stop();
}
return;
}
// NOTE:
// There is a case where a new client was initiated but not added to the clients list.
// We just don't join the thread & let the operating system nuke the thread if joining fails
// within 1s.
if (this->stoppedFlag_.waitFor(std::chrono::seconds{1}))
{
this->mainThread_->join();
return;
}
qCWarning(chatterinoLiveupdates)
<< "Thread didn't finish within 1 second, force-stop the client";
this->websocketClient_.stop();
if (this->stoppedFlag_.waitFor(std::chrono::milliseconds{100}))
{
this->mainThread_->join();
return;
}
qCWarning(chatterinoLiveupdates)
<< "Thread didn't finish after stopping, discard it";
// detach the thread so the destructor doesn't attempt any joining
this->mainThread_->detach();
}
protected:
@ -394,6 +413,7 @@ private:
liveupdates::WebsocketClient websocketClient_;
std::unique_ptr<std::thread> mainThread_;
OnceFlag stoppedFlag_;
const QString host_;

View file

@ -15,10 +15,10 @@
#include "util/RenameThread.hpp"
#include <QJsonArray>
#include <QScopeGuard>
#include <algorithm>
#include <exception>
#include <future>
#include <iostream>
#include <memory>
#include <thread>
@ -560,6 +560,11 @@ void PubSub::start()
this->work = std::make_shared<boost::asio::io_service::work>(
this->websocketClient.get_io_service());
this->thread = std::make_unique<std::thread>([this] {
// make sure we set in any case, even exceptions
auto guard = qScopeGuard([&] {
this->stoppedFlag_.set();
});
runThread();
});
renameThread(*this->thread, "PubSub");
@ -578,23 +583,36 @@ void PubSub::stop()
this->work.reset();
if (this->thread->joinable())
if (!this->thread->joinable())
{
// NOTE: We spawn a new thread to join the websocket thread.
// There is a case where a new client was initiated but not added to the clients list.
// We just don't join the thread & let the operating system nuke the thread if joining fails
// within 1s.
// We could fix the underlying bug, but this is easier & we realistically won't use this exact code
// for super much longer.
auto joiner = std::async(std::launch::async, &std::thread::join,
this->thread.get());
if (joiner.wait_for(1s) == std::future_status::timeout)
{
qCWarning(chatterinoPubSub)
<< "Thread didn't join within 1 second, rip it out";
this->websocketClient.stop();
}
return;
}
// NOTE:
// There is a case where a new client was initiated but not added to the clients list.
// We just don't join the thread & let the operating system nuke the thread if joining fails
// within 1s.
// We could fix the underlying bug, but this is easier & we realistically won't use this exact code
// for super much longer.
if (this->stoppedFlag_.waitFor(std::chrono::seconds{1}))
{
this->thread->join();
return;
}
qCWarning(chatterinoLiveupdates)
<< "Thread didn't finish within 1 second, force-stop the client";
this->websocketClient.stop();
if (this->stoppedFlag_.waitFor(std::chrono::milliseconds{100}))
{
this->thread->join();
return;
}
qCWarning(chatterinoLiveupdates)
<< "Thread didn't finish after stopping, discard it";
// detach the thread so the destructor doesn't attempt any joining
this->thread->detach();
}
bool PubSub::listenToWhispers()

View file

@ -3,6 +3,7 @@
#include "providers/twitch/PubSubClientOptions.hpp"
#include "providers/twitch/PubSubWebsocket.hpp"
#include "util/ExponentialBackoff.hpp"
#include "util/OnceFlag.hpp"
#include <boost/asio/io_service.hpp>
#include <boost/asio/ssl/context.hpp>
@ -267,6 +268,8 @@ private:
const QString host_;
const PubSubClientOptions clientOptions_;
OnceFlag stoppedFlag_;
bool stopping_{false};
#ifdef FRIEND_TEST

33
src/util/OnceFlag.cpp Normal file
View file

@ -0,0 +1,33 @@
#include "util/OnceFlag.hpp"
namespace chatterino {
OnceFlag::OnceFlag() = default;
OnceFlag::~OnceFlag() = default;
void OnceFlag::set()
{
{
std::unique_lock guard(this->mutex);
this->flag = true;
}
this->condvar.notify_all();
}
bool OnceFlag::waitFor(std::chrono::milliseconds ms)
{
std::unique_lock lock(this->mutex);
return this->condvar.wait_for(lock, ms, [this] {
return this->flag;
});
}
void OnceFlag::wait()
{
std::unique_lock lock(this->mutex);
this->condvar.wait(lock, [this] {
return this->flag;
});
}
} // namespace chatterino

41
src/util/OnceFlag.hpp Normal file
View file

@ -0,0 +1,41 @@
#pragma once
#include <chrono>
#include <condition_variable>
#include <mutex>
namespace chatterino {
/// @brief A flag that can only be set once which notifies waiters.
///
/// This can be used to synchronize with other threads. Note that waiting
/// threads will be suspended.
class OnceFlag
{
public:
OnceFlag();
~OnceFlag();
/// Set this flag and notify waiters
void set();
/// @brief Wait for at most `ms` until this flag is set.
///
/// The calling thread will be suspended during the wait.
///
/// @param ms The maximum time to wait for this flag
/// @returns `true` if this flag was set during the wait or before
bool waitFor(std::chrono::milliseconds ms);
/// @brief Wait until this flag is set by another thread
///
/// The calling thread will be suspended during the wait.
void wait();
private:
std::mutex mutex;
std::condition_variable condvar;
bool flag = false;
};
} // namespace chatterino

View file

@ -51,6 +51,7 @@ set(test_SOURCES
${CMAKE_CURRENT_LIST_DIR}/src/Plugins.cpp
${CMAKE_CURRENT_LIST_DIR}/src/TwitchIrc.cpp
${CMAKE_CURRENT_LIST_DIR}/src/IgnoreController.cpp
${CMAKE_CURRENT_LIST_DIR}/src/OnceFlag.cpp
${CMAKE_CURRENT_LIST_DIR}/src/lib/Snapshot.cpp
${CMAKE_CURRENT_LIST_DIR}/src/lib/Snapshot.hpp
# Add your new file above this line!

88
tests/src/OnceFlag.cpp Normal file
View file

@ -0,0 +1,88 @@
#include "util/OnceFlag.hpp"
#include "Test.hpp"
#include <thread>
using namespace chatterino;
// this test shouldn't time out (no assert necessary)
TEST(OnceFlag, basic)
{
OnceFlag startedFlag;
OnceFlag startedAckFlag;
OnceFlag stoppedFlag;
std::thread t([&] {
startedFlag.set();
startedAckFlag.wait();
std::this_thread::sleep_for(std::chrono::milliseconds{50});
stoppedFlag.set();
});
startedFlag.wait();
startedAckFlag.set();
stoppedFlag.wait();
t.join();
}
TEST(OnceFlag, waitFor)
{
OnceFlag startedFlag;
OnceFlag startedAckFlag;
OnceFlag stoppedFlag;
std::thread t([&] {
startedFlag.set();
startedAckFlag.wait();
std::this_thread::sleep_for(std::chrono::milliseconds{100});
stoppedFlag.set();
});
startedFlag.wait();
startedAckFlag.set();
auto start = std::chrono::system_clock::now();
ASSERT_TRUE(stoppedFlag.waitFor(std::chrono::milliseconds{200}));
auto stop = std::chrono::system_clock::now();
ASSERT_LT(stop - start, std::chrono::milliseconds{150});
start = std::chrono::system_clock::now();
ASSERT_TRUE(stoppedFlag.waitFor(std::chrono::milliseconds{1000}));
stop = std::chrono::system_clock::now();
ASSERT_LT(stop - start, std::chrono::milliseconds{10});
start = std::chrono::system_clock::now();
stoppedFlag.wait();
stop = std::chrono::system_clock::now();
ASSERT_LT(stop - start, std::chrono::milliseconds{10});
t.join();
}
TEST(OnceFlag, waitForTimeout)
{
OnceFlag startedFlag;
OnceFlag startedAckFlag;
OnceFlag stoppedFlag;
std::thread t([&] {
startedFlag.set();
startedAckFlag.wait();
std::this_thread::sleep_for(std::chrono::milliseconds{100});
stoppedFlag.set();
});
startedFlag.wait();
startedAckFlag.set();
ASSERT_FALSE(stoppedFlag.waitFor(std::chrono::milliseconds{25}));
stoppedFlag.wait();
t.join();
}