From 2b3405af8bf80b6f48bf2cf7b8f56232bf828838 Mon Sep 17 00:00:00 2001 From: TudbuT Date: Wed, 1 Feb 2023 20:01:59 +0100 Subject: [PATCH] better bandwidth management --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/client.rs | 23 +++++++++++++++++--- src/packet.rs | 1 + src/server.rs | 22 +++++++++++++++++-- src/socket_adapter.rs | 50 +++++++++++++++++++++++++++++-------------- 6 files changed, 77 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7f8a7d8..6bebb6f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -72,7 +72,7 @@ dependencies = [ [[package]] name = "revpfw3" -version = "0.1.2" +version = "0.2.0" dependencies = [ "enum-ordinalize", ] diff --git a/Cargo.toml b/Cargo.toml index 91f21db..fe8c2b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ name = "revpfw3" repository = "https://github.com/tudbut/revpfw3" description = "A tool to bypass portforwarding restrictions using some cheap VServer" license = "MIT" -version = "0.1.2" +version = "0.2.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/src/client.rs b/src/client.rs index 2f32346..c75e9be 100644 --- a/src/client.rs +++ b/src/client.rs @@ -12,6 +12,7 @@ use crate::{io_sync, PacketType, SocketAdapter}; pub fn client(ip: &str, port: u16, dest_ip: &str, dest_port: u16, key: &str, sleep_delay_ms: u64) { let mut buf1 = [0u8; 1]; let mut buf4 = [0u8; 4]; + let mut buf16 = [0u8; 16]; let mut buf = [0; 1024]; let mut tcp = TcpStream::connect((ip, port)).unwrap(); println!("Syncing..."); @@ -31,7 +32,7 @@ pub fn client(ip: &str, port: u16, dest_ip: &str, dest_port: u16, key: &str, sle println!("READY!"); - let mut tcp = SocketAdapter::new(tcp, true); + let mut tcp = SocketAdapter::new(tcp); tcp.set_nonblocking(true); let mut sockets: Vec = Vec::new(); let mut last_keep_alive = SystemTime::now(); @@ -61,6 +62,12 @@ pub fn client(ip: &str, port: u16, dest_ip: &str, dest_port: u16, key: &str, sle to_remove.push(i); did_anything = true; } + if let x @ 1.. = socket.clear_delay() { + tcp.write(&[PacketType::ClientExceededBuffer.ordinal() as u8]) + .unwrap(); + tcp.write(&(i as u32).to_be_bytes()).unwrap(); + tcp.write(&x.to_be_bytes()).unwrap(); + } } for i in to_remove.into_iter().rev() { tcp.write(&[PacketType::CloseClient.ordinal() as u8]) @@ -85,8 +92,7 @@ pub fn client(ip: &str, port: u16, dest_ip: &str, dest_port: u16, key: &str, sle tcp.set_nonblocking(false); match pt { PacketType::NewClient => { - let mut tcp = - SocketAdapter::new(TcpStream::connect((dest_ip, dest_port)).unwrap(), false); + let mut tcp = SocketAdapter::new(TcpStream::connect((dest_ip, dest_port)).unwrap()); tcp.set_nonblocking(true); sockets.push(tcp); } @@ -115,6 +121,17 @@ pub fn client(ip: &str, port: u16, dest_ip: &str, dest_port: u16, key: &str, sle } PacketType::ServerData => unreachable!(), + + PacketType::ClientExceededBuffer => { + tcp.internal.read_exact(&mut buf4).unwrap(); + let idx = u32::from_be_bytes(buf4) as usize; + tcp.internal.read_exact(&mut buf16).unwrap(); + let amount = u128::from_be_bytes(buf16); + + if sockets.len() != 1 { // a single connection doesn't need overuse-penalties + sockets[idx].punish(amount); + } + } } tcp.set_nonblocking(true); } diff --git a/src/packet.rs b/src/packet.rs index 23e234f..921e688 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -7,4 +7,5 @@ pub(crate) enum PacketType { KeepAlive, ClientData, ServerData, + ClientExceededBuffer, } diff --git a/src/server.rs b/src/server.rs index 61b9529..273338a 100644 --- a/src/server.rs +++ b/src/server.rs @@ -12,6 +12,7 @@ use crate::{io_sync, PacketType, SocketAdapter}; pub fn server(port: u16, key: &str, sleep_delay_ms: u64) { let mut buf1 = [0u8; 1]; let mut buf4 = [0u8; 4]; + let mut buf16 = [0u8; 16]; let mut buf = [0; 1024]; let tcpl = TcpListener::bind(("0.0.0.0", port)).unwrap(); let mut tcp = loop { @@ -40,7 +41,7 @@ pub fn server(port: u16, key: &str, sleep_delay_ms: u64) { tcpl.set_nonblocking(true).unwrap(); - let mut tcp = SocketAdapter::new(tcp, true); + let mut tcp = SocketAdapter::new(tcp); tcp.set_nonblocking(true); let mut sockets: Vec = Vec::new(); let mut last_keep_alive_sent = SystemTime::now(); @@ -57,7 +58,7 @@ pub fn server(port: u16, key: &str, sleep_delay_ms: u64) { } if let Ok(new) = tcpl.accept() { - let mut new = SocketAdapter::new(new.0, false); + let mut new = SocketAdapter::new(new.0); new.set_nonblocking(true); sockets.push(new); tcp.write(&[PacketType::NewClient.ordinal() as u8]).unwrap(); @@ -83,6 +84,12 @@ pub fn server(port: u16, key: &str, sleep_delay_ms: u64) { to_remove.push(i); did_anything = true; } + if let x @ 1.. = socket.clear_delay() { + tcp.write(&[PacketType::ClientExceededBuffer.ordinal() as u8]) + .unwrap(); + tcp.write(&(i as u32).to_be_bytes()).unwrap(); + tcp.write(&x.to_be_bytes()).unwrap(); + } } for i in to_remove.into_iter().rev() { tcp.write(&[PacketType::CloseClient.ordinal() as u8]) @@ -131,6 +138,17 @@ pub fn server(port: u16, key: &str, sleep_delay_ms: u64) { let _ = sockets[idx].write_later(&buf[..len]); } + + PacketType::ClientExceededBuffer => { + tcp.internal.read_exact(&mut buf4).unwrap(); + let idx = u32::from_be_bytes(buf4) as usize; + tcp.internal.read_exact(&mut buf16).unwrap(); + let amount = u128::from_be_bytes(buf16); + + if sockets.len() != 1 { // a single connection doesn't need overuse-penalties + sockets[idx].punish(amount); + } + } } tcp.set_nonblocking(true); } diff --git a/src/socket_adapter.rs b/src/socket_adapter.rs index 2335e38..728cbf7 100644 --- a/src/socket_adapter.rs +++ b/src/socket_adapter.rs @@ -2,6 +2,7 @@ use std::{ io::{Error, Read}, io::{ErrorKind, Write}, net::TcpStream, + time::SystemTime, }; use crate::io_sync; @@ -9,14 +10,14 @@ use crate::io_sync; #[derive(Clone, Copy)] enum Broken { OsErr(i32), - DirectErr(ErrorKind), + //DirectErr(ErrorKind), } impl From for Error { fn from(value: Broken) -> Self { match value { Broken::OsErr(x) => Error::from_raw_os_error(x), - Broken::DirectErr(x) => Error::from(x), + //Broken::DirectErr(x) => Error::from(x), } } } @@ -27,20 +28,22 @@ pub(crate) struct SocketAdapter { to_write: usize, write: [u8; 65536], broken: Option, - wait_if_full: bool, + accumulated_delay: u128, is_nonblocking: bool, + ignore_until: Option, } impl SocketAdapter { - pub fn new(tcp: TcpStream, wait_if_full: bool) -> SocketAdapter { + pub fn new(tcp: TcpStream) -> SocketAdapter { Self { internal: tcp, written: 0, to_write: 0, write: [0u8; 65536], broken: None, - wait_if_full, + accumulated_delay: 0, is_nonblocking: false, + ignore_until: None, } } @@ -63,17 +66,15 @@ impl SocketAdapter { self.written = 0; } let Some(x) = self.write.get_mut(self.to_write + self.written..self.to_write + self.written + buf.len()) else { - if self.wait_if_full { - self.internal.set_nonblocking(false)?; - self.internal.write_all(&self.write[self.written..self.written + self.to_write])?; - self.internal.set_nonblocking(self.is_nonblocking)?; - self.written = 0; - self.to_write = buf.len(); - self.write[..buf.len()].copy_from_slice(buf); - return Ok(()); - } - self.broken = Some(Broken::DirectErr(ErrorKind::TimedOut)); - return Err(ErrorKind::TimedOut.into()); + let sa = SystemTime::now(); + self.internal.set_nonblocking(false)?; + self.internal.write_all(&self.write[self.written..self.written + self.to_write])?; + self.internal.set_nonblocking(self.is_nonblocking)?; + self.written = 0; + self.to_write = buf.len(); + self.write[..buf.len()].copy_from_slice(buf); + self.accumulated_delay += sa.elapsed().unwrap().as_millis(); + return Ok(()); }; x.copy_from_slice(buf); self.to_write += buf.len(); @@ -86,6 +87,9 @@ impl SocketAdapter { } pub fn update(&mut self) -> Result<(), Error> { + if Some(SystemTime::UNIX_EPOCH.elapsed().unwrap().as_millis()) < self.ignore_until { + return Ok(()); + } if let Some(ref x) = self.broken { return Err(Error::from(*x)); } @@ -117,7 +121,21 @@ impl SocketAdapter { } pub fn poll(&mut self, buf: &mut [u8]) -> Result, Error> { + if Some(SystemTime::UNIX_EPOCH.elapsed().unwrap().as_millis()) < self.ignore_until { + return Ok(None); + } self.update()?; io_sync(self.internal.read(buf)) } + + pub fn clear_delay(&mut self) -> u128 { + (self.accumulated_delay, self.accumulated_delay = 0).0 + } + + pub fn punish(&mut self, time: u128) { + if self.ignore_until == None { + self.ignore_until = Some(SystemTime::UNIX_EPOCH.elapsed().unwrap().as_millis()); + } + self.ignore_until = self.ignore_until.map(|x| x + time); + } }