std: Second pass stabilization of sync

This pass performs a second pass of stabilization through the `std::sync`
module, avoiding modules/types that are being handled in other PRs (e.g.
mutexes, rwlocks, condvars, and channels).

The following items are now stable

* `sync::atomic`
* `sync::atomic::ATOMIC_BOOL_INIT` (was `INIT_ATOMIC_BOOL`)
* `sync::atomic::ATOMIC_INT_INIT` (was `INIT_ATOMIC_INT`)
* `sync::atomic::ATOMIC_UINT_INIT` (was `INIT_ATOMIC_UINT`)
* `sync::Once`
* `sync::ONCE_INIT`
* `sync::Once::call_once` (was `doit`)
  * C == `pthread_once(..)`
  * Boost == `call_once(..)`
  * Windows == `InitOnceExecuteOnce`
* `sync::Barrier`
* `sync::Barrier::new`
* `sync::Barrier::wait` (now returns a `bool`)
* `sync::Semaphore::new`
* `sync::Semaphore::acquire`
* `sync::Semaphore::release`

The following items remain unstable

* `sync::SemaphoreGuard`
* `sync::Semaphore::access` - it's unclear how this relates to the poisoning
                              story of mutexes.
* `sync::TaskPool` - the semantics of a failing task and whether a thread is
                     re-attached to a thread pool are somewhat unclear, and the
                     utility of this type in `sync` is question with respect to
                     the jobs of other primitives. This type will likely become
                     stable or move out of the standard library over time.
* `sync::Future` - futures as-is have yet to be deeply re-evaluated with the
                   recent core changes to Rust's synchronization story, and will
                   likely become stable in the future but are unstable until
                   that time comes.

[breaking-change]
This commit is contained in:
Alex Crichton
2014-12-29 15:03:01 -08:00
parent cd614164e6
commit f3a7ec7028
44 changed files with 168 additions and 237 deletions

View File

@@ -86,15 +86,15 @@
//! Keep a global count of live tasks:
//!
//! ```
//! use std::sync::atomic::{AtomicUint, SeqCst, INIT_ATOMIC_UINT};
//! use std::sync::atomic::{AtomicUint, SeqCst, ATOMIC_UINT_INIT};
//!
//! static GLOBAL_TASK_COUNT: AtomicUint = INIT_ATOMIC_UINT;
//! static GLOBAL_TASK_COUNT: AtomicUint = ATOMIC_UINT_INIT;
//!
//! let old_task_count = GLOBAL_TASK_COUNT.fetch_add(1, SeqCst);
//! println!("live tasks: {}", old_task_count + 1);
//! ```
#![allow(deprecated)]
#![stable]
use alloc::boxed::Box;
use core::mem;
@@ -102,6 +102,7 @@ use core::prelude::{Send, Drop, None, Option, Some};
pub use core::atomic::{AtomicBool, AtomicInt, AtomicUint, AtomicPtr};
pub use core::atomic::{INIT_ATOMIC_BOOL, INIT_ATOMIC_INT, INIT_ATOMIC_UINT};
pub use core::atomic::{ATOMIC_BOOL_INIT, ATOMIC_INT_INIT, ATOMIC_UINT_INIT};
pub use core::atomic::fence;
pub use core::atomic::Ordering::{mod, Relaxed, Release, Acquire, AcqRel, SeqCst};
@@ -116,6 +117,7 @@ pub struct AtomicOption<T> {
p: AtomicUint,
}
#[allow(deprecated)]
impl<T: Send> AtomicOption<T> {
/// Create a new `AtomicOption`
pub fn new(p: Box<T>) -> AtomicOption<T> {

View File

@@ -8,7 +8,6 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use kinds::{Send, Sync};
use sync::{Mutex, Condvar};
/// A barrier enables multiple tasks to synchronize the beginning
@@ -30,29 +29,32 @@ use sync::{Mutex, Condvar};
/// }).detach();
/// }
/// ```
#[stable]
pub struct Barrier {
lock: Mutex<BarrierState>,
cvar: Condvar,
num_threads: uint,
}
unsafe impl Send for Barrier {}
unsafe impl Sync for Barrier {}
// The inner state of a double barrier
struct BarrierState {
count: uint,
generation_id: uint,
}
unsafe impl Send for BarrierState {}
unsafe impl Sync for BarrierState {}
/// A result returned from wait.
///
/// Currently this opaque structure only has one method, `.is_leader()`. Only
/// one thread will receive a result that will return `true` from this function.
#[allow(missing_copy_implementations)]
pub struct BarrierWaitResult(bool);
impl Barrier {
/// Create a new barrier that can block a given number of threads.
///
/// A barrier will block `n`-1 threads which call `wait` and then wake up
/// all threads at once when the `n`th thread calls `wait`.
#[stable]
pub fn new(n: uint) -> Barrier {
Barrier {
lock: Mutex::new(BarrierState {
@@ -68,7 +70,13 @@ impl Barrier {
///
/// Barriers are re-usable after all threads have rendezvoused once, and can
/// be used continuously.
pub fn wait(&self) {
///
/// A single (arbitrary) thread will receive a `BarrierWaitResult` that
/// returns `true` from `is_leader` when returning from this function, and
/// all other threads will receive a result that will return `false` from
/// `is_leader`
#[stable]
pub fn wait(&self) -> BarrierWaitResult {
let mut lock = self.lock.lock().unwrap();
let local_gen = lock.generation_id;
lock.count += 1;
@@ -79,32 +87,44 @@ impl Barrier {
lock.count < self.num_threads {
lock = self.cvar.wait(lock).unwrap();
}
BarrierWaitResult(false)
} else {
lock.count = 0;
lock.generation_id += 1;
self.cvar.notify_all();
BarrierWaitResult(true)
}
}
}
impl BarrierWaitResult {
/// Return whether this thread from `wait` is the "leader thread".
///
/// Only one thread will have `true` returned from their result, all other
/// threads will have `false` returned.
#[stable]
pub fn is_leader(&self) -> bool { self.0 }
}
#[cfg(test)]
mod tests {
use prelude::*;
use sync::{Arc, Barrier};
use comm::Empty;
use sync::{Arc, Barrier};
#[test]
fn test_barrier() {
let barrier = Arc::new(Barrier::new(10));
const N: uint = 10;
let barrier = Arc::new(Barrier::new(N));
let (tx, rx) = channel();
for _ in range(0u, 9) {
for _ in range(0u, N - 1) {
let c = barrier.clone();
let tx = tx.clone();
spawn(move|| {
c.wait();
tx.send(true);
tx.send(c.wait().is_leader());
});
}
@@ -115,10 +135,15 @@ mod tests {
_ => false,
});
barrier.wait();
let mut leader_found = barrier.wait().is_leader();
// Now, the barrier is cleared and we should get data.
for _ in range(0u, 9) {
rx.recv();
for _ in range(0u, N - 1) {
if rx.recv() {
assert!(!leader_found);
leader_found = true;
}
}
assert!(leader_found);
}
}

View File

@@ -88,7 +88,7 @@ unsafe impl Sync for StaticCondvar {}
#[unstable = "may be merged with Condvar in the future"]
pub const CONDVAR_INIT: StaticCondvar = StaticCondvar {
inner: sys::CONDVAR_INIT,
mutex: atomic::INIT_ATOMIC_UINT,
mutex: atomic::ATOMIC_UINT_INIT,
};
impl Condvar {

View File

@@ -8,8 +8,8 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! A type representing values that may be computed concurrently and operations for working with
//! them.
//! A type representing values that may be computed concurrently and operations
//! for working with them.
//!
//! # Example
//!
@@ -23,6 +23,9 @@
//! ```
#![allow(missing_docs)]
#![unstable = "futures as-is have yet to be deeply reevaluated with recent \
core changes to Rust's synchronization story, and will likely \
become stable in the future but are unstable until that time"]
use core::prelude::*;
use core::mem::replace;

View File

@@ -26,7 +26,7 @@ pub use self::rwlock::{RWLockReadGuard, RWLockWriteGuard};
pub use self::condvar::{Condvar, StaticCondvar, CONDVAR_INIT};
pub use self::once::{Once, ONCE_INIT};
pub use self::semaphore::{Semaphore, SemaphoreGuard};
pub use self::barrier::Barrier;
pub use self::barrier::{Barrier, BarrierWaitResult};
pub use self::poison::{PoisonError, TryLockError, TryLockResult, LockResult};
pub use self::future::Future;

View File

@@ -32,10 +32,11 @@ use sync::{StaticMutex, MUTEX_INIT};
///
/// static START: Once = ONCE_INIT;
///
/// START.doit(|| {
/// START.call_once(|| {
/// // run initialization here
/// });
/// ```
#[stable]
pub struct Once {
mutex: StaticMutex,
cnt: atomic::AtomicInt,
@@ -45,23 +46,25 @@ pub struct Once {
unsafe impl Sync for Once {}
/// Initialization value for static `Once` values.
#[stable]
pub const ONCE_INIT: Once = Once {
mutex: MUTEX_INIT,
cnt: atomic::INIT_ATOMIC_INT,
lock_cnt: atomic::INIT_ATOMIC_INT,
cnt: atomic::ATOMIC_INT_INIT,
lock_cnt: atomic::ATOMIC_INT_INIT,
};
impl Once {
/// Perform an initialization routine once and only once. The given closure
/// will be executed if this is the first time `doit` has been called, and
/// otherwise the routine will *not* be invoked.
/// will be executed if this is the first time `call_once` has been called,
/// and otherwise the routine will *not* be invoked.
///
/// This method will block the calling task if another initialization
/// routine is currently running.
///
/// When this function returns, it is guaranteed that some initialization
/// has run and completed (it may not be the closure specified).
pub fn doit<F>(&'static self, f: F) where F: FnOnce() {
#[stable]
pub fn call_once<F>(&'static self, f: F) where F: FnOnce() {
// Optimize common path: load is much cheaper than fetch_add.
if self.cnt.load(atomic::SeqCst) < 0 {
return
@@ -91,13 +94,13 @@ impl Once {
//
// It is crucial that the negative value is swapped in *after* the
// initialization routine has completed because otherwise new threads
// calling `doit` will return immediately before the initialization has
// completed.
// calling `call_once` will return immediately before the initialization
// has completed.
let prev = self.cnt.fetch_add(1, atomic::SeqCst);
if prev < 0 {
// Make sure we never overflow, we'll never have int::MIN
// simultaneous calls to `doit` to make this value go back to 0
// simultaneous calls to `call_once` to make this value go back to 0
self.cnt.store(int::MIN, atomic::SeqCst);
return
}
@@ -118,6 +121,10 @@ impl Once {
unsafe { self.mutex.destroy() }
}
}
/// Deprecated
#[deprecated = "renamed to `call_once`"]
pub fn doit<F>(&'static self, f: F) where F: FnOnce() { self.call_once(f) }
}
#[cfg(test)]
@@ -131,9 +138,9 @@ mod test {
fn smoke_once() {
static O: Once = ONCE_INIT;
let mut a = 0i;
O.doit(|| a += 1);
O.call_once(|| a += 1);
assert_eq!(a, 1);
O.doit(|| a += 1);
O.call_once(|| a += 1);
assert_eq!(a, 1);
}
@@ -148,7 +155,7 @@ mod test {
spawn(move|| {
for _ in range(0u, 4) { Thread::yield_now() }
unsafe {
O.doit(|| {
O.call_once(|| {
assert!(!run);
run = true;
});
@@ -159,7 +166,7 @@ mod test {
}
unsafe {
O.doit(|| {
O.call_once(|| {
assert!(!run);
run = true;
});

View File

@@ -8,6 +8,9 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
#![unstable = "the interaction between semaphores and the acquisition/release \
of resources is currently unclear"]
use ops::Drop;
use sync::{Mutex, Condvar};

View File

@@ -10,6 +10,11 @@
//! Abstraction of a thread pool for basic parallelism.
#![unstable = "the semantics of a failing task and whether a thread is \
re-attached to a thread pool are somewhat unclear, and the \
utility of this type in `std::sync` is questionable with \
respect to the jobs of other primitives"]
use core::prelude::*;
use thread::Thread;