#include "AbstractConnection.h"
#include <AutoLocker.h>
#include "Channel.h"
#include "DebugSupport.h"
/*!	Constructs an uninitialized connection.

	The init status starts out as B_NO_INIT and the up-stream semaphore ID
	as -1; Init() replaces both. No channels are registered yet and the
	count of free up-stream channels is 0.
*/
AbstractConnection::AbstractConnection()
    :
    Connection(),
    fInitStatus(B_NO_INIT),
    fDownStreamChannels(),
    fUpStreamChannelSemaphore(-1),  // no semaphore until Init() creates one
    fUpStreamChannelLock(),
    fUpStreamChannels(),
    fFreeUpStreamChannels(0)
{
}
/*!	Closes the connection and releases everything it owns.

	All registered down-stream and up-stream channels are deleted, and the
	semaphore created by Init() is deleted as well, so that it is not leaked
	when the connection goes away.
*/
AbstractConnection::~AbstractConnection()
{
    // NOTE(review): Close() is invoked from this destructor; if Close() is
    // virtual, overrides in derived classes are not reached from here --
    // confirm that callers close such connections explicitly beforehand.
    Close();

    // The connection owns its down-stream channels.
    for (int32 i = 0; i < fDownStreamChannels.Count(); i++)
        delete fDownStreamChannels.ElementAt(i);
    fDownStreamChannels.MakeEmpty();

    // The up-stream channel list is only touched with its lock held.
    AutoLocker<Locker> _(fUpStreamChannelLock);
    for (int32 i = 0; i < fUpStreamChannels.Count(); i++)
        delete fUpStreamChannels.ElementAt(i);
    fUpStreamChannels.MakeEmpty();

    // Release the semaphore created in Init(). The ID is still -1 if Init()
    // was never called, and negative if create_sem() failed.
    if (fUpStreamChannelSemaphore >= 0) {
        delete_sem(fUpStreamChannelSemaphore);
        fUpStreamChannelSemaphore = -1;
    }
}
/*!	Creates the semaphore that is used to hand out up-stream channels.

	The semaphore starts with a count of 0. The outcome is stored in
	fInitStatus and also returned.

	\return \c B_OK on success, otherwise the (negative) error code returned
		by create_sem().
*/
status_t
AbstractConnection::Init()
{
    fUpStreamChannelSemaphore = create_sem(0, "upstream channels");

    // A negative semaphore ID is the error code itself.
    if (fUpStreamChannelSemaphore >= 0)
        fInitStatus = B_OK;
    else
        fInitStatus = fUpStreamChannelSemaphore;

    return fInitStatus;
}
void
AbstractConnection::Close()
{
for (ChannelVector::Iterator it = fDownStreamChannels.Begin();
it != fDownStreamChannels.End();
it++) {
(*it)->Close();
}
AutoLocker<Locker> _(fUpStreamChannelLock);
for (ChannelVector::Iterator it = fUpStreamChannels.Begin();
it != fUpStreamChannels.End();
it++) {
(*it)->Close();
}
}
/*!	Appends a channel to the list of down-stream channels.

	\param channel The channel to add. The destructor deletes every channel
		that is in the list.
	\return \c B_BAD_VALUE if \a channel is \c NULL, otherwise the result of
		appending the channel to the list.
*/
status_t
AbstractConnection::AddDownStreamChannel(Channel* channel)
{
    if (channel == NULL)
        return B_BAD_VALUE;

    status_t result = fDownStreamChannels.PushBack(channel);
    return result;
}
/*!	Appends a channel to the list of up-stream channels and marks it free.

	\param channel The channel to add. The destructor deletes every channel
		that is in the list.
	\return \c B_BAD_VALUE if \a channel is \c NULL, the error of the list
		insertion if that fails, \c B_OK otherwise.
*/
status_t
AbstractConnection::AddUpStreamChannel(Channel* channel)
{
    if (channel == NULL)
        return B_BAD_VALUE;

    // PutUpStreamChannel() below acquires the same lock again.
    // NOTE(review): this presumes Locker allows nested locking -- confirm.
    AutoLocker<Locker> locker(fUpStreamChannelLock);

    status_t result = fUpStreamChannels.PushBack(channel);
    if (result == B_OK) {
        // Make the new channel available to GetUpStreamChannel(). The
        // result of this call is not checked: B_OK is returned regardless.
        PutUpStreamChannel(channel);
    }

    return result;
}
/*!	Returns the number of registered down-stream channels.
*/
int32
AbstractConnection::CountDownStreamChannels() const
{
    const int32 channelCount = fDownStreamChannels.Count();
    return channelCount;
}
/*!	Returns the down-stream channel at the given position.

	\param index Position in the list of down-stream channels.
	\return The channel at \a index, or \c NULL if \a index is negative or
		not less than the number of down-stream channels.
*/
Channel*
AbstractConnection::DownStreamChannelAt(int32 index) const
{
    if (index >= 0 && index < fDownStreamChannels.Count())
        return fDownStreamChannels.ElementAt(index);

    return NULL;
}
/*!	Hands out a free up-stream channel, waiting for one if necessary.

	The up-stream semaphore is acquired with a relative timeout; on success
	the free channel with the highest index is handed out, which shrinks the
	free range at the front of the up-stream channel list by one.

	\param channel Out parameter; set to the channel only on success.
	\param timeout Relative timeout for acquiring the semaphore. Negative
		values are treated as 0.
	\return \c B_BAD_VALUE if \a channel is \c NULL, the error returned by
		acquire_sem_etc() if the semaphore could not be acquired, \c B_ERROR
		if the semaphore was acquired although no channel is marked free,
		\c B_OK otherwise.
*/
status_t
AbstractConnection::GetUpStreamChannel(Channel** channel, bigtime_t timeout)
{
    if (channel == NULL)
        return B_BAD_VALUE;

    // Clamp negative timeouts to "don't wait".
    const bigtime_t waitTime = timeout < 0 ? 0 : timeout;

    status_t result = acquire_sem_etc(fUpStreamChannelSemaphore, 1,
        B_RELATIVE_TIMEOUT, waitTime);
    if (result != B_OK)
        return result;

    AutoLocker<Locker> locker(fUpStreamChannelLock);

    if (fFreeUpStreamChannels >= 1) {
        // The free channels occupy the indices [0, fFreeUpStreamChannels);
        // take the last one of them.
        --fFreeUpStreamChannels;
        *channel = fUpStreamChannels.ElementAt(fFreeUpStreamChannels);
        return B_OK;
    }

    // The semaphore count and the free channel count disagree.
    ERROR(("AbstractConnection::GetUpStreamChannel(): Acquired the "
        "upstream semaphore successfully, but there's no free channel!\n"));
    return B_ERROR;
}
/*!	Returns an up-stream channel to the set of free channels.

	The channel is looked up among the list entries starting at index
	fFreeUpStreamChannels (the ones not marked free), swapped to the first of
	those positions, and the up-stream semaphore is released. The free count
	is only incremented if releasing the semaphore succeeded.

	\param channel The channel to give back.
	\return \c B_BAD_VALUE if \a channel is \c NULL or is not found at or
		after index fFreeUpStreamChannels, the error returned by
		release_sem() if that fails, \c B_OK otherwise.
*/
status_t
AbstractConnection::PutUpStreamChannel(Channel* channel)
{
    if (channel == NULL)
        return B_BAD_VALUE;

    AutoLocker<Locker> locker(fUpStreamChannelLock);

    // Search only the part of the list that is not marked free.
    const int32 usedIndex
        = fUpStreamChannels.IndexOf(channel, fFreeUpStreamChannels);
    if (usedIndex < 0)
        return B_BAD_VALUE;

    // Move the channel to the slot right behind the free range by swapping
    // it with the channel currently stored there.
    if (usedIndex != fFreeUpStreamChannels) {
        Channel*& boundarySlot
            = fUpStreamChannels.ElementAt(fFreeUpStreamChannels);
        fUpStreamChannels.ElementAt(usedIndex) = boundarySlot;
        boundarySlot = channel;
    }

    const status_t result = release_sem(fUpStreamChannelSemaphore);
    if (result != B_OK)
        return result;

    // Grow the free range to include the channel.
    fFreeUpStreamChannels++;
    return B_OK;
}