allow shutting down parts of streams

This commit is contained in:
Daniella 2024-11-05 14:23:16 +01:00
parent 1b1855fe5a
commit 3445b27fa9
Signed by: TudbuT
GPG key ID: B3CF345217F202D3
3 changed files with 36 additions and 1 deletions

View file

@ -45,6 +45,9 @@ construct Stream {
flush { | with this ;
this:id flush-stream
}
shutdown-input { | with this ;
this:id shutdown-input-stream
}
close { | with this ;
this:id close-stream
}

View file

@ -25,7 +25,7 @@ pub fn register(r: &mut Stack, o: Arc<Frame>) {
}
type Fn = fn(&mut Stack) -> OError;
let fns: [(&str, Fn, u32); 11] = [
let fns: [(&str, Fn, u32); 12] = [
("new-stream", new_stream, 1),
("write-stream", write_stream, 1),
("write-all-stream", write_all_stream, 0),
@ -37,6 +37,7 @@ pub fn register(r: &mut Stack, o: Arc<Frame>) {
("accept-server-stream", accept_server_stream, 1),
("close-server-stream", close_server_stream, 0),
("get-stream-peer", get_stream_peer, 2),
("shutdown-input-stream", shutdown_input_stream, 0),
];
for f in fns {
r.define_func(

View file

@ -86,6 +86,16 @@ impl Stream {
f(&mut self.extra);
self
}
pub fn shutdown_write(&mut self) {
let mut bx = Box::new(IgnoreWrite());
self.writer = unsafe {
(bx.as_mut() as *mut (dyn Write + Send + Sync + 'static))
.as_mut()
.unwrap()
};
self._writer_storage = Some(bx);
}
}
impl Read for Stream {
@ -132,6 +142,17 @@ impl Write for Stream {
}
}
struct IgnoreWrite();
impl Write for IgnoreWrite {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
impl<T> From<T> for StreamType
where
T: Fn(&mut Stack) -> Result<Stream, Error> + Sync + Send + 'static,
@ -284,6 +305,16 @@ pub fn close_stream(stack: &mut Stack) -> OError {
Ok(())
}
pub fn shutdown_input_stream(stack: &mut Stack) -> OError {
require_on_stack!(id, Mega, stack, "shutdown-input-stream");
let stream = runtime(|rt| {
rt.get_stream(id as u128)
.ok_or_else(|| stack.error(ErrorKind::VariableNotFound(format!("__stream-{id}"))))
})?;
stream.lock().shutdown_write();
Ok(())
}
pub(super) fn stream_file(stack: &mut Stack) -> Result<Stream, Error> {
let truncate = stack.pop().lock_ro().is_truthy();
require_on_stack!(path, Str, stack, "FILE new-stream");