Automatically load recent messages on reconnect (#3878)

* Add working reconnect recent messages

* Rename method to messagesUpdated

* Use audo declarations

* Add docs to new LimitedQueue methods

* Add more documentation, try atomic loading flag

* Update CHANGELOG.md

* Remove unused include

* Rename 'reconnected' signal to 'connected'

* Reserve before filtering on arbitrary update

* Extract recent messages fetching to own class

* Use std::atomic_flag instead of std::atomic_bool

* Add PostToThread include

* Add chatterino.recentmessages logging

* Remove unneeded parameters, lambda move capture

* Remove TwitchChannel::buildRecentMessages

* Add documentation, use more clear method name

* Reword changelog entry

I think it sounds better like this :)

* Rework how filling in missing messages is handled

This should hopefully prevent issues with filtered channels with old messages
that no longer exist in the underlying channel

* Check existing messages when looking for reply

* Clean up string distribution in file

* Try to improve documentation

* Use std::function for RecentMessagesApi

* Only trigger filledInMessages if we inserted

* Remove old unused lines

* Use make_shared<MessageLayout> instead of new MessageLayout

* Alphabetize QLogging categories

* Reorder CHANGELOG.md
This commit is contained in:
Daniel Sage 2022-08-06 12:18:34 -04:00 committed by GitHub
parent 2dd37ca210
commit 46f43f3ce8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 580 additions and 174 deletions

View file

@ -27,6 +27,7 @@
- Minor: Add Quick Switcher item to open a channel in a new popup window. (#3828)
- Minor: Reduced GIF frame window from 30ms to 20ms, causing fewer frame skips in animated emotes. (#3886)
- Minor: Warn when parsing an environment variable fails. (#3904)
- Minor: Load missing messages from Recent Messages API upon reconnecting (#3878)
- Bugfix: Fix crash that can occur when closing and quickly reopening a split, then running a command. (#3852)
- Bugfix: Connection to Twitch PubSub now recovers more reliably. (#3643, #3716)
- Bugfix: Fix crash that can occur when changing channels. (#3799)

View file

@ -171,6 +171,8 @@ set(SOURCE_FILES
providers/IvrApi.hpp
providers/LinkResolver.cpp
providers/LinkResolver.hpp
providers/RecentMessagesApi.cpp
providers/RecentMessagesApi.hpp
providers/bttv/BttvEmotes.cpp
providers/bttv/BttvEmotes.hpp
@ -734,7 +736,6 @@ if (LIBRT)
)
endif ()
# Configure compiler warnings
if (MSVC)
# 4714 - function marked as __forceinline not inlined

View file

@ -220,7 +220,7 @@ void Channel::disableAllMessages()
}
}
void Channel::addMessagesAtStart(std::vector<MessagePtr> &_messages)
void Channel::addMessagesAtStart(const std::vector<MessagePtr> &_messages)
{
std::vector<MessagePtr> addedMessages =
this->messages_.pushFront(_messages);
@ -231,6 +231,80 @@ void Channel::addMessagesAtStart(std::vector<MessagePtr> &_messages)
}
}
void Channel::fillInMissingMessages(const std::vector<MessagePtr> &messages)
{
auto snapshot = this->getMessageSnapshot();
std::unordered_set<QString> existingMessageIds;
existingMessageIds.reserve(snapshot.size());
// First, collect the ids of every message already present in the channel
for (auto &msg : snapshot)
{
if (msg->flags.has(MessageFlag::System) || msg->id.isEmpty())
{
continue;
}
existingMessageIds.insert(msg->id);
}
bool anyInserted = false;
// Keep track of the last message in the channel. We need this value
// to allow concurrent appends to the end of the channel while still
// being able to insert just-loaded historical messages at the end
// in the correct place.
auto lastMsg = snapshot[snapshot.size() - 1];
for (auto &msg : messages)
{
// check if message already exists
if (existingMessageIds.count(msg->id) != 0)
{
continue;
}
// If we get to this point, we know we'll be inserting a message
anyInserted = true;
bool insertedFlag = false;
for (auto &snapshotMsg : snapshot)
{
if (snapshotMsg->flags.has(MessageFlag::System))
{
continue;
}
if (msg->serverReceivedTime < snapshotMsg->serverReceivedTime)
{
// We found the first message that comes after the current message.
// Therefore, we can put the current message directly before. We
// assume that the messages we are filling in are in ascending
// order by serverReceivedTime.
this->messages_.insertBefore(snapshotMsg, msg);
insertedFlag = true;
break;
}
}
if (!insertedFlag)
{
// We never found a message already in the channel that came after
// the current message. Put it at the end and make sure to update
// which message is considered "the end".
this->messages_.insertAfter(lastMsg, msg);
lastMsg = msg;
}
}
if (anyInserted)
{
// We only invoke a signal once at the end of filling all messages to
// prevent doing any unnecessary repaints.
this->filledInMessages.invoke(messages);
}
}
void Channel::replaceMessage(MessagePtr message, MessagePtr replacement)
{
int index = this->messages_.replaceItem(message, replacement);

View file

@ -57,8 +57,12 @@ public:
messageAppended;
pajlada::Signals::Signal<std::vector<MessagePtr> &> messagesAddedAtStart;
pajlada::Signals::Signal<size_t, MessagePtr &> messageReplaced;
/// Invoked when some number of messages were filled in using time received
pajlada::Signals::Signal<const std::vector<MessagePtr> &> filledInMessages;
pajlada::Signals::NoArgSignal destroyed;
pajlada::Signals::NoArgSignal displayNameChanged;
/// Invoked when AbstractIrcServer::onReadConnected occurs
pajlada::Signals::NoArgSignal connected;
Type getType() const;
const QString &getName() const;
@ -75,12 +79,17 @@ public:
void addMessage(
MessagePtr message,
boost::optional<MessageFlags> overridingFlags = boost::none);
void addMessagesAtStart(std::vector<MessagePtr> &messages_);
void addMessagesAtStart(const std::vector<MessagePtr> &messages_);
/// Inserts the given messages in order by Message::serverReceivedTime.
void fillInMissingMessages(const std::vector<MessagePtr> &messages);
void addOrReplaceTimeout(MessagePtr message);
void disableAllMessages();
void replaceMessage(MessagePtr message, MessagePtr replacement);
void replaceMessage(size_t index, MessagePtr replacement);
void deleteMessage(QString messageID);
MessagePtr findMessage(QString messageID);
bool hasMessages() const;

View file

@ -16,6 +16,7 @@ Q_LOGGING_CATEGORY(chatterinoEmoji, "chatterino.emoji", logThreshold);
Q_LOGGING_CATEGORY(chatterinoEnv, "chatterino.env", logThreshold);
Q_LOGGING_CATEGORY(chatterinoFfzemotes, "chatterino.ffzemotes", logThreshold);
Q_LOGGING_CATEGORY(chatterinoHelper, "chatterino.helper", logThreshold);
Q_LOGGING_CATEGORY(chatterinoHighlights, "chatterino.highlights", logThreshold);
Q_LOGGING_CATEGORY(chatterinoHotkeys, "chatterino.hotkeys", logThreshold);
Q_LOGGING_CATEGORY(chatterinoHTTP, "chatterino.http", logThreshold);
Q_LOGGING_CATEGORY(chatterinoImage, "chatterino.image", logThreshold);
@ -30,6 +31,8 @@ Q_LOGGING_CATEGORY(chatterinoNotification, "chatterino.notification",
Q_LOGGING_CATEGORY(chatterinoNuulsuploader, "chatterino.nuulsuploader",
logThreshold);
Q_LOGGING_CATEGORY(chatterinoPubSub, "chatterino.pubsub", logThreshold);
Q_LOGGING_CATEGORY(chatterinoRecentMessages, "chatterino.recentmessages",
logThreshold);
Q_LOGGING_CATEGORY(chatterinoStreamlink, "chatterino.streamlink", logThreshold);
Q_LOGGING_CATEGORY(chatterinoStreamerMode, "chatterino.streamermode",
logThreshold);
@ -40,4 +43,3 @@ Q_LOGGING_CATEGORY(chatterinoWebsocket, "chatterino.websocket", logThreshold);
Q_LOGGING_CATEGORY(chatterinoWidget, "chatterino.widget", logThreshold);
Q_LOGGING_CATEGORY(chatterinoWindowmanager, "chatterino.windowmanager",
logThreshold);
Q_LOGGING_CATEGORY(chatterinoHighlights, "chatterino.highlights", logThreshold);

View file

@ -12,6 +12,7 @@ Q_DECLARE_LOGGING_CATEGORY(chatterinoEmoji);
Q_DECLARE_LOGGING_CATEGORY(chatterinoEnv);
Q_DECLARE_LOGGING_CATEGORY(chatterinoFfzemotes);
Q_DECLARE_LOGGING_CATEGORY(chatterinoHelper);
Q_DECLARE_LOGGING_CATEGORY(chatterinoHighlights);
Q_DECLARE_LOGGING_CATEGORY(chatterinoHotkeys);
Q_DECLARE_LOGGING_CATEGORY(chatterinoHTTP);
Q_DECLARE_LOGGING_CATEGORY(chatterinoImage);
@ -23,6 +24,7 @@ Q_DECLARE_LOGGING_CATEGORY(chatterinoNativeMessage);
Q_DECLARE_LOGGING_CATEGORY(chatterinoNotification);
Q_DECLARE_LOGGING_CATEGORY(chatterinoNuulsuploader);
Q_DECLARE_LOGGING_CATEGORY(chatterinoPubSub);
Q_DECLARE_LOGGING_CATEGORY(chatterinoRecentMessages);
Q_DECLARE_LOGGING_CATEGORY(chatterinoStreamlink);
Q_DECLARE_LOGGING_CATEGORY(chatterinoStreamerMode);
Q_DECLARE_LOGGING_CATEGORY(chatterinoTokenizer);
@ -31,4 +33,3 @@ Q_DECLARE_LOGGING_CATEGORY(chatterinoUpdate);
Q_DECLARE_LOGGING_CATEGORY(chatterinoWebsocket);
Q_DECLARE_LOGGING_CATEGORY(chatterinoWidget);
Q_DECLARE_LOGGING_CATEGORY(chatterinoWindowmanager);
Q_DECLARE_LOGGING_CATEGORY(chatterinoHighlights);

View file

@ -187,15 +187,18 @@ public:
*
* @param[in] needle the item to search for
* @param[in] replacement the item to replace needle with
* @tparam Equality function object to use for comparison
* @return the index of the replaced item, or -1 if no replacement took place
*/
template <typename Equals = std::equal_to<T>>
int replaceItem(const T &needle, const T &replacement)
{
std::unique_lock lock(this->mutex_);
Equals eq;
for (int i = 0; i < this->buffer_.size(); ++i)
{
if (this->buffer_[i] == needle)
if (eq(this->buffer_[i], needle))
{
this->buffer_[i] = replacement;
return i;
@ -224,6 +227,59 @@ public:
return true;
}
/**
* @brief Inserts the given item before another item
*
* @param[in] needle the item to use as positional reference
* @param[in] item the item to insert before needle
* @tparam Equality function object to use for comparison
* @return true if an insertion took place
*/
template <typename Equals = std::equal_to<T>>
bool insertBefore(const T &needle, const T &item)
{
std::unique_lock lock(this->mutex_);
Equals eq;
for (auto it = this->buffer_.begin(); it != this->buffer_.end(); ++it)
{
if (eq(*it, needle))
{
this->buffer_.insert(it, item);
return true;
}
}
return false;
}
/**
* @brief Inserts the given item after another item
*
* @param[in] needle the item to use as positional reference
* @param[in] item the item to insert after needle
* @tparam Equality function object to use for comparison
* @return true if an insertion took place
*/
template <typename Equals = std::equal_to<T>>
bool insertAfter(const T &needle, const T &item)
{
std::unique_lock lock(this->mutex_);
Equals eq;
for (auto it = this->buffer_.begin(); it != this->buffer_.end(); ++it)
{
if (eq(*it, needle))
{
++it; // advance to insert after it
this->buffer_.insert(it, item);
return true;
}
}
return false;
}
[[nodiscard]] LimitedQueueSnapshot<T> getSnapshot() const
{
std::shared_lock lock(this->mutex_);

View file

@ -0,0 +1,229 @@
#include "RecentMessagesApi.hpp"
#include "common/Channel.hpp"
#include "common/Common.hpp"
#include "common/Env.hpp"
#include "common/NetworkRequest.hpp"
#include "common/QLogging.hpp"
#include "providers/twitch/IrcMessageHandler.hpp"
#include "providers/twitch/TwitchChannel.hpp"
#include "providers/twitch/TwitchMessageBuilder.hpp"
#include "singletons/Settings.hpp"
#include "util/FormatTime.hpp"
#include "util/PostToThread.hpp"
#include <IrcMessage>
#include <QJsonArray>
#include <QJsonObject>
#include <QUrl>
namespace chatterino {
namespace {
// convertClearchatToNotice takes a Communi::IrcMessage that is a CLEARCHAT
// command and converts it to a readable NOTICE message. This has
// historically been done in the Recent Messages API, but this functionality
// has been moved to Chatterino instead.
auto convertClearchatToNotice(Communi::IrcMessage *message)
{
auto channelName = message->parameter(0);
QString noticeMessage{};
if (message->tags().contains("target-user-id"))
{
auto target = message->parameter(1);
if (message->tags().contains("ban-duration"))
{
// User was timed out
noticeMessage =
QString("%1 has been timed out for %2.")
.arg(target)
.arg(formatTime(
message->tag("ban-duration").toString()));
}
else
{
// User was permanently banned
noticeMessage =
QString("%1 has been permanently banned.").arg(target);
}
}
else
{
// Chat was cleared
noticeMessage = "Chat has been cleared by a moderator.";
}
// rebuild the raw IRC message so we can convert it back to an ircmessage again!
// this could probably be done in a smarter way
auto s = QString(":tmi.twitch.tv NOTICE %1 :%2")
.arg(channelName)
.arg(noticeMessage);
auto newMessage = Communi::IrcMessage::fromData(s.toUtf8(), nullptr);
newMessage->setTags(message->tags());
return newMessage;
}
// Parse the IRC messages returned in JSON form into Communi messages
std::vector<Communi::IrcMessage *> parseRecentMessages(
const QJsonObject &jsonRoot)
{
QJsonArray jsonMessages = jsonRoot.value("messages").toArray();
std::vector<Communi::IrcMessage *> messages;
if (jsonMessages.empty())
return messages;
for (const auto jsonMessage : jsonMessages)
{
auto content = jsonMessage.toString();
content.replace(COMBINED_FIXER, ZERO_WIDTH_JOINER);
auto message =
Communi::IrcMessage::fromData(content.toUtf8(), nullptr);
if (message->command() == "CLEARCHAT")
{
message = convertClearchatToNotice(message);
}
messages.emplace_back(std::move(message));
}
return messages;
}
// Build Communi messages retrieved from the recent messages API into
// proper chatterino messages.
std::vector<MessagePtr> buildRecentMessages(
std::vector<Communi::IrcMessage *> &messages, Channel *channel)
{
auto &handler = IrcMessageHandler::instance();
std::vector<MessagePtr> allBuiltMessages;
for (auto message : messages)
{
if (message->tags().contains("rm-received-ts"))
{
QDate msgDate =
QDateTime::fromMSecsSinceEpoch(
message->tags().value("rm-received-ts").toLongLong())
.date();
// Check if we need to insert a message stating that a new day began
if (msgDate != channel->lastDate_)
{
channel->lastDate_ = msgDate;
auto msg = makeSystemMessage(
QLocale().toString(msgDate, QLocale::LongFormat),
QTime(0, 0));
msg->flags.set(MessageFlag::RecentMessage);
allBuiltMessages.emplace_back(msg);
}
}
auto builtMessages = handler.parseMessageWithReply(
channel, message, allBuiltMessages);
for (auto builtMessage : builtMessages)
{
builtMessage->flags.set(MessageFlag::RecentMessage);
allBuiltMessages.emplace_back(builtMessage);
}
}
return allBuiltMessages;
}
// Returns the URL to be used for querying the Recent Messages API for the
// given channel.
QUrl constructRecentMessagesUrl(const QString &name)
{
QUrl url(Env::get().recentMessagesApiUrl.arg(name));
QUrlQuery urlQuery(url);
if (!urlQuery.hasQueryItem("limit"))
{
urlQuery.addQueryItem(
"limit",
QString::number(getSettings()->twitchMessageHistoryLimit));
}
url.setQuery(urlQuery);
return url;
}
} // namespace
void RecentMessagesApi::loadRecentMessages(const QString &channelName,
std::weak_ptr<Channel> channelPtr,
ResultCallback onLoaded,
ErrorCallback onError)
{
qCDebug(chatterinoRecentMessages)
<< "Loading recent messages for" << channelName;
QUrl url = constructRecentMessagesUrl(channelName);
NetworkRequest(url)
.onSuccess([channelPtr, onLoaded](NetworkResult result) -> Outcome {
auto shared = channelPtr.lock();
if (!shared)
return Failure;
qCDebug(chatterinoRecentMessages)
<< "Successfully loaded recent messages for"
<< shared->getName();
auto root = result.parseJson();
auto parsedMessages = parseRecentMessages(root);
// build the Communi messages into chatterino messages
auto builtMessages =
buildRecentMessages(parsedMessages, shared.get());
postToThread([shared = std::move(shared), root = std::move(root),
messages = std::move(builtMessages),
onLoaded]() mutable {
// Notify user about a possible gap in logs if it returned some messages
// but isn't currently joined to a channel
if (QString errorCode = root.value("error_code").toString();
!errorCode.isEmpty())
{
qCDebug(chatterinoRecentMessages)
<< QString("Got error from API: error_code=%1, "
"channel=%2")
.arg(errorCode, shared->getName());
if (errorCode == "channel_not_joined" && !messages.empty())
{
shared->addMessage(makeSystemMessage(
"Message history service recovering, there may "
"be gaps in the message history."));
}
}
onLoaded(messages);
});
return Success;
})
.onError([channelPtr, onError](NetworkResult result) {
auto shared = channelPtr.lock();
if (!shared)
return;
qCDebug(chatterinoRecentMessages)
<< "Failed to load recent messages for" << shared->getName();
shared->addMessage(makeSystemMessage(
QString("Message history service unavailable (Error %1)")
.arg(result.status())));
onError();
})
.execute();
}
} // namespace chatterino

View file

@ -0,0 +1,33 @@
#pragma once
#include "ForwardDecl.hpp"
#include <QString>
#include <functional>
#include <memory>
#include <vector>
namespace chatterino {
class RecentMessagesApi
{
public:
using ResultCallback = std::function<void(const std::vector<MessagePtr> &)>;
using ErrorCallback = std::function<void()>;
/**
* @brief Loads recent messages for a channel using the Recent Messages API
*
* @param channelName Name of Twitch channel
* @param channelPtr Weak pointer to Channel to use to build messages
* @param onLoaded Callback taking the built messages as a const std::vector<MessagePtr> &
* @param onError Callback called when the network request fails
*/
static void loadRecentMessages(const QString &channelName,
std::weak_ptr<Channel> channelPtr,
ResultCallback onLoaded,
ErrorCallback onError);
};
} // namespace chatterino

View file

@ -327,10 +327,13 @@ void AbstractIrcServer::onReadConnected(IrcConnection *connection)
if (replaceMessage)
{
chan->replaceMessage(snapshot[snapshot.size() - 1], reconnected);
continue;
}
else
{
chan->addMessage(connectedMsg);
}
chan->addMessage(connectedMsg);
chan->connected.invoke();
}
this->falloffCounter_ = 1;

View file

@ -304,6 +304,8 @@ void IrcMessageHandler::populateReply(
}
}
MessagePtr foundMessage;
// Thread does not yet exist, find root reply and create thread.
// Linear search is justified by the infrequent use of replies
for (auto &otherMsg : otherLoaded)
@ -311,15 +313,29 @@ void IrcMessageHandler::populateReply(
if (otherMsg->id == replyID)
{
// Found root reply message
std::shared_ptr<MessageThread> newThread =
std::make_shared<MessageThread>(otherMsg);
builder.setThread(newThread);
// Store weak reference to thread in channel
channel->addReplyThread(newThread);
foundMessage = otherMsg;
break;
}
}
if (!foundMessage)
{
// We didn't find the reply root message in the otherLoaded messages
// which are typically the already-parsed recent messages from the
// Recent Messages API. We could have a really old message that
// still exists being replied to, so check for that here.
foundMessage = channel->findMessage(replyID);
}
if (foundMessage)
{
std::shared_ptr<MessageThread> newThread =
std::make_shared<MessageThread>(foundMessage);
builder.setThread(newThread);
// Store weak reference to thread in channel
channel->addReplyThread(newThread);
}
}
}

View file

@ -7,6 +7,7 @@
#include "controllers/accounts/AccountController.hpp"
#include "controllers/notifications/NotificationController.hpp"
#include "messages/Message.hpp"
#include "providers/RecentMessagesApi.hpp"
#include "providers/bttv/BttvEmotes.hpp"
#include "providers/bttv/LoadBttvChannelEmote.hpp"
#include "providers/twitch/IrcMessageHandler.hpp"
@ -19,7 +20,6 @@
#include "singletons/Settings.hpp"
#include "singletons/Toasts.hpp"
#include "singletons/WindowManager.hpp"
#include "util/FormatTime.hpp"
#include "util/PostToThread.hpp"
#include "util/QStringHash.hpp"
#include "widgets/Window.hpp"
@ -48,79 +48,6 @@ namespace {
const QString LOGIN_PROMPT_TEXT("Click here to add your account again.");
const Link ACCOUNTS_LINK(Link::OpenAccountsPage, QString());
// convertClearchatToNotice takes a Communi::IrcMessage that is a CLEARCHAT command and converts it to a readable NOTICE message
// This has historically been done in the Recent Messages API, but this functionality is being moved to Chatterino instead
auto convertClearchatToNotice(Communi::IrcMessage *message)
{
auto channelName = message->parameter(0);
QString noticeMessage{};
if (message->tags().contains("target-user-id"))
{
auto target = message->parameter(1);
if (message->tags().contains("ban-duration"))
{
// User was timed out
noticeMessage =
QString("%1 has been timed out for %2.")
.arg(target)
.arg(formatTime(
message->tag("ban-duration").toString()));
}
else
{
// User was permanently banned
noticeMessage =
QString("%1 has been permanently banned.").arg(target);
}
}
else
{
// Chat was cleared
noticeMessage = "Chat has been cleared by a moderator.";
}
// rebuild the raw IRC message so we can convert it back to an ircmessage again!
// this could probably be done in a smarter way
auto s = QString(":tmi.twitch.tv NOTICE %1 :%2")
.arg(channelName)
.arg(noticeMessage);
auto newMessage = Communi::IrcMessage::fromData(s.toUtf8(), nullptr);
newMessage->setTags(message->tags());
return newMessage;
}
// parseRecentMessages takes a json object and returns a vector of
// Communi IrcMessages
auto parseRecentMessages(const QJsonObject &jsonRoot, ChannelPtr channel)
{
QJsonArray jsonMessages = jsonRoot.value("messages").toArray();
std::vector<Communi::IrcMessage *> messages;
if (jsonMessages.empty())
return messages;
for (const auto jsonMessage : jsonMessages)
{
auto content = jsonMessage.toString();
content.replace(COMBINED_FIXER, ZERO_WIDTH_JOINER);
auto message =
Communi::IrcMessage::fromData(content.toUtf8(), nullptr);
if (message->command() == "CLEARCHAT")
{
message = convertClearchatToNotice(message);
}
messages.emplace_back(std::move(message));
}
return messages;
}
std::pair<Outcome, std::unordered_set<QString>> parseChatters(
const QJsonObject &jsonRoot)
{
@ -143,6 +70,7 @@ namespace {
return {Success, std::move(usernames)};
}
} // namespace
TwitchChannel::TwitchChannel(const QString &name)
@ -181,6 +109,19 @@ TwitchChannel::TwitchChannel(const QString &name)
this->refreshBTTVChannelEmotes(false);
});
this->connected.connect([this]() {
if (this->roomId().isEmpty())
{
// If we get a reconnected event when the room id is not set, we
// just connected for the first time. After receiving the first
// message from a channel, setRoomId is called and further
// invocations of this event will load recent messages.
return;
}
this->loadRecentMessagesReconnect();
});
this->messageRemovedFromStart.connect([this](MessagePtr &msg) {
if (msg->replyThread)
{
@ -819,93 +760,77 @@ void TwitchChannel::loadRecentMessages()
return;
}
QUrl url(Env::get().recentMessagesApiUrl.arg(this->getName()));
QUrlQuery urlQuery(url);
if (!urlQuery.hasQueryItem("limit"))
if (this->loadingRecentMessages_.test_and_set())
{
urlQuery.addQueryItem(
"limit", QString::number(getSettings()->twitchMessageHistoryLimit));
return; // already loading
}
url.setQuery(urlQuery);
auto weak = weakOf<Channel>(this);
NetworkRequest(url)
.onSuccess([this, weak](NetworkResult result) -> Outcome {
auto shared = weak.lock();
if (!shared)
return Failure;
auto root = result.parseJson();
auto messages = parseRecentMessages(root, shared);
auto &handler = IrcMessageHandler::instance();
std::vector<MessagePtr> allBuiltMessages;
for (auto message : messages)
{
if (message->tags().contains("rm-received-ts"))
{
QDate msgDate = QDateTime::fromMSecsSinceEpoch(
message->tags()
.value("rm-received-ts")
.toLongLong())
.date();
if (msgDate != shared.get()->lastDate_)
{
shared.get()->lastDate_ = msgDate;
auto msg = makeSystemMessage(
QLocale().toString(msgDate, QLocale::LongFormat),
QTime(0, 0));
msg->flags.set(MessageFlag::RecentMessage);
allBuiltMessages.emplace_back(msg);
}
}
auto builtMessages = handler.parseMessageWithReply(
shared.get(), message, allBuiltMessages);
for (auto builtMessage : builtMessages)
{
builtMessage->flags.set(MessageFlag::RecentMessage);
allBuiltMessages.emplace_back(builtMessage);
}
}
postToThread([this, shared, root,
messages = std::move(allBuiltMessages)]() mutable {
shared->addMessagesAtStart(messages);
// Notify user about a possible gap in logs if it returned some messages
// but isn't currently joined to a channel
if (QString errorCode = root.value("error_code").toString();
!errorCode.isEmpty())
{
qCDebug(chatterinoTwitch)
<< QString("rm error_code=%1, channel=%2")
.arg(errorCode, this->getName());
if (errorCode == "channel_not_joined" && !messages.empty())
{
shared->addMessage(makeSystemMessage(
"Message history service recovering, there may be "
"gaps in the message history."));
}
}
});
return Success;
})
.onError([weak](NetworkResult result) {
RecentMessagesApi::loadRecentMessages(
this->getName(), weak,
[weak](const auto &messages) {
auto shared = weak.lock();
if (!shared)
return;
shared->addMessage(makeSystemMessage(
QString("Message history service unavailable (Error %1)")
.arg(result.status())));
})
.execute();
auto tc = dynamic_cast<TwitchChannel *>(shared.get());
if (!tc)
return;
tc->addMessagesAtStart(messages);
tc->loadingRecentMessages_.clear();
},
[weak]() {
auto shared = weak.lock();
if (!shared)
return;
auto tc = dynamic_cast<TwitchChannel *>(shared.get());
if (!tc)
return;
tc->loadingRecentMessages_.clear();
});
}
void TwitchChannel::loadRecentMessagesReconnect()
{
if (!getSettings()->loadTwitchMessageHistoryOnConnect)
{
return;
}
if (this->loadingRecentMessages_.test_and_set())
{
return; // already loading
}
auto weak = weakOf<Channel>(this);
RecentMessagesApi::loadRecentMessages(
this->getName(), weak,
[weak](const auto &messages) {
auto shared = weak.lock();
if (!shared)
return;
auto tc = dynamic_cast<TwitchChannel *>(shared.get());
if (!tc)
return;
tc->fillInMissingMessages(messages);
tc->loadingRecentMessages_.clear();
},
[weak]() {
auto shared = weak.lock();
if (!shared)
return;
auto tc = dynamic_cast<TwitchChannel *>(shared.get());
if (!tc)
return;
tc->loadingRecentMessages_.clear();
});
}
void TwitchChannel::refreshPubSub()

View file

@ -21,6 +21,7 @@
#include <boost/signals2.hpp>
#include <pajlada/signals/signalholder.hpp>
#include <atomic>
#include <mutex>
#include <unordered_map>
@ -163,6 +164,7 @@ private:
void refreshBadges();
void refreshCheerEmotes();
void loadRecentMessages();
void loadRecentMessagesReconnect();
void fetchDisplayName();
void cleanUpReplyThreads();
void showLoginMessage();
@ -188,6 +190,7 @@ private:
int chatterCount_;
UniqueAccess<StreamStatus> streamStatus_;
UniqueAccess<RoomModes> roomModes_;
std::atomic_flag loadingRecentMessages_ = ATOMIC_FLAG_INIT;
std::unordered_map<QString, std::weak_ptr<MessageThread>> threads_;
protected:

View file

@ -657,6 +657,17 @@ void ChannelView::setChannel(ChannelPtr underlyingChannel)
this->channel_->replaceMessage(index, replacement);
});
this->channelConnections_.managedConnect(
underlyingChannel->filledInMessages, [this](const auto &messages) {
std::vector<MessagePtr> filtered;
filtered.reserve(messages.size());
std::copy_if(messages.begin(), messages.end(),
std::back_inserter(filtered), [this](MessagePtr msg) {
return this->shouldIncludeMessage(msg);
});
this->channel_->fillInMissingMessages(filtered);
});
//
// Standard channel connections
//
@ -688,11 +699,17 @@ void ChannelView::setChannel(ChannelPtr underlyingChannel)
this->messageReplaced(index, replacement);
});
// on messages filled in
this->channelConnections_.managedConnect(this->channel_->filledInMessages,
[this](const auto &) {
this->messagesUpdated();
});
auto snapshot = underlyingChannel->getMessageSnapshot();
for (const auto &msg : snapshot)
{
auto messageLayout = new MessageLayout(msg);
auto messageLayout = std::make_shared<MessageLayout>(msg);
if (this->lastMessageHasAlternateBackground_)
{
@ -706,7 +723,7 @@ void ChannelView::setChannel(ChannelPtr underlyingChannel)
messageLayout->flags.set(MessageLayoutFlag::IgnoreHighlights);
}
this->messages_.pushBack(MessageLayoutPtr(messageLayout));
this->messages_.pushBack(messageLayout);
if (this->showScrollbarHighlights())
{
this->scrollBar_->addHighlight(msg->getScrollBarHighlight());
@ -787,7 +804,7 @@ void ChannelView::messageAppended(MessagePtr &message,
messageFlags = overridingFlags.get_ptr();
}
auto messageRef = new MessageLayout(message);
auto messageRef = std::make_shared<MessageLayout>(message);
if (this->lastMessageHasAlternateBackground_)
{
@ -812,7 +829,7 @@ void ChannelView::messageAppended(MessagePtr &message,
loop.exec();
}
if (this->messages_.pushBack(MessageLayoutPtr(messageRef)))
if (this->messages_.pushBack(messageRef))
{
if (this->paused())
{
@ -863,7 +880,7 @@ void ChannelView::messageAddedAtStart(std::vector<MessagePtr> &messages)
for (size_t i = 0; i < messages.size(); i++)
{
auto message = messages.at(i);
auto layout = new MessageLayout(message);
auto layout = std::make_shared<MessageLayout>(message);
// alternate color
if (!this->lastMessageHasAlternateBackgroundReverse_)
@ -871,7 +888,7 @@ void ChannelView::messageAddedAtStart(std::vector<MessagePtr> &messages)
this->lastMessageHasAlternateBackgroundReverse_ =
!this->lastMessageHasAlternateBackgroundReverse_;
messageRefs.at(i) = MessageLayoutPtr(layout);
messageRefs.at(i) = std::move(layout);
}
/// Add the messages at the start
@ -926,7 +943,7 @@ void ChannelView::messageReplaced(size_t index, MessagePtr &replacement)
auto message = *oMessage;
MessageLayoutPtr newItem(new MessageLayout(replacement));
auto newItem = std::make_shared<MessageLayout>(replacement);
if (message->flags.has(MessageLayoutFlag::AlternateBackground))
{
@ -940,6 +957,41 @@ void ChannelView::messageReplaced(size_t index, MessagePtr &replacement)
this->queueLayout();
}
void ChannelView::messagesUpdated()
{
auto snapshot = this->channel_->getMessageSnapshot();
this->messages_.clear();
this->scrollBar_->clearHighlights();
this->lastMessageHasAlternateBackground_ = false;
this->lastMessageHasAlternateBackgroundReverse_ = true;
for (const auto &msg : snapshot)
{
auto messageLayout = std::make_shared<MessageLayout>(msg);
if (this->lastMessageHasAlternateBackground_)
{
messageLayout->flags.set(MessageLayoutFlag::AlternateBackground);
}
this->lastMessageHasAlternateBackground_ =
!this->lastMessageHasAlternateBackground_;
if (this->channel_->shouldIgnoreHighlights())
{
messageLayout->flags.set(MessageLayoutFlag::IgnoreHighlights);
}
this->messages_.pushBack(messageLayout);
if (this->showScrollbarHighlights())
{
this->scrollBar_->addHighlight(msg->getScrollBarHighlight());
}
}
this->queueLayout();
}
void ChannelView::updateLastReadMessage()
{
if (auto lastMessage = this->messages_.last())

View file

@ -162,6 +162,7 @@ private:
void messageAddedAtStart(std::vector<MessagePtr> &messages);
void messageRemoveFromStart(MessagePtr &message);
void messageReplaced(size_t index, MessagePtr &replacement);
void messagesUpdated();
void performLayout(bool causedByScollbar = false);
void layoutVisibleMessages(