⛏️ index : haiku.git

/*
 * Copyright 2012 Haiku, Inc. All rights reserved.
 * Distributed under the terms of the MIT License.
 *
 * Authors:
 *		Paweł Dziepak, pdziepak@quarnos.org
 */


#include "WorkQueue.h"

#include <io_requests.h>


#define	MAX_BUFFER_SIZE			(1024 * 1024)

WorkQueue*		gWorkQueue		= NULL;


void
WorkQueueEntry::Dump(void (*xprintf)(const char*, ...)) const
{
	if (fType == DelegationRecall) {
		xprintf("\tType: DelegationRecall\n");
		DelegationRecallArgs* args = reinterpret_cast<DelegationRecallArgs*>(fArguments);
		xprintf("\t\tDelegation at %p for Inode at %p (ino %" B_PRIdINO "), truncate %d\n",
			args->fDelegation, args->fDelegation->GetInode(), args->fDelegation->GetInode()->ID(),
			args->fTruncate);
	} else if (fType == IORequest) {
		xprintf("\tType: IORequest\n");
		IORequestArgs* args = reinterpret_cast<IORequestArgs*>(fArguments);
		xprintf("\t\tFor Inode at %p (ino %" B_PRIdINO ")\n", args->fInode, args->fInode->ID());
		xprintf("\t\twrite %d, offset %" B_PRIdOFF ", length %" B_PRIdOFF "\n",
			io_request_is_write(args->fRequest), io_request_offset(args->fRequest),
			io_request_length(args->fRequest));
	}

	return;
}


WorkQueue::WorkQueue()
	:
	fQueueSemaphore(create_sem(0, NULL)),
	fThreadCancel(create_sem(0, NULL))
{
	mutex_init(&fQueueLock, NULL);

	fThread = spawn_kernel_thread(&WorkQueue::LaunchWorkingThread,
		"NFSv4 Work Queue", B_NORMAL_PRIORITY, this);
	if (fThread < B_OK) {
		fInitError = fThread;
		return;
	}

	status_t result = resume_thread(fThread);
	if (result != B_OK) {
		kill_thread(fThread);
		fInitError = result;
		return;
	}

	fInitError = B_OK;
}


WorkQueue::~WorkQueue()
{
	release_sem(fThreadCancel);

	status_t result;
	wait_for_thread(fThread, &result);

	mutex_destroy(&fQueueLock);
	delete_sem(fThreadCancel);
	delete_sem(fQueueSemaphore);
}


status_t
WorkQueue::EnqueueJob(JobType type, void* args)
{
	WorkQueueEntry* entry = new(std::nothrow) WorkQueueEntry;
	if (entry == NULL)
		return B_NO_MEMORY;

	entry->fType = type;
	entry->fArguments = args;
	if (type == IORequest)
		reinterpret_cast<IORequestArgs*>(args)->fInode->BeginAIOOp();

	MutexLocker locker(fQueueLock);
	fQueue.InsertAfter(fQueue.Tail(), entry);
	locker.Unlock();

	release_sem(fQueueSemaphore);
	return B_OK;
}


void
WorkQueue::Dump(void (*xprintf)(const char*, ...))
{
	xprintf("WorkQueue\n");

	if (xprintf != kprintf) {
		status_t status = mutex_trylock(&fQueueLock);
		if (status != B_OK) {
			xprintf("\t Locked\n");
			return;
		}
	}

	_DumpLocked(xprintf);

	if (xprintf != kprintf)
		mutex_unlock(&fQueueLock);

	return;
}


status_t
WorkQueue::LaunchWorkingThread(void* object)
{
	ASSERT(object != NULL);

	WorkQueue* queue = reinterpret_cast<WorkQueue*>(object);
	return queue->WorkingThread();
}


status_t
WorkQueue::WorkingThread()
{
	while (true) {
		object_wait_info object[2];
		object[0].object = fThreadCancel;
		object[0].type = B_OBJECT_TYPE_SEMAPHORE;
		object[0].events = B_EVENT_ACQUIRE_SEMAPHORE;

		object[1].object = fQueueSemaphore;
		object[1].type = B_OBJECT_TYPE_SEMAPHORE;
		object[1].events = B_EVENT_ACQUIRE_SEMAPHORE;

		status_t result = wait_for_objects(object, 2);

		if (result < B_OK
			|| (object[0].events & B_EVENT_ACQUIRE_SEMAPHORE) != 0) {
			return result;
		} else if ((object[1].events & B_EVENT_ACQUIRE_SEMAPHORE) == 0)
			continue;

		acquire_sem(fQueueSemaphore);

		DequeueJob();
	}

	return B_OK;
}


void
WorkQueue::DequeueJob()
{
	MutexLocker locker(fQueueLock);
	WorkQueueEntry* entry = fQueue.RemoveHead();
	locker.Unlock();
	ASSERT(entry != NULL);

	void* args = entry->fArguments;
	switch (entry->fType) {
		case DelegationRecall:
			JobRecall(reinterpret_cast<DelegationRecallArgs*>(args));
			break;
		case IORequest:
			JobIO(reinterpret_cast<IORequestArgs*>(args));
			break;
	}

	delete entry;
}


void
WorkQueue::JobRecall(DelegationRecallArgs* args)
{
	ASSERT(args != NULL);

	Inode* inode = args->fDelegation->GetInode();
	if (inode->AIOIncomplete()) {
		// Re-queue and try again later.
		WorkQueueEntry* entry = new(std::nothrow) WorkQueueEntry;
		if (entry == NULL)
			return;

		entry->fType = DelegationRecall;
		entry->fArguments = args;

		// The queue is already locked.
		fQueue.InsertAfter(fQueue.Tail(), entry);
	} else {
		args->fDelegation->GetInode()->RecallDelegationAsync(args->fTruncate);
	}

	return;
}


void
WorkQueue::JobIO(IORequestArgs* args)
{
	ASSERT(args != NULL);

	uint64 offset = io_request_offset(args->fRequest);
	uint64 length = io_request_length(args->fRequest);

	size_t bufferLength = min_c(MAX_BUFFER_SIZE, length);
	char* buffer = reinterpret_cast<char*>(malloc(bufferLength));
	if (buffer == NULL) {
		notify_io_request(args->fRequest, B_NO_MEMORY);
		args->fInode->EndAIOOp();
		return;
	}

	status_t result;
	if (io_request_is_write(args->fRequest)) {
		if (offset + length > args->fInode->MaxFileSize())
				length = args->fInode->MaxFileSize() - offset;

		uint64 position = 0;
		do {
			size_t size = 0;
			size_t thisBufferLength = min_c(bufferLength, length - position);

			result = read_from_io_request(args->fRequest, buffer,
				thisBufferLength);

			while (size < thisBufferLength && result == B_OK) {
				size_t bytesWritten = thisBufferLength - size;
				result = args->fInode->WriteDirect(NULL,
					offset + position + size, buffer + size, &bytesWritten);
				size += bytesWritten;
			}

			position += thisBufferLength;
		} while (position < length && result == B_OK);
	} else {
		bool eof = false;
		uint64 position = 0;
		do {
			size_t size = 0;
			size_t thisBufferLength = min_c(bufferLength, length - position);

			do {
				size_t bytesRead = thisBufferLength - size;
				result = args->fInode->ReadDirect(NULL,
					offset + position + size, buffer + size, &bytesRead, &eof);
				if (result != B_OK)
					break;

				result = write_to_io_request(args->fRequest, buffer + size,
					bytesRead);
				if (result != B_OK)
					break;

				size += bytesRead;
			} while (size < length && result == B_OK && !eof);

			position += thisBufferLength;
		} while (position < length && result == B_OK && !eof);
	}

	free(buffer);

	notify_io_request(args->fRequest, result);
	args->fInode->EndAIOOp();
}


void
WorkQueue::_DumpLocked(void (*xprintf)(const char*, ...)) const
{
	uint64 entries = 0;
	for (DoublyLinkedList<WorkQueueEntry>::ConstIterator it = fQueue.GetIterator();
		const WorkQueueEntry* entry = it.Next(); ++entries) {
		entry->Dump(xprintf);
	}

	if (entries == 0)
		xprintf("\tEmpty\n");

	return;
}