perf: query fewer historical messages on reconnects (#5001)

Co-authored-by: Ruben Anders <ruben.anders@robotty.de>
Co-authored-by: pajlada <rasmus.karlsson@pajlada.com>
This commit is contained in:
iProdigy 2023-12-09 10:46:30 -08:00 committed by GitHub
parent 401e097d62
commit 13dc306506
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 171 additions and 77 deletions

View file

@ -85,6 +85,7 @@
- Dev: Move `clang-tidy` checker to its own CI job. (#4996)
- Dev: Refactored the Image Uploader feature. (#4971)
- Dev: Fixed deadlock and use-after-free in tests. (#4981)
- Dev: Load less message history upon reconnects. (#5001)
## 2.4.6

View file

@ -61,8 +61,6 @@ public:
pajlada::Signals::Signal<const std::vector<MessagePtr> &> filledInMessages;
pajlada::Signals::NoArgSignal destroyed;
pajlada::Signals::NoArgSignal displayNameChanged;
/// Invoked when AbstractIrcServer::onReadConnected occurs
pajlada::Signals::NoArgSignal connected;
Type getType() const;
const QString &getName() const;

View file

@ -5,6 +5,7 @@
#include "messages/LimitedQueueSnapshot.hpp"
#include "messages/Message.hpp"
#include "messages/MessageBuilder.hpp"
#include "providers/twitch/TwitchChannel.hpp"
#include <QCoreApplication>
@ -331,8 +332,6 @@ void AbstractIrcServer::onReadConnected(IrcConnection *connection)
{
chan->addMessage(connectedMsg);
}
chan->connected.invoke();
}
this->falloffCounter_ = 1;
@ -360,6 +359,11 @@ void AbstractIrcServer::onDisconnected()
}
chan->addMessage(disconnectedMsg);
if (auto *channel = dynamic_cast<TwitchChannel *>(chan.get()))
{
channel->markDisconnectedNow();
}
}
}

View file

@ -18,72 +18,84 @@ namespace chatterino::recentmessages {
using namespace recentmessages::detail;
void load(const QString &channelName, std::weak_ptr<Channel> channelPtr,
ResultCallback onLoaded, ErrorCallback onError)
void load(
const QString &channelName, std::weak_ptr<Channel> channelPtr,
ResultCallback onLoaded, ErrorCallback onError, const int limit,
const std::optional<std::chrono::time_point<std::chrono::system_clock>>
after,
const std::optional<std::chrono::time_point<std::chrono::system_clock>>
before,
const bool jitter)
{
qCDebug(LOG) << "Loading recent messages for" << channelName;
const auto url = constructRecentMessagesUrl(channelName);
const auto url =
constructRecentMessagesUrl(channelName, limit, after, before);
NetworkRequest(url)
.onSuccess([channelPtr, onLoaded](const auto &result) {
auto shared = channelPtr.lock();
if (!shared)
{
return;
}
qCDebug(LOG) << "Successfully loaded recent messages for"
<< shared->getName();
auto root = result.parseJson();
auto parsedMessages = parseRecentMessages(root);
// build the Communi messages into chatterino messages
auto builtMessages =
buildRecentMessages(parsedMessages, shared.get());
postToThread([shared = std::move(shared), root = std::move(root),
messages = std::move(builtMessages),
onLoaded]() mutable {
// Notify user about a possible gap in logs if it returned some messages
// but isn't currently joined to a channel
const auto errorCode = root.value("error_code").toString();
if (!errorCode.isEmpty())
const long delayMs = jitter ? std::rand() % 100 : 0;
QTimer::singleShot(delayMs, [=] {
NetworkRequest(url)
.onSuccess([channelPtr, onLoaded](const auto &result) {
auto shared = channelPtr.lock();
if (!shared)
{
qCDebug(LOG)
<< QString("Got error from API: error_code=%1, "
"channel=%2")
.arg(errorCode, shared->getName());
if (errorCode == "channel_not_joined" && !messages.empty())
{
shared->addMessage(makeSystemMessage(
"Message history service recovering, there may "
"be gaps in the message history."));
}
return;
}
onLoaded(messages);
});
})
.onError([channelPtr, onError](const NetworkResult &result) {
auto shared = channelPtr.lock();
if (!shared)
{
return;
}
qCDebug(LOG) << "Successfully loaded recent messages for"
<< shared->getName();
qCDebug(LOG) << "Failed to load recent messages for"
<< shared->getName();
auto root = result.parseJson();
auto parsedMessages = parseRecentMessages(root);
shared->addMessage(makeSystemMessage(
QStringLiteral(
"Message history service unavailable (Error: %1)")
.arg(result.formatError())));
// build the Communi messages into chatterino messages
auto builtMessages =
buildRecentMessages(parsedMessages, shared.get());
onError();
})
.execute();
postToThread([shared = std::move(shared),
root = std::move(root),
messages = std::move(builtMessages),
onLoaded]() mutable {
// Notify user about a possible gap in logs if it returned some messages
// but isn't currently joined to a channel
const auto errorCode = root.value("error_code").toString();
if (!errorCode.isEmpty())
{
qCDebug(LOG)
<< QString("Got error from API: error_code=%1, "
"channel=%2")
.arg(errorCode, shared->getName());
if (errorCode == "channel_not_joined" &&
!messages.empty())
{
shared->addMessage(makeSystemMessage(
"Message history service recovering, there may "
"be gaps in the message history."));
}
}
onLoaded(messages);
});
})
.onError([channelPtr, onError](const NetworkResult &result) {
auto shared = channelPtr.lock();
if (!shared)
{
return;
}
qCDebug(LOG) << "Failed to load recent messages for"
<< shared->getName();
shared->addMessage(makeSystemMessage(
QStringLiteral(
"Message history service unavailable (Error: %1)")
.arg(result.formatError())));
onError();
})
.execute();
});
}
} // namespace chatterino::recentmessages

View file

@ -2,8 +2,10 @@
#include <QString>
#include <chrono>
#include <functional>
#include <memory>
#include <optional>
#include <vector>
namespace chatterino {
@ -28,8 +30,16 @@ using ErrorCallback = std::function<void()>;
* @param channelPtr Weak pointer to Channel to use to build messages
* @param onLoaded Callback taking the built messages as a const std::vector<MessagePtr> &
* @param onError Callback called when the network request fails
* @param limit Maximum number of messages to query
* @param after Only return messages that were received after this timestamp; ignored if `std::nullopt`
* @param before Only return messages that were received before this timestamp; ignored if `std::nullopt`
* @param jitter Whether to delay the request by a small random duration
*/
void load(const QString &channelName, std::weak_ptr<Channel> channelPtr,
ResultCallback onLoaded, ErrorCallback onError);
void load(
const QString &channelName, std::weak_ptr<Channel> channelPtr,
ResultCallback onLoaded, ErrorCallback onError, int limit,
std::optional<std::chrono::time_point<std::chrono::system_clock>> after,
std::optional<std::chrono::time_point<std::chrono::system_clock>> before,
bool jitter);
} // namespace chatterino::recentmessages

View file

@ -5,7 +5,6 @@
#include "providers/twitch/IrcMessageHandler.hpp"
#include "providers/twitch/TwitchChannel.hpp"
#include "providers/twitch/TwitchMessageBuilder.hpp"
#include "singletons/Settings.hpp"
#include "util/FormatTime.hpp"
#include <QJsonArray>
@ -94,14 +93,34 @@ std::vector<MessagePtr> buildRecentMessages(
// Returns the URL to be used for querying the Recent Messages API for the
// given channel.
QUrl constructRecentMessagesUrl(const QString &name)
QUrl constructRecentMessagesUrl(
const QString &name, const int limit,
const std::optional<std::chrono::time_point<std::chrono::system_clock>>
after,
const std::optional<std::chrono::time_point<std::chrono::system_clock>>
before)
{
QUrl url(Env::get().recentMessagesApiUrl.arg(name));
QUrlQuery urlQuery(url);
if (!urlQuery.hasQueryItem("limit"))
{
urlQuery.addQueryItem("limit", QString::number(limit));
}
if (after.has_value())
{
urlQuery.addQueryItem(
"limit", QString::number(getSettings()->twitchMessageHistoryLimit));
"after", QString::number(
std::chrono::duration_cast<std::chrono::milliseconds>(
after->time_since_epoch())
.count()));
}
if (before.has_value())
{
urlQuery.addQueryItem(
"before", QString::number(
std::chrono::duration_cast<std::chrono::milliseconds>(
before->time_since_epoch())
.count()));
}
url.setQuery(urlQuery);
return url;

View file

@ -8,7 +8,9 @@
#include <QString>
#include <QUrl>
#include <chrono>
#include <memory>
#include <optional>
#include <vector>
namespace chatterino::recentmessages::detail {
@ -24,6 +26,9 @@ std::vector<MessagePtr> buildRecentMessages(
// Returns the URL to be used for querying the Recent Messages API for the
// given channel.
QUrl constructRecentMessagesUrl(const QString &name);
QUrl constructRecentMessagesUrl(
const QString &name, int limit,
std::optional<std::chrono::time_point<std::chrono::system_clock>> after,
std::optional<std::chrono::time_point<std::chrono::system_clock>> before);
} // namespace chatterino::recentmessages::detail

View file

@ -1136,6 +1136,7 @@ void IrcMessageHandler::handleJoinMessage(Communi::IrcMessage *message)
getApp()->accounts->twitch.getCurrent()->getUserName())
{
twitchChannel->addMessage(makeSystemMessage("joined channel"));
twitchChannel->joined.invoke();
}
else if (getSettings()->showJoins.getValue())
{

View file

@ -103,17 +103,12 @@ TwitchChannel::TwitchChannel(const QString &name)
// We can safely ignore this signal connection this has no external dependencies - once the signal
// is destroyed, it will no longer be able to fire
std::ignore = this->connected.connect([this]() {
if (this->roomId().isEmpty())
std::ignore = this->joined.connect([this]() {
if (this->disconnectedAt_.has_value())
{
// If we get a reconnected event when the room id is not set, we
// just connected for the first time. After receiving the first
// message from a channel, setRoomId is called and further
// invocations of this event will load recent messages.
return;
this->loadRecentMessagesReconnect();
this->disconnectedAt_ = std::nullopt;
}
this->loadRecentMessagesReconnect();
});
// timers
@ -1111,6 +1106,24 @@ bool TwitchChannel::setLive(bool newLiveStatus)
return true;
}
void TwitchChannel::markDisconnectedNow()
{
if (this->roomId().isEmpty())
{
// we were never joined in the first place
return;
}
if (this->disconnectedAt_.has_value())
{
// don't overwrite prior timestamp since
// a reconnection hasn't happened yet
return;
}
this->disconnectedAt_ = std::chrono::system_clock::now();
}
void TwitchChannel::loadRecentMessages()
{
if (!getSettings()->loadTwitchMessageHistoryOnConnect)
@ -1163,7 +1176,9 @@ void TwitchChannel::loadRecentMessages()
return;
tc->loadingRecentMessages_.clear();
});
},
getSettings()->twitchMessageHistoryLimit.getValue(), std::nullopt,
std::nullopt, false);
}
void TwitchChannel::loadRecentMessagesReconnect()
@ -1178,6 +1193,21 @@ void TwitchChannel::loadRecentMessagesReconnect()
return; // already loading
}
const auto now = std::chrono::system_clock::now();
int limit = getSettings()->twitchMessageHistoryLimit.getValue();
if (this->disconnectedAt_.has_value())
{
// calculate how many messages could have occured
// while we were not connected to the channel
// assuming a maximum of 10 messages per second
const auto secondsSinceDisconnect =
std::chrono::duration_cast<std::chrono::seconds>(
now - this->disconnectedAt_.value())
.count();
limit =
std::min(static_cast<int>(secondsSinceDisconnect + 1) * 10, limit);
}
auto weak = weakOf<Channel>(this);
recentmessages::load(
this->getName(), weak,
@ -1203,7 +1233,8 @@ void TwitchChannel::loadRecentMessagesReconnect()
return;
tc->loadingRecentMessages_.clear();
});
},
limit, this->disconnectedAt_, now, true);
}
void TwitchChannel::refreshPubSub()

View file

@ -136,6 +136,12 @@ public:
SharedAccessGuard<const RoomModes> accessRoomModes() const;
SharedAccessGuard<const StreamStatus> accessStreamStatus() const;
/**
* Records the current timestamp the channel was disconnected.
* This can be used to calculate the time spent disconnected after a successful reconnect
*/
void markDisconnectedNow();
// Emotes
std::optional<EmotePtr> bttvEmote(const EmoteName &name) const;
std::optional<EmotePtr> ffzEmote(const EmoteName &name) const;
@ -200,6 +206,11 @@ public:
*/
std::shared_ptr<MessageThread> getOrCreateThread(const MessagePtr &message);
/**
* This signal fires when the local user has joined the channel
**/
pajlada::Signals::NoArgSignal joined;
// Only TwitchChannel may invoke this signal
pajlada::Signals::NoArgSignal userStateChanged;
@ -353,6 +364,8 @@ private:
int chatterCount_{};
UniqueAccess<StreamStatus> streamStatus_;
UniqueAccess<RoomModes> roomModes_;
std::optional<std::chrono::time_point<std::chrono::system_clock>>
disconnectedAt_{};
std::atomic_flag loadingRecentMessages_ = ATOMIC_FLAG_INIT;
std::unordered_map<QString, std::weak_ptr<MessageThread>> threads_;