// yt - A fully featured command line YouTube client // // Copyright (C) 2024 Benedikt Peetz // SPDX-License-Identifier: GPL-3.0-or-later // // This file is part of Yt. // // You should have received a copy of the License along with this program. // If not, see . use std::env; use std::{fs::File, io::Write}; use std::{path::PathBuf, sync::Once}; use crate::{duration::Duration, logging::setup_logging, wrapper::info_json::InfoJson}; use bytes::Bytes; use log::{info, log_enabled, warn, Level}; use pyo3::types::{PyString, PyTuple, PyTupleMethods}; use pyo3::{ pyfunction, types::{PyAnyMethods, PyDict, PyDictMethods, PyList, PyListMethods, PyModule}, wrap_pyfunction_bound, Bound, PyAny, PyResult, Python, }; use serde::Serialize; use serde_json::{Map, Value}; use url::Url; pub mod duration; pub mod logging; pub mod wrapper; /// Synchronisation helper, to ensure that we don't setup the logger multiple times static SYNC_OBJ: Once = Once::new(); /// Add a logger to the yt-dlp options. /// If you have an logger set (i.e. for rust), than this will log to rust pub fn add_logger_and_sig_handler<'a>( opts: Bound<'a, PyDict>, py: Python, ) -> PyResult> { setup_logging(py, "yt_dlp")?; let logging = PyModule::import_bound(py, "logging")?; let ytdl_logger = logging.call_method1("getLogger", ("yt_dlp",))?; // Ensure that all events are logged by setting the log level to NOTSET (we filter on rust's side) // Also use this static, to ensure that we don't configure the logger every time SYNC_OBJ.call_once(|| { // Disable the SIGINT (Ctrl+C) handler, python installs. // This allows the user to actually stop the application with Ctrl+C. // This is here because it can only be run in the main thread and this was here already. py.run_bound( r#" import signal signal.signal(signal.SIGINT, signal.SIG_DFL) "#, None, None, ) .expect("This code should always work"); let config_opts = PyDict::new_bound(py); config_opts .set_item("level", 0) .expect("Setting this item should always work"); logging .call_method("basicConfig", (), Some(&config_opts)) .expect("This method exists"); }); // This was taken from `ytcc`, I don't think it is still applicable // ytdl_logger.setattr("propagate", false)?; // let logging_null_handler = logging.call_method0("NullHandler")?; // ytdl_logger.setattr("addHandler", logging_null_handler)?; opts.set_item("logger", ytdl_logger).expect("Should work"); Ok(opts) } #[pyfunction] pub fn progress_hook(py: Python, input: Bound<'_, PyDict>) -> PyResult<()> { // Only add the handler, if the log-level is higher than Debug (this avoids covering debug // messages). if log_enabled!(Level::Debug) { return Ok(()); } let input: serde_json::Map = serde_json::from_str(&json_dumps( py, input .downcast::() .expect("Will always work") .to_owned(), )?) .expect("Python should always produce valid json"); macro_rules! get { (@interrogate $item:ident, $type_fun:ident, $get_fun:ident, $name:expr) => {{ let a = $item.get($name).expect(concat!( "The field '", stringify!($name), "' should exist." )); if a.$type_fun() { a.$get_fun().expect( "The should have been checked in the if guard, so unpacking here is fine", ) } else { panic!( "Value {} => \n{}\n is not of type: {}", $name, a, stringify!($type_fun) ); } }}; ($type_fun:ident, $get_fun:ident, $name1:expr, $name2:expr) => {{ let a = get! {@interrogate input, is_object, as_object, $name1}; let b = get! {@interrogate a, $type_fun, $get_fun, $name2}; b }}; ($type_fun:ident, $get_fun:ident, $name:expr) => {{ get! {@interrogate input, $type_fun, $get_fun, $name} }}; } macro_rules! default_get { (@interrogate $item:ident, $default:expr, $get_fun:ident, $name:expr) => {{ let a = if let Some(field) = $item.get($name) { field.$get_fun().unwrap_or($default) } else { $default }; a }}; ($get_fun:ident, $default:expr, $name1:expr, $name2:expr) => {{ let a = get! {@interrogate input, is_object, as_object, $name1}; let b = default_get! {@interrogate a, $default, $get_fun, $name2}; b }}; ($get_fun:ident, $default:expr, $name:expr) => {{ default_get! {@interrogate input, $default, $get_fun, $name} }}; } macro_rules! c { ($color:expr, $format:expr) => { format!("\x1b[{}m{}\x1b[0m", $color, $format) }; } fn format_bytes(bytes: u64) -> String { let bytes = Bytes::new(bytes); bytes.to_string() } fn format_speed(speed: f64) -> String { let bytes = Bytes::new(speed.floor() as u64); format!("{}/s", bytes) } let get_title = |add_extension: bool| -> String { match get! {is_string, as_str, "info_dict", "ext"} { "vtt" => { format!( "Subtitles ({})", default_get! {as_str, "", "info_dict", "name"} ) } title_extension @ ("webm" | "mp4" | "m4a") => { if add_extension { format!( "{} ({})", default_get! { as_str, "", "info_dict", "title"}, title_extension ) } else { default_get! { as_str, "", "info_dict", "title"}.to_owned() } } other => panic!("The extension '{}' is not yet implemented", other), } }; match get! {is_string, as_str, "status"} { "downloading" => { let elapsed = default_get! {as_f64, 0.0f64, "elapsed"}; let eta = default_get! {as_f64, 0.0, "eta"}; let speed = default_get! {as_f64, 0.0, "speed"}; let downloaded_bytes = get! {is_u64, as_u64, "downloaded_bytes"}; let total_bytes = { let total_bytes = default_get!(as_u64, 0, "total_bytes"); if total_bytes == 0 { let estimate = default_get!(as_u64, 0, "total_bytes_estimate"); warn!( "The video does not have a total_byte count, using an estimate of '{}'", estimate ); estimate } else { total_bytes // FIXME: The solution below _would_ work, if we had a info_json with the // values. <2024-08-24> // // let duration = default_get! {as_f64, 0.0, "duration"}.ceil() as u64; // // TODO: yt_dlp gets this from the format // let tbr = default_get! {as_f64, 0.0, "tbr"}.ceil() as u64; // duration * tbr * (1000 / 8) } }; let percent: f64 = { if total_bytes == 0 { 100.0 } else { (downloaded_bytes as f64 / total_bytes as f64) * 100.0 } }; print!("\x1b[1F"); // Move one line up, to allow the `println` after it to print a newline print!("\x1b[2K"); // Clear whole line. print!("\x1b[1G"); // Move cursor to column 1. println!( "'{}' [{}/{} at {}] -> [{}/{} {}]", c!("34;1", get_title(true)), c!("33;1", Duration::from(Some(elapsed))), c!("33;1", Duration::from(Some(eta))), c!("32;1", format_speed(speed)), c!("31;1", format_bytes(downloaded_bytes)), c!("31;1", format_bytes(total_bytes)), c!("36;1", format!("{:.02}%", percent)) ); } "finished" => { println!("Finished downloading: '{}'", c!("34;1", get_title(false))) } "error" => { panic!("Error whilst downloading: {}", get_title(true)) } other => panic!("{} is not a valid state!", other), }; Ok(()) } pub fn add_hooks<'a>(opts: Bound<'a, PyDict>, py: Python) -> PyResult> { if let Some(hooks) = opts.get_item("progress_hooks")? { let hooks = hooks.downcast::()?; hooks.append(wrap_pyfunction_bound!(progress_hook, py)?)?; opts.set_item("progress_hooks", hooks)?; } else { // No hooks are set yet let hooks_list = PyList::new_bound(py, &[wrap_pyfunction_bound!(progress_hook, py)?]); opts.set_item("progress_hooks", hooks_list)?; } Ok(opts) } /// `extract_info(self, url, download=True, ie_key=None, extra_info=None, process=True, force_generic_extractor=False)` /// /// Extract and return the information dictionary of the URL /// /// Arguments: /// @param url URL to extract /// /// Keyword arguments: /// @param download Whether to download videos /// @param process Whether to resolve all unresolved references (URLs, playlist items). /// Must be True for download to work /// @param ie_key Use only the extractor with this key /// /// @param extra_info Dictionary containing the extra values to add to the info (For internal use only) /// @force_generic_extractor Force using the generic extractor (Deprecated; use ie_key='Generic') pub async fn extract_info( yt_dlp_opts: &Map, url: &Url, download: bool, process: bool, ) -> PyResult { Python::with_gil(|py| { let opts = json_map_to_py_dict(yt_dlp_opts, py)?; let instance = get_yt_dlp(py, opts)?; let args = (url.as_str(),); let kwargs = PyDict::new_bound(py); kwargs.set_item("download", download)?; kwargs.set_item("process", process)?; let result = instance.call_method("extract_info", args, Some(&kwargs))?; // Remove the ``, by setting it to null if !process { result.set_item("entries", ())?; } let result_str = json_dumps(py, result)?; if let Ok(confirm) = env::var("YT_STORE_INFO_JSON") { if confirm == "yes" { let mut file = File::create("output.info.json").unwrap(); write!(file, "{}", result_str).unwrap(); } } Ok(serde_json::from_str(&result_str) .expect("Python should be able to produce correct json")) }) } pub fn unsmuggle_url(smug_url: Url) -> PyResult { Python::with_gil(|py| { let utils = get_yt_dlp_utils(py)?; let url = utils .call_method1("unsmuggle_url", (smug_url.as_str(),))? .downcast::()? .get_item(0)?; let url: Url = url .downcast::()? .to_string() .parse() .expect("Python should be able to return a valid url"); Ok(url) }) } /// Download a given list of URLs. /// Returns the paths they were downloaded to. pub async fn download( urls: &[Url], download_options: &Map, ) -> PyResult> { let mut out_paths = Vec::with_capacity(urls.len()); for url in urls { info!("Started downloading url: '{}'", url); let info_json = extract_info(download_options, url, true, true).await?; // Try to work around yt-dlp type weirdness let result_string = if let Some(filename) = info_json.filename { filename } else { info_json.requested_downloads.expect("This must exist")[0] .filename .to_owned() }; out_paths.push(result_string); info!("Finished downloading url: '{}'", url); } Ok(out_paths) } fn json_map_to_py_dict<'a>( map: &Map, py: Python<'a>, ) -> PyResult> { let json_string = serde_json::to_string(&map).expect("This must always work"); let python_dict = json_loads(py, json_string)?; Ok(python_dict) } fn json_dumps(py: Python, input: Bound) -> PyResult { // json.dumps(yt_dlp.sanitize_info(input)) let yt_dlp = get_yt_dlp(py, PyDict::new_bound(py))?; let sanitized_result = yt_dlp.call_method1("sanitize_info", (input,))?; let json = PyModule::import_bound(py, "json")?; let dumps = json.getattr("dumps")?; let output = dumps.call1((sanitized_result,))?; let output_str = output.extract::()?; Ok(output_str) } fn json_loads_str(py: Python, input: T) -> PyResult> { let string = serde_json::to_string(&input).expect("Correct json must be pased"); json_loads(py, string) } fn json_loads(py: Python, input: String) -> PyResult> { // json.loads(input) let json = PyModule::import_bound(py, "json")?; let dumps = json.getattr("loads")?; let output = dumps.call1((input,))?; Ok(output .downcast::() .expect("This should always be a PyDict") .clone()) } fn get_yt_dlp_utils(py: Python<'_>) -> PyResult> { let yt_dlp = PyModule::import_bound(py, "yt_dlp")?; let utils = yt_dlp.getattr("utils")?; Ok(utils) } fn get_yt_dlp<'a>(py: Python<'a>, opts: Bound<'a, PyDict>) -> PyResult> { // Unconditionally set a logger let opts = add_logger_and_sig_handler(opts, py)?; let opts = add_hooks(opts, py)?; let yt_dlp = PyModule::import_bound(py, "yt_dlp")?; let youtube_dl = yt_dlp.call_method1("YoutubeDL", (opts,))?; Ok(youtube_dl) }