From 7afef789fc89267bb4b78eca960b6b38e356da23 Mon Sep 17 00:00:00 2001 From: TudbuT Date: Thu, 18 Aug 2022 00:10:42 +0200 Subject: [PATCH] unify end() and write_safe() into internal_write_safe() --- src/main.rs | 206 ++++++++++++++++++++-------------------------------- 1 file changed, 78 insertions(+), 128 deletions(-) diff --git a/src/main.rs b/src/main.rs index 23dfbce..cd022f0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -35,111 +35,11 @@ impl SafeReadWrite { } pub fn write_safe(&mut self, buf: &[u8]) -> Result<(), Error> { - if buf.len() > 0xfffc { - panic!( - "too large data packet sent over SafeReadWrite ({} > 0xfffc)", - buf.len() - ); - } + self.write_flush_safe(buf, false) + } - let id = (self.packet_count_out as u16).to_be_bytes(); - let idn = self.packet_count_out as u16; - self.packet_count_out += 1; - - let mut vbuf = Vec::from(buf); - vbuf.insert(0, SafeReadWritePacket::Write as u8); - vbuf.insert(0, id[1]); - vbuf.insert(0, id[0]); // this is now the first byte - let buf = vbuf.as_slice(); - - loop { - match self.socket.send(buf) { - Ok(x) => { - if x != buf.len() { - continue; - } - } - Err(_) => { - continue; - } - } - self.last_transmitted.insert(u16::from_be_bytes(id), vbuf); - break; - } - let mut buf = [0, 0, 0]; - if self.last_transmitted.len() < 50 { - self.socket - .set_read_timeout(Some(Duration::from_millis(1))) - .unwrap(); - } - let mut wait = idn == 0xffff; - if wait { - print!("\r\x1b[KPacket ID needs to wrap. Waiting for partner to catch up...") - } - let mut is_catching_up = false; - loop { - match self.socket.recv(&mut buf).ok() { - Some(x) => { - if x != 3 { - continue; - } - if buf[2] == SafeReadWritePacket::Ack as u8 { - let n = u16::from_be_bytes([buf[0], buf[1]]); - self.last_transmitted.remove(&n); - if n == idn { - if idn == 0xffff { - println!("\r\x1b[KPacket ID wrap successful."); - } - wait = false; - self.last_transmitted.clear(); // if the latest packet is ACK'd, all - // previous ones must be as well. - } - } - if buf[2] == SafeReadWritePacket::ResendRequest as u8 { - let mut n = u16::from_be_bytes([buf[0], buf[1]]); - if !is_catching_up { - println!("\r\x1b[KA packet dropped: {}", &n); - } - wait = true; - is_catching_up = true; - while n <= idn && !(idn == 0xffff && n == 0) { - let buf = self.last_transmitted.get(&n).expect( - format!( - "tried to ResendRequest an Ack'd packet with ID {}. Current ID: {}", - &n, &idn - ) - .as_str(), - ); - loop { - // resend until success - match self.socket.send(&buf.as_slice()) { - Ok(x) => { - if x != buf.len() { - continue; - } - } - Err(_) => { - continue; - } - }; - break; - } - // do NOT remove from last_transmitted yet, wait for Ack to do that. - n += 1; - } - } - } - None => { - if !wait { - break; - } - } - } - } - self.socket - .set_read_timeout(Some(Duration::from_millis(1000))) - .unwrap(); - return Ok(()); + pub fn write_flush_safe(&mut self, buf: &[u8], flush: bool) -> Result<(), Error> { + self.internal_write_safe(buf, SafeReadWritePacket::Write, flush) } pub fn read_safe(&mut self, buf: &[u8]) -> Result<(Vec, usize), Error> { @@ -209,11 +109,25 @@ impl SafeReadWrite { } pub fn end(mut self) -> UdpSocket { + let _ = self.internal_write_safe(&mut [], SafeReadWritePacket::End, true); + + self.socket + } + + fn internal_write_safe(&mut self, buf: &[u8], packet: SafeReadWritePacket, flush: bool) -> Result<(), Error> { + if buf.len() > 0xfffc { + panic!( + "too large data packet sent over SafeReadWrite ({} > 0xfffc)", + buf.len() + ); + } + let id = (self.packet_count_out as u16).to_be_bytes(); + let idn = self.packet_count_out as u16; self.packet_count_out += 1; - let mut vbuf = Vec::new(); - vbuf.insert(0, SafeReadWritePacket::End as u8); + let mut vbuf = Vec::from(buf); + vbuf.insert(0, packet as u8); vbuf.insert(0, id[1]); vbuf.insert(0, id[0]); // this is now the first byte let buf = vbuf.as_slice(); @@ -233,45 +147,81 @@ impl SafeReadWrite { break; } let mut buf = [0, 0, 0]; - while self.last_transmitted.len() != 0 { + if self.last_transmitted.len() < 50 { + self.socket + .set_read_timeout(Some(Duration::from_millis(1))) + .unwrap(); + } + let mut wait = idn == 0xffff || flush; + if idn == 0xffff { + print!("\r\x1b[KPacket ID needs to wrap. Waiting for partner to catch up...") + } + let mut is_catching_up = false; + loop { match self.socket.recv(&mut buf).ok() { Some(x) => { if x != 3 { continue; } if buf[2] == SafeReadWritePacket::Ack as u8 { - self.last_transmitted - .remove(&u16::from_be_bytes([buf[0], buf[1]])); + let n = u16::from_be_bytes([buf[0], buf[1]]); + self.last_transmitted.remove(&n); + if n == idn { + if idn == 0xffff { + println!("\r\x1b[KPacket ID wrap successful."); + } + wait = false; + self.last_transmitted.clear(); // if the latest packet is ACK'd, all + // previous ones must be as well. + } } if buf[2] == SafeReadWritePacket::ResendRequest as u8 { - let buf = self - .last_transmitted - .get(&u16::from_be_bytes([buf[0], buf[1]])) - .expect("tried to ResendRequest an Ack'd packet"); - println!("\nCatching up..."); - loop { - // resend until success - match self.socket.send(&buf.as_slice()) { - Ok(x) => { - if x != buf.len() { + let mut n = u16::from_be_bytes([buf[0], buf[1]]); + if !is_catching_up { + println!("\r\x1b[KA packet dropped: {}", &n); + } + wait = true; + is_catching_up = true; + while n <= idn && !(idn == 0xffff && n == 0) { + let buf = self.last_transmitted.get(&n).expect( + format!( + "tried to ResendRequest an Ack'd packet with ID {}. Current ID: {}", + &n, &idn + ) + .as_str(), + ); + loop { + // resend until success + match self.socket.send(&buf.as_slice()) { + Ok(x) => { + if x != buf.len() { + continue; + } + } + Err(_) => { continue; } - } - Err(_) => { - continue; - } - }; - break; + }; + break; + } + // do NOT remove from last_transmitted yet, wait for Ack to do that. + n += 1; } - // do NOT remove from last_transmitted yet, wait for Ack to do that. } } - None => {} + None => { + if !wait { + break; + } + } } } - self.socket + .set_read_timeout(Some(Duration::from_millis(1000))) + .unwrap(); + return Ok(()); } + } fn main() {