diff options
Diffstat (limited to 'yt_dlp/src/lib.rs')
-rw-r--r-- | yt_dlp/src/lib.rs | 412 |
1 files changed, 0 insertions, 412 deletions
diff --git a/yt_dlp/src/lib.rs b/yt_dlp/src/lib.rs deleted file mode 100644 index 37d0945..0000000 --- a/yt_dlp/src/lib.rs +++ /dev/null @@ -1,412 +0,0 @@ -// yt - A fully featured command line YouTube client -// -// Copyright (C) 2024 Benedikt Peetz <benedikt.peetz@b-peetz.de> -// SPDX-License-Identifier: GPL-3.0-or-later -// -// This file is part of Yt. -// -// You should have received a copy of the License along with this program. -// If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>. - -// use std::{fs::File, io::Write}; - -use std::{path::PathBuf, sync::Once}; - -use crate::{duration::Duration, logging::setup_logging, wrapper::info_json::InfoJson}; - -use bytes::Bytes; -use log::{info, warn}; -use pyo3::types::{PyString, PyTuple, PyTupleMethods}; -use pyo3::{ - pyfunction, - types::{PyAnyMethods, PyDict, PyDictMethods, PyList, PyListMethods, PyModule}, - wrap_pyfunction_bound, Bound, PyAny, PyResult, Python, -}; -use serde::Serialize; -use serde_json::{Map, Value}; -use url::Url; - -pub mod duration; -pub mod logging; -pub mod wrapper; - -/// Synchronisation helper, to ensure that we don't setup the logger multiple times -static SYNC_OBJ: Once = Once::new(); - -/// Add a logger to the yt-dlp options. -/// If you have an logger set (i.e. for rust), than this will log to rust -pub fn add_logger_and_sig_handler<'a>( - opts: Bound<'a, PyDict>, - py: Python, -) -> PyResult<Bound<'a, PyDict>> { - setup_logging(py, "yt_dlp")?; - - let logging = PyModule::import_bound(py, "logging")?; - let ytdl_logger = logging.call_method1("getLogger", ("yt_dlp",))?; - - // Ensure that all events are logged by setting the log level to NOTSET (we filter on rust's side) - // Also use this static, to ensure that we don't configure the logger every time - SYNC_OBJ.call_once(|| { - // Disable the SIGINT (Ctrl+C) handler, python installs. - // This allows the user to actually stop the application with Ctrl+C. - // This is here because it can only be run in the main thread and this was here already. - py.run_bound( - r#" -import signal -signal.signal(signal.SIGINT, signal.SIG_DFL) - "#, - None, - None, - ) - .expect("This code should always work"); - - let config_opts = PyDict::new_bound(py); - config_opts - .set_item("level", 0) - .expect("Setting this item should always work"); - - logging - .call_method("basicConfig", (), Some(&config_opts)) - .expect("This method exists"); - }); - - // This was taken from `ytcc`, I don't think it is still applicable - // ytdl_logger.setattr("propagate", false)?; - // let logging_null_handler = logging.call_method0("NullHandler")?; - // ytdl_logger.setattr("addHandler", logging_null_handler)?; - - opts.set_item("logger", ytdl_logger).expect("Should work"); - - Ok(opts) -} - -#[pyfunction] -pub fn progress_hook<'a>(py: Python, input: Bound<'_, PyDict>) -> PyResult<()> { - let input: serde_json::Map<String, Value> = serde_json::from_str(&json_dumps( - py, - input - .downcast::<PyAny>() - .expect("Will always work") - .to_owned(), - )?) - .expect("Python should always produce valid json"); - - macro_rules! get { - (@interrogate $item:ident, $type_fun:ident, $get_fun:ident, $name:expr) => {{ - let a = $item.get($name).expect(concat!( - "The field '", - stringify!($name), - "' should exist." - )); - - if a.$type_fun() { - a.$get_fun().expect( - "The should have been checked in the if guard, so unpacking here is fine", - ) - } else { - panic!( - "Value {} => \n{}\n is not of type: {}", - $name, - a, - stringify!($type_fun) - ); - } - }}; - - ($type_fun:ident, $get_fun:ident, $name1:expr, $name2:expr) => {{ - let a = get! {@interrogate input, is_object, as_object, $name1}; - let b = get! {@interrogate a, $type_fun, $get_fun, $name2}; - b - }}; - - ($type_fun:ident, $get_fun:ident, $name:expr) => {{ - get! {@interrogate input, $type_fun, $get_fun, $name} - }}; - } - - macro_rules! default_get { - (@interrogate $item:ident, $default:expr, $get_fun:ident, $name:expr) => {{ - let a = if let Some(field) = $item.get($name) { - field.$get_fun().unwrap_or($default) - } else { - $default - }; - a - }}; - - ($get_fun:ident, $default:expr, $name1:expr, $name2:expr) => {{ - let a = get! {@interrogate input, is_object, as_object, $name1}; - let b = default_get! {@interrogate a, $default, $get_fun, $name2}; - b - }}; - - ($get_fun:ident, $default:expr, $name:expr) => {{ - default_get! {@interrogate input, $default, $get_fun, $name} - }}; - } - - macro_rules! c { - ($color:expr, $format:expr) => { - format!("\x1b[{}m{}\x1b[0m", $color, $format) - }; - } - - fn format_bytes(bytes: u64) -> String { - let bytes = Bytes::new(bytes); - bytes.to_string() - } - - fn format_speed(speed: f64) -> String { - let bytes = Bytes::new(speed.floor() as u64); - format!("{}/s", bytes) - } - - let get_title = |add_extension: bool| -> String { - match get! {is_string, as_str, "info_dict", "ext"} { - "vtt" => { - format!( - "Subtitles ({})", - get! {is_string, as_str, "info_dict", "name"} - ) - } - title_extension @ ("webm" | "mp4" | "m4a") => { - if add_extension { - format!( - "{} ({})", - default_get! { as_str, "<No title>", "info_dict", "title"}, - title_extension - ) - } else { - default_get! { as_str, "<No title>", "info_dict", "title"}.to_owned() - } - } - other => panic!("The extension '{}' is not yet implemented", other), - } - }; - - match get! {is_string, as_str, "status"} { - "downloading" => { - let elapsed = default_get! {as_f64, 0.0f64, "elapsed"}; - let eta = default_get! {as_f64, 0.0, "eta"}; - let speed = default_get! {as_f64, 0.0, "speed"}; - - let downloaded_bytes = get! {is_u64, as_u64, "downloaded_bytes"}; - let total_bytes = { - let total_bytes = default_get!(as_u64, 0, "total_bytes"); - if total_bytes == 0 { - let estimate = default_get!(as_u64, 0, "total_bytes_estimate"); - warn!( - "The video does not have a total_byte count, using an estimate of '{}'", - estimate - ); - estimate - } else { - total_bytes - } - }; - let percent: f64 = { - if total_bytes == 0 { - 100.0 - } else { - (downloaded_bytes as f64 / total_bytes as f64) * 100.0 - } - }; - - print!("\x1b[1F"); // Move one line up, to allow the `println` after it to print a newline - print!("\x1b[2K"); // Clear whole line. - print!("\x1b[1G"); // Move cursor to column 1. - - println!( - "'{}' [{}/{} at {}] -> [{}/{} {}]", - c!("34;1", get_title(true)), - c!("33;1", Duration::from(Some(elapsed))), - c!("33;1", Duration::from(Some(eta))), - c!("32;1", format_speed(speed)), - c!("31;1", format_bytes(downloaded_bytes)), - c!("31;1", format_bytes(total_bytes)), - c!("36;1", format!("{:.02}%", percent)) - ); - } - "finished" => { - println!("Finished downloading: '{}'", c!("34;1", get_title(false))) - } - "error" => { - panic!("Error whilst downloading: {}", get_title(true)) - } - other => panic!("{} is not a valid state!", other), - }; - - Ok(()) -} - -pub fn add_hooks<'a>(opts: Bound<'a, PyDict>, py: Python) -> PyResult<Bound<'a, PyDict>> { - if let Some(hooks) = opts.get_item("progress_hooks")? { - let hooks = hooks.downcast::<PyList>()?; - hooks.append(wrap_pyfunction_bound!(progress_hook, py)?)?; - - opts.set_item("progress_hooks", hooks)?; - } else { - // No hooks are set yet - let hooks_list = PyList::new_bound(py, &[wrap_pyfunction_bound!(progress_hook, py)?]); - - opts.set_item("progress_hooks", hooks_list)?; - } - - Ok(opts) -} - -/// `extract_info(self, url, download=True, ie_key=None, extra_info=None, process=True, force_generic_extractor=False)` -/// -/// Extract and return the information dictionary of the URL -/// -/// Arguments: -/// @param url URL to extract -/// -/// Keyword arguments: -/// @param download Whether to download videos -/// @param process Whether to resolve all unresolved references (URLs, playlist items). -/// Must be True for download to work -/// @param ie_key Use only the extractor with this key -/// -/// @param extra_info Dictionary containing the extra values to add to the info (For internal use only) -/// @force_generic_extractor Force using the generic extractor (Deprecated; use ie_key='Generic') -pub async fn extract_info( - yt_dlp_opts: &Map<String, Value>, - url: &Url, - download: bool, - process: bool, -) -> PyResult<InfoJson> { - Python::with_gil(|py| { - let opts = json_map_to_py_dict(yt_dlp_opts, py)?; - - let instance = get_yt_dlp(py, opts)?; - let args = (url.as_str(),); - - let kwargs = PyDict::new_bound(py); - kwargs.set_item("download", download)?; - kwargs.set_item("process", process)?; - - let result = instance.call_method("extract_info", args, Some(&kwargs))?; - - // Remove the `<generator at 0xsome_hex>`, by setting it to null - if !process { - result.set_item("entries", ())?; - } - - let result_str = json_dumps(py, result)?; - - //let mut file = File::create("output.info.json").unwrap(); - //write!(file, "{}", result_str).unwrap(); - - Ok(serde_json::from_str(&result_str) - .expect("Python should be able to produce correct json")) - }) -} - -pub fn unsmuggle_url(smug_url: Url) -> PyResult<Url> { - Python::with_gil(|py| { - let utils = get_yt_dlp_utils(py)?; - let url = utils - .call_method1("unsmuggle_url", (smug_url.as_str(),))? - .downcast::<PyTuple>()? - .get_item(0)?; - - let url: Url = url - .downcast::<PyString>()? - .to_string() - .parse() - .expect("Python should be able to return a valid url"); - - Ok(url) - }) -} - -/// Download a given list of URLs. -/// Returns the paths they were downloaded to. -pub async fn download( - urls: &[Url], - download_options: &Map<String, Value>, -) -> PyResult<Vec<PathBuf>> { - let mut out_paths = Vec::with_capacity(urls.len()); - - for url in urls { - info!("Started downloading url: '{}'", url); - let info_json = extract_info(download_options, url, true, true).await?; - - // Try to work around yt-dlp type weirdness - let result_string = if let Some(filename) = info_json.filename { - filename - } else { - (&info_json.requested_downloads.expect("This must exist")[0].filename).to_owned() - }; - - out_paths.push(result_string); - info!("Finished downloading url: '{}'", url); - } - - Ok(out_paths) -} - -fn json_map_to_py_dict<'a>( - map: &Map<String, Value>, - py: Python<'a>, -) -> PyResult<Bound<'a, PyDict>> { - let json_string = serde_json::to_string(&map).expect("This must always work"); - - let python_dict = json_loads(py, json_string)?; - - Ok(python_dict) -} - -fn json_dumps(py: Python, input: Bound<PyAny>) -> PyResult<String> { - // json.dumps(yt_dlp.sanitize_info(input)) - - let yt_dlp = get_yt_dlp(py, PyDict::new_bound(py))?; - let sanitized_result = yt_dlp.call_method1("sanitize_info", (input,))?; - - let json = PyModule::import_bound(py, "json")?; - let dumps = json.getattr("dumps")?; - - let output = dumps.call1((sanitized_result,))?; - - let output_str = output.extract::<String>()?; - - Ok(output_str) -} - -fn json_loads_str<T: Serialize>(py: Python, input: T) -> PyResult<Bound<PyDict>> { - let string = serde_json::to_string(&input).expect("Correct json must be pased"); - - json_loads(py, string) -} - -fn json_loads(py: Python, input: String) -> PyResult<Bound<PyDict>> { - // json.loads(input) - - let json = PyModule::import_bound(py, "json")?; - let dumps = json.getattr("loads")?; - - let output = dumps.call1((input,))?; - - Ok(output - .downcast::<PyDict>() - .expect("This should always be a PyDict") - .clone()) -} - -fn get_yt_dlp_utils<'a>(py: Python<'a>) -> PyResult<Bound<'a, PyAny>> { - let yt_dlp = PyModule::import_bound(py, "yt_dlp")?; - let utils = yt_dlp.getattr("utils")?; - - Ok(utils) -} -fn get_yt_dlp<'a>(py: Python<'a>, opts: Bound<'a, PyDict>) -> PyResult<Bound<'a, PyAny>> { - // Unconditionally set a logger - let opts = add_logger_and_sig_handler(opts, py)?; - let opts = add_hooks(opts, py)?; - - let yt_dlp = PyModule::import_bound(py, "yt_dlp")?; - let youtube_dl = yt_dlp.call_method1("YoutubeDL", (opts,))?; - - Ok(youtube_dl) -} |