Tokio migration.

This commit is contained in:
Blaž Hrastnik 2021-05-06 13:56:34 +09:00
parent 0e5308bce1
commit 355ad3cb82
15 changed files with 169 additions and 378 deletions

308
Cargo.lock generated
View file

@ -17,112 +17,6 @@ version = "1.0.40"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28b2cd92db5cbd74e8e5028f7e27dd7aa3090e89e4f2a197cc7c8dfb69c7063b" checksum = "28b2cd92db5cbd74e8e5028f7e27dd7aa3090e89e4f2a197cc7c8dfb69c7063b"
[[package]]
name = "async-channel"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2114d64672151c0c5eaa5e131ec84a74f06e1e559830dabba01ca30605d66319"
dependencies = [
"concurrent-queue",
"event-listener",
"futures-core",
]
[[package]]
name = "async-executor"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "871f9bb5e0a22eeb7e8cf16641feb87c9dc67032ccf8ff49e772eb9941d3a965"
dependencies = [
"async-task",
"concurrent-queue",
"fastrand",
"futures-lite",
"once_cell",
"slab",
]
[[package]]
name = "async-fs"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b3ca4f8ff117c37c278a2f7415ce9be55560b846b5bc4412aaa5d29c1c3dae2"
dependencies = [
"async-lock",
"blocking",
"futures-lite",
]
[[package]]
name = "async-io"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bbfd5cf2794b1e908ea8457e6c45f8f8f1f6ec5f74617bf4662623f47503c3b"
dependencies = [
"concurrent-queue",
"fastrand",
"futures-lite",
"libc",
"log",
"once_cell",
"parking",
"polling",
"slab",
"socket2",
"waker-fn",
"winapi",
]
[[package]]
name = "async-lock"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6a8ea61bf9947a1007c5cada31e647dbc77b103c679858150003ba697ea798b"
dependencies = [
"event-listener",
]
[[package]]
name = "async-net"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69b0a74e7f70af3c8cf1aa539edbd044795706659ac52b78a71dc1a205ecefdf"
dependencies = [
"async-io",
"blocking",
"fastrand",
"futures-lite",
]
[[package]]
name = "async-process"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8f38756dd9ac84671c428afbf7c9f7495feff9ec5b0710f17100098e5b354ac"
dependencies = [
"async-io",
"blocking",
"cfg-if",
"event-listener",
"futures-lite",
"libc",
"once_cell",
"signal-hook 0.3.8",
"winapi",
]
[[package]]
name = "async-task"
version = "4.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0"
[[package]]
name = "atomic-waker"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a"
[[package]] [[package]]
name = "autocfg" name = "autocfg"
version = "1.0.1" version = "1.0.1"
@ -135,20 +29,6 @@ version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
[[package]]
name = "blocking"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5e170dbede1f740736619b776d7251cb1b9095c435c34d8ca9f57fcd2f335e9"
dependencies = [
"async-channel",
"async-task",
"atomic-waker",
"fastrand",
"futures-lite",
"once_cell",
]
[[package]] [[package]]
name = "bstr" name = "bstr"
version = "0.2.16" version = "0.2.16"
@ -159,10 +39,10 @@ dependencies = [
] ]
[[package]] [[package]]
name = "cache-padded" name = "bytes"
version = "1.1.1" version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "631ae5198c9be5e753e5cc215e1bd73c2b466a3565173db433f52bb9d3e66dba" checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040"
[[package]] [[package]]
name = "cassowary" name = "cassowary"
@ -212,15 +92,6 @@ dependencies = [
"vec_map", "vec_map",
] ]
[[package]]
name = "concurrent-queue"
version = "1.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3"
dependencies = [
"cache-padded",
]
[[package]] [[package]]
name = "crossbeam-utils" name = "crossbeam-utils"
version = "0.8.4" version = "0.8.4"
@ -245,7 +116,7 @@ dependencies = [
"libc", "libc",
"mio", "mio",
"parking_lot", "parking_lot",
"signal-hook 0.1.17", "signal-hook",
"winapi", "winapi",
] ]
@ -279,21 +150,6 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "event-listener"
version = "2.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59"
[[package]]
name = "fastrand"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77b705829d1e87f762c2df6da140b26af5839e1033aa84aa5f56bb688e4e1bdb"
dependencies = [
"instant",
]
[[package]] [[package]]
name = "fern" name = "fern"
version = "0.6.0" version = "0.6.0"
@ -377,21 +233,6 @@ version = "0.3.14"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "365a1a1fb30ea1c03a830fdb2158f5236833ac81fa0ad12fe35b29cddc35cb04" checksum = "365a1a1fb30ea1c03a830fdb2158f5236833ac81fa0ad12fe35b29cddc35cb04"
[[package]]
name = "futures-lite"
version = "1.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4481d0cd0de1d204a4fa55e7d45f07b1d958abcb06714b3446438e2eff695fb"
dependencies = [
"fastrand",
"futures-core",
"futures-io",
"memchr",
"parking",
"pin-project-lite 0.2.6",
"waker-fn",
]
[[package]] [[package]]
name = "futures-macro" name = "futures-macro"
version = "0.3.14" version = "0.3.14"
@ -429,7 +270,7 @@ dependencies = [
"futures-sink", "futures-sink",
"futures-task", "futures-task",
"memchr", "memchr",
"pin-project-lite 0.2.6", "pin-project-lite",
"pin-utils", "pin-utils",
"proc-macro-hack", "proc-macro-hack",
"proc-macro-nested", "proc-macro-nested",
@ -515,9 +356,9 @@ dependencies = [
"serde", "serde",
"serde_json", "serde_json",
"shellexpand", "shellexpand",
"smol",
"smol-timeout",
"thiserror", "thiserror",
"tokio",
"tokio-stream",
"url", "url",
] ]
@ -553,8 +394,7 @@ dependencies = [
"pulldown-cmark", "pulldown-cmark",
"serde", "serde",
"serde_json", "serde_json",
"smol", "tokio",
"smol-timeout",
"toml", "toml",
"tui", "tui",
] ]
@ -571,7 +411,7 @@ dependencies = [
"once_cell", "once_cell",
"serde", "serde",
"slotmap", "slotmap",
"smol", "tokio",
"toml", "toml",
"tui", "tui",
"url", "url",
@ -801,12 +641,6 @@ version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "afb2e1c3ee07430c2cf76151675e583e0f19985fa6efae47d6848a3e2c824f85" checksum = "afb2e1c3ee07430c2cf76151675e583e0f19985fa6efae47d6848a3e2c824f85"
[[package]]
name = "parking"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72"
[[package]] [[package]]
name = "parking_lot" name = "parking_lot"
version = "0.11.1" version = "0.11.1"
@ -844,12 +678,6 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]]
name = "pin-project-lite"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "257b64915a082f7811703966789728173279bdebb956b143dbcd23f6f970a777"
[[package]] [[package]]
name = "pin-project-lite" name = "pin-project-lite"
version = "0.2.6" version = "0.2.6"
@ -862,19 +690,6 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "polling"
version = "2.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fc12d774e799ee9ebae13f4076ca003b40d18a11ac0f3641e6f899618580b7b"
dependencies = [
"cfg-if",
"libc",
"log",
"wepoll-sys",
"winapi",
]
[[package]] [[package]]
name = "proc-macro-hack" name = "proc-macro-hack"
version = "0.5.19" version = "0.5.19"
@ -1044,16 +859,6 @@ dependencies = [
"signal-hook-registry", "signal-hook-registry",
] ]
[[package]]
name = "signal-hook"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef33d6d0cd06e0840fba9985aab098c147e67e05cee14d412d3345ed14ff30ac"
dependencies = [
"libc",
"signal-hook-registry",
]
[[package]] [[package]]
name = "signal-hook-registry" name = "signal-hook-registry"
version = "1.3.0" version = "1.3.0"
@ -1084,44 +889,6 @@ version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e" checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e"
[[package]]
name = "smol"
version = "1.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85cf3b5351f3e783c1d79ab5fc604eeed8b8ae9abd36b166e8b87a089efd85e4"
dependencies = [
"async-channel",
"async-executor",
"async-fs",
"async-io",
"async-lock",
"async-net",
"async-process",
"blocking",
"futures-lite",
"once_cell",
]
[[package]]
name = "smol-timeout"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "847d777e2c6c166bad26264479e80a9820f3d364fcb4a0e23cd57bbfa8e94961"
dependencies = [
"async-io",
"pin-project-lite 0.1.12",
]
[[package]]
name = "socket2"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e3dfc207c526015c632472a77be09cf1b6e46866581aecae5cc38fb4235dea2"
dependencies = [
"libc",
"winapi",
]
[[package]] [[package]]
name = "syn" name = "syn"
version = "1.0.71" version = "1.0.71"
@ -1206,6 +973,48 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83f0c8e7c0addab50b663055baf787d0af7f413a46e6e7fb9559a4e4db7137a5"
dependencies = [
"autocfg",
"bytes",
"libc",
"memchr",
"mio",
"num_cpus",
"once_cell",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"tokio-macros",
"winapi",
]
[[package]]
name = "tokio-macros"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "caf7b11a536f46a809a8a9f0bb4237020f70ecbf115b842360afb127ea2fda57"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tokio-stream"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e177a5d8c3bf36de9ebe6d58537d8879e964332f93fb3339e43f618c81361af0"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "toml" name = "toml"
version = "0.5.8" version = "0.5.8"
@ -1314,12 +1123,6 @@ version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe" checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe"
[[package]]
name = "waker-fn"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
[[package]] [[package]]
name = "walkdir" name = "walkdir"
version = "2.3.2" version = "2.3.2"
@ -1337,15 +1140,6 @@ version = "0.10.2+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
[[package]]
name = "wepoll-sys"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fcb14dea929042224824779fbc82d9fab8d2e6d3cbc0ac404de8edf489e77ff"
dependencies = [
"cc",
]
[[package]] [[package]]
name = "winapi" name = "winapi"
version = "0.3.9" version = "0.3.9"

View file

@ -12,8 +12,8 @@ helix-core = { path = "../helix-core" }
once_cell = "1.4" once_cell = "1.4"
lsp-types = { version = "0.89", features = ["proposed"] } lsp-types = { version = "0.89", features = ["proposed"] }
smol = "1.2" tokio = { version = "1", features = ["full"] }
smol-timeout = "0.6" tokio-stream = "0.1.5"
url = "2" url = "2"
pathdiff = "0.2" pathdiff = "0.2"
shellexpand = "2.0" shellexpand = "2.0"

View file

@ -13,18 +13,18 @@ use jsonrpc_core as jsonrpc;
use lsp_types as lsp; use lsp_types as lsp;
use serde_json::Value; use serde_json::Value;
use smol::{ use std::process::Stdio;
channel::{Receiver, Sender}, use tokio::{
io::{BufReader, BufWriter}, io::{BufReader, BufWriter},
// prelude::*, // prelude::*,
process::{Child, Command, Stdio}, process::{Child, Command},
Executor, sync::mpsc::{channel, UnboundedReceiver, UnboundedSender},
}; };
pub struct Client { pub struct Client {
_process: Child, _process: Child,
outgoing: Sender<Payload>, outgoing: UnboundedSender<Payload>,
// pub incoming: Receiver<Call>, // pub incoming: Receiver<Call>,
pub request_counter: AtomicU64, pub request_counter: AtomicU64,
@ -33,13 +33,14 @@ pub struct Client {
} }
impl Client { impl Client {
pub fn start(ex: &Executor, cmd: &str, args: &[String]) -> Result<(Self, Receiver<Call>)> { pub fn start(cmd: &str, args: &[String]) -> Result<(Self, UnboundedReceiver<Call>)> {
// smol makes sure the process is reaped on drop, but using kill_on_drop(true) maybe?
let process = Command::new(cmd) let process = Command::new(cmd)
.args(args) .args(args)
.stdin(Stdio::piped()) .stdin(Stdio::piped())
.stdout(Stdio::piped()) .stdout(Stdio::piped())
.stderr(Stdio::piped()) .stderr(Stdio::piped())
// make sure the process is reaped on drop
.kill_on_drop(true)
.spawn(); .spawn();
// use std::io::ErrorKind; // use std::io::ErrorKind;
@ -58,7 +59,7 @@ impl Client {
let reader = BufReader::new(process.stdout.take().expect("Failed to open stdout")); let reader = BufReader::new(process.stdout.take().expect("Failed to open stdout"));
let stderr = BufReader::new(process.stderr.take().expect("Failed to open stderr")); let stderr = BufReader::new(process.stderr.take().expect("Failed to open stderr"));
let (incoming, outgoing) = Transport::start(ex, reader, writer, stderr); let (incoming, outgoing) = Transport::start(reader, writer, stderr);
let client = Client { let client = Client {
_process: process, _process: process,
@ -134,49 +135,53 @@ impl Client {
params: Self::value_into_params(params), params: Self::value_into_params(params),
}; };
let (tx, rx) = smol::channel::bounded::<Result<Value>>(1); let (tx, mut rx) = channel::<Result<Value>>(1);
self.outgoing self.outgoing
.send(Payload::Request { .send(Payload::Request {
chan: tx, chan: tx,
value: request, value: request,
}) })
.await
.map_err(|e| Error::Other(e.into()))?; .map_err(|e| Error::Other(e.into()))?;
use smol_timeout::TimeoutExt;
use std::time::Duration; use std::time::Duration;
use tokio::time::timeout;
let future = async move { let future = async move {
rx.recv() timeout(Duration::from_secs(2), rx.recv())
.timeout(Duration::from_secs(2))
.await .await
.ok_or(Error::Timeout)? // return Timeout .map_err(|e| Error::Timeout)? // return Timeout
.map_err(|e| Error::Other(e.into()))? .unwrap() // TODO: None if channel closed
}; };
Ok(future) Ok(future)
} }
/// Send a RPC notification to the language server. /// Send a RPC notification to the language server.
pub async fn notify<R: lsp::notification::Notification>(&self, params: R::Params) -> Result<()> pub fn notify<R: lsp::notification::Notification>(
&self,
params: R::Params,
) -> impl Future<Output = Result<()>>
where where
R::Params: serde::Serialize, R::Params: serde::Serialize,
{ {
let params = serde_json::to_value(params)?; let outgoing = self.outgoing.clone();
let notification = jsonrpc::Notification { async move {
jsonrpc: Some(jsonrpc::Version::V2), let params = serde_json::to_value(params)?;
method: R::METHOD.to_string(),
params: Self::value_into_params(params),
};
self.outgoing let notification = jsonrpc::Notification {
.send(Payload::Notification(notification)) jsonrpc: Some(jsonrpc::Version::V2),
.await method: R::METHOD.to_string(),
.map_err(|e| Error::Other(e.into()))?; params: Self::value_into_params(params),
};
Ok(()) outgoing
.send(Payload::Notification(notification))
.map_err(|e| Error::Other(e.into()))?;
Ok(())
}
} }
/// Reply to a language server RPC call. /// Reply to a language server RPC call.
@ -202,7 +207,6 @@ impl Client {
self.outgoing self.outgoing
.send(Payload::Response(output)) .send(Payload::Response(output))
.await
.map_err(|e| Error::Other(e.into()))?; .map_err(|e| Error::Other(e.into()))?;
Ok(()) Ok(())
@ -387,13 +391,13 @@ impl Client {
changes changes
} }
pub async fn text_document_did_change( pub fn text_document_did_change(
&self, &self,
text_document: lsp::VersionedTextDocumentIdentifier, text_document: lsp::VersionedTextDocumentIdentifier,
old_text: &Rope, old_text: &Rope,
new_text: &Rope, new_text: &Rope,
changes: &ChangeSet, changes: &ChangeSet,
) -> Result<()> { ) -> Option<impl Future<Output = Result<()>>> {
// figure out what kind of sync the server supports // figure out what kind of sync the server supports
let capabilities = self.capabilities.as_ref().unwrap(); let capabilities = self.capabilities.as_ref().unwrap();
@ -405,7 +409,7 @@ impl Client {
.. ..
})) => kind, })) => kind,
// None | SyncOptions { changes: None } // None | SyncOptions { changes: None }
_ => return Ok(()), _ => return None,
}; };
let changes = match sync_capabilities { let changes = match sync_capabilities {
@ -420,14 +424,15 @@ impl Client {
lsp::TextDocumentSyncKind::Incremental => { lsp::TextDocumentSyncKind::Incremental => {
Self::changeset_to_changes(old_text, new_text, changes, self.offset_encoding) Self::changeset_to_changes(old_text, new_text, changes, self.offset_encoding)
} }
lsp::TextDocumentSyncKind::None => return Ok(()), lsp::TextDocumentSyncKind::None => return None,
}; };
self.notify::<lsp::notification::DidChangeTextDocument>(lsp::DidChangeTextDocumentParams { Some(self.notify::<lsp::notification::DidChangeTextDocument>(
text_document, lsp::DidChangeTextDocumentParams {
content_changes: changes, text_document,
}) content_changes: changes,
.await },
))
} }
pub async fn text_document_did_close( pub async fn text_document_did_close(

View file

@ -18,6 +18,8 @@ use std::{collections::HashMap, sync::Arc};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio_stream::wrappers::UnboundedReceiverStream;
#[derive(Error, Debug)] #[derive(Error, Debug)]
pub enum Error { pub enum Error {
#[error("protocol error: {0}")] #[error("protocol error: {0}")]
@ -163,12 +165,11 @@ pub use jsonrpc::Call;
type LanguageId = String; type LanguageId = String;
use crate::select_all::SelectAll; use crate::select_all::SelectAll;
use smol::channel::Receiver;
pub struct Registry { pub struct Registry {
inner: HashMap<LanguageId, Option<Arc<Client>>>, inner: HashMap<LanguageId, Option<Arc<Client>>>,
pub incoming: SelectAll<Receiver<Call>>, pub incoming: SelectAll<UnboundedReceiverStream<Call>>,
} }
impl Default for Registry { impl Default for Registry {
@ -185,11 +186,7 @@ impl Registry {
} }
} }
pub fn get( pub fn get(&mut self, language_config: &LanguageConfiguration) -> Option<Arc<Client>> {
&mut self,
language_config: &LanguageConfiguration,
ex: &smol::Executor,
) -> Option<Arc<Client>> {
// TODO: propagate the error // TODO: propagate the error
if let Some(config) = &language_config.language_server { if let Some(config) = &language_config.language_server {
// avoid borrow issues // avoid borrow issues
@ -203,12 +200,13 @@ impl Registry {
// initialize a new client // initialize a new client
let (mut client, incoming) = let (mut client, incoming) =
Client::start(&ex, &config.command, &config.args).ok()?; Client::start(&config.command, &config.args).ok()?;
// TODO: run this async without blocking // TODO: run this async without blocking
smol::block_on(client.initialize()).unwrap(); let rt = tokio::runtime::Handle::current();
rt.block_on(client.initialize()).unwrap();
s_incoming.push(incoming); s_incoming.push(UnboundedReceiverStream::new(incoming));
Some(Arc::new(client)) Some(Arc::new(client))
}) })

View file

@ -6,10 +6,10 @@ use core::{
pin::Pin, pin::Pin,
}; };
use smol::{ready, stream::Stream};
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use futures_util::stream::{FusedStream, FuturesUnordered, StreamExt, StreamFuture}; use futures_util::stream::{FusedStream, FuturesUnordered, StreamExt, StreamFuture};
use futures_util::{ready, stream::Stream};
/// An unbounded set of streams /// An unbounded set of streams
/// ///

View file

@ -10,15 +10,13 @@ type Result<T> = core::result::Result<T, Error>;
use jsonrpc_core as jsonrpc; use jsonrpc_core as jsonrpc;
use serde_json::Value; use serde_json::Value;
use smol::prelude::*; use tokio::{
io::{AsyncBufRead, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter},
use smol::{
channel::{Receiver, Sender},
io::{BufReader, BufWriter},
process::{ChildStderr, ChildStdin, ChildStdout}, process::{ChildStderr, ChildStdin, ChildStdout},
Executor, sync::mpsc::{unbounded_channel, Sender, UnboundedReceiver, UnboundedSender},
}; };
#[derive(Debug)]
pub enum Payload { pub enum Payload {
Request { Request {
chan: Sender<Result<Value>>, chan: Sender<Result<Value>>,
@ -41,8 +39,8 @@ enum Message {
} }
pub struct Transport { pub struct Transport {
incoming: Sender<jsonrpc::Call>, incoming: UnboundedSender<jsonrpc::Call>,
outgoing: Receiver<Payload>, outgoing: UnboundedReceiver<Payload>,
pending_requests: HashMap<jsonrpc::Id, Sender<Result<Value>>>, pending_requests: HashMap<jsonrpc::Id, Sender<Result<Value>>>,
headers: HashMap<String, String>, headers: HashMap<String, String>,
@ -54,13 +52,12 @@ pub struct Transport {
impl Transport { impl Transport {
pub fn start( pub fn start(
ex: &Executor,
reader: BufReader<ChildStdout>, reader: BufReader<ChildStdout>,
writer: BufWriter<ChildStdin>, writer: BufWriter<ChildStdin>,
stderr: BufReader<ChildStderr>, stderr: BufReader<ChildStderr>,
) -> (Receiver<jsonrpc::Call>, Sender<Payload>) { ) -> (UnboundedReceiver<jsonrpc::Call>, UnboundedSender<Payload>) {
let (incoming, rx) = smol::channel::unbounded(); let (incoming, rx) = unbounded_channel();
let (tx, outgoing) = smol::channel::unbounded(); let (tx, outgoing) = unbounded_channel();
let transport = Self { let transport = Self {
reader, reader,
@ -72,7 +69,7 @@ impl Transport {
headers: HashMap::default(), headers: HashMap::default(),
}; };
ex.spawn(transport.duplex()).detach(); tokio::spawn(transport.duplex());
(rx, tx) (rx, tx)
} }
@ -168,7 +165,7 @@ impl Transport {
match msg { match msg {
Message::Output(output) => self.recv_response(output).await?, Message::Output(output) => self.recv_response(output).await?,
Message::Call(call) => { Message::Call(call) => {
self.incoming.send(call).await?; self.incoming.send(call).unwrap();
// let notification = Notification::parse(&method, params); // let notification = Notification::parse(&method, params);
} }
}; };
@ -204,11 +201,10 @@ impl Transport {
} }
pub async fn duplex(mut self) { pub async fn duplex(mut self) {
use futures_util::{select, FutureExt};
loop { loop {
select! { tokio::select! {
// client -> server // client -> server
msg = self.outgoing.next().fuse() => { msg = self.outgoing.recv() => {
if msg.is_none() { if msg.is_none() {
break; break;
} }
@ -217,7 +213,7 @@ impl Transport {
self.send_payload(msg).await.unwrap(); self.send_payload(msg).await.unwrap();
} }
// server <- client // server <- client
msg = Self::recv(&mut self.reader, &mut self.headers).fuse() => { msg = Self::recv(&mut self.reader, &mut self.headers) => {
if msg.is_err() { if msg.is_err() {
error!("err: <- {:?}", msg); error!("err: <- {:?}", msg);
break; break;
@ -226,7 +222,7 @@ impl Transport {
self.recv_msg(msg).await.unwrap(); self.recv_msg(msg).await.unwrap();
} }
_msg = Self::err(&mut self.stderr).fuse() => {} _msg = Self::err(&mut self.stderr) => {}
} }
} }
} }

View file

@ -19,8 +19,7 @@ helix-lsp = { path = "../helix-lsp"}
anyhow = "1" anyhow = "1"
once_cell = "1.4" once_cell = "1.4"
smol = "1" tokio = { version = "1", features = ["full"] }
smol-timeout = "0.6"
num_cpus = "1" num_cpus = "1"
tui = { version = "0.15", default-features = false, features = ["crossterm"] } tui = { version = "0.15", default-features = false, features = ["crossterm"] }
crossterm = { version = "0.19", features = ["event-stream"] } crossterm = { version = "0.19", features = ["event-stream"] }

View file

@ -7,14 +7,13 @@ use crate::{compositor::Compositor, ui};
use log::{error, info}; use log::{error, info};
use std::{ use std::{
future::Future,
io::{self, stdout, Stdout, Write}, io::{self, stdout, Stdout, Write},
path::PathBuf, path::PathBuf,
sync::Arc, sync::Arc,
time::Duration, time::Duration,
}; };
use smol::prelude::*;
use anyhow::Error; use anyhow::Error;
use crossterm::{ use crossterm::{
@ -39,16 +38,15 @@ pub struct Application {
compositor: Compositor, compositor: Compositor,
editor: Editor, editor: Editor,
executor: &'static smol::Executor<'static>,
callbacks: LspCallbacks, callbacks: LspCallbacks,
} }
impl Application { impl Application {
pub fn new(mut args: Args, executor: &'static smol::Executor<'static>) -> Result<Self, Error> { pub fn new(mut args: Args) -> Result<Self, Error> {
use helix_view::editor::Action; use helix_view::editor::Action;
let mut compositor = Compositor::new()?; let mut compositor = Compositor::new()?;
let size = compositor.size(); let size = compositor.size();
let mut editor = Editor::new(executor, size); let mut editor = Editor::new(size);
if let Ok(files) = args.values_of_t::<PathBuf>("files") { if let Ok(files) = args.values_of_t::<PathBuf>("files") {
for file in files { for file in files {
@ -64,7 +62,6 @@ impl Application {
compositor, compositor,
editor, editor,
executor,
callbacks: FuturesUnordered::new(), callbacks: FuturesUnordered::new(),
}; };
@ -72,14 +69,12 @@ impl Application {
} }
fn render(&mut self) { fn render(&mut self) {
let executor = &self.executor;
let editor = &mut self.editor; let editor = &mut self.editor;
let compositor = &mut self.compositor; let compositor = &mut self.compositor;
let callbacks = &mut self.callbacks; let callbacks = &mut self.callbacks;
let mut cx = crate::compositor::Context { let mut cx = crate::compositor::Context {
editor, editor,
executor,
callbacks, callbacks,
scroll: None, scroll: None,
}; };
@ -97,7 +92,7 @@ impl Application {
break; break;
} }
use futures_util::{select, FutureExt}; use futures_util::{select, FutureExt, StreamExt};
select! { select! {
event = reader.next().fuse() => { event = reader.next().fuse() => {
self.handle_terminal_events(event) self.handle_terminal_events(event)
@ -125,7 +120,6 @@ impl Application {
pub fn handle_terminal_events(&mut self, event: Option<Result<Event, crossterm::ErrorKind>>) { pub fn handle_terminal_events(&mut self, event: Option<Result<Event, crossterm::ErrorKind>>) {
let mut cx = crate::compositor::Context { let mut cx = crate::compositor::Context {
editor: &mut self.editor, editor: &mut self.editor,
executor: self.executor,
callbacks: &mut self.callbacks, callbacks: &mut self.callbacks,
scroll: None, scroll: None,
}; };

View file

@ -108,6 +108,16 @@ impl<'a> Context<'a> {
/// state (usually by creating and applying a transaction). /// state (usually by creating and applying a transaction).
pub type Command = fn(cx: &mut Context); pub type Command = fn(cx: &mut Context);
#[inline]
fn block_on<T>(future: impl Future<Output = T>) -> T {
use tokio::runtime::Runtime;
// let rt = Runtime::new().unwrap();
let rt = tokio::runtime::Handle::current();
// let local = LocalSet::new();
// local.block_on(&rt, future)
rt.block_on(future)
}
pub fn move_char_left(cx: &mut Context) { pub fn move_char_left(cx: &mut Context) {
let count = cx.count; let count = cx.count;
let (view, doc) = cx.current(); let (view, doc) = cx.current();
@ -861,7 +871,6 @@ pub fn command_mode(cx: &mut Context) {
match *parts.as_slice() { match *parts.as_slice() {
["q"] | ["quit"] => { ["q"] | ["quit"] => {
editor.close(editor.view().id); editor.close(editor.view().id);
// editor.should_close = true,
} }
["o", path] | ["open", path] => { ["o", path] | ["open", path] => {
use helix_view::editor::Action; use helix_view::editor::Action;
@ -871,7 +880,7 @@ pub fn command_mode(cx: &mut Context) {
// TODO: non-blocking via save() command // TODO: non-blocking via save() command
let id = editor.view().doc; let id = editor.view().doc;
let doc = &mut editor.documents[id]; let doc = &mut editor.documents[id];
smol::block_on(doc.save()); block_on(doc.save());
} }
_ => (), _ => (),
@ -1175,8 +1184,7 @@ pub fn goto_definition(cx: &mut Context) {
let pos = pos_to_lsp_pos(doc.text(), doc.selection(view.id).cursor(), offset_encoding); let pos = pos_to_lsp_pos(doc.text(), doc.selection(view.id).cursor(), offset_encoding);
// TODO: handle fails // TODO: handle fails
let res = let res = block_on(language_server.goto_definition(doc.identifier(), pos)).unwrap_or_default();
smol::block_on(language_server.goto_definition(doc.identifier(), pos)).unwrap_or_default();
_goto(cx, res, offset_encoding); _goto(cx, res, offset_encoding);
} }
@ -1192,8 +1200,8 @@ pub fn goto_type_definition(cx: &mut Context) {
let pos = pos_to_lsp_pos(doc.text(), doc.selection(view.id).cursor(), offset_encoding); let pos = pos_to_lsp_pos(doc.text(), doc.selection(view.id).cursor(), offset_encoding);
// TODO: handle fails // TODO: handle fails
let res = smol::block_on(language_server.goto_type_definition(doc.identifier(), pos)) let res =
.unwrap_or_default(); block_on(language_server.goto_type_definition(doc.identifier(), pos)).unwrap_or_default();
_goto(cx, res, offset_encoding); _goto(cx, res, offset_encoding);
} }
@ -1209,8 +1217,8 @@ pub fn goto_implementation(cx: &mut Context) {
let pos = pos_to_lsp_pos(doc.text(), doc.selection(view.id).cursor(), offset_encoding); let pos = pos_to_lsp_pos(doc.text(), doc.selection(view.id).cursor(), offset_encoding);
// TODO: handle fails // TODO: handle fails
let res = smol::block_on(language_server.goto_implementation(doc.identifier(), pos)) let res =
.unwrap_or_default(); block_on(language_server.goto_implementation(doc.identifier(), pos)).unwrap_or_default();
_goto(cx, res, offset_encoding); _goto(cx, res, offset_encoding);
} }
@ -1226,8 +1234,7 @@ pub fn goto_reference(cx: &mut Context) {
let pos = pos_to_lsp_pos(doc.text(), doc.selection(view.id).cursor(), offset_encoding); let pos = pos_to_lsp_pos(doc.text(), doc.selection(view.id).cursor(), offset_encoding);
// TODO: handle fails // TODO: handle fails
let res = let res = block_on(language_server.goto_reference(doc.identifier(), pos)).unwrap_or_default();
smol::block_on(language_server.goto_reference(doc.identifier(), pos)).unwrap_or_default();
_goto(cx, res, offset_encoding); _goto(cx, res, offset_encoding);
} }
@ -1247,7 +1254,7 @@ pub fn signature_help(cx: &mut Context) {
// TODO: handle fails // TODO: handle fails
let res = smol::block_on(language_server.text_document_signature_help(doc.identifier(), pos)) let res = block_on(language_server.text_document_signature_help(doc.identifier(), pos))
.unwrap_or_default(); .unwrap_or_default();
if let Some(signature_help) = res { if let Some(signature_help) = res {
@ -1636,7 +1643,7 @@ pub fn format_selections(cx: &mut Context) {
}; };
// TODO: handle fails // TODO: handle fails
// TODO: concurrent map // TODO: concurrent map
let edits = smol::block_on(language_server.text_document_range_formatting( let edits = block_on(language_server.text_document_range_formatting(
doc.identifier(), doc.identifier(),
range, range,
lsp::FormattingOptions::default(), lsp::FormattingOptions::default(),
@ -1726,7 +1733,8 @@ pub fn save(cx: &mut Context) {
// Spawns an async task to actually do the saving. This way we prevent blocking. // Spawns an async task to actually do the saving. This way we prevent blocking.
// TODO: handle save errors somehow? // TODO: handle save errors somehow?
cx.editor.executor.spawn(cx.doc().save()).detach(); // TODO: don't block
block_on(cx.doc().save());
} }
pub fn completion(cx: &mut Context) { pub fn completion(cx: &mut Context) {
@ -1782,7 +1790,7 @@ pub fn completion(cx: &mut Context) {
); );
// TODO: handle fails // TODO: handle fails
let res = smol::block_on(language_server.completion(doc.identifier(), pos)).unwrap(); let res = block_on(language_server.completion(doc.identifier(), pos)).unwrap();
let trigger_offset = doc.selection(view.id).cursor(); let trigger_offset = doc.selection(view.id).cursor();
@ -1839,8 +1847,8 @@ pub fn hover(cx: &mut Context) {
); );
// TODO: handle fails // TODO: handle fails
let res = smol::block_on(language_server.text_document_hover(doc.identifier(), pos)) let res =
.unwrap_or_default(); block_on(language_server.text_document_hover(doc.identifier(), pos)).unwrap_or_default();
if let Some(hover) = res { if let Some(hover) = res {
// hover.contents / .range <- used for visualizing // hover.contents / .range <- used for visualizing
@ -1963,7 +1971,7 @@ pub fn space_mode(cx: &mut Context) {
'w' => { 'w' => {
// save current buffer // save current buffer
let doc = cx.doc(); let doc = cx.doc();
smol::block_on(doc.save()); block_on(doc.save());
} }
'c' => { 'c' => {
// close current split // close current split

View file

@ -4,7 +4,6 @@
use crossterm::event::Event; use crossterm::event::Event;
use helix_core::Position; use helix_core::Position;
use smol::Executor;
use tui::{buffer::Buffer as Surface, layout::Rect}; use tui::{buffer::Buffer as Surface, layout::Rect};
pub type Callback = Box<dyn FnOnce(&mut Compositor, &mut Editor)>; pub type Callback = Box<dyn FnOnce(&mut Compositor, &mut Editor)>;
@ -29,7 +28,6 @@ use crate::application::LspCallbacks;
pub struct Context<'a> { pub struct Context<'a> {
pub editor: &'a mut Editor, pub editor: &'a mut Editor,
pub executor: &'static smol::Executor<'static>,
pub scroll: Option<usize>, pub scroll: Option<usize>,
pub callbacks: &'a mut LspCallbacks, pub callbacks: &'a mut LspCallbacks,
} }

View file

@ -14,8 +14,6 @@ use std::path::PathBuf;
use anyhow::Error; use anyhow::Error;
static EX: smol::Executor = smol::Executor::new();
fn setup_logging(verbosity: u64) -> Result<(), fern::InitError> { fn setup_logging(verbosity: u64) -> Result<(), fern::InitError> {
let mut base_config = fern::Dispatch::new(); let mut base_config = fern::Dispatch::new();
@ -88,12 +86,12 @@ fn main() {
Loader::new(config) Loader::new(config)
}); });
for _ in 0..num_cpus::get() { let runtime = tokio::runtime::Runtime::new().unwrap();
std::thread::spawn(move || smol::block_on(EX.run(smol::future::pending::<()>())));
}
let mut app = Application::new(args, &EX).unwrap(); // TODO: use the thread local executor to spawn the application task separately from the work pool
runtime.block_on(async move {
let mut app = Application::new(args).unwrap();
// we use the thread local executor to spawn the application task separately from the work pool app.run().await;
smol::block_on(app.run()); });
} }

View file

@ -207,7 +207,7 @@ impl EditorView {
}); });
let style = if is_diagnostic { let style = if is_diagnostic {
style.clone().add_modifier(Modifier::UNDERLINED) style.add_modifier(Modifier::UNDERLINED)
} else { } else {
style style
}; };
@ -569,7 +569,6 @@ impl Component for EditorView {
let mut cx = Context { let mut cx = Context {
editor: cxt.editor, editor: cxt.editor,
callbacks: cxt.callbacks, callbacks: cxt.callbacks,
executor: cx.executor,
scroll: None, scroll: None,
}; };
let res = completion.handle_event(event, &mut cx); let res = completion.handle_event(event, &mut cx);

View file

@ -21,7 +21,7 @@ crossterm = { version = "0.19", features = ["event-stream"], optional = true }
once_cell = "1.4" once_cell = "1.4"
url = "2" url = "2"
smol = "1" tokio = { version = "1", features = ["full"] }
futures-util = "0.3" futures-util = "0.3"
slotmap = "1" slotmap = "1"

View file

@ -134,7 +134,7 @@ impl Document {
self.last_saved_revision = self.history.current_revision(); self.last_saved_revision = self.history.current_revision();
async move { async move {
use smol::{fs::File, prelude::*}; use tokio::{fs::File, io::AsyncWriteExt};
let mut file = File::create(path).await?; let mut file = File::create(path).await?;
// write all the rope chunks to file // write all the rope chunks to file
@ -232,7 +232,9 @@ impl Document {
transaction.changes(), transaction.changes(),
); );
smol::block_on(notify).expect("failed to emit textDocument/didChange"); if let Some(notify) = notify {
tokio::spawn(notify);
} //.expect("failed to emit textDocument/didChange");
} }
} }
success success

View file

@ -13,7 +13,6 @@ pub struct Editor {
pub count: Option<usize>, pub count: Option<usize>,
pub theme: Theme, pub theme: Theme,
pub language_servers: helix_lsp::Registry, pub language_servers: helix_lsp::Registry,
pub executor: &'static smol::Executor<'static>,
} }
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
@ -24,7 +23,7 @@ pub enum Action {
} }
impl Editor { impl Editor {
pub fn new(executor: &'static smol::Executor<'static>, mut area: tui::layout::Rect) -> Self { pub fn new(mut area: tui::layout::Rect) -> Self {
use helix_core::config_dir; use helix_core::config_dir;
let config = std::fs::read(config_dir().join("theme.toml")); let config = std::fs::read(config_dir().join("theme.toml"));
// load $HOME/.config/helix/theme.toml, fallback to default config // load $HOME/.config/helix/theme.toml, fallback to default config
@ -44,7 +43,6 @@ impl Editor {
count: None, count: None,
theme, theme,
language_servers, language_servers,
executor,
} }
} }
@ -122,7 +120,7 @@ impl Editor {
let language_server = doc let language_server = doc
.language .language
.as_ref() .as_ref()
.and_then(|language| self.language_servers.get(language, self.executor)); .and_then(|language| self.language_servers.get(language));
if let Some(language_server) = language_server { if let Some(language_server) = language_server {
doc.set_language_server(Some(language_server.clone())); doc.set_language_server(Some(language_server.clone()));
@ -133,7 +131,8 @@ impl Editor {
.map(ToOwned::to_owned) .map(ToOwned::to_owned)
.unwrap_or_default(); .unwrap_or_default();
smol::block_on(language_server.text_document_did_open( let rt = tokio::runtime::Handle::current();
rt.block_on(language_server.text_document_did_open(
doc.url().unwrap(), doc.url().unwrap(),
doc.version(), doc.version(),
doc.text(), doc.text(),
@ -154,17 +153,18 @@ impl Editor {
let view = self.tree.get(self.tree.focus); let view = self.tree.get(self.tree.focus);
// get around borrowck issues // get around borrowck issues
let language_servers = &mut self.language_servers; let language_servers = &mut self.language_servers;
let executor = self.executor;
let doc = &self.documents[view.doc]; let doc = &self.documents[view.doc];
let language_server = doc let language_server = doc
.language .language
.as_ref() .as_ref()
.and_then(|language| language_servers.get(language, executor)); .and_then(|language| language_servers.get(language));
if let Some(language_server) = language_server { if let Some(language_server) = language_server {
smol::block_on(language_server.text_document_did_close(doc.identifier())).unwrap(); let rt = tokio::runtime::Handle::current();
rt.block_on(language_server.text_document_did_close(doc.identifier()))
.unwrap();
} }
// remove selection // remove selection