Pipe code cleanup

This commit is contained in:
Eric Holk
2012-07-10 16:46:16 -07:00
parent 877ebed6cf
commit 71339d9e69
2 changed files with 11 additions and 21 deletions

View File

@@ -133,7 +133,6 @@ fn try_recv<T: send>(-p: recv_packet<T>) -> option<T> {
rustrt::task_clear_event_reject(this); rustrt::task_clear_event_reject(this);
let old_state = swap_state_acq(p.header.state, let old_state = swap_state_acq(p.header.state,
blocked); blocked);
#debug("%?", old_state);
alt old_state { alt old_state {
empty { empty {
#debug("no data available on %?, going to sleep.", p_); #debug("no data available on %?, going to sleep.", p_);
@@ -230,7 +229,7 @@ impl private_methods for packet_header {
#[doc = "Returns when one of the packet headers reports data is #[doc = "Returns when one of the packet headers reports data is
available."] available."]
fn wait_many(pkts: ~[&a.packet_header]) -> uint { fn wait_many(pkts: &[&a.packet_header]) -> uint {
let this = rustrt::rust_get_task(); let this = rustrt::rust_get_task();
rustrt::task_clear_event_reject(this); rustrt::task_clear_event_reject(this);
@@ -283,39 +282,30 @@ fn select2<A: send, B: send>(
+b: recv_packet<B>) +b: recv_packet<B>)
-> either<(option<A>, recv_packet<B>), (recv_packet<A>, option<B>)> -> either<(option<A>, recv_packet<B>), (recv_packet<A>, option<B>)>
{ {
let a = unsafe { uniquify(a.unwrap()) }; let i = wait_many([a.header(), b.header()]/_);
let b = unsafe { uniquify(b.unwrap()) };
let i = {
let headers = ~[&a.header,
&b.header];
wait_many(headers)
};
unsafe { unsafe {
alt i { alt i {
0 { left((try_recv(recv_packet(transmute(a))), 0 { left((try_recv(a), b)) }
recv_packet(transmute(b)))) } 1 { right((a, try_recv(b))) }
1 { right((recv_packet(transmute(a)),
try_recv(recv_packet(transmute(b))))) }
_ { fail "select2 return an invalid packet" } _ { fail "select2 return an invalid packet" }
} }
} }
} }
fn selecti<T: send>(endpoints: &[&recv_packet<T>]) -> uint {
wait_many(endpoints.map(|p| p.header()))
}
#[doc = "Waits on a set of endpoints. Returns a message, its index, #[doc = "Waits on a set of endpoints. Returns a message, its index,
and a list of the remaining endpoints."] and a list of the remaining endpoints."]
fn select<T: send>(+endpoints: ~[recv_packet<T>]) fn select<T: send>(+endpoints: ~[recv_packet<T>])
-> (uint, option<T>, ~[recv_packet<T>]) -> (uint, option<T>, ~[recv_packet<T>])
{ {
let endpoints = vec::map_consume( let ready = wait_many(endpoints.map(|p| p.header()));
endpoints,
|p| unsafe { uniquify(p.unwrap()) });
let endpoints_r = vec::view(endpoints, 0, endpoints.len());
let ready = wait_many(endpoints_r.map_r(|p| &p.header));
let mut remaining = ~[]; let mut remaining = ~[];
let mut result = none; let mut result = none;
do vec::consume(endpoints) |i, p| { do vec::consume(endpoints) |i, p| {
let p = recv_packet(unsafe { unsafe::transmute(p) });
if i == ready { if i == ready {
result = try_recv(p); result = try_recv(p);
} }

View File

@@ -51,7 +51,7 @@ fn main() {
}); });
let (c1, p1) = oneshot::init(); let (c1, p1) = oneshot::init();
let (c2, p2) = oneshot::init(); let (_c2, p2) = oneshot::init();
let c = send(c, (p1, p2)); let c = send(c, (p1, p2));
@@ -59,7 +59,7 @@ fn main() {
signal(c1); signal(c1);
let (c1, p1) = oneshot::init(); let (_c1, p1) = oneshot::init();
let (c2, p2) = oneshot::init(); let (c2, p2) = oneshot::init();
send(c, (p1, p2)); send(c, (p1, p2));