* Copyright 2012 Haiku, Inc. All rights reserved.
* Distributed under the terms of the MIT License.
*
* Authors:
* Paweł Dziepak, pdziepak@quarnos.org
*/
#include "WorkQueue.h"
#include <io_requests.h>
#define MAX_BUFFER_SIZE (1024 * 1024)
WorkQueue* gWorkQueue = NULL;
void
WorkQueueEntry::Dump(void (*xprintf)(const char*, ...)) const
{
if (fType == DelegationRecall) {
xprintf("\tType: DelegationRecall\n");
DelegationRecallArgs* args = reinterpret_cast<DelegationRecallArgs*>(fArguments);
xprintf("\t\tDelegation at %p for Inode at %p (ino %" B_PRIdINO "), truncate %d\n",
args->fDelegation, args->fDelegation->GetInode(), args->fDelegation->GetInode()->ID(),
args->fTruncate);
} else if (fType == IORequest) {
xprintf("\tType: IORequest\n");
IORequestArgs* args = reinterpret_cast<IORequestArgs*>(fArguments);
xprintf("\t\tFor Inode at %p (ino %" B_PRIdINO ")\n", args->fInode, args->fInode->ID());
xprintf("\t\twrite %d, offset %" B_PRIdOFF ", length %" B_PRIdOFF "\n",
io_request_is_write(args->fRequest), io_request_offset(args->fRequest),
io_request_length(args->fRequest));
}
return;
}
WorkQueue::WorkQueue()
:
fQueueSemaphore(create_sem(0, NULL)),
fThreadCancel(create_sem(0, NULL))
{
mutex_init(&fQueueLock, NULL);
fThread = spawn_kernel_thread(&WorkQueue::LaunchWorkingThread,
"NFSv4 Work Queue", B_NORMAL_PRIORITY, this);
if (fThread < B_OK) {
fInitError = fThread;
return;
}
status_t result = resume_thread(fThread);
if (result != B_OK) {
kill_thread(fThread);
fInitError = result;
return;
}
fInitError = B_OK;
}
WorkQueue::~WorkQueue()
{
release_sem(fThreadCancel);
status_t result;
wait_for_thread(fThread, &result);
mutex_destroy(&fQueueLock);
delete_sem(fThreadCancel);
delete_sem(fQueueSemaphore);
}
status_t
WorkQueue::EnqueueJob(JobType type, void* args)
{
WorkQueueEntry* entry = new(std::nothrow) WorkQueueEntry;
if (entry == NULL)
return B_NO_MEMORY;
entry->fType = type;
entry->fArguments = args;
if (type == IORequest)
reinterpret_cast<IORequestArgs*>(args)->fInode->BeginAIOOp();
MutexLocker locker(fQueueLock);
fQueue.InsertAfter(fQueue.Tail(), entry);
locker.Unlock();
release_sem(fQueueSemaphore);
return B_OK;
}
void
WorkQueue::Dump(void (*xprintf)(const char*, ...))
{
xprintf("WorkQueue\n");
if (xprintf != kprintf) {
status_t status = mutex_trylock(&fQueueLock);
if (status != B_OK) {
xprintf("\t Locked\n");
return;
}
}
_DumpLocked(xprintf);
if (xprintf != kprintf)
mutex_unlock(&fQueueLock);
return;
}
status_t
WorkQueue::LaunchWorkingThread(void* object)
{
ASSERT(object != NULL);
WorkQueue* queue = reinterpret_cast<WorkQueue*>(object);
return queue->WorkingThread();
}
status_t
WorkQueue::WorkingThread()
{
while (true) {
object_wait_info object[2];
object[0].object = fThreadCancel;
object[0].type = B_OBJECT_TYPE_SEMAPHORE;
object[0].events = B_EVENT_ACQUIRE_SEMAPHORE;
object[1].object = fQueueSemaphore;
object[1].type = B_OBJECT_TYPE_SEMAPHORE;
object[1].events = B_EVENT_ACQUIRE_SEMAPHORE;
status_t result = wait_for_objects(object, 2);
if (result < B_OK
|| (object[0].events & B_EVENT_ACQUIRE_SEMAPHORE) != 0) {
return result;
} else if ((object[1].events & B_EVENT_ACQUIRE_SEMAPHORE) == 0)
continue;
acquire_sem(fQueueSemaphore);
DequeueJob();
}
return B_OK;
}
void
WorkQueue::DequeueJob()
{
MutexLocker locker(fQueueLock);
WorkQueueEntry* entry = fQueue.RemoveHead();
locker.Unlock();
ASSERT(entry != NULL);
void* args = entry->fArguments;
switch (entry->fType) {
case DelegationRecall:
JobRecall(reinterpret_cast<DelegationRecallArgs*>(args));
break;
case IORequest:
JobIO(reinterpret_cast<IORequestArgs*>(args));
break;
}
delete entry;
}
void
WorkQueue::JobRecall(DelegationRecallArgs* args)
{
ASSERT(args != NULL);
Inode* inode = args->fDelegation->GetInode();
if (inode->AIOIncomplete()) {
WorkQueueEntry* entry = new(std::nothrow) WorkQueueEntry;
if (entry == NULL)
return;
entry->fType = DelegationRecall;
entry->fArguments = args;
fQueue.InsertAfter(fQueue.Tail(), entry);
} else {
args->fDelegation->GetInode()->RecallDelegationAsync(args->fTruncate);
}
return;
}
void
WorkQueue::JobIO(IORequestArgs* args)
{
ASSERT(args != NULL);
uint64 offset = io_request_offset(args->fRequest);
uint64 length = io_request_length(args->fRequest);
size_t bufferLength = min_c(MAX_BUFFER_SIZE, length);
char* buffer = reinterpret_cast<char*>(malloc(bufferLength));
if (buffer == NULL) {
notify_io_request(args->fRequest, B_NO_MEMORY);
args->fInode->EndAIOOp();
return;
}
status_t result;
if (io_request_is_write(args->fRequest)) {
if (offset + length > args->fInode->MaxFileSize())
length = args->fInode->MaxFileSize() - offset;
uint64 position = 0;
do {
size_t size = 0;
size_t thisBufferLength = min_c(bufferLength, length - position);
result = read_from_io_request(args->fRequest, buffer,
thisBufferLength);
while (size < thisBufferLength && result == B_OK) {
size_t bytesWritten = thisBufferLength - size;
result = args->fInode->WriteDirect(NULL,
offset + position + size, buffer + size, &bytesWritten);
size += bytesWritten;
}
position += thisBufferLength;
} while (position < length && result == B_OK);
} else {
bool eof = false;
uint64 position = 0;
do {
size_t size = 0;
size_t thisBufferLength = min_c(bufferLength, length - position);
do {
size_t bytesRead = thisBufferLength - size;
result = args->fInode->ReadDirect(NULL,
offset + position + size, buffer + size, &bytesRead, &eof);
if (result != B_OK)
break;
result = write_to_io_request(args->fRequest, buffer + size,
bytesRead);
if (result != B_OK)
break;
size += bytesRead;
} while (size < length && result == B_OK && !eof);
position += thisBufferLength;
} while (position < length && result == B_OK && !eof);
}
free(buffer);
notify_io_request(args->fRequest, result);
args->fInode->EndAIOOp();
}
void
WorkQueue::_DumpLocked(void (*xprintf)(const char*, ...)) const
{
uint64 entries = 0;
for (DoublyLinkedList<WorkQueueEntry>::ConstIterator it = fQueue.GetIterator();
const WorkQueueEntry* entry = it.Next(); ++entries) {
entry->Dump(xprintf);
}
if (entries == 0)
xprintf("\tEmpty\n");
return;
}