diff --git a/spl/stream.spl b/spl/stream.spl
index b8c8784..92365f6 100644
--- a/spl/stream.spl
+++ b/spl/stream.spl
@@ -45,6 +45,9 @@ construct Stream {
flush { | with this ;
this:id flush-stream
}
+ shutdown-input { | with this ;
+ this:id shutdown-input-stream
+ }
close { | with this ;
this:id close-stream
}
diff --git a/src/stream/mod.rs b/src/stream/mod.rs
index eb1af17..6feae76 100644
--- a/src/stream/mod.rs
+++ b/src/stream/mod.rs
@@ -25,7 +25,7 @@ pub fn register(r: &mut Stack, o: Arc) {
}
type Fn = fn(&mut Stack) -> OError;
- let fns: [(&str, Fn, u32); 11] = [
+ let fns: [(&str, Fn, u32); 12] = [
("new-stream", new_stream, 1),
("write-stream", write_stream, 1),
("write-all-stream", write_all_stream, 0),
@@ -37,6 +37,7 @@ pub fn register(r: &mut Stack, o: Arc) {
("accept-server-stream", accept_server_stream, 1),
("close-server-stream", close_server_stream, 0),
("get-stream-peer", get_stream_peer, 2),
+ ("shutdown-input-stream", shutdown_input_stream, 0),
];
for f in fns {
r.define_func(
diff --git a/src/stream/rw.rs b/src/stream/rw.rs
index 2bdde3b..196e858 100644
--- a/src/stream/rw.rs
+++ b/src/stream/rw.rs
@@ -3,7 +3,7 @@ use std::{
hint::black_box,
io::{Read, Write},
net::{TcpStream, UdpSocket},
- process::{self, Stdio},
+ process::{self, ChildStdin, Stdio},
sync::{Arc, LazyLock},
};
@@ -86,6 +86,16 @@ impl Stream {
f(&mut self.extra);
self
}
+
+ pub fn shutdown_write(&mut self) {
+ let mut bx = Box::new(IgnoreWrite());
+ self.writer = unsafe {
+ (bx.as_mut() as *mut (dyn Write + Send + Sync + 'static))
+ .as_mut()
+ .unwrap()
+ };
+ self._writer_storage = Some(bx);
+ }
}
impl Read for Stream {
@@ -132,6 +142,17 @@ impl Write for Stream {
}
}
+struct IgnoreWrite();
+impl Write for IgnoreWrite {
+ fn write(&mut self, buf: &[u8]) -> std::io::Result {
+ Ok(buf.len())
+ }
+
+ fn flush(&mut self) -> std::io::Result<()> {
+ Ok(())
+ }
+}
+
impl From for StreamType
where
T: Fn(&mut Stack) -> Result + Sync + Send + 'static,
@@ -284,6 +305,16 @@ pub fn close_stream(stack: &mut Stack) -> OError {
Ok(())
}
+pub fn shutdown_input_stream(stack: &mut Stack) -> OError {
+ require_on_stack!(id, Mega, stack, "shutdown-input-stream");
+ let stream = runtime(|rt| {
+ rt.get_stream(id as u128)
+ .ok_or_else(|| stack.error(ErrorKind::VariableNotFound(format!("__stream-{id}"))))
+ })?;
+ stream.lock().shutdown_write();
+ Ok(())
+}
+
pub(super) fn stream_file(stack: &mut Stack) -> Result {
let truncate = stack.pop().lock_ro().is_truthy();
require_on_stack!(path, Str, stack, "FILE new-stream");