Disconnect ports before draining them. Issue #1155
This commit is contained in:
@@ -45,6 +45,7 @@ native mod rustrt {
|
|||||||
|
|
||||||
fn new_port(unit_sz: uint) -> *rust_port;
|
fn new_port(unit_sz: uint) -> *rust_port;
|
||||||
fn del_port(po: *rust_port);
|
fn del_port(po: *rust_port);
|
||||||
|
fn rust_port_detach(po: *rust_port);
|
||||||
fn get_port_id(po: *rust_port) -> port_id;
|
fn get_port_id(po: *rust_port) -> port_id;
|
||||||
fn rust_port_size(po: *rust_port) -> ctypes::size_t;
|
fn rust_port_size(po: *rust_port) -> ctypes::size_t;
|
||||||
}
|
}
|
||||||
@@ -79,6 +80,9 @@ tag chan<uniq T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
resource port_ptr<uniq T>(po: *rustrt::rust_port) {
|
resource port_ptr<uniq T>(po: *rustrt::rust_port) {
|
||||||
|
// Once the port is detached it's guaranteed not to receive further
|
||||||
|
// messages
|
||||||
|
rustrt::rust_port_detach(po);
|
||||||
// Drain the port so that all the still-enqueued items get dropped
|
// Drain the port so that all the still-enqueued items get dropped
|
||||||
while rustrt::rust_port_size(po) > 0u {
|
while rustrt::rust_port_size(po) > 0u {
|
||||||
// FIXME: For some reason if we don't assign to something here
|
// FIXME: For some reason if we don't assign to something here
|
||||||
|
|||||||
@@ -467,11 +467,24 @@ new_port(size_t unit_sz) {
|
|||||||
return new (task->kernel, "rust_port") rust_port(task, unit_sz);
|
return new (task->kernel, "rust_port") rust_port(task, unit_sz);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
extern "C" CDECL void
|
||||||
|
rust_port_detach(rust_port *port) {
|
||||||
|
rust_task *task = rust_scheduler::get_task();
|
||||||
|
LOG(task, comm, "rust_port_detach(0x%" PRIxPTR ")", (uintptr_t) port);
|
||||||
|
port->detach();
|
||||||
|
// FIXME: Busy waiting until we're the only ref
|
||||||
|
bool done = false;
|
||||||
|
while (!done) {
|
||||||
|
scoped_lock with(port->lock);
|
||||||
|
done = port->ref_count == 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
extern "C" CDECL void
|
extern "C" CDECL void
|
||||||
del_port(rust_port *port) {
|
del_port(rust_port *port) {
|
||||||
rust_task *task = rust_scheduler::get_task();
|
rust_task *task = rust_scheduler::get_task();
|
||||||
LOG(task, comm, "del_port(0x%" PRIxPTR ")", (uintptr_t) port);
|
LOG(task, comm, "del_port(0x%" PRIxPTR ")", (uintptr_t) port);
|
||||||
scoped_lock with(task->lock);
|
A(task->sched, port->ref_count == 1, "Expected port ref_count == 1");
|
||||||
port->deref();
|
port->deref();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -17,10 +17,17 @@ rust_port::rust_port(rust_task *task, size_t unit_sz)
|
|||||||
rust_port::~rust_port() {
|
rust_port::~rust_port() {
|
||||||
LOG(task, comm, "~rust_port 0x%" PRIxPTR, (uintptr_t) this);
|
LOG(task, comm, "~rust_port 0x%" PRIxPTR, (uintptr_t) this);
|
||||||
|
|
||||||
task->release_port(id);
|
|
||||||
task->deref();
|
task->deref();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void rust_port::detach() {
|
||||||
|
I(task->sched, !task->lock.lock_held_by_current_thread());
|
||||||
|
scoped_lock with(task->lock);
|
||||||
|
{
|
||||||
|
task->release_port(id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void rust_port::send(void *sptr) {
|
void rust_port::send(void *sptr) {
|
||||||
I(task->sched, !lock.lock_held_by_current_thread());
|
I(task->sched, !lock.lock_held_by_current_thread());
|
||||||
scoped_lock with(lock);
|
scoped_lock with(lock);
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ public:
|
|||||||
void send(void *sptr);
|
void send(void *sptr);
|
||||||
bool receive(void *dptr);
|
bool receive(void *dptr);
|
||||||
size_t size();
|
size_t size();
|
||||||
|
void detach();
|
||||||
};
|
};
|
||||||
|
|
||||||
//
|
//
|
||||||
|
|||||||
@@ -34,6 +34,7 @@ rust_get_stdout
|
|||||||
rust_get_stderr
|
rust_get_stderr
|
||||||
rust_str_push
|
rust_str_push
|
||||||
rust_list_files
|
rust_list_files
|
||||||
|
rust_port_detach
|
||||||
rust_port_size
|
rust_port_size
|
||||||
rust_process_wait
|
rust_process_wait
|
||||||
rust_ptr_eq
|
rust_ptr_eq
|
||||||
|
|||||||
27
src/test/run-pass/task-comm-chan-cleanup4.rs
Normal file
27
src/test/run-pass/task-comm-chan-cleanup4.rs
Normal file
@@ -0,0 +1,27 @@
|
|||||||
|
use std;
|
||||||
|
import std::int;
|
||||||
|
import std::comm;
|
||||||
|
import std::task;
|
||||||
|
|
||||||
|
// We're trying to trigger a race between send and port destruction that
|
||||||
|
// results in the string not being freed
|
||||||
|
|
||||||
|
fn starship(&&ch: std::comm::chan<str>) {
|
||||||
|
int::range(0, 10) { |_i|
|
||||||
|
comm::send(ch, "pew pew");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn starbase(&&_args: ()) {
|
||||||
|
int::range(0, 10) { |_i|
|
||||||
|
let p = comm::port();
|
||||||
|
task::spawn(comm::chan(p), starship);
|
||||||
|
task::yield();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
int::range(0, 10) { |_i|
|
||||||
|
task::spawn((), starbase);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user