about summary refs log tree commit diff stats
path: root/pkgs/sources/yt/src/downloader.rs
diff options
context:
space:
mode:
Diffstat (limited to 'pkgs/sources/yt/src/downloader.rs')
-rw-r--r--pkgs/sources/yt/src/downloader.rs212
1 files changed, 212 insertions, 0 deletions
diff --git a/pkgs/sources/yt/src/downloader.rs b/pkgs/sources/yt/src/downloader.rs
new file mode 100644
index 00000000..e915700d
--- /dev/null
+++ b/pkgs/sources/yt/src/downloader.rs
@@ -0,0 +1,212 @@
+use std::{
+    fs::{self, canonicalize},
+    io::{stderr, stdout, Read},
+    mem,
+    os::unix::fs::symlink,
+    path::PathBuf,
+    process::Command,
+    sync::mpsc::{self, Receiver, Sender},
+    thread::{self, JoinHandle},
+};
+
+use anyhow::{bail, Context, Result};
+use log::{debug, error, warn};
+use url::Url;
+
+use crate::constants::{status_path, CONCURRENT, DOWNLOAD_DIR, MPV_FLAGS, YT_DLP_FLAGS};
+
+#[derive(Debug)]
+pub struct Downloadable {
+    pub url: Url,
+    pub id: Option<u32>,
+}
+
+impl std::fmt::Display for Downloadable {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
+        write!(
+            f,
+            "{}|{}",
+            self.url.as_str().replace('|', ";"),
+            self.id.unwrap_or(0),
+        )
+    }
+}
+
+pub struct Downloader {
+    sent: usize,
+    download_thread: JoinHandle<Result<()>>,
+    orx: Receiver<(PathBuf, Option<u32>)>,
+    itx: Option<Sender<Downloadable>>,
+    playspec: Vec<Downloadable>,
+}
+
+impl Downloader {
+    pub fn new(mut playspec: Vec<Downloadable>) -> anyhow::Result<Downloader> {
+        let (itx, irx): (Sender<Downloadable>, Receiver<Downloadable>) = mpsc::channel();
+        let (otx, orx) = mpsc::channel();
+        let jh = thread::spawn(move || -> Result<()> {
+            while let Ok(pt) = irx.recv() {
+                debug!("Got '{}' to be downloaded", pt);
+                let path = download_url(&pt.url)
+                    .with_context(|| format!("Failed to download url: '{}'", &pt.url))?;
+                otx.send((path, pt.id)).expect("Should not be dropped");
+            }
+            debug!("Finished Downloading everything");
+            Ok(())
+        });
+
+        playspec.reverse();
+        let mut output = Downloader {
+            sent: 0,
+            download_thread: jh,
+            orx,
+            itx: Some(itx),
+            playspec,
+        };
+        if output.playspec.len() <= CONCURRENT as usize {
+            output.add(output.playspec.len() as u32)?;
+        } else {
+            output.add(CONCURRENT)?;
+        }
+        Ok(output)
+    }
+
+    pub fn add(&mut self, number_to_add: u32) -> Result<()> {
+        debug!("Adding {} to be downloaded concurrently", number_to_add);
+        for _ in 0..number_to_add {
+            let pt = self.playspec.pop().expect("This call should be guarded");
+            self.itx.as_ref().expect("Should still be valid").send(pt)?;
+            self.sent += 1;
+        }
+        Ok(())
+    }
+
+    /// Return the next video already downloaded, will block until the download is complete
+    pub fn next(&mut self) -> Option<(PathBuf, Option<u32>)> {
+        debug!("Requesting next output");
+        match self.orx.recv() {
+            Ok(ok) => {
+                debug!("Output downloaded to: {}", ok.0.display());
+                if !self.playspec.is_empty() {
+                    self.add(1).ok()?;
+                } else {
+                    debug!(
+                        "Done sending videos to be downloaded, downoladed: {} videos",
+                        self.sent
+                    );
+                    let itx = mem::take(&mut self.itx);
+                    drop(itx)
+                }
+                debug!("Returning: {}|{}", ok.0.display(), ok.1.unwrap_or(0));
+                Some(ok)
+            }
+            Err(err) => {
+                debug!("Received error while listening: {}", err);
+                None
+            }
+        }
+    }
+
+    pub fn drop(self) -> anyhow::Result<()> {
+        // Check that we really downloaded everything
+        assert_eq!(self.playspec.len(), 0);
+        match self.download_thread.join() {
+            Ok(ok) => ok,
+            Err(err) => panic!("Failed to join downloader thread: '{:#?}'", err),
+        }
+    }
+
+    pub fn consume(mut self) -> anyhow::Result<()> {
+        while let Some((path, id)) = self.next() {
+            debug!("Next path to play is: '{}'", path.display());
+            let mut info_json = canonicalize(&path).context("Failed to canoncialize path")?;
+            info_json.set_extension("info.json");
+
+            if status_path()?.is_symlink() {
+                fs::remove_file(status_path()?).context("Failed to delete old status file")?;
+            } else if !status_path()?.exists() {
+                debug!(
+                    "The status path at '{}' does not exists",
+                    status_path()?.display()
+                );
+            } else {
+                bail!(
+                    "The status path ('{}') is not a symlink but exists!",
+                    status_path()?.display()
+                );
+            }
+
+            symlink(info_json, status_path()?).context("Failed to symlink")?;
+
+            let mut mpv = Command::new("mpv");
+            mpv.stdout(stdout());
+            mpv.stderr(stderr());
+            mpv.args(MPV_FLAGS);
+            // TODO: Set the title to the name of the video, not the path <2024-02-09>
+            // mpv.arg(format!("--title="))
+            mpv.arg(&path);
+
+            let status = mpv.status().context("Failed to run mpv")?;
+            if status.success() {
+                fs::remove_file(&path)?;
+                if let Some(id) = id {
+                    println!("\x1b[32;1mMarking {} as watched!\x1b[0m", id);
+                    let mut ytcc = std::process::Command::new("ytcc");
+                    ytcc.stdout(stdout());
+                    ytcc.stderr(stderr());
+                    ytcc.args(["mark"]);
+                    ytcc.arg(id.to_string());
+                    let status = ytcc.status().context("Failed to run ytcc")?;
+                    if let Some(code) = status.code() {
+                        if code != 0 {
+                            bail!("Ytcc failed with status: {}", code);
+                        }
+                    }
+                }
+                debug!("mpv exited with: '{}'", status);
+            } else {
+                warn!("mpv exited with: '{}'", status);
+            }
+        }
+        self.drop()?;
+        Ok(())
+    }
+}
+
+fn download_url(url: &Url) -> Result<PathBuf> {
+    let output_file = tempfile::NamedTempFile::new().context("Failed to create tempfile")?;
+    output_file
+        .as_file()
+        .set_len(0)
+        .context("Failed to truncate temp-file")?;
+    if !Into::<PathBuf>::into(DOWNLOAD_DIR).exists() {
+        fs::create_dir_all(DOWNLOAD_DIR)
+            .with_context(|| format!("Failed to create download dir at: {}", DOWNLOAD_DIR))?
+    }
+    let mut yt_dlp = Command::new("yt-dlp");
+    yt_dlp.current_dir(DOWNLOAD_DIR);
+    yt_dlp.stdout(stdout());
+    yt_dlp.stderr(stderr());
+    yt_dlp.args(YT_DLP_FLAGS);
+    yt_dlp.args([
+        "--output",
+        "%(channel)s/%(title)s.%(ext)s",
+        url.as_str(),
+        "--print-to-file",
+        "after_move:filepath",
+    ]);
+    yt_dlp.arg(output_file.path().as_os_str());
+
+    let status = yt_dlp.status().context("Failed to run yt-dlp")?;
+    if !status.success() {
+        error!("yt-dlp execution failed with error: '{}'", status);
+    }
+
+    let mut path = String::new();
+    output_file
+        .as_file()
+        .read_to_string(&mut path)
+        .context("Failed to read output file temp file")?;
+    let path = path.trim();
+    Ok(path.into())
+}