refactor io queue to use mutex. I feel this is the best solution given the irql requirements for each of the potential reporting threads at the irql they will parse the report at, even though there is a potential fro delay. Will soon implement work items that the reports are queued to, allowing execution to return once report is queued.

This commit is contained in:
donnaskiez 2024-05-12 17:26:36 +10:00
parent 15ef3a1d75
commit ee658477f8
5 changed files with 42 additions and 74 deletions

View file

@ -185,9 +185,9 @@ typedef struct _DEFERRED_REPORT {
} DEFERRED_REPORT, *PDEFERRED_REPORT;
typedef struct _DEFERRED_REPORTS_LIST {
LIST_ENTRY head;
UINT32 count;
KSPIN_LOCK lock;
LIST_ENTRY head;
UINT32 count;
KGUARDED_MUTEX lock;
} DEFERRED_REPORTS_LIST, *PDEFERRED_REPORTS_LIST;
@ -200,9 +200,8 @@ typedef struct _IRP_QUEUE_HEAD {
volatile UINT32 total_irps_completed;
volatile UINT32 total_heartbeats_completed;
IO_CSQ csq;
KSPIN_LOCK lock;
KGUARDED_MUTEX lock;
DEFERRED_REPORTS_LIST deferred_reports;
KDPC dpc[EVENT_COUNT];
} IRP_QUEUE_HEAD, *PIRP_QUEUE_HEAD;
@ -271,7 +270,7 @@ typedef struct _ACTIVE_SESSION {
};
HEARTBEAT_CONFIGURATION heartbeat_config;
KSPIN_LOCK lock;
KGUARDED_MUTEX lock;
} ACTIVE_SESSION, *PACTIVE_SESSION;

View file

@ -88,14 +88,14 @@ STATIC
VOID
IrpQueueAcquireLock(_In_ PIO_CSQ Csq, _Out_ PKIRQL Irql)
{
KeAcquireSpinLock(&GetIrpQueueHead()->lock, Irql);
KeAcquireGuardedMutex(&GetActiveSession()->lock);
}
STATIC
VOID
IrpQueueReleaseLock(_In_ PIO_CSQ Csq, _In_ KIRQL Irql)
{
KeReleaseSpinLock(&GetIrpQueueHead()->lock, Irql);
KeReleaseGuardedMutex(&GetActiveSession()->lock);
}
STATIC
@ -204,7 +204,9 @@ IrpQueueQueryPendingPackets(_In_ PIRP Irp)
* leading to a bugcheck in the subsequent call to
* CompleteDeferredReport.
*/
KeAcquireSpinLock(&GetIrpQueueHead()->deferred_reports.lock, &irql);
KeAcquireGuardedMutex(&queue->deferred_reports.lock);
DEBUG_INFO("deferred packet count: %lx", queue->deferred_reports.count);
if (IrpQueueIsThereDeferredPackets(queue)) {
report = IrpQueueRemoveDeferredPacket(queue);
@ -219,7 +221,7 @@ IrpQueueQueryPendingPackets(_In_ PIRP Irp)
}
end:
KeReleaseSpinLock(&GetIrpQueueHead()->deferred_reports.lock, irql);
KeReleaseGuardedMutex(&queue->deferred_reports.lock);
return status;
}
@ -266,7 +268,6 @@ IrpQueueDeferPacket(_In_ PIRP_QUEUE_HEAD Queue,
_In_ UINT32 BufferSize)
{
PDEFERRED_REPORT report = NULL;
KIRQL irql = {0};
/*
* arbitrary number, if we ever do have 100 deferred reports, theres
* probably a catastrophic error somewhere else
@ -281,10 +282,10 @@ IrpQueueDeferPacket(_In_ PIRP_QUEUE_HEAD Queue,
if (!report)
return;
KeAcquireSpinLock(&GetIrpQueueHead()->deferred_reports.lock, &irql);
KeAcquireGuardedMutex(&Queue->deferred_reports.lock);
InsertTailList(&Queue->deferred_reports.head, &report->list_entry);
Queue->deferred_reports.count++;
KeReleaseSpinLock(&GetIrpQueueHead()->deferred_reports.lock, irql);
KeReleaseGuardedMutex(&Queue->deferred_reports.lock);
}
/*
@ -335,26 +336,6 @@ IrpQueueCompletePacket(_In_ PVOID Buffer, _In_ ULONG BufferSize)
return status;
}
STATIC
VOID
IrpQueueSchedulePacketDpc(_In_ struct _KDPC* Dpc,
_In_opt_ PVOID DeferredContext,
_In_opt_ PVOID SystemArgument1,
_In_opt_ PVOID SystemArgument2)
{
UNREFERENCED_PARAMETER(Dpc);
UNREFERENCED_PARAMETER(DeferredContext);
if (!ARGUMENT_PRESENT(SystemArgument1) ||
!ARGUMENT_PRESENT(SystemArgument2))
return;
PVOID buffer = SystemArgument1;
UINT32 buffer_length = (UINT32)SystemArgument2;
IrpQueueCompletePacket(buffer, buffer_length);
}
/*
* Not only does this allow reporting threads to continue execution once the
* report is scheduled (which in some cases such as handle reporting is very
@ -366,16 +347,7 @@ IrpQueueSchedulePacketDpc(_In_ struct _KDPC* Dpc,
VOID
IrpQueueSchedulePacket(_In_ PVOID Buffer, _In_ UINT32 BufferLength)
{
PIRP_QUEUE_HEAD queue = GetIrpQueueHead();
/* Maybe not the best implementation, but 99.9999% of the time there should
* be a dpc available.*/
while (TRUE) {
for (UINT32 index = 0; index < EVENT_COUNT; index++) {
if (KeInsertQueueDpc(&queue->dpc[index], Buffer, BufferLength))
return;
}
}
IrpQueueCompletePacket(Buffer, BufferLength);
}
STATIC
@ -387,14 +359,14 @@ IrpQueueFreeDeferredPackets()
KIRQL irql = 0;
/* just in case... */
KeAcquireSpinLock(&GetIrpQueueHead()->deferred_reports.lock, &irql);
KeAcquireGuardedMutex(&queue->deferred_reports.lock);
while (IrpQueueIsThereDeferredPackets(queue)) {
report = IrpQueueRemoveDeferredPacket(queue);
IrpQueueFreeDeferredPacket(report);
}
KeReleaseSpinLock(&GetIrpQueueHead()->deferred_reports.lock, irql);
KeReleaseGuardedMutex(&queue->deferred_reports.lock);
}
NTSTATUS
@ -403,15 +375,11 @@ IrpQueueInitialise()
NTSTATUS status = STATUS_UNSUCCESSFUL;
PIRP_QUEUE_HEAD queue = GetIrpQueueHead();
KeInitializeSpinLock(&queue->lock);
KeInitializeSpinLock(&queue->deferred_reports.lock);
KeInitializeGuardedMutex(&queue->lock);
KeInitializeGuardedMutex(&queue->deferred_reports.lock);
InitializeListHead(&queue->queue);
InitializeListHead(&queue->deferred_reports.head);
for (UINT32 index = 0; index < EVENT_COUNT; index++) {
KeInitializeDpc(&queue->dpc[index], IrpQueueSchedulePacketDpc, NULL);
}
status = IoCsqInitialize(&queue->csq,
IrpQueueInsert,
IrpQueueRemove,

View file

@ -9,7 +9,7 @@ SessionInitialiseStructure()
NTSTATUS status = STATUS_UNSUCCESSFUL;
PACTIVE_SESSION session = GetActiveSession();
KeInitializeSpinLock(&session->lock);
KeInitializeGuardedMutex(&session->lock);
status = CryptInitialiseProvider();
@ -28,34 +28,34 @@ SessionInitialiseCallbackConfiguration()
VOID
SessionIsActive(_Out_ PBOOLEAN Flag)
{
KIRQL irql = KeAcquireSpinLockRaiseToDpc(&GetActiveSession()->lock);
*Flag = GetActiveSession()->is_session_active;
KeReleaseSpinLock(&GetActiveSession()->lock, irql);
KeAcquireGuardedMutex(&GetActiveSession()->lock);
*Flag = GetActiveSession()->is_session_active;
KeReleaseGuardedMutex(&GetActiveSession()->lock);
}
VOID
SessionGetProcess(_Out_ PEPROCESS* Process)
{
KIRQL irql = KeAcquireSpinLockRaiseToDpc(&GetActiveSession()->lock);
*Process = GetActiveSession()->process;
KeReleaseSpinLock(&GetActiveSession()->lock, irql);
KeAcquireGuardedMutex(&GetActiveSession()->lock);
*Process = GetActiveSession()->process;
KeReleaseGuardedMutex(&GetActiveSession()->lock);
}
VOID
SessionGetProcessId(_Out_ PLONG ProcessId)
{
KIRQL irql = KeAcquireSpinLockRaiseToDpc(&GetActiveSession()->lock);
KeAcquireGuardedMutex(&GetActiveSession()->lock);
*ProcessId = GetActiveSession()->km_handle;
KeReleaseSpinLock(&GetActiveSession()->lock, irql);
KeReleaseGuardedMutex(&GetActiveSession()->lock);
}
VOID
SessionGetCallbackConfiguration(
_Out_ POB_CALLBACKS_CONFIG* CallbackConfiguration)
{
KIRQL irql = KeAcquireSpinLockRaiseToDpc(&GetActiveSession()->lock);
KeAcquireGuardedMutex(&GetActiveSession()->lock);
*CallbackConfiguration = &GetActiveSession()->callback_configuration;
KeReleaseSpinLock(&GetActiveSession()->lock, irql);
KeReleaseGuardedMutex(&GetActiveSession()->lock);
}
STATIC
@ -73,14 +73,14 @@ SessionTerminate()
PACTIVE_SESSION session = GetActiveSession();
KIRQL irql = {0};
KeAcquireSpinLock(&session->lock, &irql);
KeAcquireGuardedMutex(&session->lock);
session->km_handle = NULL;
session->um_handle = NULL;
session->process = NULL;
session->is_session_active = FALSE;
SessionTerminateHeartbeat(&session->heartbeat_config);
CryptCloseSessionCryptObjects();
KeReleaseSpinLock(&GetActiveSession()->lock, irql);
KeReleaseGuardedMutex(&session->lock);
}
NTSTATUS
@ -103,7 +103,7 @@ SessionInitialise(_In_ PIRP Irp)
initiation = (PSESSION_INITIATION_PACKET)Irp->AssociatedIrp.SystemBuffer;
KeAcquireSpinLock(&session->lock, &irql);
KeAcquireGuardedMutex(&session->lock);
session->um_handle = initiation->process_id;
@ -138,7 +138,7 @@ SessionInitialise(_In_ PIRP Irp)
}
end:
KeReleaseSpinLock(&GetActiveSession()->lock, irql);
KeReleaseGuardedMutex(&session->lock);
return status;
}
@ -175,23 +175,23 @@ SessionTerminateProcess()
VOID
SessionIncrementIrpsProcessedCount()
{
KIRQL irql = KeAcquireSpinLockRaiseToDpc(&GetActiveSession()->lock);
KeAcquireGuardedMutex(&GetActiveSession()->lock);
GetActiveSession()->irps_received;
KeReleaseSpinLock(&GetActiveSession()->lock, irql);
KeReleaseGuardedMutex(&GetActiveSession()->lock);
}
VOID
SessionIncrementReportCount()
{
KIRQL irql = KeAcquireSpinLockRaiseToDpc(&GetActiveSession()->lock);
KeAcquireGuardedMutex(&GetActiveSession()->lock);
GetActiveSession()->report_count++;
KeReleaseSpinLock(&GetActiveSession()->lock, irql);
KeReleaseGuardedMutex(&GetActiveSession()->lock);
}
VOID
SessionIncrementHeartbeatCount()
{
KIRQL irql = KeAcquireSpinLockRaiseToDpc(&GetActiveSession()->lock);
KeAcquireGuardedMutex(&GetActiveSession()->lock);
GetActiveSession()->heartbeat_count++;
KeReleaseSpinLock(&GetActiveSession()->lock, irql);
KeReleaseGuardedMutex(&GetActiveSession()->lock);
}

View file

@ -56,8 +56,8 @@ void dispatcher::dispatcher::run() {
this->run_io_port_thread();
thread_pool.queue_job([this]() { k_interface.run_completion_port(); });
while (true) {
// this->issue_kernel_job();
this->k_interface.initiate_apc_stackwalk();
LOG_INFO("issueing kernel job!");
this->issue_kernel_job();
helper::sleep_thread(DISPATCH_LOOP_SLEEP_TIME);
}
}

View file

@ -65,6 +65,7 @@ void kernel_interface::kernel_interface::initiate_completion_port() {
for (int index = 0; index < EVENT_COUNT; index++) {
send_pending_irp();
}
LOG_INFO("Finished initialising completion port.");
}
void kernel_interface::kernel_interface::release_event_object(