#include "gold.h"
#include "debug.h"
#include "options.h"
#include "timer.h"
#include "workqueue.h"
#include "workqueue-internal.h"
namespace gold
{
// Append T to the end of the list.  T must not currently be linked
// to a following task (checked by the assertion).

inline void
Task_list::push_back(Task* t)
{
  gold_assert(t->list_next() == NULL);

  // Hook T after the current tail, or make it the head if the list
  // is empty.
  if (this->head_ != NULL)
    this->tail_->set_list_next(t);
  else
    this->head_ = t;

  // In both cases T is now the last element.
  this->tail_ = t;
}
// Insert T at the start of the list.  T must not currently be linked
// to a following task (checked by the assertion).

inline void
Task_list::push_front(Task* t)
{
  gold_assert(t->list_next() == NULL);

  // An empty list also needs its tail set; otherwise chain the old
  // head behind T.
  if (this->head_ == NULL)
    this->tail_ = t;
  else
    t->set_list_next(this->head_);

  // In both cases T is now the first element.
  this->head_ = t;
}
// Detach and return the first task on the list, or return NULL if
// the list is empty.  The returned task has its next link cleared.

inline Task*
Task_list::pop_front()
{
  Task* first = this->head_;
  if (first == NULL)
    return first;

  if (first != this->tail_)
    {
      // More than one element: advance the head and unlink FIRST.
      this->head_ = first->list_next();
      gold_assert(this->head_ != NULL);
      first->clear_list_next();
    }
  else
    {
      // FIRST was the only element, so the list becomes empty.
      gold_assert(first->list_next() == NULL);
      this->head_ = NULL;
      this->tail_ = NULL;
    }

  return first;
}
// A Workqueue_threader which does no thread management of its own:
// it only validates the requested thread count and never asks a
// thread to cancel.

class Workqueue_threader_single : public Workqueue_threader
{
 public:
  Workqueue_threader_single(Workqueue* workqueue)
    : Workqueue_threader(workqueue)
  {
  }

  ~Workqueue_threader_single()
  {
  }

  // Check that the requested count is positive.  Nothing is stored.
  void
  set_thread_count(int thread_count)
  {
    gold_assert(thread_count > 0);
  }

  // No thread is ever told to stop, whatever its number.
  bool
  should_cancel_thread(int)
  {
    return false;
  }
};
// Construct an empty Workqueue and pick the threader implementation.
// The thread pool is used only when OPTIONS requests threads and the
// build defines ENABLE_THREADS; otherwise the single threader is
// used.

Workqueue::Workqueue(const General_options& options)
  : lock_(),
    first_tasks_(),
    tasks_(),
    running_(0),
    waiting_(0),
    condvar_(this->lock_),
    threader_(NULL)
{
  bool use_threads = options.threads();
#ifndef ENABLE_THREADS
  // Without thread support the option is ignored.
  use_threads = false;
#endif

  if (use_threads)
    {
#ifdef ENABLE_THREADS
      this->threader_ = new Workqueue_threader_threadpool(this);
#else
      gold_unreachable();
#endif
    }
  else
    this->threader_ = new Workqueue_threader_single(this);
}
// Destroy the Workqueue.  The body is intentionally empty: nothing is
// released here, including the object the constructor stored in
// threader_.
Workqueue::~Workqueue()
{
}
// Add task T to the workqueue.  If T is runnable it goes on QUEUE,
// at the front when FRONT is true and at the back otherwise, and one
// waiting thread is signalled.  If T is not runnable it is parked on
// the token returned by is_runnable(), again at the front or back of
// that token's waiting list according to FRONT.  The Workqueue lock
// is taken for the duration of the call.

void
Workqueue::add_to_queue(Task_list* queue, Task* t, bool front)
{
  Hold_lock hl(this->lock_);

  Task_token* blocking = t->is_runnable();
  if (blocking == NULL)
    {
      // Runnable right now.
      if (front)
        queue->push_front(t);
      else
        queue->push_back(t);

      // Tell a waiting thread that there is work to do.
      this->condvar_.signal();
      return;
    }

  // Not runnable yet: wait on the token that is blocking the task.
  if (front)
    blocking->add_waiting_front(t);
  else
    blocking->add_waiting(t);
  ++this->waiting_;
}
// Add task T at the back of the ordinary task list.

void
Workqueue::queue(Task* t)
{
  const bool at_front = false;
  this->add_to_queue(&this->tasks_, t, at_front);
}
// Flag T as a task which should run soon, then add it at the back of
// the list of such tasks (first_tasks_).

void
Workqueue::queue_soon(Task* t)
{
  const bool at_front = false;
  t->set_should_run_soon();
  this->add_to_queue(&this->first_tasks_, t, at_front);
}
// Flag T as a task which should run soon, then add it at the front
// of the list of such tasks (first_tasks_).

void
Workqueue::queue_next(Task* t)
{
  const bool at_front = true;
  t->set_should_run_soon();
  this->add_to_queue(&this->first_tasks_, t, at_front);
}
// Return whether thread THREAD_NUMBER should stop.  The decision is
// delegated entirely to the threader.

inline bool
Workqueue::should_cancel_thread(int thread_number)
{
  const bool cancel = this->threader_->should_cancel_thread(thread_number);
  return cancel;
}
// Pop tasks from TASKS until one is runnable, and return it.  Each
// task found not to be runnable is moved to the waiting list of the
// token that blocks it.  Return NULL once TASKS is exhausted.

Task*
Workqueue::find_runnable_in_list(Task_list* tasks)
{
  for (;;)
    {
      Task* candidate = tasks->pop_front();
      if (candidate == NULL)
        return NULL;

      Task_token* blocking = candidate->is_runnable();
      if (blocking == NULL)
        return candidate;

      // Still blocked: park it on the token and keep looking.
      blocking->add_waiting(candidate);
      ++this->waiting_;
    }
}
// Return a runnable task, or NULL if there is none.  The run-soon
// list (first_tasks_) is searched before the ordinary list (tasks_).

Task*
Workqueue::find_runnable()
{
  Task* soon = this->find_runnable_in_list(&this->first_tasks_);
  if (soon != NULL)
    return soon;
  return this->find_runnable_in_list(&this->tasks_);
}
// Return a runnable task, sleeping on the condition variable until
// one shows up.  Return NULL if nothing is running and both task
// lists are empty, or if the threader says thread THREAD_NUMBER
// should be cancelled.  The caller in this file holds the Workqueue
// lock around this call.

Task*
Workqueue::find_runnable_or_wait(int thread_number)
{
  for (;;)
    {
      Task* found = this->find_runnable();
      if (found != NULL)
        return found;

      const bool all_done = (this->running_ == 0
                             && this->first_tasks_.empty()
                             && this->tasks_.empty());
      if (all_done)
        {
          // Wake every sleeping thread so that each sees there is
          // nothing left.
          this->condvar_.broadcast();

          gold_assert(this->waiting_ == 0);
          return NULL;
        }

      if (this->should_cancel_thread(thread_number))
        return NULL;

      gold_debug(DEBUG_TASK, "%3d sleeping", thread_number);
      this->condvar_.wait();
      gold_debug(DEBUG_TASK, "%3d awake", thread_number);
    }
}
bool
Workqueue::find_and_run_task(int thread_number)
{
Task* t;
Task_locker tl;
{
Hold_lock hl(this->lock_);
t = this->find_runnable_or_wait(thread_number);
if (t == NULL)
return false;
t->locks(&tl);
++this->running_;
}
while (t != NULL)
{
gold_debug(DEBUG_TASK, "%3d running task %s", thread_number,
t->name().c_str());
Timer timer;
if (is_debugging_enabled(DEBUG_TASK))
timer.start();
t->run(this);
if (is_debugging_enabled(DEBUG_TASK))
{
Timer::TimeStats elapsed = timer.get_elapsed_time();
gold_debug(DEBUG_TASK,
"%3d completed task %s "
"(user: %ld.%06ld sys: %ld.%06ld wall: %ld.%06ld)",
thread_number, t->name().c_str(),
elapsed.user / 1000, (elapsed.user % 1000) * 1000,
elapsed.sys / 1000, (elapsed.sys % 1000) * 1000,
elapsed.wall / 1000, (elapsed.wall % 1000) * 1000);
}
Task* next;
{
Hold_lock hl(this->lock_);
--this->running_;
next = this->release_locks(t, &tl);
if (next == NULL)
next = this->find_runnable();
if (next != NULL)
{
tl.clear();
next->locks(&tl);
++this->running_;
}
}
delete t;
t = next;
}
return true;
}
// Decide what to do with task T, which was just removed from a
// token's waiting list.
//
// If T is still not runnable, put it back on the waiting list of the
// token that blocks it and return false.
//
// Otherwise T is either stored in *PRET for the caller to run next
// (return true), or appended to a runnable list and a thread is
// signalled (return false).  T is queued if *PRET is already set.
// Failing that, T is returned if IS_BLOCKER is false or T should run
// soon.  Failing that, T is queued if either task list is non-empty,
// and returned if both are empty.

bool
Workqueue::return_or_queue(Task* t, bool is_blocker, Task** pret)
{
  Task_token* blocking = t->is_runnable();
  if (blocking != NULL)
    {
      blocking->add_waiting(t);
      ++this->waiting_;
      return false;
    }

  bool run_now;
  if (*pret != NULL)
    run_now = false;
  else if (!is_blocker || t->should_run_soon())
    run_now = true;
  else
    run_now = this->first_tasks_.empty() && this->tasks_.empty();

  if (run_now)
    {
      gold_assert(*pret == NULL);
      *pret = t;
      return true;
    }

  // Queue T on the list matching its priority and wake a thread.
  if (t->should_run_soon())
    this->first_tasks_.push_back(t);
  else
    this->tasks_.push_back(t);
  this->condvar_.signal();
  return false;
}
// Release every token recorded in TL on behalf of the finished task
// T, and hand the tasks waiting on those tokens to return_or_queue.
// Return the task chosen to run next, or NULL if none was chosen.
// The caller in this file holds the Workqueue lock around this call.

Task*
Workqueue::release_locks(Task* t, Task_locker* tl)
{
  Task* run_next = NULL;

  for (Task_locker::iterator p = tl->begin(); p != tl->end(); ++p)
    {
      Task_token* token = *p;

      if (!token->is_blocker())
        {
          token->remove_writer(t);

          // Drain waiting tasks until one is picked to run next.
          Task* waiter;
          while ((waiter = token->remove_first_waiting()) != NULL)
            {
              --this->waiting_;
              if (this->return_or_queue(waiter, false, &run_next))
                break;
            }
        }
      else if (token->remove_blocker())
        {
          // remove_blocker() returned true: hand every waiting task
          // to return_or_queue.
          Task* waiter;
          while ((waiter = token->remove_first_waiting()) != NULL)
            {
              --this->waiting_;
              this->return_or_queue(waiter, true, &run_next);
            }
        }
    }

  return run_next;
}
// Run tasks on behalf of thread THREAD_NUMBER until
// find_and_run_task reports that there was nothing to run.

void
Workqueue::process(int thread_number)
{
  bool ran_something;
  do
    ran_something = this->find_and_run_task(thread_number);
  while (ran_something);
}
// Pass the requested number of threads to the threader, then wake
// every thread sleeping on the condition variable.  Both steps are
// done under the Workqueue lock.

void
Workqueue::set_thread_count(int threads)
{
  Hold_lock hl(this->lock_);

  this->threader_->set_thread_count(threads);

  // Something has changed, so let every sleeping thread re-check.
  this->condvar_.broadcast();
}
// Add a blocker to TOKEN while holding the Workqueue lock.

void
Workqueue::add_blocker(Task_token* token)
{
  Hold_lock hl(this->lock_);

  token->add_blocker();
}
}