diff --git a/Cargo.lock b/Cargo.lock index 6c36619..d9e44c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,12 +8,6 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "428f6ba17d0c927e57c15a86cf5d7d07a2f35b3fbf15b1eb36b7075459e150a3" -[[package]] -name = "once_cell" -version = "1.17.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3" - [[package]] name = "readformat" version = "0.1.2" @@ -25,6 +19,5 @@ name = "spl" version = "0.3.2" dependencies = [ "multicall", - "once_cell", "readformat", ] diff --git a/Cargo.toml b/Cargo.toml index 52950ef..2e2e15f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,5 +9,4 @@ authors = ["TudbuT"] [dependencies] readformat = "0.1" -once_cell = "1.17" multicall = "0.1" diff --git a/isbpl.spl b/isbpl.spl index 4e4c420..d217418 100644 --- a/isbpl.spl +++ b/isbpl.spl @@ -36,7 +36,10 @@ func dec { pop } func getms { - 0 time + 0 time:unixms +} +func sleep { + time:sleep } func main { } diff --git a/server.spl b/server.spl new file mode 100644 index 0000000..4b0bf7f --- /dev/null +++ b/server.spl @@ -0,0 +1,63 @@ +"#net.spl" import +"#stream.spl" import + +"server" net:register + +construct net:server namespace { + ServerStream + Type + _Types + types + ; + Types { _Types | with this ; + this:_Types:new + } + register-type { | with id this ; + this:types id awrap aadd this:=types + id this:_Types dyn-def-field; + } +} + +0 anew net:server:=types + +construct net:server:Type { + id + ; + construct { this | with id this ; + id this:=id + this + } + create { ServerStream | with this ; + this:id net:server:ServerStream:new + } +} + +construct net:server:_Types { + ; + construct { this | with this ; + { | with type ; + "type net:server:Type:new this:="; + (type net:server:Type:new) (this ("=" type concat)) dyn-objcall + } net:server:types:foreach; + this + } +} + +"tcp" net:server:register-type + +construct net:server:ServerStream { + id + ; + construct { this | with type this ; + type new-server-stream this:=id + this + } + accept { Stream | with this ; + def stream null Stream settype =stream + this:id accept-server-stream stream:=id + stream + } + close { | with this ; + this:id close-server-stream; + } +} diff --git a/src/runtime.rs b/src/runtime.rs index 2133ae8..1ddf739 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -72,6 +72,7 @@ pub struct Runtime { types_by_id: HashMap, next_stream_id: u128, streams: HashMap>>, + server_streams: HashMap>>, pub embedded_files: HashMap<&'static str, &'static str>, pub native_functions: HashMap<&'static str, (u32, FuncImpl)>, } @@ -101,6 +102,7 @@ impl Runtime { types_by_id: HashMap::new(), next_stream_id: 0, streams: HashMap::new(), + server_streams: HashMap::new(), embedded_files: HashMap::new(), native_functions: HashMap::new(), }; @@ -160,6 +162,23 @@ impl Runtime { self.streams.remove(&id); } + pub fn register_server_stream( + &mut self, + stream: ServerStream, + ) -> (u128, Arc>) { + let id = (self.next_stream_id, self.next_stream_id += 1).0; + self.server_streams.insert(id, Arc::new(Mut::new(stream))); + (id, self.server_streams.get(&id).unwrap().clone()) + } + + pub fn get_server_stream(&self, id: u128) -> Option>> { + self.server_streams.get(&id).cloned() + } + + pub fn destroy_server_stream(&mut self, id: u128) { + self.server_streams.remove(&id); + } + pub fn load_native_function(&self, name: &str) -> &(u32, FuncImpl) { self.native_functions.get(name).unwrap_or_else(|| { panic!( diff --git a/src/std_fns.rs b/src/std_fns.rs index 5c87e79..a04b3fb 100644 --- a/src/std_fns.rs +++ b/src/std_fns.rs @@ -935,7 +935,7 @@ pub fn register(r: &mut Stack, o: Arc) { ("write-sasm", write_sasm, 1), ("write-file-sasm", write_file_sasm, 1), ("fork", fork, 0), - ("time", time, 0), + ("sleeptime", time, 0), ]; for f in fns { r.define_func( diff --git a/src/stream/mod.rs b/src/stream/mod.rs new file mode 100644 index 0000000..88649b7 --- /dev/null +++ b/src/stream/mod.rs @@ -0,0 +1,48 @@ +mod rw; +mod server; +pub use rw::*; +pub use server::*; + +use std::sync::{Arc, LazyLock}; + +use crate::{mutex::Mut, runtime::*}; + +static IS_INITIALIZED: LazyLock>> = LazyLock::new(|| Arc::new(Mut::new(false))); + +pub fn register(r: &mut Stack, o: Arc) { + if !*IS_INITIALIZED.lock_ro() { + register_stream_type("file", stream_file); + register_stream_type("tcp", stream_tcp); + register_stream_type("udp", stream_udp); + register_stream_type("cmd", stream_cmd); + register_server_stream_type("tcp", server_stream_tcp); + *IS_INITIALIZED.lock() = true; + } + + type Fn = fn(&mut Stack) -> OError; + let fns: [(&str, Fn, u32); 10] = [ + ("new-stream", new_stream, 1), + ("write-stream", write_stream, 1), + ("write-all-stream", write_all_stream, 0), + ("flush-stream", flush_stream, 0), + ("read-stream", read_stream, 1), + ("read-all-stream", read_all_stream, 0), + ("close-stream", close_stream, 0), + ("new-server-stream", new_server_stream, 1), + ("accept-server-stream", accept_server_stream, 1), + ("close-server-stream", close_server_stream, 0), + ]; + for f in fns { + r.define_func( + f.0.to_owned(), + AFunc::new(Func { + ret_count: f.2, + to_call: FuncImpl::Native(f.1), + run_as_base: false, + origin: o.clone(), + fname: None, + name: f.0.to_owned(), + }), + ); + } +} diff --git a/src/stream.rs b/src/stream/rw.rs similarity index 74% rename from src/stream.rs rename to src/stream/rw.rs index 1e14867..cd2514c 100644 --- a/src/stream.rs +++ b/src/stream/rw.rs @@ -1,20 +1,20 @@ use std::{ + borrow::BorrowMut, collections::HashMap, - fs::OpenOptions, + hint::black_box, io::{Read, Write}, - mem, - net::{Shutdown, TcpStream, UdpSocket}, + net::{TcpStream, UdpSocket}, process::{self, Stdio}, - sync::Arc, + sync::{Arc, LazyLock}, }; -use once_cell::sync::Lazy; +use crate::*; -use crate::{mutex::Mut, runtime::*, *}; +use fs::OpenOptions; +use mutex::Mut; -static STREAM_TYPES: Lazy>>> = - Lazy::new(|| Arc::new(Mut::new(HashMap::new()))); -static IS_INITIALIZED: Lazy>> = Lazy::new(|| Arc::new(Mut::new(false))); +static STREAM_TYPES: LazyLock>>> = + LazyLock::new(|| Arc::new(Mut::new(HashMap::new()))); /// Registers a custom stream type. pub fn register_stream_type( @@ -45,32 +45,37 @@ impl StreamType { /// An SPL stream, holding a reader and a writer, and a function to close it. pub struct Stream { - reader: Box, - writer: Box, - close: fn(&mut Self), + pub(super) reader: Box, + pub(super) _writer_storage: Option>, + pub(super) writer: &'static mut (dyn Write + Send + Sync + 'static), } impl Stream { - pub fn new(main: T, close: fn(&mut Self)) -> Self { + pub fn new(main: T) -> Self { let mut rw = Box::new(main); Self { - // SAFETY: Because these are both in private fields on one object, they can not be - // written to simultaneously or read from while writing due to the guards put in place - // by the borrow checker on the Stream. - reader: Box::new(unsafe { mem::transmute::<&mut _, &mut T>(rw.as_mut()) }), - writer: rw, - close, + writer: unsafe { + (rw.as_mut() as *mut (dyn Write + Send + Sync + 'static)) + .as_mut() + .unwrap() + }, + _writer_storage: None, + reader: rw, } } pub fn new_split( reader: impl Read + Send + Sync + 'static, writer: impl Write + Send + Sync + 'static, - close: fn(&mut Self), ) -> Self { + let mut bx = Box::new(writer); Self { reader: Box::new(reader), - writer: Box::new(writer), - close, + writer: unsafe { + (bx.as_mut() as *mut (dyn Write + Send + Sync + 'static)) + .as_mut() + .unwrap() + }, + _writer_storage: Some(bx), } } } @@ -124,14 +129,14 @@ where T: Fn(&mut Stack) -> Result + Sync + Send + 'static, { fn from(value: T) -> Self { - StreamType { + Self { func: Arc::new(Box::new(value)), } } } pub fn new_stream(stack: &mut Stack) -> OError { - require_on_stack!(s, Str, stack, "write-stream"); + require_on_stack!(s, Str, stack, "new-stream"); let stream = get_stream_type(s.clone()) .ok_or_else(|| stack.error(ErrorKind::VariableNotFound(format!("__stream-type-{s}"))))? .make_stream(stack)?; @@ -163,6 +168,7 @@ pub fn write_stream(stack: &mut Stack) -> OError { ) .spl(), ); + black_box(&stream.lock_ro()._writer_storage); Ok(()) } @@ -184,6 +190,7 @@ pub fn write_all_stream(stack: &mut Stack) -> OError { .lock() .write_all(&fixed[..]) .map_err(|x| stack.error(ErrorKind::IO(format!("{x:?}"))))?; + black_box(&stream.lock_ro()._writer_storage); Ok(()) } @@ -197,6 +204,7 @@ pub fn flush_stream(stack: &mut Stack) -> OError { .lock() .flush() .map_err(|x| stack.error(ErrorKind::IO(format!("{x:?}"))))?; + black_box(&stream.lock_ro()._writer_storage); Ok(()) } @@ -256,16 +264,11 @@ pub fn read_all_stream(stack: &mut Stack) -> OError { pub fn close_stream(stack: &mut Stack) -> OError { require_on_stack!(id, Mega, stack, "close-stream"); - if let Some(stream) = runtime(|rt| rt.get_stream(id as u128)) { - let mut stream = stream.lock(); - (stream.close)(&mut stream); - } + runtime_mut(|mut rt| rt.destroy_stream(id as u128)); Ok(()) } -fn nop(_stream: &mut Stream) {} - -fn stream_file(stack: &mut Stack) -> Result { +pub(super) fn stream_file(stack: &mut Stack) -> Result { let truncate = stack.pop().lock_ro().is_truthy(); require_on_stack!(path, Str, stack, "FILE new-stream"); Ok(Stream::new( @@ -276,34 +279,23 @@ fn stream_file(stack: &mut Stack) -> Result { .truncate(truncate) .open(path) .map_err(|x| stack.error(ErrorKind::IO(x.to_string())))?, - nop, )) } -fn stream_tcp(stack: &mut Stack) -> Result { +pub(super) fn stream_tcp(stack: &mut Stack) -> Result { require_int_on_stack!(port, stack, "TCP new-stream"); require_on_stack!(ip, Str, stack, "TCP new-stream"); - fn close_tcp(stream: &mut Stream) { - unsafe { - let f = ((stream.reader.as_mut() as *mut dyn Read).cast() as *mut TcpStream) - .as_mut() - .unwrap(); - let _ = f.shutdown(Shutdown::Both); - } - } Ok(Stream::new( TcpStream::connect((ip, port as u16)) .map_err(|x| stack.error(ErrorKind::IO(x.to_string())))?, - close_tcp, )) } -fn stream_udp(stack: &mut Stack) -> Result { +pub(super) fn stream_udp(stack: &mut Stack) -> Result { require_int_on_stack!(port, stack, "UDP new-stream"); require_on_stack!(ip, Str, stack, "UDP new-stream"); require_int_on_stack!(self_port, stack, "UDP new-stream"); require_on_stack!(self_ip, Str, stack, "UDP new-stream"); - fn close_udp(_stream: &mut Stream) {} let sock = UdpSocket::bind((self_ip, self_port as u16)) .map_err(|x| stack.error(ErrorKind::IO(x.to_string())))?; sock.connect((ip, port as u16)) @@ -323,12 +315,11 @@ fn stream_udp(stack: &mut Stack) -> Result { self.0.recv(buf) } } - Ok(Stream::new(UdpRW(sock), close_udp)) + Ok(Stream::new(UdpRW(sock))) } -fn stream_cmd(stack: &mut Stack) -> Result { +pub(super) fn stream_cmd(stack: &mut Stack) -> Result { require_on_stack!(a, Array, stack, "CMD new-stream"); - fn close_cmd(_stream: &mut Stream) {} let mut args = Vec::new(); for item in a.iter() { if let Value::Str(ref s) = item.lock_ro().native { @@ -348,40 +339,5 @@ fn stream_cmd(stack: &mut Stack) -> Result { Ok(Stream::new_split( command.stdout.take().unwrap(), command.stdin.take().unwrap(), - close_cmd, )) } - -pub fn register(r: &mut Stack, o: Arc) { - if !*IS_INITIALIZED.lock_ro() { - register_stream_type("file", stream_file); - register_stream_type("tcp", stream_tcp); - register_stream_type("udp", stream_udp); - register_stream_type("cmd", stream_cmd); - *IS_INITIALIZED.lock() = true; - } - - type Fn = fn(&mut Stack) -> OError; - let fns: [(&str, Fn, u32); 7] = [ - ("new-stream", new_stream, 1), - ("write-stream", write_stream, 1), - ("write-all-stream", write_all_stream, 0), - ("flush-stream", flush_stream, 0), - ("read-stream", read_stream, 1), - ("read-all-stream", read_all_stream, 0), - ("close-stream", close_stream, 0), - ]; - for f in fns { - r.define_func( - f.0.to_owned(), - AFunc::new(Func { - ret_count: f.2, - to_call: FuncImpl::Native(f.1), - run_as_base: false, - origin: o.clone(), - fname: None, - name: f.0.to_owned(), - }), - ); - } -} diff --git a/src/stream/server.rs b/src/stream/server.rs new file mode 100644 index 0000000..7dbeeef --- /dev/null +++ b/src/stream/server.rs @@ -0,0 +1,105 @@ +use std::{ + collections::HashMap, + net::TcpListener, + sync::{Arc, LazyLock}, +}; + +use crate::{mutex::Mut, *}; + +use super::Stream; + +static SERVER_STREAM_TYPES: LazyLock>>> = + LazyLock::new(|| Arc::new(Mut::new(HashMap::new()))); + +/// Registers a custom stream type. +pub fn register_server_stream_type( + name: &str, + supplier: impl Fn(&mut Stack) -> Result + Sync + Send + 'static, +) { + SERVER_STREAM_TYPES + .lock() + .insert(name.to_owned(), ServerStreamType::from(supplier)); +} + +/// Gets a stream type by name. +pub fn get_server_stream_type(name: String) -> Option { + SERVER_STREAM_TYPES.lock_ro().get(&name).cloned() +} + +/// An SPL stream type. +#[derive(Clone)] +pub struct ServerStreamType { + func: Arc Result + Sync + Send + 'static>>, +} + +impl ServerStreamType { + pub fn make_stream(&self, stack: &mut Stack) -> Result { + (self.func)(stack) + } +} + +impl From for ServerStreamType +where + T: Fn(&mut Stack) -> Result + Sync + Send + 'static, +{ + fn from(value: T) -> Self { + Self { + func: Arc::new(Box::new(value)), + } + } +} + +/// An SPL server stream, holding an acceptor and a function to close it. +pub struct ServerStream { + pub(super) acceptor: Box Result + Sync + Send + 'static>, +} +impl ServerStream { + pub fn new( + acceptor: impl Fn(&mut Stack) -> Result + Sync + Send + 'static, + ) -> Self { + Self { + acceptor: Box::new(acceptor), + } + } +} + +pub fn new_server_stream(stack: &mut Stack) -> OError { + require_on_stack!(s, Str, stack, "new-stream"); + let stream = get_server_stream_type(s.clone()) + .ok_or_else(|| stack.error(ErrorKind::VariableNotFound(format!("__stream-type-{s}"))))? + .make_stream(stack)?; + let stream = runtime_mut(move |mut rt| Ok(rt.register_server_stream(stream)))?; + stack.push(Value::Mega(stream.0 as i128).spl()); + Ok(()) +} + +pub fn accept_server_stream(stack: &mut Stack) -> OError { + require_on_stack!(id, Mega, stack, "accept-server-stream"); + let stream = runtime(|rt| { + rt.get_server_stream(id as u128) + .ok_or_else(|| stack.error(ErrorKind::VariableNotFound(format!("__stream-{id}")))) + })?; + let stream = (stream.lock_ro().acceptor)(stack)?; + stack.push((runtime_mut(move |mut rt| rt.register_stream(stream)).0 as i128).spl()); + Ok(()) +} + +pub fn close_server_stream(stack: &mut Stack) -> OError { + require_on_stack!(id, Mega, stack, "close-server-stream"); + runtime_mut(|mut rt| rt.destroy_server_stream(id as u128)); + Ok(()) +} + +pub(crate) fn server_stream_tcp(stack: &mut Stack) -> Result { + require_int_on_stack!(port, stack, "TCP server-stream"); + require_on_stack!(addr, Str, stack, "TCP server-stream"); + let tcp = TcpListener::bind((addr, port as u16)) + .map_err(|e| stack.error(ErrorKind::IO(format!("{e:?}"))))?; + let stream = ServerStream::new(move |stack| { + let socket = tcp + .accept() + .map_err(|e| stack.error(ErrorKind::IO(format!("{e:?}"))))?; + Ok(Stream::new(socket.0)) + }); + Ok(stream) +} diff --git a/stream.spl b/stream.spl index b948641..05bb021 100644 --- a/stream.spl +++ b/stream.spl @@ -66,7 +66,7 @@ construct StreamType { def stream-types 0 anew =stream-types -construct _StreamType { +construct _StreamTypes { ; construct { this | with this ; { | with type ; @@ -79,7 +79,7 @@ construct _StreamType { func register-stream-type { | with id ; stream-types [ id ] aadd =stream-types - id _StreamType dyn-def-field + id _StreamTypes dyn-def-field } "tcp" register-stream-type @@ -87,6 +87,6 @@ func register-stream-type { | with id ; "file" register-stream-type "cmd" register-stream-type -func StreamTypes { _StreamType | - _StreamType:new +func StreamTypes { _StreamTypes | + _StreamTypes:new } diff --git a/test.spl b/test.spl index 3ae0d51..cdea71c 100644 --- a/test.spl +++ b/test.spl @@ -2,6 +2,8 @@ "#stream.spl" import "#http.spl" import "#messaging.spl" import +"#server.spl" import +"#time.spl" import "SPL tester" =program-name @@ -160,6 +162,24 @@ func main { int | with args ; 1 =other-thread-done } fork while { other-thread-done not } { "waiting for the other thread..." println } + + "" println + "testing tcp server" println + + " starting server thread" println + { | + def server "0.0.0.0" 4075 net:server:Types:tcp:create =server + while { 1 } { + def stream server:accept =stream + "Hello!" :to-bytes stream:write-exact; + stream:close; + } + } fork; + 50 time:sleep; + " starting client" println; + def client "localhost" 4075 StreamTypes:tcp:create =client + 1024 client:read-to-end:to-str println; + " ^ this should say 'Hello!'" println; 100 } diff --git a/time.spl b/time.spl index d444fd0..8f65a40 100644 --- a/time.spl +++ b/time.spl @@ -1,6 +1,9 @@ -func unixms { mega | - 0 time +construct time namespace { + ; + unixms { mega | with this ; + 0 sleeptime + } + sleep { mega | with this ; _mega sleeptime } } -func sleep { mega | time }