the great persy, sled, and heed purge (and bump MSRV to 1.74.1)

these database backends are either unmaintained, broken in conduit, or
incredibly niche for something like conduwuit.

also i want to bump the MSRV.

Signed-off-by: strawberry <strawberry@puppygock.gay>
This commit is contained in:
strawberry 2024-01-11 20:45:48 -05:00 committed by June
parent b28a2fad97
commit ee2f1c3084
16 changed files with 33 additions and 753 deletions

161
Cargo.lock generated
View file

@ -205,9 +205,9 @@ dependencies = [
[[package]]
name = "base64"
version = "0.21.5"
version = "0.21.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9"
checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567"
[[package]]
name = "base64ct"
@ -215,15 +215,6 @@ version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b"
[[package]]
name = "bincode"
version = "1.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad"
dependencies = [
"serde",
]
[[package]]
name = "bindgen"
version = "0.69.1"
@ -347,9 +338,9 @@ dependencies = [
[[package]]
name = "clap"
version = "4.4.13"
version = "4.4.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52bdc885e4cacc7f7c9eedc1ef6da641603180c783c41a15c264944deeaab642"
checksum = "c12ed66a79a555082f595f7eb980d08669de95009dd4b3d61168c573ebe38fc9"
dependencies = [
"clap_builder",
"clap_derive",
@ -357,9 +348,9 @@ dependencies = [
[[package]]
name = "clap_builder"
version = "4.4.12"
version = "4.4.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb7fb5e4e979aec3be7791562fcba452f94ad85e954da024396433e0e25a79e9"
checksum = "0f4645eab3431e5a8403a96bea02506a8b35d28cd0f0330977dd5d22f9c84f43"
dependencies = [
"anstyle",
"clap_lex",
@ -404,7 +395,6 @@ dependencies = [
"directories",
"figment",
"futures-util",
"heed",
"hmac",
"http",
"hyper",
@ -421,7 +411,6 @@ dependencies = [
"opentelemetry-jaeger",
"opentelemetry_sdk",
"parking_lot",
"persy",
"rand",
"regex",
"reqwest",
@ -488,21 +477,6 @@ dependencies = [
"libc",
]
[[package]]
name = "crc"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86ec7a15cbe22e59248fc7eadb1907dab5ba09372595da4d73dd805ed4417dfe"
dependencies = [
"crc-catalog",
]
[[package]]
name = "crc-catalog"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cace84e55f07e7301bae1c519df89cdad8cc3cd868413d3fdbdeca9ff3db484"
[[package]]
name = "crc32fast"
version = "1.3.2"
@ -792,16 +766,6 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "fs2"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "futures-channel"
version = "0.3.28"
@ -991,42 +955,6 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
[[package]]
name = "heed"
version = "0.10.6"
source = "git+https://github.com/timokoesters/heed.git?rev=f6f825da7fb2c758867e05ad973ef800a6fe1d5d#f6f825da7fb2c758867e05ad973ef800a6fe1d5d"
dependencies = [
"bytemuck",
"byteorder",
"heed-traits",
"heed-types",
"libc",
"lmdb-rkv-sys",
"once_cell",
"page_size",
"serde",
"synchronoise",
"url",
]
[[package]]
name = "heed-traits"
version = "0.7.0"
source = "git+https://github.com/timokoesters/heed.git?rev=f6f825da7fb2c758867e05ad973ef800a6fe1d5d#f6f825da7fb2c758867e05ad973ef800a6fe1d5d"
[[package]]
name = "heed-types"
version = "0.7.2"
source = "git+https://github.com/timokoesters/heed.git?rev=f6f825da7fb2c758867e05ad973ef800a6fe1d5d#f6f825da7fb2c758867e05ad973ef800a6fe1d5d"
dependencies = [
"bincode",
"bytemuck",
"byteorder",
"heed-traits",
"serde",
"serde_json",
]
[[package]]
name = "hermit-abi"
version = "0.3.2"
@ -1389,17 +1317,6 @@ version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f"
[[package]]
name = "lmdb-rkv-sys"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61b9ce6b3be08acefa3003c57b7565377432a89ec24476bbe72e11d101f852fe"
dependencies = [
"cc",
"libc",
"pkg-config",
]
[[package]]
name = "lock_api"
version = "0.4.10"
@ -1703,16 +1620,6 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "page_size"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eebde548fbbf1ea81a99b128872779c437752fb99f217c45245e1a61dcd9edcd"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "parking_lot"
version = "0.12.1"
@ -1798,22 +1705,6 @@ version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"
[[package]]
name = "persy"
version = "1.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3cd38c602b23c2f451842d89f27cd5e0d4b292176daf40feeda859c658dcdc76"
dependencies = [
"crc",
"data-encoding",
"fs2",
"linked-hash-map",
"rand",
"thiserror",
"unsigned-varint",
"zigzag",
]
[[package]]
name = "pin-project"
version = "1.1.3"
@ -2465,18 +2356,18 @@ checksum = "b0293b4b29daaf487284529cc2f5675b8e57c61f70167ba415a463651fd6a918"
[[package]]
name = "serde"
version = "1.0.194"
version = "1.0.195"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b114498256798c94a0689e1a15fec6005dee8ac1f41de56404b67afc2a4b773"
checksum = "63261df402c67811e9ac6def069e4786148c4563f4b50fd4bf30aa370d626b02"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.194"
version = "1.0.195"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3385e45322e8f9931410f01b3031ec534c3947d0e94c18049af4d9f9907d4e0"
checksum = "46fe8f8603d81ba86327b23a2e9cdf49e1255fb94a4c5f297f6ee0547178ea2c"
dependencies = [
"proc-macro2",
"quote",
@ -2498,9 +2389,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.109"
version = "1.0.111"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb0652c533506ad7a2e353cce269330d6afd8bdfb6d75e0ace5b35aacbd7b9e9"
checksum = "176e46fa42316f18edd598015a5166857fc835ec732f5215eac6b7bdbf0a84f4"
dependencies = [
"itoa",
"ryu",
@ -2540,9 +2431,9 @@ dependencies = [
[[package]]
name = "serde_yaml"
version = "0.9.29"
version = "0.9.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a15e0ef66bf939a7c890a0bf6d5a733c70202225f9888a89ed5c62298b019129"
checksum = "b1bf28c79a99f70ee1f1d83d10c875d2e70618417fda01ad1785e027579d9d38"
dependencies = [
"indexmap 2.0.0",
"itoa",
@ -2721,15 +2612,6 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
[[package]]
name = "synchronoise"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3dbc01390fc626ce8d1cffe3376ded2b72a11bb70e1c75f404a210e4daa4def2"
dependencies = [
"crossbeam-queue",
]
[[package]]
name = "system-configuration"
version = "0.5.1"
@ -3228,12 +3110,6 @@ version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab4c90930b95a82d00dc9e9ac071b4991924390d46cbd0dfe566148667605e4b"
[[package]]
name = "unsigned-varint"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6889a77d49f1f013504cec6bf97a2c730394adedaeb1deb5ea08949a50541105"
[[package]]
name = "untrusted"
version = "0.7.1"
@ -3528,15 +3404,6 @@ version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9"
[[package]]
name = "zigzag"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70b40401a28d86ce16a330b863b86fd7dbee4d7c940587ab09ab8c019f9e3fdf"
dependencies = [
"num-traits",
]
[[package]]
name = "zstd"
version = "0.13.0"

View file

@ -13,7 +13,7 @@ edition = "2021"
# `nix flake update`. If you don't have Nix installed or otherwise don't know
# how to do this, ping `@charles:computer.surgery` or `@dusk:gaze.systems` in
# the matrix room.
rust-version = "1.70.0"
rust-version = "1.74.1"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@ -37,10 +37,6 @@ hyperlocal = { git = "https://github.com/softprops/hyperlocal", rev = "2ee4d1496
hyper = { version = "0.14", features = ["server", "http1", "http2"] }
tokio = { version = "1.35.1", features = ["fs", "macros", "signal", "sync"] }
loole = "0.3"
# Used for storing data permanently
#sled = { version = "0.34.7", features = ["compression", "no_metrics"], optional = true }
#sled = { git = "https://github.com/spacejam/sled.git", rev = "e4640e0773595229f398438886f19bca6f7326a2", features = ["compression"] }
persy = { version = "1.4.7", optional = true, features = ["background_ops"] }
# Used for the http request / response body type for Ruma endpoints used with reqwest
bytes = "1.5.0"
@ -48,11 +44,11 @@ http = "0.2.11"
# Used to find data directory for default db path
directories = "5.0.1"
# Used for ruma wrapper
serde_json = { version = "1.0.109", features = ["raw_value"] }
serde_json = { version = "1.0.111", features = ["raw_value"] }
# Used for appservice registration files
serde_yaml = "0.9.29"
serde_yaml = "0.9.30"
# Used for pdu definition
serde = { version = "1.0.194", features = ["rc"] }
serde = { version = "1.0.195", features = ["rc"] }
# Used for secure identifiers
rand = "0.8.5"
# Used to hash passwords
@ -63,7 +59,7 @@ thiserror = "1.0.56"
# Used to generate thumbnails for images
image = { version = "0.24.7", default-features = false, features = ["jpeg", "png", "gif", "webp"] }
# Used to encode server public key
base64 = "0.21.5"
base64 = "0.21.6"
# Used when hashing the state
ring = "0.17.7"
# Used when querying the SRV record of other servers
@ -86,7 +82,6 @@ parking_lot = { version = "0.12.1", optional = true }
crossbeam = { version = "0.8.3", optional = true }
num_cpus = "1.16.0"
threadpool = "1.8.1"
heed = { git = "https://github.com/timokoesters/heed.git", rev = "f6f825da7fb2c758867e05ad973ef800a6fe1d5d", optional = true }
# Used for ruma wrapper
serde_html_form = "0.2.3"
@ -98,7 +93,7 @@ hmac = "0.12.1"
sha-1 = "0.10.1"
sha2 = { version = "0.10.8" }
# used for conduit's CLI and admin room command parsing
clap = { version = "4.4.13", default-features = false, features = ["std", "derive", "help", "usage", "error-context"] }
clap = { version = "4.4.14", default-features = false, features = ["std", "derive", "help", "usage", "error-context"] }
futures-util = { version = "0.3.30", default-features = false }
# Used for reading the configuration from conduit.toml & environment variables
figment = { version = "0.10.13", features = ["env", "toml"] }
@ -118,10 +113,7 @@ nix = { version = "0.27.1", features = ["resource"] }
[features]
default = ["conduit_bin", "backend_rocksdb", "systemd", "zstd_compression"]
#backend_sled = ["sled"]
backend_persy = ["persy", "parking_lot"]
backend_sqlite = ["sqlite"]
#backend_heed = ["heed", "crossbeam"]
backend_rocksdb = ["rocksdb"]
jemalloc = ["tikv-jemalloc-ctl", "tikv-jemallocator"]
sqlite = ["rusqlite", "parking_lot", "tokio/signal"]

View file

@ -180,7 +180,7 @@ allow_check_for_updates = true
trusted_servers = ["matrix.org"]
#max_concurrent_requests = 100 # How many requests Conduit sends to other servers at the same time
#log = "warn,state_res=warn,rocket=off,_=off,sled=off"
#log = "warn,state_res=warn,rocket=off,_=off"
address = "127.0.0.1" # This makes sure Conduit can only be reached using the reverse proxy
#address = "0.0.0.0" # If Conduit is running in a container, make sure the reverse proxy (ie. Traefik) can reach it.

View file

@ -33,7 +33,7 @@ RUN echo "allow_federation = true" >> conduit.toml
RUN echo "allow_check_for_updates = true" >> conduit.toml
RUN echo "allow_encryption = true" >> conduit.toml
RUN echo "allow_registration = true" >> conduit.toml
RUN echo "log = \"warn,_=off,sled=off\"" >> conduit.toml
RUN echo "log = \"warn,_=off\"" >> conduit.toml
RUN sed -i "s/address = \"127.0.0.1\"/address = \"0.0.0.0\"/g" conduit.toml
COPY --from=builder /workdir/target/release/conduit /workdir/conduit

View file

@ -61,8 +61,8 @@ enable_lightning_bolt = true
# servers.)
trusted_servers = ["matrix.org"]
#max_concurrent_requests = 100 # How many requests Conduit sends to other servers at the same time
#log = "warn,state_res=warn,rocket=off,_=off,sled=off"
#max_concurrent_requests = 400 # How many requests Conduit sends to other servers at the same time
#log = "warn,state_res=warn"
address = "127.0.0.1" # This makes sure Conduit can only be reached using the reverse proxy
#address = "0.0.0.0" # If Conduit is running in a container, make sure the reverse proxy (ie. Traefik) can reach it.

4
debian/postinst vendored
View file

@ -77,8 +77,8 @@ allow_check_for_updates = true
trusted_servers = ["matrix.org"]
#max_concurrent_requests = 100 # How many requests Conduit sends to other servers at the same time
#log = "warn,state_res=warn,rocket=off,_=off,sled=off"
#max_concurrent_requests = 400 # How many requests Conduit sends to other servers at the same time
#log = "warn,state_res=warn"
EOF
fi
;;

View file

@ -64,7 +64,7 @@ docker run -d -p 8448:6167 \
-e CONDUIT_MAX_REQUEST_SIZE="20_000_000" \
-e CONDUIT_TRUSTED_SERVERS="[\"matrix.org\"]" \
-e CONDUIT_MAX_CONCURRENT_REQUESTS="100" \
-e CONDUIT_LOG="warn,rocket=off,_=off,sled=off" \
-e CONDUIT_LOG="warn,state_res=warn" \
--name conduit <link>
```

View file

@ -32,7 +32,7 @@ services:
CONDUIT_ALLOW_CHECK_FOR_UPDATES: 'true'
CONDUIT_TRUSTED_SERVERS: '["matrix.org"]'
#CONDUIT_MAX_CONCURRENT_REQUESTS: 100
#CONDUIT_LOG: warn,rocket=off,_=off,sled=off
#CONDUIT_LOG: warn,state_res=warn
CONDUIT_ADDRESS: 0.0.0.0
CONDUIT_CONFIG: '' # Ignore this

View file

@ -33,7 +33,7 @@ services:
# CONDUIT_PORT: 6167
# CONDUIT_CONFIG: '/srv/conduit/conduit.toml' # if you want to configure purely by env vars, set this to an empty string ''
# Available levels are: error, warn, info, debug, trace - more info at: https://docs.rs/env_logger/*/env_logger/#enabling-logging
# CONDUIT_LOG: info # default is: "warn,_=off,sled=off"
# CONDUIT_LOG: info # default is: "warn,state_res=warn"
# CONDUIT_ALLOW_JAEGER: 'false'
# CONDUIT_ALLOW_ENCRYPTION: 'true'
# CONDUIT_ALLOW_FEDERATION: 'true'

View file

@ -31,8 +31,8 @@ services:
CONDUIT_ALLOW_FEDERATION: 'true'
CONDUIT_ALLOW_CHECK_FOR_UPDATES: 'true'
CONDUIT_TRUSTED_SERVERS: '["matrix.org"]'
#CONDUIT_MAX_CONCURRENT_REQUESTS: 100
#CONDUIT_LOG: warn,rocket=off,_=off,sled=off
#CONDUIT_MAX_CONCURRENT_REQUESTS: 400
#CONDUIT_LOG: warn,state_res=warn
CONDUIT_ADDRESS: 0.0.0.0
CONDUIT_CONFIG: '' # Ignore this
#

View file

@ -3,27 +3,13 @@ use crate::Result;
use std::{future::Future, pin::Pin, sync::Arc};
#[cfg(feature = "sled")]
pub mod sled;
#[cfg(feature = "sqlite")]
pub mod sqlite;
#[cfg(feature = "heed")]
pub mod heed;
#[cfg(feature = "rocksdb")]
pub mod rocksdb;
#[cfg(feature = "persy")]
pub mod persy;
#[cfg(any(
feature = "sqlite",
feature = "rocksdb",
feature = "heed",
feature = "persy"
))]
#[cfg(any(feature = "sqlite", feature = "rocksdb"))]
pub mod watchers;
pub trait KeyValueDatabaseEngine: Send + Sync {

View file

@ -1,194 +0,0 @@
use super::{super::Config, watchers::Watchers};
use crossbeam::channel::{bounded, Sender as ChannelSender};
use threadpool::ThreadPool;
use crate::{Error, Result};
use std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex},
};
use super::{DatabaseEngine, Tree};
type TupleOfBytes = (Vec<u8>, Vec<u8>);
pub struct Engine {
env: heed::Env,
iter_pool: Mutex<ThreadPool>,
}
pub struct EngineTree {
engine: Arc<Engine>,
tree: Arc<heed::UntypedDatabase>,
watchers: Watchers,
}
fn convert_error(error: heed::Error) -> Error {
Error::HeedError {
error: error.to_string(),
}
}
impl DatabaseEngine for Engine {
fn open(config: &Config) -> Result<Arc<Self>> {
let mut env_builder = heed::EnvOpenOptions::new();
env_builder.map_size(1024 * 1024 * 1024 * 1024); // 1 Terabyte
env_builder.max_readers(126);
env_builder.max_dbs(128);
unsafe {
env_builder.flag(heed::flags::Flags::MdbWriteMap);
env_builder.flag(heed::flags::Flags::MdbMapAsync);
}
Ok(Arc::new(Engine {
env: env_builder
.open(&config.database_path)
.map_err(convert_error)?,
iter_pool: Mutex::new(ThreadPool::new(10)),
}))
}
fn open_tree(self: &Arc<Self>, name: &'static str) -> Result<Arc<dyn Tree>> {
// Creates the db if it doesn't exist already
Ok(Arc::new(EngineTree {
engine: Arc::clone(self),
tree: Arc::new(
self.env
.create_database(Some(name))
.map_err(convert_error)?,
),
watchers: Default::default(),
}))
}
fn flush(self: &Arc<Self>) -> Result<()> {
self.env.force_sync().map_err(convert_error)?;
Ok(())
}
}
impl EngineTree {
fn iter_from_thread(
&self,
tree: Arc<heed::UntypedDatabase>,
from: Vec<u8>,
backwards: bool,
) -> Box<dyn Iterator<Item = TupleOfBytes> + Send + Sync> {
let (s, r) = bounded::<TupleOfBytes>(100);
let engine = Arc::clone(&self.engine);
let lock = self.engine.iter_pool.lock().await;
if lock.active_count() < lock.max_count() {
lock.execute(move || {
iter_from_thread_work(tree, &engine.env.read_txn().unwrap(), from, backwards, &s);
});
} else {
std::thread::spawn(move || {
iter_from_thread_work(tree, &engine.env.read_txn().unwrap(), from, backwards, &s);
});
}
Box::new(r.into_iter())
}
}
fn iter_from_thread_work(
tree: Arc<heed::UntypedDatabase>,
txn: &heed::RoTxn<'_>,
from: Vec<u8>,
backwards: bool,
s: &ChannelSender<(Vec<u8>, Vec<u8>)>,
) {
if backwards {
for (k, v) in tree.rev_range(txn, ..=&*from).unwrap().map(|r| r.unwrap()) {
if s.send((k.to_vec(), v.to_vec())).is_err() {
return;
}
}
} else {
if from.is_empty() {
for (k, v) in tree.iter(txn).unwrap().map(|r| r.unwrap()) {
if s.send((k.to_vec(), v.to_vec())).is_err() {
return;
}
}
} else {
for (k, v) in tree.range(txn, &*from..).unwrap().map(|r| r.unwrap()) {
if s.send((k.to_vec(), v.to_vec())).is_err() {
return;
}
}
}
}
}
impl Tree for EngineTree {
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
let txn = self.engine.env.read_txn().map_err(convert_error)?;
Ok(self
.tree
.get(&txn, &key)
.map_err(convert_error)?
.map(|s| s.to_vec()))
}
fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
let mut txn = self.engine.env.write_txn().map_err(convert_error)?;
self.tree
.put(&mut txn, &key, &value)
.map_err(convert_error)?;
txn.commit().map_err(convert_error)?;
self.watchers.wake(key);
Ok(())
}
fn remove(&self, key: &[u8]) -> Result<()> {
let mut txn = self.engine.env.write_txn().map_err(convert_error)?;
self.tree.delete(&mut txn, &key).map_err(convert_error)?;
txn.commit().map_err(convert_error)?;
Ok(())
}
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + 'a> {
self.iter_from(&[], false)
}
fn iter_from(
&self,
from: &[u8],
backwards: bool,
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send> {
self.iter_from_thread(Arc::clone(&self.tree), from.to_vec(), backwards)
}
fn increment(&self, key: &[u8]) -> Result<Vec<u8>> {
let mut txn = self.engine.env.write_txn().map_err(convert_error)?;
let old = self.tree.get(&txn, &key).map_err(convert_error)?;
let new =
crate::utils::increment(old.as_deref()).expect("utils::increment always returns Some");
self.tree
.put(&mut txn, &key, &&*new)
.map_err(convert_error)?;
txn.commit().map_err(convert_error)?;
Ok(new)
}
fn scan_prefix<'a>(
&'a self,
prefix: Vec<u8>,
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + Send + 'a> {
Box::new(
self.iter_from(&prefix, false)
.take_while(move |(key, _)| key.starts_with(&prefix)),
)
}
fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
self.watchers.watch(prefix)
}
}

View file

@ -1,197 +0,0 @@
use crate::{
database::{
abstraction::{watchers::Watchers, KeyValueDatabaseEngine, KvTree},
Config,
},
Result,
};
use persy::{ByteVec, OpenOptions, Persy, Transaction, TransactionConfig, ValueMode};
use std::{future::Future, pin::Pin, sync::Arc};
use tracing::warn;
pub struct Engine {
persy: Persy,
}
impl KeyValueDatabaseEngine for Arc<Engine> {
fn open(config: &Config) -> Result<Self> {
let mut cfg = persy::Config::new();
cfg.change_cache_size((config.db_cache_capacity_mb * 1024.0 * 1024.0) as u64);
let persy = OpenOptions::new()
.create(true)
.config(cfg)
.open(&format!("{}/db.persy", config.database_path))?;
Ok(Arc::new(Engine { persy }))
}
fn open_tree(&self, name: &'static str) -> Result<Arc<dyn KvTree>> {
// Create if it doesn't exist
if !self.persy.exists_index(name)? {
let mut tx = self.persy.begin()?;
tx.create_index::<ByteVec, ByteVec>(name, ValueMode::Replace)?;
tx.prepare()?.commit()?;
}
Ok(Arc::new(PersyTree {
persy: self.persy.clone(),
name: name.to_owned(),
watchers: Watchers::default(),
}))
}
fn flush(&self) -> Result<()> {
Ok(())
}
}
pub struct PersyTree {
persy: Persy,
name: String,
watchers: Watchers,
}
impl PersyTree {
fn begin(&self) -> Result<Transaction> {
Ok(self
.persy
.begin_with(TransactionConfig::new().set_background_sync(true))?)
}
}
impl KvTree for PersyTree {
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
let result = self
.persy
.get::<ByteVec, ByteVec>(&self.name, &ByteVec::from(key))?
.next()
.map(|v| (*v).to_owned());
Ok(result)
}
fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
self.insert_batch(&mut Some((key.to_owned(), value.to_owned())).into_iter())?;
self.watchers.wake(key);
Ok(())
}
fn insert_batch<'a>(&self, iter: &mut dyn Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<()> {
let mut tx = self.begin()?;
for (key, value) in iter {
tx.put::<ByteVec, ByteVec>(
&self.name,
ByteVec::from(key.clone()),
ByteVec::from(value),
)?;
}
tx.prepare()?.commit()?;
Ok(())
}
fn increment_batch<'a>(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()> {
let mut tx = self.begin()?;
for key in iter {
let old = tx
.get::<ByteVec, ByteVec>(&self.name, &ByteVec::from(key.clone()))?
.next()
.map(|v| (*v).to_owned());
let new = crate::utils::increment(old.as_deref()).unwrap();
tx.put::<ByteVec, ByteVec>(&self.name, ByteVec::from(key), ByteVec::from(new))?;
}
tx.prepare()?.commit()?;
Ok(())
}
fn remove(&self, key: &[u8]) -> Result<()> {
let mut tx = self.begin()?;
tx.remove::<ByteVec, ByteVec>(&self.name, ByteVec::from(key), None)?;
tx.prepare()?.commit()?;
Ok(())
}
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
let iter = self.persy.range::<ByteVec, ByteVec, _>(&self.name, ..);
match iter {
Ok(iter) => Box::new(iter.filter_map(|(k, v)| {
v.into_iter()
.map(|val| ((*k).to_owned().into(), (*val).to_owned().into()))
.next()
})),
Err(e) => {
warn!("error iterating {:?}", e);
Box::new(std::iter::empty())
}
}
}
fn iter_from<'a>(
&'a self,
from: &[u8],
backwards: bool,
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
let range = if backwards {
self.persy
.range::<ByteVec, ByteVec, _>(&self.name, ..=ByteVec::from(from))
} else {
self.persy
.range::<ByteVec, ByteVec, _>(&self.name, ByteVec::from(from)..)
};
match range {
Ok(iter) => {
let map = iter.filter_map(|(k, v)| {
v.into_iter()
.map(|val| ((*k).to_owned().into(), (*val).to_owned().into()))
.next()
});
if backwards {
Box::new(map.rev())
} else {
Box::new(map)
}
}
Err(e) => {
warn!("error iterating with prefix {:?}", e);
Box::new(std::iter::empty())
}
}
}
fn increment(&self, key: &[u8]) -> Result<Vec<u8>> {
self.increment_batch(&mut Some(key.to_owned()).into_iter())?;
Ok(self.get(key)?.unwrap())
}
fn scan_prefix<'a>(
&'a self,
prefix: Vec<u8>,
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
let range_prefix = ByteVec::from(prefix.clone());
let range = self
.persy
.range::<ByteVec, ByteVec, _>(&self.name, range_prefix..);
match range {
Ok(iter) => {
let owned_prefix = prefix.clone();
Box::new(
iter.take_while(move |(k, _)| (*k).starts_with(&owned_prefix))
.filter_map(|(k, v)| {
v.into_iter()
.map(|val| ((*k).to_owned().into(), (*val).to_owned().into()))
.next()
}),
)
}
Err(e) => {
warn!("error scanning prefix {:?}", e);
Box::new(std::iter::empty())
}
}
}
fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
self.watchers.watch(prefix)
}
}

View file

@ -1,127 +0,0 @@
use super::super::Config;
use crate::{utils, Result};
use std::{future::Future, pin::Pin, sync::Arc};
use tracing::warn;
use super::{DatabaseEngine, Tree};
pub struct Engine(sled::Db);
pub struct SledEngineTree(sled::Tree);
impl DatabaseEngine for Engine {
fn open(config: &Config) -> Result<Arc<Self>> {
Ok(Arc::new(Engine(
sled::Config::default()
.path(&config.database_path)
.cache_capacity((config.db_cache_capacity_mb * 1024.0 * 1024.0) as u64)
.use_compression(true)
.open()?,
)))
}
fn open_tree(self: &Arc<Self>, name: &'static str) -> Result<Arc<dyn Tree>> {
Ok(Arc::new(SledEngineTree(self.0.open_tree(name)?)))
}
fn flush(self: &Arc<Self>) -> Result<()> {
Ok(()) // noop
}
}
impl Tree for SledEngineTree {
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
Ok(self.0.get(key)?.map(|v| v.to_vec()))
}
fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
self.0.insert(key, value)?;
Ok(())
}
fn insert_batch<'a>(&self, iter: &mut dyn Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<()> {
for (key, value) in iter {
self.0.insert(key, value)?;
}
Ok(())
}
fn remove(&self, key: &[u8]) -> Result<()> {
self.0.remove(key)?;
Ok(())
}
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
Box::new(
self.0
.iter()
.filter_map(|r| {
if let Err(e) = &r {
warn!("Error: {}", e);
}
r.ok()
})
.map(|(k, v)| (k.to_vec().into(), v.to_vec().into())),
)
}
fn iter_from(
&self,
from: &[u8],
backwards: bool,
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)>> {
let iter = if backwards {
self.0.range(..=from)
} else {
self.0.range(from..)
};
let iter = iter
.filter_map(|r| {
if let Err(e) = &r {
warn!("Error: {}", e);
}
r.ok()
})
.map(|(k, v)| (k.to_vec().into(), v.to_vec().into()));
if backwards {
Box::new(iter.rev())
} else {
Box::new(iter)
}
}
fn increment(&self, key: &[u8]) -> Result<Vec<u8>> {
Ok(self
.0
.update_and_fetch(key, utils::increment)
.map(|o| o.expect("increment always sets a value").to_vec())?)
}
fn scan_prefix<'a>(
&'a self,
prefix: Vec<u8>,
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
let iter = self
.0
.scan_prefix(prefix)
.filter_map(|r| {
if let Err(e) = &r {
warn!("Error: {}", e);
}
r.ok()
})
.map(|(k, v)| (k.to_vec().into(), v.to_vec().into()));
Box::new(iter)
}
fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
let prefix = prefix.to_vec();
Box::pin(async move {
self.0.watch_prefix(prefix).await;
})
}
}

View file

@ -192,16 +192,11 @@ impl KeyValueDatabase {
fn check_db_setup(config: &Config) -> Result<()> {
let path = Path::new(&config.database_path);
let sled_exists = path.join("db").exists();
let sqlite_exists = path.join("conduit.db").exists();
let rocksdb_exists = path.join("IDENTITY").exists();
let mut count = 0;
if sled_exists {
count += 1;
}
if sqlite_exists {
count += 1;
}
@ -215,12 +210,6 @@ impl KeyValueDatabase {
return Ok(());
}
if sled_exists && config.database_backend != "sled" {
return Err(Error::bad_config(
"Found sled at database_path, but is not specified in config.",
));
}
if sqlite_exists && config.database_backend != "sqlite" {
return Err(Error::bad_config(
"Found sqlite at database_path, but is not specified in config.",
@ -260,14 +249,8 @@ impl KeyValueDatabase {
#[cfg(feature = "rocksdb")]
Arc::new(Arc::<abstraction::rocksdb::Engine>::open(&config)?)
}
"persy" => {
#[cfg(not(feature = "persy"))]
return Err(Error::BadConfig("Database backend not found."));
#[cfg(feature = "persy")]
Arc::new(Arc::<abstraction::persy::Engine>::open(&config)?)
}
_ => {
return Err(Error::BadConfig("Database backend not found."));
return Err(Error::BadConfig("Database backend not found. sqlite (not recommended) and rocksdb are the only supported backends."));
}
};

View file

@ -11,33 +11,18 @@ use ruma::{
use thiserror::Error;
use tracing::{error, info};
#[cfg(feature = "persy")]
use persy::PersyError;
use crate::RumaResponse;
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Error, Debug)]
pub enum Error {
#[cfg(feature = "sled")]
#[error("There was a problem with the connection to the sled database.")]
SledError {
#[from]
source: sled::Error,
},
#[cfg(feature = "sqlite")]
#[error("There was a problem with the connection to the sqlite database: {source}")]
SqliteError {
#[from]
source: rusqlite::Error,
},
#[cfg(feature = "persy")]
#[error("There was a problem with the connection to the persy database.")]
PersyError { source: PersyError },
#[cfg(feature = "heed")]
#[error("There was a problem with the connection to the heed database: {error}")]
HeedError { error: String },
#[cfg(feature = "rocksdb")]
#[error("There was a problem with the connection to the rocksdb database: {source}")]
RocksDbError {
@ -150,14 +135,8 @@ impl Error {
let db_error = String::from("Database or I/O error occurred.");
match self {
#[cfg(feature = "sled")]
Self::SledError { .. } => db_error,
#[cfg(feature = "sqlite")]
Self::SqliteError { .. } => db_error,
#[cfg(feature = "persy")]
Self::PersyError { .. } => db_error,
#[cfg(feature = "heed")]
Self::HeedError => db_error,
#[cfg(feature = "rocksdb")]
Self::RocksDbError { .. } => db_error,
Self::IoError { .. } => db_error,
@ -168,15 +147,6 @@ impl Error {
}
}
#[cfg(feature = "persy")]
impl<T: Into<PersyError>> From<persy::PE<T>> for Error {
fn from(err: persy::PE<T>) -> Self {
Error::PersyError {
source: err.error().into(),
}
}
}
impl From<Infallible> for Error {
fn from(i: Infallible) -> Self {
match i {}