core: Allocate threads on demand, not on scheduler startup

API change: rust_kernel::create_scheduler() or
rust_scheduler::rust_scheduler() respecitevly now take ownership of the
launch factory argument, it is needed to create new threads on demand.

Also renames rustrt::sched_threads() to rustrt::rust_sched_threads() for
consistency. Added rustrt::rust_max_sched_threads() to return the
maximal number of scheduled threads of the current scheduler.

Fixes #3493.
This commit is contained in:
Philipp Brüschweiler
2012-09-14 15:01:17 +02:00
committed by Brian Anderson
parent 35a9353774
commit 68e755b1c2
8 changed files with 102 additions and 48 deletions

View File

@@ -1661,7 +1661,8 @@ extern mod rustrt {
fn rust_get_sched_id() -> sched_id; fn rust_get_sched_id() -> sched_id;
fn rust_new_sched(num_threads: libc::uintptr_t) -> sched_id; fn rust_new_sched(num_threads: libc::uintptr_t) -> sched_id;
fn sched_threads() -> libc::size_t; fn rust_max_sched_threads() -> libc::size_t;
fn rust_sched_threads() -> libc::size_t;
fn rust_num_threads() -> libc::uintptr_t; fn rust_num_threads() -> libc::uintptr_t;
fn get_task_id() -> task_id; fn get_task_id() -> task_id;
@@ -2435,10 +2436,36 @@ fn test_sched_thread_per_core() {
do spawn_sched(ThreadPerCore) { do spawn_sched(ThreadPerCore) {
let cores = rustrt::rust_num_threads(); let cores = rustrt::rust_num_threads();
let reported_threads = rustrt::sched_threads(); let reported_threads = rustrt::rust_max_sched_threads();
assert(cores as uint == reported_threads as uint); assert(cores as uint == reported_threads as uint);
chan.send(()); chan.send(());
} }
port.recv(); port.recv();
} }
#[test]
fn test_spawn_thread_on_demand() {
let (chan, port) = pipes::stream();
do spawn_sched(ManualThreads(2)) {
let max_threads = rustrt::rust_max_sched_threads();
assert(max_threads as int == 2);
let running_threads = rustrt::rust_sched_threads();
assert(running_threads as int == 1);
let (chan2, port2) = pipes::stream();
do spawn() {
chan2.send(());
}
let running_threads2 = rustrt::rust_sched_threads();
assert(running_threads2 as int == 2);
port2.recv();
chan.send(());
}
port.recv();
}

View File

@@ -26,7 +26,7 @@ export run_tests_console;
#[abi = "cdecl"] #[abi = "cdecl"]
extern mod rustrt { extern mod rustrt {
fn sched_threads() -> libc::size_t; fn rust_max_sched_threads() -> libc::size_t;
} }
// The name of a test. By convention this follows the rules for rust // The name of a test. By convention this follows the rules for rust
@@ -327,7 +327,7 @@ const sched_overcommit : uint = 1u;
const sched_overcommit : uint = 4u; const sched_overcommit : uint = 4u;
fn get_concurrency() -> uint { fn get_concurrency() -> uint {
let threads = rustrt::sched_threads() as uint; let threads = rustrt::rust_max_sched_threads() as uint;
if threads == 1u { 1u } if threads == 1u { 1u }
else { threads * sched_overcommit } else { threads * sched_overcommit }
} }

View File

@@ -627,11 +627,17 @@ start_task(rust_task *target, fn_env_pair *f) {
} }
extern "C" CDECL size_t extern "C" CDECL size_t
sched_threads() { rust_sched_threads() {
rust_task *task = rust_get_current_task(); rust_task *task = rust_get_current_task();
return task->sched->number_of_threads(); return task->sched->number_of_threads();
} }
extern "C" CDECL size_t
rust_max_sched_threads() {
rust_task *task = rust_get_current_task();
return task->sched->max_number_of_threads();
}
extern "C" CDECL rust_port* extern "C" CDECL rust_port*
rust_port_take(rust_port_id id) { rust_port_take(rust_port_id id) {
rust_task *task = rust_get_current_task(); rust_task *task = rust_get_current_task();

View File

@@ -31,9 +31,10 @@ rust_kernel::rust_kernel(rust_env *env) :
// Create the single threaded scheduler that will run on the platform's // Create the single threaded scheduler that will run on the platform's
// main thread // main thread
rust_manual_sched_launcher_factory launchfac; rust_manual_sched_launcher_factory *launchfac =
osmain_scheduler = create_scheduler(&launchfac, 1, false); new rust_manual_sched_launcher_factory();
osmain_driver = launchfac.get_driver(); osmain_scheduler = create_scheduler(launchfac, 1, false);
osmain_driver = launchfac->get_driver();
sched_reaper.start(); sched_reaper.start();
} }
@@ -79,8 +80,9 @@ void rust_kernel::free(void *mem) {
rust_sched_id rust_sched_id
rust_kernel::create_scheduler(size_t num_threads) { rust_kernel::create_scheduler(size_t num_threads) {
rust_thread_sched_launcher_factory launchfac; rust_thread_sched_launcher_factory *launchfac =
return create_scheduler(&launchfac, num_threads, true); new rust_thread_sched_launcher_factory();
return create_scheduler(launchfac, num_threads, true);
} }
rust_sched_id rust_sched_id

View File

@@ -6,34 +6,39 @@
#include "rust_sched_launcher.h" #include "rust_sched_launcher.h"
rust_scheduler::rust_scheduler(rust_kernel *kernel, rust_scheduler::rust_scheduler(rust_kernel *kernel,
size_t num_threads, size_t max_num_threads,
rust_sched_id id, rust_sched_id id,
bool allow_exit, bool allow_exit,
bool killed, bool killed,
rust_sched_launcher_factory *launchfac) : rust_sched_launcher_factory *launchfac) :
ref_count(1), ref_count(1),
kernel(kernel), kernel(kernel),
live_threads(num_threads), live_threads(0),
live_tasks(0), live_tasks(0),
cur_thread(0), cur_thread(0),
may_exit(allow_exit), may_exit(allow_exit),
num_threads(num_threads), killed(killed),
launchfac(launchfac),
max_num_threads(max_num_threads),
id(id) id(id)
{ {
create_task_threads(launchfac, killed); // Create the first thread
threads.push(create_task_thread(0));
} }
void rust_scheduler::delete_this() { void rust_scheduler::delete_this() {
destroy_task_threads(); destroy_task_threads();
delete launchfac;
delete this; delete this;
} }
rust_sched_launcher * rust_sched_launcher *
rust_scheduler::create_task_thread(rust_sched_launcher_factory *launchfac, rust_scheduler::create_task_thread(int id) {
int id, bool killed) { live_threads++;
rust_sched_launcher *thread = launchfac->create(this, id, killed); rust_sched_launcher *thread = launchfac->create(this, id, killed);
KLOG(kernel, kern, "created task thread: " PTR ", id: %d", KLOG(kernel, kern, "created task thread: " PTR
thread, id); ", id: %d, live_threads: %d",
thread, id, live_threads);
return thread; return thread;
} }
@@ -43,19 +48,9 @@ rust_scheduler::destroy_task_thread(rust_sched_launcher *thread) {
delete thread; delete thread;
} }
void
rust_scheduler::create_task_threads(rust_sched_launcher_factory *launchfac,
bool killed) {
KLOG(kernel, kern, "Using %d scheduler threads.", num_threads);
for(size_t i = 0; i < num_threads; ++i) {
threads.push(create_task_thread(launchfac, i, killed));
}
}
void void
rust_scheduler::destroy_task_threads() { rust_scheduler::destroy_task_threads() {
for(size_t i = 0; i < num_threads; ++i) { for(size_t i = 0; i < threads.size(); ++i) {
destroy_task_thread(threads[i]); destroy_task_thread(threads[i]);
} }
} }
@@ -63,7 +58,7 @@ rust_scheduler::destroy_task_threads() {
void void
rust_scheduler::start_task_threads() rust_scheduler::start_task_threads()
{ {
for(size_t i = 0; i < num_threads; ++i) { for(size_t i = 0; i < threads.size(); ++i) {
rust_sched_launcher *thread = threads[i]; rust_sched_launcher *thread = threads[i];
thread->start(); thread->start();
} }
@@ -72,7 +67,7 @@ rust_scheduler::start_task_threads()
void void
rust_scheduler::join_task_threads() rust_scheduler::join_task_threads()
{ {
for(size_t i = 0; i < num_threads; ++i) { for(size_t i = 0; i < threads.size(); ++i) {
rust_sched_launcher *thread = threads[i]; rust_sched_launcher *thread = threads[i];
thread->join(); thread->join();
} }
@@ -80,7 +75,7 @@ rust_scheduler::join_task_threads()
void void
rust_scheduler::kill_all_tasks() { rust_scheduler::kill_all_tasks() {
for(size_t i = 0; i < num_threads; ++i) { for(size_t i = 0; i < threads.size(); ++i) {
rust_sched_launcher *thread = threads[i]; rust_sched_launcher *thread = threads[i];
thread->get_loop()->kill_all_tasks(); thread->get_loop()->kill_all_tasks();
} }
@@ -92,10 +87,29 @@ rust_scheduler::create_task(rust_task *spawner, const char *name) {
{ {
scoped_lock with(lock); scoped_lock with(lock);
live_tasks++; live_tasks++;
thread_no = cur_thread++;
if (cur_thread >= num_threads) // Find unoccupied thread
cur_thread = 0; for (thread_no = 0; thread_no < threads.size(); ++thread_no) {
if (threads[thread_no]->get_loop()->number_of_live_tasks() == 0)
break;
}
if (thread_no == threads.size()) {
if (threads.size() < max_num_threads) {
// Else create new thread
thread_no = threads.size();
rust_sched_launcher *thread = create_task_thread(thread_no);
thread->start();
threads.push(thread);
} else {
// Or use round robin allocation
thread_no = cur_thread++;
if (cur_thread >= max_num_threads)
cur_thread = 0;
}
}
} }
KLOG(kernel, kern, "Creating task %s, on thread %d.", name, thread_no);
kernel->register_task(); kernel->register_task();
rust_sched_launcher *thread = threads[thread_no]; rust_sched_launcher *thread = threads[thread_no];
return thread->get_loop()->create_task(spawner, name); return thread->get_loop()->create_task(spawner, name);
@@ -119,17 +133,22 @@ rust_scheduler::release_task() {
void void
rust_scheduler::exit() { rust_scheduler::exit() {
// Take a copy of num_threads. After the last thread exits this // Take a copy of the number of threads. After the last thread exits this
// scheduler will get destroyed, and our fields will cease to exist. // scheduler will get destroyed, and our fields will cease to exist.
size_t current_num_threads = num_threads; size_t current_num_threads = threads.size();
for(size_t i = 0; i < current_num_threads; ++i) { for(size_t i = 0; i < current_num_threads; ++i) {
threads[i]->get_loop()->exit(); threads[i]->get_loop()->exit();
} }
} }
size_t
rust_scheduler::max_number_of_threads() {
return max_num_threads;
}
size_t size_t
rust_scheduler::number_of_threads() { rust_scheduler::number_of_threads() {
return num_threads; return threads.size();
} }
void void

View File

@@ -30,19 +30,17 @@ private:
uintptr_t live_tasks; uintptr_t live_tasks;
size_t cur_thread; size_t cur_thread;
bool may_exit; bool may_exit;
bool killed;
rust_sched_launcher_factory *launchfac;
array_list<rust_sched_launcher *> threads; array_list<rust_sched_launcher *> threads;
const size_t num_threads; const size_t max_num_threads;
rust_sched_id id; rust_sched_id id;
void create_task_threads(rust_sched_launcher_factory *launchfac,
bool killed);
void destroy_task_threads(); void destroy_task_threads();
rust_sched_launcher * rust_sched_launcher *create_task_thread(int id);
create_task_thread(rust_sched_launcher_factory *launchfac, int id,
bool killed);
void destroy_task_thread(rust_sched_launcher *thread); void destroy_task_thread(rust_sched_launcher *thread);
void exit(); void exit();
@@ -51,7 +49,7 @@ private:
void delete_this(); void delete_this();
public: public:
rust_scheduler(rust_kernel *kernel, size_t num_threads, rust_scheduler(rust_kernel *kernel, size_t max_num_threads,
rust_sched_id id, bool allow_exit, bool killed, rust_sched_id id, bool allow_exit, bool killed,
rust_sched_launcher_factory *launchfac); rust_sched_launcher_factory *launchfac);
@@ -62,6 +60,7 @@ public:
void release_task(); void release_task();
size_t max_number_of_threads();
size_t number_of_threads(); size_t number_of_threads();
// Called by each thread when it terminates. When all threads // Called by each thread when it terminates. When all threads
// terminate the scheduler does as well. // terminate the scheduler does as well.

View File

@@ -30,6 +30,7 @@ rand_new_seeded
rand_next rand_next
rand_seed rand_seed
rust_get_sched_id rust_get_sched_id
rust_max_sched_threads
rust_new_sched rust_new_sched
rust_new_task_in_sched rust_new_task_in_sched
rust_num_threads rust_num_threads
@@ -48,6 +49,7 @@ rust_port_size
rust_process_wait rust_process_wait
rust_ptr_eq rust_ptr_eq
rust_run_program rust_run_program
rust_sched_threads
rust_set_exit_status rust_set_exit_status
rust_start rust_start
rust_getcwd rust_getcwd
@@ -58,7 +60,6 @@ rust_get_task
rust_get_stack_segment rust_get_stack_segment
rust_task_weaken rust_task_weaken
rust_task_unweaken rust_task_unweaken
sched_threads
shape_log_str shape_log_str
start_task start_task
vec_reserve_shared_actual vec_reserve_shared_actual

View File

@@ -8,7 +8,7 @@ extern mod rustrt {
fn last_os_error() -> ~str; fn last_os_error() -> ~str;
fn rust_getcwd() -> ~str; fn rust_getcwd() -> ~str;
fn get_task_id() -> libc::intptr_t; fn get_task_id() -> libc::intptr_t;
fn sched_threads(); fn rust_max_sched_threads();
fn rust_get_task(); fn rust_get_task();
} }
@@ -16,7 +16,7 @@ fn calllink01() { rustrt::rust_get_sched_id(); }
fn calllink02() { rustrt::last_os_error(); } fn calllink02() { rustrt::last_os_error(); }
fn calllink03() { rustrt::rust_getcwd(); } fn calllink03() { rustrt::rust_getcwd(); }
fn calllink08() { rustrt::get_task_id(); } fn calllink08() { rustrt::get_task_id(); }
fn calllink09() { rustrt::sched_threads(); } fn calllink09() { rustrt::rust_max_sched_threads(); }
fn calllink10() { rustrt::rust_get_task(); } fn calllink10() { rustrt::rust_get_task(); }
fn runtest(f: fn~(), frame_backoff: u32) { fn runtest(f: fn~(), frame_backoff: u32) {