Files
rust/src/libstd/sys/common/helper_thread.rs
Alex Crichton 8b7d032014 rollup merge of #20273: alexcrichton/second-pass-comm
Conflicts:
	src/doc/guide.md
	src/libcollections/bit.rs
	src/libcollections/btree/node.rs
	src/libcollections/slice.rs
	src/libcore/ops.rs
	src/libcore/prelude.rs
	src/librand/rand_impls.rs
	src/librustc/middle/check_match.rs
	src/librustc/middle/infer/region_inference/mod.rs
	src/librustc_driver/lib.rs
	src/librustdoc/test.rs
	src/libstd/bitflags.rs
	src/libstd/io/comm_adapters.rs
	src/libstd/io/mem.rs
	src/libstd/io/mod.rs
	src/libstd/io/net/pipe.rs
	src/libstd/io/net/tcp.rs
	src/libstd/io/net/udp.rs
	src/libstd/io/pipe.rs
	src/libstd/io/process.rs
	src/libstd/io/stdio.rs
	src/libstd/io/timer.rs
	src/libstd/io/util.rs
	src/libstd/macros.rs
	src/libstd/os.rs
	src/libstd/path/posix.rs
	src/libstd/path/windows.rs
	src/libstd/prelude/v1.rs
	src/libstd/rand/mod.rs
	src/libstd/rand/os.rs
	src/libstd/sync/barrier.rs
	src/libstd/sync/condvar.rs
	src/libstd/sync/future.rs
	src/libstd/sync/mpsc/mod.rs
	src/libstd/sync/mpsc/mpsc_queue.rs
	src/libstd/sync/mpsc/select.rs
	src/libstd/sync/mpsc/spsc_queue.rs
	src/libstd/sync/mutex.rs
	src/libstd/sync/once.rs
	src/libstd/sync/rwlock.rs
	src/libstd/sync/semaphore.rs
	src/libstd/sync/task_pool.rs
	src/libstd/sys/common/helper_thread.rs
	src/libstd/sys/unix/process.rs
	src/libstd/sys/unix/timer.rs
	src/libstd/sys/windows/c.rs
	src/libstd/sys/windows/timer.rs
	src/libstd/sys/windows/tty.rs
	src/libstd/thread.rs
	src/libstd/thread_local/mod.rs
	src/libstd/thread_local/scoped.rs
	src/libtest/lib.rs
	src/test/auxiliary/cci_capture_clause.rs
	src/test/bench/shootout-reverse-complement.rs
	src/test/bench/shootout-spectralnorm.rs
	src/test/compile-fail/array-old-syntax-2.rs
	src/test/compile-fail/bind-by-move-no-guards.rs
	src/test/compile-fail/builtin-superkinds-self-type.rs
	src/test/compile-fail/comm-not-freeze-receiver.rs
	src/test/compile-fail/comm-not-freeze.rs
	src/test/compile-fail/issue-12041.rs
	src/test/compile-fail/unsendable-class.rs
	src/test/run-pass/builtin-superkinds-capabilities-transitive.rs
	src/test/run-pass/builtin-superkinds-capabilities-xc.rs
	src/test/run-pass/builtin-superkinds-capabilities.rs
	src/test/run-pass/builtin-superkinds-self-type.rs
	src/test/run-pass/capturing-logging.rs
	src/test/run-pass/closure-bounds-can-capture-chan.rs
	src/test/run-pass/comm.rs
	src/test/run-pass/core-run-destroy.rs
	src/test/run-pass/drop-trait-enum.rs
	src/test/run-pass/hashmap-memory.rs
	src/test/run-pass/issue-13494.rs
	src/test/run-pass/issue-3609.rs
	src/test/run-pass/issue-4446.rs
	src/test/run-pass/issue-4448.rs
	src/test/run-pass/issue-8827.rs
	src/test/run-pass/issue-9396.rs
	src/test/run-pass/ivec-tag.rs
	src/test/run-pass/rust-log-filter.rs
	src/test/run-pass/send-resource.rs
	src/test/run-pass/send-type-inference.rs
	src/test/run-pass/sendable-class.rs
	src/test/run-pass/spawn-types.rs
	src/test/run-pass/task-comm-0.rs
	src/test/run-pass/task-comm-10.rs
	src/test/run-pass/task-comm-11.rs
	src/test/run-pass/task-comm-13.rs
	src/test/run-pass/task-comm-14.rs
	src/test/run-pass/task-comm-15.rs
	src/test/run-pass/task-comm-16.rs
	src/test/run-pass/task-comm-3.rs
	src/test/run-pass/task-comm-4.rs
	src/test/run-pass/task-comm-5.rs
	src/test/run-pass/task-comm-6.rs
	src/test/run-pass/task-comm-7.rs
	src/test/run-pass/task-comm-9.rs
	src/test/run-pass/task-comm-chan-nil.rs
	src/test/run-pass/task-spawn-move-and-copy.rs
	src/test/run-pass/task-stderr.rs
	src/test/run-pass/tcp-accept-stress.rs
	src/test/run-pass/tcp-connect-timeouts.rs
	src/test/run-pass/tempfile.rs
	src/test/run-pass/trait-bounds-in-arc.rs
	src/test/run-pass/trivial-message.rs
	src/test/run-pass/unique-send-2.rs
	src/test/run-pass/unique-send.rs
	src/test/run-pass/unwind-resource.rs
2015-01-02 09:15:54 -08:00

153 lines
5.4 KiB
Rust

// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! Implementation of the helper thread for the timer module
//!
//! This module contains the management necessary for the timer worker thread.
//! This thread is responsible for performing the send()s on channels for timers
//! that are using channels instead of a blocking call.
//!
//! The timer thread is lazily initialized, and it's shut down via the
//! `shutdown` function provided. It must be maintained as an invariant that
//! `shutdown` is only called when the entire program is finished. No new timers
//! can be created in the future and there must be no active timers at that
//! time.
use prelude::v1::*;
use cell::UnsafeCell;
use comm::{channel, Sender, Receiver};
use mem;
use rt;
use sync::{StaticMutex, StaticCondvar};
use sync::mpsc::{channel, Sender, Receiver};
use sys::helper_signal;
use thread::Thread;
/// A structure for management of a helper thread.
///
/// This is generally a static structure which tracks the lifetime of a helper
/// thread.
///
/// The fields of this helper are all public only so that the structure can be
/// initialized statically; they should not be accessed directly by users.
pub struct Helper<M> {
/// Internal lock which protects the remaining fields
pub lock: StaticMutex,
/// Condition variable used together with `lock`: the helper thread notifies
/// it after setting `shutdown`, and `shutdown()` waits on it.
pub cond: StaticCondvar,
// You'll notice that the remaining fields are UnsafeCell<T>, and this is
// because all helper thread operations are done through &self, but we need
// these to be mutable (once `lock` is held).
/// Lazily allocated channel to send messages to the helper thread.
///
/// Holds a raw pointer obtained from a boxed `Sender`; `boot` stores it and
/// `shutdown` frees the box and resets the pointer to null.
// NOTE(review): presumably the static initializer starts this out as null;
// the initializer is not visible in this file -- confirm.
pub chan: UnsafeCell<*mut Sender<M>>,
/// OS handle used to wake up a blocked helper thread
///
/// Stored as a `uint` and cast back to `helper_signal::signal` on each use.
pub signal: UnsafeCell<uint>,
/// Flag if this helper thread has booted and been initialized yet.
pub initialized: UnsafeCell<bool>,
/// Flag if this helper thread has shut down
pub shutdown: UnsafeCell<bool>,
}
// `Helper` contains `UnsafeCell`s and a raw pointer, so `Send`/`Sync` are
// asserted by hand here. The methods in this file only read or write those
// cells while holding `lock`.
// NOTE(review): because the fields are public, soundness also relies on no
// outside code touching them without the lock -- confirm with callers.
unsafe impl<M:Send> Send for Helper<M> { }
unsafe impl<M:Send> Sync for Helper<M> { }
/// Wrapper around the receiving end of the helper signal so that it can be
/// moved into the closure handed to `Thread::spawn` in `Helper::boot`.
// NOTE(review): presumably `helper_signal::signal` is a raw OS handle type
// that is not `Send` on its own on some platforms -- confirm against the
// platform-specific `helper_signal` definitions.
struct RaceBox(helper_signal::signal);
unsafe impl Send for RaceBox {}
unsafe impl Sync for RaceBox {}
impl<M: Send> Helper<M> {
    /// Lazily boots a helper thread, becoming a no-op if the helper has already
    /// been spawned.
    ///
    /// This function checks (under `lock`) whether the thread has been
    /// initialized, and if it has, returns without doing anything. Otherwise
    /// it creates the message channel and the wake-up signal, runs the closure
    /// `f` (while still holding the initialization lock), and spawns a
    /// detached thread that calls `helper` with the receiving end of the
    /// signal, the channel receiver and the value returned by `f`.
    ///
    /// A `shutdown` hook is registered with `rt::at_exit` the first time the
    /// helper is booted.
    ///
    /// This function is safe to be called many times.
    ///
    /// # Panics
    ///
    /// Panics if `lock` is poisoned.
    pub fn boot<T, F>(&'static self, f: F, helper: fn(helper_signal::signal, Receiver<M>, T)) where
        T: Send,
        F: FnOnce() -> T,
    {
        unsafe {
            let _guard = self.lock.lock().unwrap();
            if !*self.initialized.get() {
                // Box the sender and stash it as a raw pointer; `shutdown`
                // turns it back into a `Box` and drops it.
                let (tx, rx) = channel();
                *self.chan.get() = mem::transmute(box tx);
                let (receive, send) = helper_signal::new();
                *self.signal.get() = send as uint;

                // Wrap the receiving handle so it can be moved into the
                // spawned closure.
                let receive = RaceBox(receive);

                let t = f();
                Thread::spawn(move |:| {
                    helper(receive.0, rx, t);
                    // The helper has returned: record that under the lock and
                    // wake up `shutdown`, which waits on `cond`.
                    let _g = self.lock.lock().unwrap();
                    *self.shutdown.get() = true;
                    self.cond.notify_one()
                }).detach();

                rt::at_exit(move|:| { self.shutdown() });
                *self.initialized.get() = true;
            }
        }
    }

    /// Sends a message to a spawned worker thread.
    ///
    /// This is only valid if the worker thread has previously booted
    ///
    /// # Panics
    ///
    /// Panics if the channel has not been allocated (the stored sender
    /// pointer is null, e.g. `boot` was never called or `shutdown` has already
    /// run), if `lock` is poisoned, or if sending on the channel fails.
    pub fn send(&'static self, msg: M) {
        unsafe {
            let _guard = self.lock.lock().unwrap();

            // `self.chan.get()` is the address of the cell itself and is never
            // null, so the check must look at the pointer *stored* in the
            // cell. Without the extra dereference the assertion is vacuous and
            // a missing channel would be dereferenced below.
            assert!(!(*self.chan.get()).is_null());

            // Must send and *then* signal to ensure that the child receives the
            // message. Otherwise it could wake up and go to sleep before we
            // send the message.
            (**self.chan.get()).send(msg).unwrap();
            helper_signal::signal(*self.signal.get() as helper_signal::signal);
        }
    }

    /// Tears down the helper thread.
    ///
    /// Destroys the sending half of the channel, signals the helper so that it
    /// wakes up, waits on `cond` until the helper thread has set the `shutdown`
    /// flag, and then destroys the lock and closes the signal handle.
    ///
    /// This is registered with `rt::at_exit` by `boot` and is not meant to be
    /// called from anywhere else.
    fn shutdown(&'static self) {
        unsafe {
            // Shut down, but make sure this is done inside our lock to ensure
            // that we'll always receive the exit signal when the thread
            // returns.
            let mut guard = self.lock.lock().unwrap();

            // Close the channel by destroying it
            let chan: Box<Sender<M>> = mem::transmute(*self.chan.get());
            *self.chan.get() = 0 as *mut Sender<M>;
            drop(chan);
            helper_signal::signal(*self.signal.get() as helper_signal::signal);

            // Wait for the child to exit
            while !*self.shutdown.get() {
                guard = self.cond.wait(guard).unwrap();
            }
            // Release the guard before the lock itself is destroyed.
            drop(guard);

            // Clean up after ourselves
            self.lock.destroy();
            helper_signal::close(*self.signal.get() as helper_signal::signal);
            *self.signal.get() = 0;
        }
    }
}