std: change tcp_*_result to use result::result.. flatter!
This commit is contained in:
committed by
Brian Anderson
parent
082a95a077
commit
83cca50240
@@ -4,9 +4,12 @@ High-level interface to libuv's TCP functionality
|
|||||||
|
|
||||||
import ip = net_ip;
|
import ip = net_ip;
|
||||||
|
|
||||||
export tcp_err_data, tcp_connect_result, tcp_write_result, tcp_read_start_result;
|
export tcp_socket, tcp_err_data;
|
||||||
export connect, write;
|
export connect, write, read_start, read_stop;
|
||||||
|
|
||||||
|
#[doc="
|
||||||
|
Encapsulates an open TCP/IP connection through libuv
|
||||||
|
"]
|
||||||
resource tcp_socket(socket_data: @tcp_socket_data) unsafe {
|
resource tcp_socket(socket_data: @tcp_socket_data) unsafe {
|
||||||
let closed_po = comm::port::<()>();
|
let closed_po = comm::port::<()>();
|
||||||
let closed_ch = comm::chan(closed_po);
|
let closed_ch = comm::chan(closed_po);
|
||||||
@@ -26,42 +29,14 @@ resource tcp_socket(socket_data: @tcp_socket_data) unsafe {
|
|||||||
log(debug, "exiting dtor for tcp_socket");
|
log(debug, "exiting dtor for tcp_socket");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[doc="
|
||||||
|
Contains raw, string-based, error information returned from libuv
|
||||||
|
"]
|
||||||
type tcp_err_data = {
|
type tcp_err_data = {
|
||||||
err_name: str,
|
err_name: str,
|
||||||
err_msg: str
|
err_msg: str
|
||||||
};
|
};
|
||||||
|
|
||||||
iface to_tcp_err_iface {
|
|
||||||
fn to_tcp_err() -> tcp_err_data;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl of to_tcp_err_iface for uv::ll::uv_err_data {
|
|
||||||
fn to_tcp_err() -> tcp_err_data {
|
|
||||||
{ err_name: self.err_name, err_msg: self.err_msg }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
enum tcp_connect_result {
|
|
||||||
tcp_connected(tcp_socket),
|
|
||||||
tcp_connect_error(tcp_err_data)
|
|
||||||
}
|
|
||||||
|
|
||||||
enum tcp_write_result {
|
|
||||||
tcp_write_success,
|
|
||||||
tcp_write_error(tcp_err_data)
|
|
||||||
}
|
|
||||||
|
|
||||||
enum tcp_read_start_result {
|
|
||||||
tcp_read_start_success(comm::port<tcp_read_result>),
|
|
||||||
tcp_read_start_error(tcp_err_data)
|
|
||||||
}
|
|
||||||
|
|
||||||
enum tcp_read_result {
|
|
||||||
tcp_read_data([u8]),
|
|
||||||
tcp_read_done,
|
|
||||||
tcp_read_err(tcp_err_data)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[doc="
|
#[doc="
|
||||||
Initiate a client connection over TCP/IP
|
Initiate a client connection over TCP/IP
|
||||||
|
|
||||||
@@ -72,10 +47,12 @@ Initiate a client connection over TCP/IP
|
|||||||
|
|
||||||
# Returns
|
# Returns
|
||||||
|
|
||||||
A `tcp_connect_result` that can be used to determine the connection and,
|
A `result` that, if the operation succeeds, contains a `tcp_socket` that
|
||||||
if successful, send and receive data to/from the remote host
|
can be used to send and receive data to/from the remote host. In the event
|
||||||
|
of failure, a `tcp_err_data` will be returned
|
||||||
"]
|
"]
|
||||||
fn connect(input_ip: ip::ip_addr, port: uint) -> tcp_connect_result unsafe {
|
fn connect(input_ip: ip::ip_addr, port: uint)
|
||||||
|
-> result::result<tcp_socket, tcp_err_data> unsafe {
|
||||||
let result_po = comm::port::<conn_attempt>();
|
let result_po = comm::port::<conn_attempt>();
|
||||||
let closed_signal_po = comm::port::<()>();
|
let closed_signal_po = comm::port::<()>();
|
||||||
let conn_data = {
|
let conn_data = {
|
||||||
@@ -84,7 +61,7 @@ fn connect(input_ip: ip::ip_addr, port: uint) -> tcp_connect_result unsafe {
|
|||||||
};
|
};
|
||||||
let conn_data_ptr = ptr::addr_of(conn_data);
|
let conn_data_ptr = ptr::addr_of(conn_data);
|
||||||
let hl_loop = uv::global_loop::get();
|
let hl_loop = uv::global_loop::get();
|
||||||
let reader_po = comm::port::<tcp_read_result>();
|
let reader_po = comm::port::<result::result<[u8], tcp_err_data>>();
|
||||||
let socket_data = @{
|
let socket_data = @{
|
||||||
reader_po: reader_po,
|
reader_po: reader_po,
|
||||||
reader_ch: comm::chan(reader_po),
|
reader_ch: comm::chan(reader_po),
|
||||||
@@ -161,21 +138,32 @@ fn connect(input_ip: ip::ip_addr, port: uint) -> tcp_connect_result unsafe {
|
|||||||
alt comm::recv(result_po) {
|
alt comm::recv(result_po) {
|
||||||
conn_success {
|
conn_success {
|
||||||
log(debug, "tcp::connect - received success on result_po");
|
log(debug, "tcp::connect - received success on result_po");
|
||||||
tcp_connected(tcp_socket(socket_data))
|
result::ok(tcp_socket(socket_data))
|
||||||
}
|
}
|
||||||
conn_failure(err_data) {
|
conn_failure(err_data) {
|
||||||
comm::recv(closed_signal_po);
|
comm::recv(closed_signal_po);
|
||||||
log(debug, "tcp::connect - received failure on result_po");
|
log(debug, "tcp::connect - received failure on result_po");
|
||||||
tcp_connect_error(err_data.to_tcp_err())
|
result::err(err_data.to_tcp_err())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[doc="
|
#[doc="
|
||||||
Write binary data to a tcp stream
|
Write binary data to a tcp stream
|
||||||
|
|
||||||
|
# Arguments
|
||||||
|
|
||||||
|
* sock - a `tcp_socket` to write to
|
||||||
|
* raw_write_data - a vector of `[u8]` that will be written to the stream.
|
||||||
|
This value must remain valid for the duration of the `write` call
|
||||||
|
|
||||||
|
# Returns
|
||||||
|
|
||||||
|
A `result` object with a `()` value, in the event of success, or a
|
||||||
|
`tcp_err_data` value in the event of failure
|
||||||
"]
|
"]
|
||||||
fn write(sock: tcp_socket, raw_write_data: [[u8]]) -> tcp_write_result
|
fn write(sock: tcp_socket, raw_write_data: [[u8]])
|
||||||
unsafe {
|
-> result::result<(), tcp_err_data> unsafe {
|
||||||
let socket_data_ptr = ptr::addr_of(**sock);
|
let socket_data_ptr = ptr::addr_of(**sock);
|
||||||
let write_req_ptr = ptr::addr_of((*socket_data_ptr).write_req);
|
let write_req_ptr = ptr::addr_of((*socket_data_ptr).write_req);
|
||||||
let stream_handle_ptr =
|
let stream_handle_ptr =
|
||||||
@@ -208,12 +196,28 @@ fn write(sock: tcp_socket, raw_write_data: [[u8]]) -> tcp_write_result
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
comm::recv(result_po)
|
alt comm::recv(result_po) {
|
||||||
|
tcp_write_success { result::ok(()) }
|
||||||
|
tcp_write_error(err_data) { result::err(err_data.to_tcp_err()) }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[doc="
|
#[doc="
|
||||||
|
Begin reading binary data from an open TCP connection.
|
||||||
|
|
||||||
|
# Arguments
|
||||||
|
|
||||||
|
* sock -- a `tcp_socket` for the connection to read from
|
||||||
|
|
||||||
|
# Returns
|
||||||
|
|
||||||
|
* A `result` instance that will either contain a
|
||||||
|
`comm::port<tcp_read_result>` that the user can read (and optionally, loop
|
||||||
|
on) from until `read_stop` is called, or a `tcp_err_data` record
|
||||||
"]
|
"]
|
||||||
fn read_start(sock: tcp_socket) -> tcp_read_start_result unsafe {
|
fn read_start(sock: tcp_socket)
|
||||||
|
-> result::result<comm::port<
|
||||||
|
result::result<[u8], tcp_err_data>>, tcp_err_data> unsafe {
|
||||||
let stream_handle_ptr = ptr::addr_of((**sock).stream_handle);
|
let stream_handle_ptr = ptr::addr_of((**sock).stream_handle);
|
||||||
let start_po = comm::port::<option<uv::ll::uv_err_data>>();
|
let start_po = comm::port::<option<uv::ll::uv_err_data>>();
|
||||||
let start_ch = comm::chan(start_po);
|
let start_ch = comm::chan(start_po);
|
||||||
@@ -235,15 +239,19 @@ fn read_start(sock: tcp_socket) -> tcp_read_start_result unsafe {
|
|||||||
};
|
};
|
||||||
alt comm::recv(start_po) {
|
alt comm::recv(start_po) {
|
||||||
some(err_data) {
|
some(err_data) {
|
||||||
tcp_read_start_error(err_data.to_tcp_err())
|
result::err(err_data.to_tcp_err())
|
||||||
}
|
}
|
||||||
none {
|
none {
|
||||||
tcp_read_start_success((**sock).reader_po)
|
result::ok((**sock).reader_po)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read_stop(sock: tcp_socket) -> option<uv::ll::uv_err_data> unsafe {
|
#[doc="
|
||||||
|
Stop reading from an open TCP connection.
|
||||||
|
"]
|
||||||
|
fn read_stop(sock: tcp_socket) ->
|
||||||
|
result::result<(), tcp_err_data> unsafe {
|
||||||
let stream_handle_ptr = ptr::addr_of((**sock).stream_handle);
|
let stream_handle_ptr = ptr::addr_of((**sock).stream_handle);
|
||||||
let stop_po = comm::port::<option<tcp_err_data>>();
|
let stop_po = comm::port::<option<tcp_err_data>>();
|
||||||
let stop_ch = comm::chan(stop_po);
|
let stop_ch = comm::chan(stop_po);
|
||||||
@@ -261,10 +269,49 @@ fn read_stop(sock: tcp_socket) -> option<uv::ll::uv_err_data> unsafe {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
comm::recv(stop_po)
|
alt comm::recv(stop_po) {
|
||||||
|
some(err_data) {
|
||||||
|
result::err(err_data.to_tcp_err())
|
||||||
|
}
|
||||||
|
none {
|
||||||
|
result::ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// INTERNAL API
|
// INTERNAL API
|
||||||
|
|
||||||
|
enum tcp_connect_result {
|
||||||
|
tcp_connected(tcp_socket),
|
||||||
|
tcp_connect_error(tcp_err_data)
|
||||||
|
}
|
||||||
|
|
||||||
|
enum tcp_write_result {
|
||||||
|
tcp_write_success,
|
||||||
|
tcp_write_error(tcp_err_data)
|
||||||
|
}
|
||||||
|
|
||||||
|
enum tcp_read_start_result {
|
||||||
|
tcp_read_start_success(comm::port<tcp_read_result>),
|
||||||
|
tcp_read_start_error(tcp_err_data)
|
||||||
|
}
|
||||||
|
|
||||||
|
enum tcp_read_result {
|
||||||
|
tcp_read_data([u8]),
|
||||||
|
tcp_read_done,
|
||||||
|
tcp_read_err(tcp_err_data)
|
||||||
|
}
|
||||||
|
|
||||||
|
iface to_tcp_err_iface {
|
||||||
|
fn to_tcp_err() -> tcp_err_data;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl of to_tcp_err_iface for uv::ll::uv_err_data {
|
||||||
|
fn to_tcp_err() -> tcp_err_data {
|
||||||
|
{ err_name: self.err_name, err_msg: self.err_msg }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
crust fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t,
|
crust fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t,
|
||||||
nread: libc::ssize_t,
|
nread: libc::ssize_t,
|
||||||
++buf: uv::ll::uv_buf_t) unsafe {
|
++buf: uv::ll::uv_buf_t) unsafe {
|
||||||
@@ -276,7 +323,7 @@ crust fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t,
|
|||||||
// incoming err.. probably eof
|
// incoming err.. probably eof
|
||||||
-1 {
|
-1 {
|
||||||
let err_data = uv::ll::get_last_err_data(loop_ptr);
|
let err_data = uv::ll::get_last_err_data(loop_ptr);
|
||||||
comm::send(reader_ch, tcp_read_err(err_data));
|
comm::send(reader_ch, result::err(err_data.to_tcp_err()));
|
||||||
}
|
}
|
||||||
// do nothing .. unneeded buf
|
// do nothing .. unneeded buf
|
||||||
0 {}
|
0 {}
|
||||||
@@ -287,7 +334,7 @@ crust fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t,
|
|||||||
let buf_base = uv::ll::get_base_from_buf(buf);
|
let buf_base = uv::ll::get_base_from_buf(buf);
|
||||||
let buf_len = uv::ll::get_len_from_buf(buf);
|
let buf_len = uv::ll::get_len_from_buf(buf);
|
||||||
let new_bytes = vec::unsafe::from_buf(buf_base, buf_len);
|
let new_bytes = vec::unsafe::from_buf(buf_base, buf_len);
|
||||||
comm::send(reader_ch, tcp_read_data(new_bytes));
|
comm::send(reader_ch, result::ok(new_bytes));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
uv::ll::free_base_of_buf(buf);
|
uv::ll::free_base_of_buf(buf);
|
||||||
@@ -391,8 +438,8 @@ enum conn_attempt {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type tcp_socket_data = {
|
type tcp_socket_data = {
|
||||||
reader_po: comm::port<tcp_read_result>,
|
reader_po: comm::port<result::result<[u8], tcp_err_data>>,
|
||||||
reader_ch: comm::chan<tcp_read_result>,
|
reader_ch: comm::chan<result::result<[u8], tcp_err_data>>,
|
||||||
stream_handle: uv::ll::uv_tcp_t,
|
stream_handle: uv::ll::uv_tcp_t,
|
||||||
connect_req: uv::ll::uv_connect_t,
|
connect_req: uv::ll::uv_connect_t,
|
||||||
write_req: uv::ll::uv_write_t,
|
write_req: uv::ll::uv_write_t,
|
||||||
@@ -405,10 +452,14 @@ fn ipv4_ip_addr_to_sockaddr_in(input: ip::ip_addr,
|
|||||||
uv::ll::ip4_addr(ip::format_addr(input), port as int)
|
uv::ll::ip4_addr(ip::format_addr(input), port as int)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
//#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
#[test]
|
#[test]
|
||||||
fn test_gl_tcp_ipv4_request() {
|
fn test_gl_tcp_ipv4_request() {
|
||||||
|
impl_gl_tcp_ipv4_request();
|
||||||
|
}
|
||||||
|
fn impl_gl_tcp_ipv4_request() {
|
||||||
|
// pre-connection/input data
|
||||||
let ip_str = "173.194.79.99";
|
let ip_str = "173.194.79.99";
|
||||||
let port = 80u;
|
let port = 80u;
|
||||||
let expected_read_msg = "foo";
|
let expected_read_msg = "foo";
|
||||||
@@ -418,79 +469,76 @@ mod test {
|
|||||||
let data_po = comm::port::<[u8]>();
|
let data_po = comm::port::<[u8]>();
|
||||||
let data_ch = comm::chan(data_po);
|
let data_ch = comm::chan(data_po);
|
||||||
|
|
||||||
alt connect(host_ip, port) {
|
// connect to remote host
|
||||||
tcp_connected(sock) {
|
let connect_result = connect(host_ip, port);
|
||||||
log(debug, "successful tcp connect");
|
if result::is_failure(connect_result) {
|
||||||
let mut write_data: [[u8]] = [];
|
let err_data = result::get_err(connect_result);
|
||||||
let write_data = [str::as_bytes(actual_write_msg) {|str_bytes|
|
log(debug, "tcp_connect_error received..");
|
||||||
str_bytes
|
log(debug, #fmt("tcp connect error: %? %?", err_data.err_name,
|
||||||
}];
|
|
||||||
alt write(sock, write_data) {
|
|
||||||
tcp_write_success {
|
|
||||||
log(debug, "tcp::write successful");
|
|
||||||
let mut total_read_data: [u8] = [];
|
|
||||||
alt read_start(sock) {
|
|
||||||
tcp_read_start_success(reader_po) {
|
|
||||||
loop {
|
|
||||||
alt comm::recv(reader_po) {
|
|
||||||
tcp_read_data(new_data) {
|
|
||||||
total_read_data += new_data;
|
|
||||||
// theoretically, we could keep iterating, if
|
|
||||||
// we expect the server on the other end to keep
|
|
||||||
// streaming/chunking data to us, but..
|
|
||||||
alt read_stop(sock) {
|
|
||||||
some(err_data) {
|
|
||||||
log(debug, "error while calling read_stop");
|
|
||||||
log(debug, #fmt("read_stop error: %? %?",
|
|
||||||
err_data.err_name,
|
|
||||||
err_data.err_msg));
|
err_data.err_msg));
|
||||||
assert false;
|
assert false;
|
||||||
}
|
}
|
||||||
none {
|
|
||||||
// exiting the read loop
|
// this is our tcp_socket resource instance. It's dtor will
|
||||||
break;
|
// clean-up/close the underlying TCP stream when the fn scope
|
||||||
|
// ends
|
||||||
|
let sock = result::unwrap(connect_result);
|
||||||
|
log(debug, "successful tcp connect");
|
||||||
|
|
||||||
|
// set up write data
|
||||||
|
let write_data = [str::as_bytes(actual_write_msg) {|str_bytes|
|
||||||
|
str_bytes
|
||||||
|
}];
|
||||||
|
|
||||||
|
// write data to tcp socket
|
||||||
|
let write_result = write(sock, write_data);
|
||||||
|
if result::is_failure(write_result) {
|
||||||
|
let err_data = result::get_err(write_result);
|
||||||
|
log(debug, "tcp_write_error received..");
|
||||||
|
log(debug, #fmt("tcp write error: %? %?", err_data.err_name,
|
||||||
|
err_data.err_msg));
|
||||||
|
assert false;
|
||||||
}
|
}
|
||||||
|
log(debug, "tcp::write successful");
|
||||||
|
|
||||||
|
// set up read data
|
||||||
|
let mut total_read_data: [u8] = [];
|
||||||
|
let read_start_result = read_start(sock);
|
||||||
|
if result::is_failure(read_start_result) {
|
||||||
|
let err_data = result::get_err(read_start_result);
|
||||||
|
log(debug, "tcp read_start err received..");
|
||||||
|
log(debug, #fmt("read_start error: %? %?", err_data.err_name,
|
||||||
|
err_data.err_msg));
|
||||||
|
assert false;
|
||||||
}
|
}
|
||||||
}
|
let reader_po = result::get(read_start_result);
|
||||||
tcp_read_done {
|
loop {
|
||||||
break;
|
let read_data_result = comm::recv(reader_po);
|
||||||
}
|
if result::is_failure(read_data_result) {
|
||||||
tcp_read_err(err_data) {
|
let err_data = result::get_err(read_data_result);
|
||||||
log(debug, "read error data recv'd");
|
log(debug, "read error data recv'd");
|
||||||
log(debug, #fmt("read error: %? %?",
|
log(debug, #fmt("read error: %? %?",
|
||||||
err_data.err_name,
|
err_data.err_name,
|
||||||
err_data.err_msg));
|
err_data.err_msg));
|
||||||
assert false;
|
assert false;
|
||||||
}
|
}
|
||||||
}
|
let new_data = result::unwrap(read_data_result);
|
||||||
}
|
total_read_data += new_data;
|
||||||
comm::send(data_ch, total_read_data);
|
// theoretically, we could keep iterating, if
|
||||||
}
|
// we expect the server on the other end to keep
|
||||||
tcp_read_start_error(err_data) {
|
// streaming/chunking data to us, but..
|
||||||
log(debug, "tcp_read_start_error received..");
|
let read_stop_result = read_stop(sock);
|
||||||
log(debug, #fmt("tcp read_start error: %? %?",
|
if result::is_failure(read_stop_result) {
|
||||||
|
let err_data = result::get_err(read_stop_result);
|
||||||
|
log(debug, "error while calling read_stop");
|
||||||
|
log(debug, #fmt("read_stop error: %? %?",
|
||||||
err_data.err_name,
|
err_data.err_name,
|
||||||
err_data.err_msg));
|
err_data.err_msg));
|
||||||
assert false;
|
assert false;
|
||||||
}
|
}
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
comm::send(data_ch, total_read_data);
|
||||||
tcp_write_error(err_data) {
|
|
||||||
log(debug, "tcp_write_error received..");
|
|
||||||
log(debug, #fmt("tcp write error: %? %?", err_data.err_name,
|
|
||||||
err_data.err_msg));
|
|
||||||
assert false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
tcp_connect_error(err_data) {
|
|
||||||
log(debug, "tcp_connect_error received..");
|
|
||||||
log(debug, #fmt("tcp connect error: %? %?", err_data.err_name,
|
|
||||||
err_data.err_msg));
|
|
||||||
assert false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let actual_data = comm::recv(data_po);
|
let actual_data = comm::recv(data_po);
|
||||||
let resp = str::from_bytes(actual_data);
|
let resp = str::from_bytes(actual_data);
|
||||||
log(debug, "DATA RECEIVED: "+resp);
|
log(debug, "DATA RECEIVED: "+resp);
|
||||||
|
|||||||
Reference in New Issue
Block a user