Make moves explicit in pipes and pipe compiler
This commit is contained in:
@@ -75,7 +75,7 @@ fn from_value<A>(+val: A) -> Future<A> {
|
|||||||
* not block.
|
* not block.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
Future {state: Forced(~val)}
|
Future {state: Forced(~(move val))}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn from_port<A:Send>(+port: future_pipe::client::waiting<A>) -> Future<A> {
|
fn from_port<A:Send>(+port: future_pipe::client::waiting<A>) -> Future<A> {
|
||||||
@@ -86,13 +86,13 @@ fn from_port<A:Send>(+port: future_pipe::client::waiting<A>) -> Future<A> {
|
|||||||
* waiting for the result to be received on the port.
|
* waiting for the result to be received on the port.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
let port = ~mut Some(port);
|
let port = ~mut Some(move port);
|
||||||
do from_fn |move port| {
|
do from_fn |move port| {
|
||||||
let mut port_ = None;
|
let mut port_ = None;
|
||||||
port_ <-> *port;
|
port_ <-> *port;
|
||||||
let port = option::unwrap(port_);
|
let port = option::unwrap(port_);
|
||||||
match recv(port) {
|
match recv(move port) {
|
||||||
future_pipe::completed(move data) => data
|
future_pipe::completed(move data) => move data
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -106,7 +106,7 @@ fn from_fn<A>(+f: ~fn() -> A) -> Future<A> {
|
|||||||
* function. It is not spawned into another task.
|
* function. It is not spawned into another task.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
Future {state: Pending(f)}
|
Future {state: Pending(move f)}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn spawn<A:Send>(+blk: fn~() -> A) -> Future<A> {
|
fn spawn<A:Send>(+blk: fn~() -> A) -> Future<A> {
|
||||||
@@ -117,8 +117,8 @@ fn spawn<A:Send>(+blk: fn~() -> A) -> Future<A> {
|
|||||||
* value of the future.
|
* value of the future.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
from_port(pipes::spawn_service_recv(future_pipe::init, |ch| {
|
from_port(pipes::spawn_service_recv(future_pipe::init, |move blk, ch| {
|
||||||
future_pipe::server::completed(ch, blk());
|
future_pipe::server::completed(move ch, blk());
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -113,7 +113,7 @@ export send_packet, recv_packet, buffer_header;
|
|||||||
const SPIN_COUNT: uint = 0;
|
const SPIN_COUNT: uint = 0;
|
||||||
|
|
||||||
macro_rules! move_it (
|
macro_rules! move_it (
|
||||||
{ $x:expr } => { unsafe { let y <- *ptr::addr_of($x); y } }
|
{ $x:expr } => { unsafe { let y <- *ptr::addr_of($x); move y } }
|
||||||
)
|
)
|
||||||
|
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
@@ -262,8 +262,7 @@ fn unibuffer<T: Send>() -> ~Buffer<Packet<T>> {
|
|||||||
unsafe {
|
unsafe {
|
||||||
b.data.header.buffer = reinterpret_cast(&b);
|
b.data.header.buffer = reinterpret_cast(&b);
|
||||||
}
|
}
|
||||||
|
move b
|
||||||
b
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
@@ -411,7 +410,7 @@ fn send<T: Send, Tbuffer: Send>(+p: SendPacketBuffered<T, Tbuffer>,
|
|||||||
let p = unsafe { &*p_ };
|
let p = unsafe { &*p_ };
|
||||||
assert ptr::addr_of(p.header) == header;
|
assert ptr::addr_of(p.header) == header;
|
||||||
assert p.payload.is_none();
|
assert p.payload.is_none();
|
||||||
p.payload <- Some(payload);
|
p.payload <- Some(move payload);
|
||||||
let old_state = swap_state_rel(&mut p.header.state, Full);
|
let old_state = swap_state_rel(&mut p.header.state, Full);
|
||||||
match old_state {
|
match old_state {
|
||||||
Empty => {
|
Empty => {
|
||||||
@@ -449,7 +448,7 @@ Fails if the sender closes the connection.
|
|||||||
|
|
||||||
*/
|
*/
|
||||||
fn recv<T: Send, Tbuffer: Send>(+p: RecvPacketBuffered<T, Tbuffer>) -> T {
|
fn recv<T: Send, Tbuffer: Send>(+p: RecvPacketBuffered<T, Tbuffer>) -> T {
|
||||||
option::unwrap_expect(try_recv(p), "connection closed")
|
option::unwrap_expect(try_recv(move p), "connection closed")
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Attempts to receive a message from a pipe.
|
/** Attempts to receive a message from a pipe.
|
||||||
@@ -713,8 +712,8 @@ fn select2<A: Send, Ab: Send, B: Send, Bb: Send>(
|
|||||||
let i = wait_many([a.header(), b.header()]/_);
|
let i = wait_many([a.header(), b.header()]/_);
|
||||||
|
|
||||||
match i {
|
match i {
|
||||||
0 => Left((try_recv(a), b)),
|
0 => Left((try_recv(move a), move b)),
|
||||||
1 => Right((a, try_recv(b))),
|
1 => Right((move a, try_recv(move b))),
|
||||||
_ => fail ~"select2 return an invalid packet"
|
_ => fail ~"select2 return an invalid packet"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -750,10 +749,10 @@ fn select<T: Send, Tb: Send>(+endpoints: ~[RecvPacketBuffered<T, Tb>])
|
|||||||
-> (uint, Option<T>, ~[RecvPacketBuffered<T, Tb>])
|
-> (uint, Option<T>, ~[RecvPacketBuffered<T, Tb>])
|
||||||
{
|
{
|
||||||
let ready = wait_many(endpoints.map(|p| p.header()));
|
let ready = wait_many(endpoints.map(|p| p.header()));
|
||||||
let mut remaining = endpoints;
|
let mut remaining <- endpoints;
|
||||||
let port = vec::swap_remove(remaining, ready);
|
let port = vec::swap_remove(remaining, ready);
|
||||||
let result = try_recv(port);
|
let result = try_recv(move port);
|
||||||
(ready, result, remaining)
|
(ready, move result, move remaining)
|
||||||
}
|
}
|
||||||
|
|
||||||
/** The sending end of a pipe. It can be used to send exactly one
|
/** The sending end of a pipe. It can be used to send exactly one
|
||||||
@@ -943,14 +942,14 @@ fn spawn_service<T: Send, Tb: Send>(
|
|||||||
|
|
||||||
// This is some nasty gymnastics required to safely move the pipe
|
// This is some nasty gymnastics required to safely move the pipe
|
||||||
// into a new task.
|
// into a new task.
|
||||||
let server = ~mut Some(server);
|
let server = ~mut Some(move server);
|
||||||
do task::spawn |move service| {
|
do task::spawn |move service, move server| {
|
||||||
let mut server_ = None;
|
let mut server_ = None;
|
||||||
server_ <-> *server;
|
server_ <-> *server;
|
||||||
service(option::unwrap(server_))
|
service(option::unwrap(server_))
|
||||||
}
|
}
|
||||||
|
|
||||||
client
|
move client
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Like `spawn_service_recv`, but for protocols that start in the
|
/** Like `spawn_service_recv`, but for protocols that start in the
|
||||||
@@ -967,14 +966,14 @@ fn spawn_service_recv<T: Send, Tb: Send>(
|
|||||||
|
|
||||||
// This is some nasty gymnastics required to safely move the pipe
|
// This is some nasty gymnastics required to safely move the pipe
|
||||||
// into a new task.
|
// into a new task.
|
||||||
let server = ~mut Some(server);
|
let server = ~mut Some(move server);
|
||||||
do task::spawn |move service| {
|
do task::spawn |move service, move server| {
|
||||||
let mut server_ = None;
|
let mut server_ = None;
|
||||||
server_ <-> *server;
|
server_ <-> *server;
|
||||||
service(option::unwrap(server_))
|
service(option::unwrap(server_))
|
||||||
}
|
}
|
||||||
|
|
||||||
client
|
move client
|
||||||
}
|
}
|
||||||
|
|
||||||
// Streams - Make pipes a little easier in general.
|
// Streams - Make pipes a little easier in general.
|
||||||
@@ -1039,7 +1038,7 @@ These allow sending or receiving an unlimited number of messages.
|
|||||||
fn stream<T:Send>() -> (Chan<T>, Port<T>) {
|
fn stream<T:Send>() -> (Chan<T>, Port<T>) {
|
||||||
let (c, s) = streamp::init();
|
let (c, s) = streamp::init();
|
||||||
|
|
||||||
(Chan_({ mut endp: Some(c) }), Port_({ mut endp: Some(s) }))
|
(Chan_({ mut endp: Some(move c) }), Port_({ mut endp: Some(move s) }))
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Send> Chan<T>: Channel<T> {
|
impl<T: Send> Chan<T>: Channel<T> {
|
||||||
@@ -1047,15 +1046,15 @@ impl<T: Send> Chan<T>: Channel<T> {
|
|||||||
let mut endp = None;
|
let mut endp = None;
|
||||||
endp <-> self.endp;
|
endp <-> self.endp;
|
||||||
self.endp = Some(
|
self.endp = Some(
|
||||||
streamp::client::data(unwrap(endp), x))
|
streamp::client::data(unwrap(endp), move x))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn try_send(+x: T) -> bool {
|
fn try_send(+x: T) -> bool {
|
||||||
let mut endp = None;
|
let mut endp = None;
|
||||||
endp <-> self.endp;
|
endp <-> self.endp;
|
||||||
match move streamp::client::try_data(unwrap(endp), x) {
|
match move streamp::client::try_data(unwrap(endp), move x) {
|
||||||
Some(move next) => {
|
Some(move next) => {
|
||||||
self.endp = Some(next);
|
self.endp = Some(move next);
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
None => false
|
None => false
|
||||||
@@ -1068,8 +1067,8 @@ impl<T: Send> Port<T>: Recv<T> {
|
|||||||
let mut endp = None;
|
let mut endp = None;
|
||||||
endp <-> self.endp;
|
endp <-> self.endp;
|
||||||
let streamp::data(x, endp) = pipes::recv(unwrap(endp));
|
let streamp::data(x, endp) = pipes::recv(unwrap(endp));
|
||||||
self.endp = Some(endp);
|
self.endp = Some(move endp);
|
||||||
x
|
move x
|
||||||
}
|
}
|
||||||
|
|
||||||
fn try_recv() -> Option<T> {
|
fn try_recv() -> Option<T> {
|
||||||
@@ -1077,8 +1076,8 @@ impl<T: Send> Port<T>: Recv<T> {
|
|||||||
endp <-> self.endp;
|
endp <-> self.endp;
|
||||||
match move pipes::try_recv(unwrap(endp)) {
|
match move pipes::try_recv(unwrap(endp)) {
|
||||||
Some(streamp::data(move x, move endp)) => {
|
Some(streamp::data(move x, move endp)) => {
|
||||||
self.endp = Some(endp);
|
self.endp = Some(move endp);
|
||||||
Some(x)
|
Some(move x)
|
||||||
}
|
}
|
||||||
None => None
|
None => None
|
||||||
}
|
}
|
||||||
@@ -1101,13 +1100,13 @@ struct PortSet<T: Send> : Recv<T> {
|
|||||||
mut ports: ~[pipes::Port<T>],
|
mut ports: ~[pipes::Port<T>],
|
||||||
|
|
||||||
fn add(+port: pipes::Port<T>) {
|
fn add(+port: pipes::Port<T>) {
|
||||||
vec::push(self.ports, port)
|
vec::push(self.ports, move port)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn chan() -> Chan<T> {
|
fn chan() -> Chan<T> {
|
||||||
let (ch, po) = stream();
|
let (ch, po) = stream();
|
||||||
self.add(po);
|
self.add(move po);
|
||||||
ch
|
move ch
|
||||||
}
|
}
|
||||||
|
|
||||||
fn try_recv() -> Option<T> {
|
fn try_recv() -> Option<T> {
|
||||||
@@ -1120,7 +1119,7 @@ struct PortSet<T: Send> : Recv<T> {
|
|||||||
let i = wait_many(ports);
|
let i = wait_many(ports);
|
||||||
match move ports[i].try_recv() {
|
match move ports[i].try_recv() {
|
||||||
Some(move m) => {
|
Some(move m) => {
|
||||||
result = Some(m);
|
result = Some(move m);
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
// Remove this port.
|
// Remove this port.
|
||||||
@@ -1129,7 +1128,7 @@ struct PortSet<T: Send> : Recv<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
ports <-> self.ports;
|
ports <-> self.ports;
|
||||||
result
|
move result
|
||||||
}
|
}
|
||||||
|
|
||||||
fn recv() -> T {
|
fn recv() -> T {
|
||||||
@@ -1166,7 +1165,7 @@ type SharedChan<T: Send> = unsafe::Exclusive<Chan<T>>;
|
|||||||
|
|
||||||
impl<T: Send> SharedChan<T>: Channel<T> {
|
impl<T: Send> SharedChan<T>: Channel<T> {
|
||||||
fn send(+x: T) {
|
fn send(+x: T) {
|
||||||
let mut xx = Some(x);
|
let mut xx = Some(move x);
|
||||||
do self.with |chan| {
|
do self.with |chan| {
|
||||||
let mut x = None;
|
let mut x = None;
|
||||||
x <-> xx;
|
x <-> xx;
|
||||||
@@ -1175,7 +1174,7 @@ impl<T: Send> SharedChan<T>: Channel<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn try_send(+x: T) -> bool {
|
fn try_send(+x: T) -> bool {
|
||||||
let mut xx = Some(x);
|
let mut xx = Some(move x);
|
||||||
do self.with |chan| {
|
do self.with |chan| {
|
||||||
let mut x = None;
|
let mut x = None;
|
||||||
x <-> xx;
|
x <-> xx;
|
||||||
@@ -1186,7 +1185,7 @@ impl<T: Send> SharedChan<T>: Channel<T> {
|
|||||||
|
|
||||||
/// Converts a `chan` into a `shared_chan`.
|
/// Converts a `chan` into a `shared_chan`.
|
||||||
fn SharedChan<T:Send>(+c: Chan<T>) -> SharedChan<T> {
|
fn SharedChan<T:Send>(+c: Chan<T>) -> SharedChan<T> {
|
||||||
unsafe::exclusive(c)
|
unsafe::exclusive(move c)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Receive a message from one of two endpoints.
|
/// Receive a message from one of two endpoints.
|
||||||
@@ -1240,24 +1239,24 @@ fn oneshot<T: Send>() -> (ChanOne<T>, PortOne<T>) {
|
|||||||
* closed.
|
* closed.
|
||||||
*/
|
*/
|
||||||
fn recv_one<T: Send>(+port: PortOne<T>) -> T {
|
fn recv_one<T: Send>(+port: PortOne<T>) -> T {
|
||||||
let oneshot::send(message) = recv(port);
|
let oneshot::send(message) = recv(move port);
|
||||||
message
|
move message
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Receive a message from a oneshot pipe unless the connection was closed.
|
/// Receive a message from a oneshot pipe unless the connection was closed.
|
||||||
fn try_recv_one<T: Send> (+port: PortOne<T>) -> Option<T> {
|
fn try_recv_one<T: Send> (+port: PortOne<T>) -> Option<T> {
|
||||||
let message = try_recv(port);
|
let message = try_recv(move port);
|
||||||
|
|
||||||
if message.is_none() { None }
|
if message.is_none() { None }
|
||||||
else {
|
else {
|
||||||
let oneshot::send(message) = option::unwrap(message);
|
let oneshot::send(message) = option::unwrap(message);
|
||||||
Some(message)
|
Some(move message)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send a message on a oneshot pipe, failing if the connection was closed.
|
/// Send a message on a oneshot pipe, failing if the connection was closed.
|
||||||
fn send_one<T: Send>(+chan: ChanOne<T>, +data: T) {
|
fn send_one<T: Send>(+chan: ChanOne<T>, +data: T) {
|
||||||
oneshot::client::send(chan, data);
|
oneshot::client::send(move chan, move data);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -1266,13 +1265,13 @@ fn send_one<T: Send>(+chan: ChanOne<T>, +data: T) {
|
|||||||
*/
|
*/
|
||||||
fn try_send_one<T: Send>(+chan: ChanOne<T>, +data: T)
|
fn try_send_one<T: Send>(+chan: ChanOne<T>, +data: T)
|
||||||
-> bool {
|
-> bool {
|
||||||
oneshot::client::try_send(chan, data).is_some()
|
oneshot::client::try_send(move chan, move data).is_some()
|
||||||
}
|
}
|
||||||
|
|
||||||
mod rt {
|
mod rt {
|
||||||
// These are used to hide the option constructors from the
|
// These are used to hide the option constructors from the
|
||||||
// compiler because their names are changing
|
// compiler because their names are changing
|
||||||
fn make_some<T>(+val: T) -> Option<T> { Some(val) }
|
fn make_some<T>(+val: T) -> Option<T> { Some(move val) }
|
||||||
fn make_none<T>() -> Option<T> { None }
|
fn make_none<T>() -> Option<T> { None }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -64,10 +64,10 @@ impl message: gen_send {
|
|||||||
|
|
||||||
if this.proto.is_bounded() {
|
if this.proto.is_bounded() {
|
||||||
let (sp, rp) = match (this.dir, next.dir) {
|
let (sp, rp) = match (this.dir, next.dir) {
|
||||||
(send, send) => (~"c", ~"s"),
|
(send, send) => (~"move c", ~"move s"),
|
||||||
(send, recv) => (~"s", ~"c"),
|
(send, recv) => (~"s", ~"c"),
|
||||||
(recv, send) => (~"s", ~"c"),
|
(recv, send) => (~"s", ~"c"),
|
||||||
(recv, recv) => (~"c", ~"s")
|
(recv, recv) => (~"move c", ~"move s")
|
||||||
};
|
};
|
||||||
|
|
||||||
body += ~"let b = pipe.reuse_buffer();\n";
|
body += ~"let b = pipe.reuse_buffer();\n";
|
||||||
@@ -80,10 +80,10 @@ impl message: gen_send {
|
|||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
let pat = match (this.dir, next.dir) {
|
let pat = match (this.dir, next.dir) {
|
||||||
(send, send) => "(c, s)",
|
(send, send) => "(move c, move s)",
|
||||||
(send, recv) => "(s, c)",
|
(send, recv) => "(s, c)",
|
||||||
(recv, send) => "(s, c)",
|
(recv, send) => "(s, c)",
|
||||||
(recv, recv) => "(c, s)"
|
(recv, recv) => "(move c, move s)"
|
||||||
};
|
};
|
||||||
|
|
||||||
body += fmt!("let %s = pipes::entangle();\n", pat);
|
body += fmt!("let %s = pipes::entangle();\n", pat);
|
||||||
@@ -92,17 +92,17 @@ impl message: gen_send {
|
|||||||
this.proto.name,
|
this.proto.name,
|
||||||
self.name(),
|
self.name(),
|
||||||
str::connect(vec::append_one(
|
str::connect(vec::append_one(
|
||||||
arg_names.map(|x| cx.str_of(x)), ~"s"),
|
arg_names.map(|x| ~"move " + cx.str_of(x)),
|
||||||
~", "));
|
~"move s"), ~", "));
|
||||||
|
|
||||||
if !try {
|
if !try {
|
||||||
body += fmt!("pipes::send(pipe, message);\n");
|
body += fmt!("pipes::send(move pipe, move message);\n");
|
||||||
// return the new channel
|
// return the new channel
|
||||||
body += ~"c }";
|
body += ~"move c }";
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
body += fmt!("if pipes::send(pipe, message) {\n \
|
body += fmt!("if pipes::send(move pipe, move message) {\n \
|
||||||
pipes::rt::make_some(c) \
|
pipes::rt::make_some(move c) \
|
||||||
} else { pipes::rt::make_none() } }");
|
} else { pipes::rt::make_none() } }");
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -145,7 +145,8 @@ impl message: gen_send {
|
|||||||
~""
|
~""
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
~"(" + str::connect(arg_names, ~", ") + ~")"
|
~"(" + str::connect(arg_names.map(|x| ~"move " + x),
|
||||||
|
~", ") + ~")"
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut body = ~"{ ";
|
let mut body = ~"{ ";
|
||||||
@@ -155,10 +156,10 @@ impl message: gen_send {
|
|||||||
message_args);
|
message_args);
|
||||||
|
|
||||||
if !try {
|
if !try {
|
||||||
body += fmt!("pipes::send(pipe, message);\n");
|
body += fmt!("pipes::send(move pipe, move message);\n");
|
||||||
body += ~" }";
|
body += ~" }";
|
||||||
} else {
|
} else {
|
||||||
body += fmt!("if pipes::send(pipe, message) { \
|
body += fmt!("if pipes::send(move pipe, move message) { \
|
||||||
pipes::rt::make_some(()) \
|
pipes::rt::make_some(()) \
|
||||||
} else { pipes::rt::make_none() } }");
|
} else { pipes::rt::make_none() } }");
|
||||||
}
|
}
|
||||||
@@ -301,7 +302,7 @@ impl protocol: gen_init {
|
|||||||
recv => {
|
recv => {
|
||||||
#ast {{
|
#ast {{
|
||||||
let (s, c) = pipes::entangle();
|
let (s, c) = pipes::entangle();
|
||||||
(c, s)
|
(move c, move s)
|
||||||
}}
|
}}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -313,7 +314,7 @@ impl protocol: gen_init {
|
|||||||
recv => {
|
recv => {
|
||||||
#ast {{
|
#ast {{
|
||||||
let (s, c) = $(body);
|
let (s, c) = $(body);
|
||||||
(c, s)
|
(move c, move s)
|
||||||
}}
|
}}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -356,7 +357,7 @@ impl protocol: gen_init {
|
|||||||
|
|
||||||
#ast {{
|
#ast {{
|
||||||
let buffer = $(buffer);
|
let buffer = $(buffer);
|
||||||
do pipes::entangle_buffer(buffer) |buffer, data| {
|
do pipes::entangle_buffer(move buffer) |buffer, data| {
|
||||||
$(entangle_body)
|
$(entangle_body)
|
||||||
}
|
}
|
||||||
}}
|
}}
|
||||||
|
|||||||
Reference in New Issue
Block a user