Compare commits

...

30 commits
v0.1.2 ... main

Author SHA1 Message Date
18eb357092
time out when client too slow 2024-11-07 21:21:45 +01:00
135cc71df0
Add LICENSE 2024-11-07 21:21:17 +01:00
a1693b1ff7
fix using the wrong listen ip 2024-10-18 11:58:22 +02:00
ccefec16a2
bump version 2024-10-18 11:27:20 +02:00
f5ea5cb3a1
ipv6 support :D 2024-10-18 11:25:41 +02:00
9d712ddd10
make modem init a file 2023-12-28 01:16:24 +01:00
43d208a39f
massive performance++ 2023-10-08 14:31:47 +02:00
551addfdf5
change modemfiles 2023-10-05 15:58:11 +02:00
7a546a1005
increase timeout 2023-10-05 09:09:12 +02:00
1f3e347cd2
Update README.md 2023-10-05 02:26:56 +02:00
6ebf8ef516
move modem command lists to modemfiles/ 2023-10-04 21:12:24 +02:00
bbf2f6e405
fix id syncing 2023-10-04 17:42:15 +02:00
0d197f885b
migrate away from directly using internal connections, fix unreliable read_exact 2023-10-04 17:02:49 +02:00
63287f17fd
improve resyncing 2023-10-04 15:24:10 +02:00
2c61f1b9c9
lower timeout 2023-10-04 13:20:45 +02:00
637fdcae58
fix resyncing 2023-10-04 13:17:13 +02:00
1e03ee8002
add resyncing when connection breaks 2023-10-04 12:14:35 +02:00
289b2ef7a0
add data transfer speed printing 2023-10-03 02:16:02 +02:00
9a2d4c2931
add rate limit sleep 2023-10-01 23:06:56 +02:00
71d5949588
add modem-connect 2023-10-01 22:20:44 +02:00
af4f1d85ec
some fixes 2023-10-01 22:20:30 +02:00
6f7f24016e
make socketadapter not require an os error 2023-10-01 17:52:24 +02:00
9b5c37ceb3
initial modem commit 2023-10-01 17:46:59 +02:00
3181706ef3 format 2023-02-05 07:49:18 +01:00
58bb1d58c6 fix some bugs, improve speed with multiple connections 2023-02-05 07:44:59 +01:00
103bcf7412
Update README.md 2023-02-05 03:22:35 +01:00
9b5de1a393 correct version 2023-02-01 20:50:02 +01:00
bfd1c1bc90 fix some off-by-one errors 2023-02-01 20:42:46 +01:00
404de4676b fix a typo 2023-02-01 20:19:56 +01:00
2b3405af8b better bandwidth management 2023-02-01 20:15:07 +01:00
15 changed files with 733 additions and 159 deletions

99
Cargo.lock generated
View file

@ -10,23 +10,37 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "enum-ordinalize"
version = "3.1.12"
version = "3.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a62bb1df8b45ecb7ffa78dca1c17a438fb193eb083db0b1b494d2a61bcb5096a"
checksum = "1bf1fa3f06bbff1ea5b1a9c7b14aa992a39657db60a2759457328d7e058f49ee"
dependencies = [
"num-bigint",
"num-traits",
"proc-macro2",
"quote",
"rustc_version",
"syn",
]
[[package]]
name = "num-bigint"
version = "0.4.3"
name = "ioctl-rs"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f"
checksum = "f7970510895cee30b3e9128319f2cefd4bde883a39f38baa279567ba3a7eb97d"
dependencies = [
"libc",
]
[[package]]
name = "libc"
version = "0.2.148"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b"
[[package]]
name = "num-bigint"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "608e7659b5c3d7cba262d894801b9ec9d00de989e8a82bd4bef91d08da45cdc0"
dependencies = [
"autocfg",
"num-integer",
@ -45,58 +59,86 @@ dependencies = [
[[package]]
name = "num-traits"
version = "0.2.15"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd"
checksum = "f30b0abd723be7e2ffca1272140fac1a2f084c77ec3e123c192b66af1ee9e6c2"
dependencies = [
"autocfg",
]
[[package]]
name = "proc-macro2"
version = "1.0.50"
version = "1.0.67"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ef7d57beacfaf2d8aee5937dab7b7f28de3cb8b1828479bb5de2a7106f2bae2"
checksum = "3d433d9f1a3e8c1263d9456598b16fec66f4acc9a74dacffd35c7bb09b3a1328"
dependencies = [
"unicode-ident",
]
[[package]]
name = "quote"
version = "1.0.23"
version = "1.0.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8856d8364d252a14d474036ea1358d63c9e6965c8e5c1885c18f73d70bff9c7b"
checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae"
dependencies = [
"proc-macro2",
]
[[package]]
name = "revpfw3"
version = "0.1.2"
version = "0.4.1"
dependencies = [
"enum-ordinalize",
"serial",
]
[[package]]
name = "rustc_version"
name = "serial"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
checksum = "a1237a96570fc377c13baa1b88c7589ab66edced652e43ffb17088f003db3e86"
dependencies = [
"semver",
"serial-core",
"serial-unix",
"serial-windows",
]
[[package]]
name = "semver"
version = "1.0.16"
name = "serial-core"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "58bc9567378fc7690d6b2addae4e60ac2eeea07becb2c64b9f218b53865cba2a"
checksum = "3f46209b345401737ae2125fe5b19a77acce90cd53e1658cda928e4fe9a64581"
dependencies = [
"libc",
]
[[package]]
name = "serial-unix"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f03fbca4c9d866e24a459cbca71283f545a37f8e3e002ad8c70593871453cab7"
dependencies = [
"ioctl-rs",
"libc",
"serial-core",
"termios",
]
[[package]]
name = "serial-windows"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15c6d3b776267a75d31bbdfd5d36c0ca051251caafc285827052bc53bcdc8162"
dependencies = [
"libc",
"serial-core",
]
[[package]]
name = "syn"
version = "1.0.107"
version = "2.0.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f4064b5b16e03ae50984a5a8ed5d4f8803e6bc1fd170a3cda91a1be4b18e3f5"
checksum = "7303ef2c05cd654186cb250d29049a24840ca25d2747c25c0381c8d9e2f582e8"
dependencies = [
"proc-macro2",
"quote",
@ -104,7 +146,16 @@ dependencies = [
]
[[package]]
name = "unicode-ident"
version = "1.0.6"
name = "termios"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84a22b9f218b40614adcb3f4ff08b703773ad44fa9423e4e0d346d5db86e4ebc"
checksum = "d5d9cf598a6d7ce700a4e6a9199da127e6819a61e64b68609683cc9a01b5683a"
dependencies = [
"libc",
]
[[package]]
name = "unicode-ident"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b"

View file

@ -3,10 +3,11 @@ name = "revpfw3"
repository = "https://github.com/tudbut/revpfw3"
description = "A tool to bypass portforwarding restrictions using some cheap VServer"
license = "MIT"
version = "0.1.2"
version = "0.4.3"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
enum-ordinalize = "3.1"
serial = "0.4"

8
LICENSE Normal file
View file

@ -0,0 +1,8 @@
Copyright 2024 TudbuT <legal@mail.tudbut.de>
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View file

@ -4,6 +4,9 @@
This tool bypasses port restrictions of your router using some not-very-powerful
server (a cheap 1€ vserver will suffice.)
NEW: Modem support! RevPFW3 can now interact with modems using AT commands. A demo
is included for some simcom modems.
---
### How to download it
@ -25,11 +28,12 @@ https://github.com/tudbut/revpfw3`.
1. Buy some cheap server online, it will only need
1. Enough disk space to run a 5MB program (I recommend about .5GB free after
OS is installed)
2. 500MB RAM or more
2. 50MB of free RAM or more (if you expect to have many clients connecting,
use more RAM)
3. Flexible port settings
4. Not much CPU power
4. Not much CPU power, a single core definitely suffices.
2. Download revpfw3 to it
3. Run it like this: `revpfw3 server <port> <key>` (I reocmmend doing it in a
3. Run it like this: `revpfw3 server <port> <key>` (I recommend doing it in a
loop)
4. Download it to your destination as well (your PC, a raspi, etc)
5. Run it like this: `revpfw3 client <ip of your bridge server> <port> localhost

View file

@ -0,0 +1,4 @@
AT
AT+IPADDR
AT+CIPCLOSE=0
AT+CIPOPEN=0,"TCP","$IP",$PORT

View file

@ -0,0 +1,14 @@
AT
AT+CIPMODE=1
AT+CGDCONT=1,IP,"internet.telekom"
AT+NETOPEN
AT+IPADDR
AT+CIPCLOSE=0
AT+CIPOPEN=0,"TCP","$IP",$PORT

View file

@ -0,0 +1,4 @@
AT
AT+CIFSR
AT+CIPCLOSE=0
AT+CIPSTART=TCP,"$IP",$PORT

View file

@ -0,0 +1,17 @@
AT
AT+CFUN?
AT+CPIN?
AT+CIPMODE=1
AT+CSTT="internet.telekom","",""
AT+CIICR
AT+CIFSR
AT+CIPCLOSE=0
AT+CIPSTART=TCP,"$IP",$PORT

View file

@ -1,41 +1,151 @@
use core::panic;
use std::{
io::Read,
io::Write,
net::{Shutdown, TcpStream},
collections::HashMap,
fs,
io::{Read, Write},
net::TcpStream,
thread,
time::{Duration, SystemTime},
vec,
};
use crate::{io_sync, PacketType, SocketAdapter};
use serial::SerialPort;
pub fn client(ip: &str, port: u16, dest_ip: &str, dest_port: u16, key: &str, sleep_delay_ms: u64) {
use crate::{Connection, PacketType, SocketAdapter};
pub struct ClientParams<'a> {
pub server_ip: &'a str,
pub server_port: u16,
pub dest_ip: &'a str,
pub dest_port: u16,
pub key: &'a str,
pub sleep_delay_ms: u64,
pub modem_port: Option<&'a str>,
pub modem_baud: Option<u32>,
pub modem_init: Option<&'a str>,
pub rate_limit_sleep: u64,
}
fn connect(params: &ClientParams) -> Connection {
if let Some(modem_port) = params.modem_port {
let mut serial = serial::open(modem_port).unwrap();
serial
.configure(&serial::PortSettings {
baud_rate: serial::BaudRate::from_speed(
params.modem_baud.unwrap_or(115200) as usize
),
char_size: serial::CharSize::Bits8,
parity: serial::Parity::ParityNone,
stop_bits: serial::StopBits::Stop1,
flow_control: serial::FlowControl::FlowNone,
})
.unwrap();
if let Some(modem_init) = params.modem_init {
serial.set_timeout(Duration::from_millis(200)).unwrap();
for line in fs::read_to_string(modem_init)
.expect("invalid modem init file")
.lines()
{
let line = line
.replace("$IP", &params.server_ip.to_string())
.replace("$PORT", &params.server_port.to_string());
println!("> {line}");
serial.write_all((line + "\r\n").as_bytes()).unwrap();
let mut s = Vec::new();
let _ = serial.read_to_end(&mut s).is_ok();
if !s.is_empty() {
println!(
"< {}",
String::from_utf8(s).unwrap().replace("\n", "\n< ").trim()
);
}
thread::sleep(Duration::from_millis(300));
}
serial.set_timeout(Duration::from_millis(5000)).unwrap();
let mut s = Vec::new();
let _ = serial.read_to_end(&mut s).is_ok();
if !s.is_empty() {
println!(
"< {}",
String::from_utf8(s).unwrap().replace("\n", "\n< ").trim()
);
}
}
serial.set_timeout(Duration::from_millis(20000)).unwrap();
return Connection::new_serial(serial, true);
}
Connection::new_tcp(
TcpStream::connect((params.server_ip, params.server_port)).unwrap(),
true,
)
}
fn resync(tcp: &mut SocketAdapter, id: &mut u64) {
let mut buf8 = [0u8; 8];
println!();
eprintln!("Server version mismatch or broken connection. Re-syncing in case of the latter...");
tcp.internal.set_print(false);
tcp.write_now().unwrap();
tcp.write(&[PacketType::Resync.ordinal() as u8]).unwrap();
tcp.write(&id.to_be_bytes()).unwrap();
tcp.write_now().unwrap();
eprintln!(
"Sent resync packet. Server should now wait 8 seconds and then send a resync-echo packet."
);
let mut buf = [0; 4096];
// read all packets that are still pending.
while let Some(Some(_x @ 1..)) = tcp.poll(&mut buf).ok() {}
// wait 5 seconds
thread::sleep(Duration::from_secs(5));
// read all packets that are still pending.
while let Some(Some(_x @ 1..)) = tcp.poll(&mut buf).ok() {}
// server should now have stopped sending packets.
let mut buf = [0];
eprintln!("Trying to receive the resync echo...");
tcp.read_now(&mut buf).unwrap();
if buf[0] as i8 == PacketType::ResyncEcho.ordinal() {
tcp.read_now(&mut buf8).unwrap();
*id = u64::from_be_bytes(buf8);
eprintln!("Successfully resynced. RevPFW3 can continue.");
} else {
eprintln!("Resync was not successful. Stopping.");
panic!("broken connection or server version mismatch.");
}
tcp.internal.set_print(true);
}
pub fn client(params: ClientParams) {
let mut buf1 = [0u8; 1];
let mut buf4 = [0u8; 4];
let mut buf8 = [0u8; 8];
let mut buf16 = [0u8; 16];
let mut buf = [0; 1024];
let mut tcp = TcpStream::connect((ip, port)).unwrap();
let mut tcp = connect(&params);
tcp.set_print(false);
println!("Syncing...");
tcp.write_all(&['R' as u8, 'P' as u8, 'F' as u8, 30])
.unwrap();
tcp.write_all(&[b'R', b'P', b'F', 30]).unwrap();
println!("Authenticating...");
tcp.write_all(&(key.len() as u32).to_be_bytes()).unwrap();
tcp.write_all(key.as_bytes()).unwrap();
tcp.write_all(&(params.key.len() as u32).to_be_bytes())
.unwrap();
tcp.write_all(params.key.as_bytes()).unwrap();
println!("Syncing...");
tcp.read_exact(&mut buf4).unwrap();
if buf4 != ['R' as u8, 'P' as u8, 'F' as u8, 30] {
if buf4 != [b'R', b'P', b'F', 30] {
panic!("RPF30 header expected, but not found. Make sure the server is actually running revpfw3!");
}
tcp.write_all(&[PacketType::KeepAlive.ordinal() as u8])
.unwrap();
tcp.set_print(true);
println!("READY!");
let mut tcp = SocketAdapter::new(tcp, true);
tcp.set_nonblocking(true);
let mut sockets: Vec<SocketAdapter> = Vec::new();
let mut tcp = SocketAdapter::new(tcp);
let mut sockets: HashMap<u64, SocketAdapter> = HashMap::new();
let mut id = 0;
let mut last_keep_alive = SystemTime::now();
loop {
thread::sleep(Duration::from_millis(params.rate_limit_sleep));
let mut did_anything = false;
if last_keep_alive.elapsed().unwrap().as_secs() >= 60 {
@ -43,7 +153,7 @@ pub fn client(ip: &str, port: u16, dest_ip: &str, dest_port: u16, key: &str, sle
}
let mut to_remove = vec![];
for (i, socket) in sockets.iter_mut().enumerate() {
for (&i, socket) in sockets.iter_mut() {
if let Ok(x) = socket.poll(&mut buf) {
if let Some(len) = x {
if len == 0 {
@ -51,7 +161,7 @@ pub fn client(ip: &str, port: u16, dest_ip: &str, dest_port: u16, key: &str, sle
} else {
tcp.write(&[PacketType::ServerData.ordinal() as u8])
.unwrap();
tcp.write(&(i as u32).to_be_bytes()).unwrap();
tcp.write(&i.to_be_bytes()).unwrap();
tcp.write(&(len as u32).to_be_bytes()).unwrap();
tcp.write(&buf[..len]).unwrap();
}
@ -61,42 +171,49 @@ pub fn client(ip: &str, port: u16, dest_ip: &str, dest_port: u16, key: &str, sle
to_remove.push(i);
did_anything = true;
}
if let x @ 1.. = socket.clear_delay() {
tcp.write(&[PacketType::ClientExceededBuffer.ordinal() as u8])
.unwrap();
tcp.write(&i.to_be_bytes()).unwrap();
tcp.write(&x.to_be_bytes()).unwrap();
socket.punish(x);
}
}
for i in to_remove.into_iter().rev() {
tcp.write(&[PacketType::CloseClient.ordinal() as u8])
.unwrap();
tcp.write(&(i as u32).to_be_bytes()).unwrap();
let _ = sockets.remove(i).internal.shutdown(Shutdown::Both);
tcp.write(&i.to_be_bytes()).unwrap();
if let Some(x) = sockets.remove(&i) {
let _ = x.internal.close();
}
}
tcp.update().unwrap();
if io_sync(tcp.internal.read_exact(&mut buf1))
.unwrap()
.is_none()
{
if tcp.poll_exact(&mut buf1).unwrap().is_none() {
if !did_anything {
thread::sleep(Duration::from_millis(sleep_delay_ms));
thread::sleep(Duration::from_millis(params.sleep_delay_ms));
}
continue;
}
let pt = PacketType::from_ordinal(buf1[0] as i8)
.expect("server/client version mismatch or broken TCP");
tcp.set_nonblocking(false);
let Some(pt) = PacketType::from_ordinal(buf1[0] as i8) else {
resync(&mut tcp, &mut id);
continue;
};
match pt {
PacketType::NewClient => {
let mut tcp =
SocketAdapter::new(TcpStream::connect((dest_ip, dest_port)).unwrap(), false);
tcp.set_nonblocking(true);
sockets.push(tcp);
let tcp = SocketAdapter::new(Connection::new_tcp(
TcpStream::connect((params.dest_ip, params.dest_port)).unwrap(),
false,
));
sockets.insert((id, id += 1).0, tcp);
}
PacketType::CloseClient => {
tcp.internal.read_exact(&mut buf4).unwrap();
let _ = sockets
.remove(u32::from_be_bytes(buf4) as usize)
.internal
.shutdown(Shutdown::Both);
tcp.read_now(&mut buf8).unwrap();
if let Some(x) = sockets.remove(&u64::from_be_bytes(buf8)) {
let _ = x.internal.close();
}
}
PacketType::KeepAlive => {
@ -105,17 +222,41 @@ pub fn client(ip: &str, port: u16, dest_ip: &str, dest_port: u16, key: &str, sle
}
PacketType::ClientData => {
tcp.internal.read_exact(&mut buf4).unwrap();
let idx = u32::from_be_bytes(buf4) as usize;
tcp.internal.read_exact(&mut buf4).unwrap();
tcp.read_now(&mut buf8).unwrap();
let idx = u64::from_be_bytes(buf8);
tcp.read_now(&mut buf4).unwrap();
let len = u32::from_be_bytes(buf4) as usize;
tcp.internal.read_exact(&mut buf[..len]).unwrap();
tcp.read_now(&mut buf[..len]).unwrap();
let _ = sockets[idx].write_later(&buf[..len]);
if let Some(socket) = sockets.get_mut(&idx) {
let _ = socket.write_later(&buf[..len]);
}
}
PacketType::ServerData => unreachable!(),
PacketType::ServerData => resync(&mut tcp, &mut id),
PacketType::ClientExceededBuffer => {
tcp.read_now(&mut buf8).unwrap();
let idx = u64::from_be_bytes(buf8);
tcp.read_now(&mut buf16).unwrap();
let amount = u128::from_be_bytes(buf16);
// a single connection doesn't need overuse-penalties
if let (true, Some(socket)) = (sockets.len() > 1, sockets.get_mut(&idx)) {
socket.punish(amount);
}
}
PacketType::Resync => {
println!();
tcp.internal.set_print(false);
eprintln!("Server asked for re-sync. Waiting 8 seconds, then initiating resync.");
thread::sleep(Duration::from_secs(8));
resync(&mut tcp, &mut id);
}
// this one shouldnt happen.
PacketType::ResyncEcho => resync(&mut tcp, &mut id),
}
tcp.set_nonblocking(true);
}
}

208
src/connection.rs Normal file
View file

@ -0,0 +1,208 @@
use std::{
io::{self, stdout, ErrorKind, Read, Write},
net::{Shutdown, TcpStream},
ptr::NonNull,
time::{Duration, SystemTime},
};
use serial::SerialPort;
trait ReadWrite: Write + Read + 'static {}
impl<T> ReadWrite for T where T: Write + Read + 'static {}
enum PrintStatus {
No,
Yes {
last_print: SystemTime,
bytes: u128,
last_bytes: u128,
},
}
pub struct Connection {
readwrite: Box<dyn ReadWrite>,
data: NonNull<()>,
set_nonblocking_thunk: fn(NonNull<()>, bool) -> io::Result<()>,
close_thunk: fn(NonNull<()>) -> io::Result<()>,
is_nb: bool,
is_serial: bool,
print: bool,
print_status: PrintStatus,
}
impl Write for Connection {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let result = self.as_write().write(buf);
self.print_status_result(result)
}
fn flush(&mut self) -> io::Result<()> {
self.as_write().flush()
}
}
impl Read for Connection {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let result = self.as_read().read(buf);
self.print_status_result(result)
}
fn read_exact(&mut self, mut buf: &mut [u8]) -> io::Result<()> {
let len = buf.len();
while !buf.is_empty() {
match self.read(buf) {
Ok(0) if self.is_nb && buf.len() == len => {
return Err(io::Error::new(ErrorKind::WouldBlock, "would block"))
}
Ok(0) => break,
Ok(n) => {
let tmp = buf;
buf = &mut tmp[n..];
}
Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
}
if !buf.is_empty() {
Err(io::Error::new(
ErrorKind::UnexpectedEof,
"failed to fill whole buffer",
))
} else {
Ok(())
}
}
}
impl Connection {
pub fn new_tcp(stream: TcpStream, print: bool) -> Self {
stream
.set_read_timeout(Some(Duration::from_secs(20)))
.unwrap();
stream
.set_write_timeout(Some(Duration::from_secs(20)))
.unwrap();
let mut stream = Box::new(stream);
Connection {
data: NonNull::from(stream.as_mut()).cast(),
readwrite: stream,
set_nonblocking_thunk: |data, nb| unsafe {
data.cast::<TcpStream>().as_ref().set_nonblocking(nb)
},
close_thunk: |data| unsafe {
data.cast::<TcpStream>().as_ref().shutdown(Shutdown::Both)
},
is_nb: false,
is_serial: false,
print: true,
print_status: if print {
PrintStatus::Yes {
last_print: SystemTime::now(),
bytes: 0,
last_bytes: 0,
}
} else {
PrintStatus::No
},
}
}
pub fn new_serial<T: SerialPort + 'static>(mut serial: T, print: bool) -> Self {
serial.set_timeout(Duration::from_secs(20)).unwrap();
let mut serial = Box::new(serial);
Connection {
data: NonNull::from(serial.as_mut()).cast(),
readwrite: serial,
set_nonblocking_thunk: |data, nb| unsafe {
data.cast::<T>()
.as_mut()
.set_timeout(Duration::from_secs(if nb { 0 } else { 20 }))
.map_err(|_| {
io::Error::new(io::ErrorKind::ConnectionAborted, "serial port went down")
})
},
// no need to close this.
close_thunk: |_data| Ok(()),
is_nb: false,
is_serial: true,
print: true,
print_status: if print {
PrintStatus::Yes {
last_print: SystemTime::now(),
bytes: 0,
last_bytes: 0,
}
} else {
PrintStatus::No
},
}
}
fn as_read(&mut self) -> &mut (dyn Read) {
&mut self.readwrite
}
fn as_write(&mut self) -> &mut (dyn Write) {
&mut self.readwrite
}
#[allow(dead_code)]
pub fn is_nonblocking(&self) -> bool {
self.is_nb
}
pub fn set_nonblocking(&mut self, nonblocking: bool) -> io::Result<()> {
if self.is_nb == nonblocking {
return Ok(());
}
self.is_nb = nonblocking;
(self.set_nonblocking_thunk)(self.data, nonblocking)
}
pub fn close(&self) -> io::Result<()> {
(self.close_thunk)(self.data)
}
pub fn is_serial(&self) -> bool {
self.is_serial
}
pub fn set_print(&mut self, print: bool) {
self.print = print;
}
fn print_status(&mut self, add: usize) {
if let &mut PrintStatus::Yes {
ref mut last_print,
ref mut bytes,
ref mut last_bytes,
} = &mut self.print_status
{
*bytes += add as u128;
if last_print.elapsed().unwrap().as_secs() > 0 {
let diff = *bytes - *last_bytes;
let bps = to_units(diff);
let total = to_units(*bytes);
if self.print {
print!(
"\r\x1b[KCurrent transfer speed: {bps}B/s, transferred {total}B so far."
);
stdout().flush().unwrap();
}
*last_bytes = *bytes;
*last_print = SystemTime::now();
}
}
}
fn print_status_result(&mut self, result: io::Result<usize>) -> io::Result<usize> {
if let Ok(b) = result {
self.print_status(b)
}
result
}
}
fn to_units(diff: u128) -> String {
match diff {
x @ 1_000_000_000_000.. => ((x / 1_000_000_000) as f64 / 1000.0).to_string() + "G",
x @ 1_000_000_000.. => ((x / 1_000_000) as f64 / 1000.0).to_string() + "G",
x @ 1_000_000.. => ((x / 1_000) as f64 / 1000.0).to_string() + "M",
x @ 10_000.. => (x as f64 / 1000.0).to_string() + "K",
x => x.to_string(),
}
}

View file

@ -1,4 +1,5 @@
mod client;
mod connection;
mod packet;
mod server;
mod socket_adapter;
@ -6,6 +7,7 @@ mod socket_adapter;
use std::io::{Error, ErrorKind};
pub use client::*;
pub(crate) use connection::*;
pub(crate) use packet::*;
pub use server::*;
pub(crate) use socket_adapter::*;
@ -14,6 +16,7 @@ pub(crate) fn io_sync<T>(result: Result<T, Error>) -> Result<Option<T>, Error> {
match result {
Ok(x) => Ok(Some(x)),
Err(x) if x.kind() == ErrorKind::WouldBlock => Ok(None),
Err(x) if x.kind() == ErrorKind::TimedOut => Ok(None),
Err(x) => Err(x),
}
}

View file

@ -1,22 +1,22 @@
use std::env;
use revpfw3::{client, server};
use revpfw3::{client, server, ClientParams};
fn main() {
let args: Vec<_> = env::args().skip(1).collect();
if (6..=7).contains(&args.len()) && args[0] == "client" {
client(
&args[1],
args[2].parse().unwrap(),
&args[3],
args[4].parse().unwrap(),
&args[5],
if args.len() == 7 {
args[6].parse().unwrap()
} else {
1
},
);
if (6..=11).contains(&args.len()) && args[0] == "client" {
client(ClientParams {
server_ip: &args[1],
server_port: args[2].parse().unwrap(),
dest_ip: &args[3],
dest_port: args[4].parse().unwrap(),
key: &args[5],
sleep_delay_ms: args.get(6).map(|x| x.parse().unwrap()).unwrap_or(1),
modem_port: args.get(7).map(|x| x.as_str()),
modem_baud: args.get(8).map(|x| x.parse().unwrap()),
modem_init: args.get(9).map(|x| x.as_str()),
rate_limit_sleep: args.get(10).map(|x| x.parse().unwrap()).unwrap_or(0),
});
}
if (3..=4).contains(&args.len()) && args[0] == "server" {
server(
@ -31,5 +31,5 @@ fn main() {
}
eprintln!("Usage: \n\
\x20 revpfw3 server <port> <key> [<poll delay>]\n\
\x20 revpfw3 client <server ip> <server port> <destination ip> <destination port> <key> [<poll delay>]");
\x20 revpfw3 client <server ip> <server port> <destination ip> <destination port> <key> [<poll delay> [<modem port> <modem baud> <modem init file>]]");
}

View file

@ -7,4 +7,7 @@ pub(crate) enum PacketType {
KeepAlive,
ClientData,
ServerData,
ClientExceededBuffer,
Resync,
ResyncEcho,
}

View file

@ -1,4 +1,5 @@
use std::{
collections::HashMap,
io::Read,
io::Write,
net::{Shutdown, TcpListener},
@ -7,20 +8,45 @@ use std::{
vec,
};
use crate::{io_sync, PacketType, SocketAdapter};
use crate::{Connection, PacketType, SocketAdapter};
fn resync(tcp: &mut SocketAdapter) {
println!();
eprintln!("Client version mismatch or broken connection. Re-syncing in case of the latter...");
tcp.internal.set_print(false);
tcp.write_now().unwrap();
tcp.write(&[PacketType::Resync.ordinal() as u8]).unwrap();
tcp.write_now().unwrap();
eprintln!(
"Sent resync packet. Client should now wait 8 seconds and then send a resync packet back, initiating a normal re-sync."
);
let mut buf = [0; 4096];
// read all packets that are still pending.
while let Some(Some(_x @ 1..)) = tcp.poll(&mut buf).ok() {}
// wait 5 seconds
thread::sleep(Duration::from_secs(5));
// read all packets that are still pending.
while let Some(Some(_x @ 1..)) = tcp.poll(&mut buf).ok() {}
// client should now have stopped sending packets.
}
pub fn server(port: u16, key: &str, sleep_delay_ms: u64) {
let mut buf1 = [0u8; 1];
let mut buf4 = [0u8; 4];
let mut buf8 = [0u8; 8];
let mut buf16 = [0u8; 16];
let mut buf = [0; 1024];
let tcpl = TcpListener::bind(("0.0.0.0", port)).unwrap();
let tcpl = TcpListener::bind(("::0", port)).unwrap();
let mut tcp = loop {
let Ok(mut tcp) = tcpl.accept() else { continue };
tcp.0
.set_read_timeout(Some(Duration::from_secs(5)))
.unwrap();
let Ok(()) = tcp.0.read_exact(&mut buf4) else {
tcp.0.shutdown(Shutdown::Both).unwrap();
continue;
};
if buf4 == ['R' as u8, 'P' as u8, 'F' as u8, 30] {
if buf4 == [b'R', b'P', b'F', 30] {
println!("Compatible client connected.");
if tcp.0.read_exact(&mut buf4).is_ok() && u32::from_be_bytes(buf4) == key.len() as u32 {
println!("Key length matches.");
@ -35,20 +61,19 @@ pub fn server(port: u16, key: &str, sleep_delay_ms: u64) {
}
};
tcp.write_all(&mut ['R' as u8, 'P' as u8, 'F' as u8, 30])
.unwrap();
tcp.write_all(&[b'R', b'P', b'F', 30]).unwrap();
tcpl.set_nonblocking(true).unwrap();
let mut tcp = SocketAdapter::new(tcp, true);
tcp.set_nonblocking(true);
let mut sockets: Vec<SocketAdapter> = Vec::new();
let mut tcp = SocketAdapter::new(Connection::new_tcp(tcp, true));
let mut sockets: HashMap<u64, SocketAdapter> = HashMap::new();
let mut id = 0;
let mut last_keep_alive_sent = SystemTime::now();
let mut last_keep_alive = SystemTime::now();
loop {
let mut did_anything = false;
if last_keep_alive_sent.elapsed().unwrap().as_secs() >= 20 {
if last_keep_alive_sent.elapsed().unwrap().as_secs() >= 10 {
last_keep_alive_sent = SystemTime::now();
tcp.write(&[PacketType::KeepAlive.ordinal() as u8]).unwrap();
}
@ -57,15 +82,14 @@ pub fn server(port: u16, key: &str, sleep_delay_ms: u64) {
}
if let Ok(new) = tcpl.accept() {
let mut new = SocketAdapter::new(new.0, false);
new.set_nonblocking(true);
sockets.push(new);
let new = SocketAdapter::new(Connection::new_tcp(new.0, false));
sockets.insert((id, id += 1).0, new);
tcp.write(&[PacketType::NewClient.ordinal() as u8]).unwrap();
did_anything = true;
}
let mut to_remove = vec![];
for (i, socket) in sockets.iter_mut().enumerate() {
for (&i, socket) in sockets.iter_mut() {
if let Ok(x) = socket.poll(&mut buf) {
if let Some(len) = x {
if len == 0 {
@ -73,7 +97,7 @@ pub fn server(port: u16, key: &str, sleep_delay_ms: u64) {
} else {
tcp.write(&[PacketType::ClientData.ordinal() as u8])
.unwrap();
tcp.write(&(i as u32).to_be_bytes()).unwrap();
tcp.write(&i.to_be_bytes()).unwrap();
tcp.write(&(len as u32).to_be_bytes()).unwrap();
tcp.write(&buf[..len]).unwrap();
}
@ -83,55 +107,95 @@ pub fn server(port: u16, key: &str, sleep_delay_ms: u64) {
to_remove.push(i);
did_anything = true;
}
if let x @ 1.. = socket.clear_delay() {
tcp.write(&[PacketType::ClientExceededBuffer.ordinal() as u8])
.unwrap();
tcp.write(&i.to_be_bytes()).unwrap();
tcp.write(&x.to_be_bytes()).unwrap();
socket.punish(x);
}
}
for i in to_remove.into_iter().rev() {
tcp.write(&[PacketType::CloseClient.ordinal() as u8])
.unwrap();
tcp.write(&(i as u32).to_be_bytes()).unwrap();
let _ = sockets.remove(i).internal.shutdown(Shutdown::Both);
tcp.write(&i.to_be_bytes()).unwrap();
if let Some(x) = sockets.remove(&i) {
let _ = x.internal.close();
}
}
tcp.update().unwrap();
if io_sync(tcp.internal.read_exact(&mut buf1))
.unwrap()
.is_none()
{
if tcp.poll_exact(&mut buf1).unwrap().is_none() {
if !did_anything {
thread::sleep(Duration::from_millis(sleep_delay_ms));
}
continue;
}
let pt = PacketType::from_ordinal(buf1[0] as i8)
.expect("server/client version mismatch or broken TCP");
tcp.set_nonblocking(false);
let Some(pt) = PacketType::from_ordinal(buf1[0] as i8) else {
resync(&mut tcp);
continue;
};
match pt {
PacketType::NewClient => unreachable!(),
PacketType::NewClient => resync(&mut tcp),
PacketType::CloseClient => {
tcp.internal.read_exact(&mut buf4).unwrap();
let _ = sockets
.remove(u32::from_be_bytes(buf4) as usize)
.internal
.shutdown(Shutdown::Both);
tcp.read_now(&mut buf8).unwrap();
if let Some(x) = sockets.remove(&u64::from_be_bytes(buf8)) {
let _ = x.internal.close();
}
}
PacketType::KeepAlive => {
last_keep_alive = SystemTime::now();
}
PacketType::ClientData => unreachable!(),
PacketType::ClientData => resync(&mut tcp),
PacketType::ServerData => {
tcp.internal.read_exact(&mut buf4).unwrap();
let idx = u32::from_be_bytes(buf4) as usize;
tcp.internal.read_exact(&mut buf4).unwrap();
tcp.read_now(&mut buf8).unwrap();
let idx = u64::from_be_bytes(buf8);
tcp.read_now(&mut buf4).unwrap();
let len = u32::from_be_bytes(buf4) as usize;
tcp.internal.read_exact(&mut buf[..len]).unwrap();
tcp.read_now(&mut buf[..len]).unwrap();
let _ = sockets[idx].write_later(&buf[..len]);
if let Some(socket) = sockets.get_mut(&idx) {
let _ = socket.write_later(&buf[..len]);
}
}
PacketType::ClientExceededBuffer => {
tcp.read_now(&mut buf8).unwrap();
let idx = u64::from_be_bytes(buf8);
tcp.read_now(&mut buf16).unwrap();
let amount = u128::from_be_bytes(buf16);
// a single connection doesn't need overuse-penalties
if let (true, Some(socket)) = (sockets.len() > 1, sockets.get_mut(&idx)) {
socket.punish(amount);
}
}
PacketType::Resync => {
println!();
tcp.internal.set_print(false);
eprintln!(
"Client asked for a re-sync. Waiting 8 seconds, then sending resync-echo."
);
tcp.read_now(&mut buf8).unwrap();
id = u64::from_be_bytes(buf8).max(id);
tcp.write_now().unwrap();
thread::sleep(Duration::from_secs(8));
tcp.write(&[PacketType::ResyncEcho.ordinal() as u8])
.unwrap();
tcp.write(&id.to_be_bytes()).unwrap();
tcp.write_now().unwrap();
eprintln!("Resync-Echo sent. Going back to normal operation.");
tcp.internal.set_print(true);
}
// this one can't happen, it should only come from the server
PacketType::ResyncEcho => resync(&mut tcp),
}
tcp.set_nonblocking(true);
}
}

View file

@ -1,57 +1,47 @@
use std::{
io::{Error, Read},
io::{ErrorKind, Write},
net::TcpStream,
time::SystemTime,
};
use crate::io_sync;
use crate::{io_sync, Connection};
#[derive(Clone, Copy)]
enum Broken {
OsErr(i32),
DirectErr(ErrorKind),
DirectErr(ErrorKind, &'static str),
}
impl From<Broken> for Error {
fn from(value: Broken) -> Self {
match value {
Broken::OsErr(x) => Error::from_raw_os_error(x),
Broken::DirectErr(x) => Error::from(x),
Broken::DirectErr(x, s) => Error::new(x, s),
}
}
}
pub(crate) struct SocketAdapter {
pub(crate) internal: TcpStream,
pub(crate) internal: Connection,
written: usize,
to_write: usize,
write: [u8; 65536],
broken: Option<Broken>,
wait_if_full: bool,
is_nonblocking: bool,
accumulated_delay: u128,
ignore_until: Option<u128>,
}
impl SocketAdapter {
pub fn new(tcp: TcpStream, wait_if_full: bool) -> SocketAdapter {
pub fn new(connection: Connection) -> SocketAdapter {
Self {
internal: tcp,
internal: connection,
written: 0,
to_write: 0,
write: [0u8; 65536],
broken: None,
wait_if_full,
is_nonblocking: false,
accumulated_delay: 0,
ignore_until: None,
}
}
pub fn set_nonblocking(&mut self, nonblocking: bool) {
if let Err(x) = self.internal.set_nonblocking(nonblocking) {
self.broken = Some(Broken::OsErr(x.raw_os_error().unwrap()));
return;
}
self.is_nonblocking = nonblocking;
}
pub fn write_later(&mut self, buf: &[u8]) -> Result<(), Error> {
if let Some(ref x) = self.broken {
return Err(Error::from(*x));
@ -62,18 +52,19 @@ impl SocketAdapter {
.copy_within(self.written..self.written + self.to_write, 0);
self.written = 0;
}
let Some(x) = self.write.get_mut(self.to_write + self.written..self.to_write + self.written + buf.len()) else {
if self.wait_if_full {
self.internal.set_nonblocking(false)?;
self.internal.write_all(&self.write[self.written..self.written + self.to_write])?;
self.internal.set_nonblocking(self.is_nonblocking)?;
self.written = 0;
self.to_write = buf.len();
self.write[..buf.len()].copy_from_slice(buf);
return Ok(());
}
self.broken = Some(Broken::DirectErr(ErrorKind::TimedOut));
return Err(ErrorKind::TimedOut.into());
let Some(x) = self
.write
.get_mut(self.to_write + self.written..self.to_write + self.written + buf.len())
else {
let sa = SystemTime::now();
self.internal.set_nonblocking(false)?;
self.internal
.write_all(&self.write[self.written..self.written + self.to_write])?;
self.written = 0;
self.to_write = buf.len();
self.write[..buf.len()].copy_from_slice(buf);
self.accumulated_delay += sa.elapsed().unwrap().as_micros();
return Ok(());
};
x.copy_from_slice(buf);
self.to_write += buf.len();
@ -85,7 +76,7 @@ impl SocketAdapter {
self.update()
}
pub fn update(&mut self) -> Result<(), Error> {
pub fn write_now(&mut self) -> Result<(), Error> {
if let Some(ref x) = self.broken {
return Err(Error::from(*x));
}
@ -93,11 +84,39 @@ impl SocketAdapter {
return Ok(());
}
match {
self.internal.set_nonblocking(true)?;
self.internal.set_nonblocking(false)?;
let r = self
.internal
.write_all(&self.write[self.written..self.written + self.to_write]);
r
} {
Ok(()) => {
self.written = 0;
self.to_write = 0;
Ok(())
}
Err(x) => {
self.broken = Some(Broken::DirectErr(x.kind(), "io error"));
Err(x)
}
}
}
pub fn update(&mut self) -> Result<(), Error> {
if Some(SystemTime::UNIX_EPOCH.elapsed().unwrap().as_micros()) < self.ignore_until {
return Ok(());
}
if let Some(ref x) = self.broken {
return Err(Error::from(*x));
}
if self.to_write == 0 {
return Ok(());
}
match {
self.internal.set_nonblocking(!self.internal.is_serial())?;
let r = self
.internal
.write(&self.write[self.written..self.written + self.to_write]);
self.internal.set_nonblocking(self.is_nonblocking)?;
r
} {
Ok(x) => {
@ -110,14 +129,47 @@ impl SocketAdapter {
}
Err(x) if x.kind() == ErrorKind::WouldBlock => Ok(()),
Err(x) => {
self.broken = Some(Broken::OsErr(x.raw_os_error().unwrap()));
self.broken = Some(Broken::DirectErr(x.kind(), "io error"));
Err(x)
}
}
}
pub fn poll(&mut self, buf: &mut [u8]) -> Result<Option<usize>, Error> {
pub fn read_now(&mut self, buf: &mut [u8]) -> Result<Option<()>, Error> {
if Some(SystemTime::UNIX_EPOCH.elapsed().unwrap().as_micros()) < self.ignore_until {
return Ok(None);
}
self.update()?;
self.internal.set_nonblocking(false)?;
io_sync(self.internal.read_exact(buf))
}
pub fn poll_exact(&mut self, buf: &mut [u8]) -> Result<Option<()>, Error> {
if Some(SystemTime::UNIX_EPOCH.elapsed().unwrap().as_micros()) < self.ignore_until {
return Ok(None);
}
self.update()?;
self.internal.set_nonblocking(true)?;
io_sync(self.internal.read_exact(buf))
}
pub fn poll(&mut self, buf: &mut [u8]) -> Result<Option<usize>, Error> {
if Some(SystemTime::UNIX_EPOCH.elapsed().unwrap().as_micros()) < self.ignore_until {
return Ok(None);
}
self.update()?;
self.internal.set_nonblocking(true)?;
io_sync(self.internal.read(buf))
}
pub fn clear_delay(&mut self) -> u128 {
(self.accumulated_delay, self.accumulated_delay = 0).0
}
pub fn punish(&mut self, time: u128) {
if self.ignore_until.is_none() {
self.ignore_until = Some(SystemTime::UNIX_EPOCH.elapsed().unwrap().as_micros());
}
self.ignore_until = self.ignore_until.map(|x| x + time);
}
}