* Copyright (c) 1999-2000, Eric Moon.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions, and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions, and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* 3. The name of the author may not be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR "AS IS" AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
* TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "NodeGroup.h"
#include "NodeManager.h"
#include "NodeRef.h"
#include <MediaRoster.h>
#include <OS.h>
#include <TimeSource.h>
#include <algorithm>
#include <functional>
#include "array_delete.h"
#include "BasicThread.h"
#include "node_manager_impl.h"
#include "functional_tools.h"
using namespace std;
__USE_CORTEX_NAMESPACE
// Debug-trace hooks. As defined here each one expands to nothing, so the
// D_METHOD(( ... )), D_ROSTER(( ... )) and D_LOCK(( ... )) statements in
// this file generate no code.
#define D_METHOD(x)
#define D_ROSTER(x)
#define D_LOCK(x)
// Tears the group down: stops the transport, detaches and releases every
// member node, unregisters the group from its manager, and finally defers
// to the inherited release().
//
// Returns B_NOT_ALLOWED if the group has already been released; otherwise
// whatever the inherited release() returns.
status_t NodeGroup::release() {
	D_METHOD((
		"NodeGroup::release()\n"));

	if(isReleased())
		return B_NOT_ALLOWED;

	lock();

	// the return value is not checked
	_stop();

	// detach nodes from the back of the set until it is empty
	while(m_nodes.size() > 0) {
		NodeRef* const ref = m_nodes.back();
		removeNode(m_nodes.size() - 1);
		ref->release();
	}

	unlock();

	// unregister from the manager while holding the manager's lock
	const bool managerLocked = m_manager->lock();
	if(!managerLocked) {
		ASSERT(!"* m_manager->lock() failed.\n");
	}
	m_manager->_removeGroup(this);
	m_manager->unlock();

	return _inherited::release();
}
// Destructor. Expects the node set to be empty already (asserted) and
// drops the reference held on the time-source object, if there is one.
NodeGroup::~NodeGroup() {
	Autolock _l(this);
	D_METHOD((
		"~NodeGroup()\n"));

	ASSERT(!m_nodes.size());

	if(!m_timeSourceObj)
		return;

	m_timeSourceObj->Release();
	m_timeSourceObj = 0;
}
// Returns the group's name as a C string, read while the group lock is held.
// The pointer comes straight from m_name.String(); the lock is released
// again before the caller gets to use it.
const char* NodeGroup::name() const {
	Autolock _l(this);
	const char* const current = m_name.String();
	return current;
}
// Replaces the group's name with `name` under the group lock.
// Always returns B_OK.
status_t NodeGroup::setName(const char* name) {
	{
		Autolock _l(this);
		m_name = name;
	}
	return B_OK;
}
// Returns the number of nodes currently held in m_nodes.
uint32 NodeGroup::countNodes() const {
	Autolock _l(this);
	const uint32 count = m_nodes.size();
	return count;
}
// Returns the node stored at `index`, or 0 when `index` is out of range.
NodeRef* NodeGroup::nodeAt(
	uint32 index) const {
	Autolock _l(this);

	if(index >= m_nodes.size())
		return 0;

	return m_nodes[index];
}
// Adds `node` to this group, gives it the group's time source and run
// mode, and notifies observers with M_NODE_ADDED.
//
// Returns:
//   B_NOT_ALLOWED  the group has GROUP_LOCKED set, or the node already
//                  belongs to a group
//   < B_OK         the error reported by node->_initTransportState(); in
//                  that case the node has already been added to m_nodes
//                  and no notification is sent
//   B_OK           success
status_t NodeGroup::addNode(
	NodeRef* node) {
	D_METHOD((
		"NodeGroup::addNode()\n"));

	// the manager is locked first, then the group
	m_manager->lock();
	Autolock _l(this);

	if(m_flags & GROUP_LOCKED) {
		// release the manager on this early exit too; otherwise it
		// would stay locked by the calling thread
		m_manager->unlock();
		return B_NOT_ALLOWED;
	}

	if(node->m_group) {
		PRINT((
			"!!! node already in group '%s'\n", node->m_group->name()));
		m_manager->unlock();
		return B_NOT_ALLOWED;
	}

	// add the node and point it back at this group
	m_nodes.push_back(node);
	node->_setGroup(this);

	// the manager is only needed while group membership changes
	m_manager->unlock();

	// the first node moves the transport out of the invalid state
	if(m_nodes.size() == 1) {
		_changeState(TRANSPORT_INVALID, TRANSPORT_STOPPED);
	}

	status_t err = node->_initTransportState();
	if(err < B_OK)
		return err;

	// bring the node in line with the group's settings
	node->_setTimeSource(m_timeSource.node);
	node->_setRunMode(m_runMode);

	if(node->m_cycle)
		_refCycleChanged(node);

	// NOTE: a node added while the transport is running is not started
	// here.

	// tell observers about the new member
	if(!LockLooper()) {
		ASSERT(!"LockLooper() failed.");
	}
	BMessage m(M_NODE_ADDED);
	m.AddInt32("groupID", id());
	m.AddInt32("nodeID", node->id());
	notify(&m);
	UnlockLooper();

	return B_OK;
}
// Removes `node` from this group: takes it out of the cycle set (if it
// cycles) and out of m_nodes, stops it, clears its group pointer, and
// notifies observers with M_NODE_REMOVED.
//
// Returns B_NOT_ALLOWED if the node does not belong to this group,
// B_OK otherwise (a failure of node->_stop() is only logged).
status_t NodeGroup::removeNode(
	NodeRef* node) {
	D_METHOD((
		"NodeGroup::removeNode()\n"));

	// the manager is locked first, then the group
	m_manager->lock();
	Autolock _l(this);

	if(node->m_group != this) {
		// report this group's name: node->m_group may be 0 here, so it
		// must not be dereferenced
		PRINT((
			"!!! node not in group '%s'\n", name()));
		m_manager->unlock();
		return B_NOT_ALLOWED;
	}

	if(node->m_cycle)
		_cycleRemoveRef(node);

	// erase exactly the entries that match `node`; if the node is
	// unexpectedly absent, no unrelated entry is dropped
	ASSERT(m_nodes.size());
	m_nodes.erase(
		remove(
			m_nodes.begin(),
			m_nodes.end(),
			node),
		m_nodes.end());

	// stop the node; failure is reported but does not abort the removal
	status_t err = node->_stop();
	if(err < B_OK) {
		PRINT((
			"*** NodeGroup::removeNode('%s'): error from node->_stop():\n"
			"    %s\n",
			node->name(),
			strerror(err)));
	}

	node->_setGroup(0);

	m_manager->unlock();

	// an empty group has no valid transport state
	if(!m_nodes.size()) {
		_changeState(TRANSPORT_INVALID);
	}

	// tell observers the node is gone
	if(!LockLooper()) {
		ASSERT(!"LockLooper() failed.");
	}
	BMessage m(M_NODE_REMOVED);
	m.AddInt32("groupID", id());
	m.AddInt32("nodeID", node->id());
	notify(&m);
	UnlockLooper();

	return B_OK;
}
// Removes the node stored at `index` by forwarding to removeNode(NodeRef*).
// The index is only range-checked by an assertion.
status_t NodeGroup::removeNode(
	uint32 index) {
	D_METHOD((
		"NodeGroup::removeNode(by index)\n"));

	Autolock _l(this);
	ASSERT(m_nodes.size() > index);

	NodeRef* const target = m_nodes[index];
	return removeNode(target);
}
// Returns the current group flag word.
uint32 NodeGroup::groupFlags() const {
	Autolock _l(this);
	const uint32 current = m_flags;
	return current;
}
// Overwrites the group flag word with `flags`. Always returns B_OK.
status_t NodeGroup::setGroupFlags(
	uint32 flags) {
	{
		Autolock _l(this);
		m_flags = flags;
	}
	return B_OK;
}
// Reports whether cycling is possible: there must be at least one cycle
// node, and the start..end region must be longer than s_minCyclePeriod.
bool NodeGroup::canCycle() const {
	Autolock _l(this);

	if(m_cycleNodes.size() == 0)
		return false;

	return m_endPosition - m_startPosition > s_minCyclePeriod;
}
// Returns the group's current transport state.
NodeGroup::transport_state_t NodeGroup::transportState() const {
	Autolock _l(this);
	const transport_state_t current = m_transportState;
	return current;
}
// Sets the start position.
//
// While the transport is running, rolling or starting:
//   - offline run mode refuses the change (B_NOT_ALLOWED);
//   - if a cycle is valid and the time source has already reached the
//     cycle deadline, the value is parked in m_newStartPosition and
//     picked up later (see the m_newStart flag); B_OK is returned.
// In every other case the position is applied immediately.
status_t NodeGroup::setStartPosition(
	bigtime_t start) {
	Autolock _l(this);

	D_METHOD((
		"NodeGroup::setStartPosition(%lld)\n", start));

	const bool transportActive =
		m_transportState == TRANSPORT_RUNNING ||
		m_transportState == TRANSPORT_ROLLING ||
		m_transportState == TRANSPORT_STARTING;

	if(transportActive) {
		if(m_runMode == BMediaNode::B_OFFLINE)
			return B_NOT_ALLOWED;

		ASSERT(m_timeSourceObj);

		// past the deadline: defer the change
		if(_cycleValid() && m_timeSourceObj->Now() >= m_cycleDeadline) {
			m_newStartPosition = start;
			m_newStart = true;
			return B_OK;
		}
	}

	m_startPosition = start;
	return B_OK;
}
// Returns the current start position.
bigtime_t NodeGroup::startPosition() const {
	Autolock _l(this);
	const bigtime_t position = m_startPosition;
	return position;
}
// Sets the end position.
//
// While the transport is running, rolling or starting:
//   - offline run mode refuses the change (B_NOT_ALLOWED);
//   - with a valid cycle, the change is deferred (m_newEndPosition /
//     m_newEnd) if the time source has already reached the cycle deadline
//     shifted by the change in end position; otherwise it is applied and
//     the cycle thread is poked with _CYCLE_END_CHANGED.
// In every non-deferred case m_endPosition ends up equal to `end`.
status_t NodeGroup::setEndPosition(
	bigtime_t end) {
	Autolock _l(this);

	D_METHOD((
		"NodeGroup::setEndPosition(%lld)\n", end));

	const bool transportActive =
		m_transportState == TRANSPORT_RUNNING ||
		m_transportState == TRANSPORT_ROLLING ||
		m_transportState == TRANSPORT_STARTING;

	if(transportActive) {
		if(m_runMode == BMediaNode::B_OFFLINE)
			return B_NOT_ALLOWED;

		ASSERT(m_timeSourceObj);

		// how far the end of the region is moving
		const bigtime_t endDelta = end - m_endPosition;

		if(_cycleValid()) {
			if(m_timeSourceObj->Now() >= m_cycleDeadline + endDelta) {
				// too late for the current cycle: defer
				m_newEndPosition = end;
				m_newEnd = true;
				return B_OK;
			}

			// apply now and wake the cycle thread
			m_endPosition = end;
			ASSERT(m_cyclePort);
			write_port(
				m_cyclePort,
				_CYCLE_END_CHANGED,
				0,
				0);
		}
	}

	m_endPosition = end;
	return B_OK;
}
// Returns the current end position.
bigtime_t NodeGroup::endPosition() const {
	Autolock _l(this);
	const bigtime_t position = m_endPosition;
	return position;
}
// Public entry point: takes the group lock and forwards to _preroll().
status_t NodeGroup::preroll() {
	D_METHOD((
		"NodeGroup::preroll()\n"));

	Autolock _l(this);
	const status_t result = _preroll();
	return result;
}
// Public entry point: takes the group lock and forwards to _start().
status_t NodeGroup::start() {
	D_METHOD((
		"NodeGroup::start()\n"));

	Autolock _l(this);
	const status_t result = _start();
	return result;
}
// Public entry point: takes the group lock and forwards to _stop().
status_t NodeGroup::stop() {
	D_METHOD((
		"NodeGroup::stop()\n"));

	Autolock _l(this);
	const status_t result = _stop();
	return result;
}
// Public entry point: takes the group lock and forwards to _roll().
status_t NodeGroup::roll() {
	D_METHOD((
		"NodeGroup::roll()\n"));

	Autolock _l(this);
	const status_t result = _roll();
	return result;
}
// Copies the group's time source into *outTimeSource.
// Returns B_ERROR (leaving *outTimeSource untouched) when the stored time
// source equals media_node::null, B_OK otherwise.
status_t NodeGroup::getTimeSource(
	media_node* outTimeSource) const {
	Autolock _l(this);

	const bool haveTimeSource = (m_timeSource != media_node::null);
	if(!haveTimeSource)
		return B_ERROR;

	*outTimeSource = m_timeSource;
	return B_OK;
}
// Switches the group (and every node in it) to a new time source, then
// notifies observers with M_TIME_SOURCE_CHANGED.
//
// Refused with B_NOT_ALLOWED while the transport is running or rolling.
// The previously held time-source object, if any, is released before the
// new one is obtained from the roster.
status_t NodeGroup::setTimeSource(
	const media_node& timeSource) {
	Autolock _l(this);

	const bool transportBusy =
		m_transportState == TRANSPORT_RUNNING ||
		m_transportState == TRANSPORT_ROLLING;
	if(transportBusy)
		return B_NOT_ALLOWED;

	// let go of the old object first
	if(m_timeSourceObj)
		m_timeSourceObj->Release();

	m_timeSource = timeSource;
	m_timeSourceObj = m_manager->roster->MakeTimeSourceFor(timeSource);
	ASSERT(m_timeSourceObj);

	// push the new time source to all member nodes
	for_each(
		m_nodes.begin(),
		m_nodes.end(),
#if __GNUC__ <= 2
		bind2nd(
			mem_fun(&NodeRef::_setTimeSource),
			m_timeSource.node
		)
#else
		[this](NodeRef* nodeRef) { nodeRef->_setTimeSource(m_timeSource.node); }
#endif
	);

	// announce the change
	if(!LockLooper()) {
		ASSERT(!"LockLooper() failed.");
	}
	BMessage notice(M_TIME_SOURCE_CHANGED);
	notice.AddInt32("groupID", id());
	notice.AddInt32("timeSourceID", timeSource.node);
	notify(&notice);
	UnlockLooper();

	return B_OK;
}
// Returns the group's current run mode.
BMediaNode::run_mode NodeGroup::runMode() const {
	Autolock _l(this);
	const BMediaNode::run_mode current = m_runMode;
	return current;
}
// Stores `mode` as the group's run mode and passes it to every member
// node through NodeRef::_setRunModeAuto(). Always returns B_OK.
status_t NodeGroup::setRunMode(BMediaNode::run_mode mode) {
	Autolock _l(this);

	// stored first: the per-node call below reads m_runMode
	m_runMode = mode;

	for_each(
		m_nodes.begin(),
		m_nodes.end(),
#if __GNUC__ <= 2
		bind2nd(
			mem_fun(&NodeRef::_setRunModeAuto),
			m_runMode
		)
#else
		[this](NodeRef* nodeRef) { nodeRef->_setRunModeAuto(this->m_runMode); }
#endif
	);

	return B_OK;
}
// Message dispatcher. Maps the group's control messages onto the matching
// public methods; anything unrecognized goes to the inherited handler.
//
//   M_SET_TIME_SOURCE     "timeSourceNode" (B_RAW_TYPE, a media_node)
//   M_SET_RUN_MODE        "runMode"  (int32, B_OFFLINE..B_RECORDING)
//   M_SET_START_POSITION  "position" (int64)
//   M_SET_END_POSITION    "position" (int64)
//   M_PREROLL / M_START / M_STOP / M_ROLL   no fields
//
// Malformed messages are logged and dropped; the results of the invoked
// methods are not reported back.
void NodeGroup::MessageReceived(
	BMessage* message) {

	status_t err;

	switch(message->what) {
		case M_SET_TIME_SOURCE:
			{
				media_node timeSource;
				const void* data;
				ssize_t dataSize;
				err = message->FindData(
					"timeSourceNode",
					B_RAW_TYPE,
					&data,
					&dataSize);
				if(err < B_OK) {
					PRINT((
						"* NodeGroup::MessageReceived(M_SET_TIME_SOURCE):\n"
						"  no timeSourceNode!\n"));
					break;
				}
				// the payload is copied as a whole media_node, so make
				// sure the message really carries that many bytes
				if(dataSize < (ssize_t)sizeof(media_node)) {
					PRINT((
						"* NodeGroup::MessageReceived(M_SET_TIME_SOURCE):\n"
						"  timeSourceNode too small!\n"));
					break;
				}
				timeSource = *(const media_node*)data;
				setTimeSource(timeSource);
			}
			break;

		case M_SET_RUN_MODE:
			{
				uint32 runMode;
				err = message->FindInt32("runMode", (int32*)&runMode);
				if(err < B_OK) {
					PRINT((
						"* NodeGroup::MessageReceived(M_SET_RUN_MODE):\n"
						"  no runMode!\n"));
					break;
				}
				// reject values outside the run_mode range
				if(runMode < BMediaNode::B_OFFLINE ||
					runMode > BMediaNode::B_RECORDING) {
					PRINT((
						"* NodeGroup::MessageReceived(M_SET_RUN_MODE):\n"
						"  invalid run mode (%" B_PRIu32 ")\n", runMode));
					break;
				}
				setRunMode((BMediaNode::run_mode)runMode);
			}
			break;

		case M_SET_START_POSITION:
			{
				bigtime_t position;
				err = message->FindInt64("position", (int64*)&position);
				if(err < B_OK) {
					PRINT((
						"* NodeGroup::MessageReceived(M_SET_START_POSITION):\n"
						"  no position!\n"));
					break;
				}
				setStartPosition(position);
			}
			break;

		case M_SET_END_POSITION:
			{
				bigtime_t position;
				err = message->FindInt64("position", (int64*)&position);
				if(err < B_OK) {
					PRINT((
						"* NodeGroup::MessageReceived(M_SET_END_POSITION):\n"
						"  no position!\n"));
					break;
				}
				setEndPosition(position);
			}
			break;

		case M_PREROLL:
			preroll();
			break;

		case M_START:
			start();
			break;

		case M_STOP:
			stop();
			break;

		case M_ROLL:
			roll();
			break;

		default:
			_inherited::MessageReceived(message);
			break;
	}
}
#if CORTEX_XML
// Default constructor, compiled only when CORTEX_XML is enabled.
// The only member it initializes explicitly is m_manager (to 0).
NodeGroup::NodeGroup() :
m_manager(0) {}
#endif
void NodeGroup::observerAdded(
const BMessenger& observer) {
BMessage m(M_OBSERVER_ADDED);
m.AddInt32("groupID", id());
m.AddMessenger("target", BMessenger(this));
observer.SendMessage(&m);
}
void NodeGroup::observerRemoved(
const BMessenger& observer) {
BMessage m(M_OBSERVER_REMOVED);
m.AddInt32("groupID", id());
m.AddMessenger("target", BMessenger(this));
observer.SendMessage(&m);
}
void NodeGroup::notifyRelease() {
BMessage m(M_RELEASED);
m.AddInt32("groupID", id());
m.AddMessenger("target", BMessenger(this));
notify(&m);
}
// Release-completion hook; this class has nothing to do here.
void NodeGroup::releaseComplete() {
}
// Acquires the group lock, waiting at most `timeout`.
// Only WRITE locks are supported (asserted). Returns true when
// m_lock.LockWithTimeout() reported B_OK.
bool NodeGroup::lock(
	lock_t type,
	bigtime_t timeout) {

	D_LOCK(("*** NodeGroup::lock(): %ld\n", find_thread(0)));

	ASSERT(type == WRITE);
	const bool acquired = (m_lock.LockWithTimeout(timeout) == B_OK);

	D_LOCK(("*** NodeGroup::lock() ACQUIRED: %ld\n", find_thread(0)));

	return acquired;
}
// Releases the group lock. Only WRITE locks are supported (asserted).
// Always returns true.
bool NodeGroup::unlock(
	lock_t type) {

	D_LOCK(("*** NodeGroup::unlock(): %ld\n", find_thread(0)));

	ASSERT(type == WRITE);
	m_lock.Unlock();

	D_LOCK(("*** NodeGroup::unlock() RELEASED: %ld\n", find_thread(0)));

	return true;
}
// Returns what m_lock.IsLocked() reports. Only WRITE locks are
// supported (asserted).
bool NodeGroup::isLocked(
	lock_t type) const {

	ASSERT(type == WRITE);

	const bool held = m_lock.IsLocked();
	return held;
}
// Constructs a named group owned by `manager`, running in `runMode`.
// The group starts with no nodes, transport state TRANSPORT_INVALID, a
// fresh ID from NextID(), and zeroed start/end positions. It adds itself
// as a handler on the manager and adopts the time source reported by the
// manager's roster.
NodeGroup::NodeGroup(
const char* name,
NodeManager* manager,
BMediaNode::run_mode runMode) :
ObservableHandler(name),
m_lock("NodeGroup::m_lock"),
m_manager(manager),
m_id(NextID()),
m_name(name),
m_flags(0),
m_transportState(TRANSPORT_INVALID),
m_runMode(runMode),
m_timeSourceObj(0),
m_released(false),
m_cycleThread(0),
m_cyclePort(0),
m_startPosition(0LL),
m_endPosition(0LL),
m_newStart(false),
m_newEnd(false) {
ASSERT(m_manager);
// attach this handler to the manager while the manager is locked
if(!m_manager->Lock()) {
ASSERT(!"m_manager->Lock() failed");
}
m_manager->AddHandler(this);
m_manager->Unlock();
// ask the roster for a time source
media_node ts;
D_ROSTER(("# roster->GetTimeSource()\n"));
status_t err = m_manager->roster->GetTimeSource(&ts);
if(err < B_OK) {
PRINT((
"*** NodeGroup(): roster->GetTimeSource() failed:\n"
" %s\n", strerror(err)));
}
// NOTE(review): setTimeSource() is called even when GetTimeSource()
// failed; `ts` then holds whatever media_node's default construction
// left in it -- confirm that is intended.
setTimeSource(ts);
}
// Source of group IDs; the first value handed out is expected to be 1.
uint32 NodeGroup::s_nextID = 1;

// Advances the shared ID counter with atomic_add() and returns that
// call's result (presumably the value before the increment -- verify
// against the atomic_add() documentation).
uint32 NodeGroup::NextID() {
	int32* const counter = (int32*)&s_nextID;
	return atomic_add(counter, 1);
}
// Called (with the group locked) when a node's cycle flag has changed:
// the node is added to the cycle set if the flag is now set, and removed
// from it otherwise.
void NodeGroup::_refCycleChanged(
	NodeRef* ref) {
	assert_locked(this);
	D_METHOD((
		"NodeGroup::_refCycleChanged('%s')\n",
		ref->name()));

	if(!ref->m_cycle) {
		_cycleRemoveRef(ref);
		return;
	}

	_cycleAddRef(ref);
}
// Called (with the group locked) when a node's latency has changed.
// Does nothing unless a cycle is currently valid; otherwise the node is
// removed from and re-added to the cycle set so that it lands at the
// position matching its latency, and the cycle thread is sent
// _CYCLE_LATENCY_CHANGED.
void NodeGroup::_refLatencyChanged(
	NodeRef* ref) {
	assert_locked(this);
	D_METHOD((
		"NodeGroup::_refLatencyChanged('%s')\n",
		ref->name()));

	if(_cycleValid()) {
		// re-insert to restore the ordering
		_cycleRemoveRef(ref);
		_cycleAddRef(ref);

		// wake the cycle thread
		ASSERT(m_cyclePort);
		write_port(
			m_cyclePort,
			_CYCLE_LATENCY_CHANGED,
			0,
			0);
	}
}
// Called (with the group locked) when a node has stopped. While the
// transport is rolling, the group moves to TRANSPORT_STOPPED as soon as
// no member node reports isRunning().
void NodeGroup::_refStopped(
	NodeRef* ref) {
	assert_locked(this);
	D_METHOD((
		"NodeGroup::_refStopped('%s')\n",
		ref->name()));

	if(m_transportState != TRANSPORT_ROLLING)
		return;

	// advance to the first node that is still running, if any
	node_set::iterator it = m_nodes.begin();
	while(it != m_nodes.end() && !(*it)->isRunning())
		++it;

	// reached the end: nothing is running any more
	if(it == m_nodes.end())
		_changeState(TRANSPORT_STOPPED);
}
// Prerolls every node to the start position. Requires the group lock.
// Returns B_NOT_ALLOWED while the transport is running or rolling, B_OK
// otherwise (per-node results are not collected).
status_t NodeGroup::_preroll() {
	assert_locked(this);

	D_METHOD((
		"NodeGroup::_preroll()\n"));

	const bool transportBusy =
		m_transportState == TRANSPORT_RUNNING ||
		m_transportState == TRANSPORT_ROLLING;
	if(transportBusy)
		return B_NOT_ALLOWED;

	node_set::iterator it = m_nodes.begin();
	while(it != m_nodes.end()) {
		(*it)->_preroll(m_startPosition);
		++it;
	}

	return B_OK;
}
// Starts the transport. Requires the group lock.
//
// Only allowed from TRANSPORT_STOPPED and never in offline run mode
// (B_NOT_ALLOWED otherwise). Sequence: enter TRANSPORT_STARTING, work out
// a start offset from the nodes' latencies plus s_rosterLatency, seek all
// nodes to the start position, set up cycling if applicable, start all
// nodes at (time source now + offset), enter TRANSPORT_RUNNING.
// Per-node seek/start failures are logged only; the result is B_OK.
status_t NodeGroup::_start() {
	assert_locked(this);

	D_METHOD((
		"NodeGroup::_start()\n"));

	if(m_transportState != TRANSPORT_STOPPED)
		return B_NOT_ALLOWED;

	if(m_runMode == BMediaNode::B_OFFLINE)
		return B_NOT_ALLOWED;

	ASSERT(m_nodes.size());

	_changeState(TRANSPORT_STARTING);

	// accumulate the latency-derived offset
	bigtime_t offset = 0LL;
	calcLatencyFn _f(offset);
	for_each(
		m_nodes.begin(),
		m_nodes.end(),
		_f);

	offset += s_rosterLatency;
	PRINT((
		"- offset: %" B_PRIdBIGTIME "\n", offset));

	node_set::iterator it;

	// position every node at the start of the region
	for(it = m_nodes.begin(); it != m_nodes.end(); ++it) {
		NodeRef* const ref = *it;
		const status_t seekErr = ref->_seekStopped(m_startPosition);
		if(seekErr >= B_OK)
			continue;
		PRINT((
			"! NodeGroup('%s')::_start():\n"
			"  ref('%s')->_seekStopped(%" B_PRIdBIGTIME ") failed:\n"
			"  %s\n",
			name(), ref->name(), m_startPosition,
			strerror(seekErr)));
	}

	// the moment at which the nodes are told to start
	ASSERT(m_timeSourceObj);
	const bigtime_t when = m_timeSourceObj->Now() + offset;

	if(_cycleValid()) {
		_initCycleThread();
		_cycleInit(when);
	}

	// start every node
	for(it = m_nodes.begin(); it != m_nodes.end(); ++it) {
		NodeRef* const ref = *it;
		const status_t startErr = ref->_start(when);
		if(startErr >= B_OK)
			continue;
		PRINT((
			"! NodeGroup('%s')::_start():\n"
			"  ref('%s')->_start(%" B_PRIdBIGTIME ") failed:\n"
			"  %s\n",
			name(), ref->name(), when,
			strerror(startErr)));
	}

	_changeState(TRANSPORT_RUNNING);
	return B_OK;
}
// Stops the transport. Requires the group lock.
// Only allowed while running or rolling (B_NOT_ALLOWED otherwise).
// Passes through TRANSPORT_STOPPING, shuts down the cycle thread, stops
// every node (results ignored) and ends in TRANSPORT_STOPPED.
status_t NodeGroup::_stop() {

	D_METHOD((
		"NodeGroup::_stop()\n"));

	assert_locked(this);

	const bool transportActive =
		m_transportState == TRANSPORT_RUNNING ||
		m_transportState == TRANSPORT_ROLLING;
	if(!transportActive)
		return B_NOT_ALLOWED;

	_changeState(TRANSPORT_STOPPING);

	// the cycle thread goes first
	_destroyCycleThread();

	for_each(
		m_nodes.begin(),
		m_nodes.end(),
#if __GNUC__ <= 2
		mem_fun(&NodeRef::_stop)
#else
		[](NodeRef* nodeRef) { nodeRef->_stop(); }
#endif
	);

	_changeState(TRANSPORT_STOPPED);

	return B_OK;
}
// Rolls the transport over the start..end region. Requires the group lock.
//
// Only allowed from TRANSPORT_STOPPED and with a positive region length
// (B_NOT_ALLOWED otherwise). In offline mode the roll window is
// [0, period]; in any other mode it is shifted to begin at (time source
// now + latency offset). Every node is asked to roll; the group enters
// TRANSPORT_ROLLING if at least one node succeeded.
// Returns the error of the last node that failed, or B_OK if none did.
status_t NodeGroup::_roll() {

	D_METHOD((
		"NodeGroup::_roll()\n"));

	assert_locked(this);

	if(m_transportState != TRANSPORT_STOPPED)
		return B_NOT_ALLOWED;

	const bigtime_t period = m_endPosition - m_startPosition;
	if(period <= 0LL)
		return B_NOT_ALLOWED;

	_changeState(TRANSPORT_STARTING);

	// roll window in performance time
	bigtime_t tpStart = 0LL;
	bigtime_t tpStop = period;

	if(m_runMode != BMediaNode::B_OFFLINE) {
		// accumulate the latency-derived offset
		bigtime_t offset = 0LL;
		calcLatencyFn _f(offset);
		for_each(
			m_nodes.begin(),
			m_nodes.end(),
			_f);

		offset += s_rosterLatency;
		PRINT((
			"- offset: %" B_PRIdBIGTIME "\n", offset));

		ASSERT(m_timeSourceObj);
		tpStart = m_timeSourceObj->Now() + offset;
		tpStop += tpStart;
	}

	// ask every node to roll, remembering the most recent failure
	status_t lastError = B_OK;
	bool anySucceeded = false;

	node_set::iterator it = m_nodes.begin();
	while(it != m_nodes.end()) {
		const status_t nodeErr = (*it)->_roll(
			tpStart,
			tpStop,
			m_startPosition);
		if(nodeErr < B_OK)
			lastError = nodeErr;
		else
			anySucceeded = true;
		++it;
	}

	if(anySucceeded)
		_changeState(TRANSPORT_ROLLING);

	return lastError;
}
// Sets the transport state to `to` and tells observers about it with an
// M_TRANSPORT_STATE_CHANGED message. Requires the group lock.
inline void NodeGroup::_changeState(
	transport_state_t to) {

	assert_locked(this);

	m_transportState = to;

	if(!LockLooper()) {
		ASSERT(!"LockLooper() failed.");
	}

	BMessage notice(M_TRANSPORT_STATE_CHANGED);
	notice.AddInt32("groupID", id());
	notice.AddInt32("transportState", m_transportState);
	notify(&notice);

	UnlockLooper();
}
// Checked state transition: asserts that the transport is currently in
// state `from`, then switches to `to` via the single-argument overload.
// Requires the group lock.
inline void NodeGroup::_changeState(
transport_state_t from,
transport_state_t to) {
assert_locked(this);
ASSERT(m_transportState == from);
_changeState(to);
}
// Parameters of the cycle thread's control port (see _cycleThread()):
// queue length and name handed to create_port(), and the size of the
// buffer that incoming port messages are read into.
const int32 _portLength = 32;
const char* const _portName = "NodeGroup::m_cyclePort";
const size_t _portMsgMaxSize = 256;
// Spawns and resumes the cycle-service thread. Requires the group lock.
// An already existing cycle thread is destroyed first.
//
// Returns the error from _destroyCycleThread() or spawn_thread() on
// failure, otherwise the result of resume_thread().
status_t NodeGroup::_initCycleThread() {
	assert_locked(this);
	status_t err;
	D_METHOD((
		"NodeGroup::_initCycleThread()\n"));

	// get rid of a previous thread, if any
	if(m_cycleThread) {
		err = _destroyCycleThread();
		if(err < B_OK)
			return err;
	}

	m_cycleThreadDone = false;
	m_cycleThread = spawn_thread(
		&_CycleThread,
		"NodeGroup[cycleThread]",
		B_NORMAL_PRIORITY,
		(void*)this);

	if(m_cycleThread < B_OK) {
		err = m_cycleThread;
		PRINT((
			"* NodeGroup::_initCycleThread(): spawn_thread() failed:\n"
			"  %s\n",
			strerror(err)));
		// don't leave the (non-zero) error code behind in m_cycleThread:
		// the rest of the class tests it with `if(m_cycleThread)` and
		// would mistake it for a live thread
		m_cycleThread = 0;
		return err;
	}

	return resume_thread(m_cycleThread);
}
// Shuts down the cycle-service thread, if there is one. Requires the
// group lock; the lock is temporarily released while waiting for the
// thread to exit. Always returns B_OK.
status_t NodeGroup::_destroyCycleThread() {
assert_locked(this);
status_t err;
D_METHOD((
"NodeGroup::_destroyCycleThread()\n"));
// no thread: nothing to do
if(!m_cycleThread)
return B_OK;
// the thread hasn't flagged itself as finished yet: ask it to stop
if(!m_cycleThreadDone) {
ASSERT(m_cyclePort);
// send _CYCLE_STOP, giving up after the timeout
// (10000 -- presumably microseconds; confirm against write_port_etc())
err = write_port_etc(
m_cyclePort,
_CYCLE_STOP,
0,
0,
B_TIMEOUT,
10000LL);
if(err < B_OK) {
// the request could not be delivered: tear down the port and
// kill the thread outright
PRINT((
"* NodeGroup::_destroyCycleThread(): port write failed; killing.\n"));
delete_port(m_cyclePort);
m_cyclePort = 0;
kill_thread(m_cycleThread);
m_cycleThread = 0;
return B_OK;
}
// the cycle thread takes the group lock in its loop, so the lock is
// released while waiting for it to exit
unlock();
while(wait_for_thread(m_cycleThread, &err) == B_INTERRUPTED) {
PRINT((
"! wait_for_thread(m_cycleThread, &err) == B_INTERRUPTED\n"));
}
lock();
}
// the thread deletes and clears its port before it finishes
ASSERT(!m_cyclePort);
m_cycleThread = 0;
return B_OK;
}
// True when cycling is in effect: the transport is running or starting,
// and canCycle() holds. Requires the group lock.
bool NodeGroup::_cycleValid() {
	assert_locked(this);

	const bool stateAllowsCycle =
		m_transportState == TRANSPORT_RUNNING ||
		m_transportState == TRANSPORT_STARTING;

	return stateAllowsCycle && canCycle();
}
// Computes the timing of the cycle that begins at `startTime`.
// Requires the group lock and at least one cycle node (asserted).
//
// With a positive region length: records the cycle start, the boundary
// (start + length) and the deadline (boundary minus the sum of
// m_cycleMaxLatency and s_rosterLatency). Otherwise the boundary and the
// deadline are zeroed and m_cycleStart is left alone.
void NodeGroup::_cycleInit(
	bigtime_t startTime) {
	assert_locked(this);
	ASSERT(m_cycleNodes.size() > 0);
	D_METHOD((
		"NodeGroup::_cycleInit(%lld)\n",
		startTime));

	const bigtime_t period = m_endPosition - m_startPosition;

	if(period > 0) {
		m_cycleStart = startTime;
		m_cycleBoundary = startTime + period;
		m_cycleDeadline = m_cycleBoundary - (m_cycleMaxLatency + s_rosterLatency);
		return;
	}

	// empty or inverted region: no meaningful cycle
	m_cycleBoundary = 0LL;
	m_cycleDeadline = 0LL;
}
// Inserts `ref` into the cycle set, which is kept ordered by descending
// latency: the ref goes in front of the first entry whose latency is
// lower than its own, or at the end if there is no such entry.
// Requires the group lock; `ref` must not be in the set yet (asserted).
void NodeGroup::_cycleAddRef(
	NodeRef* ref) {
	assert_locked(this);
	ASSERT(find(
		m_cycleNodes.begin(),
		m_cycleNodes.end(),
		ref) == m_cycleNodes.end());

	// make sure the latency is known before it is compared
	if(!ref->m_latency)
		ref->_updateLatency();

	// find the insertion point first and insert exactly once afterwards;
	// the iterator is never looked at again after insert(), which may
	// invalidate it
	node_set::iterator it = m_cycleNodes.begin();
	while(it != m_cycleNodes.end() &&
		!(ref->m_latency > (*it)->m_latency))
		++it;

	m_cycleNodes.insert(it, ref);
}
// Removes `ref` from the cycle set. Requires the group lock.
// The ref is expected to be in the set (asserted); if it is not, the set
// is left unchanged.
void NodeGroup::_cycleRemoveRef(
	NodeRef* ref) {
	assert_locked(this);

	node_set::iterator it = find(
		m_cycleNodes.begin(),
		m_cycleNodes.end(),
		ref);
	ASSERT(it != m_cycleNodes.end());

	// erase(end()) is undefined, so when assertions are compiled out a
	// missing ref must not reach erase()
	if(it != m_cycleNodes.end())
		m_cycleNodes.erase(it);
}
// Returns the current cycle boundary, read under the group lock.
bigtime_t NodeGroup::_cycleBoundary() const {
	Autolock _l(this);
	const bigtime_t boundary = m_cycleBoundary;
	return boundary;
}
// Thread entry point handed to spawn_thread(): `user` is the NodeGroup
// whose _cycleThread() loop is to be run. Always returns B_OK.
status_t NodeGroup::_CycleThread(void* user) {
	NodeGroup* const group = (NodeGroup*)user;
	group->_cycleThread();
	return B_OK;
}
// Body of the cycle-service thread.
//
// Creates the control port, then loops: while a cycle is valid it works
// out when the next cycle boundary has to be serviced and waits on the
// port until then. A timeout means it is time to service the cycle
// (_handleCycleService()); a port message either stops the thread
// (_CYCLE_STOP) or just makes it recompute the wake-up time. On exit the
// port is deleted and m_cycleThreadDone is set.
void NodeGroup::_cycleThread() {
status_t err;
int32 code;
int32 errorCount = 0;
// receive buffer for port messages; freed by the array_delete helper
char* msgBuffer = new char[_portMsgMaxSize];
array_delete<char> _d(msgBuffer);
// create the control port
ASSERT(!m_cyclePort);
m_cyclePort = create_port(
_portLength,
_portName);
ASSERT(m_cyclePort >= B_OK);
bool done = false;
while(!done) {
// the timing state is read under the group lock
lock();
// leave the loop as soon as cycling is no longer in effect
if(!_cycleValid()) {
unlock();
break;
}
ASSERT(m_cycleNodes.size() > 0);
ASSERT(m_timeSourceObj);
// the front of the cycle set holds the highest latency
// (see the ordering maintained by _cycleAddRef())
bigtime_t maxLatency = m_cycleNodes.front()->m_latency;
// real time at which the boundary must be serviced, allowing for
// the node and roster latencies
bigtime_t wakeUpAt = m_timeSourceObj->RealTimeFor(
m_cycleBoundary, maxLatency + s_rosterLatency);
bigtime_t timeout = wakeUpAt - m_timeSourceObj->RealTime();
// already late: only reported; the non-positive timeout is still
// passed to read_port_etc() below
if(timeout <= 0) {
PRINT((
"*** NodeGroup::_cycleThread(): LATE\n"
" by %" B_PRIdBIGTIME "\n", -timeout));
}
unlock();
// wait for a control message or for the wake-up time
err = read_port_etc(
m_cyclePort,
&code,
msgBuffer,
_portMsgMaxSize,
B_TIMEOUT,
timeout);
// timed out: the boundary is due
if(err == B_TIMED_OUT) {
_handleCycleService();
continue;
}
// any other read error: retried, despite the "ABORTING" in the log
// text, until more than 10 errors have occurred in a row
else if(err < B_OK) {
PRINT((
"* NodeGroup::_cycleThread(): read_port error:\n"
" %s\n"
" ABORTING\n\n", strerror(err)));
if(++errorCount > 10) {
PRINT((
"*** Too many errors; aborting.\n"));
break;
}
continue;
}
errorCount = 0;
// a control message arrived
switch(code) {
case _CYCLE_STOP:
done = true;
break;
// nothing to do here: the next pass through the loop recomputes
// the wake-up time from the updated state
case _CYCLE_END_CHANGED:
case _CYCLE_LATENCY_CHANGED:
break;
default:
PRINT((
"* NodeGroup::_cycleThread(): unknown message code '%"
B_PRId32 "'\n", code));
break;
}
}
// clean up; _destroyCycleThread() expects the port to be gone
delete_port(m_cyclePort);
m_cyclePort = 0;
m_cycleThreadDone = true;
}
void NodeGroup::_handleCycleService() {
Autolock _l(this);
status_t err;
if(!_cycleValid()) {
return;
}
for(node_set::iterator it = m_cycleNodes.begin();
it != m_cycleNodes.end(); ++it) {
err = (*it)->_seek(
m_startPosition,
m_cycleBoundary);
if(err < B_OK) {
PRINT((
"- _handleCycleService(): node('%s')::_seek() failed:\n"
" %s\n",
(*it)->name(), strerror(err)));
}
}
if(m_newStart) {
m_newStart = false;
m_startPosition = m_newStartPosition;
}
if(m_newEnd) {
m_newEnd = false;
m_endPosition = m_newEndPosition;
}
_cycleInit(m_cycleBoundary);
}