// yt - A fully featured command line YouTube client // // Copyright (C) 2024 Benedikt Peetz // SPDX-License-Identifier: GPL-3.0-or-later // // This file is part of Yt. // // You should have received a copy of the License along with this program. // If not, see . use std::{sync::Arc, time::Duration}; use crate::{ app::App, download::download_options::download_opts, storage::video_database::{ downloader::{get_next_uncached_video, set_video_cache_path}, extractor_hash::ExtractorHash, getters::get_video_yt_dlp_opts, Video, }, }; use anyhow::{Context, Result}; use log::{debug, info}; use tokio::{task::JoinHandle, time}; mod download_options; #[derive(Debug)] pub struct CurrentDownload { task_handle: JoinHandle>, extractor_hash: ExtractorHash, } impl CurrentDownload { fn new_from_video(app: Arc, video: Video) -> Self { let extractor_hash = video.extractor_hash.clone(); let task_handle = tokio::spawn(async move { Downloader::actually_cache_video(&app, &video) .await .with_context(|| format!("Failed to cache video: '{}'", video.title))?; Ok(()) }); Self { task_handle, extractor_hash, } } } pub struct Downloader { current_download: Option, } impl Downloader { pub fn new() -> Self { Self { current_download: None, } } /// The entry point to the Downloader. /// This Downloader will periodically check if the database has changed, and then also /// change which videos it downloads. /// This will run, until the database doesn't contain any watchable videos pub async fn consume(&mut self, app: Arc, max_cache_size: u64) -> Result<()> { while let Some(next_video) = get_next_uncached_video(app).await? { if let Some(_) = &self.current_download { let current_download = self.current_download.take().expect("Is Some"); if current_download.task_handle.is_finished() { current_download.task_handle.await??; continue; } if next_video.extractor_hash != current_download.extractor_hash { info!( "Noticed, that the next video is not the video being downloaded, replacing it ('{}' vs. '{}')!", next_video.extractor_hash.into_short_hash(&app).await?, current_download.extractor_hash.into_short_hash(&app).await? ); // Replace the currently downloading video current_download.task_handle.abort(); let new_current_download = CurrentDownload::new_from_video(Arc::clone(&app), next_video); self.current_download = Some(new_current_download); } else { debug!( "Currently downloading '{}'", current_download .extractor_hash .into_short_hash(&app) .await? ); // Reset the taken value self.current_download = Some(current_download); time::sleep(Duration::new(1, 0)).await; } } else { info!( "No video is being downloaded right now, setting it to '{}'", next_video.title ); let new_current_download = CurrentDownload::new_from_video(Arc::clone(&app), next_video); self.current_download = Some(new_current_download); } // if get_allocated_cache().await? < CONCURRENT { // .cache_video(next_video).await?; // } } info!("Finished downloading!"); Ok(()) } async fn actually_cache_video(app: &App, video: &Video) -> Result<()> { debug!("Download started: {}", &video.title); let addional_opts = get_video_yt_dlp_opts(&app, &video.extractor_hash).await?; let result = yt_dlp::download(&[video.url.clone()], &download_opts(addional_opts)) .await .with_context(|| format!("Failed to download video: '{}'", video.title))?; assert_eq!(result.len(), 1); let result = &result[0]; set_video_cache_path(app, &video.extractor_hash, Some(&result)).await?; info!( "Video '{}' was downlaoded to path: {}", video.title, result.display() ); Ok(()) } }