std: move "mod tests/benches" to separate files
Also doing fmt inplace as requested.
This commit is contained in:
@@ -1,3 +1,6 @@
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
use crate::fmt;
|
||||
use crate::sync::{Condvar, Mutex};
|
||||
|
||||
@@ -174,42 +177,3 @@ impl BarrierWaitResult {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::sync::mpsc::{channel, TryRecvError};
|
||||
use crate::sync::{Arc, Barrier};
|
||||
use crate::thread;
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(target_os = "emscripten", ignore)]
|
||||
fn test_barrier() {
|
||||
const N: usize = 10;
|
||||
|
||||
let barrier = Arc::new(Barrier::new(N));
|
||||
let (tx, rx) = channel();
|
||||
|
||||
for _ in 0..N - 1 {
|
||||
let c = barrier.clone();
|
||||
let tx = tx.clone();
|
||||
thread::spawn(move || {
|
||||
tx.send(c.wait().is_leader()).unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
// At this point, all spawned threads should be blocked,
|
||||
// so we shouldn't get anything from the port
|
||||
assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
|
||||
|
||||
let mut leader_found = barrier.wait().is_leader();
|
||||
|
||||
// Now, the barrier is cleared and we should get data.
|
||||
for _ in 0..N - 1 {
|
||||
if rx.recv().unwrap() {
|
||||
assert!(!leader_found);
|
||||
leader_found = true;
|
||||
}
|
||||
}
|
||||
assert!(leader_found);
|
||||
}
|
||||
}
|
||||
|
||||
35
library/std/src/sync/barrier/tests.rs
Normal file
35
library/std/src/sync/barrier/tests.rs
Normal file
@@ -0,0 +1,35 @@
|
||||
use crate::sync::mpsc::{channel, TryRecvError};
|
||||
use crate::sync::{Arc, Barrier};
|
||||
use crate::thread;
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(target_os = "emscripten", ignore)]
|
||||
fn test_barrier() {
|
||||
const N: usize = 10;
|
||||
|
||||
let barrier = Arc::new(Barrier::new(N));
|
||||
let (tx, rx) = channel();
|
||||
|
||||
for _ in 0..N - 1 {
|
||||
let c = barrier.clone();
|
||||
let tx = tx.clone();
|
||||
thread::spawn(move || {
|
||||
tx.send(c.wait().is_leader()).unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
// At this point, all spawned threads should be blocked,
|
||||
// so we shouldn't get anything from the port
|
||||
assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
|
||||
|
||||
let mut leader_found = barrier.wait().is_leader();
|
||||
|
||||
// Now, the barrier is cleared and we should get data.
|
||||
for _ in 0..N - 1 {
|
||||
if rx.recv().unwrap() {
|
||||
assert!(!leader_found);
|
||||
leader_found = true;
|
||||
}
|
||||
}
|
||||
assert!(leader_found);
|
||||
}
|
||||
@@ -1,3 +1,6 @@
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
use crate::fmt;
|
||||
use crate::sync::atomic::{AtomicUsize, Ordering};
|
||||
use crate::sync::{mutex, MutexGuard, PoisonError};
|
||||
@@ -598,218 +601,3 @@ impl Drop for Condvar {
|
||||
unsafe { self.inner.destroy() }
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::sync::atomic::{AtomicBool, Ordering};
|
||||
use crate::sync::mpsc::channel;
|
||||
use crate::sync::{Arc, Condvar, Mutex};
|
||||
use crate::thread;
|
||||
use crate::time::Duration;
|
||||
|
||||
#[test]
|
||||
fn smoke() {
|
||||
let c = Condvar::new();
|
||||
c.notify_one();
|
||||
c.notify_all();
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(target_os = "emscripten", ignore)]
|
||||
fn notify_one() {
|
||||
let m = Arc::new(Mutex::new(()));
|
||||
let m2 = m.clone();
|
||||
let c = Arc::new(Condvar::new());
|
||||
let c2 = c.clone();
|
||||
|
||||
let g = m.lock().unwrap();
|
||||
let _t = thread::spawn(move || {
|
||||
let _g = m2.lock().unwrap();
|
||||
c2.notify_one();
|
||||
});
|
||||
let g = c.wait(g).unwrap();
|
||||
drop(g);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(target_os = "emscripten", ignore)]
|
||||
fn notify_all() {
|
||||
const N: usize = 10;
|
||||
|
||||
let data = Arc::new((Mutex::new(0), Condvar::new()));
|
||||
let (tx, rx) = channel();
|
||||
for _ in 0..N {
|
||||
let data = data.clone();
|
||||
let tx = tx.clone();
|
||||
thread::spawn(move || {
|
||||
let &(ref lock, ref cond) = &*data;
|
||||
let mut cnt = lock.lock().unwrap();
|
||||
*cnt += 1;
|
||||
if *cnt == N {
|
||||
tx.send(()).unwrap();
|
||||
}
|
||||
while *cnt != 0 {
|
||||
cnt = cond.wait(cnt).unwrap();
|
||||
}
|
||||
tx.send(()).unwrap();
|
||||
});
|
||||
}
|
||||
drop(tx);
|
||||
|
||||
let &(ref lock, ref cond) = &*data;
|
||||
rx.recv().unwrap();
|
||||
let mut cnt = lock.lock().unwrap();
|
||||
*cnt = 0;
|
||||
cond.notify_all();
|
||||
drop(cnt);
|
||||
|
||||
for _ in 0..N {
|
||||
rx.recv().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(target_os = "emscripten", ignore)]
|
||||
fn wait_while() {
|
||||
let pair = Arc::new((Mutex::new(false), Condvar::new()));
|
||||
let pair2 = pair.clone();
|
||||
|
||||
// Inside of our lock, spawn a new thread, and then wait for it to start.
|
||||
thread::spawn(move || {
|
||||
let &(ref lock, ref cvar) = &*pair2;
|
||||
let mut started = lock.lock().unwrap();
|
||||
*started = true;
|
||||
// We notify the condvar that the value has changed.
|
||||
cvar.notify_one();
|
||||
});
|
||||
|
||||
// Wait for the thread to start up.
|
||||
let &(ref lock, ref cvar) = &*pair;
|
||||
let guard = cvar.wait_while(lock.lock().unwrap(), |started| !*started);
|
||||
assert!(*guard.unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(target_os = "emscripten", ignore)]
|
||||
fn wait_timeout_wait() {
|
||||
let m = Arc::new(Mutex::new(()));
|
||||
let c = Arc::new(Condvar::new());
|
||||
|
||||
loop {
|
||||
let g = m.lock().unwrap();
|
||||
let (_g, no_timeout) = c.wait_timeout(g, Duration::from_millis(1)).unwrap();
|
||||
// spurious wakeups mean this isn't necessarily true
|
||||
// so execute test again, if not timeout
|
||||
if !no_timeout.timed_out() {
|
||||
continue;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(target_os = "emscripten", ignore)]
|
||||
fn wait_timeout_while_wait() {
|
||||
let m = Arc::new(Mutex::new(()));
|
||||
let c = Arc::new(Condvar::new());
|
||||
|
||||
let g = m.lock().unwrap();
|
||||
let (_g, wait) = c.wait_timeout_while(g, Duration::from_millis(1), |_| true).unwrap();
|
||||
// no spurious wakeups. ensure it timed-out
|
||||
assert!(wait.timed_out());
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(target_os = "emscripten", ignore)]
|
||||
fn wait_timeout_while_instant_satisfy() {
|
||||
let m = Arc::new(Mutex::new(()));
|
||||
let c = Arc::new(Condvar::new());
|
||||
|
||||
let g = m.lock().unwrap();
|
||||
let (_g, wait) = c.wait_timeout_while(g, Duration::from_millis(0), |_| false).unwrap();
|
||||
// ensure it didn't time-out even if we were not given any time.
|
||||
assert!(!wait.timed_out());
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(target_os = "emscripten", ignore)]
|
||||
fn wait_timeout_while_wake() {
|
||||
let pair = Arc::new((Mutex::new(false), Condvar::new()));
|
||||
let pair_copy = pair.clone();
|
||||
|
||||
let &(ref m, ref c) = &*pair;
|
||||
let g = m.lock().unwrap();
|
||||
let _t = thread::spawn(move || {
|
||||
let &(ref lock, ref cvar) = &*pair_copy;
|
||||
let mut started = lock.lock().unwrap();
|
||||
thread::sleep(Duration::from_millis(1));
|
||||
*started = true;
|
||||
cvar.notify_one();
|
||||
});
|
||||
let (g2, wait) = c
|
||||
.wait_timeout_while(g, Duration::from_millis(u64::MAX), |&mut notified| !notified)
|
||||
.unwrap();
|
||||
// ensure it didn't time-out even if we were not given any time.
|
||||
assert!(!wait.timed_out());
|
||||
assert!(*g2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(target_os = "emscripten", ignore)]
|
||||
fn wait_timeout_wake() {
|
||||
let m = Arc::new(Mutex::new(()));
|
||||
let c = Arc::new(Condvar::new());
|
||||
|
||||
loop {
|
||||
let g = m.lock().unwrap();
|
||||
|
||||
let c2 = c.clone();
|
||||
let m2 = m.clone();
|
||||
|
||||
let notified = Arc::new(AtomicBool::new(false));
|
||||
let notified_copy = notified.clone();
|
||||
|
||||
let t = thread::spawn(move || {
|
||||
let _g = m2.lock().unwrap();
|
||||
thread::sleep(Duration::from_millis(1));
|
||||
notified_copy.store(true, Ordering::SeqCst);
|
||||
c2.notify_one();
|
||||
});
|
||||
let (g, timeout_res) = c.wait_timeout(g, Duration::from_millis(u64::MAX)).unwrap();
|
||||
assert!(!timeout_res.timed_out());
|
||||
// spurious wakeups mean this isn't necessarily true
|
||||
// so execute test again, if not notified
|
||||
if !notified.load(Ordering::SeqCst) {
|
||||
t.join().unwrap();
|
||||
continue;
|
||||
}
|
||||
drop(g);
|
||||
|
||||
t.join().unwrap();
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
#[cfg_attr(target_os = "emscripten", ignore)]
|
||||
fn two_mutexes() {
|
||||
let m = Arc::new(Mutex::new(()));
|
||||
let m2 = m.clone();
|
||||
let c = Arc::new(Condvar::new());
|
||||
let c2 = c.clone();
|
||||
|
||||
let mut g = m.lock().unwrap();
|
||||
let _t = thread::spawn(move || {
|
||||
let _g = m2.lock().unwrap();
|
||||
c2.notify_one();
|
||||
});
|
||||
g = c.wait(g).unwrap();
|
||||
drop(g);
|
||||
|
||||
let m = Mutex::new(());
|
||||
let _ = c.wait(m.lock().unwrap()).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
211
library/std/src/sync/condvar/tests.rs
Normal file
211
library/std/src/sync/condvar/tests.rs
Normal file
@@ -0,0 +1,211 @@
|
||||
use crate::sync::atomic::{AtomicBool, Ordering};
|
||||
use crate::sync::mpsc::channel;
|
||||
use crate::sync::{Arc, Condvar, Mutex};
|
||||
use crate::thread;
|
||||
use crate::time::Duration;
|
||||
|
||||
#[test]
|
||||
fn smoke() {
|
||||
let c = Condvar::new();
|
||||
c.notify_one();
|
||||
c.notify_all();
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(target_os = "emscripten", ignore)]
|
||||
fn notify_one() {
|
||||
let m = Arc::new(Mutex::new(()));
|
||||
let m2 = m.clone();
|
||||
let c = Arc::new(Condvar::new());
|
||||
let c2 = c.clone();
|
||||
|
||||
let g = m.lock().unwrap();
|
||||
let _t = thread::spawn(move || {
|
||||
let _g = m2.lock().unwrap();
|
||||
c2.notify_one();
|
||||
});
|
||||
let g = c.wait(g).unwrap();
|
||||
drop(g);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(target_os = "emscripten", ignore)]
|
||||
fn notify_all() {
|
||||
const N: usize = 10;
|
||||
|
||||
let data = Arc::new((Mutex::new(0), Condvar::new()));
|
||||
let (tx, rx) = channel();
|
||||
for _ in 0..N {
|
||||
let data = data.clone();
|
||||
let tx = tx.clone();
|
||||
thread::spawn(move || {
|
||||
let &(ref lock, ref cond) = &*data;
|
||||
let mut cnt = lock.lock().unwrap();
|
||||
*cnt += 1;
|
||||
if *cnt == N {
|
||||
tx.send(()).unwrap();
|
||||
}
|
||||
while *cnt != 0 {
|
||||
cnt = cond.wait(cnt).unwrap();
|
||||
}
|
||||
tx.send(()).unwrap();
|
||||
});
|
||||
}
|
||||
drop(tx);
|
||||
|
||||
let &(ref lock, ref cond) = &*data;
|
||||
rx.recv().unwrap();
|
||||
let mut cnt = lock.lock().unwrap();
|
||||
*cnt = 0;
|
||||
cond.notify_all();
|
||||
drop(cnt);
|
||||
|
||||
for _ in 0..N {
|
||||
rx.recv().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(target_os = "emscripten", ignore)]
|
||||
fn wait_while() {
|
||||
let pair = Arc::new((Mutex::new(false), Condvar::new()));
|
||||
let pair2 = pair.clone();
|
||||
|
||||
// Inside of our lock, spawn a new thread, and then wait for it to start.
|
||||
thread::spawn(move || {
|
||||
let &(ref lock, ref cvar) = &*pair2;
|
||||
let mut started = lock.lock().unwrap();
|
||||
*started = true;
|
||||
// We notify the condvar that the value has changed.
|
||||
cvar.notify_one();
|
||||
});
|
||||
|
||||
// Wait for the thread to start up.
|
||||
let &(ref lock, ref cvar) = &*pair;
|
||||
let guard = cvar.wait_while(lock.lock().unwrap(), |started| !*started);
|
||||
assert!(*guard.unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(target_os = "emscripten", ignore)]
|
||||
fn wait_timeout_wait() {
|
||||
let m = Arc::new(Mutex::new(()));
|
||||
let c = Arc::new(Condvar::new());
|
||||
|
||||
loop {
|
||||
let g = m.lock().unwrap();
|
||||
let (_g, no_timeout) = c.wait_timeout(g, Duration::from_millis(1)).unwrap();
|
||||
// spurious wakeups mean this isn't necessarily true
|
||||
// so execute test again, if not timeout
|
||||
if !no_timeout.timed_out() {
|
||||
continue;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(target_os = "emscripten", ignore)]
|
||||
fn wait_timeout_while_wait() {
|
||||
let m = Arc::new(Mutex::new(()));
|
||||
let c = Arc::new(Condvar::new());
|
||||
|
||||
let g = m.lock().unwrap();
|
||||
let (_g, wait) = c.wait_timeout_while(g, Duration::from_millis(1), |_| true).unwrap();
|
||||
// no spurious wakeups. ensure it timed-out
|
||||
assert!(wait.timed_out());
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(target_os = "emscripten", ignore)]
|
||||
fn wait_timeout_while_instant_satisfy() {
|
||||
let m = Arc::new(Mutex::new(()));
|
||||
let c = Arc::new(Condvar::new());
|
||||
|
||||
let g = m.lock().unwrap();
|
||||
let (_g, wait) = c.wait_timeout_while(g, Duration::from_millis(0), |_| false).unwrap();
|
||||
// ensure it didn't time-out even if we were not given any time.
|
||||
assert!(!wait.timed_out());
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(target_os = "emscripten", ignore)]
|
||||
fn wait_timeout_while_wake() {
|
||||
let pair = Arc::new((Mutex::new(false), Condvar::new()));
|
||||
let pair_copy = pair.clone();
|
||||
|
||||
let &(ref m, ref c) = &*pair;
|
||||
let g = m.lock().unwrap();
|
||||
let _t = thread::spawn(move || {
|
||||
let &(ref lock, ref cvar) = &*pair_copy;
|
||||
let mut started = lock.lock().unwrap();
|
||||
thread::sleep(Duration::from_millis(1));
|
||||
*started = true;
|
||||
cvar.notify_one();
|
||||
});
|
||||
let (g2, wait) = c
|
||||
.wait_timeout_while(g, Duration::from_millis(u64::MAX), |&mut notified| !notified)
|
||||
.unwrap();
|
||||
// ensure it didn't time-out even if we were not given any time.
|
||||
assert!(!wait.timed_out());
|
||||
assert!(*g2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(target_os = "emscripten", ignore)]
|
||||
fn wait_timeout_wake() {
|
||||
let m = Arc::new(Mutex::new(()));
|
||||
let c = Arc::new(Condvar::new());
|
||||
|
||||
loop {
|
||||
let g = m.lock().unwrap();
|
||||
|
||||
let c2 = c.clone();
|
||||
let m2 = m.clone();
|
||||
|
||||
let notified = Arc::new(AtomicBool::new(false));
|
||||
let notified_copy = notified.clone();
|
||||
|
||||
let t = thread::spawn(move || {
|
||||
let _g = m2.lock().unwrap();
|
||||
thread::sleep(Duration::from_millis(1));
|
||||
notified_copy.store(true, Ordering::SeqCst);
|
||||
c2.notify_one();
|
||||
});
|
||||
let (g, timeout_res) = c.wait_timeout(g, Duration::from_millis(u64::MAX)).unwrap();
|
||||
assert!(!timeout_res.timed_out());
|
||||
// spurious wakeups mean this isn't necessarily true
|
||||
// so execute test again, if not notified
|
||||
if !notified.load(Ordering::SeqCst) {
|
||||
t.join().unwrap();
|
||||
continue;
|
||||
}
|
||||
drop(g);
|
||||
|
||||
t.join().unwrap();
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
#[cfg_attr(target_os = "emscripten", ignore)]
|
||||
fn two_mutexes() {
|
||||
let m = Arc::new(Mutex::new(()));
|
||||
let m2 = m.clone();
|
||||
let c = Arc::new(Condvar::new());
|
||||
let c2 = c.clone();
|
||||
|
||||
let mut g = m.lock().unwrap();
|
||||
let _t = thread::spawn(move || {
|
||||
let _g = m2.lock().unwrap();
|
||||
c2.notify_one();
|
||||
});
|
||||
g = c.wait(g).unwrap();
|
||||
drop(g);
|
||||
|
||||
let m = Mutex::new(());
|
||||
let _ = c.wait(m.lock().unwrap()).unwrap();
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@@ -11,6 +11,9 @@
|
||||
// http://www.1024cores.net/home/lock-free-algorithms
|
||||
// /queues/non-intrusive-mpsc-node-based-queue
|
||||
|
||||
#[cfg(all(test, not(target_os = "emscripten")))]
|
||||
mod tests;
|
||||
|
||||
pub use self::PopResult::*;
|
||||
|
||||
use core::cell::UnsafeCell;
|
||||
@@ -112,54 +115,3 @@ impl<T> Drop for Queue<T> {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(test, not(target_os = "emscripten")))]
|
||||
mod tests {
|
||||
use super::{Data, Empty, Inconsistent, Queue};
|
||||
use crate::sync::mpsc::channel;
|
||||
use crate::sync::Arc;
|
||||
use crate::thread;
|
||||
|
||||
#[test]
|
||||
fn test_full() {
|
||||
let q: Queue<Box<_>> = Queue::new();
|
||||
q.push(box 1);
|
||||
q.push(box 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test() {
|
||||
let nthreads = 8;
|
||||
let nmsgs = 1000;
|
||||
let q = Queue::new();
|
||||
match q.pop() {
|
||||
Empty => {}
|
||||
Inconsistent | Data(..) => panic!(),
|
||||
}
|
||||
let (tx, rx) = channel();
|
||||
let q = Arc::new(q);
|
||||
|
||||
for _ in 0..nthreads {
|
||||
let tx = tx.clone();
|
||||
let q = q.clone();
|
||||
thread::spawn(move || {
|
||||
for i in 0..nmsgs {
|
||||
q.push(i);
|
||||
}
|
||||
tx.send(()).unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
let mut i = 0;
|
||||
while i < nthreads * nmsgs {
|
||||
match q.pop() {
|
||||
Empty | Inconsistent => {}
|
||||
Data(_) => i += 1,
|
||||
}
|
||||
}
|
||||
drop(tx);
|
||||
for _ in 0..nthreads {
|
||||
rx.recv().unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
47
library/std/src/sync/mpsc/mpsc_queue/tests.rs
Normal file
47
library/std/src/sync/mpsc/mpsc_queue/tests.rs
Normal file
@@ -0,0 +1,47 @@
|
||||
use super::{Data, Empty, Inconsistent, Queue};
|
||||
use crate::sync::mpsc::channel;
|
||||
use crate::sync::Arc;
|
||||
use crate::thread;
|
||||
|
||||
#[test]
|
||||
fn test_full() {
|
||||
let q: Queue<Box<_>> = Queue::new();
|
||||
q.push(box 1);
|
||||
q.push(box 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test() {
|
||||
let nthreads = 8;
|
||||
let nmsgs = 1000;
|
||||
let q = Queue::new();
|
||||
match q.pop() {
|
||||
Empty => {}
|
||||
Inconsistent | Data(..) => panic!(),
|
||||
}
|
||||
let (tx, rx) = channel();
|
||||
let q = Arc::new(q);
|
||||
|
||||
for _ in 0..nthreads {
|
||||
let tx = tx.clone();
|
||||
let q = q.clone();
|
||||
thread::spawn(move || {
|
||||
for i in 0..nmsgs {
|
||||
q.push(i);
|
||||
}
|
||||
tx.send(()).unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
let mut i = 0;
|
||||
while i < nthreads * nmsgs {
|
||||
match q.pop() {
|
||||
Empty | Inconsistent => {}
|
||||
Data(_) => i += 1,
|
||||
}
|
||||
}
|
||||
drop(tx);
|
||||
for _ in 0..nthreads {
|
||||
rx.recv().unwrap();
|
||||
}
|
||||
}
|
||||
@@ -6,6 +6,9 @@
|
||||
|
||||
// http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
|
||||
|
||||
#[cfg(all(test, not(target_os = "emscripten")))]
|
||||
mod tests;
|
||||
|
||||
use core::cell::UnsafeCell;
|
||||
use core::ptr;
|
||||
|
||||
@@ -231,108 +234,3 @@ impl<T, ProducerAddition, ConsumerAddition> Drop for Queue<T, ProducerAddition,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(test, not(target_os = "emscripten")))]
|
||||
mod tests {
|
||||
use super::Queue;
|
||||
use crate::sync::mpsc::channel;
|
||||
use crate::sync::Arc;
|
||||
use crate::thread;
|
||||
|
||||
#[test]
|
||||
fn smoke() {
|
||||
unsafe {
|
||||
let queue = Queue::with_additions(0, (), ());
|
||||
queue.push(1);
|
||||
queue.push(2);
|
||||
assert_eq!(queue.pop(), Some(1));
|
||||
assert_eq!(queue.pop(), Some(2));
|
||||
assert_eq!(queue.pop(), None);
|
||||
queue.push(3);
|
||||
queue.push(4);
|
||||
assert_eq!(queue.pop(), Some(3));
|
||||
assert_eq!(queue.pop(), Some(4));
|
||||
assert_eq!(queue.pop(), None);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn peek() {
|
||||
unsafe {
|
||||
let queue = Queue::with_additions(0, (), ());
|
||||
queue.push(vec![1]);
|
||||
|
||||
// Ensure the borrowchecker works
|
||||
match queue.peek() {
|
||||
Some(vec) => {
|
||||
assert_eq!(&*vec, &[1]);
|
||||
}
|
||||
None => unreachable!(),
|
||||
}
|
||||
|
||||
match queue.pop() {
|
||||
Some(vec) => {
|
||||
assert_eq!(&*vec, &[1]);
|
||||
}
|
||||
None => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn drop_full() {
|
||||
unsafe {
|
||||
let q: Queue<Box<_>> = Queue::with_additions(0, (), ());
|
||||
q.push(box 1);
|
||||
q.push(box 2);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn smoke_bound() {
|
||||
unsafe {
|
||||
let q = Queue::with_additions(0, (), ());
|
||||
q.push(1);
|
||||
q.push(2);
|
||||
assert_eq!(q.pop(), Some(1));
|
||||
assert_eq!(q.pop(), Some(2));
|
||||
assert_eq!(q.pop(), None);
|
||||
q.push(3);
|
||||
q.push(4);
|
||||
assert_eq!(q.pop(), Some(3));
|
||||
assert_eq!(q.pop(), Some(4));
|
||||
assert_eq!(q.pop(), None);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stress() {
|
||||
unsafe {
|
||||
stress_bound(0);
|
||||
stress_bound(1);
|
||||
}
|
||||
|
||||
unsafe fn stress_bound(bound: usize) {
|
||||
let q = Arc::new(Queue::with_additions(bound, (), ()));
|
||||
|
||||
let (tx, rx) = channel();
|
||||
let q2 = q.clone();
|
||||
let _t = thread::spawn(move || {
|
||||
for _ in 0..100000 {
|
||||
loop {
|
||||
match q2.pop() {
|
||||
Some(1) => break,
|
||||
Some(_) => panic!(),
|
||||
None => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
tx.send(()).unwrap();
|
||||
});
|
||||
for _ in 0..100000 {
|
||||
q.push(1);
|
||||
}
|
||||
rx.recv().unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
101
library/std/src/sync/mpsc/spsc_queue/tests.rs
Normal file
101
library/std/src/sync/mpsc/spsc_queue/tests.rs
Normal file
@@ -0,0 +1,101 @@
|
||||
use super::Queue;
|
||||
use crate::sync::mpsc::channel;
|
||||
use crate::sync::Arc;
|
||||
use crate::thread;
|
||||
|
||||
#[test]
|
||||
fn smoke() {
|
||||
unsafe {
|
||||
let queue = Queue::with_additions(0, (), ());
|
||||
queue.push(1);
|
||||
queue.push(2);
|
||||
assert_eq!(queue.pop(), Some(1));
|
||||
assert_eq!(queue.pop(), Some(2));
|
||||
assert_eq!(queue.pop(), None);
|
||||
queue.push(3);
|
||||
queue.push(4);
|
||||
assert_eq!(queue.pop(), Some(3));
|
||||
assert_eq!(queue.pop(), Some(4));
|
||||
assert_eq!(queue.pop(), None);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn peek() {
|
||||
unsafe {
|
||||
let queue = Queue::with_additions(0, (), ());
|
||||
queue.push(vec![1]);
|
||||
|
||||
// Ensure the borrowchecker works
|
||||
match queue.peek() {
|
||||
Some(vec) => {
|
||||
assert_eq!(&*vec, &[1]);
|
||||
}
|
||||
None => unreachable!(),
|
||||
}
|
||||
|
||||
match queue.pop() {
|
||||
Some(vec) => {
|
||||
assert_eq!(&*vec, &[1]);
|
||||
}
|
||||
None => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn drop_full() {
|
||||
unsafe {
|
||||
let q: Queue<Box<_>> = Queue::with_additions(0, (), ());
|
||||
q.push(box 1);
|
||||
q.push(box 2);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn smoke_bound() {
|
||||
unsafe {
|
||||
let q = Queue::with_additions(0, (), ());
|
||||
q.push(1);
|
||||
q.push(2);
|
||||
assert_eq!(q.pop(), Some(1));
|
||||
assert_eq!(q.pop(), Some(2));
|
||||
assert_eq!(q.pop(), None);
|
||||
q.push(3);
|
||||
q.push(4);
|
||||
assert_eq!(q.pop(), Some(3));
|
||||
assert_eq!(q.pop(), Some(4));
|
||||
assert_eq!(q.pop(), None);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stress() {
|
||||
unsafe {
|
||||
stress_bound(0);
|
||||
stress_bound(1);
|
||||
}
|
||||
|
||||
unsafe fn stress_bound(bound: usize) {
|
||||
let q = Arc::new(Queue::with_additions(bound, (), ()));
|
||||
|
||||
let (tx, rx) = channel();
|
||||
let q2 = q.clone();
|
||||
let _t = thread::spawn(move || {
|
||||
for _ in 0..100000 {
|
||||
loop {
|
||||
match q2.pop() {
|
||||
Some(1) => break,
|
||||
Some(_) => panic!(),
|
||||
None => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
tx.send(()).unwrap();
|
||||
});
|
||||
for _ in 0..100000 {
|
||||
q.push(1);
|
||||
}
|
||||
rx.recv().unwrap();
|
||||
}
|
||||
}
|
||||
647
library/std/src/sync/mpsc/sync_tests.rs
Normal file
647
library/std/src/sync/mpsc/sync_tests.rs
Normal file
@@ -0,0 +1,647 @@
|
||||
use super::*;
|
||||
use crate::env;
|
||||
use crate::thread;
|
||||
use crate::time::Duration;
|
||||
|
||||
pub fn stress_factor() -> usize {
|
||||
match env::var("RUST_TEST_STRESS") {
|
||||
Ok(val) => val.parse().unwrap(),
|
||||
Err(..) => 1,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn smoke() {
|
||||
let (tx, rx) = sync_channel::<i32>(1);
|
||||
tx.send(1).unwrap();
|
||||
assert_eq!(rx.recv().unwrap(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn drop_full() {
|
||||
let (tx, _rx) = sync_channel::<Box<isize>>(1);
|
||||
tx.send(box 1).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn smoke_shared() {
|
||||
let (tx, rx) = sync_channel::<i32>(1);
|
||||
tx.send(1).unwrap();
|
||||
assert_eq!(rx.recv().unwrap(), 1);
|
||||
let tx = tx.clone();
|
||||
tx.send(1).unwrap();
|
||||
assert_eq!(rx.recv().unwrap(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn recv_timeout() {
|
||||
let (tx, rx) = sync_channel::<i32>(1);
|
||||
assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
|
||||
tx.send(1).unwrap();
|
||||
assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn smoke_threads() {
|
||||
let (tx, rx) = sync_channel::<i32>(0);
|
||||
let _t = thread::spawn(move || {
|
||||
tx.send(1).unwrap();
|
||||
});
|
||||
assert_eq!(rx.recv().unwrap(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn smoke_port_gone() {
|
||||
let (tx, rx) = sync_channel::<i32>(0);
|
||||
drop(rx);
|
||||
assert!(tx.send(1).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn smoke_shared_port_gone2() {
|
||||
let (tx, rx) = sync_channel::<i32>(0);
|
||||
drop(rx);
|
||||
let tx2 = tx.clone();
|
||||
drop(tx);
|
||||
assert!(tx2.send(1).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn port_gone_concurrent() {
|
||||
let (tx, rx) = sync_channel::<i32>(0);
|
||||
let _t = thread::spawn(move || {
|
||||
rx.recv().unwrap();
|
||||
});
|
||||
while tx.send(1).is_ok() {}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn port_gone_concurrent_shared() {
|
||||
let (tx, rx) = sync_channel::<i32>(0);
|
||||
let tx2 = tx.clone();
|
||||
let _t = thread::spawn(move || {
|
||||
rx.recv().unwrap();
|
||||
});
|
||||
while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn smoke_chan_gone() {
|
||||
let (tx, rx) = sync_channel::<i32>(0);
|
||||
drop(tx);
|
||||
assert!(rx.recv().is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn smoke_chan_gone_shared() {
|
||||
let (tx, rx) = sync_channel::<()>(0);
|
||||
let tx2 = tx.clone();
|
||||
drop(tx);
|
||||
drop(tx2);
|
||||
assert!(rx.recv().is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn chan_gone_concurrent() {
|
||||
let (tx, rx) = sync_channel::<i32>(0);
|
||||
thread::spawn(move || {
|
||||
tx.send(1).unwrap();
|
||||
tx.send(1).unwrap();
|
||||
});
|
||||
while rx.recv().is_ok() {}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stress() {
|
||||
let (tx, rx) = sync_channel::<i32>(0);
|
||||
thread::spawn(move || {
|
||||
for _ in 0..10000 {
|
||||
tx.send(1).unwrap();
|
||||
}
|
||||
});
|
||||
for _ in 0..10000 {
|
||||
assert_eq!(rx.recv().unwrap(), 1);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stress_recv_timeout_two_threads() {
|
||||
let (tx, rx) = sync_channel::<i32>(0);
|
||||
|
||||
thread::spawn(move || {
|
||||
for _ in 0..10000 {
|
||||
tx.send(1).unwrap();
|
||||
}
|
||||
});
|
||||
|
||||
let mut recv_count = 0;
|
||||
loop {
|
||||
match rx.recv_timeout(Duration::from_millis(1)) {
|
||||
Ok(v) => {
|
||||
assert_eq!(v, 1);
|
||||
recv_count += 1;
|
||||
}
|
||||
Err(RecvTimeoutError::Timeout) => continue,
|
||||
Err(RecvTimeoutError::Disconnected) => break,
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(recv_count, 10000);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stress_recv_timeout_shared() {
|
||||
const AMT: u32 = 1000;
|
||||
const NTHREADS: u32 = 8;
|
||||
let (tx, rx) = sync_channel::<i32>(0);
|
||||
let (dtx, drx) = sync_channel::<()>(0);
|
||||
|
||||
thread::spawn(move || {
|
||||
let mut recv_count = 0;
|
||||
loop {
|
||||
match rx.recv_timeout(Duration::from_millis(10)) {
|
||||
Ok(v) => {
|
||||
assert_eq!(v, 1);
|
||||
recv_count += 1;
|
||||
}
|
||||
Err(RecvTimeoutError::Timeout) => continue,
|
||||
Err(RecvTimeoutError::Disconnected) => break,
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(recv_count, AMT * NTHREADS);
|
||||
assert!(rx.try_recv().is_err());
|
||||
|
||||
dtx.send(()).unwrap();
|
||||
});
|
||||
|
||||
for _ in 0..NTHREADS {
|
||||
let tx = tx.clone();
|
||||
thread::spawn(move || {
|
||||
for _ in 0..AMT {
|
||||
tx.send(1).unwrap();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
drop(tx);
|
||||
|
||||
drx.recv().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stress_shared() {
|
||||
const AMT: u32 = 1000;
|
||||
const NTHREADS: u32 = 8;
|
||||
let (tx, rx) = sync_channel::<i32>(0);
|
||||
let (dtx, drx) = sync_channel::<()>(0);
|
||||
|
||||
thread::spawn(move || {
|
||||
for _ in 0..AMT * NTHREADS {
|
||||
assert_eq!(rx.recv().unwrap(), 1);
|
||||
}
|
||||
match rx.try_recv() {
|
||||
Ok(..) => panic!(),
|
||||
_ => {}
|
||||
}
|
||||
dtx.send(()).unwrap();
|
||||
});
|
||||
|
||||
for _ in 0..NTHREADS {
|
||||
let tx = tx.clone();
|
||||
thread::spawn(move || {
|
||||
for _ in 0..AMT {
|
||||
tx.send(1).unwrap();
|
||||
}
|
||||
});
|
||||
}
|
||||
drop(tx);
|
||||
drx.recv().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_single_thread_close_port_first() {
|
||||
// Simple test of closing without sending
|
||||
let (_tx, rx) = sync_channel::<i32>(0);
|
||||
drop(rx);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_single_thread_close_chan_first() {
|
||||
// Simple test of closing without sending
|
||||
let (tx, _rx) = sync_channel::<i32>(0);
|
||||
drop(tx);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_single_thread_send_port_close() {
|
||||
// Testing that the sender cleans up the payload if receiver is closed
|
||||
let (tx, rx) = sync_channel::<Box<i32>>(0);
|
||||
drop(rx);
|
||||
assert!(tx.send(box 0).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_single_thread_recv_chan_close() {
|
||||
// Receiving on a closed chan will panic
|
||||
let res = thread::spawn(move || {
|
||||
let (tx, rx) = sync_channel::<i32>(0);
|
||||
drop(tx);
|
||||
rx.recv().unwrap();
|
||||
})
|
||||
.join();
|
||||
// What is our res?
|
||||
assert!(res.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_single_thread_send_then_recv() {
|
||||
let (tx, rx) = sync_channel::<Box<i32>>(1);
|
||||
tx.send(box 10).unwrap();
|
||||
assert!(*rx.recv().unwrap() == 10);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_single_thread_try_send_open() {
|
||||
let (tx, rx) = sync_channel::<i32>(1);
|
||||
assert_eq!(tx.try_send(10), Ok(()));
|
||||
assert!(rx.recv().unwrap() == 10);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_single_thread_try_send_closed() {
|
||||
let (tx, rx) = sync_channel::<i32>(0);
|
||||
drop(rx);
|
||||
assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_single_thread_try_send_closed2() {
|
||||
let (tx, _rx) = sync_channel::<i32>(0);
|
||||
assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_single_thread_try_recv_open() {
|
||||
let (tx, rx) = sync_channel::<i32>(1);
|
||||
tx.send(10).unwrap();
|
||||
assert!(rx.recv() == Ok(10));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_single_thread_try_recv_closed() {
|
||||
let (tx, rx) = sync_channel::<i32>(0);
|
||||
drop(tx);
|
||||
assert!(rx.recv().is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_single_thread_try_recv_closed_with_data() {
|
||||
let (tx, rx) = sync_channel::<i32>(1);
|
||||
tx.send(10).unwrap();
|
||||
drop(tx);
|
||||
assert_eq!(rx.try_recv(), Ok(10));
|
||||
assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_single_thread_peek_data() {
|
||||
let (tx, rx) = sync_channel::<i32>(1);
|
||||
assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
|
||||
tx.send(10).unwrap();
|
||||
assert_eq!(rx.try_recv(), Ok(10));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_single_thread_peek_close() {
|
||||
let (tx, rx) = sync_channel::<i32>(0);
|
||||
drop(tx);
|
||||
assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
|
||||
assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_single_thread_peek_open() {
|
||||
let (_tx, rx) = sync_channel::<i32>(0);
|
||||
assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_multi_task_recv_then_send() {
|
||||
let (tx, rx) = sync_channel::<Box<i32>>(0);
|
||||
let _t = thread::spawn(move || {
|
||||
assert!(*rx.recv().unwrap() == 10);
|
||||
});
|
||||
|
||||
tx.send(box 10).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_multi_task_recv_then_close() {
|
||||
let (tx, rx) = sync_channel::<Box<i32>>(0);
|
||||
let _t = thread::spawn(move || {
|
||||
drop(tx);
|
||||
});
|
||||
let res = thread::spawn(move || {
|
||||
assert!(*rx.recv().unwrap() == 10);
|
||||
})
|
||||
.join();
|
||||
assert!(res.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_multi_thread_close_stress() {
|
||||
for _ in 0..stress_factor() {
|
||||
let (tx, rx) = sync_channel::<i32>(0);
|
||||
let _t = thread::spawn(move || {
|
||||
drop(rx);
|
||||
});
|
||||
drop(tx);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_multi_thread_send_close_stress() {
|
||||
for _ in 0..stress_factor() {
|
||||
let (tx, rx) = sync_channel::<i32>(0);
|
||||
let _t = thread::spawn(move || {
|
||||
drop(rx);
|
||||
});
|
||||
let _ = thread::spawn(move || {
|
||||
tx.send(1).unwrap();
|
||||
})
|
||||
.join();
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_multi_thread_recv_close_stress() {
|
||||
for _ in 0..stress_factor() {
|
||||
let (tx, rx) = sync_channel::<i32>(0);
|
||||
let _t = thread::spawn(move || {
|
||||
let res = thread::spawn(move || {
|
||||
rx.recv().unwrap();
|
||||
})
|
||||
.join();
|
||||
assert!(res.is_err());
|
||||
});
|
||||
let _t = thread::spawn(move || {
|
||||
thread::spawn(move || {
|
||||
drop(tx);
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_multi_thread_send_recv_stress() {
|
||||
for _ in 0..stress_factor() {
|
||||
let (tx, rx) = sync_channel::<Box<i32>>(0);
|
||||
let _t = thread::spawn(move || {
|
||||
tx.send(box 10).unwrap();
|
||||
});
|
||||
assert!(*rx.recv().unwrap() == 10);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stream_send_recv_stress() {
|
||||
for _ in 0..stress_factor() {
|
||||
let (tx, rx) = sync_channel::<Box<i32>>(0);
|
||||
|
||||
send(tx, 0);
|
||||
recv(rx, 0);
|
||||
|
||||
fn send(tx: SyncSender<Box<i32>>, i: i32) {
|
||||
if i == 10 {
|
||||
return;
|
||||
}
|
||||
|
||||
thread::spawn(move || {
|
||||
tx.send(box i).unwrap();
|
||||
send(tx, i + 1);
|
||||
});
|
||||
}
|
||||
|
||||
fn recv(rx: Receiver<Box<i32>>, i: i32) {
|
||||
if i == 10 {
|
||||
return;
|
||||
}
|
||||
|
||||
thread::spawn(move || {
|
||||
assert!(*rx.recv().unwrap() == i);
|
||||
recv(rx, i + 1);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn recv_a_lot() {
|
||||
// Regression test that we don't run out of stack in scheduler context
|
||||
let (tx, rx) = sync_channel(10000);
|
||||
for _ in 0..10000 {
|
||||
tx.send(()).unwrap();
|
||||
}
|
||||
for _ in 0..10000 {
|
||||
rx.recv().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn shared_chan_stress() {
|
||||
let (tx, rx) = sync_channel(0);
|
||||
let total = stress_factor() + 100;
|
||||
for _ in 0..total {
|
||||
let tx = tx.clone();
|
||||
thread::spawn(move || {
|
||||
tx.send(()).unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
for _ in 0..total {
|
||||
rx.recv().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_nested_recv_iter() {
|
||||
let (tx, rx) = sync_channel::<i32>(0);
|
||||
let (total_tx, total_rx) = sync_channel::<i32>(0);
|
||||
|
||||
let _t = thread::spawn(move || {
|
||||
let mut acc = 0;
|
||||
for x in rx.iter() {
|
||||
acc += x;
|
||||
}
|
||||
total_tx.send(acc).unwrap();
|
||||
});
|
||||
|
||||
tx.send(3).unwrap();
|
||||
tx.send(1).unwrap();
|
||||
tx.send(2).unwrap();
|
||||
drop(tx);
|
||||
assert_eq!(total_rx.recv().unwrap(), 6);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_recv_iter_break() {
|
||||
let (tx, rx) = sync_channel::<i32>(0);
|
||||
let (count_tx, count_rx) = sync_channel(0);
|
||||
|
||||
let _t = thread::spawn(move || {
|
||||
let mut count = 0;
|
||||
for x in rx.iter() {
|
||||
if count >= 3 {
|
||||
break;
|
||||
} else {
|
||||
count += x;
|
||||
}
|
||||
}
|
||||
count_tx.send(count).unwrap();
|
||||
});
|
||||
|
||||
tx.send(2).unwrap();
|
||||
tx.send(2).unwrap();
|
||||
tx.send(2).unwrap();
|
||||
let _ = tx.try_send(2);
|
||||
drop(tx);
|
||||
assert_eq!(count_rx.recv().unwrap(), 4);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn try_recv_states() {
|
||||
let (tx1, rx1) = sync_channel::<i32>(1);
|
||||
let (tx2, rx2) = sync_channel::<()>(1);
|
||||
let (tx3, rx3) = sync_channel::<()>(1);
|
||||
let _t = thread::spawn(move || {
|
||||
rx2.recv().unwrap();
|
||||
tx1.send(1).unwrap();
|
||||
tx3.send(()).unwrap();
|
||||
rx2.recv().unwrap();
|
||||
drop(tx1);
|
||||
tx3.send(()).unwrap();
|
||||
});
|
||||
|
||||
assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
|
||||
tx2.send(()).unwrap();
|
||||
rx3.recv().unwrap();
|
||||
assert_eq!(rx1.try_recv(), Ok(1));
|
||||
assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
|
||||
tx2.send(()).unwrap();
|
||||
rx3.recv().unwrap();
|
||||
assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
|
||||
}
|
||||
|
||||
// This bug used to end up in a livelock inside of the Receiver destructor
|
||||
// because the internal state of the Shared packet was corrupted
|
||||
#[test]
|
||||
fn destroy_upgraded_shared_port_when_sender_still_active() {
|
||||
let (tx, rx) = sync_channel::<()>(0);
|
||||
let (tx2, rx2) = sync_channel::<()>(0);
|
||||
let _t = thread::spawn(move || {
|
||||
rx.recv().unwrap(); // wait on a oneshot
|
||||
drop(rx); // destroy a shared
|
||||
tx2.send(()).unwrap();
|
||||
});
|
||||
// make sure the other thread has gone to sleep
|
||||
for _ in 0..5000 {
|
||||
thread::yield_now();
|
||||
}
|
||||
|
||||
// upgrade to a shared chan and send a message
|
||||
let t = tx.clone();
|
||||
drop(tx);
|
||||
t.send(()).unwrap();
|
||||
|
||||
// wait for the child thread to exit before we exit
|
||||
rx2.recv().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn send1() {
|
||||
let (tx, rx) = sync_channel::<i32>(0);
|
||||
let _t = thread::spawn(move || {
|
||||
rx.recv().unwrap();
|
||||
});
|
||||
assert_eq!(tx.send(1), Ok(()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn send2() {
|
||||
let (tx, rx) = sync_channel::<i32>(0);
|
||||
let _t = thread::spawn(move || {
|
||||
drop(rx);
|
||||
});
|
||||
assert!(tx.send(1).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn send3() {
|
||||
let (tx, rx) = sync_channel::<i32>(1);
|
||||
assert_eq!(tx.send(1), Ok(()));
|
||||
let _t = thread::spawn(move || {
|
||||
drop(rx);
|
||||
});
|
||||
assert!(tx.send(1).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn send4() {
|
||||
let (tx, rx) = sync_channel::<i32>(0);
|
||||
let tx2 = tx.clone();
|
||||
let (done, donerx) = channel();
|
||||
let done2 = done.clone();
|
||||
let _t = thread::spawn(move || {
|
||||
assert!(tx.send(1).is_err());
|
||||
done.send(()).unwrap();
|
||||
});
|
||||
let _t = thread::spawn(move || {
|
||||
assert!(tx2.send(2).is_err());
|
||||
done2.send(()).unwrap();
|
||||
});
|
||||
drop(rx);
|
||||
donerx.recv().unwrap();
|
||||
donerx.recv().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn try_send1() {
|
||||
let (tx, _rx) = sync_channel::<i32>(0);
|
||||
assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn try_send2() {
|
||||
let (tx, _rx) = sync_channel::<i32>(1);
|
||||
assert_eq!(tx.try_send(1), Ok(()));
|
||||
assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn try_send3() {
|
||||
let (tx, rx) = sync_channel::<i32>(1);
|
||||
assert_eq!(tx.try_send(1), Ok(()));
|
||||
drop(rx);
|
||||
assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn issue_15761() {
|
||||
fn repro() {
|
||||
let (tx1, rx1) = sync_channel::<()>(3);
|
||||
let (tx2, rx2) = sync_channel::<()>(3);
|
||||
|
||||
let _t = thread::spawn(move || {
|
||||
rx1.recv().unwrap();
|
||||
tx2.try_send(()).unwrap();
|
||||
});
|
||||
|
||||
tx1.try_send(()).unwrap();
|
||||
rx2.recv().unwrap();
|
||||
}
|
||||
|
||||
for _ in 0..100 {
|
||||
repro()
|
||||
}
|
||||
}
|
||||
706
library/std/src/sync/mpsc/tests.rs
Normal file
706
library/std/src/sync/mpsc/tests.rs
Normal file
@@ -0,0 +1,706 @@
|
||||
use super::*;
|
||||
use crate::env;
|
||||
use crate::thread;
|
||||
use crate::time::{Duration, Instant};
|
||||
|
||||
pub fn stress_factor() -> usize {
|
||||
match env::var("RUST_TEST_STRESS") {
|
||||
Ok(val) => val.parse().unwrap(),
|
||||
Err(..) => 1,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn smoke() {
|
||||
let (tx, rx) = channel::<i32>();
|
||||
tx.send(1).unwrap();
|
||||
assert_eq!(rx.recv().unwrap(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn drop_full() {
|
||||
let (tx, _rx) = channel::<Box<isize>>();
|
||||
tx.send(box 1).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn drop_full_shared() {
|
||||
let (tx, _rx) = channel::<Box<isize>>();
|
||||
drop(tx.clone());
|
||||
drop(tx.clone());
|
||||
tx.send(box 1).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn smoke_shared() {
|
||||
let (tx, rx) = channel::<i32>();
|
||||
tx.send(1).unwrap();
|
||||
assert_eq!(rx.recv().unwrap(), 1);
|
||||
let tx = tx.clone();
|
||||
tx.send(1).unwrap();
|
||||
assert_eq!(rx.recv().unwrap(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn smoke_threads() {
|
||||
let (tx, rx) = channel::<i32>();
|
||||
let _t = thread::spawn(move || {
|
||||
tx.send(1).unwrap();
|
||||
});
|
||||
assert_eq!(rx.recv().unwrap(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn smoke_port_gone() {
|
||||
let (tx, rx) = channel::<i32>();
|
||||
drop(rx);
|
||||
assert!(tx.send(1).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn smoke_shared_port_gone() {
|
||||
let (tx, rx) = channel::<i32>();
|
||||
drop(rx);
|
||||
assert!(tx.send(1).is_err())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn smoke_shared_port_gone2() {
|
||||
let (tx, rx) = channel::<i32>();
|
||||
drop(rx);
|
||||
let tx2 = tx.clone();
|
||||
drop(tx);
|
||||
assert!(tx2.send(1).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn port_gone_concurrent() {
|
||||
let (tx, rx) = channel::<i32>();
|
||||
let _t = thread::spawn(move || {
|
||||
rx.recv().unwrap();
|
||||
});
|
||||
while tx.send(1).is_ok() {}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn port_gone_concurrent_shared() {
|
||||
let (tx, rx) = channel::<i32>();
|
||||
let tx2 = tx.clone();
|
||||
let _t = thread::spawn(move || {
|
||||
rx.recv().unwrap();
|
||||
});
|
||||
while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn smoke_chan_gone() {
|
||||
let (tx, rx) = channel::<i32>();
|
||||
drop(tx);
|
||||
assert!(rx.recv().is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn smoke_chan_gone_shared() {
|
||||
let (tx, rx) = channel::<()>();
|
||||
let tx2 = tx.clone();
|
||||
drop(tx);
|
||||
drop(tx2);
|
||||
assert!(rx.recv().is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn chan_gone_concurrent() {
|
||||
let (tx, rx) = channel::<i32>();
|
||||
let _t = thread::spawn(move || {
|
||||
tx.send(1).unwrap();
|
||||
tx.send(1).unwrap();
|
||||
});
|
||||
while rx.recv().is_ok() {}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stress() {
|
||||
let (tx, rx) = channel::<i32>();
|
||||
let t = thread::spawn(move || {
|
||||
for _ in 0..10000 {
|
||||
tx.send(1).unwrap();
|
||||
}
|
||||
});
|
||||
for _ in 0..10000 {
|
||||
assert_eq!(rx.recv().unwrap(), 1);
|
||||
}
|
||||
t.join().ok().expect("thread panicked");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stress_shared() {
|
||||
const AMT: u32 = 10000;
|
||||
const NTHREADS: u32 = 8;
|
||||
let (tx, rx) = channel::<i32>();
|
||||
|
||||
let t = thread::spawn(move || {
|
||||
for _ in 0..AMT * NTHREADS {
|
||||
assert_eq!(rx.recv().unwrap(), 1);
|
||||
}
|
||||
match rx.try_recv() {
|
||||
Ok(..) => panic!(),
|
||||
_ => {}
|
||||
}
|
||||
});
|
||||
|
||||
for _ in 0..NTHREADS {
|
||||
let tx = tx.clone();
|
||||
thread::spawn(move || {
|
||||
for _ in 0..AMT {
|
||||
tx.send(1).unwrap();
|
||||
}
|
||||
});
|
||||
}
|
||||
drop(tx);
|
||||
t.join().ok().expect("thread panicked");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn send_from_outside_runtime() {
|
||||
let (tx1, rx1) = channel::<()>();
|
||||
let (tx2, rx2) = channel::<i32>();
|
||||
let t1 = thread::spawn(move || {
|
||||
tx1.send(()).unwrap();
|
||||
for _ in 0..40 {
|
||||
assert_eq!(rx2.recv().unwrap(), 1);
|
||||
}
|
||||
});
|
||||
rx1.recv().unwrap();
|
||||
let t2 = thread::spawn(move || {
|
||||
for _ in 0..40 {
|
||||
tx2.send(1).unwrap();
|
||||
}
|
||||
});
|
||||
t1.join().ok().expect("thread panicked");
|
||||
t2.join().ok().expect("thread panicked");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn recv_from_outside_runtime() {
|
||||
let (tx, rx) = channel::<i32>();
|
||||
let t = thread::spawn(move || {
|
||||
for _ in 0..40 {
|
||||
assert_eq!(rx.recv().unwrap(), 1);
|
||||
}
|
||||
});
|
||||
for _ in 0..40 {
|
||||
tx.send(1).unwrap();
|
||||
}
|
||||
t.join().ok().expect("thread panicked");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn no_runtime() {
|
||||
let (tx1, rx1) = channel::<i32>();
|
||||
let (tx2, rx2) = channel::<i32>();
|
||||
let t1 = thread::spawn(move || {
|
||||
assert_eq!(rx1.recv().unwrap(), 1);
|
||||
tx2.send(2).unwrap();
|
||||
});
|
||||
let t2 = thread::spawn(move || {
|
||||
tx1.send(1).unwrap();
|
||||
assert_eq!(rx2.recv().unwrap(), 2);
|
||||
});
|
||||
t1.join().ok().expect("thread panicked");
|
||||
t2.join().ok().expect("thread panicked");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_single_thread_close_port_first() {
|
||||
// Simple test of closing without sending
|
||||
let (_tx, rx) = channel::<i32>();
|
||||
drop(rx);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_single_thread_close_chan_first() {
|
||||
// Simple test of closing without sending
|
||||
let (tx, _rx) = channel::<i32>();
|
||||
drop(tx);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_single_thread_send_port_close() {
|
||||
// Testing that the sender cleans up the payload if receiver is closed
|
||||
let (tx, rx) = channel::<Box<i32>>();
|
||||
drop(rx);
|
||||
assert!(tx.send(box 0).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_single_thread_recv_chan_close() {
|
||||
// Receiving on a closed chan will panic
|
||||
let res = thread::spawn(move || {
|
||||
let (tx, rx) = channel::<i32>();
|
||||
drop(tx);
|
||||
rx.recv().unwrap();
|
||||
})
|
||||
.join();
|
||||
// What is our res?
|
||||
assert!(res.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_single_thread_send_then_recv() {
|
||||
let (tx, rx) = channel::<Box<i32>>();
|
||||
tx.send(box 10).unwrap();
|
||||
assert!(*rx.recv().unwrap() == 10);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_single_thread_try_send_open() {
|
||||
let (tx, rx) = channel::<i32>();
|
||||
assert!(tx.send(10).is_ok());
|
||||
assert!(rx.recv().unwrap() == 10);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_single_thread_try_send_closed() {
|
||||
let (tx, rx) = channel::<i32>();
|
||||
drop(rx);
|
||||
assert!(tx.send(10).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_single_thread_try_recv_open() {
|
||||
let (tx, rx) = channel::<i32>();
|
||||
tx.send(10).unwrap();
|
||||
assert!(rx.recv() == Ok(10));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_single_thread_try_recv_closed() {
|
||||
let (tx, rx) = channel::<i32>();
|
||||
drop(tx);
|
||||
assert!(rx.recv().is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_single_thread_peek_data() {
|
||||
let (tx, rx) = channel::<i32>();
|
||||
assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
|
||||
tx.send(10).unwrap();
|
||||
assert_eq!(rx.try_recv(), Ok(10));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_single_thread_peek_close() {
|
||||
let (tx, rx) = channel::<i32>();
|
||||
drop(tx);
|
||||
assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
|
||||
assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_single_thread_peek_open() {
|
||||
let (_tx, rx) = channel::<i32>();
|
||||
assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_multi_task_recv_then_send() {
|
||||
let (tx, rx) = channel::<Box<i32>>();
|
||||
let _t = thread::spawn(move || {
|
||||
assert!(*rx.recv().unwrap() == 10);
|
||||
});
|
||||
|
||||
tx.send(box 10).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_multi_task_recv_then_close() {
|
||||
let (tx, rx) = channel::<Box<i32>>();
|
||||
let _t = thread::spawn(move || {
|
||||
drop(tx);
|
||||
});
|
||||
let res = thread::spawn(move || {
|
||||
assert!(*rx.recv().unwrap() == 10);
|
||||
})
|
||||
.join();
|
||||
assert!(res.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_multi_thread_close_stress() {
|
||||
for _ in 0..stress_factor() {
|
||||
let (tx, rx) = channel::<i32>();
|
||||
let _t = thread::spawn(move || {
|
||||
drop(rx);
|
||||
});
|
||||
drop(tx);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_multi_thread_send_close_stress() {
|
||||
for _ in 0..stress_factor() {
|
||||
let (tx, rx) = channel::<i32>();
|
||||
let _t = thread::spawn(move || {
|
||||
drop(rx);
|
||||
});
|
||||
let _ = thread::spawn(move || {
|
||||
tx.send(1).unwrap();
|
||||
})
|
||||
.join();
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_multi_thread_recv_close_stress() {
|
||||
for _ in 0..stress_factor() {
|
||||
let (tx, rx) = channel::<i32>();
|
||||
thread::spawn(move || {
|
||||
let res = thread::spawn(move || {
|
||||
rx.recv().unwrap();
|
||||
})
|
||||
.join();
|
||||
assert!(res.is_err());
|
||||
});
|
||||
let _t = thread::spawn(move || {
|
||||
thread::spawn(move || {
|
||||
drop(tx);
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_multi_thread_send_recv_stress() {
|
||||
for _ in 0..stress_factor() {
|
||||
let (tx, rx) = channel::<Box<isize>>();
|
||||
let _t = thread::spawn(move || {
|
||||
tx.send(box 10).unwrap();
|
||||
});
|
||||
assert!(*rx.recv().unwrap() == 10);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stream_send_recv_stress() {
|
||||
for _ in 0..stress_factor() {
|
||||
let (tx, rx) = channel();
|
||||
|
||||
send(tx, 0);
|
||||
recv(rx, 0);
|
||||
|
||||
fn send(tx: Sender<Box<i32>>, i: i32) {
|
||||
if i == 10 {
|
||||
return;
|
||||
}
|
||||
|
||||
thread::spawn(move || {
|
||||
tx.send(box i).unwrap();
|
||||
send(tx, i + 1);
|
||||
});
|
||||
}
|
||||
|
||||
fn recv(rx: Receiver<Box<i32>>, i: i32) {
|
||||
if i == 10 {
|
||||
return;
|
||||
}
|
||||
|
||||
thread::spawn(move || {
|
||||
assert!(*rx.recv().unwrap() == i);
|
||||
recv(rx, i + 1);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn oneshot_single_thread_recv_timeout() {
|
||||
let (tx, rx) = channel();
|
||||
tx.send(()).unwrap();
|
||||
assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
|
||||
assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
|
||||
tx.send(()).unwrap();
|
||||
assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stress_recv_timeout_two_threads() {
|
||||
let (tx, rx) = channel();
|
||||
let stress = stress_factor() + 100;
|
||||
let timeout = Duration::from_millis(100);
|
||||
|
||||
thread::spawn(move || {
|
||||
for i in 0..stress {
|
||||
if i % 2 == 0 {
|
||||
thread::sleep(timeout * 2);
|
||||
}
|
||||
tx.send(1usize).unwrap();
|
||||
}
|
||||
});
|
||||
|
||||
let mut recv_count = 0;
|
||||
loop {
|
||||
match rx.recv_timeout(timeout) {
|
||||
Ok(n) => {
|
||||
assert_eq!(n, 1usize);
|
||||
recv_count += 1;
|
||||
}
|
||||
Err(RecvTimeoutError::Timeout) => continue,
|
||||
Err(RecvTimeoutError::Disconnected) => break,
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(recv_count, stress);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn recv_timeout_upgrade() {
|
||||
let (tx, rx) = channel::<()>();
|
||||
let timeout = Duration::from_millis(1);
|
||||
let _tx_clone = tx.clone();
|
||||
|
||||
let start = Instant::now();
|
||||
assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
|
||||
assert!(Instant::now() >= start + timeout);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stress_recv_timeout_shared() {
|
||||
let (tx, rx) = channel();
|
||||
let stress = stress_factor() + 100;
|
||||
|
||||
for i in 0..stress {
|
||||
let tx = tx.clone();
|
||||
thread::spawn(move || {
|
||||
thread::sleep(Duration::from_millis(i as u64 * 10));
|
||||
tx.send(1usize).unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
drop(tx);
|
||||
|
||||
let mut recv_count = 0;
|
||||
loop {
|
||||
match rx.recv_timeout(Duration::from_millis(10)) {
|
||||
Ok(n) => {
|
||||
assert_eq!(n, 1usize);
|
||||
recv_count += 1;
|
||||
}
|
||||
Err(RecvTimeoutError::Timeout) => continue,
|
||||
Err(RecvTimeoutError::Disconnected) => break,
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(recv_count, stress);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn very_long_recv_timeout_wont_panic() {
|
||||
let (tx, rx) = channel::<()>();
|
||||
let join_handle = thread::spawn(move || rx.recv_timeout(Duration::from_secs(u64::MAX)));
|
||||
thread::sleep(Duration::from_secs(1));
|
||||
assert!(tx.send(()).is_ok());
|
||||
assert_eq!(join_handle.join().unwrap(), Ok(()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn recv_a_lot() {
|
||||
// Regression test that we don't run out of stack in scheduler context
|
||||
let (tx, rx) = channel();
|
||||
for _ in 0..10000 {
|
||||
tx.send(()).unwrap();
|
||||
}
|
||||
for _ in 0..10000 {
|
||||
rx.recv().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn shared_recv_timeout() {
|
||||
let (tx, rx) = channel();
|
||||
let total = 5;
|
||||
for _ in 0..total {
|
||||
let tx = tx.clone();
|
||||
thread::spawn(move || {
|
||||
tx.send(()).unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
for _ in 0..total {
|
||||
rx.recv().unwrap();
|
||||
}
|
||||
|
||||
assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
|
||||
tx.send(()).unwrap();
|
||||
assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn shared_chan_stress() {
|
||||
let (tx, rx) = channel();
|
||||
let total = stress_factor() + 100;
|
||||
for _ in 0..total {
|
||||
let tx = tx.clone();
|
||||
thread::spawn(move || {
|
||||
tx.send(()).unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
for _ in 0..total {
|
||||
rx.recv().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_nested_recv_iter() {
|
||||
let (tx, rx) = channel::<i32>();
|
||||
let (total_tx, total_rx) = channel::<i32>();
|
||||
|
||||
let _t = thread::spawn(move || {
|
||||
let mut acc = 0;
|
||||
for x in rx.iter() {
|
||||
acc += x;
|
||||
}
|
||||
total_tx.send(acc).unwrap();
|
||||
});
|
||||
|
||||
tx.send(3).unwrap();
|
||||
tx.send(1).unwrap();
|
||||
tx.send(2).unwrap();
|
||||
drop(tx);
|
||||
assert_eq!(total_rx.recv().unwrap(), 6);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_recv_iter_break() {
|
||||
let (tx, rx) = channel::<i32>();
|
||||
let (count_tx, count_rx) = channel();
|
||||
|
||||
let _t = thread::spawn(move || {
|
||||
let mut count = 0;
|
||||
for x in rx.iter() {
|
||||
if count >= 3 {
|
||||
break;
|
||||
} else {
|
||||
count += x;
|
||||
}
|
||||
}
|
||||
count_tx.send(count).unwrap();
|
||||
});
|
||||
|
||||
tx.send(2).unwrap();
|
||||
tx.send(2).unwrap();
|
||||
tx.send(2).unwrap();
|
||||
let _ = tx.send(2);
|
||||
drop(tx);
|
||||
assert_eq!(count_rx.recv().unwrap(), 4);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_recv_try_iter() {
|
||||
let (request_tx, request_rx) = channel();
|
||||
let (response_tx, response_rx) = channel();
|
||||
|
||||
// Request `x`s until we have `6`.
|
||||
let t = thread::spawn(move || {
|
||||
let mut count = 0;
|
||||
loop {
|
||||
for x in response_rx.try_iter() {
|
||||
count += x;
|
||||
if count == 6 {
|
||||
return count;
|
||||
}
|
||||
}
|
||||
request_tx.send(()).unwrap();
|
||||
}
|
||||
});
|
||||
|
||||
for _ in request_rx.iter() {
|
||||
if response_tx.send(2).is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(t.join().unwrap(), 6);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_recv_into_iter_owned() {
|
||||
let mut iter = {
|
||||
let (tx, rx) = channel::<i32>();
|
||||
tx.send(1).unwrap();
|
||||
tx.send(2).unwrap();
|
||||
|
||||
rx.into_iter()
|
||||
};
|
||||
assert_eq!(iter.next().unwrap(), 1);
|
||||
assert_eq!(iter.next().unwrap(), 2);
|
||||
assert_eq!(iter.next().is_none(), true);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_recv_into_iter_borrowed() {
|
||||
let (tx, rx) = channel::<i32>();
|
||||
tx.send(1).unwrap();
|
||||
tx.send(2).unwrap();
|
||||
drop(tx);
|
||||
let mut iter = (&rx).into_iter();
|
||||
assert_eq!(iter.next().unwrap(), 1);
|
||||
assert_eq!(iter.next().unwrap(), 2);
|
||||
assert_eq!(iter.next().is_none(), true);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn try_recv_states() {
|
||||
let (tx1, rx1) = channel::<i32>();
|
||||
let (tx2, rx2) = channel::<()>();
|
||||
let (tx3, rx3) = channel::<()>();
|
||||
let _t = thread::spawn(move || {
|
||||
rx2.recv().unwrap();
|
||||
tx1.send(1).unwrap();
|
||||
tx3.send(()).unwrap();
|
||||
rx2.recv().unwrap();
|
||||
drop(tx1);
|
||||
tx3.send(()).unwrap();
|
||||
});
|
||||
|
||||
assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
|
||||
tx2.send(()).unwrap();
|
||||
rx3.recv().unwrap();
|
||||
assert_eq!(rx1.try_recv(), Ok(1));
|
||||
assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
|
||||
tx2.send(()).unwrap();
|
||||
rx3.recv().unwrap();
|
||||
assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
|
||||
}
|
||||
|
||||
// This bug used to end up in a livelock inside of the Receiver destructor
|
||||
// because the internal state of the Shared packet was corrupted
|
||||
#[test]
|
||||
fn destroy_upgraded_shared_port_when_sender_still_active() {
|
||||
let (tx, rx) = channel();
|
||||
let (tx2, rx2) = channel();
|
||||
let _t = thread::spawn(move || {
|
||||
rx.recv().unwrap(); // wait on a oneshot
|
||||
drop(rx); // destroy a shared
|
||||
tx2.send(()).unwrap();
|
||||
});
|
||||
// make sure the other thread has gone to sleep
|
||||
for _ in 0..5000 {
|
||||
thread::yield_now();
|
||||
}
|
||||
|
||||
// upgrade to a shared chan and send a message
|
||||
let t = tx.clone();
|
||||
drop(tx);
|
||||
t.send(()).unwrap();
|
||||
|
||||
// wait for the child thread to exit before we exit
|
||||
rx2.recv().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn issue_32114() {
|
||||
let (tx, _) = channel();
|
||||
let _ = tx.send(123);
|
||||
assert_eq!(tx.send(123), Err(SendError(123)));
|
||||
}
|
||||
@@ -1,3 +1,6 @@
|
||||
#[cfg(all(test, not(target_os = "emscripten")))]
|
||||
mod tests;
|
||||
|
||||
use crate::cell::UnsafeCell;
|
||||
use crate::fmt;
|
||||
use crate::mem;
|
||||
@@ -515,245 +518,3 @@ pub fn guard_lock<'a, T: ?Sized>(guard: &MutexGuard<'a, T>) -> &'a sys::Mutex {
|
||||
pub fn guard_poison<'a, T: ?Sized>(guard: &MutexGuard<'a, T>) -> &'a poison::Flag {
|
||||
&guard.lock.poison
|
||||
}
|
||||
|
||||
#[cfg(all(test, not(target_os = "emscripten")))]
|
||||
mod tests {
|
||||
use crate::sync::atomic::{AtomicUsize, Ordering};
|
||||
use crate::sync::mpsc::channel;
|
||||
use crate::sync::{Arc, Condvar, Mutex};
|
||||
use crate::thread;
|
||||
|
||||
struct Packet<T>(Arc<(Mutex<T>, Condvar)>);
|
||||
|
||||
#[derive(Eq, PartialEq, Debug)]
|
||||
struct NonCopy(i32);
|
||||
|
||||
#[test]
|
||||
fn smoke() {
|
||||
let m = Mutex::new(());
|
||||
drop(m.lock().unwrap());
|
||||
drop(m.lock().unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn lots_and_lots() {
|
||||
const J: u32 = 1000;
|
||||
const K: u32 = 3;
|
||||
|
||||
let m = Arc::new(Mutex::new(0));
|
||||
|
||||
fn inc(m: &Mutex<u32>) {
|
||||
for _ in 0..J {
|
||||
*m.lock().unwrap() += 1;
|
||||
}
|
||||
}
|
||||
|
||||
let (tx, rx) = channel();
|
||||
for _ in 0..K {
|
||||
let tx2 = tx.clone();
|
||||
let m2 = m.clone();
|
||||
thread::spawn(move || {
|
||||
inc(&m2);
|
||||
tx2.send(()).unwrap();
|
||||
});
|
||||
let tx2 = tx.clone();
|
||||
let m2 = m.clone();
|
||||
thread::spawn(move || {
|
||||
inc(&m2);
|
||||
tx2.send(()).unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
drop(tx);
|
||||
for _ in 0..2 * K {
|
||||
rx.recv().unwrap();
|
||||
}
|
||||
assert_eq!(*m.lock().unwrap(), J * K * 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn try_lock() {
|
||||
let m = Mutex::new(());
|
||||
*m.try_lock().unwrap() = ();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_into_inner() {
|
||||
let m = Mutex::new(NonCopy(10));
|
||||
assert_eq!(m.into_inner().unwrap(), NonCopy(10));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_into_inner_drop() {
|
||||
struct Foo(Arc<AtomicUsize>);
|
||||
impl Drop for Foo {
|
||||
fn drop(&mut self) {
|
||||
self.0.fetch_add(1, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
let num_drops = Arc::new(AtomicUsize::new(0));
|
||||
let m = Mutex::new(Foo(num_drops.clone()));
|
||||
assert_eq!(num_drops.load(Ordering::SeqCst), 0);
|
||||
{
|
||||
let _inner = m.into_inner().unwrap();
|
||||
assert_eq!(num_drops.load(Ordering::SeqCst), 0);
|
||||
}
|
||||
assert_eq!(num_drops.load(Ordering::SeqCst), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_into_inner_poison() {
|
||||
let m = Arc::new(Mutex::new(NonCopy(10)));
|
||||
let m2 = m.clone();
|
||||
let _ = thread::spawn(move || {
|
||||
let _lock = m2.lock().unwrap();
|
||||
panic!("test panic in inner thread to poison mutex");
|
||||
})
|
||||
.join();
|
||||
|
||||
assert!(m.is_poisoned());
|
||||
match Arc::try_unwrap(m).unwrap().into_inner() {
|
||||
Err(e) => assert_eq!(e.into_inner(), NonCopy(10)),
|
||||
Ok(x) => panic!("into_inner of poisoned Mutex is Ok: {:?}", x),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_mut() {
|
||||
let mut m = Mutex::new(NonCopy(10));
|
||||
*m.get_mut().unwrap() = NonCopy(20);
|
||||
assert_eq!(m.into_inner().unwrap(), NonCopy(20));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_mut_poison() {
|
||||
let m = Arc::new(Mutex::new(NonCopy(10)));
|
||||
let m2 = m.clone();
|
||||
let _ = thread::spawn(move || {
|
||||
let _lock = m2.lock().unwrap();
|
||||
panic!("test panic in inner thread to poison mutex");
|
||||
})
|
||||
.join();
|
||||
|
||||
assert!(m.is_poisoned());
|
||||
match Arc::try_unwrap(m).unwrap().get_mut() {
|
||||
Err(e) => assert_eq!(*e.into_inner(), NonCopy(10)),
|
||||
Ok(x) => panic!("get_mut of poisoned Mutex is Ok: {:?}", x),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_mutex_arc_condvar() {
|
||||
let packet = Packet(Arc::new((Mutex::new(false), Condvar::new())));
|
||||
let packet2 = Packet(packet.0.clone());
|
||||
let (tx, rx) = channel();
|
||||
let _t = thread::spawn(move || {
|
||||
// wait until parent gets in
|
||||
rx.recv().unwrap();
|
||||
let &(ref lock, ref cvar) = &*packet2.0;
|
||||
let mut lock = lock.lock().unwrap();
|
||||
*lock = true;
|
||||
cvar.notify_one();
|
||||
});
|
||||
|
||||
let &(ref lock, ref cvar) = &*packet.0;
|
||||
let mut lock = lock.lock().unwrap();
|
||||
tx.send(()).unwrap();
|
||||
assert!(!*lock);
|
||||
while !*lock {
|
||||
lock = cvar.wait(lock).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_arc_condvar_poison() {
|
||||
let packet = Packet(Arc::new((Mutex::new(1), Condvar::new())));
|
||||
let packet2 = Packet(packet.0.clone());
|
||||
let (tx, rx) = channel();
|
||||
|
||||
let _t = thread::spawn(move || -> () {
|
||||
rx.recv().unwrap();
|
||||
let &(ref lock, ref cvar) = &*packet2.0;
|
||||
let _g = lock.lock().unwrap();
|
||||
cvar.notify_one();
|
||||
// Parent should fail when it wakes up.
|
||||
panic!();
|
||||
});
|
||||
|
||||
let &(ref lock, ref cvar) = &*packet.0;
|
||||
let mut lock = lock.lock().unwrap();
|
||||
tx.send(()).unwrap();
|
||||
while *lock == 1 {
|
||||
match cvar.wait(lock) {
|
||||
Ok(l) => {
|
||||
lock = l;
|
||||
assert_eq!(*lock, 1);
|
||||
}
|
||||
Err(..) => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_mutex_arc_poison() {
|
||||
let arc = Arc::new(Mutex::new(1));
|
||||
assert!(!arc.is_poisoned());
|
||||
let arc2 = arc.clone();
|
||||
let _ = thread::spawn(move || {
|
||||
let lock = arc2.lock().unwrap();
|
||||
assert_eq!(*lock, 2);
|
||||
})
|
||||
.join();
|
||||
assert!(arc.lock().is_err());
|
||||
assert!(arc.is_poisoned());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_mutex_arc_nested() {
|
||||
// Tests nested mutexes and access
|
||||
// to underlying data.
|
||||
let arc = Arc::new(Mutex::new(1));
|
||||
let arc2 = Arc::new(Mutex::new(arc));
|
||||
let (tx, rx) = channel();
|
||||
let _t = thread::spawn(move || {
|
||||
let lock = arc2.lock().unwrap();
|
||||
let lock2 = lock.lock().unwrap();
|
||||
assert_eq!(*lock2, 1);
|
||||
tx.send(()).unwrap();
|
||||
});
|
||||
rx.recv().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_mutex_arc_access_in_unwind() {
|
||||
let arc = Arc::new(Mutex::new(1));
|
||||
let arc2 = arc.clone();
|
||||
let _ = thread::spawn(move || -> () {
|
||||
struct Unwinder {
|
||||
i: Arc<Mutex<i32>>,
|
||||
}
|
||||
impl Drop for Unwinder {
|
||||
fn drop(&mut self) {
|
||||
*self.i.lock().unwrap() += 1;
|
||||
}
|
||||
}
|
||||
let _u = Unwinder { i: arc2 };
|
||||
panic!();
|
||||
})
|
||||
.join();
|
||||
let lock = arc.lock().unwrap();
|
||||
assert_eq!(*lock, 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_mutex_unsized() {
|
||||
let mutex: &Mutex<[i32]> = &Mutex::new([1, 2, 3]);
|
||||
{
|
||||
let b = &mut *mutex.lock().unwrap();
|
||||
b[0] = 4;
|
||||
b[2] = 5;
|
||||
}
|
||||
let comp: &[i32] = &[4, 2, 5];
|
||||
assert_eq!(&*mutex.lock().unwrap(), comp);
|
||||
}
|
||||
}
|
||||
|
||||
238
library/std/src/sync/mutex/tests.rs
Normal file
238
library/std/src/sync/mutex/tests.rs
Normal file
@@ -0,0 +1,238 @@
|
||||
use crate::sync::atomic::{AtomicUsize, Ordering};
|
||||
use crate::sync::mpsc::channel;
|
||||
use crate::sync::{Arc, Condvar, Mutex};
|
||||
use crate::thread;
|
||||
|
||||
struct Packet<T>(Arc<(Mutex<T>, Condvar)>);
|
||||
|
||||
#[derive(Eq, PartialEq, Debug)]
|
||||
struct NonCopy(i32);
|
||||
|
||||
#[test]
|
||||
fn smoke() {
|
||||
let m = Mutex::new(());
|
||||
drop(m.lock().unwrap());
|
||||
drop(m.lock().unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn lots_and_lots() {
|
||||
const J: u32 = 1000;
|
||||
const K: u32 = 3;
|
||||
|
||||
let m = Arc::new(Mutex::new(0));
|
||||
|
||||
fn inc(m: &Mutex<u32>) {
|
||||
for _ in 0..J {
|
||||
*m.lock().unwrap() += 1;
|
||||
}
|
||||
}
|
||||
|
||||
let (tx, rx) = channel();
|
||||
for _ in 0..K {
|
||||
let tx2 = tx.clone();
|
||||
let m2 = m.clone();
|
||||
thread::spawn(move || {
|
||||
inc(&m2);
|
||||
tx2.send(()).unwrap();
|
||||
});
|
||||
let tx2 = tx.clone();
|
||||
let m2 = m.clone();
|
||||
thread::spawn(move || {
|
||||
inc(&m2);
|
||||
tx2.send(()).unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
drop(tx);
|
||||
for _ in 0..2 * K {
|
||||
rx.recv().unwrap();
|
||||
}
|
||||
assert_eq!(*m.lock().unwrap(), J * K * 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn try_lock() {
|
||||
let m = Mutex::new(());
|
||||
*m.try_lock().unwrap() = ();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_into_inner() {
|
||||
let m = Mutex::new(NonCopy(10));
|
||||
assert_eq!(m.into_inner().unwrap(), NonCopy(10));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_into_inner_drop() {
|
||||
struct Foo(Arc<AtomicUsize>);
|
||||
impl Drop for Foo {
|
||||
fn drop(&mut self) {
|
||||
self.0.fetch_add(1, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
let num_drops = Arc::new(AtomicUsize::new(0));
|
||||
let m = Mutex::new(Foo(num_drops.clone()));
|
||||
assert_eq!(num_drops.load(Ordering::SeqCst), 0);
|
||||
{
|
||||
let _inner = m.into_inner().unwrap();
|
||||
assert_eq!(num_drops.load(Ordering::SeqCst), 0);
|
||||
}
|
||||
assert_eq!(num_drops.load(Ordering::SeqCst), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_into_inner_poison() {
|
||||
let m = Arc::new(Mutex::new(NonCopy(10)));
|
||||
let m2 = m.clone();
|
||||
let _ = thread::spawn(move || {
|
||||
let _lock = m2.lock().unwrap();
|
||||
panic!("test panic in inner thread to poison mutex");
|
||||
})
|
||||
.join();
|
||||
|
||||
assert!(m.is_poisoned());
|
||||
match Arc::try_unwrap(m).unwrap().into_inner() {
|
||||
Err(e) => assert_eq!(e.into_inner(), NonCopy(10)),
|
||||
Ok(x) => panic!("into_inner of poisoned Mutex is Ok: {:?}", x),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_mut() {
|
||||
let mut m = Mutex::new(NonCopy(10));
|
||||
*m.get_mut().unwrap() = NonCopy(20);
|
||||
assert_eq!(m.into_inner().unwrap(), NonCopy(20));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_mut_poison() {
|
||||
let m = Arc::new(Mutex::new(NonCopy(10)));
|
||||
let m2 = m.clone();
|
||||
let _ = thread::spawn(move || {
|
||||
let _lock = m2.lock().unwrap();
|
||||
panic!("test panic in inner thread to poison mutex");
|
||||
})
|
||||
.join();
|
||||
|
||||
assert!(m.is_poisoned());
|
||||
match Arc::try_unwrap(m).unwrap().get_mut() {
|
||||
Err(e) => assert_eq!(*e.into_inner(), NonCopy(10)),
|
||||
Ok(x) => panic!("get_mut of poisoned Mutex is Ok: {:?}", x),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_mutex_arc_condvar() {
|
||||
let packet = Packet(Arc::new((Mutex::new(false), Condvar::new())));
|
||||
let packet2 = Packet(packet.0.clone());
|
||||
let (tx, rx) = channel();
|
||||
let _t = thread::spawn(move || {
|
||||
// wait until parent gets in
|
||||
rx.recv().unwrap();
|
||||
let &(ref lock, ref cvar) = &*packet2.0;
|
||||
let mut lock = lock.lock().unwrap();
|
||||
*lock = true;
|
||||
cvar.notify_one();
|
||||
});
|
||||
|
||||
let &(ref lock, ref cvar) = &*packet.0;
|
||||
let mut lock = lock.lock().unwrap();
|
||||
tx.send(()).unwrap();
|
||||
assert!(!*lock);
|
||||
while !*lock {
|
||||
lock = cvar.wait(lock).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_arc_condvar_poison() {
|
||||
let packet = Packet(Arc::new((Mutex::new(1), Condvar::new())));
|
||||
let packet2 = Packet(packet.0.clone());
|
||||
let (tx, rx) = channel();
|
||||
|
||||
let _t = thread::spawn(move || -> () {
|
||||
rx.recv().unwrap();
|
||||
let &(ref lock, ref cvar) = &*packet2.0;
|
||||
let _g = lock.lock().unwrap();
|
||||
cvar.notify_one();
|
||||
// Parent should fail when it wakes up.
|
||||
panic!();
|
||||
});
|
||||
|
||||
let &(ref lock, ref cvar) = &*packet.0;
|
||||
let mut lock = lock.lock().unwrap();
|
||||
tx.send(()).unwrap();
|
||||
while *lock == 1 {
|
||||
match cvar.wait(lock) {
|
||||
Ok(l) => {
|
||||
lock = l;
|
||||
assert_eq!(*lock, 1);
|
||||
}
|
||||
Err(..) => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_mutex_arc_poison() {
|
||||
let arc = Arc::new(Mutex::new(1));
|
||||
assert!(!arc.is_poisoned());
|
||||
let arc2 = arc.clone();
|
||||
let _ = thread::spawn(move || {
|
||||
let lock = arc2.lock().unwrap();
|
||||
assert_eq!(*lock, 2);
|
||||
})
|
||||
.join();
|
||||
assert!(arc.lock().is_err());
|
||||
assert!(arc.is_poisoned());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_mutex_arc_nested() {
|
||||
// Tests nested mutexes and access
|
||||
// to underlying data.
|
||||
let arc = Arc::new(Mutex::new(1));
|
||||
let arc2 = Arc::new(Mutex::new(arc));
|
||||
let (tx, rx) = channel();
|
||||
let _t = thread::spawn(move || {
|
||||
let lock = arc2.lock().unwrap();
|
||||
let lock2 = lock.lock().unwrap();
|
||||
assert_eq!(*lock2, 1);
|
||||
tx.send(()).unwrap();
|
||||
});
|
||||
rx.recv().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_mutex_arc_access_in_unwind() {
|
||||
let arc = Arc::new(Mutex::new(1));
|
||||
let arc2 = arc.clone();
|
||||
let _ = thread::spawn(move || -> () {
|
||||
struct Unwinder {
|
||||
i: Arc<Mutex<i32>>,
|
||||
}
|
||||
impl Drop for Unwinder {
|
||||
fn drop(&mut self) {
|
||||
*self.i.lock().unwrap() += 1;
|
||||
}
|
||||
}
|
||||
let _u = Unwinder { i: arc2 };
|
||||
panic!();
|
||||
})
|
||||
.join();
|
||||
let lock = arc.lock().unwrap();
|
||||
assert_eq!(*lock, 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_mutex_unsized() {
|
||||
let mutex: &Mutex<[i32]> = &Mutex::new([1, 2, 3]);
|
||||
{
|
||||
let b = &mut *mutex.lock().unwrap();
|
||||
b[0] = 4;
|
||||
b[2] = 5;
|
||||
}
|
||||
let comp: &[i32] = &[4, 2, 5];
|
||||
assert_eq!(&*mutex.lock().unwrap(), comp);
|
||||
}
|
||||
@@ -84,6 +84,9 @@
|
||||
// processor. Because both use Acquire ordering such a reordering is not
|
||||
// allowed, so no need for SeqCst.
|
||||
|
||||
#[cfg(all(test, not(target_os = "emscripten")))]
|
||||
mod tests;
|
||||
|
||||
use crate::cell::Cell;
|
||||
use crate::fmt;
|
||||
use crate::marker;
|
||||
@@ -568,123 +571,3 @@ impl OnceState {
|
||||
self.set_state_on_drop_to.set(POISONED);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(test, not(target_os = "emscripten")))]
|
||||
mod tests {
|
||||
use super::Once;
|
||||
use crate::panic;
|
||||
use crate::sync::mpsc::channel;
|
||||
use crate::thread;
|
||||
|
||||
#[test]
|
||||
fn smoke_once() {
|
||||
static O: Once = Once::new();
|
||||
let mut a = 0;
|
||||
O.call_once(|| a += 1);
|
||||
assert_eq!(a, 1);
|
||||
O.call_once(|| a += 1);
|
||||
assert_eq!(a, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stampede_once() {
|
||||
static O: Once = Once::new();
|
||||
static mut RUN: bool = false;
|
||||
|
||||
let (tx, rx) = channel();
|
||||
for _ in 0..10 {
|
||||
let tx = tx.clone();
|
||||
thread::spawn(move || {
|
||||
for _ in 0..4 {
|
||||
thread::yield_now()
|
||||
}
|
||||
unsafe {
|
||||
O.call_once(|| {
|
||||
assert!(!RUN);
|
||||
RUN = true;
|
||||
});
|
||||
assert!(RUN);
|
||||
}
|
||||
tx.send(()).unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
unsafe {
|
||||
O.call_once(|| {
|
||||
assert!(!RUN);
|
||||
RUN = true;
|
||||
});
|
||||
assert!(RUN);
|
||||
}
|
||||
|
||||
for _ in 0..10 {
|
||||
rx.recv().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn poison_bad() {
|
||||
static O: Once = Once::new();
|
||||
|
||||
// poison the once
|
||||
let t = panic::catch_unwind(|| {
|
||||
O.call_once(|| panic!());
|
||||
});
|
||||
assert!(t.is_err());
|
||||
|
||||
// poisoning propagates
|
||||
let t = panic::catch_unwind(|| {
|
||||
O.call_once(|| {});
|
||||
});
|
||||
assert!(t.is_err());
|
||||
|
||||
// we can subvert poisoning, however
|
||||
let mut called = false;
|
||||
O.call_once_force(|p| {
|
||||
called = true;
|
||||
assert!(p.poisoned())
|
||||
});
|
||||
assert!(called);
|
||||
|
||||
// once any success happens, we stop propagating the poison
|
||||
O.call_once(|| {});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wait_for_force_to_finish() {
|
||||
static O: Once = Once::new();
|
||||
|
||||
// poison the once
|
||||
let t = panic::catch_unwind(|| {
|
||||
O.call_once(|| panic!());
|
||||
});
|
||||
assert!(t.is_err());
|
||||
|
||||
// make sure someone's waiting inside the once via a force
|
||||
let (tx1, rx1) = channel();
|
||||
let (tx2, rx2) = channel();
|
||||
let t1 = thread::spawn(move || {
|
||||
O.call_once_force(|p| {
|
||||
assert!(p.poisoned());
|
||||
tx1.send(()).unwrap();
|
||||
rx2.recv().unwrap();
|
||||
});
|
||||
});
|
||||
|
||||
rx1.recv().unwrap();
|
||||
|
||||
// put another waiter on the once
|
||||
let t2 = thread::spawn(|| {
|
||||
let mut called = false;
|
||||
O.call_once(|| {
|
||||
called = true;
|
||||
});
|
||||
assert!(!called);
|
||||
});
|
||||
|
||||
tx2.send(()).unwrap();
|
||||
|
||||
assert!(t1.join().is_ok());
|
||||
assert!(t2.join().is_ok());
|
||||
}
|
||||
}
|
||||
|
||||
116
library/std/src/sync/once/tests.rs
Normal file
116
library/std/src/sync/once/tests.rs
Normal file
@@ -0,0 +1,116 @@
|
||||
use super::Once;
|
||||
use crate::panic;
|
||||
use crate::sync::mpsc::channel;
|
||||
use crate::thread;
|
||||
|
||||
#[test]
|
||||
fn smoke_once() {
|
||||
static O: Once = Once::new();
|
||||
let mut a = 0;
|
||||
O.call_once(|| a += 1);
|
||||
assert_eq!(a, 1);
|
||||
O.call_once(|| a += 1);
|
||||
assert_eq!(a, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stampede_once() {
|
||||
static O: Once = Once::new();
|
||||
static mut RUN: bool = false;
|
||||
|
||||
let (tx, rx) = channel();
|
||||
for _ in 0..10 {
|
||||
let tx = tx.clone();
|
||||
thread::spawn(move || {
|
||||
for _ in 0..4 {
|
||||
thread::yield_now()
|
||||
}
|
||||
unsafe {
|
||||
O.call_once(|| {
|
||||
assert!(!RUN);
|
||||
RUN = true;
|
||||
});
|
||||
assert!(RUN);
|
||||
}
|
||||
tx.send(()).unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
unsafe {
|
||||
O.call_once(|| {
|
||||
assert!(!RUN);
|
||||
RUN = true;
|
||||
});
|
||||
assert!(RUN);
|
||||
}
|
||||
|
||||
for _ in 0..10 {
|
||||
rx.recv().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn poison_bad() {
|
||||
static O: Once = Once::new();
|
||||
|
||||
// poison the once
|
||||
let t = panic::catch_unwind(|| {
|
||||
O.call_once(|| panic!());
|
||||
});
|
||||
assert!(t.is_err());
|
||||
|
||||
// poisoning propagates
|
||||
let t = panic::catch_unwind(|| {
|
||||
O.call_once(|| {});
|
||||
});
|
||||
assert!(t.is_err());
|
||||
|
||||
// we can subvert poisoning, however
|
||||
let mut called = false;
|
||||
O.call_once_force(|p| {
|
||||
called = true;
|
||||
assert!(p.poisoned())
|
||||
});
|
||||
assert!(called);
|
||||
|
||||
// once any success happens, we stop propagating the poison
|
||||
O.call_once(|| {});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wait_for_force_to_finish() {
|
||||
static O: Once = Once::new();
|
||||
|
||||
// poison the once
|
||||
let t = panic::catch_unwind(|| {
|
||||
O.call_once(|| panic!());
|
||||
});
|
||||
assert!(t.is_err());
|
||||
|
||||
// make sure someone's waiting inside the once via a force
|
||||
let (tx1, rx1) = channel();
|
||||
let (tx2, rx2) = channel();
|
||||
let t1 = thread::spawn(move || {
|
||||
O.call_once_force(|p| {
|
||||
assert!(p.poisoned());
|
||||
tx1.send(()).unwrap();
|
||||
rx2.recv().unwrap();
|
||||
});
|
||||
});
|
||||
|
||||
rx1.recv().unwrap();
|
||||
|
||||
// put another waiter on the once
|
||||
let t2 = thread::spawn(|| {
|
||||
let mut called = false;
|
||||
O.call_once(|| {
|
||||
called = true;
|
||||
});
|
||||
assert!(!called);
|
||||
});
|
||||
|
||||
tx2.send(()).unwrap();
|
||||
|
||||
assert!(t1.join().is_ok());
|
||||
assert!(t2.join().is_ok());
|
||||
}
|
||||
@@ -1,3 +1,6 @@
|
||||
#[cfg(all(test, not(target_os = "emscripten")))]
|
||||
mod tests;
|
||||
|
||||
use crate::cell::UnsafeCell;
|
||||
use crate::fmt;
|
||||
use crate::mem;
|
||||
@@ -538,254 +541,3 @@ impl<T: ?Sized> Drop for RwLockWriteGuard<'_, T> {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(test, not(target_os = "emscripten")))]
|
||||
mod tests {
|
||||
use crate::sync::atomic::{AtomicUsize, Ordering};
|
||||
use crate::sync::mpsc::channel;
|
||||
use crate::sync::{Arc, RwLock, TryLockError};
|
||||
use crate::thread;
|
||||
use rand::{self, Rng};
|
||||
|
||||
#[derive(Eq, PartialEq, Debug)]
|
||||
struct NonCopy(i32);
|
||||
|
||||
#[test]
|
||||
fn smoke() {
|
||||
let l = RwLock::new(());
|
||||
drop(l.read().unwrap());
|
||||
drop(l.write().unwrap());
|
||||
drop((l.read().unwrap(), l.read().unwrap()));
|
||||
drop(l.write().unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn frob() {
|
||||
const N: u32 = 10;
|
||||
const M: usize = 1000;
|
||||
|
||||
let r = Arc::new(RwLock::new(()));
|
||||
|
||||
let (tx, rx) = channel::<()>();
|
||||
for _ in 0..N {
|
||||
let tx = tx.clone();
|
||||
let r = r.clone();
|
||||
thread::spawn(move || {
|
||||
let mut rng = rand::thread_rng();
|
||||
for _ in 0..M {
|
||||
if rng.gen_bool(1.0 / (N as f64)) {
|
||||
drop(r.write().unwrap());
|
||||
} else {
|
||||
drop(r.read().unwrap());
|
||||
}
|
||||
}
|
||||
drop(tx);
|
||||
});
|
||||
}
|
||||
drop(tx);
|
||||
let _ = rx.recv();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_rw_arc_poison_wr() {
|
||||
let arc = Arc::new(RwLock::new(1));
|
||||
let arc2 = arc.clone();
|
||||
let _: Result<(), _> = thread::spawn(move || {
|
||||
let _lock = arc2.write().unwrap();
|
||||
panic!();
|
||||
})
|
||||
.join();
|
||||
assert!(arc.read().is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_rw_arc_poison_ww() {
|
||||
let arc = Arc::new(RwLock::new(1));
|
||||
assert!(!arc.is_poisoned());
|
||||
let arc2 = arc.clone();
|
||||
let _: Result<(), _> = thread::spawn(move || {
|
||||
let _lock = arc2.write().unwrap();
|
||||
panic!();
|
||||
})
|
||||
.join();
|
||||
assert!(arc.write().is_err());
|
||||
assert!(arc.is_poisoned());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_rw_arc_no_poison_rr() {
|
||||
let arc = Arc::new(RwLock::new(1));
|
||||
let arc2 = arc.clone();
|
||||
let _: Result<(), _> = thread::spawn(move || {
|
||||
let _lock = arc2.read().unwrap();
|
||||
panic!();
|
||||
})
|
||||
.join();
|
||||
let lock = arc.read().unwrap();
|
||||
assert_eq!(*lock, 1);
|
||||
}
|
||||
#[test]
|
||||
fn test_rw_arc_no_poison_rw() {
|
||||
let arc = Arc::new(RwLock::new(1));
|
||||
let arc2 = arc.clone();
|
||||
let _: Result<(), _> = thread::spawn(move || {
|
||||
let _lock = arc2.read().unwrap();
|
||||
panic!()
|
||||
})
|
||||
.join();
|
||||
let lock = arc.write().unwrap();
|
||||
assert_eq!(*lock, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_rw_arc() {
|
||||
let arc = Arc::new(RwLock::new(0));
|
||||
let arc2 = arc.clone();
|
||||
let (tx, rx) = channel();
|
||||
|
||||
thread::spawn(move || {
|
||||
let mut lock = arc2.write().unwrap();
|
||||
for _ in 0..10 {
|
||||
let tmp = *lock;
|
||||
*lock = -1;
|
||||
thread::yield_now();
|
||||
*lock = tmp + 1;
|
||||
}
|
||||
tx.send(()).unwrap();
|
||||
});
|
||||
|
||||
// Readers try to catch the writer in the act
|
||||
let mut children = Vec::new();
|
||||
for _ in 0..5 {
|
||||
let arc3 = arc.clone();
|
||||
children.push(thread::spawn(move || {
|
||||
let lock = arc3.read().unwrap();
|
||||
assert!(*lock >= 0);
|
||||
}));
|
||||
}
|
||||
|
||||
// Wait for children to pass their asserts
|
||||
for r in children {
|
||||
assert!(r.join().is_ok());
|
||||
}
|
||||
|
||||
// Wait for writer to finish
|
||||
rx.recv().unwrap();
|
||||
let lock = arc.read().unwrap();
|
||||
assert_eq!(*lock, 10);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_rw_arc_access_in_unwind() {
|
||||
let arc = Arc::new(RwLock::new(1));
|
||||
let arc2 = arc.clone();
|
||||
let _ = thread::spawn(move || -> () {
|
||||
struct Unwinder {
|
||||
i: Arc<RwLock<isize>>,
|
||||
}
|
||||
impl Drop for Unwinder {
|
||||
fn drop(&mut self) {
|
||||
let mut lock = self.i.write().unwrap();
|
||||
*lock += 1;
|
||||
}
|
||||
}
|
||||
let _u = Unwinder { i: arc2 };
|
||||
panic!();
|
||||
})
|
||||
.join();
|
||||
let lock = arc.read().unwrap();
|
||||
assert_eq!(*lock, 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_rwlock_unsized() {
|
||||
let rw: &RwLock<[i32]> = &RwLock::new([1, 2, 3]);
|
||||
{
|
||||
let b = &mut *rw.write().unwrap();
|
||||
b[0] = 4;
|
||||
b[2] = 5;
|
||||
}
|
||||
let comp: &[i32] = &[4, 2, 5];
|
||||
assert_eq!(&*rw.read().unwrap(), comp);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_rwlock_try_write() {
|
||||
let lock = RwLock::new(0isize);
|
||||
let read_guard = lock.read().unwrap();
|
||||
|
||||
let write_result = lock.try_write();
|
||||
match write_result {
|
||||
Err(TryLockError::WouldBlock) => (),
|
||||
Ok(_) => assert!(false, "try_write should not succeed while read_guard is in scope"),
|
||||
Err(_) => assert!(false, "unexpected error"),
|
||||
}
|
||||
|
||||
drop(read_guard);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_into_inner() {
|
||||
let m = RwLock::new(NonCopy(10));
|
||||
assert_eq!(m.into_inner().unwrap(), NonCopy(10));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_into_inner_drop() {
|
||||
struct Foo(Arc<AtomicUsize>);
|
||||
impl Drop for Foo {
|
||||
fn drop(&mut self) {
|
||||
self.0.fetch_add(1, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
let num_drops = Arc::new(AtomicUsize::new(0));
|
||||
let m = RwLock::new(Foo(num_drops.clone()));
|
||||
assert_eq!(num_drops.load(Ordering::SeqCst), 0);
|
||||
{
|
||||
let _inner = m.into_inner().unwrap();
|
||||
assert_eq!(num_drops.load(Ordering::SeqCst), 0);
|
||||
}
|
||||
assert_eq!(num_drops.load(Ordering::SeqCst), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_into_inner_poison() {
|
||||
let m = Arc::new(RwLock::new(NonCopy(10)));
|
||||
let m2 = m.clone();
|
||||
let _ = thread::spawn(move || {
|
||||
let _lock = m2.write().unwrap();
|
||||
panic!("test panic in inner thread to poison RwLock");
|
||||
})
|
||||
.join();
|
||||
|
||||
assert!(m.is_poisoned());
|
||||
match Arc::try_unwrap(m).unwrap().into_inner() {
|
||||
Err(e) => assert_eq!(e.into_inner(), NonCopy(10)),
|
||||
Ok(x) => panic!("into_inner of poisoned RwLock is Ok: {:?}", x),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_mut() {
|
||||
let mut m = RwLock::new(NonCopy(10));
|
||||
*m.get_mut().unwrap() = NonCopy(20);
|
||||
assert_eq!(m.into_inner().unwrap(), NonCopy(20));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_mut_poison() {
|
||||
let m = Arc::new(RwLock::new(NonCopy(10)));
|
||||
let m2 = m.clone();
|
||||
let _ = thread::spawn(move || {
|
||||
let _lock = m2.write().unwrap();
|
||||
panic!("test panic in inner thread to poison RwLock");
|
||||
})
|
||||
.join();
|
||||
|
||||
assert!(m.is_poisoned());
|
||||
match Arc::try_unwrap(m).unwrap().get_mut() {
|
||||
Err(e) => assert_eq!(*e.into_inner(), NonCopy(10)),
|
||||
Ok(x) => panic!("get_mut of poisoned RwLock is Ok: {:?}", x),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
247
library/std/src/sync/rwlock/tests.rs
Normal file
247
library/std/src/sync/rwlock/tests.rs
Normal file
@@ -0,0 +1,247 @@
|
||||
use crate::sync::atomic::{AtomicUsize, Ordering};
|
||||
use crate::sync::mpsc::channel;
|
||||
use crate::sync::{Arc, RwLock, TryLockError};
|
||||
use crate::thread;
|
||||
use rand::{self, Rng};
|
||||
|
||||
#[derive(Eq, PartialEq, Debug)]
|
||||
struct NonCopy(i32);
|
||||
|
||||
#[test]
|
||||
fn smoke() {
|
||||
let l = RwLock::new(());
|
||||
drop(l.read().unwrap());
|
||||
drop(l.write().unwrap());
|
||||
drop((l.read().unwrap(), l.read().unwrap()));
|
||||
drop(l.write().unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn frob() {
|
||||
const N: u32 = 10;
|
||||
const M: usize = 1000;
|
||||
|
||||
let r = Arc::new(RwLock::new(()));
|
||||
|
||||
let (tx, rx) = channel::<()>();
|
||||
for _ in 0..N {
|
||||
let tx = tx.clone();
|
||||
let r = r.clone();
|
||||
thread::spawn(move || {
|
||||
let mut rng = rand::thread_rng();
|
||||
for _ in 0..M {
|
||||
if rng.gen_bool(1.0 / (N as f64)) {
|
||||
drop(r.write().unwrap());
|
||||
} else {
|
||||
drop(r.read().unwrap());
|
||||
}
|
||||
}
|
||||
drop(tx);
|
||||
});
|
||||
}
|
||||
drop(tx);
|
||||
let _ = rx.recv();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_rw_arc_poison_wr() {
|
||||
let arc = Arc::new(RwLock::new(1));
|
||||
let arc2 = arc.clone();
|
||||
let _: Result<(), _> = thread::spawn(move || {
|
||||
let _lock = arc2.write().unwrap();
|
||||
panic!();
|
||||
})
|
||||
.join();
|
||||
assert!(arc.read().is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_rw_arc_poison_ww() {
|
||||
let arc = Arc::new(RwLock::new(1));
|
||||
assert!(!arc.is_poisoned());
|
||||
let arc2 = arc.clone();
|
||||
let _: Result<(), _> = thread::spawn(move || {
|
||||
let _lock = arc2.write().unwrap();
|
||||
panic!();
|
||||
})
|
||||
.join();
|
||||
assert!(arc.write().is_err());
|
||||
assert!(arc.is_poisoned());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_rw_arc_no_poison_rr() {
|
||||
let arc = Arc::new(RwLock::new(1));
|
||||
let arc2 = arc.clone();
|
||||
let _: Result<(), _> = thread::spawn(move || {
|
||||
let _lock = arc2.read().unwrap();
|
||||
panic!();
|
||||
})
|
||||
.join();
|
||||
let lock = arc.read().unwrap();
|
||||
assert_eq!(*lock, 1);
|
||||
}
|
||||
#[test]
|
||||
fn test_rw_arc_no_poison_rw() {
|
||||
let arc = Arc::new(RwLock::new(1));
|
||||
let arc2 = arc.clone();
|
||||
let _: Result<(), _> = thread::spawn(move || {
|
||||
let _lock = arc2.read().unwrap();
|
||||
panic!()
|
||||
})
|
||||
.join();
|
||||
let lock = arc.write().unwrap();
|
||||
assert_eq!(*lock, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_rw_arc() {
|
||||
let arc = Arc::new(RwLock::new(0));
|
||||
let arc2 = arc.clone();
|
||||
let (tx, rx) = channel();
|
||||
|
||||
thread::spawn(move || {
|
||||
let mut lock = arc2.write().unwrap();
|
||||
for _ in 0..10 {
|
||||
let tmp = *lock;
|
||||
*lock = -1;
|
||||
thread::yield_now();
|
||||
*lock = tmp + 1;
|
||||
}
|
||||
tx.send(()).unwrap();
|
||||
});
|
||||
|
||||
// Readers try to catch the writer in the act
|
||||
let mut children = Vec::new();
|
||||
for _ in 0..5 {
|
||||
let arc3 = arc.clone();
|
||||
children.push(thread::spawn(move || {
|
||||
let lock = arc3.read().unwrap();
|
||||
assert!(*lock >= 0);
|
||||
}));
|
||||
}
|
||||
|
||||
// Wait for children to pass their asserts
|
||||
for r in children {
|
||||
assert!(r.join().is_ok());
|
||||
}
|
||||
|
||||
// Wait for writer to finish
|
||||
rx.recv().unwrap();
|
||||
let lock = arc.read().unwrap();
|
||||
assert_eq!(*lock, 10);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_rw_arc_access_in_unwind() {
|
||||
let arc = Arc::new(RwLock::new(1));
|
||||
let arc2 = arc.clone();
|
||||
let _ = thread::spawn(move || -> () {
|
||||
struct Unwinder {
|
||||
i: Arc<RwLock<isize>>,
|
||||
}
|
||||
impl Drop for Unwinder {
|
||||
fn drop(&mut self) {
|
||||
let mut lock = self.i.write().unwrap();
|
||||
*lock += 1;
|
||||
}
|
||||
}
|
||||
let _u = Unwinder { i: arc2 };
|
||||
panic!();
|
||||
})
|
||||
.join();
|
||||
let lock = arc.read().unwrap();
|
||||
assert_eq!(*lock, 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_rwlock_unsized() {
|
||||
let rw: &RwLock<[i32]> = &RwLock::new([1, 2, 3]);
|
||||
{
|
||||
let b = &mut *rw.write().unwrap();
|
||||
b[0] = 4;
|
||||
b[2] = 5;
|
||||
}
|
||||
let comp: &[i32] = &[4, 2, 5];
|
||||
assert_eq!(&*rw.read().unwrap(), comp);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_rwlock_try_write() {
|
||||
let lock = RwLock::new(0isize);
|
||||
let read_guard = lock.read().unwrap();
|
||||
|
||||
let write_result = lock.try_write();
|
||||
match write_result {
|
||||
Err(TryLockError::WouldBlock) => (),
|
||||
Ok(_) => assert!(false, "try_write should not succeed while read_guard is in scope"),
|
||||
Err(_) => assert!(false, "unexpected error"),
|
||||
}
|
||||
|
||||
drop(read_guard);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_into_inner() {
|
||||
let m = RwLock::new(NonCopy(10));
|
||||
assert_eq!(m.into_inner().unwrap(), NonCopy(10));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_into_inner_drop() {
|
||||
struct Foo(Arc<AtomicUsize>);
|
||||
impl Drop for Foo {
|
||||
fn drop(&mut self) {
|
||||
self.0.fetch_add(1, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
let num_drops = Arc::new(AtomicUsize::new(0));
|
||||
let m = RwLock::new(Foo(num_drops.clone()));
|
||||
assert_eq!(num_drops.load(Ordering::SeqCst), 0);
|
||||
{
|
||||
let _inner = m.into_inner().unwrap();
|
||||
assert_eq!(num_drops.load(Ordering::SeqCst), 0);
|
||||
}
|
||||
assert_eq!(num_drops.load(Ordering::SeqCst), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_into_inner_poison() {
|
||||
let m = Arc::new(RwLock::new(NonCopy(10)));
|
||||
let m2 = m.clone();
|
||||
let _ = thread::spawn(move || {
|
||||
let _lock = m2.write().unwrap();
|
||||
panic!("test panic in inner thread to poison RwLock");
|
||||
})
|
||||
.join();
|
||||
|
||||
assert!(m.is_poisoned());
|
||||
match Arc::try_unwrap(m).unwrap().into_inner() {
|
||||
Err(e) => assert_eq!(e.into_inner(), NonCopy(10)),
|
||||
Ok(x) => panic!("into_inner of poisoned RwLock is Ok: {:?}", x),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_mut() {
|
||||
let mut m = RwLock::new(NonCopy(10));
|
||||
*m.get_mut().unwrap() = NonCopy(20);
|
||||
assert_eq!(m.into_inner().unwrap(), NonCopy(20));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_mut_poison() {
|
||||
let m = Arc::new(RwLock::new(NonCopy(10)));
|
||||
let m2 = m.clone();
|
||||
let _ = thread::spawn(move || {
|
||||
let _lock = m2.write().unwrap();
|
||||
panic!("test panic in inner thread to poison RwLock");
|
||||
})
|
||||
.join();
|
||||
|
||||
assert!(m.is_poisoned());
|
||||
match Arc::try_unwrap(m).unwrap().get_mut() {
|
||||
Err(e) => assert_eq!(*e.into_inner(), NonCopy(10)),
|
||||
Ok(x) => panic!("get_mut of poisoned RwLock is Ok: {:?}", x),
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user