1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
use std::time::Duration;
use crate::{
app::App,
download::download_options::download_opts,
storage::video_database::{
downloader::{get_next_uncached_video, set_video_cache_path}, extractor_hash::ExtractorHash, getters::get_video_yt_dlp_opts, Video
},
};
use anyhow::{Context, Result};
use log::{debug, info};
use tokio::{task::JoinHandle, time};
mod download_options;
/// A single in-flight video download: the spawned tokio task doing the work,
/// together with the extractor hash of the video it was started for.
#[derive(Debug)]
pub struct CurrentDownload {
/// Handle to the task spawned in `new_from_video`; it resolves to the result of caching the video.
task_handle: JoinHandle<Result<()>>,
/// Extractor hash cloned from the video; `Downloader::consume` compares it against the next queued video.
extractor_hash: ExtractorHash,
}
impl CurrentDownload {
    /// Starts caching `video` on a freshly spawned tokio task.
    ///
    /// The returned value keeps the task's join handle next to a clone of the video's
    /// extractor hash, so the caller can later tell which video the task belongs to.
    fn new_from_video(video: Video) -> Self {
        // Clone the hash up front: `video` itself is moved into the task below.
        let hash = video.extractor_hash.clone();

        let handle = tokio::spawn(async move {
            // FIXME: Remove this app reconstruction <2024-07-29>
            let app = App::new().await?;

            // The task's output is the (context-annotated) result of the caching step.
            Downloader::actually_cache_video(&app, &video)
                .await
                .with_context(|| format!("Failed to cache video: '{}'", video.title))
        });

        Self {
            task_handle: handle,
            extractor_hash: hash,
        }
    }
}
/// Caches videos in the background, tracking at most one download at a time.
pub struct Downloader {
/// The download currently tracked by `consume`; `None` initially and after a finished task was awaited.
current_download: Option<CurrentDownload>,
}
impl Downloader {
pub fn new() -> Self {
Self {
current_download: None,
}
}
/// The entry point to the Downloader.
/// This Downloader will periodically check if the database has changed, and then also
/// change which videos it downloads.
/// This will run, until the database doesn't contain any watchable videos
pub async fn consume(&mut self, app: &App) -> Result<()> {
while let Some(next_video) = get_next_uncached_video(app).await? {
if let Some(_) = &self.current_download {
let current_download = self.current_download.take().expect("Is Some");
if current_download.task_handle.is_finished() {
current_download.task_handle.await??;
continue;
}
if next_video.extractor_hash != current_download.extractor_hash {
info!(
"Noticed, that the next video is not the video being downloaded, replacing it ('{}' vs. '{}')!",
next_video.extractor_hash.into_short_hash(app).await?, current_download.extractor_hash.into_short_hash(app).await?
);
// Replace the currently downloading video
current_download.task_handle.abort();
let new_current_download = CurrentDownload::new_from_video(next_video);
self.current_download = Some(new_current_download);
} else {
debug!(
"Currently downloading '{}'",
current_download.extractor_hash.into_short_hash(app).await?
);
// Reset the taken value
self.current_download = Some(current_download);
time::sleep(Duration::new(1, 0)).await;
}
} else {
info!(
"No video is being downloaded right now, setting it to '{}'",
next_video.title
);
let new_current_download = CurrentDownload::new_from_video(next_video);
self.current_download = Some(new_current_download);
}
// if get_allocated_cache().await? < CONCURRENT {
// .cache_video(next_video).await?;
// }
}
info!("Finished downloading!");
Ok(())
}
async fn actually_cache_video(app: &App, video: &Video) -> Result<()> {
debug!("Download started: {}", &video.title);
let addional_opts = get_video_yt_dlp_opts(&app, &video.extractor_hash).await?;
let result = yt_dlp::download(&[video.url.clone()], &download_opts(addional_opts))
.await
.with_context(|| format!("Failed to download video: '{}'", video.title))?;
assert_eq!(result.len(), 1);
let result = &result[0];
set_video_cache_path(app, &video.extractor_hash, Some(&result)).await?;
info!(
"Video '{}' was downlaoded to path: {}",
video.title,
result.display()
);
Ok(())
}
}
|