diff options
Diffstat (limited to 'pkgs/sources/yt/src/downloader.rs')
-rw-r--r-- | pkgs/sources/yt/src/downloader.rs | 212 |
1 files changed, 0 insertions, 212 deletions
diff --git a/pkgs/sources/yt/src/downloader.rs b/pkgs/sources/yt/src/downloader.rs deleted file mode 100644 index e915700d..00000000 --- a/pkgs/sources/yt/src/downloader.rs +++ /dev/null @@ -1,212 +0,0 @@ -use std::{ - fs::{self, canonicalize}, - io::{stderr, stdout, Read}, - mem, - os::unix::fs::symlink, - path::PathBuf, - process::Command, - sync::mpsc::{self, Receiver, Sender}, - thread::{self, JoinHandle}, -}; - -use anyhow::{bail, Context, Result}; -use log::{debug, error, warn}; -use url::Url; - -use crate::constants::{status_path, CONCURRENT, DOWNLOAD_DIR, MPV_FLAGS, YT_DLP_FLAGS}; - -#[derive(Debug)] -pub struct Downloadable { - pub url: Url, - pub id: Option<u32>, -} - -impl std::fmt::Display for Downloadable { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { - write!( - f, - "{}|{}", - self.url.as_str().replace('|', ";"), - self.id.unwrap_or(0), - ) - } -} - -pub struct Downloader { - sent: usize, - download_thread: JoinHandle<Result<()>>, - orx: Receiver<(PathBuf, Option<u32>)>, - itx: Option<Sender<Downloadable>>, - playspec: Vec<Downloadable>, -} - -impl Downloader { - pub fn new(mut playspec: Vec<Downloadable>) -> anyhow::Result<Downloader> { - let (itx, irx): (Sender<Downloadable>, Receiver<Downloadable>) = mpsc::channel(); - let (otx, orx) = mpsc::channel(); - let jh = thread::spawn(move || -> Result<()> { - while let Ok(pt) = irx.recv() { - debug!("Got '{}' to be downloaded", pt); - let path = download_url(&pt.url) - .with_context(|| format!("Failed to download url: '{}'", &pt.url))?; - otx.send((path, pt.id)).expect("Should not be dropped"); - } - debug!("Finished Downloading everything"); - Ok(()) - }); - - playspec.reverse(); - let mut output = Downloader { - sent: 0, - download_thread: jh, - orx, - itx: Some(itx), - playspec, - }; - if output.playspec.len() <= CONCURRENT as usize { - output.add(output.playspec.len() as u32)?; - } else { - output.add(CONCURRENT)?; - } - Ok(output) - } - - pub fn add(&mut self, number_to_add: u32) -> Result<()> { - debug!("Adding {} to be downloaded concurrently", number_to_add); - for _ in 0..number_to_add { - let pt = self.playspec.pop().expect("This call should be guarded"); - self.itx.as_ref().expect("Should still be valid").send(pt)?; - self.sent += 1; - } - Ok(()) - } - - /// Return the next video already downloaded, will block until the download is complete - pub fn next(&mut self) -> Option<(PathBuf, Option<u32>)> { - debug!("Requesting next output"); - match self.orx.recv() { - Ok(ok) => { - debug!("Output downloaded to: {}", ok.0.display()); - if !self.playspec.is_empty() { - self.add(1).ok()?; - } else { - debug!( - "Done sending videos to be downloaded, downoladed: {} videos", - self.sent - ); - let itx = mem::take(&mut self.itx); - drop(itx) - } - debug!("Returning: {}|{}", ok.0.display(), ok.1.unwrap_or(0)); - Some(ok) - } - Err(err) => { - debug!("Received error while listening: {}", err); - None - } - } - } - - pub fn drop(self) -> anyhow::Result<()> { - // Check that we really downloaded everything - assert_eq!(self.playspec.len(), 0); - match self.download_thread.join() { - Ok(ok) => ok, - Err(err) => panic!("Failed to join downloader thread: '{:#?}'", err), - } - } - - pub fn consume(mut self) -> anyhow::Result<()> { - while let Some((path, id)) = self.next() { - debug!("Next path to play is: '{}'", path.display()); - let mut info_json = canonicalize(&path).context("Failed to canoncialize path")?; - info_json.set_extension("info.json"); - - if status_path()?.is_symlink() { - fs::remove_file(status_path()?).context("Failed to delete old status file")?; - } else if !status_path()?.exists() { - debug!( - "The status path at '{}' does not exists", - status_path()?.display() - ); - } else { - bail!( - "The status path ('{}') is not a symlink but exists!", - status_path()?.display() - ); - } - - symlink(info_json, status_path()?).context("Failed to symlink")?; - - let mut mpv = Command::new("mpv"); - mpv.stdout(stdout()); - mpv.stderr(stderr()); - mpv.args(MPV_FLAGS); - // TODO: Set the title to the name of the video, not the path <2024-02-09> - // mpv.arg(format!("--title=")) - mpv.arg(&path); - - let status = mpv.status().context("Failed to run mpv")?; - if status.success() { - fs::remove_file(&path)?; - if let Some(id) = id { - println!("\x1b[32;1mMarking {} as watched!\x1b[0m", id); - let mut ytcc = std::process::Command::new("ytcc"); - ytcc.stdout(stdout()); - ytcc.stderr(stderr()); - ytcc.args(["mark"]); - ytcc.arg(id.to_string()); - let status = ytcc.status().context("Failed to run ytcc")?; - if let Some(code) = status.code() { - if code != 0 { - bail!("Ytcc failed with status: {}", code); - } - } - } - debug!("mpv exited with: '{}'", status); - } else { - warn!("mpv exited with: '{}'", status); - } - } - self.drop()?; - Ok(()) - } -} - -fn download_url(url: &Url) -> Result<PathBuf> { - let output_file = tempfile::NamedTempFile::new().context("Failed to create tempfile")?; - output_file - .as_file() - .set_len(0) - .context("Failed to truncate temp-file")?; - if !Into::<PathBuf>::into(DOWNLOAD_DIR).exists() { - fs::create_dir_all(DOWNLOAD_DIR) - .with_context(|| format!("Failed to create download dir at: {}", DOWNLOAD_DIR))? - } - let mut yt_dlp = Command::new("yt-dlp"); - yt_dlp.current_dir(DOWNLOAD_DIR); - yt_dlp.stdout(stdout()); - yt_dlp.stderr(stderr()); - yt_dlp.args(YT_DLP_FLAGS); - yt_dlp.args([ - "--output", - "%(channel)s/%(title)s.%(ext)s", - url.as_str(), - "--print-to-file", - "after_move:filepath", - ]); - yt_dlp.arg(output_file.path().as_os_str()); - - let status = yt_dlp.status().context("Failed to run yt-dlp")?; - if !status.success() { - error!("yt-dlp execution failed with error: '{}'", status); - } - - let mut path = String::new(); - output_file - .as_file() - .read_to_string(&mut path) - .context("Failed to read output file temp file")?; - let path = path.trim(); - Ok(path.into()) -} |