* Copyright (c) 1999-2000, Eric Moon.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions, and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions, and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* 3. The name of the author may not be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR "AS IS" AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
* TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "NodeManager.h"
#include "AddOnHost.h"
#include "Connection.h"
#include "NodeGroup.h"
#include "NodeRef.h"
#include <Debug.h>
#include <MediaRoster.h>
#include <algorithm>
#include <cstring>
#include <functional>
#include <list>
#include <set>
#undef B_CATALOG
#define B_CATALOG (&sCatalog)
#include <Catalog.h>
#undef B_TRANSLATION_CONTEXT
#define B_TRANSLATION_CONTEXT "NodeManager"
#include "set_tools.h"
#include "functional_tools.h"
#include "node_manager_impl.h"
using namespace std;
__USE_CORTEX_NAMESPACE
#define D_METHOD(x)
#define D_MESSAGE(x)
#define D_ROSTER(x)
#define D_LOCK(x)
// Translation catalog for this file. B_CATALOG is #defined as &sCatalog before
// <Catalog.h> is included, so the B_TRANSLATE() strings in this file are
// expected to be looked up here.
static BCatalog sCatalog("x-vnd.Cortex.NodeManager");
// String constants naming a connection ID, a source node ID and a destination
// node ID. They are not referenced in the visible part of this file;
// presumably they are message field names used elsewhere -- TODO confirm.
const char* const _connectionField = "__connection_id";
const char* const _sourceNodeField = "__source_node_id";
const char* const _destNodeField = "__destination_node_id";
// Hook called by _addRef() right after a new NodeRef has been stored in the
// node map. This implementation intentionally does nothing.
void NodeManager::nodeCreated(
NodeRef* ref) {}
// Hook called by _removeRef() just before the NodeRef's entry is erased from
// the node map. This implementation intentionally does nothing.
void NodeManager::nodeDeleted(
const NodeRef* ref) {}
// Hook called by _addConnection() once the connection has been entered into
// both connection maps. This implementation intentionally does nothing.
void NodeManager::connectionMade(
Connection* connection) {}
// Hook called by _removeConnection() before the connection is looked up,
// deleted and erased from the maps. This implementation intentionally does
// nothing.
void NodeManager::connectionBroken(
const Connection* connection) {}
// Hook called by connect() when a connection attempt fails: either the roster
// refused the connection or the two endpoints advertise different format
// types. Receives the endpoints, the format that was requested and the error
// code. This implementation intentionally does nothing.
void NodeManager::connectionFailed(
const media_output & output,
const media_input & input,
const media_format & format,
status_t error) {}
// Bookkeeping shared by all recursion levels of _do_for_each_connected().
class _for_each_state {
public:
// IDs of the nodes that have already been used as a traversal origin;
// prevents walking the same node's connections twice.
set<media_node_id> visited;
};
// Applies `operation` to every node directly connected to `origin` (sources
// of its valid input connections first, then destinations of its valid output
// connections) and, when `recurse` is set, continues depth-first from each of
// those nodes.
//
//   manager    owner of the node map; must be locked by the caller.
//   origin     node whose connections are walked; it is recorded in
//              state->visited, and `operation` is NOT applied to it.
//   inGroup    if non-null, only neighbours whose current group is `inGroup`
//              are handed to `operation` / recursed into.
//   recurse    whether to descend into each accepted neighbour.
//   operation  functor invoked as operation(NodeRef*).
//   state      visited set shared by the whole traversal.
//
// A neighbour that is not in `inGroup` is skipped on its own; the remaining
// connections of `origin` are still examined. (Returning at that point would
// abandon the rest of the subgraph as soon as the walk ran into a node that
// `operation` had already moved to another group, e.g. the traversal's
// starting node in splitGroup().)
template<class Op>
void _do_for_each_connected(
	NodeManager* manager,
	NodeRef* origin,
	NodeGroup* inGroup,
	bool recurse,
	Op operation,
	_for_each_state* state)
{
	ASSERT(manager->IsLocked());
	ASSERT(origin);
	ASSERT(state);
	status_t err __attribute__((unused));

	// Each node's connections are walked at most once.
	if (state->visited.find(origin->id()) != state->visited.end())
		return;

	vector<Connection> connections;
	state->visited.insert(origin->id());

	// Upstream neighbours: the source node of every valid input connection.
	origin->getInputConnections(connections);
	for (uint32 n = 0; n < connections.size(); ++n) {
		if (!connections[n].isValid())
			continue;

		NodeRef* targetRef;
		err = manager->getNodeRef(
			connections[n].sourceNode(),
			&targetRef);
		ASSERT(err == B_OK);
		ASSERT(targetRef);

		// Outside the requested group: skip this neighbour only.
		if (inGroup && targetRef->group() != inGroup)
			continue;

		operation(targetRef);
		if (recurse) {
			_do_for_each_connected(
				manager,
				targetRef,
				inGroup,
				true,
				operation,
				state);
		}
	}

	// Downstream neighbours: the destination node of every valid output
	// connection.
	connections.clear();
	origin->getOutputConnections(connections);
	for (uint32 n = 0; n < connections.size(); ++n) {
		if (!connections[n].isValid())
			continue;

		NodeRef* targetRef;
		err = manager->getNodeRef(
			connections[n].destinationNode(),
			&targetRef);
		ASSERT(err == B_OK);
		ASSERT(targetRef);

		// Outside the requested group: skip this neighbour only.
		if (inGroup && targetRef->group() != inGroup)
			continue;

		operation(targetRef);
		if (recurse) {
			_do_for_each_connected(
				manager,
				targetRef,
				inGroup,
				true,
				operation,
				state);
		}
	}
}
// Stops `group` and then empties it, holding the group's lock throughout.
inline void
NodeManager::_clearGroup(NodeGroup* group)
{
	Autolock _l(group);
	D_METHOD((
		"NodeManager::_clearGroup()\n"));

	group->_stop();

	// Re-query the count on every pass and always drop the last node.
	for (int32 remaining = group->countNodes(); remaining > 0;
			remaining = group->countNodes()) {
		group->removeNode(remaining - 1);
	}
}
// Destroys a Connection object. If the connection is still valid, carries the
// INTERNAL flag and is not LOCKED, the underlying media connection is broken
// through the roster first; a failure to do so is only logged (when method
// tracing is enabled). The object is deleted in every case.
inline void
NodeManager::_freeConnection(Connection* connection)
{
	ASSERT(connection);
	D_METHOD((
		"NodeManager::_freeConnection(%ld)\n", connection->id()));

	const bool mustDisconnect = connection->isValid()
		&& (connection->flags() & Connection::INTERNAL) != 0
		&& (connection->flags() & Connection::LOCKED) == 0;

	if (mustDisconnect) {
		D_METHOD((
			"! breaking connection:\n"
			"  source node: %ld\n"
			"  source id:   %ld\n"
			"  source port: %ld\n"
			"  dest node:   %ld\n"
			"  dest id:     %ld\n"
			"  dest port:   %ld\n",
			connection->sourceNode(),
			connection->source().id, connection->source().port,
			connection->destinationNode(),
			connection->destination().id, connection->destination().port));
		D_ROSTER(("# roster->Disconnect()\n"));

		status_t status = roster->Disconnect(
			connection->sourceNode(),
			connection->source(),
			connection->destinationNode(),
			connection->destination());
		if (status < B_OK) {
			D_METHOD((
				"!!! BMediaRoster::Disconnect('%s' -> '%s') failed:\n"
				"    %s\n",
				connection->outputName(), connection->inputName(),
				strerror(status)));
		}
	}

	delete connection;
}
// Tears down everything the manager still tracks, in this order: groups,
// connections, node references. The looper must be locked by the caller.
NodeManager::~NodeManager() {
D_METHOD((
"~NodeManager()\n"));
ASSERT(IsLocked());
// Snapshot the refs up front so the release() loop below does not iterate
// m_nodeRefMap directly (releasing a ref presumably removes it from the map
// via _removeRef() -- TODO confirm).
list<NodeRef*> deadNodes;
for(node_ref_map::const_iterator it = m_nodeRefMap.begin();
it != m_nodeRefMap.end(); ++it) {
deadNodes.push_back((*it).second);
}
// Stop and empty every group before destroying the group objects.
for(node_group_set::iterator it = m_nodeGroupSet.begin();
it != m_nodeGroupSet.end(); ++it) {
_clearGroup(*it);
}
// ptr_set_delete() comes from set_tools.h; presumably it deletes each
// pointed-to element in the range -- TODO confirm.
ptr_set_delete(
m_nodeGroupSet.begin(),
m_nodeGroupSet.end());
m_nodeGroupSet.clear();
// Each Connection is stored in both maps (see _addConnection()), so it is
// freed through the source map only and the destination map is just cleared.
for(con_map::iterator it = m_conSourceMap.begin();
it != m_conSourceMap.end(); ++it) {
_freeConnection((*it).second);
}
m_conSourceMap.clear();
m_conDestinationMap.clear();
// Release every node reference captured in the snapshot.
for(list<NodeRef*>::const_iterator it = deadNodes.begin();
it != deadNodes.end(); ++it) {
(*it)->release();
}
// Anything still in the map after release() is reported and then handed to
// ptr_set_delete() via a fresh snapshot.
if(m_nodeRefMap.size()) {
PRINT(("*** %ld nodes remaining!\n", m_nodeRefMap.size()));
deadNodes.clear();
for(node_ref_map::const_iterator it = m_nodeRefMap.begin();
it != m_nodeRefMap.end(); ++it)
deadNodes.push_back((*it).second);
ptr_set_delete(
deadNodes.begin(),
deadNodes.end());
}
}
// Translated display strings for node groups. They are initialized through
// B_TRANSLATE() at static-initialization time, after sCatalog (defined earlier
// in this translation unit). None of them is used in the visible part of this
// file; the names suggest a default group-name prefix and the names of the
// groups holding the system's common nodes -- TODO confirm against callers.
const char* const
NodeManager::s_defaultGroupPrefix = B_TRANSLATE("No name");
const char* const
NodeManager::s_timeSourceGroup = B_TRANSLATE("Time sources");
const char* const
NodeManager::s_audioInputGroup = B_TRANSLATE("System audio input");
const char* const
NodeManager::s_videoInputGroup = B_TRANSLATE("System video input");
const char* const
NodeManager::s_audioMixerGroup = B_TRANSLATE("System audio mixer");
const char* const
NodeManager::s_videoOutputGroup = B_TRANSLATE("System video output");
// Creates the manager as a looper named "NodeManager", registers the system's
// common nodes, starts the message loop and subscribes to media roster
// notifications.
//   useAddOnHost  stored in m_useAddOnHost; instantiate() consults it to
//                 decide whether to try AddOnHost before the roster.
NodeManager::NodeManager(
bool useAddOnHost) :
ObservableLooper("NodeManager"),
roster(BMediaRoster::Roster()),
m_audioInputNode(0),
m_videoInputNode(0),
m_audioMixerNode(0),
m_audioOutputNode(0),
m_videoOutputNode(0),
m_nextConID(1),
m_existingNodesInit(false),
m_useAddOnHost(useAddOnHost) {
D_METHOD((
"NodeManager()\n"));
ASSERT(roster);
// _initCommonNodes() asserts IsLocked(), so the looper is expected to be
// locked at this point of construction.
_initCommonNodes();
Run();
D_ROSTER(("# roster->StartWatching(%p)\n", this));
// Roster notifications are delivered to MessageReceived().
roster->StartWatching(BMessenger(this));
}
// Looks up the NodeRef registered for media node `id`.
// Returns B_OK and stores the ref in *outRef when found; otherwise stores 0
// in *outRef and returns B_BAD_VALUE. The manager is locked for the lookup.
status_t
NodeManager::getNodeRef(media_node_id id, NodeRef** outRef) const
{
	Autolock _l(this);
	D_METHOD((
		"NodeManager::getNodeRef(%ld)\n", id));

	node_ref_map::const_iterator found = m_nodeRefMap.find(id);
	if (found != m_nodeRefMap.end()) {
		*outRef = found->second;
		return B_OK;
	}

	*outRef = 0;
	return B_BAD_VALUE;
}
// Finds the connection that leaves node `node` through output `source`.
// On success copies it into *outConnection and returns B_OK; otherwise
// *outConnection is reset to a default-constructed Connection and
// B_BAD_VALUE is returned.
status_t
NodeManager::findConnection(
	media_node_id node,
	const media_source& source,
	Connection* outConnection) const
{
	Autolock _l(this);
	D_METHOD((
		"NodeManager::findConnection()\n"));
	ASSERT(source != media_source::null);

	// All connections originating at `node` share that key in the source map.
	pair<con_map::const_iterator, con_map::const_iterator> range
		= m_conSourceMap.equal_range(node);
	for (con_map::const_iterator cur = range.first; cur != range.second;
			++cur) {
		Connection* candidate = cur->second;
		if (candidate->source() == source) {
			*outConnection = *candidate;
			return B_OK;
		}
	}

	*outConnection = Connection();
	return B_BAD_VALUE;
}
// Finds the connection that enters node `node` through input `destination`.
// On success copies it into *outConnection and returns B_OK; otherwise
// *outConnection is reset to a default-constructed Connection and
// B_BAD_VALUE is returned.
status_t
NodeManager::findConnection(
	media_node_id node,
	const media_destination& destination,
	Connection* outConnection) const
{
	Autolock _l(this);
	D_METHOD((
		"NodeManager::findConnection()\n"));
	ASSERT(destination != media_destination::null);

	// All connections terminating at `node` share that key in the
	// destination map.
	pair<con_map::const_iterator, con_map::const_iterator> range
		= m_conDestinationMap.equal_range(node);
	for (con_map::const_iterator cur = range.first; cur != range.second;
			++cur) {
		Connection* candidate = cur->second;
		if (candidate->destination() == destination) {
			*outConnection = *candidate;
			return B_OK;
		}
	}

	*outConnection = Connection();
	return B_BAD_VALUE;
}
// Finds the first recorded connection running from `sourceNode` to
// `destinationNode`. On success copies it into *outConnection and returns
// B_OK; otherwise *outConnection is reset to a default-constructed
// Connection and B_BAD_VALUE is returned.
status_t
NodeManager::findConnection(
	media_node_id sourceNode,
	media_node_id destinationNode,
	Connection* outConnection) const
{
	Autolock _l(this);
	D_METHOD((
		"NodeManager::findConnection(source %ld, dest %ld)\n", sourceNode, destinationNode));

	pair<con_map::const_iterator, con_map::const_iterator> range
		= m_conSourceMap.equal_range(sourceNode);
	for (con_map::const_iterator cur = range.first; cur != range.second;
			++cur) {
		Connection* candidate = cur->second;
		if (candidate->destinationNode() != destinationNode)
			continue;

		*outConnection = *candidate;
		return B_OK;
	}

	*outConnection = Connection();
	return B_BAD_VALUE;
}
// Bookkeeping shared by all recursion levels of _find_route_recurse().
class NodeManager::_find_route_state {
public:
// IDs of the nodes whose connections have already been searched.
set<media_node_id> visited;
};
// Reports whether node `nodeB` can be reached from node `nodeA` by following
// valid connections in either direction. Returns false (after logging) when
// no NodeRef is registered for `nodeA`.
bool
NodeManager::findRoute(media_node_id nodeA, media_node_id nodeB)
{
	Autolock _l(this);
	D_METHOD((
		"NodeManager::findRoute(%ld, %ld)\n", nodeA, nodeB));

	NodeRef* start;
	const status_t status = getNodeRef(nodeA, &start);
	if (status < B_OK) {
		PRINT((
			"!!! NodeManager::findRoute(%" B_PRId32 ", %" B_PRId32
			"): no ref for node %" B_PRId32 "\n", nodeA, nodeB, nodeA));
		return false;
	}

	// Fresh visited set for this search.
	_find_route_state searchState;
	return _find_route_recurse(start, nodeB, &searchState);
}
// Depth-first search step for findRoute(): returns true as soon as a valid
// connection of `origin` (inputs are examined before outputs) touches
// `target`, or a neighbouring node has a route to it. Nodes already recorded
// in state->visited are not searched again. The manager must be locked.
bool
NodeManager::_find_route_recurse(
	NodeRef* origin,
	media_node_id target,
	_find_route_state* state)
{
	ASSERT(IsLocked());
	ASSERT(origin);
	ASSERT(state);
	status_t err __attribute__((unused));

	// insert() reports through .second whether the ID was newly added; if it
	// was already present this node has been searched before.
	if (!state->visited.insert(origin->id()).second)
		return false;

	vector<Connection> links;

	// Upstream: follow each valid input back to its source node.
	origin->getInputConnections(links);
	for (uint32 i = 0; i < links.size(); ++i) {
		const Connection& link = links[i];
		if (!link.isValid())
			continue;
		if (link.sourceNode() == target)
			return true;

		NodeRef* neighbour;
		err = getNodeRef(link.sourceNode(), &neighbour);
		ASSERT(err == B_OK);
		ASSERT(neighbour);
		if (_find_route_recurse(neighbour, target, state))
			return true;
	}

	// Downstream: follow each valid output to its destination node.
	links.clear();
	origin->getOutputConnections(links);
	for (uint32 i = 0; i < links.size(); ++i) {
		const Connection& link = links[i];
		if (!link.isValid())
			continue;
		if (link.destinationNode() == target)
			return true;

		NodeRef* neighbour;
		err = getNodeRef(link.destinationNode(), &neighbour);
		ASSERT(err == B_OK);
		ASSERT(neighbour);
		if (_find_route_recurse(neighbour, target, state))
			return true;
	}

	return false;
}
// Finds the connection fed by output `source`, scanning every recorded
// connection regardless of node. On success copies it into *outConnection
// and returns B_OK; otherwise *outConnection is reset to a
// default-constructed Connection and B_BAD_VALUE is returned.
status_t
NodeManager::findConnection(
	const media_source& source,
	Connection* outConnection) const
{
	Autolock _l(this);
	D_METHOD((
		"NodeManager::findConnection()\n"));
	ASSERT(source != media_source::null);

	con_map::const_iterator cur = m_conSourceMap.begin();
	const con_map::const_iterator last = m_conSourceMap.end();
	while (cur != last) {
		Connection* candidate = cur->second;
		if (candidate->source() == source) {
			*outConnection = *candidate;
			return B_OK;
		}
		++cur;
	}

	*outConnection = Connection();
	return B_BAD_VALUE;
}
// Finds the connection ending at input `destination`, scanning every
// recorded connection regardless of node. On success copies it into
// *outConnection and returns B_OK; otherwise *outConnection is reset to a
// default-constructed Connection and B_BAD_VALUE is returned.
status_t
NodeManager::findConnection(
	const media_destination& destination,
	Connection* outConnection) const
{
	Autolock _l(this);
	D_METHOD((
		"NodeManager::findConnection()\n"));
	ASSERT(destination != media_destination::null);

	con_map::const_iterator cur = m_conDestinationMap.begin();
	const con_map::const_iterator last = m_conDestinationMap.end();
	while (cur != last) {
		Connection* candidate = cur->second;
		if (candidate->destination() == destination) {
			*outConnection = *candidate;
			return B_OK;
		}
		++cur;
	}

	*outConnection = Connection();
	return B_BAD_VALUE;
}
// Returns the ref recorded for the system audio input by _initCommonNodes(),
// or 0 if none was registered. Read under the manager's lock.
NodeRef*
NodeManager::audioInputNode() const
{
	Autolock _l(this);
	NodeRef* const node = m_audioInputNode;
	return node;
}
// Returns the ref recorded for the system video input by _initCommonNodes(),
// or 0 if none was registered. Read under the manager's lock.
NodeRef*
NodeManager::videoInputNode() const
{
	Autolock _l(this);
	NodeRef* const node = m_videoInputNode;
	return node;
}
// Returns the ref recorded for the system audio mixer by _initCommonNodes(),
// or 0 if none was registered. Read under the manager's lock.
NodeRef*
NodeManager::audioMixerNode() const
{
	Autolock _l(this);
	NodeRef* const node = m_audioMixerNode;
	return node;
}
// Returns the ref recorded for the system audio output by _initCommonNodes(),
// or 0 if none was registered. Read under the manager's lock.
NodeRef*
NodeManager::audioOutputNode() const
{
	Autolock _l(this);
	NodeRef* const node = m_audioOutputNode;
	return node;
}
// Returns the ref recorded for the system video output by _initCommonNodes(),
// or 0 if none was registered. Read under the manager's lock.
NodeRef*
NodeManager::videoOutputNode() const
{
	Autolock _l(this);
	NodeRef* const node = m_videoOutputNode;
	return node;
}
// Returns the number of groups currently registered with the manager.
uint32
NodeManager::countGroups() const
{
	Autolock _l(this);
	D_METHOD((
		"NodeManager::countGroups()\n"));

	return m_nodeGroupSet.size();
}
// Returns the group stored at position `index`, or 0 when `index` is out of
// range.
NodeGroup*
NodeManager::groupAt(uint32 index) const
{
	Autolock _l(this);
	D_METHOD((
		"NodeManager::groupAt()\n"));

	if (index >= m_nodeGroupSet.size())
		return 0;
	return m_nodeGroupSet[index];
}
// Finds the first registered group whose id() equals `id`.
// Returns B_OK with the group in *outGroup, or B_BAD_VALUE with *outGroup
// set to 0 when there is no such group.
status_t
NodeManager::findGroup(uint32 id, NodeGroup** outGroup) const
{
	Autolock _l(this);
	D_METHOD(("NodeManager::findGroup(id)\n"));

	node_group_set::const_iterator cur = m_nodeGroupSet.begin();
	while (cur != m_nodeGroupSet.end()) {
		NodeGroup* candidate = *cur;
		if (candidate->id() == id) {
			*outGroup = candidate;
			return B_OK;
		}
		++cur;
	}

	*outGroup = 0;
	return B_BAD_VALUE;
}
// Finds the first registered group whose name() compares equal (strcmp) to
// `name`. Returns B_OK with the group in *outGroup, or B_BAD_VALUE with
// *outGroup set to 0 when there is no such group.
status_t
NodeManager::findGroup(const char* name, NodeGroup** outGroup) const
{
	Autolock _l(this);
	D_METHOD(("NodeManager::findGroup(name)\n"));

	node_group_set::const_iterator cur = m_nodeGroupSet.begin();
	while (cur != m_nodeGroupSet.end()) {
		NodeGroup* candidate = *cur;
		if (strcmp(candidate->name(), name) == 0) {
			*outGroup = candidate;
			return B_OK;
		}
		++cur;
	}

	*outGroup = 0;
	return B_BAD_VALUE;
}
// Moves every node of `sourceGroup` into `destinationGroup`, then unregisters
// and releases `sourceGroup`.
// Returns B_OK on success or when both arguments are the same group (by id),
// and B_NOT_ALLOWED if either group has already been released.
status_t
NodeManager::mergeGroups(
	NodeGroup* sourceGroup,
	NodeGroup* destinationGroup)
{
	Autolock _l(this);
	D_METHOD((
		"NodeManager::mergeGroups(name)\n"));

	// Merging a group into itself is a no-op.
	if (sourceGroup->id() == destinationGroup->id())
		return B_OK;

	if (sourceGroup->isReleased() || destinationGroup->isReleased())
		return B_NOT_ALLOWED;

	// Transfer nodes back to front; the count is sampled once up front.
	uint32 index = sourceGroup->countNodes();
	while (index > 0) {
		--index;
		NodeRef* moved = sourceGroup->nodeAt(index);
		ASSERT(moved);
		status_t err __attribute__((unused)) = sourceGroup->removeNode(index);
		ASSERT(err == B_OK);
		err = destinationGroup->addNode(moved);
		ASSERT(err == B_OK);
	}

	_removeGroup(sourceGroup);
	sourceGroup->release();
	return B_OK;
}
class _changeNodeGroupFn {
public:
NodeGroup* newGroup;
_changeNodeGroupFn(
NodeGroup* _newGroup) : newGroup(_newGroup) {
ASSERT(newGroup);
}
void operator()(
NodeRef* node) {
PRINT((
"_changeNodeGroupFn(): '%s'\n", node->name()));
status_t err __attribute__((unused));
NodeGroup* oldGroup = node->group();
if(oldGroup) {
err = oldGroup->removeNode(node);
ASSERT(err == B_OK);
}
err = newGroup->addNode(node);
ASSERT(err == B_OK);
}
};
// Splits the group shared by `insideNode` and `outsideNode`: a new group named
// "<old name> split" is created with the old group's run mode, and
// `outsideNode` plus the nodes connected to it (within the old group) are
// moved there. The new group is stored in *outGroup.
// Returns B_NOT_ALLOWED if a route still exists between the two nodes, if
// `insideNode` has no group, or if the two nodes are in different groups;
// B_OK otherwise.
status_t
NodeManager::splitGroup(
	NodeRef* insideNode,
	NodeRef* outsideNode,
	NodeGroup** outGroup)
{
	ASSERT(insideNode);
	ASSERT(outsideNode);
	Autolock _l(this);

	// The nodes must be disconnected from each other before they can be
	// separated.
	if (findRoute(insideNode->id(), outsideNode->id())) {
		PRINT((
			"!!! NodeManager::splitGroup(): route exists from %" B_PRId32
			" to %" B_PRId32 "\n", insideNode->id(), outsideNode->id()));
		return B_NOT_ALLOWED;
	}

	NodeGroup* sharedGroup = insideNode->group();
	if (sharedGroup == 0) {
		PRINT(("!!! NodeManager::splitGroup(): invalid group\n"));
		return B_NOT_ALLOWED;
	}
	if (outsideNode->group() != sharedGroup) {
		PRINT((
			"!!! NodeManager::splitGroup(): mismatched groups for %" B_PRId32
			" and %" B_PRId32 "\n", insideNode->id(), outsideNode->id()));
		return B_NOT_ALLOWED;
	}

	Autolock _l_old_group(sharedGroup);

	BString splitName(sharedGroup->name());
	splitName << " split";
	NodeGroup* splitOff = createGroup(
		splitName.String(),
		sharedGroup->runMode());
	*outGroup = splitOff;

	// Move the outside node first, then everything reachable from it that is
	// still in the old group.
	_changeNodeGroupFn mover(splitOff);
	mover(outsideNode);

	_for_each_state walkState;
	_do_for_each_connected(
		this,
		outsideNode,
		sharedGroup,
		true,
		mover,
		&walkState);

	// A group left with a single node takes over that node's name.
	if (splitOff->countNodes() == 1)
		splitOff->setName(splitOff->nodeAt(0)->name());
	if (sharedGroup->countNodes() == 1)
		sharedGroup->setName(sharedGroup->nodeAt(0)->name());

	return B_OK;
}
// Instantiates the dormant node described by `info` and registers a NodeRef
// for it.
//   info       dormant node to instantiate; also stored as the ref's add-on hint.
//   outRef     receives the new NodeRef, or 0 on failure.
//   timeout    passed to AddOnHost::InstantiateDormantNode() only.
//   nodeFlags  user flags for the new NodeRef.
// Returns B_OK on success, the failing call's error code, or B_BAD_INDEX when
// instantiation reported success but produced media_node::null.
status_t NodeManager::instantiate(
const dormant_node_info& info,
NodeRef** outRef,
bigtime_t timeout,
uint32 nodeFlags) {
Autolock _l(this);
status_t err;
D_METHOD((
"NodeManager::instantiate()\n"));
media_node node;
if(m_useAddOnHost) {
// First attempt through the add-on host.
err = AddOnHost::InstantiateDormantNode(
info, &node, timeout);
if(err < B_OK) {
// Reset the node so the fallback below can tell nothing was created,
// then try to (re)launch the host and retry once.
node = media_node::null;
BMessenger mess;
err = AddOnHost::Launch(&mess);
if(err < B_OK) {
PRINT((
"!!! NodeManager::instantiate(): giving up on AddOnHost\n"));
// The host stays disabled for all later calls as well.
m_useAddOnHost = false;
}
else {
err = AddOnHost::InstantiateDormantNode(
info, &node, timeout);
}
}
}
// Fall back to the media roster when the host is disabled or yielded no node.
if(!m_useAddOnHost || node == media_node::null) {
D_ROSTER((
"# roster->InstantiateDormantNode()\n"));
err = roster->InstantiateDormantNode(info, &node);
}
if(err < B_OK) {
*outRef = 0;
return err;
}
if(node == media_node::null) {
PRINT((
"! InstantiateDormantNode(): invalid media node\n"));
*outRef = 0;
return B_BAD_INDEX;
}
// The ref is created with the _INTERNAL implementation flag.
NodeRef* ref = new NodeRef(
node,
this,
nodeFlags,
NodeRef::_INTERNAL);
ref->_setAddonHint(&info);
_addRef(ref);
*outRef = ref;
return B_OK;
}
// Instantiates a node able to handle `file`: the roster is asked (SniffRef)
// for a matching dormant node of the required kinds, that node is instantiated
// via the dormant_node_info overload, and the file is then assigned to it.
//   file              file to be handled.
//   requireNodeKinds  node kinds passed to SniffRef().
//   outRef            receives the new NodeRef; set to 0 if sniffing or
//                     instantiation fails.
//   timeout, nodeFlags  forwarded to the other instantiate() overload.
//   outDuration       optional; receives the duration reported by SetRefFor()
//                     when that call succeeds.
// NOTE: the value returned is SetRefFor()'s status, so a negative result can
// be returned even though *outRef holds a valid, registered NodeRef.
// This function does not take the manager lock itself; the nested
// instantiate() call does.
status_t NodeManager::instantiate(
const entry_ref& file,
uint64 requireNodeKinds,
NodeRef** outRef,
bigtime_t timeout,
uint32 nodeFlags,
bigtime_t* outDuration) {
D_METHOD((
"NodeManager::instantiate(ref)\n"));
status_t err;
dormant_node_info info;
D_ROSTER((
"# roster->SniffRef()\n"));
err = roster->SniffRef(
file,
requireNodeKinds,
&info);
if(err < B_OK) {
*outRef = 0;
return err;
}
err = instantiate(info, outRef, timeout, nodeFlags);
if(err < B_OK)
return err;
ASSERT(*outRef);
bigtime_t dur;
D_ROSTER(("# roster->SetRefFor()\n"));
err = roster->SetRefFor(
(*outRef)->node(),
file,
false,
&dur);
if(err < B_OK) {
PRINT((
"* SetRefFor() failed: %s\n", strerror(err)));
}
else if(outDuration)
*outDuration = dur;
// Record the file alongside the add-on info, under the ref's own lock.
Autolock _l(*outRef);
(*outRef)->_setAddonHint(&info, &file);
return err;
}
// Registers a NodeRef for an already-existing BMediaNode object.
// The ref is created with `nodeFlags` as user flags and no implementation
// flags, added to the node map and returned through *outRef.
// Always returns B_OK.
status_t
NodeManager::reference(
	BMediaNode* node,
	NodeRef** outRef,
	uint32 nodeFlags)
{
	Autolock _l(this);
	D_METHOD((
		"NodeManager::reference()\n"));

	NodeRef* created = new NodeRef(node->Node(), this, nodeFlags, 0);
	_addRef(created);

	*outRef = created;
	return B_OK;
}
// Connects `output` to `input` through the media roster, using
// `templateFormat` as the requested format, and records the resulting
// Connection.
//   outConnection  optional; receives a copy of the new Connection, or a
//                  default-constructed one if the roster call fails.
// Returns B_OK, or the roster's error code (after invoking the
// connectionFailed() hook).
status_t NodeManager::connect(
const media_output& output,
const media_input& input,
const media_format& templateFormat,
Connection* outConnection ) {
Autolock _l(this);
status_t err;
D_METHOD((
"NodeManager::connect()\n"));
// Make sure both endpoint nodes have a NodeRef; unknown nodes are registered
// on the fly with no user flags.
NodeRef* outputRef;
if(getNodeRef(output.node.node, &outputRef) < B_OK)
outputRef = _addRefFor(output.node, 0);
NodeRef* inputRef;
if(getNodeRef(input.node.node, &inputRef) < B_OK)
inputRef = _addRefFor(input.node, 0);
media_output finalOutput;
media_input finalInput;
// Work on a copy: the roster receives a non-const pointer to the format.
media_format finalFormat = templateFormat;
D_ROSTER(("# roster->Connect()\n"));
err = roster->Connect(
output.source,
input.destination,
&finalFormat,
&finalOutput,
&finalInput);
if(err < B_OK) {
if(outConnection)
*outConnection = Connection();
connectionFailed(output, input, templateFormat, err);
return err;
}
D_METHOD((
"! creating connection:\n"
"  source id:   %" B_PRId32 "\n"
"  source port: %" B_PRId32 "\n"
"  dest id:     %" B_PRId32 "\n"
"  dest port:   %" B_PRId32 "\n",
finalOutput.source.id, finalOutput.source.port,
finalInput.destination.id, finalInput.destination.port));
// Connections made here are flagged INTERNAL; those leaving a file-interface
// node are additionally LOCKED, which makes disconnect() refuse them.
uint32 cflags = Connection::INTERNAL;
if(outputRef->m_info.node.kind & B_FILE_INTERFACE) {
cflags |= Connection::LOCKED;
}
// Endpoints and names come from what the roster reported back; the node
// identities come from the caller's arguments.
Connection* con = new Connection(
m_nextConID++,
output.node,
finalOutput.source,
finalOutput.name,
input.node,
finalInput.destination,
finalInput.name,
finalFormat,
cflags);
// Remember the originally requested endpoints/format as hints.
con->setOutputHint(
output.name,
output.format);
con->setInputHint(
input.name,
input.format);
con->setRequestedFormat(
templateFormat);
_addConnection(con);
_updateLatenciesFrom(inputRef, true);
if(outConnection) {
*outConnection = *con;
}
return B_OK;
}
// Connects `output` to `input`, picking the template format automatically:
// the output's format if it has a known type, else the input's format if it
// has one, else a format of type B_MEDIA_UNKNOWN_TYPE. If both endpoints have
// known but different format types, the connectionFailed() hook is invoked
// and B_MEDIA_BAD_FORMAT is returned without contacting the roster.
// Otherwise delegates to the four-argument connect().
status_t
NodeManager::connect(
	const media_output& output,
	const media_input& input,
	Connection* outConnection)
{
	D_METHOD((
		"NodeManager::connect(guess)\n"));

	const bool outputTyped = output.format.type > B_MEDIA_UNKNOWN_TYPE;
	const bool inputTyped = input.format.type > B_MEDIA_UNKNOWN_TYPE;

	media_format f;
	if (outputTyped) {
		f = output.format;
		if (inputTyped && f.type != input.format.type) {
			connectionFailed(output, input, f, B_MEDIA_BAD_FORMAT);
			return B_MEDIA_BAD_FORMAT;
		}
	} else if (inputTyped) {
		f = input.format;
	} else {
		f.type = B_MEDIA_UNKNOWN_TYPE;
	}

	return connect(output, input, f, outConnection);
}
// Breaks `connection` through the media roster.
// Returns B_NOT_ALLOWED for an invalid or LOCKED connection, otherwise the
// status of BMediaRoster::Disconnect().
// On success the stored Connection is only marked as disconnected here; it is
// not removed from the connection maps by this function (removal is done by
// _removeConnection(), presumably when the roster reports the broken
// connection -- TODO confirm).
status_t NodeManager::disconnect(
const Connection& connection) {
Autolock _l(this);
status_t err;
D_METHOD((
"NodeManager::disconnect()\n"));
if(!connection.isValid())
return B_NOT_ALLOWED;
if(connection.flags() & Connection::LOCKED) {
PRINT((
"NodeManager::disconnect(): connection locked:\n"
"  %" B_PRId32 ":%s -> %" B_PRId32 ":%s\n",
connection.sourceNode(),
connection.outputName(),
connection.destinationNode(),
connection.inputName()));
return B_NOT_ALLOWED;
}
D_METHOD((
"! breaking connection:\n"
"  source node: %" B_PRId32 "\n"
"  source id:   %" B_PRId32 "\n"
"  source port: %" B_PRId32 "\n"
"  dest node:   %" B_PRId32 "\n"
"  dest id:     %" B_PRId32 "\n"
"  dest port:   %" B_PRId32 "\n",
connection.sourceNode(),
connection.source().id, connection.source().port,
connection.destinationNode(),
connection.destination().id, connection.destination().port));
D_ROSTER(("# roster->Disconnect()\n"));
err = roster->Disconnect(
connection.sourceNode(),
connection.source(),
connection.destinationNode(),
connection.destination());
if(err == B_OK) {
// `connection` may be a copy, so locate the stored object by ID among the
// source node's connections and flag that one.
con_map::iterator it = m_conSourceMap.lower_bound(connection.sourceNode());
con_map::iterator itEnd = m_conSourceMap.upper_bound(connection.sourceNode());
for(; it != itEnd; ++it)
if((*it).second->id() == connection.id()) {
(*it).second->m_disconnected = true;
break;
}
ASSERT(it != itEnd);
// Refresh latencies from both former endpoints, if they are still registered.
NodeRef* ref;
if(getNodeRef(connection.sourceNode(), &ref) == B_OK)
_updateLatenciesFrom(ref, true);
if(getNodeRef(connection.destinationNode(), &ref) == B_OK)
_updateLatenciesFrom(ref, true);
} else {
PRINT((
"NodeManager::disconnect(): Disconnect() failed:\n  %s\n",
strerror(err)));
}
return err;
}
// Creates a new NodeGroup with the given name and run mode, registers it with
// the manager and returns it.
NodeGroup*
NodeManager::createGroup(const char* name, BMediaNode::run_mode runMode)
{
	Autolock _l(this);
	D_METHOD((
		"NodeManager::createGroup()\n"));

	NodeGroup* created = new NodeGroup(name, this, runMode);
	_addGroup(created);
	return created;
}
// Cookie-based iteration over all registered NodeRefs; the manager must be
// locked by the caller.
//   ref     receives the next NodeRef when B_OK is returned.
//   cookie  in/out iteration state; *cookie must be 0 on the first call.
// Returns B_OK for each ref, and B_BAD_INDEX once the end is reached, at
// which point the cookie is freed and *cookie is reset to 0.
status_t
NodeManager::getNextRef(NodeRef** ref, void** cookie)
{
	ASSERT(IsLocked());
	ASSERT(cookie);

	// The cookie is a heap-allocated map iterator, created lazily.
	node_ref_map::iterator* cursor
		= static_cast<node_ref_map::iterator*>(*cookie);
	if (cursor == 0) {
		cursor = new node_ref_map::iterator(m_nodeRefMap.begin());
		*cookie = cursor;
	}

	if (*cursor == m_nodeRefMap.end()) {
		delete cursor;
		*cookie = 0;
		return B_BAD_INDEX;
	}

	*ref = (*cursor)->second;
	++(*cursor);
	return B_OK;
}
// Frees the iteration state created by getNextRef() when an iteration is
// abandoned before it reached the end.
//   cookie  address of the cookie; a null address is ignored.
// After the call *cookie is 0, so the cookie can neither be used with
// getNextRef() as stale state nor be freed a second time.
// NOTE: a cookie that is already 0 (e.g. because getNextRef() ran to
// completion and freed it) still trips the ASSERT in debug builds.
void
NodeManager::disposeRefCookie(void** cookie)
{
	if (!cookie)
		return;

	node_ref_map::iterator* it
		= reinterpret_cast<node_ref_map::iterator*>(*cookie);
	ASSERT(it);

	// Deleting a null pointer is harmless, so no extra check is required.
	delete it;

	// Do not leave a dangling pointer behind in the caller's variable.
	*cookie = 0;
}
// Cookie-based iteration over all recorded connections (in source-map
// order); the manager must be locked by the caller.
//   connection  receives a copy of the next Connection when B_OK is returned.
//   cookie      in/out iteration state; *cookie must be 0 on the first call.
// Returns B_OK for each connection, and B_BAD_INDEX once the end is reached,
// at which point the cookie is freed and *cookie is reset to 0.
status_t
NodeManager::getNextConnection(Connection* connection, void** cookie)
{
	ASSERT(IsLocked());
	ASSERT(cookie);

	// The cookie is a heap-allocated map iterator, created lazily.
	con_map::iterator* cursor = static_cast<con_map::iterator*>(*cookie);
	if (cursor == 0) {
		cursor = new con_map::iterator(m_conSourceMap.begin());
		*cookie = cursor;
	}

	if (*cursor == m_conSourceMap.end()) {
		delete cursor;
		*cookie = 0;
		return B_BAD_INDEX;
	}

	*connection = *((*cursor)->second);
	++(*cursor);
	return B_OK;
}
// Frees the iteration state created by getNextConnection() when an iteration
// is abandoned before it reached the end.
//   cookie  address of the cookie; a null address is ignored.
// After the call *cookie is 0, so the cookie can neither be used with
// getNextConnection() as stale state nor be freed a second time.
// NOTE: a cookie that is already 0 (e.g. because getNextConnection() ran to
// completion and freed it) still trips the ASSERT in debug builds.
void
NodeManager::disposeConnectionCookie(void** cookie)
{
	if (!cookie)
		return;

	con_map::iterator* it
		= reinterpret_cast<con_map::iterator*>(*cookie);
	ASSERT(it);

	// Deleting a null pointer is harmless, so no extra check is required.
	delete it;

	// Do not leave a dangling pointer behind in the caller's variable.
	*cookie = 0;
}
// Dispatches media roster notifications to the matching _handle*() method and
// then forwards the message to observers via notify(). Node-created messages
// are forwarded only when _handleNodesCreated() returns B_OK. Any other
// message goes to the inherited handler.
void
NodeManager::MessageReceived(BMessage* message)
{
	D_MESSAGE((
		"NodeManager::MessageReceived(): %c%c%c%c\n",
		message->what >> 24,
		(message->what >> 16) & 0xff,
		(message->what >> 8) & 0xff,
		(message->what) & 0xff));

	switch (message->what) {
		case B_MEDIA_NODE_CREATED:
		{
			const bool forward = _handleNodesCreated(message) == B_OK;
			if (forward)
				notify(message);
			break;
		}

		case B_MEDIA_NODE_DELETED:
			_handleNodesDeleted(message);
			notify(message);
			break;

		case B_MEDIA_CONNECTION_MADE:
			_handleConnectionMade(message);
			notify(message);
			break;

		case B_MEDIA_CONNECTION_BROKEN:
			_handleConnectionBroken(message);
			notify(message);
			break;

		case B_MEDIA_FORMAT_CHANGED:
			_handleFormatChanged(message);
			notify(message);
			break;

		default:
			_inherited::MessageReceived(message);
			break;
	}
}
void NodeManager::observerAdded(
const BMessenger& observer) {
BMessage m(M_OBSERVER_ADDED);
m.AddMessenger("target", BMessenger(this));
observer.SendMessage(&m);
}
void NodeManager::observerRemoved(
const BMessenger& observer) {
BMessage m(M_OBSERVER_REMOVED);
m.AddMessenger("target", BMessenger(this));
observer.SendMessage(&m);
}
void NodeManager::notifyRelease() {
BMessage m(M_RELEASED);
m.AddMessenger("target", BMessenger(this));
notify(&m);
}
void NodeManager::releaseComplete() {
D_ROSTER(("# roster->StopWatching()\n"));
status_t err = roster->StopWatching(
BMessenger(this));
if(err < B_OK) {
PRINT((
"* roster->StopWatching() failed: %s\n", strerror(err)));
}
}
// Locks the looper, waiting at most `timeout`. `type` is not used by this
// implementation. Returns true if the lock was acquired.
bool
NodeManager::lock(lock_t type, bigtime_t timeout)
{
	D_LOCK(("*** NodeManager::lock(): %ld\n", find_thread(0)));

	const status_t result = LockWithTimeout(timeout);

	D_LOCK(("*** NodeManager::lock() ACQUIRED: %ld\n", find_thread(0)));

	return result == B_OK;
}
// Unlocks the looper. `type` is not used by this implementation.
// Always returns true.
bool
NodeManager::unlock(lock_t type)
{
	D_LOCK(("*** NodeManager::unlock(): %ld\n", find_thread(0)));

	Unlock();

	D_LOCK(("*** NodeManager::unlock() RELEASED: %ld\n", find_thread(0)));

	return true;
}
// Reports the looper's IsLocked() state. `type` is not used by this
// implementation.
bool
NodeManager::isLocked(lock_t type) const
{
	const bool held = IsLocked();
	return held;
}
// Asks the media roster for the system's common nodes (video input, video
// output, audio mixer, audio input, audio output) and registers a NodeRef for
// each one the roster returns. A node the roster cannot provide leaves the
// corresponding member at its previous value (0 after construction).
// The looper must be locked.
void NodeManager::_initCommonNodes() {
ASSERT(IsLocked());
status_t err;
media_node node;
D_METHOD((
"NodeManager::_initCommonNodes()\n"));
// Flag set that turns off start/stop, seek and preroll on a NodeRef.
uint32 disableTransport =
(NodeRef::NO_START_STOP | NodeRef::NO_SEEK | NodeRef::NO_PREROLL);
D_ROSTER(("# roster->GetVideoInput()\n"));
err = roster->GetVideoInput(&node);
if(err == B_OK)
m_videoInputNode = _addRefFor(
node,
_userFlagsForKind(node.kind),
_implFlagsForKind(node.kind));
D_ROSTER(("# roster->GetVideoOutput()\n"));
err = roster->GetVideoOutput(&node);
if(err == B_OK) {
// The same node may serve as both input and output; reuse its ref rather
// than registering it twice (_addRefFor() asserts the ID is not yet known).
if(m_videoInputNode && node.node == m_videoInputNode->id()) {
m_videoOutputNode = m_videoInputNode;
}
else {
// NO_START_STOP is explicitly cleared for the video output.
m_videoOutputNode = _addRefFor(
node,
_userFlagsForKind(node.kind) & ~NodeRef::NO_START_STOP,
_implFlagsForKind(node.kind));
}
}
D_ROSTER(("# roster->GetAudioMixer()\n"));
err = roster->GetAudioMixer(&node);
if(err == B_OK)
m_audioMixerNode = _addRefFor(
node,
_userFlagsForKind(node.kind) | disableTransport,
_implFlagsForKind(node.kind));
D_ROSTER(("# roster->GetAudioInput()\n"));
err = roster->GetAudioInput(&node);
if(err == B_OK)
m_audioInputNode = _addRefFor(
node,
_userFlagsForKind(node.kind),
_implFlagsForKind(node.kind));
D_ROSTER(("# roster->GetAudioOutput()\n"));
err = roster->GetAudioOutput(&node);
if(err == B_OK) {
// Shared input/output node: reuse the ref and add the transport-disabling
// flags to it (this also affects the ref returned by audioInputNode()).
if(m_audioInputNode && node.node == m_audioInputNode->id()) {
m_audioOutputNode = m_audioInputNode;
m_audioOutputNode->setFlags(
m_audioOutputNode->flags() | disableTransport);
}
else {
m_audioOutputNode = _addRefFor(
node,
_userFlagsForKind(node.kind) | disableTransport,
_implFlagsForKind(node.kind));
}
}
}
// Registers `ref` in the node map under its node ID and fires the
// nodeCreated() hook. The ID must not be registered yet (asserted); the
// manager must be locked.
void
NodeManager::_addRef(NodeRef* ref)
{
	ASSERT(ref);
	ASSERT(IsLocked());
	D_METHOD((
		"NodeManager::_addRef()\n"));

	ASSERT(
		m_nodeRefMap.find(ref->id()) == m_nodeRefMap.end());

	const node_ref_map::value_type entry(ref->id(), ref);
	m_nodeRefMap.insert(entry);

	nodeCreated(ref);
}
// Unregisters the NodeRef stored for node `id`: fires the nodeDeleted() hook
// while the entry is still present, then erases the map entry. The NodeRef
// object itself is not deleted here. The ID must be registered (asserted);
// the manager must be locked.
void
NodeManager::_removeRef(media_node_id id)
{
	ASSERT(IsLocked());
	D_METHOD((
		"NodeManager::_removeRef()\n"));

	node_ref_map::iterator entry = m_nodeRefMap.find(id);
	ASSERT(entry != m_nodeRefMap.end());

	nodeDeleted(entry->second);
	m_nodeRefMap.erase(entry);
}
// Creates a NodeRef for `node` with the given user and implementation flags,
// registers it through _addRef() and returns it. The node must not be
// registered yet (asserted); the manager must be locked.
NodeRef*
NodeManager::_addRefFor(
	const media_node& node,
	uint32 nodeFlags,
	uint32 nodeImplFlags)
{
	ASSERT(IsLocked());
	D_METHOD((
		"NodeManager::_addRefFor()\n"));
	ASSERT(
		m_nodeRefMap.find(node.node) == m_nodeRefMap.end());

	NodeRef* created = new NodeRef(node, this, nodeFlags, nodeImplFlags);
	_addRef(created);
	return created;
}
// Records `connection` in both lookup maps (keyed by source node and by
// destination node) and fires the connectionMade() hook. In DEBUG builds it
// first asserts that no connection with the same ID is already stored for
// either endpoint node. The manager must be locked.
void
NodeManager::_addConnection(Connection* connection)
{
	ASSERT(connection);
	ASSERT(IsLocked());
	D_METHOD((
		"NodeManager::_addConnection()\n"));

#ifdef DEBUG
	{
		pair<con_map::iterator, con_map::iterator> bySource
			= m_conSourceMap.equal_range(connection->sourceNode());
		for (con_map::iterator cur = bySource.first;
				cur != bySource.second; ++cur) {
			ASSERT(cur->second->id() != connection->id());
		}

		pair<con_map::iterator, con_map::iterator> byDestination
			= m_conDestinationMap.equal_range(connection->destinationNode());
		for (con_map::iterator cur = byDestination.first;
				cur != byDestination.second; ++cur) {
			ASSERT(cur->second->id() != connection->id());
		}
	}
#endif

	const con_map::value_type sourceEntry(
		connection->sourceNode(), connection);
	m_conSourceMap.insert(sourceEntry);

	const con_map::value_type destinationEntry(
		connection->destinationNode(), connection);
	m_conDestinationMap.insert(destinationEntry);

	connectionMade(connection);
}
void NodeManager::_removeConnection(
const Connection& connection) {
ASSERT(IsLocked());
con_map::iterator itSource, itDestination;
D_METHOD((
"NodeManager::_removeConnection()\n"));
connectionBroken(&connection);
for(
itSource = m_conSourceMap.lower_bound(connection.sourceNode());
itSource != m_conSourceMap.upper_bound(connection.sourceNode());
++itSource)
if((*itSource).second->id() == connection.id())
break;
ASSERT(itSource != m_conSourceMap.end());
for(
itDestination = m_conDestinationMap.lower_bound(connection.destinationNode());
itDestination != m_conDestinationMap.upper_bound(connection.destinationNode());
++itDestination)
if((*itDestination).second->id() == connection.id())
break;
ASSERT(itDestination != m_conDestinationMap.end());
delete (*itSource).second;
m_conSourceMap.erase(itSource);
m_conDestinationMap.erase(itDestination);
}
// Appends `group` to the manager's group list. The group must not be listed
// yet (asserted); the manager must be locked.
void
NodeManager::_addGroup(NodeGroup* group)
{
	ASSERT(group);
	ASSERT(IsLocked());
	D_METHOD((
		"NodeManager::_addGroup()\n"));

	ASSERT(
		find(m_nodeGroupSet.begin(), m_nodeGroupSet.end(), group)
			== m_nodeGroupSet.end());

	m_nodeGroupSet.push_back(group);
}
// Removes `group` from the manager's group list without deleting it. A group
// that is not listed is reported via PRINT and otherwise ignored. The manager
// must be locked.
void
NodeManager::_removeGroup(NodeGroup* group)
{
	ASSERT(group);
	ASSERT(IsLocked());
	D_METHOD((
		"NodeManager::_removeGroup()\n"));

	node_group_set::iterator position = find(
		m_nodeGroupSet.begin(),
		m_nodeGroupSet.end(),
		group);

	if (position != m_nodeGroupSet.end()) {
		m_nodeGroupSet.erase(position);
		return;
	}

	PRINT((
		"* NodeManager::_removeGroup(%" B_PRId32 "): group not in set.\n",
		group->id()));
}
// Handles a node-created notification: makes sure a NodeRef exists for every
// node ID in the "media_node_id" field of `message`. The first time it runs
// with at least one ID (m_existingNodesInit still false) it also reconstructs
// Connection objects for the connections that already exist between the nodes
// listed in the message.
// Returns:
//   - the GetInfo() error if the field cannot be read;
//   - B_OK if the field is present but holds no IDs (err is still the
//     successful GetInfo() result at that point);
//   - B_OK if at least one ref was created or newly marked _CREATE_NOTIFIED;
//   - B_ERROR otherwise.
// MessageReceived() forwards the message to observers only on B_OK.
// The looper must be locked.
inline status_t NodeManager::_handleNodesCreated(
BMessage* message) {
ASSERT(IsLocked());
status_t err = B_OK;
type_code type;
int32 count;
err = message->GetInfo("media_node_id", &type, &count);
if(err < B_OK) {
PRINT((
"* NodeManager::_handleNodesCreated(): GetInfo() failed:\n"
"  %s\n",
strerror(err)));
return err;
}
if(!count) {
PRINT((
"* NodeManager::_handleNodesCreated(): no node IDs in message.\n"));
return err;
}
D_METHOD((
"NodeManager::_handleNodesCreated(): %" B_PRId32 " nodes\n",
count));
// On the first run only, index the nodes by their port so that existing
// connections can be matched to their source node further down.
typedef map<port_id, NodeRef*> port_ref_map;
port_ref_map* initialNodes = m_existingNodesInit ? 0 : new port_ref_map;
bool refsCreated = false;
for(int32 n = 0; n < count; ++n) {
int32 id;
err = message->FindInt32("media_node_id", n, &id);
if(err < B_OK) {
PRINT((
"* NodeManager::_handleNodesCreated(): FindInt32() failed:\n"
"  %s", strerror(err)));
continue;
}
media_node node;
err = roster->GetNodeFor(id, &node);
if(err < B_OK) {
PRINT((
"* NodeManager::_handleNodesCreated(): roster->GetNodeFor(%"
B_PRId32 ") failed:\n"
"  %s\n",
id, strerror(err)));
continue;
}
NodeRef* ref = 0;
if(getNodeRef(node.node, &ref) < B_OK) {
// Unknown node: register it, already marked as notified.
ref = _addRefFor(
node,
_userFlagsForKind(node.kind),
_implFlagsForKind(node.kind) | NodeRef::_CREATE_NOTIFIED);
refsCreated = true;
} else {
// Known node: count it only the first time a notification is seen for it.
if(!(ref->m_implFlags & NodeRef::_CREATE_NOTIFIED)) {
ref->m_implFlags |= NodeRef::_CREATE_NOTIFIED;
refsCreated = true;
}
// The media_node obtained from GetNodeFor() above is handed back to the
// roster, since an existing ref already covers this node.
err = roster->ReleaseNode(node);
if(err < B_OK) {
PRINT((
"* NodeManager::_handleNodesCreated(): roster->ReleaseNode(%"
B_PRId32 ") failed:\n"
"  %s\n",
id, strerror(err)));
}
}
if(initialNodes)
initialNodes->insert(
port_ref_map::value_type(
node.port, ref));
}
if(initialNodes) {
// First run: walk every consumer node and rebuild a Connection for each of
// its connected inputs whose source node is part of this batch.
for(port_ref_map::const_iterator itDest = initialNodes->begin();
itDest != initialNodes->end(); ++itDest) {
NodeRef* destRef = (*itDest).second;
ASSERT(destRef);
if(!(destRef->kind() & B_BUFFER_CONSUMER))
continue;
vector<media_input> inputs;
err = destRef->getConnectedInputs(inputs);
if(err < B_OK) {
PRINT((
"!!! NodeManager::_handleNodesCreated():\n"
"    NodeRef('%s')::getConnectedInputs() failed:\n"
"    %s\n",
destRef->name(), strerror(err)));
continue;
}
for(vector<media_input>::const_iterator itInput = inputs.begin();
itInput != inputs.end(); ++itInput) {
const media_input& input = *itInput;
// Locate the producing node through the port of the input's source.
port_ref_map::const_iterator itSource = initialNodes->find(
input.source.port);
if(itSource == initialNodes->end()) {
PRINT((
"* NodeManager::_handleNodesCreated():\n"
"  Building initial Connection set: couldn't find source node\n"
"  connected to input '%s' of '%s' (source port %" B_PRId32 ").\n",
input.name, destRef->name(), input.source.port));
continue;
}
NodeRef* sourceRef = (*itSource).second;
ASSERT(sourceRef);
media_output output;
err = sourceRef->findOutput(input.source, &output);
if(err < B_OK) {
PRINT((
"* NodeManager::_handleNodesCreated():\n"
"  Building initial Connection set: couldn't find output\n"
"  in node '%s' connected to input '%s' of '%s'.\n",
sourceRef->name(),
input.name, destRef->name()));
continue;
}
// Both ends must describe the same connection; otherwise skip it.
if(input.source != output.source ||
input.destination != output.destination) {
PRINT((
"!!! NodeManager::_handleNodesCreated():\n"
"    input/output mismatch for connection\n"
"    '%s' (%s) -> '%s' (%s)\n"
"    input.source:       port %" B_PRId32 ", ID %" B_PRId32 "\n"
"    output.source:      port %" B_PRId32 ", ID %" B_PRId32 "\n"
"    input.destination:  port %" B_PRId32 ", ID %" B_PRId32 "\n"
"    output.destination: port %" B_PRId32 ", ID %" B_PRId32 "\n\n",
sourceRef->name(), output.name,
destRef->name(), input.name,
input.source.port, input.source.id,
output.source.port, output.source.id,
input.destination.port, input.destination.id,
output.destination.port, output.destination.id));
continue;
}
// Pre-existing connections get no flags (in particular not INTERNAL, so
// _freeConnection() will not break them through the roster).
Connection* con = new Connection(
m_nextConID++,
output.node,
output.source,
output.name,
input.node,
input.destination,
input.name,
input.format,
0);
_addConnection(con);
}
}
// The connection reconstruction is done only once per manager.
m_existingNodesInit = true;
delete initialNodes;
}
return refsCreated ? B_OK : B_ERROR;
}
// Handles a node-deletion message: for every ID stored under
// "media_node_id", looks up the matching NodeRef, removes any connections
// still attached to it, tells observers about the removed connections,
// detaches the ref from its group, marks it released and releases it.
//
// Entries that can't be read from the message, or that don't correspond to
// a known NodeRef, are logged and skipped.  The manager must be locked.
inline void NodeManager::_handleNodesDeleted(
	BMessage* message) {
	ASSERT(IsLocked());
	D_METHOD((
		"NodeManager::_handleNodesDeleted()\n"));

	type_code type;
	int32 count;
	status_t err = message->GetInfo("media_node_id", &type, &count);
	if(err < B_OK) {
		PRINT((
			"* NodeManager::_handleNodesDeleted(): GetInfo() failed:\n"
			" %s\n",
			strerror(err)));
		return;
	}
	if(!count)
		return;

	for(int32 n = 0; n < count; n++) {
		int32 id;
		err = message->FindInt32("media_node_id", n, &id);
		if(err < B_OK) {
			PRINT((
				"* NodeManager::_handleNodesDeleted(): FindInt32() failed\n"
				" %s\n",
				strerror(err)));
			continue;
		}

		NodeRef* ref;
		err = getNodeRef(id, &ref);
		if(err < B_OK) {
			PRINT((
				"* NodeManager::_handleNodesDeleted(): getNodeRef(%" B_PRId32
				") failed\n"
				" %s\n",
				id, strerror(err)));
			continue;
		}

		// collect every connection (inputs first, then outputs) that is
		// still registered for the node being deleted
		vector<Connection> stuckConnections;
		ref->getInputConnections(stuckConnections);
		ref->getOutputConnections(stuckConnections);

		// Only build and send a B_MEDIA_CONNECTION_BROKEN notification when
		// there is something to report; an empty message carries no
		// information for observers.
		if(!stuckConnections.empty()) {
			// distinct names: don't shadow the 'message' parameter or the
			// outer loop index
			BMessage brokenMessage(B_MEDIA_CONNECTION_BROKEN);
			for(uint32 index = 0; index < stuckConnections.size(); ++index) {
				Connection& c = stuckConnections[index];

				// same fields as a regular connection-broken message, plus
				// the connection/node ID fields added by NodeManager
				brokenMessage.AddData("source", B_RAW_TYPE,
					&c.source(), sizeof(media_source));
				brokenMessage.AddData("destination", B_RAW_TYPE,
					&c.destination(), sizeof(media_destination));
				brokenMessage.AddInt32(_connectionField, c.id());
				brokenMessage.AddInt32(_sourceNodeField, c.sourceNode());
				brokenMessage.AddInt32(_destNodeField, c.destinationNode());

				_removeConnection(c);
			}
			notify(&brokenMessage);
		}

		// ungroup if necessary
		if(ref->m_group) {
			ASSERT(!ref->isReleased());
			ref->m_group->removeNode(ref);
		}

		// flag the node as released before dropping the reference
		ref->m_nodeReleased = true;
		ref->release();
	}
}
// Handles a connection-made message.  The message is read as a sequence of
// paired "output"/"input" raw-data entries; for each pair whose output is
// not already covered by a known connection, a new Connection object is
// created and registered via _addConnection().
//
// Processing stops at the first entry that is missing or too small.
// The manager must be locked.
inline void NodeManager::_handleConnectionMade(
	BMessage* message) {
	ASSERT(IsLocked());
	D_METHOD((
		"NodeManager::_handleConnectionMade()\n"));

	for(int32 entry = 0; ; entry++) {
		const void* raw;
		ssize_t rawSize;

		// --- output half of the pair ---
		status_t err = message->FindData(
			"output", B_RAW_TYPE, entry, &raw, &rawSize);
		if(err < B_OK) {
			// running out of entries is the normal way to leave the loop;
			// it's only worth a note if there were none at all
			if(entry == 0) {
				PRINT((
					"* NodeManager::_handleConnectionMade(): no entries in message.\n"));
			}
			break;
		}
		if(rawSize < ssize_t(sizeof(media_output))) {
			PRINT((
				"* NodeManager::_handleConnectionMade(): not enough data for output.\n"));
			break;
		}
		media_output output;
		output = *static_cast<const media_output*>(raw);

		// --- input half of the pair ---
		err = message->FindData(
			"input", B_RAW_TYPE, entry, &raw, &rawSize);
		if(err < B_OK) {
			if(entry == 0) {
				PRINT((
					"* NodeManager::_handleConnectionMade(): no complete entries in message.\n"));
			}
			break;
		}
		if(rawSize < ssize_t(sizeof(media_input))) {
			PRINT((
				"* NodeManager::_handleConnectionMade(): not enough data for input.\n"));
			break;
		}
		media_input input;
		input = *static_cast<const media_input*>(raw);

		// skip pairs whose output already has a registered connection
		Connection existing;
		err = findConnection(
			output.node.node,
			output.source,
			&existing);
		if(err == B_OK) {
			PRINT((
				" - existing connection for %s -> %s found\n",
				existing.outputName(), existing.inputName()));
			continue;
		}

		// create the connection and hand it to the manager's bookkeeping
		Connection* created = new Connection(
			m_nextConID++,
			output.node,
			output.source,
			output.name,
			input.node,
			input.destination,
			input.name,
			input.format,
			0);
		_addConnection(created);
	}
}
// Handles a connection-broken message.  For every "source" raw-data entry
// in the message, the matching Connection is looked up and removed, and
// the message is augmented with three parallel int32 fields
// (_connectionField, _sourceNodeField, _destNodeField) describing it.
//
// To keep those fields index-aligned with the "source" entries, a zero
// placeholder triple is appended for every entry that can't be resolved --
// both when the connection isn't found and when the entry is too small to
// hold a media_source.
inline void NodeManager::_handleConnectionBroken(
	BMessage* message) {
	D_METHOD((
		"NodeManager::_handleConnectionBroken()\n"));

	status_t err;
	for(int32 n = 0; ; n++) {
		const void* data;
		ssize_t dataSize;

		err = message->FindData("source", B_RAW_TYPE, n, &data, &dataSize);
		if(err < B_OK) {
			// no more entries; only noteworthy if there were none at all
			if(!n) {
				PRINT((
					"* NodeManager::_handleConnectionBroken(): incomplete entry in message.\n"));
			}
			break;
		}

		if(dataSize < ssize_t(sizeof(media_source))) {
			PRINT((
				"* NodeManager::_handleConnectionBroken(): not enough data for source.\n"));
			// placeholders, so later entries stay aligned with their
			// "source" index
			message->AddInt32(_connectionField, 0);
			message->AddInt32(_sourceNodeField, 0);
			message->AddInt32(_destNodeField, 0);
			continue;
		}

		media_source source;
		source = *static_cast<const media_source*>(data);

		Connection con;
		err = findConnection(source, &con);
		if(err < B_OK) {
			PRINT((
				"* NodeManager::_handleConnectionBroken(): connection not found:\n"
				" %" B_PRId32 ":%" B_PRId32 "\n",
				source.port, source.id));
			// placeholders for the unknown connection
			message->AddInt32(_connectionField, 0);
			message->AddInt32(_sourceNodeField, 0);
			message->AddInt32(_destNodeField, 0);
			continue;
		}

		// describe the connection for observers, then drop it
		message->AddInt32(_connectionField, con.id());
		message->AddInt32(_sourceNodeField, con.sourceNode());
		message->AddInt32(_destNodeField, con.destinationNode());
		_removeConnection(con);
	}
}
// Handles a format-changed message.  Reads the "be:source",
// "be:destination" and "be:format" raw-data fields, finds the connection
// whose source matches, and -- provided its destination matches too --
// stores the new format in it and augments the message with the
// connection's ID and its source/destination node IDs.
//
// Nothing is changed if any field is missing or too small for the
// structure it should contain, if no connection has the given source, or
// if the matching connection has a different destination.
inline void
NodeManager::_handleFormatChanged(BMessage *message)
{
	D_METHOD((
		"NodeManager::_handleFormatChanged()\n"));

	status_t err;
	const void* data;
	ssize_t dataSize;

	// source
	err = message->FindData("be:source", B_RAW_TYPE, &data, &dataSize);
	if(err < B_OK) {
		PRINT((
			"* NodeManager::_handleFormatChanged(): incomplete entry in message.\n"));
		return;
	}
	// validate the payload size before dereferencing it
	if(dataSize < ssize_t(sizeof(media_source))) {
		PRINT((
			"* NodeManager::_handleFormatChanged(): not enough data for source.\n"));
		return;
	}
	const media_source* source = static_cast<const media_source*>(data);

	// destination
	err = message->FindData("be:destination", B_RAW_TYPE, &data, &dataSize);
	if(err < B_OK) {
		PRINT((
			"* NodeManager::_handleFormatChanged(): incomplete entry in message.\n"));
		return;
	}
	if(dataSize < ssize_t(sizeof(media_destination))) {
		PRINT((
			"* NodeManager::_handleFormatChanged(): not enough data for destination.\n"));
		return;
	}
	const media_destination* destination =
		static_cast<const media_destination*>(data);

	// format
	err = message->FindData("be:format", B_RAW_TYPE, &data, &dataSize);
	if(err < B_OK) {
		PRINT((
			"* NodeManager::_handleFormatChanged(): incomplete entry in message.\n"));
		return;
	}
	if(dataSize < ssize_t(sizeof(media_format))) {
		PRINT((
			"* NodeManager::_handleFormatChanged(): not enough data for format.\n"));
		return;
	}
	const media_format* format = static_cast<const media_format*>(data);

	// linear scan: find the first connection with this source
	for(con_map::const_iterator it = m_conSourceMap.begin();
		it != m_conSourceMap.end(); ++it) {

		Connection* con = (*it).second;
		if(!(con->source() == *source))
			continue;

		// source matches but destination doesn't: not the connection the
		// message describes, so leave everything untouched
		if(con->destination() != *destination)
			return;

		con->m_format = *format;

		// attach the connection description for observers
		message->AddInt32(_connectionField, con->id());
		message->AddInt32(_sourceNodeField, con->sourceNode());
		message->AddInt32(_destNodeField, con->destinationNode());
		break;
	}
}
// Computes the initial user flags for a NodeRef from the node's kind
// bitmask: nodes with B_PHYSICAL_OUTPUT set get NO_START_STOP, NO_SEEK and
// NO_PREROLL; every other kind gets no flags.
inline uint32 NodeManager::_userFlagsForKind(
	uint32 kind) {
	if(!(kind & B_PHYSICAL_OUTPUT))
		return 0;

	uint32 flags = 0;
	flags |= (NodeRef::NO_START_STOP | NodeRef::NO_SEEK | NodeRef::NO_PREROLL);
	return flags;
}
// Computes the initial implementation flags for a NodeRef from the node's
// kind bitmask.  Currently no kind maps to any implementation flag, so the
// argument is ignored and the result is always 0.
inline uint32 NodeManager::_implFlagsForKind(
	uint32 kind) {
	const uint32 noFlags = 0;
	return noFlags;
}
// Calls _updateLatency() on a set of nodes.  When 'group' is non-null only
// the nodes in that group are visited (the group is asserted to be
// locked); when it is null every NodeRef in m_nodeRefMap is visited.
// The manager must be locked.
inline void NodeManager::_updateLatencies(
	NodeGroup* group) {
	ASSERT(IsLocked());

	if(!group) {
		// no group given: walk every ref the manager knows about
		node_ref_map::iterator entry = m_nodeRefMap.begin();
		while(entry != m_nodeRefMap.end()) {
			(*entry).second->_updateLatency();
			++entry;
		}
		return;
	}

	ASSERT(group->isLocked());

	// restrict the update to the members of the given group
	NodeGroup::node_set::iterator member = group->m_nodes.begin();
	while(member != group->m_nodes.end()) {
		(*member)->_updateLatency();
		++member;
	}
}
// Updates the latency of 'origin' (under its own lock) and then, with all
// node groups locked, applies NodeRef::_updateLatency() through
// _do_for_each_connected() starting at 'origin'.  'recurse' is forwarded
// unchanged to _do_for_each_connected().  The manager must be locked.
inline void NodeManager::_updateLatenciesFrom(
	NodeRef* origin,
	bool recurse) {
	ASSERT(IsLocked());

	// refresh the origin node itself first
	origin->lock();
	origin->_updateLatency();
	origin->unlock();

	// hold every group's lock for the duration of the walk
	_lockAllGroups();

	_for_each_state visitState;
#if __GNUC__ <= 2
	// legacy compiler: no lambdas, use a member-function adaptor
	_do_for_each_connected(
		this,
		origin,
		0,
		recurse,
		mem_fun(&NodeRef::_updateLatency),
		&visitState);
#else
	_do_for_each_connected(
		this,
		origin,
		0,
		recurse,
		[](NodeRef* node) { return node->_updateLatency(); },
		&visitState);
#endif

	_unlockAllGroups();
}
void NodeManager::_lockAllGroups() {
ASSERT(IsLocked());
for(node_group_set::iterator it = m_nodeGroupSet.begin();
it != m_nodeGroupSet.end(); ++it) {
(*it)->lock();
}
}
void NodeManager::_unlockAllGroups() {
ASSERT(IsLocked());
for(node_group_set::iterator it = m_nodeGroupSet.begin();
it != m_nodeGroupSet.end(); ++it) {
(*it)->unlock();
}
}