From b75a385e70ca60f0a3c59a834d67cdc3354f4d41 Mon Sep 17 00:00:00 2001 From: TudbuT Date: Tue, 16 Aug 2022 09:20:34 +0200 Subject: [PATCH] implement more parts of TCP to improve connection speed --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/main.rs | 147 ++++++++++++++++++++++++++++++++++++++-------------- 3 files changed, 109 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 14b1ea8..64226cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4,4 +4,4 @@ version = 3 [[package]] name = "qft" -version = "0.3.1" +version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index d41bff6..c5867bb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "qft" -version = "0.3.1" +version = "0.4.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/src/main.rs b/src/main.rs index 7737d69..655a1ca 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,11 +13,13 @@ use std::{ enum SafeReadWritePacket { Write, Ack, + ResendRequest, End, } struct SafeReadWrite { socket: UdpSocket, + last_transmitted: HashMap>, packet_count_out: u64, packet_count_in: u64, } @@ -26,29 +28,30 @@ impl SafeReadWrite { pub fn new(socket: UdpSocket) -> SafeReadWrite { SafeReadWrite { socket, + last_transmitted: HashMap::new(), packet_count_in: 0, packet_count_out: 0, } } pub fn write_safe(&mut self, buf: &[u8]) -> Result<(), Error> { - if buf.len() > 0xfffd { + if buf.len() > 0xfffc { panic!( - "too large data packet sent over SafeReadWrite ({} > 0xfffd)", + "too large data packet sent over SafeReadWrite ({} > 0xfffc)", buf.len() ); } - let id = self.packet_count_out as u8; + let id = (self.packet_count_out as u16).to_be_bytes(); self.packet_count_out += 1; - let mut buf = Vec::from(buf); - buf.insert(0, SafeReadWritePacket::Write as u8); - buf.insert(0, id); - let buf = buf.as_slice(); + let mut vbuf = Vec::from(buf); + vbuf.insert(0, SafeReadWritePacket::Write as u8); + vbuf.insert(0, id[1]); + vbuf.insert(0, id[0]); // this is now the first byte + let buf = vbuf.as_slice(); - let mut resend = true; - while resend { + loop { match self.socket.send(buf) { Ok(x) => { if x != buf.len() { @@ -59,26 +62,58 @@ impl SafeReadWrite { continue; } } - let mut buf = [0, 0]; + self.last_transmitted.insert(u16::from_be_bytes(id), vbuf); + break; + } + let mut buf = [0, 0, 0]; + if self.last_transmitted.len() < 50 { + self.socket.set_read_timeout(Some(Duration::from_millis(1))).unwrap(); + } + loop { match self.socket.recv(&mut buf).ok() { Some(x) => { - if x == 0 { + if x != 3 { continue; } - if buf[1] == SafeReadWritePacket::Ack as u8 && buf[0] == id { - resend = false; + if buf[2] == SafeReadWritePacket::Ack as u8 { + self.last_transmitted + .remove(&u16::from_be_bytes([buf[0], buf[1]])); + } + if buf[2] == SafeReadWritePacket::ResendRequest as u8 { + let buf = self + .last_transmitted + .get(&u16::from_be_bytes([buf[0], buf[1]])) + .expect("tried to ResendRequest an Ack'd packet"); + loop { + // resend until success + match self.socket.send(&buf.as_slice()) { + Ok(x) => { + if x != buf.len() { + continue; + } + } + Err(_) => { + continue; + } + }; + break; + } + // do NOT remove from last_transmitted yet, wait for Ack to do that. } } - None => {} + None => { + break; + } } } + self.socket.set_read_timeout(Some(Duration::from_millis(1000))).unwrap(); return Ok(()); } pub fn read_safe(&mut self, buf: &[u8]) -> Result<(Vec, usize), Error> { - if buf.len() > 0xfffd { + if buf.len() > 0xfffc { panic!( - "attempted to receive too large data packet with SafeReadWrite ({} > 0xfffd)", + "attempted to receive too large data packet with SafeReadWrite ({} > 0xfffc)", buf.len() ); } @@ -86,6 +121,7 @@ impl SafeReadWrite { let mut mbuf = Vec::from(buf); mbuf.insert(0, 0); mbuf.insert(0, 0); + mbuf.insert(0, 0); let buf: &mut [u8] = mbuf.as_mut(); let mut r = (vec![], 0); @@ -94,20 +130,28 @@ impl SafeReadWrite { while try_again { match self.socket.recv(buf) { Ok(x) => { - if x < 2 { + if x < 3 { continue; } - if buf[0] <= self.packet_count_in as u8 { + let id = u16::from_be_bytes([buf[0], buf[1]]); + if id <= self.packet_count_in as u16 { self.socket - .send(&[buf[0], SafeReadWritePacket::Ack as u8]) + .send(&[buf[0], buf[1], SafeReadWritePacket::Ack as u8]) .expect("send error"); } - if buf[0] == self.packet_count_in as u8 { + if id == self.packet_count_in as u16 { try_again = false; self.packet_count_in += 1; - r.1 = x - 2; + r.1 = x - 3; } - if buf[1] == SafeReadWritePacket::End as u8 { + if id > self.packet_count_in as u16 { + // ask to resend, then do nothing + let id = (self.packet_count_in as u16).to_be_bytes(); + self.socket + .send(&[id[0], id[1], SafeReadWritePacket::ResendRequest as u8]) + .expect("send error"); + } + if buf[2] == SafeReadWritePacket::End as u8 { return Ok((vec![], 0)); } } @@ -116,21 +160,22 @@ impl SafeReadWrite { } mbuf.remove(0); mbuf.remove(0); + mbuf.remove(0); r.0 = mbuf; return Ok(r); } pub fn end(mut self) -> UdpSocket { - let id = self.packet_count_out as u8; + let id = (self.packet_count_out as u16).to_be_bytes(); self.packet_count_out += 1; - let mut buf = vec![]; - buf.insert(0, SafeReadWritePacket::End as u8); - buf.insert(0, id); - let buf = buf.as_slice(); + let mut vbuf = Vec::new(); + vbuf.insert(0, SafeReadWritePacket::End as u8); + vbuf.insert(0, id[1]); + vbuf.insert(0, id[0]); // this is now the first byte + let buf = vbuf.as_slice(); - let mut resend = true; - while resend { + loop { match self.socket.send(buf) { Ok(x) => { if x != buf.len() { @@ -141,14 +186,40 @@ impl SafeReadWrite { continue; } } - let mut buf = [0, 0]; + self.last_transmitted.insert(u16::from_be_bytes(id), vbuf); + break; + } + let mut buf = [0, 0, 0]; + while self.last_transmitted.len() != 0 { match self.socket.recv(&mut buf).ok() { Some(x) => { - if x == 0 { + if x != 3 { continue; } - if buf[1] == SafeReadWritePacket::Ack as u8 && buf[0] == id { - resend = false; + if buf[2] == SafeReadWritePacket::Ack as u8 { + self.last_transmitted + .remove(&u16::from_be_bytes([buf[0], buf[1]])); + } + if buf[2] == SafeReadWritePacket::ResendRequest as u8 { + let buf = self + .last_transmitted + .get(&u16::from_be_bytes([buf[0], buf[1]])) + .expect("tried to ResendRequest an Ack'd packet"); + loop { + // resend until success + match self.socket.send(&buf.as_slice()) { + Ok(x) => { + if x != buf.len() { + continue; + } + } + Err(_) => { + continue; + } + }; + break; + } + // do NOT remove from last_transmitted yet, wait for Ack to do that. } } None => {} @@ -250,14 +321,12 @@ fn sender(args: &Vec) { return; } - let m = unix_millis(); sc.write_safe(&buf[..read]).expect("send error"); bytes_sent += read as u64; if (bytes_sent % (br * 20) as u64) < (br as u64) { print!( - "\r\x1b[KSent {} bytes with ping {}", - bytes_sent, - unix_millis() - m + "\r\x1b[KSent {} bytes", + bytes_sent ); stdout().flush().unwrap(); } @@ -295,7 +364,6 @@ fn receiver(args: &Vec) { let mut sc = SafeReadWrite::new(connection); let mut bytes_received: u64 = 0; loop { - let m = unix_millis(); let (mbuf, len) = sc.read_safe(buf).expect("read error"); buf = &mbuf.leak()[..len]; if len == 0 { @@ -303,12 +371,11 @@ fn receiver(args: &Vec) { println!("Transfer done. Thank you!"); return; } - let m = unix_millis() - m; file.write(buf).expect("write error"); bytes_received += len as u64; if (bytes_received % (br * 20) as u64) < (br as u64) { - print!("\r\x1b[KReceived {} bytes with ping {}", bytes_received, m); + print!("\r\x1b[KReceived {} bytes", bytes_received); stdout().flush().unwrap(); } }