* Copyright 2018-2022, Andrew Lindesay <apl@lindesay.co.nz>.
* All rights reserved. Distributed under the terms of the MIT License.
*/
#include "ProcessCoordinator.h"
#include <AutoLocker.h>
#include <Catalog.h>
#include <StringFormat.h>
#include <Uuid.h>
#include "Logger.h"
#undef B_TRANSLATION_CONTEXT
#define B_TRANSLATION_CONTEXT "ProcessCoordinator"
#define LOCK_TIMEOUT_MICROS (1000 * 1000)
#define KEY_PROCESS_COORDINATOR_IDENTIFIER "processCoordinatorIdentifier"
#define KEY_PROGRESS "progress"
#define KEY_MESSAGE "message"
#define KEY_IS_RUNNING "isRunning"
#define KEY_ERROR_STATUS "errorStatus"
/*!	Restores a state snapshot from the fields of a message, reading the same
	keys that Archive() writes.  Each key that cannot be found is reported
	through HDFATAL.

	\param from The message to read from; it is dereferenced unconditionally.
*/
ProcessCoordinatorState::ProcessCoordinatorState(BMessage* from)
{
	status_t lookup = from->FindString(KEY_PROCESS_COORDINATOR_IDENTIFIER,
		&fProcessCoordinatorIdentifier);
	if (lookup != B_OK) {
		HDFATAL("unable to find the key [%s]",
			KEY_PROCESS_COORDINATOR_IDENTIFIER);
	}

	lookup = from->FindFloat(KEY_PROGRESS, &fProgress);
	if (lookup != B_OK) {
		HDFATAL("unable to find the key [%s]", KEY_PROGRESS);
	}

	lookup = from->FindString(KEY_MESSAGE, &fMessage);
	if (lookup != B_OK) {
		HDFATAL("unable to find the key [%s]", KEY_MESSAGE);
	}

	lookup = from->FindBool(KEY_IS_RUNNING, &fIsRunning);
	if (lookup != B_OK) {
		HDFATAL("unable to find the key [%s]", KEY_IS_RUNNING);
	}

	// The error status travels as an int64 and is narrowed back to status_t.
	int64 archivedStatus;
	lookup = from->FindInt64(KEY_ERROR_STATUS, &archivedStatus);
	if (lookup != B_OK) {
		HDFATAL("unable to find the key [%s]", KEY_ERROR_STATUS);
	}
	fErrorStatus = static_cast<status_t>(archivedStatus);
}
/*!	Captures a status snapshot from explicitly supplied values.

	\param processCoordinator The coordinator the snapshot describes.  Only its
		identifier is copied; the pointer is dereferenced so it must not be
		NULL.
	\param progress The progress value to record.
	\param message The status text to record.
	\param isRunning Whether the coordinator was running.
	\param errorStatus The error status to record.
*/
ProcessCoordinatorState::ProcessCoordinatorState(
const ProcessCoordinator* processCoordinator, float progress,
const BString& message, bool isRunning, status_t errorStatus)
:
fProcessCoordinatorIdentifier(processCoordinator->Identifier()),
fProgress(progress),
fMessage(message),
fIsRunning(isRunning),
fErrorStatus(errorStatus)
{
}
/*!	Nothing to release explicitly; the body is intentionally empty. */
ProcessCoordinatorState::~ProcessCoordinatorState()
{
}
/*!	\return A copy of the identifier of the coordinator that this state was
		captured from.
*/
const BString
ProcessCoordinatorState::ProcessCoordinatorIdentifier() const
{
return fProcessCoordinatorIdentifier;
}
/*!	\return The progress value recorded in this snapshot. */
float
ProcessCoordinatorState::Progress() const
{
return fProgress;
}
/*!	\return A copy of the status text recorded in this snapshot. */
BString
ProcessCoordinatorState::Message() const
{
return fMessage;
}
/*!	\return The running flag recorded in this snapshot. */
bool
ProcessCoordinatorState::IsRunning() const
{
return fIsRunning;
}
/*!	\return The error status recorded in this snapshot. */
status_t
ProcessCoordinatorState::ErrorStatus() const
{
return fErrorStatus;
}
/*!	Writes this snapshot's fields into a message under the KEY_* names.

	Fields are added in a fixed order and the first failing add ends the
	operation, so on failure the message may hold only the earlier fields.

	\param into The message to add the fields to.
	\param deep Not consulted by this implementation.
	\return B_OK when every field was added, otherwise the status of the add
		that failed.
*/
status_t
ProcessCoordinatorState::Archive(BMessage* into, bool deep) const
{
	status_t status = into->AddString(KEY_PROCESS_COORDINATOR_IDENTIFIER,
		fProcessCoordinatorIdentifier);
	if (status != B_OK)
		return status;

	status = into->AddFloat(KEY_PROGRESS, fProgress);
	if (status != B_OK)
		return status;

	status = into->AddString(KEY_MESSAGE, fMessage);
	if (status != B_OK)
		return status;

	status = into->AddBool(KEY_IS_RUNNING, fIsRunning);
	if (status != B_OK)
		return status;

	// The status code is widened to int64 for storage in the message.
	return into->AddInt64(KEY_ERROR_STATUS, static_cast<int64>(fErrorStatus));
}
/*!	Creates an empty coordinator; nodes are attached later with AddNode().

	\param name The name reported by Name().
	\param message An optional message handed back by Message().  The
		destructor deletes it, so the coordinator takes ownership.
*/
ProcessCoordinator::ProcessCoordinator(const char* name, BMessage* message)
	:
	fName(name),
	fLock(),
	fCoordinateAndCallListenerRerun(false),
	fCoordinateAndCallListenerRerunLock(),
	fListener(NULL),
	fMessage(message),
	fWasStopped(false),
	// NOTE(review): a default-constructed BUuid is believed to be the nil
	// UUID, which would give every coordinator the same identifier; it is
	// randomised here so that identifiers can tell coordinators apart.
	// Confirm against Uuid.h.
	fIdentifier(BUuid().SetToRandom().ToString())
{
}
/*!	Detaches this coordinator from every node's process, deletes the nodes and
	then deletes the message supplied at construction.  The coordinator's
	lock is held throughout.
*/
ProcessCoordinator::~ProcessCoordinator()
{
	AutoLocker<BLocker> locker(&fLock);

	int32 index = 0;
	while (index < fNodes.CountItems()) {
		AbstractProcessNode* doomed = fNodes.ItemAt(index);
		// Clear the process' listener before the node goes away so that it no
		// longer points back at this coordinator.
		doomed->Process()->SetListener(NULL);
		delete doomed;
		index++;
	}

	delete fMessage;
}
/*!	\return The identifier string that was generated for this coordinator at
		construction.
*/
const BString&
ProcessCoordinator::Identifier() const
{
return fIdentifier;
}
/*!	Sets the listener that is told about coordinator state changes.  The
	pointer is stored without taking the coordinator's lock.

	\param listener The listener to notify; NULL means nobody is notified.
*/
void
ProcessCoordinator::SetListener(ProcessCoordinatorListener* listener)
{
fListener = listener;
}
/*!	Adds a node to the coordinator, under the coordinator's lock, and
	registers this coordinator as the listener of both the node and the
	node's process.

	\param node The node to add.  The destructor deletes every added node.
*/
void
ProcessCoordinator::AddNode(AbstractProcessNode* node)
{
AutoLocker<BLocker> locker(&fLock);
fNodes.AddItem(node);
node->SetListener(this);
node->Process()->SetListener(this);
}
/*!	Listener hook; a change in a process triggers a fresh coordination pass
	followed by notification of the coordinator's own listener.
*/
void
ProcessCoordinator::ProcessChanged()
{
_CoordinateAndCallListener();
}
bool
ProcessCoordinator::IsRunning()
{
AutoLocker<BLocker> locker(&fLock);
for (int32 i = 0; i < fNodes.CountItems(); i++) {
AbstractProcessNode* node = fNodes.ItemAt(i);
if (node->IsRunning())
return true;
}
return false;
}
/*!	Starts the work by running a coordination pass and notifying the
	listener of the resulting state.
*/
void
ProcessCoordinator::Start()
{
_CoordinateAndCallListener();
}
void
ProcessCoordinator::RequestStop()
{
AutoLocker<BLocker> locker(&fLock);
if (!fWasStopped) {
fWasStopped = true;
HDINFO("[Coordinator] will stop process coordinator");
for (int32 i = 0; i < fNodes.CountItems(); i++) {
AbstractProcessNode* node = fNodes.ItemAt(i);
HDINFO("[Coordinator] stopping process [%s]",
node->Process()->Name());
node->RequestStop();
}
}
}
/*!	\return The error status of the first node, in list order, whose process
		reports something other than B_OK; B_OK when no process does.  The
		coordinator's lock is held while the nodes are inspected.
*/
status_t
ProcessCoordinator::ErrorStatus()
{
	AutoLocker<BLocker> locker(&fLock);

	status_t firstFailure = B_OK;
	const int32 nodeCount = fNodes.CountItems();

	// The loop ends at the first process that reports a failure.
	for (int32 index = 0; firstFailure == B_OK && index < nodeCount; index++)
		firstFailure = fNodes.ItemAt(index)->Process()->ErrorStatus();

	return firstFailure;
}
float
ProcessCoordinator::Progress()
{
AutoLocker<BLocker> locker(&fLock);
float result = 0.0f;
if (!fWasStopped) {
int32 count = fNodes.CountItems();
if (count == 1)
result = fNodes.ItemAt(0)->Process()->Progress();
else {
float progressPerNode = 1.0f / ((float) count);
for (int32 i = count - 1; i >= 0; i--) {
AbstractProcess* process = fNodes.ItemAt(i)->Process();
switch(process->ProcessState()) {
case PROCESS_INITIAL:
break;
case PROCESS_RUNNING:
result += (progressPerNode * fmaxf(
0.0f, fminf(1.0, process->Progress())));
break;
case PROCESS_COMPLETE:
result += progressPerNode;
break;
}
}
}
}
return result;
}
/*!	\return The name given to the coordinator at construction. */
const BString&
ProcessCoordinator::Name() const
{
return fName;
}
/*!	\return The message given to the coordinator at construction, which may be
		NULL.  The coordinator deletes it in its destructor, so the caller
		must not delete it.
*/
BMessage*
ProcessCoordinator::Message() const
{
return fMessage;
}
/*!	Builds the human-readable status line from the processes that are
	currently running.  This method does not take the coordinator's lock
	itself.

	\return "???" when no running process supplies a non-empty description;
		that description alone when no other process is running; otherwise
		the description followed by a pluralised count of the other running
		processes.
*/
BString
ProcessCoordinator::_CreateStatusMessage()
{
	BString headline;
	uint32 otherRunningCount = 0;

	// Walk from the highest index down; the first running process found with
	// a description is named and every other running process is counted.
	for (int32 index = fNodes.CountItems() - 1; index >= 0; index--) {
		AbstractProcess* candidate = fNodes.ItemAt(index)->Process();

		if (candidate->ProcessState() != PROCESS_RUNNING)
			continue;

		if (headline.IsEmpty() && strlen(candidate->Description()) != 0)
			headline = candidate->Description();
		else
			otherRunningCount++;
	}

	if (headline.IsEmpty())
		return "???";

	if (otherRunningCount == 0)
		return headline;

	// The placeholder is substituted after the plural formatting has run.
	static BStringFormat format(B_TRANSLATE(
		"%FIRST_PROCESS_DESCRIPTION% +"
		"{0, plural, one{# process} other{# processes}}"));

	BString statusLine;
	format.Format(statusLine, otherRunningCount);
	statusLine.ReplaceAll("%FIRST_PROCESS_DESCRIPTION%", headline);
	return statusLine;
}
/*!	Gathers the coordinator's current progress, status text, running flag and
	error status into a state snapshot.

	\return The snapshot, tagged with this coordinator's identifier.
*/
ProcessCoordinatorState
ProcessCoordinator::_CreateStatus()
{
	float progress = Progress();
	BString statusMessage = _CreateStatusMessage();
	bool running = IsRunning();
	status_t errorStatus = ErrorStatus();

	return ProcessCoordinatorState(
		this, progress, statusMessage, running, errorStatus);
}
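/*!	This will try to obtain the coordinator's lock and, if it cannot obtain
	the lock within the timeout, it will flag that when the coordinator has
	finished its current coordination, it should initiate another
	coordination.
*/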
// Runs one coordination pass and hands the resulting state to the listener,
// or records that another pass is wanted when the lock cannot be obtained.
void
ProcessCoordinator::_CoordinateAndCallListener()
{
// Wait at most LOCK_TIMEOUT_MICROS for the coordinator's lock; a failure to
// obtain it is treated as a coordination already being in progress.
if (fLock.LockWithTimeout(LOCK_TIMEOUT_MICROS) != B_OK) {
HDDEBUG("[Coordinator] would coordinate nodes, but coordination is "
"in progress - will defer");
// The re-run flag has its own lock; set it for the current lock holder to
// pick up after it has finished.
AutoLocker<BLocker> locker(&fCoordinateAndCallListenerRerunLock);
fCoordinateAndCallListenerRerun = true;
return;
}
// From here the coordinator's lock is held; the listener is called before the
// lock is released.
ProcessCoordinatorState state = _Coordinate();
if (fListener != NULL)
fListener->CoordinatorChanged(state);
fLock.Unlock();
bool coordinateAndCallListenerRerun = false;
{
// Read and clear the flag together under its lock; the braces release that
// lock before any re-run starts.
AutoLocker<BLocker> locker(&fCoordinateAndCallListenerRerunLock);
coordinateAndCallListenerRerun = fCoordinateAndCallListenerRerun;
fCoordinateAndCallListenerRerun = false;
}
// A pass was requested while this one was running, so run again.
if (coordinateAndCallListenerRerun) {
HDDEBUG("[Coordinator] will run deferred coordination");
_CoordinateAndCallListener();
}
}
/*!	Performs one coordination pass under the coordinator's lock: successors of
	stopped or failed nodes are stopped first, then every node that is still
	in its initial state is started if all of its predecessors are complete.

	\return A snapshot of the coordinator's state after the pass.
*/
ProcessCoordinatorState
ProcessCoordinator::_Coordinate()
{
	HDTRACE("[Coordinator] will coordinate nodes");
	AutoLocker<BLocker> locker(&fLock);

	_StopSuccessorNodesToErroredOrStoppedNodes();

	for (int32 index = 0; index < fNodes.CountItems(); index++) {
		AbstractProcessNode* candidate = fNodes.ItemAt(index);

		// Only nodes that have not been started yet are of interest.
		if (candidate->Process()->ProcessState() != PROCESS_INITIAL) {
			HDTRACE("[Coordinator] process [%s] running or complete",
				candidate->Process()->Name());
			continue;
		}

		if (!candidate->AllPredecessorsComplete()) {
			HDTRACE("[Coordinator] all predecessors not complete -> "
				"[%s] not started", candidate->Process()->Name());
			continue;
		}

		candidate->Start();
	}

	return _CreateStatus();
}
void
ProcessCoordinator::_StopSuccessorNodesToErroredOrStoppedNodes()
{
for (int32 i = 0; i < fNodes.CountItems(); i++) {
AbstractProcessNode* node = fNodes.ItemAt(i);
AbstractProcess* process = node->Process();
if (process->WasStopped() || process->ErrorStatus() != B_OK)
_StopSuccessorNodes(node);
}
}
/*!	Requests a stop on each successor of the given node that is still in its
	initial state, then does the same for that successor's own successors.
	Successors that are already running or complete are left untouched and
	are not descended into.

	\param predecessorNode The node whose successors are to be stopped.
*/
void
ProcessCoordinator::_StopSuccessorNodes(AbstractProcessNode* predecessorNode)
{
	for (int32 index = 0; index < predecessorNode->CountSuccessors();
		index++) {
		AbstractProcessNode* successor = predecessorNode->SuccessorAt(index);
		AbstractProcess* successorProcess = successor->Process();

		if (successorProcess->ProcessState() != PROCESS_INITIAL)
			continue;

		HDDEBUG("[Coordinator] [%s] (failed) --> [%s] (stopping)",
			predecessorNode->Process()->Name(), successorProcess->Name());
		successor->RequestStop();
		_StopSuccessorNodes(successor);
	}
}
/*!	\return The number of nodes whose process is in the PROCESS_COMPLETE
		state.  This method does not take the coordinator's lock itself.
*/
int32
ProcessCoordinator::_CountNodesCompleted()
{
	int32 completed = 0;
	int32 index = fNodes.CountItems();

	// The visiting order does not affect the tally, so count downwards.
	while (index-- > 0) {
		if (fNodes.ItemAt(index)->Process()->ProcessState()
				== PROCESS_COMPLETE) {
			completed++;
		}
	}

	return completed;
}
/*!	Assembles a report from the nodes' own log reports, under the
	coordinator's lock.

	\return The nodes' reports in list order; a newline is inserted before a
		report whenever the text gathered so far is not empty.
*/
BString
ProcessCoordinator::LogReport()
{
	BString report;
	AutoLocker<BLocker> locker(&fLock);

	const int32 nodeCount = fNodes.CountItems();
	for (int32 index = 0; index < nodeCount; index++) {
		if (!report.IsEmpty())
			report.Append("\n");
		report.Append(fNodes.ItemAt(index)->LogReport());
	}

	return report;
}