// yt - A fully featured command line YouTube client
//
// Copyright (C) 2024 Benedikt Peetz <benedikt.peetz@b-peetz.de>
// SPDX-License-Identifier: GPL-3.0-or-later
//
// This file is part of Yt.
//
// You should have received a copy of the License along with this program.
// If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>.
use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};
use crate::{
app::App,
download::download_options::download_opts,
storage::video_database::{
downloader::{get_next_uncached_video, set_video_cache_path},
extractor_hash::ExtractorHash,
getters::get_video_yt_dlp_opts,
Video, YtDlpOptions,
},
};
use anyhow::{bail, Context, Result};
use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt};
use log::{debug, error, info, warn};
use tokio::{fs, task::JoinHandle, time};
mod download_options;
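
/// A handle to the background task that is currently downloading a video, together
/// with the extractor hash of the video it is downloading.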
#[derive(Debug)]
pub struct CurrentDownload {
task_handle: JoinHandle<Result<()>>,
extractor_hash: ExtractorHash,
}
impl CurrentDownload {
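    /// Spawn a task that caches the given video and remember its extractor hash.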
fn new_from_video(app: Arc<App>, video: Video) -> Self {
let extractor_hash = video.extractor_hash.clone();
let task_handle = tokio::spawn(async move {
Downloader::actually_cache_video(&app, &video)
.await
.with_context(|| format!("Failed to cache video: '{}'", video.title))?;
Ok(())
});
Self {
task_handle,
extractor_hash,
}
}
}
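
/// The result of checking whether the next video still fits into the cache.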
enum CacheSizeCheck {
    /// The video fits into the cache and can be downloaded.
    Fits,
    /// Adding the video to the current cache contents would exceed the maximum cache size.
    TooLarge,
    /// The video would not even fit into an empty cache.
    ExceedsMaxCacheSize,
}
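
/// Downloads videos that are not yet cached, one at a time, while keeping the cache
/// below the configured maximum size.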
pub struct Downloader {
    /// The download that is currently in progress, if any.
    current_download: Option<CurrentDownload>,
    /// Cached size estimates, keyed by the video's extractor hash.
    video_size_cache: HashMap<ExtractorHash, u64>,
    /// Whether the "cache is full" warning has already been printed.
    printed_warning: bool,
    /// The cache allocation that was last reported to the user.
    cached_cache_allocation: Option<u64>,
}
impl Default for Downloader {
fn default() -> Self {
Self::new()
}
}
impl Downloader {
pub fn new() -> Self {
Self {
current_download: None,
video_size_cache: HashMap::new(),
printed_warning: false,
cached_cache_allocation: None,
}
}
    /// Check whether enough cache space is available for the next video. Waits for 10 s if it is not.
async fn is_enough_cache_available(
&mut self,
app: &App,
max_cache_size: u64,
next_video: &Video,
) -> Result<CacheSizeCheck> {
if let Some(cdownload) = &self.current_download {
if cdownload.extractor_hash == next_video.extractor_hash {
// If the video is already being downloaded it will always fit. Otherwise the
// download would not have been started.
return Ok(CacheSizeCheck::Fits);
}
}
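        // Compare the current cache usage plus the estimated size of the next video
        // against the configured maximum.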
let cache_allocation = Self::get_current_cache_allocation(app).await?;
let video_size = self.get_approx_video_size(app, next_video).await?;
if video_size >= max_cache_size {
error!(
"The video '{}' ({}) exceeds the maximum cache size ({})! \
Please set a bigger maximum (`--max-cache-size`) or skip it.",
next_video.title,
Bytes::new(video_size),
Bytes::new(max_cache_size)
);
return Ok(CacheSizeCheck::ExceedsMaxCacheSize);
}
if cache_allocation + video_size >= max_cache_size {
if !self.printed_warning {
warn!(
"Can't download video: '{}' ({}) as it's too large for the cache ({} of {} allocated). \
Waiting for cache size reduction..",
next_video.title, Bytes::new(video_size), Bytes::new(cache_allocation), Bytes::new(max_cache_size)
);
self.printed_warning = true;
}
if let Some(cca) = self.cached_cache_allocation {
if cca != cache_allocation {
warn!(
"Current cache size has changed, it's now: '{}'",
Bytes::new(cache_allocation)
);
self.cached_cache_allocation = Some(cache_allocation);
}
} else {
info!(
"Current cache size allocation: '{}'",
Bytes::new(cache_allocation)
);
self.cached_cache_allocation = Some(cache_allocation);
}
            // Wait and hope that a large video is deleted from the cache.
time::sleep(Duration::from_secs(10)).await;
Ok(CacheSizeCheck::TooLarge)
} else {
self.printed_warning = false;
Ok(CacheSizeCheck::Fits)
}
}
    /// The entry point of the Downloader.
    ///
    /// The Downloader periodically checks whether the database has changed and adjusts
    /// which video it downloads accordingly. It runs until the database no longer
    /// contains any watchable videos.
pub async fn consume(&mut self, app: Arc<App>, max_cache_size: u64) -> Result<()> {
while let Some(next_video) = get_next_uncached_video(&app).await? {
match self
.is_enough_cache_available(&app, max_cache_size, &next_video)
.await?
{
CacheSizeCheck::Fits => (),
CacheSizeCheck::TooLarge => continue,
CacheSizeCheck::ExceedsMaxCacheSize => bail!("Giving up."),
};
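            // A download is already running: check whether it has finished or has to be
            // replaced because the database now prefers a different video.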
if self.current_download.is_some() {
let current_download = self.current_download.take().expect("Is Some");
if current_download.task_handle.is_finished() {
current_download.task_handle.await??;
continue;
}
if next_video.extractor_hash != current_download.extractor_hash {
                    info!(
                        "The next video is not the one currently being downloaded; replacing it ('{}' vs. '{}')!",
                        next_video.extractor_hash.into_short_hash(&app).await?, current_download.extractor_hash.into_short_hash(&app).await?
                    );
// Replace the currently downloading video
current_download.task_handle.abort();
let new_current_download =
CurrentDownload::new_from_video(Arc::clone(&app), next_video);
self.current_download = Some(new_current_download);
} else {
// Reset the taken value
self.current_download = Some(current_download);
}
} else {
info!(
"No video is being downloaded right now, setting it to '{}'",
next_video.title
);
let new_current_download =
CurrentDownload::new_from_video(Arc::clone(&app), next_video);
self.current_download = Some(new_current_download);
}
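            // Poll the database again after a second.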
            time::sleep(Duration::from_secs(1)).await;
}
info!("Finished downloading!");
Ok(())
}
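
    /// Calculate how many bytes the download directory currently occupies by recursively
    /// summing the sizes of all files in it.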
pub async fn get_current_cache_allocation(app: &App) -> Result<u64> {
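        // Async functions cannot recurse directly; boxing the returned future
        // (`BoxFuture`) makes the recursive directory walk possible.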
fn dir_size(mut dir: fs::ReadDir) -> BoxFuture<'static, Result<u64>> {
async move {
let mut acc = 0;
while let Some(entry) = dir.next_entry().await? {
let size = match entry.metadata().await? {
data if data.is_dir() => {
let path = entry.path();
let read_dir = fs::read_dir(path).await?;
dir_size(read_dir).await?
}
data => data.len(),
};
acc += size;
}
Ok(acc)
}
.boxed()
}
dir_size(fs::read_dir(&app.config.paths.download_dir).await?).await
}
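
    /// Estimate the size of a video in bytes without downloading it, caching the result
    /// per extractor hash.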
async fn get_approx_video_size(&mut self, app: &App, video: &Video) -> Result<u64> {
if let Some(value) = self.video_size_cache.get(&video.extractor_hash) {
Ok(*value)
} else {
            // The subtitle file size should be negligible.
let add_opts = YtDlpOptions {
subtitle_langs: "".to_owned(),
};
let opts = &download_opts(app, add_opts);
let result = yt_dlp::extract_info(opts, &video.url, false, true)
.await
.with_context(|| {
format!("Failed to extract video information: '{}'", video.title)
})?;
let size = if let Some(val) = result.filesize {
val
} else if let Some(val) = result.filesize_approx {
val
} else if result.duration.is_some() && result.tbr.is_some() {
let duration = result.duration.expect("Is some").ceil() as u64;
// TODO: yt_dlp gets this from the format
let tbr = result.tbr.expect("Is Some").ceil() as u64;
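                // `tbr` is the average total bitrate in kbit/s, so
                // seconds * kbit/s * 1000 / 8 gives an approximate size in bytes.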
duration * tbr * (1000 / 8)
} else {
let hardcoded_default = Bytes::from_str("250 MiB").expect("This is hardcoded");
error!(
"Failed to find a filesize for video: '{}' (Using hardcoded value of {})",
video.title, hardcoded_default
);
hardcoded_default.as_u64()
};
assert_eq!(
self.video_size_cache
.insert(video.extractor_hash.clone(), size),
None
);
Ok(size)
}
}
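
    /// Download the video with yt-dlp and store the resulting cache path in the
    /// video database.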
async fn actually_cache_video(app: &App, video: &Video) -> Result<()> {
debug!("Download started: {}", &video.title);
        let additional_opts = get_video_yt_dlp_opts(app, &video.extractor_hash).await?;
        let result = yt_dlp::download(&[video.url.clone()], &download_opts(app, additional_opts))
.await
.with_context(|| format!("Failed to download video: '{}'", video.title))?;
assert_eq!(result.len(), 1);
let result = &result[0];
set_video_cache_path(app, &video.extractor_hash, Some(result)).await?;
info!(
"Video '{}' was downlaoded to path: {}",
video.title,
result.display()
);
Ok(())
}
}