Fix format and tidy for code moved from rayon

Celina G. Val
2025-06-11 11:12:32 -07:00
parent 35c5144394
commit 0b9b1df006
25 changed files with 187 additions and 325 deletions

View File

@@ -1,10 +1,11 @@
-use crate::job::{ArcJob, StackJob};
-use crate::latch::{CountLatch, LatchRef};
-use crate::registry::{Registry, WorkerThread};
 use std::fmt;
 use std::marker::PhantomData;
 use std::sync::Arc;
+
+use crate::job::{ArcJob, StackJob};
+use crate::latch::{CountLatch, LatchRef};
+use crate::registry::{Registry, WorkerThread};

 mod test;

 /// Executes `op` within every thread in the current threadpool. If this is
@@ -53,10 +54,7 @@ impl<'a> BroadcastContext<'a> {
     pub(super) fn with<R>(f: impl FnOnce(BroadcastContext<'_>) -> R) -> R {
         let worker_thread = WorkerThread::current();
         assert!(!worker_thread.is_null());
-        f(BroadcastContext {
-            worker: unsafe { &*worker_thread },
-            _marker: PhantomData,
-        })
+        f(BroadcastContext { worker: unsafe { &*worker_thread }, _marker: PhantomData })
     }

     /// Our index amongst the broadcast threads (ranges from `0..self.num_threads()`).
@@ -108,9 +106,8 @@ where
     let current_thread = WorkerThread::current().as_ref();
     let tlv = crate::tlv::get();
     let latch = CountLatch::with_count(n_threads, current_thread);
-    let jobs: Vec<_> = (0..n_threads)
-        .map(|_| StackJob::new(tlv, &f, LatchRef::new(&latch)))
-        .collect();
+    let jobs: Vec<_> =
+        (0..n_threads).map(|_| StackJob::new(tlv, &f, LatchRef::new(&latch))).collect();
     let job_refs = jobs.iter().map(|job| job.as_job_ref());

     registry.inject_broadcast(job_refs);
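
For context, the `broadcast` function whose internals are reformatted above runs one closure on every worker of a pool and collects the per-thread results; `spawn_broadcast` is the detached variant. A minimal usage sketch, assuming the crate is imported as `rayon_core`:

use rayon_core::{ThreadPoolBuilder, broadcast};

fn main() {
    // Build a small pool so the result below is predictable.
    let pool = ThreadPoolBuilder::new().num_threads(4).build().unwrap();
    // One closure invocation per worker; the Vec is ordered by ctx.index().
    let indices: Vec<usize> = pool.install(|| broadcast(|ctx| ctx.index()));
    assert_eq!(indices, vec![0, 1, 2, 3]);
}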

View File

@@ -1,11 +1,12 @@
 #![cfg(test)]

-use crate::ThreadPoolBuilder;
+use std::sync::Arc;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::mpsc::channel;
-use std::sync::Arc;
 use std::{thread, time};

+use crate::ThreadPoolBuilder;

 #[test]
 fn broadcast_global() {
     let v = crate::broadcast(|ctx| ctx.index());

View File

@@ -1,13 +1,14 @@
-use crate::latch::Latch;
-use crate::tlv;
-use crate::tlv::Tlv;
-use crate::unwind;
-use crossbeam_deque::{Injector, Steal};
 use std::any::Any;
 use std::cell::UnsafeCell;
 use std::mem;
 use std::sync::Arc;

+use crossbeam_deque::{Injector, Steal};
+
+use crate::latch::Latch;
+use crate::tlv::Tlv;
+use crate::{tlv, unwind};

 pub(super) enum JobResult<T> {
     None,
     Ok(T),
@@ -29,7 +30,7 @@ pub(super) trait Job {
 /// Effectively a Job trait object. Each JobRef **must** be executed
 /// exactly once, or else data may leak.
 ///
-/// Internally, we store the job's data in a `*const ()` pointer.  The
+/// Internally, we store the job's data in a `*const ()` pointer. The
 /// true type is something like `*const StackJob<...>`, but we hide
 /// it. We also carry the "execute fn" from the `Job` trait.
 pub(super) struct JobRef {
@@ -48,10 +49,7 @@ impl JobRef {
         T: Job,
     {
         // erase types:
-        JobRef {
-            pointer: data as *const (),
-            execute_fn: <T as Job>::execute,
-        }
+        JobRef { pointer: data as *const (), execute_fn: <T as Job>::execute }
     }

     /// Returns an opaque handle that can be saved and compared,
@@ -69,7 +67,7 @@ impl JobRef {
 /// A job that will be owned by a stack slot. This means that when it
 /// executes it need not free any heap data, the cleanup occurs when
-/// the stack frame is later popped.  The function parameter indicates
+/// the stack frame is later popped. The function parameter indicates
 /// `true` if the job was stolen -- executed on a different thread.
 pub(super) struct StackJob<L, F, R>
 where
@@ -248,13 +246,11 @@ pub(super) struct JobFifo {
 impl JobFifo {
     pub(super) fn new() -> Self {
-        JobFifo {
-            inner: Injector::new(),
-        }
+        JobFifo { inner: Injector::new() }
     }

     pub(super) unsafe fn push(&self, job_ref: JobRef) -> JobRef {
-        // A little indirection ensures that spawns are always prioritized in FIFO order.  The
+        // A little indirection ensures that spawns are always prioritized in FIFO order. The
         // jobs in a thread's deque may be popped from the back (LIFO) or stolen from the front
         // (FIFO), but either way they will end up popping from the front of this queue.
         self.inner.push(job_ref);
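
The `JobRef::new` hunk above is the crate's manual type-erasure idiom: the data pointer and the monomorphized `execute` function are erased into a `*const ()` plus a plain fn pointer. A self-contained sketch of the same pattern (a simplification; the real `JobRef` also carries TLV and panic plumbing elsewhere in this file):

trait Job {
    // Same shape as the `Job` trait above: called with the erased pointer,
    // which the implementation casts back to its concrete type.
    unsafe fn execute(this: *const ());
}

struct ErasedJob {
    pointer: *const (),
    execute_fn: unsafe fn(*const ()),
}

impl ErasedJob {
    unsafe fn new<T: Job>(data: *const T) -> ErasedJob {
        // erase types, exactly as `JobRef::new` does:
        ErasedJob { pointer: data as *const (), execute_fn: <T as Job>::execute }
    }

    unsafe fn execute(self) {
        // Must be called exactly once, or data may leak -- the same
        // contract the `JobRef` docs above spell out.
        (self.execute_fn)(self.pointer)
    }
}

struct PrintJob(&'static str);

impl Job for PrintJob {
    unsafe fn execute(this: *const ()) {
        let job = &*(this as *const PrintJob);
        println!("running {}", job.0);
    }
}

fn main() {
    let job = PrintJob("demo");
    unsafe { ErasedJob::new(&job as *const PrintJob).execute() }
}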

View File

@@ -1,11 +1,10 @@
+use std::any::Any;
+
 use crate::job::StackJob;
 use crate::latch::SpinLatch;
 use crate::registry::{self, WorkerThread};
 use crate::tlv::{self, Tlv};
-use crate::unwind;
-use std::any::Any;
-
-use crate::FnContext;
+use crate::{FnContext, unwind};

 #[cfg(test)]
 mod test;
@@ -22,7 +21,7 @@ mod test;
 /// it.
 ///
 /// When `join` is called from outside the thread pool, the calling
-/// thread will block while the closures execute in the pool.  When
+/// thread will block while the closures execute in the pool. When
 /// `join` is called within the pool, the calling thread still actively
 /// participates in the thread pool. It will begin by executing closure
 /// A (on the current thread). While it is doing that, it will advertise
@@ -80,13 +79,13 @@ mod test;
 /// CPU-bound tasks that do not perform I/O or other blocking
 /// operations. If you do perform I/O, and that I/O should block
 /// (e.g., waiting for a network request), the overall performance may
-/// be poor.  Moreover, if you cause one closure to be blocked waiting
+/// be poor. Moreover, if you cause one closure to be blocked waiting
 /// on another (for example, using a channel), that could lead to a
 /// deadlock.
 ///
 /// # Panics
 ///
-/// No matter what happens, both closures will always be executed.  If
+/// No matter what happens, both closures will always be executed. If
 /// a single closure panics, whether it be the first or second
 /// closure, that panic will be propagated and hence `join()` will
 /// panic with the same panic value. If both closures panic, `join()`
@@ -109,7 +108,7 @@ where
 /// Identical to `join`, except that the closures have a parameter
 /// that provides context for the way the closure has been called,
 /// especially indicating whether they're executing on a different
-/// thread than where `join_context` was called.  This will occur if
+/// thread than where `join_context` was called. This will occur if
 /// the second job is stolen by a different thread, or if
 /// `join_context` was called from outside the thread pool to begin
 /// with.
@@ -148,7 +147,7 @@ where
         };

         // Now that task A has finished, try to pop job B from the
-        // local stack.  It may already have been popped by job A; it
+        // local stack. It may already have been popped by job A; it
         // may also have been stolen. There may also be some tasks
         // pushed on top of it in the stack, and we will have to pop
         // those off to get to it.
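
For reference, the two entry points documented above have this call shape (a minimal sketch, assuming the crate is imported as `rayon_core`):

use rayon_core::{FnContext, join, join_context};

fn main() {
    // Both closures always run, potentially in parallel.
    let (a, b) = join(|| (1..=10).sum::<u32>(), || (1..=10).product::<u32>());
    assert_eq!((a, b), (55, 3_628_800));

    // join_context additionally reports whether a closure was stolen to a
    // different thread than the one that called join_context.
    let (_, stolen) = join_context(|_| (), |ctx: FnContext| ctx.migrated());
    println!("second closure migrated: {stolen}");
}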

View File

@@ -1,11 +1,12 @@
 //! Tests for the join code.

-use super::*;
-use crate::ThreadPoolBuilder;
 use rand::distr::StandardUniform;
 use rand::{Rng, SeedableRng};
 use rand_xorshift::XorShiftRng;

+use super::*;
+use crate::ThreadPoolBuilder;

 fn quick_sort<T: PartialOrd + Send>(v: &mut [T]) {
     if v.len() <= 1 {
         return;

View File

@@ -28,7 +28,7 @@ use crate::registry::{Registry, WorkerThread};
 /// - Once `probe()` returns true, all memory effects from the `set()`
 ///   are visible (in other words, the set should synchronize-with
 ///   the probe).
-/// - Once `set()` occurs, the next `probe()` *will* observe it.  This
+/// - Once `set()` occurs, the next `probe()` *will* observe it. This
 ///   typically requires a seq-cst ordering. See [the "tickle-then-get-sleepy" scenario in the sleep
 ///   README](/src/sleep/README.md#tickle-then-get-sleepy) for details.
 pub(super) trait Latch {
@@ -78,9 +78,7 @@ pub(super) struct CoreLatch {
 impl CoreLatch {
     #[inline]
     fn new() -> Self {
-        Self {
-            state: AtomicUsize::new(0),
-        }
+        Self { state: AtomicUsize::new(0) }
     }

     /// Invoked by owning thread as it prepares to sleep. Returns true
@@ -88,9 +86,7 @@ impl CoreLatch {
     /// latch was set in the meantime.
     #[inline]
     pub(super) fn get_sleepy(&self) -> bool {
-        self.state
-            .compare_exchange(UNSET, SLEEPY, Ordering::SeqCst, Ordering::Relaxed)
-            .is_ok()
+        self.state.compare_exchange(UNSET, SLEEPY, Ordering::SeqCst, Ordering::Relaxed).is_ok()
     }

     /// Invoked by owning thread as it falls asleep sleep. Returns
@@ -98,9 +94,7 @@ impl CoreLatch {
     /// was set in the meantime.
     #[inline]
     pub(super) fn fall_asleep(&self) -> bool {
-        self.state
-            .compare_exchange(SLEEPY, SLEEPING, Ordering::SeqCst, Ordering::Relaxed)
-            .is_ok()
+        self.state.compare_exchange(SLEEPY, SLEEPING, Ordering::SeqCst, Ordering::Relaxed).is_ok()
     }

     /// Invoked by owning thread as it falls asleep sleep. Returns
@@ -110,8 +104,7 @@ impl CoreLatch {
     pub(super) fn wake_up(&self) {
         if !self.probe() {
             let _ =
-                self.state
-                    .compare_exchange(SLEEPING, UNSET, Ordering::SeqCst, Ordering::Relaxed);
+                self.state.compare_exchange(SLEEPING, UNSET, Ordering::SeqCst, Ordering::Relaxed);
         }
     }
@@ -166,15 +159,12 @@ impl<'r> SpinLatch<'r> {
         }
     }

-    /// Creates a new spin latch for cross-threadpool blocking.  Notably, we
+    /// Creates a new spin latch for cross-threadpool blocking. Notably, we
     /// need to make sure the registry is kept alive after setting, so we can
     /// safely call the notification.
     #[inline]
     pub(super) fn cross(thread: &'r WorkerThread) -> SpinLatch<'r> {
-        SpinLatch {
-            cross: true,
-            ..SpinLatch::new(thread)
-        }
+        SpinLatch { cross: true, ..SpinLatch::new(thread) }
     }

     #[inline]
@@ -235,10 +225,7 @@ pub(super) struct LockLatch {
 impl LockLatch {
     #[inline]
     pub(super) fn new() -> LockLatch {
-        LockLatch {
-            m: Mutex::new(false),
-            v: Condvar::new(),
-        }
+        LockLatch { m: Mutex::new(false), v: Condvar::new() }
     }

     /// Block until latch is set, then resets this lock latch so it can be reused again.
@@ -288,9 +275,7 @@ pub(super) struct OnceLatch {
 impl OnceLatch {
     #[inline]
     pub(super) fn new() -> OnceLatch {
-        Self {
-            core_latch: CoreLatch::new(),
-        }
+        Self { core_latch: CoreLatch::new() }
     }

     /// Set the latch, then tickle the specific worker thread,
@@ -372,9 +357,7 @@ impl CountLatch {
                     registry: Arc::clone(owner.registry()),
                     worker_index: owner.index(),
                 },
-                None => CountLatchKind::Blocking {
-                    latch: LockLatch::new(),
-                },
+                None => CountLatchKind::Blocking { latch: LockLatch::new() },
             },
         }
     }
@@ -387,11 +370,7 @@ impl CountLatch {
     pub(super) fn wait(&self, owner: Option<&WorkerThread>) {
         match &self.kind {
-            CountLatchKind::Stealing {
-                latch,
-                registry,
-                worker_index,
-            } => unsafe {
+            CountLatchKind::Stealing { latch, registry, worker_index } => unsafe {
                 let owner = owner.expect("owner thread");
                 debug_assert_eq!(registry.id(), owner.registry().id());
                 debug_assert_eq!(*worker_index, owner.index());
@@ -409,11 +388,7 @@ impl Latch for CountLatch {
         // NOTE: Once we call `set` on the internal `latch`,
         // the target may proceed and invalidate `this`!
         match (*this).kind {
-            CountLatchKind::Stealing {
-                ref latch,
-                ref registry,
-                worker_index,
-            } => {
+            CountLatchKind::Stealing { ref latch, ref registry, worker_index } => {
                 let registry = Arc::clone(registry);
                 if CoreLatch::set(latch) {
                     registry.notify_worker_latch_is_set(worker_index);
@@ -433,10 +408,7 @@ pub(super) struct LatchRef<'a, L> {
 impl<L> LatchRef<'_, L> {
     pub(super) fn new(inner: &L) -> LatchRef<'_, L> {
-        LatchRef {
-            inner,
-            marker: PhantomData,
-        }
+        LatchRef { inner, marker: PhantomData }
     }
 }
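
The three `compare_exchange` sites above form one state machine over UNSET -> SLEEPY -> SLEEPING, with SET reachable from any state. A standalone sketch of why the owner's transitions must be compare-and-swap rather than plain stores (state values assumed to match the constants `CoreLatch` uses):

use std::sync::atomic::{AtomicUsize, Ordering};

const UNSET: usize = 0; // owner is awake
const SLEEPY: usize = 1; // owner announced it may sleep
const SLEEPING: usize = 2; // owner is asleep
const SET: usize = 3; // latch fired; terminal state

fn main() {
    let state = AtomicUsize::new(UNSET);

    // Owner: UNSET -> SLEEPY succeeds only while the latch is unset.
    let got_sleepy =
        state.compare_exchange(UNSET, SLEEPY, Ordering::SeqCst, Ordering::Relaxed).is_ok();

    // Setter: unconditional swap to SET. The owner's CAS transitions are
    // what keep this from being overwritten.
    state.swap(SET, Ordering::SeqCst);

    // Owner: SLEEPY -> SLEEPING now fails, so the owner never sleeps
    // through a latch that was set in the meantime.
    let fell_asleep =
        state.compare_exchange(SLEEPY, SLEEPING, Ordering::SeqCst, Ordering::Relaxed).is_ok();
    assert!(got_sleepy && !fell_asleep);
}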

View File

@@ -57,20 +57,17 @@
 //!
 //! While we strive to keep `rayon-core` semver-compatible, it's still
 //! possible to arrive at this situation if different crates have overly
-//! restrictive tilde or inequality requirements for `rayon-core`.  The
+//! restrictive tilde or inequality requirements for `rayon-core`. The
 //! conflicting requirements will need to be resolved before the build will
 //! succeed.

 #![warn(rust_2018_idioms)]

 use std::any::Any;
-use std::env;
 use std::error::Error;
-use std::fmt;
-use std::io;
 use std::marker::PhantomData;
 use std::str::FromStr;
-use std::thread;
+use std::{env, fmt, io, thread};

 #[macro_use]
 mod private;
@@ -92,20 +89,18 @@ mod test;
 pub mod tlv;

-pub use self::broadcast::{broadcast, spawn_broadcast, BroadcastContext};
-pub use self::join::{join, join_context};
-pub use self::registry::ThreadBuilder;
-pub use self::registry::{mark_blocked, mark_unblocked, Registry};
-pub use self::scope::{in_place_scope, scope, Scope};
-pub use self::scope::{in_place_scope_fifo, scope_fifo, ScopeFifo};
-pub use self::spawn::{spawn, spawn_fifo};
-pub use self::thread_pool::current_thread_has_pending_tasks;
-pub use self::thread_pool::current_thread_index;
-pub use self::thread_pool::ThreadPool;
-pub use self::thread_pool::{yield_local, yield_now, Yield};
 pub use worker_local::WorkerLocal;

+pub use self::broadcast::{BroadcastContext, broadcast, spawn_broadcast};
+pub use self::join::{join, join_context};
 use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn};
+pub use self::registry::{Registry, ThreadBuilder, mark_blocked, mark_unblocked};
+pub use self::scope::{Scope, ScopeFifo, in_place_scope, in_place_scope_fifo, scope, scope_fifo};
+pub use self::spawn::{spawn, spawn_fifo};
+pub use self::thread_pool::{
+    ThreadPool, Yield, current_thread_has_pending_tasks, current_thread_index, yield_local,
+    yield_now,
+};
/// Returns the maximum number of threads that Rayon supports in a single thread-pool.
///
@@ -282,7 +277,7 @@ where
 }

 /// Initializes the global thread pool. This initialization is
-/// **optional**.  If you do not call this function, the thread pool
+/// **optional**. If you do not call this function, the thread pool
 /// will be automatically initialized with the default
 /// configuration. Calling `build_global` is not recommended, except
 /// in two scenarios:
@@ -290,7 +285,7 @@ where
 /// - You wish to change the default configuration.
 /// - You are running a benchmark, in which case initializing may
 ///   yield slightly more consistent results, since the worker threads
-///   will already be ready to go even in the first iteration.  But
+///   will already be ready to go even in the first iteration. But
 ///   this cost is minimal.
 ///
 /// Initialization of the global thread pool happens exactly
/// Initialization of the global thread pool happens exactly
@@ -490,26 +485,16 @@ impl<S> ThreadPoolBuilder<S> {
         if self.num_threads > 0 {
             self.num_threads
         } else {
-            let default = || {
-                thread::available_parallelism()
-                    .map(|n| n.get())
-                    .unwrap_or(1)
-            };
+            let default = || thread::available_parallelism().map(|n| n.get()).unwrap_or(1);

-            match env::var("RAYON_NUM_THREADS")
-                .ok()
-                .and_then(|s| usize::from_str(&s).ok())
-            {
+            match env::var("RAYON_NUM_THREADS").ok().and_then(|s| usize::from_str(&s).ok()) {
                 Some(x @ 1..) => return x,
                 Some(0) => return default(),
                 _ => {}
             }

             // Support for deprecated `RAYON_RS_NUM_CPUS`.
-            match env::var("RAYON_RS_NUM_CPUS")
-                .ok()
-                .and_then(|s| usize::from_str(&s).ok())
-            {
+            match env::var("RAYON_RS_NUM_CPUS").ok().and_then(|s| usize::from_str(&s).ok()) {
                 Some(x @ 1..) => x,
                 _ => default(),
             }
@@ -723,9 +708,7 @@ impl<S> ThreadPoolBuilder<S> {
 impl Configuration {
     /// Creates and return a valid rayon thread pool configuration, but does not initialize it.
     pub fn new() -> Configuration {
-        Configuration {
-            builder: ThreadPoolBuilder::new(),
-        }
+        Configuration { builder: ThreadPoolBuilder::new() }
     }

     /// Deprecated in favor of `ThreadPoolBuilder::build`.
@@ -905,10 +888,7 @@ pub struct FnContext {
 impl FnContext {
     #[inline]
     fn new(migrated: bool) -> Self {
-        FnContext {
-            migrated,
-            _marker: PhantomData,
-        }
+        FnContext { migrated, _marker: PhantomData }
     }
 }
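
The `get_num_threads` hunk above encodes the pool-size resolution order: an explicit `num_threads` wins, then `RAYON_NUM_THREADS`, then the deprecated `RAYON_RS_NUM_CPUS`, then `available_parallelism()`, falling back to 1. A usage sketch (crate name assumed):

use rayon_core::ThreadPoolBuilder;

fn main() {
    // Explicit size: the environment variables are never consulted.
    let pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap();
    assert_eq!(pool.current_num_threads(), 2);

    // Default size: RAYON_NUM_THREADS if set and nonzero, otherwise
    // std::thread::available_parallelism(), otherwise 1.
    let default_pool = ThreadPoolBuilder::new().build().unwrap();
    assert!(default_pool.current_num_threads() >= 1);
}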

View File

@@ -1,5 +1,5 @@
 //! The public parts of this private module are used to create traits
-//! that cannot be implemented outside of our own crate.  This way we
+//! that cannot be implemented outside of our own crate. This way we
 //! can feel free to extend those traits without worrying about it
 //! being a breaking change for other implementations.

View File

@@ -1,23 +1,20 @@
+use std::cell::Cell;
+use std::collections::hash_map::DefaultHasher;
+use std::hash::Hasher;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex, Once};
+use std::{fmt, io, mem, ptr, thread};
+
+use crossbeam_deque::{Injector, Steal, Stealer, Worker};
+
 use crate::job::{JobFifo, JobRef, StackJob};
 use crate::latch::{AsCoreLatch, CoreLatch, Latch, LatchRef, LockLatch, OnceLatch, SpinLatch};
 use crate::sleep::Sleep;
 use crate::tlv::Tlv;
-use crate::unwind;
 use crate::{
     AcquireThreadHandler, DeadlockHandler, ErrorKind, ExitHandler, PanicHandler,
-    ReleaseThreadHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder, Yield,
+    ReleaseThreadHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder, Yield, unwind,
 };
-use crossbeam_deque::{Injector, Steal, Stealer, Worker};
-use std::cell::Cell;
-use std::collections::hash_map::DefaultHasher;
-use std::fmt;
-use std::hash::Hasher;
-use std::io;
-use std::mem;
-use std::ptr;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{Arc, Mutex, Once};
-use std::thread;

 /// Thread builder used for customization via
 /// [`ThreadPoolBuilder::spawn_handler`](struct.ThreadPoolBuilder.html#method.spawn_handler).
@@ -193,9 +190,7 @@ fn set_global_registry<F>(registry: F) -> Result<&'static Arc<Registry>, ThreadP
 where
     F: FnOnce() -> Result<Arc<Registry>, ThreadPoolBuildError>,
 {
-    let mut result = Err(ThreadPoolBuildError::new(
-        ErrorKind::GlobalPoolAlreadyInitialized,
-    ));
+    let mut result = Err(ThreadPoolBuildError::new(ErrorKind::GlobalPoolAlreadyInitialized));

     THE_REGISTRY_SET.call_once(|| {
         result = registry().map(|registry: Arc<Registry>| {
@@ -222,25 +217,23 @@ fn default_global_registry() -> Result<Arc<Registry>, ThreadPoolBuildError> {
     // is stubbed out, and we won't have to change anything if they do add real threading.
     let unsupported = matches!(&result, Err(e) if e.is_unsupported());
     if unsupported && WorkerThread::current().is_null() {
-        let builder = ThreadPoolBuilder::new()
-            .num_threads(1)
-            .spawn_handler(|thread| {
-                // Rather than starting a new thread, we're just taking over the current thread
-                // *without* running the main loop, so we can still return from here.
-                // The WorkerThread is leaked, but we never shutdown the global pool anyway.
-                let worker_thread = Box::leak(Box::new(WorkerThread::from(thread)));
-                let registry = &*worker_thread.registry;
-                let index = worker_thread.index;
+        let builder = ThreadPoolBuilder::new().num_threads(1).spawn_handler(|thread| {
+            // Rather than starting a new thread, we're just taking over the current thread
+            // *without* running the main loop, so we can still return from here.
+            // The WorkerThread is leaked, but we never shutdown the global pool anyway.
+            let worker_thread = Box::leak(Box::new(WorkerThread::from(thread)));
+            let registry = &*worker_thread.registry;
+            let index = worker_thread.index;

-                unsafe {
-                    WorkerThread::set_current(worker_thread);
+            unsafe {
+                WorkerThread::set_current(worker_thread);

-                    // let registry know we are ready to do work
-                    Latch::set(&registry.thread_infos[index].primed);
-                }
+                // let registry know we are ready to do work
+                Latch::set(&registry.thread_infos[index].primed);
+            }

-                Ok(())
-            });
+            Ok(())
+        });

         let fallback_result = Registry::new(builder);
         if fallback_result.is_ok() {
@@ -273,11 +266,7 @@ impl Registry {
         let (workers, stealers): (Vec<_>, Vec<_>) = (0..n_threads)
             .map(|_| {
-                let worker = if breadth_first {
-                    Worker::new_fifo()
-                } else {
-                    Worker::new_lifo()
-                };
+                let worker = if breadth_first { Worker::new_fifo() } else { Worker::new_lifo() };
                 let stealer = worker.stealer();

                 (worker, stealer)
@@ -341,7 +330,7 @@ impl Registry {
         }
     }

-    /// Returns the number of threads in the current registry.  This
+    /// Returns the number of threads in the current registry. This
     /// is better than `Registry::current().num_threads()` because it
     /// avoids incrementing the `Arc`.
     pub(super) fn current_num_threads() -> usize {
@@ -359,11 +348,7 @@ impl Registry {
     pub(super) fn current_thread(&self) -> Option<&WorkerThread> {
         unsafe {
             let worker = WorkerThread::current().as_ref()?;
-            if worker.registry().id() == self.id() {
-                Some(worker)
-            } else {
-                None
-            }
+            if worker.registry().id() == self.id() { Some(worker) } else { None }
         }
     }
@@ -371,9 +356,7 @@ impl Registry {
     pub(super) fn id(&self) -> RegistryId {
         // We can rely on `self` not to change since we only ever create
         // registries that are boxed up in an `Arc` (see `new()` above).
-        RegistryId {
-            addr: self as *const Self as usize,
-        }
+        RegistryId { addr: self as *const Self as usize }
     }

     pub(super) fn num_threads(&self) -> usize {
@@ -391,7 +374,7 @@ impl Registry {
         }
     }

-    /// Waits for the worker threads to get up and running.  This is
+    /// Waits for the worker threads to get up and running. This is
     /// meant to be used for benchmarking purposes, primarily, so that
     /// you can get more consistent numbers by having everything
     /// "ready to go".
@@ -512,7 +495,7 @@ impl Registry {
     /// If already in a worker-thread of this registry, just execute `op`.
     /// Otherwise, inject `op` in this thread-pool. Either way, block until `op`
     /// completes and return its return value. If `op` panics, that panic will
-    /// be propagated as well.  The second argument indicates `true` if injection
+    /// be propagated as well. The second argument indicates `true` if injection
     /// was performed, `false` if executed directly.
     pub(super) fn in_worker<OP, R>(&self, op: OP) -> R
     where
@@ -844,9 +827,7 @@ impl WorkerThread {
                 // The job might have injected local work, so go back to the outer loop.
                 continue 'outer;
             } else {
-                self.registry
-                    .sleep
-                    .no_work_found(&mut idle_state, latch, &self)
+                self.registry.sleep.no_work_found(&mut idle_state, latch, &self)
             }
         }
@@ -880,9 +861,7 @@ impl WorkerThread {
         // deques, and finally to injected jobs from the
         // outside. The idea is to finish what we started before
         // we take on something new.
-        self.take_local_job()
-            .or_else(|| self.steal())
-            .or_else(|| self.registry.pop_injected_job())
+        self.take_local_job().or_else(|| self.steal()).or_else(|| self.registry.pop_injected_job())
     }

     pub(super) fn yield_now(&self) -> Yield {
@@ -984,10 +963,10 @@ unsafe fn main_loop(thread: ThreadBuilder) {
     registry.release_thread();
 }

-/// If already in a worker-thread, just execute `op`.  Otherwise,
+/// If already in a worker-thread, just execute `op`. Otherwise,
 /// execute `op` in the default thread-pool. Either way, block until
 /// `op` completes and return its return value. If `op` panics, that
-/// panic will be propagated as well.  The second argument indicates
+/// panic will be propagated as well. The second argument indicates
 /// `true` if injection was performed, `false` if executed directly.
 pub(super) fn in_worker<OP, R>(op: OP) -> R
 where
@@ -1026,9 +1005,7 @@ impl XorShift64Star {
             seed = hasher.finish();
         }

-        XorShift64Star {
-            state: Cell::new(seed),
-        }
+        XorShift64Star { state: Cell::new(seed) }
     }

     fn next(&self) -> u64 {
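
`XorShift64Star` is the registry's cheap RNG for picking steal victims; `new` hashes a thread-local address plus the current time so the seed is nonzero. The `next` body is truncated above; it applies an xorshift* step, sketched here with one published parameterization (an assumption -- the in-tree shift constants may differ):

struct XorShift64Star {
    state: u64, // must be nonzero, which the hashed seed guarantees
}

impl XorShift64Star {
    fn next(&mut self) -> u64 {
        // Three xorshifts update the state; a multiply scrambles the output.
        let mut x = self.state;
        x ^= x >> 12;
        x ^= x << 25;
        x ^= x >> 27;
        self.state = x;
        x.wrapping_mul(0x2545_f491_4f6c_dd1d)
    }
}

fn main() {
    let mut rng = XorShift64Star { state: 0x9E37_79B9_7F4A_7C15 };
    // e.g. choose a random victim worker, as the stealing loop does:
    let num_threads: u64 = 8;
    let victim = (rng.next() % num_threads) as usize;
    assert!(victim < 8);
}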

View File

@@ -5,19 +5,19 @@
 //! [`in_place_scope()`]: fn.in_place_scope.html
 //! [`join()`]: ../join/join.fn.html

+use std::any::Any;
+use std::marker::PhantomData;
+use std::mem::ManuallyDrop;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicPtr, Ordering};
+use std::{fmt, ptr};
+
 use crate::broadcast::BroadcastContext;
 use crate::job::{ArcJob, HeapJob, JobFifo, JobRef};
 use crate::latch::{CountLatch, Latch};
-use crate::registry::{global_registry, in_worker, Registry, WorkerThread};
+use crate::registry::{Registry, WorkerThread, global_registry, in_worker};
 use crate::tlv::{self, Tlv};
 use crate::unwind;
-use std::any::Any;
-use std::fmt;
-use std::marker::PhantomData;
-use std::mem::ManuallyDrop;
-use std::ptr;
-use std::sync::atomic::{AtomicPtr, Ordering};
-use std::sync::Arc;

 #[cfg(test)]
 mod test;
@@ -53,7 +53,7 @@ struct ScopeBase<'scope> {
     job_completed_latch: CountLatch,

     /// You can think of a scope as containing a list of closures to execute,
-    /// all of which outlive `'scope`.  They're not actually required to be
+    /// all of which outlive `'scope`. They're not actually required to be
     /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because
     /// the closures are only *moved* across threads to be executed.
     #[allow(clippy::type_complexity)]
@@ -179,9 +179,9 @@ struct ScopeBase<'scope> {
 /// they were spawned. So in this example, absent any stealing, we can
 /// expect `s.2` to execute before `s.1`, and `t.2` before `t.1`. Other
 /// threads always steal from the other end of the deque, like FIFO
-/// order.  The idea is that "recent" tasks are most likely to be fresh
+/// order. The idea is that "recent" tasks are most likely to be fresh
 /// in the local CPU's cache, while other threads can steal older
-/// "stale" tasks.  For an alternate approach, consider
+/// "stale" tasks. For an alternate approach, consider
 /// [`scope_fifo()`] instead.
 ///
 /// [`scope_fifo()`]: fn.scope_fifo.html
@@ -353,7 +353,7 @@ where
 ///
 /// Under `scope_fifo()`, the spawns are prioritized in a FIFO order on
 /// the thread from which they were spawned, as opposed to `scope()`'s
-/// LIFO.  So in this example, we can expect `s.1` to execute before
+/// LIFO. So in this example, we can expect `s.1` to execute before
 /// `s.2`, and `t.1` before `t.2`. Other threads also steal tasks in
 /// FIFO order, as usual. Overall, this has roughly the same order as
 /// the now-deprecated [`breadth_first`] option, except the effect is
@@ -469,7 +469,7 @@ impl<'scope> Scope<'scope> {
     }

     /// Spawns a job into the fork-join scope `self`. This job will
-    /// execute sometime before the fork-join scope completes.  The
+    /// execute sometime before the fork-join scope completes. The
     /// job is specified as a closure, and this closure receives its
     /// own reference to the scope `self` as argument. This can be
     /// used to inject new jobs into `self`.
@@ -539,7 +539,7 @@ impl<'scope> Scope<'scope> {
     }

     /// Spawns a job into every thread of the fork-join scope `self`. This job will
-    /// execute on each thread sometime before the fork-join scope completes.  The
+    /// execute on each thread sometime before the fork-join scope completes. The
     /// job is specified as a closure, and this closure receives its own reference
     /// to the scope `self` as argument, as well as a `BroadcastContext`.
     pub fn spawn_broadcast<BODY>(&self, body: BODY)
@@ -567,7 +567,7 @@ impl<'scope> ScopeFifo<'scope> {
     }

     /// Spawns a job into the fork-join scope `self`. This job will
-    /// execute sometime before the fork-join scope completes.  The
+    /// execute sometime before the fork-join scope completes. The
     /// job is specified as a closure, and this closure receives its
     /// own reference to the scope `self` as argument. This can be
     /// used to inject new jobs into `self`.
@@ -575,7 +575,7 @@ impl<'scope> ScopeFifo<'scope> {
     /// # See also
     ///
     /// This method is akin to [`Scope::spawn()`], but with a FIFO
-    /// priority.  The [`scope_fifo` function] has more details about
+    /// priority. The [`scope_fifo` function] has more details about
     /// this distinction.
     ///
     /// [`Scope::spawn()`]: struct.Scope.html#method.spawn
@@ -605,7 +605,7 @@ impl<'scope> ScopeFifo<'scope> {
     }

     /// Spawns a job into every thread of the fork-join scope `self`. This job will
-    /// execute on each thread sometime before the fork-join scope completes.  The
+    /// execute on each thread sometime before the fork-join scope completes. The
     /// job is specified as a closure, and this closure receives its own reference
     /// to the scope `self` as argument, as well as a `BroadcastContext`.
     pub fn spawn_broadcast<BODY>(&self, body: BODY)
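
As the docs above describe, `scope` blocks until every spawned job (including nested ones) has finished, which is what lets spawned closures borrow stack data. A minimal sketch (crate name assumed):

use rayon_core::scope;

fn main() {
    let mut words = vec!["a".to_string(), "b".to_string(), "c".to_string()];
    scope(|s| {
        // Each job borrows a disjoint &mut into the stack-owned Vec; the
        // 'scope lifetime guarantees the borrows outlive every job.
        for w in words.iter_mut() {
            s.spawn(move |_| w.push('!'));
        }
    }); // does not return until all three jobs have run
    assert_eq!(words, ["a!", "b!", "c!"]);
}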

View File

@@ -1,13 +1,13 @@
-use crate::unwind;
-use crate::ThreadPoolBuilder;
-use crate::{scope, scope_fifo, Scope, ScopeFifo};
-use rand::{Rng, SeedableRng};
-use rand_xorshift::XorShiftRng;
 use std::iter::once;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Barrier, Mutex};
 use std::vec;

+use rand::{Rng, SeedableRng};
+use rand_xorshift::XorShiftRng;
+
+use crate::{Scope, ScopeFifo, ThreadPoolBuilder, scope, scope_fifo, unwind};

 #[test]
 fn scope_empty() {
     scope(|_| {});
@@ -93,10 +93,7 @@ impl<T: Send> Tree<T> {
     where
         OP: Fn(&mut T) + Sync,
     {
-        let Tree {
-            ref mut value,
-            ref mut children,
-        } = *self;
+        let Tree { ref mut value, ref mut children } = *self;
         scope.spawn(move |scope| {
             for child in children {
                 scope.spawn(move |scope| child.update_in_scope(op, scope));
@@ -124,10 +121,7 @@ fn random_tree1(depth: usize, rng: &mut XorShiftRng) -> Tree<u32> {
             .collect()
     };

-    Tree {
-        value: rng.random_range(0..1_000_000),
-        children,
-    }
+    Tree { value: rng.random_range(0..1_000_000), children }
 }

 #[test]
@@ -161,11 +155,7 @@ fn linear_stack_growth() {
         let diff_when_500 = *max_diff.get_mut().unwrap() as f64;

         let ratio = diff_when_5 / diff_when_500;
-        assert!(
-            ratio > 0.9 && ratio < 1.1,
-            "stack usage ratio out of bounds: {}",
-            ratio
-        );
+        assert!(ratio > 0.9 && ratio < 1.1, "stack usage ratio out of bounds: {}", ratio);
     });
 }
@@ -366,10 +356,7 @@ fn nested_fifo_order() {
 fn nested_lifo_fifo_order() {
     // LIFO on the outside, FIFO on the inside
     let vec = test_nested_order!(scope => spawn, scope_fifo => spawn_fifo);
-    let expected: Vec<i32> = (0..10)
-        .rev()
-        .flat_map(|i| (0..10).map(move |j| i * 10 + j))
-        .collect();
+    let expected: Vec<i32> = (0..10).rev().flat_map(|i| (0..10).map(move |j| i * 10 + j)).collect();
     assert_eq!(vec, expected);
 }
@@ -378,9 +365,7 @@ fn nested_lifo_fifo_order() {
 fn nested_fifo_lifo_order() {
     // FIFO on the outside, LIFO on the inside
     let vec = test_nested_order!(scope_fifo => spawn_fifo, scope => spawn);
-    let expected: Vec<i32> = (0..10)
-        .flat_map(|i| (0..10).rev().map(move |j| i * 10 + j))
-        .collect();
+    let expected: Vec<i32> = (0..10).flat_map(|i| (0..10).rev().map(move |j| i * 10 + j)).collect();
     assert_eq!(vec, expected);
 }

View File

@@ -182,7 +182,7 @@ This is possible because the C++ memory model typically offers guarantees of the
form "if you see the access A, then you must see those other accesses" -- but it
doesn't guarantee that you will see the access A (i.e., if you think of
processors with independent caches, you may be operating on very out of date
cache state).
cache state).
## Using seq-cst fences to prevent deadlock

View File

@@ -89,9 +89,7 @@ const ONE_JEC: usize = 1 << JEC_SHIFT;
 impl AtomicCounters {
     #[inline]
     pub(super) fn new() -> AtomicCounters {
-        AtomicCounters {
-            value: AtomicUsize::new(0),
-        }
+        AtomicCounters { value: AtomicUsize::new(0) }
     }

     /// Load and return the current value of the various counters.
@@ -230,9 +228,7 @@ impl Counters {
     fn increment_jobs_counter(self) -> Counters {
         // We can freely add to JEC because it occupies the most significant bits.
         // Thus it doesn't overflow into the other counters, just wraps itself.
-        Counters {
-            word: self.word.wrapping_add(ONE_JEC),
-        }
+        Counters { word: self.word.wrapping_add(ONE_JEC) }
     }

     #[inline]
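
The comment in `increment_jobs_counter` rewards a worked example: because the jobs event counter (JEC) occupies the most significant bits, `wrapping_add(ONE_JEC)` can only carry out of the word entirely, never into the thread counters packed below it. A sketch with an assumed layout (JEC in the top 32 bits of a 64-bit word):

const JEC_SHIFT: usize = 32; // assumed for the sketch
const ONE_JEC: u64 = 1 << JEC_SHIFT;

fn main() {
    // JEC field saturated; the low fields hold arbitrary thread counts.
    let word: u64 = (u64::MAX << JEC_SHIFT) | 0x0005_0003;
    let bumped = word.wrapping_add(ONE_JEC);
    // The JEC wrapped to zero; the low fields are untouched.
    assert_eq!(bumped >> JEC_SHIFT, 0);
    assert_eq!(bumped & ((1 << JEC_SHIFT) - 1), 0x0005_0003);
}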

View File

@@ -1,14 +1,16 @@
 //! Code that decides when workers should go to sleep. See README.md
 //! for an overview.

-use crate::latch::CoreLatch;
-use crate::registry::WorkerThread;
-use crate::DeadlockHandler;
-use crossbeam_utils::CachePadded;
 use std::sync::atomic::Ordering;
 use std::sync::{Condvar, Mutex};
 use std::thread;

+use crossbeam_utils::CachePadded;
+
+use crate::DeadlockHandler;
+use crate::latch::CoreLatch;
+use crate::registry::WorkerThread;

 mod counters;
 pub(crate) use self::counters::THREADS_MAX;
 use self::counters::{AtomicCounters, JobsEventCounter};
@@ -125,11 +127,7 @@ impl Sleep {
     pub(super) fn start_looking(&self, worker_index: usize) -> IdleState {
         self.counters.add_inactive_thread();

-        IdleState {
-            worker_index,
-            rounds: 0,
-            jobs_counter: JobsEventCounter::DUMMY,
-        }
+        IdleState { worker_index, rounds: 0, jobs_counter: JobsEventCounter::DUMMY }
     }

     #[inline]
@@ -165,9 +163,7 @@ impl Sleep {
     #[cold]
     fn announce_sleepy(&self) -> JobsEventCounter {
-        self.counters
-            .increment_jobs_event_counter_if(JobsEventCounter::is_active)
-            .jobs_counter()
+        self.counters.increment_jobs_event_counter_if(JobsEventCounter::is_active).jobs_counter()
     }

     #[cold]
@@ -258,7 +254,7 @@ impl Sleep {
     }

     /// Notify the given thread that it should wake up (if it is
-    /// sleeping).  When this method is invoked, we typically know the
+    /// sleeping). When this method is invoked, we typically know the
     /// thread is asleep, though in rare cases it could have been
     /// awoken by (e.g.) new work having been posted.
     pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
@@ -307,9 +303,7 @@ impl Sleep {
         // Read the counters and -- if sleepy workers have announced themselves
         // -- announce that there is now work available. The final value of `counters`
         // with which we exit the loop thus corresponds to a state when
-        let counters = self
-            .counters
-            .increment_jobs_event_counter_if(JobsEventCounter::is_sleepy);
+        let counters = self.counters.increment_jobs_event_counter_if(JobsEventCounter::is_sleepy);
         let num_awake_but_idle = counters.awake_but_idle_threads();
         let num_sleepers = counters.sleeping_threads();

View File

@@ -1,9 +1,10 @@
+use std::mem;
+use std::sync::Arc;
+
 use crate::job::*;
 use crate::registry::Registry;
 use crate::tlv::Tlv;
 use crate::unwind;
-use std::mem;
-use std::sync::Arc;

 /// Puts the task into the Rayon threadpool's job queue in the "static"
 /// or "global" scope. Just like a standard thread, this task is not
@@ -28,9 +29,9 @@ use std::sync::Arc;
 /// other threads may steal tasks at any time. However, they are
 /// generally prioritized in a LIFO order on the thread from which
 /// they were spawned. Other threads always steal from the other end of
-/// the deque, like FIFO order.  The idea is that "recent" tasks are
+/// the deque, like FIFO order. The idea is that "recent" tasks are
 /// most likely to be fresh in the local CPU's cache, while other
-/// threads can steal older "stale" tasks.  For an alternate approach,
+/// threads can steal older "stale" tasks. For an alternate approach,
 /// consider [`spawn_fifo()`] instead.
 ///
 /// [`spawn_fifo()`]: fn.spawn_fifo.html
@@ -39,7 +40,7 @@ use std::sync::Arc;
 ///
 /// If this closure should panic, the resulting panic will be
 /// propagated to the panic handler registered in the `ThreadPoolBuilder`,
-/// if any.  See [`ThreadPoolBuilder::panic_handler()`][ph] for more
+/// if any. See [`ThreadPoolBuilder::panic_handler()`][ph] for more
 /// details.
 ///
 /// [ph]: struct.ThreadPoolBuilder.html#method.panic_handler
@@ -103,7 +104,7 @@ where
 }

 /// Fires off a task into the Rayon threadpool in the "static" or
-/// "global" scope.  Just like a standard thread, this task is not
+/// "global" scope. Just like a standard thread, this task is not
 /// tied to the current stack frame, and hence it cannot hold any
 /// references other than those with `'static` lifetime. If you want
 /// to spawn a task that references stack data, use [the `scope_fifo()`
@@ -124,7 +125,7 @@ where
 ///
 /// If this closure should panic, the resulting panic will be
 /// propagated to the panic handler registered in the `ThreadPoolBuilder`,
-/// if any.  See [`ThreadPoolBuilder::panic_handler()`][ph] for more
+/// if any. See [`ThreadPoolBuilder::panic_handler()`][ph] for more
 /// details.
 ///
 /// [ph]: struct.ThreadPoolBuilder.html#method.panic_handler
@@ -152,7 +153,7 @@ where
     let job_ref = spawn_job(func, registry);

     // If we're in the pool, use our thread's private fifo for this thread to execute
-    // in a locally-FIFO order.  Otherwise, just use the pool's global injector.
+    // in a locally-FIFO order. Otherwise, just use the pool's global injector.
     match registry.current_thread() {
         Some(worker) => worker.push_fifo(job_ref),
         None => registry.inject(job_ref),
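
A minimal sketch of the two entry points documented above (crate name assumed). Because spawned tasks are `'static`, results come back through a channel rather than a borrow:

use std::sync::mpsc::channel;

use rayon_core::{spawn, spawn_fifo};

fn main() {
    let (tx, rx) = channel();

    // Fire-and-forget: the closure owns everything it touches.
    let tx2 = tx.clone();
    spawn(move || tx2.send("lifo").unwrap());

    // Same contract, but FIFO-prioritized among this thread's spawns.
    spawn_fifo(move || tx.send("fifo").unwrap());

    // Completion order between the two is unspecified; collect both.
    let mut got = vec![rx.recv().unwrap(), rx.recv().unwrap()];
    got.sort();
    assert_eq!(got, ["fifo", "lifo"]);
}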

View File

@@ -1,10 +1,9 @@
-use crate::scope;
 use std::any::Any;
-use std::sync::mpsc::channel;
 use std::sync::Mutex;
+use std::sync::mpsc::channel;

 use super::{spawn, spawn_fifo};
-use crate::ThreadPoolBuilder;
+use crate::{ThreadPoolBuilder, scope};

 #[test]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
@@ -45,10 +44,7 @@ fn panic_fwd() {
     let builder = ThreadPoolBuilder::new().panic_handler(panic_handler);

-    builder
-        .build()
-        .unwrap()
-        .spawn(move || panic!("Hello, world!"));
+    builder.build().unwrap().spawn(move || panic!("Hello, world!"));

     assert_eq!(1, rx.recv().unwrap());
 }
@@ -193,10 +189,7 @@ fn fifo_order() {
 fn lifo_fifo_order() {
     // LIFO on the outside, FIFO on the inside
     let vec = test_order!(spawn, spawn_fifo);
-    let expected: Vec<i32> = (0..10)
-        .rev()
-        .flat_map(|i| (0..10).map(move |j| i * 10 + j))
-        .collect();
+    let expected: Vec<i32> = (0..10).rev().flat_map(|i| (0..10).map(move |j| i * 10 + j)).collect();
     assert_eq!(vec, expected);
 }
@@ -205,9 +198,7 @@ fn lifo_fifo_order() {
 fn fifo_lifo_order() {
     // FIFO on the outside, LIFO on the inside
     let vec = test_order!(spawn_fifo, spawn);
-    let expected: Vec<i32> = (0..10)
-        .flat_map(|i| (0..10).rev().map(move |j| i * 10 + j))
-        .collect();
+    let expected: Vec<i32> = (0..10).flat_map(|i| (0..10).rev().map(move |j| i * 10 + j)).collect();
     assert_eq!(vec, expected);
 }
}

View File

@@ -1,9 +1,10 @@
 #![cfg(test)]

-use crate::{ThreadPoolBuildError, ThreadPoolBuilder};
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Barrier};

+use crate::{ThreadPoolBuildError, ThreadPoolBuilder};

 #[test]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn worker_thread_index() {
@@ -29,9 +30,7 @@ fn start_callback_called() {
         b.wait();
     };

-    let conf = ThreadPoolBuilder::new()
-        .num_threads(n_threads)
-        .start_handler(start_handler);
+    let conf = ThreadPoolBuilder::new().num_threads(n_threads).start_handler(start_handler);
     let _ = conf.build().unwrap();

     // Wait for all the threads to have been scheduled to run.
@@ -56,9 +55,7 @@ fn exit_callback_called() {
         b.wait();
     };

-    let conf = ThreadPoolBuilder::new()
-        .num_threads(n_threads)
-        .exit_handler(exit_handler);
+    let conf = ThreadPoolBuilder::new().num_threads(n_threads).exit_handler(exit_handler);
     {
         let _ = conf.build().unwrap();
         // Drop the pool so it stops the running threads.

View File

@@ -3,18 +3,17 @@
 //!
 //! [`ThreadPool`]: struct.ThreadPool.html

-use crate::broadcast::{self, BroadcastContext};
-use crate::join;
-use crate::registry::{Registry, ThreadSpawn, WorkerThread};
-use crate::scope::{do_in_place_scope, do_in_place_scope_fifo};
-use crate::spawn;
-use crate::{scope, Scope};
-use crate::{scope_fifo, ScopeFifo};
-use crate::{ThreadPoolBuildError, ThreadPoolBuilder};
 use std::error::Error;
 use std::fmt;
 use std::sync::Arc;

+use crate::broadcast::{self, BroadcastContext};
+use crate::registry::{Registry, ThreadSpawn, WorkerThread};
+use crate::scope::{do_in_place_scope, do_in_place_scope_fifo};
+use crate::{
+    Scope, ScopeFifo, ThreadPoolBuildError, ThreadPoolBuilder, join, scope, scope_fifo, spawn,
+};

 mod test;

 /// Represents a user created [thread-pool].

View File

@@ -4,7 +4,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::mpsc::channel;
 use std::sync::{Arc, Mutex};

-use crate::{join, Scope, ScopeFifo, ThreadPool, ThreadPoolBuilder};
+use crate::{Scope, ScopeFifo, ThreadPool, ThreadPoolBuilder, join};

 #[test]
 #[should_panic(expected = "Hello, world!")]
@@ -296,9 +296,8 @@ fn nested_scopes() {
         }
     }

-    let pools: Vec<_> = (0..10)
-        .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap())
-        .collect();
+    let pools: Vec<_> =
+        (0..10).map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap()).collect();

     let counter = AtomicUsize::new(0);
     nest(&pools, vec![], |scopes| {
@@ -333,9 +332,8 @@ fn nested_fifo_scopes() {
         }
     }

-    let pools: Vec<_> = (0..10)
-        .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap())
-        .collect();
+    let pools: Vec<_> =
+        (0..10).map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap()).collect();

     let counter = AtomicUsize::new(0);
     nest(&pools, vec![], |scopes| {

View File

@@ -1,7 +1,8 @@
 //! Allows access to the Rayon's thread local value
 //! which is preserved when moving jobs across threads

-use std::{cell::Cell, ptr};
+use std::cell::Cell;
+use std::ptr;

 thread_local!(pub static TLV: Cell<*const ()> = const { Cell::new(ptr::null()) });

View File

@@ -1,8 +1,9 @@
-use crate::registry::{Registry, WorkerThread};
 use std::fmt;
 use std::ops::Deref;
 use std::sync::Arc;

+use crate::registry::{Registry, WorkerThread};

 #[repr(align(64))]
 #[derive(Debug)]
 struct CacheAligned<T>(T);
@@ -27,9 +28,7 @@ impl<T> WorkerLocal<T> {
     pub fn new<F: FnMut(usize) -> T>(mut initial: F) -> WorkerLocal<T> {
         let registry = Registry::current();
         WorkerLocal {
-            locals: (0..registry.num_threads())
-                .map(|i| CacheAligned(initial(i)))
-                .collect(),
+            locals: (0..registry.num_threads()).map(|i| CacheAligned(initial(i))).collect(),
             registry,
         }
     }
@@ -62,9 +61,7 @@ impl<T> WorkerLocal<Vec<T>> {
 impl<T: fmt::Debug> fmt::Debug for WorkerLocal<T> {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("WorkerLocal")
-            .field("registry", &self.registry.id())
-            .finish()
+        f.debug_struct("WorkerLocal").field("registry", &self.registry.id()).finish()
     }
 }
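
`WorkerLocal` keeps one cache-line-aligned slot per worker of the owning registry, and `Deref` resolves to the slot of whichever worker is running. A usage sketch (crate name assumed; it must be created and dereferenced on threads of the same pool):

use rayon_core::{ThreadPoolBuilder, WorkerLocal};

fn main() {
    let pool = ThreadPoolBuilder::new().num_threads(4).build().unwrap();
    pool.install(|| {
        // One slot per worker, initialized from the worker index.
        let local = WorkerLocal::new(|i| i * 100);
        // Deref picks the current worker's slot.
        assert_eq!(*local % 100, 0);
    });
}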