Auto merge of #96393 - joboet:pthread_parker, r=thomcc
std: directly use pthread in UNIX parker implementation `Mutex` and `Condvar` are being replaced by more efficient implementations, which need thread parking themselves (see #93740). Therefore we should use the `pthread` synchronization primitives directly. Also, we can avoid allocating the mutex and condition variable because the `Parker` struct is being placed in an `Arc` anyways. This basically is just a copy of the current `Mutex` and `Condvar` code, which will however be removed (again, see #93740). An alternative implementation could be to use dedicated private `OsMutex` and `OsCondvar` types, but all the other platforms supported by std actually have their own thread parking primitives. I used `Pin` to guarantee a stable address for the `Parker` struct, while the current implementation does not, rather using extra unsafe declaration. Since the thread struct is shared anyways, I assumed this would not add too much clutter while being clearer.
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
use crate::pin::Pin;
|
||||
use crate::sync::atomic::AtomicU32;
|
||||
use crate::sync::atomic::Ordering::{Acquire, Release};
|
||||
use crate::sys::futex::{futex_wait, futex_wake};
|
||||
@@ -32,14 +33,15 @@ pub struct Parker {
|
||||
// Ordering::Release when writing NOTIFIED (the 'token') in unpark(), and using
|
||||
// Ordering::Acquire when checking for this state in park().
|
||||
impl Parker {
|
||||
#[inline]
|
||||
pub const fn new() -> Self {
|
||||
Parker { state: AtomicU32::new(EMPTY) }
|
||||
/// Construct the futex parker. The UNIX parker implementation
|
||||
/// requires this to happen in-place.
|
||||
pub unsafe fn new(parker: *mut Parker) {
|
||||
parker.write(Self { state: AtomicU32::new(EMPTY) });
|
||||
}
|
||||
|
||||
// Assumes this is only called by the thread that owns the Parker,
|
||||
// which means that `self.state != PARKED`.
|
||||
pub unsafe fn park(&self) {
|
||||
pub unsafe fn park(self: Pin<&Self>) {
|
||||
// Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the
|
||||
// first case.
|
||||
if self.state.fetch_sub(1, Acquire) == NOTIFIED {
|
||||
@@ -58,8 +60,9 @@ impl Parker {
|
||||
}
|
||||
|
||||
// Assumes this is only called by the thread that owns the Parker,
|
||||
// which means that `self.state != PARKED`.
|
||||
pub unsafe fn park_timeout(&self, timeout: Duration) {
|
||||
// which means that `self.state != PARKED`. This implementation doesn't
|
||||
// require `Pin`, but other implementations do.
|
||||
pub unsafe fn park_timeout(self: Pin<&Self>, timeout: Duration) {
|
||||
// Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the
|
||||
// first case.
|
||||
if self.state.fetch_sub(1, Acquire) == NOTIFIED {
|
||||
@@ -78,8 +81,9 @@ impl Parker {
|
||||
}
|
||||
}
|
||||
|
||||
// This implementation doesn't require `Pin`, but other implementations do.
|
||||
#[inline]
|
||||
pub fn unpark(&self) {
|
||||
pub fn unpark(self: Pin<&Self>) {
|
||||
// Change PARKED=>NOTIFIED, EMPTY=>NOTIFIED, or NOTIFIED=>NOTIFIED, and
|
||||
// wake the thread in the first case.
|
||||
//
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
//! Parker implementation based on a Mutex and Condvar.
|
||||
|
||||
use crate::pin::Pin;
|
||||
use crate::sync::atomic::AtomicUsize;
|
||||
use crate::sync::atomic::Ordering::SeqCst;
|
||||
use crate::sync::{Condvar, Mutex};
|
||||
@@ -16,13 +17,18 @@ pub struct Parker {
|
||||
}
|
||||
|
||||
impl Parker {
|
||||
pub fn new() -> Self {
|
||||
Parker { state: AtomicUsize::new(EMPTY), lock: Mutex::new(()), cvar: Condvar::new() }
|
||||
/// Construct the generic parker. The UNIX parker implementation
|
||||
/// requires this to happen in-place.
|
||||
pub unsafe fn new(parker: *mut Parker) {
|
||||
parker.write(Parker {
|
||||
state: AtomicUsize::new(EMPTY),
|
||||
lock: Mutex::new(()),
|
||||
cvar: Condvar::new(),
|
||||
});
|
||||
}
|
||||
|
||||
// This implementation doesn't require `unsafe`, but other implementations
|
||||
// may assume this is only called by the thread that owns the Parker.
|
||||
pub unsafe fn park(&self) {
|
||||
// This implementation doesn't require `unsafe` and `Pin`, but other implementations do.
|
||||
pub unsafe fn park(self: Pin<&Self>) {
|
||||
// If we were previously notified then we consume this notification and
|
||||
// return quickly.
|
||||
if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
|
||||
@@ -55,9 +61,8 @@ impl Parker {
|
||||
}
|
||||
}
|
||||
|
||||
// This implementation doesn't require `unsafe`, but other implementations
|
||||
// may assume this is only called by the thread that owns the Parker.
|
||||
pub unsafe fn park_timeout(&self, dur: Duration) {
|
||||
// This implementation doesn't require `unsafe` and `Pin`, but other implementations do.
|
||||
pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) {
|
||||
// Like `park` above we have a fast path for an already-notified thread, and
|
||||
// afterwards we start coordinating for a sleep.
|
||||
// return quickly.
|
||||
@@ -88,7 +93,8 @@ impl Parker {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn unpark(&self) {
|
||||
// This implementation doesn't require `Pin`, but other implementations do.
|
||||
pub fn unpark(self: Pin<&Self>) {
|
||||
// To ensure the unparked thread will observe any writes we made
|
||||
// before this call, we must perform a release operation that `park`
|
||||
// can synchronize with. To do that we must write `NOTIFIED` even if
|
||||
|
||||
@@ -8,6 +8,8 @@ cfg_if::cfg_if! {
|
||||
pub use futex::Parker;
|
||||
} else if #[cfg(windows)] {
|
||||
pub use crate::sys::thread_parker::Parker;
|
||||
} else if #[cfg(target_family = "unix")] {
|
||||
pub use crate::sys::thread_parker::Parker;
|
||||
} else {
|
||||
mod generic;
|
||||
pub use generic::Parker;
|
||||
|
||||
Reference in New Issue
Block a user