Move request dispatcher to a separate file

This commit is contained in:
Aleksey Kladov
2020-06-25 17:22:18 +02:00
parent 379a096de9
commit 22098127c4
3 changed files with 146 additions and 129 deletions

View File

@@ -6,26 +6,26 @@ use std::{
};
use crossbeam_channel::{never, select, Receiver};
use lsp_server::{Connection, ErrorCode, Notification, Request, RequestId, Response};
use lsp_server::{Connection, Notification, Request, RequestId, Response};
use lsp_types::{notification::Notification as _, request::Request as _, NumberOrString};
use ra_db::VfsPath;
use ra_ide::{Canceled, FileId};
use ra_prof::profile;
use ra_project_model::{PackageRoot, ProjectWorkspace};
use serde::{de::DeserializeOwned, Serialize};
use crate::{
config::{Config, FilesWatcher, LinkedProject},
diagnostics::DiagnosticTask,
dispatch::RequestDispatcher,
from_proto,
global_state::{file_id_to_url, GlobalState, GlobalStateSnapshot, Status},
global_state::{file_id_to_url, GlobalState, Status},
handlers, lsp_ext,
lsp_utils::{
apply_document_changes, is_canceled, notification_cast, notification_is, notification_new,
show_message,
},
request_metrics::RequestMetrics,
LspError, Result,
Result,
};
pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
@@ -241,7 +241,7 @@ impl GlobalState {
fn on_request(&mut self, request_received: Instant, req: Request) -> Result<()> {
let mut pool_dispatcher =
PoolDispatcher { req: Some(req), global_state: self, request_received };
RequestDispatcher { req: Some(req), global_state: self, request_received };
pool_dispatcher
.on_sync::<lsp_ext::CollectGarbage>(|s, ()| Ok(s.collect_garbage()))?
.on_sync::<lsp_ext::JoinLines>(|s, p| handlers::handle_join_lines(s.snapshot(), p))?
@@ -426,7 +426,8 @@ impl GlobalState {
log::error!("unhandled notification: {:?}", not);
Ok(())
}
fn on_task(&mut self, task: Task) {
// TODO
pub(crate) fn on_task(&mut self, task: Task) {
match task {
Task::Respond(response) => {
if let Some((method, start)) = self.req_queue.incoming.complete(response.id.clone())
@@ -480,6 +481,7 @@ impl GlobalState {
}
}
// TODO
#[derive(Debug)]
pub(crate) enum Task {
Respond(Response),
@@ -645,126 +647,3 @@ fn report_progress(
});
global_state.send(notification.into());
}
struct PoolDispatcher<'a> {
req: Option<Request>,
global_state: &'a mut GlobalState,
request_received: Instant,
}
impl<'a> PoolDispatcher<'a> {
/// Dispatches the request onto the current thread
fn on_sync<R>(
&mut self,
f: fn(&mut GlobalState, R::Params) -> Result<R::Result>,
) -> Result<&mut Self>
where
R: lsp_types::request::Request + 'static,
R::Params: DeserializeOwned + panic::UnwindSafe + 'static,
R::Result: Serialize + 'static,
{
let (id, params) = match self.parse::<R>() {
Some(it) => it,
None => {
return Ok(self);
}
};
let world = panic::AssertUnwindSafe(&mut *self.global_state);
let task = panic::catch_unwind(move || {
let result = f(world.0, params);
result_to_task::<R>(id, result)
})
.map_err(|_| format!("sync task {:?} panicked", R::METHOD))?;
self.global_state.on_task(task);
Ok(self)
}
/// Dispatches the request onto thread pool
fn on<R>(
&mut self,
f: fn(GlobalStateSnapshot, R::Params) -> Result<R::Result>,
) -> Result<&mut Self>
where
R: lsp_types::request::Request + 'static,
R::Params: DeserializeOwned + Send + 'static,
R::Result: Serialize + 'static,
{
let (id, params) = match self.parse::<R>() {
Some(it) => it,
None => {
return Ok(self);
}
};
self.global_state.task_pool.0.spawn({
let world = self.global_state.snapshot();
move || {
let result = f(world, params);
result_to_task::<R>(id, result)
}
});
Ok(self)
}
fn parse<R>(&mut self) -> Option<(RequestId, R::Params)>
where
R: lsp_types::request::Request + 'static,
R::Params: DeserializeOwned + 'static,
{
let req = self.req.take()?;
let (id, params) = match req.extract::<R::Params>(R::METHOD) {
Ok(it) => it,
Err(req) => {
self.req = Some(req);
return None;
}
};
self.global_state
.req_queue
.incoming
.register(id.clone(), (R::METHOD, self.request_received));
Some((id, params))
}
fn finish(&mut self) {
match self.req.take() {
None => (),
Some(req) => {
log::error!("unknown request: {:?}", req);
let resp = Response::new_err(
req.id,
ErrorCode::MethodNotFound as i32,
"unknown request".to_string(),
);
self.global_state.send(resp.into());
}
}
}
}
fn result_to_task<R>(id: RequestId, result: Result<R::Result>) -> Task
where
R: lsp_types::request::Request + 'static,
R::Params: DeserializeOwned + 'static,
R::Result: Serialize + 'static,
{
let response = match result {
Ok(resp) => Response::new_ok(id, &resp),
Err(e) => match e.downcast::<LspError>() {
Ok(lsp_error) => Response::new_err(id, lsp_error.code, lsp_error.message),
Err(e) => {
if is_canceled(&*e) {
Response::new_err(
id,
ErrorCode::ContentModified as i32,
"content modified".to_string(),
)
} else {
Response::new_err(id, ErrorCode::InternalError as i32, e.to_string())
}
}
},
};
Task::Respond(response)
}