diff --git a/src/rt/memory_region.cpp b/src/rt/memory_region.cpp index 6c50bf42d8c8..809ac81f716b 100644 --- a/src/rt/memory_region.cpp +++ b/src/rt/memory_region.cpp @@ -19,8 +19,18 @@ memory_region::memory_region(memory_region *parent) : // Nop. } +void memory_region::add_alloc() { + //_live_allocations++; + sync::increment(_live_allocations); +} + +void memory_region::dec_alloc() { + //_live_allocations--; + sync::decrement(_live_allocations); +} + void memory_region::free(void *mem) { - // printf("free: ptr 0x%" PRIxPTR"\n", (uintptr_t) mem); + // printf("free: ptr 0x%" PRIxPTR" region=%p\n", (uintptr_t) mem, this); if (!mem) { return; } if (_synchronized) { _lock.lock(); } #ifdef TRACK_ALLOCATIONS @@ -33,7 +43,7 @@ void memory_region::free(void *mem) { if (_live_allocations < 1) { _srv->fatal("live_allocs < 1", __FILE__, __LINE__, ""); } - _live_allocations--; + dec_alloc(); _srv->free(mem); if (_synchronized) { _lock.unlock(); } } @@ -42,7 +52,7 @@ void * memory_region::realloc(void *mem, size_t size) { if (_synchronized) { _lock.lock(); } if (!mem) { - _live_allocations++; + add_alloc(); } void *newMem = _srv->realloc(mem, size); #ifdef TRACK_ALLOCATIONS @@ -59,12 +69,13 @@ memory_region::realloc(void *mem, size_t size) { void * memory_region::malloc(size_t size) { if (_synchronized) { _lock.lock(); } - _live_allocations++; + add_alloc(); void *mem = _srv->malloc(size); #ifdef TRACK_ALLOCATIONS _allocation_list.append(mem); #endif - // printf("malloc: ptr 0x%" PRIxPTR "\n", (uintptr_t) mem); + // printf("malloc: ptr 0x%" PRIxPTR " region=%p\n", + // (uintptr_t) mem, this); if (_synchronized) { _lock.unlock(); } return mem; } @@ -72,7 +83,7 @@ memory_region::malloc(size_t size) { void * memory_region::calloc(size_t size) { if (_synchronized) { _lock.lock(); } - _live_allocations++; + add_alloc(); void *mem = _srv->malloc(size); memset(mem, 0, size); #ifdef TRACK_ALLOCATIONS diff --git a/src/rt/memory_region.h b/src/rt/memory_region.h index a72faf76338e..36b2e1a41640 100644 --- a/src/rt/memory_region.h +++ b/src/rt/memory_region.h @@ -22,6 +22,9 @@ private: const bool _detailed_leaks; const bool _synchronized; lock_and_signal _lock; + + void add_alloc(); + void dec_alloc(); public: memory_region(rust_srv *srv, bool synchronized); memory_region(memory_region *parent); diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp index 78301e3d879d..141d60a3e9c1 100644 --- a/src/rt/rust_chan.cpp +++ b/src/rt/rust_chan.cpp @@ -6,11 +6,12 @@ */ rust_chan::rust_chan(rust_task *task, maybe_proxy *port, - size_t unit_sz) : - ref_count(1), - task(task), - port(port), - buffer(task, unit_sz) { + size_t unit_sz) + : ref_count(1), + kernel(task->kernel), + task(task), + port(port), + buffer(task, unit_sz) { ++task->ref_count; if (port) { associate(port); @@ -87,6 +88,7 @@ void rust_chan::send(void *sptr) { buffer.dequeue(NULL); } else { rust_port *target_port = port->referent(); + scoped_lock right(target_port->lock); if (target_port->task->blocked_on(target_port)) { DLOG(sched, comm, "dequeued in rendezvous_ptr"); buffer.dequeue(target_port->task->rendezvous_ptr); @@ -114,7 +116,7 @@ rust_chan *rust_chan::clone(maybe_proxy *target) { port = proxy; target_task = target->as_proxy()->handle()->referent(); } - return new (target_task) rust_chan(target_task, port, unit_sz); + return new (target_task->kernel) rust_chan(target_task, port, unit_sz); } /** diff --git a/src/rt/rust_chan.h b/src/rt/rust_chan.h index 4172ca5f09d8..752667b8a3c6 100644 --- a/src/rt/rust_chan.h +++ b/src/rt/rust_chan.h @@ -1,7 +1,7 @@ #ifndef RUST_CHAN_H #define RUST_CHAN_H -class rust_chan : public task_owned, +class rust_chan : public kernel_owned, public rust_cond { public: RUST_REFCOUNTED_WITH_DTOR(rust_chan, destroy()) @@ -9,6 +9,7 @@ public: ~rust_chan(); + rust_kernel *kernel; rust_task *task; maybe_proxy *port; size_t idx; diff --git a/src/rt/rust_port.cpp b/src/rt/rust_port.cpp index a2bd3b34c38d..0aebda859f88 100644 --- a/src/rt/rust_port.cpp +++ b/src/rt/rust_port.cpp @@ -1,16 +1,16 @@ #include "rust_internal.h" #include "rust_port.h" -rust_port::rust_port(rust_task *task, size_t unit_sz) : - maybe_proxy(this), task(task), - unit_sz(unit_sz), writers(task), chans(task) { +rust_port::rust_port(rust_task *task, size_t unit_sz) + : maybe_proxy(this), kernel(task->kernel), task(task), + unit_sz(unit_sz), writers(task), chans(task) { LOG(task, comm, "new rust_port(task=0x%" PRIxPTR ", unit_sz=%d) -> port=0x%" PRIxPTR, (uintptr_t)task, unit_sz, (uintptr_t)this); // Allocate a remote channel, for remote channel data. - remote_channel = new (task) rust_chan(task, this, unit_sz); + remote_channel = new (task->kernel) rust_chan(task, this, unit_sz); } rust_port::~rust_port() { diff --git a/src/rt/rust_port.h b/src/rt/rust_port.h index 7a58f839c44f..301422cc765d 100644 --- a/src/rt/rust_port.h +++ b/src/rt/rust_port.h @@ -2,9 +2,10 @@ #define RUST_PORT_H class rust_port : public maybe_proxy, - public task_owned { + public kernel_owned { public: + rust_kernel *kernel; rust_task *task; size_t unit_sz; ptr_vec writers; @@ -13,6 +14,8 @@ public: // Data sent to this port from remote tasks is buffered in this channel. rust_chan *remote_channel; + lock_and_signal lock; + rust_port(rust_task *task, size_t unit_sz); ~rust_port(); void log_state(); diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp index 721f5b0ed203..6c7ab5540021 100644 --- a/src/rt/rust_task.cpp +++ b/src/rt/rust_task.cpp @@ -401,6 +401,7 @@ rust_task::transition(rust_task_list *src, rust_task_list *dst) { void rust_task::block(rust_cond *on, const char* name) { + scoped_lock with(lock); LOG(this, task, "Blocking on 0x%" PRIxPTR ", cond: 0x%" PRIxPTR, (uintptr_t) on, (uintptr_t) cond); A(sched, cond == NULL, "Cannot block an already blocked task."); @@ -413,6 +414,7 @@ rust_task::block(rust_cond *on, const char* name) { void rust_task::wakeup(rust_cond *from) { + scoped_lock with(lock); A(sched, cond != NULL, "Cannot wake up unblocked task."); LOG(this, task, "Blocked on 0x%" PRIxPTR " woken up on 0x%" PRIxPTR, (uintptr_t) cond, (uintptr_t) from); @@ -430,6 +432,7 @@ rust_task::wakeup(rust_cond *from) { void rust_task::die() { + scoped_lock with(lock); transition(&sched->running_tasks, &sched->dead_tasks); } diff --git a/src/rt/rust_task.h b/src/rt/rust_task.h index e15f105222bf..c20eae7ece85 100644 --- a/src/rt/rust_task.h +++ b/src/rt/rust_task.h @@ -89,6 +89,8 @@ rust_task : public maybe_proxy, wakeup_callback *_on_wakeup; + lock_and_signal lock; + // Only a pointer to 'name' is kept, so it must live as long as this task. rust_task(rust_scheduler *sched, rust_task_list *state, diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp index d89b278790b8..383e856f69a6 100644 --- a/src/rt/rust_upcall.cpp +++ b/src/rt/rust_upcall.cpp @@ -92,7 +92,9 @@ upcall_new_port(rust_task *task, size_t unit_sz) { LOG_UPCALL_ENTRY(task); LOG(task, comm, "upcall_new_port(task=0x%" PRIxPTR " (%s), unit_sz=%d)", (uintptr_t) task, task->name, unit_sz); - return new (task) rust_port(task, unit_sz); + // take a reference on behalf of the port + task->ref(); + return new (task->kernel) rust_port(task, unit_sz); } extern "C" CDECL void @@ -101,6 +103,9 @@ upcall_del_port(rust_task *task, rust_port *port) { LOG(task, comm, "upcall del_port(0x%" PRIxPTR ")", (uintptr_t) port); I(task->sched, !port->ref_count); delete port; + + // FIXME: We shouldn't ever directly manipulate the ref count. + --task->ref_count; } /** @@ -114,7 +119,7 @@ upcall_new_chan(rust_task *task, rust_port *port) { "task=0x%" PRIxPTR " (%s), port=0x%" PRIxPTR ")", (uintptr_t) task, task->name, port); I(sched, port); - return new (task) rust_chan(task, port, port->unit_sz); + return new (task->kernel) rust_chan(task, port, port->unit_sz); } /** @@ -138,6 +143,8 @@ extern "C" CDECL void upcall_del_chan(rust_task *task, rust_chan *chan) { LOG_UPCALL_ENTRY(task); + I(task->sched, chan->task == task); + LOG(task, comm, "upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan); chan->destroy(); } @@ -183,25 +190,27 @@ upcall_send(rust_task *task, rust_chan *chan, void *sptr) { extern "C" CDECL void upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) { - LOG_UPCALL_ENTRY(task); + { + scoped_lock with(port->lock); + LOG_UPCALL_ENTRY(task); - LOG(task, comm, "port: 0x%" PRIxPTR ", dptr: 0x%" PRIxPTR - ", size: 0x%" PRIxPTR ", chan_no: %d", - (uintptr_t) port, (uintptr_t) dptr, port->unit_sz, - port->chans.length()); + LOG(task, comm, "port: 0x%" PRIxPTR ", dptr: 0x%" PRIxPTR + ", size: 0x%" PRIxPTR ", chan_no: %d", + (uintptr_t) port, (uintptr_t) dptr, port->unit_sz, + port->chans.length()); - if (port->receive(dptr)) { - return; + if (port->receive(dptr)) { + return; + } + + // No data was buffered on any incoming channel, so block this task on + // the port. Remember the rendezvous location so that any sender task + // can write to it before waking up this task. + + LOG(task, comm, "<=== waiting for rendezvous data ==="); + task->rendezvous_ptr = dptr; + task->block(port, "waiting for rendezvous data"); } - - // No data was buffered on any incoming channel, so block this task on the - // port. Remember the rendezvous location so that any sender task can - // write to it before waking up this task. - - LOG(task, comm, "<=== waiting for rendezvous data ==="); - task->rendezvous_ptr = dptr; - task->block(port, "waiting for rendezvous data"); - task->yield(3); } diff --git a/src/rt/sync/sync.h b/src/rt/sync/sync.h index eb220e462c66..360fff1fabda 100644 --- a/src/rt/sync/sync.h +++ b/src/rt/sync/sync.h @@ -1,3 +1,4 @@ +// -*- c++-mode -*- #ifndef SYNC_H #define SYNC_H @@ -10,6 +11,26 @@ public: T oldValue, T newValue) { return __sync_bool_compare_and_swap(address, oldValue, newValue); } + + template + static T increment(T *address) { + return __sync_add_and_fetch(address, 1); + } + + template + static T decrement(T *address) { + return __sync_sub_and_fetch(address, 1); + } + + template + static T increment(T &address) { + return __sync_add_and_fetch(&address, 1); + } + + template + static T decrement(T &address) { + return __sync_sub_and_fetch(&address, 1); + } }; /**