add tcp server

This commit is contained in:
Daniella / Tove 2024-09-09 16:31:35 +02:00
parent a1f5941c1a
commit f86c55dd83
12 changed files with 308 additions and 99 deletions

7
Cargo.lock generated
View file

@ -8,12 +8,6 @@ version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "428f6ba17d0c927e57c15a86cf5d7d07a2f35b3fbf15b1eb36b7075459e150a3"
[[package]]
name = "once_cell"
version = "1.17.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3"
[[package]]
name = "readformat"
version = "0.1.2"
@ -25,6 +19,5 @@ name = "spl"
version = "0.3.2"
dependencies = [
"multicall",
"once_cell",
"readformat",
]

View file

@ -9,5 +9,4 @@ authors = ["TudbuT"]
[dependencies]
readformat = "0.1"
once_cell = "1.17"
multicall = "0.1"

View file

@ -36,7 +36,10 @@ func dec {
pop
}
func getms {
0 time
0 time:unixms
}
func sleep {
time:sleep
}
func main { }

63
server.spl Normal file
View file

@ -0,0 +1,63 @@
"#net.spl" import
"#stream.spl" import
"server" net:register
construct net:server namespace {
ServerStream
Type
_Types
types
;
Types { _Types | with this ;
this:_Types:new
}
register-type { | with id this ;
this:types id awrap aadd this:=types
id this:_Types dyn-def-field;
}
}
0 anew net:server:=types
construct net:server:Type {
id
;
construct { this | with id this ;
id this:=id
this
}
create { ServerStream | with this ;
this:id net:server:ServerStream:new
}
}
construct net:server:_Types {
;
construct { this | with this ;
{ | with type ;
"type net:server:Type:new this:=<type>";
(type net:server:Type:new) (this ("=" type concat)) dyn-objcall
} net:server:types:foreach;
this
}
}
"tcp" net:server:register-type
construct net:server:ServerStream {
id
;
construct { this | with type this ;
type new-server-stream this:=id
this
}
accept { Stream | with this ;
def stream null Stream settype =stream
this:id accept-server-stream stream:=id
stream
}
close { | with this ;
this:id close-server-stream;
}
}

View file

@ -72,6 +72,7 @@ pub struct Runtime {
types_by_id: HashMap<u32, AMType>,
next_stream_id: u128,
streams: HashMap<u128, Arc<Mut<Stream>>>,
server_streams: HashMap<u128, Arc<Mut<ServerStream>>>,
pub embedded_files: HashMap<&'static str, &'static str>,
pub native_functions: HashMap<&'static str, (u32, FuncImpl)>,
}
@ -101,6 +102,7 @@ impl Runtime {
types_by_id: HashMap::new(),
next_stream_id: 0,
streams: HashMap::new(),
server_streams: HashMap::new(),
embedded_files: HashMap::new(),
native_functions: HashMap::new(),
};
@ -160,6 +162,23 @@ impl Runtime {
self.streams.remove(&id);
}
pub fn register_server_stream(
&mut self,
stream: ServerStream,
) -> (u128, Arc<Mut<ServerStream>>) {
let id = (self.next_stream_id, self.next_stream_id += 1).0;
self.server_streams.insert(id, Arc::new(Mut::new(stream)));
(id, self.server_streams.get(&id).unwrap().clone())
}
pub fn get_server_stream(&self, id: u128) -> Option<Arc<Mut<ServerStream>>> {
self.server_streams.get(&id).cloned()
}
pub fn destroy_server_stream(&mut self, id: u128) {
self.server_streams.remove(&id);
}
pub fn load_native_function(&self, name: &str) -> &(u32, FuncImpl) {
self.native_functions.get(name).unwrap_or_else(|| {
panic!(

View file

@ -935,7 +935,7 @@ pub fn register(r: &mut Stack, o: Arc<Frame>) {
("write-sasm", write_sasm, 1),
("write-file-sasm", write_file_sasm, 1),
("fork", fork, 0),
("time", time, 0),
("sleeptime", time, 0),
];
for f in fns {
r.define_func(

48
src/stream/mod.rs Normal file
View file

@ -0,0 +1,48 @@
mod rw;
mod server;
pub use rw::*;
pub use server::*;
use std::sync::{Arc, LazyLock};
use crate::{mutex::Mut, runtime::*};
static IS_INITIALIZED: LazyLock<Arc<Mut<bool>>> = LazyLock::new(|| Arc::new(Mut::new(false)));
pub fn register(r: &mut Stack, o: Arc<Frame>) {
if !*IS_INITIALIZED.lock_ro() {
register_stream_type("file", stream_file);
register_stream_type("tcp", stream_tcp);
register_stream_type("udp", stream_udp);
register_stream_type("cmd", stream_cmd);
register_server_stream_type("tcp", server_stream_tcp);
*IS_INITIALIZED.lock() = true;
}
type Fn = fn(&mut Stack) -> OError;
let fns: [(&str, Fn, u32); 10] = [
("new-stream", new_stream, 1),
("write-stream", write_stream, 1),
("write-all-stream", write_all_stream, 0),
("flush-stream", flush_stream, 0),
("read-stream", read_stream, 1),
("read-all-stream", read_all_stream, 0),
("close-stream", close_stream, 0),
("new-server-stream", new_server_stream, 1),
("accept-server-stream", accept_server_stream, 1),
("close-server-stream", close_server_stream, 0),
];
for f in fns {
r.define_func(
f.0.to_owned(),
AFunc::new(Func {
ret_count: f.2,
to_call: FuncImpl::Native(f.1),
run_as_base: false,
origin: o.clone(),
fname: None,
name: f.0.to_owned(),
}),
);
}
}

View file

@ -1,20 +1,20 @@
use std::{
borrow::BorrowMut,
collections::HashMap,
fs::OpenOptions,
hint::black_box,
io::{Read, Write},
mem,
net::{Shutdown, TcpStream, UdpSocket},
net::{TcpStream, UdpSocket},
process::{self, Stdio},
sync::Arc,
sync::{Arc, LazyLock},
};
use once_cell::sync::Lazy;
use crate::*;
use crate::{mutex::Mut, runtime::*, *};
use fs::OpenOptions;
use mutex::Mut;
static STREAM_TYPES: Lazy<Arc<Mut<HashMap<String, StreamType>>>> =
Lazy::new(|| Arc::new(Mut::new(HashMap::new())));
static IS_INITIALIZED: Lazy<Arc<Mut<bool>>> = Lazy::new(|| Arc::new(Mut::new(false)));
static STREAM_TYPES: LazyLock<Arc<Mut<HashMap<String, StreamType>>>> =
LazyLock::new(|| Arc::new(Mut::new(HashMap::new())));
/// Registers a custom stream type.
pub fn register_stream_type(
@ -45,32 +45,37 @@ impl StreamType {
/// An SPL stream, holding a reader and a writer, and a function to close it.
pub struct Stream {
reader: Box<dyn Read + Send + Sync + 'static>,
writer: Box<dyn Write + Send + Sync + 'static>,
close: fn(&mut Self),
pub(super) reader: Box<dyn Read + Send + Sync + 'static>,
pub(super) _writer_storage: Option<Box<dyn Write + Send + Sync + 'static>>,
pub(super) writer: &'static mut (dyn Write + Send + Sync + 'static),
}
impl Stream {
pub fn new<T: Read + Write + Send + Sync + 'static>(main: T, close: fn(&mut Self)) -> Self {
pub fn new<T: Read + Write + Send + Sync + 'static>(main: T) -> Self {
let mut rw = Box::new(main);
Self {
// SAFETY: Because these are both in private fields on one object, they can not be
// written to simultaneously or read from while writing due to the guards put in place
// by the borrow checker on the Stream.
reader: Box::new(unsafe { mem::transmute::<&mut _, &mut T>(rw.as_mut()) }),
writer: rw,
close,
writer: unsafe {
(rw.as_mut() as *mut (dyn Write + Send + Sync + 'static))
.as_mut()
.unwrap()
},
_writer_storage: None,
reader: rw,
}
}
pub fn new_split(
reader: impl Read + Send + Sync + 'static,
writer: impl Write + Send + Sync + 'static,
close: fn(&mut Self),
) -> Self {
let mut bx = Box::new(writer);
Self {
reader: Box::new(reader),
writer: Box::new(writer),
close,
writer: unsafe {
(bx.as_mut() as *mut (dyn Write + Send + Sync + 'static))
.as_mut()
.unwrap()
},
_writer_storage: Some(bx),
}
}
}
@ -124,14 +129,14 @@ where
T: Fn(&mut Stack) -> Result<Stream, Error> + Sync + Send + 'static,
{
fn from(value: T) -> Self {
StreamType {
Self {
func: Arc::new(Box::new(value)),
}
}
}
pub fn new_stream(stack: &mut Stack) -> OError {
require_on_stack!(s, Str, stack, "write-stream");
require_on_stack!(s, Str, stack, "new-stream");
let stream = get_stream_type(s.clone())
.ok_or_else(|| stack.error(ErrorKind::VariableNotFound(format!("__stream-type-{s}"))))?
.make_stream(stack)?;
@ -163,6 +168,7 @@ pub fn write_stream(stack: &mut Stack) -> OError {
)
.spl(),
);
black_box(&stream.lock_ro()._writer_storage);
Ok(())
}
@ -184,6 +190,7 @@ pub fn write_all_stream(stack: &mut Stack) -> OError {
.lock()
.write_all(&fixed[..])
.map_err(|x| stack.error(ErrorKind::IO(format!("{x:?}"))))?;
black_box(&stream.lock_ro()._writer_storage);
Ok(())
}
@ -197,6 +204,7 @@ pub fn flush_stream(stack: &mut Stack) -> OError {
.lock()
.flush()
.map_err(|x| stack.error(ErrorKind::IO(format!("{x:?}"))))?;
black_box(&stream.lock_ro()._writer_storage);
Ok(())
}
@ -256,16 +264,11 @@ pub fn read_all_stream(stack: &mut Stack) -> OError {
pub fn close_stream(stack: &mut Stack) -> OError {
require_on_stack!(id, Mega, stack, "close-stream");
if let Some(stream) = runtime(|rt| rt.get_stream(id as u128)) {
let mut stream = stream.lock();
(stream.close)(&mut stream);
}
runtime_mut(|mut rt| rt.destroy_stream(id as u128));
Ok(())
}
fn nop(_stream: &mut Stream) {}
fn stream_file(stack: &mut Stack) -> Result<Stream, Error> {
pub(super) fn stream_file(stack: &mut Stack) -> Result<Stream, Error> {
let truncate = stack.pop().lock_ro().is_truthy();
require_on_stack!(path, Str, stack, "FILE new-stream");
Ok(Stream::new(
@ -276,34 +279,23 @@ fn stream_file(stack: &mut Stack) -> Result<Stream, Error> {
.truncate(truncate)
.open(path)
.map_err(|x| stack.error(ErrorKind::IO(x.to_string())))?,
nop,
))
}
fn stream_tcp(stack: &mut Stack) -> Result<Stream, Error> {
pub(super) fn stream_tcp(stack: &mut Stack) -> Result<Stream, Error> {
require_int_on_stack!(port, stack, "TCP new-stream");
require_on_stack!(ip, Str, stack, "TCP new-stream");
fn close_tcp(stream: &mut Stream) {
unsafe {
let f = ((stream.reader.as_mut() as *mut dyn Read).cast() as *mut TcpStream)
.as_mut()
.unwrap();
let _ = f.shutdown(Shutdown::Both);
}
}
Ok(Stream::new(
TcpStream::connect((ip, port as u16))
.map_err(|x| stack.error(ErrorKind::IO(x.to_string())))?,
close_tcp,
))
}
fn stream_udp(stack: &mut Stack) -> Result<Stream, Error> {
pub(super) fn stream_udp(stack: &mut Stack) -> Result<Stream, Error> {
require_int_on_stack!(port, stack, "UDP new-stream");
require_on_stack!(ip, Str, stack, "UDP new-stream");
require_int_on_stack!(self_port, stack, "UDP new-stream");
require_on_stack!(self_ip, Str, stack, "UDP new-stream");
fn close_udp(_stream: &mut Stream) {}
let sock = UdpSocket::bind((self_ip, self_port as u16))
.map_err(|x| stack.error(ErrorKind::IO(x.to_string())))?;
sock.connect((ip, port as u16))
@ -323,12 +315,11 @@ fn stream_udp(stack: &mut Stack) -> Result<Stream, Error> {
self.0.recv(buf)
}
}
Ok(Stream::new(UdpRW(sock), close_udp))
Ok(Stream::new(UdpRW(sock)))
}
fn stream_cmd(stack: &mut Stack) -> Result<Stream, Error> {
pub(super) fn stream_cmd(stack: &mut Stack) -> Result<Stream, Error> {
require_on_stack!(a, Array, stack, "CMD new-stream");
fn close_cmd(_stream: &mut Stream) {}
let mut args = Vec::new();
for item in a.iter() {
if let Value::Str(ref s) = item.lock_ro().native {
@ -348,40 +339,5 @@ fn stream_cmd(stack: &mut Stack) -> Result<Stream, Error> {
Ok(Stream::new_split(
command.stdout.take().unwrap(),
command.stdin.take().unwrap(),
close_cmd,
))
}
pub fn register(r: &mut Stack, o: Arc<Frame>) {
if !*IS_INITIALIZED.lock_ro() {
register_stream_type("file", stream_file);
register_stream_type("tcp", stream_tcp);
register_stream_type("udp", stream_udp);
register_stream_type("cmd", stream_cmd);
*IS_INITIALIZED.lock() = true;
}
type Fn = fn(&mut Stack) -> OError;
let fns: [(&str, Fn, u32); 7] = [
("new-stream", new_stream, 1),
("write-stream", write_stream, 1),
("write-all-stream", write_all_stream, 0),
("flush-stream", flush_stream, 0),
("read-stream", read_stream, 1),
("read-all-stream", read_all_stream, 0),
("close-stream", close_stream, 0),
];
for f in fns {
r.define_func(
f.0.to_owned(),
AFunc::new(Func {
ret_count: f.2,
to_call: FuncImpl::Native(f.1),
run_as_base: false,
origin: o.clone(),
fname: None,
name: f.0.to_owned(),
}),
);
}
}

105
src/stream/server.rs Normal file
View file

@ -0,0 +1,105 @@
use std::{
collections::HashMap,
net::TcpListener,
sync::{Arc, LazyLock},
};
use crate::{mutex::Mut, *};
use super::Stream;
static SERVER_STREAM_TYPES: LazyLock<Arc<Mut<HashMap<String, ServerStreamType>>>> =
LazyLock::new(|| Arc::new(Mut::new(HashMap::new())));
/// Registers a custom stream type.
pub fn register_server_stream_type(
name: &str,
supplier: impl Fn(&mut Stack) -> Result<ServerStream, Error> + Sync + Send + 'static,
) {
SERVER_STREAM_TYPES
.lock()
.insert(name.to_owned(), ServerStreamType::from(supplier));
}
/// Gets a stream type by name.
pub fn get_server_stream_type(name: String) -> Option<ServerStreamType> {
SERVER_STREAM_TYPES.lock_ro().get(&name).cloned()
}
/// An SPL stream type.
#[derive(Clone)]
pub struct ServerStreamType {
func: Arc<Box<dyn Fn(&mut Stack) -> Result<ServerStream, Error> + Sync + Send + 'static>>,
}
impl ServerStreamType {
pub fn make_stream(&self, stack: &mut Stack) -> Result<ServerStream, Error> {
(self.func)(stack)
}
}
impl<T> From<T> for ServerStreamType
where
T: Fn(&mut Stack) -> Result<ServerStream, Error> + Sync + Send + 'static,
{
fn from(value: T) -> Self {
Self {
func: Arc::new(Box::new(value)),
}
}
}
/// An SPL server stream, holding an acceptor and a function to close it.
pub struct ServerStream {
pub(super) acceptor: Box<dyn Fn(&mut Stack) -> Result<Stream, Error> + Sync + Send + 'static>,
}
impl ServerStream {
pub fn new(
acceptor: impl Fn(&mut Stack) -> Result<Stream, Error> + Sync + Send + 'static,
) -> Self {
Self {
acceptor: Box::new(acceptor),
}
}
}
pub fn new_server_stream(stack: &mut Stack) -> OError {
require_on_stack!(s, Str, stack, "new-stream");
let stream = get_server_stream_type(s.clone())
.ok_or_else(|| stack.error(ErrorKind::VariableNotFound(format!("__stream-type-{s}"))))?
.make_stream(stack)?;
let stream = runtime_mut(move |mut rt| Ok(rt.register_server_stream(stream)))?;
stack.push(Value::Mega(stream.0 as i128).spl());
Ok(())
}
pub fn accept_server_stream(stack: &mut Stack) -> OError {
require_on_stack!(id, Mega, stack, "accept-server-stream");
let stream = runtime(|rt| {
rt.get_server_stream(id as u128)
.ok_or_else(|| stack.error(ErrorKind::VariableNotFound(format!("__stream-{id}"))))
})?;
let stream = (stream.lock_ro().acceptor)(stack)?;
stack.push((runtime_mut(move |mut rt| rt.register_stream(stream)).0 as i128).spl());
Ok(())
}
pub fn close_server_stream(stack: &mut Stack) -> OError {
require_on_stack!(id, Mega, stack, "close-server-stream");
runtime_mut(|mut rt| rt.destroy_server_stream(id as u128));
Ok(())
}
pub(crate) fn server_stream_tcp(stack: &mut Stack) -> Result<ServerStream, Error> {
require_int_on_stack!(port, stack, "TCP server-stream");
require_on_stack!(addr, Str, stack, "TCP server-stream");
let tcp = TcpListener::bind((addr, port as u16))
.map_err(|e| stack.error(ErrorKind::IO(format!("{e:?}"))))?;
let stream = ServerStream::new(move |stack| {
let socket = tcp
.accept()
.map_err(|e| stack.error(ErrorKind::IO(format!("{e:?}"))))?;
Ok(Stream::new(socket.0))
});
Ok(stream)
}

View file

@ -66,7 +66,7 @@ construct StreamType {
def stream-types 0 anew =stream-types
construct _StreamType {
construct _StreamTypes {
;
construct { this | with this ;
{ | with type ;
@ -79,7 +79,7 @@ construct _StreamType {
func register-stream-type { | with id ;
stream-types [ id ] aadd =stream-types
id _StreamType dyn-def-field
id _StreamTypes dyn-def-field
}
"tcp" register-stream-type
@ -87,6 +87,6 @@ func register-stream-type { | with id ;
"file" register-stream-type
"cmd" register-stream-type
func StreamTypes { _StreamType |
_StreamType:new
func StreamTypes { _StreamTypes |
_StreamTypes:new
}

View file

@ -2,6 +2,8 @@
"#stream.spl" import
"#http.spl" import
"#messaging.spl" import
"#server.spl" import
"#time.spl" import
"SPL tester" =program-name
@ -160,6 +162,24 @@ func main { int | with args ;
1 =other-thread-done
} fork
while { other-thread-done not } { "waiting for the other thread..." println }
"" println
"testing tcp server" println
" starting server thread" println
{ |
def server "0.0.0.0" 4075 net:server:Types:tcp:create =server
while { 1 } {
def stream server:accept =stream
"Hello!" :to-bytes stream:write-exact;
stream:close;
}
} fork;
50 time:sleep;
" starting client" println;
def client "localhost" 4075 StreamTypes:tcp:create =client
1024 client:read-to-end:to-str println;
" ^ this should say 'Hello!'" println;
100
}

View file

@ -1,6 +1,9 @@
func unixms { mega |
0 time
construct time namespace {
;
unixms { mega | with this ;
0 sleeptime
}
sleep { mega | with this ; _mega sleeptime }
}
func sleep { mega | time }