allow shutting down parts of streams
This commit is contained in:
parent
1b1855fe5a
commit
966f81ca35
3 changed files with 37 additions and 2 deletions
|
@ -45,6 +45,9 @@ construct Stream {
|
||||||
flush { | with this ;
|
flush { | with this ;
|
||||||
this:id flush-stream
|
this:id flush-stream
|
||||||
}
|
}
|
||||||
|
shutdown-input { | with this ;
|
||||||
|
this:id shutdown-input-stream
|
||||||
|
}
|
||||||
close { | with this ;
|
close { | with this ;
|
||||||
this:id close-stream
|
this:id close-stream
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,7 +25,7 @@ pub fn register(r: &mut Stack, o: Arc<Frame>) {
|
||||||
}
|
}
|
||||||
|
|
||||||
type Fn = fn(&mut Stack) -> OError;
|
type Fn = fn(&mut Stack) -> OError;
|
||||||
let fns: [(&str, Fn, u32); 11] = [
|
let fns: [(&str, Fn, u32); 12] = [
|
||||||
("new-stream", new_stream, 1),
|
("new-stream", new_stream, 1),
|
||||||
("write-stream", write_stream, 1),
|
("write-stream", write_stream, 1),
|
||||||
("write-all-stream", write_all_stream, 0),
|
("write-all-stream", write_all_stream, 0),
|
||||||
|
@ -37,6 +37,7 @@ pub fn register(r: &mut Stack, o: Arc<Frame>) {
|
||||||
("accept-server-stream", accept_server_stream, 1),
|
("accept-server-stream", accept_server_stream, 1),
|
||||||
("close-server-stream", close_server_stream, 0),
|
("close-server-stream", close_server_stream, 0),
|
||||||
("get-stream-peer", get_stream_peer, 2),
|
("get-stream-peer", get_stream_peer, 2),
|
||||||
|
("shutdown-input-stream", shutdown_input_stream, 0),
|
||||||
];
|
];
|
||||||
for f in fns {
|
for f in fns {
|
||||||
r.define_func(
|
r.define_func(
|
||||||
|
|
|
@ -3,7 +3,7 @@ use std::{
|
||||||
hint::black_box,
|
hint::black_box,
|
||||||
io::{Read, Write},
|
io::{Read, Write},
|
||||||
net::{TcpStream, UdpSocket},
|
net::{TcpStream, UdpSocket},
|
||||||
process::{self, Stdio},
|
process::{self, ChildStdin, Stdio},
|
||||||
sync::{Arc, LazyLock},
|
sync::{Arc, LazyLock},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -86,6 +86,16 @@ impl Stream {
|
||||||
f(&mut self.extra);
|
f(&mut self.extra);
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn shutdown_write(&mut self) {
|
||||||
|
let mut bx = Box::new(IgnoreWrite());
|
||||||
|
self.writer = unsafe {
|
||||||
|
(bx.as_mut() as *mut (dyn Write + Send + Sync + 'static))
|
||||||
|
.as_mut()
|
||||||
|
.unwrap()
|
||||||
|
};
|
||||||
|
self._writer_storage = Some(bx);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Read for Stream {
|
impl Read for Stream {
|
||||||
|
@ -132,6 +142,17 @@ impl Write for Stream {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct IgnoreWrite();
|
||||||
|
impl Write for IgnoreWrite {
|
||||||
|
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||||
|
Ok(buf.len())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn flush(&mut self) -> std::io::Result<()> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl<T> From<T> for StreamType
|
impl<T> From<T> for StreamType
|
||||||
where
|
where
|
||||||
T: Fn(&mut Stack) -> Result<Stream, Error> + Sync + Send + 'static,
|
T: Fn(&mut Stack) -> Result<Stream, Error> + Sync + Send + 'static,
|
||||||
|
@ -284,6 +305,16 @@ pub fn close_stream(stack: &mut Stack) -> OError {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn shutdown_input_stream(stack: &mut Stack) -> OError {
|
||||||
|
require_on_stack!(id, Mega, stack, "shutdown-input-stream");
|
||||||
|
let stream = runtime(|rt| {
|
||||||
|
rt.get_stream(id as u128)
|
||||||
|
.ok_or_else(|| stack.error(ErrorKind::VariableNotFound(format!("__stream-{id}"))))
|
||||||
|
})?;
|
||||||
|
stream.lock().shutdown_write();
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub(super) fn stream_file(stack: &mut Stack) -> Result<Stream, Error> {
|
pub(super) fn stream_file(stack: &mut Stack) -> Result<Stream, Error> {
|
||||||
let truncate = stack.pop().lock_ro().is_truthy();
|
let truncate = stack.pop().lock_ro().is_truthy();
|
||||||
require_on_stack!(path, Str, stack, "FILE new-stream");
|
require_on_stack!(path, Str, stack, "FILE new-stream");
|
||||||
|
|
Loading…
Add table
Reference in a new issue