Conservatively serialize nearly all upcalls. Successfully ran make check with RUST_THREADS=8, so we're probably fairly safe now. In the future we can relax the synchronization to get better performance.
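
The pattern this commit applies almost everywhere: each upcall takes the domain's scheduler_lock for its entire duration through an RAII guard. A minimal sketch of what such a guard does, assuming a pthread-based lock (the runtime's actual lock_and_signal and scoped_lock classes differ in detail):

    #include <pthread.h>

    // Stand-in for rust_dom::scheduler_lock.
    class lock_sketch {
        pthread_mutex_t _mutex;
    public:
        lock_sketch() { pthread_mutex_init(&_mutex, NULL); }
        ~lock_sketch() { pthread_mutex_destroy(&_mutex); }
        void lock() { pthread_mutex_lock(&_mutex); }
        void unlock() { pthread_mutex_unlock(&_mutex); }
    };

    // RAII guard: the constructor locks and the destructor unlocks, so
    // every exit path from an upcall (including early returns) releases
    // the lock.
    class scoped_lock_sketch {
        lock_sketch &_lock;
    public:
        scoped_lock_sketch(lock_sketch &l) : _lock(l) { _lock.lock(); }
        ~scoped_lock_sketch() { _lock.unlock(); }
    };

One coarse lock serializes everything, which is slow but avoids lock-ordering hazards; the commit message anticipates relaxing it later.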

Eric Holk
2011-06-22 15:44:47 -07:00
committed by Graydon Hoare
parent 6367bcf427
commit 681c063ec0
7 changed files with 80 additions and 42 deletions

View File

@@ -391,12 +391,17 @@ task_yield(rust_task *task) {
 extern "C" CDECL void
 task_join(rust_task *task, rust_task *join_task) {
+    task->dom->scheduler_lock.lock();
     // If the other task is already dying, we don't have to wait for it.
     if (join_task->dead() == false) {
         join_task->tasks_waiting_to_join.push(task);
         task->block(join_task, "joining local task");
+        task->dom->scheduler_lock.unlock();
         task->yield(2);
     }
+    else {
+        task->dom->scheduler_lock.unlock();
+    }
 }
 
 /* Debug builtins for std.dbg. */
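
task_join manages the lock by hand rather than with a scoped_lock because the lock must be released before task->yield(2) suspends the task; a guard would still hold it across the suspension. Both paths therefore unlock explicitly. The shape of that, sketched with hypothetical types:

    struct lock_t { void lock() {} void unlock() {} };

    struct task_t {
        bool dead;
        lock_t *sched_lock;
        void block(task_t *on) { (void) on; }
        void yield() {}   // suspends; does not return until rescheduled
    };

    void join_sketch(task_t *task, task_t *join_task) {
        task->sched_lock->lock();
        if (!join_task->dead) {
            task->block(join_task);
            task->sched_lock->unlock();  // drop the lock before suspending,
            task->yield();               // or no other thread could wake us
        } else {
            task->sched_lock->unlock();  // the early-out path unlocks too
        }
    }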

View File

@@ -10,6 +10,7 @@ rust_chan::rust_chan(rust_task *task,
     task(task),
     port(port),
     buffer(task->dom, unit_sz) {
+    ++task->ref_count;
     if (port) {
         associate(port);
     }
@@ -23,6 +24,7 @@ rust_chan::~rust_chan() {
     A(task->dom, is_associated() == false,
       "Channel must be disassociated before being freed.");
+    --task->ref_count;
 }
 
 /**
@@ -31,10 +33,10 @@ rust_chan::~rust_chan() {
 void rust_chan::associate(maybe_proxy<rust_port> *port) {
     this->port = port;
     if (port->is_proxy() == false) {
-        scoped_lock sync(port->referent()->lock);
         LOG(task, task,
            "associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR,
            this, port);
+        ++this->ref_count;
         this->port->referent()->chans.push(this);
     }
 }
@@ -50,10 +52,10 @@ void rust_chan::disassociate() {
     A(task->dom, is_associated(), "Channel must be associated with a port.");
     if (port->is_proxy() == false) {
-        scoped_lock sync(port->referent()->lock);
         LOG(task, task,
            "disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR,
            this, port->referent());
+        --this->ref_count;
         port->referent()->chans.swap_delete(this);
     }
@@ -83,7 +85,6 @@ void rust_chan::send(void *sptr) {
         buffer.dequeue(NULL);
     } else {
         rust_port *target_port = port->referent();
-        scoped_lock sync(target_port->lock);
         if (target_port->task->blocked_on(target_port)) {
             DLOG(dom, comm, "dequeued in rendezvous_ptr");
             buffer.dequeue(target_port->task->rendezvous_ptr);
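
The new ++task->ref_count / --task->ref_count pair in rust_chan's constructor and destructor is manual reference counting: a live channel now pins its owning task. In miniature, with hypothetical names:

    struct task_sketch {
        int ref_count;
    };

    struct chan_sketch {
        task_sketch *task;
        chan_sketch(task_sketch *t) : task(t) { ++task->ref_count; }
        ~chan_sketch() { --task->ref_count; }
    };

    int main() {
        task_sketch t = { 1 };
        {
            chan_sketch c(&t);     // t.ref_count == 2 while the channel lives
        }
        return t.ref_count - 1;    // destructor restored it to 1
    }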

View File

@@ -268,7 +268,7 @@ rust_dom::start_main_loop(int id) {
     scheduler_lock.lock();
 
     // Make sure someone is watching, to pull us out of infinite loops.
-    rust_timer timer(this);
+    //rust_timer timer(this);
 
     DLOG(this, dom, "started domain loop %d", id);
@@ -395,13 +395,13 @@ rust_dom::get_cache() {
 rust_task *
 rust_dom::create_task(rust_task *spawner, const char *name) {
-    scheduler_lock.lock();
+    //scheduler_lock.lock();
     rust_task *task =
         new (this) rust_task (this, &newborn_tasks, spawner, name);
     DLOG(this, task, "created task: " PTR ", spawner: %s, name: %s",
          task, spawner ? spawner->name : "null", name);
     newborn_tasks.append(task);
-    scheduler_lock.unlock();
+    //scheduler_lock.unlock();
     return task;
 }
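
create_task's internal lock/unlock is commented out because its caller, upcall_new_task, now holds the scheduler lock for the whole upcall; taking a non-recursive mutex twice from the same thread would self-deadlock. A sketch of the moved lock boundary, with hypothetical names:

    #include <pthread.h>

    static pthread_mutex_t sched_lock = PTHREAD_MUTEX_INITIALIZER;

    static void create_task_sketch() {
        // pthread_mutex_lock(&sched_lock);  // would deadlock: caller owns it
        /* ... allocate the task, append it to newborn_tasks ... */
    }

    static void upcall_new_task_sketch() {
        pthread_mutex_lock(&sched_lock);     // lock once, at the upcall boundary
        create_task_sketch();
        pthread_mutex_unlock(&sched_lock);
    }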

View File

@@ -13,8 +13,6 @@ public:
     // Data sent to this port from remote tasks is buffered in this channel.
     rust_chan *remote_channel;
 
-    lock_and_signal lock;
-
     rust_port(rust_task *task, size_t unit_sz);
     ~rust_port();
     void log_state();

View File

@@ -131,19 +131,24 @@ void task_start_wrapper(spawn_args *a)
     int rval = 42;
     a->f(&rval, task, a->a3, a->a4);
     LOG(task, task, "task exited with value %d", rval);
-    // TODO: the old exit glue does some magical argument copying stuff. This
-    // is probably still needed.
+    {
+        scoped_lock with(task->dom->scheduler_lock);
+
+        // TODO: the old exit glue does some magical argument copying
+        // stuff. This is probably still needed.
 
-    // This is duplicated from upcall_exit, which is probably dead code by
-    // now.
-    LOG(task, task, "task ref_count: %d", task->ref_count);
-    A(task->dom, task->ref_count >= 0,
-      "Task ref_count should not be negative on exit!");
-    task->die();
-    task->notify_tasks_waiting_to_join();
+        // This is duplicated from upcall_exit, which is probably dead code by
+        // now.
+        LOG(task, task, "task ref_count: %d", task->ref_count);
+        A(task->dom, task->ref_count >= 0,
+          "Task ref_count should not be negative on exit!");
+        task->die();
+        task->notify_tasks_waiting_to_join();
+    }
+
     task->yield(1);
 }
@@ -154,6 +159,9 @@ rust_task::start(uintptr_t spawnee_fn,
     LOGPTR(dom, "from spawnee", spawnee_fn);
     I(dom, stk->data != NULL);
 
+    I(dom, !dom->scheduler_lock.lock_held_by_current_thread());
+    scoped_lock with(dom->scheduler_lock);
+
     char *sp = (char *)rust_sp;
@@ -405,7 +413,7 @@ rust_task::free(void *p, bool is_gc)
 void
 rust_task::transition(rust_task_list *src, rust_task_list *dst) {
-    scoped_lock sync(dom->scheduler_lock);
+    I(dom, dom->scheduler_lock.lock_held_by_current_thread());
     DLOG(dom, task,
          "task %s " PTR " state change '%s' -> '%s' while in '%s'",
          name, (uintptr_t)this, src->name, dst->name, state->name);
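
task_start_wrapper wraps everything except the final task->yield(1) in an extra { } block so the scoped_lock destructor runs, releasing the scheduler lock, before the task suspends; rust_task::transition, conversely, now only asserts that the lock is held, leaving acquisition to its callers. The block-before-yield shape, sketched with assumed names:

    struct lock_t {
        bool held;
        void lock() { held = true; }
        void unlock() { held = false; }
    };

    struct guard_t {
        lock_t &l;
        guard_t(lock_t &l) : l(l) { l.lock(); }
        ~guard_t() { l.unlock(); }
    };

    void exit_path_sketch(lock_t &sched_lock) {
        {
            guard_t with(sched_lock);
            /* ... die, notify joiners: all under the lock ... */
        }                     // guard destroyed: lock released here
        /* task->yield(1) runs only after the lock is dropped */
    }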

View File

@@ -23,6 +23,7 @@ str_buf(rust_task *task, rust_str *s);
 extern "C" void
 upcall_grow_task(rust_task *task, size_t n_frame_bytes) {
+    I(task->dom, false);
     LOG_UPCALL_ENTRY(task);
     task->grow(n_frame_bytes);
 }
@@ -74,6 +75,7 @@ extern "C" CDECL rust_port*
 upcall_new_port(rust_task *task, size_t unit_sz) {
     LOG_UPCALL_ENTRY(task);
     rust_dom *dom = task->dom;
+    scoped_lock with(dom->scheduler_lock);
     LOG(task, comm, "upcall_new_port(task=0x%" PRIxPTR " (%s), unit_sz=%d)",
         (uintptr_t) task, task->name, unit_sz);
     return new (dom) rust_port(task, unit_sz);
@@ -82,6 +84,7 @@ upcall_new_port(rust_task *task, size_t unit_sz) {
 extern "C" CDECL void
 upcall_del_port(rust_task *task, rust_port *port) {
     LOG_UPCALL_ENTRY(task);
+    scoped_lock with(task->dom->scheduler_lock);
     LOG(task, comm, "upcall del_port(0x%" PRIxPTR ")", (uintptr_t) port);
     I(task->dom, !port->ref_count);
     delete port;
@@ -121,6 +124,7 @@ upcall_flush_chan(rust_task *task, rust_chan *chan) {
 extern "C" CDECL
 void upcall_del_chan(rust_task *task, rust_chan *chan) {
     LOG_UPCALL_ENTRY(task);
+    scoped_lock with(task->dom->scheduler_lock);
     LOG(task, comm, "upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan);
@@ -143,7 +147,7 @@ void upcall_del_chan(rust_task *task, rust_chan *chan) {
     // here is that we can get ourselves in a deadlock if the
     // parent task tries to join us.
     //
-    // 2. We can leave the channel in a "dormnat" state by not freeing
+    // 2. We can leave the channel in a "dormant" state by not freeing
     //    it and letting the receiver task delete it for us instead.
     if (chan->buffer.is_empty() == false) {
         return;
@@ -162,6 +166,7 @@ extern "C" CDECL rust_chan *
 upcall_clone_chan(rust_task *task, maybe_proxy<rust_task> *target,
                   rust_chan *chan) {
     LOG_UPCALL_ENTRY(task);
+    scoped_lock with(task->dom->scheduler_lock);
     size_t unit_sz = chan->buffer.unit_sz;
     maybe_proxy<rust_port> *port = chan->port;
     rust_task *target_task = NULL;
@@ -203,28 +208,30 @@ upcall_sleep(rust_task *task, size_t time_in_us) {
 extern "C" CDECL void
 upcall_send(rust_task *task, rust_chan *chan, void *sptr) {
     LOG_UPCALL_ENTRY(task);
+    scoped_lock with(task->dom->scheduler_lock);
     chan->send(sptr);
     LOG(task, comm, "=== sent data ===>");
 }
 
 extern "C" CDECL void
 upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) {
-    LOG_UPCALL_ENTRY(task);
-    LOG(task, comm, "port: 0x%" PRIxPTR ", dptr: 0x%" PRIxPTR
-        ", size: 0x%" PRIxPTR ", chan_no: %d",
-        (uintptr_t) port, (uintptr_t) dptr, port->unit_sz,
-        port->chans.length());
-    if (port->receive(dptr)) {
-        return;
-    }
-    // No data was buffered on any incoming channel, so block this task
-    // on the port. Remember the rendezvous location so that any sender
-    // task can write to it before waking up this task.
     {
-        scoped_lock sync(port->lock);
+        LOG_UPCALL_ENTRY(task);
+        scoped_lock with(task->dom->scheduler_lock);
+        LOG(task, comm, "port: 0x%" PRIxPTR ", dptr: 0x%" PRIxPTR
+            ", size: 0x%" PRIxPTR ", chan_no: %d",
+            (uintptr_t) port, (uintptr_t) dptr, port->unit_sz,
+            port->chans.length());
+        if (port->receive(dptr)) {
+            return;
+        }
+        // No data was buffered on any incoming channel, so block this task
+        // on the port. Remember the rendezvous location so that any sender
+        // task can write to it before waking up this task.
         LOG(task, comm, "<=== waiting for rendezvous data ===");
         task->rendezvous_ptr = dptr;
         task->block(port, "waiting for rendezvous data");
@@ -248,6 +255,7 @@ upcall_fail(rust_task *task,
 extern "C" CDECL void
 upcall_kill(rust_task *task, maybe_proxy<rust_task> *target) {
     LOG_UPCALL_ENTRY(task);
+    scoped_lock with(task->dom->scheduler_lock);
     if (target->is_proxy()) {
         notify_message::
             send(notify_message::KILL, "kill", task->get_handle(),
@@ -264,18 +272,22 @@ upcall_kill(rust_task *task, maybe_proxy<rust_task> *target) {
  */
 extern "C" CDECL void
 upcall_exit(rust_task *task) {
-    LOG_UPCALL_ENTRY(task);
-    LOG(task, task, "task ref_count: %d", task->ref_count);
-    A(task->dom, task->ref_count >= 0,
-      "Task ref_count should not be negative on exit!");
-    task->die();
-    task->notify_tasks_waiting_to_join();
+    {
+        LOG_UPCALL_ENTRY(task);
+        scoped_lock with(task->dom->scheduler_lock);
+        LOG(task, task, "task ref_count: %d", task->ref_count);
+        A(task->dom, task->ref_count >= 0,
+          "Task ref_count should not be negative on exit!");
+        task->die();
+        task->notify_tasks_waiting_to_join();
+    }
     task->yield(1);
 }
 
 extern "C" CDECL uintptr_t
 upcall_malloc(rust_task *task, size_t nbytes, type_desc *td) {
     LOG_UPCALL_ENTRY(task);
+    scoped_lock with(task->dom->scheduler_lock);
     LOG(task, mem,
         "upcall malloc(%" PRIdPTR ", 0x%" PRIxPTR ")"
@@ -296,6 +308,7 @@ upcall_malloc(rust_task *task, size_t nbytes, type_desc *td) {
 extern "C" CDECL void
 upcall_free(rust_task *task, void* ptr, uintptr_t is_gc) {
     LOG_UPCALL_ENTRY(task);
+    scoped_lock with(task->dom->scheduler_lock);
     rust_dom *dom = task->dom;
     DLOG(dom, mem,
          "upcall free(0x%" PRIxPTR ", is_gc=%" PRIdPTR ")",
@@ -306,6 +319,7 @@ upcall_free(rust_task *task, void* ptr, uintptr_t is_gc) {
 extern "C" CDECL uintptr_t
 upcall_mark(rust_task *task, void* ptr) {
     LOG_UPCALL_ENTRY(task);
+    scoped_lock with(task->dom->scheduler_lock);
     rust_dom *dom = task->dom;
     if (ptr) {
@@ -336,6 +350,7 @@ rust_str *make_str(rust_task *task, char const *s, size_t fill) {
 extern "C" CDECL rust_str *
 upcall_new_str(rust_task *task, char const *s, size_t fill) {
     LOG_UPCALL_ENTRY(task);
+    scoped_lock with(task->dom->scheduler_lock);
     return make_str(task, s, fill);
 }
@@ -343,6 +358,7 @@ upcall_new_str(rust_task *task, char const *s, size_t fill) {
 extern "C" CDECL rust_str *
 upcall_dup_str(rust_task *task, rust_str *str) {
     LOG_UPCALL_ENTRY(task);
+    scoped_lock with(task->dom->scheduler_lock);
     return make_str(task, (char const *)str->data, str->fill);
 }
@@ -350,6 +366,7 @@ upcall_dup_str(rust_task *task, rust_str *str) {
 extern "C" CDECL rust_vec *
 upcall_new_vec(rust_task *task, size_t fill, type_desc *td) {
     LOG_UPCALL_ENTRY(task);
+    scoped_lock with(task->dom->scheduler_lock);
     rust_dom *dom = task->dom;
     DLOG(dom, mem, "upcall new_vec(%" PRIdPTR ")", fill);
     size_t alloc = next_power_of_two(sizeof(rust_vec) + fill);
@@ -454,6 +471,7 @@ upcall_vec_append(rust_task *task, type_desc *t, type_desc *elem_t,
                   rust_vec **dst_ptr, rust_vec *src, bool skip_null)
 {
     LOG_UPCALL_ENTRY(task);
+    scoped_lock with(task->dom->scheduler_lock);
     rust_vec *dst = *dst_ptr;
     uintptr_t need_copy;
     size_t n_src_bytes = skip_null ? src->fill - 1 : src->fill;
@@ -483,6 +501,7 @@ upcall_get_type_desc(rust_task *task,
                      size_t n_descs,
                      type_desc const **descs) {
     LOG_UPCALL_ENTRY(task);
+    scoped_lock with(task->dom->scheduler_lock);
     LOG(task, cache, "upcall get_type_desc with size=%" PRIdPTR
         ", align=%" PRIdPTR ", %" PRIdPTR " descs", size, align,
         n_descs);
@@ -496,6 +515,7 @@ extern "C" CDECL rust_task *
 upcall_new_task(rust_task *spawner, rust_vec *name) {
     // name is a rust string structure.
     LOG_UPCALL_ENTRY(spawner);
+    scoped_lock with(spawner->dom->scheduler_lock);
     rust_dom *dom = spawner->dom;
     rust_task *task = dom->create_task(spawner, (const char *)name->data);
     return task;
@@ -535,6 +555,7 @@ upcall_start_task(rust_task *spawner,
  */
 extern "C" CDECL maybe_proxy<rust_task> *
 upcall_new_thread(rust_task *task, const char *name) {
+    I(task->dom, false);
     LOG_UPCALL_ENTRY(task);
     rust_dom *parent_dom = task->dom;
     rust_kernel *kernel = parent_dom->kernel;
@@ -583,6 +604,7 @@ upcall_start_thread(rust_task *task,
                     rust_proxy<rust_task> *child_task_proxy,
                     uintptr_t spawnee_fn,
                     size_t callsz) {
+    I(task->dom, false);
     LOG_UPCALL_ENTRY(task);
 #if 0
     rust_dom *parenet_dom = task->dom;
@@ -615,6 +637,7 @@ extern "C" CDECL void
 upcall_ivec_resize(rust_task *task,
                    rust_ivec *v,
                    size_t newsz) {
+    scoped_lock with(task->dom->scheduler_lock);
     I(task->dom, !v->fill);
     size_t new_alloc = next_power_of_two(newsz);
@@ -633,6 +656,7 @@ extern "C" CDECL void
 upcall_ivec_spill(rust_task *task,
                   rust_ivec *v,
                   size_t newsz) {
+    scoped_lock with(task->dom->scheduler_lock);
     size_t new_alloc = next_power_of_two(newsz);
     rust_ivec_heap *heap_part = (rust_ivec_heap *)
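
Restructuring upcall_recv so the early return sits inside the scoped_lock block works because the guard's destructor runs on every exit path, including that return. A self-checking sketch of the property, with assumed names:

    struct counting_lock {
        int locks, unlocks;
        void lock() { ++locks; }
        void unlock() { ++unlocks; }
    };

    struct guard {
        counting_lock &l;
        guard(counting_lock &l) : l(l) { l.lock(); }
        ~guard() { l.unlock(); }
    };

    bool recv_sketch(counting_lock &sched_lock, bool data_ready) {
        guard with(sched_lock);
        if (data_ready)
            return true;     // destructor fires on this early return
        /* ... record the rendezvous pointer, block the task ... */
        return false;        // and on the normal path
    }

    int main() {
        counting_lock l = { 0, 0 };
        recv_sketch(l, true);
        recv_sketch(l, false);
        return l.locks - l.unlocks;   // 0: every lock was released
    }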

View File

@@ -21,7 +21,9 @@ lock_and_signal::lock_and_signal() {
 }
 #else
-lock_and_signal::lock_and_signal() {
+lock_and_signal::lock_and_signal()
+    : _locked(false)
+{
     CHECKED(pthread_cond_init(&_cond, NULL));
     CHECKED(pthread_mutex_init(&_mutex, NULL));
 }