* Copyright 2005, Ingo Weinhold, bonefish@users.sf.net. All rights reserved.
* Distributed under the terms of the MIT License.
*/
#include <map>
#include <new>
#include <set>
#include <string.h>
#include <AutoDeleter.h>
#include <Autolock.h>
#include <MessagePrivate.h>
#include <MessengerPrivate.h>
#include <OS.h>
#include <StackOrHeapArray.h>
#include <TokenSpace.h>
#include <util/DoublyLinkedList.h>
#include <messaging.h>
#include "Debug.h"
#include "MessageDeliverer.h"
#include "Referenceable.h"
using std::map;
using std::nothrow;
using std::set;
MessageDeliverer *MessageDeliverer::sDeliverer = NULL;
static const bigtime_t kRetryDelay = 100000;
static const int32 kMaxMessagesPerPort = 10000;
static const int32 kMaxDataPerPort = 50 * 1024 * 1024;
MessagingTargetSet::~MessagingTargetSet()
{
}
DefaultMessagingTargetSet::DefaultMessagingTargetSet(
const messaging_target *targets, int32 targetCount)
: MessagingTargetSet(),
fTargets(targets),
fTargetCount(targetCount),
fNextIndex(0)
{
}
DefaultMessagingTargetSet::~DefaultMessagingTargetSet()
{
}
bool
DefaultMessagingTargetSet::HasNext() const
{
return (fNextIndex < fTargetCount);
}
bool
DefaultMessagingTargetSet::Next(port_id &port, int32 &token)
{
if (fNextIndex >= fTargetCount)
return false;
port = fTargets[fNextIndex].port;
token = fTargets[fNextIndex].token;
fNextIndex++;
return true;
}
void
DefaultMessagingTargetSet::Rewind()
{
fNextIndex = 0;
}
SingleMessagingTargetSet::SingleMessagingTargetSet(BMessenger target)
: MessagingTargetSet(),
fAtBeginning(true)
{
BMessenger::Private messengerPrivate(target);
fPort = messengerPrivate.Port();
fToken = (messengerPrivate.IsPreferredTarget()
? B_PREFERRED_TOKEN : messengerPrivate.Token());
}
SingleMessagingTargetSet::SingleMessagingTargetSet(port_id port, int32 token)
: MessagingTargetSet(),
fPort(port),
fToken(token),
fAtBeginning(true)
{
}
SingleMessagingTargetSet::~SingleMessagingTargetSet()
{
}
bool
SingleMessagingTargetSet::HasNext() const
{
return fAtBeginning;
}
bool
SingleMessagingTargetSet::Next(port_id &port, int32 &token)
{
if (!fAtBeginning)
return false;
port = fPort;
token = fToken;
fAtBeginning = false;
return true;
}
void
SingleMessagingTargetSet::Rewind()
{
fAtBeginning = true;
}
Besides the flattened message it also stores the when the message was
created and when the delivery attempts shall time out.
*/
class MessageDeliverer::Message : public BReferenceable {
public:
Message(void *data, int32 dataSize, bigtime_t timeout)
: BReferenceable(),
fData(data),
fDataSize(dataSize),
fCreationTime(system_time()),
fBusy(false)
{
if (B_INFINITE_TIMEOUT - fCreationTime <= timeout)
fTimeoutTime = B_INFINITE_TIMEOUT;
else if (timeout <= 0)
fTimeoutTime = fCreationTime;
else
fTimeoutTime = fCreationTime + timeout;
}
~Message()
{
free(fData);
}
void *Data() const
{
return fData;
}
int32 DataSize() const
{
return fDataSize;
}
bigtime_t CreationTime() const
{
return fCreationTime;
}
bigtime_t TimeoutTime() const
{
return fTimeoutTime;
}
bool HasTimeout() const
{
return (fTimeoutTime < B_INFINITE_TIMEOUT);
}
void SetBusy(bool busy)
{
fBusy = busy;
}
bool IsBusy() const
{
return fBusy;
}
private:
void *fData;
int32 fDataSize;
bigtime_t fCreationTime;
bigtime_t fTimeoutTime;
bool fBusy;
};
A TargetMessage is always associated with (i.e. queued in) a TargetPort.
While a Message stores only the message data and some timing info, this
object adds the token of a the target BHandler.
A Message can be referred to by more than one TargetMessage (when
broadcasting), but a TargetMessage is referred to exactly once, by
the TargetPort.
*/
class MessageDeliverer::TargetMessage
: public DoublyLinkedListLinkImpl<MessageDeliverer::TargetMessage> {
public:
TargetMessage(Message *message, int32 token)
: fMessage(message),
fToken(token)
{
if (fMessage)
fMessage->AcquireReference();
}
~TargetMessage()
{
if (fMessage)
fMessage->ReleaseReference();
}
Message *GetMessage() const
{
return fMessage;
}
int32 Token() const
{
return fToken;
}
private:
Message *fMessage;
int32 fToken;
};
This class only exists to provide the comparison operators required to
put a TargetMessage into a set. The order implemented is by ascending by
timeout time (primary) and by TargetMessage pointer (secondary).
Hence TargetMessageHandles referring to the same TargetMessage are equal
(and only those).
*/
class MessageDeliverer::TargetMessageHandle {
public:
TargetMessageHandle(TargetMessage *message)
: fMessage(message)
{
}
TargetMessageHandle(const TargetMessageHandle &other)
: fMessage(other.fMessage)
{
}
TargetMessage *GetMessage() const
{
return fMessage;
}
TargetMessageHandle &operator=(const TargetMessageHandle &other)
{
fMessage = other.fMessage;
return *this;
}
bool operator==(const TargetMessageHandle &other) const
{
return (fMessage == other.fMessage);
}
bool operator!=(const TargetMessageHandle &other) const
{
return (fMessage != other.fMessage);
}
bool operator<(const TargetMessageHandle &other) const
{
bigtime_t timeout = fMessage->GetMessage()->TimeoutTime();
bigtime_t otherTimeout = other.fMessage->GetMessage()->TimeoutTime();
if (timeout < otherTimeout)
return true;
if (timeout > otherTimeout)
return false;
return (fMessage < other.fMessage);
}
private:
TargetMessage *fMessage;
};
messages.
A TargetPort internally queues TargetMessages in the order the are to be
delivered. Furthermore the object maintains an ordered set of
TargetMessages that can timeout (in ascending order of timeout time), so
that timed out messages can be dropped easily.
*/
class MessageDeliverer::TargetPort {
public:
TargetPort(port_id portID)
: fPortID(portID),
fMessages(),
fMessageCount(0),
fMessageSize(0)
{
}
~TargetPort()
{
while (!fMessages.IsEmpty())
PopMessage();
}
port_id PortID() const
{
return fPortID;
}
status_t PushMessage(Message *message, int32 token)
{
PRINT("MessageDeliverer::TargetPort::PushMessage(port: %" B_PRId32 ", %p, %"
B_PRId32 ")\n", fPortID, message, token);
TargetMessage *targetMessage
= new(nothrow) TargetMessage(message, token);
if (!targetMessage)
return B_NO_MEMORY;
fMessages.Insert(targetMessage);
fMessageCount++;
fMessageSize += targetMessage->GetMessage()->DataSize();
if (message->HasTimeout())
fTimeoutableMessages.insert(targetMessage);
_EnforceLimits();
return B_OK;
}
Message *PeekMessage(int32 &token) const
{
if (!fMessages.Head())
return NULL;
token = fMessages.Head()->Token();
return fMessages.Head()->GetMessage();
}
void PopMessage()
{
if (fMessages.Head()) {
PRINT("MessageDeliverer::TargetPort::PopMessage(): port: %" B_PRId32 ", %p\n",
fPortID, fMessages.Head()->GetMessage());
_RemoveMessage(fMessages.Head());
}
}
void DropTimedOutMessages()
{
bigtime_t now = system_time();
while (fTimeoutableMessages.begin() != fTimeoutableMessages.end()) {
TargetMessage *message = fTimeoutableMessages.begin()->GetMessage();
if (message->GetMessage()->TimeoutTime() > now)
break;
PRINT("MessageDeliverer::TargetPort::DropTimedOutMessages(): port: %" B_PRId32
": message %p timed out\n", fPortID, message->GetMessage());
_RemoveMessage(message);
}
}
bool IsEmpty() const
{
return fMessages.IsEmpty();
}
private:
void _RemoveMessage(TargetMessage *message)
{
fMessages.Remove(message);
fMessageCount--;
fMessageSize -= message->GetMessage()->DataSize();
if (message->GetMessage()->HasTimeout())
fTimeoutableMessages.erase(message);
delete message;
}
void _EnforceLimits()
{
while (fMessageCount > kMaxMessagesPerPort) {
PRINT("MessageDeliverer::TargetPort::_EnforceLimits(): port: %" B_PRId32
": hit maximum message count limit.\n", fPortID);
PopMessage();
}
while (fMessageSize > kMaxDataPerPort) {
PRINT("MessageDeliverer::TargetPort::_EnforceLimits(): port: %" B_PRId32
": hit maximum message size limit.\n", fPortID);
PopMessage();
}
}
typedef DoublyLinkedList<TargetMessage> MessageList;
port_id fPortID;
MessageList fMessages;
int32 fMessageCount;
int32 fMessageSize;
set<TargetMessageHandle> fTimeoutableMessages;
};
struct MessageDeliverer::TargetPortMap : public map<port_id, TargetPort*> {
};
\brief Service for delivering messages, which retries the delivery as long
as the target port is full.
For the user of the service only the MessageDeliverer::DeliverMessage()
will be of interest. Some of them allow broadcasting a message to several
recepients.
The class maintains a TargetPort for each target port which was full at the
time a message was to be delivered to it. A TargetPort has a queue of
undelivered messages. A separate worker thread retries periodically to send
the yet undelivered messages to the respective target ports.
*/
MessageDeliverer::MessageDeliverer()
: fLock("message deliverer"),
fTargetPorts(NULL),
fDelivererThread(-1),
fTerminating(false)
{
}
MessageDeliverer::~MessageDeliverer()
{
fTerminating = true;
if (fDelivererThread >= 0) {
int32 result;
wait_for_thread(fDelivererThread, &result);
}
delete fTargetPorts;
}
status_t
MessageDeliverer::Init()
{
fTargetPorts = new(nothrow) TargetPortMap;
if (!fTargetPorts)
return B_NO_MEMORY;
fDelivererThread = spawn_thread(MessageDeliverer::_DelivererThreadEntry,
"message deliverer", B_NORMAL_PRIORITY + 1, this);
if (fDelivererThread < 0)
return fDelivererThread;
resume_thread(fDelivererThread);
return B_OK;
}
status_t
MessageDeliverer::CreateDefault()
{
if (sDeliverer)
return B_OK;
MessageDeliverer *deliverer = new(nothrow) MessageDeliverer;
if (!deliverer)
return B_NO_MEMORY;
status_t error = deliverer->Init();
if (error != B_OK) {
delete deliverer;
return error;
}
sDeliverer = deliverer;
return B_OK;
}
void
MessageDeliverer::DeleteDefault()
{
if (sDeliverer) {
delete sDeliverer;
sDeliverer = NULL;
}
}
MessageDeliverer *
MessageDeliverer::Default()
{
return sDeliverer;
}
The method tries to send the message right now (if there are not already
messages pending for the target port). If that fails due to a full target
port, the message is queued for later delivery.
\param message The message to be delivered.
\param target A BMessenger identifying the delivery target.
\param timeout If given, the message will be dropped, when it couldn't be
delivered after this amount of microseconds.
\return
- \c B_OK, if sending the message succeeded or if the target port was
full and the message has been queued,
- another error code otherwise.
*/
status_t
MessageDeliverer::DeliverMessage(BMessage *message, BMessenger target,
bigtime_t timeout)
{
SingleMessagingTargetSet set(target);
return DeliverMessage(message, set, timeout);
}
The method tries to send the message right now to each of the given targets
(if there are not already messages pending for a target port). If that
fails due to a full target port, the message is queued for later delivery.
\param message The message to be delivered.
\param targets MessagingTargetSet providing the the delivery targets.
\param timeout If given, the message will be dropped, when it couldn't be
delivered after this amount of microseconds.
\return
- \c B_OK, if for each of the given targets sending the message succeeded
or if the target port was full and the message has been queued,
- another error code otherwise.
*/
status_t
MessageDeliverer::DeliverMessage(BMessage *message, MessagingTargetSet &targets,
bigtime_t timeout)
{
if (message == NULL)
return B_BAD_VALUE;
ssize_t size = message->FlattenedSize();
BStackOrHeapArray<char, 4096> buffer(size);
if (!buffer.IsValid())
return B_NO_MEMORY;
status_t error = message->Flatten(buffer, size);
if (error < B_OK)
return error;
return DeliverMessage(buffer, size, targets, timeout);
}
The method tries to send the message right now to each of the given targets
(if there are not already messages pending for a target port). If that
fails due to a full target port, the message is queued for later delivery.
\param message The flattened message to be delivered. This may be a
flattened BMessage or KMessage.
\param messageSize The size of the flattened message buffer.
\param targets MessagingTargetSet providing the the delivery targets.
\param timeout If given, the message will be dropped, when it couldn't be
delivered after this amount of microseconds.
\return
- \c B_OK, if for each of the given targets sending the message succeeded
or if the target port was full and the message has been queued,
- another error code otherwise.
*/
status_t
MessageDeliverer::DeliverMessage(const void *messageData, int32 messageSize,
MessagingTargetSet &targets, bigtime_t timeout)
{
if (!messageData || messageSize <= 0)
return B_BAD_VALUE;
BReference<Message> messageRef;
BAutolock locker(fLock);
for (int32 targetIndex = 0; targets.HasNext(); targetIndex++) {
port_id portID;
int32 token;
targets.Next(portID, token);
TargetPort *port = _GetTargetPort(portID, true);
if (!port)
return B_NO_MEMORY;
if (port->IsEmpty()) {
status_t error = BMessage::Private::SendFlattenedMessage((void*)messageData,
messageSize, portID, token, 0);
if (error == B_OK) {
_PutTargetPort(port);
continue;
}
if (error != B_WOULD_BLOCK) {
_PutTargetPort(port);
if (targetIndex == 0 && !targets.HasNext())
return error;
continue;
}
}
if (!messageRef.IsSet()) {
void *data = malloc(messageSize);
if (!data)
return B_NO_MEMORY;
memcpy(data, messageData, messageSize);
Message *message = new(nothrow) Message(data, messageSize, timeout);
if (!message) {
free(data);
return B_NO_MEMORY;
}
messageRef.SetTo(message, true);
}
status_t error = port->PushMessage(messageRef, token);
_PutTargetPort(port);
if (error != B_OK)
return error;
}
return B_OK;
}
MessageDeliverer::TargetPort *
MessageDeliverer::_GetTargetPort(port_id portID, bool create)
{
TargetPortMap::iterator it = fTargetPorts->find(portID);
if (it != fTargetPorts->end())
return it->second;
if (!create)
return NULL;
TargetPort *port = new(nothrow) TargetPort(portID);
if (!port)
return NULL;
(*fTargetPorts)[portID] = port;
return port;
}
void
MessageDeliverer::_PutTargetPort(TargetPort *port)
{
if (!port)
return;
if (port->IsEmpty()) {
fTargetPorts->erase(port->PortID());
delete port;
}
}
status_t
MessageDeliverer::_SendMessage(Message *message, port_id portID, int32 token)
{
status_t error = BMessage::Private::SendFlattenedMessage(message->Data(),
message->DataSize(), portID, token, 0);
return error;
}
int32
MessageDeliverer::_DelivererThreadEntry(void *data)
{
return ((MessageDeliverer*)data)->_DelivererThread();
}
int32
MessageDeliverer::_DelivererThread()
{
while (!fTerminating) {
snooze(kRetryDelay);
if (fTerminating)
break;
BAutolock _(fLock);
for (TargetPortMap::iterator it = fTargetPorts->begin();
it != fTargetPorts->end();) {
TargetPort *port = it->second;
bool portError = false;
port->DropTimedOutMessages();
int32 token;
while (Message *message = port->PeekMessage(token)) {
status_t error = B_OK;
error = _SendMessage(message, port->PortID(), token);
if (error == B_OK) {
port->PopMessage();
} else if (error == B_WOULD_BLOCK) {
break;
} else {
portError = true;
break;
}
}
if (portError || port->IsEmpty()) {
TargetPortMap::iterator oldIt = it;
++it;
delete port;
fTargetPorts->erase(oldIt);
} else
++it;
}
}
return 0;
}