Made task threads wait instead of sleep, so they can be woken up. This appears to give us much better parallel performance.

Also, commented out one more unsafe log and updated rust_kernel.cpp to compile under g++.
Eric Holk
2011-07-25 18:00:37 -07:00
parent e697a52359
commit 5302cde188
8 changed files with 61 additions and 22 deletions
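
Editor's note on the core of the change: rather than sleeping for a fixed interval when no task is runnable, scheduler threads now block on their lock's condition variable with a timeout, so another thread can wake them the moment work appears. A minimal sketch of that pattern in raw pthreads (illustrative only; the runtime wraps this in lock_and_signal, and names like work_available and idle_until_work are invented here):

    #include <errno.h>
    #include <pthread.h>
    #include <time.h>

    static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
    static bool work_available = false;

    // Idle path: park until signaled, or until the deadline passes.
    void idle_until_work(const struct timespec *deadline) {
        pthread_mutex_lock(&mutex);
        while (!work_available) {
            if (pthread_cond_timedwait(&cond, &mutex, deadline) == ETIMEDOUT)
                break; // woke on the timeout, not on a signal
        }
        pthread_mutex_unlock(&mutex);
    }

    // Wake path: roughly what the new rust_kernel::wakeup_schedulers() does.
    void notify_work() {
        pthread_mutex_lock(&mutex);
        work_available = true;
        pthread_cond_broadcast(&cond); // every parked thread re-checks
        pthread_mutex_unlock(&mutex);
    }

With a fixed sleep, a task made runnable just after the scheduler dozes off waits out the whole interval; with a signaled wait it is picked up immediately, which is the likely source of the improved parallel performance.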

View File

@@ -123,7 +123,7 @@ circular_buffer::dequeue(void *dst) {
     if (dst != NULL) {
         memcpy(dst, &_buffer[_next], unit_sz);
     }
-    DLOG(sched, mem, "shifted data from index %d", _next);
+    //DLOG(sched, mem, "shifted data from index %d", _next);
     _unread -= unit_sz;
     _next += unit_sz;
     if (_next == _buffer_sz) {

View File

@@ -53,13 +53,13 @@ rust_kernel::destroy_scheduler(rust_scheduler *sched) {
 }
 
 void rust_kernel::create_schedulers() {
-    for(int i = 0; i < num_threads; ++i) {
+    for(size_t i = 0; i < num_threads; ++i) {
         threads.push(create_scheduler(i));
     }
 }
 
 void rust_kernel::destroy_schedulers() {
-    for(int i = 0; i < num_threads; ++i) {
+    for(size_t i = 0; i < num_threads; ++i) {
         destroy_scheduler(threads[i]);
     }
 }
@@ -106,7 +106,7 @@ rust_kernel::get_port_handle(rust_port *port) {
 void
 rust_kernel::log_all_scheduler_state() {
-    for(int i = 0; i < num_threads; ++i) {
+    for(size_t i = 0; i < num_threads; ++i) {
         threads[i]->log_state();
     }
 }
@@ -252,12 +252,12 @@ rust_kernel::signal_kernel_lock() {
 int rust_kernel::start_task_threads()
 {
-    for(int i = 0; i < num_threads; ++i) {
+    for(size_t i = 0; i < num_threads; ++i) {
         rust_scheduler *thread = threads[i];
         thread->start();
     }
 
-    for(int i = 0; i < num_threads; ++i) {
+    for(size_t i = 0; i < num_threads; ++i) {
         rust_scheduler *thread = threads[i];
         thread->join();
     }
@@ -271,6 +271,12 @@ rust_kernel::create_task(rust_task *spawner, const char *name) {
     return threads[rand(&rctx) % num_threads]->create_task(spawner, name);
 }
 
+void rust_kernel::wakeup_schedulers() {
+    for(size_t i = 0; i < num_threads; ++i) {
+        threads[i]->lock.signal_all();
+    }
+}
+
 #ifdef __WIN32__
 void
 rust_kernel::win32_require(LPCTSTR fn, BOOL ok) {
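
wakeup_schedulers() is the kernel-side half of the handshake: it broadcasts on every scheduler's lock so any thread parked in timed_wait() (see rust_scheduler.cpp below) resumes immediately. For reference, the difference between the two wake primitives, in plain pthreads (illustrative only):

    #include <pthread.h>

    static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

    void wake_one() { pthread_cond_signal(&cond); }    // at most one waiter
    void wake_all() { pthread_cond_broadcast(&cond); } // every waiter re-checks

Each rust_scheduler has its own lock, so signal() would usually suffice here; signal_all() is the conservative choice in case a lock ever has more than one waiter.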

View File

@@ -106,6 +106,7 @@ public:
     bool is_deadlocked();
 
     void signal_kernel_lock();
+    void wakeup_schedulers();
 
     /**
      * Notifies the kernel whenever a message has been enqueued . This gives

View File

@@ -87,6 +87,7 @@ rust_scheduler::reap_dead_tasks(int id) {
     I(this, lock.lock_held_by_current_thread());
     for (size_t i = 0; i < dead_tasks.length(); ) {
         rust_task *task = dead_tasks[i];
+        task->lock.lock();
         // Make sure this task isn't still running somewhere else...
         if (task->ref_count == 0 && task->can_schedule(id)) {
             I(this, task->tasks_waiting_to_join.is_empty());
@@ -94,10 +95,13 @@ rust_scheduler::reap_dead_tasks(int id) {
             DLOG(this, task,
                  "deleting unreferenced dead task %s @0x%" PRIxPTR,
                  task->name, task);
+            task->lock.unlock();
             delete task;
             sync::decrement(kernel->live_tasks);
+            kernel->wakeup_schedulers();
             continue;
         }
+        task->lock.unlock();
         ++i;
     }
 }
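
Two fixes ride along in the reaper: each candidate task is now inspected and deleted under its own lock, so the reaper cannot free a task another thread is still touching, and each successful reap calls wakeup_schedulers(), since the drop in live_tasks may be exactly what a parked scheduler is waiting to observe. The guarded-delete shape in isolation (a sketch with stand-in types, not the runtime's code):

    #include <pthread.h>

    struct task_stub {           // stand-in for rust_task; initialize the
        pthread_mutex_t lock;    // mutex with pthread_mutex_init before use
        int ref_count;
    };

    // Delete t only if nothing references it. The lock must be released
    // (and destroyed) before the memory holding it is freed; this sketch
    // assumes, as in the scheduler, that a zero-ref task stays dead.
    bool try_reap(task_stub *t) {
        pthread_mutex_lock(&t->lock);
        bool dead = (t->ref_count == 0);
        pthread_mutex_unlock(&t->lock);
        if (dead) {
            pthread_mutex_destroy(&t->lock);
            delete t;
        }
        return dead;
    }
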
@@ -206,21 +210,15 @@ rust_scheduler::start_main_loop() {
         rust_task *scheduled_task = schedule_task(id);
 
-        // The scheduler busy waits until a task is available for scheduling.
-        // Eventually we'll want a smarter way to do this, perhaps sleep
-        // for a minimum amount of time.
-
         if (scheduled_task == NULL) {
             log_state();
             DLOG(this, task,
                  "all tasks are blocked, scheduler id %d yielding ...",
                  id);
-            lock.unlock();
-            sync::sleep(100);
-            lock.lock();
-            DLOG(this, task,
-                 "scheduler resuming ...");
+            lock.timed_wait(100000);
+            reap_dead_tasks(id);
+            DLOG(this, task,
+                 "scheduler %d resuming ...", id);
             continue;
         }
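This hunk is where the commit's summary lands: the unlock / sleep / re-lock sequence becomes a single timed_wait(100000), that is, 100 microseconds given the nanosecond signature added below, and the scheduler reaps dead tasks as soon as it wakes, since a wake-up often means a task just died. Keeping a timeout at all, rather than waiting indefinitely, guards against lost wake-ups; a sketch of why (illustrative, not the runtime's code):

    #include <pthread.h>
    #include <time.h>

    static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t c = PTHREAD_COND_INITIALIZER;

    void park_bounded(const struct timespec *deadline) {
        pthread_mutex_lock(&m);
        // If a signal fired before this thread began waiting, that wake-up
        // is lost; the deadline turns a potential hang into (at worst) a
        // short hiccup, after which the caller re-checks its queues.
        pthread_cond_timedwait(&c, &m, deadline);
        pthread_mutex_unlock(&m);
    }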

View File

@@ -105,7 +105,7 @@ rust_task::~rust_task()
     /* FIXME: tighten this up, there are some more
        assertions that hold at task-lifecycle events. */
-    // I(sched, ref_count == 0 ||
+    I(sched, ref_count == 0); // ||
     //   (ref_count == 1 && this == sched->root_task));
 
     del_stk(this, stk);
@@ -167,6 +167,7 @@ rust_task::start(uintptr_t spawnee_fn,
     yield_timer.reset_us(0);
     transition(&sched->newborn_tasks, &sched->running_tasks);
+    sched->lock.signal();
 }
 
 void
@@ -212,6 +213,8 @@ rust_task::kill() {
     if (NULL == supervisor && propagate_failure)
         sched->fail();
+    sched->lock.signal();
+
     LOG(this, task, "preparing to unwind task: 0x%" PRIxPTR, this);
     // run_on_resume(rust_unwind_glue);
 }
@@ -442,12 +445,15 @@ rust_task::wakeup(rust_cond *from) {
     if(_on_wakeup) {
         _on_wakeup->on_wakeup();
     }
+    sched->lock.signal();
 }
 
 void
 rust_task::die() {
     scoped_lock with(lock);
     transition(&sched->running_tasks, &sched->dead_tasks);
+    sched->lock.signal();
 }
 
 void
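
The scattered sched->lock.signal() calls are the producer side of the new scheme: any transition that creates work for the scheduler (a task starting, being killed, waking, or dying) must signal, or a scheduler parked in timed_wait() only notices after the full timeout. The pattern, reduced to a stand-in (names invented; not the runtime's code):

    #include <pthread.h>
    #include <deque>

    struct sched_stub {           // stand-in for rust_scheduler; initialize
        pthread_mutex_t mutex;    // mutex/cond with pthread_mutex_init and
        pthread_cond_t cond;      // pthread_cond_init before use
        std::deque<int> runnable; // task ids, as a stand-in for task queues
    };

    // Whoever makes a task schedulable signals the scheduler's condvar,
    // the analogue of the sched->lock.signal() calls above.
    void make_runnable(sched_stub *s, int task_id) {
        pthread_mutex_lock(&s->mutex);
        s->runnable.push_back(task_id);
        pthread_cond_signal(&s->cond);
        pthread_mutex_unlock(&s->mutex);
    }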

View File

@@ -10,7 +10,9 @@
#include "lock_and_signal.h"
#if defined(__WIN32__)
lock_and_signal::lock_and_signal() {
lock_and_signal::lock_and_signal()
: alive(true)
{
// FIXME: In order to match the behavior of pthread_cond_broadcast on
// Windows, we create manual reset events. This however breaks the
// behavior of pthread_cond_signal, fixing this is quite involved:
@@ -22,7 +24,7 @@ lock_and_signal::lock_and_signal() {
 #else
 lock_and_signal::lock_and_signal()
-    : _locked(false)
+    : _locked(false), alive(true)
 {
     CHECKED(pthread_cond_init(&_cond, NULL));
     CHECKED(pthread_mutex_init(&_mutex, NULL));
@@ -36,6 +38,7 @@ lock_and_signal::~lock_and_signal() {
     CHECKED(pthread_cond_destroy(&_cond));
     CHECKED(pthread_mutex_destroy(&_mutex));
 #endif
+    alive = false;
 }
 
 void lock_and_signal::lock() {
@@ -65,11 +68,14 @@ void lock_and_signal::wait() {
     timed_wait(0);
 }
 
-void lock_and_signal::timed_wait(size_t timeout_in_ns) {
+bool lock_and_signal::timed_wait(size_t timeout_in_ns) {
     _locked = false;
+    bool rv = true;
 #if defined(__WIN32__)
     LeaveCriticalSection(&_cs);
     WaitForSingleObject(_event, INFINITE);
     EnterCriticalSection(&_cs);
+    _holding_thread = GetCurrentThreadId();
 #else
     if (timeout_in_ns == 0) {
         CHECKED(pthread_cond_wait(&_cond, &_mutex));
@@ -79,9 +85,29 @@ void lock_and_signal::timed_wait(size_t timeout_in_ns) {
         timespec time_spec;
         time_spec.tv_sec = time_val.tv_sec + 0;
         time_spec.tv_nsec = time_val.tv_usec * 1000 + timeout_in_ns;
-        CHECKED(pthread_cond_timedwait(&_cond, &_mutex, &time_spec));
+        if(time_spec.tv_nsec >= 1000000000) {
+            time_spec.tv_sec++;
+            time_spec.tv_nsec -= 1000000000;
+        }
+
+        int cond_wait_status
+            = pthread_cond_timedwait(&_cond, &_mutex, &time_spec);
+        switch(cond_wait_status) {
+        case 0:
+            // successfully grabbed the lock.
+            break;
+        case ETIMEDOUT:
+            // Oops, we timed out.
+            rv = false;
+            break;
+        default:
+            // Error
+            CHECKED(cond_wait_status);
+        }
     }
+    _holding_thread = pthread_self();
 #endif
     _locked = true;
+    return rv;
 }
 
 /**
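
The pthread path picks up two real fixes here. First, pthread_cond_timedwait takes an absolute deadline whose tv_nsec must lie in [0, 10^9); since tv_usec can be up to 999,999 and the timeout is added in nanoseconds, the sum can overflow the field: 999,999 us * 1000 + 100,000 ns = 1,000,099,000 ns, so one carry into tv_sec leaves tv_nsec = 99,000, hence the new normalization. Second, a timeout (ETIMEDOUT) is now reported to the caller instead of tripping CHECKED. A self-contained version of the deadline computation (deadline_after_ns is a hypothetical helper, not in the diff; the loop generalizes the diff's single if to timeouts longer than a second):

    #include <stddef.h>
    #include <sys/time.h>
    #include <time.h>

    struct timespec deadline_after_ns(size_t timeout_in_ns) {
        struct timeval now;
        gettimeofday(&now, NULL);
        struct timespec ts;
        ts.tv_sec = now.tv_sec;
        // Assumes the sum fits tv_nsec's type, as the diff's code does.
        ts.tv_nsec = now.tv_usec * 1000 + timeout_in_ns;
        while (ts.tv_nsec >= 1000000000) { // keep tv_nsec in range
            ts.tv_sec++;
            ts.tv_nsec -= 1000000000;
        }
        return ts;
    }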

View File

@@ -14,6 +14,9 @@ class lock_and_signal {
     pthread_t _holding_thread;
 #endif
     bool _locked;
+
+    bool alive;
+
 public:
     lock_and_signal();
     virtual ~lock_and_signal();
@@ -21,7 +24,7 @@ public:
     void lock();
     void unlock();
     void wait();
-    void timed_wait(size_t timeout_in_ns);
+    bool timed_wait(size_t timeout_in_ns);
     void signal();
     void signal_all();
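
With the new signature, callers can tell a genuine wake-up from a timeout. A hypothetical caller (wait_briefly is invented for illustration; note the win32 path above waits with INFINITE and always reports true):

    #include "lock_and_signal.h"

    // Returns true if woken by signal()/signal_all(), false if the
    // deadline passed first (so the caller should re-check its state).
    bool wait_briefly(lock_and_signal &lock) {
        lock.lock();
        bool signaled = lock.timed_wait(100000); // nanoseconds
        lock.unlock();
        return signaled;
    }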

View File

@@ -184,7 +184,6 @@ mod map_reduce {
         let m;
         ctrl |> m;
 
         alt m {
           mapper_done. {
-            log_err "received mapper terminated.";
+            // log_err "received mapper terminated.";