From 8acadb17c2d679291aa94e94af8cc96513cab830 Mon Sep 17 00:00:00 2001 From: Eric Holk Date: Thu, 7 Jul 2011 11:53:08 -0700 Subject: [PATCH] Work on debugging race conditions. Ports and channels have been moved to the kernel pool, since they've been known to outlive their associated task. This probably isn't the right thing to do, the life cycle needs fixed instead. Some refactorying in memory_region.cpp. Added a helper function to increment and decrement the allocation counter. This makes it easier to switch between atomic and non-atomic increments. Using atomic increments for now, although this still does not fix the problem. --- src/rt/memory_region.cpp | 23 ++++++++++++++------ src/rt/memory_region.h | 3 +++ src/rt/rust_chan.cpp | 14 +++++++------ src/rt/rust_chan.h | 3 ++- src/rt/rust_port.cpp | 8 +++---- src/rt/rust_port.h | 5 ++++- src/rt/rust_task.cpp | 3 +++ src/rt/rust_task.h | 2 ++ src/rt/rust_upcall.cpp | 45 ++++++++++++++++++++++++---------------- src/rt/sync/sync.h | 21 +++++++++++++++++++ 10 files changed, 91 insertions(+), 36 deletions(-) diff --git a/src/rt/memory_region.cpp b/src/rt/memory_region.cpp index 6c50bf42d8c8..809ac81f716b 100644 --- a/src/rt/memory_region.cpp +++ b/src/rt/memory_region.cpp @@ -19,8 +19,18 @@ memory_region::memory_region(memory_region *parent) : // Nop. } +void memory_region::add_alloc() { + //_live_allocations++; + sync::increment(_live_allocations); +} + +void memory_region::dec_alloc() { + //_live_allocations--; + sync::decrement(_live_allocations); +} + void memory_region::free(void *mem) { - // printf("free: ptr 0x%" PRIxPTR"\n", (uintptr_t) mem); + // printf("free: ptr 0x%" PRIxPTR" region=%p\n", (uintptr_t) mem, this); if (!mem) { return; } if (_synchronized) { _lock.lock(); } #ifdef TRACK_ALLOCATIONS @@ -33,7 +43,7 @@ void memory_region::free(void *mem) { if (_live_allocations < 1) { _srv->fatal("live_allocs < 1", __FILE__, __LINE__, ""); } - _live_allocations--; + dec_alloc(); _srv->free(mem); if (_synchronized) { _lock.unlock(); } } @@ -42,7 +52,7 @@ void * memory_region::realloc(void *mem, size_t size) { if (_synchronized) { _lock.lock(); } if (!mem) { - _live_allocations++; + add_alloc(); } void *newMem = _srv->realloc(mem, size); #ifdef TRACK_ALLOCATIONS @@ -59,12 +69,13 @@ memory_region::realloc(void *mem, size_t size) { void * memory_region::malloc(size_t size) { if (_synchronized) { _lock.lock(); } - _live_allocations++; + add_alloc(); void *mem = _srv->malloc(size); #ifdef TRACK_ALLOCATIONS _allocation_list.append(mem); #endif - // printf("malloc: ptr 0x%" PRIxPTR "\n", (uintptr_t) mem); + // printf("malloc: ptr 0x%" PRIxPTR " region=%p\n", + // (uintptr_t) mem, this); if (_synchronized) { _lock.unlock(); } return mem; } @@ -72,7 +83,7 @@ memory_region::malloc(size_t size) { void * memory_region::calloc(size_t size) { if (_synchronized) { _lock.lock(); } - _live_allocations++; + add_alloc(); void *mem = _srv->malloc(size); memset(mem, 0, size); #ifdef TRACK_ALLOCATIONS diff --git a/src/rt/memory_region.h b/src/rt/memory_region.h index a72faf76338e..36b2e1a41640 100644 --- a/src/rt/memory_region.h +++ b/src/rt/memory_region.h @@ -22,6 +22,9 @@ private: const bool _detailed_leaks; const bool _synchronized; lock_and_signal _lock; + + void add_alloc(); + void dec_alloc(); public: memory_region(rust_srv *srv, bool synchronized); memory_region(memory_region *parent); diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp index 78301e3d879d..141d60a3e9c1 100644 --- a/src/rt/rust_chan.cpp +++ b/src/rt/rust_chan.cpp @@ -6,11 +6,12 @@ */ rust_chan::rust_chan(rust_task *task, maybe_proxy *port, - size_t unit_sz) : - ref_count(1), - task(task), - port(port), - buffer(task, unit_sz) { + size_t unit_sz) + : ref_count(1), + kernel(task->kernel), + task(task), + port(port), + buffer(task, unit_sz) { ++task->ref_count; if (port) { associate(port); @@ -87,6 +88,7 @@ void rust_chan::send(void *sptr) { buffer.dequeue(NULL); } else { rust_port *target_port = port->referent(); + scoped_lock right(target_port->lock); if (target_port->task->blocked_on(target_port)) { DLOG(sched, comm, "dequeued in rendezvous_ptr"); buffer.dequeue(target_port->task->rendezvous_ptr); @@ -114,7 +116,7 @@ rust_chan *rust_chan::clone(maybe_proxy *target) { port = proxy; target_task = target->as_proxy()->handle()->referent(); } - return new (target_task) rust_chan(target_task, port, unit_sz); + return new (target_task->kernel) rust_chan(target_task, port, unit_sz); } /** diff --git a/src/rt/rust_chan.h b/src/rt/rust_chan.h index 4172ca5f09d8..752667b8a3c6 100644 --- a/src/rt/rust_chan.h +++ b/src/rt/rust_chan.h @@ -1,7 +1,7 @@ #ifndef RUST_CHAN_H #define RUST_CHAN_H -class rust_chan : public task_owned, +class rust_chan : public kernel_owned, public rust_cond { public: RUST_REFCOUNTED_WITH_DTOR(rust_chan, destroy()) @@ -9,6 +9,7 @@ public: ~rust_chan(); + rust_kernel *kernel; rust_task *task; maybe_proxy *port; size_t idx; diff --git a/src/rt/rust_port.cpp b/src/rt/rust_port.cpp index a2bd3b34c38d..0aebda859f88 100644 --- a/src/rt/rust_port.cpp +++ b/src/rt/rust_port.cpp @@ -1,16 +1,16 @@ #include "rust_internal.h" #include "rust_port.h" -rust_port::rust_port(rust_task *task, size_t unit_sz) : - maybe_proxy(this), task(task), - unit_sz(unit_sz), writers(task), chans(task) { +rust_port::rust_port(rust_task *task, size_t unit_sz) + : maybe_proxy(this), kernel(task->kernel), task(task), + unit_sz(unit_sz), writers(task), chans(task) { LOG(task, comm, "new rust_port(task=0x%" PRIxPTR ", unit_sz=%d) -> port=0x%" PRIxPTR, (uintptr_t)task, unit_sz, (uintptr_t)this); // Allocate a remote channel, for remote channel data. - remote_channel = new (task) rust_chan(task, this, unit_sz); + remote_channel = new (task->kernel) rust_chan(task, this, unit_sz); } rust_port::~rust_port() { diff --git a/src/rt/rust_port.h b/src/rt/rust_port.h index 7a58f839c44f..301422cc765d 100644 --- a/src/rt/rust_port.h +++ b/src/rt/rust_port.h @@ -2,9 +2,10 @@ #define RUST_PORT_H class rust_port : public maybe_proxy, - public task_owned { + public kernel_owned { public: + rust_kernel *kernel; rust_task *task; size_t unit_sz; ptr_vec writers; @@ -13,6 +14,8 @@ public: // Data sent to this port from remote tasks is buffered in this channel. rust_chan *remote_channel; + lock_and_signal lock; + rust_port(rust_task *task, size_t unit_sz); ~rust_port(); void log_state(); diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp index 721f5b0ed203..6c7ab5540021 100644 --- a/src/rt/rust_task.cpp +++ b/src/rt/rust_task.cpp @@ -401,6 +401,7 @@ rust_task::transition(rust_task_list *src, rust_task_list *dst) { void rust_task::block(rust_cond *on, const char* name) { + scoped_lock with(lock); LOG(this, task, "Blocking on 0x%" PRIxPTR ", cond: 0x%" PRIxPTR, (uintptr_t) on, (uintptr_t) cond); A(sched, cond == NULL, "Cannot block an already blocked task."); @@ -413,6 +414,7 @@ rust_task::block(rust_cond *on, const char* name) { void rust_task::wakeup(rust_cond *from) { + scoped_lock with(lock); A(sched, cond != NULL, "Cannot wake up unblocked task."); LOG(this, task, "Blocked on 0x%" PRIxPTR " woken up on 0x%" PRIxPTR, (uintptr_t) cond, (uintptr_t) from); @@ -430,6 +432,7 @@ rust_task::wakeup(rust_cond *from) { void rust_task::die() { + scoped_lock with(lock); transition(&sched->running_tasks, &sched->dead_tasks); } diff --git a/src/rt/rust_task.h b/src/rt/rust_task.h index e15f105222bf..c20eae7ece85 100644 --- a/src/rt/rust_task.h +++ b/src/rt/rust_task.h @@ -89,6 +89,8 @@ rust_task : public maybe_proxy, wakeup_callback *_on_wakeup; + lock_and_signal lock; + // Only a pointer to 'name' is kept, so it must live as long as this task. rust_task(rust_scheduler *sched, rust_task_list *state, diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp index d89b278790b8..383e856f69a6 100644 --- a/src/rt/rust_upcall.cpp +++ b/src/rt/rust_upcall.cpp @@ -92,7 +92,9 @@ upcall_new_port(rust_task *task, size_t unit_sz) { LOG_UPCALL_ENTRY(task); LOG(task, comm, "upcall_new_port(task=0x%" PRIxPTR " (%s), unit_sz=%d)", (uintptr_t) task, task->name, unit_sz); - return new (task) rust_port(task, unit_sz); + // take a reference on behalf of the port + task->ref(); + return new (task->kernel) rust_port(task, unit_sz); } extern "C" CDECL void @@ -101,6 +103,9 @@ upcall_del_port(rust_task *task, rust_port *port) { LOG(task, comm, "upcall del_port(0x%" PRIxPTR ")", (uintptr_t) port); I(task->sched, !port->ref_count); delete port; + + // FIXME: We shouldn't ever directly manipulate the ref count. + --task->ref_count; } /** @@ -114,7 +119,7 @@ upcall_new_chan(rust_task *task, rust_port *port) { "task=0x%" PRIxPTR " (%s), port=0x%" PRIxPTR ")", (uintptr_t) task, task->name, port); I(sched, port); - return new (task) rust_chan(task, port, port->unit_sz); + return new (task->kernel) rust_chan(task, port, port->unit_sz); } /** @@ -138,6 +143,8 @@ extern "C" CDECL void upcall_del_chan(rust_task *task, rust_chan *chan) { LOG_UPCALL_ENTRY(task); + I(task->sched, chan->task == task); + LOG(task, comm, "upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan); chan->destroy(); } @@ -183,25 +190,27 @@ upcall_send(rust_task *task, rust_chan *chan, void *sptr) { extern "C" CDECL void upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) { - LOG_UPCALL_ENTRY(task); + { + scoped_lock with(port->lock); + LOG_UPCALL_ENTRY(task); - LOG(task, comm, "port: 0x%" PRIxPTR ", dptr: 0x%" PRIxPTR - ", size: 0x%" PRIxPTR ", chan_no: %d", - (uintptr_t) port, (uintptr_t) dptr, port->unit_sz, - port->chans.length()); + LOG(task, comm, "port: 0x%" PRIxPTR ", dptr: 0x%" PRIxPTR + ", size: 0x%" PRIxPTR ", chan_no: %d", + (uintptr_t) port, (uintptr_t) dptr, port->unit_sz, + port->chans.length()); - if (port->receive(dptr)) { - return; + if (port->receive(dptr)) { + return; + } + + // No data was buffered on any incoming channel, so block this task on + // the port. Remember the rendezvous location so that any sender task + // can write to it before waking up this task. + + LOG(task, comm, "<=== waiting for rendezvous data ==="); + task->rendezvous_ptr = dptr; + task->block(port, "waiting for rendezvous data"); } - - // No data was buffered on any incoming channel, so block this task on the - // port. Remember the rendezvous location so that any sender task can - // write to it before waking up this task. - - LOG(task, comm, "<=== waiting for rendezvous data ==="); - task->rendezvous_ptr = dptr; - task->block(port, "waiting for rendezvous data"); - task->yield(3); } diff --git a/src/rt/sync/sync.h b/src/rt/sync/sync.h index eb220e462c66..360fff1fabda 100644 --- a/src/rt/sync/sync.h +++ b/src/rt/sync/sync.h @@ -1,3 +1,4 @@ +// -*- c++-mode -*- #ifndef SYNC_H #define SYNC_H @@ -10,6 +11,26 @@ public: T oldValue, T newValue) { return __sync_bool_compare_and_swap(address, oldValue, newValue); } + + template + static T increment(T *address) { + return __sync_add_and_fetch(address, 1); + } + + template + static T decrement(T *address) { + return __sync_sub_and_fetch(address, 1); + } + + template + static T increment(T &address) { + return __sync_add_and_fetch(&address, 1); + } + + template + static T decrement(T &address) { + return __sync_sub_and_fetch(&address, 1); + } }; /**