Only work-steal in the main loop for rustc_thread_pool
Co-authored-by: Zoxc <zoxc32@gmail.com>
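
This change makes rustc_thread_pool worker threads steal work only in their main loop (`wait_until_out_of_work`, now calling the new `wait_or_steal_until(..., true)`). Nested waits in `join`, scope completion, and `broadcast` instead record which of their own jobs have already started (an `AtomicBool` for `join`, a `pending_jobs` set for scopes) and use the new `WorkerThread::wait_for_jobs` helper to pop only the local deque, running the awaited jobs inline and re-queuing anything else, before blocking on the latch without stealing. The hunks below appear to touch the broadcast, job, join, latch, registry, scope, sleep, and spawn modules plus their tests; several ordering and mutual-install tests are now marked `#[ignore]`.

The sketch below illustrates that waiting pattern only; the names and types are simplified, hypothetical stand-ins, not the actual rustc_thread_pool API:

    use std::collections::VecDeque;

    // Hypothetical stand-ins for the pool's job types, just to illustrate control flow.
    #[derive(Clone, Copy, PartialEq, Eq, Debug)]
    struct JobId(usize);

    struct Job {
        id: JobId,
        run: Box<dyn FnOnce()>,
    }

    // Sketch of the `wait_for_jobs` idea: while the awaited job has not started,
    // pop only the *local* deque. Run the awaited job inline if we find it; set
    // everything else aside instead of executing or stealing it, and push it back
    // afterwards. Blocking on the job's latch (not shown) happens after this.
    fn wait_for_job(local: &mut VecDeque<Job>, wanted: JobId, started: &mut bool) {
        let mut set_aside = Vec::new();
        while !*started {
            match local.pop_back() {
                Some(job) if job.id == wanted => {
                    *started = true;
                    (job.run)(); // run the awaited job inline
                }
                Some(job) => set_aside.push(job), // not ours: keep it for later
                None => break, // nothing left locally; fall through to waiting on the latch
            }
        }
        // Restore the jobs we were not looking for.
        for job in set_aside {
            local.push_back(job);
        }
    }

    fn main() {
        let mut local = VecDeque::new();
        local.push_back(Job { id: JobId(1), run: Box::new(|| println!("unrelated job")) });
        local.push_back(Job { id: JobId(2), run: Box::new(|| println!("job b runs inline")) });
        let mut started = false;
        wait_for_job(&mut local, JobId(2), &mut started);
        assert_eq!(local.len(), 1); // the unrelated job was re-queued, not executed
    }
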
@@ -1,6 +1,7 @@
 use std::fmt;
 use std::marker::PhantomData;
 use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
 
 use crate::job::{ArcJob, StackJob};
 use crate::latch::{CountLatch, LatchRef};
@@ -97,13 +98,22 @@ where
     OP: Fn(BroadcastContext<'_>) -> R + Sync,
     R: Send,
 {
+    let current_thread = WorkerThread::current();
+    let current_thread_addr = current_thread.expose_provenance();
+    let started = &AtomicBool::new(false);
     let f = move |injected: bool| {
         debug_assert!(injected);
+
+        // Mark as started if we are the thread that initiated that broadcast.
+        if current_thread_addr == WorkerThread::current().expose_provenance() {
+            started.store(true, Ordering::Relaxed);
+        }
+
         BroadcastContext::with(&op)
     };
 
     let n_threads = registry.num_threads();
-    let current_thread = unsafe { WorkerThread::current().as_ref() };
+    let current_thread = unsafe { current_thread.as_ref() };
     let tlv = crate::tlv::get();
     let latch = CountLatch::with_count(n_threads, current_thread);
     let jobs: Vec<_> =
@@ -112,8 +122,16 @@ where
 
     registry.inject_broadcast(job_refs);
 
+    let current_thread_job_id = current_thread
+        .and_then(|worker| (registry.id() == worker.registry.id()).then(|| worker))
+        .map(|worker| unsafe { jobs[worker.index()].as_job_ref() }.id());
+
     // Wait for all jobs to complete, then collect the results, maybe propagating a panic.
-    latch.wait(current_thread);
+    latch.wait(
+        current_thread,
+        || started.load(Ordering::Relaxed),
+        |job| Some(job.id()) == current_thread_job_id,
+    );
     jobs.into_iter().map(|job| unsafe { job.into_result() }).collect()
 }
 
@@ -129,7 +147,7 @@ where
 {
     let job = ArcJob::new({
         let registry = Arc::clone(registry);
-        move || {
+        move |_| {
            registry.catch_unwind(|| BroadcastContext::with(&op));
            registry.terminate(); // (*) permit registry to terminate now
        }

@@ -65,6 +65,7 @@ fn spawn_broadcast_self() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn broadcast_mutual() {
     let count = AtomicUsize::new(0);
@@ -99,6 +100,7 @@ fn spawn_broadcast_mutual() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn broadcast_mutual_sleepy() {
     let count = AtomicUsize::new(0);

@@ -27,6 +27,11 @@ pub(super) trait Job {
     unsafe fn execute(this: *const ());
 }
 
+#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
+pub(super) struct JobRefId {
+    pointer: usize,
+}
+
 /// Effectively a Job trait object. Each JobRef **must** be executed
 /// exactly once, or else data may leak.
 ///
@@ -52,11 +57,9 @@ impl JobRef {
         JobRef { pointer: data as *const (), execute_fn: <T as Job>::execute }
     }
 
-    /// Returns an opaque handle that can be saved and compared,
-    /// without making `JobRef` itself `Copy + Eq`.
     #[inline]
-    pub(super) fn id(&self) -> impl Eq {
-        (self.pointer, self.execute_fn)
+    pub(super) fn id(&self) -> JobRefId {
+        JobRefId { pointer: self.pointer.expose_provenance() }
     }
 
     #[inline]
@@ -100,8 +103,15 @@ where
         unsafe { JobRef::new(self) }
     }
 
-    pub(super) unsafe fn run_inline(self, stolen: bool) -> R {
-        self.func.into_inner().unwrap()(stolen)
+    pub(super) unsafe fn run_inline(&self, stolen: bool) {
+        unsafe {
+            let func = (*self.func.get()).take().unwrap();
+            *(self.result.get()) = match unwind::halt_unwinding(|| func(stolen)) {
+                Ok(x) => JobResult::Ok(x),
+                Err(x) => JobResult::Panic(x),
+            };
+            Latch::set(&self.latch);
+        }
     }
 
     pub(super) unsafe fn into_result(self) -> R {
@@ -138,7 +148,7 @@ where
 /// (Probably `StackJob` should be refactored in a similar fashion.)
 pub(super) struct HeapJob<BODY>
 where
-    BODY: FnOnce() + Send,
+    BODY: FnOnce(JobRefId) + Send,
 {
     job: BODY,
     tlv: Tlv,
@@ -146,7 +156,7 @@ where
 
 impl<BODY> HeapJob<BODY>
 where
-    BODY: FnOnce() + Send,
+    BODY: FnOnce(JobRefId) + Send,
 {
     pub(super) fn new(tlv: Tlv, job: BODY) -> Box<Self> {
         Box::new(HeapJob { job, tlv })
@@ -170,12 +180,13 @@ where
 
 impl<BODY> Job for HeapJob<BODY>
 where
-    BODY: FnOnce() + Send,
+    BODY: FnOnce(JobRefId) + Send,
 {
     unsafe fn execute(this: *const ()) {
+        let pointer = this.expose_provenance();
         let this = unsafe { Box::from_raw(this as *mut Self) };
         tlv::set(this.tlv);
-        (this.job)();
+        (this.job)(JobRefId { pointer });
     }
 }
 
@@ -183,14 +194,14 @@ where
 /// be turned into multiple `JobRef`s and called multiple times.
 pub(super) struct ArcJob<BODY>
 where
-    BODY: Fn() + Send + Sync,
+    BODY: Fn(JobRefId) + Send + Sync,
 {
     job: BODY,
 }
 
 impl<BODY> ArcJob<BODY>
 where
-    BODY: Fn() + Send + Sync,
+    BODY: Fn(JobRefId) + Send + Sync,
 {
     pub(super) fn new(job: BODY) -> Arc<Self> {
         Arc::new(ArcJob { job })
@@ -214,11 +225,12 @@ where
 
 impl<BODY> Job for ArcJob<BODY>
 where
-    BODY: Fn() + Send + Sync,
+    BODY: Fn(JobRefId) + Send + Sync,
 {
     unsafe fn execute(this: *const ()) {
+        let pointer = this.expose_provenance();
         let this = unsafe { Arc::from_raw(this as *mut Self) };
-        (this.job)();
+        (this.job)(JobRefId { pointer });
     }
 }
 

@@ -1,10 +1,8 @@
 use std::any::Any;
+use std::sync::atomic::{AtomicBool, Ordering};
 
 use crate::job::StackJob;
 use crate::latch::SpinLatch;
-use crate::registry::{self, WorkerThread};
-use crate::tlv::{self, Tlv};
-use crate::{FnContext, unwind};
+use crate::{FnContext, registry, tlv, unwind};
 
 #[cfg(test)]
 mod tests;
@@ -134,68 +132,38 @@ where
         // Create virtual wrapper for task b; this all has to be
         // done here so that the stack frame can keep it all live
         // long enough.
-        let job_b = StackJob::new(tlv, call_b(oper_b), SpinLatch::new(worker_thread));
+        let job_b_started = AtomicBool::new(false);
+        let job_b = StackJob::new(
+            tlv,
+            |migrated| {
+                job_b_started.store(true, Ordering::Relaxed);
+                call_b(oper_b)(migrated)
+            },
+            SpinLatch::new(worker_thread),
+        );
         let job_b_ref = job_b.as_job_ref();
         let job_b_id = job_b_ref.id();
         worker_thread.push(job_b_ref);
 
         // Execute task a; hopefully b gets stolen in the meantime.
         let status_a = unwind::halt_unwinding(call_a(oper_a, injected));
-        let result_a = match status_a {
-            Ok(v) => v,
-            Err(err) => join_recover_from_panic(worker_thread, &job_b.latch, err, tlv),
-        };
-
-        // Now that task A has finished, try to pop job B from the
-        // local stack. It may already have been popped by job A; it
-        // may also have been stolen. There may also be some tasks
-        // pushed on top of it in the stack, and we will have to pop
-        // those off to get to it.
-        while !job_b.latch.probe() {
-            if let Some(job) = worker_thread.take_local_job() {
-                if job_b_id == job.id() {
-                    // Found it! Let's run it.
-                    //
-                    // Note that this could panic, but it's ok if we unwind here.
-
-                    // Restore the TLV since we might have run some jobs overwriting it when waiting for job b.
-                    tlv::set(tlv);
-
-                    let result_b = job_b.run_inline(injected);
-                    return (result_a, result_b);
-                } else {
-                    worker_thread.execute(job);
-                }
-            } else {
-                // Local deque is empty. Time to steal from other
-                // threads.
-                worker_thread.wait_until(&job_b.latch);
-                debug_assert!(job_b.latch.probe());
-                break;
-            }
-        }
+        worker_thread.wait_for_jobs::<_, false>(
+            &job_b.latch,
+            || job_b_started.load(Ordering::Relaxed),
+            |job| job.id() == job_b_id,
+            |job| {
+                debug_assert_eq!(job.id(), job_b_id);
+                job_b.run_inline(injected);
+            },
+        );
+
+        // Restore the TLV since we might have run some jobs overwriting it when waiting for job b.
+        tlv::set(tlv);
+
+        let result_a = match status_a {
+            Ok(v) => v,
+            Err(err) => unwind::resume_unwinding(err),
+        };
         (result_a, job_b.into_result())
     })
 }
-
-/// If job A panics, we still cannot return until we are sure that job
-/// B is complete. This is because it may contain references into the
-/// enclosing stack frame(s).
-#[cold] // cold path
-unsafe fn join_recover_from_panic(
-    worker_thread: &WorkerThread,
-    job_b_latch: &SpinLatch<'_>,
-    err: Box<dyn Any + Send>,
-    tlv: Tlv,
-) -> ! {
-    unsafe { worker_thread.wait_until(job_b_latch) };
-
-    // Restore the TLV since we might have run some jobs overwriting it when waiting for job b.
-    tlv::set(tlv);
-
-    unwind::resume_unwinding(err)
-}

@@ -97,6 +97,7 @@ fn join_context_both() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn join_context_neither() {
     // If we're already in a 1-thread pool, neither job should be stolen.

@@ -3,6 +3,7 @@ use std::ops::Deref;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Condvar, Mutex};
 
+use crate::job::JobRef;
 use crate::registry::{Registry, WorkerThread};
 
 /// We define various kinds of latches, which are all a primitive signaling
@@ -166,11 +167,6 @@ impl<'r> SpinLatch<'r> {
     pub(super) fn cross(thread: &'r WorkerThread) -> SpinLatch<'r> {
         SpinLatch { cross: true, ..SpinLatch::new(thread) }
     }
-
-    #[inline]
-    pub(super) fn probe(&self) -> bool {
-        self.core_latch.probe()
-    }
 }
 
 impl<'r> AsCoreLatch for SpinLatch<'r> {
@@ -368,13 +364,20 @@ impl CountLatch {
         debug_assert!(old_counter != 0);
     }
 
-    pub(super) fn wait(&self, owner: Option<&WorkerThread>) {
+    pub(super) fn wait(
+        &self,
+        owner: Option<&WorkerThread>,
+        all_jobs_started: impl FnMut() -> bool,
+        is_job: impl FnMut(&JobRef) -> bool,
+    ) {
         match &self.kind {
             CountLatchKind::Stealing { latch, registry, worker_index } => unsafe {
                 let owner = owner.expect("owner thread");
                 debug_assert_eq!(registry.id(), owner.registry().id());
                 debug_assert_eq!(*worker_index, owner.index());
-                owner.wait_until(latch);
+                owner.wait_for_jobs::<_, true>(latch, all_jobs_started, is_job, |job| {
+                    owner.execute(job);
+                });
             },
             CountLatchKind::Blocking { latch } => latch.wait(),
         }

@@ -6,6 +6,7 @@ use std::sync::{Arc, Mutex, Once};
 use std::{fmt, io, mem, ptr, thread};
 
 use crossbeam_deque::{Injector, Steal, Stealer, Worker};
+use smallvec::SmallVec;
 
 use crate::job::{JobFifo, JobRef, StackJob};
 use crate::latch::{AsCoreLatch, CoreLatch, Latch, LatchRef, LockLatch, OnceLatch, SpinLatch};
@@ -796,14 +797,81 @@ impl WorkerThread {
     /// stealing tasks as necessary.
     #[inline]
     pub(super) unsafe fn wait_until<L: AsCoreLatch + ?Sized>(&self, latch: &L) {
+        unsafe { self.wait_or_steal_until(latch, false) };
+    }
+
+    #[inline]
+    pub(super) unsafe fn wait_for_jobs<L: AsCoreLatch + ?Sized, const BROADCAST_JOBS: bool>(
+        &self,
+        latch: &L,
+        mut all_jobs_started: impl FnMut() -> bool,
+        mut is_job: impl FnMut(&JobRef) -> bool,
+        mut execute_job: impl FnMut(JobRef) -> (),
+    ) {
+        let mut jobs = SmallVec::<[JobRef; 8]>::new();
+        let mut broadcast_jobs = SmallVec::<[JobRef; 8]>::new();
+
+        while !all_jobs_started() {
+            if let Some(job) = self.worker.pop() {
+                if is_job(&job) {
+                    execute_job(job);
+                } else {
+                    jobs.push(job);
+                }
+            } else {
+                if BROADCAST_JOBS {
+                    let broadcast_job = loop {
+                        match self.stealer.steal() {
+                            Steal::Success(job) => break Some(job),
+                            Steal::Empty => break None,
+                            Steal::Retry => continue,
+                        }
+                    };
+                    if let Some(job) = broadcast_job {
+                        if is_job(&job) {
+                            execute_job(job);
+                        } else {
+                            broadcast_jobs.push(job);
+                        }
+                    }
+                }
+                break;
+            }
+        }
+
+        // Restore the jobs that we weren't looking for.
+        for job in jobs {
+            self.worker.push(job);
+        }
+        if BROADCAST_JOBS {
+            let broadcasts = self.registry.broadcasts.lock().unwrap();
+            for job in broadcast_jobs {
+                broadcasts[self.index].push(job);
+            }
+        }
+
+        // Wait for the jobs to finish.
+        unsafe { self.wait_until(latch) };
+        debug_assert!(latch.as_core_latch().probe());
+    }
+
+    pub(super) unsafe fn wait_or_steal_until<L: AsCoreLatch + ?Sized>(
+        &self,
+        latch: &L,
+        steal: bool,
+    ) {
         let latch = latch.as_core_latch();
         if !latch.probe() {
-            unsafe { self.wait_until_cold(latch) };
+            if steal {
+                unsafe { self.wait_or_steal_until_cold(latch) };
+            } else {
+                unsafe { self.wait_until_cold(latch) };
+            }
         }
     }
 
     #[cold]
-    unsafe fn wait_until_cold(&self, latch: &CoreLatch) {
+    unsafe fn wait_or_steal_until_cold(&self, latch: &CoreLatch) {
         // the code below should swallow all panics and hence never
         // unwind; but if something does wrong, we want to abort,
         // because otherwise other code in rayon may assume that the
@@ -827,7 +895,7 @@ impl WorkerThread {
                 // The job might have injected local work, so go back to the outer loop.
                 continue 'outer;
             } else {
-                self.registry.sleep.no_work_found(&mut idle_state, latch, &self)
+                self.registry.sleep.no_work_found(&mut idle_state, latch, &self, true)
             }
         }
 
@@ -840,13 +908,34 @@ impl WorkerThread {
         mem::forget(abort_guard); // successful execution, do not abort
     }
 
+    #[cold]
+    unsafe fn wait_until_cold(&self, latch: &CoreLatch) {
+        // the code below should swallow all panics and hence never
+        // unwind; but if something does wrong, we want to abort,
+        // because otherwise other code in rayon may assume that the
+        // latch has been signaled, and that can lead to random memory
+        // accesses, which would be *very bad*
+        let abort_guard = unwind::AbortIfPanic;
+
+        let mut idle_state = self.registry.sleep.start_looking(self.index);
+        while !latch.probe() {
+            self.registry.sleep.no_work_found(&mut idle_state, latch, &self, false);
+        }
+
+        // If we were sleepy, we are not anymore. We "found work" --
+        // whatever the surrounding thread was doing before it had to wait.
+        self.registry.sleep.work_found();
+
+        mem::forget(abort_guard); // successful execution, do not abort
+    }
+
     unsafe fn wait_until_out_of_work(&self) {
         debug_assert_eq!(self as *const _, WorkerThread::current());
         let registry = &*self.registry;
         let index = self.index;
 
         registry.acquire_thread();
-        unsafe { self.wait_until(&registry.thread_infos[index].terminate) };
+        unsafe { self.wait_or_steal_until(&registry.thread_infos[index].terminate, true) };
 
         // Should not be any work left in our queue.
         debug_assert!(self.take_local_job().is_none());

@@ -8,12 +8,14 @@
 use std::any::Any;
 use std::marker::PhantomData;
 use std::mem::ManuallyDrop;
-use std::sync::Arc;
 use std::sync::atomic::{AtomicPtr, Ordering};
+use std::sync::{Arc, Mutex};
 use std::{fmt, ptr};
 
+use indexmap::IndexSet;
+
 use crate::broadcast::BroadcastContext;
-use crate::job::{ArcJob, HeapJob, JobFifo, JobRef};
+use crate::job::{ArcJob, HeapJob, JobFifo, JobRef, JobRefId};
 use crate::latch::{CountLatch, Latch};
 use crate::registry::{Registry, WorkerThread, global_registry, in_worker};
 use crate::tlv::{self, Tlv};
@@ -52,6 +54,12 @@ struct ScopeBase<'scope> {
     /// latch to track job counts
     job_completed_latch: CountLatch,
 
+    /// Jobs that have been spawned, but not yet started.
+    pending_jobs: Mutex<IndexSet<JobRefId>>,
+
+    /// The worker which will wait on scope completion, if any.
+    worker: Option<usize>,
+
     /// You can think of a scope as containing a list of closures to execute,
     /// all of which outlive `'scope`. They're not actually required to be
     /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because
@@ -525,13 +533,19 @@ impl<'scope> Scope<'scope> {
         BODY: FnOnce(&Scope<'scope>) + Send + 'scope,
     {
         let scope_ptr = ScopePtr(self);
-        let job = HeapJob::new(self.base.tlv, move || unsafe {
+        let job = HeapJob::new(self.base.tlv, move |id| unsafe {
             // SAFETY: this job will execute before the scope ends.
             let scope = scope_ptr.as_ref();
+
+            // Mark this job as started.
+            scope.base.pending_jobs.lock().unwrap().swap_remove_full(&id);
+
             ScopeBase::execute_job(&scope.base, move || body(scope))
         });
         let job_ref = self.base.heap_job_ref(job);
 
+        // Mark this job as pending.
+        self.base.pending_jobs.lock().unwrap().insert(job_ref.id());
         // Since `Scope` implements `Sync`, we can't be sure that we're still in a
         // thread of this pool, so we can't just push to the local worker thread.
         // Also, this might be an in-place scope.
@@ -547,10 +561,17 @@ impl<'scope> Scope<'scope> {
         BODY: Fn(&Scope<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
     {
         let scope_ptr = ScopePtr(self);
-        let job = ArcJob::new(move || unsafe {
+        let job = ArcJob::new(move |id| unsafe {
             // SAFETY: this job will execute before the scope ends.
             let scope = scope_ptr.as_ref();
             let body = &body;
+
+            let current_index = WorkerThread::current().as_ref().map(|worker| worker.index());
+            if current_index == scope.base.worker {
+                // Mark this job as started on the scope's worker thread.
+                scope.base.pending_jobs.lock().unwrap().swap_remove(&id);
+            }
+
             let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
             ScopeBase::execute_job(&scope.base, func)
         });
@@ -585,23 +606,24 @@ impl<'scope> ScopeFifo<'scope> {
         BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope,
     {
         let scope_ptr = ScopePtr(self);
-        let job = HeapJob::new(self.base.tlv, move || unsafe {
+        let job = HeapJob::new(self.base.tlv, move |id| unsafe {
             // SAFETY: this job will execute before the scope ends.
             let scope = scope_ptr.as_ref();
+
+            // Mark this job as started.
+            scope.base.pending_jobs.lock().unwrap().swap_remove(&id);
+
             ScopeBase::execute_job(&scope.base, move || body(scope))
         });
         let job_ref = self.base.heap_job_ref(job);
 
-        // If we're in the pool, use our scope's private fifo for this thread to execute
-        // in a locally-FIFO order. Otherwise, just use the pool's global injector.
-        match self.base.registry.current_thread() {
-            Some(worker) => {
-                let fifo = &self.fifos[worker.index()];
-                // SAFETY: this job will execute before the scope ends.
-                unsafe { worker.push(fifo.push(job_ref)) };
-            }
-            None => self.base.registry.inject(job_ref),
-        }
+        // Mark this job as pending.
+        self.base.pending_jobs.lock().unwrap().insert(job_ref.id());
+
+        // Since `ScopeFifo` implements `Sync`, we can't be sure that we're still in a
+        // thread of this pool, so we can't just push to the local worker thread.
+        // Also, this might be an in-place scope.
+        self.base.registry.inject_or_push(job_ref);
     }
 
     /// Spawns a job into every thread of the fork-join scope `self`. This job will
@@ -613,9 +635,15 @@ impl<'scope> ScopeFifo<'scope> {
         BODY: Fn(&ScopeFifo<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
     {
         let scope_ptr = ScopePtr(self);
-        let job = ArcJob::new(move || unsafe {
+        let job = ArcJob::new(move |id| unsafe {
             // SAFETY: this job will execute before the scope ends.
             let scope = scope_ptr.as_ref();
+
+            let current_index = WorkerThread::current().as_ref().map(|worker| worker.index());
+            if current_index == scope.base.worker {
+                // Mark this job as started on the scope's worker thread.
+                scope.base.pending_jobs.lock().unwrap().swap_remove(&id);
+            }
             let body = &body;
             let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
             ScopeBase::execute_job(&scope.base, func)
@@ -636,6 +664,8 @@ impl<'scope> ScopeBase<'scope> {
             registry: Arc::clone(registry),
             panic: AtomicPtr::new(ptr::null_mut()),
             job_completed_latch: CountLatch::new(owner),
+            pending_jobs: Mutex::new(IndexSet::new()),
+            worker: owner.map(|w| w.index()),
             marker: PhantomData,
             tlv: tlv::get(),
         }
@@ -643,7 +673,7 @@ impl<'scope> ScopeBase<'scope> {
 
     fn heap_job_ref<FUNC>(&self, job: Box<HeapJob<FUNC>>) -> JobRef
     where
-        FUNC: FnOnce() + Send + 'scope,
+        FUNC: FnOnce(JobRefId) + Send + 'scope,
     {
         unsafe {
             self.job_completed_latch.increment();
@@ -653,8 +683,12 @@ impl<'scope> ScopeBase<'scope> {
 
     fn inject_broadcast<FUNC>(&self, job: Arc<ArcJob<FUNC>>)
     where
-        FUNC: Fn() + Send + Sync + 'scope,
+        FUNC: Fn(JobRefId) + Send + Sync + 'scope,
     {
+        if self.worker.is_some() {
+            let id = unsafe { ArcJob::as_job_ref(&job).id() };
+            self.pending_jobs.lock().unwrap().insert(id);
+        }
         let n_threads = self.registry.num_threads();
         let job_refs = (0..n_threads).map(|_| unsafe {
             self.job_completed_latch.increment();
@@ -671,7 +705,11 @@ impl<'scope> ScopeBase<'scope> {
         FUNC: FnOnce() -> R,
     {
         let result = unsafe { Self::execute_job_closure(self, func) };
-        self.job_completed_latch.wait(owner);
+        self.job_completed_latch.wait(
+            owner,
+            || self.pending_jobs.lock().unwrap().is_empty(),
+            |job| self.pending_jobs.lock().unwrap().contains(&job.id()),
+        );
 
         // Restore the TLV if we ran some jobs while waiting
         tlv::set(self.tlv);

@@ -290,6 +290,7 @@ macro_rules! test_order {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn lifo_order() {
     // In the absence of stealing, `scope()` runs its `spawn()` jobs in LIFO order.
@@ -299,6 +300,7 @@ fn lifo_order() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn fifo_order() {
     // In the absence of stealing, `scope_fifo()` runs its `spawn_fifo()` jobs in FIFO order.
@@ -334,6 +336,7 @@ macro_rules! test_nested_order {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn nested_lifo_order() {
     // In the absence of stealing, `scope()` runs its `spawn()` jobs in LIFO order.
@@ -343,6 +346,7 @@ fn nested_lifo_order() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn nested_fifo_order() {
     // In the absence of stealing, `scope_fifo()` runs its `spawn_fifo()` jobs in FIFO order.
@@ -352,6 +356,7 @@ fn nested_fifo_order() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn nested_lifo_fifo_order() {
     // LIFO on the outside, FIFO on the inside
@@ -361,6 +366,7 @@ fn nested_lifo_fifo_order() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn nested_fifo_lifo_order() {
     // FIFO on the outside, LIFO on the inside
@@ -402,6 +408,7 @@ macro_rules! test_mixed_order {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn mixed_lifo_order() {
     // NB: the end of the inner scope makes us execute some of the outer scope
@@ -412,6 +419,7 @@ fn mixed_lifo_order() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn mixed_fifo_order() {
     let vec = test_mixed_order!(scope_fifo => spawn_fifo, scope_fifo => spawn_fifo);
@@ -420,6 +428,7 @@ fn mixed_fifo_order() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn mixed_lifo_fifo_order() {
     // NB: the end of the inner scope makes us execute some of the outer scope
@@ -430,6 +439,7 @@ fn mixed_lifo_fifo_order() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn mixed_fifo_lifo_order() {
     let vec = test_mixed_order!(scope_fifo => spawn_fifo, scope => spawn);
@@ -519,8 +529,9 @@ fn mixed_lifetime_scope_fifo() {
 
 #[test]
 fn scope_spawn_broadcast() {
+    let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
     let sum = AtomicUsize::new(0);
-    let n = scope(|s| {
+    let n = pool.scope(|s| {
         s.spawn_broadcast(|_, ctx| {
             sum.fetch_add(ctx.index(), Ordering::Relaxed);
         });
@@ -531,8 +542,9 @@ fn scope_spawn_broadcast() {
 
 #[test]
 fn scope_fifo_spawn_broadcast() {
+    let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
     let sum = AtomicUsize::new(0);
-    let n = scope_fifo(|s| {
+    let n = pool.scope_fifo(|s| {
         s.spawn_broadcast(|_, ctx| {
             sum.fetch_add(ctx.index(), Ordering::Relaxed);
         });
@@ -542,6 +554,7 @@ fn scope_fifo_spawn_broadcast() {
 }
 
 #[test]
+#[ignore]
 fn scope_spawn_broadcast_nested() {
     let sum = AtomicUsize::new(0);
     let n = scope(|s| {

@@ -144,6 +144,7 @@ impl Sleep {
         idle_state: &mut IdleState,
         latch: &CoreLatch,
         thread: &WorkerThread,
+        steal: bool,
     ) {
         if idle_state.rounds < ROUNDS_UNTIL_SLEEPY {
             thread::yield_now();
@@ -157,7 +158,7 @@ impl Sleep {
             thread::yield_now();
         } else {
             debug_assert_eq!(idle_state.rounds, ROUNDS_UNTIL_SLEEPING);
-            self.sleep(idle_state, latch, thread);
+            self.sleep(idle_state, latch, thread, steal);
         }
     }
 
@@ -167,7 +168,13 @@ impl Sleep {
     }
 
     #[cold]
-    fn sleep(&self, idle_state: &mut IdleState, latch: &CoreLatch, thread: &WorkerThread) {
+    fn sleep(
+        &self,
+        idle_state: &mut IdleState,
+        latch: &CoreLatch,
+        thread: &WorkerThread,
+        steal: bool,
+    ) {
         let worker_index = idle_state.worker_index;
 
         if !latch.get_sleepy() {
@@ -215,7 +222,7 @@ impl Sleep {
         // - that job triggers the rollover over the JEC such that we don't see it
         // - we are the last active worker thread
         std::sync::atomic::fence(Ordering::SeqCst);
-        if thread.has_injected_job() {
+        if steal && thread.has_injected_job() {
            // If we see an externally injected job, then we have to 'wake
            // ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by
            // the one that wakes us.)

@@ -95,7 +95,7 @@ where
 
     HeapJob::new(Tlv::null(), {
         let registry = Arc::clone(registry);
-        move || {
+        move |_| {
            registry.catch_unwind(func);
            registry.terminate(); // (*) permit registry to terminate now
        }

@@ -167,6 +167,7 @@ macro_rules! test_order {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn lifo_order() {
     // In the absence of stealing, `spawn()` jobs on a thread will run in LIFO order.
@@ -176,6 +177,7 @@ fn lifo_order() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn fifo_order() {
     // In the absence of stealing, `spawn_fifo()` jobs on a thread will run in FIFO order.
@@ -185,6 +187,7 @@ fn fifo_order() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn lifo_fifo_order() {
     // LIFO on the outside, FIFO on the inside
@@ -194,6 +197,7 @@ fn lifo_fifo_order() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn fifo_lifo_order() {
     // FIFO on the outside, LIFO on the inside
@@ -230,6 +234,7 @@ macro_rules! test_mixed_order {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn mixed_lifo_fifo_order() {
     let vec = test_mixed_order!(spawn, spawn_fifo);
@@ -238,6 +243,7 @@ fn mixed_lifo_fifo_order() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn mixed_fifo_lifo_order() {
     let vec = test_mixed_order!(spawn_fifo, spawn);

@@ -152,6 +152,7 @@ fn self_install() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn mutual_install() {
     let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
@@ -172,6 +173,7 @@ fn mutual_install() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn mutual_install_sleepy() {
     use std::{thread, time};
@@ -227,6 +229,7 @@ macro_rules! test_scope_order {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn scope_lifo_order() {
     let vec = test_scope_order!(scope => spawn);
@@ -235,6 +238,7 @@ fn scope_lifo_order() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn scope_fifo_order() {
     let vec = test_scope_order!(scope_fifo => spawn_fifo);
@@ -276,6 +280,7 @@ fn spawn_fifo_order() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn nested_scopes() {
     // Create matching scopes for every thread pool.
@@ -312,6 +317,7 @@ fn nested_scopes() {
 }
 
 #[test]
+#[ignore]
 #[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
 fn nested_fifo_scopes() {
     // Create matching fifo scopes for every thread pool.