better bandwidth management

This commit is contained in:
Daniella / Tove 2023-02-01 20:01:59 +01:00
parent f6967c01a5
commit 2b3405af8b
6 changed files with 77 additions and 23 deletions

2
Cargo.lock generated
View file

@ -72,7 +72,7 @@ dependencies = [
[[package]] [[package]]
name = "revpfw3" name = "revpfw3"
version = "0.1.2" version = "0.2.0"
dependencies = [ dependencies = [
"enum-ordinalize", "enum-ordinalize",
] ]

View file

@ -3,7 +3,7 @@ name = "revpfw3"
repository = "https://github.com/tudbut/revpfw3" repository = "https://github.com/tudbut/revpfw3"
description = "A tool to bypass portforwarding restrictions using some cheap VServer" description = "A tool to bypass portforwarding restrictions using some cheap VServer"
license = "MIT" license = "MIT"
version = "0.1.2" version = "0.2.0"
edition = "2021" edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

View file

@ -12,6 +12,7 @@ use crate::{io_sync, PacketType, SocketAdapter};
pub fn client(ip: &str, port: u16, dest_ip: &str, dest_port: u16, key: &str, sleep_delay_ms: u64) { pub fn client(ip: &str, port: u16, dest_ip: &str, dest_port: u16, key: &str, sleep_delay_ms: u64) {
let mut buf1 = [0u8; 1]; let mut buf1 = [0u8; 1];
let mut buf4 = [0u8; 4]; let mut buf4 = [0u8; 4];
let mut buf16 = [0u8; 16];
let mut buf = [0; 1024]; let mut buf = [0; 1024];
let mut tcp = TcpStream::connect((ip, port)).unwrap(); let mut tcp = TcpStream::connect((ip, port)).unwrap();
println!("Syncing..."); println!("Syncing...");
@ -31,7 +32,7 @@ pub fn client(ip: &str, port: u16, dest_ip: &str, dest_port: u16, key: &str, sle
println!("READY!"); println!("READY!");
let mut tcp = SocketAdapter::new(tcp, true); let mut tcp = SocketAdapter::new(tcp);
tcp.set_nonblocking(true); tcp.set_nonblocking(true);
let mut sockets: Vec<SocketAdapter> = Vec::new(); let mut sockets: Vec<SocketAdapter> = Vec::new();
let mut last_keep_alive = SystemTime::now(); let mut last_keep_alive = SystemTime::now();
@ -61,6 +62,12 @@ pub fn client(ip: &str, port: u16, dest_ip: &str, dest_port: u16, key: &str, sle
to_remove.push(i); to_remove.push(i);
did_anything = true; did_anything = true;
} }
if let x @ 1.. = socket.clear_delay() {
tcp.write(&[PacketType::ClientExceededBuffer.ordinal() as u8])
.unwrap();
tcp.write(&(i as u32).to_be_bytes()).unwrap();
tcp.write(&x.to_be_bytes()).unwrap();
}
} }
for i in to_remove.into_iter().rev() { for i in to_remove.into_iter().rev() {
tcp.write(&[PacketType::CloseClient.ordinal() as u8]) tcp.write(&[PacketType::CloseClient.ordinal() as u8])
@ -85,8 +92,7 @@ pub fn client(ip: &str, port: u16, dest_ip: &str, dest_port: u16, key: &str, sle
tcp.set_nonblocking(false); tcp.set_nonblocking(false);
match pt { match pt {
PacketType::NewClient => { PacketType::NewClient => {
let mut tcp = let mut tcp = SocketAdapter::new(TcpStream::connect((dest_ip, dest_port)).unwrap());
SocketAdapter::new(TcpStream::connect((dest_ip, dest_port)).unwrap(), false);
tcp.set_nonblocking(true); tcp.set_nonblocking(true);
sockets.push(tcp); sockets.push(tcp);
} }
@ -115,6 +121,17 @@ pub fn client(ip: &str, port: u16, dest_ip: &str, dest_port: u16, key: &str, sle
} }
PacketType::ServerData => unreachable!(), PacketType::ServerData => unreachable!(),
PacketType::ClientExceededBuffer => {
tcp.internal.read_exact(&mut buf4).unwrap();
let idx = u32::from_be_bytes(buf4) as usize;
tcp.internal.read_exact(&mut buf16).unwrap();
let amount = u128::from_be_bytes(buf16);
if sockets.len() != 1 { // a single connection doesn't need overuse-penalties
sockets[idx].punish(amount);
}
}
} }
tcp.set_nonblocking(true); tcp.set_nonblocking(true);
} }

View file

@ -7,4 +7,5 @@ pub(crate) enum PacketType {
KeepAlive, KeepAlive,
ClientData, ClientData,
ServerData, ServerData,
ClientExceededBuffer,
} }

View file

@ -12,6 +12,7 @@ use crate::{io_sync, PacketType, SocketAdapter};
pub fn server(port: u16, key: &str, sleep_delay_ms: u64) { pub fn server(port: u16, key: &str, sleep_delay_ms: u64) {
let mut buf1 = [0u8; 1]; let mut buf1 = [0u8; 1];
let mut buf4 = [0u8; 4]; let mut buf4 = [0u8; 4];
let mut buf16 = [0u8; 16];
let mut buf = [0; 1024]; let mut buf = [0; 1024];
let tcpl = TcpListener::bind(("0.0.0.0", port)).unwrap(); let tcpl = TcpListener::bind(("0.0.0.0", port)).unwrap();
let mut tcp = loop { let mut tcp = loop {
@ -40,7 +41,7 @@ pub fn server(port: u16, key: &str, sleep_delay_ms: u64) {
tcpl.set_nonblocking(true).unwrap(); tcpl.set_nonblocking(true).unwrap();
let mut tcp = SocketAdapter::new(tcp, true); let mut tcp = SocketAdapter::new(tcp);
tcp.set_nonblocking(true); tcp.set_nonblocking(true);
let mut sockets: Vec<SocketAdapter> = Vec::new(); let mut sockets: Vec<SocketAdapter> = Vec::new();
let mut last_keep_alive_sent = SystemTime::now(); let mut last_keep_alive_sent = SystemTime::now();
@ -57,7 +58,7 @@ pub fn server(port: u16, key: &str, sleep_delay_ms: u64) {
} }
if let Ok(new) = tcpl.accept() { if let Ok(new) = tcpl.accept() {
let mut new = SocketAdapter::new(new.0, false); let mut new = SocketAdapter::new(new.0);
new.set_nonblocking(true); new.set_nonblocking(true);
sockets.push(new); sockets.push(new);
tcp.write(&[PacketType::NewClient.ordinal() as u8]).unwrap(); tcp.write(&[PacketType::NewClient.ordinal() as u8]).unwrap();
@ -83,6 +84,12 @@ pub fn server(port: u16, key: &str, sleep_delay_ms: u64) {
to_remove.push(i); to_remove.push(i);
did_anything = true; did_anything = true;
} }
if let x @ 1.. = socket.clear_delay() {
tcp.write(&[PacketType::ClientExceededBuffer.ordinal() as u8])
.unwrap();
tcp.write(&(i as u32).to_be_bytes()).unwrap();
tcp.write(&x.to_be_bytes()).unwrap();
}
} }
for i in to_remove.into_iter().rev() { for i in to_remove.into_iter().rev() {
tcp.write(&[PacketType::CloseClient.ordinal() as u8]) tcp.write(&[PacketType::CloseClient.ordinal() as u8])
@ -131,6 +138,17 @@ pub fn server(port: u16, key: &str, sleep_delay_ms: u64) {
let _ = sockets[idx].write_later(&buf[..len]); let _ = sockets[idx].write_later(&buf[..len]);
} }
PacketType::ClientExceededBuffer => {
tcp.internal.read_exact(&mut buf4).unwrap();
let idx = u32::from_be_bytes(buf4) as usize;
tcp.internal.read_exact(&mut buf16).unwrap();
let amount = u128::from_be_bytes(buf16);
if sockets.len() != 1 { // a single connection doesn't need overuse-penalties
sockets[idx].punish(amount);
}
}
} }
tcp.set_nonblocking(true); tcp.set_nonblocking(true);
} }

View file

@ -2,6 +2,7 @@ use std::{
io::{Error, Read}, io::{Error, Read},
io::{ErrorKind, Write}, io::{ErrorKind, Write},
net::TcpStream, net::TcpStream,
time::SystemTime,
}; };
use crate::io_sync; use crate::io_sync;
@ -9,14 +10,14 @@ use crate::io_sync;
#[derive(Clone, Copy)] #[derive(Clone, Copy)]
enum Broken { enum Broken {
OsErr(i32), OsErr(i32),
DirectErr(ErrorKind), //DirectErr(ErrorKind),
} }
impl From<Broken> for Error { impl From<Broken> for Error {
fn from(value: Broken) -> Self { fn from(value: Broken) -> Self {
match value { match value {
Broken::OsErr(x) => Error::from_raw_os_error(x), Broken::OsErr(x) => Error::from_raw_os_error(x),
Broken::DirectErr(x) => Error::from(x), //Broken::DirectErr(x) => Error::from(x),
} }
} }
} }
@ -27,20 +28,22 @@ pub(crate) struct SocketAdapter {
to_write: usize, to_write: usize,
write: [u8; 65536], write: [u8; 65536],
broken: Option<Broken>, broken: Option<Broken>,
wait_if_full: bool, accumulated_delay: u128,
is_nonblocking: bool, is_nonblocking: bool,
ignore_until: Option<u128>,
} }
impl SocketAdapter { impl SocketAdapter {
pub fn new(tcp: TcpStream, wait_if_full: bool) -> SocketAdapter { pub fn new(tcp: TcpStream) -> SocketAdapter {
Self { Self {
internal: tcp, internal: tcp,
written: 0, written: 0,
to_write: 0, to_write: 0,
write: [0u8; 65536], write: [0u8; 65536],
broken: None, broken: None,
wait_if_full, accumulated_delay: 0,
is_nonblocking: false, is_nonblocking: false,
ignore_until: None,
} }
} }
@ -63,17 +66,15 @@ impl SocketAdapter {
self.written = 0; self.written = 0;
} }
let Some(x) = self.write.get_mut(self.to_write + self.written..self.to_write + self.written + buf.len()) else { let Some(x) = self.write.get_mut(self.to_write + self.written..self.to_write + self.written + buf.len()) else {
if self.wait_if_full { let sa = SystemTime::now();
self.internal.set_nonblocking(false)?; self.internal.set_nonblocking(false)?;
self.internal.write_all(&self.write[self.written..self.written + self.to_write])?; self.internal.write_all(&self.write[self.written..self.written + self.to_write])?;
self.internal.set_nonblocking(self.is_nonblocking)?; self.internal.set_nonblocking(self.is_nonblocking)?;
self.written = 0; self.written = 0;
self.to_write = buf.len(); self.to_write = buf.len();
self.write[..buf.len()].copy_from_slice(buf); self.write[..buf.len()].copy_from_slice(buf);
self.accumulated_delay += sa.elapsed().unwrap().as_millis();
return Ok(()); return Ok(());
}
self.broken = Some(Broken::DirectErr(ErrorKind::TimedOut));
return Err(ErrorKind::TimedOut.into());
}; };
x.copy_from_slice(buf); x.copy_from_slice(buf);
self.to_write += buf.len(); self.to_write += buf.len();
@ -86,6 +87,9 @@ impl SocketAdapter {
} }
pub fn update(&mut self) -> Result<(), Error> { pub fn update(&mut self) -> Result<(), Error> {
if Some(SystemTime::UNIX_EPOCH.elapsed().unwrap().as_millis()) < self.ignore_until {
return Ok(());
}
if let Some(ref x) = self.broken { if let Some(ref x) = self.broken {
return Err(Error::from(*x)); return Err(Error::from(*x));
} }
@ -117,7 +121,21 @@ impl SocketAdapter {
} }
pub fn poll(&mut self, buf: &mut [u8]) -> Result<Option<usize>, Error> { pub fn poll(&mut self, buf: &mut [u8]) -> Result<Option<usize>, Error> {
if Some(SystemTime::UNIX_EPOCH.elapsed().unwrap().as_millis()) < self.ignore_until {
return Ok(None);
}
self.update()?; self.update()?;
io_sync(self.internal.read(buf)) io_sync(self.internal.read(buf))
} }
pub fn clear_delay(&mut self) -> u128 {
(self.accumulated_delay, self.accumulated_delay = 0).0
}
pub fn punish(&mut self, time: u128) {
if self.ignore_until == None {
self.ignore_until = Some(SystemTime::UNIX_EPOCH.elapsed().unwrap().as_millis());
}
self.ignore_until = self.ignore_until.map(|x| x + time);
}
} }