comment code and refactor

This commit is contained in:
Daniella 2022-10-15 13:42:55 +02:00
parent 581396c831
commit 52c2ec7769
2 changed files with 204 additions and 146 deletions

134
src/frame.rs Normal file
View file

@ -0,0 +1,134 @@
use form_data_builder::FormData;
use openssl::ssl::{Ssl, SslContext, SslMethod, SslStream};
use std::{
ffi::OsStr,
io::{Cursor, Read, Write},
net::{Shutdown, TcpStream},
time::Duration,
};
pub struct Frame {
pub bytes: Vec<u8>,
pub channel: u64,
cache_stream: Option<SslStream<TcpStream>>,
byte_to_write: Option<u8>,
}
impl Frame {
pub fn new(bytes: Vec<u8>, channel: u64) -> Frame {
Frame {
bytes,
channel,
cache_stream: None,
byte_to_write: None,
}
}
pub fn cache_frame(&mut self, message: u64, content: &str, token: &str) {
let ssl_context = SslContext::builder(SslMethod::tls_client())
.expect("ssl: context init failed")
.build();
let ssl = Ssl::new(&ssl_context).expect("ssl: init failed");
let tcp_stream = TcpStream::connect("discord.com:443").expect("api: connect error");
let mut stream = SslStream::new(ssl, tcp_stream).expect("ssl: stream init failed");
let mut form = FormData::new(Vec::new());
form.write_file(
"payload_json",
Cursor::new(
stringify!({
"content": "{content}",
"attachments": [
{
"id": 0,
"filename": "projbot3.gif"
}
]
})
.replace("{content}", content),
),
None,
"application/json",
)
.expect("form: payload_json failed");
form.write_file(
"files[0]",
Cursor::new(self.bytes.as_slice()),
Some(OsStr::new("projbot3.gif")),
"image/gif",
)
.expect("form: attachment failed");
let mut data = form.finish().expect("form: finish failed");
stream.connect().expect("api: connection failed");
stream
.write_all(
format!(
"PATCH /api/v10/channels/{}/messages/{message} HTTP/1.1\n",
&self.channel
)
.as_bytes(),
)
.expect("api: write failed");
stream
.write_all(
"Host: discord.com\nUser-Agent: projbot3 image uploader (tudbut@tudbut.de)\n"
.as_bytes(),
)
.expect("api: write failed");
stream
.write_all(format!("Content-Length: {}\n", data.len()).as_bytes())
.expect("api: write failed");
stream
.write_all(format!("Content-Type: {}\n", form.content_type_header()).as_bytes())
.expect("api: write failed");
stream
.write_all(format!("Authorization: Bot {}\n\n", token).as_bytes())
.expect("api: write failed");
// remove the last byte and cache it in the frame object for later write finish
self.byte_to_write = Some(
*data
.last()
.expect("form: empty array returned (finish failed)"),
);
data.remove(data.len() - 1);
stream
.write_all(data.as_slice())
.expect("api: write failed");
stream.flush().expect("api: flush failed");
self.cache_stream = Some(stream);
// now the frame is ready to send the next part
}
pub fn complete_send(&mut self) {
let cache_stream = &mut self.cache_stream;
let byte_to_write = &self.byte_to_write;
if let Some(stream) = cache_stream {
if let Some(byte) = byte_to_write {
stream
.write_all(&[*byte])
.expect("api: write failed at complete_send");
stream.flush().expect("api: flush failed");
stream
.get_ref()
.set_read_timeout(Some(Duration::from_millis(500)))
.expect("tcp: unable to set timeout");
let mut buf = Vec::new();
let _ = stream.read_to_end(&mut buf); // failure is normal
stream.shutdown().expect("ssl: shutdown failed");
stream
.get_ref()
.shutdown(Shutdown::Both)
.expect("tcp: shutdown failed");
self.cache_stream = None;
self.byte_to_write = None;
return;
}
}
panic!("complete_send called on uncached frame!");
}
}

View file

@ -1,19 +1,7 @@
use std::{ mod frame;
env,
ffi::OsStr,
fs::{self, File, OpenOptions},
io::{Cursor, Read, Write},
net::{Shutdown, TcpStream},
path::Path,
process::{self, Stdio},
sync::{Arc, Mutex},
thread,
time::{Duration, SystemTime},
};
use form_data_builder::FormData; use crate::frame::Frame;
use gif::Encoder; use gif::Encoder;
use openssl::ssl::{Ssl, SslContext, SslMethod, SslStream};
use png::Decoder; use png::Decoder;
use serenity::{ use serenity::{
async_trait, async_trait,
@ -24,127 +12,21 @@ use serenity::{
Client, Client,
}; };
use songbird::SerenityInit; use songbird::SerenityInit;
use std::{
env,
fs::{self, File, OpenOptions},
io::Read,
path::Path,
process::{self, Stdio},
sync::{Arc, Mutex},
thread,
time::{Duration, SystemTime},
};
struct Frame { async fn send_video(message: Message, ctx: Context) {
bytes: Vec<u8>,
channel: u64,
cache_stream: Option<SslStream<TcpStream>>,
byte_to_write: Option<u8>,
}
impl Frame {
fn cache_frame(&mut self, message: u64, content: &str, token: &str) {
let ssl_context = SslContext::builder(SslMethod::tls_client())
.expect("ssl: context init failed")
.build();
let ssl = Ssl::new(&ssl_context).expect("ssl: init failed");
let tcp_stream = TcpStream::connect("discord.com:443").expect("api: connect error");
let mut stream = SslStream::new(ssl, tcp_stream).expect("ssl: stream init failed");
let mut form = FormData::new(Vec::new());
form.write_file(
"payload_json",
Cursor::new(
stringify!({
"content": "{content}",
"attachments": [
{
"id": 0,
"filename": "projbot3.gif"
}
]
})
.replace("{content}", content),
),
None,
"application/json",
)
.expect("form: payload_json failed");
form.write_file(
"files[0]",
Cursor::new(self.bytes.as_slice()),
Some(OsStr::new("projbot3.gif")),
"image/gif",
)
.expect("form: attachment failed");
let mut data = form.finish().expect("form: finish failed");
stream.connect().expect("api: connection failed");
stream
.write_all(
format!(
"PATCH /api/v10/channels/{}/messages/{message} HTTP/1.1\n",
&self.channel
)
.as_bytes(),
)
.expect("api: write failed");
stream
.write_all(
"Host: discord.com\nUser-Agent: projbot3 image uploader (tudbut@tudbut.de)\n"
.as_bytes(),
)
.expect("api: write failed");
stream
.write_all(format!("Content-Length: {}\n", data.len()).as_bytes())
.expect("api: write failed");
stream
.write_all(format!("Content-Type: {}\n", form.content_type_header()).as_bytes())
.expect("api: write failed");
stream
.write_all(format!("Authorization: Bot {}\n\n", token).as_bytes())
.expect("api: write failed");
// remove the last byte and cache it in the frame object for later write finish
self.byte_to_write = Some(
*data
.last()
.expect("form: empty array returned (finish failed)"),
);
data.remove(data.len() - 1);
stream
.write_all(data.as_slice())
.expect("api: write failed");
stream.flush().expect("api: flush failed");
self.cache_stream = Some(stream);
// now the frame is ready to send the next part
}
fn complete_send(&mut self) {
let cache_stream = &mut self.cache_stream;
let byte_to_write = &self.byte_to_write;
if let Some(stream) = cache_stream {
if let Some(byte) = byte_to_write {
stream
.write_all(&[*byte])
.expect("api: write failed at complete_send");
stream.flush().expect("api: flush failed");
stream
.get_ref()
.set_read_timeout(Some(Duration::from_millis(500)))
.expect("tcp: unable to set timeout");
let mut buf = Vec::new();
let _ = stream.read_to_end(&mut buf); // failure is normal
stream.shutdown().expect("ssl: shutdown failed");
stream
.get_ref()
.shutdown(Shutdown::Both)
.expect("tcp: shutdown failed");
self.cache_stream = None;
self.byte_to_write = None;
return;
}
}
panic!("complete_send called on uncached frame!");
}
}
async fn send_frames(message: Message, ctx: Context) {
use tokio::sync::Mutex; use tokio::sync::Mutex;
// Read all frames from vid_encoded
let mut v: Vec<Frame> = Vec::new(); let mut v: Vec<Frame> = Vec::new();
let dir = fs::read_dir("vid_encoded") let dir = fs::read_dir("vid_encoded")
.expect("unable to read dir") .expect("unable to read dir")
@ -159,13 +41,10 @@ async fn send_frames(message: Message, ctx: Context) {
let mut buf = Vec::new(); let mut buf = Vec::new();
file.read_to_end(&mut buf) file.read_to_end(&mut buf)
.expect("readvid: unable to read file"); .expect("readvid: unable to read file");
v.push(Frame { v.push(Frame::new(buf, message.channel_id.0));
bytes: buf,
channel: message.channel_id.0,
cache_stream: None,
byte_to_write: None,
});
} }
// Get serenity and songbird items
let guild_id = message.guild_id.unwrap(); let guild_id = message.guild_id.unwrap();
let http = ctx.http.clone(); let http = ctx.http.clone();
let songbird = songbird::get(&ctx) let songbird = songbird::get(&ctx)
@ -173,6 +52,8 @@ async fn send_frames(message: Message, ctx: Context) {
.expect("voice: unable to initialize songbird"); .expect("voice: unable to initialize songbird");
let c0: Arc<Mutex<Option<ChannelId>>> = Arc::new(Mutex::new(None)); let c0: Arc<Mutex<Option<ChannelId>>> = Arc::new(Mutex::new(None));
let c1 = c0.clone(); let c1 = c0.clone();
// Spawn task to send video
tokio::spawn(async move { tokio::spawn(async move {
let message = message; let message = message;
let ctx = ctx; let ctx = ctx;
@ -187,6 +68,8 @@ async fn send_frames(message: Message, ctx: Context) {
) )
.await .await
.expect("discord: unable to send"); .expect("discord: unable to send");
// Spawn task to send audio - This has to be here, because this is also where the timer is
// started
tokio::spawn(async move { tokio::spawn(async move {
let sa = unix_millis(); let sa = unix_millis();
println!("voice: init"); println!("voice: init");
@ -226,10 +109,15 @@ async fn send_frames(message: Message, ctx: Context) {
println!("voice: playing"); println!("voice: playing");
handle.play().expect("voice: unable to play"); handle.play().expect("voice: unable to play");
}); });
// Initialize and start timing
let mut sa = unix_millis(); let mut sa = unix_millis();
let mut to_compensate_for = 0; let mut to_compensate_for = 0;
let mut free_time = 0; let mut free_time = 0;
// Send frames (5 second long gifs)
for mut frame in v.by_ref() { for mut frame in v.by_ref() {
// Upload the frame to the API, but don't finish off the request.
println!("vid: caching"); println!("vid: caching");
let token = token.clone(); let token = token.clone();
let mut frame = tokio::task::spawn_blocking(move || { let mut frame = tokio::task::spawn_blocking(move || {
@ -240,14 +128,20 @@ async fn send_frames(message: Message, ctx: Context) {
); );
frame frame
}).await.unwrap(); }).await.unwrap();
// Get recent messages
let msgs = n let msgs = n
.channel_id .channel_id
.messages_iter(&ctx.http) .messages_iter(&ctx.http)
.take(30) .take(30)
.collect::<Vec<_>>() .collect::<Vec<_>>()
.await; .await;
// Do timing for good synchronization and commands
println!("vid: waiting"); println!("vid: waiting");
let mut to_sleep = 5000 - ((unix_millis() - sa) as i128); let mut to_sleep = 5000 - ((unix_millis() - sa) as i128);
// Check for commands (timing this is required because there are a few IO
// operations being awaited)
sa = unix_millis(); sa = unix_millis();
if let Some(Ok(msg)) = msgs.iter().find(|x| x.as_ref().unwrap().content == "!stop") { if let Some(Ok(msg)) = msgs.iter().find(|x| x.as_ref().unwrap().content == "!stop") {
msg.delete(&ctx.http) msg.delete(&ctx.http)
@ -288,6 +182,9 @@ async fn send_frames(message: Message, ctx: Context) {
.expect("discord: unable to send commannd response"); .expect("discord: unable to send commannd response");
} }
to_sleep -= (unix_millis() - sa) as i128; to_sleep -= (unix_millis() - sa) as i128;
// Now factor in to_compensate_for
// Clippy doesn't like this, but it's the only way to do it in stable
#[allow(clippy::never_loop)]
'calc: loop { 'calc: loop {
if to_sleep < 0 { if to_sleep < 0 {
to_compensate_for += -to_sleep; to_compensate_for += -to_sleep;
@ -307,9 +204,12 @@ async fn send_frames(message: Message, ctx: Context) {
break 'calc; break 'calc;
} }
// Set free_time to display
free_time = to_sleep; free_time = to_sleep;
tokio::time::sleep(Duration::from_millis(to_sleep as u64)).await; tokio::time::sleep(Duration::from_millis(to_sleep as u64)).await;
sa = unix_millis(); sa = unix_millis();
// Now complete the request. This allows each request to take O(1) time
println!("vid: completing"); println!("vid: completing");
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
frame.complete_send(); frame.complete_send();
@ -317,7 +217,11 @@ async fn send_frames(message: Message, ctx: Context) {
.await .await
.unwrap(); .unwrap();
} }
// The last frame would immediately be deleted if we didn't wait here.
tokio::time::sleep(Duration::from_millis(5000)).await; tokio::time::sleep(Duration::from_millis(5000)).await;
// Now clean up
n.delete(&ctx.http) n.delete(&ctx.http)
.await .await
.expect("discord: unable to delete message"); .expect("discord: unable to delete message");
@ -339,16 +243,18 @@ impl EventHandler for Handler {
} }
if message.content == "!play" { if message.content == "!play" {
send_frames(message, ctx).await; send_video(message, ctx).await;
} }
} }
} }
#[tokio::main(flavor = "multi_thread", worker_threads = 10)] #[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() { async fn main() {
// If vid_encoded doesn't exist, convert vid.mp4 into vid_encoded
if !Path::new("vid_encoded/").is_dir() { if !Path::new("vid_encoded/").is_dir() {
println!("encode: encoding video..."); println!("encode: encoding video...");
if let Ok(_) = fs::create_dir("vid") { if let Ok(_) = fs::create_dir("vid") {
// We're using ffmpeg commands because ffmpeg's api is a hunk of junk
let mut command = process::Command::new("ffmpeg") let mut command = process::Command::new("ffmpeg")
.args([ .args([
"-i", "-i",
@ -393,6 +299,9 @@ async fn main() {
fs::rename("aud.opus", "aud_encoded") fs::rename("aud.opus", "aud_encoded")
.expect("encode: unable to move aud.opus to aud_encoded"); .expect("encode: unable to move aud.opus to aud_encoded");
} }
// ffmpeg is now done converting vid.mp4 into vid/*.png
// Create vid_encoded and encode gifs into it
let _ = fs::create_dir("vid_encoded"); let _ = fs::create_dir("vid_encoded");
let dir = fs::read_dir("vid") let dir = fs::read_dir("vid")
.expect("encode: unable to read files") .expect("encode: unable to read files")
@ -403,6 +312,8 @@ async fn main() {
*running.lock().unwrap() += 1; *running.lock().unwrap() += 1;
{ {
let running = running.clone(); let running = running.clone();
// This thread will not interfere with tokio because it doesn't use anything async
// and will exit before anything important is done in tokio.
thread::spawn(move || { thread::spawn(move || {
let mut image = File::create(format!("vid_encoded/{n}")) let mut image = File::create(format!("vid_encoded/{n}"))
.expect("encode: unable to create gif file"); .expect("encode: unable to create gif file");
@ -410,6 +321,7 @@ async fn main() {
Encoder::new(&mut image, 240, 180, &[]) Encoder::new(&mut image, 240, 180, &[])
.expect("encode: unable to create gif"), .expect("encode: unable to create gif"),
); );
// Write the gif control bytes
encoder encoder
.as_mut() .as_mut()
.unwrap() .unwrap()
@ -425,30 +337,39 @@ async fn main() {
.unwrap() .unwrap()
.set_repeat(gif::Repeat::Finite(0)) .set_repeat(gif::Repeat::Finite(0))
.expect("encode: unable to set repeat"); .expect("encode: unable to set repeat");
// Encode frames into gif
println!("encode: encoding {n}..."); println!("encode: encoding {n}...");
for i in (n * (25 * 5))..dir { for i in (n * (25 * 5))..dir {
// n number of previously encoded gifs * 25 frames per second * 5 seconds
{ {
let i = i + 1; let i = i + 1; // because ffmpeg starts counting at 1 :p
let decoder = // Decode frame
Decoder::new(File::open(format!("vid/{}.png", i)).expect( let decoder = Decoder::new(
format!("encode: unable to read vid/{}.png", i).as_str(), File::open(format!("vid/{i}.png"))
)); .expect(format!("encode: unable to read vid/{i}.png").as_str()),
);
let mut reader = decoder.read_info().expect( let mut reader = decoder.read_info().expect(
format!("encode: invalid ffmpeg output in vid/{}.png", i).as_str(), format!("encode: invalid ffmpeg output in vid/{i}.png").as_str(),
); );
let mut buf: Vec<u8> = vec![0; reader.output_buffer_size()]; let mut buf: Vec<u8> = vec![0; reader.output_buffer_size()];
let info = reader.next_frame(&mut buf).expect( let info = reader.next_frame(&mut buf).expect(
format!("encode: invalid ffmpeg output in vid/{}.png", i).as_str(), format!("encode: invalid ffmpeg output in vid/{i}.png").as_str(),
); );
let bytes = &mut buf[..info.buffer_size()]; let bytes = &mut buf[..info.buffer_size()];
// Encode frame
let mut frame = gif::Frame::from_rgb(240, 180, bytes); let mut frame = gif::Frame::from_rgb(240, 180, bytes);
// The gif crate is a little weird with extension data, it writes a
// block for each frame, so we have to remind it of what we want again
// for each frame
frame.delay = 4; frame.delay = 4;
// Add to gif
encoder encoder
.as_mut() .as_mut()
.unwrap() .unwrap()
.write_frame(&frame) .write_frame(&frame)
.expect("encode: unable to encode frame to gif"); .expect("encode: unable to encode frame to gif");
} }
// We don't want to encode something that is supposed to go into the next frame
if i / (25 * 5) != n { if i / (25 * 5) != n {
break; break;
} }
@ -457,6 +378,7 @@ async fn main() {
println!("encode: encoded {n}"); println!("encode: encoded {n}");
}); });
} }
// Always have 6 running, but no more
while *running.lock().unwrap() >= 6 { while *running.lock().unwrap() >= 6 {
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;
} }
@ -468,6 +390,7 @@ async fn main() {
println!("encode: done"); println!("encode: done");
} }
// Start the discord bot
let framework = StandardFramework::new().configure(|c| c.prefix("!")); let framework = StandardFramework::new().configure(|c| c.prefix("!"));
let mut client = Client::builder( let mut client = Client::builder(
env::args() env::args()
@ -489,6 +412,7 @@ async fn main() {
} }
} }
// Helper function to get millis from unix epoch as a u64
fn unix_millis() -> u64 { fn unix_millis() -> u64 {
SystemTime::now() SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH) .duration_since(SystemTime::UNIX_EPOCH)