Merge branch 'master' into compute-lazy-assits

# Conflicts:
#	crates/rust-analyzer/src/main_loop/handlers.rs
#	crates/rust-analyzer/src/to_proto.rs
This commit is contained in:
Mikhail Rakhmanov
2020-06-03 19:26:01 +02:00
50 changed files with 1191 additions and 1829 deletions

View File

@@ -12,13 +12,11 @@ use std::{
fmt,
ops::Range,
panic,
path::PathBuf,
sync::Arc,
time::{Duration, Instant},
};
use crossbeam_channel::{never, select, unbounded, RecvError, Sender};
use itertools::Itertools;
use lsp_server::{Connection, ErrorCode, Message, Notification, Request, RequestId, Response};
use lsp_types::{
DidChangeTextDocumentParams, NumberOrString, TextDocumentContentChangeEvent, WorkDoneProgress,
@@ -36,14 +34,15 @@ use serde::{de::DeserializeOwned, Serialize};
use threadpool::ThreadPool;
use crate::{
config::{Config, FilesWatcher},
config::{Config, FilesWatcher, LinkedProject},
diagnostics::{to_proto::url_from_path_with_drive_lowercasing, DiagnosticTask},
from_proto, lsp_ext,
from_proto,
global_state::{GlobalState, GlobalStateSnapshot},
lsp_ext,
main_loop::{
pending_requests::{PendingRequest, PendingRequests},
subscriptions::Subscriptions,
},
world::{WorldSnapshot, WorldState},
Result,
};
@@ -69,7 +68,7 @@ impl fmt::Display for LspError {
impl Error for LspError {}
pub fn main_loop(ws_roots: Vec<PathBuf>, config: Config, connection: Connection) -> Result<()> {
pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
log::info!("initial config: {:#?}", config);
// Windows scheduler implements priority boosts: if thread waits for an
@@ -92,43 +91,37 @@ pub fn main_loop(ws_roots: Vec<PathBuf>, config: Config, connection: Connection)
}
let mut loop_state = LoopState::default();
let mut world_state = {
let mut global_state = {
let workspaces = {
// FIXME: support dynamic workspace loading.
let project_roots: FxHashSet<_> = ws_roots
.iter()
.filter_map(|it| ra_project_model::ProjectRoot::discover(it).ok())
.flatten()
.collect();
if project_roots.is_empty() && config.notifications.cargo_toml_not_found {
if config.linked_projects.is_empty() && config.notifications.cargo_toml_not_found {
show_message(
lsp_types::MessageType::Error,
format!(
"rust-analyzer failed to discover workspace, no Cargo.toml found, dirs searched: {}",
ws_roots.iter().format_with(", ", |it, f| f(&it.display()))
),
"rust-analyzer failed to discover workspace".to_string(),
&connection.sender,
);
};
project_roots
.into_iter()
.filter_map(|root| {
ra_project_model::ProjectWorkspace::load(
root,
&config.cargo,
config.with_sysroot,
)
.map_err(|err| {
log::error!("failed to load workspace: {:#}", err);
show_message(
lsp_types::MessageType::Error,
format!("rust-analyzer failed to load workspace: {:#}", err),
&connection.sender,
);
})
.ok()
config
.linked_projects
.iter()
.filter_map(|project| match project {
LinkedProject::ProjectManifest(manifest) => {
ra_project_model::ProjectWorkspace::load(
manifest.clone(),
&config.cargo,
config.with_sysroot,
)
.map_err(|err| {
log::error!("failed to load workspace: {:#}", err);
show_message(
lsp_types::MessageType::Error,
format!("rust-analyzer failed to load workspace: {:#}", err),
&connection.sender,
);
})
.ok()
}
LinkedProject::JsonProject(it) => Some(it.clone().into()),
})
.collect::<Vec<_>>()
};
@@ -163,8 +156,7 @@ pub fn main_loop(ws_roots: Vec<PathBuf>, config: Config, connection: Connection)
connection.sender.send(request.into()).unwrap();
}
WorldState::new(
ws_roots,
GlobalState::new(
workspaces,
config.lru_capacity,
&globs,
@@ -173,7 +165,7 @@ pub fn main_loop(ws_roots: Vec<PathBuf>, config: Config, connection: Connection)
)
};
loop_state.roots_total = world_state.vfs.read().n_roots();
loop_state.roots_total = global_state.vfs.read().n_roots();
let pool = ThreadPool::default();
let (task_sender, task_receiver) = unbounded::<Task>();
@@ -191,12 +183,12 @@ pub fn main_loop(ws_roots: Vec<PathBuf>, config: Config, connection: Connection)
Err(RecvError) => return Err("client exited without shutdown".into()),
},
recv(task_receiver) -> task => Event::Task(task.unwrap()),
recv(world_state.task_receiver) -> task => match task {
recv(global_state.task_receiver) -> task => match task {
Ok(task) => Event::Vfs(task),
Err(RecvError) => return Err("vfs died".into()),
},
recv(libdata_receiver) -> data => Event::Lib(data.unwrap()),
recv(world_state.flycheck.as_ref().map_or(&never(), |it| &it.task_recv)) -> task => match task {
recv(global_state.flycheck.as_ref().map_or(&never(), |it| &it.task_recv)) -> task => match task {
Ok(task) => Event::CheckWatcher(task),
Err(RecvError) => return Err("check watcher died".into()),
}
@@ -211,16 +203,16 @@ pub fn main_loop(ws_roots: Vec<PathBuf>, config: Config, connection: Connection)
&task_sender,
&libdata_sender,
&connection,
&mut world_state,
&mut global_state,
&mut loop_state,
event,
)?;
}
}
world_state.analysis_host.request_cancellation();
global_state.analysis_host.request_cancellation();
log::info!("waiting for tasks to finish...");
task_receiver.into_iter().for_each(|task| {
on_task(task, &connection.sender, &mut loop_state.pending_requests, &mut world_state)
on_task(task, &connection.sender, &mut loop_state.pending_requests, &mut global_state)
});
libdata_receiver.into_iter().for_each(drop);
log::info!("...tasks have finished");
@@ -229,7 +221,7 @@ pub fn main_loop(ws_roots: Vec<PathBuf>, config: Config, connection: Connection)
drop(pool);
log::info!("...threadpool has finished");
let vfs = Arc::try_unwrap(world_state.vfs).expect("all snapshots should be dead");
let vfs = Arc::try_unwrap(global_state.vfs).expect("all snapshots should be dead");
drop(vfs);
Ok(())
@@ -320,7 +312,7 @@ fn loop_turn(
task_sender: &Sender<Task>,
libdata_sender: &Sender<LibraryData>,
connection: &Connection,
world_state: &mut WorldState,
global_state: &mut GlobalState,
loop_state: &mut LoopState,
event: Event,
) -> Result<()> {
@@ -336,22 +328,22 @@ fn loop_turn(
match event {
Event::Task(task) => {
on_task(task, &connection.sender, &mut loop_state.pending_requests, world_state);
world_state.maybe_collect_garbage();
on_task(task, &connection.sender, &mut loop_state.pending_requests, global_state);
global_state.maybe_collect_garbage();
}
Event::Vfs(task) => {
world_state.vfs.write().handle_task(task);
global_state.vfs.write().handle_task(task);
}
Event::Lib(lib) => {
world_state.add_lib(lib);
world_state.maybe_collect_garbage();
global_state.add_lib(lib);
global_state.maybe_collect_garbage();
loop_state.in_flight_libraries -= 1;
loop_state.roots_scanned += 1;
}
Event::CheckWatcher(task) => on_check_task(task, world_state, task_sender)?,
Event::CheckWatcher(task) => on_check_task(task, global_state, task_sender)?,
Event::Msg(msg) => match msg {
Message::Request(req) => on_request(
world_state,
global_state,
&mut loop_state.pending_requests,
pool,
task_sender,
@@ -360,7 +352,7 @@ fn loop_turn(
req,
)?,
Message::Notification(not) => {
on_notification(&connection.sender, world_state, loop_state, not)?;
on_notification(&connection.sender, global_state, loop_state, not)?;
}
Message::Response(resp) => {
let removed = loop_state.pending_responses.remove(&resp.id);
@@ -379,9 +371,9 @@ fn loop_turn(
}
(None, Some(configs)) => {
if let Some(new_config) = configs.get(0) {
let mut config = world_state.config.clone();
let mut config = global_state.config.clone();
config.update(&new_config);
world_state.update_configuration(config);
global_state.update_configuration(config);
}
}
(None, None) => {
@@ -394,7 +386,7 @@ fn loop_turn(
};
let mut state_changed = false;
if let Some(changes) = world_state.process_changes(&mut loop_state.roots_scanned) {
if let Some(changes) = global_state.process_changes(&mut loop_state.roots_scanned) {
state_changed = true;
loop_state.pending_libraries.extend(changes);
}
@@ -416,7 +408,7 @@ fn loop_turn(
}
let show_progress =
!loop_state.workspace_loaded && world_state.config.client_caps.work_done_progress;
!loop_state.workspace_loaded && global_state.config.client_caps.work_done_progress;
if !loop_state.workspace_loaded
&& loop_state.roots_scanned == loop_state.roots_total
@@ -425,7 +417,7 @@ fn loop_turn(
{
state_changed = true;
loop_state.workspace_loaded = true;
if let Some(flycheck) = &world_state.flycheck {
if let Some(flycheck) = &global_state.flycheck {
flycheck.update();
}
}
@@ -437,13 +429,13 @@ fn loop_turn(
if state_changed && loop_state.workspace_loaded {
update_file_notifications_on_threadpool(
pool,
world_state.snapshot(),
global_state.snapshot(),
task_sender.clone(),
loop_state.subscriptions.subscriptions(),
);
pool.execute({
let subs = loop_state.subscriptions.subscriptions();
let snap = world_state.snapshot();
let snap = global_state.snapshot();
move || snap.analysis().prime_caches(subs).unwrap_or_else(|_: Canceled| ())
});
}
@@ -467,7 +459,7 @@ fn on_task(
task: Task,
msg_sender: &Sender<Message>,
pending_requests: &mut PendingRequests,
state: &mut WorldState,
state: &mut GlobalState,
) {
match task {
Task::Respond(response) => {
@@ -485,7 +477,7 @@ fn on_task(
}
fn on_request(
world: &mut WorldState,
global_state: &mut GlobalState,
pending_requests: &mut PendingRequests,
pool: &ThreadPool,
task_sender: &Sender<Task>,
@@ -496,7 +488,7 @@ fn on_request(
let mut pool_dispatcher = PoolDispatcher {
req: Some(req),
pool,
world,
global_state,
task_sender,
msg_sender,
pending_requests,
@@ -553,7 +545,7 @@ fn on_request(
fn on_notification(
msg_sender: &Sender<Message>,
state: &mut WorldState,
state: &mut GlobalState,
loop_state: &mut LoopState,
not: Notification,
) -> Result<()> {
@@ -727,7 +719,7 @@ fn apply_document_changes(
fn on_check_task(
task: CheckTask,
world_state: &mut WorldState,
global_state: &mut GlobalState,
task_sender: &Sender<Task>,
) -> Result<()> {
match task {
@@ -746,7 +738,7 @@ fn on_check_task(
.uri
.to_file_path()
.map_err(|()| format!("invalid uri: {}", diag.location.uri))?;
let file_id = match world_state.vfs.read().path2file(&path) {
let file_id = match global_state.vfs.read().path2file(&path) {
Some(file) => FileId(file.0),
None => {
log::error!(
@@ -766,7 +758,7 @@ fn on_check_task(
}
CheckTask::Status(status) => {
if world_state.config.client_caps.work_done_progress {
if global_state.config.client_caps.work_done_progress {
let progress = match status {
Status::Being => {
lsp_types::WorkDoneProgress::Begin(lsp_types::WorkDoneProgressBegin {
@@ -805,7 +797,7 @@ fn on_check_task(
Ok(())
}
fn on_diagnostic_task(task: DiagnosticTask, msg_sender: &Sender<Message>, state: &mut WorldState) {
fn on_diagnostic_task(task: DiagnosticTask, msg_sender: &Sender<Message>, state: &mut GlobalState) {
let subscriptions = state.diagnostics.handle_task(task);
for file_id in subscriptions {
@@ -880,7 +872,7 @@ fn send_startup_progress(sender: &Sender<Message>, loop_state: &mut LoopState) {
struct PoolDispatcher<'a> {
req: Option<Request>,
pool: &'a ThreadPool,
world: &'a mut WorldState,
global_state: &'a mut GlobalState,
pending_requests: &'a mut PendingRequests,
msg_sender: &'a Sender<Message>,
task_sender: &'a Sender<Task>,
@@ -891,7 +883,7 @@ impl<'a> PoolDispatcher<'a> {
/// Dispatches the request onto the current thread
fn on_sync<R>(
&mut self,
f: fn(&mut WorldState, R::Params) -> Result<R::Result>,
f: fn(&mut GlobalState, R::Params) -> Result<R::Result>,
) -> Result<&mut Self>
where
R: lsp_types::request::Request + 'static,
@@ -904,18 +896,21 @@ impl<'a> PoolDispatcher<'a> {
return Ok(self);
}
};
let world = panic::AssertUnwindSafe(&mut *self.world);
let world = panic::AssertUnwindSafe(&mut *self.global_state);
let task = panic::catch_unwind(move || {
let result = f(world.0, params);
result_to_task::<R>(id, result)
})
.map_err(|_| format!("sync task {:?} panicked", R::METHOD))?;
on_task(task, self.msg_sender, self.pending_requests, self.world);
on_task(task, self.msg_sender, self.pending_requests, self.global_state);
Ok(self)
}
/// Dispatches the request onto thread pool
fn on<R>(&mut self, f: fn(WorldSnapshot, R::Params) -> Result<R::Result>) -> Result<&mut Self>
fn on<R>(
&mut self,
f: fn(GlobalStateSnapshot, R::Params) -> Result<R::Result>,
) -> Result<&mut Self>
where
R: lsp_types::request::Request + 'static,
R::Params: DeserializeOwned + Send + 'static,
@@ -929,7 +924,7 @@ impl<'a> PoolDispatcher<'a> {
};
self.pool.execute({
let world = self.world.snapshot();
let world = self.global_state.snapshot();
let sender = self.task_sender.clone();
move || {
let result = f(world, params);
@@ -1013,7 +1008,7 @@ where
fn update_file_notifications_on_threadpool(
pool: &ThreadPool,
world: WorldSnapshot,
world: GlobalStateSnapshot,
task_sender: Sender<Task>,
subscriptions: Vec<FileId>,
) {