* Copyright 2008-2023, Haiku, Inc. All rights reserved.
* Distributed under the terms of the MIT License.
*
* Authors:
* Salvatore Benedetto <salvatore.benedetto@gmail.com>
*/
#include <posix/xsi_message_queue.h>
#include <new>
#include <sys/ipc.h>
#include <sys/types.h>
#include <OS.h>
#include <kernel.h>
#include <syscall_restart.h>
#include <util/atomic.h>
#include <util/AutoLock.h>
#include <util/DoublyLinkedList.h>
#include <util/OpenHashTable.h>
#define TRACE_XSI_MSG_QUEUE
#ifdef TRACE_XSI_MSG_QUEUE
# define TRACE(x) dprintf x
# define TRACE_ERROR(x) dprintf x
#else
# define TRACE(x)
# define TRACE_ERROR(x) dprintf x
#endif
namespace {
struct queued_message : DoublyLinkedListLinkImpl<queued_message> {
queued_message(const void *_message, ssize_t _length)
:
initOK(false),
length(_length)
{
message = (char *)malloc(sizeof(char) * _length);
if (message == NULL)
return;
if (user_memcpy(&type, _message, sizeof(long)) != B_OK
|| user_memcpy(message, (void *)((char *)_message + sizeof(long)),
_length) != B_OK) {
free(message);
return;
}
initOK = true;
}
~queued_message()
{
if (initOK)
free(message);
}
ssize_t copy_to_user_buffer(void *_message, ssize_t _length)
{
if (_length > length)
_length = length;
if (user_memcpy(_message, &type, sizeof(long)) != B_OK
|| user_memcpy((void *)((char *)_message + sizeof(long)), message,
_length) != B_OK)
return B_ERROR;
return _length;
}
bool initOK;
ssize_t length;
char *message;
long type;
};
typedef DoublyLinkedList<queued_message> MessageQueue;
#define MAX_BYTES_PER_QUEUE 2048
class XsiMessageQueue {
public:
XsiMessageQueue(int flags)
:
fBytesInQueue(0)
{
mutex_init(&fLock, "XsiMessageQueue private mutex");
fWaitingToReceive.Init(this, "XsiMessageQueue");
fWaitingToSend.Init(this, "XsiMessageQueue");
SetIpcKey((key_t)-1);
SetPermissions(flags);
memset((void *)&fMessageQueue, 0, sizeof(struct msqid_ds));
fMessageQueue.msg_ctime = (time_t)real_time_clock();
fMessageQueue.msg_qbytes = MAX_BYTES_PER_QUEUE;
}
~XsiMessageQueue();
status_t BlockAndUnlock(ConditionVariableEntry *queueEntry, MutexLocker *queueLocker)
{
queueLocker->Unlock();
return queueEntry->Wait(B_CAN_INTERRUPT);
}
void DoIpcSet(struct msqid_ds *result)
{
fMessageQueue.msg_perm.uid = result->msg_perm.uid;
fMessageQueue.msg_perm.gid = result->msg_perm.gid;
fMessageQueue.msg_perm.mode = (fMessageQueue.msg_perm.mode & ~0x01ff)
| (result->msg_perm.mode & 0x01ff);
fMessageQueue.msg_qbytes = result->msg_qbytes;
fMessageQueue.msg_ctime = (time_t)real_time_clock();
}
void Dequeue(ConditionVariableEntry *queueEntry)
{
queueEntry->Wait(B_RELATIVE_TIMEOUT, 0);
}
void Enqueue(ConditionVariableEntry *queueEntry, bool waitForMessage)
{
if (waitForMessage) {
fWaitingToReceive.Add(queueEntry);
} else {
fWaitingToSend.Add(queueEntry);
}
}
struct msqid_ds &GetMessageQueue()
{
return fMessageQueue;
}
bool HasPermission() const
{
if ((fMessageQueue.msg_perm.mode & S_IWOTH) != 0)
return true;
uid_t uid = geteuid();
if (uid == 0 || (uid == fMessageQueue.msg_perm.uid
&& (fMessageQueue.msg_perm.mode & S_IWUSR) != 0))
return true;
gid_t gid = getegid();
if (gid == fMessageQueue.msg_perm.gid
&& (fMessageQueue.msg_perm.mode & S_IWGRP) != 0)
return true;
return false;
}
bool HasReadPermission() const
{
return HasPermission();
}
int ID() const
{
return fID;
}
bool Insert(queued_message *message);
key_t IpcKey() const
{
return fMessageQueue.msg_perm.key;
}
mutex &Lock()
{
return fLock;
}
msglen_t MaxBytes() const
{
return fMessageQueue.msg_qbytes;
}
queued_message *Remove(long typeRequested);
uint32 SequenceNumber() const
{
return fSequenceNumber;
}
void SetID();
void SetIpcKey(key_t key)
{
fMessageQueue.msg_perm.key = key;
}
void SetPermissions(int flags)
{
fMessageQueue.msg_perm.uid = fMessageQueue.msg_perm.cuid = geteuid();
fMessageQueue.msg_perm.gid = fMessageQueue.msg_perm.cgid = getegid();
fMessageQueue.msg_perm.mode = (flags & 0x01ff);
}
void WakeUpThread(bool waitForMessage)
{
if (waitForMessage) {
fWaitingToReceive.NotifyAll();
} else {
fWaitingToSend.NotifyOne();
}
}
XsiMessageQueue*& Link()
{
return fLink;
}
private:
msglen_t fBytesInQueue;
int fID;
mutex fLock;
MessageQueue fMessage;
struct msqid_ds fMessageQueue;
uint32 fSequenceNumber;
ConditionVariable fWaitingToReceive;
ConditionVariable fWaitingToSend;
XsiMessageQueue* fLink;
};
struct MessageQueueHashTableDefinition {
typedef int KeyType;
typedef XsiMessageQueue ValueType;
size_t HashKey (const int key) const
{
return (size_t)key;
}
size_t Hash(XsiMessageQueue *variable) const
{
return (size_t)variable->ID();
}
bool Compare(const int key, XsiMessageQueue *variable) const
{
return (int)key == (int)variable->ID();
}
XsiMessageQueue*& GetLink(XsiMessageQueue *variable) const
{
return variable->Link();
}
};
class Ipc {
public:
Ipc(key_t key)
: fKey(key),
fMessageQueueId(-1)
{
}
key_t Key() const
{
return fKey;
}
int MessageQueueID() const
{
return fMessageQueueId;
}
void SetMessageQueueID(XsiMessageQueue *messageQueue)
{
fMessageQueueId = messageQueue->ID();
}
Ipc*& Link()
{
return fLink;
}
private:
key_t fKey;
int fMessageQueueId;
Ipc* fLink;
};
struct IpcHashTableDefinition {
typedef key_t KeyType;
typedef Ipc ValueType;
size_t HashKey (const key_t key) const
{
return (size_t)(key);
}
size_t Hash(Ipc *variable) const
{
return (size_t)HashKey(variable->Key());
}
bool Compare(const key_t key, Ipc *variable) const
{
return (key_t)key == (key_t)variable->Key();
}
Ipc*& GetLink(Ipc *variable) const
{
return variable->Link();
}
};
}
#define MAX_XSI_MESSAGE 4096
#define MAX_XSI_MESSAGE_QUEUE 1024
static BOpenHashTable<IpcHashTableDefinition> sIpcHashTable;
static BOpenHashTable<MessageQueueHashTableDefinition> sMessageQueueHashTable;
static mutex sIpcLock;
static mutex sXsiMessageQueueLock;
static uint32 sGlobalSequenceNumber = 1;
static int32 sXsiMessageCount = 0;
static int32 sXsiMessageQueueCount = 0;
XsiMessageQueue::~XsiMessageQueue()
{
mutex_destroy(&fLock);
fWaitingToReceive.NotifyAll(EIDRM);
fWaitingToSend.NotifyAll(EIDRM);
if (fMessageQueue.msg_qnum) {
while (queued_message *message = fMessage.RemoveHead()) {
atomic_add(&sXsiMessageCount, -1);
delete message;
}
}
}
bool
XsiMessageQueue::Insert(queued_message *message)
{
if (fBytesInQueue + message->length > fMessageQueue.msg_qbytes)
return true;
while (true) {
int32 oldCount = atomic_get(&sXsiMessageCount);
if (oldCount >= MAX_XSI_MESSAGE)
return true;
if (atomic_test_and_set(&sXsiMessageCount, oldCount + 1, oldCount)
== oldCount)
break;
}
fMessage.Add(message);
fMessageQueue.msg_qnum++;
fMessageQueue.msg_lspid = getpid();
fMessageQueue.msg_stime = real_time_clock();
fBytesInQueue += message->length;
WakeUpThread(true );
return false;
}
queued_message*
XsiMessageQueue::Remove(long typeRequested)
{
queued_message *message = NULL;
if (typeRequested < 0) {
MessageQueue::Iterator iterator = fMessage.GetIterator();
while (iterator.HasNext()) {
queued_message *current = iterator.Next();
if (current->type <= -typeRequested) {
message = iterator.Remove();
break;
}
}
} else if (typeRequested == 0) {
message = fMessage.RemoveHead();
} else {
MessageQueue::Iterator iterator = fMessage.GetIterator();
while (iterator.HasNext()) {
queued_message *current = iterator.Next();
if (current->type == typeRequested) {
message = iterator.Remove();
break;
}
}
}
if (message == NULL)
return NULL;
fMessageQueue.msg_qnum--;
fMessageQueue.msg_lrpid = getpid();
fMessageQueue.msg_rtime = real_time_clock();
fBytesInQueue -= message->length;
atomic_add(&sXsiMessageCount, -1);
WakeUpThread(false );
return message;
}
void
XsiMessageQueue::SetID()
{
fID = real_time_clock();
while (true) {
if (sMessageQueueHashTable.Lookup(fID) == NULL)
break;
fID++;
}
sGlobalSequenceNumber = (sGlobalSequenceNumber + 1) % UINT_MAX;
fSequenceNumber = sGlobalSequenceNumber;
}
void
xsi_msg_init()
{
status_t status = sIpcHashTable.Init();
if (status != B_OK)
panic("xsi_msg_init() failed to initialize ipc hash table\n");
status = sMessageQueueHashTable.Init();
if (status != B_OK)
panic("xsi_msg_init() failed to initialize message queue hash table\n");
mutex_init(&sIpcLock, "global POSIX message queue IPC table");
mutex_init(&sXsiMessageQueueLock, "global POSIX xsi message queue table");
}
int
_user_xsi_msgctl(int messageQueueID, int command, struct msqid_ds *buffer)
{
TRACE(("xsi_msgctl: messageQueueID = %d, command = %d\n", messageQueueID, command));
MutexLocker ipcHashLocker(sIpcLock);
MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
if (messageQueue == NULL) {
TRACE(("xsi_msgctl: message queue id %d not valid\n", messageQueueID));
return EINVAL;
}
if (buffer != NULL && !IS_USER_ADDRESS(buffer)) {
TRACE(("xsi_msgctl: buffer address is not valid\n"));
return B_BAD_ADDRESS;
}
MutexLocker messageQueueLocker;
if (command != IPC_RMID) {
messageQueueLocker.SetTo(&messageQueue->Lock(), false);
messageQueueHashLocker.Unlock();
ipcHashLocker.Unlock();
} else
mutex_lock(&messageQueue->Lock());
switch (command) {
case IPC_STAT: {
if (!messageQueue->HasReadPermission()) {
TRACE(("xsi_msgctl: calling process has not read "
"permission on message queue %d, key %d\n", messageQueueID,
(int)messageQueue->IpcKey()));
return EACCES;
}
struct msqid_ds msg = messageQueue->GetMessageQueue();
if (user_memcpy(buffer, &msg, sizeof(struct msqid_ds)) < B_OK) {
TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n"));
return B_BAD_ADDRESS;
}
break;
}
case IPC_SET: {
if (!messageQueue->HasPermission()) {
TRACE(("xsi_msgctl: calling process has not permission "
"on message queue %d, key %d\n", messageQueueID,
(int)messageQueue->IpcKey()));
return EPERM;
}
struct msqid_ds msg;
if (user_memcpy(&msg, buffer, sizeof(struct msqid_ds)) < B_OK) {
TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n"));
return B_BAD_ADDRESS;
}
if (msg.msg_qbytes > messageQueue->MaxBytes() && getuid() != 0) {
TRACE(("xsi_msgctl: user does not have permission to "
"increase the maximum number of bytes allowed on queue\n"));
return EPERM;
}
if (msg.msg_qbytes == 0) {
TRACE(("xsi_msgctl: can't set msg_qbytes to 0!\n"));
return EINVAL;
}
messageQueue->DoIpcSet(&msg);
break;
}
case IPC_RMID: {
if (!messageQueue->HasPermission()) {
TRACE(("xsi_msgctl: calling process has not permission "
"on message queue %d, key %d\n", messageQueueID,
(int)messageQueue->IpcKey()));
return EPERM;
}
key_t key = messageQueue->IpcKey();
Ipc *ipcKey = NULL;
if (key != -1) {
ipcKey = sIpcHashTable.Lookup(key);
sIpcHashTable.Remove(ipcKey);
}
sMessageQueueHashTable.Remove(messageQueue);
if (key != -1)
delete ipcKey;
atomic_add(&sXsiMessageQueueCount, -1);
delete messageQueue;
break;
}
default:
TRACE_ERROR(("xsi_semctl: command %d not valid\n", command));
return EINVAL;
}
return B_OK;
}
int
_user_xsi_msgget(key_t key, int flags)
{
TRACE(("xsi_msgget: key = %d, flags = %d\n", (int)key, flags));
XsiMessageQueue *messageQueue = NULL;
Ipc *ipcKey = NULL;
bool isPrivate = true;
bool create = true;
if (key != IPC_PRIVATE) {
isPrivate = false;
ipcKey = sIpcHashTable.Lookup(key);
if (ipcKey == NULL || ipcKey->MessageQueueID() == -1) {
if (!(flags & IPC_CREAT)) {
TRACE(("xsi_msgget: key %d does not exist, but the "
"caller did not ask for creation\n", (int)key));
return ENOENT;
}
if (ipcKey == NULL) {
ipcKey = new(std::nothrow) Ipc(key);
if (ipcKey == NULL) {
TRACE(("xsi_msgget: failed to create new Ipc object "
"for key %d\n", (int)key));
return ENOMEM;
}
sIpcHashTable.Insert(ipcKey);
}
} else {
if ((flags & IPC_CREAT) && (flags & IPC_EXCL)) {
TRACE_ERROR(("xsi_msgget: key %d already exist\n", (int)key));
return EEXIST;
}
int messageQueueID = ipcKey->MessageQueueID();
MutexLocker _(sXsiMessageQueueLock);
messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
if (!messageQueue->HasPermission()) {
TRACE(("xsi_msgget: calling process has not permission "
"on message queue %d, key %d\n", messageQueue->ID(),
(int)key));
return EACCES;
}
create = false;
}
}
if (create) {
if (atomic_get(&sXsiMessageQueueCount) >= MAX_XSI_MESSAGE_QUEUE) {
TRACE_ERROR(("xsi_msgget: reached limit of maximun number of "
"message queues\n"));
return ENOSPC;
}
messageQueue = new(std::nothrow) XsiMessageQueue(flags);
if (messageQueue == NULL) {
TRACE_ERROR(("xsi_msgget: failed to allocate new xsi "
"message queue\n"));
return ENOMEM;
}
atomic_add(&sXsiMessageQueueCount, 1);
MutexLocker _(sXsiMessageQueueLock);
messageQueue->SetID();
if (isPrivate)
messageQueue->SetIpcKey((key_t)-1);
else {
messageQueue->SetIpcKey(key);
ipcKey->SetMessageQueueID(messageQueue);
}
sMessageQueueHashTable.Insert(messageQueue);
}
return messageQueue->ID();
}
ssize_t
_user_xsi_msgrcv(int messageQueueID, void *messagePointer,
size_t messageSize, long messageType, int messageFlags)
{
TRACE(("xsi_msgrcv: messageQueueID = %d, messageSize = %ld\n",
messageQueueID, messageSize));
MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
if (messageQueue == NULL) {
TRACE(("xsi_msgrcv: message queue id %d not valid\n",
messageQueueID));
return EINVAL;
}
MutexLocker messageQueueLocker(messageQueue->Lock());
messageQueueHashLocker.Unlock();
if (messageSize > MAX_BYTES_PER_QUEUE) {
TRACE_ERROR(("xsi_msgrcv: message size is out of range\n"));
return EINVAL;
}
if (!messageQueue->HasPermission()) {
TRACE(("xsi_msgrcv: calling process has not permission "
"on message queue id %d, key %d\n", messageQueueID,
(int)messageQueue->IpcKey()));
return EACCES;
}
if (!IS_USER_ADDRESS(messagePointer)) {
TRACE(("xsi_msgrcv: message address is not valid\n"));
return B_BAD_ADDRESS;
}
queued_message *message = NULL;
while (true) {
message = messageQueue->Remove(messageType);
if (message == NULL && !(messageFlags & IPC_NOWAIT)) {
ConditionVariableEntry queueEntry;
messageQueue->Enqueue(&queueEntry, true);
uint32 sequenceNumber = messageQueue->SequenceNumber();
TRACE(("xsi_msgrcv: thread %d going to sleep\n", (int)thread_get_current_thread_id()));
status_t result
= messageQueue->BlockAndUnlock(&queueEntry, &messageQueueLocker);
TRACE(("xsi_msgrcv: thread %d back to life\n", (int)thread_get_current_thread_id()));
messageQueueHashLocker.Lock();
messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL
&& sequenceNumber != messageQueue->SequenceNumber())) {
TRACE(("xsi_msgrcv: message queue id %d (sequence = "
"%" B_PRIu32 ") got destroyed\n", messageQueueID,
sequenceNumber));
return EIDRM;
} else if (result == B_INTERRUPTED) {
TRACE(("xsi_msgrcv: thread %d got interrupted while "
"waiting on message queue %d\n", (int)thread_get_current_thread_id(),
messageQueueID));
messageQueue->Dequeue(&queueEntry);
return EINTR;
} else {
messageQueueLocker.Lock();
messageQueueHashLocker.Unlock();
}
} else if (message == NULL) {
return ENOMSG;
} else {
if ((ssize_t)messageSize < message->length
&& !(messageFlags & MSG_NOERROR)) {
TRACE_ERROR(("xsi_msgrcv: message too big!\n"));
messageQueue->Insert(message);
return E2BIG;
}
ssize_t result
= message->copy_to_user_buffer(messagePointer, messageSize);
if (result < 0) {
messageQueue->Insert(message);
return B_BAD_ADDRESS;
}
delete message;
TRACE(("xsi_msgrcv: message received correctly\n"));
return result;
}
}
return B_OK;
}
int
_user_xsi_msgsnd(int messageQueueID, const void *messagePointer,
size_t messageSize, int messageFlags)
{
TRACE(("xsi_msgsnd: messageQueueID = %d, messageSize = %ld\n",
messageQueueID, messageSize));
MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
if (messageQueue == NULL) {
TRACE(("xsi_msgsnd: message queue id %d not valid\n",
messageQueueID));
return EINVAL;
}
MutexLocker messageQueueLocker(messageQueue->Lock());
messageQueueHashLocker.Unlock();
if (messageSize > MAX_BYTES_PER_QUEUE) {
TRACE_ERROR(("xsi_msgsnd: message size is out of range\n"));
return EINVAL;
}
if (!messageQueue->HasPermission()) {
TRACE(("xsi_msgsnd: calling process has not permission "
"on message queue id %d, key %d\n", messageQueueID,
(int)messageQueue->IpcKey()));
return EACCES;
}
if (!IS_USER_ADDRESS(messagePointer)) {
TRACE(("xsi_msgsnd: message address is not valid\n"));
return B_BAD_ADDRESS;
}
queued_message *message
= new(std::nothrow) queued_message(messagePointer, messageSize);
if (message == NULL || message->initOK != true) {
TRACE_ERROR(("xsi_msgsnd: failed to create new message to queue\n"));
delete message;
return ENOMEM;
}
bool notSent = true;
status_t result = B_OK;
while (notSent) {
bool goToSleep = messageQueue->Insert(message);
if (goToSleep && !(messageFlags & IPC_NOWAIT)) {
ConditionVariableEntry queueEntry;
messageQueue->Enqueue(&queueEntry, false);
uint32 sequenceNumber = messageQueue->SequenceNumber();
TRACE(("xsi_msgsnd: thread %d going to sleep\n", (int)thread_get_current_thread_id()));
result = messageQueue->BlockAndUnlock(&queueEntry, &messageQueueLocker);
TRACE(("xsi_msgsnd: thread %d back to life\n", (int)thread_get_current_thread_id()));
messageQueueHashLocker.Lock();
messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL
&& sequenceNumber != messageQueue->SequenceNumber())) {
TRACE(("xsi_msgsnd: message queue id %d (sequence = "
"%" B_PRIu32 ") got destroyed\n", messageQueueID,
sequenceNumber));
delete message;
notSent = false;
result = EIDRM;
} else if (result == B_INTERRUPTED) {
TRACE(("xsi_msgsnd: thread %d got interrupted while "
"waiting on message queue %d\n", (int)thread_get_current_thread_id(),
messageQueueID));
messageQueue->Dequeue(&queueEntry);
delete message;
notSent = false;
result = EINTR;
} else {
messageQueueLocker.Lock();
messageQueueHashLocker.Unlock();
}
} else if (goToSleep) {
delete message;
notSent = false;
result = EAGAIN;
} else {
TRACE(("xsi_msgsnd: message sent correctly\n"));
notSent = false;
}
}
return result;
}