* Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de.
* Distributed under the terms of the MIT License.
*/
#include "UnixFifo.h"
#include <new>
#include <AutoDeleter.h>
#include <net_stack.h>
#include <util/ring_buffer.h>
#include "unix.h"
#define UNIX_FIFO_DEBUG_LEVEL 0
#define UNIX_DEBUG_LEVEL UNIX_FIFO_DEBUG_LEVEL
#include "UnixDebug.h"
// Sets up a transfer request over a set of I/O vectors.
//
// vecs/count      The I/O vectors; only the pointer is stored, nothing is
//                 copied.
// ancillaryData   Ancillary data container associated with the request (may be
//                 NULL).
// address         Socket address storage associated with the request.
//
// The total size of the request is computed once, as the sum of all vector
// lengths. The transfer position starts at the beginning of the first vector.
UnixRequest::UnixRequest(const iovec* vecs, size_t count,
	ancillary_data_container* ancillaryData,
	struct sockaddr_storage* address)
	:
	fVecs(vecs),
	fVecCount(count),
	fAncillaryData(ancillaryData),
	fTotalSize(0),
	fBytesTransferred(0),
	fVecIndex(0),
	fVecOffset(0),
	fAddress(address)
{
	size_t index = 0;
	while (index < fVecCount) {
		fTotalSize += fVecs[index].iov_len;
		index++;
	}
}
// Records that \a size more bytes have been transferred and advances the
// current position (vector index + offset within that vector) accordingly.
void
UnixRequest::AddBytesTransferred(size_t size)
{
	fBytesTransferred += size;

	// Step over every vector that the transferred bytes complete.
	while (fVecIndex < fVecCount) {
		const size_t leftInVec = fVecs[fVecIndex].iov_len - fVecOffset;
		if (leftInVec > size)
			break;

		size -= leftInVec;
		fVecIndex++;
		fVecOffset = 0;
	}

	// The remainder (if any) ends inside the current vector.
	if (fVecIndex < fVecCount)
		fVecOffset += size;
}
bool
UnixRequest::GetCurrentChunk(void*& data, size_t& size)
{
while (fVecIndex < fVecCount
&& fVecOffset >= fVecs[fVecIndex].iov_len) {
fVecIndex++;
fVecOffset = 0;
}
if (fVecIndex >= fVecCount)
return false;
data = (uint8*)fVecs[fVecIndex].iov_base + fVecOffset;
size = fVecs[fVecIndex].iov_len - fVecOffset;
return true;
}
// Clears the request's ancillary data pointer. The container itself is not
// deleted here; UnixBufferQueue::Write() calls this after it has stored the
// container pointer in its own ancillary data list.
void
UnixRequest::UnsetAncillaryData()
{
fAncillaryData = NULL;
}
// Attaches the ancillary data in \a data to the request.
//
// If the request has no container yet, \a data simply becomes the request's
// container. Otherwise the contents of \a data are moved into the existing
// container and \a data itself is deleted.
void
UnixRequest::AddAncillaryData(ancillary_data_container* data)
{
	if (fAncillaryData == NULL) {
		fAncillaryData = data;
		return;
	}

	gStackModule->move_ancillary_data(data, fAncillaryData);
	gStackModule->delete_ancillary_data_container(data);
}
// Stores the requested capacity and FIFO type. The ring buffer is not
// allocated here (fBuffer starts out as NULL); Init() creates it.
UnixBufferQueue::UnixBufferQueue(size_t capacity, UnixFifoType type)
:
fBuffer(NULL),
fCapacity(capacity),
fType(type)
{
}
// Releases everything the queue still owns: queued ancillary data (list
// entries and their containers), queued datagram entries, and the ring buffer.
UnixBufferQueue::~UnixBufferQueue()
{
	// Ancillary data that was written but never read.
	while (AncillaryDataEntry* entry = fAncillaryData.RemoveHead()) {
		gStackModule->delete_ancillary_data_container(entry->data);
		delete entry;
	}

	// Datagram entries are allocated in Write() and otherwise only freed in
	// Read(); free the ones that were never read so they don't leak.
	while (DatagramEntry* entry = fDatagrams.RemoveHead())
		delete entry;

	delete_ring_buffer(fBuffer);
}
// Allocates the ring buffer with the capacity given to the constructor.
// Returns B_OK on success, B_NO_MEMORY if the buffer could not be created.
status_t
UnixBufferQueue::Init()
{
	fBuffer = create_ring_buffer(fCapacity);
	return fBuffer != NULL ? B_OK : B_NO_MEMORY;
}
// Returns the number of bytes the ring buffer reports as readable.
size_t
UnixBufferQueue::Readable() const
{
	const size_t readableBytes = ring_buffer_readable(fBuffer);
	return readableBytes;
}
// Returns the number of bytes the ring buffer reports as writable.
size_t
UnixBufferQueue::Writable() const
{
	const size_t writableBytes = ring_buffer_writable(fBuffer);
	return writableBytes;
}
// Copies data from the ring buffer into the vectors of \a request and hands
// the ancillary data that belongs to the copied bytes over to the request.
//
// Stream mode: reads as much as is readable and fits into the request.
// Datagram mode: reads at most the datagram at the head of fDatagrams, copies
// its stored address to request.Address(), and discards whatever part of the
// datagram did not fit into the request.
//
// Returns B_OK on success, B_ERROR if there is no datagram queued (datagram
// mode) or the ring buffer read returned 0 bytes, or the negative value
// returned by the ring buffer read.
status_t
UnixBufferQueue::Read(UnixRequest& request)
{
// Selects between ring_buffer_user_read() and ring_buffer_read() below.
bool user = gStackModule->is_syscall();
size_t readable = Readable();
void* data;
size_t size;
DatagramEntry* datagramEntry = NULL;
if (fType == UnixFifoType::Datagram) {
datagramEntry = fDatagrams.Head();
if (datagramEntry == NULL)
return B_ERROR;
// Never read beyond the end of the head datagram. If the buffer holds less
// than the datagram's recorded size, this is only traced and the smaller
// amount is used.
if (datagramEntry->size > readable)
TRACE("UnixBufferQueue::Read(): expected to read a datagram of size %lu, "
"but only %lu bytes are readable\n", datagramEntry->size, readable);
else
readable = datagramEntry->size;
}
// Fill the request chunk by chunk until it is full or nothing is left to
// read.
while (readable > 0 && request.GetCurrentChunk(data, size)) {
if (size > readable)
size = readable;
ssize_t bytesRead;
if (user)
bytesRead = ring_buffer_user_read(fBuffer, (uint8*)data, size);
else
bytesRead = ring_buffer_read(fBuffer, (uint8*)data, size);
// NOTE(review): on these early returns in datagram mode the head datagram
// entry stays queued, although bytes may already have been consumed in
// earlier iterations -- confirm this is intended.
if (bytesRead < 0)
return bytesRead;
if (bytesRead == 0)
return B_ERROR;
// Entry offsets are relative to the preceding entry (see Write()). Entries
// whose offset lies before the end of the bytes just read are dequeued and
// attached to the request; the offset of the first remaining entry is
// reduced by what is left of the delta.
if (AncillaryDataEntry* entry = fAncillaryData.Head()) {
size_t offsetDelta = bytesRead;
while (entry != NULL && offsetDelta > entry->offset) {
fAncillaryData.RemoveHead();
offsetDelta -= entry->offset;
request.AddAncillaryData(entry->data);
delete entry;
entry = fAncillaryData.Head();
}
if (entry != NULL)
entry->offset -= offsetDelta;
}
request.AddBytesTransferred(bytesRead);
readable -= bytesRead;
}
if (fType == UnixFifoType::Datagram) {
// The head datagram is consumed as a whole, whether or not it fit.
fDatagrams.RemoveHead();
// NOTE(review): request.Address() is not checked for NULL before the copy --
// confirm callers always supply address storage in datagram mode.
memcpy(request.Address(), &datagramEntry->address, sizeof(datagramEntry->address));
delete datagramEntry;
if (readable > 0) {
// The request was too small: drop the rest of the datagram from the ring
// buffer, and delete (rather than deliver) the ancillary data that belongs
// to the dropped bytes, using the same offset bookkeeping as above.
ring_buffer_flush(fBuffer, readable);
if (AncillaryDataEntry* entry = fAncillaryData.Head()) {
size_t offsetDelta = readable;
while (entry != NULL && offsetDelta > entry->offset) {
fAncillaryData.RemoveHead();
offsetDelta -= entry->offset;
gStackModule->delete_ancillary_data_container(entry->data);
delete entry;
entry = fAncillaryData.Head();
}
if (entry != NULL)
entry->offset -= offsetDelta;
}
}
}
return B_OK;
}
// Copies data from the vectors of \a request into the ring buffer and queues
// the request's ancillary data (if any) at the position of the written bytes.
//
// Stream mode: writes as much of the request as currently fits.
// Datagram mode: the datagram is only accepted if the buffer has room for the
// whole request; its size and the address from request.Address() are recorded
// in a new entry appended to fDatagrams.
//
// Returns B_OK on success, B_NO_MEMORY if a bookkeeping entry could not be
// allocated, B_ERROR if a datagram does not fit or the ring buffer write
// returned 0 bytes, or the negative value returned by the ring buffer write.
status_t
UnixBufferQueue::Write(UnixRequest& request)
{
	// Selects between ring_buffer_user_write() and ring_buffer_write() below.
	bool user = gStackModule->is_syscall();

	size_t writable = Writable();
	void* data;
	size_t size;

	DatagramEntry* datagramEntry = NULL;
	ObjectDeleter<DatagramEntry> datagramEntryDeleter;
	if (fType == UnixFifoType::Datagram) {
		datagramEntry = new(std::nothrow) DatagramEntry;
		if (datagramEntry == NULL)
			return B_NO_MEMORY;

		// The deleter frees the entry on every error return below.
		datagramEntryDeleter.SetTo(datagramEntry);
		memcpy(&datagramEntry->address, request.Address(),
			sizeof(datagramEntry->address));
		datagramEntry->size = request.TotalSize();

		// Datagrams are never written partially.
		if (writable < datagramEntry->size) {
			TRACE("UnixBufferQueue::Write(): not enough space for "
				"datagram of size %lu (%lu bytes left)\n", datagramEntry->size, writable);
			return B_ERROR;
		}
	}

	// Prepare the ancillary data entry, but don't queue it before at least one
	// byte has actually been written.
	AncillaryDataEntry* ancillaryEntry = NULL;
	ObjectDeleter<AncillaryDataEntry> ancillaryEntryDeleter;
	if (writable > 0 && request.AncillaryData() != NULL) {
		ancillaryEntry = new(std::nothrow) AncillaryDataEntry;
		if (ancillaryEntry == NULL)
			return B_NO_MEMORY;

		ancillaryEntryDeleter.SetTo(ancillaryEntry);
		ancillaryEntry->data = request.AncillaryData();

		// Offsets are stored relative to the preceding entry: start with the
		// absolute position (the current readable size) and subtract the
		// offsets of all entries already queued.
		ancillaryEntry->offset = Readable();
		AncillaryDataList::Iterator it = fAncillaryData.GetIterator();
		while (AncillaryDataEntry* entry = it.Next())
			ancillaryEntry->offset -= entry->offset;
	}

	// Write chunk by chunk until the request is done or the buffer is full.
	while (writable > 0 && request.GetCurrentChunk(data, size)) {
		if (size > writable)
			size = writable;

		ssize_t bytesWritten;
		if (user)
			bytesWritten = ring_buffer_user_write(fBuffer, (uint8*)data, size);
		else
			bytesWritten = ring_buffer_write(fBuffer, (uint8*)data, size);

		// NOTE(review): in datagram mode an error here leaves the bytes of
		// earlier iterations in the buffer without a datagram entry --
		// confirm whether that can happen in practice.
		if (bytesWritten < 0)
			return bytesWritten;
		if (bytesWritten == 0)
			return B_ERROR;

		// The first successful write hands the ancillary data over from the
		// request to the queue.
		if (ancillaryEntry != NULL) {
			fAncillaryData.Add(ancillaryEntry);
			ancillaryEntryDeleter.Detach();
			request.UnsetAncillaryData();
			ancillaryEntry = NULL;
		}

		request.AddBytesTransferred(bytesWritten);
		writable -= bytesWritten;
	}

	if (fType == UnixFifoType::Datagram) {
		fDatagrams.Add(datagramEntry);
		datagramEntryDeleter.Detach();
	}

	return B_OK;
}
// Grows the ring buffer to \a capacity bytes, keeping the readable data.
//
// A capacity that is not larger than the current one is a successful no-op.
// Returns B_NO_MEMORY if the larger buffer cannot be allocated; in that case
// the existing buffer is left untouched.
status_t
UnixBufferQueue::SetCapacity(size_t capacity)
{
	if (capacity > fCapacity) {
		ring_buffer* grownBuffer = create_ring_buffer(capacity);
		if (grownBuffer == NULL)
			return B_NO_MEMORY;

		// Move all currently readable bytes over before dropping the old
		// buffer.
		const size_t pendingBytes = ring_buffer_readable(fBuffer);
		ring_buffer_move(grownBuffer, pendingBytes, fBuffer);
		delete_ring_buffer(fBuffer);

		fBuffer = grownBuffer;
		fCapacity = capacity;
	}

	return B_OK;
}
// Creates a FIFO of the given type on top of a buffer queue with the given
// capacity. The buffer itself is only allocated by Init().
//
// capacity   Capacity passed on to the buffer queue.
// type       FIFO type (stream or datagram); passed on to the buffer queue and
//            remembered for _MinimumWritableSize().
UnixFifo::UnixFifo(size_t capacity, UnixFifoType type)
	:
	fBuffer(capacity, type),
	fReaders(),
	fWriters(),
	fReadRequested(0),
	fWriteRequested(0),
	fShutdown(0)
{
	// _MinimumWritableSize() switches on fType, so it must not be left
	// uninitialized. It is assigned here rather than in the initializer list,
	// since the member declaration order is not visible from this file.
	fType = type;

	fReadCondition.Init(this, "unix fifo read");
	fWriteCondition.Init(this, "unix fifo write");
	mutex_init(&fLock, "unix fifo");
}
// Destroys the lock that the constructor set up with mutex_init().
UnixFifo::~UnixFifo()
{
mutex_destroy(&fLock);
}
// Initializes the underlying buffer queue and returns its result.
status_t
UnixFifo::Init()
{
	const status_t status = fBuffer.Init();
	return status;
}
// Adds the bits in \a shutdown to the FIFO's shutdown flags and, if any bit
// was given, wakes up all waiting readers and writers so they can re-check
// the flags.
void
UnixFifo::Shutdown(uint32 shutdown)
{
	TRACE("[%" B_PRId32 "] %p->UnixFifo::Shutdown(0x%" B_PRIx32 ")\n",
		find_thread(NULL), this, shutdown);

	fShutdown |= shutdown;

	// Nothing changed, so there is nobody to wake up.
	if (shutdown == 0)
		return;

	fReadCondition.NotifyAll();
	fWriteCondition.NotifyAll();
}
// Reads data from the FIFO into the given I/O vectors.
//
// vecs/vecCount    Destination vectors.
// _ancillaryData   Set to the ancillary data container attached to the read
//                  data (NULL if none). Not set when the call fails right away
//                  because the FIFO is read-shutdown and empty.
// address          Address storage passed on to the request.
// timeout          Passed on to _Read().
//
// Returns the number of bytes read (capped at SSIZE_MAX) if any were
// transferred, otherwise the result of _Read(), or UNIX_FIFO_SHUTDOWN if the
// FIFO is read-shutdown and empty.
ssize_t
UnixFifo::Read(const iovec* vecs, size_t vecCount,
	ancillary_data_container** _ancillaryData,
	struct sockaddr_storage* address, bigtime_t timeout)
{
	TRACE("[%" B_PRId32 "] %p->UnixFifo::Read(%p, %ld, %" B_PRIdBIGTIME ")\n",
		find_thread(NULL), this, vecs, vecCount, timeout);

	if (IsReadShutdown() && fBuffer.Readable() == 0)
		RETURN_ERROR(UNIX_FIFO_SHUTDOWN);

	// Enqueue the request for the duration of the read.
	UnixRequest request(vecs, vecCount, NULL, address);
	fReaders.Add(&request);
	fReadRequested += request.TotalSize();

	const status_t result = _Read(request, timeout);

	const bool wasHead = fReaders.Head() == &request;
	fReaders.Remove(&request);
	fReadRequested -= request.TotalSize();

	// If this request was at the head and data is left, let the next reader
	// have a go.
	const bool wakeReaders = wasHead && !fReaders.IsEmpty()
		&& fBuffer.Readable() > 0 && !IsReadShutdown();
	if (wakeReaders)
		fReadCondition.NotifyAll();

	// Anything read means freed-up space for waiting writers.
	const size_t transferred = request.BytesTransferred();
	const bool wakeWriters = transferred > 0 && !fWriters.IsEmpty()
		&& !IsWriteShutdown();
	if (wakeWriters)
		fWriteCondition.NotifyAll();

	*_ancillaryData = request.AncillaryData();

	if (transferred > 0) {
		if (transferred > SSIZE_MAX)
			RETURN_ERROR(SSIZE_MAX);
		RETURN_ERROR((ssize_t)transferred);
	}

	RETURN_ERROR(result);
}
// Writes the data in the given I/O vectors to the FIFO.
//
// vecs/vecCount   Source vectors.
// ancillaryData   Ancillary data container to send along (may be NULL).
// address         Address passed on to the request.
// timeout         Passed on to _Write().
//
// Returns the number of bytes written (capped at SSIZE_MAX) if any were
// transferred, otherwise the result of _Write(). Fails right away with
// UNIX_FIFO_SHUTDOWN if the FIFO is write-shutdown, or with EPIPE if it is
// read-shutdown.
ssize_t
UnixFifo::Write(const iovec* vecs, size_t vecCount,
	ancillary_data_container* ancillaryData,
	const struct sockaddr_storage* address, bigtime_t timeout)
{
	TRACE("[%" B_PRId32 "] %p->UnixFifo::Write(%p, %ld, %p, %" B_PRIdBIGTIME
		")\n", find_thread(NULL), this, vecs, vecCount, ancillaryData,
		timeout);

	if (IsWriteShutdown())
		RETURN_ERROR(UNIX_FIFO_SHUTDOWN);
	if (IsReadShutdown())
		RETURN_ERROR(EPIPE);

	// Enqueue the request for the duration of the write.
	UnixRequest request(vecs, vecCount, ancillaryData,
		(struct sockaddr_storage*)address);
	fWriters.Add(&request);
	fWriteRequested += request.TotalSize();

	const status_t result = _Write(request, timeout);

	const bool wasHead = fWriters.Head() == &request;
	fWriters.Remove(&request);
	fWriteRequested -= request.TotalSize();

	// If this request was at the head and space is left, let the next writer
	// have a go.
	const bool wakeWriters = wasHead && !fWriters.IsEmpty()
		&& fBuffer.Writable() > 0 && !IsWriteShutdown();
	if (wakeWriters)
		fWriteCondition.NotifyAll();

	// Anything written means new data for waiting readers.
	const size_t transferred = request.BytesTransferred();
	const bool wakeReaders = transferred > 0 && !fReaders.IsEmpty()
		&& !IsReadShutdown();
	if (wakeReaders)
		fReadCondition.NotifyAll();

	if (transferred > 0) {
		if (transferred > SSIZE_MAX)
			RETURN_ERROR(SSIZE_MAX);
		RETURN_ERROR((ssize_t)transferred);
	}

	RETURN_ERROR(result);
}
// Returns the number of readable bytes in the buffer that are not already
// covered by the total size of the queued read requests (0 if the queued
// requests cover everything).
size_t
UnixFifo::Readable() const
{
	const size_t buffered = fBuffer.Readable();
	if ((off_t)buffered > fReadRequested)
		return buffered - fReadRequested;

	return 0;
}
// Returns the number of writable bytes in the buffer that are not already
// covered by the total size of the queued write requests (0 if the queued
// requests cover everything).
size_t
UnixFifo::Writable() const
{
	const size_t available = fBuffer.Writable();
	if ((off_t)available > fWriteRequested)
		return available - fWriteRequested;

	return 0;
}
// Changes the buffer capacity to \a capacity, clamped to the range
// [UNIX_FIFO_MINIMAL_CAPACITY, UNIX_FIFO_MAXIMAL_CAPACITY].
//
// Returns B_OK if the (clamped) capacity equals the current one or the buffer
// queue accepted the change; otherwise the buffer queue's error. After a
// successful change waiting writers are woken up, unless the FIFO is
// write-shutdown.
status_t
UnixFifo::SetBufferCapacity(size_t capacity)
{
	if (capacity > UNIX_FIFO_MAXIMAL_CAPACITY)
		capacity = UNIX_FIFO_MAXIMAL_CAPACITY;
	else if (capacity < UNIX_FIFO_MINIMAL_CAPACITY)
		capacity = UNIX_FIFO_MINIMAL_CAPACITY;

	// Nothing to do, if the capacity doesn't change.
	if (capacity == fBuffer.Capacity())
		return B_OK;

	const status_t status = fBuffer.SetCapacity(capacity);
	if (status != B_OK)
		return status;

	const bool writersWaiting = !fWriters.IsEmpty() && !IsWriteShutdown();
	if (writersWaiting)
		fWriteCondition.NotifyAll();

	return B_OK;
}
// Performs the read for \a request, which must already be queued in fReaders:
// waits until the request is at the head of the reader queue and data is
// available, then reads from the buffer.
//
// A \a timeout of 0 means "don't wait": B_WOULD_BLOCK is returned whenever
// waiting would be necessary. Otherwise \a timeout is handed to the condition
// variable wait as an absolute timeout.
//
// Returns the result of the buffer read, UNIX_FIFO_SHUTDOWN if the FIFO is
// read-shutdown and empty, 0 if it is write-shutdown and empty, B_WOULD_BLOCK
// (see above), or the error returned by an unsuccessful wait.
//
// NOTE(review): fLock is unlocked around each wait and re-locked afterwards,
// so this presumably expects the caller to hold fLock -- confirm at call sites.
status_t
UnixFifo::_Read(UnixRequest& request, bigtime_t timeout)
{
// Non-blocking and not our turn yet: give up right away.
if (fReaders.Head() != &request && timeout == 0)
RETURN_ERROR(B_WOULD_BLOCK);
// Wait for our turn. Stop waiting early if the FIFO is read-shutdown and
// empty, since no data can be delivered anymore in that state.
while (fReaders.Head() != &request
&& !(IsReadShutdown() && fBuffer.Readable() == 0)) {
ConditionVariableEntry entry;
// The entry is added before the lock is released.
fReadCondition.Add(&entry);
mutex_unlock(&fLock);
status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT,
timeout);
mutex_lock(&fLock);
if (error != B_OK)
RETURN_ERROR(error);
}
// Empty buffer: fail for shutdown or non-blocking requests before starting to
// wait for data.
if (fBuffer.Readable() == 0) {
if (IsReadShutdown())
RETURN_ERROR(UNIX_FIFO_SHUTDOWN);
if (IsWriteShutdown())
RETURN_ERROR(0);
if (timeout == 0)
RETURN_ERROR(B_WOULD_BLOCK);
}
// Wait until data arrives or either direction is shut down.
while (fBuffer.Readable() == 0
&& !IsReadShutdown() && !IsWriteShutdown()) {
ConditionVariableEntry entry;
fReadCondition.Add(&entry);
mutex_unlock(&fLock);
status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT,
timeout);
mutex_lock(&fLock);
if (error != B_OK)
RETURN_ERROR(error);
}
// Still empty means the wait ended due to a shutdown. If data is available,
// it is read even when a shutdown flag is set.
if (fBuffer.Readable() == 0) {
if (IsReadShutdown())
RETURN_ERROR(UNIX_FIFO_SHUTDOWN);
if (IsWriteShutdown())
RETURN_ERROR(0);
}
RETURN_ERROR(fBuffer.Read(request));
}
// Performs the write for \a request, which must already be queued in
// fWriters: waits until the request is at the head of the writer queue, then
// writes to the buffer until the whole request has been transferred, waiting
// for space whenever the buffer cannot take the minimum amount (see
// _MinimumWritableSize()).
//
// A \a timeout of 0 selects the non-blocking path (_WriteNonBlocking()).
// Otherwise \a timeout is handed to the condition variable wait as an
// absolute timeout.
//
// Returns B_OK (0) on success, UNIX_FIFO_SHUTDOWN if the FIFO is
// write-shutdown, EPIPE if it is read-shutdown, the error returned by an
// unsuccessful wait, or the error returned by the buffer write.
//
// NOTE(review): fLock is unlocked around each wait and re-locked afterwards,
// so this presumably expects the caller to hold fLock -- confirm at call sites.
status_t
UnixFifo::_Write(UnixRequest& request, bigtime_t timeout)
{
if (timeout == 0)
RETURN_ERROR(_WriteNonBlocking(request));
// Wait for our turn, unless writing gets shut down in the meantime.
while (fWriters.Head() != &request && !IsWriteShutdown()) {
ConditionVariableEntry entry;
// The entry is added before the lock is released.
fWriteCondition.Add(&entry);
mutex_unlock(&fLock);
status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT,
timeout);
mutex_lock(&fLock);
if (error != B_OK)
RETURN_ERROR(error);
}
if (IsWriteShutdown())
RETURN_ERROR(UNIX_FIFO_SHUTDOWN);
if (IsReadShutdown())
RETURN_ERROR(EPIPE);
// An empty request is done without touching the buffer.
if (request.TotalSize() == 0)
return 0;
status_t error = B_OK;
// Keep writing until the whole request has been transferred or an error
// occurs.
while (error == B_OK && request.BytesRemaining() > 0) {
// Wait until enough space is available or either direction is shut down.
while (error == B_OK && fBuffer.Writable() < _MinimumWritableSize(request)
&& !IsWriteShutdown() && !IsReadShutdown()) {
ConditionVariableEntry entry;
fWriteCondition.Add(&entry);
mutex_unlock(&fLock);
error = entry.Wait(B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout);
mutex_lock(&fLock);
if (error != B_OK)
RETURN_ERROR(error);
}
if (IsWriteShutdown())
RETURN_ERROR(UNIX_FIFO_SHUTDOWN);
if (IsReadShutdown())
RETURN_ERROR(EPIPE);
error = fBuffer.Write(request);
// NOTE(review): this branch is empty and has no effect -- looks like a
// placeholder for handling a successful partial write.
if (error == B_OK) {
}
}
RETURN_ERROR(error);
}
// Non-blocking variant of _Write(): the write only happens if it can be done
// right now.
//
// Returns B_WOULD_BLOCK if \a request is not at the head of the writer queue
// or the buffer has less space than _MinimumWritableSize() demands, 0 for an
// empty request, and the result of the buffer write otherwise.
status_t
UnixFifo::_WriteNonBlocking(UnixRequest& request)
{
	// It has to be our turn ...
	if (fWriters.Head() != &request)
		RETURN_ERROR(B_WOULD_BLOCK);

	// ... and there has to be enough space already.
	if (fBuffer.Writable() < _MinimumWritableSize(request))
		RETURN_ERROR(B_WOULD_BLOCK);

	if (request.TotalSize() == 0)
		return 0;

	RETURN_ERROR(fBuffer.Write(request));
}
// Returns how much space the buffer must offer before a write of \a request
// is attempted: the complete request size for datagram FIFOs, a single byte
// for every other type.
size_t
UnixFifo::_MinimumWritableSize(const UnixRequest& request) const
{
	if (fType == UnixFifoType::Datagram)
		return request.TotalSize();

	return 1;
}