Compare commits

..

No commits in common. "main" and "v0.1.2" have entirely different histories.
main ... v0.1.2

15 changed files with 154 additions and 725 deletions

95
Cargo.lock generated
View file

@ -10,37 +10,23 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "enum-ordinalize"
version = "3.1.15"
version = "3.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bf1fa3f06bbff1ea5b1a9c7b14aa992a39657db60a2759457328d7e058f49ee"
checksum = "a62bb1df8b45ecb7ffa78dca1c17a438fb193eb083db0b1b494d2a61bcb5096a"
dependencies = [
"num-bigint",
"num-traits",
"proc-macro2",
"quote",
"rustc_version",
"syn",
]
[[package]]
name = "ioctl-rs"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7970510895cee30b3e9128319f2cefd4bde883a39f38baa279567ba3a7eb97d"
dependencies = [
"libc",
]
[[package]]
name = "libc"
version = "0.2.148"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b"
[[package]]
name = "num-bigint"
version = "0.4.4"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "608e7659b5c3d7cba262d894801b9ec9d00de989e8a82bd4bef91d08da45cdc0"
checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f"
dependencies = [
"autocfg",
"num-integer",
@ -59,103 +45,66 @@ dependencies = [
[[package]]
name = "num-traits"
version = "0.2.16"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f30b0abd723be7e2ffca1272140fac1a2f084c77ec3e123c192b66af1ee9e6c2"
checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd"
dependencies = [
"autocfg",
]
[[package]]
name = "proc-macro2"
version = "1.0.67"
version = "1.0.50"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d433d9f1a3e8c1263d9456598b16fec66f4acc9a74dacffd35c7bb09b3a1328"
checksum = "6ef7d57beacfaf2d8aee5937dab7b7f28de3cb8b1828479bb5de2a7106f2bae2"
dependencies = [
"unicode-ident",
]
[[package]]
name = "quote"
version = "1.0.33"
version = "1.0.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae"
checksum = "8856d8364d252a14d474036ea1358d63c9e6965c8e5c1885c18f73d70bff9c7b"
dependencies = [
"proc-macro2",
]
[[package]]
name = "revpfw3"
version = "0.4.0"
version = "0.1.2"
dependencies = [
"enum-ordinalize",
"serial",
]
[[package]]
name = "serial"
name = "rustc_version"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1237a96570fc377c13baa1b88c7589ab66edced652e43ffb17088f003db3e86"
checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
dependencies = [
"serial-core",
"serial-unix",
"serial-windows",
"semver",
]
[[package]]
name = "serial-core"
version = "0.4.0"
name = "semver"
version = "1.0.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f46209b345401737ae2125fe5b19a77acce90cd53e1658cda928e4fe9a64581"
dependencies = [
"libc",
]
[[package]]
name = "serial-unix"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f03fbca4c9d866e24a459cbca71283f545a37f8e3e002ad8c70593871453cab7"
dependencies = [
"ioctl-rs",
"libc",
"serial-core",
"termios",
]
[[package]]
name = "serial-windows"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15c6d3b776267a75d31bbdfd5d36c0ca051251caafc285827052bc53bcdc8162"
dependencies = [
"libc",
"serial-core",
]
checksum = "58bc9567378fc7690d6b2addae4e60ac2eeea07becb2c64b9f218b53865cba2a"
[[package]]
name = "syn"
version = "2.0.37"
version = "1.0.107"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7303ef2c05cd654186cb250d29049a24840ca25d2747c25c0381c8d9e2f582e8"
checksum = "1f4064b5b16e03ae50984a5a8ed5d4f8803e6bc1fd170a3cda91a1be4b18e3f5"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "termios"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5d9cf598a6d7ce700a4e6a9199da127e6819a61e64b68609683cc9a01b5683a"
dependencies = [
"libc",
]
[[package]]
name = "unicode-ident"
version = "1.0.12"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b"
checksum = "84a22b9f218b40614adcb3f4ff08b703773ad44fa9423e4e0d346d5db86e4ebc"

View file

@ -3,11 +3,10 @@ name = "revpfw3"
repository = "https://github.com/tudbut/revpfw3"
description = "A tool to bypass portforwarding restrictions using some cheap VServer"
license = "MIT"
version = "0.4.0"
version = "0.1.2"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
enum-ordinalize = "3.1"
serial = "0.4"

View file

@ -1,8 +0,0 @@
Copyright 2024 TudbuT <legal@mail.tudbut.de>
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View file

@ -4,9 +4,6 @@
This tool bypasses port restrictions of your router using some not-very-powerful
server (a cheap 1€ vserver will suffice.)
NEW: Modem support! RevPFW3 can now interact with modems using AT commands. A demo
is included for some simcom modems.
---
### How to download it
@ -28,12 +25,11 @@ https://github.com/tudbut/revpfw3`.
1. Buy some cheap server online, it will only need
1. Enough disk space to run a 5MB program (I recommend about .5GB free after
OS is installed)
2. 50MB of free RAM or more (if you expect to have many clients connecting,
use more RAM)
2. 500MB RAM or more
3. Flexible port settings
4. Not much CPU power, a single core definitely suffices.
4. Not much CPU power
2. Download revpfw3 to it
3. Run it like this: `revpfw3 server <port> <key>` (I recommend doing it in a
3. Run it like this: `revpfw3 server <port> <key>` (I reocmmend doing it in a
loop)
4. Download it to your destination as well (your PC, a raspi, etc)
5. Run it like this: `revpfw3 client <ip of your bridge server> <port> localhost

View file

@ -1,4 +0,0 @@
AT
AT+IPADDR
AT+CIPCLOSE=0
AT+CIPOPEN=0,"TCP","$IP",$PORT

View file

@ -1,14 +0,0 @@
AT
AT+CIPMODE=1
AT+CGDCONT=1,IP,"internet.telekom"
AT+NETOPEN
AT+IPADDR
AT+CIPCLOSE=0
AT+CIPOPEN=0,"TCP","$IP",$PORT

View file

@ -1,4 +0,0 @@
AT
AT+CIFSR
AT+CIPCLOSE=0
AT+CIPSTART=TCP,"$IP",$PORT

View file

@ -1,17 +0,0 @@
AT
AT+CFUN?
AT+CPIN?
AT+CIPMODE=1
AT+CSTT="internet.telekom","",""
AT+CIICR
AT+CIFSR
AT+CIPCLOSE=0
AT+CIPSTART=TCP,"$IP",$PORT

View file

@ -1,151 +1,41 @@
use core::panic;
use std::{
collections::HashMap,
fs,
io::{Read, Write},
net::TcpStream,
io::Read,
io::Write,
net::{Shutdown, TcpStream},
thread,
time::{Duration, SystemTime},
vec,
};
use serial::SerialPort;
use crate::{io_sync, PacketType, SocketAdapter};
use crate::{Connection, PacketType, SocketAdapter};
pub struct ClientParams<'a> {
pub server_ip: &'a str,
pub server_port: u16,
pub dest_ip: &'a str,
pub dest_port: u16,
pub key: &'a str,
pub sleep_delay_ms: u64,
pub modem_port: Option<&'a str>,
pub modem_baud: Option<u32>,
pub modem_init: Option<&'a str>,
pub rate_limit_sleep: u64,
}
fn connect(params: &ClientParams) -> Connection {
if let Some(modem_port) = params.modem_port {
let mut serial = serial::open(modem_port).unwrap();
serial
.configure(&serial::PortSettings {
baud_rate: serial::BaudRate::from_speed(
params.modem_baud.unwrap_or(115200) as usize
),
char_size: serial::CharSize::Bits8,
parity: serial::Parity::ParityNone,
stop_bits: serial::StopBits::Stop1,
flow_control: serial::FlowControl::FlowNone,
})
.unwrap();
if let Some(modem_init) = params.modem_init {
serial.set_timeout(Duration::from_millis(200)).unwrap();
for line in fs::read_to_string(modem_init)
.expect("invalid modem init file")
.lines()
{
let line = line
.replace("$IP", &params.server_ip.to_string())
.replace("$PORT", &params.server_port.to_string());
println!("> {line}");
serial.write_all((line + "\r\n").as_bytes()).unwrap();
let mut s = Vec::new();
let _ = serial.read_to_end(&mut s).is_ok();
if !s.is_empty() {
println!(
"< {}",
String::from_utf8(s).unwrap().replace("\n", "\n< ").trim()
);
}
thread::sleep(Duration::from_millis(300));
}
serial.set_timeout(Duration::from_millis(5000)).unwrap();
let mut s = Vec::new();
let _ = serial.read_to_end(&mut s).is_ok();
if !s.is_empty() {
println!(
"< {}",
String::from_utf8(s).unwrap().replace("\n", "\n< ").trim()
);
}
}
serial.set_timeout(Duration::from_millis(20000)).unwrap();
return Connection::new_serial(serial, true);
}
Connection::new_tcp(
TcpStream::connect((params.server_ip, params.server_port)).unwrap(),
true,
)
}
fn resync(tcp: &mut SocketAdapter, id: &mut u64) {
let mut buf8 = [0u8; 8];
println!();
eprintln!("Server version mismatch or broken connection. Re-syncing in case of the latter...");
tcp.internal.set_print(false);
tcp.write_now().unwrap();
tcp.write(&[PacketType::Resync.ordinal() as u8]).unwrap();
tcp.write(&id.to_be_bytes()).unwrap();
tcp.write_now().unwrap();
eprintln!(
"Sent resync packet. Server should now wait 8 seconds and then send a resync-echo packet."
);
let mut buf = [0; 4096];
// read all packets that are still pending.
while let Some(Some(_x @ 1..)) = tcp.poll(&mut buf).ok() {}
// wait 5 seconds
thread::sleep(Duration::from_secs(5));
// read all packets that are still pending.
while let Some(Some(_x @ 1..)) = tcp.poll(&mut buf).ok() {}
// server should now have stopped sending packets.
let mut buf = [0];
eprintln!("Trying to receive the resync echo...");
tcp.read_now(&mut buf).unwrap();
if buf[0] as i8 == PacketType::ResyncEcho.ordinal() {
tcp.read_now(&mut buf8).unwrap();
*id = u64::from_be_bytes(buf8);
eprintln!("Successfully resynced. RevPFW3 can continue.");
} else {
eprintln!("Resync was not successful. Stopping.");
panic!("broken connection or server version mismatch.");
}
tcp.internal.set_print(true);
}
pub fn client(params: ClientParams) {
pub fn client(ip: &str, port: u16, dest_ip: &str, dest_port: u16, key: &str, sleep_delay_ms: u64) {
let mut buf1 = [0u8; 1];
let mut buf4 = [0u8; 4];
let mut buf8 = [0u8; 8];
let mut buf16 = [0u8; 16];
let mut buf = [0; 1024];
let mut tcp = connect(&params);
tcp.set_print(false);
let mut tcp = TcpStream::connect((ip, port)).unwrap();
println!("Syncing...");
tcp.write_all(&[b'R', b'P', b'F', 30]).unwrap();
println!("Authenticating...");
tcp.write_all(&(params.key.len() as u32).to_be_bytes())
tcp.write_all(&['R' as u8, 'P' as u8, 'F' as u8, 30])
.unwrap();
tcp.write_all(params.key.as_bytes()).unwrap();
println!("Authenticating...");
tcp.write_all(&(key.len() as u32).to_be_bytes()).unwrap();
tcp.write_all(key.as_bytes()).unwrap();
println!("Syncing...");
tcp.read_exact(&mut buf4).unwrap();
if buf4 != [b'R', b'P', b'F', 30] {
if buf4 != ['R' as u8, 'P' as u8, 'F' as u8, 30] {
panic!("RPF30 header expected, but not found. Make sure the server is actually running revpfw3!");
}
tcp.write_all(&[PacketType::KeepAlive.ordinal() as u8])
.unwrap();
tcp.set_print(true);
println!("READY!");
let mut tcp = SocketAdapter::new(tcp);
let mut sockets: HashMap<u64, SocketAdapter> = HashMap::new();
let mut id = 0;
let mut tcp = SocketAdapter::new(tcp, true);
tcp.set_nonblocking(true);
let mut sockets: Vec<SocketAdapter> = Vec::new();
let mut last_keep_alive = SystemTime::now();
loop {
thread::sleep(Duration::from_millis(params.rate_limit_sleep));
let mut did_anything = false;
if last_keep_alive.elapsed().unwrap().as_secs() >= 60 {
@ -153,7 +43,7 @@ pub fn client(params: ClientParams) {
}
let mut to_remove = vec![];
for (&i, socket) in sockets.iter_mut() {
for (i, socket) in sockets.iter_mut().enumerate() {
if let Ok(x) = socket.poll(&mut buf) {
if let Some(len) = x {
if len == 0 {
@ -161,7 +51,7 @@ pub fn client(params: ClientParams) {
} else {
tcp.write(&[PacketType::ServerData.ordinal() as u8])
.unwrap();
tcp.write(&i.to_be_bytes()).unwrap();
tcp.write(&(i as u32).to_be_bytes()).unwrap();
tcp.write(&(len as u32).to_be_bytes()).unwrap();
tcp.write(&buf[..len]).unwrap();
}
@ -171,49 +61,42 @@ pub fn client(params: ClientParams) {
to_remove.push(i);
did_anything = true;
}
if let x @ 1.. = socket.clear_delay() {
tcp.write(&[PacketType::ClientExceededBuffer.ordinal() as u8])
.unwrap();
tcp.write(&i.to_be_bytes()).unwrap();
tcp.write(&x.to_be_bytes()).unwrap();
socket.punish(x);
}
}
for i in to_remove.into_iter().rev() {
tcp.write(&[PacketType::CloseClient.ordinal() as u8])
.unwrap();
tcp.write(&i.to_be_bytes()).unwrap();
if let Some(x) = sockets.remove(&i) {
let _ = x.internal.close();
}
tcp.write(&(i as u32).to_be_bytes()).unwrap();
let _ = sockets.remove(i).internal.shutdown(Shutdown::Both);
}
tcp.update().unwrap();
if tcp.poll_exact(&mut buf1).unwrap().is_none() {
if io_sync(tcp.internal.read_exact(&mut buf1))
.unwrap()
.is_none()
{
if !did_anything {
thread::sleep(Duration::from_millis(params.sleep_delay_ms));
thread::sleep(Duration::from_millis(sleep_delay_ms));
}
continue;
}
let Some(pt) = PacketType::from_ordinal(buf1[0] as i8) else {
resync(&mut tcp, &mut id);
continue;
};
let pt = PacketType::from_ordinal(buf1[0] as i8)
.expect("server/client version mismatch or broken TCP");
tcp.set_nonblocking(false);
match pt {
PacketType::NewClient => {
let tcp = SocketAdapter::new(Connection::new_tcp(
TcpStream::connect((params.dest_ip, params.dest_port)).unwrap(),
false,
));
sockets.insert((id, id += 1).0, tcp);
let mut tcp =
SocketAdapter::new(TcpStream::connect((dest_ip, dest_port)).unwrap(), false);
tcp.set_nonblocking(true);
sockets.push(tcp);
}
PacketType::CloseClient => {
tcp.read_now(&mut buf8).unwrap();
if let Some(x) = sockets.remove(&u64::from_be_bytes(buf8)) {
let _ = x.internal.close();
}
tcp.internal.read_exact(&mut buf4).unwrap();
let _ = sockets
.remove(u32::from_be_bytes(buf4) as usize)
.internal
.shutdown(Shutdown::Both);
}
PacketType::KeepAlive => {
@ -222,41 +105,17 @@ pub fn client(params: ClientParams) {
}
PacketType::ClientData => {
tcp.read_now(&mut buf8).unwrap();
let idx = u64::from_be_bytes(buf8);
tcp.read_now(&mut buf4).unwrap();
tcp.internal.read_exact(&mut buf4).unwrap();
let idx = u32::from_be_bytes(buf4) as usize;
tcp.internal.read_exact(&mut buf4).unwrap();
let len = u32::from_be_bytes(buf4) as usize;
tcp.read_now(&mut buf[..len]).unwrap();
tcp.internal.read_exact(&mut buf[..len]).unwrap();
if let Some(socket) = sockets.get_mut(&idx) {
let _ = socket.write_later(&buf[..len]);
}
let _ = sockets[idx].write_later(&buf[..len]);
}
PacketType::ServerData => resync(&mut tcp, &mut id),
PacketType::ClientExceededBuffer => {
tcp.read_now(&mut buf8).unwrap();
let idx = u64::from_be_bytes(buf8);
tcp.read_now(&mut buf16).unwrap();
let amount = u128::from_be_bytes(buf16);
// a single connection doesn't need overuse-penalties
if let (true, Some(socket)) = (sockets.len() > 1, sockets.get_mut(&idx)) {
socket.punish(amount);
}
}
PacketType::Resync => {
println!();
tcp.internal.set_print(false);
eprintln!("Server asked for re-sync. Waiting 8 seconds, then initiating resync.");
thread::sleep(Duration::from_secs(8));
resync(&mut tcp, &mut id);
}
// this one shouldnt happen.
PacketType::ResyncEcho => resync(&mut tcp, &mut id),
PacketType::ServerData => unreachable!(),
}
tcp.set_nonblocking(true);
}
}

View file

@ -1,208 +0,0 @@
use std::{
io::{self, stdout, ErrorKind, Read, Write},
net::{Shutdown, TcpStream},
ptr::NonNull,
time::{Duration, SystemTime},
};
use serial::SerialPort;
trait ReadWrite: Write + Read + 'static {}
impl<T> ReadWrite for T where T: Write + Read + 'static {}
enum PrintStatus {
No,
Yes {
last_print: SystemTime,
bytes: u128,
last_bytes: u128,
},
}
pub struct Connection {
readwrite: Box<dyn ReadWrite>,
data: NonNull<()>,
set_nonblocking_thunk: fn(NonNull<()>, bool) -> io::Result<()>,
close_thunk: fn(NonNull<()>) -> io::Result<()>,
is_nb: bool,
is_serial: bool,
print: bool,
print_status: PrintStatus,
}
impl Write for Connection {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let result = self.as_write().write(buf);
self.print_status_result(result)
}
fn flush(&mut self) -> io::Result<()> {
self.as_write().flush()
}
}
impl Read for Connection {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let result = self.as_read().read(buf);
self.print_status_result(result)
}
fn read_exact(&mut self, mut buf: &mut [u8]) -> io::Result<()> {
let len = buf.len();
while !buf.is_empty() {
match self.read(buf) {
Ok(0) if self.is_nb && buf.len() == len => {
return Err(io::Error::new(ErrorKind::WouldBlock, "would block"))
}
Ok(0) => break,
Ok(n) => {
let tmp = buf;
buf = &mut tmp[n..];
}
Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
}
if !buf.is_empty() {
Err(io::Error::new(
ErrorKind::UnexpectedEof,
"failed to fill whole buffer",
))
} else {
Ok(())
}
}
}
impl Connection {
pub fn new_tcp(stream: TcpStream, print: bool) -> Self {
stream
.set_read_timeout(Some(Duration::from_secs(20)))
.unwrap();
stream
.set_write_timeout(Some(Duration::from_secs(20)))
.unwrap();
let mut stream = Box::new(stream);
Connection {
data: NonNull::from(stream.as_mut()).cast(),
readwrite: stream,
set_nonblocking_thunk: |data, nb| unsafe {
data.cast::<TcpStream>().as_ref().set_nonblocking(nb)
},
close_thunk: |data| unsafe {
data.cast::<TcpStream>().as_ref().shutdown(Shutdown::Both)
},
is_nb: false,
is_serial: false,
print: true,
print_status: if print {
PrintStatus::Yes {
last_print: SystemTime::now(),
bytes: 0,
last_bytes: 0,
}
} else {
PrintStatus::No
},
}
}
pub fn new_serial<T: SerialPort + 'static>(mut serial: T, print: bool) -> Self {
serial.set_timeout(Duration::from_secs(20)).unwrap();
let mut serial = Box::new(serial);
Connection {
data: NonNull::from(serial.as_mut()).cast(),
readwrite: serial,
set_nonblocking_thunk: |data, nb| unsafe {
data.cast::<T>()
.as_mut()
.set_timeout(Duration::from_secs(if nb { 0 } else { 20 }))
.map_err(|_| {
io::Error::new(io::ErrorKind::ConnectionAborted, "serial port went down")
})
},
// no need to close this.
close_thunk: |_data| Ok(()),
is_nb: false,
is_serial: true,
print: true,
print_status: if print {
PrintStatus::Yes {
last_print: SystemTime::now(),
bytes: 0,
last_bytes: 0,
}
} else {
PrintStatus::No
},
}
}
fn as_read(&mut self) -> &mut (dyn Read) {
&mut self.readwrite
}
fn as_write(&mut self) -> &mut (dyn Write) {
&mut self.readwrite
}
#[allow(dead_code)]
pub fn is_nonblocking(&self) -> bool {
self.is_nb
}
pub fn set_nonblocking(&mut self, nonblocking: bool) -> io::Result<()> {
if self.is_nb == nonblocking {
return Ok(());
}
self.is_nb = nonblocking;
(self.set_nonblocking_thunk)(self.data, nonblocking)
}
pub fn close(&self) -> io::Result<()> {
(self.close_thunk)(self.data)
}
pub fn is_serial(&self) -> bool {
self.is_serial
}
pub fn set_print(&mut self, print: bool) {
self.print = print;
}
fn print_status(&mut self, add: usize) {
if let &mut PrintStatus::Yes {
ref mut last_print,
ref mut bytes,
ref mut last_bytes,
} = &mut self.print_status
{
*bytes += add as u128;
if last_print.elapsed().unwrap().as_secs() > 0 {
let diff = *bytes - *last_bytes;
let bps = to_units(diff);
let total = to_units(*bytes);
if self.print {
print!(
"\r\x1b[KCurrent transfer speed: {bps}B/s, transferred {total}B so far."
);
stdout().flush().unwrap();
}
*last_bytes = *bytes;
*last_print = SystemTime::now();
}
}
}
fn print_status_result(&mut self, result: io::Result<usize>) -> io::Result<usize> {
if let Ok(b) = result {
self.print_status(b)
}
result
}
}
fn to_units(diff: u128) -> String {
match diff {
x @ 1_000_000_000_000.. => ((x / 1_000_000_000) as f64 / 1000.0).to_string() + "G",
x @ 1_000_000_000.. => ((x / 1_000_000) as f64 / 1000.0).to_string() + "G",
x @ 1_000_000.. => ((x / 1_000) as f64 / 1000.0).to_string() + "M",
x @ 10_000.. => (x as f64 / 1000.0).to_string() + "K",
x => x.to_string(),
}
}

View file

@ -1,5 +1,4 @@
mod client;
mod connection;
mod packet;
mod server;
mod socket_adapter;
@ -7,7 +6,6 @@ mod socket_adapter;
use std::io::{Error, ErrorKind};
pub use client::*;
pub(crate) use connection::*;
pub(crate) use packet::*;
pub use server::*;
pub(crate) use socket_adapter::*;
@ -16,7 +14,6 @@ pub(crate) fn io_sync<T>(result: Result<T, Error>) -> Result<Option<T>, Error> {
match result {
Ok(x) => Ok(Some(x)),
Err(x) if x.kind() == ErrorKind::WouldBlock => Ok(None),
Err(x) if x.kind() == ErrorKind::TimedOut => Ok(None),
Err(x) => Err(x),
}
}

View file

@ -1,22 +1,22 @@
use std::env;
use revpfw3::{client, server, ClientParams};
use revpfw3::{client, server};
fn main() {
let args: Vec<_> = env::args().skip(1).collect();
if (6..=11).contains(&args.len()) && args[0] == "client" {
client(ClientParams {
server_ip: &args[1],
server_port: args[2].parse().unwrap(),
dest_ip: &args[3],
dest_port: args[4].parse().unwrap(),
key: &args[5],
sleep_delay_ms: args.get(6).map(|x| x.parse().unwrap()).unwrap_or(1),
modem_port: args.get(7).map(|x| x.as_str()),
modem_baud: args.get(8).map(|x| x.parse().unwrap()),
modem_init: args.get(9).map(|x| x.as_str()),
rate_limit_sleep: args.get(10).map(|x| x.parse().unwrap()).unwrap_or(0),
});
if (6..=7).contains(&args.len()) && args[0] == "client" {
client(
&args[1],
args[2].parse().unwrap(),
&args[3],
args[4].parse().unwrap(),
&args[5],
if args.len() == 7 {
args[6].parse().unwrap()
} else {
1
},
);
}
if (3..=4).contains(&args.len()) && args[0] == "server" {
server(
@ -31,5 +31,5 @@ fn main() {
}
eprintln!("Usage: \n\
\x20 revpfw3 server <port> <key> [<poll delay>]\n\
\x20 revpfw3 client <server ip> <server port> <destination ip> <destination port> <key> [<poll delay> [<modem port> <modem baud> <modem init file>]]");
\x20 revpfw3 client <server ip> <server port> <destination ip> <destination port> <key> [<poll delay>]");
}

View file

@ -7,7 +7,4 @@ pub(crate) enum PacketType {
KeepAlive,
ClientData,
ServerData,
ClientExceededBuffer,
Resync,
ResyncEcho,
}

View file

@ -1,5 +1,4 @@
use std::{
collections::HashMap,
io::Read,
io::Write,
net::{Shutdown, TcpListener},
@ -8,33 +7,11 @@ use std::{
vec,
};
use crate::{Connection, PacketType, SocketAdapter};
fn resync(tcp: &mut SocketAdapter) {
println!();
eprintln!("Client version mismatch or broken connection. Re-syncing in case of the latter...");
tcp.internal.set_print(false);
tcp.write_now().unwrap();
tcp.write(&[PacketType::Resync.ordinal() as u8]).unwrap();
tcp.write_now().unwrap();
eprintln!(
"Sent resync packet. Client should now wait 8 seconds and then send a resync packet back, initiating a normal re-sync."
);
let mut buf = [0; 4096];
// read all packets that are still pending.
while let Some(Some(_x @ 1..)) = tcp.poll(&mut buf).ok() {}
// wait 5 seconds
thread::sleep(Duration::from_secs(5));
// read all packets that are still pending.
while let Some(Some(_x @ 1..)) = tcp.poll(&mut buf).ok() {}
// client should now have stopped sending packets.
}
use crate::{io_sync, PacketType, SocketAdapter};
pub fn server(port: u16, key: &str, sleep_delay_ms: u64) {
let mut buf1 = [0u8; 1];
let mut buf4 = [0u8; 4];
let mut buf8 = [0u8; 8];
let mut buf16 = [0u8; 16];
let mut buf = [0; 1024];
let tcpl = TcpListener::bind(("0.0.0.0", port)).unwrap();
let mut tcp = loop {
@ -43,7 +20,7 @@ pub fn server(port: u16, key: &str, sleep_delay_ms: u64) {
tcp.0.shutdown(Shutdown::Both).unwrap();
continue;
};
if buf4 == [b'R', b'P', b'F', 30] {
if buf4 == ['R' as u8, 'P' as u8, 'F' as u8, 30] {
println!("Compatible client connected.");
if tcp.0.read_exact(&mut buf4).is_ok() && u32::from_be_bytes(buf4) == key.len() as u32 {
println!("Key length matches.");
@ -58,19 +35,20 @@ pub fn server(port: u16, key: &str, sleep_delay_ms: u64) {
}
};
tcp.write_all(&[b'R', b'P', b'F', 30]).unwrap();
tcp.write_all(&mut ['R' as u8, 'P' as u8, 'F' as u8, 30])
.unwrap();
tcpl.set_nonblocking(true).unwrap();
let mut tcp = SocketAdapter::new(Connection::new_tcp(tcp, true));
let mut sockets: HashMap<u64, SocketAdapter> = HashMap::new();
let mut id = 0;
let mut tcp = SocketAdapter::new(tcp, true);
tcp.set_nonblocking(true);
let mut sockets: Vec<SocketAdapter> = Vec::new();
let mut last_keep_alive_sent = SystemTime::now();
let mut last_keep_alive = SystemTime::now();
loop {
let mut did_anything = false;
if last_keep_alive_sent.elapsed().unwrap().as_secs() >= 10 {
if last_keep_alive_sent.elapsed().unwrap().as_secs() >= 20 {
last_keep_alive_sent = SystemTime::now();
tcp.write(&[PacketType::KeepAlive.ordinal() as u8]).unwrap();
}
@ -79,14 +57,15 @@ pub fn server(port: u16, key: &str, sleep_delay_ms: u64) {
}
if let Ok(new) = tcpl.accept() {
let new = SocketAdapter::new(Connection::new_tcp(new.0, false));
sockets.insert((id, id += 1).0, new);
let mut new = SocketAdapter::new(new.0, false);
new.set_nonblocking(true);
sockets.push(new);
tcp.write(&[PacketType::NewClient.ordinal() as u8]).unwrap();
did_anything = true;
}
let mut to_remove = vec![];
for (&i, socket) in sockets.iter_mut() {
for (i, socket) in sockets.iter_mut().enumerate() {
if let Ok(x) = socket.poll(&mut buf) {
if let Some(len) = x {
if len == 0 {
@ -94,7 +73,7 @@ pub fn server(port: u16, key: &str, sleep_delay_ms: u64) {
} else {
tcp.write(&[PacketType::ClientData.ordinal() as u8])
.unwrap();
tcp.write(&i.to_be_bytes()).unwrap();
tcp.write(&(i as u32).to_be_bytes()).unwrap();
tcp.write(&(len as u32).to_be_bytes()).unwrap();
tcp.write(&buf[..len]).unwrap();
}
@ -104,95 +83,55 @@ pub fn server(port: u16, key: &str, sleep_delay_ms: u64) {
to_remove.push(i);
did_anything = true;
}
if let x @ 1.. = socket.clear_delay() {
tcp.write(&[PacketType::ClientExceededBuffer.ordinal() as u8])
.unwrap();
tcp.write(&i.to_be_bytes()).unwrap();
tcp.write(&x.to_be_bytes()).unwrap();
socket.punish(x);
}
}
for i in to_remove.into_iter().rev() {
tcp.write(&[PacketType::CloseClient.ordinal() as u8])
.unwrap();
tcp.write(&i.to_be_bytes()).unwrap();
if let Some(x) = sockets.remove(&i) {
let _ = x.internal.close();
}
tcp.write(&(i as u32).to_be_bytes()).unwrap();
let _ = sockets.remove(i).internal.shutdown(Shutdown::Both);
}
tcp.update().unwrap();
if tcp.poll_exact(&mut buf1).unwrap().is_none() {
if io_sync(tcp.internal.read_exact(&mut buf1))
.unwrap()
.is_none()
{
if !did_anything {
thread::sleep(Duration::from_millis(sleep_delay_ms));
}
continue;
}
let Some(pt) = PacketType::from_ordinal(buf1[0] as i8) else {
resync(&mut tcp);
continue;
};
let pt = PacketType::from_ordinal(buf1[0] as i8)
.expect("server/client version mismatch or broken TCP");
tcp.set_nonblocking(false);
match pt {
PacketType::NewClient => resync(&mut tcp),
PacketType::NewClient => unreachable!(),
PacketType::CloseClient => {
tcp.read_now(&mut buf8).unwrap();
if let Some(x) = sockets.remove(&u64::from_be_bytes(buf8)) {
let _ = x.internal.close();
}
tcp.internal.read_exact(&mut buf4).unwrap();
let _ = sockets
.remove(u32::from_be_bytes(buf4) as usize)
.internal
.shutdown(Shutdown::Both);
}
PacketType::KeepAlive => {
last_keep_alive = SystemTime::now();
}
PacketType::ClientData => resync(&mut tcp),
PacketType::ClientData => unreachable!(),
PacketType::ServerData => {
tcp.read_now(&mut buf8).unwrap();
let idx = u64::from_be_bytes(buf8);
tcp.read_now(&mut buf4).unwrap();
tcp.internal.read_exact(&mut buf4).unwrap();
let idx = u32::from_be_bytes(buf4) as usize;
tcp.internal.read_exact(&mut buf4).unwrap();
let len = u32::from_be_bytes(buf4) as usize;
tcp.read_now(&mut buf[..len]).unwrap();
tcp.internal.read_exact(&mut buf[..len]).unwrap();
if let Some(socket) = sockets.get_mut(&idx) {
let _ = socket.write_later(&buf[..len]);
}
let _ = sockets[idx].write_later(&buf[..len]);
}
PacketType::ClientExceededBuffer => {
tcp.read_now(&mut buf8).unwrap();
let idx = u64::from_be_bytes(buf8);
tcp.read_now(&mut buf16).unwrap();
let amount = u128::from_be_bytes(buf16);
// a single connection doesn't need overuse-penalties
if let (true, Some(socket)) = (sockets.len() > 1, sockets.get_mut(&idx)) {
socket.punish(amount);
}
}
PacketType::Resync => {
println!();
tcp.internal.set_print(false);
eprintln!(
"Client asked for a re-sync. Waiting 8 seconds, then sending resync-echo."
);
tcp.read_now(&mut buf8).unwrap();
id = u64::from_be_bytes(buf8).max(id);
tcp.write_now().unwrap();
thread::sleep(Duration::from_secs(8));
tcp.write(&[PacketType::ResyncEcho.ordinal() as u8])
.unwrap();
tcp.write(&id.to_be_bytes()).unwrap();
tcp.write_now().unwrap();
eprintln!("Resync-Echo sent. Going back to normal operation.");
tcp.internal.set_print(true);
}
// this one can't happen, it should only come from the server
PacketType::ResyncEcho => resync(&mut tcp),
}
tcp.set_nonblocking(true);
}
}

View file

@ -1,47 +1,57 @@
use std::{
io::{Error, Read},
io::{ErrorKind, Write},
time::SystemTime,
net::TcpStream,
};
use crate::{io_sync, Connection};
use crate::io_sync;
#[derive(Clone, Copy)]
enum Broken {
DirectErr(ErrorKind, &'static str),
OsErr(i32),
DirectErr(ErrorKind),
}
impl From<Broken> for Error {
fn from(value: Broken) -> Self {
match value {
Broken::DirectErr(x, s) => Error::new(x, s),
Broken::OsErr(x) => Error::from_raw_os_error(x),
Broken::DirectErr(x) => Error::from(x),
}
}
}
pub(crate) struct SocketAdapter {
pub(crate) internal: Connection,
pub(crate) internal: TcpStream,
written: usize,
to_write: usize,
write: [u8; 65536],
broken: Option<Broken>,
accumulated_delay: u128,
ignore_until: Option<u128>,
wait_if_full: bool,
is_nonblocking: bool,
}
impl SocketAdapter {
pub fn new(connection: Connection) -> SocketAdapter {
pub fn new(tcp: TcpStream, wait_if_full: bool) -> SocketAdapter {
Self {
internal: connection,
internal: tcp,
written: 0,
to_write: 0,
write: [0u8; 65536],
broken: None,
accumulated_delay: 0,
ignore_until: None,
wait_if_full,
is_nonblocking: false,
}
}
pub fn set_nonblocking(&mut self, nonblocking: bool) {
if let Err(x) = self.internal.set_nonblocking(nonblocking) {
self.broken = Some(Broken::OsErr(x.raw_os_error().unwrap()));
return;
}
self.is_nonblocking = nonblocking;
}
pub fn write_later(&mut self, buf: &[u8]) -> Result<(), Error> {
if let Some(ref x) = self.broken {
return Err(Error::from(*x));
@ -52,19 +62,18 @@ impl SocketAdapter {
.copy_within(self.written..self.written + self.to_write, 0);
self.written = 0;
}
let Some(x) = self
.write
.get_mut(self.to_write + self.written..self.to_write + self.written + buf.len())
else {
let sa = SystemTime::now();
self.internal.set_nonblocking(false)?;
self.internal
.write_all(&self.write[self.written..self.written + self.to_write])?;
self.written = 0;
self.to_write = buf.len();
self.write[..buf.len()].copy_from_slice(buf);
self.accumulated_delay += sa.elapsed().unwrap().as_micros();
return Ok(());
let Some(x) = self.write.get_mut(self.to_write + self.written..self.to_write + self.written + buf.len()) else {
if self.wait_if_full {
self.internal.set_nonblocking(false)?;
self.internal.write_all(&self.write[self.written..self.written + self.to_write])?;
self.internal.set_nonblocking(self.is_nonblocking)?;
self.written = 0;
self.to_write = buf.len();
self.write[..buf.len()].copy_from_slice(buf);
return Ok(());
}
self.broken = Some(Broken::DirectErr(ErrorKind::TimedOut));
return Err(ErrorKind::TimedOut.into());
};
x.copy_from_slice(buf);
self.to_write += buf.len();
@ -76,36 +85,7 @@ impl SocketAdapter {
self.update()
}
pub fn write_now(&mut self) -> Result<(), Error> {
if let Some(ref x) = self.broken {
return Err(Error::from(*x));
}
if self.to_write == 0 {
return Ok(());
}
match {
self.internal.set_nonblocking(false)?;
let r = self
.internal
.write_all(&self.write[self.written..self.written + self.to_write]);
r
} {
Ok(()) => {
self.written = 0;
self.to_write = 0;
Ok(())
}
Err(x) => {
self.broken = Some(Broken::DirectErr(x.kind(), "io error"));
Err(x)
}
}
}
pub fn update(&mut self) -> Result<(), Error> {
if Some(SystemTime::UNIX_EPOCH.elapsed().unwrap().as_micros()) < self.ignore_until {
return Ok(());
}
if let Some(ref x) = self.broken {
return Err(Error::from(*x));
}
@ -113,10 +93,11 @@ impl SocketAdapter {
return Ok(());
}
match {
self.internal.set_nonblocking(!self.internal.is_serial())?;
self.internal.set_nonblocking(true)?;
let r = self
.internal
.write(&self.write[self.written..self.written + self.to_write]);
self.internal.set_nonblocking(self.is_nonblocking)?;
r
} {
Ok(x) => {
@ -129,47 +110,14 @@ impl SocketAdapter {
}
Err(x) if x.kind() == ErrorKind::WouldBlock => Ok(()),
Err(x) => {
self.broken = Some(Broken::DirectErr(x.kind(), "io error"));
self.broken = Some(Broken::OsErr(x.raw_os_error().unwrap()));
Err(x)
}
}
}
pub fn read_now(&mut self, buf: &mut [u8]) -> Result<Option<()>, Error> {
if Some(SystemTime::UNIX_EPOCH.elapsed().unwrap().as_micros()) < self.ignore_until {
return Ok(None);
}
self.update()?;
self.internal.set_nonblocking(false)?;
io_sync(self.internal.read_exact(buf))
}
pub fn poll_exact(&mut self, buf: &mut [u8]) -> Result<Option<()>, Error> {
if Some(SystemTime::UNIX_EPOCH.elapsed().unwrap().as_micros()) < self.ignore_until {
return Ok(None);
}
self.update()?;
self.internal.set_nonblocking(true)?;
io_sync(self.internal.read_exact(buf))
}
pub fn poll(&mut self, buf: &mut [u8]) -> Result<Option<usize>, Error> {
if Some(SystemTime::UNIX_EPOCH.elapsed().unwrap().as_micros()) < self.ignore_until {
return Ok(None);
}
self.update()?;
self.internal.set_nonblocking(true)?;
io_sync(self.internal.read(buf))
}
pub fn clear_delay(&mut self) -> u128 {
(self.accumulated_delay, self.accumulated_delay = 0).0
}
pub fn punish(&mut self, time: u128) {
if self.ignore_until.is_none() {
self.ignore_until = Some(SystemTime::UNIX_EPOCH.elapsed().unwrap().as_micros());
}
self.ignore_until = self.ignore_until.map(|x| x + time);
}
}