reimplement iterator from lowlevel
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
6dd6e4bfaf
commit
0522fe7d92
3 changed files with 115 additions and 29 deletions
105
src/database/iter.rs
Normal file
105
src/database/iter.rs
Normal file
|
@ -0,0 +1,105 @@
|
||||||
|
use std::{iter::FusedIterator, sync::Arc};
|
||||||
|
|
||||||
|
use conduit::Result;
|
||||||
|
use rocksdb::{ColumnFamily, DBRawIteratorWithThreadMode, Direction, IteratorMode, ReadOptions};
|
||||||
|
|
||||||
|
use crate::{engine::Db, map::KeyVal, result, Engine};
|
||||||
|
|
||||||
|
type Cursor<'cursor> = DBRawIteratorWithThreadMode<'cursor, Db>;
|
||||||
|
type Key<'item> = &'item [u8];
|
||||||
|
type Val<'item> = &'item [u8];
|
||||||
|
type Item<'item> = (Key<'item>, Val<'item>);
|
||||||
|
|
||||||
|
struct State<'cursor> {
|
||||||
|
cursor: Cursor<'cursor>,
|
||||||
|
direction: Direction,
|
||||||
|
valid: bool,
|
||||||
|
init: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'cursor> State<'cursor> {
|
||||||
|
pub(crate) fn new(
|
||||||
|
db: &'cursor Arc<Engine>, cf: &'cursor Arc<ColumnFamily>, opts: ReadOptions, mode: &IteratorMode<'_>,
|
||||||
|
) -> Self {
|
||||||
|
let mut cursor = db.db.raw_iterator_cf_opt(&**cf, opts);
|
||||||
|
let direction = into_direction(mode);
|
||||||
|
let valid = seek_init(&mut cursor, mode);
|
||||||
|
Self {
|
||||||
|
cursor,
|
||||||
|
direction,
|
||||||
|
valid,
|
||||||
|
init: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Iter<'cursor> {
|
||||||
|
state: State<'cursor>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'cursor> Iter<'cursor> {
|
||||||
|
pub(crate) fn new(
|
||||||
|
db: &'cursor Arc<Engine>, cf: &'cursor Arc<ColumnFamily>, opts: ReadOptions, mode: &IteratorMode<'_>,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
state: State::new(db, cf, opts, mode),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Iterator for Iter<'_> {
|
||||||
|
type Item = KeyVal;
|
||||||
|
|
||||||
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
|
if !self.state.init && self.state.valid {
|
||||||
|
seek_next(&mut self.state.cursor, self.state.direction);
|
||||||
|
} else if self.state.init {
|
||||||
|
self.state.init = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.state.cursor.item().map(into_keyval).or_else(|| {
|
||||||
|
when_invalid(&mut self.state).expect("iterator invalidated due to error");
|
||||||
|
None
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FusedIterator for Iter<'_> {}
|
||||||
|
|
||||||
|
fn when_invalid(state: &mut State<'_>) -> Result<()> {
|
||||||
|
state.valid = false;
|
||||||
|
result(state.cursor.status())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn seek_next(cursor: &mut Cursor<'_>, direction: Direction) {
|
||||||
|
match direction {
|
||||||
|
Direction::Forward => cursor.next(),
|
||||||
|
Direction::Reverse => cursor.prev(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn seek_init(cursor: &mut Cursor<'_>, mode: &IteratorMode<'_>) -> bool {
|
||||||
|
use Direction::{Forward, Reverse};
|
||||||
|
use IteratorMode::{End, From, Start};
|
||||||
|
|
||||||
|
match mode {
|
||||||
|
Start => cursor.seek_to_first(),
|
||||||
|
End => cursor.seek_to_last(),
|
||||||
|
From(key, Forward) => cursor.seek(key),
|
||||||
|
From(key, Reverse) => cursor.seek_for_prev(key),
|
||||||
|
};
|
||||||
|
|
||||||
|
cursor.valid()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn into_direction(mode: &IteratorMode<'_>) -> Direction {
|
||||||
|
use Direction::{Forward, Reverse};
|
||||||
|
use IteratorMode::{End, From, Start};
|
||||||
|
|
||||||
|
match mode {
|
||||||
|
Start | From(_, Forward) => Forward,
|
||||||
|
End | From(_, Reverse) => Reverse,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn into_keyval((key, val): Item<'_>) -> KeyVal { (Vec::from(key), Vec::from(val)) }
|
|
@ -5,7 +5,7 @@ use rocksdb::{
|
||||||
AsColumnFamilyRef, ColumnFamily, Direction, IteratorMode, ReadOptions, WriteBatchWithTransaction, WriteOptions,
|
AsColumnFamilyRef, ColumnFamily, Direction, IteratorMode, ReadOptions, WriteBatchWithTransaction, WriteOptions,
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::{or_else, result, watchers::Watchers, Engine};
|
use crate::{or_else, result, watchers::Watchers, Engine, Iter};
|
||||||
|
|
||||||
pub struct Map {
|
pub struct Map {
|
||||||
name: String,
|
name: String,
|
||||||
|
@ -16,9 +16,9 @@ pub struct Map {
|
||||||
read_options: ReadOptions,
|
read_options: ReadOptions,
|
||||||
}
|
}
|
||||||
|
|
||||||
type Key = Vec<u8>;
|
pub(crate) type KeyVal = (Key, Val);
|
||||||
type Val = Vec<u8>;
|
pub(crate) type Val = Vec<u8>;
|
||||||
type KeyVal = (Key, Val);
|
pub(crate) type Key = Vec<u8>;
|
||||||
|
|
||||||
impl Map {
|
impl Map {
|
||||||
pub(crate) fn open(db: &Arc<Engine>, name: &str) -> Result<Arc<Self>> {
|
pub(crate) fn open(db: &Arc<Engine>, name: &str) -> Result<Arc<Self>> {
|
||||||
|
@ -121,15 +121,9 @@ impl Map {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = KeyVal> + 'a> {
|
pub fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = KeyVal> + 'a> {
|
||||||
|
let mode = IteratorMode::Start;
|
||||||
let read_options = read_options_default();
|
let read_options = read_options_default();
|
||||||
let it = self
|
Box::new(Iter::new(&self.db, &self.cf, read_options, &mode))
|
||||||
.db
|
|
||||||
.db
|
|
||||||
.iterator_cf_opt(&self.cf(), read_options, IteratorMode::Start)
|
|
||||||
.map(Result::unwrap)
|
|
||||||
.map(|(k, v)| (Vec::from(k), Vec::from(v)));
|
|
||||||
|
|
||||||
Box::new(it)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn iter_from<'a>(&'a self, from: &[u8], backwards: bool) -> Box<dyn Iterator<Item = KeyVal> + 'a> {
|
pub fn iter_from<'a>(&'a self, from: &[u8], backwards: bool) -> Box<dyn Iterator<Item = KeyVal> + 'a> {
|
||||||
|
@ -140,28 +134,13 @@ impl Map {
|
||||||
};
|
};
|
||||||
let mode = IteratorMode::From(from, direction);
|
let mode = IteratorMode::From(from, direction);
|
||||||
let read_options = read_options_default();
|
let read_options = read_options_default();
|
||||||
let it = self
|
Box::new(Iter::new(&self.db, &self.cf, read_options, &mode))
|
||||||
.db
|
|
||||||
.db
|
|
||||||
.iterator_cf_opt(&self.cf(), read_options, mode)
|
|
||||||
.map(Result::unwrap)
|
|
||||||
.map(|(k, v)| (Vec::from(k), Vec::from(v)));
|
|
||||||
|
|
||||||
Box::new(it)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn scan_prefix<'a>(&'a self, prefix: Vec<u8>) -> Box<dyn Iterator<Item = KeyVal> + 'a> {
|
pub fn scan_prefix<'a>(&'a self, prefix: Vec<u8>) -> Box<dyn Iterator<Item = KeyVal> + 'a> {
|
||||||
let mode = IteratorMode::From(&prefix, Direction::Forward);
|
let mode = IteratorMode::From(&prefix, Direction::Forward);
|
||||||
let read_options = read_options_default();
|
let read_options = read_options_default();
|
||||||
let it = self
|
Box::new(Iter::new(&self.db, &self.cf, read_options, &mode).take_while(move |(k, _)| k.starts_with(&prefix)))
|
||||||
.db
|
|
||||||
.db
|
|
||||||
.iterator_cf_opt(&self.cf(), read_options, mode)
|
|
||||||
.map(Result::unwrap)
|
|
||||||
.map(|(k, v)| (Vec::from(k), Vec::from(v)))
|
|
||||||
.take_while(move |(k, _)| k.starts_with(&prefix));
|
|
||||||
|
|
||||||
Box::new(it)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn increment(&self, key: &[u8]) -> Result<Vec<u8>> {
|
pub fn increment(&self, key: &[u8]) -> Result<Vec<u8>> {
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
mod cork;
|
mod cork;
|
||||||
mod database;
|
mod database;
|
||||||
mod engine;
|
mod engine;
|
||||||
|
mod iter;
|
||||||
mod map;
|
mod map;
|
||||||
pub mod maps;
|
pub mod maps;
|
||||||
mod opts;
|
mod opts;
|
||||||
|
@ -12,6 +13,7 @@ extern crate rust_rocksdb as rocksdb;
|
||||||
|
|
||||||
pub use database::Database;
|
pub use database::Database;
|
||||||
pub(crate) use engine::Engine;
|
pub(crate) use engine::Engine;
|
||||||
|
pub use iter::Iter;
|
||||||
pub use map::Map;
|
pub use map::Map;
|
||||||
pub(crate) use util::{or_else, result};
|
pub(crate) use util::{or_else, result};
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue