* Copyright 2009, 2017, Haiku, Inc.
* Distributed under the terms of the MIT License.
*
* Authors:
* Michael Lotz <mmlr@mlotz.ch>
*/
#include "StreamingRingBuffer.h"
#include <Autolock.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
// Unconditional diagnostic output, prefixed with "StreamingRingBuffer: ".
// When CLIENT_COMPILE is defined the message goes through printf(),
// otherwise through debug_printf().
#ifdef CLIENT_COMPILE
#define TRACE_ALWAYS(x...) printf("StreamingRingBuffer: " x)
#else
#define TRACE_ALWAYS(x...) debug_printf("StreamingRingBuffer: " x)
#endif
// Verbose tracing: expands to nothing, so TRACE() invocations and their
// arguments are dropped by the preprocessor.
#define TRACE(x...)
// Error messages are forwarded to TRACE_ALWAYS() and therefore always emitted.
#define TRACE_ERROR(x...) TRACE_ALWAYS(x)
/*!	Sets up an empty ring buffer with room for \a bufferSize bytes.

	The constructor has no way to report failure. Callers are expected to
	use InitCheck() afterwards to learn whether both notifier semaphores and
	the backing storage could be obtained.
*/
StreamingRingBuffer::StreamingRingBuffer(size_t bufferSize)
	:
	fReaderWaiting(false),
	fWriterWaiting(false),
	fCancelRead(false),
	fCancelWrite(false),
	fReaderNotifier(-1),
	fWriterNotifier(-1),
	fReaderLocker("StreamingRingBuffer reader"),
	fWriterLocker("StreamingRingBuffer writer"),
	fDataLocker("StreamingRingBuffer data"),
	fBuffer(NULL),
	fBufferSize(bufferSize),
	fReadable(0),
	fReadPosition(0),
	fWritePosition(0)
{
	// Both notifiers are created with an initial count of zero.
	fReaderNotifier = create_sem(0, "StreamingRingBuffer read notify");
	fWriterNotifier = create_sem(0, "StreamingRingBuffer write notify");

	fBuffer = static_cast<uint8 *>(malloc(fBufferSize));
	if (fBuffer != NULL)
		return;

	// No storage was obtained: record a capacity of zero so the size
	// bookkeeping agrees with the missing buffer.
	fBufferSize = 0;
}
/*!	Deletes both notifier semaphores and releases the backing storage. */
StreamingRingBuffer::~StreamingRingBuffer()
{
	// The semaphores go first, then the memory they guard access to.
	delete_sem(fReaderNotifier);
	delete_sem(fWriterNotifier);

	// free() accepts NULL, so a failed allocation needs no special casing.
	free(fBuffer);
}
/*!	Reports whether construction succeeded.

	\return The (negative) value stored in the reader or writer notifier if
		the respective semaphore is invalid, \c B_NO_MEMORY if there is no
		backing buffer, \c B_OK otherwise. The reader notifier is checked
		first, then the writer notifier, then the buffer.
*/
status_t
StreamingRingBuffer::InitCheck()
{
	status_t status = B_OK;

	if (fReaderNotifier < 0)
		status = fReaderNotifier;
	else if (fWriterNotifier < 0)
		status = fWriterNotifier;
	else if (fBuffer == NULL)
		status = B_NO_MEMORY;

	return status;
}
/*!	Reads up to \a length bytes out of the ring buffer.

	The reader lock is held for the whole call. The data lock is held while
	the buffer state is inspected or modified and is dropped while waiting
	on the reader notifier semaphore.

	\param buffer Destination for the data. If \c NULL, the bytes are
		consumed from the ring buffer without being copied anywhere.
	\param length Number of bytes requested.
	\param onlyBlockOnNoData If \c true, return as soon as the buffer runs
		empty after at least one byte has been read, and only block when
		nothing has been read yet. If \c false, block until all \a length
		bytes have been read.
	\return The number of bytes read on success. \c B_ERROR if one of the
		locks could not be acquired, the error returned by acquire_sem() if
		waiting failed, or \c B_CANCELED if the wait ended with fCancelRead
		set (which MakeEmpty() does for a waiting reader). When an error is
		returned, the count of bytes already read is not reported.
*/
int32
StreamingRingBuffer::Read(void *buffer, size_t length, bool onlyBlockOnNoData)
{
BAutolock readerLock(fReaderLocker);
if (!readerLock.IsLocked())
return B_ERROR;
BAutolock dataLock(fDataLocker);
if (!dataLock.IsLocked())
return B_ERROR;
int32 readSize = 0;
while (length > 0) {
// Copy at most up to the end of the backing storage (data that wraps
// around is picked up by the next iteration) and never more than what
// is currently readable.
size_t copyLength = min_c(length, fBufferSize - fReadPosition);
copyLength = min_c(copyLength, fReadable);
if (copyLength == 0) {
// Nothing can be read right now.
if (onlyBlockOnNoData && readSize > 0)
return readSize;
// Announce that the reader needs a wake-up, then drop the data lock
// for the duration of the wait.
fReaderWaiting = true;
dataLock.Unlock();
status_t result;
do {
TRACE("waiting in reader\n");
result = acquire_sem(fReaderNotifier);
TRACE("done waiting in reader with status: %#" B_PRIx32 "\n",
result);
// An interrupted wait is simply retried.
} while (result == B_INTERRUPTED);
// Note: the data lock was released above and has not been re-acquired
// on this path.
if (result != B_OK)
return result;
if (!dataLock.Lock()) {
TRACE_ERROR("failed to acquire data lock\n");
return B_ERROR;
}
// The wake-up was a cancellation rather than new data: reset the flag
// and abort the read.
if (fCancelRead) {
TRACE("read canceled\n");
fCancelRead = false;
return B_CANCELED;
}
// Re-evaluate how much can be copied now.
continue;
}
// A NULL destination means "discard": the bookkeeping below still
// advances, only the copy is skipped.
if (buffer != NULL) {
memcpy(buffer, fBuffer + fReadPosition, copyLength);
buffer = (uint8 *)buffer + copyLength;
}
// Advance the read position, wrapping at the end of the storage.
fReadPosition = (fReadPosition + copyLength) % fBufferSize;
fReadable -= copyLength;
readSize += copyLength;
length -= copyLength;
// Space was just freed; wake a writer that flagged itself as waiting.
if (fWriterWaiting) {
release_sem_etc(fWriterNotifier, 1, B_DO_NOT_RESCHEDULE);
fWriterWaiting = false;
}
}
return readSize;
}
/*!	Writes all \a length bytes from \a buffer into the ring buffer.

	The writer lock is held for the whole call. The data lock is held while
	the buffer state is inspected or modified and is dropped while waiting
	on the writer notifier semaphore. Whenever the ring buffer has no free
	space the call blocks until it is woken through that semaphore.

	\param buffer Source of the data. Unlike in Read() it is passed to
		memcpy() unconditionally, so it must not be \c NULL when \a length
		is non-zero.
	\param length Number of bytes to write.
	\return \c B_OK once everything has been stored. \c B_ERROR if one of
		the locks could not be acquired, the error returned by acquire_sem()
		if waiting failed, or \c B_CANCELED if the wait ended with
		fCancelWrite set (which MakeEmpty() does for a waiting writer). Part
		of the data may already have been copied into the ring buffer when
		an error is returned.
*/
status_t
StreamingRingBuffer::Write(const void *buffer, size_t length)
{
BAutolock writerLock(fWriterLocker);
if (!writerLock.IsLocked())
return B_ERROR;
BAutolock dataLock(fDataLocker);
if (!dataLock.IsLocked())
return B_ERROR;
while (length > 0) {
// Copy at most up to the end of the backing storage (the remainder is
// handled after wrapping in the next iteration) and never more than the
// space that is currently free.
size_t copyLength = min_c(length, fBufferSize - fWritePosition);
copyLength = min_c(copyLength, fBufferSize - fReadable);
if (copyLength == 0) {
// No free space. Announce that the writer needs a wake-up, then drop the
// data lock for the duration of the wait.
fWriterWaiting = true;
dataLock.Unlock();
status_t result;
do {
TRACE("waiting in writer\n");
result = acquire_sem(fWriterNotifier);
TRACE("done waiting in writer with status: %#" B_PRIx32 "\n",
result);
// An interrupted wait is simply retried.
} while (result == B_INTERRUPTED);
// Note: the data lock was released above and has not been re-acquired
// on this path.
if (result != B_OK)
return result;
if (!dataLock.Lock()) {
TRACE_ERROR("failed to acquire data lock\n");
return B_ERROR;
}
// The wake-up was a cancellation rather than freed space: reset the
// flag and abort the write.
if (fCancelWrite) {
TRACE("write canceled\n");
fCancelWrite = false;
return B_CANCELED;
}
// Re-evaluate how much can be copied now.
continue;
}
memcpy(fBuffer + fWritePosition, buffer, copyLength);
// Advance the write position, wrapping at the end of the storage.
fWritePosition = (fWritePosition + copyLength) % fBufferSize;
fReadable += copyLength;
buffer = (uint8 *)buffer + copyLength;
length -= copyLength;
// Data just became available; wake a reader that flagged itself as
// waiting.
if (fReaderWaiting) {
release_sem_etc(fReaderNotifier, 1, B_DO_NOT_RESCHEDULE);
fReaderWaiting = false;
}
}
return B_OK;
}
/*!	Discards all buffered data and cancels pending Read()/Write() waits.

	The read and write positions and the readable byte count are reset to
	zero. A writer or reader that has flagged itself as waiting is woken
	through its notifier semaphore with the matching cancel flag set.
	Nothing happens if the data lock cannot be acquired.
*/
void
StreamingRingBuffer::MakeEmpty()
{
	BAutolock dataLock(fDataLocker);
	if (dataLock.IsLocked()) {
		// Rewind to the pristine, empty state.
		fWritePosition = 0;
		fReadPosition = 0;
		fReadable = 0;

		// Wake whoever is blocked; the cancel flag tells the woken side that
		// this wake-up does not mean "space/data available".
		if (fWriterWaiting) {
			release_sem_etc(fWriterNotifier, 1, 0);
			fWriterWaiting = false;
			fCancelWrite = true;
		}

		if (fReaderWaiting) {
			release_sem_etc(fReaderNotifier, 1, 0);
			fReaderWaiting = false;
			fCancelRead = true;
		}
	}
}