Port future to pipes. Graph500 is about 21% faster now.
Making all tests pass.
This commit is contained in:
@@ -22,7 +22,10 @@ export get;
|
||||
export with;
|
||||
export spawn;
|
||||
|
||||
/// The future type
|
||||
// for task.rs
|
||||
export future_pipe;
|
||||
|
||||
#[doc = "The future type"]
|
||||
enum future<A> = {
|
||||
mut v: either<@A, fn@() -> A>
|
||||
};
|
||||
@@ -56,16 +59,34 @@ fn from_value<A>(+val: A) -> future<A> {
|
||||
})
|
||||
}
|
||||
|
||||
fn from_port<A:send>(-port: comm::port<A>) -> future<A> {
|
||||
/*!
|
||||
* Create a future from a port
|
||||
*
|
||||
* The first time that the value is requested the task will block
|
||||
* waiting for the result to be received on the port.
|
||||
*/
|
||||
fn macros() {
|
||||
#macro[
|
||||
[#recv[chan],
|
||||
chan.recv()(chan)]
|
||||
];
|
||||
#macro[
|
||||
[#move[x],
|
||||
unsafe { let y <- *ptr::addr_of(x); y }]
|
||||
];
|
||||
}
|
||||
|
||||
do from_fn {
|
||||
comm::recv(port)
|
||||
fn from_port<A:send>(-port: future_pipe::client::waiting<A>) -> future<A> {
|
||||
#[doc = "
|
||||
Create a future from a port
|
||||
|
||||
The first time that the value is requested the task will block
|
||||
waiting for the result to be received on the port.
|
||||
"];
|
||||
import future_pipe::client::recv;
|
||||
|
||||
let port = ~mut some(port);
|
||||
do from_fn |move port| {
|
||||
let mut port_ = none;
|
||||
port_ <-> *port;
|
||||
let port = option::unwrap(port_);
|
||||
alt (#recv(port)) {
|
||||
future_pipe::completed(data, _next) { #move(data) }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -91,12 +112,9 @@ fn spawn<A:send>(+blk: fn~() -> A) -> future<A> {
|
||||
* value of the future.
|
||||
*/
|
||||
|
||||
let mut po = comm::port();
|
||||
let ch = comm::chan(po);
|
||||
do task::spawn {
|
||||
comm::send(ch, blk())
|
||||
};
|
||||
from_port(po)
|
||||
from_port(pipes::spawn_service_recv(future_pipe::init, |ch| {
|
||||
future_pipe::server::completed(ch, blk());
|
||||
}))
|
||||
}
|
||||
|
||||
fn get<A:copy>(future: future<A>) -> A {
|
||||
@@ -119,6 +137,48 @@ fn with<A,B>(future: future<A>, blk: fn(A) -> B) -> B {
|
||||
blk(*v)
|
||||
}
|
||||
|
||||
// The pipe protocol, generated by pipec
|
||||
mod future_pipe {
|
||||
fn init<T: send>() -> (client::waiting<T>, server::waiting<T>) {
|
||||
{ let (s, c) = pipes::entangle(); (c, s) }
|
||||
}
|
||||
enum waiting<T: send> { completed(T, client::terminated), }
|
||||
enum terminated { }
|
||||
mod client {
|
||||
impl recv<T: send> for waiting<T> {
|
||||
fn recv() -> extern fn(+waiting<T>) -> future_pipe::waiting<T> {
|
||||
fn recv<T: send>(+pipe: waiting<T>) ->
|
||||
future_pipe::waiting<T> {
|
||||
option::unwrap(pipes::recv(pipe))
|
||||
}
|
||||
recv
|
||||
}
|
||||
}
|
||||
type waiting<T: send> = pipes::recv_packet<future_pipe::waiting<T>>;
|
||||
type terminated = pipes::send_packet<future_pipe::terminated>;
|
||||
}
|
||||
mod server {
|
||||
fn completed<T: send>(+pipe: waiting<T>, +x_0: T) -> terminated {
|
||||
{
|
||||
let (s, c) = pipes::entangle();
|
||||
let message = future_pipe::completed(x_0, s);
|
||||
pipes::send(pipe, message);
|
||||
c
|
||||
}
|
||||
}
|
||||
type waiting<T: send> = pipes::send_packet<future_pipe::waiting<T>>;
|
||||
impl recv for terminated {
|
||||
fn recv() -> extern fn(+terminated) -> future_pipe::terminated {
|
||||
fn recv(+pipe: terminated) -> future_pipe::terminated {
|
||||
option::unwrap(pipes::recv(pipe))
|
||||
}
|
||||
recv
|
||||
}
|
||||
}
|
||||
type terminated = pipes::recv_packet<future_pipe::terminated>;
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_from_value() {
|
||||
let f = from_value("snail");
|
||||
@@ -127,9 +187,8 @@ fn test_from_value() {
|
||||
|
||||
#[test]
|
||||
fn test_from_port() {
|
||||
let po = comm::port();
|
||||
let ch = comm::chan(po);
|
||||
comm::send(ch, "whale");
|
||||
let (po, ch) = future_pipe::init();
|
||||
future_pipe::server::completed(ch, "whale");
|
||||
let f = from_port(po);
|
||||
assert get(f) == "whale";
|
||||
}
|
||||
|
||||
@@ -239,3 +239,22 @@ fn spawn_service<T: send>(
|
||||
|
||||
client
|
||||
}
|
||||
|
||||
fn spawn_service_recv<T: send>(
|
||||
init: native fn() -> (recv_packet<T>, send_packet<T>),
|
||||
+service: fn~(+send_packet<T>))
|
||||
-> recv_packet<T>
|
||||
{
|
||||
let (client, server) = init();
|
||||
|
||||
// This is some nasty gymnastics required to safely move the pipe
|
||||
// into a new task.
|
||||
let server = ~mut some(server);
|
||||
do task::spawn |move service| {
|
||||
let mut server_ = none;
|
||||
server_ <-> *server;
|
||||
service(option::unwrap(server_))
|
||||
}
|
||||
|
||||
client
|
||||
}
|
||||
|
||||
@@ -308,11 +308,21 @@ fn future_result(builder: builder) -> future::future<task_result> {
|
||||
fn future_task(builder: builder) -> future::future<task> {
|
||||
//! Get a future representing the handle to the new task
|
||||
|
||||
let mut po = comm::port();
|
||||
let ch = comm::chan(po);
|
||||
do add_wrapper(builder) |body| {
|
||||
fn~() {
|
||||
comm::send(ch, get_task());
|
||||
import future::future_pipe;
|
||||
|
||||
let (po, ch) = future_pipe::init();
|
||||
|
||||
let ch = ~mut some(ch);
|
||||
|
||||
do add_wrapper(builder) |body, move ch| {
|
||||
let ch = { let mut t = none;
|
||||
t <-> *ch;
|
||||
~mut t};
|
||||
fn~(move ch) {
|
||||
let mut po = none;
|
||||
po <-> *ch;
|
||||
future_pipe::server::completed(option::unwrap(po),
|
||||
get_task());
|
||||
body();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user