#!/usr/bin/env python3
# encoding: utf-8
from __future__ import annotations
__all__ = ["P115ZipPath", "P115ZipFileSystem"]
from collections.abc import (
AsyncIterator, Coroutine, Iterable, Iterator, MutableMapping,
)
from itertools import count
from os import PathLike
from posixpath import basename, dirname
from threading import Lock
from typing import overload, Any, Literal
from dictattr import AttrDict
from dicttools import dict_key_to_lower_update
from errno2 import errno
from iterutils import run_gen_step, run_gen_step_iter, with_iter_next, Yield
from ..client import check_response, P115Client
from ..exception import throw
from ..tool import extract_iter_files, extract_iterdir
from ..type import P115URL
from ..util import lock_as_async
from .fs_base import IDOrPathType, P115PathBase, P115FileSystemBase
# TODO: 尽量也要兼容 zipfile.Path 的接口
class P115ZipPath(P115PathBase):
__slots__ = ("fs", "attr")
fs: P115ZipFileSystem
[docs]
class P115ZipFileSystem(P115FileSystemBase[P115ZipPath]):
path_class = P115ZipPath
def __init__(
self,
/,
client: str | PathLike | P115Client,
pickcode: str,
refresh: bool = False,
id_to_readdir: None | int | dict[int, dict[int, MutableMapping]] = None,
):
super().__init__(client, refresh=refresh, id_to_readdir=id_to_readdir)
self.pickcode = pickcode
self._path_to_id: dict[str, int] = {"/": 0}
self._id_to_path: dict[int, str] = {0: "/"}
self._load_lock = Lock()
@overload
def id_to_path(
self,
id: int,
/,
async_: Literal[False] = False,
**request_kwargs,
) -> str:
...
@overload
def id_to_path(
self,
id: int,
/,
async_: Literal[True],
**request_kwargs,
) -> Coroutine[Any, Any, str]:
...
def id_to_path(
self,
id: int,
/,
async_: Literal[False, True] = False,
**request_kwargs,
) -> str | Coroutine[Any, Any, str]:
def gen_step():
if not self.full_loaded:
yield self.load(locked=True, async_=async_, **request_kwargs)
try:
return self._id_to_path[id]
except KeyError:
throw(errno.ENOENT, id)
return run_gen_step(gen_step, async_)
@overload
def path_to_id(
self,
path: str,
/,
async_: Literal[False] = False,
**request_kwargs,
) -> int:
...
@overload
def path_to_id(
self,
path: str,
/,
async_: Literal[True],
**request_kwargs,
) -> Coroutine[Any, Any, int]:
...
def path_to_id(
self,
path: str,
/,
async_: Literal[False, True] = False,
**request_kwargs,
) -> int | Coroutine[Any, Any, int]:
def gen_step():
if not self.full_loaded:
yield self.load(locked=True, async_=async_, **request_kwargs)
try:
return self._path_to_id[path]
except KeyError:
throw(errno.ENOENT, path)
return run_gen_step(gen_step, async_)
@overload
def extract(
self,
/,
path: IDOrPathType = "",
to_pid: int | str = 0,
pid: None | int = None,
*,
async_: Literal[False] = False,
**request_kwargs,
) -> int:
...
@overload
def extract(
self,
/,
path: IDOrPathType = "",
to_pid: int | str = 0,
pid: None | int = None,
*,
async_: Literal[True],
**request_kwargs,
) -> Coroutine[Any, Any, int]:
...
@overload
def extract_many(
self,
/,
paths: Iterable[IDOrPathType],
to_pid: int | str = 0,
pid: None | int = None,
*,
async_: Literal[False] = False,
**request_kwargs,
) -> int:
...
@overload
def extract_many(
self,
/,
paths: Iterable[IDOrPathType],
to_pid: int | str = 0,
pid: None | int = None,
*,
async_: Literal[True],
**request_kwargs,
) -> Coroutine[Any, Any, int]:
...
@overload
def get_url(
self,
id_or_path: IDOrPathType,
/,
pid: None | int = None,
refresh: None | bool = None,
*,
async_: Literal[False] = False,
**request_kwargs,
) -> P115URL:
...
@overload
def get_url(
self,
id_or_path: IDOrPathType,
/,
pid: None | int = None,
refresh: None | bool = None,
*,
async_: Literal[True],
**request_kwargs,
) -> Coroutine[Any, Any, P115URL]:
...
[docs]
def get_url(
self,
id_or_path: IDOrPathType,
/,
pid: None | int = None,
refresh: None | bool = None,
*,
async_: Literal[False, True] = False,
**request_kwargs,
) -> P115URL | Coroutine[Any, Any, P115URL]:
"获取下载链接"
def gen_step():
headers = request_kwargs["headers"] = dict(request_kwargs.get("headers") or ())
dict_key_to_lower_update(headers)
user_agent = headers.pop("user-agent", "")
attr = yield self.get_attr(
id_or_path,
pid=pid,
ensure_file=True,
async_=async_,
**request_kwargs,
)
return self.client.extract_download_url(
self.pickcode,
attr["path"],
user_agent=user_agent,
async_=async_,
**request_kwargs,
)
return run_gen_step(gen_step, async_)
@overload
def iterdir(
self,
id: int,
/,
async_: Literal[False] = False,
**request_kwargs,
) -> Iterator[dict]:
...
@overload
def iterdir(
self,
id: int,
/,
async_: Literal[True],
**request_kwargs,
) -> AsyncIterator[dict]:
...
[docs]
def iterdir(
self,
id: int,
/,
async_: Literal[False, True] = False,
**request_kwargs,
) -> Iterator[dict] | AsyncIterator[dict]:
"""迭代获取目录内直属的文件或目录的信息
"""
def gen_step():
if not self.full_loaded:
yield self.load(locked=True, async_=async_, **request_kwargs)
if id:
try:
attr = self.id_to_attr[id]
except KeyError:
throw(errno.ENOENT, id)
if not attr["is_dir"]:
throw(errno.ENOTDIR, attr)
path = attr["path"]
else:
path = "/"
with with_iter_next(extract_iterdir(
self.client,
self.pickcode,
path,
async_=async_,
**request_kwargs,
)) as get_next:
while True:
attr = yield get_next()
try:
attr["id"] = self._path_to_id[attr["path"]]
except KeyError:
continue
yield Yield(attr)
return run_gen_step_iter(gen_step, async_)
@overload
def readdir(
self,
id_or_path: IDOrPathType = "",
/,
pid: None | int = None,
refresh: None | bool = None,
*,
async_: Literal[False] = False,
**request_kwargs,
) -> list[MutableMapping]:
...
@overload
def readdir(
self,
id_or_path: IDOrPathType = "",
/,
pid: None | int = None,
refresh: None | bool = None,
*,
async_: Literal[True],
**request_kwargs,
) -> Coroutine[Any, Any, list[MutableMapping]]:
...
def readdir(
self,
id_or_path: IDOrPathType = "",
/,
pid: None | int = None,
refresh: None | bool = None,
*,
async_: Literal[False, True] = False,
**request_kwargs,
) -> list[MutableMapping] | Coroutine[Any, Any, list[MutableMapping]]:
readdir = super().readdir
def gen_step():
if not self.full_loaded:
yield self.load(locked=True, async_=async_, **request_kwargs)
return readdir(
id_or_path,
pid=pid,
refresh=refresh,
async_=async_,
**request_kwargs,
)
return run_gen_step(gen_step, async_)
@overload
def load(
self,
/,
locked: bool = True,
*,
async_: Literal[False] = False,
**request_kwargs,
) -> None:
...
@overload
def load(
self,
/,
locked: bool = False,
*,
async_: Literal[True],
**request_kwargs,
) -> Coroutine[Any, Any, None]:
...
[docs]
def load(
self,
/,
locked: bool = False,
*,
async_: Literal[False, True] = False,
**request_kwargs,
) -> None | Coroutine[Any, Any, None]:
"""一次性加载整个压缩包中的文件列表
"""
def gen_step():
if locked:
if async_:
async def request():
async with lock_as_async(self._load_lock):
if not self.full_loaded:
await self.load(async_=True, **request_kwargs)
yield request()
else:
with self._load_lock:
if not self.full_loaded:
self.load(**request_kwargs)
return
path_to_id = self._path_to_id
id_to_path = self._id_to_path
id_to_dirnode = self.id_to_dirnode
id_to_readdir = self.id_to_readdir
id_to_attr = self.id_to_attr
get_id = count(1).__next__
id_to_readdir[0] = {}
def get_parent_id(path: str, /) -> int:
dir_ = dirname(path)
if dir_ == "/":
return 0
try:
return path_to_id[dir_]
except KeyError:
pid = get_parent_id(dir_)
cid = path_to_id[dir_] = get_id()
id_to_path[cid] = dir_
name = basename(dir_)
id_to_readdir[pid][cid] = id_to_attr[cid] = AttrDict({
"id": cid,
"parent_id": pid,
"name": name,
"is_dir": True,
"path": dir_,
})
id_to_dirnode[cid] = (name, pid)
id_to_readdir[cid] = {}
return cid
with with_iter_next(extract_iter_files(
self.client,
self.pickcode,
async_=async_,
**request_kwargs,
)) as get_next:
while True:
attr: MutableMapping = AttrDict((yield get_next()))
attr["parent_id"] = pid = get_parent_id(attr["path"])
attr["id"] = id = get_id()
attr["is_dir"] = False
id_to_readdir[pid][id]= id_to_attr[id] = attr
path = attr["path"]
id_to_path[id] = path
path_to_id[attr["path"]] = id
self.full_loaded = True
return run_gen_step(gen_step, async_)
# TODO: 增加接口,可用于检查文件是否已经云解压
# resp = check_response(client.extract_push_progress(pickcode))
# if resp["data"]["extract_status"]["unzip_status"] != 4:
# raise OSError(errno.EIO, "file was not decompressed")
# TODO: 增加接口,可用于推送云解压,云解压进度,解压到文件夹进度等
# TODO: 参考 zipfile 模块的接口设计 namelist、filelist 等属性,以及其它的和 zipfile 兼容的接口
# TODO: 当文件特别多时,可以用 zipfile 等模块来读取文件列表