// yt - A fully featured command line YouTube client
//
// Copyright (C) 2024 Benedikt Peetz <benedikt.peetz@b-peetz.de>
// SPDX-License-Identifier: GPL-3.0-or-later
//
// This file is part of Yt.
//
// You should have received a copy of the License along with this program.
// If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>.
// use std::{fs::File, io::Write};
use std::{path::PathBuf, sync::Once};
use crate::{duration::Duration, logging::setup_logging, wrapper::info_json::InfoJson};
use bytes::Bytes;
use log::{info, warn};
use pyo3::types::{PyString, PyTuple, PyTupleMethods};
use pyo3::{
pyfunction,
types::{PyAnyMethods, PyDict, PyDictMethods, PyList, PyListMethods, PyModule},
wrap_pyfunction_bound, Bound, PyAny, PyResult, Python,
};
use serde::Serialize;
use serde_json::{Map, Value};
use url::Url;
pub mod duration;
pub mod logging;
pub mod wrapper;
/// Synchronisation helper, to ensure that we don't set up the logger multiple times.
///
/// `add_logger_and_sig_handler` runs its one-time Python setup (resetting the SIGINT handler
/// and calling `logging.basicConfig`) through this [`Once`].
static SYNC_OBJ: Once = Once::new();
/// Add a logger to the yt-dlp options and reset Python's SIGINT handler.
///
/// The Python logger named `yt_dlp` is stored under the `logger` key of `opts`.
/// If you have a logger set (i.e. for rust), then this will log to rust.
///
/// The first call additionally (and only once per process, guarded by `SYNC_OBJ`):
/// - restores the default SIGINT handler, so that Ctrl+C actually stops the application
/// - calls `logging.basicConfig(level=0)`, so that every event reaches the rust side
///
/// # Errors
/// Returns the Python error if `setup_logging`, importing `logging`, fetching the logger or
/// storing it in `opts` fails.
///
/// # Panics
/// Panics if the one-time setup (signal handler reset or `basicConfig`) fails.
pub fn add_logger_and_sig_handler<'a>(
    opts: Bound<'a, PyDict>,
    py: Python,
) -> PyResult<Bound<'a, PyDict>> {
    setup_logging(py, "yt_dlp")?;

    let logging = PyModule::import_bound(py, "logging")?;
    let ytdl_logger = logging.call_method1("getLogger", ("yt_dlp",))?;

    // Ensure that all events are logged by setting the log level to NOTSET (we filter on rust's side)
    // Also use this static, to ensure that we don't configure the logger every time
    SYNC_OBJ.call_once(|| {
        // Disable the SIGINT (Ctrl+C) handler, python installs.
        // This allows the user to actually stop the application with Ctrl+C.
        // This is here because it can only be run in the main thread and this was here already.
        py.run_bound(
            r#"
import signal
signal.signal(signal.SIGINT, signal.SIG_DFL)
"#,
            None,
            None,
        )
        .expect("This code should always work");

        let config_opts = PyDict::new_bound(py);
        config_opts
            .set_item("level", 0)
            .expect("Setting this item should always work");

        logging
            .call_method("basicConfig", (), Some(&config_opts))
            .expect("This method exists");
    });

    // This was taken from `ytcc`, I don't think it is still applicable
    // ytdl_logger.setattr("propagate", false)?;
    // let logging_null_handler = logging.call_method0("NullHandler")?;
    // ytdl_logger.setattr("addHandler", logging_null_handler)?;

    // Propagate a failure instead of panicking; the function already returns a `PyResult`.
    opts.set_item("logger", ytdl_logger)?;

    Ok(opts)
}
/// Progress hook that yt-dlp calls with a status dictionary (registered in `add_hooks`).
///
/// The dictionary is converted to JSON (through `json_dumps`) and parsed into a `serde_json`
/// map. Depending on its `status` field this then:
/// - `downloading`: redraws a single status line on stdout (title, elapsed/eta, speed,
///   downloaded/total bytes and the percentage)
/// - `finished`: prints a completion message
///
/// # Errors
/// Returns the Python error if serialising `input` to JSON fails.
///
/// # Panics
/// Panics if a required field is missing or has the wrong type, if the `info_dict.ext`
/// extension is not one of the handled ones, if the status is `error`, or if the status is
/// unknown.
#[pyfunction]
pub fn progress_hook(py: Python, input: Bound<'_, PyDict>) -> PyResult<()> {
    // `into_any` is an infallible upcast, so no downcast/expect/clone round trip is needed.
    let input: serde_json::Map<String, Value> =
        serde_json::from_str(&json_dumps(py, input.into_any())?)
            .expect("Python should always produce valid json");

    // Fetch a required field (optionally nested one level deep) and panic if it is missing
    // or not of the expected type.
    macro_rules! get {
        (@interrogate $item:ident, $type_fun:ident, $get_fun:ident, $name:expr) => {{
            let a = $item.get($name).expect(concat!(
                "The field '",
                stringify!($name),
                "' should exist."
            ));

            if a.$type_fun() {
                a.$get_fun().expect(
                    "The should have been checked in the if guard, so unpacking here is fine",
                )
            } else {
                panic!(
                    "Value {} => \n{}\n is not of type: {}",
                    $name,
                    a,
                    stringify!($type_fun)
                );
            }
        }};

        ($type_fun:ident, $get_fun:ident, $name1:expr, $name2:expr) => {{
            let a = get! {@interrogate input, is_object, as_object, $name1};
            let b = get! {@interrogate a, $type_fun, $get_fun, $name2};
            b
        }};

        ($type_fun:ident, $get_fun:ident, $name:expr) => {{
            get! {@interrogate input, $type_fun, $get_fun, $name}
        }};
    }

    // Fetch an optional field (optionally nested one level deep); a missing field or one of
    // the wrong type yields the supplied default.
    macro_rules! default_get {
        (@interrogate $item:ident, $default:expr, $get_fun:ident, $name:expr) => {{
            let a = if let Some(field) = $item.get($name) {
                field.$get_fun().unwrap_or($default)
            } else {
                $default
            };
            a
        }};

        ($get_fun:ident, $default:expr, $name1:expr, $name2:expr) => {{
            let a = get! {@interrogate input, is_object, as_object, $name1};
            let b = default_get! {@interrogate a, $default, $get_fun, $name2};
            b
        }};

        ($get_fun:ident, $default:expr, $name:expr) => {{
            default_get! {@interrogate input, $default, $get_fun, $name}
        }};
    }

    // Wrap a value in an ANSI SGR colour sequence and reset the style afterwards.
    macro_rules! c {
        ($color:expr, $format:expr) => {
            format!("\x1b[{}m{}\x1b[0m", $color, $format)
        };
    }

    fn format_bytes(bytes: u64) -> String {
        let bytes = Bytes::new(bytes);
        bytes.to_string()
    }

    fn format_speed(speed: f64) -> String {
        let bytes = Bytes::new(speed.floor() as u64);
        format!("{}/s", bytes)
    }

    // Build the display title from `info_dict`; subtitles get a dedicated label.
    let get_title = |add_extension: bool| -> String {
        match get! {is_string, as_str, "info_dict", "ext"} {
            "vtt" => {
                format!(
                    "Subtitles ({})",
                    get! {is_string, as_str, "info_dict", "name"}
                )
            }
            title_extension @ ("webm" | "mp4" | "m4a") => {
                if add_extension {
                    format!(
                        "{} ({})",
                        default_get! { as_str, "<No title>", "info_dict", "title"},
                        title_extension
                    )
                } else {
                    default_get! { as_str, "<No title>", "info_dict", "title"}.to_owned()
                }
            }
            other => panic!("The extension '{}' is not yet implemented", other),
        }
    };

    match get! {is_string, as_str, "status"} {
        "downloading" => {
            let elapsed = default_get! {as_f64, 0.0f64, "elapsed"};
            let eta = default_get! {as_f64, 0.0, "eta"};
            let speed = default_get! {as_f64, 0.0, "speed"};

            let downloaded_bytes = get! {is_u64, as_u64, "downloaded_bytes"};
            // Fall back to the estimate when no exact total is known.
            let total_bytes = {
                let total_bytes = default_get!(as_u64, 0, "total_bytes");
                if total_bytes == 0 {
                    let estimate = default_get!(as_u64, 0, "total_bytes_estimate");
                    warn!(
                        "The video does not have a total_byte count, using an estimate of '{}'",
                        estimate
                    );
                    estimate
                } else {
                    total_bytes
                }
            };
            // Avoid a division by zero if neither a total nor an estimate is available.
            let percent: f64 = {
                if total_bytes == 0 {
                    100.0
                } else {
                    (downloaded_bytes as f64 / total_bytes as f64) * 100.0
                }
            };

            print!("\x1b[1F"); // Move one line up, to allow the `println` after it to print a newline
            print!("\x1b[2K"); // Clear whole line.
            print!("\x1b[1G"); // Move cursor to column 1.
            println!(
                "'{}' [{}/{} at {}] -> [{}/{} {}]",
                c!("34;1", get_title(true)),
                c!("33;1", Duration::from(Some(elapsed))),
                c!("33;1", Duration::from(Some(eta))),
                c!("32;1", format_speed(speed)),
                c!("31;1", format_bytes(downloaded_bytes)),
                c!("31;1", format_bytes(total_bytes)),
                c!("36;1", format!("{:.02}%", percent))
            );
        }
        "finished" => {
            println!("Finished downloading: '{}'", c!("34;1", get_title(false)))
        }
        "error" => {
            panic!("Error whilst downloading: {}", get_title(true))
        }
        other => panic!("{} is not a valid state!", other),
    };

    Ok(())
}
/// Register `progress_hook` in the `progress_hooks` list of the yt-dlp options.
///
/// An already present list is extended; otherwise a new single-element list is created.
///
/// # Errors
/// Returns the Python error if the existing `progress_hooks` value is not a list, or if any
/// of the dictionary/list operations fail.
pub fn add_hooks<'a>(opts: Bound<'a, PyDict>, py: Python) -> PyResult<Bound<'a, PyDict>> {
    match opts.get_item("progress_hooks")? {
        Some(existing) => {
            let hook_list = existing.downcast::<PyList>()?;
            hook_list.append(wrap_pyfunction_bound!(progress_hook, py)?)?;
            opts.set_item("progress_hooks", hook_list)?;
        }
        None => {
            // No hooks are set yet
            let fresh_list = PyList::new_bound(py, &[wrap_pyfunction_bound!(progress_hook, py)?]);
            opts.set_item("progress_hooks", fresh_list)?;
        }
    }

    Ok(opts)
}
/// `extract_info(self, url, download=True, ie_key=None, extra_info=None, process=True, force_generic_extractor=False)`
///
/// Extract and return the information dictionary of the URL
///
/// Arguments:
/// @param url URL to extract
///
/// Keyword arguments:
/// @param download Whether to download videos
/// @param process Whether to resolve all unresolved references (URLs, playlist items).
/// Must be True for download to work
/// @param ie_key Use only the extractor with this key
///
/// @param extra_info Dictionary containing the extra values to add to the info (For internal use only)
/// @force_generic_extractor Force using the generic extractor (Deprecated; use ie_key='Generic')
pub async fn extract_info(
yt_dlp_opts: &Map<String, Value>,
url: &Url,
download: bool,
process: bool,
) -> PyResult<InfoJson> {
Python::with_gil(|py| {
let opts = json_map_to_py_dict(yt_dlp_opts, py)?;
let instance = get_yt_dlp(py, opts)?;
let args = (url.as_str(),);
let kwargs = PyDict::new_bound(py);
kwargs.set_item("download", download)?;
kwargs.set_item("process", process)?;
let result = instance.call_method("extract_info", args, Some(&kwargs))?;
// Remove the `<generator at 0xsome_hex>`, by setting it to null
if !process {
result.set_item("entries", ())?;
}
let result_str = json_dumps(py, result)?;
//let mut file = File::create("output.info.json").unwrap();
//write!(file, "{}", result_str).unwrap();
Ok(serde_json::from_str(&result_str)
.expect("Python should be able to produce correct json"))
})
}
/// Strip the data yt-dlp has "smuggled" into a URL, by calling `yt_dlp.utils.unsmuggle_url`.
///
/// The Python function returns a tuple; its first element is parsed back into a [`Url`].
///
/// # Errors
/// Returns the Python error if the call fails, the result is not a tuple, or its first
/// element is not a string.
///
/// # Panics
/// Panics if the string returned by Python is not a valid URL.
pub fn unsmuggle_url(smug_url: Url) -> PyResult<Url> {
    Python::with_gil(|py| {
        let unsmuggled =
            get_yt_dlp_utils(py)?.call_method1("unsmuggle_url", (smug_url.as_str(),))?;
        let first_element = unsmuggled.downcast::<PyTuple>()?.get_item(0)?;
        let plain_url = first_element.downcast::<PyString>()?.to_string();

        let parsed: Url = plain_url
            .parse()
            .expect("Python should be able to return a valid url");

        Ok(parsed)
    })
}
/// Download every URL in `urls`, one after another, with the given yt-dlp options.
///
/// Returns the paths of the downloaded files, in the order of `urls`.
///
/// # Errors
/// Stops at, and returns, the first error produced by `extract_info`.
///
/// # Panics
/// Panics if the info json has neither a `filename` nor a non-empty `requested_downloads`.
pub async fn download(
    urls: &[Url],
    download_options: &Map<String, Value>,
) -> PyResult<Vec<PathBuf>> {
    let mut out_paths = Vec::with_capacity(urls.len());

    for url in urls {
        info!("Started downloading url: '{}'", url);
        let info_json = extract_info(download_options, url, true, true).await?;

        // Try to work around yt-dlp type weirdness
        let downloaded_path = match info_json.filename {
            Some(filename) => filename,
            None => {
                let requested = info_json.requested_downloads.expect("This must exist");
                requested[0].filename.clone()
            }
        };

        out_paths.push(downloaded_path);
        info!("Finished downloading url: '{}'", url);
    }

    Ok(out_paths)
}
/// Convert a `serde_json` map into a Python dictionary.
///
/// The map is serialised to a JSON string, which is then parsed by `json_loads`.
fn json_map_to_py_dict<'a>(
    map: &Map<String, Value>,
    py: Python<'a>,
) -> PyResult<Bound<'a, PyDict>> {
    let serialized = serde_json::to_string(&map).expect("This must always work");
    json_loads(py, serialized)
}
/// Serialise a Python object to a JSON string, after passing it through yt-dlp's
/// `sanitize_info`.
///
/// `sanitize_info` is a static method of `yt_dlp.YoutubeDL`, so it is called on the class
/// directly. This avoids constructing (and configuring) a whole `YoutubeDL` instance on every
/// call, which matters because the progress hook calls this function on every progress update.
///
/// # Errors
/// Returns the Python error if importing the modules, sanitising or dumping fails, or if the
/// dumped value can not be extracted as a string.
fn json_dumps(py: Python, input: Bound<PyAny>) -> PyResult<String> {
    //     json.dumps(yt_dlp.YoutubeDL.sanitize_info(input))
    let youtube_dl_class = PyModule::import_bound(py, "yt_dlp")?.getattr("YoutubeDL")?;
    let sanitized_result = youtube_dl_class.call_method1("sanitize_info", (input,))?;

    let json = PyModule::import_bound(py, "json")?;
    let dumps = json.getattr("dumps")?;

    let output = dumps.call1((sanitized_result,))?;
    let output_str = output.extract::<String>()?;

    Ok(output_str)
}
/// Serialise any [`Serialize`] value to JSON and load the result as a Python dictionary
/// (through `json_loads`).
///
/// # Panics
/// Panics if `input` can not be serialised to JSON.
fn json_loads_str<T: Serialize>(py: Python, input: T) -> PyResult<Bound<PyDict>> {
    let serialized = serde_json::to_string(&input).expect("Correct json must be pased");

    json_loads(py, serialized)
}
/// Parse a JSON string with Python's `json.loads` and return the result as a dictionary.
///
/// # Errors
/// Returns the Python error if importing `json` or parsing fails. A Python `TypeError` is
/// returned if the top-level JSON value is not an object (and thus not a `dict`).
fn json_loads(py: Python, input: String) -> PyResult<Bound<PyDict>> {
    //     json.loads(input)
    let json = PyModule::import_bound(py, "json")?;
    let loads = json.getattr("loads")?;

    let output = loads.call1((input,))?;

    // `downcast_into` consumes the value, so no clone is needed, and a non-dict result
    // becomes an error instead of a panic.
    Ok(output.downcast_into::<PyDict>()?)
}
/// Import the Python `yt_dlp` module and return its `utils` attribute.
fn get_yt_dlp_utils<'a>(py: Python<'a>) -> PyResult<Bound<'a, PyAny>> {
    PyModule::import_bound(py, "yt_dlp")?.getattr("utils")
}
/// Construct a `yt_dlp.YoutubeDL` instance from the given options.
///
/// The logger (plus signal handler) and the progress hook are always added to `opts` before
/// the instance is created.
fn get_yt_dlp<'a>(py: Python<'a>, opts: Bound<'a, PyDict>) -> PyResult<Bound<'a, PyAny>> {
    // Unconditionally set a logger
    let with_logger = add_logger_and_sig_handler(opts, py)?;
    let configured = add_hooks(with_logger, py)?;

    PyModule::import_bound(py, "yt_dlp")?.call_method1("YoutubeDL", (configured,))
}