/*
 * Copyright 2012 Haiku, Inc. All rights reserved.
 * Distributed under the terms of the MIT License.
 *
 * Authors:
 *		Paweł Dziepak, pdziepak@quarnos.org
 */


#include "WorkQueue.h"

#include <new>

#include <io_requests.h>


// Upper bound for the scratch buffer used when servicing I/O requests.
#define MAX_BUFFER_SIZE		(1024 * 1024)


WorkQueue* gWorkQueue = NULL;


void
WorkQueueEntry::Dump(void (*xprintf)(const char*, ...)) const
{
	if (fType == DelegationRecall) {
		xprintf("\tType: DelegationRecall\n");
		DelegationRecallArgs* args
			= reinterpret_cast<DelegationRecallArgs*>(fArguments);
		xprintf("\t\tDelegation at %p for Inode at %p (ino %" B_PRIdINO
			"), truncate %d\n", args->fDelegation,
			args->fDelegation->GetInode(),
			args->fDelegation->GetInode()->ID(), args->fTruncate);
	} else if (fType == IORequest) {
		xprintf("\tType: IORequest\n");
		IORequestArgs* args = reinterpret_cast<IORequestArgs*>(fArguments);
		xprintf("\t\tFor Inode at %p (ino %" B_PRIdINO ")\n", args->fInode,
			args->fInode->ID());
		xprintf("\t\twrite %d, offset %" B_PRIdOFF ", length %" B_PRIdOFF "\n",
			io_request_is_write(args->fRequest),
			io_request_offset(args->fRequest),
			io_request_length(args->fRequest));
	}
}


WorkQueue::WorkQueue()
	:
	fQueueSemaphore(create_sem(0, NULL)),
	fThreadCancel(create_sem(0, NULL))
{
	mutex_init(&fQueueLock, NULL);

	fThread = spawn_kernel_thread(&WorkQueue::LaunchWorkingThread,
		"NFSv4 Work Queue", B_NORMAL_PRIORITY, this);
	if (fThread < B_OK) {
		fInitError = fThread;
		return;
	}

	status_t result = resume_thread(fThread);
	if (result != B_OK) {
		kill_thread(fThread);
		fInitError = result;
		return;
	}

	fInitError = B_OK;
}


WorkQueue::~WorkQueue()
{
	release_sem(fThreadCancel);

	status_t result;
	wait_for_thread(fThread, &result);

	mutex_destroy(&fQueueLock);
	delete_sem(fThreadCancel);
	delete_sem(fQueueSemaphore);
}


status_t
WorkQueue::EnqueueJob(JobType type, void* args)
{
	WorkQueueEntry* entry = new(std::nothrow) WorkQueueEntry;
	if (entry == NULL)
		return B_NO_MEMORY;

	entry->fType = type;
	entry->fArguments = args;

	if (type == IORequest)
		reinterpret_cast<IORequestArgs*>(args)->fInode->BeginAIOOp();

	MutexLocker locker(fQueueLock);
	fQueue.InsertAfter(fQueue.Tail(), entry);
	locker.Unlock();

	release_sem(fQueueSemaphore);

	return B_OK;
}


void
WorkQueue::Dump(void (*xprintf)(const char*, ...))
{
	xprintf("WorkQueue\n");

	// When invoked from the kernel debugger (xprintf == kprintf) the lock
	// must not be touched; otherwise only dump if it can be acquired
	// without blocking.
	if (xprintf != kprintf) {
		status_t status = mutex_trylock(&fQueueLock);
		if (status != B_OK) {
			xprintf("\t Locked\n");
			return;
		}
	}

	_DumpLocked(xprintf);

	if (xprintf != kprintf)
		mutex_unlock(&fQueueLock);
}


status_t
WorkQueue::LaunchWorkingThread(void* object)
{
	ASSERT(object != NULL);

	WorkQueue* queue = reinterpret_cast<WorkQueue*>(object);
	return queue->WorkingThread();
}


status_t
WorkQueue::WorkingThread()
{
	while (true) {
		// Wait until either the cancel semaphore is signalled (shutdown)
		// or a job becomes available on the queue semaphore.
		object_wait_info object[2];
		object[0].object = fThreadCancel;
		object[0].type = B_OBJECT_TYPE_SEMAPHORE;
		object[0].events = B_EVENT_ACQUIRE_SEMAPHORE;
		object[1].object = fQueueSemaphore;
		object[1].type = B_OBJECT_TYPE_SEMAPHORE;
		object[1].events = B_EVENT_ACQUIRE_SEMAPHORE;

		status_t result = wait_for_objects(object, 2);

		if (result < B_OK
			|| (object[0].events & B_EVENT_ACQUIRE_SEMAPHORE) != 0) {
			return result;
		} else if ((object[1].events & B_EVENT_ACQUIRE_SEMAPHORE) == 0)
			continue;

		acquire_sem(fQueueSemaphore);
		DequeueJob();
	}

	return B_OK;
}


void
WorkQueue::DequeueJob()
{
	MutexLocker locker(fQueueLock);
	WorkQueueEntry* entry = fQueue.RemoveHead();
	locker.Unlock();

	ASSERT(entry != NULL);

	void* args = entry->fArguments;
	switch (entry->fType) {
		case DelegationRecall:
			JobRecall(reinterpret_cast<DelegationRecallArgs*>(args));
			break;
		case IORequest:
			JobIO(reinterpret_cast<IORequestArgs*>(args));
			break;
	}

	delete entry;
}


void
WorkQueue::JobRecall(DelegationRecallArgs* args)
{
	ASSERT(args != NULL);

	Inode* inode = args->fDelegation->GetInode();
	if (inode->AIOIncomplete()) {
		// Asynchronous I/O on this inode is still in flight. Re-queue the
		// recall and try again later.
		WorkQueueEntry* entry = new(std::nothrow) WorkQueueEntry;
		if (entry == NULL)
			return;

		entry->fType = DelegationRecall;
		entry->fArguments = args;

		MutexLocker locker(fQueueLock);
		fQueue.InsertAfter(fQueue.Tail(), entry);
		locker.Unlock();

		// Keep the semaphore count in sync with the queue length so the
		// re-queued entry is eventually processed again.
		release_sem(fQueueSemaphore);
	} else
		inode->RecallDelegationAsync(args->fTruncate);
}


void
WorkQueue::JobIO(IORequestArgs* args)
{
	ASSERT(args != NULL);

	uint64 offset = io_request_offset(args->fRequest);
	uint64 length = io_request_length(args->fRequest);

	// The request is transferred in chunks of at most MAX_BUFFER_SIZE bytes.
	size_t bufferLength = min_c(MAX_BUFFER_SIZE, length);
	char* buffer = reinterpret_cast<char*>(malloc(bufferLength));
	if (buffer == NULL) {
		notify_io_request(args->fRequest, B_NO_MEMORY);
		args->fInode->EndAIOOp();
		return;
	}

	status_t result;
	if (io_request_is_write(args->fRequest)) {
		if (offset + length > args->fInode->MaxFileSize())
			length = args->fInode->MaxFileSize() - offset;

		uint64 position = 0;
		do {
			size_t size = 0;
			size_t thisBufferLength = min_c(bufferLength, length - position);

			result = read_from_io_request(args->fRequest, buffer,
				thisBufferLength);

			while (size < thisBufferLength && result == B_OK) {
				size_t bytesWritten = thisBufferLength - size;
				result = args->fInode->WriteDirect(NULL,
					offset + position + size, buffer + size, &bytesWritten);
				size += bytesWritten;
			}

			position += thisBufferLength;
		} while (position < length && result == B_OK);
	} else {
		bool eof = false;
		uint64 position = 0;
		do {
			size_t size = 0;
			size_t thisBufferLength = min_c(bufferLength, length - position);

			do {
				size_t bytesRead = thisBufferLength - size;
				result = args->fInode->ReadDirect(NULL,
					offset + position + size, buffer + size, &bytesRead,
					&eof);
				if (result != B_OK)
					break;

				result = write_to_io_request(args->fRequest, buffer + size,
					bytesRead);
				if (result != B_OK)
					break;

				size += bytesRead;
			} while (size < thisBufferLength && result == B_OK && !eof);

			position += thisBufferLength;
		} while (position < length && result == B_OK && !eof);
	}

	free(buffer);

	notify_io_request(args->fRequest, result);
	args->fInode->EndAIOOp();
}


void
WorkQueue::_DumpLocked(void (*xprintf)(const char*, ...)) const
{
	uint64 entries = 0;
	for (DoublyLinkedList<WorkQueueEntry>::ConstIterator it
			= fQueue.GetIterator();
		const WorkQueueEntry* entry = it.Next(); ++entries) {
		entry->Dump(xprintf);
	}

	if (entries == 0)
		xprintf("\tEmpty\n");
}