Added a message passing system based on lock free queues for inter-thread communication. Channels now buffer on the sending side, and no longer require blocking when sending. Lots of other refactoring and bug fixes.

This commit is contained in:
Michael Bebenita
2010-07-19 14:05:18 -07:00
parent 1f0656d908
commit 00d1465d13
35 changed files with 1498 additions and 838 deletions

View File

@@ -4,6 +4,24 @@
template class ptr_vec<rust_task>;
rust_message::rust_message(rust_dom *dom) : dom(dom) {
}
void rust_message::process() {
}
kill_task_message::kill_task_message(rust_dom *dom, rust_task *task) :
rust_message(dom), _task(task) {
}
void kill_task_message::process() {
_task->ref_count--;
_task->kill();
}
rust_dom::rust_dom(rust_srv *srv, rust_crate const *root_crate) :
interrupt_flag(0),
root_crate(root_crate),
@@ -80,6 +98,18 @@ rust_dom::activate(rust_task *task) {
curr_task = NULL;
}
void
rust_dom::log(rust_task *task, uint32_t type_bits, char const *fmt, ...) {
char buf[256];
if (_log.is_tracing(type_bits)) {
va_list args;
va_start(args, fmt);
vsnprintf(buf, sizeof(buf), fmt, args);
_log.trace_ln(task, type_bits, buf);
va_end(args);
}
}
void
rust_dom::log(uint32_t type_bits, char const *fmt, ...) {
char buf[256];
@@ -87,7 +117,7 @@ rust_dom::log(uint32_t type_bits, char const *fmt, ...) {
va_list args;
va_start(args, fmt);
vsnprintf(buf, sizeof(buf), fmt, args);
_log.trace_ln(type_bits, buf);
_log.trace_ln(NULL, type_bits, buf);
va_end(args);
}
}
@@ -189,7 +219,7 @@ rust_dom::remove_task_from_state_vec(ptr_vec<rust_task> *v, rust_task *task)
"removing task 0x%" PRIxPTR " in state '%s' from vec 0x%" PRIxPTR,
(uintptr_t)task, state_vec_name(v), (uintptr_t)v);
I(this, (*v)[task->idx] == task);
v->swapdel(task);
v->swap_delete(task);
}
const char *
@@ -203,25 +233,67 @@ rust_dom::state_vec_name(ptr_vec<rust_task> *v)
return "dead";
}
/**
* Delete any dead tasks.
*/
void
rust_dom::reap_dead_tasks()
{
rust_dom::reap_dead_tasks() {
for (size_t i = 0; i < dead_tasks.length(); ) {
rust_task *t = dead_tasks[i];
if (t == root_task || t->refcnt == 0) {
I(this, !t->waiting_tasks.length());
dead_tasks.swapdel(t);
rust_task *task = dead_tasks[i];
// log(rust_log::TASK, "dead task 0x%" PRIxPTR " with ref_count: %d",
// task, task->ref_count);
if (task->ref_count == 0) {
I(this, !task->waiting_tasks.length());
dead_tasks.swap_delete(task);
log(rust_log::TASK,
"deleting unreferenced dead task 0x%" PRIxPTR, t);
delete t;
"deleting unreferenced dead task 0x%" PRIxPTR, task);
delete task;
continue;
}
++i;
}
}
/**
* Enqueues a message in this domain's incoming message queue. It's the
* responsibility of the receiver to free the message once it's processed.
*/
void rust_dom::send_message(rust_message *message) {
log(rust_log::COMM, "enqueueing message 0x%" PRIxPTR
" in queue 0x%" PRIxPTR,
message,
&_incoming_message_queue);
_incoming_message_queue.enqueue(message);
_incoming_message_pending.signal();
}
/**
* Drains and processes incoming pending messages.
*/
void rust_dom::drain_incoming_message_queue() {
rust_message *message;
while ((message = (rust_message *) _incoming_message_queue.dequeue())) {
log(rust_log::COMM, "read 0x%" PRIxPTR
" from queue 0x%" PRIxPTR,
message,
&_incoming_message_queue);
log(rust_log::COMM, "processing incoming message 0x%" PRIxPTR,
message);
message->process();
delete message;
}
}
/**
* Schedules a running task for execution. Only running tasks can be
* activated. Blocked tasks have to be unblocked before they can be
* activated.
*
* Returns NULL if no tasks can be scheduled.
*/
rust_task *
rust_dom::sched()
rust_dom::schedule_task()
{
I(this, this);
// FIXME: in the face of failing tasks, this is not always right.
@@ -231,11 +303,88 @@ rust_dom::sched()
i %= running_tasks.length();
return (rust_task *)running_tasks[i];
}
log(rust_log::DOM|rust_log::TASK,
"no schedulable tasks");
// log(rust_log::DOM|rust_log::TASK, "no schedulable tasks");
return NULL;
}
/**
* Starts the main scheduler loop which performs task scheduling for this
* domain.
*
* Returns once no more tasks can be scheduled.
*/
int
rust_dom::start_main_loop()
{
// Make sure someone is watching, to pull us out of infinite loops.
rust_timer timer(this);
log(rust_log::DOM, "running main-loop on domain 0x%" PRIxPTR, this);
logptr("exit-task glue", root_crate->get_exit_task_glue());
while (n_live_tasks() > 0) {
rust_task *scheduled_task = schedule_task();
// If we cannot schedule a task because all other live tasks
// are blocked, wait on a condition variable which is signaled
// if progress is made in other domains.
if (scheduled_task == NULL) {
log(rust_log::TASK,
"all tasks are blocked, waiting for progress ...");
_progress.wait();
continue;
}
I(this, scheduled_task->running());
log(rust_log::TASK,
"activating task 0x%" PRIxPTR ", sp=x%" PRIxPTR,
(uintptr_t)scheduled_task, scheduled_task->rust_sp);
interrupt_flag = 0;
activate(scheduled_task);
log(rust_log::TASK,
"returned from task 0x%" PRIxPTR
" in state '%s', sp=0x%" PRIxPTR,
(uintptr_t)scheduled_task,
state_vec_name(scheduled_task->state),
scheduled_task->rust_sp);
I(this, scheduled_task->rust_sp >=
(uintptr_t) &scheduled_task->stk->data[0]);
I(this, scheduled_task->rust_sp < scheduled_task->stk->limit);
drain_incoming_message_queue();
reap_dead_tasks();
}
log(rust_log::DOM, "terminated scheduler loop, reaping dead tasks ...");
while (dead_tasks.length() > 0) {
log(rust_log::DOM,
"waiting for %d dead tasks to become dereferenced ...",
dead_tasks.length());
log(rust_log::DOM,
"waiting for %" PRIxPTR, dead_tasks[0]);
if (_incoming_message_queue.is_empty()) {
_incoming_message_pending.wait();
} else {
drain_incoming_message_queue();
}
reap_dead_tasks();
}
log(rust_log::DOM, "finished main-loop (dom.rval = %d)", rval);
return rval;
}
rust_crate_cache *
rust_dom::get_cache(rust_crate const *crate) {
log(rust_log::CACHE,