Source code for p115client.fs.fs_base

#!/usr/bin/env python3
# encoding: utf-8

from __future__ import annotations

__all__ = ["IDOrPathType", "P115PathBase", "P115FileSystemBase"]

from abc import ABC, abstractmethod
from asyncio import Lock as AsyncLock
from collections.abc import (
    AsyncIterable, AsyncIterator, Awaitable, Callable, Coroutine, 
    Iterable, Iterator, Mapping, MutableMapping, Sequence, 
)
from functools import partial
from io import BufferedReader, TextIOWrapper
from mimetypes import guess_type
from operator import itemgetter
from os import path as ospath, fsdecode, stat_result, PathLike
from posixpath import splitext
from re import compile as re_compile, escape as re_escape
from stat import S_IFDIR, S_IFREG
from threading import Lock
from time import time
from typing import (
    cast, overload, Any, ClassVar, Final, Literal, Self, TypedDict, 
)
from weakref import WeakValueDictionary

from cachedict import LRUDict
from download import download, download_async
from ed2k import ed2k_hash, ed2k_hash_async
from errno2 import errno
from filewrap import AsyncBufferedReader, AsyncTextIOWrapper
from glob_pattern import translate_iter
from hashtools import (
    HashObj, file_digest, file_mdigest, file_digest_async, file_mdigest_async, 
)
from httpfile import AsyncHTTPFileReader, HTTPFileReader
from id2dirnode import IdToDirnode
from iterdir import iterdir_generic, walk_generic
from iterutils import (
    map as do_map, run_gen_step, run_gen_step_iter, with_iter_next, 
    Yield, YieldFrom, 
)
from posixpatht import escape, joinpath, joins, normpath, path_is_dir_form, splits
from undefined import undefined, is_undefined, Undefined

from ..client import P115Client
from ..exception import throw
from ..type import P115URL


type IDOrPathType = int | str | Sequence[str] | Mapping | P115PathBase

ED2K_NAME_TRANSTAB: Final = dict(zip(b"/|", ("%2F", "%7C")))


class AncestorDict(TypedDict):
    id: int
    parent_id: int
    name: str


class P115PathBase:
    __slots__ = ("fs", "attr")

    def __init__(
        self, 
        /, 
        fs: P115FileSystemBase, 
        attr: int | str | Mapping, 
    ):
        self.fs = fs
        if isinstance(attr, int):
            attr = {"id": attr}
        elif isinstance(attr, str):
            attr = {"path": attr}
        else:
            assert "id" in attr or "path" in attr or "parent_id" in attr and "name" in attr
        self.attr = attr

    def __contains__(self, child: int | str, /) -> bool:
        """判断是否有某个子节点
        """
        return self.has_child(child)

    def __eq__(self, path, /) -> bool:
        return type(self) is type(path) and self.fs == path.fs and self.id == path.id

    def __fspath__(self, /) -> str:
        return self.path

    def __getattr__(self, attr, /):
        try:
            return self[attr]
        except KeyError as e:
            raise AttributeError(attr) from e

    def __getitem__(self, key, /):
        try:
            return self.attr[key]
        except KeyError:
            if key in ("id", "parent_id", "name", "is_dir"):
                return self.get_attr()[key]
            raise

    def __hash__(self, /) -> int:
        return id(self)

    def __aiter__(self, /) -> AsyncIterator[Self]:
        """获取所有子节点
        """
        return self.iterdir(async_=True)

    def __iter__(self, /) -> Iterator[Self]:
        """获取所有子节点
        """
        return self.iterdir()

    def __len__(self, /) -> int:
        """如果是文件,则返回其大小,如果是目录,则返回其中文件数
        """
        return self.get_length()

    def __repr__(self, /) -> str:
        cls = type(self)
        return f"{cls.__module__}.{cls.__qualname__}(fs={self.fs!r}, attr={self.attr!r})"

    def __str__(self, /) -> str:
        return self.path

    def __truediv__(self, path: str, /) -> Self:
        return self.joinpath(path)

    @property
    def id(self, /) -> int:
        """
        """
        return self["id"]

    @property
    def ancestors(self, /) -> list[AncestorDict]:
        """
        """
        return self.get_ancestors()

    @property
    def media_type(self, /) -> None | str:
        """
        """
        if self.is_dir():
            return None
        return guess_type(self.name)[0] or "application/octet-stream"

    @property
    def parent(self, /) -> Self:
        """
        """
        return self.get_parent()

    @property
    def parents(self, /) -> list[Self]:
        """
        """
        return self.get_parents()

    @property
    def parts(self, /) -> list[str]:
        """
        """
        return ["/", *map(escape, self.patht[1:])]

    @property
    def path(self, /) -> str:
        """
        """
        return self.get_path()

    @property
    def patht(self, /) -> list[str]:
        """
        """
        return self.get_patht()

    @property
    def root(self, /) -> Self:
        """
        """
        return type(self)(self.fs, {"id": 0})

    @property
    def stem(self, /) -> str:
        """
        """
        return splitext(self.name)[0]

    @property
    def suffix(self, /) -> str:
        """
        """
        return splitext(self.name)[1]

    @property
    def suffixes(self, /) -> list[str]:
        """
        """
        return ["." + part for part in self.name.lstrip(".").split(".")]

    @overload
    def download(
        self, 
        /, 
        path: bytes | str | PathLike = "", 
        mode: Literal["a", "w", "x", "i"] = "a", 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False] = False, 
        **request_kwargs, 
    ) -> tuple[str, int]:
        ...
    @overload
    def download(
        self, 
        /, 
        path: bytes | str | PathLike = "", 
        mode: Literal["a", "w", "x", "i"] = "a", 
        refresh: None | bool = None, 
        *, 
        async_: Literal[True], 
        **request_kwargs, 
    ) -> Coroutine[Any, Any, tuple[str, int]]:
        ...
    def download(
        self, 
        /, 
        path: bytes | str | PathLike = "", 
        mode: Literal["a", "w", "x", "i"] = "a", 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False, True] = False, 
        **request_kwargs, 
    ) -> tuple[str, int] | Coroutine[Any, Any, tuple[str, int]]:
        return self.fs.download(
            self, 
            path=path, 
            mode=mode, 
            refresh=refresh, 
            async_=async_, 
            **request_kwargs, 
        )

    @overload
    def exists(
        self, 
        /, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False] = False, 
        **request_kwargs, 
    ) -> bool:
        ...
    @overload
    def exists(
        self, 
        /, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[True], 
        **request_kwargs, 
    ) -> Coroutine[Any, Any, bool]:
        ...
    def exists(
        self, 
        /, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False, True] = False, 
        **request_kwargs, 
    ) -> bool | Coroutine[Any, Any, bool]:
        return self.fs.exists(
            self, 
            refresh=refresh, 
            async_=async_, 
            **request_kwargs, 
        )

    @overload
    def get_attr(
        self, 
        /, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False] = False, 
        **request_kwargs, 
    ) -> Mapping:
        ...
    @overload
    def get_attr(
        self, 
        /, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[True], 
        **request_kwargs, 
    ) -> Coroutine[Any, Any, Mapping]:
        ...
    def get_attr(
        self, 
        /, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False, True] = False, 
        **request_kwargs, 
    ) -> Mapping | Coroutine[Any, Any, Mapping]:
        def gen_step():
            attr = yield self.fs.get_attr(
                self, 
                refresh=refresh, 
                async_=async_, 
                **request_kwargs, 
            )
            self.attr = attr
            return attr
        return run_gen_step(gen_step, async_)

    @overload
    def get_ancestors(
        self, 
        /, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False] = False, 
        **request_kwargs, 
    ) -> list[AncestorDict]:
        ...
    @overload
    def get_ancestors(
        self, 
        /, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[True], 
        **request_kwargs, 
    ) -> Coroutine[Any, Any, list[AncestorDict]]:
        ...
    def get_ancestors(
        self, 
        /, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False, True] = False, 
        **request_kwargs, 
    ) -> list[AncestorDict] | Coroutine[Any, Any, list[AncestorDict]]:
        return self.fs.get_ancestors(
            self, 
            refresh=refresh, 
            async_=async_, 
            **request_kwargs, 
        )

    @overload
    def get_length(
        self, 
        /, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False] = False, 
        **request_kwargs, 
    ) -> int:
        ...
    @overload
    def get_length(
        self, 
        /, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[True], 
        **request_kwargs, 
    ) -> Coroutine[Any, Any, int]:
        ...
    def get_length(
        self, 
        /, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False, True] = False, 
        **request_kwargs, 
    ) -> int | Coroutine[Any, Any, int]:
        def gen_step():
            attr = yield self.get_attr(
                refresh=False, 
                async_=async_, 
                **request_kwargs, 
            )
            if attr["is_dir"]:
                return self.fs.dirlen(
                    self, 
                    refresh=refresh, 
                    async_=async_, 
                    **request_kwargs, 
                )
            else:
                return attr["size"]
        return run_gen_step(gen_step, async_)

    @overload
    def get_parent(
        self, 
        /, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False] = False, 
        **request_kwargs, 
    ) -> Self:
        ...
    @overload
    def get_parent(
        self, 
        /, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[True], 
        **request_kwargs, 
    ) -> Coroutine[Any, Any, Self]:
        ...
    def get_parent(
        self, 
        /, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False, True] = False, 
        **request_kwargs, 
    ) -> Self | Coroutine[Any, Any, Self]:
        def gen_step():
            parent_id = yield self.fs.get_parent_id(
                self, 
                refresh=refresh, 
                async_=async_, 
                **request_kwargs, 
            )
            return type(self)(self.fs, {"id": parent_id})
        return run_gen_step(gen_step, async_)

    @overload
    def get_parents(
        self, 
        /, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False] = False, 
        **request_kwargs, 
    ) -> list[Self]:
        ...
    @overload
    def get_parents(
        self, 
        /, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[True], 
        **request_kwargs, 
    ) -> Coroutine[Any, Any, list[Self]]:
        ...
    def get_parents(
        self, 
        /, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False, True] = False, 
        **request_kwargs, 
    ) -> list[Self] | Coroutine[Any, Any, list[Self]]:
        def gen_step():
            ancestors: list[AncestorDict] = yield self.get_ancestors(
                refresh=refresh, 
                async_=async_, 
                **request_kwargs, 
            )
            cls = type(self)
            fs = self.fs
            return [cls(fs, {"id": a["id"]}) for a in reversed(ancestors[:-1])]
        return run_gen_step(gen_step, async_)

    @overload
    def get_path(
        self, 
        /, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False] = False, 
        **request_kwargs, 
    ) -> str:
        ...
    @overload
    def get_path(
        self, 
        /, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[True], 
        **request_kwargs, 
    ) -> Coroutine[Any, Any, str]:
        ...
    def get_path(
        self, 
        /, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False, True] = False, 
        **request_kwargs, 
    ) -> str | Coroutine[Any, Any, str]:
        return self.fs.get_path(
            self, 
            refresh=refresh, 
            async_=async_, 
            **request_kwargs, 
        )

    @overload
    def get_patht(
        self, 
        /, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False] = False, 
        **request_kwargs, 
    ) -> list[str]:
        ...
    @overload
    def get_patht(
        self, 
        /, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[True], 
        **request_kwargs, 
    ) -> Coroutine[Any, Any, list[str]]:
        ...
    def get_patht(
        self, 
        /, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False, True] = False, 
        **request_kwargs, 
    ) -> list[str] | Coroutine[Any, Any, list[str]]:
        return self.fs.get_patht(
            self, 
            refresh=refresh, 
            async_=async_, 
            **request_kwargs, 
        )

    @overload
    def get_url(
        self, 
        /, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False] = False, 
        **request_kwargs, 
    ) -> str:
        ...
    @overload
    def get_url(
        self, 
        /, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[True], 
        **request_kwargs, 
    ) -> Coroutine[Any, Any, str]:
        ...
    def get_url(
        self, 
        /, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False, True] = False, 
        **request_kwargs, 
    ) -> str | Coroutine[Any, Any, str]:
        return self.fs.get_url(
            self, 
            refresh=refresh, 
            async_=async_, 
            **request_kwargs, 
        )

    @overload
    def glob(
        self, 
        /, 
        pattern: str = "*", 
        ignore_case: bool = False, 
        allow_escaped_slash: bool = True, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False] = False, 
        **request_kwargs, 
    ) -> Iterator[Self]:
        ...
    @overload
    def glob(
        self, 
        /, 
        pattern: str = "*", 
        ignore_case: bool = False, 
        allow_escaped_slash: bool = True, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[True], 
        **request_kwargs, 
    ) -> AsyncIterator[Self]:
        ...
    def glob(
        self, 
        /, 
        pattern: str = "*", 
        ignore_case: bool = False, 
        allow_escaped_slash: bool = True, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False, True] = False, 
        **request_kwargs, 
    ) -> Iterator[Self] | AsyncIterator[Self]:
        return self.wrap_path_iter(self.fs.glob(
            pattern, 
            dirname=self, 
            ignore_case=ignore_case, 
            allow_escaped_slash=allow_escaped_slash, 
            refresh=refresh, 
            async_=async_, 
            **request_kwargs, 
        ))

    @overload
    def has_child(
        self, 
        /, 
        child: int | str, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False] = False, 
        **request_kwargs, 
    ) -> bool:
        ...
    @overload
    def has_child(
        self, 
        /, 
        child: int | str, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[True], 
        **request_kwargs, 
    ) -> Coroutine[Any, Any, bool]:
        ...
    def has_child(
        self, 
        /, 
        child: int | str, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False, True] = False, 
        **request_kwargs, 
    ) -> bool | Coroutine[Any, Any, bool]:
        return self.fs.has_child(
            child, 
            self, 
            refresh=refresh, 
            async_=async_, 
            **request_kwargs, 
        )

    @overload
    def hash[T](
        self, 
        /, 
        digest: str | HashObj | Callable[[], HashObj] | Callable[[], Callable[[bytes, T], T]] = "md5", 
        start: int = 0, 
        stop: None | int = None, 
        *, 
        async_: Literal[False] = False, 
        **request_kwargs, 
    ) -> tuple[int, HashObj | T]:
        ...
    @overload
    def hash[T](
        self, 
        /, 
        digest: str | HashObj | Callable[[], HashObj] | Callable[[], Callable[[bytes, T], T]] | Callable[[], Callable[[bytes, T], Awaitable[T]]] = "md5", 
        start: int = 0, 
        stop: None | int = None, 
        *, 
        async_: Literal[True], 
        **request_kwargs, 
    ) -> Coroutine[Any, Any, tuple[int, HashObj | T]]:
        ...
    def hash[T](
        self, 
        /, 
        digest: str | HashObj | Callable[[], HashObj] | Callable[[], Callable[[bytes, T], T]] | Callable[[], Callable[[bytes, T], Awaitable[T]]] = "md5", 
        start: int = 0, 
        stop: None | int = None, 
        *, 
        async_: Literal[False, True] = False, 
        **request_kwargs, 
    ) -> tuple[int, HashObj | T] | Coroutine[Any, Any, tuple[int, HashObj | T]]:
        return self.fs.hash(
            self, 
            digest=digest, # type: ignore
            start=start, 
            stop=stop, 
            async_=async_, # type: ignore
            **request_kwargs, 
        )

    @overload
    def hashes[T](
        self, 
        /, 
        digest: str | HashObj | Callable[[], HashObj] | Callable[[], Callable[[bytes, T], T]] = "md5", 
        *digests: str | HashObj | Callable[[], HashObj] | Callable[[], Callable[[bytes, T], T]], 
        start: int = 0, 
        stop: None | int = None, 
        async_: Literal[False] = False, 
        **request_kwargs, 
    ) -> tuple[int, list[HashObj | T]]:
        ...
    @overload
    def hashes[T](
        self, 
        /, 
        digest: str | HashObj | Callable[[], HashObj] | Callable[[], Callable[[bytes, T], T]] | Callable[[], Callable[[bytes, T], Awaitable[T]]] = "md5", 
        *digests: str | HashObj | Callable[[], HashObj] | Callable[[], Callable[[bytes, T], T]] | Callable[[], Callable[[bytes, T], Awaitable[T]]], 
        start: int = 0, 
        stop: None | int = None, 
        async_: Literal[True], 
        **request_kwargs, 
    ) -> Coroutine[Any, Any, tuple[int, list[HashObj | T]]]:
        ...
    def hashes[T](
        self, 
        /, 
        digest: str | HashObj | Callable[[], HashObj] | Callable[[], Callable[[bytes, T], T]] | Callable[[], Callable[[bytes, T], Awaitable[T]]] = "md5", 
        *digests: str | HashObj | Callable[[], HashObj] | Callable[[], Callable[[bytes, T], T]] | Callable[[], Callable[[bytes, T], Awaitable[T]]], 
        start: int = 0, 
        stop: None | int = None, 
        async_: Literal[False, True] = False, 
        **request_kwargs, 
    ) -> tuple[int, list[HashObj | T]] | Coroutine[Any, Any, tuple[int, list[HashObj | T]]]:
        return self.fs.hashes(
            self, 
            digest, # type: ignore
            *digests, # type: ignore
            start=start, 
            stop=stop, 
            async_=async_, # type: ignore
            **request_kwargs, 
        )

    def inode(self, /) -> int:
        return self.id

    def is_absolute(self, /) -> bool:
        return True

    @overload
    def is_dir(
        self, 
        /, 
        async_: Literal[False] = False, 
        **request_kwargs, 
    ) -> bool:
        ...
    @overload
    def is_dir(
        self, 
        /, 
        async_: Literal[True], 
        **request_kwargs, 
    ) -> Coroutine[Any, Any, bool]:
        ...
    def is_dir(
        self, 
        /, 
        async_: Literal[False, True] = False, 
        **request_kwargs, 
    ) -> bool | Coroutine[Any, Any, bool]:
        def gen_step():
            attr = yield self.get_attr(
                refresh=False, 
                async_=async_, 
                **request_kwargs, 
            )
            return attr["is_dir"]
        return run_gen_step(gen_step, async_)

    @overload
    def is_empty(
        self, 
        /, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False] = False, 
        **request_kwargs, 
    ) -> bool:
        ...
    @overload
    def is_empty(
        self, 
        /, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[True], 
        **request_kwargs, 
    ) -> Coroutine[Any, Any, bool]:
        ...
    def is_empty(
        self, 
        /, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False, True] = False, 
        **request_kwargs, 
    ) -> bool | Coroutine[Any, Any, bool]:
        return self.fs.is_empty(
            self, 
            refresh=refresh, 
            async_=async_, 
            **request_kwargs, 
        )

    @overload
    def is_file(
        self, 
        /, 
        async_: Literal[False] = False, 
        **request_kwargs, 
    ) -> bool:
        ...
    @overload
    def is_file(
        self, 
        /, 
        async_: Literal[True], 
        **request_kwargs, 
    ) -> Coroutine[Any, Any, bool]:
        ...
    def is_file(
        self, 
        /, 
        async_: Literal[False, True] = False, 
        **request_kwargs, 
    ) -> bool | Coroutine[Any, Any, bool]:
        def gen_step():
            is_dir = yield self.is_dir(async_=async_, **request_kwargs)
            return not is_dir
        return run_gen_step(gen_step, async_)

    def is_symlink(self, /) -> bool:
        return False

    @overload
    def isdir(
        self, 
        /, 
        *, 
        async_: Literal[False] = False, 
        **request_kwargs, 
    ) -> bool:
        ...
    @overload
    def isdir(
        self, 
        /, 
        *, 
        async_: Literal[True], 
        **request_kwargs, 
    ) -> Coroutine[Any, Any, bool]:
        ...
    def isdir(
        self, 
        /, 
        *, 
        async_: Literal[False, True] = False, 
        **request_kwargs, 
    ) -> bool | Coroutine[Any, Any, bool]:
        return self.fs.isdir(
            self, 
            async_=async_, 
            **request_kwargs, 
        )

    @overload
    def isfile(
        self, 
        /, 
        *, 
        async_: Literal[False] = False, 
        **request_kwargs, 
    ) -> bool:
        ...
    @overload
    def isfile(
        self, 
        /, 
        *, 
        async_: Literal[True], 
        **request_kwargs, 
    ) -> Coroutine[Any, Any, bool]:
        ...
    def isfile(
        self, 
        /, 
        *, 
        async_: Literal[False, True] = False, 
        **request_kwargs, 
    ) -> bool | Coroutine[Any, Any, bool]:
        return self.fs.isfile(
            self, 
            async_=async_, 
            **request_kwargs, 
        )

    @overload
    def iter(
        self, 
        /, 
        topdown: None | bool = True, 
        min_depth: int = 1, 
        max_depth: int = 1, 
        predicate: None | Callable[[Mapping], Literal[None, 1, False, True]] = None, 
        onerror: bool | Callable[[OSError], bool] = True, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False] = False, 
        **request_kwargs, 
    ) -> Iterator[Self]:
        ...
    @overload
    def iter(
        self, 
        /, 
        topdown: None | bool = True, 
        min_depth: int = 1, 
        max_depth: int = 1, 
        predicate: None | Callable[[Mapping], Literal[None, 1, False, True]] = None, 
        onerror: bool | Callable[[OSError], bool] = True, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[True], 
        **request_kwargs, 
    ) -> AsyncIterator[Self]:
        ...
    def iter(
        self, 
        /, 
        topdown: None | bool = True, 
        min_depth: int = 1, 
        max_depth: int = 1, 
        predicate: None | Callable[[Mapping], Literal[None, 1, False, True]] = None, 
        onerror: bool | Callable[[OSError], bool] = True, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False, True] = False, 
        **request_kwargs, 
    ) -> Iterator[Self] | AsyncIterator[Self]:
        return self.wrap_path_iter(self.fs.iter(
            self,  
            topdown=topdown, 
            min_depth=min_depth, 
            max_depth=max_depth, 
            predicate=predicate, 
            onerror=onerror, 
            refresh=refresh, 
            async_=async_, # type: ignore
            **request_kwargs, 
        ))

    @overload
    def iterdir(
        self, 
        /, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False] = False, 
        **request_kwargs, 
    ) -> Iterator[Self]:
        ...
    @overload
    def iterdir(
        self, 
        /, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[True], 
        **request_kwargs, 
    ) -> AsyncIterator[Self]:
        ...
    def iterdir(
        self, 
        /, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False, True] = False, 
        **request_kwargs, 
    ) -> Iterator[Self] | AsyncIterator[Self]:
        def gen_step():
            children = yield self.fs.readdir(
                self, 
                refresh=refresh, 
                async_=async_, 
                **request_kwargs, 
            )
            path_class = type(self)
            fs = self.fs
            return YieldFrom(path_class(fs, a) for a in children)
        return run_gen_step_iter(gen_step, async_)

    @overload
    def join(
        self, 
        /, 
        *names: str, 
        refresh: None | bool = None, 
        async_: Literal[False] = False, 
        **request_kwargs, 
    ) -> Self:
        ...
    @overload
    def join(
        self, 
        /, 
        *names: str, 
        refresh: None | bool = None, 
        async_: Literal[True], 
        **request_kwargs, 
    ) -> Coroutine[Any, Any, Self]:
        ...
    def join(
        self, 
        /, 
        *names: str, 
        refresh: None | bool = None, 
        async_: Literal[False, True] = False, 
        **request_kwargs, 
    ) -> Self | Coroutine[Any, Any, Self]:
        return self.joinpath(
            joins(names), 
            refresh=refresh, 
            async_=async_, 
            **request_kwargs, 
        )

    @overload
    def joinpath(
        self, 
        /, 
        *paths: str, 
        refresh: None | bool = None, 
        async_: Literal[False] = False, 
        **request_kwargs, 
    ) -> Self:
        ...
    @overload
    def joinpath(
        self, 
        /, 
        *paths: str, 
        refresh: None | bool = None, 
        async_: Literal[True], 
        **request_kwargs, 
    ) -> Coroutine[Any, Any, Self]:
        ...
    def joinpath(
        self, 
        /, 
        *paths: str, 
        refresh: None | bool = None, 
        async_: Literal[False, True] = False, 
        **request_kwargs, 
    ) -> Self | Coroutine[Any, Any, Self]:
        def gen_step():
            if not paths:
                return self
            path_new = normpath(joinpath(*paths))
            if not path_new:
                return self
            elif path_new == "/":
                return self.root
            path = yield self.get_path(
                refresh=refresh, 
                async_=async_, 
                **request_kwargs, 
            )
            if path == path_new:
                return self
            return type(self)(self.fs, {"path": joinpath(path, path_new)})
        return run_gen_step(gen_step, async_)

    @overload
    def match(
        self, 
        /, 
        path_pattern: str, 
        ignore_case: bool = False, 
        allow_escaped_slash: bool = True, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False] = False, 
        **request_kwargs, 
    ) -> bool:
        ...
    @overload
    def match(
        self, 
        /, 
        path_pattern: str, 
        ignore_case: bool = False, 
        allow_escaped_slash: bool = True, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[True], 
        **request_kwargs, 
    ) -> bool:
        ...
    def match(
        self, 
        /, 
        path_pattern: str, 
        ignore_case: bool = False, 
        allow_escaped_slash: bool = True, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False, True] = False, 
        **request_kwargs, 
    ) -> bool | Coroutine[Any, Any, bool]:
        def gen_step():
            pattern = "(?%s:%s)" % (
                "i"[:ignore_case], 
                "".join(
                    "(?:/%s)?" % pat if typ == "dstar" else "/" + pat 
                    for pat, typ, _ in translate_iter(
                        path_pattern, 
                        allow_escaped_slash=allow_escaped_slash, 
                    )
                ), 
            )
            path = yield self.get_path(
                refresh=refresh, 
                async_=async_, 
                **request_kwargs, 
            )
            return re_compile(pattern).fullmatch(path) is not None
        return run_gen_step(gen_step, async_)

    @overload
    def open(
        self, 
        /, 
        mode: Literal["rb", "br"] = "rb", 
        buffering: None | int = None, 
        encoding: None | str = None, 
        errors: None | str = None, 
        newline: None | str = None, 
        start: int = 0, 
        seek_threshold: int = 1 << 20, 
        http_file_reader_cls: None | type[HTTPFileReader] = None, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False] = False, 
        **request_kwargs, 
    ) -> HTTPFileReader | BufferedReader:
        ...
    @overload
    def open(
        self, 
        /, 
        mode: Literal["r", "rt", "tr"], 
        buffering: None | int = None, 
        encoding: None | str = None, 
        errors: None | str = None, 
        newline: None | str = None, 
        start: int = 0, 
        seek_threshold: int = 1 << 20, 
        http_file_reader_cls: None | type[HTTPFileReader] = None, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False] = False, 
        **request_kwargs, 
    ) -> TextIOWrapper:
        ...
    @overload
    def open(
        self, 
        /, 
        mode: Literal["rb", "br"] = "rb", 
        buffering: None | int = None, 
        encoding: None | str = None, 
        errors: None | str = None, 
        newline: None | str = None, 
        start: int = 0, 
        seek_threshold: int = 1 << 20, 
        http_file_reader_cls: None | type[AsyncHTTPFileReader] = None, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[True], 
        **request_kwargs, 
    ) -> AsyncHTTPFileReader | AsyncBufferedReader:
        ...
    @overload
    def open(
        self, 
        /, 
        mode: Literal["r", "rt", "tr"], 
        buffering: None | int = None, 
        encoding: None | str = None, 
        errors: None | str = None, 
        newline: None | str = None, 
        start: int = 0, 
        seek_threshold: int = 1 << 20, 
        http_file_reader_cls: None | type[AsyncHTTPFileReader] = None, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[True], 
        **request_kwargs, 
    ) -> AsyncTextIOWrapper:
        ...
    def open(
        self, 
        /, 
        mode: Literal["r", "rt", "tr", "rb", "br"] = "rb", 
        buffering: None | int = None, 
        encoding: None | str = None, 
        errors: None | str = None, 
        newline: None | str = None, 
        start: int = 0, 
        seek_threshold: int = 1 << 20, 
        http_file_reader_cls: None | type[HTTPFileReader] | type[AsyncHTTPFileReader] = None, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False, True] = False, 
        **request_kwargs, 
    ) -> HTTPFileReader | BufferedReader | TextIOWrapper | AsyncHTTPFileReader | AsyncBufferedReader | AsyncTextIOWrapper:
        return self.fs.open(
            self, 
            mode=mode, # type: ignore
            buffering=buffering, 
            encoding=encoding, 
            errors=errors, 
            newline=newline, 
            start=start, 
            seek_threshold=seek_threshold, 
            http_file_reader_cls=http_file_reader_cls, 
            refresh=refresh, 
            async_=async_, # type: ignore
            **request_kwargs, 
        )

    @overload
    def read(
        self, 
        /, 
        start: None | int = None, 
        stop: None | int | Undefined = undefined, 
        *, 
        async_: Literal[False] = False, 
        **request_kwargs, 
    ) -> bytes:
        ...
    @overload
    def read(
        self, 
        /, 
        start: None | int = None, 
        stop: None | int | Undefined = undefined, 
        *, 
        async_: Literal[True], 
        **request_kwargs, 
    ) -> Coroutine[Any, Any, bytes]:
        ...
    def read(
        self, 
        /, 
        start: None | int = None, 
        stop: None | int | Undefined = undefined, 
        *, 
        async_: Literal[False, True] = False, 
        **request_kwargs, 
    ) -> bytes | Coroutine[Any, Any, bytes]:
        return self.fs.read(
            self, 
            start, 
            stop, 
            async_=async_, 
            **request_kwargs, 
        )

    @overload
    def read_range(
        self, 
        /, 
        bytes_range: str = "0-", 
        *, 
        async_: Literal[False] = False, 
        **request_kwargs, 
    ) -> bytes:
        ...
    @overload
    def read_range(
        self, 
        /, 
        bytes_range: str = "0-", 
        *, 
        async_: Literal[True], 
        **request_kwargs, 
    ) -> Coroutine[Any, Any, bytes]:
        ...
    def read_range(
        self, 
        /, 
        bytes_range: str = "0-", 
        *, 
        async_: Literal[False, True] = False, 
        **request_kwargs, 
    ) -> bytes | Coroutine[Any, Any, bytes]:
        return self.fs.read_range(
            self, 
            bytes_range, 
            async_=async_, 
            **request_kwargs, 
        )

    @overload
    def read_block(
        self, 
        /, 
        size: int = 0, 
        offset: int = 0, 
        *, 
        async_: Literal[False] = False, 
        **request_kwargs, 
    ) -> bytes:
        ...
    @overload
    def read_block(
        self, 
        /, 
        size: int = 0, 
        offset: int = 0, 
        *, 
        async_: Literal[True], 
        **request_kwargs, 
    ) -> Coroutine[Any, Any, bytes]:
        ...
    def read_block(
        self, 
        /, 
        size: int = 0, 
        offset: int = 0, 
        *, 
        async_: Literal[False, True] = False, 
        **request_kwargs, 
    ) -> bytes | Coroutine[Any, Any, bytes]:
        return self.fs.read_block(
            self, 
            size, 
            offset, 
            async_=async_, 
            **request_kwargs, 
        )

    @overload
    def read_text(
        self, 
        /, 
        encoding: None | str = None, 
        errors: None | str = None, 
        newline: None | str = None, 
        *, 
        async_: Literal[False] = False, 
        **request_kwargs, 
    ) -> str:
        ...
    @overload
    def read_text(
        self, 
        /, 
        encoding: None | str = None, 
        errors: None | str = None, 
        newline: None | str = None, 
        *, 
        async_: Literal[True], 
        **request_kwargs, 
    ) -> Coroutine[Any, Any, str]:
        ...
    def read_text(
        self, 
        /, 
        encoding: None | str = None, 
        errors: None | str = None, 
        newline: None | str = None, 
        *, 
        async_: Literal[False, True] = False, 
        **request_kwargs, 
    ) -> str | Coroutine[Any, Any, str]:
        return self.fs.read_text(
            self, 
            encoding=encoding, 
            errors=errors, 
            newline=newline, 
            async_=async_, 
            **request_kwargs, 
        )

    @overload
    def rglob(
        self, 
        /, 
        pattern: str = "", 
        ignore_case: bool = False, 
        allow_escaped_slash: bool = True, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False] = False, 
        **request_kwargs, 
    ) -> Iterator[Self]:
        ...
    @overload
    def rglob(
        self, 
        /, 
        pattern: str = "", 
        ignore_case: bool = False, 
        allow_escaped_slash: bool = True, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[True], 
        **request_kwargs, 
    ) -> AsyncIterator[Self]:
        ...
    def rglob(
        self, 
        /, 
        pattern: str = "", 
        ignore_case: bool = False, 
        allow_escaped_slash: bool = True, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False, True] = False, 
        **request_kwargs, 
    ) -> Iterator[Self] | AsyncIterator[Self]:
        return self.wrap_path_iter(self.fs.rglob(
            pattern, 
            dirname=self, 
            ignore_case=ignore_case, 
            allow_escaped_slash=allow_escaped_slash, 
            refresh=refresh, 
            async_=async_, 
            **request_kwargs, 
        ))

    @overload
    def stat(
        self, 
        /, 
        async_: Literal[False] = False, 
        **request_kwargs, 
    ) -> stat_result:
        ...
    @overload
    def stat(
        self, 
        /, 
        async_: Literal[True], 
        **request_kwargs, 
    ) -> Coroutine[Any, Any, stat_result]:
        ...
    def stat(
        self, 
        /, 
        async_: Literal[False, True] = False, 
        **request_kwargs, 
    ) -> stat_result | Coroutine[Any, Any, stat_result]:
        return self.fs.stat(self, async_=async_, **request_kwargs)

    @overload
    def walk(
        self, 
        /, 
        topdown: None | bool = True, 
        min_depth: int = 1, 
        max_depth: int = -1, 
        onerror: bool | Callable[[OSError], bool] = True, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False] = False, 
        **request_kwargs, 
    ) -> Iterator[tuple[Self, list[Self], list[Self]]]:
        ...
    @overload
    def walk(
        self, 
        /, 
        topdown: None | bool = True, 
        min_depth: int = 1, 
        max_depth: int = -1, 
        onerror: bool | Callable[[OSError], bool] = True, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[True], 
        **request_kwargs, 
    ) -> AsyncIterator[tuple[Self, list[Self], list[Self]]]:
        ...
    def walk(
        self, 
        /, 
        topdown: None | bool = True, 
        min_depth: int = 1, 
        max_depth: int = -1, 
        onerror: bool | Callable[[OSError], bool] = True, 
        refresh: None | bool = None, 
        *, 
        async_: Literal[False, True] = False, 
        **request_kwargs, 
    ) -> Iterator[tuple[Self, list[Self], list[Self]]] | AsyncIterator[tuple[Self, list[Self], list[Self]]]:
        def gen_step():
            path_class = type(self)
            fs = self.fs
            with with_iter_next(self.fs.walk(
                self, 
                topdown=topdown, 
                min_depth=min_depth, 
                max_depth=max_depth, 
                onerror=onerror, 
                refresh=refresh, 
                async_=async_, 
                **request_kwargs, 
            )) as get_next:
                parent, dirs, files = yield get_next()
                yield (
                    path_class(fs, parent), 
                    [path_class(fs, a) for a in dirs], 
                    [path_class(fs, a) for a in files], 
                )
        return run_gen_step(gen_step, async_)

    def with_name(self, name: str, /) -> Self:
        assert name
        return type(self)(self.fs, {"name": name, "parent_id": self.parent_id})

    def with_stem(self, stem: str, /) -> Self:
        return self.with_name(stem + self.suffix)

    def with_suffix(self, suffix: str, /) -> Self:
        return self.with_name(self.stem + suffix)

    @overload
    def wrap_path_iter(
        self, 
        /, 
        path_iter: Iterable[int | str | Mapping], 
    ) -> Iterator[Self]:
        ...
    @overload
    def wrap_path_iter(
        self, 
        /, 
        path_iter: AsyncIterable[int | str | Mapping], 
    ) -> AsyncIterator[Self]:
        ...
    def wrap_path_iter(
        self, 
        /, 
        path_iter: Iterable[int | str | Mapping] | AsyncIterable[int | str | Mapping], 
    ) -> Iterator[Self] | AsyncIterator[Self]:
        path_class = type(self)
        fs = self.fs
        return do_map(lambda a, /: path_class(fs, a), path_iter)


[docs] class P115FileSystemBase[P115PathType: P115PathBase](ABC): STAT_MODE: ClassVar[int] = 0o444 NO_INCREMENT: ClassVar[bool] = True path_class: ClassVar[type[P115PathType]] id: int = 0 refresh: bool = False full_loaded: bool = False def __init__( self, /, client: str | PathLike | P115Client, refresh: bool = False, id_to_readdir: None | int | dict[int, dict[int, MutableMapping]] = None, ): if isinstance(client, (str, PathLike)): client = P115Client(client, check_for_relogin=True) self.client: P115Client = client self.refresh = refresh if id_to_readdir is None: id_to_readdir = {} elif isinstance(id_to_readdir, int): maxsize = id_to_readdir if maxsize <= 0: id_to_readdir = {} else: id_to_readdir = LRUDict(maxsize) self.id_to_readdir: dict[int, dict[int, MutableMapping]] = id_to_readdir self.id_to_attr: MutableMapping[int, MutableMapping] = WeakValueDictionary() self.id_to_dirnode = IdToDirnode() self._pid_name_to_attr: MutableMapping[tuple[int, str], MutableMapping] = WeakValueDictionary() self._readdir_locks: LRUDict[int, Any] = LRUDict(1024, default_factory=Lock) self._readdir_alocks: LRUDict[int, Any] = LRUDict(1024, default_factory=AsyncLock) def __contains__(self, id_or_path: IDOrPathType, /) -> bool: return self.exists(id_or_path) def __eq__(self, other, /) -> bool: return type(self) is type(other) and self.client == other.client def __getitem__(self, id_or_path: IDOrPathType, /) -> P115PathType: return self.as_path(id_or_path) def __hash__(self, /) -> int: return id(self) def __itruediv__(self, id_or_path: IDOrPathType, /) -> Self: self.chdir(id_or_path) return self def __repr__(self, /) -> str: cls = type(self) return f"<{cls.__module__}.{cls.__qualname__}(client={self.client!r}, id={self.id!r}) at {hex(id(self))}>" @property def user_id(self, /) -> int: return self.client.user_id @overload @abstractmethod def iterdir( self, id: int, /, async_: Literal[False] = False, **request_kwargs, ) -> Iterable[MutableMapping]: ... @overload @abstractmethod def iterdir( self, id: int, /, async_: Literal[True], **request_kwargs, ) -> AsyncIterable[MutableMapping]: ...
[docs] @abstractmethod def iterdir( self, id: int, /, async_: Literal[False, True] = False, **request_kwargs, ) -> Iterable[MutableMapping] | AsyncIterable[MutableMapping]: """迭代获取某个目录下直属的文件或目录的信息,并且需要负责更新 `self.id_to_dirnode` """
@overload def as_path( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, ensure_file: None | bool = None, refresh: None | bool = None, *, async_: Literal[False] = False, **request_kwargs, ) -> P115PathType: ... @overload def as_path( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, ensure_file: None | bool = None, refresh: None | bool = None, *, async_: Literal[True], **request_kwargs, ) -> Coroutine[Any, Any, P115PathType]: ...
[docs] def as_path( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, ensure_file: None | bool = None, refresh: None | bool = None, *, async_: Literal[False, True] = False, **request_kwargs, ) -> P115PathType | Coroutine[Any, Any, P115PathType]: """获取对应的路径对象 """ if refresh is None: refresh = self.refresh def gen_step(): nonlocal id_or_path path_class = type(self).path_class if isinstance(id_or_path, P115PathBase): id_or_path = id_or_path.attr if not refresh and isinstance(id_or_path, Mapping): return path_class(self, id_or_path) attr = yield self.get_attr( id_or_path, pid=pid, ensure_file=ensure_file, refresh=refresh, async_=async_, **request_kwargs, ) return path_class(self, attr) return run_gen_step(gen_step, async_)
def attr_to_stat(self, attr: Mapping, /) -> stat_result: is_dir = attr["is_dir"] return stat_result(( (S_IFDIR if is_dir else S_IFREG) | type(self).STAT_MODE, # mode cast(int, attr["id"]), # ino cast(int, attr["parent_id"]), # dev 1, # nlink 0, # uid 0, # gid 0 if is_dir else attr["size"], # size cast(float, attr.get("atime", 0)), # atime cast(float, attr.get("mtime", 0)), # mtime cast(float, attr.get("ctime", 0)), # ctime )) @overload def chdir( self, id_or_path: IDOrPathType = 0, /, pid: None | int = None, *, async_: Literal[False] = False, **request_kwargs, ) -> int: ... @overload def chdir( self, id_or_path: IDOrPathType = 0, /, pid: None | int = None, *, async_: Literal[True], **request_kwargs, ) -> Coroutine[Any, Any, int]: ...
[docs] def chdir( self, id_or_path: IDOrPathType = 0, /, pid: None | int = None, *, async_: Literal[False, True] = False, **request_kwargs, ) -> int | Coroutine[Any, Any, int]: """切换工作目录 """ def gen_step(): nonlocal id_or_path if isinstance(id_or_path, int): id = id_or_path else: if isinstance(id_or_path, P115PathBase): id_or_path = id_or_path.attr if isinstance(id_or_path, Mapping) and "id" in id_or_path: id = id_or_path["id"] else: attr = yield self.get_attr( id_or_path, pid=pid, ensure_file=False, async_=async_, **request_kwargs, ) id = attr["id"] self.id = id return id return run_gen_step(gen_step, async_)
@overload def dirlen( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False] = False, **request_kwargs, ) -> int: ... @overload def dirlen( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[True], **request_kwargs, ) -> Coroutine[Any, Any, int]: ...
[docs] def dirlen( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False, True] = False, **request_kwargs, ) -> int | Coroutine[Any, Any, int]: """获取目录直属的文件和目录个数 """ def gen_step(): children = yield self.readdir( id_or_path, pid=pid, refresh=refresh, async_=async_, **request_kwargs, ) return len(children) return run_gen_step(gen_step, async_)
@overload def download( self, id_or_path: IDOrPathType, /, path: bytes | str | PathLike = "", mode: Literal["a", "w", "x", "i"] = "a", pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False] = False, **request_kwargs, ) -> tuple[str, int]: ... @overload def download( self, id_or_path: IDOrPathType, /, path: bytes | str | PathLike = "", mode: Literal["a", "w", "x", "i"] = "a", pid: None | int = None, refresh: None | bool = None, *, async_: Literal[True], **request_kwargs, ) -> Coroutine[Any, Any, tuple[str, int]]: ...
[docs] def download( self, id_or_path: IDOrPathType, /, path: bytes | str | PathLike = "", mode: Literal["a", "w", "x", "i"] = "a", pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False, True] = False, **request_kwargs, ) -> tuple[str, int] | Coroutine[Any, Any, tuple[str, int]]: """下载文件到本地 :param id_or_path: 文件在 115 网盘上的 id 或路径 :param path: 本地文件路径 :param mode: 写入模式 - a: append,如果文件不存在则创建,存在则追加(断点续传),返回一个任务 - w: write, 如果文件不存在则创建,存在则覆盖 - x: exists,如果文件不存在则创建,存在则报错 FileExistsError - i: ignore,如果文件不存在则创建,存在则忽略 :param pid: 相对路径的根目录 id :param refresh: 是否刷新数据 :param async_: 是否异步 :param request_kwargs: 其它请求参数 :return: 写入的文件路径和字节数 """ def gen_step(): nonlocal path attr = yield self.get_attr( id_or_path, pid=pid, refresh=refresh, async_=async_, **request_kwargs, ) if attr["is_dir"]: throw(errno.EISDIR, attr) path = fsdecode(path) if not path: path = attr["name"] if ospath.lexists(path): if mode == "x": throw(errno.EEXIST, f"file already exists: {path!r}") elif mode == "i" or mode == "a" and ospath.getsize(path) == attr["size"]: return path, 0 url = yield self.get_url(attr, async_=async_, **request_kwargs) kwargs: dict = { "url": url, "file": path, "headers": url.get("headers"), "resume": mode == "a", } if async_: progress = yield download_async(**kwargs) else: progress = download(**kwargs) return path, progress.downloaded return run_gen_step(gen_step, async_)
@overload def ed2k( self, id_or_path: IDOrPathType, /, name: str = "", pid: None | int = None, *, async_: Literal[False] = False, **request_kwargs, ) -> str: ... @overload def ed2k( self, id_or_path: IDOrPathType, /, name: str = "", pid: None | int = None, *, async_: Literal[True], **request_kwargs, ) -> Coroutine[Any, Any, str]: ... def ed2k( self, id_or_path: IDOrPathType, /, name: str = "", pid: None | int = None, *, async_: Literal[False, True] = False, **request_kwargs, ) -> str | Coroutine[Any, Any, str]: def gen_step(): nonlocal name url = yield self.get_url( id_or_path, pid=pid, async_=async_, **request_kwargs, ) if not name: name = url.get("name", "") name = name.translate(ED2K_NAME_TRANSTAB) if async_: async def request(): async with self.open(url, "rb", async_=True) as file: return await ed2k_hash_async(file) size, hash = yield request() else: with self.open(url, "rb") as file: size, hash = ed2k_hash(file) return f"ed2k://|file|{name}|{size}|{hash}|/" return run_gen_step(gen_step, async_) @overload def exists( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False] = False, **request_kwargs, ) -> bool: ... @overload def exists( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[True], **request_kwargs, ) -> Coroutine[Any, Any, bool]: ... def exists( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False, True] = False, **request_kwargs, ) -> bool | Coroutine[Any, Any, bool]: def gen_step(): try: yield self.get_attr( id_or_path, pid=pid, refresh=refresh, async_=async_, **request_kwargs, ) return True except FileNotFoundError: return False return run_gen_step(gen_step, async_) def getcid(self, /) -> int: return self.id @overload def getcwd( self, /, refresh: None | bool = None, *, async_: Literal[False] = False, **request_kwargs, ) -> str: ... @overload def getcwd( self, /, refresh: None | bool = None, *, async_: Literal[True], **request_kwargs, ) -> Coroutine[Any, Any, str]: ... def getcwd( self, /, refresh: None | bool = None, *, async_: Literal[False, True] = False, **request_kwargs, ) -> str | Coroutine[Any, Any, str]: return self.get_path( self.id, refresh=refresh, async_=async_, **request_kwargs, ) def _get_root_attr(self, /) -> MutableMapping: return {"id": 0, "parent_id": 0, "name": "", "is_dir": True, "size": 0} @overload def _get_attr_by_id( self, id: int, /, refresh: None | bool = None, *, async_: Literal[False] = False, **request_kwargs, ) -> MutableMapping: ... @overload def _get_attr_by_id( self, id: int, /, refresh: None | bool = None, *, async_: Literal[True], **request_kwargs, ) -> Coroutine[Any, Any, MutableMapping]: ... def _get_attr_by_id( self, id: int, /, refresh: None | bool = None, *, async_: Literal[False, True] = False, **request_kwargs, ) -> MutableMapping | Coroutine[Any, Any, MutableMapping]: if refresh is None: refresh = self.refresh def gen_step(): if id == 0: return self._get_root_attr() attr = self.id_to_attr.get(id) if not refresh and attr: return attr if self.full_loaded and self.NO_INCREMENT: if not attr: throw(errno.EIO, id) if attr: it: Any = self.readdir( attr["parent_id"], refresh=refresh, async_=async_, **request_kwargs, ) else: it = self.iter( 0, topdown=None, max_depth=-1, refresh=refresh, async_=async_, **request_kwargs, ) with with_iter_next(it) as get_next: while True: attr = yield get_next() if attr["id"] == id: return attr throw(errno.EIO, id) return run_gen_step(gen_step, async_) @overload def _get_attr_by_path( self, path: str | Sequence[str], /, ensure_file: None | bool = None, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False] = False, **request_kwargs, ) -> MutableMapping: ... @overload def _get_attr_by_path( self, path: str | Sequence[str], /, ensure_file: None | bool = None, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[True], **request_kwargs, ) -> Coroutine[Any, Any, MutableMapping]: ... def _get_attr_by_path( self, path: str | Sequence[str], /, ensure_file: None | bool = None, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False, True] = False, **request_kwargs, ) -> MutableMapping | Coroutine[Any, Any, MutableMapping]: if refresh is None: refresh = self.refresh def gen_step(): nonlocal path, pid, ensure_file if pid is None: pid = self.id patht: Sequence[str] if isinstance(path, str): if ensure_file is None and path_is_dir_form(path): ensure_file = False patht, go_up = splits(path) else: patht, go_up = path, 0 if patht and not patht[0]: pid = 0 patht = patht[1:] get_parent_id = self.get_parent_id while pid and go_up: pid = yield get_parent_id( pid, refresh=refresh, async_=async_, **request_kwargs, ) if not patht: if ensure_file: throw(errno.ENOENT, path) if not pid: return self._get_root_attr() return self.get_attr( pid, ensure_file=ensure_file, refresh=refresh, async_=async_, **request_kwargs, ) get_id = self.id_to_dirnode.get_id readdir = self.readdir for name in patht[:-1]: if not refresh: try: pid = get_id([name], parent_id=pid) continue except KeyError: pass found = False for attr in (yield readdir( pid, refresh=refresh, async_=async_, **request_kwargs, )): if attr["is_dir"] and attr["name"] == name: pid = attr["id"] found = True if not found: throw(errno.ENOENT, path) name = patht[-1] if not refresh and pid in self.id_to_readdir: attr = self._pid_name_to_attr.get((pid, name)) if attr and attr["parent_id"] == pid and attr["name"] == name: return attr else: self._pid_name_to_attr.pop((pid, name), None) throw(errno.ENOENT, path) for attr in (yield readdir( pid, refresh=refresh, async_=async_, **request_kwargs, )): if attr["name"] == name: if ensure_file is None or ensure_file ^ attr["is_dir"]: return attr throw(errno.ENOENT, path) return run_gen_step(gen_step, async_) @overload def get_attr( self, id_or_path: IDOrPathType = "", /, ensure_file: None | bool = None, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False] = False, **request_kwargs, ) -> MutableMapping: ... @overload def get_attr( self, id_or_path: IDOrPathType = "", /, ensure_file: None | bool = None, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[True], **request_kwargs, ) -> Coroutine[Any, Any, MutableMapping]: ... def get_attr( self, id_or_path: IDOrPathType = "", /, ensure_file: None | bool = None, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False, True] = False, **request_kwargs, ) -> MutableMapping | Coroutine[Any, Any, MutableMapping]: if pid is None: pid = self.id if refresh is None: refresh = self.refresh def gen_step(): nonlocal id_or_path, ensure_file, pid if isinstance(id_or_path, P115PathBase): id_or_path = id_or_path.attr if isinstance(id_or_path, Mapping): attr = id_or_path if "id" in attr: id_or_path = attr["id"] elif "path" in attr: id_or_path = attr["path"] else: id_or_path = [attr["name"]] if "parent_id" in attr: pid = attr["parent_id"] if "is_dir" in attr: ensure_file = not attr["is_dir"] if isinstance(id_or_path, int): return (yield self._get_attr_by_id( id_or_path, refresh=refresh, async_=async_, **request_kwargs, )) return self._get_attr_by_path( cast(str | Sequence[str], id_or_path), ensure_file=ensure_file, pid=pid, refresh=refresh, async_=async_, **request_kwargs, ) return run_gen_step(gen_step, async_) @overload def get_ancestors( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False] = False, **request_kwargs, ) -> list[AncestorDict]: ... @overload def get_ancestors( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[True], **request_kwargs, ) -> Coroutine[Any, Any, list[AncestorDict]]: ... def get_ancestors( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False, True] = False, **request_kwargs, ) -> list[AncestorDict] | Coroutine[Any, Any, list[AncestorDict]]: if pid is None: pid = self.id if refresh is None: refresh = self.refresh def gen_step(): nonlocal id_or_path, pid if isinstance(id_or_path, P115PathBase): id_or_path = id_or_path.attr if isinstance(id_or_path, Mapping): attr = id_or_path if "id" in attr: id_or_path = attr["id"] elif "path" in attr: id_or_path = attr["path"] else: id_or_path = [attr["name"]] if "parent_id" in attr: pid = attr["parent_id"] if not refresh and pid == 0 and "id" in attr and "name" in attr: return [ {"id": 0, "parent_id": 0, "name": ""}, {"id": attr["id"], "parent_id": pid, "name": attr["name"]}, ] if id_or_path == 0: return [{"id": 0, "parent_id": 0, "name": ""}] attr = yield self.get_attr( id_or_path, pid=pid, refresh=refresh, async_=async_, **request_kwargs, ) pid = attr["parent_id"] pancestors: None | list[AncestorDict] = None if not refresh: try: pancestors = self.id_to_dirnode.get_ancestors(pid) except KeyError: pass if pancestors is None: pancestors = yield self._get_ancestors_by_cid( pid, refresh=refresh, async_=async_, **request_kwargs, ) pancestors = cast(list[AncestorDict], pancestors) pancestors.append({"id": attr["id"], "parent_id": pid, "name": attr["name"]}) return pancestors return run_gen_step(gen_step, async_) @overload def _get_ancestors_by_cid( self, cid: int = 0, /, refresh: None | bool = False, *, async_: Literal[False] = False, **request_kwargs, ) -> list[AncestorDict]: ... @overload def _get_ancestors_by_cid( self, cid: int = 0, /, refresh: None | bool = False, *, async_: Literal[True], **request_kwargs, ) -> Coroutine[Any, Any, list[AncestorDict]]: ... def _get_ancestors_by_cid( self, cid: int = 0, /, refresh: None | bool = False, *, async_: Literal[False, True] = False, **request_kwargs, ) -> list[AncestorDict] | Coroutine[Any, Any, list[AncestorDict]]: """获取某个目录 id 对应的祖先节点信息(包括自身) """ def gen_step(): if cid == 0: return [{"id": 0, "parent_id": 0, "name": ""}] if not refresh: try: return self.id_to_dirnode.get_ancestors(cid) except KeyError: pass attr = yield self.get_attr( cid, refresh=refresh, async_=async_, **request_kwargs, ) ancestors: list[AncestorDict] = yield self._get_ancestors_by_cid( attr["parent_id"], refresh=refresh, async_=async_, **request_kwargs, ) ancestors.append({ "id": attr["id"], "parent_id": attr["parent_id"], "name": attr["name"], }) return ancestors return run_gen_step(gen_step, async_) @overload def get_id( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False] = False, **request_kwargs, ) -> int: ... @overload def get_id( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[True], **request_kwargs, ) -> Coroutine[Any, Any, int]: ... def get_id( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False, True] = False, **request_kwargs, ) -> int | Coroutine[Any, Any, int]: if refresh is None: refresh = self.refresh def gen_step(): nonlocal id_or_path, pid if isinstance(id_or_path, int): return id_or_path if isinstance(id_or_path, P115PathBase): id_or_path = id_or_path.attr if isinstance(id_or_path, Mapping): attr = id_or_path if "id" in attr: return attr["id"] elif "path" in attr: id_or_path = attr["path"] else: id_or_path = [attr["name"]] if "parent_id" in attr: pid = attr["parent_id"] if pid is None: pid = self.id id_or_path = cast(str | Sequence[str], id_or_path) if isinstance(id_or_path, str): path = id_or_path if id_or_path == "/": return 0 elif path in ("", "."): return pid else: if not id_or_path: return pid elif len(id_or_path) == 1 and not id_or_path[0]: return 0 patht: Sequence[str] if not refresh: try: if isinstance(id_or_path, str): patht, go_up = splits(id_or_path) while pid and go_up: pid = self.id_to_dirnode.get_parent_id(pid) go_up -= 1 else: patht = id_or_path return self.id_to_dirnode.get_id(patht, pid) except KeyError: pass attr = yield self.get_attr( id_or_path, pid=pid, refresh=refresh, async_=async_, **request_kwargs, ) return attr["id"] return run_gen_step(gen_step, async_) @overload def get_parent_id( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False] = False, **request_kwargs, ) -> list[dict]: ... @overload def get_parent_id( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[True], **request_kwargs, ) -> Coroutine[Any, Any, list[dict]]: ... def get_parent_id( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False, True] = False, **request_kwargs, ) -> list[dict] | Coroutine[Any, Any, list[dict]]: if refresh is None: refresh = self.refresh def gen_step(): nonlocal id_or_path, pid if isinstance(id_or_path, P115PathBase): id_or_path = id_or_path.attr if isinstance(id_or_path, Mapping): attr = id_or_path if "id" in attr: id_or_path = attr["id"] elif "path" in attr: id_or_path = attr["path"] else: id_or_path = [attr["name"]] if "parent_id" in attr: pid = attr["parent_id"] if pid is None: pid = self.id if not refresh and isinstance(id_or_path, int): try: return self.id_to_dirnode.get_parent_id(id_or_path) except KeyError: pass attr = yield self.get_attr( id_or_path, pid=pid, refresh=refresh, async_=async_, **request_kwargs, ) return attr["parent_id"] return run_gen_step(gen_step, async_) @overload def get_path( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False] = False, **request_kwargs, ) -> str: ... @overload def get_path( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[True], **request_kwargs, ) -> Coroutine[Any, Any, str]: ... def get_path( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False, True] = False, **request_kwargs, ) -> str | Coroutine[Any, Any, str]: def gen_step(): patht: list[str] = yield self.get_patht( id_or_path, pid=pid, refresh=refresh, async_=async_, **request_kwargs, ) return joins(patht) return run_gen_step(gen_step, async_) @overload def get_patht( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False] = False, **request_kwargs, ) -> list[str]: ... @overload def get_patht( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[True], **request_kwargs, ) -> Coroutine[Any, Any, list[str]]: ... def get_patht( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False, True] = False, **request_kwargs, ) -> list[str] | Coroutine[Any, Any, list[str]]: if refresh is None: refresh = self.refresh def gen_step(): nonlocal id_or_path, pid if isinstance(id_or_path, P115PathBase): id_or_path = id_or_path.attr if isinstance(id_or_path, Mapping): attr = id_or_path if "id" in attr: id_or_path = attr["id"] elif "path" in attr: id_or_path = attr["path"] else: id_or_path = [attr["name"]] if "parent_id" in attr: pid = attr["parent_id"] if isinstance(id_or_path, int): id = id_or_path if id == 0: return [""] if not refresh: try: return self.id_to_dirnode.get_patht(id) except KeyError: pass ancestors = yield self.get_ancestors( id, refresh=refresh, async_=async_, **request_kwargs, ) return [a["name"] for a in ancestors] if pid is None: pid = self.id id_or_path = cast(str | Sequence[str], id_or_path) if isinstance(id_or_path, str): id_or_path, go_up = splits(id_or_path) else: go_up = 0 if id_or_path and not id_or_path[0]: return id_or_path if pid == 0: patht = [""] go_up = 0 else: patht = yield self.get_patht( pid, refresh=refresh, async_=async_, **request_kwargs, ) if go_up: if len(patht) <= go_up: patht = [""] else: patht = patht[:-go_up] if id_or_path: patht.extend(id_or_path) return patht return run_gen_step(gen_step, async_) @overload def glob( self, /, pattern: str = "*", top: IDOrPathType = "", ignore_case: bool = False, allow_escaped_slash: bool = True, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False] = False, **request_kwargs, ) -> Iterator[Mapping]: ... @overload def glob( self, /, pattern: str = "*", top: IDOrPathType = "", ignore_case: bool = False, allow_escaped_slash: bool = True, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[True], **request_kwargs, ) -> AsyncIterator[Mapping]: ... def glob( self, /, pattern: str = "*", top: IDOrPathType = "", ignore_case: bool = False, allow_escaped_slash: bool = True, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False, True] = False, **request_kwargs, ) -> Iterator[Mapping] | AsyncIterator[Mapping]: if pattern == "*": return self.iter( top, pid=pid, refresh=refresh, async_=async_, **request_kwargs, ) elif len(pattern) >= 2 and not pattern.strip("*"): return self.iter( top, max_depth=-1, pid=pid, refresh=refresh, async_=async_, **request_kwargs, ) def gen_step(): nonlocal pattern, pid if not pattern: try: yield Yield(self.get_attr( top, pid=pid, refresh=refresh, async_=async_, **request_kwargs, )) except FileNotFoundError: pass return elif not pattern.lstrip("/"): return Yield(self.get_attr(0)) splitted_pats = tuple(translate_iter( pattern, allow_escaped_slash=allow_escaped_slash, )) if pattern.startswith("/"): attr = self.get_attr(0) pid = 0 dirname = "/" else: attr = yield self.get_attr( top, pid=pid, refresh=refresh, async_=async_, **request_kwargs, ) pid = cast(int, attr["id"]) dirname = self.get_path(attr) i = 0 subpath = "" if ignore_case: if any(typ == "dstar" for _, typ, _ in splitted_pats): pattern = "".join( "(?:/%s)?" % pat if typ == "dstar" else "/" + pat for pat, typ, _ in splitted_pats ) if dirname != "/": pattern = re_escape(dirname) + pattern match = re_compile("(?i:%s)" % pattern).fullmatch yield YieldFrom(self.iter( attr, max_depth=-1, predicate=lambda a: match(self.get_path(a)) is not None, refresh=refresh, async_=async_, **request_kwargs, )) return else: typ = None for i, (pat, typ, orig) in enumerate(splitted_pats): if typ != "orig": break subpath = joinpath(subpath, orig) if typ == "orig": try: yield Yield(self.get_attr( subpath, pid=pid, refresh=refresh, async_=async_, **request_kwargs, )) except FileNotFoundError: pass return elif typ == "dstar" and i + 1 == len(splitted_pats): return YieldFrom(self.iter( subpath, pid=pid, max_depth=-1, refresh=refresh, async_=async_, **request_kwargs, )) if any(typ == "dstar" for _, typ, _ in splitted_pats[i:]): pattern = "".join( "(?:/%s)?" % pat if typ == "dstar" else "/" + pat for pat, typ, _ in splitted_pats[i:] ) if dirname != "/": pattern = re_escape(dirname) + pattern match = re_compile(pattern).fullmatch return YieldFrom(self.iter( subpath, pid=pid, max_depth=-1, predicate=lambda a: match(self.get_path(a)) is not None, refresh=refresh, async_=async_, **request_kwargs, )) cref_cache: dict[int, Callable] = {} if subpath: try: attr = yield self.get_attr( subpath, pid=pid, refresh=refresh, async_=async_, **request_kwargs, ) except FileNotFoundError: return if not attr["is_dir"]: return def glob_step_match(attr: Mapping, i: int): j = i + 1 at_end = j == len(splitted_pats) pat, typ, orig = splitted_pats[i] if typ == "orig": try: subattr = yield self.get_attr( orig, pid=attr["id"], refresh=refresh, async_=async_, **request_kwargs, ) except FileNotFoundError: return if at_end: yield Yield(subattr) elif subattr["is_dir"]: yield from glob_step_match(subattr, j) elif typ == "star": if at_end: yield YieldFrom(self.iter( attr, refresh=refresh, async_=async_, **request_kwargs, )) else: subattrs = yield self.readdir( attr, refresh=refresh, async_=async_, **request_kwargs, ) for subattr in subattrs: if subattr["is_dir"]: yield from glob_step_match(subattr, j) else: subattrs = yield self.readdir( attr, refresh=refresh, async_=async_, **request_kwargs, ) for subattr in subattrs: try: cref = cref_cache[i] except KeyError: if ignore_case: pat = "(?i:%s)" % pat cref = cref_cache[i] = re_compile(pat).fullmatch if cref(subattr["name"]): if at_end: yield Yield(subattr) elif subattr["is_dir"]: yield from glob_step_match(subattr, j) yield from glob_step_match(attr, i) return run_gen_step_iter(gen_step, async_) @overload def get_url( self, id_or_path: IDOrPathType, /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False] = False, **request_kwargs, ) -> str: ... @overload def get_url( self, id_or_path: IDOrPathType, /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[True], **request_kwargs, ) -> Coroutine[Any, Any, str]: ... def get_url( self, id_or_path: IDOrPathType, /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False, True] = False, **request_kwargs, ) -> str | Coroutine[Any, Any, str]: def gen_step(): attr = yield self.get_attr( id_or_path, pid=pid, ensure_file=True, refresh=refresh, async_=async_, **request_kwargs, ) if attr["is_dir"]: throw(errno.EISDIR, attr) return attr["url"] return run_gen_step(gen_step, async_) @overload def has_child( self, /, child: int | str, parent: IDOrPathType = "", pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False] = False, **request_kwargs, ) -> bool: ... @overload def has_child( self, /, child: int | str, parent: IDOrPathType = "", pid: None | int = None, refresh: None | bool = None, *, async_: Literal[True], **request_kwargs, ) -> Coroutine[Any, Any, bool]: ...
[docs] def has_child( self, /, child: int | str, parent: IDOrPathType = "", pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False, True] = False, **request_kwargs, ) -> bool | Coroutine[Any, Any, bool]: """检查目录中是否存在某个文件或目录 """ def gen_step(): attr = yield self.get_attr( parent, pid=pid, refresh=refresh, async_=async_, **request_kwargs, ) if not attr["is_dir"]: return False try: child_attr = yield self.get_attr( child if isinstance(child, int) else [child], pid=attr["id"], refresh=refresh, async_=async_, **request_kwargs, ) except FileNotFoundError: return False return attr["id"] == child_attr["parent_id"] return run_gen_step(gen_step, async_)
@overload def hash[T]( self, id_or_path: IDOrPathType = "", /, digest: str | HashObj | Callable[[], HashObj] | Callable[[], Callable[[bytes, T], T]] = "md5", start: int = 0, size: None | int = None, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False] = False, **request_kwargs, ) -> tuple[int, HashObj | T]: ... @overload def hash[T]( self, id_or_path: IDOrPathType = "", /, digest: str | HashObj | Callable[[], HashObj] | Callable[[], Callable[[bytes, T], T]] | Callable[[], Callable[[bytes, T], Awaitable[T]]] = "md5", start: int = 0, size: None | int = None, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[True], **request_kwargs, ) -> Coroutine[Any, Any, tuple[int, HashObj | T]]: ... def hash[T]( self, id_or_path: IDOrPathType = "", /, digest: str | HashObj | Callable[[], HashObj] | Callable[[], Callable[[bytes, T], T]] | Callable[[], Callable[[bytes, T], Awaitable[T]]] = "md5", start: int = 0, size: None | int = None, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False, True] = False, **request_kwargs, ) -> tuple[int, HashObj | T] | Coroutine[Any, Any, tuple[int, HashObj | T]]: def gen_step(): if async_: async def request(): async with self.open( id_or_path, mode="rb", start=start, pid=pid, refresh=refresh, async_=True, **request_kwargs, ) as file: return await file_digest_async(file, digest, stop=size) return request() else: with self.open( id_or_path, mode="rb", start=start, pid=pid, refresh=refresh, **request_kwargs, ) as file: return file_digest(file, digest, stop=size) return run_gen_step(gen_step, async_) @overload def hashes[T]( self, id_or_path: IDOrPathType = "", /, digest: str | HashObj | Callable[[], HashObj] | Callable[[], Callable[[bytes, T], T]] = "md5", *digests: str | HashObj | Callable[[], HashObj] | Callable[[], Callable[[bytes, T], T]], start: int = 0, size: None | int = None, pid: None | int = None, refresh: None | bool = None, async_: Literal[False] = False, **request_kwargs, ) -> tuple[int, list[HashObj | T]]: ... @overload def hashes[T]( self, id_or_path: IDOrPathType = "", /, digest: str | HashObj | Callable[[], HashObj] | Callable[[], Callable[[bytes, T], T]] | Callable[[], Callable[[bytes, T], Awaitable[T]]] = "md5", *digests: str | HashObj | Callable[[], HashObj] | Callable[[], Callable[[bytes, T], T]] | Callable[[], Callable[[bytes, T], Awaitable[T]]], start: int = 0, size: None | int = None, pid: None | int = None, refresh: None | bool = None, async_: Literal[True], **request_kwargs, ) -> Coroutine[Any, Any, tuple[int, list[HashObj | T]]]: ... def hashes[T]( self, id_or_path: IDOrPathType = "", /, digest: str | HashObj | Callable[[], HashObj] | Callable[[], Callable[[bytes, T], T]] | Callable[[], Callable[[bytes, T], Awaitable[T]]] = "md5", *digests: str | HashObj | Callable[[], HashObj] | Callable[[], Callable[[bytes, T], T]] | Callable[[], Callable[[bytes, T], Awaitable[T]]], start: int = 0, size: None | int = None, pid: None | int = None, refresh: None | bool = None, async_: Literal[False, True] = False, **request_kwargs, ) -> tuple[int, list[HashObj | T]] | Coroutine[Any, Any, tuple[int, list[HashObj | T]]]: def gen_step(): if async_: async def request(): async with self.open( id_or_path, mode="rb", start=start, pid=pid, refresh=refresh, async_=True, **request_kwargs, ) as file: return await file_mdigest_async(file, digest, *digests, stop=size) return request() else: with self.open( id_or_path, mode="rb", start=start, pid=pid, refresh=refresh, **request_kwargs, ) as file: return file_mdigest(file, digest, *digests, stop=size) return run_gen_step(gen_step, async_) @overload def is_empty( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False] = False, **request_kwargs, ) -> bool: ... @overload def is_empty( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[True], **request_kwargs, ) -> Coroutine[Any, Any, bool]: ...
[docs] def is_empty( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False, True] = False, **request_kwargs, ) -> bool | Coroutine[Any, Any, bool]: """是否为空文件、空目录或者不存在 """ def gen_step(): try: attr = yield self.get_attr( id_or_path, pid=pid, refresh=False, async_=async_, **request_kwargs, ) except FileNotFoundError: return True if attr["is_dir"]: count_children = yield self.dirlen( id_or_path, pid=pid, refresh=refresh, async_=async_, **request_kwargs, ) return count_children == 0 else: return attr["size"] == 0 return run_gen_step(gen_step, async_)
@overload def isdir( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, *, async_: Literal[False] = False, **request_kwargs, ) -> bool: ... @overload def isdir( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, *, async_: Literal[True], **request_kwargs, ) -> Coroutine[Any, Any, bool]: ... def isdir( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, *, async_: Literal[False, True] = False, **request_kwargs, ) -> bool | Coroutine[Any, Any, bool]: def gen_step(): try: attr = yield self.get_attr( id_or_path, pid=pid, refresh=False, async_=async_, **request_kwargs, ) return attr["is_dir"] except FileNotFoundError: return False return run_gen_step(gen_step, async_) @overload def isfile( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, *, async_: Literal[False] = False, **request_kwargs, ) -> bool: ... @overload def isfile( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, *, async_: Literal[True], **request_kwargs, ) -> Coroutine[Any, Any, bool]: ... def isfile( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, *, async_: Literal[False, True] = False, **request_kwargs, ) -> bool | Coroutine[Any, Any, bool]: def gen_step(): try: attr = yield self.get_attr( id_or_path, pid=pid, refresh=False, async_=async_, **request_kwargs, ) return not attr["is_dir"] except FileNotFoundError: return False return run_gen_step(gen_step, async_) @overload def iter( self, top: IDOrPathType = "", /, topdown: None | bool = True, min_depth: int = 1, max_depth: int = 1, predicate: None | Callable[[Mapping], Literal[None, 1, False, True]] = None, onerror: bool | Callable[[OSError], bool] = True, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False] = False, **request_kwargs, ) -> Iterator[MutableMapping]: ... @overload def iter( self, top: IDOrPathType = "", /, topdown: None | bool = True, min_depth: int = 1, max_depth: int = 1, predicate: None | Callable[[Mapping], Literal[None, 1, False, True]] = None, onerror: bool | Callable[[OSError], bool] = True, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[True], **request_kwargs, ) -> AsyncIterator[MutableMapping]: ... def iter( self, top: IDOrPathType = "", /, topdown: None | bool = True, min_depth: int = 1, max_depth: int = 1, predicate: None | Callable[[Mapping], Literal[None, 1, False, True]] = None, onerror: bool | Callable[[OSError], bool] = True, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False, True] = False, **request_kwargs, ) -> Iterator[MutableMapping] | AsyncIterator[MutableMapping]: def gen_step(): try: attr = yield self.get_attr( top, pid=pid, refresh=refresh, async_=async_, **request_kwargs, ) except OSError as e: if callable(onerror): yield onerror(e) elif onerror: raise return if min_depth <= 0: yield attr yield YieldFrom(iterdir_generic( attr, iterdir=partial(self.readdir, refresh=refresh, async_=async_, **request_kwargs), topdown=topdown, min_depth=min_depth, max_depth=max_depth, isdir=itemgetter("is_dir"), predicate=predicate, onerror=onerror, async_=async_, )) if attr["id"] == 0: self.full_loaded = True return run_gen_step_iter(gen_step, async_) @overload def listdir( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False] = False, **request_kwargs, ) -> list[P115PathType]: ... @overload def listdir( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[True], **request_kwargs, ) -> Coroutine[Any, Any, list[P115PathType]]: ... def listdir( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False, True] = False, **request_kwargs, ) -> list[P115PathType] | Coroutine[Any, Any, list[P115PathType]]: def gen_step(): children = yield self.readdir( id_or_path, pid=pid, refresh=refresh, async_=async_, **request_kwargs, ) path_class = type(self).path_class return [path_class(self, attr) for attr in children] return run_gen_step(gen_step, async_) @overload def readdir( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False] = False, **request_kwargs, ) -> list[MutableMapping]: ... @overload def readdir( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[True], **request_kwargs, ) -> Coroutine[Any, Any, list[MutableMapping]]: ... def readdir( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False, True] = False, **request_kwargs, ) -> list[MutableMapping] | Coroutine[Any, Any, list[MutableMapping]]: if pid is None: pid = self.id if refresh is None: refresh = self.refresh if not id_or_path: id_or_path = pid def gen_step(): if isinstance(id_or_path, int): id = id_or_path else: id = yield self.get_id( id_or_path, pid=pid, refresh=refresh, ensure_file=False, async_=async_, **request_kwargs, ) id_to_readdir = self.id_to_readdir children = id_to_readdir.get(id) if refresh or not id_to_readdir or children is None: if children is None: children = {} if async_: lock: Any = self._readdir_alocks[id] else: lock = self._readdir_locks[id] timeout = 0.1 # 100 ms start_t = time() yield lock.acquire() try: if time() - start_t >= timeout: children_new = id_to_readdir.get(id) if children_new is not None: return list(children_new.values()) with with_iter_next(self.iterdir( id, async_=async_, **request_kwargs, )) as get_next: while True: attr = yield get_next() fid = attr["id"] try: children[fid].update(attr) except KeyError: children[fid] = attr finally: lock.release() id_to_readdir[id] = children self._pid_name_to_attr.update(((id, attr["name"]), attr) for attr in children.values()) self.id_to_attr.update(children) return list(children.values()) return run_gen_step(gen_step, async_) @overload def open( self, id_or_path: IDOrPathType, /, mode: Literal["rb", "br"] = "rb", buffering: None | int = None, encoding: None | str = None, errors: None | str = None, newline: None | str = None, start: int = 0, seek_threshold: int = 1 << 20, http_file_reader_cls: None | type[HTTPFileReader] = None, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False] = False, **request_kwargs, ) -> HTTPFileReader | BufferedReader: ... @overload def open( self, id_or_path: IDOrPathType, /, mode: Literal["r", "rt", "tr"], buffering: None | int = None, encoding: None | str = None, errors: None | str = None, newline: None | str = None, start: int = 0, seek_threshold: int = 1 << 20, http_file_reader_cls: None | type[HTTPFileReader] = None, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False] = False, **request_kwargs, ) -> TextIOWrapper: ... @overload def open( self, id_or_path: IDOrPathType, /, mode: Literal["rb", "br"] = "rb", buffering: None | int = None, encoding: None | str = None, errors: None | str = None, newline: None | str = None, start: int = 0, seek_threshold: int = 1 << 20, http_file_reader_cls: None | type[AsyncHTTPFileReader] = None, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[True], **request_kwargs, ) -> AsyncHTTPFileReader | AsyncBufferedReader: ... @overload def open( self, id_or_path: IDOrPathType, /, mode: Literal["r", "rt", "tr"], buffering: None | int = None, encoding: None | str = None, errors: None | str = None, newline: None | str = None, start: int = 0, seek_threshold: int = 1 << 20, http_file_reader_cls: None | type[AsyncHTTPFileReader] = None, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[True], **request_kwargs, ) -> AsyncTextIOWrapper: ...
[docs] def open( self, id_or_path: IDOrPathType, /, mode: Literal["r", "rt", "tr", "rb", "br"] = "rb", buffering: None | int = None, encoding: None | str = None, errors: None | str = None, newline: None | str = None, start: int = 0, seek_threshold: int = 1 << 20, http_file_reader_cls: None | type[HTTPFileReader] | type[AsyncHTTPFileReader] = None, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False, True] = False, **request_kwargs, ) -> HTTPFileReader | BufferedReader | TextIOWrapper | AsyncHTTPFileReader | AsyncBufferedReader | AsyncTextIOWrapper: """打开一个文件,仅用于读取 """ if mode not in ("r", "rt", "tr", "rb", "br"): throw(errno.EINVAL, f"invalid (or unsupported) mode: {mode!r}") def gen_step(): url = yield self.get_url( id_or_path, pid=pid, refresh=refresh, async_=async_, **request_kwargs, ) file = yield self.client.open( url, start=start, seek_threshold=seek_threshold, http_file_reader_cls=http_file_reader_cls, **request_kwargs, ) return file.wrap( text_mode="b" not in mode, buffering=buffering, encoding=encoding, errors=errors, newline=newline, ) return run_gen_step(gen_step, async_)
@overload def read( self, id_or_path: IDOrPathType = "", /, start: None | int = None, stop: None | int | Undefined = undefined, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False] = False, **request_kwargs, ) -> bytes: ... @overload def read( self, id_or_path: IDOrPathType = "", /, start: None | int = None, stop: None | int | Undefined = undefined, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[True], **request_kwargs, ) -> Coroutine[Any, Any, bytes]: ...
[docs] def read( self, id_or_path: IDOrPathType = "", /, start: None | int = None, stop: None | int | Undefined = undefined, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False, True] = False, **request_kwargs, ) -> bytes | Coroutine[Any, Any, bytes]: """ """ def gen_step(): nonlocal id_or_path, start, stop if is_undefined(stop): start, stop = 0, start elif start is None: start = 0 if stop is None: if start < 0: bytes_range = str(start) else: bytes_range = f"{start}-" else: if start < 0 or stop < 0: attr = id_or_path = yield self.get_attr( id_or_path, pid=pid, refresh=refresh, ensure_file=True, async_=async_, **request_kwargs, ) size = cast(int, attr["size"]) if start < 0: start += size if start < 0: start = 0 if stop < 0: stop += size elif stop > size: stop = size if start >= stop: return b"" bytes_range = f"{start}-{stop-1}" return self.read_range( id_or_path, bytes_range, pid=pid, refresh=refresh, async_=async_, **request_kwargs, ) return run_gen_step(gen_step, async_)
@overload def read_range( self, id_or_path: IDOrPathType = "", /, bytes_range: str = "0-", pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False] = False, **request_kwargs, ) -> bytes: ... @overload def read_range( self, id_or_path: IDOrPathType = "", /, bytes_range: str = "0-", pid: None | int = None, refresh: None | bool = None, *, async_: Literal[True], **request_kwargs, ) -> Coroutine[Any, Any, bytes]: ... def read_range( self, id_or_path: IDOrPathType = "", /, bytes_range: str = "0-", pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False, True] = False, **request_kwargs, ) -> bytes | Coroutine[Any, Any, bytes]: def gen_step(): url = yield self.get_url( id_or_path, pid=pid, refresh=refresh, async_=async_, **request_kwargs, ) headers = request_kwargs["headers"] = dict(request_kwargs.get("headers") or ()) if isinstance(url, P115URL): headers.update(url.get("headers") or ()) headers["accept-encoding"] = "identity" headers["range"] = "bytes=" + bytes_range request_kwargs["parse"] = False return self.client.request( url, async_=async_, **request_kwargs, ) return run_gen_step(gen_step, async_) @overload def read_block( self, id_or_path: IDOrPathType = "", /, start: int = 0, size: None | int = None, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False] = False, **request_kwargs, ) -> bytes: ... @overload def read_block( self, id_or_path: IDOrPathType = "", /, start: int = 0, size: None | int = None, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[True], **request_kwargs, ) -> Coroutine[Any, Any, bytes]: ...
[docs] def read_block( self, id_or_path: IDOrPathType = "", /, start: int = 0, size: None | int = None, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False, True] = False, **request_kwargs, ) -> bytes | Coroutine[Any, Any, bytes]: """ """ def gen_step(): if size is None: stop = None elif size <= 0: return b"" else: stop = start + size if start < 0 and stop >= 0: stop = None return (yield self.read( id_or_path, start=start, stop=stop, pid=pid, refresh=refresh, async_=async_, # type: ignore **request_kwargs, )) return run_gen_step(gen_step, async_)
@overload def read_text( self, id_or_path: IDOrPathType = "", /, encoding: None | str = None, errors: None | str = None, newline: None | str = None, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False] = False, **request_kwargs, ) -> str: ... @overload def read_text( self, id_or_path: IDOrPathType = "", /, encoding: None | str = None, errors: None | str = None, newline: None | str = None, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[True], **request_kwargs, ) -> Coroutine[Any, Any, str]: ... def read_text( self, id_or_path: IDOrPathType = "", /, encoding: None | str = None, errors: None | str = None, newline: None | str = None, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False, True] = False, **request_kwargs, ) -> str | Coroutine[Any, Any, str]: def gen_step(): file = yield self.open( id_or_path, mode="r", encoding=encoding, errors=errors, newline=newline, pid=pid, refresh=refresh, async_=async_, # type: ignore **request_kwargs, ) return file.read() return run_gen_step(gen_step, async_) @overload def rglob( self, /, pattern: str = "", dirname: IDOrPathType = "", ignore_case: bool = False, allow_escaped_slash: bool = True, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False] = False, **request_kwargs, ) -> Iterator[Mapping]: ... @overload def rglob( self, /, pattern: str = "", dirname: IDOrPathType = "", ignore_case: bool = False, allow_escaped_slash: bool = True, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[True], **request_kwargs, ) -> AsyncIterator[Mapping]: ... def rglob( self, /, pattern: str = "", dirname: IDOrPathType = "", ignore_case: bool = False, allow_escaped_slash: bool = True, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False, True] = False, **request_kwargs, ) -> Iterator[Mapping] | AsyncIterator[Mapping]: if not pattern: return self.iter( dirname, max_depth=-1, pid=pid, refresh=refresh, async_=async_, **request_kwargs, ) if pattern.startswith("/"): pattern = joinpath("/", "**", pattern.lstrip("/")) else: pattern = joinpath("**", pattern) return self.glob( pattern, dirname=dirname, ignore_case=ignore_case, allow_escaped_slash=allow_escaped_slash, pid=pid, refresh=refresh, async_=async_, **request_kwargs, ) @overload def stat( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False] = False, **request_kwargs, ) -> stat_result: ... @overload def stat( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[True], **request_kwargs, ) -> Coroutine[Any, Any, stat_result]: ...
[docs] def stat( self, id_or_path: IDOrPathType = "", /, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False, True] = False, **request_kwargs, ) -> stat_result | Coroutine[Any, Any, stat_result]: "检查文件或目录的属性,就像 `os.stat`" def gen_step(): attr = yield self.get_attr( id_or_path, pid=pid, refresh=refresh, async_=async_, **request_kwargs, ) return self.attr_to_stat(attr) return run_gen_step(gen_step, async_)
@overload def walk( self, top: IDOrPathType = "", /, topdown: None | bool = True, min_depth: int = 1, max_depth: int = -1, onerror: bool | Callable[[OSError], bool] = True, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False] = False, **request_kwargs, ) -> Iterator[tuple[MutableMapping, list[MutableMapping], list[MutableMapping]]]: ... @overload def walk( self, top: IDOrPathType = "", /, topdown: None | bool = True, min_depth: int = 1, max_depth: int = -1, onerror: bool | Callable[[OSError], bool] = True, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[True], **request_kwargs, ) -> AsyncIterator[tuple[MutableMapping, list[MutableMapping], list[MutableMapping]]]: ... def walk( self, top: IDOrPathType = "", /, topdown: None | bool = True, min_depth: int = 1, max_depth: int = -1, onerror: bool | Callable[[OSError], bool] = True, pid: None | int = None, refresh: None | bool = None, *, async_: Literal[False, True] = False, **request_kwargs, ) -> Iterator[tuple[MutableMapping, list[MutableMapping], list[MutableMapping]]] | AsyncIterator[tuple[MutableMapping, list[MutableMapping], list[MutableMapping]]]: def gen_step(): try: attr = yield self.get_attr( top, pid=pid, refresh=refresh, async_=async_, **request_kwargs, ) except OSError as e: if callable(onerror): yield onerror(e) elif onerror: raise return yield YieldFrom(walk_generic( attr, iterdir=partial(self.readdir, refresh=refresh, async_=async_, **request_kwargs), topdown=topdown, min_depth=min_depth, max_depth=max_depth, isdir=itemgetter("is_dir"), onerror=onerror, async_=async_, )) if attr["id"] == 0: self.full_loaded = True return run_gen_step_iter(gen_step, async_)
# TODO: 增加方法 id_to_path 和 path_to_id 以加快查询速度 # TODO: 尽量先用数据库查一下,再去做下一步(如果不确定是不是目录,先用数据库查一下,成功就ok,不行再另外搞) # TODO: 有时还需要路径到 id 到映射,或者 id_to_dirnode 中 name 到 id 到映射 # TODO: 如果是 NO_INCREMENT,那么自动构建 id 和 path 之间的映射,否则只构建 pid_to_names 的映射 # TODO: id_to_dirnode 在并发写入时,会有 database is locked 的问题