* Copyright 2005, Ingo Weinhold, bonefish@users.sf.net. All rights reserved.
* Distributed under the terms of the MIT License.
*/
#include <map>
#include <new>
#include <string.h>
#include <Autolock.h>
#include <syscalls.h>
#include "Debug.h"
#include "MessageDeliverer.h"
#include "MessagingService.h"
using std::map;
using std::nothrow;
MessagingService *MessagingService::sService = NULL;
\brief Represents an area of the messaging service shared between kernel
and registrar.
The main purpose of the class is to retrieve (and remove) commands from
the area.
*/
MessagingArea::MessagingArea()
{
}
MessagingArea::~MessagingArea()
{
if (fID >= 0)
delete_area(fID);
}
status_t
MessagingArea::Create(area_id kernelAreaID, sem_id lockSem, sem_id counterSem,
MessagingArea *&_area)
{
MessagingArea *area = new(nothrow) MessagingArea;
if (!area)
return B_NO_MEMORY;
area_id areaID = clone_area("messaging", (void**)&area->fHeader,
B_ANY_ADDRESS, B_READ_AREA | B_WRITE_AREA, kernelAreaID);
if (areaID < 0) {
delete area;
return areaID;
}
area->fID = areaID;
area->fSize = area->fHeader->size;
area->fLockSem = lockSem;
area->fCounterSem = counterSem;
area->fNextArea = NULL;
_area = area;
return B_OK;
}
bool
MessagingArea::Lock()
{
if (atomic_add(&fHeader->lock_counter, 1) == 0)
return true;
return (acquire_sem(fLockSem) == B_OK);
}
void
MessagingArea::Unlock()
{
if (atomic_add(&fHeader->lock_counter, -1) > 1)
release_sem(fLockSem);
}
area_id
MessagingArea::ID() const
{
return fID;
}
int32
MessagingArea::Size() const
{
return fSize;
}
int32
MessagingArea::CountCommands() const
{
return fHeader->command_count;
}
const messaging_command *
MessagingArea::PopCommand()
{
if (fHeader->command_count == 0)
return NULL;
messaging_command *command
= (messaging_command*)((char*)fHeader + fHeader->first_command);
if (--fHeader->command_count == 0)
fHeader->first_command = fHeader->last_command = 0;
else
fHeader->first_command = command->next_command;
return command;
}
void
MessagingArea::Discard()
{
fHeader->size = 0;
}
area_id
MessagingArea::NextKernelAreaID() const
{
return fHeader->next_kernel_area;
}
void
MessagingArea::SetNextArea(MessagingArea *area)
{
fNextArea = area;
}
MessagingArea *
MessagingArea::NextArea() const
{
return fNextArea;
}
MessagingCommandHandler::MessagingCommandHandler()
{
}
MessagingCommandHandler::~MessagingCommandHandler()
{
}
class MessagingService::DefaultSendCommandHandler
: public MessagingCommandHandler {
virtual void HandleMessagingCommand(uint32 _command, const void *data,
int32 dataSize)
{
const messaging_command_send_message *sendData
= (const messaging_command_send_message*)data;
const void *messageData = (uint8*)data
+ sizeof(messaging_command_send_message)
+ sizeof(messaging_target) * sendData->target_count;
DefaultMessagingTargetSet set(sendData->targets,
sendData->target_count);
MessageDeliverer::Default()->DeliverMessage(messageData,
sendData->message_size, set);
}
};
struct MessagingService::CommandHandlerMap
: map<uint32, MessagingCommandHandler*> {
};
\brief Userland implementation of the kernel -> userland messaging service.
This service provides a way for the kernel to send BMessages (usually
notification (e.g. node monitoring) messages) to userland applications.
The kernel could write the messages directly to the respective target ports,
but this has the disadvantage, that a message needs to be dropped, if the
port is full at the moment of sending. By transferring the message to the
registrar, it is possible to use the MessageDeliverer which retries sending
messages on full ports.
The message transfer is implemented via areas shared between kernel
and registrar. By default one area is used as a ring buffer. The kernel
adds messages to it, the registrar removes them. If the area is full, the
kernel creates a new one and adds it to the area list.
While the service is called `messaging service' and we were speaking of
`messages' being passed through the areas, the service is actually more
general. In fact `commands' are passed through the areas. Currently the
only implemented command type is to send a message, but it is very easy
to add further command types (e.g. one for alerting the user in case of
errors).
The MessagingService maintains a mapping of command types to command
handlers (MessagingCommandHandler, which perform the actual processing
of the commands), that can be altered via
MessagingService::SetCommandHandler().
*/
MessagingService::MessagingService()
: fLock("messaging service"),
fLockSem(-1),
fCounterSem(-1),
fFirstArea(NULL),
fCommandHandlers(NULL),
fCommandProcessor(-1),
fTerminating(false)
{
}
MessagingService::~MessagingService()
{
fTerminating = true;
if (fLockSem >= 0)
delete_sem(fLockSem);
if (fCounterSem >= 0)
delete_sem(fCounterSem);
if (fCommandProcessor >= 0) {
int32 result;
wait_for_thread(fCommandProcessor, &result);
}
delete fCommandHandlers;
delete fFirstArea;
}
status_t
MessagingService::Init()
{
fLockSem = create_sem(0, "messaging lock");
if (fLockSem < 0)
return fLockSem;
fCounterSem = create_sem(0, "messaging counter");
if (fCounterSem < 0)
return fCounterSem;
fCommandHandlers = new(nothrow) CommandHandlerMap;
if (!fCommandHandlers)
return B_NO_MEMORY;
fCommandProcessor = spawn_thread(MessagingService::_CommandProcessorEntry,
"messaging command processor", B_DISPLAY_PRIORITY, this);
if (fCommandProcessor < 0)
return fCommandProcessor;
area_id areaID = _kern_register_messaging_service(fLockSem, fCounterSem);
if (areaID < 0)
return areaID;
status_t error = MessagingArea::Create(areaID, fLockSem, fCounterSem,
fFirstArea);
if (error != B_OK) {
_kern_unregister_messaging_service();
return error;
}
resume_thread(fCommandProcessor);
MessagingCommandHandler *handler = new(nothrow) DefaultSendCommandHandler;
if (!handler)
return B_NO_MEMORY;
SetCommandHandler(MESSAGING_COMMAND_SEND_MESSAGE, handler);
return B_OK;
}
status_t
MessagingService::CreateDefault()
{
if (sService)
return B_OK;
MessagingService *service = new(nothrow) MessagingService;
if (!service)
return B_NO_MEMORY;
status_t error = service->Init();
if (error != B_OK) {
delete service;
return error;
}
sService = service;
return B_OK;
}
void
MessagingService::DeleteDefault()
{
if (sService) {
delete sService;
sService = NULL;
}
}
MessagingService *
MessagingService::Default()
{
return sService;
}
void
MessagingService::SetCommandHandler(uint32 command,
MessagingCommandHandler *handler)
{
BAutolock _(fLock);
if (handler) {
(*fCommandHandlers)[command] = handler;
} else {
CommandHandlerMap::iterator it = fCommandHandlers->find(command);
if (it != fCommandHandlers->end())
fCommandHandlers->erase(it);
}
}
MessagingCommandHandler *
MessagingService::_GetCommandHandler(uint32 command) const
{
BAutolock _(fLock);
CommandHandlerMap::iterator it = fCommandHandlers->find(command);
return (it != fCommandHandlers->end() ? it->second : NULL);
}
int32
MessagingService::_CommandProcessorEntry(void *data)
{
return ((MessagingService*)data)->_CommandProcessor();
}
int32
MessagingService::_CommandProcessor()
{
bool commandWaiting = false;
while (!fTerminating) {
if (!commandWaiting) {
status_t error = acquire_sem(fCounterSem);
if (error != B_OK)
continue;
} else
commandWaiting = false;
MessagingArea *area = fFirstArea;
area->Lock();
while (area->CountCommands() > 0) {
const messaging_command *command = area->PopCommand();
if (!command) {
ERROR("MessagingService::_CommandProcessor(): area %p (%"
B_PRId32 ") has command count %" B_PRId32 ", but doesn't "
"return any more commands.", area, area->ID(),
area->CountCommands());
break;
}
PRINT("MessagingService::_CommandProcessor(): got command %" B_PRIu32 "\n",
command->command);
MessagingCommandHandler *handler
= _GetCommandHandler(command->command);
if (handler) {
handler->HandleMessagingCommand(command->command, command->data,
command->size - sizeof(messaging_command));
} else {
WARNING("MessagingService::_CommandProcessor(): No handler "
"found for command %" B_PRIu32 "\n", command->command);
}
}
if (!area->NextArea() && area->NextKernelAreaID() >= 0) {
MessagingArea *nextArea;
status_t error = MessagingArea::Create(area->NextKernelAreaID(),
fLockSem, fCounterSem, nextArea);
if (error == B_OK) {
area->SetNextArea(nextArea);
commandWaiting = true;
} else {
ERROR("MessagingService::_CommandProcessor(): Failed to clone "
"kernel area %" B_PRId32 ": %s\n", area->NextKernelAreaID(),
strerror(error));
}
}
if (area->NextArea() && area->CountCommands() == 0) {
fFirstArea = area->NextArea();
area->Discard();
area->Unlock();
delete area;
} else {
area->Unlock();
}
}
return 0;
}