cleanup main loop
This commit is contained in:
@@ -4,12 +4,13 @@ pub(crate) mod pending_requests;
|
||||
|
||||
use std::{error::Error, fmt, path::PathBuf, sync::Arc, time::Instant};
|
||||
|
||||
use crossbeam_channel::{select, unbounded, Receiver, RecvError, Sender};
|
||||
use crossbeam_channel::{select, unbounded, RecvError, Sender};
|
||||
use lsp_server::{Connection, ErrorCode, Message, Notification, Request, RequestId, Response};
|
||||
use lsp_types::{ClientCapabilities, NumberOrString};
|
||||
use ra_ide_api::{Canceled, FeatureFlags, FileId, LibraryData};
|
||||
use ra_ide_api::{Canceled, FeatureFlags, FileId, LibraryData, SourceRootId};
|
||||
use ra_prof::profile;
|
||||
use ra_vfs::VfsTask;
|
||||
use relative_path::RelativePathBuf;
|
||||
use serde::{de::DeserializeOwned, Serialize};
|
||||
use threadpool::ThreadPool;
|
||||
|
||||
@@ -18,7 +19,6 @@ use crate::{
|
||||
pending_requests::{PendingRequest, PendingRequests},
|
||||
subscriptions::Subscriptions,
|
||||
},
|
||||
project_model::workspace_loader,
|
||||
req,
|
||||
world::{Options, WorldSnapshot, WorldState},
|
||||
Result, ServerConfig,
|
||||
@@ -54,14 +54,17 @@ pub fn main_loop(
|
||||
connection: &Connection,
|
||||
) -> Result<()> {
|
||||
log::info!("server_config: {:#?}", config);
|
||||
|
||||
// FIXME: support dynamic workspace loading.
|
||||
let workspaces = {
|
||||
let ws_worker = workspace_loader(config.with_sysroot);
|
||||
let mut loaded_workspaces = Vec::new();
|
||||
for ws_root in &ws_roots {
|
||||
ws_worker.sender().send(ws_root.clone()).unwrap();
|
||||
match ws_worker.receiver().recv().unwrap() {
|
||||
Ok(ws) => loaded_workspaces.push(ws),
|
||||
let workspace = ra_project_model::ProjectWorkspace::discover_with_sysroot(
|
||||
ws_root.as_path(),
|
||||
config.with_sysroot,
|
||||
);
|
||||
match workspace {
|
||||
Ok(workspace) => loaded_workspaces.push(workspace),
|
||||
Err(e) => {
|
||||
log::error!("loading workspace failed: {}", e);
|
||||
|
||||
@@ -75,11 +78,13 @@ pub fn main_loop(
|
||||
}
|
||||
loaded_workspaces
|
||||
};
|
||||
|
||||
let globs = config
|
||||
.exclude_globs
|
||||
.iter()
|
||||
.map(|glob| ra_vfs_glob::Glob::new(glob))
|
||||
.collect::<std::result::Result<Vec<_>, _>>()?;
|
||||
|
||||
let feature_flags = {
|
||||
let mut ff = FeatureFlags::default();
|
||||
for (flag, value) in config.feature_flags {
|
||||
@@ -95,7 +100,8 @@ pub fn main_loop(
|
||||
ff
|
||||
};
|
||||
log::info!("feature_flags: {:#?}", feature_flags);
|
||||
let mut state = WorldState::new(
|
||||
|
||||
let mut world_state = WorldState::new(
|
||||
ws_roots,
|
||||
workspaces,
|
||||
config.lru_capacity,
|
||||
@@ -113,31 +119,58 @@ pub fn main_loop(
|
||||
|
||||
let pool = ThreadPool::new(THREADPOOL_SIZE);
|
||||
let (task_sender, task_receiver) = unbounded::<Task>();
|
||||
let mut pending_requests = PendingRequests::default();
|
||||
let (libdata_sender, libdata_receiver) = unbounded::<LibraryData>();
|
||||
let mut loop_state = LoopState::default();
|
||||
|
||||
log::info!("server initialized, serving requests");
|
||||
let main_res = main_loop_inner(
|
||||
&pool,
|
||||
connection,
|
||||
task_sender,
|
||||
task_receiver.clone(),
|
||||
&mut state,
|
||||
&mut pending_requests,
|
||||
);
|
||||
{
|
||||
let task_sender = task_sender;
|
||||
let libdata_sender = libdata_sender;
|
||||
loop {
|
||||
log::trace!("selecting");
|
||||
let event = select! {
|
||||
recv(&connection.receiver) -> msg => match msg {
|
||||
Ok(msg) => Event::Msg(msg),
|
||||
Err(RecvError) => Err("client exited without shutdown")?,
|
||||
},
|
||||
recv(task_receiver) -> task => Event::Task(task.unwrap()),
|
||||
recv(world_state.task_receiver) -> task => match task {
|
||||
Ok(task) => Event::Vfs(task),
|
||||
Err(RecvError) => Err("vfs died")?,
|
||||
},
|
||||
recv(libdata_receiver) -> data => Event::Lib(data.unwrap())
|
||||
};
|
||||
if let Event::Msg(Message::Request(req)) = &event {
|
||||
if connection.handle_shutdown(&req)? {
|
||||
break;
|
||||
};
|
||||
}
|
||||
loop_turn(
|
||||
&pool,
|
||||
&task_sender,
|
||||
&libdata_sender,
|
||||
connection,
|
||||
&mut world_state,
|
||||
&mut loop_state,
|
||||
event,
|
||||
)?;
|
||||
}
|
||||
}
|
||||
|
||||
log::info!("waiting for tasks to finish...");
|
||||
task_receiver
|
||||
.into_iter()
|
||||
.for_each(|task| on_task(task, &connection.sender, &mut pending_requests, &mut state));
|
||||
task_receiver.into_iter().for_each(|task| {
|
||||
on_task(task, &connection.sender, &mut loop_state.pending_requests, &mut world_state)
|
||||
});
|
||||
libdata_receiver.into_iter().for_each(|lib| drop(lib));
|
||||
log::info!("...tasks have finished");
|
||||
log::info!("joining threadpool...");
|
||||
drop(pool);
|
||||
log::info!("...threadpool has finished");
|
||||
|
||||
let vfs = Arc::try_unwrap(state.vfs).expect("all snapshots should be dead");
|
||||
let vfs = Arc::try_unwrap(world_state.vfs).expect("all snapshots should be dead");
|
||||
drop(vfs);
|
||||
|
||||
main_res
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -192,121 +225,113 @@ impl fmt::Debug for Event {
|
||||
}
|
||||
}
|
||||
|
||||
fn main_loop_inner(
|
||||
pool: &ThreadPool,
|
||||
connection: &Connection,
|
||||
task_sender: Sender<Task>,
|
||||
task_receiver: Receiver<Task>,
|
||||
state: &mut WorldState,
|
||||
pending_requests: &mut PendingRequests,
|
||||
) -> Result<()> {
|
||||
let mut subs = Subscriptions::default();
|
||||
#[derive(Debug, Default)]
|
||||
struct LoopState {
|
||||
pending_requests: PendingRequests,
|
||||
subscriptions: Subscriptions,
|
||||
// We try not to index more than MAX_IN_FLIGHT_LIBS libraries at the same
|
||||
// time to always have a thread ready to react to input.
|
||||
let mut in_flight_libraries = 0;
|
||||
let mut pending_libraries = Vec::new();
|
||||
let mut send_workspace_notification = true;
|
||||
in_flight_libraries: usize,
|
||||
pending_libraries: Vec<(SourceRootId, Vec<(FileId, RelativePathBuf, Arc<String>)>)>,
|
||||
workspace_loaded: bool,
|
||||
}
|
||||
|
||||
let (libdata_sender, libdata_receiver) = unbounded();
|
||||
loop {
|
||||
log::trace!("selecting");
|
||||
let event = select! {
|
||||
recv(&connection.receiver) -> msg => match msg {
|
||||
Ok(msg) => Event::Msg(msg),
|
||||
Err(RecvError) => Err("client exited without shutdown")?,
|
||||
},
|
||||
recv(task_receiver) -> task => Event::Task(task.unwrap()),
|
||||
recv(state.task_receiver) -> task => match task {
|
||||
Ok(task) => Event::Vfs(task),
|
||||
Err(RecvError) => Err("vfs died")?,
|
||||
},
|
||||
recv(libdata_receiver) -> data => Event::Lib(data.unwrap())
|
||||
};
|
||||
let loop_start = Instant::now();
|
||||
fn loop_turn(
|
||||
pool: &ThreadPool,
|
||||
task_sender: &Sender<Task>,
|
||||
libdata_sender: &Sender<LibraryData>,
|
||||
connection: &Connection,
|
||||
world_state: &mut WorldState,
|
||||
loop_state: &mut LoopState,
|
||||
event: Event,
|
||||
) -> Result<()> {
|
||||
let loop_start = Instant::now();
|
||||
|
||||
// NOTE: don't count blocking select! call as a loop-turn time
|
||||
let _p = profile("main_loop_inner/loop-turn");
|
||||
log::info!("loop turn = {:?}", event);
|
||||
let queue_count = pool.queued_count();
|
||||
if queue_count > 0 {
|
||||
log::info!("queued count = {}", queue_count);
|
||||
// NOTE: don't count blocking select! call as a loop-turn time
|
||||
let _p = profile("main_loop_inner/loop-turn");
|
||||
log::info!("loop turn = {:?}", event);
|
||||
let queue_count = pool.queued_count();
|
||||
if queue_count > 0 {
|
||||
log::info!("queued count = {}", queue_count);
|
||||
}
|
||||
|
||||
let mut state_changed = false;
|
||||
match event {
|
||||
Event::Task(task) => {
|
||||
on_task(task, &connection.sender, &mut loop_state.pending_requests, world_state);
|
||||
world_state.maybe_collect_garbage();
|
||||
}
|
||||
|
||||
let mut state_changed = false;
|
||||
match event {
|
||||
Event::Task(task) => {
|
||||
on_task(task, &connection.sender, pending_requests, state);
|
||||
state.maybe_collect_garbage();
|
||||
}
|
||||
Event::Vfs(task) => {
|
||||
state.vfs.write().handle_task(task);
|
||||
Event::Vfs(task) => {
|
||||
world_state.vfs.write().handle_task(task);
|
||||
state_changed = true;
|
||||
}
|
||||
Event::Lib(lib) => {
|
||||
world_state.add_lib(lib);
|
||||
world_state.maybe_collect_garbage();
|
||||
loop_state.in_flight_libraries -= 1;
|
||||
}
|
||||
Event::Msg(msg) => match msg {
|
||||
Message::Request(req) => on_request(
|
||||
world_state,
|
||||
&mut loop_state.pending_requests,
|
||||
pool,
|
||||
task_sender,
|
||||
&connection.sender,
|
||||
loop_start,
|
||||
req,
|
||||
)?,
|
||||
Message::Notification(not) => {
|
||||
on_notification(
|
||||
&connection.sender,
|
||||
world_state,
|
||||
&mut loop_state.pending_requests,
|
||||
&mut loop_state.subscriptions,
|
||||
not,
|
||||
)?;
|
||||
state_changed = true;
|
||||
}
|
||||
Event::Lib(lib) => {
|
||||
state.add_lib(lib);
|
||||
state.maybe_collect_garbage();
|
||||
in_flight_libraries -= 1;
|
||||
}
|
||||
Event::Msg(msg) => match msg {
|
||||
Message::Request(req) => {
|
||||
if connection.handle_shutdown(&req)? {
|
||||
return Ok(());
|
||||
};
|
||||
on_request(
|
||||
state,
|
||||
pending_requests,
|
||||
pool,
|
||||
&task_sender,
|
||||
&connection.sender,
|
||||
loop_start,
|
||||
req,
|
||||
)?
|
||||
}
|
||||
Message::Notification(not) => {
|
||||
on_notification(&connection.sender, state, pending_requests, &mut subs, not)?;
|
||||
state_changed = true;
|
||||
}
|
||||
Message::Response(resp) => log::error!("unexpected response: {:?}", resp),
|
||||
},
|
||||
};
|
||||
Message::Response(resp) => log::error!("unexpected response: {:?}", resp),
|
||||
},
|
||||
};
|
||||
|
||||
pending_libraries.extend(state.process_changes());
|
||||
while in_flight_libraries < MAX_IN_FLIGHT_LIBS && !pending_libraries.is_empty() {
|
||||
let (root, files) = pending_libraries.pop().unwrap();
|
||||
in_flight_libraries += 1;
|
||||
let sender = libdata_sender.clone();
|
||||
pool.execute(move || {
|
||||
log::info!("indexing {:?} ... ", root);
|
||||
let _p = profile(&format!("indexed {:?}", root));
|
||||
let data = LibraryData::prepare(root, files);
|
||||
sender.send(data).unwrap();
|
||||
});
|
||||
}
|
||||
loop_state.pending_libraries.extend(world_state.process_changes());
|
||||
while loop_state.in_flight_libraries < MAX_IN_FLIGHT_LIBS
|
||||
&& !loop_state.pending_libraries.is_empty()
|
||||
{
|
||||
let (root, files) = loop_state.pending_libraries.pop().unwrap();
|
||||
loop_state.in_flight_libraries += 1;
|
||||
let sender = libdata_sender.clone();
|
||||
pool.execute(move || {
|
||||
log::info!("indexing {:?} ... ", root);
|
||||
let _p = profile(&format!("indexed {:?}", root));
|
||||
let data = LibraryData::prepare(root, files);
|
||||
sender.send(data).unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
if send_workspace_notification
|
||||
&& state.roots_to_scan == 0
|
||||
&& pending_libraries.is_empty()
|
||||
&& in_flight_libraries == 0
|
||||
{
|
||||
let n_packages: usize = state.workspaces.iter().map(|it| it.n_packages()).sum();
|
||||
if state.feature_flags().get("notifications.workspace-loaded") {
|
||||
let msg = format!("workspace loaded, {} rust packages", n_packages);
|
||||
show_message(req::MessageType::Info, msg, &connection.sender);
|
||||
}
|
||||
// Only send the notification first time
|
||||
send_workspace_notification = false;
|
||||
}
|
||||
|
||||
if state_changed {
|
||||
update_file_notifications_on_threadpool(
|
||||
pool,
|
||||
state.snapshot(),
|
||||
state.options.publish_decorations,
|
||||
task_sender.clone(),
|
||||
subs.subscriptions(),
|
||||
)
|
||||
if !loop_state.workspace_loaded
|
||||
&& world_state.roots_to_scan == 0
|
||||
&& loop_state.pending_libraries.is_empty()
|
||||
&& loop_state.in_flight_libraries == 0
|
||||
{
|
||||
loop_state.workspace_loaded = true;
|
||||
let n_packages: usize = world_state.workspaces.iter().map(|it| it.n_packages()).sum();
|
||||
if world_state.feature_flags().get("notifications.workspace-loaded") {
|
||||
let msg = format!("workspace loaded, {} rust packages", n_packages);
|
||||
show_message(req::MessageType::Info, msg, &connection.sender);
|
||||
}
|
||||
}
|
||||
|
||||
if state_changed {
|
||||
update_file_notifications_on_threadpool(
|
||||
pool,
|
||||
world_state.snapshot(),
|
||||
world_state.options.publish_decorations,
|
||||
task_sender.clone(),
|
||||
loop_state.subscriptions.subscriptions(),
|
||||
)
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn on_task(
|
||||
|
||||
Reference in New Issue
Block a user