* Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de.
* Distributed under the terms of the MIT License.
*/
#include "UnixStreamEndpoint.h"
#include <stdio.h>
#include <sys/stat.h>
#include <AutoDeleter.h>
#include <vfs.h>
#include "UnixAddressManager.h"
#include "UnixFifo.h"
#define UNIX_STREAM_ENDPOINT_DEBUG_LEVEL 0
#define UNIX_DEBUG_LEVEL UNIX_STREAM_ENDPOINT_DEBUG_LEVEL
#include "UnixDebug.h"
// Constructs an endpoint for the given socket.
//
// The endpoint starts out in the Closed state with no peer endpoint, no
// receive FIFO and no accept semaphore (-1). `atomic` is remembered in
// fAtomic; Connect() uses it to choose the FIFO type and Send() uses it to
// decide whether MSG_EOR is acceptable.
UnixStreamEndpoint::UnixStreamEndpoint(net_socket* socket, bool atomic)
:
UnixEndpoint(socket),
fPeerEndpoint(NULL),
fReceiveFifo(NULL),
fState(unix_stream_endpoint_state::Closed),
fAcceptSemaphore(-1),
fIsChild(false),
fWasConnected(false),
fAtomic(atomic)
{
TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::UnixStreamEndpoint()\n",
find_thread(NULL), this);
}
// Destructor. Only emits a trace line; no cleanup is performed here (the
// receive FIFO reference is dropped in Free(), the connection is torn down in
// Close()).
UnixStreamEndpoint::~UnixStreamEndpoint()
{
TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::~UnixStreamEndpoint()\n",
find_thread(NULL), this);
}
// Initialization hook. There is nothing to set up beyond what the constructor
// already did, so this only traces the call and always returns B_OK.
status_t
UnixStreamEndpoint::Init()
{
TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Init()\n", find_thread(NULL),
this);
RETURN_ERROR(B_OK);
}
void
UnixStreamEndpoint::Uninit()
{
TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Uninit()\n", find_thread(NULL),
this);
UnixStreamEndpointLocker locker(this);
bool closed = (fState == unix_stream_endpoint_state::Closed);
locker.Unlock();
if (!closed) {
Close();
}
ReleaseReference();
}
// Opens the endpoint: delegates to ProtocolSocket::Open() and, if that
// succeeded, moves the endpoint into the NotConnected state.
//
// Returns B_OK on success, otherwise the error from ProtocolSocket::Open()
// (the state is left untouched in that case).
status_t
UnixStreamEndpoint::Open()
{
	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Open()\n", find_thread(NULL),
		this);

	status_t status = ProtocolSocket::Open();
	if (status == B_OK)
		fState = unix_stream_endpoint_state::NotConnected;

	RETURN_ERROR(status);
}
// Closes the endpoint.
//
// A connected endpoint disconnects both itself and its peer, a listening
// endpoint stops listening, and in either case the endpoint is unbound and
// ends up in the Closed state. Always returns B_OK; the result of _Unbind()
// is intentionally not propagated.
status_t
UnixStreamEndpoint::Close()
{
	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Close()\n", find_thread(NULL),
		this);

	UnixStreamEndpointLocker selfLocker(this);

	if (fState == unix_stream_endpoint_state::Connected) {
		// The reference holder is declared before the peer locker, so it is
		// destroyed after the peer has been unlocked again.
		BReference<UnixStreamEndpoint> peerHolder;
		UnixStreamEndpointLocker peerGuard;

		// _LockConnectedEndpoints() may temporarily drop our own lock, so it
		// can fail even though we were connected a moment ago.
		const bool bothLocked
			= (_LockConnectedEndpoints(selfLocker, peerGuard) == B_OK);
		if (bothLocked) {
			// NOTE(review): presumably SetTo(..., false) acquires an extra
			// reference, keeping the peer alive after _Disconnect() released
			// ours -- confirm against BReference.
			peerHolder.SetTo(peerGuard.Get(), false);

			fPeerEndpoint->_Disconnect();
			_Disconnect();
		}
	}

	if (fState == unix_stream_endpoint_state::Listening)
		_StopListening();

	_Unbind();
	fState = unix_stream_endpoint_state::Closed;

	RETURN_ERROR(B_OK);
}
// Releases the endpoint's receive FIFO (if it still has one) while holding
// the endpoint lock. Always returns B_OK.
status_t
UnixStreamEndpoint::Free()
{
	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Free()\n", find_thread(NULL),
		this);

	UnixStreamEndpointLocker selfLocker(this);

	_UnsetReceiveFifo();

	RETURN_ERROR(B_OK);
}
// Binds the endpoint to the given AF_UNIX address.
//
// Returns EAFNOSUPPORT for any other address family, B_BAD_VALUE if the
// endpoint is not in the NotConnected state or is already bound, and
// otherwise whatever _Bind() returns.
status_t
UnixStreamEndpoint::Bind(const struct sockaddr* _address)
{
	if (_address->sa_family != AF_UNIX)
		RETURN_ERROR(EAFNOSUPPORT);

	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Bind(\"%s\")\n",
		find_thread(NULL), this,
		ConstSocketAddress(&gAddressModule, _address).AsString().Data());

	const sockaddr_un* unixAddress
		= reinterpret_cast<const sockaddr_un*>(_address);

	UnixStreamEndpointLocker selfLocker(this);

	// Binding is only possible for an open, unconnected, not yet bound
	// endpoint.
	const bool bindable = (fState == unix_stream_endpoint_state::NotConnected
		&& !IsBound());
	if (!bindable)
		RETURN_ERROR(B_BAD_VALUE);

	RETURN_ERROR(_Bind(unixAddress));
}
// Public unbind hook: takes the endpoint lock and forwards to _Unbind(),
// returning its result.
status_t
UnixStreamEndpoint::Unbind()
{
	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Unbind()\n", find_thread(NULL),
		this);

	UnixStreamEndpointLocker selfLocker(this);

	status_t status = _Unbind();
	RETURN_ERROR(status);
}
// Puts the endpoint into the Listening state (or just updates the backlog if
// it is listening already).
//
// Returns EDESTADDRREQ if the endpoint is not bound, EINVAL if it is neither
// NotConnected nor Listening, ENOBUFS if the accept semaphore cannot be
// created, and B_OK otherwise.
status_t
UnixStreamEndpoint::Listen(int backlog)
{
	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Listen(%d)\n", find_thread(NULL),
		this, backlog);

	UnixStreamEndpointLocker selfLocker(this);

	if (!IsBound())
		RETURN_ERROR(EDESTADDRREQ);

	const bool alreadyListening
		= (fState == unix_stream_endpoint_state::Listening);
	if (!alreadyListening
		&& fState != unix_stream_endpoint_state::NotConnected) {
		RETURN_ERROR(EINVAL);
	}

	// The backlog is updated in both cases.
	gSocketModule->set_max_backlog(socket, backlog);

	if (alreadyListening)
		RETURN_ERROR(B_OK);

	// First Listen() on this endpoint: create the semaphore Accept() waits
	// on and Connect() releases.
	fAcceptSemaphore = create_sem(0, "unix accept");
	if (fAcceptSemaphore < 0)
		RETURN_ERROR(ENOBUFS);

	// A listening endpoint does not receive data itself.
	_UnsetReceiveFifo();

	// Record the credentials of the listening process; spawned child
	// endpoints copy them in _Spawn().
	fCredentials.pid = getpid();
	fCredentials.uid = geteuid();
	fCredentials.gid = getegid();

	fState = unix_stream_endpoint_state::Listening;

	RETURN_ERROR(B_OK);
}
// Connects this endpoint to the listening endpoint bound to `_address`.
//
// The address is resolved either in the internal address space (sun_path
// starts with '\0') or via the file system (the path must name a socket
// node). On success a new pending socket is spawned on the listener, wired up
// as our peer, and the listener's accept semaphore is released.
//
// Returns:
//	- EAFNOSUPPORT if the address family is not AF_UNIX,
//	- EISCONN if already connected, B_BAD_VALUE for any other state but
//	  NotConnected or for a malformed address / a path that is no socket,
//	- the vfs_read_stat() / UnixAddress::InternalID() error on lookup failure,
//	- ECONNREFUSED if nobody is listening on the address, EPROTOTYPE if the
//	  bound endpoint is not a stream endpoint,
//	- B_NO_MEMORY if a FIFO cannot be allocated, or the error of
//	  UnixFifo::Init() / spawn_pending_socket(),
//	- B_OK on success.
//
// Endpoint locks are taken in the order: this (connecting) endpoint, the
// listening endpoint, the spawned child endpoint.
status_t
UnixStreamEndpoint::Connect(const struct sockaddr* _address)
{
	if (_address->sa_family != AF_UNIX)
		RETURN_ERROR(EAFNOSUPPORT);

	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Connect(\"%s\")\n",
		find_thread(NULL), this,
		ConstSocketAddress(&gAddressModule, _address).AsString().Data());

	const sockaddr_un* address = (const sockaddr_un*)_address;

	UnixStreamEndpointLocker endpointLocker(this);

	if (fState == unix_stream_endpoint_state::Connected)
		RETURN_ERROR(EISCONN);

	if (fState != unix_stream_endpoint_state::NotConnected)
		RETURN_ERROR(B_BAD_VALUE);

	// Translate the sockaddr into a UnixAddress.
	UnixAddress unixAddress;

	if (address->sun_path[0] == '\0') {
		// Internal address space (or an empty address, which is invalid).
		int32 internalID;
		if (UnixAddress::IsEmptyAddress(*address))
			RETURN_ERROR(B_BAD_VALUE);

		internalID = UnixAddress::InternalID(*address);
		if (internalID < 0)
			RETURN_ERROR(internalID);

		unixAddress.SetTo(internalID);
	} else {
		// File system address space: the path must be non-empty and
		// NUL-terminated within sun_path.
		size_t pathLen = strnlen(address->sun_path, sizeof(address->sun_path));
		if (pathLen == 0 || pathLen == sizeof(address->sun_path))
			RETURN_ERROR(B_BAD_VALUE);

		struct stat st;
		status_t error = vfs_read_stat(-1, address->sun_path, true, &st,
			!gStackModule->is_syscall());
		if (error != B_OK)
			RETURN_ERROR(error);

		if (!S_ISSOCK(st.st_mode))
			RETURN_ERROR(B_BAD_VALUE);

		unixAddress.SetTo(st.st_dev, st.st_ino, NULL);
	}

	// Look up the endpoint bound to that address and grab a reference before
	// letting go of the address manager lock.
	UnixAddressManagerLocker addressLocker(gAddressManager);
	UnixEndpoint* listeningUnixEndpoint = gAddressManager.Lookup(unixAddress);
	if (listeningUnixEndpoint == NULL)
		RETURN_ERROR(ECONNREFUSED);

	UnixStreamEndpoint* listeningEndpoint
		= dynamic_cast<UnixStreamEndpoint*>(listeningUnixEndpoint);
	if (listeningEndpoint == NULL)
		RETURN_ERROR(EPROTOTYPE);

	BReference<UnixStreamEndpoint> peerReference(listeningEndpoint);
	addressLocker.Unlock();

	UnixStreamEndpointLocker peerLocker(listeningEndpoint);

	// The endpoint may have changed between the lookup and locking it, so
	// re-validate that it is still bound to our address and listening.
	if (!listeningEndpoint->IsBound()
		|| listeningEndpoint->fState != unix_stream_endpoint_state::Listening
		|| listeningEndpoint->fAddress != unixAddress) {
		RETURN_ERROR(ECONNREFUSED);
	}

	// Allocate the FIFOs for us and the endpoint to be spawned up front, so
	// that failing here requires no further cleanup.
	UnixFifoType type = fAtomic ? UnixFifoType::Datagram : UnixFifoType::Stream;
	UnixFifo* fifo = new(nothrow) UnixFifo(socket->receive.buffer_size, type);
	UnixFifo* peerFifo = new(nothrow) UnixFifo(socket->send.buffer_size, type);
	ObjectDeleter<UnixFifo> fifoDeleter(fifo);
	ObjectDeleter<UnixFifo> peerFifoDeleter(peerFifo);

	// new(nothrow) yields NULL on allocation failure; calling Init() on it
	// would dereference a NULL pointer.
	if (fifo == NULL || peerFifo == NULL)
		RETURN_ERROR(B_NO_MEMORY);

	status_t error;
	if ((error = fifo->Init()) != B_OK || (error = peerFifo->Init()) != B_OK)
		RETURN_ERROR(error);

	// Spawn the socket that accept() on the listener will hand out.
	net_socket* newSocket;
	error = gSocketModule->spawn_pending_socket(listeningEndpoint->socket,
		&newSocket);
	if (error != B_OK)
		RETURN_ERROR(error);

	// Initialize the spawned endpoint as our connected peer.
	UnixStreamEndpoint* connectedEndpoint
		= (UnixStreamEndpoint*)newSocket->first_protocol;

	UnixStreamEndpointLocker connectedLocker(connectedEndpoint);

	connectedEndpoint->_Spawn(this, listeningEndpoint, peerFifo);

	// Update our own attributes.
	_UnsetReceiveFifo();

	fPeerEndpoint = connectedEndpoint;
	PeerAddress().SetTo(&connectedEndpoint->socket->address);
	fPeerEndpoint->AcquireReference();
	fReceiveFifo = fifo;

	fCredentials.pid = getpid();
	fCredentials.uid = geteuid();
	fCredentials.gid = getegid();

	// Both FIFOs are owned by the endpoints now.
	fifoDeleter.Detach();
	peerFifoDeleter.Detach();

	fState = unix_stream_endpoint_state::Connected;
	fWasConnected = true;

	gSocketModule->set_connected(Socket());

	// Wake up a thread waiting in Accept() on the listener.
	release_sem(listeningEndpoint->fAcceptSemaphore);

	connectedLocker.Unlock();
	peerLocker.Unlock();
	endpointLocker.Unlock();

	RETURN_ERROR(B_OK);
}
// Waits for an incoming connection and returns its socket in
// `_acceptedSocket`.
//
// Blocks on the accept semaphore until the socket's receive timeout expires
// (the absolute timeout is stored for / restored on syscall restarts), then
// tries to dequeue a connected socket; this is repeated until dequeuing
// succeeds or waiting fails. A B_TIMED_OUT with a zero timeout is reported as
// B_WOULD_BLOCK.
status_t
UnixStreamEndpoint::Accept(net_socket** _acceptedSocket)
{
	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Accept()\n", find_thread(NULL),
		this);

	bigtime_t deadline = absolute_timeout(socket->receive.timeout);
	if (gStackModule->is_restarted_syscall())
		deadline = gStackModule->restore_syscall_restart_timeout();
	else
		gStackModule->store_syscall_restart_timeout(deadline);

	UnixStreamEndpointLocker selfLocker(this);

	status_t status;
	for (;;) {
		// Never wait on the semaphore with the endpoint locked.
		selfLocker.Unlock();

		status = acquire_sem_etc(fAcceptSemaphore, 1,
			B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, deadline);
		if (status < B_OK)
			break;

		selfLocker.Lock();

		status = gSocketModule->dequeue_connected(socket, _acceptedSocket);
		if (status == B_OK)
			break;
	}

	// A zero timeout means the call was non-blocking.
	const bool nonBlockingTimeout = (status == B_TIMED_OUT && deadline == 0);
	if (nonBlockingTimeout)
		status = B_WOULD_BLOCK;

	RETURN_ERROR(status);
}
// Writes the given I/O vectors (and optional ancillary data) into the peer's
// receive FIFO.
//
// Only MSG_DONTWAIT and MSG_EOR are accepted in `flags`, and MSG_EOR only on
// an atomic endpoint; otherwise EOPNOTSUPP is returned. `address` and
// `addressLength` are not used. With MSG_DONTWAIT the timeout is 0, otherwise
// it is the socket's absolute send timeout (stored for / restored on syscall
// restarts).
//
// Returns the result of UnixFifo::Write(), except that UNIX_FIFO_SHUTDOWN is
// translated to EPIPE (still connected to the same peer) or EBADF, and a
// B_TIMED_OUT with a zero timeout becomes B_WOULD_BLOCK. Fails with the error
// of _LockConnectedEndpoints() if the endpoint is not connected.
ssize_t
UnixStreamEndpoint::Send(const iovec* vecs, size_t vecCount,
	ancillary_data_container* ancillaryData,
	const struct sockaddr* address, socklen_t addressLength, int flags)
{
	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Send(%p, %ld, %p)\n",
		find_thread(NULL), this, vecs, vecCount, ancillaryData);

	bool atomic = (flags & MSG_EOR) != 0;
	if ((atomic && !fAtomic) || (flags & ~(MSG_DONTWAIT | MSG_EOR)) != 0)
		return EOPNOTSUPP;

	bigtime_t timeout = 0;
	if ((flags & MSG_DONTWAIT) == 0) {
		timeout = absolute_timeout(socket->send.timeout);
		if (gStackModule->is_restarted_syscall())
			timeout = gStackModule->restore_syscall_restart_timeout();
		else
			gStackModule->store_syscall_restart_timeout(timeout);
	}

	UnixStreamEndpointLocker locker(this);

	BReference<UnixStreamEndpoint> peerReference;
	UnixStreamEndpointLocker peerLocker;

	status_t error = _LockConnectedEndpoints(locker, peerLocker);
	if (error != B_OK)
		RETURN_ERROR(error);

	// Keep the peer referenced for the time both endpoints are unlocked.
	UnixStreamEndpoint* peerEndpoint = fPeerEndpoint;
	peerReference.SetTo(peerEndpoint);

	// Lock (and reference) the peer's FIFO before letting go of the
	// endpoints.
	UnixFifo* peerFifo = peerEndpoint->fReceiveFifo;
	BReference<UnixFifo> _(peerFifo);
	UnixFifoLocker fifoLocker(peerFifo);

	// The endpoints are not kept locked during the write.
	locker.Unlock();
	peerLocker.Unlock();

	ssize_t result = peerFifo->Write(vecs, vecCount, ancillaryData, NULL,
		timeout);

	// The notification decisions must be based on the outcome of the write
	// (`result`), not on `error`, which is always B_OK at this point. This
	// mirrors what Receive() does.
	const bool writeSucceeded = (result >= 0);

	// Notify select()ing readers of the peer, if the write succeeded and
	// there is data to read.
	size_t readable = peerFifo->Readable();
	bool notifyRead = (writeSucceeded && readable > 0
		&& !peerFifo->IsReadShutdown());

	// Notify select()ing writers on our socket, if the write failed although
	// there is still room to write.
	size_t writable = peerFifo->Writable();
	bool notifyWrite = (!writeSucceeded && writable > 0
		&& !peerFifo->IsWriteShutdown());

	// Re-lock our endpoint; the FIFO is unlocked first, since it was locked
	// after the endpoints above.
	fifoLocker.Unlock();
	locker.Lock();

	// Only touch the peer's socket if we are still connected to the very
	// same peer and could lock it again.
	bool peerLocked = (fPeerEndpoint == peerEndpoint
		&& _LockConnectedEndpoints(locker, peerLocker) == B_OK);

	if (peerLocked && notifyRead)
		gSocketModule->notify(peerEndpoint->socket, B_SELECT_READ, readable);
	if (notifyWrite)
		gSocketModule->notify(socket, B_SELECT_WRITE, writable);

	switch (result) {
		case UNIX_FIFO_SHUTDOWN:
			if (fPeerEndpoint == peerEndpoint
				&& fState == unix_stream_endpoint_state::Connected) {
				// Still connected to the same peer: report a broken pipe.
				result = EPIPE;
			} else {
				// The connection is gone altogether.
				result = EBADF;
			}
			break;
		case EPIPE:
			// Passed through to the caller unchanged.
			break;
		case B_TIMED_OUT:
			// A zero timeout means the call was non-blocking.
			if (timeout == 0)
				result = B_WOULD_BLOCK;
			break;
	}

	RETURN_ERROR(result);
}
// Reads data from this endpoint's receive FIFO into the given I/O vectors.
//
// Only MSG_DONTWAIT, MSG_PEEK and MSG_WAITALL are accepted in `flags`; any
// other bit yields EOPNOTSUPP. If `_address` is non-NULL, the socket's stored
// peer address is copied out, truncated to *_addressLength, which is updated
// to the copied length.
//
// Returns the result of UnixFifo::Read(), except that UNIX_FIFO_SHUTDOWN is
// translated to EBADF (endpoint is Closed) or 0, and a B_TIMED_OUT with a zero
// timeout becomes B_WOULD_BLOCK. Returns ENOTCONN if there is no receive FIFO.
ssize_t
UnixStreamEndpoint::Receive(const iovec* vecs, size_t vecCount,
ancillary_data_container** _ancillaryData, struct sockaddr* _address,
socklen_t* _addressLength, int flags)
{
TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Receive(%p, %ld)\n",
find_thread(NULL), this, vecs, vecCount);
if ((flags & ~(MSG_DONTWAIT | MSG_PEEK | MSG_WAITALL)) != 0)
return EOPNOTSUPP;
// MSG_DONTWAIT keeps the timeout at 0; otherwise the socket's absolute
// receive timeout is used, stored for / restored on syscall restarts.
bigtime_t timeout = 0;
if ((flags & MSG_DONTWAIT) == 0) {
timeout = absolute_timeout(socket->receive.timeout);
if (gStackModule->is_restarted_syscall())
timeout = gStackModule->restore_syscall_restart_timeout();
else
gStackModule->store_syscall_restart_timeout(timeout);
}
UnixStreamEndpointLocker locker(this);
if (fReceiveFifo == NULL)
RETURN_ERROR(ENOTCONN);
// Remember (and reference) the current peer; it may be NULL, and it may
// change while the endpoint is unlocked below.
UnixStreamEndpoint* peerEndpoint = fPeerEndpoint;
BReference<UnixStreamEndpoint> peerReference(peerEndpoint);
if (_address != NULL) {
socklen_t addrLen = min_c(*_addressLength, socket->peer.ss_len);
memcpy(_address, &socket->peer, addrLen);
*_addressLength = addrLen;
}
// Reference and lock the FIFO, then release the endpoint lock for the
// duration of the read.
UnixFifo* fifo = fReceiveFifo;
BReference<UnixFifo> _(fifo);
UnixFifoLocker fifoLocker(fifo);
locker.Unlock();
ssize_t result = fifo->Read(vecs, vecCount, _ancillaryData, NULL, timeout,
(flags & MSG_PEEK) != 0);
// Writers of the peer are notified if the read succeeded and the FIFO has
// room.
size_t writable = fifo->Writable();
bool notifyWrite = (result >= 0 && writable > 0
&& !fifo->IsWriteShutdown());
// Readers on our own socket are notified if the read failed although data
// is available.
size_t readable = fifo->Readable();
bool notifyRead = (result < 0 && readable > 0
&& !fifo->IsReadShutdown());
// Unlock the FIFO before re-locking the endpoint.
fifoLocker.Unlock();
locker.Lock();
// Only touch the peer's socket if we are still connected to the very same
// peer and could lock it.
UnixStreamEndpointLocker peerLocker;
bool peerLocked = (peerEndpoint != NULL && fPeerEndpoint == peerEndpoint
&& _LockConnectedEndpoints(locker, peerLocker) == B_OK);
if (notifyRead)
gSocketModule->notify(socket, B_SELECT_READ, readable);
if (peerLocked && notifyWrite)
gSocketModule->notify(peerEndpoint->socket, B_SELECT_WRITE, writable);
switch (result) {
case UNIX_FIFO_SHUTDOWN:
// A shut down FIFO is reported as EBADF on a closed endpoint and as
// end-of-stream (0) otherwise.
if (fState == unix_stream_endpoint_state::Closed) {
result = EBADF;
} else {
result = 0;
}
break;
case B_TIMED_OUT:
// A zero timeout means the call was non-blocking.
if (timeout == 0)
result = B_WOULD_BLOCK;
break;
}
RETURN_ERROR(result);
}
// Returns how much can currently be written to the peer's receive FIFO, or
// the error of _LockConnectedEndpoints() if the endpoint is not connected.
ssize_t
UnixStreamEndpoint::Sendable()
{
	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Sendable()\n", find_thread(NULL),
		this);

	UnixStreamEndpointLocker selfLocker(this);
	UnixStreamEndpointLocker peerGuard;

	status_t lockStatus = _LockConnectedEndpoints(selfLocker, peerGuard);
	if (lockStatus != B_OK)
		RETURN_ERROR(lockStatus);

	// What we can send is bounded by the room in the peer's receive FIFO.
	UnixFifo* targetFifo = fPeerEndpoint->fReceiveFifo;
	UnixFifoLocker fifoGuard(targetFifo);

	RETURN_ERROR(targetFifo->Writable());
}
// Returns how much can currently be received.
//
// For a listening endpoint this is the number of connected (not yet accepted)
// sockets. For a connected endpoint it is the amount readable from the
// receive FIFO; if nothing is readable and the FIFO has been shut down in
// either direction, ENOTCONN is returned. Any other state yields ENOTCONN.
ssize_t
UnixStreamEndpoint::Receivable()
{
	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Receivable()\n", find_thread(NULL),
		this);

	UnixStreamEndpointLocker selfLocker(this);

	if (fState == unix_stream_endpoint_state::Listening)
		return gSocketModule->count_connected(socket);

	const bool connected = (fState == unix_stream_endpoint_state::Connected);
	if (!connected)
		RETURN_ERROR(ENOTCONN);

	UnixFifo* fifo = fReceiveFifo;
	UnixFifoLocker fifoGuard(fifo);

	ssize_t pending = fifo->Readable();
	if (pending != 0)
		RETURN_ERROR(pending);

	// Nothing to read: tell apart "no data yet" from "no more data ever".
	const bool shutDown = (fifo->IsWriteShutdown() || fifo->IsReadShutdown());
	if (shutDown)
		RETURN_ERROR(ENOTCONN);

	RETURN_ERROR(pending);
}
// Resizes the receive FIFO to `size`.
//
// Without a receive FIFO there is nothing to resize and B_OK is returned;
// otherwise the result of UnixFifo::SetBufferCapacity() is returned.
status_t
UnixStreamEndpoint::SetReceiveBufferSize(size_t size)
{
	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::SetReceiveBufferSize(%lu)\n",
		find_thread(NULL), this, size);

	UnixStreamEndpointLocker selfLocker(this);

	UnixFifo* fifo = fReceiveFifo;
	if (fifo != NULL) {
		UnixFifoLocker fifoGuard(fifo);
		return fifo->SetBufferCapacity(size);
	}

	return B_OK;
}
// Copies the credentials stored in the peer endpoint into `credentials`.
//
// Returns B_OK on success, or the error of _LockConnectedEndpoints() if the
// endpoint is not connected (`credentials` is left untouched then).
status_t
UnixStreamEndpoint::GetPeerCredentials(ucred* credentials)
{
	UnixStreamEndpointLocker selfLocker(this);
	UnixStreamEndpointLocker peerGuard;

	status_t lockStatus = _LockConnectedEndpoints(selfLocker, peerGuard);
	if (lockStatus == B_OK) {
		// Both endpoints are locked, so the peer's credentials are stable.
		*credentials = fPeerEndpoint->fCredentials;
		return B_OK;
	}

	RETURN_ERROR(lockStatus);
}
// Shuts down the connection in the given direction (SHUT_RD, SHUT_WR or
// SHUT_RDWR).
//
// Shutting down reading marks our receive FIFO as read-shutdown; shutting
// down writing marks the peer's receive FIFO as write-shutdown. Select
// notifications with EPIPE are sent to both sockets for the affected
// directions.
//
// Returns B_BAD_VALUE for any other `direction`, the error of
// _LockConnectedEndpoints() if the endpoint is not connected, B_OK otherwise.
status_t
UnixStreamEndpoint::Shutdown(int direction)
{
	TRACE("[%" B_PRId32 "] %p->UnixStreamEndpoint::Shutdown(%d)\n",
		find_thread(NULL), this, direction);

	// Decode the direction; anything but the three known values is invalid.
	const bool stopReading = (direction == SHUT_RD || direction == SHUT_RDWR);
	const bool stopWriting = (direction == SHUT_WR || direction == SHUT_RDWR);
	if (!stopReading && !stopWriting)
		RETURN_ERROR(B_BAD_VALUE);

	// Our reading happens on our own FIFO, our writing on the peer's.
	uint32 ownFlags = stopReading ? UNIX_FIFO_SHUTDOWN_READ : 0;
	uint32 peerFlags = stopWriting ? UNIX_FIFO_SHUTDOWN_WRITE : 0;

	UnixStreamEndpointLocker selfLocker(this);
	UnixStreamEndpointLocker peerGuard;

	status_t lockStatus = _LockConnectedEndpoints(selfLocker, peerGuard);
	if (lockStatus != B_OK)
		RETURN_ERROR(lockStatus);

	// Shut down the FIFOs one after the other, each under its own lock.
	UnixFifo* ownFifo = fReceiveFifo;
	ownFifo->Lock();
	ownFifo->Shutdown(ownFlags);
	ownFifo->Unlock();

	UnixFifo* peerFifo = fPeerEndpoint->fReceiveFifo;
	peerFifo->Lock();
	peerFifo->Shutdown(peerFlags);
	peerFifo->Unlock();

	// Wake up everyone select()ing on the affected directions.
	if (stopReading) {
		gSocketModule->notify(socket, B_SELECT_READ, EPIPE);
		gSocketModule->notify(fPeerEndpoint->socket, B_SELECT_WRITE, EPIPE);
	}
	if (stopWriting) {
		gSocketModule->notify(socket, B_SELECT_WRITE, EPIPE);
		gSocketModule->notify(fPeerEndpoint->socket, B_SELECT_READ, EPIPE);
	}

	RETURN_ERROR(B_OK);
}
// Sets this endpoint up as the connected child counterpart of
// `connectingEndpoint`. Called by Connect() on the endpoint of the freshly
// spawned pending socket, with that endpoint locked.
//
// The endpoint takes a reference to the connecting endpoint, adopts `fifo` as
// its receive FIFO, uses the connecting socket's address as peer address,
// copies the credentials of `listeningEndpoint` and enters the Connected
// state.
void
UnixStreamEndpoint::_Spawn(UnixStreamEndpoint* connectingEndpoint,
UnixStreamEndpoint* listeningEndpoint, UnixFifo* fifo)
{
// NOTE(review): the return value of ProtocolSocket::Open() is ignored here.
ProtocolSocket::Open();
// fIsChild makes _LockConnectedEndpoints() unlock this endpoint before
// locking the peer.
fIsChild = true;
fPeerEndpoint = connectingEndpoint;
fPeerEndpoint->AcquireReference();
fReceiveFifo = fifo;
PeerAddress().SetTo(&connectingEndpoint->socket->address);
fCredentials = listeningEndpoint->fCredentials;
fState = unix_stream_endpoint_state::Connected;
gSocketModule->set_connected(Socket());
}
// Disconnects this endpoint from its peer and returns it to the NotConnected
// state. Close() calls this on both endpoints of a connection while both are
// locked. Requires fReceiveFifo and fPeerEndpoint to be non-NULL.
void
UnixStreamEndpoint::_Disconnect()
{
// Write-shutdown our receive FIFO: no more data will arrive.
fReceiveFifo->Lock();
fReceiveFifo->Shutdown(UNIX_FIFO_SHUTDOWN_WRITE);
fReceiveFifo->Unlock();
// Wake up anyone select()ing on our socket.
gSocketModule->notify(socket, B_SELECT_READ, ECONNRESET);
gSocketModule->notify(socket, B_SELECT_WRITE, ECONNRESET);
// Drop the reference to the peer taken in Connect() / _Spawn().
fPeerEndpoint->ReleaseReference();
fPeerEndpoint = NULL;
fIsChild = false;
fState = unix_stream_endpoint_state::NotConnected;
}
// Locks the peer endpoint of this connected endpoint via `peerLocker`.
//
// `locker` is the caller's locker for this endpoint; for a child endpoint it
// is unlocked and re-locked here, so the endpoint's state may have changed
// when this function returns.
//
// Returns B_OK with `peerLocker` set to the peer on success. If the endpoint
// is not connected on entry, returns EPIPE if it had been connected before
// (fWasConnected) and ENOTCONN otherwise. Returns ENOTCONN (with `peerLocker`
// unset) if the connection changed while this endpoint was unlocked.
status_t
UnixStreamEndpoint::_LockConnectedEndpoints(UnixStreamEndpointLocker& locker,
UnixStreamEndpointLocker& peerLocker)
{
if (fState != unix_stream_endpoint_state::Connected)
RETURN_ERROR(fWasConnected ? EPIPE : ENOTCONN);
// Hold a reference to the peer for the duration of this function, since we
// may have to unlock ourselves below.
BReference<UnixStreamEndpoint> peerReference(fPeerEndpoint);
UnixStreamEndpoint* peerEndpoint = fPeerEndpoint;
if (fIsChild) {
// For a child endpoint the peer has to be locked first, so give up our
// own lock, lock the peer, then lock ourselves again.
// NOTE(review): SetTo(..., false) presumably means "not locked yet, lock
// it now" -- confirm against the locker class.
locker.Unlock();
peerLocker.SetTo(peerEndpoint, false);
locker.Lock();
// We were unlocked for a while: re-check that we are still connected
// to the same peer.
if (fState != unix_stream_endpoint_state::Connected || peerEndpoint != fPeerEndpoint) {
peerLocker.Unset();
RETURN_ERROR(ENOTCONN);
}
} else
peerLocker.SetTo(peerEndpoint, false);
RETURN_ERROR(B_OK);
}
// Unbinds the endpoint.
//
// A connected or listening endpoint cannot be unbound (B_BAD_VALUE). An
// endpoint that is not bound is left alone and B_OK is returned; otherwise
// the result of UnixEndpoint::_Unbind() is returned.
status_t
UnixStreamEndpoint::_Unbind()
{
	const bool inUse = (fState == unix_stream_endpoint_state::Connected
		|| fState == unix_stream_endpoint_state::Listening);
	if (inUse)
		RETURN_ERROR(B_BAD_VALUE);

	if (!IsBound())
		RETURN_ERROR(B_OK);

	RETURN_ERROR(UnixEndpoint::_Unbind());
}
// Releases our reference to the receive FIFO, if there is one, and clears the
// pointer.
void
UnixStreamEndpoint::_UnsetReceiveFifo()
{
	if (fReceiveFifo == NULL)
		return;

	fReceiveFifo->ReleaseReference();
	fReceiveFifo = NULL;
}
// Leaves the Listening state: deletes the accept semaphore, resets it to -1
// and returns the endpoint to the NotConnected state. Does nothing if the
// endpoint is not listening.
void
UnixStreamEndpoint::_StopListening()
{
	if (fState != unix_stream_endpoint_state::Listening)
		return;

	delete_sem(fAcceptSemaphore);
	fAcceptSemaphore = -1;

	fState = unix_stream_endpoint_state::NotConnected;
}