from typing import Type import os import shutil import sys import urllib.parse from collections import defaultdict from PyQt5.QtWidgets import QApplication from PyQt5.QtQml import qmlRegisterType, QQmlApplicationEngine from PyQt5.QtQuick import QQuickImageProvider from PyQt5.QtCore import QObject, pyqtProperty, pyqtSlot from PyQt5.QtGui import QImage from .qtmpv import MpvObject import requests import toml class Episode(QObject): def __init__(self, source, parent=None): super().__init__(parent) self._source = source @pyqtProperty("QString", constant=True) def title(self): return self._source["Title"] @pyqtProperty(int, constant=True) def season(self): return self._source["Season"] @pyqtProperty(int, constant=True) def episode(self): return self._source["Episode"] @pyqtProperty("QString", constant=True) def filename(self): return self._source["OriginalFilename"] class Show(QObject): def __init__(self, source, episodes, parent=None): super().__init__(parent) self._source = source self._episodes = episodes @pyqtProperty("QString", constant=True) def title(self) -> str: return self._source["title"] @pyqtProperty(int, constant=True) def year(self) -> int: return self._source["year"] @pyqtProperty("QString", constant=True) def description(self) -> str: return self._source["description"] @pyqtProperty(int, constant=True) def watched(self) -> int: return self._source["watched"] @pyqtProperty("QString", constant=True) def poster(self) -> str: return self._source["poster"] @pyqtProperty(list, constant=True) def episodes(self) -> list[Episode]: return self._episodes class ProviderImageProvider(QQuickImageProvider): def __init__(self, icon_data): self._icon_data = icon_data super(ProviderImageProvider, self).__init__(QQuickImageProvider.Image) def requestImage(self, p_str, size): print(p_str) print(size) import base64 data: bytes = base64.b64decode(self._icon_data) img = QImage.fromData(data) return img, img.size() def getUrl(base: str, path: str) -> dict: url: str = urllib.parse.urljoin(base, path) r: requests.Response = requests.get(url) return r.json() class Provider(QObject): def __init__(self, url: str, data_dir: str, parent=None): super().__init__(parent) self.url: str = url describe: dict = getUrl(self.url, "describe") self._name: str = describe["name"] self.logo_provider: ProviderImageProvider = ProviderImageProvider( describe["icon"] ) # Create dictionary of show episodes episodes: dict = getUrl(self.url, "episodes") _episodes: defaultdict[str, list[Episode]] = defaultdict(lambda: []) for e in episodes["episodes"]: _episodes[e["ShowTitle"]].append(Episode(e)) shows: dict = getUrl(self.url, "shows") # Create image_cache directory provider_data_dir: str = os.path.join( data_dir, "image_cache", "providers", self._name ) os.makedirs(provider_data_dir, exist_ok=True) # Cache show posters for s in shows["data"]: # Get local cache path poster_url: str = s["poster"] poster_path: str = urllib.parse.urlparse(poster_url).path poster_ext: str = os.path.splitext(poster_path)[1] download_path: str = os.path.join( provider_data_dir, str(s["id"]) + poster_ext ) # Download poster try: with open(download_path, "xb") as f: r = requests.get(poster_url, stream=True) if r.status_code == 200: r.raw.decode_content = True shutil.copyfileobj(r.raw, f) except FileExistsError: pass # Overwrite poster URL with local path s["poster"] = download_path self._shows: dict[int, Show] = { e["id"]: Show(e, _episodes[e["title"]]) for e in shows["data"] } recently_added: dict = getUrl(self.url, "recently_added") self._recently_added: list[int] = recently_added["data"] in_progress: dict = getUrl(self.url, "in_progress") self._in_progress: list[int] = in_progress["data"] @pyqtProperty("QString", constant=True) def name(self) -> str: return self._name @pyqtProperty("QString", constant=True) def logo(self) -> str: return f"image://{self._name}/logo" @pyqtSlot(int, result=QObject) def getShow(self, id) -> Show: return self._shows[id] @pyqtProperty(list, constant=True) def showsAlphabetic(self) -> list[int]: return [ elem[0] for elem in sorted( [(id, show) for id, show in self._shows.items()], key=lambda elem: elem[1].title, ) ] @pyqtProperty(list, constant=True) def recentlyAdded(self) -> list[int]: return self._recently_added @pyqtProperty(list, constant=True) def inProgress(self) -> list[int]: return self._in_progress class DataSource: def __init__(self, providers: list[str], data_dir: str): self.providers: list[Provider] = [Provider(url, data_dir) for url in providers] def DatabaseType(data_source) -> Type: class Database(QObject): def __init__(self, parent=None): super().__init__(parent) self.data_source: DataSource = data_source @pyqtProperty(list, constant=True) def Providers(self) -> list[Provider]: return self.data_source.providers return Database def load_config() -> list[str]: try: config_dir: str = os.path.join(os.environ["XDG_CONFIG_HOME"], "ikinuki") except: config_dir: str = os.path.join(os.environ["HOME"], ".config", "ikinuki") os.makedirs(config_dir, exist_ok=True) config_file: str = os.path.join(config_dir, "client.toml") try: config: dict = toml.load(config_file) except FileNotFoundError: print(f'Config file not found at "{config_file}"') print("Writing example config file. Please update and relaunch.") default_config = """# [[backends]] # address = "127.0.0.1" # port = 32520""" with open(config_file, "w") as f: f.write(default_config) sys.exit(-1) return [f'http://{b["address"]}:{b["port"]}/' for b in config["backends"]] def get_data_dir() -> str: try: data_dir: str = os.path.join(os.environ["XDG_DATA_HOME"], "ikinuki") except: data_dir: str = os.path.join(os.environ["HOME"], ".local", "share", "ikinuki") os.makedirs(data_dir, exist_ok=True) return data_dir def main(): app = QApplication(sys.argv) qmlRegisterType(MpvObject, "Ikinuki.Client", 1, 0, "Mpv") import locale locale.setlocale(locale.LC_NUMERIC, "C") try: backends: list[str] = load_config() except Exception as e: print(f"ERROR: Could not load config file: {repr(e)}") sys.exit(-1) data_dir: str = get_data_dir() data_source = DataSource(backends, data_dir) qmlRegisterType(DatabaseType(data_source), "Ikinuki.Client", 1, 0, "Database") engine = QQmlApplicationEngine() for provider in data_source.providers: print(f"Registering provider with name {provider._name}") engine.addImageProvider( provider._name, provider.logo_provider, ) engine.load("layouts/ikinuki-default.qml") win = QObject() win = engine.rootObjects()[0] win.show() sys.exit(app.exec_())