implement more parts of TCP to improve connection speed

This commit is contained in:
Daniella / Tove 2022-08-16 09:20:34 +02:00
parent 3522d0ffa7
commit b75a385e70
3 changed files with 109 additions and 42 deletions

2
Cargo.lock generated
View file

@ -4,4 +4,4 @@ version = 3
[[package]]
name = "qft"
version = "0.3.1"
version = "0.4.0"

View file

@ -1,6 +1,6 @@
[package]
name = "qft"
version = "0.3.1"
version = "0.4.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

View file

@ -13,11 +13,13 @@ use std::{
enum SafeReadWritePacket {
Write,
Ack,
ResendRequest,
End,
}
struct SafeReadWrite {
socket: UdpSocket,
last_transmitted: HashMap<u16, Vec<u8>>,
packet_count_out: u64,
packet_count_in: u64,
}
@ -26,29 +28,30 @@ impl SafeReadWrite {
pub fn new(socket: UdpSocket) -> SafeReadWrite {
SafeReadWrite {
socket,
last_transmitted: HashMap::new(),
packet_count_in: 0,
packet_count_out: 0,
}
}
pub fn write_safe(&mut self, buf: &[u8]) -> Result<(), Error> {
if buf.len() > 0xfffd {
if buf.len() > 0xfffc {
panic!(
"too large data packet sent over SafeReadWrite ({} > 0xfffd)",
"too large data packet sent over SafeReadWrite ({} > 0xfffc)",
buf.len()
);
}
let id = self.packet_count_out as u8;
let id = (self.packet_count_out as u16).to_be_bytes();
self.packet_count_out += 1;
let mut buf = Vec::from(buf);
buf.insert(0, SafeReadWritePacket::Write as u8);
buf.insert(0, id);
let buf = buf.as_slice();
let mut vbuf = Vec::from(buf);
vbuf.insert(0, SafeReadWritePacket::Write as u8);
vbuf.insert(0, id[1]);
vbuf.insert(0, id[0]); // this is now the first byte
let buf = vbuf.as_slice();
let mut resend = true;
while resend {
loop {
match self.socket.send(buf) {
Ok(x) => {
if x != buf.len() {
@ -59,26 +62,58 @@ impl SafeReadWrite {
continue;
}
}
let mut buf = [0, 0];
self.last_transmitted.insert(u16::from_be_bytes(id), vbuf);
break;
}
let mut buf = [0, 0, 0];
if self.last_transmitted.len() < 50 {
self.socket.set_read_timeout(Some(Duration::from_millis(1))).unwrap();
}
loop {
match self.socket.recv(&mut buf).ok() {
Some(x) => {
if x == 0 {
if x != 3 {
continue;
}
if buf[1] == SafeReadWritePacket::Ack as u8 && buf[0] == id {
resend = false;
if buf[2] == SafeReadWritePacket::Ack as u8 {
self.last_transmitted
.remove(&u16::from_be_bytes([buf[0], buf[1]]));
}
if buf[2] == SafeReadWritePacket::ResendRequest as u8 {
let buf = self
.last_transmitted
.get(&u16::from_be_bytes([buf[0], buf[1]]))
.expect("tried to ResendRequest an Ack'd packet");
loop {
// resend until success
match self.socket.send(&buf.as_slice()) {
Ok(x) => {
if x != buf.len() {
continue;
}
}
Err(_) => {
continue;
}
};
break;
}
// do NOT remove from last_transmitted yet, wait for Ack to do that.
}
}
None => {}
None => {
break;
}
}
}
self.socket.set_read_timeout(Some(Duration::from_millis(1000))).unwrap();
return Ok(());
}
pub fn read_safe(&mut self, buf: &[u8]) -> Result<(Vec<u8>, usize), Error> {
if buf.len() > 0xfffd {
if buf.len() > 0xfffc {
panic!(
"attempted to receive too large data packet with SafeReadWrite ({} > 0xfffd)",
"attempted to receive too large data packet with SafeReadWrite ({} > 0xfffc)",
buf.len()
);
}
@ -86,6 +121,7 @@ impl SafeReadWrite {
let mut mbuf = Vec::from(buf);
mbuf.insert(0, 0);
mbuf.insert(0, 0);
mbuf.insert(0, 0);
let buf: &mut [u8] = mbuf.as_mut();
let mut r = (vec![], 0);
@ -94,20 +130,28 @@ impl SafeReadWrite {
while try_again {
match self.socket.recv(buf) {
Ok(x) => {
if x < 2 {
if x < 3 {
continue;
}
if buf[0] <= self.packet_count_in as u8 {
let id = u16::from_be_bytes([buf[0], buf[1]]);
if id <= self.packet_count_in as u16 {
self.socket
.send(&[buf[0], SafeReadWritePacket::Ack as u8])
.send(&[buf[0], buf[1], SafeReadWritePacket::Ack as u8])
.expect("send error");
}
if buf[0] == self.packet_count_in as u8 {
if id == self.packet_count_in as u16 {
try_again = false;
self.packet_count_in += 1;
r.1 = x - 2;
r.1 = x - 3;
}
if buf[1] == SafeReadWritePacket::End as u8 {
if id > self.packet_count_in as u16 {
// ask to resend, then do nothing
let id = (self.packet_count_in as u16).to_be_bytes();
self.socket
.send(&[id[0], id[1], SafeReadWritePacket::ResendRequest as u8])
.expect("send error");
}
if buf[2] == SafeReadWritePacket::End as u8 {
return Ok((vec![], 0));
}
}
@ -116,21 +160,22 @@ impl SafeReadWrite {
}
mbuf.remove(0);
mbuf.remove(0);
mbuf.remove(0);
r.0 = mbuf;
return Ok(r);
}
pub fn end(mut self) -> UdpSocket {
let id = self.packet_count_out as u8;
let id = (self.packet_count_out as u16).to_be_bytes();
self.packet_count_out += 1;
let mut buf = vec![];
buf.insert(0, SafeReadWritePacket::End as u8);
buf.insert(0, id);
let buf = buf.as_slice();
let mut vbuf = Vec::new();
vbuf.insert(0, SafeReadWritePacket::End as u8);
vbuf.insert(0, id[1]);
vbuf.insert(0, id[0]); // this is now the first byte
let buf = vbuf.as_slice();
let mut resend = true;
while resend {
loop {
match self.socket.send(buf) {
Ok(x) => {
if x != buf.len() {
@ -141,14 +186,40 @@ impl SafeReadWrite {
continue;
}
}
let mut buf = [0, 0];
self.last_transmitted.insert(u16::from_be_bytes(id), vbuf);
break;
}
let mut buf = [0, 0, 0];
while self.last_transmitted.len() != 0 {
match self.socket.recv(&mut buf).ok() {
Some(x) => {
if x == 0 {
if x != 3 {
continue;
}
if buf[1] == SafeReadWritePacket::Ack as u8 && buf[0] == id {
resend = false;
if buf[2] == SafeReadWritePacket::Ack as u8 {
self.last_transmitted
.remove(&u16::from_be_bytes([buf[0], buf[1]]));
}
if buf[2] == SafeReadWritePacket::ResendRequest as u8 {
let buf = self
.last_transmitted
.get(&u16::from_be_bytes([buf[0], buf[1]]))
.expect("tried to ResendRequest an Ack'd packet");
loop {
// resend until success
match self.socket.send(&buf.as_slice()) {
Ok(x) => {
if x != buf.len() {
continue;
}
}
Err(_) => {
continue;
}
};
break;
}
// do NOT remove from last_transmitted yet, wait for Ack to do that.
}
}
None => {}
@ -250,14 +321,12 @@ fn sender(args: &Vec<String>) {
return;
}
let m = unix_millis();
sc.write_safe(&buf[..read]).expect("send error");
bytes_sent += read as u64;
if (bytes_sent % (br * 20) as u64) < (br as u64) {
print!(
"\r\x1b[KSent {} bytes with ping {}",
bytes_sent,
unix_millis() - m
"\r\x1b[KSent {} bytes",
bytes_sent
);
stdout().flush().unwrap();
}
@ -295,7 +364,6 @@ fn receiver(args: &Vec<String>) {
let mut sc = SafeReadWrite::new(connection);
let mut bytes_received: u64 = 0;
loop {
let m = unix_millis();
let (mbuf, len) = sc.read_safe(buf).expect("read error");
buf = &mbuf.leak()[..len];
if len == 0 {
@ -303,12 +371,11 @@ fn receiver(args: &Vec<String>) {
println!("Transfer done. Thank you!");
return;
}
let m = unix_millis() - m;
file.write(buf).expect("write error");
bytes_received += len as u64;
if (bytes_received % (br * 20) as u64) < (br as u64) {
print!("\r\x1b[KReceived {} bytes with ping {}", bytes_received, m);
print!("\r\x1b[KReceived {} bytes", bytes_received);
stdout().flush().unwrap();
}
}