Added send and receive to comm library.
This commit is contained in:
@@ -1,4 +1,6 @@
|
||||
import sys;
|
||||
import ptr;
|
||||
import unsafe;
|
||||
|
||||
export _chan;
|
||||
export _port;
|
||||
@@ -6,16 +8,19 @@ export _port;
|
||||
export mk_port;
|
||||
|
||||
native "rust" mod rustrt {
|
||||
type void;
|
||||
type rust_chan;
|
||||
type rust_port;
|
||||
|
||||
fn new_chan(po : *rust_port) -> *rust_chan;
|
||||
fn del_chan(ch : *rust_chan);
|
||||
fn drop_chan(ch : *rust_chan);
|
||||
fn chan_send(ch: *rust_chan, v : *void);
|
||||
|
||||
fn new_port(unit_sz : uint) -> *rust_port;
|
||||
fn del_port(po : *rust_port);
|
||||
fn drop_port(po : *rust_port);
|
||||
fn port_recv(dp : *void, po : *rust_port);
|
||||
}
|
||||
|
||||
resource chan_ptr(ch: *rustrt::rust_chan) {
|
||||
@@ -32,7 +37,8 @@ resource port_ptr(po: *rustrt::rust_port) {
|
||||
|
||||
obj _chan[T](raw_chan : @chan_ptr) {
|
||||
fn send(v : &T) {
|
||||
|
||||
rustrt::chan_send(**raw_chan,
|
||||
unsafe::reinterpret_cast(ptr::addr_of(v)));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -42,7 +48,8 @@ obj _port[T](raw_port : @port_ptr) {
|
||||
}
|
||||
|
||||
fn recv_into(v : &T) {
|
||||
|
||||
rustrt::port_recv(unsafe::reinterpret_cast(ptr::addr_of(v)),
|
||||
**raw_port);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -914,6 +914,36 @@ void drop_port(rust_task *, rust_port *port) {
|
||||
port->ref_count--;
|
||||
}
|
||||
|
||||
extern "C" CDECL void
|
||||
chan_send(rust_task *task, rust_chan *chan, void *sptr) {
|
||||
chan->send(sptr);
|
||||
}
|
||||
|
||||
extern "C" CDECL void
|
||||
port_recv(rust_task *task, uintptr_t *dptr, rust_port *port) {
|
||||
{
|
||||
scoped_lock with(port->lock);
|
||||
|
||||
LOG(task, comm, "port: 0x%" PRIxPTR ", dptr: 0x%" PRIxPTR
|
||||
", size: 0x%" PRIxPTR ", chan_no: %d",
|
||||
(uintptr_t) port, (uintptr_t) dptr, port->unit_sz,
|
||||
port->chans.length());
|
||||
|
||||
if (port->receive(dptr)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// No data was buffered on any incoming channel, so block this task on
|
||||
// the port. Remember the rendezvous location so that any sender task
|
||||
// can write to it before waking up this task.
|
||||
|
||||
LOG(task, comm, "<=== waiting for rendezvous data ===");
|
||||
task->rendezvous_ptr = dptr;
|
||||
task->block(port, "waiting for rendezvous data");
|
||||
}
|
||||
task->yield(3);
|
||||
}
|
||||
|
||||
//
|
||||
// Local Variables:
|
||||
// mode: C++
|
||||
|
||||
@@ -193,30 +193,12 @@ upcall_send(rust_task *task, rust_chan *chan, void *sptr) {
|
||||
LOG(task, comm, "=== sent data ===>");
|
||||
}
|
||||
|
||||
extern "C" CDECL void
|
||||
port_recv(rust_task *task, uintptr_t *dptr, rust_port *port);
|
||||
extern "C" CDECL void
|
||||
upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) {
|
||||
LOG_UPCALL_ENTRY(task);
|
||||
{
|
||||
scoped_lock with(port->lock);
|
||||
|
||||
LOG(task, comm, "port: 0x%" PRIxPTR ", dptr: 0x%" PRIxPTR
|
||||
", size: 0x%" PRIxPTR ", chan_no: %d",
|
||||
(uintptr_t) port, (uintptr_t) dptr, port->unit_sz,
|
||||
port->chans.length());
|
||||
|
||||
if (port->receive(dptr)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// No data was buffered on any incoming channel, so block this task on
|
||||
// the port. Remember the rendezvous location so that any sender task
|
||||
// can write to it before waking up this task.
|
||||
|
||||
LOG(task, comm, "<=== waiting for rendezvous data ===");
|
||||
task->rendezvous_ptr = dptr;
|
||||
task->block(port, "waiting for rendezvous data");
|
||||
}
|
||||
task->yield(3);
|
||||
port_recv(task, dptr, port);
|
||||
}
|
||||
|
||||
extern "C" CDECL void
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
align_of
|
||||
chan_send
|
||||
check_claims
|
||||
clone_chan
|
||||
debug_box
|
||||
@@ -27,6 +28,7 @@ nano_time
|
||||
new_chan
|
||||
new_port
|
||||
pin_task
|
||||
port_recv
|
||||
unpin_task
|
||||
rand_free
|
||||
rand_new
|
||||
|
||||
@@ -6,3 +6,15 @@ fn create_port_and_chan() {
|
||||
let p = comm::mk_port[int]();
|
||||
let c = p.mk_chan();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn send_recv() {
|
||||
let p = comm::mk_port();
|
||||
let c = p.mk_chan();
|
||||
|
||||
c.send(42);
|
||||
let v = 0;
|
||||
p.recv_into(v);
|
||||
|
||||
assert(42 == v);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user