Source code for p115client.tool.export_dir

#!/usr/bin/env python3
# encoding: utf-8

__author__ = "ChenyangGao <https://chenyanggao.github.io>"
__all__ = [
    "parse_export_dir_as_dict_iter", "parse_export_dir_as_path_iter", 
    "parse_export_dir_as_patht_iter", "export_dir", "export_dir_result", 
    "export_dir_parse_iter", 
]
__doc__ = "这个模块提供了一些和导出目录树有关的函数"

from asyncio import sleep as async_sleep
from collections.abc import (
    AsyncIterable, AsyncIterator, Callable, Coroutine, Iterable, Iterator, 
)
from functools import partial
from io import BufferedReader, TextIOWrapper
from itertools import count
from os import PathLike
from re import compile as re_compile
from time import sleep, time
from typing import cast, overload, Any, Final, IO, Literal

from asynctools import ensure_async, ensure_aiter
from filewrap import AsyncBufferedReader, AsyncTextIOWrapper
from iterutils import (
    backgroud_loop, context, run_gen_step, run_gen_step_iter, 
    Yield, YieldFrom, 
)
from p115pickcode import to_id

from ..client import check_response, P115Client


CRE_TREE_PREFIX_match: Final = re_compile(r"^(?:\| )+\|-(.*)").match


@overload
def parse_export_dir_as_dict_iter(
    file: bytes | str | PathLike | Iterable[bytes | str], 
    /, 
    encoding: str = "utf-16", 
    close_file: bool = False, 
    *, 
    async_: Literal[False] = False, 
) -> Iterator[dict]:
    ...
@overload
def parse_export_dir_as_dict_iter(
    file: bytes | str | PathLike | Iterable[bytes | str] | AsyncIterable[bytes | str], 
    /, 
    encoding: str = "utf-16", 
    close_file: bool = False, 
    *, 
    async_: Literal[True], 
) -> AsyncIterator[dict]:
    ...
[docs] def parse_export_dir_as_dict_iter( file: bytes | str | PathLike | Iterable[bytes | str] | AsyncIterable[bytes | str], /, encoding: str = "utf-16", close_file: bool = False, *, async_: Literal[False, True] = False, ) -> Iterator[dict] | AsyncIterator[dict]: """解析 115 导出的目录树(可通过 P115Client.fs_export_dir 提交导出任务) :param file: 文件路径、打开的文件或者迭代器(每次返回一行) :param encoding: 字符编码,对字节数据使用,转换为字符串 :param close_file: 结束(包括异常退出)时尝试关闭 `file` :param async_: 是否异步 :return: 把每一行解析为一个字典,迭代返回,格式为 .. code:: python { "key": int, # 序号 "parent_key": int, # 上级目录的序号 "depth": int, # 深度 "name": str, # 名字 } """ if isinstance(file, (bytes, str, PathLike)): file = open(file, encoding=encoding) close_file = True def gen_step(): it = ensure_aiter(file, threaded=True) if async_ else file do_next: Callable = anext if async_ else next # type: ignore stack: list[dict] = [{"key": 0, "parent_key": 0, "depth": 0, "name": ""}] push = stack.append root = yield do_next(it, None) if not root: return try: depth = 0 for i in count(1): line = yield do_next(it) if not isinstance(line, str): line = str(line, encoding) m = CRE_TREE_PREFIX_match(line) if m is None: stack[depth]["name"] += "\n" + line[:-1] continue else: yield Yield(stack[depth]) name = m[1] depth = (len(line) - len(name)) // 2 - 1 item = { "key": i, "parent_key": stack[depth-1]["key"], "depth": depth, "name": name, } try: stack[depth] = item except IndexError: push(item) except (StopIteration, StopAsyncIteration): if depth: yield Yield(stack[depth]) finally: if close_file: if async_: if callable(aclose := getattr(file, "aclose", None)): yield aclose() elif callable(close := getattr(file, "close", None)): yield ensure_async(close, threaded=True) elif callable(close := getattr(file, "close", None)): close() return run_gen_step_iter(gen_step, async_)
@overload def parse_export_dir_as_path_iter( file: bytes | str | PathLike | Iterable[bytes | str], /, encoding: str = "utf-16", escape: None | bool | Callable[[str], str] = True, close_file: bool = False, *, async_: Literal[False] = False, ) -> Iterator[str]: ... @overload def parse_export_dir_as_path_iter( file: bytes | str | PathLike | Iterable[bytes | str] | AsyncIterable[bytes | str], /, encoding: str = "utf-16", escape: None | bool | Callable[[str], str] = True, close_file: bool = False, *, async_: Literal[True], ) -> AsyncIterator[str]: ...
[docs] def parse_export_dir_as_path_iter( file: bytes | str | PathLike | Iterable[bytes | str] | AsyncIterable[bytes | str], /, encoding: str = "utf-16", escape: None | bool | Callable[[str], str] = True, close_file: bool = False, *, async_: Literal[False, True] = False, ) -> Iterator[str] | AsyncIterator[str]: """解析 115 导出的目录树(可通过 P115Client.fs_export_dir 提交导出任务) :param file: 文件路径、打开的文件或者迭代器(每次返回一行) :param encoding: 字符编码,对字节数据使用,转换为字符串 :param escape: 对文件名进行转义 - 如果为 None,则不处理;否则,这个函数用来对文件名中某些符号进行转义,例如 "/" 等 - 如果为 True,则使用 `posixpatht.escape`,会对文件名中 "/",或单独出现的 "." 和 ".." 用 "\\" 进行转义 - 如果为 False,则使用 `posix_escape_name` 函数对名字进行转义,会把文件名中的 "/" 转换为 "|" - 如果为 Callable,则用你所提供的调用,以或者转义后的名字 :param close_file: 结束(包括异常退出)时尝试关闭 `file` :param async_: 是否异步 :return: 把每一行解析为一个路径,并逐次迭代返回 """ if isinstance(file, (bytes, str, PathLike)): file = open(file, encoding=encoding) close_file = True if isinstance(escape, bool): if escape: from posixpatht import escape else: from p115client.util import posix_escape_name as escape escape = cast(None | Callable[[str], str], escape) def gen_step(): it = ensure_aiter(file, threaded=True) if async_ else file do_next: Callable = anext if async_ else next # type: ignore root = yield do_next(it, None) if not root: return if not isinstance(root, str): root = str(root, encoding) root = root.removesuffix("\n")[3:] if root == "根目录": stack = [""] root = "/" else: if escape is not None: root = escape(root) root = "/" + root stack = [root] push = stack.append try: depth = 0 while True: line = yield do_next(it) if not isinstance(line, str): line = str(line, encoding) m = CRE_TREE_PREFIX_match(line) if m is None: stack[depth] += "\n" + line[:-1] continue elif depth: yield Yield(stack[depth]) else: yield Yield(root) name = m[1] depth = (len(line) - len(name)) // 2 - 1 if escape is not None: name = escape(name) path = stack[depth-1] + "/" + name try: stack[depth] = path except IndexError: push(path) except (StopIteration, StopAsyncIteration): if depth: yield Yield(stack[depth]) finally: if close_file: if async_: if callable(aclose := getattr(file, "aclose", None)): yield aclose() elif callable(close := getattr(file, "close", None)): yield ensure_async(close, threaded=True) elif callable(close := getattr(file, "close", None)): close() return run_gen_step_iter(gen_step, async_)
@overload def parse_export_dir_as_patht_iter( file: bytes | str | PathLike | Iterable[bytes | str], /, encoding: str = "utf-16", close_file: bool = False, *, async_: Literal[False] = False, ) -> Iterator[list[str]]: ... @overload def parse_export_dir_as_patht_iter( file: bytes | str | PathLike | Iterable[bytes | str] | AsyncIterable[bytes | str], /, encoding: str = "utf-16", close_file: bool = False, *, async_: Literal[True], ) -> AsyncIterator[list[str]]: ...
[docs] def parse_export_dir_as_patht_iter( file: bytes | str | PathLike | Iterable[bytes | str] | AsyncIterable[bytes | str], /, encoding: str = "utf-16", close_file: bool = False, *, async_: Literal[False, True] = False, ) -> Iterator[list[str]] | AsyncIterator[list[str]]: """解析 115 导出的目录树(可通过 P115Client.fs_export_dir 提交导出任务) :param file: 文件路径、打开的文件或者迭代器(每次返回一行) :param encoding: 字符编码,对字节数据使用,转换为字符串 :param close_file: 结束(包括异常退出)时尝试关闭 `file` :param async_: 是否异步 :return: 把每一行解析为一个名字列表,并逐次迭代返回 """ if isinstance(file, (bytes, str, PathLike)): file = open(file, encoding=encoding) close_file = True def gen_step(): it = ensure_aiter(file, threaded=True) if async_ else file do_next: Callable = anext if async_ else next # type: ignore root = yield do_next(it, None) if not root: return if not isinstance(root, str): root = str(root, encoding) root = root.removesuffix("\n")[3:] stack = [""] from_top_root = root == "根目录" if not from_top_root: stack.append(root) push = stack.append try: depth = len(stack) - 1 while True: line = yield do_next(it) if not isinstance(line, str): line = str(line, encoding) m = CRE_TREE_PREFIX_match(line) if m is None: stack[depth] += "\n" + line[:-1] continue else: yield Yield(stack[:depth+1]) name = m[1] depth = (len(line) - len(name)) // 2 - from_top_root try: stack[depth] = name except IndexError: push(name) except (StopIteration, StopAsyncIteration): if depth: yield Yield(stack[:depth+1]) finally: if close_file: if async_: if callable(aclose := getattr(file, "aclose", None)): yield aclose() elif callable(close := getattr(file, "close", None)): yield ensure_async(close, threaded=True) elif callable(close := getattr(file, "close", None)): close() return run_gen_step_iter(gen_step, async_)
@overload def export_dir( client: str | PathLike | P115Client, export_file_ids: int | str | Iterable[int | str], target_pid: int | str = 0, layer_limit: int = 0, *, async_: Literal[False] = False, **request_kwargs, ) -> int: ... @overload def export_dir( client: str | PathLike | P115Client, export_file_ids: int | str | Iterable[int | str], target_pid: int | str = 0, layer_limit: int = 0, *, async_: Literal[True], **request_kwargs, ) -> Coroutine[Any, Any, int]: ...
[docs] def export_dir( client: str | PathLike | P115Client, export_file_ids: int | str | Iterable[int | str], target_pid: int | str = 0, layer_limit: int = 0, *, async_: Literal[False, True] = False, **request_kwargs, ) -> int | Coroutine[Any, Any, int]: """导出目录树 :param client: 115 客户端或 cookies :param export_file_ids: 待导出的目录 id 或 pickcode :param target_pid: 导出到的目标目录 id 或 pickcode :param layer_limit: 层级深度,小于等于 0 时不限 :param async_: 是否异步 :param request_kwargs: 其它请求参数 :return: 返回任务 id,可用 `P115Client.fs_export_dir_status` 查询进度 """ if not isinstance(client, P115Client): client = P115Client(client, check_for_relogin=True) def gen_step(): nonlocal export_file_ids, target_pid if isinstance(export_file_ids, int): pass elif isinstance(export_file_ids, str): if "," not in export_file_ids: export_file_ids = to_id(export_file_ids) else: export_file_ids = ",".join(str(to_id(eid)) for eid in export_file_ids) target_pid = to_id(target_pid) payload = {"file_ids": export_file_ids, "target": f"U_0_{target_pid}"} if layer_limit > 0: payload["layer_limit"] = layer_limit resp = yield client.fs_export_dir(payload, async_=async_, **request_kwargs) return check_response(resp)["data"]["export_id"] return run_gen_step(gen_step, async_)
@overload def export_dir_result( client: str | PathLike | P115Client, export_id: int | str, timeout: None | int | float = None, check_interval: int | float = 1, *, async_: Literal[False] = False, **request_kwargs, ) -> dict: ... @overload def export_dir_result( client: str | PathLike | P115Client, export_id: int | str, timeout: None | int | float = None, check_interval: int | float = 1, *, async_: Literal[True], **request_kwargs, ) -> Coroutine[Any, Any, dict]: ...
[docs] def export_dir_result( client: str | PathLike | P115Client, export_id: int | str, timeout: None | int | float = None, check_interval: int | float = 1, *, async_: Literal[False, True] = False, **request_kwargs, ) -> dict | Coroutine[Any, Any, dict]: """获取导出目录树的结果 .. attention:: 如果指定超时时间为正数,则会在过期时抛出 TimeoutError,但这并不会取消远程正在执行的任务,而 115 同时只允许运行一个导出目录树的任务,所以如果要开始下一个导出任务,还需要此任务完成或者被 115 自动超时取消 :param client: 115 客户端或 cookies :param export_id: 任务 id,由 `P115Client.fs_export_dir` 接口调用产生 :param timeout: 超时秒数,如果为 None 或 小于等于 0,则相当于 float("inf"),即永不超时 :param check_interval: 两次轮询之间的等待秒数,如果 <= 0,则不等待 :param async_: 是否异步 :param request_kwargs: 其它请求参数 :return: 接口返回结果,格式为 .. code:: python { "export_id": str, # 任务 id "file_id": str, # 导出文件的 id "file_name": str, # 导出文件的名字 "pick_code": str # 导出文件的提取码 } """ if not isinstance(client, P115Client): client = P115Client(client, check_for_relogin=True) if check_interval < 0: check_interval = 0 def gen_step(): nonlocal timeout if timeout is None or timeout <= 0: timeout = float("inf") do_sleep: Callable = async_sleep if async_ else sleep # type: ignore expired_t = time() + timeout while True: resp = yield client.fs_export_dir_status( export_id, async_=async_, **request_kwargs, ) if data := check_response(resp)["data"]: return data remaining_seconds = expired_t - time() if remaining_seconds <= 0: raise TimeoutError(export_id) if check_interval: yield do_sleep(min(check_interval, remaining_seconds)) return run_gen_step(gen_step, async_)
@overload def export_dir_parse_iter( client: str | PathLike | P115Client, export_file_ids: int | str | Iterable[int | str] = 0, export_id: int | str = 0, target_pid: int | str = 0, layer_limit: int = 0, parse_iter: None | Callable[[IO[bytes]], Iterator] = None, delete: bool = True, timeout: None | int | float = None, check_interval: int | float = 1, show_clock: bool | Callable[[], Any] = False, clock_interval: int | float = 0.05, *, async_: Literal[False] = False, **request_kwargs, ) -> Iterator: ... @overload def export_dir_parse_iter( client: str | PathLike | P115Client, export_file_ids: int | str | Iterable[int | str] = 0, export_id: int | str = 0, target_pid: int | str = 0, layer_limit: int = 0, parse_iter: None | Callable[[IO[bytes]], AsyncIterator] = None, delete: bool = True, timeout: None | int | float = None, check_interval: int | float = 1, show_clock: bool | Callable[[], Any] = False, clock_interval: int | float = 0.05, *, async_: Literal[True], **request_kwargs, ) -> AsyncIterator: ...
[docs] def export_dir_parse_iter( client: str | PathLike | P115Client, export_file_ids: int | str | Iterable[int | str] = 0, export_id: int | str = 0, target_pid: int | str = 0, layer_limit: int = 0, parse_iter: None | Callable[[IO[bytes]], Iterator] | Callable[[IO[bytes]], AsyncIterator] = None, delete: bool = True, timeout: None | int | float = None, check_interval: int | float = 1, show_clock: bool | Callable[[], Any] = False, clock_interval: int | float = 0.05, *, async_: Literal[False, True] = False, **request_kwargs, ) -> Iterator | AsyncIterator: """导出目录树到文件,读取文件并解析后返回迭代器,关闭后自动删除导出的文件 :param client: 115 客户端或 cookies :param export_file_ids: 待导出的目录 id、pickcode 或 路径(如果有多个,需传入可迭代对象) :param export_id: 优先级高于 `export_file_ids`,之前提交的 `export_dir` 任务的 id,如果是 str,则视为导出的目录树文件的提取码(因此无需导出) :param target_pid: 导出到的目标目录 id 或 pickcode :param layer_limit: 层级深度,小于等于 0 时不限 :param parse_iter: 解析打开的二进制文件,返回可迭代对象 :param delete: 最终删除目录树文件(如果 export_id 为 str(即提取码),则这个值不生效,必不删除) :param timeout: 导出任务的超时秒数,如果为 None 或 小于等于 0,则相当于 float("inf"),即永不超时 :param check_interval: 导出任务的状态,两次轮询之间的等待秒数,如果 <= 0,则不等待 :param show_clock: 是否在等待导出目录树时,显示时钟。如果为 True,则显示默认的时钟,如果为 Callable,则作为自定义时钟进行调用(无参数) :param clock_interval: 更新时钟的时间间隔 :param async_: 是否异步 :param request_kwargs: 其它请求参数 :return: 解析导出文件的迭代器 """ if not isinstance(client, P115Client): client = P115Client(client, check_for_relogin=True) if parse_iter is None: if async_: parse_iter = partial(parse_export_dir_as_path_iter, async_=True) else: parse_iter = parse_export_dir_as_path_iter get_url = client.download_url def gen_step(): nonlocal export_id, delete if not export_id: export_id = yield export_dir( client, export_file_ids=export_file_ids, target_pid=target_pid, layer_limit=layer_limit, async_=async_, **request_kwargs, ) if isinstance(export_id, int): if not show_clock: result: dict = yield export_dir_result( client, export_id, timeout=timeout, check_interval=check_interval, async_=async_, **request_kwargs, ) else: result = yield context( lambda *_: export_dir_result( client, export_id, timeout=timeout, check_interval=check_interval, async_=async_, **request_kwargs, ), backgroud_loop( None if show_clock is True else show_clock, interval=clock_interval, async_=async_, # type: ignore ), async_=async_, # type: ignore ) pickcode = result["pick_code"] else: pickcode = export_id delete = False try: try: url: str = yield get_url( pickcode, use_web_api=True, async_=async_, **request_kwargs, ) except OSError: url = yield get_url( pickcode, async_=async_, **request_kwargs, ) file = client.open(url, async_=async_) # type: ignore try: if async_: file_wrapper: IO = AsyncTextIOWrapper(AsyncBufferedReader(file), encoding="utf-16", newline="\n") else: file_wrapper = TextIOWrapper(BufferedReader(file), encoding="utf-16", newline="\n") yield YieldFrom(parse_iter(file_wrapper)) # type: ignore finally: if async_: if callable(aclose := getattr(file, "aclose", None)): yield aclose() elif callable(close := getattr(file, "close", None)): yield ensure_async(close, threaded=True) elif callable(close := getattr(file, "close", None)): close() finally: if delete: yield client.fs_delete( cast(str, result["file_id"]), async_=async_, # type: ignore **request_kwargs, ) return run_gen_step_iter(gen_step, async_)