rt: Remove rust_chan

This commit is contained in:
Brian Anderson
2011-11-11 11:46:07 -08:00
parent 39084fb881
commit 5d1e321ecb
9 changed files with 29 additions and 140 deletions

View File

@@ -45,7 +45,6 @@ RUNTIME_CS_$(1) := \
rt/rust_scheduler.cpp \ rt/rust_scheduler.cpp \
rt/rust_task.cpp \ rt/rust_task.cpp \
rt/rust_task_list.cpp \ rt/rust_task_list.cpp \
rt/rust_chan.cpp \
rt/rust_port.cpp \ rt/rust_port.cpp \
rt/rust_upcall.cpp \ rt/rust_upcall.cpp \
rt/rust_log.cpp \ rt/rust_log.cpp \
@@ -78,7 +77,6 @@ RUNTIME_HDR_$(1) := rt/globals.h \
rt/rust_gc.h \ rt/rust_gc.h \
rt/rust_internal.h \ rt/rust_internal.h \
rt/rust_util.h \ rt/rust_util.h \
rt/rust_chan.h \
rt/rust_env.h \ rt/rust_env.h \
rt/rust_obstack.h \ rt/rust_obstack.h \
rt/rust_unwind.h \ rt/rust_unwind.h \

View File

@@ -463,8 +463,7 @@ new_port(size_t unit_sz) {
rust_task *task = rust_scheduler::get_task(); rust_task *task = rust_scheduler::get_task();
LOG(task, comm, "new_port(task=0x%" PRIxPTR " (%s), unit_sz=%d)", LOG(task, comm, "new_port(task=0x%" PRIxPTR " (%s), unit_sz=%d)",
(uintptr_t) task, task->name, unit_sz); (uintptr_t) task, task->name, unit_sz);
// take a reference on behalf of the port // port starts with refcount == 1
task->ref();
return new (task->kernel, "rust_port") rust_port(task, unit_sz); return new (task->kernel, "rust_port") rust_port(task, unit_sz);
} }
@@ -472,11 +471,7 @@ extern "C" CDECL void
del_port(rust_port *port) { del_port(rust_port *port) {
rust_task *task = rust_scheduler::get_task(); rust_task *task = rust_scheduler::get_task();
LOG(task, comm, "del_port(0x%" PRIxPTR ")", (uintptr_t) port); LOG(task, comm, "del_port(0x%" PRIxPTR ")", (uintptr_t) port);
I(task->sched, !port->ref_count); port->deref();
delete port;
// FIXME: this should happen in the port.
task->deref();
} }
extern "C" CDECL rust_port_id extern "C" CDECL rust_port_id
@@ -486,7 +481,6 @@ get_port_id(rust_port *port) {
extern "C" CDECL extern "C" CDECL
void drop_port(rust_port *port) { void drop_port(rust_port *port) {
port->ref_count--;
} }
extern "C" CDECL void extern "C" CDECL void
@@ -497,10 +491,11 @@ chan_id_send(type_desc *t, rust_task_id target_task_id,
rust_task *target_task = task->kernel->get_task_by_id(target_task_id); rust_task *target_task = task->kernel->get_task_by_id(target_task_id);
if(target_task) { if(target_task) {
rust_port *port = target_task->get_port_by_id(target_port_id); rust_port *port = target_task->get_port_by_id(target_port_id);
target_task->deref();
if(port) { if(port) {
port->send(sptr); port->send(sptr);
port->deref();
} }
target_task->deref();
} }
} }

View File

@@ -1,53 +0,0 @@
#include "rust_internal.h"
#include "rust_chan.h"
/**
* Create a new rust channel and associate it with the specified port.
*/
rust_chan::rust_chan(rust_kernel *kernel, rust_port *port,
size_t unit_sz)
: ref_count(0),
kernel(kernel),
port(port),
buffer(kernel, unit_sz) {
KLOG(kernel, comm, "new rust_chan(task=0x%" PRIxPTR
", port=0x%" PRIxPTR ") -> chan=0x%" PRIxPTR,
(uintptr_t) task, (uintptr_t) port, (uintptr_t) this);
A(kernel, port != NULL, "Port must not be null");
this->task = port->task;
this->task->ref();
}
rust_chan::~rust_chan() {
KLOG(kernel, comm, "del rust_chan(task=0x%" PRIxPTR ")",
(uintptr_t) this);
I(this->kernel, !is_associated());
A(kernel, is_associated() == false,
"Channel must be disassociated before being freed.");
task->deref();
task = NULL;
}
bool rust_chan::is_associated() {
return port != NULL;
}
rust_chan *rust_chan::clone(rust_task *target) {
return new (target->kernel, "cloned chan")
rust_chan(kernel, port, buffer.unit_sz);
}
//
// Local Variables:
// mode: C++
// fill-column: 78;
// indent-tabs-mode: nil
// c-basic-offset: 4
// buffer-file-coding-system: utf-8-unix
// compile-command: "make -k -C $RBUILD 2>&1 | sed -e 's/\\/x\\//x:\\//g'";
// End:
//

View File

@@ -1,35 +0,0 @@
#ifndef RUST_CHAN_H
#define RUST_CHAN_H
class rust_chan : public kernel_owned<rust_chan>,
public rust_cond {
~rust_chan();
public:
RUST_ATOMIC_REFCOUNT();
rust_chan(rust_kernel *kernel, rust_port *port,
size_t unit_sz);
rust_kernel *kernel;
rust_task *task;
rust_port *port;
size_t idx;
circular_buffer buffer;
bool is_associated();
rust_chan *clone(rust_task *target);
};
//
// Local Variables:
// mode: C++
// fill-column: 78;
// indent-tabs-mode: nil
// c-basic-offset: 4
// buffer-file-coding-system: utf-8-unix
// compile-command: "make -k -C $RBUILD 2>&1 | sed -e 's/\\/x\\//x:\\//g'";
// End:
//
#endif /* RUST_CHAN_H */

View File

@@ -55,7 +55,6 @@ struct rust_scheduler;
struct rust_task; struct rust_task;
class rust_log; class rust_log;
class rust_port; class rust_port;
class rust_chan;
class rust_kernel; class rust_kernel;
class rust_crate_cache; class rust_crate_cache;
@@ -282,7 +281,6 @@ struct type_desc {
#include "circular_buffer.h" #include "circular_buffer.h"
#include "rust_task.h" #include "rust_task.h"
#include "rust_chan.h"
#include "rust_port.h" #include "rust_port.h"
#include "memory.h" #include "memory.h"

View File

@@ -1,61 +1,47 @@
#include "rust_internal.h" #include "rust_internal.h"
#include "rust_port.h" #include "rust_port.h"
#include "rust_chan.h"
rust_port::rust_port(rust_task *task, size_t unit_sz) rust_port::rust_port(rust_task *task, size_t unit_sz)
: ref_count(1), kernel(task->kernel), task(task), : ref_count(1), kernel(task->kernel), task(task),
unit_sz(unit_sz) { unit_sz(unit_sz), buffer(kernel, unit_sz) {
LOG(task, comm, LOG(task, comm,
"new rust_port(task=0x%" PRIxPTR ", unit_sz=%d) -> port=0x%" "new rust_port(task=0x%" PRIxPTR ", unit_sz=%d) -> port=0x%"
PRIxPTR, (uintptr_t)task, unit_sz, (uintptr_t)this); PRIxPTR, (uintptr_t)task, unit_sz, (uintptr_t)this);
task->ref();
id = task->register_port(this); id = task->register_port(this);
remote_chan = new (task->kernel, "rust_chan")
rust_chan(task->kernel, this, unit_sz);
remote_chan->ref();
remote_chan->port = this;
} }
rust_port::~rust_port() { rust_port::~rust_port() {
LOG(task, comm, "~rust_port 0x%" PRIxPTR, (uintptr_t) this); LOG(task, comm, "~rust_port 0x%" PRIxPTR, (uintptr_t) this);
{
scoped_lock with(lock);
remote_chan->port = NULL;
remote_chan->deref();
remote_chan = NULL;
}
task->release_port(id); task->release_port(id);
task->deref();
} }
void rust_port::send(void *sptr) { void rust_port::send(void *sptr) {
if (!remote_chan->is_associated()) { // FIXME: Is this lock really necessary? Why do we send with the lock
W(kernel, remote_chan->is_associated(), // but not receive with the lock?
"rust_chan::transmit with no associated port.");
return;
}
scoped_lock with(lock); scoped_lock with(lock);
remote_chan->buffer.enqueue(sptr); buffer.enqueue(sptr);
A(kernel, !remote_chan->buffer.is_empty(), A(kernel, !buffer.is_empty(),
"rust_chan::transmit with nothing to send."); "rust_chan::transmit with nothing to send.");
if (task->blocked_on(this)) { if (task->blocked_on(this)) {
KLOG(kernel, comm, "dequeued in rendezvous_ptr"); KLOG(kernel, comm, "dequeued in rendezvous_ptr");
remote_chan->buffer.dequeue(task->rendezvous_ptr); buffer.dequeue(task->rendezvous_ptr);
task->rendezvous_ptr = 0; task->rendezvous_ptr = 0;
task->wakeup(this); task->wakeup(this);
} }
} }
bool rust_port::receive(void *dptr) { bool rust_port::receive(void *dptr) {
if (remote_chan->buffer.is_empty() == false) { if (buffer.is_empty() == false) {
remote_chan->buffer.dequeue(dptr); buffer.dequeue(dptr);
LOG(task, comm, "<=== read data ==="); LOG(task, comm, "<=== read data ===");
return true; return true;
} }
@@ -64,9 +50,8 @@ bool rust_port::receive(void *dptr) {
void rust_port::log_state() { void rust_port::log_state() {
LOG(task, comm, LOG(task, comm,
"\tchan: 0x%" PRIxPTR ", size: %d", "port size: %d",
remote_chan, buffer.size());
remote_chan->buffer.size());
} }
// //

View File

@@ -3,14 +3,14 @@
class rust_port : public kernel_owned<rust_port>, public rust_cond { class rust_port : public kernel_owned<rust_port>, public rust_cond {
public: public:
RUST_REFCOUNTED(rust_port); RUST_ATOMIC_REFCOUNT();
rust_port_id id; rust_port_id id;
rust_kernel *kernel; rust_kernel *kernel;
rust_task *task; rust_task *task;
rust_chan *remote_chan;
size_t unit_sz; size_t unit_sz;
circular_buffer buffer;
lock_and_signal lock; lock_and_signal lock;

View File

@@ -122,23 +122,22 @@ rust_task::rust_task(rust_scheduler *sched, rust_task_list *state,
rust_task::~rust_task() rust_task::~rust_task()
{ {
I(sched, !sched->lock.lock_held_by_current_thread()); I(sched, !sched->lock.lock_held_by_current_thread());
I(sched, port_table.is_empty());
DLOG(sched, task, "~rust_task %s @0x%" PRIxPTR ", refcnt=%d", DLOG(sched, task, "~rust_task %s @0x%" PRIxPTR ", refcnt=%d",
name, (uintptr_t)this, ref_count); name, (uintptr_t)this, ref_count);
if(user.notify_enabled) { if(user.notify_enabled) {
rust_chan *target = rust_port *target =
get_chan_by_handle(&user.notify_chan); get_port_by_chan_handle(&user.notify_chan);
if(target) { if(target) {
task_notification msg; task_notification msg;
msg.id = user.id; msg.id = user.id;
msg.result = failed ? tr_failure : tr_success; msg.result = failed ? tr_failure : tr_success;
if (target->is_associated()) { target->send(&msg);
target->port->send(&msg);
target->deref(); target->deref();
} }
} }
}
if (supervisor) { if (supervisor) {
supervisor->deref(); supervisor->deref();
@@ -552,16 +551,18 @@ rust_port *rust_task::get_port_by_id(rust_port_id id) {
scoped_lock with(lock); scoped_lock with(lock);
rust_port *port = NULL; rust_port *port = NULL;
port_table.get(id, &port); port_table.get(id, &port);
if (port) {
port->ref();
}
return port; return port;
} }
rust_chan *rust_task::get_chan_by_handle(chan_handle *handle) { rust_port *rust_task::get_port_by_chan_handle(chan_handle *handle) {
rust_task *target_task = kernel->get_task_by_id(handle->task); rust_task *target_task = kernel->get_task_by_id(handle->task);
if(target_task) { if(target_task) {
rust_port *port = target_task->get_port_by_id(handle->port); rust_port *port = target_task->get_port_by_id(handle->port);
target_task->deref(); target_task->deref();
port->remote_chan->ref(); return port;
return port->remote_chan;
} }
return NULL; return NULL;
} }

View File

@@ -209,7 +209,7 @@ rust_task : public kernel_owned<rust_task>, rust_cond
// not at all safe. // not at all safe.
intptr_t get_ref_count() const { return ref_count; } intptr_t get_ref_count() const { return ref_count; }
rust_chan *get_chan_by_handle(chan_handle *handle); rust_port *get_port_by_chan_handle(chan_handle *handle);
// FIXME: These functions only exist to get the tasking system off the // FIXME: These functions only exist to get the tasking system off the
// ground. We should never be migrating shared boxes between tasks. // ground. We should never be migrating shared boxes between tasks.