// yt - A fully featured command line YouTube client // // Copyright (C) 2024 Benedikt Peetz // SPDX-License-Identifier: GPL-3.0-or-later // // This file is part of Yt. // // You should have received a copy of the License along with this program. // If not, see . // The pyo3 `pyfunction` proc-macros call unsafe functions internally, which trigger this lint. #![allow(unsafe_op_in_unsafe_fn)] #![allow(clippy::missing_errors_doc)] use std::env; use std::io::stdout; use std::{fs::File, io::Write}; use std::{path::PathBuf, sync::Once}; use crate::{duration::Duration, logging::setup_logging, wrapper::info_json::InfoJson}; use bytes::Bytes; use log::{info, log_enabled, Level}; use pyo3::types::{PyString, PyTuple, PyTupleMethods}; use pyo3::{ pyfunction, types::{PyAnyMethods, PyDict, PyDictMethods, PyList, PyListMethods, PyModule}, wrap_pyfunction_bound, Bound, PyAny, PyResult, Python, }; use serde::Serialize; use serde_json::{Map, Value}; use url::Url; pub mod duration; pub mod logging; pub mod wrapper; #[cfg(test)] mod tests; /// Synchronisation helper, to ensure that we don't setup the logger multiple times static SYNC_OBJ: Once = Once::new(); /// Add a logger to the yt-dlp options. /// If you have an logger set (i.e. for rust), than this will log to rust /// /// # Panics /// This should never panic. pub fn add_logger_and_sig_handler<'a>( opts: Bound<'a, PyDict>, py: Python<'_>, ) -> PyResult> { setup_logging(py, "yt_dlp")?; let logging = PyModule::import_bound(py, "logging")?; let ytdl_logger = logging.call_method1("getLogger", ("yt_dlp",))?; // Ensure that all events are logged by setting the log level to NOTSET (we filter on rust's side) // Also use this static, to ensure that we don't configure the logger every time SYNC_OBJ.call_once(|| { // Disable the SIGINT (Ctrl+C) handler, python installs. // This allows the user to actually stop the application with Ctrl+C. // This is here because it can only be run in the main thread and this was here already. py.run_bound( "\ import signal signal.signal(signal.SIGINT, signal.SIG_DFL)", None, None, ) .expect("This code should always work"); let config_opts = PyDict::new_bound(py); config_opts .set_item("level", 0) .expect("Setting this item should always work"); logging .call_method("basicConfig", (), Some(&config_opts)) .expect("This method exists"); }); // This was taken from `ytcc`, I don't think it is still applicable // ytdl_logger.setattr("propagate", false)?; // let logging_null_handler = logging.call_method0("NullHandler")?; // ytdl_logger.setattr("addHandler", logging_null_handler)?; opts.set_item("logger", ytdl_logger).expect("Should work"); Ok(opts) } #[pyfunction] #[allow(clippy::too_many_lines)] #[allow(clippy::missing_panics_doc)] #[allow(clippy::items_after_statements)] #[allow( clippy::cast_possible_truncation, clippy::cast_sign_loss, clippy::cast_precision_loss )] pub fn progress_hook(py: Python<'_>, input: &Bound<'_, PyDict>) -> PyResult<()> { // Only add the handler, if the log-level is higher than Debug (this avoids covering debug // messages). // FIXME: We should actually just find a way to not cover printed messages. <2024-10-19> if log_enabled!(Level::Debug) { return Ok(()); } // ANSI ESCAPE CODES Wrappers {{{ // see: https://en.wikipedia.org/wiki/ANSI_escape_code#Control_Sequence_Introducer_commands const CSI: &str = "\x1b["; fn clear_whole_line() { print!("{CSI}2K"); } fn move_to_col(x: usize) { print!("{CSI}{x}G"); } // }}} let input: Map = serde_json::from_str(&json_dumps( py, input .downcast::() .expect("Will always work") .to_owned(), )?) .expect("Python should always produce valid json"); macro_rules! get { (@interrogate $item:ident, $type_fun:ident, $get_fun:ident, $name:expr) => {{ let a = $item.get($name).expect(concat!( "The field '", stringify!($name), "' should exist." )); if a.$type_fun() { a.$get_fun().expect( "The should have been checked in the if guard, so unpacking here is fine", ) } else { panic!( "Value {} => \n{}\n is not of type: {}", $name, a, stringify!($type_fun) ); } }}; ($type_fun:ident, $get_fun:ident, $name1:expr, $name2:expr) => {{ let a = get! {@interrogate input, is_object, as_object, $name1}; let b = get! {@interrogate a, $type_fun, $get_fun, $name2}; b }}; ($type_fun:ident, $get_fun:ident, $name:expr) => {{ get! {@interrogate input, $type_fun, $get_fun, $name} }}; } macro_rules! default_get { (@interrogate $item:ident, $default:expr, $get_fun:ident, $name:expr) => {{ let a = if let Some(field) = $item.get($name) { field.$get_fun().unwrap_or($default) } else { $default }; a }}; ($get_fun:ident, $default:expr, $name1:expr, $name2:expr) => {{ let a = get! {@interrogate input, is_object, as_object, $name1}; let b = default_get! {@interrogate a, $default, $get_fun, $name2}; b }}; ($get_fun:ident, $default:expr, $name:expr) => {{ default_get! {@interrogate input, $default, $get_fun, $name} }}; } macro_rules! c { ($color:expr, $format:expr) => { format!("\x1b[{}m{}\x1b[0m", $color, $format) }; } fn format_bytes(bytes: u64) -> String { let bytes = Bytes::new(bytes); bytes.to_string() } fn format_speed(speed: f64) -> String { #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)] let bytes = Bytes::new(speed.floor() as u64); format!("{bytes}/s") } let get_title = |add_extension: bool| -> String { match get! {is_string, as_str, "info_dict", "ext"} { "vtt" => { format!( "Subtitles ({})", default_get! {as_str, "", "info_dict", "name"} ) } title_extension @ ("webm" | "mp4" | "m4a") => { if add_extension { format!( "{} ({})", default_get! { as_str, "", "info_dict", "title"}, title_extension ) } else { default_get! { as_str, "", "info_dict", "title"}.to_owned() } } other => panic!("The extension '{other}' is not yet implemented"), } }; match get! {is_string, as_str, "status"} { "downloading" => { let elapsed = default_get! {as_f64, 0.0f64, "elapsed"}; let eta = default_get! {as_f64, 0.0, "eta"}; let speed = default_get! {as_f64, 0.0, "speed"}; let downloaded_bytes = get! {is_u64, as_u64, "downloaded_bytes"}; let total_bytes = { let total_bytes = default_get!(as_u64, 0, "total_bytes"); if total_bytes == 0 { let maybe_estimate = default_get!(as_u64, 0, "total_bytes_estimate"); if maybe_estimate == 0 { // The download speed should be in bytes per second and the eta in seconds. // Thus multiplying them gets us the raw bytes (which were estimated by `yt_dlp`, from their `info.json`) let bytes_still_needed = (speed * eta).ceil() as u64; downloaded_bytes + bytes_still_needed } else { maybe_estimate } } else { total_bytes } }; let percent: f64 = { if total_bytes == 0 { 100.0 } else { (downloaded_bytes as f64 / total_bytes as f64) * 100.0 } }; clear_whole_line(); move_to_col(1); print!( "'{}' [{}/{} at {}] -> [{}/{} {}] ", c!("34;1", get_title(true)), c!("33;1", Duration::from(Some(elapsed))), c!("33;1", Duration::from(Some(eta))), c!("32;1", format_speed(speed)), c!("31;1", format_bytes(downloaded_bytes)), c!("31;1", format_bytes(total_bytes)), c!("36;1", format!("{:.02}%", percent)) ); stdout().flush()?; } "finished" => { println!("-> Finished downloading: '{}'", c!("34;1", get_title(true))); } "error" => { panic!("-> Error while downloading: {}", get_title(true)) } other => unreachable!("'{other}' should not be a valid state!"), }; Ok(()) } pub fn add_hooks<'a>(opts: Bound<'a, PyDict>, py: Python<'_>) -> PyResult> { if let Some(hooks) = opts.get_item("progress_hooks")? { let hooks = hooks.downcast::()?; hooks.append(wrap_pyfunction_bound!(progress_hook, py)?)?; opts.set_item("progress_hooks", hooks)?; } else { // No hooks are set yet let hooks_list = PyList::new_bound(py, &[wrap_pyfunction_bound!(progress_hook, py)?]); opts.set_item("progress_hooks", hooks_list)?; } Ok(opts) } /// `extract_info(self, url, download=True, ie_key=None, extra_info=None, process=True, force_generic_extractor=False)` /// /// Extract and return the information dictionary of the URL /// /// Arguments: /// @param url URL to extract /// /// Keyword arguments: /// @param download Whether to download videos /// @param process Whether to resolve all unresolved references (URLs, playlist items). /// Must be True for download to work /// @param `ie_key` Use only the extractor with this key /// /// @param `extra_info` Dictionary containing the extra values to add to the info (For internal use only) /// @`force_generic_extractor` Force using the generic extractor (Deprecated; use `ie_key`='Generic') #[allow(clippy::unused_async)] #[allow(clippy::missing_panics_doc)] pub async fn extract_info( yt_dlp_opts: &Map, url: &Url, download: bool, process: bool, ) -> PyResult { Python::with_gil(|py| { let opts = json_map_to_py_dict(yt_dlp_opts, py)?; let instance = get_yt_dlp(py, opts)?; let args = (url.as_str(),); let kwargs = PyDict::new_bound(py); kwargs.set_item("download", download)?; kwargs.set_item("process", process)?; let result = instance.call_method("extract_info", args, Some(&kwargs))?; // Remove the ``, by setting it to null if !process { result.set_item("entries", ())?; } let result_str = json_dumps(py, result)?; if let Ok(confirm) = env::var("YT_STORE_INFO_JSON") { if confirm == "yes" { let mut file = File::create("output.info.json")?; write!(file, "{result_str}").unwrap(); } } Ok(serde_json::from_str(&result_str) .expect("Python should be able to produce correct json")) }) } /// # Panics /// Only if python fails to return a valid URL. pub fn unsmuggle_url(smug_url: &Url) -> PyResult { Python::with_gil(|py| { let utils = get_yt_dlp_utils(py)?; let url = utils .call_method1("unsmuggle_url", (smug_url.as_str(),))? .downcast::()? .get_item(0)?; let url: Url = url .downcast::()? .to_string() .parse() .expect("Python should be able to return a valid url"); Ok(url) }) } /// Download a given list of URLs. /// Returns the paths they were downloaded to. /// /// # Panics /// Only if `yt_dlp` changes their `info_json` schema. pub async fn download( urls: &[Url], download_options: &Map, ) -> PyResult> { let mut out_paths = Vec::with_capacity(urls.len()); for url in urls { info!("Started downloading url: '{}'", url); let info_json = extract_info(download_options, url, true, true).await?; // Try to work around yt-dlp type weirdness let result_string = if let Some(filename) = info_json.filename { filename } else { info_json.requested_downloads.expect("This must exist")[0] .filename .clone() }; out_paths.push(result_string); info!("Finished downloading url: '{}'", url); } Ok(out_paths) } fn json_map_to_py_dict<'a>( map: &Map, py: Python<'a>, ) -> PyResult> { let json_string = serde_json::to_string(&map).expect("This must always work"); let python_dict = json_loads(py, json_string)?; Ok(python_dict) } fn json_dumps(py: Python<'_>, input: Bound<'_, PyAny>) -> PyResult { // json.dumps(yt_dlp.sanitize_info(input)) let yt_dlp = get_yt_dlp(py, PyDict::new_bound(py))?; let sanitized_result = yt_dlp.call_method1("sanitize_info", (input,))?; let json = PyModule::import_bound(py, "json")?; let dumps = json.getattr("dumps")?; let output = dumps.call1((sanitized_result,))?; let output_str = output.extract::()?; Ok(output_str) } fn json_loads_str(py: Python<'_>, input: T) -> PyResult> { let string = serde_json::to_string(&input).expect("Correct json must be pased"); json_loads(py, string) } fn json_loads(py: Python<'_>, input: String) -> PyResult> { // json.loads(input) let json = PyModule::import_bound(py, "json")?; let dumps = json.getattr("loads")?; let output = dumps.call1((input,))?; Ok(output .downcast::() .expect("This should always be a PyDict") .clone()) } fn get_yt_dlp_utils(py: Python<'_>) -> PyResult> { let yt_dlp = PyModule::import_bound(py, "yt_dlp")?; let utils = yt_dlp.getattr("utils")?; Ok(utils) } fn get_yt_dlp<'a>(py: Python<'a>, opts: Bound<'a, PyDict>) -> PyResult> { // Unconditionally set a logger let opts = add_logger_and_sig_handler(opts, py)?; let opts = add_hooks(opts, py)?; let yt_dlp = PyModule::import_bound(py, "yt_dlp")?; let youtube_dl = yt_dlp.call_method1("YoutubeDL", (opts,))?; Ok(youtube_dl) }