Compare commits
30 commits
Author | SHA1 | Date | |
---|---|---|---|
18eb357092 | |||
135cc71df0 | |||
a1693b1ff7 | |||
ccefec16a2 | |||
f5ea5cb3a1 | |||
9d712ddd10 | |||
43d208a39f | |||
551addfdf5 | |||
7a546a1005 | |||
1f3e347cd2 | |||
6ebf8ef516 | |||
bbf2f6e405 | |||
0d197f885b | |||
63287f17fd | |||
2c61f1b9c9 | |||
637fdcae58 | |||
1e03ee8002 | |||
289b2ef7a0 | |||
9a2d4c2931 | |||
71d5949588 | |||
af4f1d85ec | |||
6f7f24016e | |||
9b5c37ceb3 | |||
3181706ef3 | |||
58bb1d58c6 | |||
103bcf7412 | |||
9b5de1a393 | |||
bfd1c1bc90 | |||
404de4676b | |||
2b3405af8b |
15 changed files with 733 additions and 159 deletions
99
Cargo.lock
generated
99
Cargo.lock
generated
|
@ -10,23 +10,37 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
|
|||
|
||||
[[package]]
|
||||
name = "enum-ordinalize"
|
||||
version = "3.1.12"
|
||||
version = "3.1.15"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a62bb1df8b45ecb7ffa78dca1c17a438fb193eb083db0b1b494d2a61bcb5096a"
|
||||
checksum = "1bf1fa3f06bbff1ea5b1a9c7b14aa992a39657db60a2759457328d7e058f49ee"
|
||||
dependencies = [
|
||||
"num-bigint",
|
||||
"num-traits",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"rustc_version",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-bigint"
|
||||
version = "0.4.3"
|
||||
name = "ioctl-rs"
|
||||
version = "0.1.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f"
|
||||
checksum = "f7970510895cee30b3e9128319f2cefd4bde883a39f38baa279567ba3a7eb97d"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.148"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b"
|
||||
|
||||
[[package]]
|
||||
name = "num-bigint"
|
||||
version = "0.4.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "608e7659b5c3d7cba262d894801b9ec9d00de989e8a82bd4bef91d08da45cdc0"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"num-integer",
|
||||
|
@ -45,58 +59,86 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "num-traits"
|
||||
version = "0.2.15"
|
||||
version = "0.2.16"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd"
|
||||
checksum = "f30b0abd723be7e2ffca1272140fac1a2f084c77ec3e123c192b66af1ee9e6c2"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro2"
|
||||
version = "1.0.50"
|
||||
version = "1.0.67"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6ef7d57beacfaf2d8aee5937dab7b7f28de3cb8b1828479bb5de2a7106f2bae2"
|
||||
checksum = "3d433d9f1a3e8c1263d9456598b16fec66f4acc9a74dacffd35c7bb09b3a1328"
|
||||
dependencies = [
|
||||
"unicode-ident",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quote"
|
||||
version = "1.0.23"
|
||||
version = "1.0.33"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8856d8364d252a14d474036ea1358d63c9e6965c8e5c1885c18f73d70bff9c7b"
|
||||
checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "revpfw3"
|
||||
version = "0.1.2"
|
||||
version = "0.4.1"
|
||||
dependencies = [
|
||||
"enum-ordinalize",
|
||||
"serial",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustc_version"
|
||||
name = "serial"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
|
||||
checksum = "a1237a96570fc377c13baa1b88c7589ab66edced652e43ffb17088f003db3e86"
|
||||
dependencies = [
|
||||
"semver",
|
||||
"serial-core",
|
||||
"serial-unix",
|
||||
"serial-windows",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "semver"
|
||||
version = "1.0.16"
|
||||
name = "serial-core"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "58bc9567378fc7690d6b2addae4e60ac2eeea07becb2c64b9f218b53865cba2a"
|
||||
checksum = "3f46209b345401737ae2125fe5b19a77acce90cd53e1658cda928e4fe9a64581"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serial-unix"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f03fbca4c9d866e24a459cbca71283f545a37f8e3e002ad8c70593871453cab7"
|
||||
dependencies = [
|
||||
"ioctl-rs",
|
||||
"libc",
|
||||
"serial-core",
|
||||
"termios",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serial-windows"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "15c6d3b776267a75d31bbdfd5d36c0ca051251caafc285827052bc53bcdc8162"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"serial-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "syn"
|
||||
version = "1.0.107"
|
||||
version = "2.0.37"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1f4064b5b16e03ae50984a5a8ed5d4f8803e6bc1fd170a3cda91a1be4b18e3f5"
|
||||
checksum = "7303ef2c05cd654186cb250d29049a24840ca25d2747c25c0381c8d9e2f582e8"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
|
@ -104,7 +146,16 @@ dependencies = [
|
|||
]
|
||||
|
||||
[[package]]
|
||||
name = "unicode-ident"
|
||||
version = "1.0.6"
|
||||
name = "termios"
|
||||
version = "0.2.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "84a22b9f218b40614adcb3f4ff08b703773ad44fa9423e4e0d346d5db86e4ebc"
|
||||
checksum = "d5d9cf598a6d7ce700a4e6a9199da127e6819a61e64b68609683cc9a01b5683a"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "unicode-ident"
|
||||
version = "1.0.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b"
|
||||
|
|
|
@ -3,10 +3,11 @@ name = "revpfw3"
|
|||
repository = "https://github.com/tudbut/revpfw3"
|
||||
description = "A tool to bypass portforwarding restrictions using some cheap VServer"
|
||||
license = "MIT"
|
||||
version = "0.1.2"
|
||||
version = "0.4.3"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
enum-ordinalize = "3.1"
|
||||
serial = "0.4"
|
||||
|
|
8
LICENSE
Normal file
8
LICENSE
Normal file
|
@ -0,0 +1,8 @@
|
|||
Copyright 2024 TudbuT <legal@mail.tudbut.de>
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
|
10
README.md
10
README.md
|
@ -4,6 +4,9 @@
|
|||
This tool bypasses port restrictions of your router using some not-very-powerful
|
||||
server (a cheap 1€ vserver will suffice.)
|
||||
|
||||
NEW: Modem support! RevPFW3 can now interact with modems using AT commands. A demo
|
||||
is included for some simcom modems.
|
||||
|
||||
---
|
||||
|
||||
### How to download it
|
||||
|
@ -25,11 +28,12 @@ https://github.com/tudbut/revpfw3`.
|
|||
1. Buy some cheap server online, it will only need
|
||||
1. Enough disk space to run a 5MB program (I recommend about .5GB free after
|
||||
OS is installed)
|
||||
2. 500MB RAM or more
|
||||
2. 50MB of free RAM or more (if you expect to have many clients connecting,
|
||||
use more RAM)
|
||||
3. Flexible port settings
|
||||
4. Not much CPU power
|
||||
4. Not much CPU power, a single core definitely suffices.
|
||||
2. Download revpfw3 to it
|
||||
3. Run it like this: `revpfw3 server <port> <key>` (I reocmmend doing it in a
|
||||
3. Run it like this: `revpfw3 server <port> <key>` (I recommend doing it in a
|
||||
loop)
|
||||
4. Download it to your destination as well (your PC, a raspi, etc)
|
||||
5. Run it like this: `revpfw3 client <ip of your bridge server> <port> localhost
|
||||
|
|
4
modemfiles/SIM7XXX_conn.txt
Normal file
4
modemfiles/SIM7XXX_conn.txt
Normal file
|
@ -0,0 +1,4 @@
|
|||
AT
|
||||
AT+IPADDR
|
||||
AT+CIPCLOSE=0
|
||||
AT+CIPOPEN=0,"TCP","$IP",$PORT
|
14
modemfiles/SIM7XXX_init.txt
Normal file
14
modemfiles/SIM7XXX_init.txt
Normal file
|
@ -0,0 +1,14 @@
|
|||
|
||||
AT
|
||||
|
||||
|
||||
AT+CIPMODE=1
|
||||
AT+CGDCONT=1,IP,"internet.telekom"
|
||||
AT+NETOPEN
|
||||
|
||||
|
||||
|
||||
|
||||
AT+IPADDR
|
||||
AT+CIPCLOSE=0
|
||||
AT+CIPOPEN=0,"TCP","$IP",$PORT
|
4
modemfiles/SIM800_conn.txt
Normal file
4
modemfiles/SIM800_conn.txt
Normal file
|
@ -0,0 +1,4 @@
|
|||
AT
|
||||
AT+CIFSR
|
||||
AT+CIPCLOSE=0
|
||||
AT+CIPSTART=TCP,"$IP",$PORT
|
17
modemfiles/SIM800_init.txt
Normal file
17
modemfiles/SIM800_init.txt
Normal file
|
@ -0,0 +1,17 @@
|
|||
|
||||
AT
|
||||
|
||||
|
||||
AT+CFUN?
|
||||
AT+CPIN?
|
||||
AT+CIPMODE=1
|
||||
|
||||
AT+CSTT="internet.telekom","",""
|
||||
AT+CIICR
|
||||
|
||||
|
||||
|
||||
|
||||
AT+CIFSR
|
||||
AT+CIPCLOSE=0
|
||||
AT+CIPSTART=TCP,"$IP",$PORT
|
225
src/client.rs
225
src/client.rs
|
@ -1,41 +1,151 @@
|
|||
use core::panic;
|
||||
use std::{
|
||||
io::Read,
|
||||
io::Write,
|
||||
net::{Shutdown, TcpStream},
|
||||
collections::HashMap,
|
||||
fs,
|
||||
io::{Read, Write},
|
||||
net::TcpStream,
|
||||
thread,
|
||||
time::{Duration, SystemTime},
|
||||
vec,
|
||||
};
|
||||
|
||||
use crate::{io_sync, PacketType, SocketAdapter};
|
||||
use serial::SerialPort;
|
||||
|
||||
pub fn client(ip: &str, port: u16, dest_ip: &str, dest_port: u16, key: &str, sleep_delay_ms: u64) {
|
||||
use crate::{Connection, PacketType, SocketAdapter};
|
||||
|
||||
pub struct ClientParams<'a> {
|
||||
pub server_ip: &'a str,
|
||||
pub server_port: u16,
|
||||
pub dest_ip: &'a str,
|
||||
pub dest_port: u16,
|
||||
pub key: &'a str,
|
||||
pub sleep_delay_ms: u64,
|
||||
pub modem_port: Option<&'a str>,
|
||||
pub modem_baud: Option<u32>,
|
||||
pub modem_init: Option<&'a str>,
|
||||
pub rate_limit_sleep: u64,
|
||||
}
|
||||
|
||||
fn connect(params: &ClientParams) -> Connection {
|
||||
if let Some(modem_port) = params.modem_port {
|
||||
let mut serial = serial::open(modem_port).unwrap();
|
||||
serial
|
||||
.configure(&serial::PortSettings {
|
||||
baud_rate: serial::BaudRate::from_speed(
|
||||
params.modem_baud.unwrap_or(115200) as usize
|
||||
),
|
||||
char_size: serial::CharSize::Bits8,
|
||||
parity: serial::Parity::ParityNone,
|
||||
stop_bits: serial::StopBits::Stop1,
|
||||
flow_control: serial::FlowControl::FlowNone,
|
||||
})
|
||||
.unwrap();
|
||||
if let Some(modem_init) = params.modem_init {
|
||||
serial.set_timeout(Duration::from_millis(200)).unwrap();
|
||||
for line in fs::read_to_string(modem_init)
|
||||
.expect("invalid modem init file")
|
||||
.lines()
|
||||
{
|
||||
let line = line
|
||||
.replace("$IP", ¶ms.server_ip.to_string())
|
||||
.replace("$PORT", ¶ms.server_port.to_string());
|
||||
println!("> {line}");
|
||||
serial.write_all((line + "\r\n").as_bytes()).unwrap();
|
||||
let mut s = Vec::new();
|
||||
let _ = serial.read_to_end(&mut s).is_ok();
|
||||
if !s.is_empty() {
|
||||
println!(
|
||||
"< {}",
|
||||
String::from_utf8(s).unwrap().replace("\n", "\n< ").trim()
|
||||
);
|
||||
}
|
||||
thread::sleep(Duration::from_millis(300));
|
||||
}
|
||||
serial.set_timeout(Duration::from_millis(5000)).unwrap();
|
||||
let mut s = Vec::new();
|
||||
let _ = serial.read_to_end(&mut s).is_ok();
|
||||
if !s.is_empty() {
|
||||
println!(
|
||||
"< {}",
|
||||
String::from_utf8(s).unwrap().replace("\n", "\n< ").trim()
|
||||
);
|
||||
}
|
||||
}
|
||||
serial.set_timeout(Duration::from_millis(20000)).unwrap();
|
||||
return Connection::new_serial(serial, true);
|
||||
}
|
||||
Connection::new_tcp(
|
||||
TcpStream::connect((params.server_ip, params.server_port)).unwrap(),
|
||||
true,
|
||||
)
|
||||
}
|
||||
|
||||
fn resync(tcp: &mut SocketAdapter, id: &mut u64) {
|
||||
let mut buf8 = [0u8; 8];
|
||||
println!();
|
||||
eprintln!("Server version mismatch or broken connection. Re-syncing in case of the latter...");
|
||||
tcp.internal.set_print(false);
|
||||
tcp.write_now().unwrap();
|
||||
tcp.write(&[PacketType::Resync.ordinal() as u8]).unwrap();
|
||||
tcp.write(&id.to_be_bytes()).unwrap();
|
||||
tcp.write_now().unwrap();
|
||||
eprintln!(
|
||||
"Sent resync packet. Server should now wait 8 seconds and then send a resync-echo packet."
|
||||
);
|
||||
let mut buf = [0; 4096];
|
||||
// read all packets that are still pending.
|
||||
while let Some(Some(_x @ 1..)) = tcp.poll(&mut buf).ok() {}
|
||||
// wait 5 seconds
|
||||
thread::sleep(Duration::from_secs(5));
|
||||
// read all packets that are still pending.
|
||||
while let Some(Some(_x @ 1..)) = tcp.poll(&mut buf).ok() {}
|
||||
// server should now have stopped sending packets.
|
||||
let mut buf = [0];
|
||||
eprintln!("Trying to receive the resync echo...");
|
||||
tcp.read_now(&mut buf).unwrap();
|
||||
if buf[0] as i8 == PacketType::ResyncEcho.ordinal() {
|
||||
tcp.read_now(&mut buf8).unwrap();
|
||||
*id = u64::from_be_bytes(buf8);
|
||||
eprintln!("Successfully resynced. RevPFW3 can continue.");
|
||||
} else {
|
||||
eprintln!("Resync was not successful. Stopping.");
|
||||
panic!("broken connection or server version mismatch.");
|
||||
}
|
||||
tcp.internal.set_print(true);
|
||||
}
|
||||
|
||||
pub fn client(params: ClientParams) {
|
||||
let mut buf1 = [0u8; 1];
|
||||
let mut buf4 = [0u8; 4];
|
||||
let mut buf8 = [0u8; 8];
|
||||
let mut buf16 = [0u8; 16];
|
||||
let mut buf = [0; 1024];
|
||||
let mut tcp = TcpStream::connect((ip, port)).unwrap();
|
||||
let mut tcp = connect(¶ms);
|
||||
tcp.set_print(false);
|
||||
println!("Syncing...");
|
||||
tcp.write_all(&['R' as u8, 'P' as u8, 'F' as u8, 30])
|
||||
.unwrap();
|
||||
tcp.write_all(&[b'R', b'P', b'F', 30]).unwrap();
|
||||
println!("Authenticating...");
|
||||
tcp.write_all(&(key.len() as u32).to_be_bytes()).unwrap();
|
||||
tcp.write_all(key.as_bytes()).unwrap();
|
||||
tcp.write_all(&(params.key.len() as u32).to_be_bytes())
|
||||
.unwrap();
|
||||
tcp.write_all(params.key.as_bytes()).unwrap();
|
||||
|
||||
println!("Syncing...");
|
||||
tcp.read_exact(&mut buf4).unwrap();
|
||||
if buf4 != ['R' as u8, 'P' as u8, 'F' as u8, 30] {
|
||||
if buf4 != [b'R', b'P', b'F', 30] {
|
||||
panic!("RPF30 header expected, but not found. Make sure the server is actually running revpfw3!");
|
||||
}
|
||||
tcp.write_all(&[PacketType::KeepAlive.ordinal() as u8])
|
||||
.unwrap();
|
||||
tcp.set_print(true);
|
||||
|
||||
println!("READY!");
|
||||
|
||||
let mut tcp = SocketAdapter::new(tcp, true);
|
||||
tcp.set_nonblocking(true);
|
||||
let mut sockets: Vec<SocketAdapter> = Vec::new();
|
||||
let mut tcp = SocketAdapter::new(tcp);
|
||||
let mut sockets: HashMap<u64, SocketAdapter> = HashMap::new();
|
||||
let mut id = 0;
|
||||
let mut last_keep_alive = SystemTime::now();
|
||||
loop {
|
||||
thread::sleep(Duration::from_millis(params.rate_limit_sleep));
|
||||
let mut did_anything = false;
|
||||
|
||||
if last_keep_alive.elapsed().unwrap().as_secs() >= 60 {
|
||||
|
@ -43,7 +153,7 @@ pub fn client(ip: &str, port: u16, dest_ip: &str, dest_port: u16, key: &str, sle
|
|||
}
|
||||
|
||||
let mut to_remove = vec![];
|
||||
for (i, socket) in sockets.iter_mut().enumerate() {
|
||||
for (&i, socket) in sockets.iter_mut() {
|
||||
if let Ok(x) = socket.poll(&mut buf) {
|
||||
if let Some(len) = x {
|
||||
if len == 0 {
|
||||
|
@ -51,7 +161,7 @@ pub fn client(ip: &str, port: u16, dest_ip: &str, dest_port: u16, key: &str, sle
|
|||
} else {
|
||||
tcp.write(&[PacketType::ServerData.ordinal() as u8])
|
||||
.unwrap();
|
||||
tcp.write(&(i as u32).to_be_bytes()).unwrap();
|
||||
tcp.write(&i.to_be_bytes()).unwrap();
|
||||
tcp.write(&(len as u32).to_be_bytes()).unwrap();
|
||||
tcp.write(&buf[..len]).unwrap();
|
||||
}
|
||||
|
@ -61,42 +171,49 @@ pub fn client(ip: &str, port: u16, dest_ip: &str, dest_port: u16, key: &str, sle
|
|||
to_remove.push(i);
|
||||
did_anything = true;
|
||||
}
|
||||
if let x @ 1.. = socket.clear_delay() {
|
||||
tcp.write(&[PacketType::ClientExceededBuffer.ordinal() as u8])
|
||||
.unwrap();
|
||||
tcp.write(&i.to_be_bytes()).unwrap();
|
||||
tcp.write(&x.to_be_bytes()).unwrap();
|
||||
socket.punish(x);
|
||||
}
|
||||
}
|
||||
for i in to_remove.into_iter().rev() {
|
||||
tcp.write(&[PacketType::CloseClient.ordinal() as u8])
|
||||
.unwrap();
|
||||
tcp.write(&(i as u32).to_be_bytes()).unwrap();
|
||||
let _ = sockets.remove(i).internal.shutdown(Shutdown::Both);
|
||||
tcp.write(&i.to_be_bytes()).unwrap();
|
||||
if let Some(x) = sockets.remove(&i) {
|
||||
let _ = x.internal.close();
|
||||
}
|
||||
}
|
||||
|
||||
tcp.update().unwrap();
|
||||
if io_sync(tcp.internal.read_exact(&mut buf1))
|
||||
.unwrap()
|
||||
.is_none()
|
||||
{
|
||||
if tcp.poll_exact(&mut buf1).unwrap().is_none() {
|
||||
if !did_anything {
|
||||
thread::sleep(Duration::from_millis(sleep_delay_ms));
|
||||
thread::sleep(Duration::from_millis(params.sleep_delay_ms));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
let pt = PacketType::from_ordinal(buf1[0] as i8)
|
||||
.expect("server/client version mismatch or broken TCP");
|
||||
tcp.set_nonblocking(false);
|
||||
let Some(pt) = PacketType::from_ordinal(buf1[0] as i8) else {
|
||||
resync(&mut tcp, &mut id);
|
||||
continue;
|
||||
};
|
||||
match pt {
|
||||
PacketType::NewClient => {
|
||||
let mut tcp =
|
||||
SocketAdapter::new(TcpStream::connect((dest_ip, dest_port)).unwrap(), false);
|
||||
tcp.set_nonblocking(true);
|
||||
sockets.push(tcp);
|
||||
let tcp = SocketAdapter::new(Connection::new_tcp(
|
||||
TcpStream::connect((params.dest_ip, params.dest_port)).unwrap(),
|
||||
false,
|
||||
));
|
||||
sockets.insert((id, id += 1).0, tcp);
|
||||
}
|
||||
|
||||
PacketType::CloseClient => {
|
||||
tcp.internal.read_exact(&mut buf4).unwrap();
|
||||
let _ = sockets
|
||||
.remove(u32::from_be_bytes(buf4) as usize)
|
||||
.internal
|
||||
.shutdown(Shutdown::Both);
|
||||
tcp.read_now(&mut buf8).unwrap();
|
||||
if let Some(x) = sockets.remove(&u64::from_be_bytes(buf8)) {
|
||||
let _ = x.internal.close();
|
||||
}
|
||||
}
|
||||
|
||||
PacketType::KeepAlive => {
|
||||
|
@ -105,17 +222,41 @@ pub fn client(ip: &str, port: u16, dest_ip: &str, dest_port: u16, key: &str, sle
|
|||
}
|
||||
|
||||
PacketType::ClientData => {
|
||||
tcp.internal.read_exact(&mut buf4).unwrap();
|
||||
let idx = u32::from_be_bytes(buf4) as usize;
|
||||
tcp.internal.read_exact(&mut buf4).unwrap();
|
||||
tcp.read_now(&mut buf8).unwrap();
|
||||
let idx = u64::from_be_bytes(buf8);
|
||||
tcp.read_now(&mut buf4).unwrap();
|
||||
let len = u32::from_be_bytes(buf4) as usize;
|
||||
tcp.internal.read_exact(&mut buf[..len]).unwrap();
|
||||
tcp.read_now(&mut buf[..len]).unwrap();
|
||||
|
||||
let _ = sockets[idx].write_later(&buf[..len]);
|
||||
if let Some(socket) = sockets.get_mut(&idx) {
|
||||
let _ = socket.write_later(&buf[..len]);
|
||||
}
|
||||
}
|
||||
|
||||
PacketType::ServerData => unreachable!(),
|
||||
PacketType::ServerData => resync(&mut tcp, &mut id),
|
||||
|
||||
PacketType::ClientExceededBuffer => {
|
||||
tcp.read_now(&mut buf8).unwrap();
|
||||
let idx = u64::from_be_bytes(buf8);
|
||||
tcp.read_now(&mut buf16).unwrap();
|
||||
let amount = u128::from_be_bytes(buf16);
|
||||
|
||||
// a single connection doesn't need overuse-penalties
|
||||
if let (true, Some(socket)) = (sockets.len() > 1, sockets.get_mut(&idx)) {
|
||||
socket.punish(amount);
|
||||
}
|
||||
}
|
||||
|
||||
PacketType::Resync => {
|
||||
println!();
|
||||
tcp.internal.set_print(false);
|
||||
eprintln!("Server asked for re-sync. Waiting 8 seconds, then initiating resync.");
|
||||
thread::sleep(Duration::from_secs(8));
|
||||
resync(&mut tcp, &mut id);
|
||||
}
|
||||
|
||||
// this one shouldnt happen.
|
||||
PacketType::ResyncEcho => resync(&mut tcp, &mut id),
|
||||
}
|
||||
tcp.set_nonblocking(true);
|
||||
}
|
||||
}
|
||||
|
|
208
src/connection.rs
Normal file
208
src/connection.rs
Normal file
|
@ -0,0 +1,208 @@
|
|||
use std::{
|
||||
io::{self, stdout, ErrorKind, Read, Write},
|
||||
net::{Shutdown, TcpStream},
|
||||
ptr::NonNull,
|
||||
time::{Duration, SystemTime},
|
||||
};
|
||||
|
||||
use serial::SerialPort;
|
||||
|
||||
trait ReadWrite: Write + Read + 'static {}
|
||||
impl<T> ReadWrite for T where T: Write + Read + 'static {}
|
||||
|
||||
enum PrintStatus {
|
||||
No,
|
||||
Yes {
|
||||
last_print: SystemTime,
|
||||
bytes: u128,
|
||||
last_bytes: u128,
|
||||
},
|
||||
}
|
||||
|
||||
pub struct Connection {
|
||||
readwrite: Box<dyn ReadWrite>,
|
||||
data: NonNull<()>,
|
||||
set_nonblocking_thunk: fn(NonNull<()>, bool) -> io::Result<()>,
|
||||
close_thunk: fn(NonNull<()>) -> io::Result<()>,
|
||||
is_nb: bool,
|
||||
is_serial: bool,
|
||||
print: bool,
|
||||
print_status: PrintStatus,
|
||||
}
|
||||
|
||||
impl Write for Connection {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
let result = self.as_write().write(buf);
|
||||
self.print_status_result(result)
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
self.as_write().flush()
|
||||
}
|
||||
}
|
||||
|
||||
impl Read for Connection {
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
let result = self.as_read().read(buf);
|
||||
self.print_status_result(result)
|
||||
}
|
||||
|
||||
fn read_exact(&mut self, mut buf: &mut [u8]) -> io::Result<()> {
|
||||
let len = buf.len();
|
||||
while !buf.is_empty() {
|
||||
match self.read(buf) {
|
||||
Ok(0) if self.is_nb && buf.len() == len => {
|
||||
return Err(io::Error::new(ErrorKind::WouldBlock, "would block"))
|
||||
}
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
let tmp = buf;
|
||||
buf = &mut tmp[n..];
|
||||
}
|
||||
Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
if !buf.is_empty() {
|
||||
Err(io::Error::new(
|
||||
ErrorKind::UnexpectedEof,
|
||||
"failed to fill whole buffer",
|
||||
))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Connection {
|
||||
pub fn new_tcp(stream: TcpStream, print: bool) -> Self {
|
||||
stream
|
||||
.set_read_timeout(Some(Duration::from_secs(20)))
|
||||
.unwrap();
|
||||
stream
|
||||
.set_write_timeout(Some(Duration::from_secs(20)))
|
||||
.unwrap();
|
||||
let mut stream = Box::new(stream);
|
||||
Connection {
|
||||
data: NonNull::from(stream.as_mut()).cast(),
|
||||
readwrite: stream,
|
||||
set_nonblocking_thunk: |data, nb| unsafe {
|
||||
data.cast::<TcpStream>().as_ref().set_nonblocking(nb)
|
||||
},
|
||||
close_thunk: |data| unsafe {
|
||||
data.cast::<TcpStream>().as_ref().shutdown(Shutdown::Both)
|
||||
},
|
||||
is_nb: false,
|
||||
is_serial: false,
|
||||
print: true,
|
||||
print_status: if print {
|
||||
PrintStatus::Yes {
|
||||
last_print: SystemTime::now(),
|
||||
bytes: 0,
|
||||
last_bytes: 0,
|
||||
}
|
||||
} else {
|
||||
PrintStatus::No
|
||||
},
|
||||
}
|
||||
}
|
||||
pub fn new_serial<T: SerialPort + 'static>(mut serial: T, print: bool) -> Self {
|
||||
serial.set_timeout(Duration::from_secs(20)).unwrap();
|
||||
let mut serial = Box::new(serial);
|
||||
Connection {
|
||||
data: NonNull::from(serial.as_mut()).cast(),
|
||||
readwrite: serial,
|
||||
set_nonblocking_thunk: |data, nb| unsafe {
|
||||
data.cast::<T>()
|
||||
.as_mut()
|
||||
.set_timeout(Duration::from_secs(if nb { 0 } else { 20 }))
|
||||
.map_err(|_| {
|
||||
io::Error::new(io::ErrorKind::ConnectionAborted, "serial port went down")
|
||||
})
|
||||
},
|
||||
// no need to close this.
|
||||
close_thunk: |_data| Ok(()),
|
||||
is_nb: false,
|
||||
is_serial: true,
|
||||
print: true,
|
||||
print_status: if print {
|
||||
PrintStatus::Yes {
|
||||
last_print: SystemTime::now(),
|
||||
bytes: 0,
|
||||
last_bytes: 0,
|
||||
}
|
||||
} else {
|
||||
PrintStatus::No
|
||||
},
|
||||
}
|
||||
}
|
||||
fn as_read(&mut self) -> &mut (dyn Read) {
|
||||
&mut self.readwrite
|
||||
}
|
||||
fn as_write(&mut self) -> &mut (dyn Write) {
|
||||
&mut self.readwrite
|
||||
}
|
||||
#[allow(dead_code)]
|
||||
pub fn is_nonblocking(&self) -> bool {
|
||||
self.is_nb
|
||||
}
|
||||
pub fn set_nonblocking(&mut self, nonblocking: bool) -> io::Result<()> {
|
||||
if self.is_nb == nonblocking {
|
||||
return Ok(());
|
||||
}
|
||||
self.is_nb = nonblocking;
|
||||
(self.set_nonblocking_thunk)(self.data, nonblocking)
|
||||
}
|
||||
pub fn close(&self) -> io::Result<()> {
|
||||
(self.close_thunk)(self.data)
|
||||
}
|
||||
|
||||
pub fn is_serial(&self) -> bool {
|
||||
self.is_serial
|
||||
}
|
||||
|
||||
pub fn set_print(&mut self, print: bool) {
|
||||
self.print = print;
|
||||
}
|
||||
|
||||
fn print_status(&mut self, add: usize) {
|
||||
if let &mut PrintStatus::Yes {
|
||||
ref mut last_print,
|
||||
ref mut bytes,
|
||||
ref mut last_bytes,
|
||||
} = &mut self.print_status
|
||||
{
|
||||
*bytes += add as u128;
|
||||
if last_print.elapsed().unwrap().as_secs() > 0 {
|
||||
let diff = *bytes - *last_bytes;
|
||||
let bps = to_units(diff);
|
||||
let total = to_units(*bytes);
|
||||
if self.print {
|
||||
print!(
|
||||
"\r\x1b[KCurrent transfer speed: {bps}B/s, transferred {total}B so far."
|
||||
);
|
||||
stdout().flush().unwrap();
|
||||
}
|
||||
*last_bytes = *bytes;
|
||||
*last_print = SystemTime::now();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn print_status_result(&mut self, result: io::Result<usize>) -> io::Result<usize> {
|
||||
if let Ok(b) = result {
|
||||
self.print_status(b)
|
||||
}
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
fn to_units(diff: u128) -> String {
|
||||
match diff {
|
||||
x @ 1_000_000_000_000.. => ((x / 1_000_000_000) as f64 / 1000.0).to_string() + "G",
|
||||
x @ 1_000_000_000.. => ((x / 1_000_000) as f64 / 1000.0).to_string() + "G",
|
||||
x @ 1_000_000.. => ((x / 1_000) as f64 / 1000.0).to_string() + "M",
|
||||
x @ 10_000.. => (x as f64 / 1000.0).to_string() + "K",
|
||||
x => x.to_string(),
|
||||
}
|
||||
}
|
|
@ -1,4 +1,5 @@
|
|||
mod client;
|
||||
mod connection;
|
||||
mod packet;
|
||||
mod server;
|
||||
mod socket_adapter;
|
||||
|
@ -6,6 +7,7 @@ mod socket_adapter;
|
|||
use std::io::{Error, ErrorKind};
|
||||
|
||||
pub use client::*;
|
||||
pub(crate) use connection::*;
|
||||
pub(crate) use packet::*;
|
||||
pub use server::*;
|
||||
pub(crate) use socket_adapter::*;
|
||||
|
@ -14,6 +16,7 @@ pub(crate) fn io_sync<T>(result: Result<T, Error>) -> Result<Option<T>, Error> {
|
|||
match result {
|
||||
Ok(x) => Ok(Some(x)),
|
||||
Err(x) if x.kind() == ErrorKind::WouldBlock => Ok(None),
|
||||
Err(x) if x.kind() == ErrorKind::TimedOut => Ok(None),
|
||||
Err(x) => Err(x),
|
||||
}
|
||||
}
|
||||
|
|
30
src/main.rs
30
src/main.rs
|
@ -1,22 +1,22 @@
|
|||
use std::env;
|
||||
|
||||
use revpfw3::{client, server};
|
||||
use revpfw3::{client, server, ClientParams};
|
||||
|
||||
fn main() {
|
||||
let args: Vec<_> = env::args().skip(1).collect();
|
||||
if (6..=7).contains(&args.len()) && args[0] == "client" {
|
||||
client(
|
||||
&args[1],
|
||||
args[2].parse().unwrap(),
|
||||
&args[3],
|
||||
args[4].parse().unwrap(),
|
||||
&args[5],
|
||||
if args.len() == 7 {
|
||||
args[6].parse().unwrap()
|
||||
} else {
|
||||
1
|
||||
},
|
||||
);
|
||||
if (6..=11).contains(&args.len()) && args[0] == "client" {
|
||||
client(ClientParams {
|
||||
server_ip: &args[1],
|
||||
server_port: args[2].parse().unwrap(),
|
||||
dest_ip: &args[3],
|
||||
dest_port: args[4].parse().unwrap(),
|
||||
key: &args[5],
|
||||
sleep_delay_ms: args.get(6).map(|x| x.parse().unwrap()).unwrap_or(1),
|
||||
modem_port: args.get(7).map(|x| x.as_str()),
|
||||
modem_baud: args.get(8).map(|x| x.parse().unwrap()),
|
||||
modem_init: args.get(9).map(|x| x.as_str()),
|
||||
rate_limit_sleep: args.get(10).map(|x| x.parse().unwrap()).unwrap_or(0),
|
||||
});
|
||||
}
|
||||
if (3..=4).contains(&args.len()) && args[0] == "server" {
|
||||
server(
|
||||
|
@ -31,5 +31,5 @@ fn main() {
|
|||
}
|
||||
eprintln!("Usage: \n\
|
||||
\x20 revpfw3 server <port> <key> [<poll delay>]\n\
|
||||
\x20 revpfw3 client <server ip> <server port> <destination ip> <destination port> <key> [<poll delay>]");
|
||||
\x20 revpfw3 client <server ip> <server port> <destination ip> <destination port> <key> [<poll delay> [<modem port> <modem baud> <modem init file>]]");
|
||||
}
|
||||
|
|
|
@ -7,4 +7,7 @@ pub(crate) enum PacketType {
|
|||
KeepAlive,
|
||||
ClientData,
|
||||
ServerData,
|
||||
ClientExceededBuffer,
|
||||
Resync,
|
||||
ResyncEcho,
|
||||
}
|
||||
|
|
136
src/server.rs
136
src/server.rs
|
@ -1,4 +1,5 @@
|
|||
use std::{
|
||||
collections::HashMap,
|
||||
io::Read,
|
||||
io::Write,
|
||||
net::{Shutdown, TcpListener},
|
||||
|
@ -7,20 +8,45 @@ use std::{
|
|||
vec,
|
||||
};
|
||||
|
||||
use crate::{io_sync, PacketType, SocketAdapter};
|
||||
use crate::{Connection, PacketType, SocketAdapter};
|
||||
|
||||
fn resync(tcp: &mut SocketAdapter) {
|
||||
println!();
|
||||
eprintln!("Client version mismatch or broken connection. Re-syncing in case of the latter...");
|
||||
tcp.internal.set_print(false);
|
||||
tcp.write_now().unwrap();
|
||||
tcp.write(&[PacketType::Resync.ordinal() as u8]).unwrap();
|
||||
tcp.write_now().unwrap();
|
||||
eprintln!(
|
||||
"Sent resync packet. Client should now wait 8 seconds and then send a resync packet back, initiating a normal re-sync."
|
||||
);
|
||||
let mut buf = [0; 4096];
|
||||
// read all packets that are still pending.
|
||||
while let Some(Some(_x @ 1..)) = tcp.poll(&mut buf).ok() {}
|
||||
// wait 5 seconds
|
||||
thread::sleep(Duration::from_secs(5));
|
||||
// read all packets that are still pending.
|
||||
while let Some(Some(_x @ 1..)) = tcp.poll(&mut buf).ok() {}
|
||||
// client should now have stopped sending packets.
|
||||
}
|
||||
|
||||
pub fn server(port: u16, key: &str, sleep_delay_ms: u64) {
|
||||
let mut buf1 = [0u8; 1];
|
||||
let mut buf4 = [0u8; 4];
|
||||
let mut buf8 = [0u8; 8];
|
||||
let mut buf16 = [0u8; 16];
|
||||
let mut buf = [0; 1024];
|
||||
let tcpl = TcpListener::bind(("0.0.0.0", port)).unwrap();
|
||||
let tcpl = TcpListener::bind(("::0", port)).unwrap();
|
||||
let mut tcp = loop {
|
||||
let Ok(mut tcp) = tcpl.accept() else { continue };
|
||||
tcp.0
|
||||
.set_read_timeout(Some(Duration::from_secs(5)))
|
||||
.unwrap();
|
||||
let Ok(()) = tcp.0.read_exact(&mut buf4) else {
|
||||
tcp.0.shutdown(Shutdown::Both).unwrap();
|
||||
continue;
|
||||
};
|
||||
if buf4 == ['R' as u8, 'P' as u8, 'F' as u8, 30] {
|
||||
if buf4 == [b'R', b'P', b'F', 30] {
|
||||
println!("Compatible client connected.");
|
||||
if tcp.0.read_exact(&mut buf4).is_ok() && u32::from_be_bytes(buf4) == key.len() as u32 {
|
||||
println!("Key length matches.");
|
||||
|
@ -35,20 +61,19 @@ pub fn server(port: u16, key: &str, sleep_delay_ms: u64) {
|
|||
}
|
||||
};
|
||||
|
||||
tcp.write_all(&mut ['R' as u8, 'P' as u8, 'F' as u8, 30])
|
||||
.unwrap();
|
||||
tcp.write_all(&[b'R', b'P', b'F', 30]).unwrap();
|
||||
|
||||
tcpl.set_nonblocking(true).unwrap();
|
||||
|
||||
let mut tcp = SocketAdapter::new(tcp, true);
|
||||
tcp.set_nonblocking(true);
|
||||
let mut sockets: Vec<SocketAdapter> = Vec::new();
|
||||
let mut tcp = SocketAdapter::new(Connection::new_tcp(tcp, true));
|
||||
let mut sockets: HashMap<u64, SocketAdapter> = HashMap::new();
|
||||
let mut id = 0;
|
||||
let mut last_keep_alive_sent = SystemTime::now();
|
||||
let mut last_keep_alive = SystemTime::now();
|
||||
loop {
|
||||
let mut did_anything = false;
|
||||
|
||||
if last_keep_alive_sent.elapsed().unwrap().as_secs() >= 20 {
|
||||
if last_keep_alive_sent.elapsed().unwrap().as_secs() >= 10 {
|
||||
last_keep_alive_sent = SystemTime::now();
|
||||
tcp.write(&[PacketType::KeepAlive.ordinal() as u8]).unwrap();
|
||||
}
|
||||
|
@ -57,15 +82,14 @@ pub fn server(port: u16, key: &str, sleep_delay_ms: u64) {
|
|||
}
|
||||
|
||||
if let Ok(new) = tcpl.accept() {
|
||||
let mut new = SocketAdapter::new(new.0, false);
|
||||
new.set_nonblocking(true);
|
||||
sockets.push(new);
|
||||
let new = SocketAdapter::new(Connection::new_tcp(new.0, false));
|
||||
sockets.insert((id, id += 1).0, new);
|
||||
tcp.write(&[PacketType::NewClient.ordinal() as u8]).unwrap();
|
||||
did_anything = true;
|
||||
}
|
||||
|
||||
let mut to_remove = vec![];
|
||||
for (i, socket) in sockets.iter_mut().enumerate() {
|
||||
for (&i, socket) in sockets.iter_mut() {
|
||||
if let Ok(x) = socket.poll(&mut buf) {
|
||||
if let Some(len) = x {
|
||||
if len == 0 {
|
||||
|
@ -73,7 +97,7 @@ pub fn server(port: u16, key: &str, sleep_delay_ms: u64) {
|
|||
} else {
|
||||
tcp.write(&[PacketType::ClientData.ordinal() as u8])
|
||||
.unwrap();
|
||||
tcp.write(&(i as u32).to_be_bytes()).unwrap();
|
||||
tcp.write(&i.to_be_bytes()).unwrap();
|
||||
tcp.write(&(len as u32).to_be_bytes()).unwrap();
|
||||
tcp.write(&buf[..len]).unwrap();
|
||||
}
|
||||
|
@ -83,55 +107,95 @@ pub fn server(port: u16, key: &str, sleep_delay_ms: u64) {
|
|||
to_remove.push(i);
|
||||
did_anything = true;
|
||||
}
|
||||
if let x @ 1.. = socket.clear_delay() {
|
||||
tcp.write(&[PacketType::ClientExceededBuffer.ordinal() as u8])
|
||||
.unwrap();
|
||||
tcp.write(&i.to_be_bytes()).unwrap();
|
||||
tcp.write(&x.to_be_bytes()).unwrap();
|
||||
socket.punish(x);
|
||||
}
|
||||
}
|
||||
for i in to_remove.into_iter().rev() {
|
||||
tcp.write(&[PacketType::CloseClient.ordinal() as u8])
|
||||
.unwrap();
|
||||
tcp.write(&(i as u32).to_be_bytes()).unwrap();
|
||||
let _ = sockets.remove(i).internal.shutdown(Shutdown::Both);
|
||||
tcp.write(&i.to_be_bytes()).unwrap();
|
||||
if let Some(x) = sockets.remove(&i) {
|
||||
let _ = x.internal.close();
|
||||
}
|
||||
}
|
||||
|
||||
tcp.update().unwrap();
|
||||
if io_sync(tcp.internal.read_exact(&mut buf1))
|
||||
.unwrap()
|
||||
.is_none()
|
||||
{
|
||||
if tcp.poll_exact(&mut buf1).unwrap().is_none() {
|
||||
if !did_anything {
|
||||
thread::sleep(Duration::from_millis(sleep_delay_ms));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
let pt = PacketType::from_ordinal(buf1[0] as i8)
|
||||
.expect("server/client version mismatch or broken TCP");
|
||||
tcp.set_nonblocking(false);
|
||||
let Some(pt) = PacketType::from_ordinal(buf1[0] as i8) else {
|
||||
resync(&mut tcp);
|
||||
continue;
|
||||
};
|
||||
match pt {
|
||||
PacketType::NewClient => unreachable!(),
|
||||
PacketType::NewClient => resync(&mut tcp),
|
||||
|
||||
PacketType::CloseClient => {
|
||||
tcp.internal.read_exact(&mut buf4).unwrap();
|
||||
let _ = sockets
|
||||
.remove(u32::from_be_bytes(buf4) as usize)
|
||||
.internal
|
||||
.shutdown(Shutdown::Both);
|
||||
tcp.read_now(&mut buf8).unwrap();
|
||||
if let Some(x) = sockets.remove(&u64::from_be_bytes(buf8)) {
|
||||
let _ = x.internal.close();
|
||||
}
|
||||
}
|
||||
|
||||
PacketType::KeepAlive => {
|
||||
last_keep_alive = SystemTime::now();
|
||||
}
|
||||
|
||||
PacketType::ClientData => unreachable!(),
|
||||
PacketType::ClientData => resync(&mut tcp),
|
||||
|
||||
PacketType::ServerData => {
|
||||
tcp.internal.read_exact(&mut buf4).unwrap();
|
||||
let idx = u32::from_be_bytes(buf4) as usize;
|
||||
tcp.internal.read_exact(&mut buf4).unwrap();
|
||||
tcp.read_now(&mut buf8).unwrap();
|
||||
let idx = u64::from_be_bytes(buf8);
|
||||
tcp.read_now(&mut buf4).unwrap();
|
||||
let len = u32::from_be_bytes(buf4) as usize;
|
||||
tcp.internal.read_exact(&mut buf[..len]).unwrap();
|
||||
tcp.read_now(&mut buf[..len]).unwrap();
|
||||
|
||||
let _ = sockets[idx].write_later(&buf[..len]);
|
||||
if let Some(socket) = sockets.get_mut(&idx) {
|
||||
let _ = socket.write_later(&buf[..len]);
|
||||
}
|
||||
}
|
||||
|
||||
PacketType::ClientExceededBuffer => {
|
||||
tcp.read_now(&mut buf8).unwrap();
|
||||
let idx = u64::from_be_bytes(buf8);
|
||||
tcp.read_now(&mut buf16).unwrap();
|
||||
let amount = u128::from_be_bytes(buf16);
|
||||
|
||||
// a single connection doesn't need overuse-penalties
|
||||
if let (true, Some(socket)) = (sockets.len() > 1, sockets.get_mut(&idx)) {
|
||||
socket.punish(amount);
|
||||
}
|
||||
}
|
||||
|
||||
PacketType::Resync => {
|
||||
println!();
|
||||
tcp.internal.set_print(false);
|
||||
eprintln!(
|
||||
"Client asked for a re-sync. Waiting 8 seconds, then sending resync-echo."
|
||||
);
|
||||
tcp.read_now(&mut buf8).unwrap();
|
||||
id = u64::from_be_bytes(buf8).max(id);
|
||||
tcp.write_now().unwrap();
|
||||
thread::sleep(Duration::from_secs(8));
|
||||
tcp.write(&[PacketType::ResyncEcho.ordinal() as u8])
|
||||
.unwrap();
|
||||
tcp.write(&id.to_be_bytes()).unwrap();
|
||||
tcp.write_now().unwrap();
|
||||
eprintln!("Resync-Echo sent. Going back to normal operation.");
|
||||
tcp.internal.set_print(true);
|
||||
}
|
||||
|
||||
// this one can't happen, it should only come from the server
|
||||
PacketType::ResyncEcho => resync(&mut tcp),
|
||||
}
|
||||
tcp.set_nonblocking(true);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,57 +1,47 @@
|
|||
use std::{
|
||||
io::{Error, Read},
|
||||
io::{ErrorKind, Write},
|
||||
net::TcpStream,
|
||||
time::SystemTime,
|
||||
};
|
||||
|
||||
use crate::io_sync;
|
||||
use crate::{io_sync, Connection};
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
enum Broken {
|
||||
OsErr(i32),
|
||||
DirectErr(ErrorKind),
|
||||
DirectErr(ErrorKind, &'static str),
|
||||
}
|
||||
|
||||
impl From<Broken> for Error {
|
||||
fn from(value: Broken) -> Self {
|
||||
match value {
|
||||
Broken::OsErr(x) => Error::from_raw_os_error(x),
|
||||
Broken::DirectErr(x) => Error::from(x),
|
||||
Broken::DirectErr(x, s) => Error::new(x, s),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct SocketAdapter {
|
||||
pub(crate) internal: TcpStream,
|
||||
pub(crate) internal: Connection,
|
||||
written: usize,
|
||||
to_write: usize,
|
||||
write: [u8; 65536],
|
||||
broken: Option<Broken>,
|
||||
wait_if_full: bool,
|
||||
is_nonblocking: bool,
|
||||
accumulated_delay: u128,
|
||||
ignore_until: Option<u128>,
|
||||
}
|
||||
|
||||
impl SocketAdapter {
|
||||
pub fn new(tcp: TcpStream, wait_if_full: bool) -> SocketAdapter {
|
||||
pub fn new(connection: Connection) -> SocketAdapter {
|
||||
Self {
|
||||
internal: tcp,
|
||||
internal: connection,
|
||||
written: 0,
|
||||
to_write: 0,
|
||||
write: [0u8; 65536],
|
||||
broken: None,
|
||||
wait_if_full,
|
||||
is_nonblocking: false,
|
||||
accumulated_delay: 0,
|
||||
ignore_until: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_nonblocking(&mut self, nonblocking: bool) {
|
||||
if let Err(x) = self.internal.set_nonblocking(nonblocking) {
|
||||
self.broken = Some(Broken::OsErr(x.raw_os_error().unwrap()));
|
||||
return;
|
||||
}
|
||||
self.is_nonblocking = nonblocking;
|
||||
}
|
||||
|
||||
pub fn write_later(&mut self, buf: &[u8]) -> Result<(), Error> {
|
||||
if let Some(ref x) = self.broken {
|
||||
return Err(Error::from(*x));
|
||||
|
@ -62,18 +52,19 @@ impl SocketAdapter {
|
|||
.copy_within(self.written..self.written + self.to_write, 0);
|
||||
self.written = 0;
|
||||
}
|
||||
let Some(x) = self.write.get_mut(self.to_write + self.written..self.to_write + self.written + buf.len()) else {
|
||||
if self.wait_if_full {
|
||||
self.internal.set_nonblocking(false)?;
|
||||
self.internal.write_all(&self.write[self.written..self.written + self.to_write])?;
|
||||
self.internal.set_nonblocking(self.is_nonblocking)?;
|
||||
self.written = 0;
|
||||
self.to_write = buf.len();
|
||||
self.write[..buf.len()].copy_from_slice(buf);
|
||||
return Ok(());
|
||||
}
|
||||
self.broken = Some(Broken::DirectErr(ErrorKind::TimedOut));
|
||||
return Err(ErrorKind::TimedOut.into());
|
||||
let Some(x) = self
|
||||
.write
|
||||
.get_mut(self.to_write + self.written..self.to_write + self.written + buf.len())
|
||||
else {
|
||||
let sa = SystemTime::now();
|
||||
self.internal.set_nonblocking(false)?;
|
||||
self.internal
|
||||
.write_all(&self.write[self.written..self.written + self.to_write])?;
|
||||
self.written = 0;
|
||||
self.to_write = buf.len();
|
||||
self.write[..buf.len()].copy_from_slice(buf);
|
||||
self.accumulated_delay += sa.elapsed().unwrap().as_micros();
|
||||
return Ok(());
|
||||
};
|
||||
x.copy_from_slice(buf);
|
||||
self.to_write += buf.len();
|
||||
|
@ -85,7 +76,7 @@ impl SocketAdapter {
|
|||
self.update()
|
||||
}
|
||||
|
||||
pub fn update(&mut self) -> Result<(), Error> {
|
||||
pub fn write_now(&mut self) -> Result<(), Error> {
|
||||
if let Some(ref x) = self.broken {
|
||||
return Err(Error::from(*x));
|
||||
}
|
||||
|
@ -93,11 +84,39 @@ impl SocketAdapter {
|
|||
return Ok(());
|
||||
}
|
||||
match {
|
||||
self.internal.set_nonblocking(true)?;
|
||||
self.internal.set_nonblocking(false)?;
|
||||
let r = self
|
||||
.internal
|
||||
.write_all(&self.write[self.written..self.written + self.to_write]);
|
||||
r
|
||||
} {
|
||||
Ok(()) => {
|
||||
self.written = 0;
|
||||
self.to_write = 0;
|
||||
Ok(())
|
||||
}
|
||||
Err(x) => {
|
||||
self.broken = Some(Broken::DirectErr(x.kind(), "io error"));
|
||||
Err(x)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update(&mut self) -> Result<(), Error> {
|
||||
if Some(SystemTime::UNIX_EPOCH.elapsed().unwrap().as_micros()) < self.ignore_until {
|
||||
return Ok(());
|
||||
}
|
||||
if let Some(ref x) = self.broken {
|
||||
return Err(Error::from(*x));
|
||||
}
|
||||
if self.to_write == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
match {
|
||||
self.internal.set_nonblocking(!self.internal.is_serial())?;
|
||||
let r = self
|
||||
.internal
|
||||
.write(&self.write[self.written..self.written + self.to_write]);
|
||||
self.internal.set_nonblocking(self.is_nonblocking)?;
|
||||
r
|
||||
} {
|
||||
Ok(x) => {
|
||||
|
@ -110,14 +129,47 @@ impl SocketAdapter {
|
|||
}
|
||||
Err(x) if x.kind() == ErrorKind::WouldBlock => Ok(()),
|
||||
Err(x) => {
|
||||
self.broken = Some(Broken::OsErr(x.raw_os_error().unwrap()));
|
||||
self.broken = Some(Broken::DirectErr(x.kind(), "io error"));
|
||||
Err(x)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn poll(&mut self, buf: &mut [u8]) -> Result<Option<usize>, Error> {
|
||||
pub fn read_now(&mut self, buf: &mut [u8]) -> Result<Option<()>, Error> {
|
||||
if Some(SystemTime::UNIX_EPOCH.elapsed().unwrap().as_micros()) < self.ignore_until {
|
||||
return Ok(None);
|
||||
}
|
||||
self.update()?;
|
||||
self.internal.set_nonblocking(false)?;
|
||||
io_sync(self.internal.read_exact(buf))
|
||||
}
|
||||
|
||||
pub fn poll_exact(&mut self, buf: &mut [u8]) -> Result<Option<()>, Error> {
|
||||
if Some(SystemTime::UNIX_EPOCH.elapsed().unwrap().as_micros()) < self.ignore_until {
|
||||
return Ok(None);
|
||||
}
|
||||
self.update()?;
|
||||
self.internal.set_nonblocking(true)?;
|
||||
io_sync(self.internal.read_exact(buf))
|
||||
}
|
||||
|
||||
pub fn poll(&mut self, buf: &mut [u8]) -> Result<Option<usize>, Error> {
|
||||
if Some(SystemTime::UNIX_EPOCH.elapsed().unwrap().as_micros()) < self.ignore_until {
|
||||
return Ok(None);
|
||||
}
|
||||
self.update()?;
|
||||
self.internal.set_nonblocking(true)?;
|
||||
io_sync(self.internal.read(buf))
|
||||
}
|
||||
|
||||
pub fn clear_delay(&mut self) -> u128 {
|
||||
(self.accumulated_delay, self.accumulated_delay = 0).0
|
||||
}
|
||||
|
||||
pub fn punish(&mut self, time: u128) {
|
||||
if self.ignore_until.is_none() {
|
||||
self.ignore_until = Some(SystemTime::UNIX_EPOCH.elapsed().unwrap().as_micros());
|
||||
}
|
||||
self.ignore_until = self.ignore_until.map(|x| x + time);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue