about summary refs log tree commit diff stats
path: root/python_update/raw_update.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rwxr-xr-xpython_update/raw_update.py160
1 files changed, 160 insertions, 0 deletions
diff --git a/python_update/raw_update.py b/python_update/raw_update.py
new file mode 100755
index 0000000..82be0a1
--- /dev/null
+++ b/python_update/raw_update.py
@@ -0,0 +1,160 @@
+#!/usr/bin/env python
+
+# yt - A fully featured command line YouTube client
+#
+# Copyright (C) 2024 Benedikt Peetz <benedikt.peetz@b-peetz.de>
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Yt.
+#
+# You should have received a copy of the License along with this program.
+# If not, see <https://www.gnu.org/licenses/gpl-3.0.txt>.
+
+# This has been taken from the `ytcc` updater code (at `8893bc98428cb78d458a9cf3ded03f519d86a46b`).
+# Source URL: https://github.com/woefe/ytcc/commit/8893bc98428cb78d458a9cf3ded03f519d86a46b
+
+import asyncio
+import itertools
+import json
+import logging
+import sys
+from dataclasses import dataclass
+from functools import partial
+from typing import Any, Iterable, Optional, Tuple, TypeVar
+
+import yt_dlp
+
+
+@dataclass(frozen=True)
+class Playlist:
+    """Immutable record describing a playlist by its name and URL."""
+
+    # Name of the playlist.
+    name: str
+    # URL of the playlist.
+    url: str
+    # Presumably marks that the entries should be handled in reversed order;
+    # the visible code never reads this field -- TODO confirm.
+    reverse: bool
+
+
+@dataclass(frozen=True)
+class Video:
+    """Immutable record holding the metadata of a single video."""
+
+    url: str
+    title: str
+    description: str
+    # Stored as a float; presumably a Unix timestamp -- TODO confirm.
+    publish_date: float
+    # `None` means that no watch date has been recorded (see `watched`).
+    watch_date: Optional[float]
+    # Stored as a float; presumably measured in seconds -- TODO confirm.
+    duration: float
+    thumbnail_url: Optional[str]
+    extractor_hash: str
+
+    @property
+    def watched(self) -> bool:
+        """Return whether a watch date has been recorded for this video."""
+        return self.watch_date is not None
+
+
+# Logger used by this module; the root logger is configured to let everything
+# from DEBUG upwards through.
+logger = logging.getLogger("yt")
+logging.basicConfig(encoding="utf-8", level=logging.DEBUG)
+
+# Separate logger handed to yt-dlp: it does not propagate to the root logger
+# and only has a `NullHandler` attached, so records sent to it are discarded.
+_ytdl_logger = logging.getLogger("yt_dlp")
+_ytdl_logger.propagate = False
+_ytdl_logger.addHandler(logging.NullHandler())
+# Options merged into the yt-dlp options built by `Fetcher`.
+YTDL_COMMON_OPTS = {"logger": _ytdl_logger}
+
+# Generic element type used in the signature of `take`.
+T = TypeVar("T")
+
+
+def take(amount: int, iterable: Iterable[T]) -> Iterable[T]:
+    """Lazily yield at most `amount` leading elements of an iterable.
+
+    Iteration ends as soon as `amount` elements have been produced or the given
+    iterable is exhausted, whichever happens first. Nothing beyond the requested
+    amount is pulled from the iterable.
+
+    :param amount: The maximum number of elements to yield
+    :param iterable: The iterable to take elements from
+    :return: The first elements of the given iterable
+    """
+    budget = range(amount)
+    source = iter(iterable)
+    for _ in budget:
+        try:
+            element = next(source)
+        except StopIteration:
+            # The iterable ran out before the budget did.
+            return
+        yield element
+
+
+class Fetcher:
+    """Fetch playlist entries with yt-dlp and print processed videos as JSON.
+
+    The blocking yt-dlp calls are dispatched to the default executor of the
+    running event loop, so several playlists and entries can be handled
+    concurrently.
+    """
+
+    def __init__(self, max_backlog):
+        """Store the backlog limit and build the yt-dlp options.
+
+        :param max_backlog: The maximum number of entries taken per playlist
+        """
+        self.max_items = max_backlog
+        self.ydl_opts = {
+            **YTDL_COMMON_OPTS,
+            "playliststart": 1,
+            "playlistend": max_backlog,
+            "noplaylist": False,
+            "extractor_args": {"youtubetab": {"approximate_date": [""]}},
+        }
+
+    def _extract_entries(self, url: str) -> list:
+        """Return up to `max_items` unprocessed entries found at `url`.
+
+        This call blocks; it is meant to run in an executor thread, just like
+        `_process_ie`.
+        """
+        with yt_dlp.YoutubeDL(self.ydl_opts) as ydl:
+            info = ydl.extract_info(url, download=False, process=False)
+
+            # Guard against `extract_info` returning `None` and against a missing
+            # or empty "entries" value; both are treated as an empty playlist.
+            entries = (info or {}).get("entries") or []
+
+            # "entries" may be a lazy iterable, so it is consumed here, in the
+            # worker thread, instead of on the event loop thread.
+            return list(take(self.max_items, entries))
+
+    async def get_unprocessed_entries(self, url: str) -> Iterable[Tuple[str, Any]]:
+        """Collect the unprocessed entries of the playlist at `url`.
+
+        A `yt_dlp.DownloadError` is logged and results in an empty list.
+
+        :param url: The URL of the playlist to check
+        :return: A list of `(url, entry)` tuples with at most `max_items` items
+        """
+        logger.info("Checking playlist '%s'...", url)
+        loop = asyncio.get_running_loop()
+        try:
+            entries = await loop.run_in_executor(None, self._extract_entries, url)
+        except yt_dlp.DownloadError as download_error:
+            logger.error(
+                "Failed to get playlist '%s'. Error was: '%s'",
+                url,
+                download_error,
+            )
+            return []
+
+        return [(url, entry) for entry in entries]
+
+    def _process_ie(self, entry):
+        """Process a single unprocessed entry with yt-dlp (blocking)."""
+        with yt_dlp.YoutubeDL(self.ydl_opts) as ydl:
+            processed = ydl.process_ie_result(entry, False)
+
+            # walk through the ie_result dictionary to force evaluation of lazily loaded resources
+            repr(processed)
+
+            return processed
+
+    async def process_entry(self, url: str, entry: Any) -> Optional[Any]:
+        """Process one playlist entry and print the result to stdout as JSON.
+
+        On success one line containing the JSON object `{url: processed}` is
+        printed. A `yt_dlp.DownloadError` is logged and otherwise ignored.
+
+        :param url: The URL of the playlist the entry was taken from
+        :param entry: The unprocessed entry
+        :return: Always `None`
+        """
+        loop = asyncio.get_running_loop()
+        try:
+            processed = await loop.run_in_executor(None, self._process_ie, entry)
+        except yt_dlp.DownloadError as download_error:
+            logger.error(
+                "Failed to get a video of playlist '%s'. Error was: '%s'",
+                url,
+                download_error,
+            )
+            return None
+
+        # NOTE(review): `json.dumps` raises `TypeError` for values that are not
+        # JSON serialisable; confirm that processed results never contain such.
+        print(json.dumps({url: processed}))
+        return None
+
+
+class Updater:
+    """Run a `Fetcher` over a collection of playlist URLs."""
+
+    def __init__(self, max_backlog=20):
+        """Create the updater together with the `Fetcher` it uses.
+
+        :param max_backlog: The backlog limit, handed on to the `Fetcher`
+        """
+        self.max_items = max_backlog
+        self.fetcher = Fetcher(max_backlog)
+
+    async def update_url(self, url: str):
+        """Fetch the entries of one URL and process all of them concurrently.
+
+        :param url: The URL to update
+        """
+        print(f"Updating {url}...", file=sys.stderr)
+        pending = await self.fetcher.get_unprocessed_entries(url)
+
+        jobs = [
+            self.fetcher.process_entry(source_url, entry)
+            for source_url, entry in pending
+        ]
+        await asyncio.gather(*jobs)
+
+    async def do_update(self, urls: Iterable[str]):
+        """Update all given URLs concurrently.
+
+        :param urls: The URLs to update
+        """
+        jobs = [self.update_url(url) for url in urls]
+        await asyncio.gather(*jobs)
+
+    def update(self, urls: Iterable[str]):
+        """Synchronous entry point: run `do_update` in a fresh event loop.
+
+        :param urls: The URLs to update
+        """
+        asyncio.run(self.do_update(urls))
+
+
+def update(max_backlog: int):
+    """Update every URL given on the command line.
+
+    The URLs are read from `sys.argv`, starting at its third element.
+
+    :param max_backlog: The backlog limit handed to the `Updater`
+    """
+    urls = sys.argv[2:]
+    Updater(max_backlog=max_backlog).update(urls)
+
+
+# Script entry point: the first command line argument is the backlog limit; the
+# remaining arguments are read by `update` from `sys.argv`.
+max_backlog = int(sys.argv[1])
+update(max_backlog)