Diffstat
-rwxr-xr-x | python_update/raw_update.py | 160
1 file changed, 160 insertions, 0 deletions
diff --git a/python_update/raw_update.py b/python_update/raw_update.py
new file mode 100755
index 0000000..82be0a1
--- /dev/null
+++ b/python_update/raw_update.py
@@ -0,0 +1,160 @@
+#!/usr/bin/env python
+
+# yt - A fully featured command line YouTube client
+#
+# Copyright (C) 2024 Benedikt Peetz <benedikt.peetz@b-peetz.de>
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Yt.
+#
+# You should have received a copy of the License along with this program.
+# If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>.
+
+# This has been taken from the `ytcc` updater code (at `8893bc98428cb78d458a9cf3ded03f519d86a46b`).
+# Source URL: https://github.com/woefe/ytcc/commit/8893bc98428cb78d458a9cf3ded03f519d86a46b
+
+import asyncio
+import itertools
+import json
+import logging
+import sys
+from dataclasses import dataclass
+from functools import partial
+from typing import Any, Iterable, Optional, Tuple, TypeVar
+
+import yt_dlp
+
+
+@dataclass(frozen=True)
+class Playlist:
+    name: str
+    url: str
+    reverse: bool
+
+
+@dataclass(frozen=True)
+class Video:
+    url: str
+    title: str
+    description: str
+    publish_date: float
+    watch_date: Optional[float]
+    duration: float
+    thumbnail_url: Optional[str]
+    extractor_hash: str
+
+    @property
+    def watched(self) -> bool:
+        return self.watch_date is not None
+
+
+logger = logging.getLogger("yt")
+logging.basicConfig(encoding="utf-8", level=logging.DEBUG)
+
+_ytdl_logger = logging.getLogger("yt_dlp")
+_ytdl_logger.propagate = False
+_ytdl_logger.addHandler(logging.NullHandler())
+YTDL_COMMON_OPTS = {"logger": _ytdl_logger}
+
+T = TypeVar("T")
+
+
+def take(amount: int, iterable: Iterable[T]) -> Iterable[T]:
+    """Take the first elements of an iterable.
+
+    If the given iterable has fewer elements than the given amount, the returned iterable has the
+    same number of elements as the given iterable. Otherwise the returned iterable has `amount`
+    elements.
+
+    :param amount: The number of elements to take
+    :param iterable: The iterable to take elements from
+    :return: The first elements of the given iterable
+    """
+    for _, elem in zip(range(amount), iterable):
+        yield elem
+
+
+class Fetcher:
+    def __init__(self, max_backlog):
+        self.max_items = max_backlog
+        self.ydl_opts = {
+            **YTDL_COMMON_OPTS,
+            "playliststart": 1,
+            "playlistend": max_backlog,
+            "noplaylist": False,
+            "extractor_args": {"youtubetab": {"approximate_date": [""]}},
+        }
+
+    async def get_unprocessed_entries(self, url: str) -> Iterable[Tuple[str, Any]]:
+        result = []
+        with yt_dlp.YoutubeDL(self.ydl_opts) as ydl:
+            logger.info("Checking playlist '%s'...", url)
+            try:
+                loop = asyncio.get_event_loop()
+                info = await loop.run_in_executor(
+                    None,
+                    partial(ydl.extract_info, url, download=False, process=False),
+                )
+            except yt_dlp.DownloadError as download_error:
+                logger.error(
+                    "Failed to get playlist '%s'. Error was: '%s'",
+                    url,
+                    download_error,
+                )
+            else:
+                entries = info.get("entries", [])
+                for entry in take(self.max_items, entries):
+                    result.append((url, entry))
+        return result
+
+    def _process_ie(self, entry):
+        with yt_dlp.YoutubeDL(self.ydl_opts) as ydl:
+            processed = ydl.process_ie_result(entry, False)
+
+        # walk through the ie_result dictionary to force evaluation of lazily loaded resources
+        repr(processed)
+
+        return processed
+
+    async def process_entry(self, url: str, entry: Any) -> Optional[Any]:
+        try:
+            loop = asyncio.get_event_loop()
+            processed = await loop.run_in_executor(None, self._process_ie, entry)
+        except yt_dlp.DownloadError as download_error:
+            logger.error(
+                "Failed to get a video of playlist '%s'. Error was: '%s'",
+                url,
+                download_error,
+            )
+            return None
+        else:
+            print(json.dumps({url: processed}))
+
+
+class Updater:
+    def __init__(self, max_backlog=20):
+        self.max_items = max_backlog
+        self.fetcher = Fetcher(max_backlog)
+
+    async def update_url(self, url: str):
+        print(f"Updating {url}...", file=sys.stderr)
+        new_entries = await self.fetcher.get_unprocessed_entries(url)
+
+        await asyncio.gather(
+            *itertools.starmap(self.fetcher.process_entry, new_entries)
+        )
+
+    async def do_update(self, urls: Iterable[str]):
+        await asyncio.gather(*map(self.update_url, urls))
+
+    def update(self, urls: Iterable[str]):
+        asyncio.run(self.do_update(urls))
+
+
+def update(max_backlog: int):
+    u = Updater(max_backlog=max_backlog)
+    u.update(sys.argv[2:])
+
+
+max_backlog = int(sys.argv[1])
+update(max_backlog)
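
The added script is a standalone CLI: argv[1] is the backlog size, argv[2:] are playlist or channel URLs, and each processed entry is printed to stdout as one single-line JSON object mapping the source URL to the fully processed yt-dlp entry (progress and errors go to stderr). Note that argv parsing runs at module level, without an `if __name__ == "__main__":` guard, so the file is meant to be executed rather than imported. A minimal sketch of driving it from Python; the path and the URL below are illustrative placeholders, not values shipped with the commit:

import json
import subprocess

# argv[1] = max_backlog, argv[2:] = playlist URLs (placeholder URL).
proc = subprocess.run(
    [
        "python",
        "python_update/raw_update.py",
        "20",
        "https://www.youtube.com/@example/videos",
    ],
    capture_output=True,
    text=True,
    check=True,
)

# Each stdout line is a JSON object: {playlist_url: processed_entry}.
for line in proc.stdout.splitlines():
    for url, video in json.loads(line).items():
        print(url, "->", video.get("title"))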
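
Internally, the fetcher keeps the event loop responsive by pushing blocking yt-dlp calls onto the default thread-pool executor with `loop.run_in_executor`. Because `run_in_executor` only forwards positional arguments, keyword arguments are bound beforehand with `functools.partial`, as in `get_unprocessed_entries`. A self-contained sketch of that pattern with stand-in names (`blocking_fetch` is hypothetical; `asyncio.get_running_loop()` is the preferred spelling inside coroutines on current Python, but the mechanism is the same):

import asyncio
import time
from functools import partial

def blocking_fetch(url: str, *, timeout: float = 10.0) -> str:
    # Stand-in for a blocking library call such as yt_dlp's extract_info.
    time.sleep(0.1)
    return f"fetched {url} (timeout={timeout})"

async def fetch(url: str) -> str:
    loop = asyncio.get_running_loop()
    # run_in_executor takes only positional args, so kwargs are bound via partial.
    return await loop.run_in_executor(None, partial(blocking_fetch, url, timeout=5.0))

print(asyncio.run(fetch("https://example.com")))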
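
As a side note on the design, the hand-rolled `take` generator inherited from ytcc is behaviorally equivalent to `itertools.islice` from the standard library; this is an observation about a possible simplification, not a change made by the commit:

import itertools

def take(amount, iterable):
    # Same contract as the generator in raw_update.py:
    # yield at most `amount` items from `iterable`.
    return itertools.islice(iterable, amount)

assert list(take(3, range(10))) == [0, 1, 2]
assert list(take(5, "ab")) == ["a", "b"]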