unify end() and write_safe() into internal_write_safe()

This commit is contained in:
Daniella 2022-08-18 00:10:42 +02:00
parent 0b75ce13ba
commit 7afef789fc

View file

@ -35,111 +35,11 @@ impl SafeReadWrite {
}
pub fn write_safe(&mut self, buf: &[u8]) -> Result<(), Error> {
if buf.len() > 0xfffc {
panic!(
"too large data packet sent over SafeReadWrite ({} > 0xfffc)",
buf.len()
);
}
self.write_flush_safe(buf, false)
}
let id = (self.packet_count_out as u16).to_be_bytes();
let idn = self.packet_count_out as u16;
self.packet_count_out += 1;
let mut vbuf = Vec::from(buf);
vbuf.insert(0, SafeReadWritePacket::Write as u8);
vbuf.insert(0, id[1]);
vbuf.insert(0, id[0]); // this is now the first byte
let buf = vbuf.as_slice();
loop {
match self.socket.send(buf) {
Ok(x) => {
if x != buf.len() {
continue;
}
}
Err(_) => {
continue;
}
}
self.last_transmitted.insert(u16::from_be_bytes(id), vbuf);
break;
}
let mut buf = [0, 0, 0];
if self.last_transmitted.len() < 50 {
self.socket
.set_read_timeout(Some(Duration::from_millis(1)))
.unwrap();
}
let mut wait = idn == 0xffff;
if wait {
print!("\r\x1b[KPacket ID needs to wrap. Waiting for partner to catch up...")
}
let mut is_catching_up = false;
loop {
match self.socket.recv(&mut buf).ok() {
Some(x) => {
if x != 3 {
continue;
}
if buf[2] == SafeReadWritePacket::Ack as u8 {
let n = u16::from_be_bytes([buf[0], buf[1]]);
self.last_transmitted.remove(&n);
if n == idn {
if idn == 0xffff {
println!("\r\x1b[KPacket ID wrap successful.");
}
wait = false;
self.last_transmitted.clear(); // if the latest packet is ACK'd, all
// previous ones must be as well.
}
}
if buf[2] == SafeReadWritePacket::ResendRequest as u8 {
let mut n = u16::from_be_bytes([buf[0], buf[1]]);
if !is_catching_up {
println!("\r\x1b[KA packet dropped: {}", &n);
}
wait = true;
is_catching_up = true;
while n <= idn && !(idn == 0xffff && n == 0) {
let buf = self.last_transmitted.get(&n).expect(
format!(
"tried to ResendRequest an Ack'd packet with ID {}. Current ID: {}",
&n, &idn
)
.as_str(),
);
loop {
// resend until success
match self.socket.send(&buf.as_slice()) {
Ok(x) => {
if x != buf.len() {
continue;
}
}
Err(_) => {
continue;
}
};
break;
}
// do NOT remove from last_transmitted yet, wait for Ack to do that.
n += 1;
}
}
}
None => {
if !wait {
break;
}
}
}
}
self.socket
.set_read_timeout(Some(Duration::from_millis(1000)))
.unwrap();
return Ok(());
pub fn write_flush_safe(&mut self, buf: &[u8], flush: bool) -> Result<(), Error> {
self.internal_write_safe(buf, SafeReadWritePacket::Write, flush)
}
pub fn read_safe(&mut self, buf: &[u8]) -> Result<(Vec<u8>, usize), Error> {
@ -209,11 +109,25 @@ impl SafeReadWrite {
}
pub fn end(mut self) -> UdpSocket {
let _ = self.internal_write_safe(&mut [], SafeReadWritePacket::End, true);
self.socket
}
fn internal_write_safe(&mut self, buf: &[u8], packet: SafeReadWritePacket, flush: bool) -> Result<(), Error> {
if buf.len() > 0xfffc {
panic!(
"too large data packet sent over SafeReadWrite ({} > 0xfffc)",
buf.len()
);
}
let id = (self.packet_count_out as u16).to_be_bytes();
let idn = self.packet_count_out as u16;
self.packet_count_out += 1;
let mut vbuf = Vec::new();
vbuf.insert(0, SafeReadWritePacket::End as u8);
let mut vbuf = Vec::from(buf);
vbuf.insert(0, packet as u8);
vbuf.insert(0, id[1]);
vbuf.insert(0, id[0]); // this is now the first byte
let buf = vbuf.as_slice();
@ -233,45 +147,81 @@ impl SafeReadWrite {
break;
}
let mut buf = [0, 0, 0];
while self.last_transmitted.len() != 0 {
if self.last_transmitted.len() < 50 {
self.socket
.set_read_timeout(Some(Duration::from_millis(1)))
.unwrap();
}
let mut wait = idn == 0xffff || flush;
if idn == 0xffff {
print!("\r\x1b[KPacket ID needs to wrap. Waiting for partner to catch up...")
}
let mut is_catching_up = false;
loop {
match self.socket.recv(&mut buf).ok() {
Some(x) => {
if x != 3 {
continue;
}
if buf[2] == SafeReadWritePacket::Ack as u8 {
self.last_transmitted
.remove(&u16::from_be_bytes([buf[0], buf[1]]));
let n = u16::from_be_bytes([buf[0], buf[1]]);
self.last_transmitted.remove(&n);
if n == idn {
if idn == 0xffff {
println!("\r\x1b[KPacket ID wrap successful.");
}
wait = false;
self.last_transmitted.clear(); // if the latest packet is ACK'd, all
// previous ones must be as well.
}
}
if buf[2] == SafeReadWritePacket::ResendRequest as u8 {
let buf = self
.last_transmitted
.get(&u16::from_be_bytes([buf[0], buf[1]]))
.expect("tried to ResendRequest an Ack'd packet");
println!("\nCatching up...");
loop {
// resend until success
match self.socket.send(&buf.as_slice()) {
Ok(x) => {
if x != buf.len() {
let mut n = u16::from_be_bytes([buf[0], buf[1]]);
if !is_catching_up {
println!("\r\x1b[KA packet dropped: {}", &n);
}
wait = true;
is_catching_up = true;
while n <= idn && !(idn == 0xffff && n == 0) {
let buf = self.last_transmitted.get(&n).expect(
format!(
"tried to ResendRequest an Ack'd packet with ID {}. Current ID: {}",
&n, &idn
)
.as_str(),
);
loop {
// resend until success
match self.socket.send(&buf.as_slice()) {
Ok(x) => {
if x != buf.len() {
continue;
}
}
Err(_) => {
continue;
}
}
Err(_) => {
continue;
}
};
break;
};
break;
}
// do NOT remove from last_transmitted yet, wait for Ack to do that.
n += 1;
}
// do NOT remove from last_transmitted yet, wait for Ack to do that.
}
}
None => {}
None => {
if !wait {
break;
}
}
}
}
self.socket
.set_read_timeout(Some(Duration::from_millis(1000)))
.unwrap();
return Ok(());
}
}
fn main() {