native: Protect against spurious wakeups on cvars
This is a very real problem with cvars on normal systems, and all of channels will not work if spurious wakeups are accepted. This problem is just solved with a synchronized flag (accessed in the cvar's lock) to see whether a signal() actually happened or whether it's spurious.
This commit is contained in:
@@ -13,14 +13,15 @@
|
|||||||
|
|
||||||
use std::cast;
|
use std::cast;
|
||||||
use std::rt::Runtime;
|
use std::rt::Runtime;
|
||||||
use std::task::TaskOpts;
|
|
||||||
use std::rt::rtio;
|
|
||||||
use std::rt::local::Local;
|
use std::rt::local::Local;
|
||||||
|
use std::rt::rtio;
|
||||||
use std::rt::task::{Task, BlockedTask};
|
use std::rt::task::{Task, BlockedTask};
|
||||||
|
use std::task::TaskOpts;
|
||||||
use std::unstable::sync::LittleLock;
|
use std::unstable::sync::LittleLock;
|
||||||
|
|
||||||
struct SimpleTask {
|
struct SimpleTask {
|
||||||
lock: LittleLock,
|
lock: LittleLock,
|
||||||
|
awoken: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Runtime for SimpleTask {
|
impl Runtime for SimpleTask {
|
||||||
@@ -30,30 +31,37 @@ impl Runtime for SimpleTask {
|
|||||||
f: |BlockedTask| -> Result<(), BlockedTask>) {
|
f: |BlockedTask| -> Result<(), BlockedTask>) {
|
||||||
assert!(times == 1);
|
assert!(times == 1);
|
||||||
|
|
||||||
let my_lock: *mut LittleLock = &mut self.lock;
|
let me = &mut *self as *mut SimpleTask;
|
||||||
|
let cur_dupe = &*cur_task as *Task;
|
||||||
cur_task.put_runtime(self as ~Runtime);
|
cur_task.put_runtime(self as ~Runtime);
|
||||||
|
let task = BlockedTask::block(cur_task);
|
||||||
|
|
||||||
|
// See libnative/task.rs for what's going on here with the `awoken`
|
||||||
|
// field and the while loop around wait()
|
||||||
unsafe {
|
unsafe {
|
||||||
let cur_task_dupe = *cast::transmute::<&~Task, &uint>(&cur_task);
|
let mut guard = (*me).lock.lock();
|
||||||
let task = BlockedTask::block(cur_task);
|
(*me).awoken = false;
|
||||||
|
|
||||||
let mut guard = (*my_lock).lock();
|
|
||||||
match f(task) {
|
match f(task) {
|
||||||
Ok(()) => guard.wait(),
|
Ok(()) => {
|
||||||
|
while !(*me).awoken {
|
||||||
|
guard.wait();
|
||||||
|
}
|
||||||
|
}
|
||||||
Err(task) => { cast::forget(task.wake()); }
|
Err(task) => { cast::forget(task.wake()); }
|
||||||
}
|
}
|
||||||
drop(guard);
|
drop(guard);
|
||||||
cur_task = cast::transmute::<uint, ~Task>(cur_task_dupe);
|
cur_task = cast::transmute(cur_dupe);
|
||||||
}
|
}
|
||||||
Local::put(cur_task);
|
Local::put(cur_task);
|
||||||
}
|
}
|
||||||
fn reawaken(mut ~self, mut to_wake: ~Task) {
|
fn reawaken(mut ~self, mut to_wake: ~Task) {
|
||||||
let lock: *mut LittleLock = &mut self.lock;
|
let me = &mut *self as *mut SimpleTask;
|
||||||
to_wake.put_runtime(self as ~Runtime);
|
to_wake.put_runtime(self as ~Runtime);
|
||||||
unsafe {
|
unsafe {
|
||||||
cast::forget(to_wake);
|
cast::forget(to_wake);
|
||||||
let _l = (*lock).lock();
|
let _l = (*me).lock.lock();
|
||||||
(*lock).signal();
|
(*me).awoken = true;
|
||||||
|
(*me).lock.signal();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -72,6 +80,9 @@ impl Runtime for SimpleTask {
|
|||||||
|
|
||||||
pub fn task() -> ~Task {
|
pub fn task() -> ~Task {
|
||||||
let mut task = ~Task::new();
|
let mut task = ~Task::new();
|
||||||
task.put_runtime(~SimpleTask { lock: LittleLock::new() } as ~Runtime);
|
task.put_runtime(~SimpleTask {
|
||||||
|
lock: LittleLock::new(),
|
||||||
|
awoken: false,
|
||||||
|
} as ~Runtime);
|
||||||
return task;
|
return task;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -33,6 +33,7 @@ pub fn new() -> ~Task {
|
|||||||
let mut task = ~Task::new();
|
let mut task = ~Task::new();
|
||||||
task.put_runtime(~Ops {
|
task.put_runtime(~Ops {
|
||||||
lock: unsafe { Mutex::new() },
|
lock: unsafe { Mutex::new() },
|
||||||
|
awoken: false,
|
||||||
} as ~rt::Runtime);
|
} as ~rt::Runtime);
|
||||||
return task;
|
return task;
|
||||||
}
|
}
|
||||||
@@ -85,7 +86,8 @@ pub fn spawn_opts(opts: TaskOpts, f: proc()) {
|
|||||||
// This structure is the glue between channels and the 1:1 scheduling mode. This
|
// This structure is the glue between channels and the 1:1 scheduling mode. This
|
||||||
// structure is allocated once per task.
|
// structure is allocated once per task.
|
||||||
struct Ops {
|
struct Ops {
|
||||||
lock: Mutex, // native synchronization
|
lock: Mutex, // native synchronization
|
||||||
|
awoken: bool, // used to prevent spurious wakeups
|
||||||
}
|
}
|
||||||
|
|
||||||
impl rt::Runtime for Ops {
|
impl rt::Runtime for Ops {
|
||||||
@@ -139,9 +141,16 @@ impl rt::Runtime for Ops {
|
|||||||
// reasoning for this is the same logic as above in that the task silently
|
// reasoning for this is the same logic as above in that the task silently
|
||||||
// transfers ownership via the `uint`, not through normal compiler
|
// transfers ownership via the `uint`, not through normal compiler
|
||||||
// semantics.
|
// semantics.
|
||||||
|
//
|
||||||
|
// On a mildly unrelated note, it should also be pointed out that OS
|
||||||
|
// condition variables are susceptible to spurious wakeups, which we need to
|
||||||
|
// be ready for. In order to accomodate for this fact, we have an extra
|
||||||
|
// `awoken` field which indicates whether we were actually woken up via some
|
||||||
|
// invocation of `reawaken`. This flag is only ever accessed inside the
|
||||||
|
// lock, so there's no need to make it atomic.
|
||||||
fn deschedule(mut ~self, times: uint, mut cur_task: ~Task,
|
fn deschedule(mut ~self, times: uint, mut cur_task: ~Task,
|
||||||
f: |BlockedTask| -> Result<(), BlockedTask>) {
|
f: |BlockedTask| -> Result<(), BlockedTask>) {
|
||||||
let my_lock: *mut Mutex = &mut self.lock as *mut Mutex;
|
let me = &mut *self as *mut Ops;
|
||||||
cur_task.put_runtime(self as ~rt::Runtime);
|
cur_task.put_runtime(self as ~rt::Runtime);
|
||||||
|
|
||||||
unsafe {
|
unsafe {
|
||||||
@@ -149,15 +158,21 @@ impl rt::Runtime for Ops {
|
|||||||
let task = BlockedTask::block(cur_task);
|
let task = BlockedTask::block(cur_task);
|
||||||
|
|
||||||
if times == 1 {
|
if times == 1 {
|
||||||
(*my_lock).lock();
|
(*me).lock.lock();
|
||||||
|
(*me).awoken = false;
|
||||||
match f(task) {
|
match f(task) {
|
||||||
Ok(()) => (*my_lock).wait(),
|
Ok(()) => {
|
||||||
|
while !(*me).awoken {
|
||||||
|
(*me).lock.wait();
|
||||||
|
}
|
||||||
|
}
|
||||||
Err(task) => { cast::forget(task.wake()); }
|
Err(task) => { cast::forget(task.wake()); }
|
||||||
}
|
}
|
||||||
(*my_lock).unlock();
|
(*me).lock.unlock();
|
||||||
} else {
|
} else {
|
||||||
let mut iter = task.make_selectable(times);
|
let mut iter = task.make_selectable(times);
|
||||||
(*my_lock).lock();
|
(*me).lock.lock();
|
||||||
|
(*me).awoken = false;
|
||||||
let success = iter.all(|task| {
|
let success = iter.all(|task| {
|
||||||
match f(task) {
|
match f(task) {
|
||||||
Ok(()) => true,
|
Ok(()) => true,
|
||||||
@@ -167,10 +182,10 @@ impl rt::Runtime for Ops {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
if success {
|
while success && !(*me).awoken {
|
||||||
(*my_lock).wait();
|
(*me).lock.wait();
|
||||||
}
|
}
|
||||||
(*my_lock).unlock();
|
(*me).lock.unlock();
|
||||||
}
|
}
|
||||||
// re-acquire ownership of the task
|
// re-acquire ownership of the task
|
||||||
cur_task = cast::transmute::<uint, ~Task>(cur_task_dupe);
|
cur_task = cast::transmute::<uint, ~Task>(cur_task_dupe);
|
||||||
@@ -184,12 +199,13 @@ impl rt::Runtime for Ops {
|
|||||||
// why it's valid to do so.
|
// why it's valid to do so.
|
||||||
fn reawaken(mut ~self, mut to_wake: ~Task, _can_resched: bool) {
|
fn reawaken(mut ~self, mut to_wake: ~Task, _can_resched: bool) {
|
||||||
unsafe {
|
unsafe {
|
||||||
let lock: *mut Mutex = &mut self.lock as *mut Mutex;
|
let me = &mut *self as *mut Ops;
|
||||||
to_wake.put_runtime(self as ~rt::Runtime);
|
to_wake.put_runtime(self as ~rt::Runtime);
|
||||||
cast::forget(to_wake);
|
cast::forget(to_wake);
|
||||||
(*lock).lock();
|
(*me).lock.lock();
|
||||||
(*lock).signal();
|
(*me).awoken = true;
|
||||||
(*lock).unlock();
|
(*me).lock.signal();
|
||||||
|
(*me).lock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -875,7 +875,7 @@ impl<T: Send> Port<T> {
|
|||||||
let data = self.try_recv_inc(false);
|
let data = self.try_recv_inc(false);
|
||||||
if data.is_none() &&
|
if data.is_none() &&
|
||||||
unsafe { (*packet).cnt.load(SeqCst) } != DISCONNECTED {
|
unsafe { (*packet).cnt.load(SeqCst) } != DISCONNECTED {
|
||||||
fail!("bug: woke up too soon");
|
fail!("bug: woke up too soon {}", unsafe { (*packet).cnt.load(SeqCst) });
|
||||||
}
|
}
|
||||||
return data;
|
return data;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user