Work on debugging race conditions.

Ports and channels have been moved to the kernel pool, since they've
been known to outlive their associated task. This probably isn't the
right thing to do, the life cycle needs fixed instead.

Some refactorying in memory_region.cpp. Added a helper function to
increment and decrement the allocation counter. This makes it easier
to switch between atomic and non-atomic increments. Using atomic
increments for now, although this still does not fix the problem.
This commit is contained in:
Eric Holk
2011-07-07 11:53:08 -07:00
parent dcd2563a3a
commit 8acadb17c2
10 changed files with 91 additions and 36 deletions

View File

@@ -19,8 +19,18 @@ memory_region::memory_region(memory_region *parent) :
// Nop.
}
void memory_region::add_alloc() {
//_live_allocations++;
sync::increment(_live_allocations);
}
void memory_region::dec_alloc() {
//_live_allocations--;
sync::decrement(_live_allocations);
}
void memory_region::free(void *mem) {
// printf("free: ptr 0x%" PRIxPTR"\n", (uintptr_t) mem);
// printf("free: ptr 0x%" PRIxPTR" region=%p\n", (uintptr_t) mem, this);
if (!mem) { return; }
if (_synchronized) { _lock.lock(); }
#ifdef TRACK_ALLOCATIONS
@@ -33,7 +43,7 @@ void memory_region::free(void *mem) {
if (_live_allocations < 1) {
_srv->fatal("live_allocs < 1", __FILE__, __LINE__, "");
}
_live_allocations--;
dec_alloc();
_srv->free(mem);
if (_synchronized) { _lock.unlock(); }
}
@@ -42,7 +52,7 @@ void *
memory_region::realloc(void *mem, size_t size) {
if (_synchronized) { _lock.lock(); }
if (!mem) {
_live_allocations++;
add_alloc();
}
void *newMem = _srv->realloc(mem, size);
#ifdef TRACK_ALLOCATIONS
@@ -59,12 +69,13 @@ memory_region::realloc(void *mem, size_t size) {
void *
memory_region::malloc(size_t size) {
if (_synchronized) { _lock.lock(); }
_live_allocations++;
add_alloc();
void *mem = _srv->malloc(size);
#ifdef TRACK_ALLOCATIONS
_allocation_list.append(mem);
#endif
// printf("malloc: ptr 0x%" PRIxPTR "\n", (uintptr_t) mem);
// printf("malloc: ptr 0x%" PRIxPTR " region=%p\n",
// (uintptr_t) mem, this);
if (_synchronized) { _lock.unlock(); }
return mem;
}
@@ -72,7 +83,7 @@ memory_region::malloc(size_t size) {
void *
memory_region::calloc(size_t size) {
if (_synchronized) { _lock.lock(); }
_live_allocations++;
add_alloc();
void *mem = _srv->malloc(size);
memset(mem, 0, size);
#ifdef TRACK_ALLOCATIONS

View File

@@ -22,6 +22,9 @@ private:
const bool _detailed_leaks;
const bool _synchronized;
lock_and_signal _lock;
void add_alloc();
void dec_alloc();
public:
memory_region(rust_srv *srv, bool synchronized);
memory_region(memory_region *parent);

View File

@@ -6,11 +6,12 @@
*/
rust_chan::rust_chan(rust_task *task,
maybe_proxy<rust_port> *port,
size_t unit_sz) :
ref_count(1),
task(task),
port(port),
buffer(task, unit_sz) {
size_t unit_sz)
: ref_count(1),
kernel(task->kernel),
task(task),
port(port),
buffer(task, unit_sz) {
++task->ref_count;
if (port) {
associate(port);
@@ -87,6 +88,7 @@ void rust_chan::send(void *sptr) {
buffer.dequeue(NULL);
} else {
rust_port *target_port = port->referent();
scoped_lock right(target_port->lock);
if (target_port->task->blocked_on(target_port)) {
DLOG(sched, comm, "dequeued in rendezvous_ptr");
buffer.dequeue(target_port->task->rendezvous_ptr);
@@ -114,7 +116,7 @@ rust_chan *rust_chan::clone(maybe_proxy<rust_task> *target) {
port = proxy;
target_task = target->as_proxy()->handle()->referent();
}
return new (target_task) rust_chan(target_task, port, unit_sz);
return new (target_task->kernel) rust_chan(target_task, port, unit_sz);
}
/**

View File

@@ -1,7 +1,7 @@
#ifndef RUST_CHAN_H
#define RUST_CHAN_H
class rust_chan : public task_owned<rust_chan>,
class rust_chan : public kernel_owned<rust_chan>,
public rust_cond {
public:
RUST_REFCOUNTED_WITH_DTOR(rust_chan, destroy())
@@ -9,6 +9,7 @@ public:
~rust_chan();
rust_kernel *kernel;
rust_task *task;
maybe_proxy<rust_port> *port;
size_t idx;

View File

@@ -1,16 +1,16 @@
#include "rust_internal.h"
#include "rust_port.h"
rust_port::rust_port(rust_task *task, size_t unit_sz) :
maybe_proxy<rust_port>(this), task(task),
unit_sz(unit_sz), writers(task), chans(task) {
rust_port::rust_port(rust_task *task, size_t unit_sz)
: maybe_proxy<rust_port>(this), kernel(task->kernel), task(task),
unit_sz(unit_sz), writers(task), chans(task) {
LOG(task, comm,
"new rust_port(task=0x%" PRIxPTR ", unit_sz=%d) -> port=0x%"
PRIxPTR, (uintptr_t)task, unit_sz, (uintptr_t)this);
// Allocate a remote channel, for remote channel data.
remote_channel = new (task) rust_chan(task, this, unit_sz);
remote_channel = new (task->kernel) rust_chan(task, this, unit_sz);
}
rust_port::~rust_port() {

View File

@@ -2,9 +2,10 @@
#define RUST_PORT_H
class rust_port : public maybe_proxy<rust_port>,
public task_owned<rust_port> {
public kernel_owned<rust_port> {
public:
rust_kernel *kernel;
rust_task *task;
size_t unit_sz;
ptr_vec<rust_token> writers;
@@ -13,6 +14,8 @@ public:
// Data sent to this port from remote tasks is buffered in this channel.
rust_chan *remote_channel;
lock_and_signal lock;
rust_port(rust_task *task, size_t unit_sz);
~rust_port();
void log_state();

View File

@@ -401,6 +401,7 @@ rust_task::transition(rust_task_list *src, rust_task_list *dst) {
void
rust_task::block(rust_cond *on, const char* name) {
scoped_lock with(lock);
LOG(this, task, "Blocking on 0x%" PRIxPTR ", cond: 0x%" PRIxPTR,
(uintptr_t) on, (uintptr_t) cond);
A(sched, cond == NULL, "Cannot block an already blocked task.");
@@ -413,6 +414,7 @@ rust_task::block(rust_cond *on, const char* name) {
void
rust_task::wakeup(rust_cond *from) {
scoped_lock with(lock);
A(sched, cond != NULL, "Cannot wake up unblocked task.");
LOG(this, task, "Blocked on 0x%" PRIxPTR " woken up on 0x%" PRIxPTR,
(uintptr_t) cond, (uintptr_t) from);
@@ -430,6 +432,7 @@ rust_task::wakeup(rust_cond *from) {
void
rust_task::die() {
scoped_lock with(lock);
transition(&sched->running_tasks, &sched->dead_tasks);
}

View File

@@ -89,6 +89,8 @@ rust_task : public maybe_proxy<rust_task>,
wakeup_callback *_on_wakeup;
lock_and_signal lock;
// Only a pointer to 'name' is kept, so it must live as long as this task.
rust_task(rust_scheduler *sched,
rust_task_list *state,

View File

@@ -92,7 +92,9 @@ upcall_new_port(rust_task *task, size_t unit_sz) {
LOG_UPCALL_ENTRY(task);
LOG(task, comm, "upcall_new_port(task=0x%" PRIxPTR " (%s), unit_sz=%d)",
(uintptr_t) task, task->name, unit_sz);
return new (task) rust_port(task, unit_sz);
// take a reference on behalf of the port
task->ref();
return new (task->kernel) rust_port(task, unit_sz);
}
extern "C" CDECL void
@@ -101,6 +103,9 @@ upcall_del_port(rust_task *task, rust_port *port) {
LOG(task, comm, "upcall del_port(0x%" PRIxPTR ")", (uintptr_t) port);
I(task->sched, !port->ref_count);
delete port;
// FIXME: We shouldn't ever directly manipulate the ref count.
--task->ref_count;
}
/**
@@ -114,7 +119,7 @@ upcall_new_chan(rust_task *task, rust_port *port) {
"task=0x%" PRIxPTR " (%s), port=0x%" PRIxPTR ")",
(uintptr_t) task, task->name, port);
I(sched, port);
return new (task) rust_chan(task, port, port->unit_sz);
return new (task->kernel) rust_chan(task, port, port->unit_sz);
}
/**
@@ -138,6 +143,8 @@ extern "C" CDECL
void upcall_del_chan(rust_task *task, rust_chan *chan) {
LOG_UPCALL_ENTRY(task);
I(task->sched, chan->task == task);
LOG(task, comm, "upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan);
chan->destroy();
}
@@ -183,25 +190,27 @@ upcall_send(rust_task *task, rust_chan *chan, void *sptr) {
extern "C" CDECL void
upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) {
LOG_UPCALL_ENTRY(task);
{
scoped_lock with(port->lock);
LOG_UPCALL_ENTRY(task);
LOG(task, comm, "port: 0x%" PRIxPTR ", dptr: 0x%" PRIxPTR
", size: 0x%" PRIxPTR ", chan_no: %d",
(uintptr_t) port, (uintptr_t) dptr, port->unit_sz,
port->chans.length());
LOG(task, comm, "port: 0x%" PRIxPTR ", dptr: 0x%" PRIxPTR
", size: 0x%" PRIxPTR ", chan_no: %d",
(uintptr_t) port, (uintptr_t) dptr, port->unit_sz,
port->chans.length());
if (port->receive(dptr)) {
return;
if (port->receive(dptr)) {
return;
}
// No data was buffered on any incoming channel, so block this task on
// the port. Remember the rendezvous location so that any sender task
// can write to it before waking up this task.
LOG(task, comm, "<=== waiting for rendezvous data ===");
task->rendezvous_ptr = dptr;
task->block(port, "waiting for rendezvous data");
}
// No data was buffered on any incoming channel, so block this task on the
// port. Remember the rendezvous location so that any sender task can
// write to it before waking up this task.
LOG(task, comm, "<=== waiting for rendezvous data ===");
task->rendezvous_ptr = dptr;
task->block(port, "waiting for rendezvous data");
task->yield(3);
}

View File

@@ -1,3 +1,4 @@
// -*- c++-mode -*-
#ifndef SYNC_H
#define SYNC_H
@@ -10,6 +11,26 @@ public:
T oldValue, T newValue) {
return __sync_bool_compare_and_swap(address, oldValue, newValue);
}
template <class T>
static T increment(T *address) {
return __sync_add_and_fetch(address, 1);
}
template <class T>
static T decrement(T *address) {
return __sync_sub_and_fetch(address, 1);
}
template <class T>
static T increment(T &address) {
return __sync_add_and_fetch(&address, 1);
}
template <class T>
static T decrement(T &address) {
return __sync_sub_and_fetch(&address, 1);
}
};
/**