From 966f81ca35dccbf90b27310b3b4e657e21789b0f Mon Sep 17 00:00:00 2001 From: TudbuT Date: Tue, 5 Nov 2024 14:23:16 +0100 Subject: [PATCH] allow shutting down parts of streams --- spl/stream.spl | 3 +++ src/stream/mod.rs | 3 ++- src/stream/rw.rs | 33 ++++++++++++++++++++++++++++++++- 3 files changed, 37 insertions(+), 2 deletions(-) diff --git a/spl/stream.spl b/spl/stream.spl index b8c8784..92365f6 100644 --- a/spl/stream.spl +++ b/spl/stream.spl @@ -45,6 +45,9 @@ construct Stream { flush { | with this ; this:id flush-stream } + shutdown-input { | with this ; + this:id shutdown-input-stream + } close { | with this ; this:id close-stream } diff --git a/src/stream/mod.rs b/src/stream/mod.rs index eb1af17..6feae76 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -25,7 +25,7 @@ pub fn register(r: &mut Stack, o: Arc) { } type Fn = fn(&mut Stack) -> OError; - let fns: [(&str, Fn, u32); 11] = [ + let fns: [(&str, Fn, u32); 12] = [ ("new-stream", new_stream, 1), ("write-stream", write_stream, 1), ("write-all-stream", write_all_stream, 0), @@ -37,6 +37,7 @@ pub fn register(r: &mut Stack, o: Arc) { ("accept-server-stream", accept_server_stream, 1), ("close-server-stream", close_server_stream, 0), ("get-stream-peer", get_stream_peer, 2), + ("shutdown-input-stream", shutdown_input_stream, 0), ]; for f in fns { r.define_func( diff --git a/src/stream/rw.rs b/src/stream/rw.rs index 2bdde3b..196e858 100644 --- a/src/stream/rw.rs +++ b/src/stream/rw.rs @@ -3,7 +3,7 @@ use std::{ hint::black_box, io::{Read, Write}, net::{TcpStream, UdpSocket}, - process::{self, Stdio}, + process::{self, ChildStdin, Stdio}, sync::{Arc, LazyLock}, }; @@ -86,6 +86,16 @@ impl Stream { f(&mut self.extra); self } + + pub fn shutdown_write(&mut self) { + let mut bx = Box::new(IgnoreWrite()); + self.writer = unsafe { + (bx.as_mut() as *mut (dyn Write + Send + Sync + 'static)) + .as_mut() + .unwrap() + }; + self._writer_storage = Some(bx); + } } impl Read for Stream { @@ -132,6 +142,17 @@ impl Write for Stream { } } +struct IgnoreWrite(); +impl Write for IgnoreWrite { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + Ok(buf.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + impl From for StreamType where T: Fn(&mut Stack) -> Result + Sync + Send + 'static, @@ -284,6 +305,16 @@ pub fn close_stream(stack: &mut Stack) -> OError { Ok(()) } +pub fn shutdown_input_stream(stack: &mut Stack) -> OError { + require_on_stack!(id, Mega, stack, "shutdown-input-stream"); + let stream = runtime(|rt| { + rt.get_stream(id as u128) + .ok_or_else(|| stack.error(ErrorKind::VariableNotFound(format!("__stream-{id}")))) + })?; + stream.lock().shutdown_write(); + Ok(()) +} + pub(super) fn stream_file(stack: &mut Stack) -> Result { let truncate = stack.pop().lock_ro().is_truthy(); require_on_stack!(path, Str, stack, "FILE new-stream");