// yt - A fully featured command line YouTube client // // Copyright (C) 2024 Benedikt Peetz // SPDX-License-Identifier: GPL-3.0-or-later // // This file is part of Yt. // // You should have received a copy of the License along with this program. // If not, see . use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration}; use crate::{ app::App, download::download_options::download_opts, storage::video_database::{ downloader::{get_next_uncached_video, set_video_cache_path}, extractor_hash::ExtractorHash, getters::get_video_yt_dlp_opts, Video, YtDlpOptions, }, }; use anyhow::{bail, Context, Result}; use bytes::Bytes; use futures::{future::BoxFuture, FutureExt}; use log::{debug, error, info, warn}; use tokio::{fs, task::JoinHandle, time}; mod download_options; #[derive(Debug)] pub struct CurrentDownload { task_handle: JoinHandle>, extractor_hash: ExtractorHash, } impl CurrentDownload { fn new_from_video(app: Arc, video: Video) -> Self { let extractor_hash = video.extractor_hash.clone(); let task_handle = tokio::spawn(async move { Downloader::actually_cache_video(&app, &video) .await .with_context(|| format!("Failed to cache video: '{}'", video.title))?; Ok(()) }); Self { task_handle, extractor_hash, } } } enum CacheSizeCheck { /// The video can be downloaded Fits, /// The video and the current cache size together would exceed the size TooLarge, /// The video would not even fit into the empty cache ExceedsMaxCacheSize, } pub struct Downloader { current_download: Option, video_size_cache: HashMap, printed_warning: bool, cached_cache_allocation: Option, } impl Downloader { pub fn new() -> Self { Self { current_download: None, video_size_cache: HashMap::new(), printed_warning: false, cached_cache_allocation: None, } } /// Check if enough cache is available. Will wait for 10s if it's not. async fn is_enough_cache_available( &mut self, app: &App, max_cache_size: u64, next_video: &Video, ) -> Result { if let Some(cdownload) = &self.current_download { if cdownload.extractor_hash == next_video.extractor_hash { // If the video is already being downloaded it will always fit. Otherwise the // download would not have been started. return Ok(CacheSizeCheck::Fits); } } let cache_allocation = Self::get_current_cache_allocation(&app).await?; let video_size = self.get_approx_video_size(&app, &next_video).await?; if video_size >= max_cache_size { error!( "The video '{}' ({}) exceeds the maximum cache size ({})! \ Please set a bigger maximum (`--max-cache-size`) or skip it.", next_video.title, Bytes::new(video_size), Bytes::new(max_cache_size) ); return Ok(CacheSizeCheck::ExceedsMaxCacheSize); } if cache_allocation + video_size >= max_cache_size { if !self.printed_warning { warn!( "Can't download video: '{}' ({}) as it's too large for the cache ({} of {} allocated). \ Waiting for cache size reduction..", next_video.title, Bytes::new(video_size), Bytes::new(cache_allocation), Bytes::new(max_cache_size) ); self.printed_warning = true; } if let Some(cca) = self.cached_cache_allocation { if cca != cache_allocation { warn!( "Current cache size has changed, it's now: '{}'", Bytes::new(cache_allocation) ); self.cached_cache_allocation = Some(cache_allocation); } } else { info!( "Current cache size allocation: '{}'", Bytes::new(cache_allocation) ); self.cached_cache_allocation = Some(cache_allocation); } // Wait and hope, that a large video is deleted from the cache. time::sleep(Duration::from_secs(10)).await; Ok(CacheSizeCheck::TooLarge) } else { self.printed_warning = false; Ok(CacheSizeCheck::Fits) } } /// The entry point to the Downloader. /// This Downloader will periodically check if the database has changed, and then also /// change which videos it downloads. /// This will run, until the database doesn't contain any watchable videos pub async fn consume(&mut self, app: Arc, max_cache_size: u64) -> Result<()> { while let Some(next_video) = get_next_uncached_video(&app).await? { match self .is_enough_cache_available(&app, max_cache_size, &next_video) .await? { CacheSizeCheck::Fits => (), CacheSizeCheck::TooLarge => continue, CacheSizeCheck::ExceedsMaxCacheSize => bail!("Giving up."), }; if let Some(_) = &self.current_download { let current_download = self.current_download.take().expect("Is Some"); if current_download.task_handle.is_finished() { current_download.task_handle.await??; continue; } if next_video.extractor_hash != current_download.extractor_hash { info!( "Noticed, that the next video is not the video being downloaded, replacing it ('{}' vs. '{}')!", next_video.extractor_hash.into_short_hash(&app).await?, current_download.extractor_hash.into_short_hash(&app).await? ); // Replace the currently downloading video current_download.task_handle.abort(); let new_current_download = CurrentDownload::new_from_video(Arc::clone(&app), next_video); self.current_download = Some(new_current_download); } else { // Reset the taken value self.current_download = Some(current_download); } } else { info!( "No video is being downloaded right now, setting it to '{}'", next_video.title ); let new_current_download = CurrentDownload::new_from_video(Arc::clone(&app), next_video); self.current_download = Some(new_current_download); } time::sleep(Duration::new(1, 0)).await; } info!("Finished downloading!"); Ok(()) } pub async fn get_current_cache_allocation(app: &App) -> Result { fn dir_size(mut dir: fs::ReadDir) -> BoxFuture<'static, Result> { async move { let mut acc = 0; while let Some(entry) = dir.next_entry().await? { let size = match entry.metadata().await? { data if data.is_dir() => { let path = entry.path(); let read_dir = fs::read_dir(path).await?; dir_size(read_dir).await? } data => data.len(), }; acc += size; } Ok(acc) } .boxed() } dir_size(fs::read_dir(&app.config.paths.download_dir).await?).await } async fn get_approx_video_size(&mut self, app: &App, video: &Video) -> Result { if let Some(value) = self.video_size_cache.get(&video.extractor_hash) { Ok(*value) } else { // the subtitle file size should be negligible let add_opts = YtDlpOptions { subtitle_langs: "".to_owned(), }; let opts = &download_opts(&app, add_opts); let result = yt_dlp::extract_info(&opts, &video.url, false, true) .await .with_context(|| { format!("Failed to extract video information: '{}'", video.title) })?; let size = if let Some(val) = result.filesize { val } else if let Some(val) = result.filesize_approx { val } else if result.duration.is_some() && result.tbr.is_some() { let duration = result.duration.expect("Is some").ceil() as u64; // TODO: yt_dlp gets this from the format let tbr = result.tbr.expect("Is Some").ceil() as u64; duration * tbr * (1000 / 8) } else { let hardcoded_default = Bytes::from_str("250 MiB").expect("This is hardcoded"); error!( "Failed to find a filesize for video: '{}' (Using hardcoded value of {})", video.title, hardcoded_default ); hardcoded_default.as_u64() }; assert_eq!( self.video_size_cache .insert(video.extractor_hash.clone(), size), None ); Ok(size) } } async fn actually_cache_video(app: &App, video: &Video) -> Result<()> { debug!("Download started: {}", &video.title); let addional_opts = get_video_yt_dlp_opts(&app, &video.extractor_hash).await?; let result = yt_dlp::download(&[video.url.clone()], &download_opts(&app, addional_opts)) .await .with_context(|| format!("Failed to download video: '{}'", video.title))?; assert_eq!(result.len(), 1); let result = &result[0]; set_video_cache_path(app, &video.extractor_hash, Some(&result)).await?; info!( "Video '{}' was downlaoded to path: {}", video.title, result.display() ); Ok(()) } }