use std::{ io::{stderr, stdout, Read}, mem, path::PathBuf, process::Command, sync::mpsc::{self, Receiver, Sender}, thread::{self, JoinHandle}, }; use anyhow::{bail, Context, Result}; use log::debug; use crate::PlayThing; const YT_DLP_FLAGS: [&str; 12] = [ "--format", "bestvideo[height<=?1080]+bestaudio/best", "--embed-chapters", "--progress", "--write-comments", "--extractor-args", "youtube:max_comments=150,all,100;comment_sort=top", "--write-info-json", "--sponsorblock-mark", "default", "--sponsorblock-remove", "sponsor", ]; const CONCURRENT: u32 = 5; const DOWNLOAD_DIR: &str = "/tmp/ytcc"; pub struct Downloader { sent: usize, download_thread: JoinHandle>, orx: Receiver<(PathBuf, Option)>, itx: Option>, playspec: Vec, } impl Downloader { pub fn new(mut playspec: Vec) -> anyhow::Result { let (itx, irx): (Sender, Receiver) = mpsc::channel(); let (otx, orx) = mpsc::channel(); let jh = thread::spawn(move || -> Result<()> { while let Some(pt) = irx.recv().ok() { debug!("Got '{}|{}' to be downloaded", pt.url, pt.id.unwrap_or(0)); let path = download_url(&pt.url) .with_context(|| format!("Failed to download url: '{}'", &pt.url))?; otx.send((path, pt.id)).expect("Should not be dropped"); } debug!("Finished Downloading everything"); Ok(()) }); playspec.reverse(); let mut output = Downloader { sent: 0, download_thread: jh, orx, itx: Some(itx), playspec, }; if output.playspec.len() <= CONCURRENT as usize { output.add(output.playspec.len() as u32)?; } else { output.add(CONCURRENT)?; } Ok(output) } pub fn add(&mut self, number_to_add: u32) -> Result<()> { debug!("Adding {} to be downloaded concurrently", number_to_add); for _ in 0..number_to_add { let pt = self.playspec.pop().context("No more playthings to pop")?; self.itx.as_ref().expect("Should still be valid").send(pt)?; } Ok(()) } /// Return the next video already downloaded, will block until the download is complete pub fn next(&mut self) -> Option<(PathBuf, Option)> { debug!("Requesting next output"); match self.orx.recv() { Ok(ok) => { debug!("Output downloaded to: {}", ok.0.display()); self.sent += 1; if self.sent < self.playspec.len() { debug!("Will add 1"); self.add(1).ok()?; } else { debug!("Will drop sender"); let itx = mem::take(&mut self.itx); drop(itx) } debug!("Returning: {:#?}", ok); Some(ok) } Err(err) => { debug!("Recieved error while listening: {}", err); None } } } pub fn drop(self) -> anyhow::Result<()> { match self.download_thread.join() { Ok(ok) => ok, Err(err) => panic!("Can't join thread: '{:#?}'", err), } } } fn download_url(url: &str) -> Result { let output_file = tempfile::NamedTempFile::new().context("Failed to create tempfile")?; output_file .as_file() .set_len(0) .context("Failed to truncate temp-file")?; let mut yt_dlp = Command::new("yt-dlp"); yt_dlp.current_dir(DOWNLOAD_DIR); yt_dlp.stdout(stdout()); yt_dlp.stderr(stderr()); yt_dlp.args(YT_DLP_FLAGS); yt_dlp.args([ "--output", "%(channel)s/%(title)s.%(ext)s", url, "--print-to-file", "after_move:filepath", ]); yt_dlp.arg(output_file.path().as_os_str()); let status = yt_dlp.status().context("Failed to run yt_dlp")?; if let Some(code) = status.code() { if code != 0 { bail!("yt_dlp execution failed with error: '{}'", status); } } let mut path = String::new(); output_file .as_file() .read_to_string(&mut path) .context("Failed to read output file temp file")?; let path = path.trim(); Ok(path.into()) }