diff --git a/src/frame.rs b/src/frame.rs new file mode 100644 index 0000000..626b754 --- /dev/null +++ b/src/frame.rs @@ -0,0 +1,134 @@ +use form_data_builder::FormData; +use openssl::ssl::{Ssl, SslContext, SslMethod, SslStream}; +use std::{ + ffi::OsStr, + io::{Cursor, Read, Write}, + net::{Shutdown, TcpStream}, + time::Duration, +}; + +pub struct Frame { + pub bytes: Vec, + pub channel: u64, + cache_stream: Option>, + byte_to_write: Option, +} + +impl Frame { + pub fn new(bytes: Vec, channel: u64) -> Frame { + Frame { + bytes, + channel, + cache_stream: None, + byte_to_write: None, + } + } + + pub fn cache_frame(&mut self, message: u64, content: &str, token: &str) { + let ssl_context = SslContext::builder(SslMethod::tls_client()) + .expect("ssl: context init failed") + .build(); + let ssl = Ssl::new(&ssl_context).expect("ssl: init failed"); + let tcp_stream = TcpStream::connect("discord.com:443").expect("api: connect error"); + let mut stream = SslStream::new(ssl, tcp_stream).expect("ssl: stream init failed"); + + let mut form = FormData::new(Vec::new()); + + form.write_file( + "payload_json", + Cursor::new( + stringify!({ + "content": "{content}", + "attachments": [ + { + "id": 0, + "filename": "projbot3.gif" + } + ] + }) + .replace("{content}", content), + ), + None, + "application/json", + ) + .expect("form: payload_json failed"); + form.write_file( + "files[0]", + Cursor::new(self.bytes.as_slice()), + Some(OsStr::new("projbot3.gif")), + "image/gif", + ) + .expect("form: attachment failed"); + let mut data = form.finish().expect("form: finish failed"); + + stream.connect().expect("api: connection failed"); + stream + .write_all( + format!( + "PATCH /api/v10/channels/{}/messages/{message} HTTP/1.1\n", + &self.channel + ) + .as_bytes(), + ) + .expect("api: write failed"); + stream + .write_all( + "Host: discord.com\nUser-Agent: projbot3 image uploader (tudbut@tudbut.de)\n" + .as_bytes(), + ) + .expect("api: write failed"); + stream + .write_all(format!("Content-Length: {}\n", data.len()).as_bytes()) + .expect("api: write failed"); + stream + .write_all(format!("Content-Type: {}\n", form.content_type_header()).as_bytes()) + .expect("api: write failed"); + stream + .write_all(format!("Authorization: Bot {}\n\n", token).as_bytes()) + .expect("api: write failed"); + + // remove the last byte and cache it in the frame object for later write finish + self.byte_to_write = Some( + *data + .last() + .expect("form: empty array returned (finish failed)"), + ); + data.remove(data.len() - 1); + + stream + .write_all(data.as_slice()) + .expect("api: write failed"); + stream.flush().expect("api: flush failed"); + + self.cache_stream = Some(stream); + // now the frame is ready to send the next part + } + + pub fn complete_send(&mut self) { + let cache_stream = &mut self.cache_stream; + let byte_to_write = &self.byte_to_write; + if let Some(stream) = cache_stream { + if let Some(byte) = byte_to_write { + stream + .write_all(&[*byte]) + .expect("api: write failed at complete_send"); + stream.flush().expect("api: flush failed"); + stream + .get_ref() + .set_read_timeout(Some(Duration::from_millis(500))) + .expect("tcp: unable to set timeout"); + let mut buf = Vec::new(); + let _ = stream.read_to_end(&mut buf); // failure is normal + stream.shutdown().expect("ssl: shutdown failed"); + stream + .get_ref() + .shutdown(Shutdown::Both) + .expect("tcp: shutdown failed"); + self.cache_stream = None; + self.byte_to_write = None; + return; + } + } + panic!("complete_send called on uncached frame!"); + } +} diff --git a/src/main.rs b/src/main.rs index 2fb62b0..9464f1d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,19 +1,7 @@ -use std::{ - env, - ffi::OsStr, - fs::{self, File, OpenOptions}, - io::{Cursor, Read, Write}, - net::{Shutdown, TcpStream}, - path::Path, - process::{self, Stdio}, - sync::{Arc, Mutex}, - thread, - time::{Duration, SystemTime}, -}; +mod frame; -use form_data_builder::FormData; +use crate::frame::Frame; use gif::Encoder; -use openssl::ssl::{Ssl, SslContext, SslMethod, SslStream}; use png::Decoder; use serenity::{ async_trait, @@ -24,127 +12,21 @@ use serenity::{ Client, }; use songbird::SerenityInit; +use std::{ + env, + fs::{self, File, OpenOptions}, + io::Read, + path::Path, + process::{self, Stdio}, + sync::{Arc, Mutex}, + thread, + time::{Duration, SystemTime}, +}; -struct Frame { - bytes: Vec, - channel: u64, - cache_stream: Option>, - byte_to_write: Option, -} - -impl Frame { - fn cache_frame(&mut self, message: u64, content: &str, token: &str) { - let ssl_context = SslContext::builder(SslMethod::tls_client()) - .expect("ssl: context init failed") - .build(); - let ssl = Ssl::new(&ssl_context).expect("ssl: init failed"); - let tcp_stream = TcpStream::connect("discord.com:443").expect("api: connect error"); - let mut stream = SslStream::new(ssl, tcp_stream).expect("ssl: stream init failed"); - - let mut form = FormData::new(Vec::new()); - - form.write_file( - "payload_json", - Cursor::new( - stringify!({ - "content": "{content}", - "attachments": [ - { - "id": 0, - "filename": "projbot3.gif" - } - ] - }) - .replace("{content}", content), - ), - None, - "application/json", - ) - .expect("form: payload_json failed"); - form.write_file( - "files[0]", - Cursor::new(self.bytes.as_slice()), - Some(OsStr::new("projbot3.gif")), - "image/gif", - ) - .expect("form: attachment failed"); - let mut data = form.finish().expect("form: finish failed"); - - stream.connect().expect("api: connection failed"); - stream - .write_all( - format!( - "PATCH /api/v10/channels/{}/messages/{message} HTTP/1.1\n", - &self.channel - ) - .as_bytes(), - ) - .expect("api: write failed"); - stream - .write_all( - "Host: discord.com\nUser-Agent: projbot3 image uploader (tudbut@tudbut.de)\n" - .as_bytes(), - ) - .expect("api: write failed"); - stream - .write_all(format!("Content-Length: {}\n", data.len()).as_bytes()) - .expect("api: write failed"); - stream - .write_all(format!("Content-Type: {}\n", form.content_type_header()).as_bytes()) - .expect("api: write failed"); - stream - .write_all(format!("Authorization: Bot {}\n\n", token).as_bytes()) - .expect("api: write failed"); - - // remove the last byte and cache it in the frame object for later write finish - self.byte_to_write = Some( - *data - .last() - .expect("form: empty array returned (finish failed)"), - ); - data.remove(data.len() - 1); - - stream - .write_all(data.as_slice()) - .expect("api: write failed"); - stream.flush().expect("api: flush failed"); - - self.cache_stream = Some(stream); - // now the frame is ready to send the next part - } - - fn complete_send(&mut self) { - let cache_stream = &mut self.cache_stream; - let byte_to_write = &self.byte_to_write; - if let Some(stream) = cache_stream { - if let Some(byte) = byte_to_write { - stream - .write_all(&[*byte]) - .expect("api: write failed at complete_send"); - stream.flush().expect("api: flush failed"); - stream - .get_ref() - .set_read_timeout(Some(Duration::from_millis(500))) - .expect("tcp: unable to set timeout"); - let mut buf = Vec::new(); - let _ = stream.read_to_end(&mut buf); // failure is normal - stream.shutdown().expect("ssl: shutdown failed"); - stream - .get_ref() - .shutdown(Shutdown::Both) - .expect("tcp: shutdown failed"); - self.cache_stream = None; - self.byte_to_write = None; - return; - } - } - panic!("complete_send called on uncached frame!"); - } -} - -async fn send_frames(message: Message, ctx: Context) { +async fn send_video(message: Message, ctx: Context) { use tokio::sync::Mutex; + // Read all frames from vid_encoded let mut v: Vec = Vec::new(); let dir = fs::read_dir("vid_encoded") .expect("unable to read dir") @@ -159,13 +41,10 @@ async fn send_frames(message: Message, ctx: Context) { let mut buf = Vec::new(); file.read_to_end(&mut buf) .expect("readvid: unable to read file"); - v.push(Frame { - bytes: buf, - channel: message.channel_id.0, - cache_stream: None, - byte_to_write: None, - }); + v.push(Frame::new(buf, message.channel_id.0)); } + + // Get serenity and songbird items let guild_id = message.guild_id.unwrap(); let http = ctx.http.clone(); let songbird = songbird::get(&ctx) @@ -173,6 +52,8 @@ async fn send_frames(message: Message, ctx: Context) { .expect("voice: unable to initialize songbird"); let c0: Arc>> = Arc::new(Mutex::new(None)); let c1 = c0.clone(); + + // Spawn task to send video tokio::spawn(async move { let message = message; let ctx = ctx; @@ -187,6 +68,8 @@ async fn send_frames(message: Message, ctx: Context) { ) .await .expect("discord: unable to send"); + // Spawn task to send audio - This has to be here, because this is also where the timer is + // started tokio::spawn(async move { let sa = unix_millis(); println!("voice: init"); @@ -226,10 +109,15 @@ async fn send_frames(message: Message, ctx: Context) { println!("voice: playing"); handle.play().expect("voice: unable to play"); }); + + // Initialize and start timing let mut sa = unix_millis(); let mut to_compensate_for = 0; let mut free_time = 0; + + // Send frames (5 second long gifs) for mut frame in v.by_ref() { + // Upload the frame to the API, but don't finish off the request. println!("vid: caching"); let token = token.clone(); let mut frame = tokio::task::spawn_blocking(move || { @@ -240,14 +128,20 @@ async fn send_frames(message: Message, ctx: Context) { ); frame }).await.unwrap(); + + // Get recent messages let msgs = n .channel_id .messages_iter(&ctx.http) .take(30) .collect::>() .await; + + // Do timing for good synchronization and commands println!("vid: waiting"); let mut to_sleep = 5000 - ((unix_millis() - sa) as i128); + // Check for commands (timing this is required because there are a few IO + // operations being awaited) sa = unix_millis(); if let Some(Ok(msg)) = msgs.iter().find(|x| x.as_ref().unwrap().content == "!stop") { msg.delete(&ctx.http) @@ -288,6 +182,9 @@ async fn send_frames(message: Message, ctx: Context) { .expect("discord: unable to send commannd response"); } to_sleep -= (unix_millis() - sa) as i128; + // Now factor in to_compensate_for + // Clippy doesn't like this, but it's the only way to do it in stable + #[allow(clippy::never_loop)] 'calc: loop { if to_sleep < 0 { to_compensate_for += -to_sleep; @@ -307,9 +204,12 @@ async fn send_frames(message: Message, ctx: Context) { break 'calc; } + // Set free_time to display free_time = to_sleep; tokio::time::sleep(Duration::from_millis(to_sleep as u64)).await; sa = unix_millis(); + + // Now complete the request. This allows each request to take O(1) time println!("vid: completing"); tokio::task::spawn_blocking(move || { frame.complete_send(); @@ -317,7 +217,11 @@ async fn send_frames(message: Message, ctx: Context) { .await .unwrap(); } + + // The last frame would immediately be deleted if we didn't wait here. tokio::time::sleep(Duration::from_millis(5000)).await; + + // Now clean up n.delete(&ctx.http) .await .expect("discord: unable to delete message"); @@ -339,16 +243,18 @@ impl EventHandler for Handler { } if message.content == "!play" { - send_frames(message, ctx).await; + send_video(message, ctx).await; } } } #[tokio::main(flavor = "multi_thread", worker_threads = 10)] async fn main() { + // If vid_encoded doesn't exist, convert vid.mp4 into vid_encoded if !Path::new("vid_encoded/").is_dir() { println!("encode: encoding video..."); if let Ok(_) = fs::create_dir("vid") { + // We're using ffmpeg commands because ffmpeg's api is a hunk of junk let mut command = process::Command::new("ffmpeg") .args([ "-i", @@ -393,6 +299,9 @@ async fn main() { fs::rename("aud.opus", "aud_encoded") .expect("encode: unable to move aud.opus to aud_encoded"); } + // ffmpeg is now done converting vid.mp4 into vid/*.png + + // Create vid_encoded and encode gifs into it let _ = fs::create_dir("vid_encoded"); let dir = fs::read_dir("vid") .expect("encode: unable to read files") @@ -403,6 +312,8 @@ async fn main() { *running.lock().unwrap() += 1; { let running = running.clone(); + // This thread will not interfere with tokio because it doesn't use anything async + // and will exit before anything important is done in tokio. thread::spawn(move || { let mut image = File::create(format!("vid_encoded/{n}")) .expect("encode: unable to create gif file"); @@ -410,6 +321,7 @@ async fn main() { Encoder::new(&mut image, 240, 180, &[]) .expect("encode: unable to create gif"), ); + // Write the gif control bytes encoder .as_mut() .unwrap() @@ -425,30 +337,39 @@ async fn main() { .unwrap() .set_repeat(gif::Repeat::Finite(0)) .expect("encode: unable to set repeat"); + // Encode frames into gif println!("encode: encoding {n}..."); for i in (n * (25 * 5))..dir { + // n number of previously encoded gifs * 25 frames per second * 5 seconds { - let i = i + 1; - let decoder = - Decoder::new(File::open(format!("vid/{}.png", i)).expect( - format!("encode: unable to read vid/{}.png", i).as_str(), - )); + let i = i + 1; // because ffmpeg starts counting at 1 :p + // Decode frame + let decoder = Decoder::new( + File::open(format!("vid/{i}.png")) + .expect(format!("encode: unable to read vid/{i}.png").as_str()), + ); let mut reader = decoder.read_info().expect( - format!("encode: invalid ffmpeg output in vid/{}.png", i).as_str(), + format!("encode: invalid ffmpeg output in vid/{i}.png").as_str(), ); let mut buf: Vec = vec![0; reader.output_buffer_size()]; let info = reader.next_frame(&mut buf).expect( - format!("encode: invalid ffmpeg output in vid/{}.png", i).as_str(), + format!("encode: invalid ffmpeg output in vid/{i}.png").as_str(), ); let bytes = &mut buf[..info.buffer_size()]; + // Encode frame let mut frame = gif::Frame::from_rgb(240, 180, bytes); + // The gif crate is a little weird with extension data, it writes a + // block for each frame, so we have to remind it of what we want again + // for each frame frame.delay = 4; + // Add to gif encoder .as_mut() .unwrap() .write_frame(&frame) .expect("encode: unable to encode frame to gif"); } + // We don't want to encode something that is supposed to go into the next frame if i / (25 * 5) != n { break; } @@ -457,6 +378,7 @@ async fn main() { println!("encode: encoded {n}"); }); } + // Always have 6 running, but no more while *running.lock().unwrap() >= 6 { tokio::time::sleep(Duration::from_millis(100)).await; } @@ -468,6 +390,7 @@ async fn main() { println!("encode: done"); } + // Start the discord bot let framework = StandardFramework::new().configure(|c| c.prefix("!")); let mut client = Client::builder( env::args() @@ -489,6 +412,7 @@ async fn main() { } } +// Helper function to get millis from unix epoch as a u64 fn unix_millis() -> u64 { SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH)