Rollup merge of #144651 - connortsui20:nonpoison_condvar, r=joboet

Implementation: `#[feature(nonpoison_condvar)]`

Tracking Issue: https://github.com/rust-lang/rust/issues/134645

This PR continues the effort made in https://github.com/rust-lang/rust/pull/144022 by adding the implementation of `nonpoison::condvar`.

Many of the changes here are similar to the changes made to implement `nonpoison::mutex`.

There are two other changes here. The first is that the `Barrier` implementation is migrated to use the `nonpoison::Condvar` instead of the `poison` variant. The second (which might be subject to some discussion) is that `WaitTimeoutResult` is moved up to `mod.rs`, as both `condvar` variants need that type (and I do not know if there is a better place to put it now).

### Related PRs

- `nonpoison_rwlock` implementation: https://github.com/rust-lang/rust/pull/144648
- `nonpoison_once` implementation: https://github.com/rust-lang/rust/pull/144653
This commit is contained in:
Stuart Cook
2025-08-30 20:29:06 +10:00
committed by GitHub
11 changed files with 783 additions and 283 deletions

View File

@@ -1,6 +1,5 @@
use crate::fmt; use crate::fmt;
// FIXME(nonpoison_mutex,nonpoison_condvar): switch to nonpoison versions once they are available use crate::sync::nonpoison::{Condvar, Mutex};
use crate::sync::{Condvar, Mutex};
/// A barrier enables multiple threads to synchronize the beginning /// A barrier enables multiple threads to synchronize the beginning
/// of some computation. /// of some computation.
@@ -118,12 +117,11 @@ impl Barrier {
/// ``` /// ```
#[stable(feature = "rust1", since = "1.0.0")] #[stable(feature = "rust1", since = "1.0.0")]
pub fn wait(&self) -> BarrierWaitResult { pub fn wait(&self) -> BarrierWaitResult {
let mut lock = self.lock.lock().unwrap(); let mut lock = self.lock.lock();
let local_gen = lock.generation_id; let local_gen = lock.generation_id;
lock.count += 1; lock.count += 1;
if lock.count < self.num_threads { if lock.count < self.num_threads {
let _guard = let _guard = self.cvar.wait_while(lock, |state| local_gen == state.generation_id);
self.cvar.wait_while(lock, |state| local_gen == state.generation_id).unwrap();
BarrierWaitResult(false) BarrierWaitResult(false)
} else { } else {
lock.count = 0; lock.count = 0;

View File

@@ -209,7 +209,7 @@ pub use self::poison::{LockResult, PoisonError};
#[doc(inline)] #[doc(inline)]
pub use self::poison::{ pub use self::poison::{
Mutex, MutexGuard, TryLockError, TryLockResult, Mutex, MutexGuard, TryLockError, TryLockResult,
Condvar, WaitTimeoutResult, Condvar,
Once, OnceState, Once, OnceState,
RwLock, RwLockReadGuard, RwLockWriteGuard, RwLock, RwLockReadGuard, RwLockWriteGuard,
}; };
@@ -234,3 +234,66 @@ mod barrier;
mod lazy_lock; mod lazy_lock;
mod once_lock; mod once_lock;
mod reentrant_lock; mod reentrant_lock;
/// A type indicating whether a timed wait on a condition variable returned
/// due to a time out or not.
///
/// It is returned by the [`wait_timeout`] method.
///
/// [`wait_timeout`]: Condvar::wait_timeout
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
#[stable(feature = "wait_timeout", since = "1.5.0")]
pub struct WaitTimeoutResult(bool);
impl WaitTimeoutResult {
/// Returns `true` if the wait was known to have timed out.
///
/// # Examples
///
/// This example spawns a thread which will sleep 20 milliseconds before
/// updating a boolean value and then notifying the condvar.
///
/// The main thread will wait with a 10 millisecond timeout on the condvar
/// and will leave the loop upon timeout.
///
/// ```
/// use std::sync::{Arc, Condvar, Mutex};
/// use std::thread;
/// use std::time::Duration;
///
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
/// let pair2 = Arc::clone(&pair);
///
/// # let handle =
/// thread::spawn(move || {
/// let (lock, cvar) = &*pair2;
///
/// // Let's wait 20 milliseconds before notifying the condvar.
/// thread::sleep(Duration::from_millis(20));
///
/// let mut started = lock.lock().unwrap();
/// // We update the boolean value.
/// *started = true;
/// cvar.notify_one();
/// });
///
/// // Wait for the thread to start up.
/// let (lock, cvar) = &*pair;
/// loop {
/// // Let's put a timeout on the condvar's wait.
/// let result = cvar.wait_timeout(lock.lock().unwrap(), Duration::from_millis(10)).unwrap();
/// // 10 milliseconds have passed.
/// if result.1.timed_out() {
/// // timed out now and we can leave.
/// break
/// }
/// }
/// # // Prevent leaks for Miri.
/// # let _ = handle.join();
/// ```
#[must_use]
#[stable(feature = "wait_timeout", since = "1.5.0")]
pub fn timed_out(&self) -> bool {
self.0
}
}

View File

@@ -29,6 +29,8 @@ impl fmt::Display for WouldBlock {
} }
} }
#[unstable(feature = "nonpoison_condvar", issue = "134645")]
pub use self::condvar::Condvar;
#[unstable(feature = "mapped_lock_guards", issue = "117108")] #[unstable(feature = "mapped_lock_guards", issue = "117108")]
pub use self::mutex::MappedMutexGuard; pub use self::mutex::MappedMutexGuard;
#[unstable(feature = "nonpoison_mutex", issue = "134645")] #[unstable(feature = "nonpoison_mutex", issue = "134645")]
@@ -38,5 +40,6 @@ pub use self::rwlock::{MappedRwLockReadGuard, MappedRwLockWriteGuard};
#[unstable(feature = "nonpoison_rwlock", issue = "134645")] #[unstable(feature = "nonpoison_rwlock", issue = "134645")]
pub use self::rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard}; pub use self::rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
mod condvar;
mod mutex; mod mutex;
mod rwlock; mod rwlock;

View File

@@ -0,0 +1,448 @@
use crate::fmt;
use crate::sync::WaitTimeoutResult;
use crate::sync::nonpoison::{MutexGuard, mutex};
use crate::sys::sync as sys;
use crate::time::{Duration, Instant};
/// A Condition Variable
///
/// For more information about condition variables, check out the documentation for the poisoning
/// variant of this type at [`poison::Condvar`].
///
/// # Examples
///
/// Note that this `Condvar` does **not** propagate information about threads that panic while
/// holding a lock. If you need this functionality, see [`poison::Mutex`] and [`poison::Condvar`].
///
/// ```
/// #![feature(nonpoison_mutex)]
/// #![feature(nonpoison_condvar)]
///
/// use std::sync::nonpoison::{Mutex, Condvar};
/// use std::sync::Arc;
/// use std::thread;
///
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
/// let pair2 = Arc::clone(&pair);
///
/// // Inside of our lock, spawn a new thread, and then wait for it to start.
/// thread::spawn(move || {
/// let (lock, cvar) = &*pair2;
/// let mut started = lock.lock();
/// *started = true;
/// // We notify the condvar that the value has changed.
/// cvar.notify_one();
/// });
///
/// // Wait for the thread to start up.
/// let (lock, cvar) = &*pair;
/// let mut started = lock.lock();
/// while !*started {
/// started = cvar.wait(started);
/// }
/// ```
///
/// [`poison::Mutex`]: crate::sync::poison::Mutex
/// [`poison::Condvar`]: crate::sync::poison::Condvar
#[unstable(feature = "nonpoison_condvar", issue = "134645")]
pub struct Condvar {
inner: sys::Condvar,
}
impl Condvar {
/// Creates a new condition variable which is ready to be waited on and
/// notified.
///
/// # Examples
///
/// ```
/// use std::sync::Condvar;
///
/// let condvar = Condvar::new();
/// ```
#[unstable(feature = "nonpoison_condvar", issue = "134645")]
#[must_use]
#[inline]
pub const fn new() -> Condvar {
Condvar { inner: sys::Condvar::new() }
}
/// Blocks the current thread until this condition variable receives a
/// notification.
///
/// This function will atomically unlock the mutex specified (represented by
/// `guard`) and block the current thread. This means that any calls
/// to [`notify_one`] or [`notify_all`] which happen logically after the
/// mutex is unlocked are candidates to wake this thread up. When this
/// function call returns, the lock specified will have been re-acquired.
///
/// Note that this function is susceptible to spurious wakeups. Condition
/// variables normally have a boolean predicate associated with them, and
/// the predicate must always be checked each time this function returns to
/// protect against spurious wakeups.
///
/// # Panics
///
/// This function may [`panic!`] if it is used with more than one mutex
/// over time.
///
/// [`notify_one`]: Self::notify_one
/// [`notify_all`]: Self::notify_all
///
/// # Examples
///
/// ```
/// #![feature(nonpoison_mutex)]
/// #![feature(nonpoison_condvar)]
///
/// use std::sync::nonpoison::{Mutex, Condvar};
/// use std::sync::Arc;
/// use std::thread;
///
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
/// let pair2 = Arc::clone(&pair);
///
/// thread::spawn(move || {
/// let (lock, cvar) = &*pair2;
/// let mut started = lock.lock();
/// *started = true;
/// // We notify the condvar that the value has changed.
/// cvar.notify_one();
/// });
///
/// // Wait for the thread to start up.
/// let (lock, cvar) = &*pair;
/// let mut started = lock.lock();
/// // As long as the value inside the `Mutex<bool>` is `false`, we wait.
/// while !*started {
/// started = cvar.wait(started);
/// }
/// ```
#[unstable(feature = "nonpoison_condvar", issue = "134645")]
pub fn wait<'a, T>(&self, guard: MutexGuard<'a, T>) -> MutexGuard<'a, T> {
unsafe {
let lock = mutex::guard_lock(&guard);
self.inner.wait(lock);
}
guard
}
/// Blocks the current thread until the provided condition becomes false.
///
/// `condition` is checked immediately; if not met (returns `true`), this
/// will [`wait`] for the next notification then check again. This repeats
/// until `condition` returns `false`, in which case this function returns.
///
/// This function will atomically unlock the mutex specified (represented by
/// `guard`) and block the current thread. This means that any calls
/// to [`notify_one`] or [`notify_all`] which happen logically after the
/// mutex is unlocked are candidates to wake this thread up. When this
/// function call returns, the lock specified will have been re-acquired.
///
/// [`wait`]: Self::wait
/// [`notify_one`]: Self::notify_one
/// [`notify_all`]: Self::notify_all
///
/// # Examples
///
/// ```
/// #![feature(nonpoison_mutex)]
/// #![feature(nonpoison_condvar)]
///
/// use std::sync::nonpoison::{Mutex, Condvar};
/// use std::sync::Arc;
/// use std::thread;
///
/// let pair = Arc::new((Mutex::new(true), Condvar::new()));
/// let pair2 = Arc::clone(&pair);
///
/// thread::spawn(move || {
/// let (lock, cvar) = &*pair2;
/// let mut pending = lock.lock();
/// *pending = false;
/// // We notify the condvar that the value has changed.
/// cvar.notify_one();
/// });
///
/// // Wait for the thread to start up.
/// let (lock, cvar) = &*pair;
/// // As long as the value inside the `Mutex<bool>` is `true`, we wait.
/// let _guard = cvar.wait_while(lock.lock(), |pending| { *pending });
/// ```
#[unstable(feature = "nonpoison_condvar", issue = "134645")]
pub fn wait_while<'a, T, F>(
&self,
mut guard: MutexGuard<'a, T>,
mut condition: F,
) -> MutexGuard<'a, T>
where
F: FnMut(&mut T) -> bool,
{
while condition(&mut *guard) {
guard = self.wait(guard);
}
guard
}
/// Waits on this condition variable for a notification, timing out after a
/// specified duration.
///
/// The semantics of this function are equivalent to [`wait`] except that
/// the thread will be blocked for roughly no longer than `dur`. This
/// method should not be used for precise timing due to anomalies such as
/// preemption or platform differences that might not cause the maximum
/// amount of time waited to be precisely `dur`.
///
/// Note that the best effort is made to ensure that the time waited is
/// measured with a monotonic clock, and not affected by the changes made to
/// the system time. This function is susceptible to spurious wakeups.
/// Condition variables normally have a boolean predicate associated with
/// them, and the predicate must always be checked each time this function
/// returns to protect against spurious wakeups. Additionally, it is
/// typically desirable for the timeout to not exceed some duration in
/// spite of spurious wakes, thus the sleep-duration is decremented by the
/// amount slept. Alternatively, use the `wait_timeout_while` method
/// to wait with a timeout while a predicate is true.
///
/// The returned [`WaitTimeoutResult`] value indicates if the timeout is
/// known to have elapsed.
///
/// Like [`wait`], the lock specified will be re-acquired when this function
/// returns, regardless of whether the timeout elapsed or not.
///
/// [`wait`]: Self::wait
/// [`wait_timeout_while`]: Self::wait_timeout_while
///
/// # Examples
///
/// ```
/// #![feature(nonpoison_mutex)]
/// #![feature(nonpoison_condvar)]
///
/// use std::sync::nonpoison::{Mutex, Condvar};
/// use std::sync::Arc;
/// use std::thread;
/// use std::time::Duration;
///
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
/// let pair2 = Arc::clone(&pair);
///
/// thread::spawn(move || {
/// let (lock, cvar) = &*pair2;
/// let mut started = lock.lock();
/// *started = true;
/// // We notify the condvar that the value has changed.
/// cvar.notify_one();
/// });
///
/// // wait for the thread to start up
/// let (lock, cvar) = &*pair;
/// let mut started = lock.lock();
/// // as long as the value inside the `Mutex<bool>` is `false`, we wait
/// loop {
/// let result = cvar.wait_timeout(started, Duration::from_millis(10));
/// // 10 milliseconds have passed, or maybe the value changed!
/// started = result.0;
/// if *started == true {
/// // We received the notification and the value has been updated, we can leave.
/// break
/// }
/// }
/// ```
#[unstable(feature = "nonpoison_condvar", issue = "134645")]
pub fn wait_timeout<'a, T>(
&self,
guard: MutexGuard<'a, T>,
dur: Duration,
) -> (MutexGuard<'a, T>, WaitTimeoutResult) {
let success = unsafe {
let lock = mutex::guard_lock(&guard);
self.inner.wait_timeout(lock, dur)
};
(guard, WaitTimeoutResult(!success))
}
/// Waits on this condition variable for a notification, timing out after a
/// specified duration.
///
/// The semantics of this function are equivalent to [`wait_while`] except
/// that the thread will be blocked for roughly no longer than `dur`. This
/// method should not be used for precise timing due to anomalies such as
/// preemption or platform differences that might not cause the maximum
/// amount of time waited to be precisely `dur`.
///
/// Note that the best effort is made to ensure that the time waited is
/// measured with a monotonic clock, and not affected by the changes made to
/// the system time.
///
/// The returned [`WaitTimeoutResult`] value indicates if the timeout is
/// known to have elapsed without the condition being met.
///
/// Like [`wait_while`], the lock specified will be re-acquired when this
/// function returns, regardless of whether the timeout elapsed or not.
///
/// [`wait_while`]: Self::wait_while
/// [`wait_timeout`]: Self::wait_timeout
///
/// # Examples
///
/// ```
/// #![feature(nonpoison_mutex)]
/// #![feature(nonpoison_condvar)]
///
/// use std::sync::nonpoison::{Mutex, Condvar};
/// use std::sync::Arc;
/// use std::thread;
/// use std::time::Duration;
///
/// let pair = Arc::new((Mutex::new(true), Condvar::new()));
/// let pair2 = Arc::clone(&pair);
///
/// thread::spawn(move || {
/// let (lock, cvar) = &*pair2;
/// let mut pending = lock.lock();
/// *pending = false;
/// // We notify the condvar that the value has changed.
/// cvar.notify_one();
/// });
///
/// // wait for the thread to start up
/// let (lock, cvar) = &*pair;
/// let result = cvar.wait_timeout_while(
/// lock.lock(),
/// Duration::from_millis(100),
/// |&mut pending| pending,
/// );
/// if result.1.timed_out() {
/// // timed-out without the condition ever evaluating to false.
/// }
/// // access the locked mutex via result.0
/// ```
#[unstable(feature = "nonpoison_condvar", issue = "134645")]
pub fn wait_timeout_while<'a, T, F>(
&self,
mut guard: MutexGuard<'a, T>,
dur: Duration,
mut condition: F,
) -> (MutexGuard<'a, T>, WaitTimeoutResult)
where
F: FnMut(&mut T) -> bool,
{
let start = Instant::now();
loop {
if !condition(&mut *guard) {
return (guard, WaitTimeoutResult(false));
}
let timeout = match dur.checked_sub(start.elapsed()) {
Some(timeout) => timeout,
None => return (guard, WaitTimeoutResult(true)),
};
guard = self.wait_timeout(guard, timeout).0;
}
}
/// Wakes up one blocked thread on this condvar.
///
/// If there is a blocked thread on this condition variable, then it will
/// be woken up from its call to [`wait`] or [`wait_timeout`]. Calls to
/// `notify_one` are not buffered in any way.
///
/// To wake up all threads, see [`notify_all`].
///
/// [`wait`]: Self::wait
/// [`wait_timeout`]: Self::wait_timeout
/// [`notify_all`]: Self::notify_all
///
/// # Examples
///
/// ```
/// #![feature(nonpoison_mutex)]
/// #![feature(nonpoison_condvar)]
///
/// use std::sync::nonpoison::{Mutex, Condvar};
/// use std::sync::Arc;
/// use std::thread;
///
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
/// let pair2 = Arc::clone(&pair);
///
/// thread::spawn(move || {
/// let (lock, cvar) = &*pair2;
/// let mut started = lock.lock();
/// *started = true;
/// // We notify the condvar that the value has changed.
/// cvar.notify_one();
/// });
///
/// // Wait for the thread to start up.
/// let (lock, cvar) = &*pair;
/// let mut started = lock.lock();
/// // As long as the value inside the `Mutex<bool>` is `false`, we wait.
/// while !*started {
/// started = cvar.wait(started);
/// }
/// ```
#[unstable(feature = "nonpoison_condvar", issue = "134645")]
pub fn notify_one(&self) {
self.inner.notify_one()
}
/// Wakes up all blocked threads on this condvar.
///
/// This method will ensure that any current waiters on the condition
/// variable are awoken. Calls to `notify_all()` are not buffered in any
/// way.
///
/// To wake up only one thread, see [`notify_one`].
///
/// [`notify_one`]: Self::notify_one
///
/// # Examples
///
/// ```
/// #![feature(nonpoison_mutex)]
/// #![feature(nonpoison_condvar)]
///
/// use std::sync::nonpoison::{Mutex, Condvar};
/// use std::sync::Arc;
/// use std::thread;
///
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
/// let pair2 = Arc::clone(&pair);
///
/// thread::spawn(move || {
/// let (lock, cvar) = &*pair2;
/// let mut started = lock.lock();
/// *started = true;
/// // We notify the condvar that the value has changed.
/// cvar.notify_all();
/// });
///
/// // Wait for the thread to start up.
/// let (lock, cvar) = &*pair;
/// let mut started = lock.lock();
/// // As long as the value inside the `Mutex<bool>` is `false`, we wait.
/// while !*started {
/// started = cvar.wait(started);
/// }
/// ```
#[unstable(feature = "nonpoison_condvar", issue = "134645")]
pub fn notify_all(&self) {
self.inner.notify_all()
}
}
#[unstable(feature = "nonpoison_condvar", issue = "134645")]
impl fmt::Debug for Condvar {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Condvar").finish_non_exhaustive()
}
}
#[unstable(feature = "nonpoison_condvar", issue = "134645")]
impl Default for Condvar {
/// Creates a `Condvar` which is ready to be waited on and notified.
fn default() -> Condvar {
Condvar::new()
}
}

View File

@@ -114,7 +114,6 @@ impl<T: ?Sized> !Send for MutexGuard<'_, T> {}
#[unstable(feature = "nonpoison_mutex", issue = "134645")] #[unstable(feature = "nonpoison_mutex", issue = "134645")]
unsafe impl<T: ?Sized + Sync> Sync for MutexGuard<'_, T> {} unsafe impl<T: ?Sized + Sync> Sync for MutexGuard<'_, T> {}
// FIXME(nonpoison_condvar): Use this link instead: [`Condvar`]: crate::sync::nonpoison::Condvar
/// An RAII mutex guard returned by `MutexGuard::map`, which can point to a /// An RAII mutex guard returned by `MutexGuard::map`, which can point to a
/// subfield of the protected data. When this structure is dropped (falls out /// subfield of the protected data. When this structure is dropped (falls out
/// of scope), the lock will be unlocked. /// of scope), the lock will be unlocked.
@@ -131,7 +130,7 @@ unsafe impl<T: ?Sized + Sync> Sync for MutexGuard<'_, T> {}
/// ///
/// [`map`]: MutexGuard::map /// [`map`]: MutexGuard::map
/// [`filter_map`]: MutexGuard::filter_map /// [`filter_map`]: MutexGuard::filter_map
/// [`Condvar`]: crate::sync::Condvar /// [`Condvar`]: crate::sync::nonpoison::Condvar
#[must_use = "if unused the Mutex will immediately unlock"] #[must_use = "if unused the Mutex will immediately unlock"]
#[must_not_suspend = "holding a MappedMutexGuard across suspend \ #[must_not_suspend = "holding a MappedMutexGuard across suspend \
points can cause deadlocks, delays, \ points can cause deadlocks, delays, \
@@ -458,6 +457,11 @@ impl<T: ?Sized + fmt::Display> fmt::Display for MutexGuard<'_, T> {
} }
} }
/// For use in [`nonpoison::condvar`](super::condvar).
pub(super) fn guard_lock<'a, T: ?Sized>(guard: &MutexGuard<'a, T>) -> &'a sys::Mutex {
&guard.lock.inner
}
impl<'a, T: ?Sized> MutexGuard<'a, T> { impl<'a, T: ?Sized> MutexGuard<'a, T> {
/// Makes a [`MappedMutexGuard`] for a component of the borrowed data, e.g. /// Makes a [`MappedMutexGuard`] for a component of the borrowed data, e.g.
/// an enum variant. /// an enum variant.

View File

@@ -61,7 +61,7 @@
//! then the lock will not be poisoned. //! then the lock will not be poisoned.
#[stable(feature = "rust1", since = "1.0.0")] #[stable(feature = "rust1", since = "1.0.0")]
pub use self::condvar::{Condvar, WaitTimeoutResult}; pub use self::condvar::Condvar;
#[unstable(feature = "mapped_lock_guards", issue = "117108")] #[unstable(feature = "mapped_lock_guards", issue = "117108")]
pub use self::mutex::MappedMutexGuard; pub use self::mutex::MappedMutexGuard;
#[stable(feature = "rust1", since = "1.0.0")] #[stable(feature = "rust1", since = "1.0.0")]

View File

@@ -1,73 +1,9 @@
use crate::fmt; use crate::fmt;
use crate::sync::WaitTimeoutResult;
use crate::sync::poison::{self, LockResult, MutexGuard, PoisonError, mutex}; use crate::sync::poison::{self, LockResult, MutexGuard, PoisonError, mutex};
use crate::sys::sync as sys; use crate::sys::sync as sys;
use crate::time::{Duration, Instant}; use crate::time::{Duration, Instant};
/// A type indicating whether a timed wait on a condition variable returned
/// due to a time out or not.
///
/// It is returned by the [`wait_timeout`] method.
///
/// [`wait_timeout`]: Condvar::wait_timeout
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
#[stable(feature = "wait_timeout", since = "1.5.0")]
pub struct WaitTimeoutResult(bool);
// FIXME(nonpoison_condvar): `WaitTimeoutResult` is actually poisoning-agnostic, it seems.
// Should we take advantage of this fact?
impl WaitTimeoutResult {
/// Returns `true` if the wait was known to have timed out.
///
/// # Examples
///
/// This example spawns a thread which will sleep 20 milliseconds before
/// updating a boolean value and then notifying the condvar.
///
/// The main thread will wait with a 10 millisecond timeout on the condvar
/// and will leave the loop upon timeout.
///
/// ```
/// use std::sync::{Arc, Condvar, Mutex};
/// use std::thread;
/// use std::time::Duration;
///
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
/// let pair2 = Arc::clone(&pair);
///
/// # let handle =
/// thread::spawn(move || {
/// let (lock, cvar) = &*pair2;
///
/// // Let's wait 20 milliseconds before notifying the condvar.
/// thread::sleep(Duration::from_millis(20));
///
/// let mut started = lock.lock().unwrap();
/// // We update the boolean value.
/// *started = true;
/// cvar.notify_one();
/// });
///
/// // Wait for the thread to start up.
/// let (lock, cvar) = &*pair;
/// loop {
/// // Let's put a timeout on the condvar's wait.
/// let result = cvar.wait_timeout(lock.lock().unwrap(), Duration::from_millis(10)).unwrap();
/// // 10 milliseconds have passed.
/// if result.1.timed_out() {
/// // timed out now and we can leave.
/// break
/// }
/// }
/// # // Prevent leaks for Miri.
/// # let _ = handle.join();
/// ```
#[must_use]
#[stable(feature = "wait_timeout", since = "1.5.0")]
pub fn timed_out(&self) -> bool {
self.0
}
}
/// A Condition Variable /// A Condition Variable
/// ///
/// Condition variables represent the ability to block a thread such that it /// Condition variables represent the ability to block a thread such that it

View File

@@ -757,11 +757,13 @@ impl<T: ?Sized + fmt::Display> fmt::Display for MutexGuard<'_, T> {
} }
} }
pub fn guard_lock<'a, T: ?Sized>(guard: &MutexGuard<'a, T>) -> &'a sys::Mutex { /// For use in [`nonpoison::condvar`](super::condvar).
pub(super) fn guard_lock<'a, T: ?Sized>(guard: &MutexGuard<'a, T>) -> &'a sys::Mutex {
&guard.lock.inner &guard.lock.inner
} }
pub fn guard_poison<'a, T: ?Sized>(guard: &MutexGuard<'a, T>) -> &'a poison::Flag { /// For use in [`nonpoison::condvar`](super::condvar).
pub(super) fn guard_poison<'a, T: ?Sized>(guard: &MutexGuard<'a, T>) -> &'a poison::Flag {
&guard.lock.poison &guard.lock.poison
} }

View File

@@ -1,36 +1,49 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::{Arc, Condvar, Mutex};
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
#[test] use super::nonpoison_and_poison_unwrap_test;
fn smoke() {
nonpoison_and_poison_unwrap_test!(
name: smoke,
test_body: {
use locks::Condvar;
let c = Condvar::new(); let c = Condvar::new();
c.notify_one(); c.notify_one();
c.notify_all(); c.notify_all();
} }
);
#[test]
#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads #[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads
fn notify_one() { nonpoison_and_poison_unwrap_test!(
name: notify_one,
test_body: {
use locks::{Condvar, Mutex};
let m = Arc::new(Mutex::new(())); let m = Arc::new(Mutex::new(()));
let m2 = m.clone(); let m2 = m.clone();
let c = Arc::new(Condvar::new()); let c = Arc::new(Condvar::new());
let c2 = c.clone(); let c2 = c.clone();
let g = m.lock().unwrap(); let g = maybe_unwrap(m.lock());
let _t = thread::spawn(move || { let _t = thread::spawn(move || {
let _g = m2.lock().unwrap(); let _g = maybe_unwrap(m2.lock());
c2.notify_one(); c2.notify_one();
}); });
let g = c.wait(g).unwrap(); let g = maybe_unwrap(c.wait(g));
drop(g); drop(g);
} }
);
#[test]
#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads #[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads
fn notify_all() { nonpoison_and_poison_unwrap_test!(
name: notify_all,
test_body: {
use locks::{Condvar, Mutex};
const N: usize = 10; const N: usize = 10;
let data = Arc::new((Mutex::new(0), Condvar::new())); let data = Arc::new((Mutex::new(0), Condvar::new()));
@@ -40,13 +53,13 @@ fn notify_all() {
let tx = tx.clone(); let tx = tx.clone();
thread::spawn(move || { thread::spawn(move || {
let &(ref lock, ref cond) = &*data; let &(ref lock, ref cond) = &*data;
let mut cnt = lock.lock().unwrap(); let mut cnt = maybe_unwrap(lock.lock());
*cnt += 1; *cnt += 1;
if *cnt == N { if *cnt == N {
tx.send(()).unwrap(); tx.send(()).unwrap();
} }
while *cnt != 0 { while *cnt != 0 {
cnt = cond.wait(cnt).unwrap(); cnt = maybe_unwrap(cond.wait(cnt));
} }
tx.send(()).unwrap(); tx.send(()).unwrap();
}); });
@@ -55,7 +68,7 @@ fn notify_all() {
let &(ref lock, ref cond) = &*data; let &(ref lock, ref cond) = &*data;
rx.recv().unwrap(); rx.recv().unwrap();
let mut cnt = lock.lock().unwrap(); let mut cnt = maybe_unwrap(lock.lock());
*cnt = 0; *cnt = 0;
cond.notify_all(); cond.notify_all();
drop(cnt); drop(cnt);
@@ -64,17 +77,58 @@ fn notify_all() {
rx.recv().unwrap(); rx.recv().unwrap();
} }
} }
);
#[test]
#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads #[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads
fn wait_while() { nonpoison_and_poison_unwrap_test!(
name: test_mutex_arc_condvar,
test_body: {
use locks::{Condvar, Mutex};
struct Packet<T>(Arc<(Mutex<T>, Condvar)>);
let packet = Packet(Arc::new((Mutex::new(false), Condvar::new())));
let packet2 = Packet(packet.0.clone());
let (tx, rx) = channel();
let _t = thread::spawn(move || {
// Wait until our parent has taken the lock.
rx.recv().unwrap();
let &(ref lock, ref cvar) = &*packet2.0;
// Set the data to `true` and wake up our parent.
let mut guard = maybe_unwrap(lock.lock());
*guard = true;
cvar.notify_one();
});
let &(ref lock, ref cvar) = &*packet.0;
let mut guard = maybe_unwrap(lock.lock());
// Wake up our child.
tx.send(()).unwrap();
// Wait until our child has set the data to `true`.
assert!(!*guard);
while !*guard {
guard = maybe_unwrap(cvar.wait(guard));
}
}
);
#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads
nonpoison_and_poison_unwrap_test!(
name: wait_while,
test_body: {
use locks::{Condvar, Mutex};
let pair = Arc::new((Mutex::new(false), Condvar::new())); let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = pair.clone(); let pair2 = pair.clone();
// Inside of our lock, spawn a new thread, and then wait for it to start. // Inside of our lock, spawn a new thread, and then wait for it to start.
thread::spawn(move || { thread::spawn(move || {
let &(ref lock, ref cvar) = &*pair2; let &(ref lock, ref cvar) = &*pair2;
let mut started = lock.lock().unwrap(); let mut started = maybe_unwrap(lock.lock());
*started = true; *started = true;
// We notify the condvar that the value has changed. // We notify the condvar that the value has changed.
cvar.notify_one(); cvar.notify_one();
@@ -82,19 +136,23 @@ fn wait_while() {
// Wait for the thread to start up. // Wait for the thread to start up.
let &(ref lock, ref cvar) = &*pair; let &(ref lock, ref cvar) = &*pair;
let guard = cvar.wait_while(lock.lock().unwrap(), |started| !*started); let guard = cvar.wait_while(maybe_unwrap(lock.lock()), |started| !*started);
assert!(*guard.unwrap()); assert!(*maybe_unwrap(guard));
} }
);
#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads
nonpoison_and_poison_unwrap_test!(
name: wait_timeout_wait,
test_body: {
use locks::{Condvar, Mutex};
#[test]
#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // condvar wait not supported
fn wait_timeout_wait() {
let m = Arc::new(Mutex::new(())); let m = Arc::new(Mutex::new(()));
let c = Arc::new(Condvar::new()); let c = Arc::new(Condvar::new());
loop { loop {
let g = m.lock().unwrap(); let g = maybe_unwrap(m.lock());
let (_g, no_timeout) = c.wait_timeout(g, Duration::from_millis(1)).unwrap(); let (_g, no_timeout) = maybe_unwrap(c.wait_timeout(g, Duration::from_millis(1)));
// spurious wakeups mean this isn't necessarily true // spurious wakeups mean this isn't necessarily true
// so execute test again, if not timeout // so execute test again, if not timeout
if !no_timeout.timed_out() { if !no_timeout.timed_out() {
@@ -104,62 +162,81 @@ fn wait_timeout_wait() {
break; break;
} }
} }
);
#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads
nonpoison_and_poison_unwrap_test!(
name: wait_timeout_while_wait,
test_body: {
use locks::{Condvar, Mutex};
#[test]
#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // condvar wait not supported
fn wait_timeout_while_wait() {
let m = Arc::new(Mutex::new(())); let m = Arc::new(Mutex::new(()));
let c = Arc::new(Condvar::new()); let c = Arc::new(Condvar::new());
let g = m.lock().unwrap(); let g = maybe_unwrap(m.lock());
let (_g, wait) = c.wait_timeout_while(g, Duration::from_millis(1), |_| true).unwrap(); let (_g, wait) = maybe_unwrap(c.wait_timeout_while(g, Duration::from_millis(1), |_| true));
// no spurious wakeups. ensure it timed-out // no spurious wakeups. ensure it timed-out
assert!(wait.timed_out()); assert!(wait.timed_out());
} }
);
#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads
nonpoison_and_poison_unwrap_test!(
name: wait_timeout_while_instant_satisfy,
test_body: {
use locks::{Condvar, Mutex};
#[test]
#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // condvar wait not supported
fn wait_timeout_while_instant_satisfy() {
let m = Arc::new(Mutex::new(())); let m = Arc::new(Mutex::new(()));
let c = Arc::new(Condvar::new()); let c = Arc::new(Condvar::new());
let g = m.lock().unwrap(); let g = maybe_unwrap(m.lock());
let (_g, wait) = c.wait_timeout_while(g, Duration::from_millis(0), |_| false).unwrap(); let (_g, wait) =
maybe_unwrap(c.wait_timeout_while(g, Duration::from_millis(0), |_| false));
// ensure it didn't time-out even if we were not given any time. // ensure it didn't time-out even if we were not given any time.
assert!(!wait.timed_out()); assert!(!wait.timed_out());
} }
);
#[test]
#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads #[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads
fn wait_timeout_while_wake() { nonpoison_and_poison_unwrap_test!(
name: wait_timeout_while_wake,
test_body: {
use locks::{Condvar, Mutex};
let pair = Arc::new((Mutex::new(false), Condvar::new())); let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair_copy = pair.clone(); let pair_copy = pair.clone();
let &(ref m, ref c) = &*pair; let &(ref m, ref c) = &*pair;
let g = m.lock().unwrap(); let g = maybe_unwrap(m.lock());
let _t = thread::spawn(move || { let _t = thread::spawn(move || {
let &(ref lock, ref cvar) = &*pair_copy; let &(ref lock, ref cvar) = &*pair_copy;
let mut started = lock.lock().unwrap(); let mut started = maybe_unwrap(lock.lock());
thread::sleep(Duration::from_millis(1)); thread::sleep(Duration::from_millis(1));
*started = true; *started = true;
cvar.notify_one(); cvar.notify_one();
}); });
let (g2, wait) = c let (g2, wait) = maybe_unwrap(c.wait_timeout_while(
.wait_timeout_while(g, Duration::from_millis(u64::MAX), |&mut notified| !notified) g,
.unwrap(); Duration::from_millis(u64::MAX),
|&mut notified| !notified
));
// ensure it didn't time-out even if we were not given any time. // ensure it didn't time-out even if we were not given any time.
assert!(!wait.timed_out()); assert!(!wait.timed_out());
assert!(*g2); assert!(*g2);
} }
);
#[test]
#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads #[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads
fn wait_timeout_wake() { nonpoison_and_poison_unwrap_test!(
name: wait_timeout_wake,
test_body: {
use locks::{Condvar, Mutex};
let m = Arc::new(Mutex::new(())); let m = Arc::new(Mutex::new(()));
let c = Arc::new(Condvar::new()); let c = Arc::new(Condvar::new());
loop { loop {
let g = m.lock().unwrap(); let g = maybe_unwrap(m.lock());
let c2 = c.clone(); let c2 = c.clone();
let m2 = m.clone(); let m2 = m.clone();
@@ -168,12 +245,13 @@ fn wait_timeout_wake() {
let notified_copy = notified.clone(); let notified_copy = notified.clone();
let t = thread::spawn(move || { let t = thread::spawn(move || {
let _g = m2.lock().unwrap(); let _g = maybe_unwrap(m2.lock());
thread::sleep(Duration::from_millis(1)); thread::sleep(Duration::from_millis(1));
notified_copy.store(true, Ordering::Relaxed); notified_copy.store(true, Ordering::Relaxed);
c2.notify_one(); c2.notify_one();
}); });
let (g, timeout_res) = c.wait_timeout(g, Duration::from_millis(u64::MAX)).unwrap(); let (g, timeout_res) =
maybe_unwrap(c.wait_timeout(g, Duration::from_millis(u64::MAX)));
assert!(!timeout_res.timed_out()); assert!(!timeout_res.timed_out());
// spurious wakeups mean this isn't necessarily true // spurious wakeups mean this isn't necessarily true
// so execute test again, if not notified // so execute test again, if not notified
@@ -188,3 +266,4 @@ fn wait_timeout_wake() {
break; break;
} }
} }
);

View File

@@ -7,6 +7,7 @@
#![feature(rwlock_downgrade)] #![feature(rwlock_downgrade)]
#![feature(std_internals)] #![feature(std_internals)]
#![feature(sync_nonpoison)] #![feature(sync_nonpoison)]
#![feature(nonpoison_condvar)]
#![feature(nonpoison_mutex)] #![feature(nonpoison_mutex)]
#![feature(nonpoison_rwlock)] #![feature(nonpoison_rwlock)]
#![allow(internal_features)] #![allow(internal_features)]

View File

@@ -213,40 +213,6 @@ nonpoison_and_poison_unwrap_test!(
} }
); );
// FIXME(nonpoison_condvar): Move this to the `condvar.rs` test file once `nonpoison::condvar` gets
// implemented.
#[test]
fn test_mutex_arc_condvar() {
struct Packet<T>(Arc<(Mutex<T>, Condvar)>);
let packet = Packet(Arc::new((Mutex::new(false), Condvar::new())));
let packet2 = Packet(packet.0.clone());
let (tx, rx) = channel();
let _t = thread::spawn(move || {
// Wait until our parent has taken the lock.
rx.recv().unwrap();
let &(ref lock, ref cvar) = &*packet2.0;
// Set the data to `true` and wake up our parent.
let mut guard = lock.lock().unwrap();
*guard = true;
cvar.notify_one();
});
let &(ref lock, ref cvar) = &*packet.0;
let mut guard = lock.lock().unwrap();
// Wake up our child.
tx.send(()).unwrap();
// Wait until our child has set the data to `true`.
assert!(!*guard);
while !*guard {
guard = cvar.wait(guard).unwrap();
}
}
nonpoison_and_poison_unwrap_test!( nonpoison_and_poison_unwrap_test!(
name: test_mutex_arc_nested, name: test_mutex_arc_nested,
test_body: { test_body: {