Merge remote-tracking branch 'brson/io' into io-upstream

Conflicts:
	src/rt/rust_builtin.cpp
	src/rt/rustrt.def.in
This commit is contained in:
Brian Anderson
2013-06-20 12:17:00 -07:00
30 changed files with 3148 additions and 837 deletions

View File

@@ -10,18 +10,16 @@
#[macro_escape];
macro_rules! rterrln (
($( $arg:expr),+) => ( {
::rt::util::dumb_println(fmt!( $($arg),+ ));
} )
)
// Some basic logging
macro_rules! rtdebug_ (
($( $arg:expr),+) => ( {
dumb_println(fmt!( $($arg),+ ));
fn dumb_println(s: &str) {
use io::WriterUtil;
let dbg = ::libc::STDERR_FILENO as ::io::fd_t;
dbg.write_str(s);
dbg.write_str("\n");
}
rterrln!( $($arg),+ )
} )
)
@@ -33,21 +31,15 @@ macro_rules! rtdebug (
macro_rules! rtassert (
( $arg:expr ) => ( {
if !$arg {
abort!("assertion failed: %s", stringify!($arg));
rtabort!("assertion failed: %s", stringify!($arg));
}
} )
)
macro_rules! abort(
macro_rules! rtabort(
($( $msg:expr),+) => ( {
rtdebug!($($msg),+);
do_abort();
// NB: This is in a fn to avoid putting the `unsafe` block in a macro,
// which causes spurious 'unnecessary unsafe block' warnings.
fn do_abort() -> ! {
unsafe { ::libc::abort(); }
}
::rt::util::abort(fmt!($($msg),+));
} )
)

View File

@@ -22,10 +22,12 @@ use ops::Drop;
use kinds::Owned;
use rt::sched::{Scheduler, Coroutine};
use rt::local::Local;
use unstable::intrinsics::{atomic_xchg, atomic_load};
use unstable::atomics::{AtomicUint, AtomicOption, SeqCst};
use unstable::sync::UnsafeAtomicRcBox;
use util::Void;
use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
use cell::Cell;
use clone::Clone;
/// A combined refcount / ~Task pointer.
///
@@ -34,14 +36,14 @@ use cell::Cell;
/// * 2 - both endpoints are alive
/// * 1 - either the sender or the receiver is dead, determined by context
/// * <ptr> - A pointer to a blocked Task that can be transmuted to ~Task
type State = int;
type State = uint;
static STATE_BOTH: State = 2;
static STATE_ONE: State = 1;
/// The heap-allocated structure shared between two endpoints.
struct Packet<T> {
state: State,
state: AtomicUint,
payload: Option<T>,
}
@@ -70,7 +72,7 @@ pub struct PortOneHack<T> {
pub fn oneshot<T: Owned>() -> (PortOne<T>, ChanOne<T>) {
let packet: ~Packet<T> = ~Packet {
state: STATE_BOTH,
state: AtomicUint::new(STATE_BOTH),
payload: None
};
@@ -114,12 +116,20 @@ impl<T> ChanOne<T> {
// reordering of the payload write. This also issues an
// acquire barrier that keeps the subsequent access of the
// ~Task pointer from being reordered.
let oldstate = atomic_xchg(&mut (*packet).state, STATE_ONE);
let oldstate = (*packet).state.swap(STATE_ONE, SeqCst);
match oldstate {
STATE_BOTH => {
// Port is not waiting yet. Nothing to do
do Local::borrow::<Scheduler, ()> |sched| {
rtdebug!("non-rendezvous send");
sched.metrics.non_rendezvous_sends += 1;
}
}
STATE_ONE => {
do Local::borrow::<Scheduler, ()> |sched| {
rtdebug!("rendezvous send");
sched.metrics.rendezvous_sends += 1;
}
// Port has closed. Need to clean up.
let _packet: ~Packet<T> = cast::transmute(this.inner.void_packet);
recvr_active = false;
@@ -127,7 +137,9 @@ impl<T> ChanOne<T> {
task_as_state => {
// Port is blocked. Wake it up.
let recvr: ~Coroutine = cast::transmute(task_as_state);
let sched = Local::take::<Scheduler>();
let mut sched = Local::take::<Scheduler>();
rtdebug!("rendezvous send");
sched.metrics.rendezvous_sends += 1;
sched.schedule_task(recvr);
}
}
@@ -158,23 +170,30 @@ impl<T> PortOne<T> {
// Switch to the scheduler to put the ~Task into the Packet state.
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then |task| {
do sched.deschedule_running_task_and_then |sched, task| {
unsafe {
// Atomically swap the task pointer into the Packet state, issuing
// an acquire barrier to prevent reordering of the subsequent read
// of the payload. Also issues a release barrier to prevent reordering
// of any previous writes to the task structure.
let task_as_state: State = cast::transmute(task);
let oldstate = atomic_xchg(&mut (*packet).state, task_as_state);
let oldstate = (*packet).state.swap(task_as_state, SeqCst);
match oldstate {
STATE_BOTH => {
// Data has not been sent. Now we're blocked.
rtdebug!("non-rendezvous recv");
sched.metrics.non_rendezvous_recvs += 1;
}
STATE_ONE => {
rtdebug!("rendezvous recv");
sched.metrics.rendezvous_recvs += 1;
// Channel is closed. Switch back and check the data.
// NB: We have to drop back into the scheduler event loop here
// instead of switching immediately back or we could end up
// triggering infinite recursion on the scheduler's stack.
let task: ~Coroutine = cast::transmute(task_as_state);
let sched = Local::take::<Scheduler>();
sched.resume_task_immediately(task);
sched.enqueue_task(task);
}
_ => util::unreachable()
}
@@ -210,7 +229,7 @@ impl<T> Peekable<T> for PortOne<T> {
fn peek(&self) -> bool {
unsafe {
let packet: *mut Packet<T> = self.inner.packet();
let oldstate = atomic_load(&mut (*packet).state);
let oldstate = (*packet).state.load(SeqCst);
match oldstate {
STATE_BOTH => false,
STATE_ONE => (*packet).payload.is_some(),
@@ -227,7 +246,7 @@ impl<T> Drop for ChanOneHack<T> {
unsafe {
let this = cast::transmute_mut(self);
let oldstate = atomic_xchg(&mut (*this.packet()).state, STATE_ONE);
let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst);
match oldstate {
STATE_BOTH => {
// Port still active. It will destroy the Packet.
@@ -254,7 +273,7 @@ impl<T> Drop for PortOneHack<T> {
unsafe {
let this = cast::transmute_mut(self);
let oldstate = atomic_xchg(&mut (*this.packet()).state, STATE_ONE);
let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst);
match oldstate {
STATE_BOTH => {
// Chan still active. It will destroy the packet.
@@ -295,16 +314,19 @@ struct StreamPayload<T> {
next: PortOne<StreamPayload<T>>
}
type StreamChanOne<T> = ChanOne<StreamPayload<T>>;
type StreamPortOne<T> = PortOne<StreamPayload<T>>;
/// A channel with unbounded size.
pub struct Chan<T> {
// FIXME #5372. Using Cell because we don't take &mut self
next: Cell<ChanOne<StreamPayload<T>>>
next: Cell<StreamChanOne<T>>
}
/// An port with unbounded size.
pub struct Port<T> {
// FIXME #5372. Using Cell because we don't take &mut self
next: Cell<PortOne<StreamPayload<T>>>
next: Cell<StreamPortOne<T>>
}
pub fn stream<T: Owned>() -> (Port<T>, Chan<T>) {
@@ -357,6 +379,148 @@ impl<T> Peekable<T> for Port<T> {
}
}
pub struct SharedChan<T> {
// Just like Chan, but a shared AtomicOption instead of Cell
priv next: UnsafeAtomicRcBox<AtomicOption<StreamChanOne<T>>>
}
impl<T> SharedChan<T> {
pub fn new(chan: Chan<T>) -> SharedChan<T> {
let next = chan.next.take();
let next = AtomicOption::new(~next);
SharedChan { next: UnsafeAtomicRcBox::new(next) }
}
}
impl<T: Owned> GenericChan<T> for SharedChan<T> {
fn send(&self, val: T) {
self.try_send(val);
}
}
impl<T: Owned> GenericSmartChan<T> for SharedChan<T> {
#[cfg(stage0)] // odd type checking errors
fn try_send(&self, _val: T) -> bool {
fail!()
}
#[cfg(not(stage0))]
fn try_send(&self, val: T) -> bool {
unsafe {
let (next_pone, next_cone) = oneshot();
let cone = (*self.next.get()).swap(~next_cone, SeqCst);
cone.unwrap().try_send(StreamPayload { val: val, next: next_pone })
}
}
}
impl<T> Clone for SharedChan<T> {
fn clone(&self) -> SharedChan<T> {
SharedChan {
next: self.next.clone()
}
}
}
pub struct SharedPort<T> {
// The next port on which we will receive the next port on which we will receive T
priv next_link: UnsafeAtomicRcBox<AtomicOption<PortOne<StreamPortOne<T>>>>
}
impl<T> SharedPort<T> {
pub fn new(port: Port<T>) -> SharedPort<T> {
// Put the data port into a new link pipe
let next_data_port = port.next.take();
let (next_link_port, next_link_chan) = oneshot();
next_link_chan.send(next_data_port);
let next_link = AtomicOption::new(~next_link_port);
SharedPort { next_link: UnsafeAtomicRcBox::new(next_link) }
}
}
impl<T: Owned> GenericPort<T> for SharedPort<T> {
fn recv(&self) -> T {
match self.try_recv() {
Some(val) => val,
None => {
fail!("receiving on a closed channel");
}
}
}
#[cfg(stage0)] // odd type checking errors
fn try_recv(&self) -> Option<T> {
fail!()
}
#[cfg(not(stage0))]
fn try_recv(&self) -> Option<T> {
unsafe {
let (next_link_port, next_link_chan) = oneshot();
let link_port = (*self.next_link.get()).swap(~next_link_port, SeqCst);
let link_port = link_port.unwrap();
let data_port = link_port.recv();
let (next_data_port, res) = match data_port.try_recv() {
Some(StreamPayload { val, next }) => {
(next, Some(val))
}
None => {
let (next_data_port, _) = oneshot();
(next_data_port, None)
}
};
next_link_chan.send(next_data_port);
return res;
}
}
}
impl<T> Clone for SharedPort<T> {
fn clone(&self) -> SharedPort<T> {
SharedPort {
next_link: self.next_link.clone()
}
}
}
// XXX: Need better name
type MegaPipe<T> = (SharedPort<T>, SharedChan<T>);
pub fn megapipe<T: Owned>() -> MegaPipe<T> {
let (port, chan) = stream();
(SharedPort::new(port), SharedChan::new(chan))
}
impl<T: Owned> GenericChan<T> for MegaPipe<T> {
fn send(&self, val: T) {
match *self {
(_, ref c) => c.send(val)
}
}
}
impl<T: Owned> GenericSmartChan<T> for MegaPipe<T> {
fn try_send(&self, val: T) -> bool {
match *self {
(_, ref c) => c.try_send(val)
}
}
}
impl<T: Owned> GenericPort<T> for MegaPipe<T> {
fn recv(&self) -> T {
match *self {
(ref p, _) => p.recv()
}
}
fn try_recv(&self) -> Option<T> {
match *self {
(ref p, _) => p.try_recv()
}
}
}
#[cfg(test)]
mod test {
use super::*;
@@ -584,7 +748,7 @@ mod test {
#[test]
fn stream_send_recv_stress() {
for stress_factor().times {
do run_in_newsched_task {
do run_in_mt_newsched_task {
let (port, chan) = stream::<~int>();
send(chan, 0);
@@ -594,18 +758,18 @@ mod test {
if i == 10 { return }
let chan_cell = Cell::new(chan);
let _thread = do spawntask_thread {
do spawntask_random {
let chan = chan_cell.take();
chan.send(~i);
send(chan, i + 1);
};
}
}
fn recv(port: Port<~int>, i: int) {
if i == 10 { return }
let port_cell = Cell::new(port);
let _thread = do spawntask_thread {
do spawntask_random {
let port = port_cell.take();
assert!(port.recv() == ~i);
recv(port, i + 1);
@@ -614,4 +778,143 @@ mod test {
}
}
}
#[test]
fn recv_a_lot() {
// Regression test that we don't run out of stack in scheduler context
do run_in_newsched_task {
let (port, chan) = stream();
for 10000.times { chan.send(()) }
for 10000.times { port.recv() }
}
}
#[test]
fn shared_chan_stress() {
do run_in_mt_newsched_task {
let (port, chan) = stream();
let chan = SharedChan::new(chan);
let total = stress_factor() + 100;
for total.times {
let chan_clone = chan.clone();
do spawntask_random {
chan_clone.send(());
}
}
for total.times {
port.recv();
}
}
}
#[test]
fn shared_port_stress() {
do run_in_mt_newsched_task {
// XXX: Removing these type annotations causes an ICE
let (end_port, end_chan) = stream::<()>();
let (port, chan) = stream::<()>();
let end_chan = SharedChan::new(end_chan);
let port = SharedPort::new(port);
let total = stress_factor() + 100;
for total.times {
let end_chan_clone = end_chan.clone();
let port_clone = port.clone();
do spawntask_random {
port_clone.recv();
end_chan_clone.send(());
}
}
for total.times {
chan.send(());
}
for total.times {
end_port.recv();
}
}
}
#[test]
fn shared_port_close_simple() {
do run_in_mt_newsched_task {
let (port, chan) = stream::<()>();
let port = SharedPort::new(port);
{ let _chan = chan; }
assert!(port.try_recv().is_none());
}
}
#[test]
fn shared_port_close() {
do run_in_mt_newsched_task {
let (end_port, end_chan) = stream::<bool>();
let (port, chan) = stream::<()>();
let end_chan = SharedChan::new(end_chan);
let port = SharedPort::new(port);
let chan = SharedChan::new(chan);
let send_total = 10;
let recv_total = 20;
do spawntask_random {
for send_total.times {
let chan_clone = chan.clone();
do spawntask_random {
chan_clone.send(());
}
}
}
let end_chan_clone = end_chan.clone();
do spawntask_random {
for recv_total.times {
let port_clone = port.clone();
let end_chan_clone = end_chan_clone.clone();
do spawntask_random {
let recvd = port_clone.try_recv().is_some();
end_chan_clone.send(recvd);
}
}
}
let mut recvd = 0;
for recv_total.times {
recvd += if end_port.recv() { 1 } else { 0 };
}
assert!(recvd == send_total);
}
}
#[test]
fn megapipe_stress() {
use rand;
use rand::RngUtil;
do run_in_mt_newsched_task {
let (end_port, end_chan) = stream::<()>();
let end_chan = SharedChan::new(end_chan);
let pipe = megapipe();
let total = stress_factor() + 10;
let mut rng = rand::rng();
for total.times {
let msgs = rng.gen_uint_range(0, 10);
let pipe_clone = pipe.clone();
let end_chan_clone = end_chan.clone();
do spawntask_random {
for msgs.times {
pipe_clone.send(());
}
for msgs.times {
pipe_clone.recv();
}
}
end_chan_clone.send(());
}
for total.times {
end_port.recv();
}
}
}
}

View File

@@ -14,7 +14,7 @@ use c_malloc = libc::malloc;
use c_free = libc::free;
use managed::raw::{BoxHeaderRepr, BoxRepr};
use cast::transmute;
use unstable::intrinsics::{atomic_xadd,atomic_xsub};
use unstable::intrinsics::{atomic_xadd,atomic_xsub, atomic_load};
use ptr::null;
use intrinsic::TyDesc;
@@ -34,8 +34,7 @@ pub unsafe fn malloc(td: *TypeDesc, size: uint) -> *c_void {
box.header.prev = null();
box.header.next = null();
let exchange_count = &mut *exchange_count_ptr();
atomic_xadd(exchange_count, 1);
inc_count();
return transmute(box);
}
@@ -48,21 +47,46 @@ pub unsafe fn malloc_raw(size: uint) -> *c_void {
if p.is_null() {
fail!("Failure in malloc_raw: result ptr is null");
}
inc_count();
p
}
pub unsafe fn free(ptr: *c_void) {
let exchange_count = &mut *exchange_count_ptr();
atomic_xsub(exchange_count, 1);
assert!(ptr.is_not_null());
dec_count();
c_free(ptr);
}
///Thin wrapper around libc::free, as with exchange_alloc::malloc_raw
pub unsafe fn free_raw(ptr: *c_void) {
assert!(ptr.is_not_null());
dec_count();
c_free(ptr);
}
fn inc_count() {
unsafe {
let exchange_count = &mut *exchange_count_ptr();
atomic_xadd(exchange_count, 1);
}
}
fn dec_count() {
unsafe {
let exchange_count = &mut *exchange_count_ptr();
atomic_xsub(exchange_count, 1);
}
}
pub fn cleanup() {
unsafe {
let count_ptr = exchange_count_ptr();
let allocations = atomic_load(&*count_ptr);
if allocations != 0 {
rtabort!("exchange heap not empty on exit - %i dangling allocations", allocations);
}
}
}
fn get_box_size(body_size: uint, body_align: uint) -> uint {
let header_size = size_of::<BoxHeaderRepr>();
// FIXME (#2699): This alignment calculation is suspicious. Is it right?

645
src/libstd/rt/join_latch.rs Normal file
View File

@@ -0,0 +1,645 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! The JoinLatch is a concurrent type that establishes the task
//! tree and propagates failure.
//!
//! Each task gets a JoinLatch that is derived from the JoinLatch
//! of its parent task. Every latch must be released by either calling
//! the non-blocking `release` method or the task-blocking `wait` method.
//! Releasing a latch does not complete until all of its child latches
//! complete.
//!
//! Latches carry a `success` flag that is set to `false` during task
//! failure and is propagated both from children to parents and parents
//! to children. The status af this flag may be queried for the purposes
//! of linked failure.
//!
//! In addition to failure propagation the task tree serves to keep the
//! default task schedulers alive. The runtime only sends the shutdown
//! message to schedulers once the root task exits.
//!
//! Under this scheme tasks that terminate before their children become
//! 'zombies' since they may not exit until their children do. Zombie
//! tasks are 'tombstoned' as `Tombstone(~JoinLatch)` and the tasks
//! themselves allowed to terminate.
//!
//! XXX: Propagate flag from parents to children.
//! XXX: Tombstoning actually doesn't work.
//! XXX: This could probably be done in a way that doesn't leak tombstones
//! longer than the life of the child tasks.
use comm::{GenericPort, Peekable, GenericSmartChan};
use clone::Clone;
use container::Container;
use option::{Option, Some, None};
use ops::Drop;
use rt::comm::{SharedChan, Port, stream};
use rt::local::Local;
use rt::sched::Scheduler;
use unstable::atomics::{AtomicUint, SeqCst};
use util;
use vec::OwnedVector;
// FIXME #7026: Would prefer this to be an enum
pub struct JoinLatch {
priv parent: Option<ParentLink>,
priv child: Option<ChildLink>,
closed: bool,
}
// Shared between parents and all their children.
struct SharedState {
/// Reference count, held by a parent and all children.
count: AtomicUint,
success: bool
}
struct ParentLink {
shared: *mut SharedState,
// For communicating with the parent.
chan: SharedChan<Message>
}
struct ChildLink {
shared: ~SharedState,
// For receiving from children.
port: Port<Message>,
chan: SharedChan<Message>,
// Prevents dropping the child SharedState reference counts multiple times.
dropped_child: bool
}
// Messages from child latches to parent.
enum Message {
Tombstone(~JoinLatch),
ChildrenTerminated
}
impl JoinLatch {
pub fn new_root() -> ~JoinLatch {
let this = ~JoinLatch {
parent: None,
child: None,
closed: false
};
rtdebug!("new root latch %x", this.id());
return this;
}
fn id(&self) -> uint {
unsafe { ::cast::transmute(&*self) }
}
pub fn new_child(&mut self) -> ~JoinLatch {
rtassert!(!self.closed);
if self.child.is_none() {
// This is the first time spawning a child
let shared = ~SharedState {
count: AtomicUint::new(1),
success: true
};
let (port, chan) = stream();
let chan = SharedChan::new(chan);
let child = ChildLink {
shared: shared,
port: port,
chan: chan,
dropped_child: false
};
self.child = Some(child);
}
let child_link: &mut ChildLink = self.child.get_mut_ref();
let shared_state: *mut SharedState = &mut *child_link.shared;
child_link.shared.count.fetch_add(1, SeqCst);
let child = ~JoinLatch {
parent: Some(ParentLink {
shared: shared_state,
chan: child_link.chan.clone()
}),
child: None,
closed: false
};
rtdebug!("NEW child latch %x", child.id());
return child;
}
pub fn release(~self, local_success: bool) {
// XXX: This should not block, but there's a bug in the below
// code that I can't figure out.
self.wait(local_success);
}
// XXX: Should not require ~self
fn release_broken(~self, local_success: bool) {
rtassert!(!self.closed);
rtdebug!("releasing %x", self.id());
let id = self.id();
let _ = id; // XXX: `id` is only used in debug statements so appears unused
let mut this = self;
let mut child_success = true;
let mut children_done = false;
if this.child.is_some() {
rtdebug!("releasing children");
let child_link: &mut ChildLink = this.child.get_mut_ref();
let shared: &mut SharedState = &mut *child_link.shared;
if !child_link.dropped_child {
let last_count = shared.count.fetch_sub(1, SeqCst);
rtdebug!("child count before sub %u %x", last_count, id);
if last_count == 1 {
assert!(child_link.chan.try_send(ChildrenTerminated));
}
child_link.dropped_child = true;
}
// Wait for messages from children
let mut tombstones = ~[];
loop {
if child_link.port.peek() {
match child_link.port.recv() {
Tombstone(t) => {
tombstones.push(t);
},
ChildrenTerminated => {
children_done = true;
break;
}
}
} else {
break
}
}
rtdebug!("releasing %u tombstones %x", tombstones.len(), id);
// Try to release the tombstones. Those that still have
// outstanding will be re-enqueued. When this task's
// parents release their latch we'll end up back here
// trying them again.
while !tombstones.is_empty() {
tombstones.pop().release(true);
}
if children_done {
let count = shared.count.load(SeqCst);
assert!(count == 0);
// self_count is the acquire-read barrier
child_success = shared.success;
}
} else {
children_done = true;
}
let total_success = local_success && child_success;
rtassert!(this.parent.is_some());
unsafe {
{
let parent_link: &mut ParentLink = this.parent.get_mut_ref();
let shared: *mut SharedState = parent_link.shared;
if !total_success {
// parent_count is the write-wait barrier
(*shared).success = false;
}
}
if children_done {
rtdebug!("children done");
do Local::borrow::<Scheduler, ()> |sched| {
sched.metrics.release_tombstone += 1;
}
{
rtdebug!("RELEASING parent %x", id);
let parent_link: &mut ParentLink = this.parent.get_mut_ref();
let shared: *mut SharedState = parent_link.shared;
let last_count = (*shared).count.fetch_sub(1, SeqCst);
rtdebug!("count before parent sub %u %x", last_count, id);
if last_count == 1 {
assert!(parent_link.chan.try_send(ChildrenTerminated));
}
}
this.closed = true;
util::ignore(this);
} else {
rtdebug!("children not done");
rtdebug!("TOMBSTONING %x", id);
do Local::borrow::<Scheduler, ()> |sched| {
sched.metrics.release_no_tombstone += 1;
}
let chan = {
let parent_link: &mut ParentLink = this.parent.get_mut_ref();
parent_link.chan.clone()
};
assert!(chan.try_send(Tombstone(this)));
}
}
}
// XXX: Should not require ~self
pub fn wait(~self, local_success: bool) -> bool {
rtassert!(!self.closed);
rtdebug!("WAITING %x", self.id());
let mut this = self;
let mut child_success = true;
if this.child.is_some() {
rtdebug!("waiting for children");
let child_link: &mut ChildLink = this.child.get_mut_ref();
let shared: &mut SharedState = &mut *child_link.shared;
if !child_link.dropped_child {
let last_count = shared.count.fetch_sub(1, SeqCst);
rtdebug!("child count before sub %u", last_count);
if last_count == 1 {
assert!(child_link.chan.try_send(ChildrenTerminated));
}
child_link.dropped_child = true;
}
// Wait for messages from children
loop {
match child_link.port.recv() {
Tombstone(t) => {
t.wait(true);
}
ChildrenTerminated => break
}
}
let count = shared.count.load(SeqCst);
if count != 0 { ::io::println(fmt!("%u", count)); }
assert!(count == 0);
// self_count is the acquire-read barrier
child_success = shared.success;
}
let total_success = local_success && child_success;
if this.parent.is_some() {
rtdebug!("releasing parent");
unsafe {
let parent_link: &mut ParentLink = this.parent.get_mut_ref();
let shared: *mut SharedState = parent_link.shared;
if !total_success {
// parent_count is the write-wait barrier
(*shared).success = false;
}
let last_count = (*shared).count.fetch_sub(1, SeqCst);
rtdebug!("count before parent sub %u", last_count);
if last_count == 1 {
assert!(parent_link.chan.try_send(ChildrenTerminated));
}
}
}
this.closed = true;
util::ignore(this);
return total_success;
}
}
impl Drop for JoinLatch {
fn finalize(&self) {
rtdebug!("DESTROYING %x", self.id());
rtassert!(self.closed);
}
}
#[cfg(test)]
mod test {
use super::*;
use cell::Cell;
use container::Container;
use iter::Times;
use old_iter::BaseIter;
use rt::test::*;
use rand;
use rand::RngUtil;
use vec::{CopyableVector, ImmutableVector};
#[test]
fn success_immediately() {
do run_in_newsched_task {
let mut latch = JoinLatch::new_root();
let child_latch = latch.new_child();
let child_latch = Cell::new(child_latch);
do spawntask_immediately {
let child_latch = child_latch.take();
assert!(child_latch.wait(true));
}
assert!(latch.wait(true));
}
}
#[test]
fn success_later() {
do run_in_newsched_task {
let mut latch = JoinLatch::new_root();
let child_latch = latch.new_child();
let child_latch = Cell::new(child_latch);
do spawntask_later {
let child_latch = child_latch.take();
assert!(child_latch.wait(true));
}
assert!(latch.wait(true));
}
}
#[test]
fn mt_success() {
do run_in_mt_newsched_task {
let mut latch = JoinLatch::new_root();
for 10.times {
let child_latch = latch.new_child();
let child_latch = Cell::new(child_latch);
do spawntask_random {
let child_latch = child_latch.take();
assert!(child_latch.wait(true));
}
}
assert!(latch.wait(true));
}
}
#[test]
fn mt_failure() {
do run_in_mt_newsched_task {
let mut latch = JoinLatch::new_root();
let spawn = |status| {
let child_latch = latch.new_child();
let child_latch = Cell::new(child_latch);
do spawntask_random {
let child_latch = child_latch.take();
child_latch.wait(status);
}
};
for 10.times { spawn(true) }
spawn(false);
for 10.times { spawn(true) }
assert!(!latch.wait(true));
}
}
#[test]
fn mt_multi_level_success() {
do run_in_mt_newsched_task {
let mut latch = JoinLatch::new_root();
fn child(latch: &mut JoinLatch, i: int) {
let child_latch = latch.new_child();
let child_latch = Cell::new(child_latch);
do spawntask_random {
let mut child_latch = child_latch.take();
if i != 0 {
child(&mut *child_latch, i - 1);
child_latch.wait(true);
} else {
child_latch.wait(true);
}
}
}
child(&mut *latch, 10);
assert!(latch.wait(true));
}
}
#[test]
fn mt_multi_level_failure() {
do run_in_mt_newsched_task {
let mut latch = JoinLatch::new_root();
fn child(latch: &mut JoinLatch, i: int) {
let child_latch = latch.new_child();
let child_latch = Cell::new(child_latch);
do spawntask_random {
let mut child_latch = child_latch.take();
if i != 0 {
child(&mut *child_latch, i - 1);
child_latch.wait(false);
} else {
child_latch.wait(true);
}
}
}
child(&mut *latch, 10);
assert!(!latch.wait(true));
}
}
#[test]
fn release_child() {
do run_in_newsched_task {
let mut latch = JoinLatch::new_root();
let child_latch = latch.new_child();
let child_latch = Cell::new(child_latch);
do spawntask_immediately {
let latch = child_latch.take();
latch.release(false);
}
assert!(!latch.wait(true));
}
}
#[test]
fn release_child_tombstone() {
do run_in_newsched_task {
let mut latch = JoinLatch::new_root();
let child_latch = latch.new_child();
let child_latch = Cell::new(child_latch);
do spawntask_immediately {
let mut latch = child_latch.take();
let child_latch = latch.new_child();
let child_latch = Cell::new(child_latch);
do spawntask_later {
let latch = child_latch.take();
latch.release(false);
}
latch.release(true);
}
assert!(!latch.wait(true));
}
}
#[test]
fn release_child_no_tombstone() {
do run_in_newsched_task {
let mut latch = JoinLatch::new_root();
let child_latch = latch.new_child();
let child_latch = Cell::new(child_latch);
do spawntask_later {
let mut latch = child_latch.take();
let child_latch = latch.new_child();
let child_latch = Cell::new(child_latch);
do spawntask_immediately {
let latch = child_latch.take();
latch.release(false);
}
latch.release(true);
}
assert!(!latch.wait(true));
}
}
#[test]
fn release_child_tombstone_stress() {
fn rand_orders() -> ~[bool] {
let mut v = ~[false,.. 5];
v[0] = true;
let mut rng = rand::rng();
return rng.shuffle(v);
}
fn split_orders(orders: &[bool]) -> (~[bool], ~[bool]) {
if orders.is_empty() {
return (~[], ~[]);
} else if orders.len() <= 2 {
return (orders.to_owned(), ~[]);
}
let mut rng = rand::rng();
let n = rng.gen_uint_range(1, orders.len());
let first = orders.slice(0, n).to_owned();
let last = orders.slice(n, orders.len()).to_owned();
assert!(first.len() + last.len() == orders.len());
return (first, last);
}
for stress_factor().times {
do run_in_newsched_task {
fn doit(latch: &mut JoinLatch, orders: ~[bool], depth: uint) {
let (my_orders, remaining_orders) = split_orders(orders);
rtdebug!("(my_orders, remaining): %?", (&my_orders, &remaining_orders));
rtdebug!("depth: %u", depth);
let mut remaining_orders = remaining_orders;
let mut num = 0;
for my_orders.each |&order| {
let child_latch = latch.new_child();
let child_latch = Cell::new(child_latch);
let (child_orders, remaining) = split_orders(remaining_orders);
rtdebug!("(child_orders, remaining): %?", (&child_orders, &remaining));
remaining_orders = remaining;
let child_orders = Cell::new(child_orders);
let child_num = num;
let _ = child_num; // XXX unused except in rtdebug!
do spawntask_random {
rtdebug!("depth %u num %u", depth, child_num);
let mut child_latch = child_latch.take();
let child_orders = child_orders.take();
doit(&mut *child_latch, child_orders, depth + 1);
child_latch.release(order);
}
num += 1;
}
}
let mut latch = JoinLatch::new_root();
let orders = rand_orders();
rtdebug!("orders: %?", orders);
doit(&mut *latch, orders, 0);
assert!(!latch.wait(true));
}
}
}
#[test]
fn whateverman() {
struct Order {
immediate: bool,
succeed: bool,
orders: ~[Order]
}
fn next(latch: &mut JoinLatch, orders: ~[Order]) {
for orders.each |order| {
let suborders = copy order.orders;
let child_latch = Cell::new(latch.new_child());
let succeed = order.succeed;
if order.immediate {
do spawntask_immediately {
let mut child_latch = child_latch.take();
next(&mut *child_latch, copy suborders);
rtdebug!("immediate releasing");
child_latch.release(succeed);
}
} else {
do spawntask_later {
let mut child_latch = child_latch.take();
next(&mut *child_latch, copy suborders);
rtdebug!("later releasing");
child_latch.release(succeed);
}
}
}
}
do run_in_newsched_task {
let mut latch = JoinLatch::new_root();
let orders = ~[ Order { // 0 0
immediate: true,
succeed: true,
orders: ~[ Order { // 1 0
immediate: true,
succeed: false,
orders: ~[ Order { // 2 0
immediate: false,
succeed: false,
orders: ~[ Order { // 3 0
immediate: true,
succeed: false,
orders: ~[]
}, Order { // 3 1
immediate: false,
succeed: false,
orders: ~[]
}]
}]
}]
}];
next(&mut *latch, orders);
assert!(!latch.wait(true));
}
}
}

View File

@@ -18,7 +18,7 @@ pub trait Local {
fn put(value: ~Self);
fn take() -> ~Self;
fn exists() -> bool;
fn borrow(f: &fn(&mut Self));
fn borrow<T>(f: &fn(&mut Self) -> T) -> T;
unsafe fn unsafe_borrow() -> *mut Self;
unsafe fn try_unsafe_borrow() -> Option<*mut Self>;
}
@@ -27,23 +27,36 @@ impl Local for Scheduler {
fn put(value: ~Scheduler) { unsafe { local_ptr::put(value) }}
fn take() -> ~Scheduler { unsafe { local_ptr::take() } }
fn exists() -> bool { local_ptr::exists() }
fn borrow(f: &fn(&mut Scheduler)) { unsafe { local_ptr::borrow(f) } }
fn borrow<T>(f: &fn(&mut Scheduler) -> T) -> T {
let mut res: Option<T> = None;
let res_ptr: *mut Option<T> = &mut res;
unsafe {
do local_ptr::borrow |sched| {
let result = f(sched);
*res_ptr = Some(result);
}
}
match res {
Some(r) => { r }
None => rtabort!("function failed!")
}
}
unsafe fn unsafe_borrow() -> *mut Scheduler { local_ptr::unsafe_borrow() }
unsafe fn try_unsafe_borrow() -> Option<*mut Scheduler> { abort!("unimpl") }
unsafe fn try_unsafe_borrow() -> Option<*mut Scheduler> { rtabort!("unimpl") }
}
impl Local for Task {
fn put(_value: ~Task) { abort!("unimpl") }
fn take() -> ~Task { abort!("unimpl") }
fn exists() -> bool { abort!("unimpl") }
fn borrow(f: &fn(&mut Task)) {
do Local::borrow::<Scheduler> |sched| {
fn put(_value: ~Task) { rtabort!("unimpl") }
fn take() -> ~Task { rtabort!("unimpl") }
fn exists() -> bool { rtabort!("unimpl") }
fn borrow<T>(f: &fn(&mut Task) -> T) -> T {
do Local::borrow::<Scheduler, T> |sched| {
match sched.current_task {
Some(~ref mut task) => {
f(&mut *task.task)
}
None => {
abort!("no scheduler")
rtabort!("no scheduler")
}
}
}
@@ -56,7 +69,7 @@ impl Local for Task {
}
None => {
// Don't fail. Infinite recursion
abort!("no scheduler")
rtabort!("no scheduler")
}
}
}
@@ -71,48 +84,60 @@ impl Local for Task {
// XXX: This formulation won't work once ~IoFactoryObject is a real trait pointer
impl Local for IoFactoryObject {
fn put(_value: ~IoFactoryObject) { abort!("unimpl") }
fn take() -> ~IoFactoryObject { abort!("unimpl") }
fn exists() -> bool { abort!("unimpl") }
fn borrow(_f: &fn(&mut IoFactoryObject)) { abort!("unimpl") }
fn put(_value: ~IoFactoryObject) { rtabort!("unimpl") }
fn take() -> ~IoFactoryObject { rtabort!("unimpl") }
fn exists() -> bool { rtabort!("unimpl") }
fn borrow<T>(_f: &fn(&mut IoFactoryObject) -> T) -> T { rtabort!("unimpl") }
unsafe fn unsafe_borrow() -> *mut IoFactoryObject {
let sched = Local::unsafe_borrow::<Scheduler>();
let io: *mut IoFactoryObject = (*sched).event_loop.io().unwrap();
return io;
}
unsafe fn try_unsafe_borrow() -> Option<*mut IoFactoryObject> { abort!("unimpl") }
unsafe fn try_unsafe_borrow() -> Option<*mut IoFactoryObject> { rtabort!("unimpl") }
}
#[cfg(test)]
mod test {
use rt::test::*;
use rt::sched::Scheduler;
use rt::uv::uvio::UvEventLoop;
use super::*;
#[test]
fn thread_local_scheduler_smoke_test() {
let scheduler = ~UvEventLoop::new_scheduler();
let scheduler = ~new_test_uv_sched();
Local::put(scheduler);
let _scheduler: ~Scheduler = Local::take();
}
#[test]
fn thread_local_scheduler_two_instances() {
let scheduler = ~UvEventLoop::new_scheduler();
let scheduler = ~new_test_uv_sched();
Local::put(scheduler);
let _scheduler: ~Scheduler = Local::take();
let scheduler = ~UvEventLoop::new_scheduler();
let scheduler = ~new_test_uv_sched();
Local::put(scheduler);
let _scheduler: ~Scheduler = Local::take();
}
#[test]
fn borrow_smoke_test() {
let scheduler = ~UvEventLoop::new_scheduler();
let scheduler = ~new_test_uv_sched();
Local::put(scheduler);
unsafe {
let _scheduler: *mut Scheduler = Local::unsafe_borrow();
}
let _scheduler: ~Scheduler = Local::take();
}
#[test]
fn borrow_with_return() {
let scheduler = ~new_test_uv_sched();
Local::put(scheduler);
let res = do Local::borrow::<Scheduler,bool> |_sched| {
true
};
assert!(res)
let _scheduler: ~Scheduler = Local::take();
}
}

View File

@@ -109,7 +109,7 @@ pub unsafe fn unsafe_borrow<T>() -> *mut T {
fn tls_key() -> tls::Key {
match maybe_tls_key() {
Some(key) => key,
None => abort!("runtime tls key not initialized")
None => rtabort!("runtime tls key not initialized")
}
}

View File

@@ -8,6 +8,9 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! A concurrent queue that supports multiple producers and a
//! single consumer.
use container::Container;
use kinds::Owned;
use vec::OwnedVector;

98
src/libstd/rt/metrics.rs Normal file
View File

@@ -0,0 +1,98 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use to_str::ToStr;
pub struct SchedMetrics {
// The number of times executing `run_sched_once`.
turns: uint,
// The number of turns that received a message.
messages_received: uint,
// The number of turns that ran a task from the queue.
tasks_resumed_from_queue: uint,
// The number of turns that found no work to perform.
wasted_turns: uint,
// The number of times the scheduler went to sleep.
sleepy_times: uint,
// Context switches from the scheduler into a task.
context_switches_sched_to_task: uint,
// Context switches from a task into the scheduler.
context_switches_task_to_sched: uint,
// Context switches from a task to a task.
context_switches_task_to_task: uint,
// Message sends that unblock the receiver
rendezvous_sends: uint,
// Message sends that do not unblock the receiver
non_rendezvous_sends: uint,
// Message receives that do not block the receiver
rendezvous_recvs: uint,
// Message receives that block the receiver
non_rendezvous_recvs: uint,
// JoinLatch releases that create tombstones
release_tombstone: uint,
// JoinLatch releases that do not create tombstones
release_no_tombstone: uint,
}
impl SchedMetrics {
pub fn new() -> SchedMetrics {
SchedMetrics {
turns: 0,
messages_received: 0,
tasks_resumed_from_queue: 0,
wasted_turns: 0,
sleepy_times: 0,
context_switches_sched_to_task: 0,
context_switches_task_to_sched: 0,
context_switches_task_to_task: 0,
rendezvous_sends: 0,
non_rendezvous_sends: 0,
rendezvous_recvs: 0,
non_rendezvous_recvs: 0,
release_tombstone: 0,
release_no_tombstone: 0
}
}
}
impl ToStr for SchedMetrics {
fn to_str(&self) -> ~str {
fmt!("turns: %u\n\
messages_received: %u\n\
tasks_resumed_from_queue: %u\n\
wasted_turns: %u\n\
sleepy_times: %u\n\
context_switches_sched_to_task: %u\n\
context_switches_task_to_sched: %u\n\
context_switches_task_to_task: %u\n\
rendezvous_sends: %u\n\
non_rendezvous_sends: %u\n\
rendezvous_recvs: %u\n\
non_rendezvous_recvs: %u\n\
release_tombstone: %u\n\
release_no_tombstone: %u\n\
",
self.turns,
self.messages_received,
self.tasks_resumed_from_queue,
self.wasted_turns,
self.sleepy_times,
self.context_switches_sched_to_task,
self.context_switches_task_to_sched,
self.context_switches_task_to_task,
self.rendezvous_sends,
self.non_rendezvous_sends,
self.rendezvous_recvs,
self.non_rendezvous_recvs,
self.release_tombstone,
self.release_no_tombstone
)
}
}

View File

@@ -55,8 +55,28 @@ Several modules in `core` are clients of `rt`:
*/
#[doc(hidden)];
#[deny(unused_imports)];
#[deny(unused_mut)];
#[deny(unused_variable)];
use cell::Cell;
use clone::Clone;
use container::Container;
use from_str::FromStr;
use iter::Times;
use iterator::IteratorUtil;
use option::{Some, None};
use os;
use ptr::RawPtr;
use rt::sched::{Scheduler, Coroutine, Shutdown};
use rt::sleeper_list::SleeperList;
use rt::task::Task;
use rt::thread::Thread;
use rt::work_queue::WorkQueue;
use rt::uv::uvio::UvEventLoop;
use unstable::atomics::{AtomicInt, SeqCst};
use unstable::sync::UnsafeAtomicRcBox;
use vec::{OwnedVector, MutableVector};
/// The global (exchange) heap.
pub mod global_heap;
@@ -88,6 +108,9 @@ mod work_queue;
/// A parallel queue.
mod message_queue;
/// A parallel data structure for tracking sleeping schedulers.
mod sleeper_list;
/// Stack segments and caching.
mod stack;
@@ -127,6 +150,14 @@ pub mod local_ptr;
/// Bindings to pthread/windows thread-local storage.
pub mod thread_local_storage;
/// For waiting on child tasks.
pub mod join_latch;
pub mod metrics;
// FIXME #5248 shouldn't be pub
/// Just stuff
pub mod util;
/// Set up a default runtime configuration, given compiler-supplied arguments.
///
@@ -144,25 +175,114 @@ pub mod thread_local_storage;
/// The return value is used as the process return code. 0 on success, 101 on error.
pub fn start(_argc: int, _argv: **u8, crate_map: *u8, main: ~fn()) -> int {
use self::sched::{Scheduler, Coroutine};
use self::uv::uvio::UvEventLoop;
init(crate_map);
let exit_code = run(main);
cleanup();
let loop_ = ~UvEventLoop::new();
let mut sched = ~Scheduler::new(loop_);
let main_task = ~Coroutine::new(&mut sched.stack_pool, main);
sched.enqueue_task(main_task);
sched.run();
return 0;
return exit_code;
}
/// One-time runtime initialization. Currently all this does is set up logging
/// based on the RUST_LOG environment variable.
pub fn init(crate_map: *u8) {
logging::init(crate_map);
unsafe { rust_update_gc_metadata(crate_map) }
extern {
fn rust_update_gc_metadata(crate_map: *u8);
}
}
/// One-time runtime cleanup.
pub fn cleanup() {
global_heap::cleanup();
}
/// Execute the main function in a scheduler.
///
/// Configures the runtime according to the environment, by default
/// using a task scheduler with the same number of threads as cores.
/// Returns a process exit code.
pub fn run(main: ~fn()) -> int {
static DEFAULT_ERROR_CODE: int = 101;
let nthreads = match os::getenv("RUST_THREADS") {
Some(nstr) => FromStr::from_str(nstr).get(),
None => unsafe { util::num_cpus() }
};
// The shared list of sleeping schedulers. Schedulers wake each other
// occassionally to do new work.
let sleepers = SleeperList::new();
// The shared work queue. Temporary until work stealing is implemented.
let work_queue = WorkQueue::new();
// The schedulers.
let mut scheds = ~[];
// Handles to the schedulers. When the main task ends these will be
// sent the Shutdown message to terminate the schedulers.
let mut handles = ~[];
for nthreads.times {
// Every scheduler is driven by an I/O event loop.
let loop_ = ~UvEventLoop::new();
let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone());
let handle = sched.make_handle();
scheds.push(sched);
handles.push(handle);
}
// Create a shared cell for transmitting the process exit
// code from the main task to this function.
let exit_code = UnsafeAtomicRcBox::new(AtomicInt::new(0));
let exit_code_clone = exit_code.clone();
// When the main task exits, after all the tasks in the main
// task tree, shut down the schedulers and set the exit code.
let handles = Cell::new(handles);
let on_exit: ~fn(bool) = |exit_success| {
let mut handles = handles.take();
for handles.mut_iter().advance |handle| {
handle.send(Shutdown);
}
unsafe {
let exit_code = if exit_success { 0 } else { DEFAULT_ERROR_CODE };
(*exit_code_clone.get()).store(exit_code, SeqCst);
}
};
// Create and enqueue the main task.
let main_cell = Cell::new(main);
let mut new_task = ~Task::new_root();
new_task.on_exit = Some(on_exit);
let main_task = ~Coroutine::with_task(&mut scheds[0].stack_pool,
new_task, main_cell.take());
scheds[0].enqueue_task(main_task);
// Run each scheduler in a thread.
let mut threads = ~[];
while !scheds.is_empty() {
let sched = scheds.pop();
let sched_cell = Cell::new(sched);
let thread = do Thread::start {
let sched = sched_cell.take();
sched.run();
};
threads.push(thread);
}
// Wait for schedulers
{ let _threads = threads; }
// Return the exit code
unsafe {
(*exit_code.get()).load(SeqCst)
}
}
/// Possible contexts in which Rust code may be executing.
@@ -194,8 +314,8 @@ pub fn context() -> RuntimeContext {
return OldTaskContext;
} else {
if Local::exists::<Scheduler>() {
let context = ::cell::Cell::new_empty();
do Local::borrow::<Scheduler> |sched| {
let context = Cell::new_empty();
do Local::borrow::<Scheduler, ()> |sched| {
if sched.in_task_context() {
context.put_back(TaskContext);
} else {
@@ -218,23 +338,19 @@ pub fn context() -> RuntimeContext {
fn test_context() {
use unstable::run_in_bare_thread;
use self::sched::{Scheduler, Coroutine};
use rt::uv::uvio::UvEventLoop;
use cell::Cell;
use rt::local::Local;
use rt::test::new_test_uv_sched;
assert_eq!(context(), OldTaskContext);
do run_in_bare_thread {
assert_eq!(context(), GlobalContext);
let mut sched = ~UvEventLoop::new_scheduler();
let task = ~do Coroutine::new(&mut sched.stack_pool) {
let mut sched = ~new_test_uv_sched();
let task = ~do Coroutine::new_root(&mut sched.stack_pool) {
assert_eq!(context(), TaskContext);
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then() |task| {
do sched.deschedule_running_task_and_then() |sched, task| {
assert_eq!(context(), SchedulerContext);
let task = Cell::new(task);
do Local::borrow::<Scheduler> |sched| {
sched.enqueue_task(task.take());
}
sched.enqueue_task(task);
}
};
sched.enqueue_task(task);

View File

@@ -18,6 +18,7 @@ use rt::uv::uvio;
// XXX: ~object doesn't work currently so these are some placeholder
// types to use instead
pub type EventLoopObject = uvio::UvEventLoop;
pub type RemoteCallbackObject = uvio::UvRemoteCallback;
pub type IoFactoryObject = uvio::UvIoFactory;
pub type RtioTcpStreamObject = uvio::UvTcpStream;
pub type RtioTcpListenerObject = uvio::UvTcpListener;
@@ -26,10 +27,20 @@ pub trait EventLoop {
fn run(&mut self);
fn callback(&mut self, ~fn());
fn callback_ms(&mut self, ms: u64, ~fn());
fn remote_callback(&mut self, ~fn()) -> ~RemoteCallbackObject;
/// The asynchronous I/O services. Not all event loops may provide one
fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject>;
}
pub trait RemoteCallback {
/// Trigger the remote callback. Note that the number of times the callback
/// is run is not guaranteed. All that is guaranteed is that, after calling 'fire',
/// the callback will be called at least once, but multiple callbacks may be coalesced
/// and callbacks may be called more often requested. Destruction also triggers the
/// callback.
fn fire(&mut self);
}
pub trait IoFactory {
fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError>;
fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError>;

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,59 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! Maintains a shared list of sleeping schedulers. Schedulers
//! use this to wake each other up.
use container::Container;
use vec::OwnedVector;
use option::{Option, Some, None};
use cell::Cell;
use unstable::sync::{Exclusive, exclusive};
use rt::sched::SchedHandle;
use clone::Clone;
pub struct SleeperList {
priv stack: ~Exclusive<~[SchedHandle]>
}
impl SleeperList {
pub fn new() -> SleeperList {
SleeperList {
stack: ~exclusive(~[])
}
}
pub fn push(&mut self, handle: SchedHandle) {
let handle = Cell::new(handle);
unsafe {
self.stack.with(|s| s.push(handle.take()));
}
}
pub fn pop(&mut self) -> Option<SchedHandle> {
unsafe {
do self.stack.with |s| {
if !s.is_empty() {
Some(s.pop())
} else {
None
}
}
}
}
}
impl Clone for SleeperList {
fn clone(&self) -> SleeperList {
SleeperList {
stack: self.stack.clone()
}
}
}

View File

@@ -18,16 +18,22 @@ use cast::transmute;
use libc::{c_void, uintptr_t};
use ptr;
use prelude::*;
use option::{Option, Some, None};
use rt::local::Local;
use rt::logging::StdErrLogger;
use super::local_heap::LocalHeap;
use rt::sched::{SchedHome, AnySched};
use rt::join_latch::JoinLatch;
pub struct Task {
heap: LocalHeap,
gc: GarbageCollector,
storage: LocalStorage,
logger: StdErrLogger,
unwinder: Option<Unwinder>,
unwinder: Unwinder,
home: Option<SchedHome>,
join_latch: Option<~JoinLatch>,
on_exit: Option<~fn(bool)>,
destroyed: bool
}
@@ -39,49 +45,63 @@ pub struct Unwinder {
}
impl Task {
pub fn new() -> Task {
pub fn new_root() -> Task {
Task {
heap: LocalHeap::new(),
gc: GarbageCollector,
storage: LocalStorage(ptr::null(), None),
logger: StdErrLogger,
unwinder: Some(Unwinder { unwinding: false }),
unwinder: Unwinder { unwinding: false },
home: Some(AnySched),
join_latch: Some(JoinLatch::new_root()),
on_exit: None,
destroyed: false
}
}
pub fn without_unwinding() -> Task {
pub fn new_child(&mut self) -> Task {
Task {
heap: LocalHeap::new(),
gc: GarbageCollector,
storage: LocalStorage(ptr::null(), None),
logger: StdErrLogger,
unwinder: None,
home: Some(AnySched),
unwinder: Unwinder { unwinding: false },
join_latch: Some(self.join_latch.get_mut_ref().new_child()),
on_exit: None,
destroyed: false
}
}
pub fn give_home(&mut self, new_home: SchedHome) {
self.home = Some(new_home);
}
pub fn run(&mut self, f: &fn()) {
// This is just an assertion that `run` was called unsafely
// and this instance of Task is still accessible.
do Local::borrow::<Task> |task| {
do Local::borrow::<Task, ()> |task| {
assert!(borrow::ref_eq(task, self));
}
match self.unwinder {
Some(ref mut unwinder) => {
// If there's an unwinder then set up the catch block
unwinder.try(f);
self.unwinder.try(f);
self.destroy();
// Wait for children. Possibly report the exit status.
let local_success = !self.unwinder.unwinding;
let join_latch = self.join_latch.swap_unwrap();
match self.on_exit {
Some(ref on_exit) => {
let success = join_latch.wait(local_success);
(*on_exit)(success);
}
None => {
// Otherwise, just run the body
f()
join_latch.release(local_success);
}
}
self.destroy();
}
/// Must be called manually before finalization to clean up
/// must be called manually before finalization to clean up
/// thread-local resources. Some of the routines here expect
/// Task to be available recursively so this must be
/// called unsafely, without removing Task from
@@ -89,7 +109,7 @@ impl Task {
fn destroy(&mut self) {
// This is just an assertion that `destroy` was called unsafely
// and this instance of Task is still accessible.
do Local::borrow::<Task> |task| {
do Local::borrow::<Task, ()> |task| {
assert!(borrow::ref_eq(task, self));
}
match self.storage {
@@ -227,4 +247,14 @@ mod test {
assert!(port.recv() == 10);
}
}
#[test]
fn linked_failure() {
do run_in_newsched_task() {
let res = do spawntask_try {
spawntask_random(|| fail!());
};
assert!(res.is_err());
}
}
}

View File

@@ -9,13 +9,33 @@
// except according to those terms.
use uint;
use option::*;
use option::{Some, None};
use cell::Cell;
use clone::Clone;
use container::Container;
use iterator::IteratorUtil;
use vec::{OwnedVector, MutableVector};
use result::{Result, Ok, Err};
use unstable::run_in_bare_thread;
use super::io::net::ip::{IpAddr, Ipv4};
use rt::comm::oneshot;
use rt::task::Task;
use rt::thread::Thread;
use rt::local::Local;
use rt::sched::{Scheduler, Coroutine};
use rt::sleeper_list::SleeperList;
use rt::work_queue::WorkQueue;
pub fn new_test_uv_sched() -> Scheduler {
use rt::uv::uvio::UvEventLoop;
use rt::work_queue::WorkQueue;
use rt::sleeper_list::SleeperList;
let mut sched = Scheduler::new(~UvEventLoop::new(), WorkQueue::new(), SleeperList::new());
// Don't wait for the Shutdown message
sched.no_sleep = true;
return sched;
}
/// Creates a new scheduler in a new thread and runs a task in it,
/// then waits for the scheduler to exit. Failure of the task
@@ -23,48 +43,237 @@ use rt::local::Local;
pub fn run_in_newsched_task(f: ~fn()) {
use super::sched::*;
use unstable::run_in_bare_thread;
use rt::uv::uvio::UvEventLoop;
let f = Cell::new(f);
do run_in_bare_thread {
let mut sched = ~UvEventLoop::new_scheduler();
let mut sched = ~new_test_uv_sched();
let mut new_task = ~Task::new_root();
let on_exit: ~fn(bool) = |exit_status| rtassert!(exit_status);
new_task.on_exit = Some(on_exit);
let task = ~Coroutine::with_task(&mut sched.stack_pool,
~Task::without_unwinding(),
new_task,
f.take());
sched.enqueue_task(task);
sched.run();
}
}
/// Create more than one scheduler and run a function in a task
/// in one of the schedulers. The schedulers will stay alive
/// until the function `f` returns.
pub fn run_in_mt_newsched_task(f: ~fn()) {
use os;
use from_str::FromStr;
use rt::uv::uvio::UvEventLoop;
use rt::sched::Shutdown;
use rt::util;
let f_cell = Cell::new(f);
do run_in_bare_thread {
let nthreads = match os::getenv("RUST_TEST_THREADS") {
Some(nstr) => FromStr::from_str(nstr).get(),
None => unsafe {
// Using more threads than cores in test code
// to force the OS to preempt them frequently.
// Assuming that this help stress test concurrent types.
util::num_cpus() * 2
}
};
let sleepers = SleeperList::new();
let work_queue = WorkQueue::new();
let mut handles = ~[];
let mut scheds = ~[];
for uint::range(0, nthreads) |_| {
let loop_ = ~UvEventLoop::new();
let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone());
let handle = sched.make_handle();
handles.push(handle);
scheds.push(sched);
}
let f_cell = Cell::new(f_cell.take());
let handles = Cell::new(handles);
let mut new_task = ~Task::new_root();
let on_exit: ~fn(bool) = |exit_status| {
let mut handles = handles.take();
// Tell schedulers to exit
for handles.mut_iter().advance |handle| {
handle.send(Shutdown);
}
rtassert!(exit_status);
};
new_task.on_exit = Some(on_exit);
let main_task = ~Coroutine::with_task(&mut scheds[0].stack_pool,
new_task, f_cell.take());
scheds[0].enqueue_task(main_task);
let mut threads = ~[];
while !scheds.is_empty() {
let sched = scheds.pop();
let sched_cell = Cell::new(sched);
let thread = do Thread::start {
let sched = sched_cell.take();
sched.run();
};
threads.push(thread);
}
// Wait for schedulers
let _threads = threads;
}
}
// THIS IS AWFUL. Copy-pasted the above initialization function but
// with a number of hacks to make it spawn tasks on a variety of
// schedulers with a variety of homes using the new spawn.
pub fn run_in_mt_newsched_task_random_homed() {
use libc;
use os;
use from_str::FromStr;
use rt::uv::uvio::UvEventLoop;
use rt::sched::Shutdown;
do run_in_bare_thread {
let nthreads = match os::getenv("RUST_TEST_THREADS") {
Some(nstr) => FromStr::from_str(nstr).get(),
None => unsafe {
// Using more threads than cores in test code to force
// the OS to preempt them frequently. Assuming that
// this help stress test concurrent types.
rust_get_num_cpus() * 2
}
};
let sleepers = SleeperList::new();
let work_queue = WorkQueue::new();
let mut handles = ~[];
let mut scheds = ~[];
// create a few special schedulers, those with even indicies
// will be pinned-only
for uint::range(0, nthreads) |i| {
let special = (i % 2) == 0;
let loop_ = ~UvEventLoop::new();
let mut sched = ~Scheduler::new_special(
loop_, work_queue.clone(), sleepers.clone(), special);
let handle = sched.make_handle();
handles.push(handle);
scheds.push(sched);
}
// Schedule a pile o tasks
let n = 5*stress_factor();
for uint::range(0,n) |_i| {
rtdebug!("creating task: %u", _i);
let hf: ~fn() = || { assert!(true) };
spawntask_homed(&mut scheds, hf);
}
// Now we want another pile o tasks that do not ever run on a
// special scheduler, because they are normal tasks. Because
// we can we put these in the "main" task.
let n = 5*stress_factor();
let f: ~fn() = || {
for uint::range(0,n) |_| {
let f: ~fn() = || {
// Borrow the scheduler we run on and check if it is
// privileged.
do Local::borrow::<Scheduler,()> |sched| {
assert!(sched.run_anything);
};
};
spawntask_random(f);
};
};
let f_cell = Cell::new(f);
let handles = Cell::new(handles);
rtdebug!("creating main task");
let main_task = ~do Coroutine::new_root(&mut scheds[0].stack_pool) {
f_cell.take()();
let mut handles = handles.take();
// Tell schedulers to exit
for handles.mut_iter().advance |handle| {
handle.send(Shutdown);
}
};
rtdebug!("queuing main task")
scheds[0].enqueue_task(main_task);
let mut threads = ~[];
while !scheds.is_empty() {
let sched = scheds.pop();
let sched_cell = Cell::new(sched);
let thread = do Thread::start {
let sched = sched_cell.take();
rtdebug!("running sched: %u", sched.sched_id());
sched.run();
};
threads.push(thread);
}
rtdebug!("waiting on scheduler threads");
// Wait for schedulers
let _threads = threads;
}
extern {
fn rust_get_num_cpus() -> libc::uintptr_t;
}
}
/// Test tasks will abort on failure instead of unwinding
pub fn spawntask(f: ~fn()) {
use super::sched::*;
rtdebug!("spawntask taking the scheduler from TLS")
let task = do Local::borrow::<Task, ~Task>() |running_task| {
~running_task.new_child()
};
let mut sched = Local::take::<Scheduler>();
let task = ~Coroutine::with_task(&mut sched.stack_pool,
~Task::without_unwinding(),
f);
do sched.switch_running_tasks_and_then(task) |task| {
let task = Cell::new(task);
let sched = Local::take::<Scheduler>();
sched.schedule_new_task(task.take());
}
task, f);
rtdebug!("spawntask scheduling the new task");
sched.schedule_task(task);
}
/// Create a new task and run it right now. Aborts on failure
pub fn spawntask_immediately(f: ~fn()) {
use super::sched::*;
let task = do Local::borrow::<Task, ~Task>() |running_task| {
~running_task.new_child()
};
let mut sched = Local::take::<Scheduler>();
let task = ~Coroutine::with_task(&mut sched.stack_pool,
~Task::without_unwinding(),
f);
do sched.switch_running_tasks_and_then(task) |task| {
let task = Cell::new(task);
do Local::borrow::<Scheduler> |sched| {
sched.enqueue_task(task.take());
}
task, f);
do sched.switch_running_tasks_and_then(task) |sched, task| {
sched.enqueue_task(task);
}
}
@@ -72,10 +281,13 @@ pub fn spawntask_immediately(f: ~fn()) {
pub fn spawntask_later(f: ~fn()) {
use super::sched::*;
let task = do Local::borrow::<Task, ~Task>() |running_task| {
~running_task.new_child()
};
let mut sched = Local::take::<Scheduler>();
let task = ~Coroutine::with_task(&mut sched.stack_pool,
~Task::without_unwinding(),
f);
task, f);
sched.enqueue_task(task);
Local::put(sched);
@@ -86,20 +298,20 @@ pub fn spawntask_random(f: ~fn()) {
use super::sched::*;
use rand::{Rand, rng};
let mut rng = rng();
let run_now: bool = Rand::rand(&mut rng);
let task = do Local::borrow::<Task, ~Task>() |running_task| {
~running_task.new_child()
};
let mut sched = Local::take::<Scheduler>();
let task = ~Coroutine::with_task(&mut sched.stack_pool,
~Task::without_unwinding(),
f);
task, f);
let mut rng = rng();
let run_now: bool = Rand::rand(&mut rng);
if run_now {
do sched.switch_running_tasks_and_then(task) |task| {
let task = Cell::new(task);
do Local::borrow::<Scheduler> |sched| {
sched.enqueue_task(task.take());
}
do sched.switch_running_tasks_and_then(task) |sched, task| {
sched.enqueue_task(task);
}
} else {
sched.enqueue_task(task);
@@ -107,57 +319,75 @@ pub fn spawntask_random(f: ~fn()) {
}
}
/// Spawn a task, with the current scheduler as home, and queue it to
/// run later.
pub fn spawntask_homed(scheds: &mut ~[~Scheduler], f: ~fn()) {
use super::sched::*;
use rand::{rng, RngUtil};
let mut rng = rng();
let task = {
let sched = &mut scheds[rng.gen_int_range(0,scheds.len() as int)];
let handle = sched.make_handle();
let home_id = handle.sched_id;
// now that we know where this is going, build a new function
// that can assert it is in the right place
let af: ~fn() = || {
do Local::borrow::<Scheduler,()>() |sched| {
rtdebug!("home_id: %u, runtime loc: %u",
home_id,
sched.sched_id());
assert!(home_id == sched.sched_id());
};
f()
};
~Coroutine::with_task_homed(&mut sched.stack_pool,
~Task::new_root(),
af,
Sched(handle))
};
let dest_sched = &mut scheds[rng.gen_int_range(0,scheds.len() as int)];
// enqueue it for future execution
dest_sched.enqueue_task(task);
}
/// Spawn a task and wait for it to finish, returning whether it completed successfully or failed
pub fn spawntask_try(f: ~fn()) -> Result<(), ()> {
use cell::Cell;
use super::sched::*;
use task;
use unstable::finally::Finally;
// Our status variables will be filled in from the scheduler context
let mut failed = false;
let failed_ptr: *mut bool = &mut failed;
// Switch to the scheduler
let f = Cell::new(Cell::new(f));
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then() |old_task| {
let old_task = Cell::new(old_task);
let f = f.take();
let (port, chan) = oneshot();
let chan = Cell::new(chan);
let mut new_task = ~Task::new_root();
let on_exit: ~fn(bool) = |exit_status| chan.take().send(exit_status);
new_task.on_exit = Some(on_exit);
let mut sched = Local::take::<Scheduler>();
let new_task = ~do Coroutine::new(&mut sched.stack_pool) {
do (|| {
(f.take())()
}).finally {
// Check for failure then resume the parent task
unsafe { *failed_ptr = task::failing(); }
let sched = Local::take::<Scheduler>();
do sched.switch_running_tasks_and_then(old_task.take()) |new_task| {
let new_task = Cell::new(new_task);
do Local::borrow::<Scheduler> |sched| {
sched.enqueue_task(new_task.take());
}
}
}
};
sched.resume_task_immediately(new_task);
let new_task = ~Coroutine::with_task(&mut sched.stack_pool,
new_task, f);
do sched.switch_running_tasks_and_then(new_task) |sched, old_task| {
sched.enqueue_task(old_task);
}
if !failed { Ok(()) } else { Err(()) }
let exit_status = port.recv();
if exit_status { Ok(()) } else { Err(()) }
}
// Spawn a new task in a new scheduler and return a thread handle.
pub fn spawntask_thread(f: ~fn()) -> Thread {
use rt::sched::*;
use rt::uv::uvio::UvEventLoop;
let task = do Local::borrow::<Task, ~Task>() |running_task| {
~running_task.new_child()
};
let task = Cell::new(task);
let f = Cell::new(f);
let thread = do Thread::start {
let mut sched = ~UvEventLoop::new_scheduler();
let mut sched = ~new_test_uv_sched();
let task = ~Coroutine::with_task(&mut sched.stack_pool,
~Task::without_unwinding(),
task.take(),
f.take());
sched.enqueue_task(task);
sched.run();

View File

@@ -72,7 +72,7 @@ impl<T> Tube<T> {
assert!(self.p.refcount() > 1); // There better be somebody to wake us up
assert!((*state).blocked_task.is_none());
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then |task| {
do sched.deschedule_running_task_and_then |_, task| {
(*state).blocked_task = Some(task);
}
rtdebug!("waking after tube recv");
@@ -107,11 +107,10 @@ mod test {
let tube_clone = tube.clone();
let tube_clone_cell = Cell::new(tube_clone);
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then |task| {
do sched.deschedule_running_task_and_then |sched, task| {
let mut tube_clone = tube_clone_cell.take();
tube_clone.send(1);
let sched = Local::take::<Scheduler>();
sched.resume_task_immediately(task);
sched.enqueue_task(task);
}
assert!(tube.recv() == 1);
@@ -123,21 +122,17 @@ mod test {
do run_in_newsched_task {
let mut tube: Tube<int> = Tube::new();
let tube_clone = tube.clone();
let tube_clone = Cell::new(Cell::new(Cell::new(tube_clone)));
let tube_clone = Cell::new(tube_clone);
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then |task| {
let tube_clone = tube_clone.take();
do Local::borrow::<Scheduler> |sched| {
let tube_clone = tube_clone.take();
do sched.deschedule_running_task_and_then |sched, task| {
let tube_clone = Cell::new(tube_clone.take());
do sched.event_loop.callback {
let mut tube_clone = tube_clone.take();
// The task should be blocked on this now and
// sending will wake it up.
tube_clone.send(1);
}
}
let sched = Local::take::<Scheduler>();
sched.resume_task_immediately(task);
sched.enqueue_task(task);
}
assert!(tube.recv() == 1);
@@ -153,14 +148,14 @@ mod test {
let tube_clone = tube.clone();
let tube_clone = Cell::new(tube_clone);
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then |task| {
do sched.deschedule_running_task_and_then |sched, task| {
callback_send(tube_clone.take(), 0);
fn callback_send(tube: Tube<int>, i: int) {
if i == 100 { return; }
let tube = Cell::new(Cell::new(tube));
do Local::borrow::<Scheduler> |sched| {
do Local::borrow::<Scheduler, ()> |sched| {
let tube = tube.take();
do sched.event_loop.callback {
let mut tube = tube.take();
@@ -172,8 +167,7 @@ mod test {
}
}
let sched = Local::take::<Scheduler>();
sched.resume_task_immediately(task);
sched.enqueue_task(task);
}
for int::range(0, MAX) |i| {

87
src/libstd/rt/util.rs Normal file
View File

@@ -0,0 +1,87 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use container::Container;
use iterator::IteratorUtil;
use libc;
use str::StrSlice;
/// Get the number of cores available
pub fn num_cpus() -> uint {
unsafe {
return rust_get_num_cpus();
}
extern {
fn rust_get_num_cpus() -> libc::uintptr_t;
}
}
pub fn dumb_println(s: &str) {
use io::WriterUtil;
let dbg = ::libc::STDERR_FILENO as ::io::fd_t;
dbg.write_str(s);
dbg.write_str("\n");
}
pub fn abort(msg: &str) -> ! {
let msg = if !msg.is_empty() { msg } else { "aborted" };
let hash = msg.iter().fold(0, |accum, val| accum + (val as uint) );
let quote = match hash % 10 {
0 => "
It was from the artists and poets that the pertinent answers came, and I
know that panic would have broken loose had they been able to compare notes.
As it was, lacking their original letters, I half suspected the compiler of
having asked leading questions, or of having edited the correspondence in
corroboration of what he had latently resolved to see.",
1 => "
There are not many persons who know what wonders are opened to them in the
stories and visions of their youth; for when as children we listen and dream,
we think but half-formed thoughts, and when as men we try to remember, we are
dulled and prosaic with the poison of life. But some of us awake in the night
with strange phantasms of enchanted hills and gardens, of fountains that sing
in the sun, of golden cliffs overhanging murmuring seas, of plains that stretch
down to sleeping cities of bronze and stone, and of shadowy companies of heroes
that ride caparisoned white horses along the edges of thick forests; and then
we know that we have looked back through the ivory gates into that world of
wonder which was ours before we were wise and unhappy.",
2 => "
Instead of the poems I had hoped for, there came only a shuddering blackness
and ineffable loneliness; and I saw at last a fearful truth which no one had
ever dared to breathe before — the unwhisperable secret of secrets — The fact
that this city of stone and stridor is not a sentient perpetuation of Old New
York as London is of Old London and Paris of Old Paris, but that it is in fact
quite dead, its sprawling body imperfectly embalmed and infested with queer
animate things which have nothing to do with it as it was in life.",
3 => "
The ocean ate the last of the land and poured into the smoking gulf, thereby
giving up all it had ever conquered. From the new-flooded lands it flowed
again, uncovering death and decay; and from its ancient and immemorial bed it
trickled loathsomely, uncovering nighted secrets of the years when Time was
young and the gods unborn. Above the waves rose weedy remembered spires. The
moon laid pale lilies of light on dead London, and Paris stood up from its damp
grave to be sanctified with star-dust. Then rose spires and monoliths that were
weedy but not remembered; terrible spires and monoliths of lands that men never
knew were lands...",
4 => "
There was a night when winds from unknown spaces whirled us irresistibly into
limitless vacum beyond all thought and entity. Perceptions of the most
maddeningly untransmissible sort thronged upon us; perceptions of infinity
which at the time convulsed us with joy, yet which are now partly lost to my
memory and partly incapable of presentation to others.",
_ => "You've met with a terrible fate, haven't you?"
};
rterrln!("%s", "");
rterrln!("%s", quote);
rterrln!("%s", "");
rterrln!("fatal runtime error: %s", msg);
unsafe { libc::abort(); }
}

105
src/libstd/rt/uv/async.rs Normal file
View File

@@ -0,0 +1,105 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use libc::{c_int, c_void};
use option::Some;
use rt::uv::uvll;
use rt::uv::uvll::UV_ASYNC;
use rt::uv::{Watcher, Loop, NativeHandle, AsyncCallback, NullCallback};
use rt::uv::WatcherInterop;
use rt::uv::status_to_maybe_uv_error;
pub struct AsyncWatcher(*uvll::uv_async_t);
impl Watcher for AsyncWatcher { }
impl AsyncWatcher {
pub fn new(loop_: &mut Loop, cb: AsyncCallback) -> AsyncWatcher {
unsafe {
let handle = uvll::malloc_handle(UV_ASYNC);
assert!(handle.is_not_null());
let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle);
watcher.install_watcher_data();
let data = watcher.get_watcher_data();
data.async_cb = Some(cb);
assert_eq!(0, uvll::async_init(loop_.native_handle(), handle, async_cb));
return watcher;
}
extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) {
let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle);
let status = status_to_maybe_uv_error(watcher.native_handle(), status);
let data = watcher.get_watcher_data();
let cb = data.async_cb.get_ref();
(*cb)(watcher, status);
}
}
pub fn send(&mut self) {
unsafe {
let handle = self.native_handle();
uvll::async_send(handle);
}
}
pub fn close(self, cb: NullCallback) {
let mut this = self;
let data = this.get_watcher_data();
assert!(data.close_cb.is_none());
data.close_cb = Some(cb);
unsafe {
uvll::close(self.native_handle(), close_cb);
}
extern fn close_cb(handle: *uvll::uv_stream_t) {
let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle);
{
let data = watcher.get_watcher_data();
data.close_cb.swap_unwrap()();
}
watcher.drop_watcher_data();
unsafe { uvll::free_handle(handle as *c_void); }
}
}
}
impl NativeHandle<*uvll::uv_async_t> for AsyncWatcher {
fn from_native_handle(handle: *uvll::uv_async_t) -> AsyncWatcher {
AsyncWatcher(handle)
}
fn native_handle(&self) -> *uvll::uv_async_t {
match self { &AsyncWatcher(ptr) => ptr }
}
}
#[cfg(test)]
mod test {
use super::*;
use rt::uv::Loop;
use unstable::run_in_bare_thread;
use rt::thread::Thread;
use cell::Cell;
#[test]
fn smoke_test() {
do run_in_bare_thread {
let mut loop_ = Loop::new();
let watcher = AsyncWatcher::new(&mut loop_, |w, _| w.close(||()) );
let watcher_cell = Cell::new(watcher);
let _thread = do Thread::start {
let mut watcher = watcher_cell.take();
watcher.send();
};
loop_.run();
loop_.close();
}
}
}

View File

@@ -90,3 +90,65 @@ impl NativeHandle<*uvll::uv_idle_t> for IdleWatcher {
match self { &IdleWatcher(ptr) => ptr }
}
}
#[cfg(test)]
mod test {
use rt::uv::Loop;
use super::*;
use unstable::run_in_bare_thread;
#[test]
#[ignore(reason = "valgrind - loop destroyed before watcher?")]
fn idle_new_then_close() {
do run_in_bare_thread {
let mut loop_ = Loop::new();
let idle_watcher = { IdleWatcher::new(&mut loop_) };
idle_watcher.close(||());
}
}
#[test]
fn idle_smoke_test() {
do run_in_bare_thread {
let mut loop_ = Loop::new();
let mut idle_watcher = { IdleWatcher::new(&mut loop_) };
let mut count = 10;
let count_ptr: *mut int = &mut count;
do idle_watcher.start |idle_watcher, status| {
let mut idle_watcher = idle_watcher;
assert!(status.is_none());
if unsafe { *count_ptr == 10 } {
idle_watcher.stop();
idle_watcher.close(||());
} else {
unsafe { *count_ptr = *count_ptr + 1; }
}
}
loop_.run();
loop_.close();
assert_eq!(count, 10);
}
}
#[test]
fn idle_start_stop_start() {
do run_in_bare_thread {
let mut loop_ = Loop::new();
let mut idle_watcher = { IdleWatcher::new(&mut loop_) };
do idle_watcher.start |idle_watcher, status| {
let mut idle_watcher = idle_watcher;
assert!(status.is_none());
idle_watcher.stop();
do idle_watcher.start |idle_watcher, status| {
assert!(status.is_none());
let mut idle_watcher = idle_watcher;
idle_watcher.stop();
idle_watcher.close(||());
}
}
loop_.run();
loop_.close();
}
}
}

View File

@@ -55,6 +55,7 @@ pub use self::file::FsRequest;
pub use self::net::{StreamWatcher, TcpWatcher};
pub use self::idle::IdleWatcher;
pub use self::timer::TimerWatcher;
pub use self::async::AsyncWatcher;
/// The implementation of `rtio` for libuv
pub mod uvio;
@@ -66,6 +67,7 @@ pub mod file;
pub mod net;
pub mod idle;
pub mod timer;
pub mod async;
/// XXX: Loop(*handle) is buggy with destructors. Normal structs
/// with dtors may not be destructured, but tuple structs can,
@@ -123,6 +125,7 @@ pub type IdleCallback = ~fn(IdleWatcher, Option<UvError>);
pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>);
pub type FsCallback = ~fn(FsRequest, Option<UvError>);
pub type TimerCallback = ~fn(TimerWatcher, Option<UvError>);
pub type AsyncCallback = ~fn(AsyncWatcher, Option<UvError>);
/// Callbacks used by StreamWatchers, set as custom data on the foreign handle
@@ -133,7 +136,8 @@ struct WatcherData {
close_cb: Option<NullCallback>,
alloc_cb: Option<AllocCallback>,
idle_cb: Option<IdleCallback>,
timer_cb: Option<TimerCallback>
timer_cb: Option<TimerCallback>,
async_cb: Option<AsyncCallback>
}
pub trait WatcherInterop {
@@ -162,7 +166,8 @@ impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W {
close_cb: None,
alloc_cb: None,
idle_cb: None,
timer_cb: None
timer_cb: None,
async_cb: None
};
let data = transmute::<~WatcherData, *c_void>(data);
uvll::set_data_for_uv_handle(self.native_handle(), data);
@@ -347,57 +352,3 @@ fn loop_smoke_test() {
loop_.close();
}
}
#[test]
#[ignore(reason = "valgrind - loop destroyed before watcher?")]
fn idle_new_then_close() {
do run_in_bare_thread {
let mut loop_ = Loop::new();
let idle_watcher = { IdleWatcher::new(&mut loop_) };
idle_watcher.close(||());
}
}
#[test]
fn idle_smoke_test() {
do run_in_bare_thread {
let mut loop_ = Loop::new();
let mut idle_watcher = { IdleWatcher::new(&mut loop_) };
let mut count = 10;
let count_ptr: *mut int = &mut count;
do idle_watcher.start |idle_watcher, status| {
let mut idle_watcher = idle_watcher;
assert!(status.is_none());
if unsafe { *count_ptr == 10 } {
idle_watcher.stop();
idle_watcher.close(||());
} else {
unsafe { *count_ptr = *count_ptr + 1; }
}
}
loop_.run();
loop_.close();
assert_eq!(count, 10);
}
}
#[test]
fn idle_start_stop_start() {
do run_in_bare_thread {
let mut loop_ = Loop::new();
let mut idle_watcher = { IdleWatcher::new(&mut loop_) };
do idle_watcher.start |idle_watcher, status| {
let mut idle_watcher = idle_watcher;
assert!(status.is_none());
idle_watcher.stop();
do idle_watcher.start |idle_watcher, status| {
assert!(status.is_none());
let mut idle_watcher = idle_watcher;
idle_watcher.stop();
idle_watcher.close(||());
}
}
loop_.run();
loop_.close();
}
}

View File

@@ -12,6 +12,7 @@ use option::*;
use result::*;
use ops::Drop;
use cell::Cell;
use cast;
use cast::transmute;
use clone::Clone;
use rt::io::IoError;
@@ -23,6 +24,7 @@ use rt::sched::Scheduler;
use rt::io::{standard_error, OtherIoError};
use rt::tube::Tube;
use rt::local::Local;
use unstable::sync::{Exclusive, exclusive};
#[cfg(test)] use container::Container;
#[cfg(test)] use uint;
@@ -39,11 +41,6 @@ impl UvEventLoop {
uvio: UvIoFactory(Loop::new())
}
}
/// A convenience constructor
pub fn new_scheduler() -> Scheduler {
Scheduler::new(~UvEventLoop::new())
}
}
impl Drop for UvEventLoop {
@@ -81,6 +78,10 @@ impl EventLoop for UvEventLoop {
}
}
fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallbackObject {
~UvRemoteCallback::new(self.uvio.uv_loop(), f)
}
fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> {
Some(&mut self.uvio)
}
@@ -100,6 +101,89 @@ fn test_callback_run_once() {
}
}
pub struct UvRemoteCallback {
// The uv async handle for triggering the callback
async: AsyncWatcher,
// A flag to tell the callback to exit, set from the dtor. This is
// almost never contested - only in rare races with the dtor.
exit_flag: Exclusive<bool>
}
impl UvRemoteCallback {
pub fn new(loop_: &mut Loop, f: ~fn()) -> UvRemoteCallback {
let exit_flag = exclusive(false);
let exit_flag_clone = exit_flag.clone();
let async = do AsyncWatcher::new(loop_) |watcher, status| {
assert!(status.is_none());
f();
unsafe {
do exit_flag_clone.with_imm |&should_exit| {
if should_exit {
watcher.close(||());
}
}
}
};
UvRemoteCallback {
async: async,
exit_flag: exit_flag
}
}
}
impl RemoteCallback for UvRemoteCallback {
fn fire(&mut self) { self.async.send() }
}
impl Drop for UvRemoteCallback {
fn finalize(&self) {
unsafe {
let this: &mut UvRemoteCallback = cast::transmute_mut(self);
do this.exit_flag.with |should_exit| {
// NB: These two things need to happen atomically. Otherwise
// the event handler could wake up due to a *previous*
// signal and see the exit flag, destroying the handle
// before the final send.
*should_exit = true;
this.async.send();
}
}
}
}
#[cfg(test)]
mod test_remote {
use cell::Cell;
use rt::test::*;
use rt::thread::Thread;
use rt::tube::Tube;
use rt::rtio::EventLoop;
use rt::local::Local;
use rt::sched::Scheduler;
#[test]
fn test_uv_remote() {
do run_in_newsched_task {
let mut tube = Tube::new();
let tube_clone = tube.clone();
let remote_cell = Cell::new_empty();
do Local::borrow::<Scheduler, ()>() |sched| {
let tube_clone = tube_clone.clone();
let tube_clone_cell = Cell::new(tube_clone);
let remote = do sched.event_loop.remote_callback {
tube_clone_cell.take().send(1);
};
remote_cell.put_back(remote);
}
let _thread = do Thread::start {
remote_cell.take().fire();
};
assert!(tube.recv() == 1);
}
}
}
pub struct UvIoFactory(Loop);
impl UvIoFactory {
@@ -122,12 +206,10 @@ impl IoFactory for UvIoFactory {
assert!(scheduler.in_task_context());
// Block this task and take ownership, switch to scheduler context
do scheduler.deschedule_running_task_and_then |task| {
do scheduler.deschedule_running_task_and_then |sched, task| {
rtdebug!("connect: entered scheduler context");
do Local::borrow::<Scheduler> |scheduler| {
assert!(!scheduler.in_task_context());
}
assert!(!sched.in_task_context());
let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
let task_cell = Cell::new(task);
@@ -167,7 +249,7 @@ impl IoFactory for UvIoFactory {
Ok(_) => Ok(~UvTcpListener::new(watcher)),
Err(uverr) => {
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |task| {
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do watcher.as_stream().close {
let scheduler = Local::take::<Scheduler>();
@@ -203,7 +285,7 @@ impl Drop for UvTcpListener {
fn finalize(&self) {
let watcher = self.watcher();
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |task| {
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do watcher.as_stream().close {
let scheduler = Local::take::<Scheduler>();
@@ -265,7 +347,7 @@ impl Drop for UvTcpStream {
rtdebug!("closing tcp stream");
let watcher = self.watcher();
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |task| {
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do watcher.close {
let scheduler = Local::take::<Scheduler>();
@@ -284,11 +366,9 @@ impl RtioTcpStream for UvTcpStream {
assert!(scheduler.in_task_context());
let watcher = self.watcher();
let buf_ptr: *&mut [u8] = &buf;
do scheduler.deschedule_running_task_and_then |task| {
do scheduler.deschedule_running_task_and_then |sched, task| {
rtdebug!("read: entered scheduler context");
do Local::borrow::<Scheduler> |scheduler| {
assert!(!scheduler.in_task_context());
}
assert!(!sched.in_task_context());
let mut watcher = watcher;
let task_cell = Cell::new(task);
// XXX: We shouldn't reallocate these callbacks every
@@ -330,7 +410,7 @@ impl RtioTcpStream for UvTcpStream {
assert!(scheduler.in_task_context());
let watcher = self.watcher();
let buf_ptr: *&[u8] = &buf;
do scheduler.deschedule_running_task_and_then |task| {
do scheduler.deschedule_running_task_and_then |_, task| {
let mut watcher = watcher;
let task_cell = Cell::new(task);
let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
@@ -424,11 +504,9 @@ fn test_read_and_block() {
// Yield to the other task in hopes that it
// will trigger a read callback while we are
// not ready for it
do scheduler.deschedule_running_task_and_then |task| {
do scheduler.deschedule_running_task_and_then |sched, task| {
let task = Cell::new(task);
do Local::borrow::<Scheduler> |scheduler| {
scheduler.enqueue_task(task.take());
}
sched.enqueue_task(task.take());
}
}

View File

@@ -1,443 +0,0 @@
// Copyright 2012 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
/*!
* Low-level bindings to the libuv library.
*
* This module contains a set of direct, 'bare-metal' wrappers around
* the libuv C-API.
*
* We're not bothering yet to redefine uv's structs as Rust structs
* because they are quite large and change often between versions.
* The maintenance burden is just too high. Instead we use the uv's
* `uv_handle_size` and `uv_req_size` to find the correct size of the
* structs and allocate them on the heap. This can be revisited later.
*
* There are also a collection of helper functions to ease interacting
* with the low-level API.
*
* As new functionality, existant in uv.h, is added to the rust stdlib,
* the mappings should be added in this module.
*/
#[allow(non_camel_case_types)]; // C types
use libc::{size_t, c_int, c_uint, c_void, c_char, uintptr_t};
use libc::{malloc, free};
use prelude::*;
pub struct uv_err_t {
code: c_int,
sys_errno_: c_int
}
pub struct uv_buf_t {
base: *u8,
len: libc::size_t,
}
pub type uv_handle_t = c_void;
pub type uv_loop_t = c_void;
pub type uv_idle_t = c_void;
pub type uv_tcp_t = c_void;
pub type uv_connect_t = c_void;
pub type uv_write_t = c_void;
pub type uv_async_t = c_void;
pub type uv_timer_t = c_void;
pub type uv_stream_t = c_void;
pub type uv_fs_t = c_void;
pub type uv_idle_cb = *u8;
pub type sockaddr_in = c_void;
pub type sockaddr_in6 = c_void;
#[deriving(Eq)]
pub enum uv_handle_type {
UV_UNKNOWN_HANDLE,
UV_ASYNC,
UV_CHECK,
UV_FS_EVENT,
UV_FS_POLL,
UV_HANDLE,
UV_IDLE,
UV_NAMED_PIPE,
UV_POLL,
UV_PREPARE,
UV_PROCESS,
UV_STREAM,
UV_TCP,
UV_TIMER,
UV_TTY,
UV_UDP,
UV_SIGNAL,
UV_FILE,
UV_HANDLE_TYPE_MAX
}
#[deriving(Eq)]
pub enum uv_req_type {
UV_UNKNOWN_REQ,
UV_REQ,
UV_CONNECT,
UV_WRITE,
UV_SHUTDOWN,
UV_UDP_SEND,
UV_FS,
UV_WORK,
UV_GETADDRINFO,
UV_REQ_TYPE_MAX
}
pub unsafe fn malloc_handle(handle: uv_handle_type) -> *c_void {
assert!(handle != UV_UNKNOWN_HANDLE && handle != UV_HANDLE_TYPE_MAX);
let size = rust_uv_handle_size(handle as uint);
let p = malloc(size);
assert!(p.is_not_null());
return p;
}
pub unsafe fn free_handle(v: *c_void) {
free(v)
}
pub unsafe fn malloc_req(req: uv_req_type) -> *c_void {
assert!(req != UV_UNKNOWN_REQ && req != UV_REQ_TYPE_MAX);
let size = rust_uv_req_size(req as uint);
let p = malloc(size);
assert!(p.is_not_null());
return p;
}
pub unsafe fn free_req(v: *c_void) {
free(v)
}
#[test]
fn handle_sanity_check() {
unsafe {
assert!(UV_HANDLE_TYPE_MAX as uint == rust_uv_handle_type_max());
}
}
#[test]
fn request_sanity_check() {
unsafe {
assert!(UV_REQ_TYPE_MAX as uint == rust_uv_req_type_max());
}
}
pub unsafe fn loop_new() -> *c_void {
return rust_uv_loop_new();
}
pub unsafe fn loop_delete(loop_handle: *c_void) {
rust_uv_loop_delete(loop_handle);
}
pub unsafe fn run(loop_handle: *c_void) {
rust_uv_run(loop_handle);
}
pub unsafe fn close<T>(handle: *T, cb: *u8) {
rust_uv_close(handle as *c_void, cb);
}
pub unsafe fn walk(loop_handle: *c_void, cb: *u8, arg: *c_void) {
rust_uv_walk(loop_handle, cb, arg);
}
pub unsafe fn idle_new() -> *uv_idle_t {
rust_uv_idle_new()
}
pub unsafe fn idle_delete(handle: *uv_idle_t) {
rust_uv_idle_delete(handle)
}
pub unsafe fn idle_init(loop_handle: *uv_loop_t, handle: *uv_idle_t) -> c_int {
rust_uv_idle_init(loop_handle, handle)
}
pub unsafe fn idle_start(handle: *uv_idle_t, cb: uv_idle_cb) -> c_int {
rust_uv_idle_start(handle, cb)
}
pub unsafe fn idle_stop(handle: *uv_idle_t) -> c_int {
rust_uv_idle_stop(handle)
}
pub unsafe fn tcp_init(loop_handle: *c_void, handle: *uv_tcp_t) -> c_int {
return rust_uv_tcp_init(loop_handle, handle);
}
// FIXME ref #2064
pub unsafe fn tcp_connect(connect_ptr: *uv_connect_t,
tcp_handle_ptr: *uv_tcp_t,
addr_ptr: *sockaddr_in,
after_connect_cb: *u8) -> c_int {
return rust_uv_tcp_connect(connect_ptr, tcp_handle_ptr,
after_connect_cb, addr_ptr);
}
// FIXME ref #2064
pub unsafe fn tcp_connect6(connect_ptr: *uv_connect_t,
tcp_handle_ptr: *uv_tcp_t,
addr_ptr: *sockaddr_in6,
after_connect_cb: *u8) -> c_int {
return rust_uv_tcp_connect6(connect_ptr, tcp_handle_ptr,
after_connect_cb, addr_ptr);
}
// FIXME ref #2064
pub unsafe fn tcp_bind(tcp_server_ptr: *uv_tcp_t, addr_ptr: *sockaddr_in) -> c_int {
return rust_uv_tcp_bind(tcp_server_ptr, addr_ptr);
}
// FIXME ref #2064
pub unsafe fn tcp_bind6(tcp_server_ptr: *uv_tcp_t, addr_ptr: *sockaddr_in6) -> c_int {
return rust_uv_tcp_bind6(tcp_server_ptr, addr_ptr);
}
pub unsafe fn tcp_getpeername(tcp_handle_ptr: *uv_tcp_t, name: *sockaddr_in) -> c_int {
return rust_uv_tcp_getpeername(tcp_handle_ptr, name);
}
pub unsafe fn tcp_getpeername6(tcp_handle_ptr: *uv_tcp_t, name: *sockaddr_in6) ->c_int {
return rust_uv_tcp_getpeername6(tcp_handle_ptr, name);
}
pub unsafe fn listen<T>(stream: *T, backlog: c_int, cb: *u8) -> c_int {
return rust_uv_listen(stream as *c_void, backlog, cb);
}
pub unsafe fn accept(server: *c_void, client: *c_void) -> c_int {
return rust_uv_accept(server as *c_void, client as *c_void);
}
pub unsafe fn write<T>(req: *uv_write_t, stream: *T, buf_in: &[uv_buf_t], cb: *u8) -> c_int {
let buf_ptr = vec::raw::to_ptr(buf_in);
let buf_cnt = buf_in.len() as i32;
return rust_uv_write(req as *c_void, stream as *c_void, buf_ptr, buf_cnt, cb);
}
pub unsafe fn read_start(stream: *uv_stream_t, on_alloc: *u8, on_read: *u8) -> c_int {
return rust_uv_read_start(stream as *c_void, on_alloc, on_read);
}
pub unsafe fn read_stop(stream: *uv_stream_t) -> c_int {
return rust_uv_read_stop(stream as *c_void);
}
pub unsafe fn last_error(loop_handle: *c_void) -> uv_err_t {
return rust_uv_last_error(loop_handle);
}
pub unsafe fn strerror(err: *uv_err_t) -> *c_char {
return rust_uv_strerror(err);
}
pub unsafe fn err_name(err: *uv_err_t) -> *c_char {
return rust_uv_err_name(err);
}
pub unsafe fn async_init(loop_handle: *c_void, async_handle: *uv_async_t, cb: *u8) -> c_int {
return rust_uv_async_init(loop_handle, async_handle, cb);
}
pub unsafe fn async_send(async_handle: *uv_async_t) {
return rust_uv_async_send(async_handle);
}
pub unsafe fn buf_init(input: *u8, len: uint) -> uv_buf_t {
let out_buf = uv_buf_t { base: ptr::null(), len: 0 as size_t };
let out_buf_ptr = ptr::to_unsafe_ptr(&out_buf);
rust_uv_buf_init(out_buf_ptr, input, len as size_t);
return out_buf;
}
pub unsafe fn timer_init(loop_ptr: *c_void, timer_ptr: *uv_timer_t) -> c_int {
return rust_uv_timer_init(loop_ptr, timer_ptr);
}
pub unsafe fn timer_start(timer_ptr: *uv_timer_t, cb: *u8, timeout: uint,
repeat: uint) -> c_int {
return rust_uv_timer_start(timer_ptr, cb, timeout as c_uint, repeat as c_uint);
}
pub unsafe fn timer_stop(timer_ptr: *uv_timer_t) -> c_int {
return rust_uv_timer_stop(timer_ptr);
}
pub unsafe fn malloc_ip4_addr(ip: &str, port: int) -> *sockaddr_in {
do str::as_c_str(ip) |ip_buf| {
rust_uv_ip4_addrp(ip_buf as *u8, port as libc::c_int)
}
}
pub unsafe fn malloc_ip6_addr(ip: &str, port: int) -> *sockaddr_in6 {
do str::as_c_str(ip) |ip_buf| {
rust_uv_ip6_addrp(ip_buf as *u8, port as libc::c_int)
}
}
pub unsafe fn free_ip4_addr(addr: *sockaddr_in) {
rust_uv_free_ip4_addr(addr);
}
pub unsafe fn free_ip6_addr(addr: *sockaddr_in6) {
rust_uv_free_ip6_addr(addr);
}
// data access helpers
pub unsafe fn get_loop_for_uv_handle<T>(handle: *T) -> *c_void {
return rust_uv_get_loop_for_uv_handle(handle as *c_void);
}
pub unsafe fn get_stream_handle_from_connect_req(connect: *uv_connect_t) -> *uv_stream_t {
return rust_uv_get_stream_handle_from_connect_req(connect);
}
pub unsafe fn get_stream_handle_from_write_req(write_req: *uv_write_t) -> *uv_stream_t {
return rust_uv_get_stream_handle_from_write_req(write_req);
}
pub unsafe fn get_data_for_uv_loop(loop_ptr: *c_void) -> *c_void {
rust_uv_get_data_for_uv_loop(loop_ptr)
}
pub unsafe fn set_data_for_uv_loop(loop_ptr: *c_void, data: *c_void) {
rust_uv_set_data_for_uv_loop(loop_ptr, data);
}
pub unsafe fn get_data_for_uv_handle<T>(handle: *T) -> *c_void {
return rust_uv_get_data_for_uv_handle(handle as *c_void);
}
pub unsafe fn set_data_for_uv_handle<T, U>(handle: *T, data: *U) {
rust_uv_set_data_for_uv_handle(handle as *c_void, data as *c_void);
}
pub unsafe fn get_data_for_req<T>(req: *T) -> *c_void {
return rust_uv_get_data_for_req(req as *c_void);
}
pub unsafe fn set_data_for_req<T, U>(req: *T, data: *U) {
rust_uv_set_data_for_req(req as *c_void, data as *c_void);
}
pub unsafe fn get_base_from_buf(buf: uv_buf_t) -> *u8 {
return rust_uv_get_base_from_buf(buf);
}
pub unsafe fn get_len_from_buf(buf: uv_buf_t) -> size_t {
return rust_uv_get_len_from_buf(buf);
}
pub unsafe fn malloc_buf_base_of(suggested_size: size_t) -> *u8 {
return rust_uv_malloc_buf_base_of(suggested_size);
}
pub unsafe fn free_base_of_buf(buf: uv_buf_t) {
rust_uv_free_base_of_buf(buf);
}
pub unsafe fn get_last_err_info(uv_loop: *c_void) -> ~str {
let err = last_error(uv_loop);
let err_ptr = ptr::to_unsafe_ptr(&err);
let err_name = str::raw::from_c_str(err_name(err_ptr));
let err_msg = str::raw::from_c_str(strerror(err_ptr));
return fmt!("LIBUV ERROR: name: %s msg: %s",
err_name, err_msg);
}
pub unsafe fn get_last_err_data(uv_loop: *c_void) -> uv_err_data {
let err = last_error(uv_loop);
let err_ptr = ptr::to_unsafe_ptr(&err);
let err_name = str::raw::from_c_str(err_name(err_ptr));
let err_msg = str::raw::from_c_str(strerror(err_ptr));
uv_err_data { err_name: err_name, err_msg: err_msg }
}
pub struct uv_err_data {
err_name: ~str,
err_msg: ~str,
}
extern {
fn rust_uv_handle_size(type_: uintptr_t) -> size_t;
fn rust_uv_req_size(type_: uintptr_t) -> size_t;
fn rust_uv_handle_type_max() -> uintptr_t;
fn rust_uv_req_type_max() -> uintptr_t;
// libuv public API
fn rust_uv_loop_new() -> *c_void;
fn rust_uv_loop_delete(lp: *c_void);
fn rust_uv_run(loop_handle: *c_void);
fn rust_uv_close(handle: *c_void, cb: *u8);
fn rust_uv_walk(loop_handle: *c_void, cb: *u8, arg: *c_void);
fn rust_uv_idle_new() -> *uv_idle_t;
fn rust_uv_idle_delete(handle: *uv_idle_t);
fn rust_uv_idle_init(loop_handle: *uv_loop_t, handle: *uv_idle_t) -> c_int;
fn rust_uv_idle_start(handle: *uv_idle_t, cb: uv_idle_cb) -> c_int;
fn rust_uv_idle_stop(handle: *uv_idle_t) -> c_int;
fn rust_uv_async_send(handle: *uv_async_t);
fn rust_uv_async_init(loop_handle: *c_void,
async_handle: *uv_async_t,
cb: *u8) -> c_int;
fn rust_uv_tcp_init(loop_handle: *c_void, handle_ptr: *uv_tcp_t) -> c_int;
// FIXME ref #2604 .. ?
fn rust_uv_buf_init(out_buf: *uv_buf_t, base: *u8, len: size_t);
fn rust_uv_last_error(loop_handle: *c_void) -> uv_err_t;
// FIXME ref #2064
fn rust_uv_strerror(err: *uv_err_t) -> *c_char;
// FIXME ref #2064
fn rust_uv_err_name(err: *uv_err_t) -> *c_char;
fn rust_uv_ip4_addrp(ip: *u8, port: c_int) -> *sockaddr_in;
fn rust_uv_ip6_addrp(ip: *u8, port: c_int) -> *sockaddr_in6;
fn rust_uv_free_ip4_addr(addr: *sockaddr_in);
fn rust_uv_free_ip6_addr(addr: *sockaddr_in6);
fn rust_uv_ip4_name(src: *sockaddr_in, dst: *u8, size: size_t) -> c_int;
fn rust_uv_ip6_name(src: *sockaddr_in6, dst: *u8, size: size_t) -> c_int;
fn rust_uv_ip4_port(src: *sockaddr_in) -> c_uint;
fn rust_uv_ip6_port(src: *sockaddr_in6) -> c_uint;
// FIXME ref #2064
fn rust_uv_tcp_connect(connect_ptr: *uv_connect_t,
tcp_handle_ptr: *uv_tcp_t,
after_cb: *u8,
addr: *sockaddr_in) -> c_int;
// FIXME ref #2064
fn rust_uv_tcp_bind(tcp_server: *uv_tcp_t, addr: *sockaddr_in) -> c_int;
// FIXME ref #2064
fn rust_uv_tcp_connect6(connect_ptr: *uv_connect_t,
tcp_handle_ptr: *uv_tcp_t,
after_cb: *u8,
addr: *sockaddr_in6) -> c_int;
// FIXME ref #2064
fn rust_uv_tcp_bind6(tcp_server: *uv_tcp_t, addr: *sockaddr_in6) -> c_int;
fn rust_uv_tcp_getpeername(tcp_handle_ptr: *uv_tcp_t,
name: *sockaddr_in) -> c_int;
fn rust_uv_tcp_getpeername6(tcp_handle_ptr: *uv_tcp_t,
name: *sockaddr_in6) ->c_int;
fn rust_uv_listen(stream: *c_void, backlog: c_int, cb: *u8) -> c_int;
fn rust_uv_accept(server: *c_void, client: *c_void) -> c_int;
fn rust_uv_write(req: *c_void,
stream: *c_void,
buf_in: *uv_buf_t,
buf_cnt: c_int,
cb: *u8) -> c_int;
fn rust_uv_read_start(stream: *c_void,
on_alloc: *u8,
on_read: *u8) -> c_int;
fn rust_uv_read_stop(stream: *c_void) -> c_int;
fn rust_uv_timer_init(loop_handle: *c_void,
timer_handle: *uv_timer_t) -> c_int;
fn rust_uv_timer_start(timer_handle: *uv_timer_t,
cb: *u8,
timeout: c_uint,
repeat: c_uint) -> c_int;
fn rust_uv_timer_stop(handle: *uv_timer_t) -> c_int;
fn rust_uv_malloc_buf_base_of(sug_size: size_t) -> *u8;
fn rust_uv_free_base_of_buf(buf: uv_buf_t);
fn rust_uv_get_stream_handle_from_connect_req(connect_req: *uv_connect_t) -> *uv_stream_t;
fn rust_uv_get_stream_handle_from_write_req(write_req: *uv_write_t) -> *uv_stream_t;
fn rust_uv_get_loop_for_uv_handle(handle: *c_void) -> *c_void;
fn rust_uv_get_data_for_uv_loop(loop_ptr: *c_void) -> *c_void;
fn rust_uv_set_data_for_uv_loop(loop_ptr: *c_void, data: *c_void);
fn rust_uv_get_data_for_uv_handle(handle: *c_void) -> *c_void;
fn rust_uv_set_data_for_uv_handle(handle: *c_void, data: *c_void);
fn rust_uv_get_data_for_req(req: *c_void) -> *c_void;
fn rust_uv_set_data_for_req(req: *c_void, data: *c_void);
fn rust_uv_get_base_from_buf(buf: uv_buf_t) -> *u8;
fn rust_uv_get_len_from_buf(buf: uv_buf_t) -> size_t;
}

View File

@@ -180,10 +180,13 @@ impl FailWithCause for &'static str {
// FIXME #4427: Temporary until rt::rt_fail_ goes away
pub fn begin_unwind_(msg: *c_char, file: *c_char, line: size_t) -> ! {
use cell::Cell;
use option::Option;
use either::Left;
use rt::{context, OldTaskContext, TaskContext};
use rt::task::{Task, Unwinder};
use rt::local::Local;
use rt::logging::Logger;
let context = context();
match context {
@@ -200,12 +203,18 @@ pub fn begin_unwind_(msg: *c_char, file: *c_char, line: size_t) -> ! {
let msg = str::raw::from_c_str(msg);
let file = str::raw::from_c_str(file);
let outmsg = fmt!("%s at line %i of file %s", msg, line as int, file);
let outmsg = fmt!("task failed: '%s' at line %i of file %s",
msg, line as int, file);
// XXX: Logging doesn't work correctly in non-task context because it
// invokes the local heap
if context == TaskContext {
error!(outmsg);
// XXX: Logging doesn't work here - the check to call the log
// function never passes - so calling the log function directly.
let outmsg = Cell::new(outmsg);
do Local::borrow::<Task, ()> |task| {
task.logger.log(Left(outmsg.take()));
}
} else {
rtdebug!("%s", outmsg);
}
@@ -213,11 +222,7 @@ pub fn begin_unwind_(msg: *c_char, file: *c_char, line: size_t) -> ! {
gc::cleanup_stack_for_failure();
let task = Local::unsafe_borrow::<Task>();
let unwinder: &mut Option<Unwinder> = &mut (*task).unwinder;
match *unwinder {
Some(ref mut unwinder) => unwinder.begin_unwind(),
None => abort!("failure without unwinder. aborting process")
}
(*task).unwinder.begin_unwind();
}
}
}

View File

@@ -520,20 +520,9 @@ pub fn failing() -> bool {
}
}
_ => {
let mut unwinding = false;
do Local::borrow::<Task> |local| {
unwinding = match local.unwinder {
Some(unwinder) => {
unwinder.unwinding
do Local::borrow::<Task, bool> |local| {
local.unwinder.unwinding
}
None => {
// Because there is no unwinder we can't be unwinding.
// (The process will abort on failure)
false
}
}
}
return unwinding;
}
}
}

View File

@@ -92,6 +92,7 @@ use util;
use unstable::sync::{Exclusive, exclusive};
use rt::local::Local;
use iterator::{IteratorUtil};
use rt::task::Task;
#[cfg(test)] use task::default_task_opts;
#[cfg(test)] use comm;
@@ -580,9 +581,14 @@ pub fn spawn_raw(opts: TaskOpts, f: ~fn()) {
fn spawn_raw_newsched(_opts: TaskOpts, f: ~fn()) {
use rt::sched::*;
let task = do Local::borrow::<Task, ~Task>() |running_task| {
~running_task.new_child()
};
let mut sched = Local::take::<Scheduler>();
let task = ~Coroutine::new(&mut sched.stack_pool, f);
sched.schedule_new_task(task);
let task = ~Coroutine::with_task(&mut sched.stack_pool,
task, f);
sched.schedule_task(task);
}
fn spawn_raw_oldsched(mut opts: TaskOpts, f: ~fn()) {

View File

@@ -245,7 +245,7 @@ pub unsafe fn local_malloc(td: *c_char, size: uintptr_t) -> *c_char {
}
_ => {
let mut alloc = ::ptr::null();
do Local::borrow::<Task> |task| {
do Local::borrow::<Task,()> |task| {
alloc = task.heap.alloc(td as *c_void, size as uint) as *c_char;
}
return alloc;
@@ -263,7 +263,7 @@ pub unsafe fn local_free(ptr: *c_char) {
rustrt::rust_upcall_free_noswitch(ptr);
}
_ => {
do Local::borrow::<Task> |task| {
do Local::borrow::<Task,()> |task| {
task.heap.free(ptr as *c_void);
}
}

View File

@@ -205,8 +205,53 @@ extern {
fn rust_unlock_little_lock(lock: rust_little_lock);
}
/* *********************************************************************/
//FIXME: #5042 This should be replaced by proper atomic type
pub struct AtomicUint {
priv inner: uint
}
impl AtomicUint {
pub fn new(val: uint) -> AtomicUint { AtomicUint { inner: val } }
pub fn load(&self) -> uint {
unsafe { intrinsics::atomic_load(cast::transmute(self)) as uint }
}
pub fn store(&mut self, val: uint) {
unsafe { intrinsics::atomic_store(cast::transmute(self), val as int); }
}
pub fn add(&mut self, val: int) -> uint {
unsafe { intrinsics::atomic_xadd(cast::transmute(self), val as int) as uint }
}
pub fn cas(&mut self, old:uint, new: uint) -> uint {
unsafe { intrinsics::atomic_cxchg(cast::transmute(self), old as int, new as int) as uint }
}
}
pub struct AtomicInt {
priv inner: int
}
impl AtomicInt {
pub fn new(val: int) -> AtomicInt { AtomicInt { inner: val } }
pub fn load(&self) -> int {
unsafe { intrinsics::atomic_load(&self.inner) }
}
pub fn store(&mut self, val: int) {
unsafe { intrinsics::atomic_store(&mut self.inner, val); }
}
pub fn add(&mut self, val: int) -> int {
unsafe { intrinsics::atomic_xadd(&mut self.inner, val) }
}
pub fn cas(&mut self, old: int, new: int) -> int {
unsafe { intrinsics::atomic_cxchg(&mut self.inner, old, new) }
}
}
#[cfg(test)]
mod tests {
use super::*;
use comm;
use super::exclusive;
use task;
@@ -262,4 +307,28 @@ mod tests {
}
}
}
#[test]
fn atomic_int_smoke_test() {
let mut i = AtomicInt::new(0);
i.store(10);
assert!(i.load() == 10);
assert!(i.add(1) == 10);
assert!(i.load() == 11);
assert!(i.cas(11, 12) == 11);
assert!(i.cas(11, 13) == 12);
assert!(i.load() == 12);
}
#[test]
fn atomic_uint_smoke_test() {
let mut i = AtomicUint::new(0);
i.store(10);
assert!(i.load() == 10);
assert!(i.add(1) == 10);
assert!(i.load() == 11);
assert!(i.cas(11, 12) == 11);
assert!(i.cas(11, 13) == 12);
assert!(i.load() == 12);
}
}

View File

@@ -925,6 +925,13 @@ rust_running_on_valgrind() {
return RUNNING_ON_VALGRIND;
}
extern int get_num_cpus();
extern "C" CDECL uintptr_t
rust_get_num_cpus() {
return get_num_cpus();
}
//
// Local Variables:
// mode: C++

View File

@@ -40,7 +40,7 @@ rust_drop_env_lock() {
}
#if defined(__WIN32__)
static int
int
get_num_cpus() {
SYSTEM_INFO sysinfo;
GetSystemInfo(&sysinfo);
@@ -48,7 +48,7 @@ get_num_cpus() {
return (int) sysinfo.dwNumberOfProcessors;
}
#elif defined(__BSD__)
static int
int
get_num_cpus() {
/* swiped from http://stackoverflow.com/questions/150355/
programmatically-find-the-number-of-cores-on-a-machine */
@@ -75,7 +75,7 @@ get_num_cpus() {
return numCPU;
}
#elif defined(__GNUC__)
static int
int
get_num_cpus() {
return sysconf(_SC_NPROCESSORS_ONLN);
}

View File

@@ -79,6 +79,11 @@ rust_gc_metadata() {
return (void *)global_safe_points;
}
extern "C" CDECL void
rust_update_gc_metadata(const void* map) {
update_gc_metadata(map);
}
//
// Local Variables:
// mode: C++

View File

@@ -178,6 +178,7 @@ rust_call_tydesc_glue
tdefl_compress_mem_to_heap
tinfl_decompress_mem_to_heap
rust_gc_metadata
rust_update_gc_metadata
rust_uv_ip4_port
rust_uv_ip6_port
rust_uv_tcp_getpeername
@@ -240,3 +241,4 @@ rust_take_env_lock
rust_drop_env_lock
rust_update_log_settings
rust_running_on_valgrind
rust_get_num_cpus