std: Optimize thread park/unpark implementation
This is an adaptation of alexcrichton/futures-rs#597 for the standard library. The goal here is to avoid locking a mutex on the "fast path" for thread park/unpark where you're waking up a thread that isn't sleeping or otherwise trying to park a thread that's already been notified. Mutex performance varies quite a bit across platforms so this should provide a nice consistent speed boost for the fast path of these functions.
This commit is contained in:
@@ -171,6 +171,8 @@ use panic;
|
||||
use panicking;
|
||||
use str;
|
||||
use sync::{Mutex, Condvar, Arc};
|
||||
use sync::atomic::AtomicUsize;
|
||||
use sync::atomic::Ordering::SeqCst;
|
||||
use sys::thread as imp;
|
||||
use sys_common::mutex;
|
||||
use sys_common::thread_info;
|
||||
@@ -694,6 +696,11 @@ pub fn sleep(dur: Duration) {
|
||||
imp::Thread::sleep(dur)
|
||||
}
|
||||
|
||||
// constants for park/unpark
|
||||
const EMPTY: usize = 0;
|
||||
const PARKED: usize = 1;
|
||||
const NOTIFIED: usize = 2;
|
||||
|
||||
/// Blocks unless or until the current thread's token is made available.
|
||||
///
|
||||
/// A call to `park` does not guarantee that the thread will remain parked
|
||||
@@ -771,11 +778,27 @@ pub fn sleep(dur: Duration) {
|
||||
#[stable(feature = "rust1", since = "1.0.0")]
|
||||
pub fn park() {
|
||||
let thread = current();
|
||||
let mut guard = thread.inner.lock.lock().unwrap();
|
||||
while !*guard {
|
||||
guard = thread.inner.cvar.wait(guard).unwrap();
|
||||
|
||||
// If we were previously notified then we consume this notification and
|
||||
// return quickly.
|
||||
if thread.inner.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
|
||||
return
|
||||
}
|
||||
|
||||
// Otherwise we need to coordinate going to sleep
|
||||
let mut m = thread.inner.lock.lock().unwrap();
|
||||
match thread.inner.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
|
||||
Ok(_) => {}
|
||||
Err(NOTIFIED) => return, // notified after we locked
|
||||
Err(_) => panic!("inconsistent park state"),
|
||||
}
|
||||
loop {
|
||||
m = thread.inner.cvar.wait(m).unwrap();
|
||||
match thread.inner.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) {
|
||||
Ok(_) => return, // got a notification
|
||||
Err(_) => {} // spurious wakeup, go back to sleep
|
||||
}
|
||||
}
|
||||
*guard = false;
|
||||
}
|
||||
|
||||
/// Use [`park_timeout`].
|
||||
@@ -842,12 +865,30 @@ pub fn park_timeout_ms(ms: u32) {
|
||||
#[stable(feature = "park_timeout", since = "1.4.0")]
|
||||
pub fn park_timeout(dur: Duration) {
|
||||
let thread = current();
|
||||
let mut guard = thread.inner.lock.lock().unwrap();
|
||||
if !*guard {
|
||||
let (g, _) = thread.inner.cvar.wait_timeout(guard, dur).unwrap();
|
||||
guard = g;
|
||||
|
||||
// Like `park` above we have a fast path for an already-notified thread, and
|
||||
// afterwards we start coordinating for a sleep.
|
||||
// return quickly.
|
||||
if thread.inner.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
|
||||
return
|
||||
}
|
||||
let m = thread.inner.lock.lock().unwrap();
|
||||
match thread.inner.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
|
||||
Ok(_) => {}
|
||||
Err(NOTIFIED) => return, // notified after we locked
|
||||
Err(_) => panic!("inconsistent park_timeout state"),
|
||||
}
|
||||
|
||||
// Wait with a timeout, and if we spuriously wake up or otherwise wake up
|
||||
// from a notification we just want to unconditionally set the state back to
|
||||
// empty, either consuming a notification or un-flagging ourselves as
|
||||
// parked.
|
||||
let (_m, _result) = thread.inner.cvar.wait_timeout(m, dur).unwrap();
|
||||
match thread.inner.state.swap(EMPTY, SeqCst) {
|
||||
NOTIFIED => {} // got a notification, hurray!
|
||||
PARKED => {} // no notification, alas
|
||||
n => panic!("inconsistent park_timeout state: {}", n),
|
||||
}
|
||||
*guard = false;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
@@ -914,7 +955,10 @@ impl ThreadId {
|
||||
struct Inner {
|
||||
name: Option<CString>, // Guaranteed to be UTF-8
|
||||
id: ThreadId,
|
||||
lock: Mutex<bool>, // true when there is a buffered unpark
|
||||
|
||||
// state for thread park/unpark
|
||||
state: AtomicUsize,
|
||||
lock: Mutex<()>,
|
||||
cvar: Condvar,
|
||||
}
|
||||
|
||||
@@ -958,7 +1002,8 @@ impl Thread {
|
||||
inner: Arc::new(Inner {
|
||||
name: cname,
|
||||
id: ThreadId::new(),
|
||||
lock: Mutex::new(false),
|
||||
state: AtomicUsize::new(EMPTY),
|
||||
lock: Mutex::new(()),
|
||||
cvar: Condvar::new(),
|
||||
})
|
||||
}
|
||||
@@ -998,10 +1043,22 @@ impl Thread {
|
||||
/// [park]: fn.park.html
|
||||
#[stable(feature = "rust1", since = "1.0.0")]
|
||||
pub fn unpark(&self) {
|
||||
let mut guard = self.inner.lock.lock().unwrap();
|
||||
if !*guard {
|
||||
*guard = true;
|
||||
self.inner.cvar.notify_one();
|
||||
loop {
|
||||
match self.inner.state.compare_exchange(EMPTY, NOTIFIED, SeqCst, SeqCst) {
|
||||
Ok(_) => return, // no one was waiting
|
||||
Err(NOTIFIED) => return, // already unparked
|
||||
Err(PARKED) => {} // gotta go wake someone up
|
||||
_ => panic!("inconsistent state in unpark"),
|
||||
}
|
||||
|
||||
// Coordinate wakeup through the mutex and a condvar notification
|
||||
let _lock = self.inner.lock.lock().unwrap();
|
||||
match self.inner.state.compare_exchange(PARKED, NOTIFIED, SeqCst, SeqCst) {
|
||||
Ok(_) => return self.inner.cvar.notify_one(),
|
||||
Err(NOTIFIED) => return, // a different thread unparked
|
||||
Err(EMPTY) => {} // parked thread went away, try again
|
||||
_ => panic!("inconsistent state in unpark"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user