std: mod cleanup, impl/test for conn. refused err + mem leak fix

This commit is contained in:
Jeff Olson
2012-05-27 22:50:11 -07:00
committed by Brian Anderson
parent 900e446015
commit 5d86686e7f

View File

@@ -12,8 +12,10 @@ import result::*;
import libc::size_t; import libc::size_t;
import str::extensions; import str::extensions;
// data // tcp interfaces
export tcp_socket, tcp_conn_port, tcp_err_data; export tcp_socket, tcp_conn_port;
// errors
export tcp_err_data, tcp_connect_err_data;
// operations on a tcp_socket // operations on a tcp_socket
export write, write_future, read_start, read_stop; export write, write_future, read_start, read_stop;
// tcp server stuff // tcp server stuff
@@ -22,6 +24,7 @@ export new_listener, conn_recv, conn_recv_spawn, conn_peek;
// tcp client stuff // tcp client stuff
export connect; export connect;
// helper methods // helper methods
import methods = net_tcp_methods;
export methods; export methods;
#[nolink] #[nolink]
@@ -43,25 +46,7 @@ class tcp_socket {
new(socket_data: @tcp_socket_data) { self.socket_data = socket_data; } new(socket_data: @tcp_socket_data) { self.socket_data = socket_data; }
drop { drop {
unsafe { unsafe {
let closed_po = comm::port::<()>(); tear_down_socket_data(socket_data)
let closed_ch = comm::chan(closed_po);
let close_data = {
closed_ch: closed_ch
};
let close_data_ptr = ptr::addr_of(close_data);
let stream_handle_ptr = (*(self.socket_data)).stream_handle_ptr;
iotask::interact((*(self.socket_data)).iotask) {|loop_ptr|
log(debug, #fmt("interact dtor for tcp_socket stream %? loop %?",
stream_handle_ptr, loop_ptr));
uv::ll::set_data_for_uv_handle(stream_handle_ptr,
close_data_ptr);
uv::ll::close(stream_handle_ptr, tcp_socket_dtor_close_cb);
};
comm::recv(closed_po);
log(debug, #fmt("about to free socket_data at %?", self.socket_data));
rustrt::rust_uv_current_kernel_free(stream_handle_ptr
as *libc::c_void);
log(debug, "exiting dtor for tcp_socket");
} }
} }
} }
@@ -90,6 +75,17 @@ type tcp_err_data = {
err_name: str, err_name: str,
err_msg: str err_msg: str
}; };
enum tcp_connect_err_data {
#[doc="
Some unplanned-for error. The first and second fields correspond
to libuv's `err_name` and `err_msg` fields, respectively.
"]
generic_connect_err(str, str),
#[doc="
Invalid IP or invalid port
"]
connection_refused
}
#[doc=" #[doc="
Initiate a client connection over TCP/IP Initiate a client connection over TCP/IP
@@ -108,7 +104,7 @@ of failure, a `tcp_err_data` will be returned
"] "]
fn connect(input_ip: ip::ip_addr, port: uint, fn connect(input_ip: ip::ip_addr, port: uint,
iotask: iotask) iotask: iotask)
-> result::result<tcp_socket, tcp_err_data> unsafe { -> result::result<tcp_socket, tcp_connect_err_data> unsafe {
let result_po = comm::port::<conn_attempt>(); let result_po = comm::port::<conn_attempt>();
let closed_signal_po = comm::port::<()>(); let closed_signal_po = comm::port::<()>();
let conn_data = { let conn_data = {
@@ -198,7 +194,14 @@ fn connect(input_ip: ip::ip_addr, port: uint,
conn_failure(err_data) { conn_failure(err_data) {
comm::recv(closed_signal_po); comm::recv(closed_signal_po);
log(debug, "tcp::connect - received failure on result_po"); log(debug, "tcp::connect - received failure on result_po");
result::err(err_data.to_tcp_err()) // still have to free the malloc'd stream handle..
rustrt::rust_uv_current_kernel_free(stream_handle_ptr
as *libc::c_void);
let tcp_conn_err = alt err_data.err_name {
"ECONNREFUSED" { connection_refused }
_ { generic_connect_err(err_data.err_name, err_data.err_msg) }
};
result::err(tcp_conn_err)
} }
} }
} }
@@ -777,7 +780,8 @@ fn listen_for_conn(host_ip: ip::ip_addr, port: uint, backlog: uint,
} }
} }
} }
mod methods {
mod net_tcp_methods {
#[doc=" #[doc="
Convenience methods extending `net::tcp::tcp_conn_port` Convenience methods extending `net::tcp::tcp_conn_port`
"] "]
@@ -821,6 +825,28 @@ mod methods {
} }
// INTERNAL API // INTERNAL API
fn tear_down_socket_data(socket_data: @tcp_socket_data) unsafe {
let closed_po = comm::port::<()>();
let closed_ch = comm::chan(closed_po);
let close_data = {
closed_ch: closed_ch
};
let close_data_ptr = ptr::addr_of(close_data);
let stream_handle_ptr = (*socket_data).stream_handle_ptr;
iotask::interact((*socket_data).iotask) {|loop_ptr|
log(debug, #fmt("interact dtor for tcp_socket stream %? loop %?",
stream_handle_ptr, loop_ptr));
uv::ll::set_data_for_uv_handle(stream_handle_ptr,
close_data_ptr);
uv::ll::close(stream_handle_ptr, tcp_socket_dtor_close_cb);
};
comm::recv(closed_po);
log(debug, #fmt("about to free socket_data at %?", socket_data));
rustrt::rust_uv_current_kernel_free(stream_handle_ptr
as *libc::c_void);
log(debug, "exiting dtor for tcp_socket");
}
// shared implementation for tcp::read // shared implementation for tcp::read
fn read_common_impl(socket_data: *tcp_socket_data, timeout_msecs: uint) fn read_common_impl(socket_data: *tcp_socket_data, timeout_msecs: uint)
-> result::result<[u8]/~,tcp_err_data> unsafe { -> result::result<[u8]/~,tcp_err_data> unsafe {
@@ -1308,6 +1334,10 @@ mod test {
fn test_gl_tcp_server_listener_and_client_ipv4() unsafe { fn test_gl_tcp_server_listener_and_client_ipv4() unsafe {
impl_gl_tcp_ipv4_server_listener_and_client(); impl_gl_tcp_ipv4_server_listener_and_client();
} }
#[test]
fn test_gl_tcp_ipv4_client_error_connection_refused() unsafe {
impl_gl_tcp_ipv4_client_error_connection_refused();
}
} }
#[cfg(target_arch="x86")] #[cfg(target_arch="x86")]
mod impl32 { mod impl32 {
@@ -1321,6 +1351,11 @@ mod test {
fn test_gl_tcp_server_listener_and_client_ipv4() unsafe { fn test_gl_tcp_server_listener_and_client_ipv4() unsafe {
impl_gl_tcp_ipv4_server_listener_and_client(); impl_gl_tcp_ipv4_server_listener_and_client();
} }
#[test]
#[ignore(cfg(target_os = "linux"))]
fn test_gl_tcp_ipv4_client_error_connection_refused() unsafe {
impl_gl_tcp_ipv4_client_error_connection_refused();
}
} }
} }
fn impl_gl_tcp_ipv4_server_and_client() { fn impl_gl_tcp_ipv4_server_and_client() {
@@ -1351,7 +1386,7 @@ mod test {
comm::recv(cont_po); comm::recv(cont_po);
// client // client
log(debug, "server started, firing up client.."); log(debug, "server started, firing up client..");
let actual_resp = comm::listen {|client_ch| let actual_resp_result = comm::listen {|client_ch|
run_tcp_test_client( run_tcp_test_client(
server_ip, server_ip,
server_port, server_port,
@@ -1359,6 +1394,8 @@ mod test {
client_ch, client_ch,
hl_loop) hl_loop)
}; };
assert actual_resp_result.is_success();
let actual_resp = actual_resp_result.get();
let actual_req = comm::recv(server_result_po); let actual_req = comm::recv(server_result_po);
log(debug, #fmt("REQ: expected: '%s' actual: '%s'", log(debug, #fmt("REQ: expected: '%s' actual: '%s'",
expected_req, actual_req)); expected_req, actual_req));
@@ -1395,7 +1432,7 @@ mod test {
comm::recv(cont_po); comm::recv(cont_po);
// client // client
log(debug, "server started, firing up client.."); log(debug, "server started, firing up client..");
let actual_resp = comm::listen {|client_ch| let actual_resp_result = comm::listen {|client_ch|
run_tcp_test_client( run_tcp_test_client(
server_ip, server_ip,
server_port, server_port,
@@ -1403,6 +1440,8 @@ mod test {
client_ch, client_ch,
hl_loop) hl_loop)
}; };
assert actual_resp_result.is_success();
let actual_resp = actual_resp_result.get();
let actual_req = comm::recv(server_result_po); let actual_req = comm::recv(server_result_po);
log(debug, #fmt("REQ: expected: '%s' actual: '%s'", log(debug, #fmt("REQ: expected: '%s' actual: '%s'",
expected_req, actual_req)); expected_req, actual_req));
@@ -1411,6 +1450,29 @@ mod test {
assert str::contains(actual_req, expected_req); assert str::contains(actual_req, expected_req);
assert str::contains(actual_resp, expected_resp); assert str::contains(actual_resp, expected_resp);
} }
fn impl_gl_tcp_ipv4_client_error_connection_refused() {
let hl_loop = uv::global_loop::get();
let server_ip = "127.0.0.1";
let server_port = 8890u;
let expected_req = "ping";
// client
log(debug, "firing up client..");
let actual_resp_result = comm::listen {|client_ch|
run_tcp_test_client(
server_ip,
server_port,
expected_req,
client_ch,
hl_loop)
};
alt actual_resp_result.get_err() {
connection_refused {
}
_ {
fail "unknown error.. expected connection_refused"
}
}
}
fn run_tcp_test_server(server_ip: str, server_port: uint, resp: str, fn run_tcp_test_server(server_ip: str, server_port: uint, resp: str,
server_ch: comm::chan<str>, server_ch: comm::chan<str>,
@@ -1453,7 +1515,7 @@ mod test {
let sock = result::unwrap(accept_result); let sock = result::unwrap(accept_result);
log(debug, "SERVER: successfully accepted"+ log(debug, "SERVER: successfully accepted"+
"connection!"); "connection!");
let received_req_bytes = sock.read(0u); let received_req_bytes = read(sock, 0u);
alt received_req_bytes { alt received_req_bytes {
result::ok(data) { result::ok(data) {
server_ch.send( server_ch.send(
@@ -1522,7 +1584,7 @@ mod test {
log(debug, "SERVER: successfully accepted"+ log(debug, "SERVER: successfully accepted"+
"connection!"); "connection!");
let received_req_bytes = let received_req_bytes =
sock.read(0u); read(sock, 0u);
alt received_req_bytes { alt received_req_bytes {
result::ok(data) { result::ok(data) {
server_ch.send( server_ch.send(
@@ -1543,8 +1605,8 @@ mod test {
fn run_tcp_test_client(server_ip: str, server_port: uint, resp: str, fn run_tcp_test_client(server_ip: str, server_port: uint, resp: str,
client_ch: comm::chan<str>, client_ch: comm::chan<str>,
iotask: iotask) -> str { iotask: iotask) -> result::result<str,
tcp_connect_err_data> {
let server_ip_addr = ip::v4::parse_addr(server_ip); let server_ip_addr = ip::v4::parse_addr(server_ip);
log(debug, "CLIENT: starting.."); log(debug, "CLIENT: starting..");
@@ -1552,9 +1614,7 @@ mod test {
if result::is_err(connect_result) { if result::is_err(connect_result) {
log(debug, "CLIENT: failed to connect"); log(debug, "CLIENT: failed to connect");
let err_data = result::get_err(connect_result); let err_data = result::get_err(connect_result);
log(debug, #fmt("CLIENT: connect err name: %s msg: %s", err(err_data)
err_data.err_name, err_data.err_msg));
""
} }
else { else {
let sock = result::unwrap(connect_result); let sock = result::unwrap(connect_result);
@@ -1563,20 +1623,25 @@ mod test {
let read_result = sock.read(0u); let read_result = sock.read(0u);
if read_result.is_err() { if read_result.is_err() {
log(debug, "CLIENT: failure to read"); log(debug, "CLIENT: failure to read");
"" ok("")
} }
else { else {
client_ch.send(str::from_bytes(read_result.get())); client_ch.send(str::from_bytes(read_result.get()));
let ret_val = client_ch.recv(); let ret_val = client_ch.recv();
log(debug, #fmt("CLIENT: after client_ch recv ret: '%s'", log(debug, #fmt("CLIENT: after client_ch recv ret: '%s'",
ret_val)); ret_val));
ret_val ok(ret_val)
} }
} }
} }
<<<<<<< HEAD
fn tcp_write_single(sock: tcp_socket, val: [u8]/~) { fn tcp_write_single(sock: tcp_socket, val: [u8]/~) {
let write_result_future = sock.write_future(val); let write_result_future = sock.write_future(val);
=======
fn tcp_write_single(sock: tcp_socket, val: [u8]) {
let write_result_future = write_future(sock, val);
>>>>>>> std: mod cleanup, impl/test for conn. refused err + mem leak fix
let write_result = write_result_future.get(); let write_result = write_result_future.get();
if result::is_err(write_result) { if result::is_err(write_result) {
log(debug, "tcp_write_single: write failed!"); log(debug, "tcp_write_single: write failed!");