* Copyright 1991-1999, Be Incorporated.
* Copyright (c) 1999-2000, Eric Moon.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions, and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions, and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* 3. The name of the author may not be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR "AS IS" AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
* TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "LoggingConsumer.h"
#include "LogWriter.h"
#include <media/MediaRoster.h>
#include <media/TimeSource.h>
#include <media/ParameterWeb.h>
#include <media/Buffer.h>
#include <kernel/OS.h>
#include <stdio.h>
#include <string.h>
#include <Catalog.h>
#include <Debug.h>
#undef B_TRANSLATION_CONTEXT
#define B_TRANSLATION_CONTEXT "CortexAddOnsLoggingConsumer"
// Identifiers of the parameters published in the node's parameter web
// (see build_parameter_web()). The *_NULL_PARAM ids are used for the null
// parameters that carry a group's caption; LATENCY_PARAM, CPU_SPIN_PARAM and
// PRIORITY_PARAM are the adjustable values handled by GetParameterValue(),
// SetParameterValue() and HandleEvent().
const int32 INPUT_NULL_PARAM = 1;
const int32 LATENCY_PARAM = 2;
// Not referenced anywhere in the visible part of this file.
const int32 OUTPUT_NULL_PARAM = 3;
const int32 CPU_NULL_PARAM = 11;
const int32 CPU_SPIN_PARAM = 12;
const int32 PRIO_NULL_PARAM = 21;
const int32 PRIORITY_PARAM = 22;
// Builds the parameter web exposed by the LoggingConsumer node.
//
// The web has one top-level group with three sub-groups (latency, CPU spin
// percentage, thread priority). Each sub-group consists of a null parameter
// that carries the caption and the actual control; the two are linked with
// AddOutput()/AddInput() so the caption is associated with its control.
//
// Returns a newly allocated BParameterWeb; the caller takes ownership
// (NodeRegistered() hands it to SetParameterWeb()).
static BParameterWeb* build_parameter_web()
{
    BParameterWeb* web = new BParameterWeb;
    BParameterGroup* mainGroup = web->MakeGroup(
        B_TRANSLATE("LoggingConsumer parameters"));

    // Latency control: continuous value from 5 to 100 in steps of 5, unit "ms".
    BParameterGroup* group = mainGroup->MakeGroup(
        B_TRANSLATE("Latency control"));
    BParameter* nullParam = group->MakeNullParameter(INPUT_NULL_PARAM,
        B_MEDIA_NO_TYPE, B_TRANSLATE("Latency"), B_GENERIC);
    BParameter* latencyParam = group->MakeContinuousParameter(LATENCY_PARAM,
        B_MEDIA_NO_TYPE, "", B_GAIN, "ms", 5, 100, 5);
    nullParam->AddOutput(latencyParam);
    latencyParam->AddInput(nullParam);

    // CPU spin percentage: continuous value from 5 to 80 in steps of 5.
    group = mainGroup->MakeGroup(B_TRANSLATE("CPU percentage"));
    nullParam = group->MakeNullParameter(CPU_NULL_PARAM, B_MEDIA_NO_TYPE,
        B_TRANSLATE("CPU spin percentage"), B_GENERIC);
    BContinuousParameter* cpuParam = group->MakeContinuousParameter(
        CPU_SPIN_PARAM, B_MEDIA_NO_TYPE, "",
        B_GAIN, B_TRANSLATE("percent"), 5, 80, 5);
    nullParam->AddOutput(cpuParam);
    cpuParam->AddInput(nullParam);

    // Thread priority: discrete selector. The item names are the names of the
    // priority constants themselves and are therefore not translated.
    group = mainGroup->MakeGroup(B_TRANSLATE("Priority"));
    nullParam = group->MakeNullParameter(PRIO_NULL_PARAM, B_MEDIA_NO_TYPE,
        B_TRANSLATE("Thread priority"), B_GENERIC);
    BDiscreteParameter* prioParam = group->MakeDiscreteParameter(PRIORITY_PARAM,
        B_MEDIA_NO_TYPE, "", B_GENERIC);
    prioParam->AddItem(5, "B_LOW_PRIORITY");
    prioParam->AddItem(10, "B_NORMAL_PRIORITY");
    prioParam->AddItem(15, "B_DISPLAY_PRIORITY");
    prioParam->AddItem(20, "B_URGENT_DISPLAY_PRIORITY");
    prioParam->AddItem(100, "B_REAL_TIME_DISPLAY_PRIORITY");
    prioParam->AddItem(110, "B_URGENT_PRIORITY");
    prioParam->AddItem(120, "B_REAL_TIME_PRIORITY");
    // Link caption and control like the other two groups do; previously the
    // priority selector was left unconnected to its label.
    nullParam->AddOutput(prioParam);
    prioParam->AddInput(nullParam);

    return web;
}
// Constructs the node and opens the log.
//
// logFile: entry_ref handed to the LogWriter (and remembered in mLogRef).
// pAddOn:  add-on that instantiated this node, or NULL; reported by AddOn().
LoggingConsumer::LoggingConsumer(const entry_ref& logFile, BMediaAddOn* pAddOn)
    :
    BMediaNode("LoggingConsumer"),
    BBufferConsumer(B_MEDIA_UNKNOWN_TYPE),
    BControllable(),
    BMediaEventLooper(),
    mLogRef(logFile),
    mWeb(NULL),
    mLateBuffers(0),
    // GetParameterValue() reports mLatency / 1000 with the unit "ms",
    // so this is the 50 ms default.
    mLatency(50 * 1000),
    mSpinPercentage(0.10),
    mPriority(B_URGENT_DISPLAY_PRIORITY),
    mLastLatencyChange(0),
    mLastSpinChange(0),
    mLastPrioChange(0),
    m_pAddOn(pAddOn)
{
    // The writer is owned by this node and released in the destructor.
    mLogger = new LogWriter(logFile);
}
// Shuts the event looper down first, then releases the log writer created
// in the constructor.
LoggingConsumer::~LoggingConsumer()
{
    PRINT(("~LoggingConsumer()\n"));

    // Quit the looper before the logger goes away, since the hooks in this
    // file all write to mLogger.
    BMediaEventLooper::Quit();

    delete mLogger;
}
// Turns logging of one message category on or off; forwarded unchanged to
// the log writer.
void
LoggingConsumer::SetEnabled(log_what what, bool enable)
{
    mLogger->SetEnabled(what, enable);
}
// Enables every message category; forwarded to the log writer.
void
LoggingConsumer::EnableAllMessages()
{
    mLogger->EnableAllMessages();
}
// Disables every message category; forwarded to the log writer.
void
LoggingConsumer::DisableAllMessages()
{
    mLogger->DisableAllMessages();
}
// Reports the add-on this node was instantiated from.
//
// internal_id: receives 0 when an add-on is set; left untouched otherwise.
//              A NULL pointer is tolerated.
// Returns the add-on passed to the constructor, or NULL if there was none.
BMediaAddOn*
LoggingConsumer::AddOn(int32 *internal_id) const
{
    // The trace used to read "~LoggingConsumer::AddOn()", a leftover from the
    // destructor's message that made debug output misleading.
    PRINT(("LoggingConsumer::AddOn()\n"));

    if (m_pAddOn == NULL)
        return NULL;

    if (internal_id != NULL)
        *internal_id = 0;
    return m_pAddOn;
}
// Logs the run-mode change (LOG_SET_RUN_MODE) and then lets
// BMediaEventLooper apply it.
void
LoggingConsumer::SetRunMode(run_mode mode)
{
    log_message entry;
    entry.now = TimeSource()->Now();
    mLogger->Log(LOG_SET_RUN_MODE, entry);

    BMediaEventLooper::SetRunMode(mode);
}
void
LoggingConsumer::Preroll()
{
log_message logMsg;
logMsg.now = TimeSource()->Now();
mLogger->Log(LOG_PREROLL, logMsg);
BMediaEventLooper::Preroll();
}
// Logs the time-source change (LOG_SET_TIME_SOURCE) and then forwards it to
// BMediaNode. The log timestamp is taken from whatever TimeSource() returns
// at the moment of the call.
void
LoggingConsumer::SetTimeSource(BTimeSource* time_source)
{
    log_message entry;
    entry.now = TimeSource()->Now();
    mLogger->Log(LOG_SET_TIME_SOURCE, entry);

    BMediaNode::SetTimeSource(time_source);
}
// Logs the completion notice (LOG_REQUEST_COMPLETED) and returns whatever
// BMediaNode::RequestCompleted() reports for it.
status_t
LoggingConsumer::RequestCompleted(const media_request_info &info)
{
    log_message entry;
    entry.now = TimeSource()->Now();
    mLogger->Log(LOG_REQUEST_COMPLETED, entry);

    const status_t result = BMediaNode::RequestCompleted(info);
    return result;
}
// Traces the call in debug builds and passes the node on to
// BBufferConsumer::DeleteHook(), whose result is returned.
status_t
LoggingConsumer::DeleteHook(BMediaNode* pNode)
{
    PRINT(("LoggingConsumer::DeleteHook(%p)\n", pNode));

    const status_t result = BBufferConsumer::DeleteHook(pNode);
    return result;
}
// Reads the current value of one of the node's parameters.
//
// id:          LATENCY_PARAM, CPU_SPIN_PARAM or PRIORITY_PARAM.
// last_change: receives the timestamp recorded when the value last changed.
// value:       receives a float (latency, spin percentage) or an int32
//              (priority).
// ioSize:      in: capacity of 'value'; out: number of bytes written.
//
// Returns B_OK on success, B_ERROR if the buffer is smaller than a float or
// the id is unknown. The request is logged (LOG_GET_PARAM_VALUE) either way.
status_t
LoggingConsumer::GetParameterValue(int32 id, bigtime_t* last_change, void* value, size_t* ioSize)
{
    log_message entry;
    entry.now = TimeSource()->Now();
    entry.param.id = id;
    mLogger->Log(LOG_GET_PARAM_VALUE, entry);

    // The same minimum is applied to all three parameters, including the
    // int32 priority.
    if (*ioSize < sizeof(float))
        return B_ERROR;

    if (id == LATENCY_PARAM) {
        *last_change = mLastLatencyChange;
        // Reported in the "ms" unit declared in the parameter web.
        *static_cast<float*>(value) = mLatency / 1000;
        *ioSize = sizeof(float);
        return B_OK;
    }

    if (id == CPU_SPIN_PARAM) {
        *last_change = mLastSpinChange;
        *static_cast<float*>(value) = mSpinPercentage;
        *ioSize = sizeof(float);
        return B_OK;
    }

    if (id == PRIORITY_PARAM) {
        *last_change = mLastPrioChange;
        *static_cast<int32*>(value) = mPriority;
        *ioSize = sizeof(int32);
        return B_OK;
    }

    return B_ERROR;
}
// Schedules a parameter change.
//
// The request is logged (LOG_SET_PARAM_VALUE) and, for the three known
// parameter ids, queued as a B_USER_EVENT at 'performance_time'. The change
// itself is applied later in HandleEvent(). Unknown ids are ignored after
// logging.
//
// The event carries: pointer = value, data = size, bigdata = id.
void
LoggingConsumer::SetParameterValue(int32 id, bigtime_t performance_time, const void* value, size_t size)
{
    log_message entry;
    entry.now = TimeSource()->Now();
    entry.param.id = id;
    mLogger->Log(LOG_SET_PARAM_VALUE, entry);

    const bool knownParameter = id == LATENCY_PARAM
        || id == CPU_SPIN_PARAM
        || id == PRIORITY_PARAM;
    if (!knownParameter)
        return;

    // NOTE(review): only the caller's pointer is stored (B_NO_CLEANUP, no
    // copy), and HandleEvent() dereferences it when the event fires.
    // Presumably the caller's buffer must outlive the queued event -- verify
    // against the caller.
    media_timed_event change(performance_time, BTimedEventQueue::B_USER_EVENT,
        const_cast<void*>(value), BTimedEventQueue::B_NO_CLEANUP, size, id,
        NULL);
    EventQueue()->AddEvent(change);
}
// Logs the incoming message (LOG_HANDLE_MESSAGE) and offers it to the base
// classes in turn: BControllable, then BBufferConsumer, then BMediaNode.
// The first one that returns success ends the chain; otherwise the status
// of the last attempt is returned.
status_t
LoggingConsumer::HandleMessage(int32 message, const void *data, size_t size)
{
    log_message entry;
    entry.now = TimeSource()->Now();
    mLogger->Log(LOG_HANDLE_MESSAGE, entry);

    status_t status = BControllable::HandleMessage(message, data, size);
    if (status != 0)
        status = BBufferConsumer::HandleMessage(message, data, size);
    if (status != 0)
        status = BMediaNode::HandleMessage(message, data, size);

    return status;
}
// Accepts any format offered to this node's single input.
//
// The proposed format is traced in debug builds and the call is logged
// (LOG_ACCEPT_FORMAT). 'format' itself is not modified.
//
// Returns B_MEDIA_BAD_DESTINATION if 'dest' is not our input, B_OK otherwise.
status_t
LoggingConsumer::AcceptFormat(const media_destination& dest, media_format* format)
{
    char description[256];
    string_for_format(*format, description, sizeof(description) - 1);
    PRINT(("LoggingConsumer::AcceptFormat:\n\tformat %s\n", description));

    log_message entry;
    entry.now = TimeSource()->Now();
    mLogger->Log(LOG_ACCEPT_FORMAT, entry);

    const bool isOurInput = !(dest != mInput.destination);
    return isOurInput ? B_OK : B_MEDIA_BAD_DESTINATION;
}
// Enumerates the node's inputs; there is exactly one.
//
// cookie:    iteration state; 0 on the first call, set to 1 once the input
//            has been returned.
// out_input: receives a copy of mInput.
//
// Returns B_OK for the first call, B_BAD_INDEX for any other cookie value.
status_t
LoggingConsumer::GetNextInput(int32* cookie, media_input* out_input)
{
    if (*cookie != 0)
        return B_BAD_INDEX;

    // The input is always advertised with an unknown media type; note that
    // this writes to mInput itself, not just to the returned copy.
    mInput.format.type = B_MEDIA_UNKNOWN_TYPE;
    *out_input = mInput;
    *cookie = 1;
    return B_OK;
}
// Nothing to release: GetNextInput() only uses the cookie as a plain counter.
void
LoggingConsumer::DisposeInputCookie(int32 /* cookie */)
{
}
// Called for every buffer that arrives at the node.
//
// Logs the buffer's vital statistics (LOG_BUFFER_RECEIVED), including how
// early it arrived relative to its start time minus the event and scheduling
// latencies. Parameter-change buffers are applied and recycled immediately;
// all other buffers are queued as B_HANDLE_BUFFER events at their start time
// and processed in HandleEvent().
void
LoggingConsumer::BufferReceived(BBuffer* buffer)
{
    media_header* header = buffer->Header();

    bigtime_t bufferStart = header->start_time;
    bigtime_t now = TimeSource()->Now();
    bigtime_t how_early = bufferStart - EventLatency() - SchedulingLatency() - now;

    log_message entry;
    entry.now = now;
    entry.buffer_data.start_time = bufferStart;
    entry.buffer_data.offset = how_early;
    entry.buffer_data.size_available = buffer->SizeAvailable();
    entry.buffer_data.size_used = buffer->SizeUsed();
    entry.buffer_data.flags = buffer->Flags();
    entry.buffer_data.id = buffer->ID();
    entry.buffer_data.type = buffer->Type();
    mLogger->Log(LOG_BUFFER_RECEIVED, entry);

    if (header->type == B_MEDIA_PARAMETERS) {
        // Parameter buffers are consumed right away rather than queued.
        ApplyParameterData(buffer->Data(), buffer->SizeUsed());
        buffer->Recycle();
        return;
    }

    media_timed_event bufferEvent(header->start_time,
        BTimedEventQueue::B_HANDLE_BUFFER, buffer,
        BTimedEventQueue::B_RECYCLE_BUFFER);
    const status_t queued = EventQueue()->AddEvent(bufferEvent);
    if (queued != 0) {
        // Could not be queued, so nobody else will return the buffer.
        buffer->Recycle();
    }
}
void
LoggingConsumer::ProducerDataStatus(const media_destination& for_whom, int32 status, bigtime_t at_performance_time)
{
log_message logMsg;
logMsg.now = TimeSource()->Now();
logMsg.data_status.status = status;
mLogger->Log(LOG_PRODUCER_DATA_STATUS, logMsg);
if (for_whom == mInput.destination)
{
media_timed_event event(at_performance_time, BTimedEventQueue::B_DATA_STATUS,
&mInput, BTimedEventQueue::B_NO_CLEANUP, status, 0, NULL);
EventQueue()->AddEvent(event);
}
}
// Reports the node's configured latency and the id of its time source.
//
// Returns B_MEDIA_BAD_DESTINATION (outputs untouched) if 'for_whom' is not
// our input, B_OK otherwise.
status_t
LoggingConsumer::GetLatencyFor(const media_destination& for_whom, bigtime_t* out_latency, media_node_id* out_timesource)
{
    const bool isOurInput = !(for_whom != mInput.destination);
    if (!isOurInput)
        return B_MEDIA_BAD_DESTINATION;

    *out_latency = mLatency;
    *out_timesource = TimeSource()->ID();
    return B_OK;
}
// Completes a connection to our input.
//
// Traces both the negotiated format and the input's current format in debug
// builds, logs the call (LOG_CONNECTED), then records the producer and the
// format in mInput and applies mLatency as the event latency.
//
// out_input: receives a copy of the updated mInput.
// Returns B_MEDIA_BAD_DESTINATION if 'where' is not our input, else B_OK.
status_t
LoggingConsumer::Connected(
    const media_source& producer,
    const media_destination& where,
    const media_format& with_format,
    media_input* out_input)
{
    char description[256];
    const size_t descriptionSize = sizeof(description) - 1;

    string_for_format(with_format, description, descriptionSize);
    PRINT(("LoggingConsumer::Connected:\n\tformat %s\n", description));
    string_for_format(mInput.format, description, descriptionSize);
    PRINT(("\tinput format %s\n", description));

    log_message entry;
    entry.now = TimeSource()->Now();
    mLogger->Log(LOG_CONNECTED, entry);

    const bool isOurInput = !(where != mInput.destination);
    if (!isOurInput)
        return B_MEDIA_BAD_DESTINATION;

    SetEventLatency(mLatency);

    mInput.format = with_format;
    mInput.source = producer;
    *out_input = mInput;
    return B_OK;
}
void
LoggingConsumer::Disconnected(
const media_source& producer,
const media_destination& where)
{
log_message logMsg;
logMsg.now = TimeSource()->Now();
mLogger->Log(LOG_DISCONNECTED, logMsg);
mInput = media_input();
}
// Logs the format-change notice (LOG_FORMAT_CHANGED) and accepts it
// unconditionally; none of the arguments are inspected and mInput is not
// updated.
status_t
LoggingConsumer::FormatChanged(
    const media_source& producer,
    const media_destination& consumer,
    int32 change_tag,
    const media_format& format)
{
    log_message entry;
    entry.now = TimeSource()->Now();
    mLogger->Log(LOG_FORMAT_CHANGED, entry);

    return B_OK;
}
// Logs the seek-tag request (LOG_SEEK_TAG) and returns B_OK.
//
// NOTE(review): none of the out_* arguments are written even though success
// is reported -- confirm that callers do not rely on them.
status_t
LoggingConsumer::SeekTagRequested(
    const media_destination& destination,
    bigtime_t in_target_time,
    uint32 in_flags,
    media_seek_tag* out_seek_tag,
    bigtime_t* out_tagged_time,
    uint32* out_flags)
{
    log_message entry;
    entry.now = TimeSource()->Now();
    mLogger->Log(LOG_SEEK_TAG, entry);

    return B_OK;
}
// Finishes setting the node up once it has been registered.
//
// Logs the call (LOG_REGISTERED), publishes the parameter web, applies the
// configured thread priority, starts the event looper and fills in the
// fixed part of the input record (destination, node and name).
void
LoggingConsumer::NodeRegistered()
{
    log_message logMsg;
    logMsg.now = TimeSource()->Now();
    mLogger->Log(LOG_REGISTERED, logMsg);

    // Publish our parameters.
    mWeb = build_parameter_web();
    SetParameterWeb(mWeb);

    // Start the looper thread at the configured priority.
    SetPriority(mPriority);
    Run();

    // Our single input is addressed through the control port with id 0.
    mInput.destination.port = ControlPort();
    mInput.destination.id = 0;
    mInput.node = Node();

    // The name comes from the translation catalog, so its length is not
    // known at compile time; bound the copy to the name buffer and always
    // terminate it instead of using an unchecked strcpy().
    const char* inputName = B_TRANSLATE("Logged input");
    strncpy(mInput.name, inputName, sizeof(mInput.name) - 1);
    mInput.name[sizeof(mInput.name) - 1] = '\0';
}
// Traces and logs the start request (LOG_START), then lets
// BMediaEventLooper schedule it.
void
LoggingConsumer::Start(bigtime_t performance_time)
{
    PRINT(("LoggingConsumer::Start(%" B_PRIdBIGTIME "): now %" B_PRIdBIGTIME "\n",
        performance_time, TimeSource()->Now()));

    log_message entry;
    entry.now = TimeSource()->Now();
    mLogger->Log(LOG_START, entry);

    BMediaEventLooper::Start(performance_time);
}
// Logs the stop request (LOG_STOP), then hands it to
// BMediaEventLooper::Stop() with the same arguments.
void
LoggingConsumer::Stop(bigtime_t performance_time, bool immediate)
{
    log_message entry;
    entry.now = TimeSource()->Now();
    mLogger->Log(LOG_STOP, entry);

    BMediaEventLooper::Stop(performance_time, immediate);
}
// Logs the seek request (LOG_SEEK), then hands it to
// BMediaEventLooper::Seek() with the same arguments.
void
LoggingConsumer::Seek(bigtime_t media_time, bigtime_t performance_time)
{
    log_message entry;
    entry.now = TimeSource()->Now();
    mLogger->Log(LOG_SEEK, entry);

    BMediaEventLooper::Seek(media_time, performance_time);
}
// Logs the time warp (LOG_TIMEWARP), then hands it to
// BMediaEventLooper::TimeWarp() with the same arguments.
void
LoggingConsumer::TimeWarp(bigtime_t at_real_time, bigtime_t to_performance_time)
{
    log_message entry;
    entry.now = TimeSource()->Now();
    mLogger->Log(LOG_TIMEWARP, entry);

    BMediaEventLooper::TimeWarp(at_real_time, to_performance_time);
}
void
LoggingConsumer::HandleEvent(const media_timed_event *event, bigtime_t , bool )
{
log_message logMsg;
logMsg.now = TimeSource()->Now();
mLogger->Log(LOG_HANDLE_EVENT, logMsg);
switch (event->type)
{
case BTimedEventQueue::B_HANDLE_BUFFER:
{
BBuffer* buffer = const_cast<BBuffer*>((BBuffer*) event->pointer);
if (buffer)
{
media_header* hdr = buffer->Header();
if (hdr->destination == mInput.destination.id)
{
bigtime_t now = TimeSource()->Now();
bigtime_t perf_time = hdr->start_time;
bigtime_t how_early = perf_time - mLatency - now;
logMsg.buffer_data.start_time = perf_time;
logMsg.buffer_data.offset = how_early;
mLogger->Log(LOG_BUFFER_HANDLED, logMsg);
if (how_early < 0)
{
mLateBuffers++;
NotifyLateProducer(mInput.source, -how_early, perf_time);
}
else
{
bigtime_t spin_start = ::system_time();
bigtime_t spin_now = spin_start;
bigtime_t usecToSpin = bigtime_t(mSpinPercentage / 100.0 * mLatency);
while (spin_now - spin_start < usecToSpin)
{
for (long k = 0; k < 1000000; k++) { }
spin_now = ::system_time();
}
}
buffer->Recycle();
}
else
{
}
}
}
break;
case BTimedEventQueue::B_USER_EVENT:
{
size_t dataSize = size_t(event->data);
int32 param = int32(event->bigdata);
logMsg.param.id = param;
if (dataSize >= sizeof(float)) switch (param)
{
case LATENCY_PARAM:
{
float value = *((float*) event->pointer);
mLatency = bigtime_t(value* 1000);
mLastLatencyChange = logMsg.now;
SetEventLatency(mLatency);
SendLatencyChange(mInput.source, mInput.destination, EventLatency() + SchedulingLatency());
BroadcastNewParameterValue(logMsg.now, param, &value, sizeof(value));
logMsg.param.value = value;
mLogger->Log(LOG_SET_PARAM_HANDLED, logMsg);
}
break;
case CPU_SPIN_PARAM:
{
float value = *((float*) event->pointer);
mSpinPercentage = value;
mLastSpinChange = logMsg.now;
BroadcastNewParameterValue(logMsg.now, param, &value, sizeof(value));
logMsg.param.value = value;
mLogger->Log(LOG_SET_PARAM_HANDLED, logMsg);
}
break;
case PRIORITY_PARAM:
{
mPriority = *((int32*) event->pointer);
SetPriority(mPriority);
mLastPrioChange = logMsg.now;
BroadcastNewParameterValue(logMsg.now, param, &mPriority, sizeof(mPriority));
logMsg.param.value = (float) mPriority;
mLogger->Log(LOG_SET_PARAM_HANDLED, logMsg);
}
break;
default:
mLogger->Log(LOG_INVALID_PARAM_HANDLED, logMsg);
break;
}
}
break;
case BTimedEventQueue::B_START:
mLogger->Log(LOG_START_HANDLED, logMsg);
break;
case BTimedEventQueue::B_STOP:
mLogger->Log(LOG_STOP_HANDLED, logMsg);
EventQueue()->FlushEvents(0, BTimedEventQueue::B_ALWAYS, true, BTimedEventQueue::B_HANDLE_BUFFER);
break;
case BTimedEventQueue::B_SEEK:
mLogger->Log(LOG_SEEK_HANDLED, logMsg);
break;
case BTimedEventQueue::B_WARP:
mLogger->Log(LOG_WARP_HANDLED, logMsg);
break;
case BTimedEventQueue::B_DATA_STATUS:
logMsg.data_status.status = event->data;
mLogger->Log(LOG_DATA_STATUS_HANDLED, logMsg);
break;
default:
logMsg.unknown.what = event->type;
mLogger->Log(LOG_HANDLE_UNKNOWN, logMsg);
break;
}
}