# Source code for p115client.tool.upload

#!/usr/bin/env python3
# encoding: utf-8

from __future__ import annotations

__author__ = "ChenyangGao <https://chenyanggao.github.io>"
__all__ = [
    "upload_host_image", "iter_115_to_115", "iter_115_to_115_resume", 
    "sha1_for_check_existence", "upload_for_check_existence", 
    "upload_init", "P115MultipartUpload", 
]
__doc__ = "这个模块提供了一些和上传有关的函数"

from collections.abc import (
    AsyncIterable, AsyncIterator, Awaitable, Buffer, Iterable, Callable, 
    Coroutine, Iterator, 
)
from inspect import isawaitable
from itertools import count, dropwhile
from os import fsdecode, PathLike
from typing import cast, overload, Any, Literal

from asynctools import to_list
from concurrenttools import threadpool_map, taskgroup_map, Return
from dicttools import dict_map
from filewrap import SupportsRead
from http_request import SupportsGeturl
from iterutils import (
    as_gen_step, foreach, run_gen_step, run_gen_step_iter, 
    with_iter_next, YieldFrom, 
)
from p115oss import (
    upload_file_init, oss_multipart_upload_init, oss_multipart_upload_complete, 
    oss_multipart_upload_url, oss_multipart_part_iter, oss_multipart_upload_part_iter, 
)
from p115pickcode import to_id
from yarl import URL

from ..client import check_response, P115Client, P115OpenClient
from ..type import P115URL
from ..tool import load_final_image
from .attr import normalize_attr_simple
from .download import iter_download_files
from .iterdir import iterdir, iter_files_with_path, unescape_115_charref


@overload
def upload_host_image(
    client: str | PathLike | P115Client,
    file: ( Buffer | str | PathLike | URL | SupportsGeturl |
            SupportsRead | Iterable[Buffer] ),
    base_url: bool | str = False,
    *,
    async_: Literal[False] = False,
    **request_kwargs,
) -> P115URL:
    ...
@overload
def upload_host_image(
    client: str | PathLike | P115Client,
    file: ( Buffer | str | PathLike | URL | SupportsGeturl |
            SupportsRead | Iterable[Buffer] | AsyncIterable[Buffer] ),
    base_url: bool | str = False,
    *,
    async_: Literal[True],
    **request_kwargs,
) -> Coroutine[Any, Any, P115URL]:
    ...
def upload_host_image(
    client: str | PathLike | P115Client,
    file: ( Buffer | str | PathLike | URL | SupportsGeturl |
            SupportsRead | Iterable[Buffer] | AsyncIterable[Buffer] ),
    base_url: bool | str = False,
    *,
    async_: Literal[False, True] = False,
    **request_kwargs,
) -> P115URL | Coroutine[Any, Any, P115URL]:
    """Upload an image so that it can be used for image hosting.

    .. caution::
        115 allows images of at most 50 MB.

    :param client: 115 client object; a ``str`` or ``PathLike`` is handed to
        ``P115Client(..., check_for_relogin=True)`` to build one
    :param file: the file to upload
    :param base_url: base address of the image

        - If False, upload to U_4_-1 and get a one-time image link, valid for 1 hour
        - If True, upload to U_4_-1 and get a permanent image link
        - If a str, upload to U_12_0 and treat it as a 302 proxy: user_id, id,
          pickcode, sha1 and size are appended to it as query parameters

    :param async_: whether to run asynchronously
    :param request_kwargs: remaining request arguments

    :return: the image link
    """
    if isinstance(client, (str, PathLike)):
        client = P115Client(client, check_for_relogin=True)
    def gen_step():
        if isinstance(base_url, bool):
            resp = yield client.upload_file_image(
                file, # type: ignore
                filename="x.jpg",
                async_=async_, # type: ignore
                **request_kwargs,
            )
            check_response(resp)
            data = {
                "oss": resp["data"]["sha1"],
                "sha1": resp["data"]["file_sha1"],
                "size": int(resp["data"]["file_size"]),
            }
            if base_url:
                resp = yield client.life_get_pic_url(
                    resp["data"]["sha1"],
                    async_=async_, # type: ignore
                    **request_kwargs,
                )
                check_response(resp)
                return P115URL(resp["data"][0]["json"].replace("&i=0", "&i=1"), data)
            url = resp["data"]["thumb_url"]
            # Drop the query string; `partition` also copes with a URL that
            # has no "?" at all (`index` would raise ValueError there).
            return P115URL(url.partition("?")[0], data)
        resp = yield client.upload_file_sample(
            file, # type: ignore
            filename="x.jpg",
            pid="U_12_0",
            async_=async_, # type: ignore
            **request_kwargs,
        )
        check_response(resp)
        data = resp["data"]
        # Continue an existing query string with "&", otherwise start one with "?".
        sep = "&" if "?" in base_url else "?"
        return P115URL(
            f"{base_url}{sep}user_id={client.user_id}&id={data['file_id']}"
            f"&pickcode={data['pick_code']}&sha1={data['sha1']}&size={data['file_size']}",
            resp["data"],
        )
    return run_gen_step(gen_step, async_)
# TODO: refactor to reduce the amount of code
# TODO: support the open API
# TODO: add a variant where the destination 115 provides no client, only user_id and user_key
@overload
def iter_115_to_115(
    from_client: P115Client,
    to_client: P115Client,
    from_cid: int | str = 0,
    to_pid: int | str = 0,
    max_workers: int = 8,
    with_root: bool = True,
    use_iter_files: bool = False,
    *,
    async_: Literal[False] = False,
    **request_kwargs,
) -> Iterator[dict]:
    ...
@overload
def iter_115_to_115(
    from_client: P115Client,
    to_client: P115Client,
    from_cid: int | str = 0,
    to_pid: int | str = 0,
    max_workers: int = 8,
    with_root: bool = True,
    use_iter_files: bool = False,
    *,
    async_: Literal[True],
    **request_kwargs,
) -> AsyncIterator[dict]:
    ...
def iter_115_to_115(
    from_client: P115Client,
    to_client: P115Client,
    from_cid: int | str = 0,
    to_pid: int | str = 0,
    max_workers: int = 8,
    with_root: bool = True,
    use_iter_files: bool = False,
    async_: Literal[False, True] = False,
    **request_kwargs,
) -> Iterator[dict] | AsyncIterator[dict]:
    """Transfer files from one 115 account to another.

    :param from_client: source 115 client object
    :param to_client: destination 115 client object
    :param from_cid: id or pickcode of the source directory
    :param to_pid: id or pickcode of the destination parent directory
    :param max_workers: maximum concurrency
    :param with_root: whether to keep the directory name of `from_cid`
        (if False, there is one directory level less)
    :param use_iter_files: if True, enumerate with iter_files_with_path,
        otherwise with iter_download_files
    :param async_: whether to run asynchronously
    :param request_kwargs: other request arguments

    :return: iterator producing transfer results, of 3 types: "good", "fail" and "skip"
    """
    from_cid = to_id(from_cid)
    to_pid = to_id(to_pid)
    @as_gen_step
    def upload(attr: dict, pid: int, /):
        # Callback handed to `upload_file_init`: fetches the byte range named
        # by `sign_check` from the source file.
        @as_gen_step
        def read_range_bytes_or_hash(sign_check: str, /):
            if attr["is_collect"]:
                url = yield from_client.download_url(
                    attr["pickcode"],
                    use_web_api=True,
                    async_=async_,
                    **request_kwargs,
                )
            else:
                url = yield from_client.download_url(
                    attr["pickcode"],
                    app="android",
                    async_=async_,
                    **request_kwargs,
                )
            return from_client.request(
                url,
                headers=dict(url["headers"], range="bytes="+sign_check),
                parse=False,
                async_=async_,
                **request_kwargs,
            )
        try:
            if not use_iter_files:
                # Refresh the attributes from `fs_supervision` before uploading.
                resp = yield from_client.fs_supervision(
                    attr["pickcode"],
                    async_=async_,
                    **request_kwargs,
                )
                check_response(resp)
                info = resp["data"]
                attr.update(
                    id=int(info["file_id"]),
                    name=info["file_name"],
                    sha1=info["file_sha1"],
                    size=int(info["file_size"]),
                    is_collect=int(info["is_collect"]),
                    file_type=int(info["file_type"]),
                )
                # Files flagged `is_collect` of at least 115 MiB are skipped.
                if attr["is_collect"] and attr["size"] >= 1024 * 1024 * 115:
                    return {"type": "skip", "attr": attr, "resp": None}
            resp = yield to_client.upload_file_init(
                filename=attr["name"],
                filesize=attr["size"],
                filesha1=attr["sha1"],
                pid=pid,
                read_range_bytes_or_hash=read_range_bytes_or_hash,
                base_url=True,
                async_=async_,
                **request_kwargs,
            )
            check_response(resp)
            if resp.get("statuscode"):
                return {"type": "fail", "attr": attr, "resp": resp}
            else:
                return {"type": "good", "attr": attr, "resp": resp}
        # Only ordinary errors become "fail" results; KeyboardInterrupt,
        # SystemExit and asyncio.CancelledError must keep propagating so the
        # transfer can actually be interrupted or cancelled.
        except Exception as e:
            if isinstance(e, OSError) and len(e.args) == 2 and isinstance(e.args[1], dict):
                return {"type": "fail", "attr": attr, "resp": e.args[1], "exc": e}
            else:
                return {"type": "fail", "attr": attr, "resp": None, "exc": e}
    key_of_id = "id" if with_root else "parent_id"
    @as_gen_step
    def get_pid(attr: dict, /):
        # Work out the relative directory of the file, then map it to a
        # directory id on the destination (creating it on first use).
        if use_iter_files:
            if attr["is_collect"] and attr["size"] >= 1024 * 1024 * 115:
                return Return({"type": "skip", "attr": attr, "resp": None})
            if from_cid:
                dir_ = "/".join(a["name"] for a in dropwhile(
                    lambda a: a[key_of_id] != from_cid,
                    attr["ancestors"][1:-1],
                ))
            else:
                dir_ = "/".join(a["name"] for a in attr["ancestors"][1:-1])
        else:
            if from_cid:
                dir_ = "/".join(a["name"] for a in dropwhile(
                    lambda a: a[key_of_id] != from_cid,
                    attr["dir_ancestors"][1:],
                ))
            else:
                dir_ = attr["dirname"][1:]
        if dir_ in dir_to_cid:
            return dir_to_cid[dir_]
        else:
            resp = yield to_client.fs_makedirs_app(
                dir_,
                to_pid,
                async_=async_,
                **request_kwargs,
            )
            check_response(resp)
            pid = dir_to_cid[dir_] = int(resp["cid"])
            return pid
    # The empty relative path is the destination parent itself, so files that
    # sit directly in the source root land in `to_pid` (not in directory 0).
    dir_to_cid: dict[str, int] = {"": to_pid}
    if use_iter_files:
        it = iter_files_with_path(
            from_client,
            from_cid,
            normalize_attr=normalize_attr_simple,
            async_=async_,
            **request_kwargs,
        )
    else:
        it = iter_download_files(
            from_client,
            from_cid,
            async_=async_,
            **request_kwargs,
        )
    if async_:
        return taskgroup_map(upload, it, arg_func=get_pid, max_workers=max_workers)
    else:
        return threadpool_map(upload, it, arg_func=get_pid, max_workers=max_workers)
# TODO: refactor to reduce the amount of code
# TODO: support finding all files in one go, and also processing them gradually (may trigger risk control)
# TODO: support the open API
@overload
def iter_115_to_115_resume(
    from_client: P115Client,
    to_client: P115Client,
    from_cid: int | str = 0,
    to_pid: int | str = 0,
    max_workers: int = 8,
    with_root: bool = True,
    *,
    async_: Literal[False] = False,
    **request_kwargs,
) -> Iterator[dict]:
    ...
@overload
def iter_115_to_115_resume(
    from_client: P115Client,
    to_client: P115Client,
    from_cid: int | str = 0,
    to_pid: int | str = 0,
    max_workers: int = 8,
    with_root: bool = True,
    *,
    async_: Literal[True],
    **request_kwargs,
) -> AsyncIterator[dict]:
    ...
def iter_115_to_115_resume(
    from_client: P115Client,
    to_client: P115Client,
    from_cid: int | str = 0,
    to_pid: int | str = 0,
    max_workers: int = 8,
    with_root: bool = True,
    *,
    async_: Literal[False, True] = False,
    **request_kwargs,
) -> Iterator[dict] | AsyncIterator[dict]:
    """Transfer files from one 115 account to another, skipping files that already exist.

    :param from_client: source 115 client object
    :param to_client: destination 115 client object
    :param from_cid: id or pickcode of the source directory (the number of files
        is best kept under 1 million; if there are more, transfer the
        sub-folders separately)
    :param to_pid: id or pickcode of the destination parent directory
    :param max_workers: maximum concurrency
    :param with_root: whether to keep the directory name of `from_cid`
        (if False, there is one directory level less)
    :param async_: whether to run asynchronously
    :param request_kwargs: other request arguments

    :return: iterator producing transfer results, of 3 types: "good", "fail" and "skip"
    """
    from_cid = to_id(from_cid)
    to_pid = to_id(to_pid)
    @as_gen_step
    def upload(attr: dict, pid: int, /):
        # Callback handed to `upload_file_init`: fetches the byte range named
        # by `sign_check` from the source file.
        @as_gen_step
        def read_range_bytes_or_hash(sign_check: str, /):
            if attr["is_collect"]:
                url = yield from_client.download_url(
                    attr["pickcode"],
                    use_web_api=True,
                    async_=async_,
                    **request_kwargs,
                )
            else:
                url = yield from_client.download_url(
                    attr["pickcode"],
                    app="android",
                    async_=async_,
                    **request_kwargs,
                )
            return from_client.request(
                url,
                headers=dict(url["headers"], range="bytes="+sign_check),
                parse=False,
                async_=async_,
                **request_kwargs,
            )
        try:
            resp = yield to_client.upload_file_init(
                filename=attr["name"],
                filesize=attr["size"],
                filesha1=attr["sha1"],
                pid=pid,
                read_range_bytes_or_hash=read_range_bytes_or_hash,
                base_url=True,
                async_=async_,
                **request_kwargs,
            )
            check_response(resp)
            if resp.get("statuscode"):
                return {"type": "fail", "attr": attr, "resp": resp}
            else:
                return {"type": "good", "attr": attr, "resp": resp}
        # Only ordinary errors become "fail" results; KeyboardInterrupt,
        # SystemExit and asyncio.CancelledError must keep propagating so the
        # transfer can actually be interrupted or cancelled.
        except Exception as e:
            if isinstance(e, OSError) and len(e.args) == 2 and isinstance(e.args[1], dict):
                return {"type": "fail", "attr": attr, "resp": e.args[1], "exc": e}
            else:
                return {"type": "fail", "attr": attr, "resp": None, "exc": e}
    # Maps a relative directory path (tuple of names) on the destination to
    # its directory id; the empty tuple is the destination root.
    dirt_to_cid: dict[tuple[str, ...], int] = {}
    key_of_id = "id" if with_root else "parent_id"
    @as_gen_step
    def get_pid(attr: dict, /):
        # Files flagged `is_collect` of at least 115 MiB are skipped.
        if attr["is_collect"] and attr["size"] >= 1024 * 1024 * 115:
            return Return({"type": "skip", "attr": attr, "resp": None})
        dirt = tuple(a["name"] for a in dropwhile(
            lambda a: a[key_of_id] != from_cid,
            attr["ancestors"][1:-1],
        ))
        try:
            return dirt_to_cid[dirt]
        except KeyError:
            # Walk down from the root, creating every missing level.
            pid = dirt_to_cid[()]
            for i, name in enumerate(dirt, 1):
                p_dirt = dirt[:i]
                if p_dirt in dirt_to_cid:
                    pid = dirt_to_cid[p_dirt]
                else:
                    resp = yield to_client.fs_mkdir(
                        name,
                        pid,
                        async_=async_,
                        **request_kwargs,
                    )
                    check_response(resp)
                    pid = dirt_to_cid[p_dirt] = int(resp["cid"])
            return pid
    def gen_step():
        from_files: Any = iter_files_with_path(
            from_client,
            from_cid,
            normalize_attr=normalize_attr_simple,
            with_ancestors=True,
            async_=async_,
            **request_kwargs,
        )
        if from_cid:
            resp = yield from_client.fs_file_skim(
                from_cid,
                async_=async_,
                **request_kwargs,
            )
            check_response(resp)
            name = unescape_115_charref(resp["data"][0]["file_name"])
            resp = yield to_client.fs_mkdir(
                name,
                to_pid,
                async_=async_,
                **request_kwargs,
            )
            # NOTE(review): errno 20004 presumably means "directory already
            # exists" - confirm against the 115 API.
            if resp.get("errno") == 20004:
                if "/" in name:
                    # Look the existing directory up by listing `to_pid`
                    # (presumably because `fs_makedirs_app` would treat "/"
                    # as a path separator - confirm).
                    with with_iter_next(iterdir(
                        to_client,
                        to_pid,
                        normalize_attr=normalize_attr_simple,
                        ensure_file=False,
                        async_=async_,
                        **request_kwargs,
                    )) as get_next:
                        while True:
                            attr = yield get_next()
                            if attr["name"] == name:
                                to_cid = attr["id"]
                                break
                else:
                    resp = yield to_client.fs_makedirs_app(
                        name,
                        to_pid,
                        async_=async_,
                        **request_kwargs,
                    )
                    check_response(resp)
                    to_cid = int(resp["cid"])
                dirt_to_cid[()] = to_cid
                # Filled in by `iter_files_with_path`: directory id -> (name, parent id).
                id_to_dirnode: dict[int, tuple[str, int]] = {}
                to_files: Any = iter_files_with_path(
                    to_client,
                    to_cid,
                    id_to_dirnode=id_to_dirnode,
                    normalize_attr=normalize_attr_simple,
                    with_ancestors=True,
                    async_=async_,
                    **request_kwargs,
                )
                # Enumerate both trees concurrently.
                if async_:
                    from_files, to_files = yield to_list(
                        taskgroup_map(to_list, (from_files, to_files), max_workers=2))
                else:
                    from_files, to_files = threadpool_map(list, (from_files, to_files), max_workers=2)
                # Remove the destination root and all of its ancestors, so that
                # the paths built below are relative to the destination root.
                while to_cid:
                    _, to_cid = id_to_dirnode.pop(to_cid)
                cid_to_dirt: dict[int, tuple[str, ...]] = {}
                def get_dirt(cid: int, /) -> tuple[str, ...]:
                    # Relative path of directory `cid`; also records it in
                    # `dirt_to_cid` so existing directories are reused.
                    if cid not in id_to_dirnode:
                        return ()
                    name, pid = id_to_dirnode[cid]
                    if pid in cid_to_dirt:
                        p_dirt = cid_to_dirt[pid]
                    else:
                        p_dirt = get_dirt(pid)
                    dirt = (*p_dirt, name)
                    dirt_to_cid[dirt] = cid
                    cid_to_dirt[cid] = dirt
                    return dirt
                for cid in id_to_dirnode:
                    if cid not in cid_to_dirt:
                        get_dirt(cid)
                del cid_to_dirt, id_to_dirnode
                to_cid = dirt_to_cid[()]
                # NOTE(review): both comparisons below key on "parent_id",
                # i.e. on paths relative to the two roots, regardless of
                # `with_root` - confirm this matches the placement done by
                # `get_pid` when `with_root` is True.
                seen = {
                    tuple(a["name"] for a in dropwhile(
                        lambda a: a["parent_id"] != to_cid,
                        attr["ancestors"][1:],
                    ))
                    for attr in to_files
                }
                from_files = [
                    attr for attr in from_files
                    if tuple(a["name"] for a in dropwhile(
                        lambda a: a["parent_id"] != from_cid,
                        attr["ancestors"][1:],
                    )) not in seen
                ]
                del to_files, seen
            else:
                check_response(resp)
                dirt_to_cid[()] = int(resp["cid"])
        else:
            # Copying from the source root: the destination root is `to_pid`
            # itself (not directory 0).
            dirt_to_cid[()] = to_pid
        if async_:
            return YieldFrom(taskgroup_map(
                upload,
                from_files,
                arg_func=get_pid,
                max_workers=max_workers,
            ))
        else:
            return YieldFrom(threadpool_map(
                upload,
                from_files,
                arg_func=get_pid,
                max_workers=max_workers,
            ))
    return run_gen_step_iter(gen_step, async_)
@overload
def sha1_for_check_existence(
    client: str | PathLike | P115Client,
    sha1: str,
    *,
    async_: Literal[False] = False,
    **request_kwargs,
) -> bool:
    ...
@overload
def sha1_for_check_existence(
    client: str | PathLike | P115Client,
    sha1: str,
    *,
    async_: Literal[True],
    **request_kwargs,
) -> Coroutine[Any, Any, bool]:
    ...
def sha1_for_check_existence(
    client: str | PathLike | P115Client,
    sha1: str,
    *,
    async_: Literal[False, True] = False,
    **request_kwargs,
) -> bool | Coroutine[Any, Any, bool]:
    """Tell whether a file (identified by its `sha1`) exists on 115 (not necessarily in your own drive).

    :param client: 115 client or cookies
    :param sha1: sha1 hash of the file
    :param async_: whether to run asynchronously
    :param request_kwargs: other request arguments

    :return: whether the file exists
    """
    if isinstance(client, (str, PathLike)):
        client = P115Client(client, check_for_relogin=True)

    def probe():
        # Ask for a picture URL for this sha1, then try to load it.
        pic_resp = yield client.note_get_pic_url(sha1, async_=async_, **request_kwargs)
        check_response(pic_resp)
        first_entry = pic_resp["data"][0]
        outcome = yield load_final_image(first_entry, async_=async_)
        # Any outcome other than 404 counts as "exists".
        return outcome != 404

    return run_gen_step(probe, async_)
@overload
def upload_for_check_existence(
    client: str | PathLike | P115Client | P115OpenClient,
    sha1: str,
    size: int,
    *,
    async_: Literal[False] = False,
    **request_kwargs,
) -> bool:
    ...
@overload
def upload_for_check_existence(
    client: str | PathLike | P115Client | P115OpenClient,
    sha1: str,
    size: int,
    *,
    async_: Literal[True],
    **request_kwargs,
) -> Coroutine[Any, Any, bool]:
    ...
def upload_for_check_existence(
    client: str | PathLike | P115Client | P115OpenClient,
    sha1: str,
    size: int,
    *,
    async_: Literal[False, True] = False,
    **request_kwargs,
) -> bool | Coroutine[Any, Any, bool]:
    """Use the instant-upload API to tell whether a file (identified by `sha1` and `size`) exists on 115 (not necessarily in your own drive).

    :param client: 115 client or cookies
    :param sha1: sha1 hash of the file
    :param size: size of the file
    :param async_: whether to run asynchronously
    :param request_kwargs: other request arguments

    :return: whether the file exists
    """
    if isinstance(client, (str, PathLike)):
        client = P115Client(client, check_for_relogin=True)

    def probe():
        # The two client kinds expose different endpoints with differently
        # named payload fields; pick the pair, then issue a single request.
        if isinstance(client, P115Client):
            init = client.upload_init
            payload = {"fileid": sha1.upper(), "filesize": size, "filename": "?"}
        else:
            init = client.upload_init_open
            payload = {"fileid": sha1.upper(), "file_size": size, "file_name": "?", "target": "U_1_0"}
        answer = yield init(payload, async_=async_, **request_kwargs)
        status = answer["data"]["status"]
        return status in (2, 7)

    return run_gen_step(probe, async_)
@overload
def upload_init(
    client: str | PathLike | P115Client | P115OpenClient,
    file: Buffer | str | PathLike | URL | SupportsGeturl | SupportsRead,
    pid: int | str = 0,
    filename: str = "",
    filesha1: str = "",
    filesize: int = -1,
    endpoint: str = "http://oss-cn-shenzhen.aliyuncs.com",
    *,
    async_: Literal[False] = False,
    **request_kwargs,
) -> dict:
    ...
@overload
def upload_init(
    client: str | PathLike | P115Client | P115OpenClient,
    file: Buffer | str | PathLike | URL | SupportsGeturl | SupportsRead,
    pid: int | str = 0,
    filename: str = "",
    filesha1: str = "",
    filesize: int = -1,
    endpoint: str = "http://oss-cn-shenzhen.aliyuncs.com",
    *,
    async_: Literal[True],
    **request_kwargs,
) -> Coroutine[Any, Any, dict]:
    ...
def upload_init(
    client: str | PathLike | P115Client | P115OpenClient,
    file: Buffer | str | PathLike | URL | SupportsGeturl | SupportsRead,
    pid: int | str = 0,
    filename: str = "",
    filesha1: str = "",
    filesize: int = -1,
    endpoint: str = "http://oss-cn-shenzhen.aliyuncs.com",
    *,
    async_: Literal[False, True] = False,
    **request_kwargs,
) -> dict | Coroutine[Any, Any, dict]:
    """Prepare an upload and obtain the necessary information; the instant upload may already succeed.

    :param client: 115 client or cookies
    :param file: the file to upload, or its path
    :param pid: id or pickcode of the directory to upload into
    :param filename: file name, determined automatically if empty
    :param filesha1: sha1 hash of the file, computed automatically if empty
    :param filesize: file size, computed automatically if negative
    :param endpoint: upload destination URL
    :param async_: whether to run asynchronously
    :param request_kwargs: other request arguments

    :return: the response; if it has a field "reuse" that is True, the instant upload succeeded
    """
    if isinstance(client, (str, PathLike)):
        client = P115Client(client, check_for_relogin=True)
    if not isinstance(client, P115Client):
        # Open-API client: authenticate through its `authorization` header,
        # layered on top of any caller-supplied headers.
        merged_headers = dict(
            request_kwargs.get("headers") or (),
            authorization=client.headers["authorization"],
        )
        request_kwargs["headers"] = merged_headers
        return upload_file_init(
            file=file,
            pid=to_id(pid),
            filename=filename,
            filesha1=filesha1,
            filesize=filesize,
            endpoint=endpoint,
            async_=async_,
            **request_kwargs,
        )
    # Web client: authenticate with the user id / user key pair.
    return upload_file_init(
        file=file,
        pid=to_id(pid),
        filename=filename,
        filesha1=filesha1,
        filesize=filesize,
        user_id=client.user_id,
        user_key=client.user_key,
        endpoint=endpoint,
        async_=async_,
        **request_kwargs,
    )
class P115MultipartUpload:
    """An object representing a pending multipart upload.

    :param url: HTTP request URL, containing the names of the bucket and the object
    :param path: path of the file to upload
    :param callback: callback data
    :param upload_id: id of the upload task; if empty, one is requested
        through ``oss_multipart_upload_init(url)``

    Below is an upload example that shows a progress bar on the command line

    .. code::

        from pathlib import Path
        from p115client import P115Client
        from p115client.tool import P115MultipartUpload

        client = P115Client(Path("~/115-cookies.txt").expanduser())
        # NOTE: path of the file to upload (a URL is supported as well)
        path = "/path/to/file"
        uploader = P115MultipartUpload.from_path(path, user_id=client.user_id, user_key=client.user_key)
        # NOTE: a returned dict means the instant upload succeeded
        if isinstance(uploader, dict):
            print(uploader)
        else:
            from os.path import getsize
            # NOTE: any other progress-bar module, or your own function, can be used
            from tqdm import tqdm

            # NOTE: you have to get the total file size yourself, `reporthook` only pushes increments
            with tqdm(total=getsize(path), unit="B", unit_scale=True, desc="Uploading") as t:
                # NOTE: `iter_upload` supports other request modules, e.g. urllib3
                # from urllib3_request import request
                # uploader.iter_upload(request=request)
                for _ in uploader.iter_upload(reporthook=t.update):
                    pass
            print(uploader.complete())

    You can also write a progress bar yourself

    .. code::

        from collections import deque
        from time import perf_counter

        def make_reporthook(total: None | int = None):
            dq: deque[tuple[int, float]] = deque(maxlen=64)
            push = dq.append
            read_num = 0
            push((read_num, perf_counter()))
            while True:
                read_num += yield
                cur_t = perf_counter()
                speed = (read_num - dq[0][0]) / 1024 / 1024 / (cur_t - dq[0][1])
                if total:
                    percentage = read_num / total * 100
                    print(f"\\r\\x1b[K{read_num} / {total} | {speed:.2f} MB/s | {percentage:.2f} %", end="", flush=True)
                else:
                    print(f"\\r\\x1b[K{read_num} | {speed:.2f} MB/s", end="", flush=True)
                push((read_num, cur_t))

    Then use it like this (the generator must be primed with ``next`` before
    its ``send`` method can accept values)

    .. code::

        hook = make_reporthook(getsize(path))
        next(hook)
        for _ in uploader.iter_upload(reporthook=hook.send):
            pass
    """
    __slots__ = ("url", "path", "callback", "upload_id", "_result")

    def __init__(
        self,
        /,
        url: str,
        path: str | PathLike | URL | SupportsGeturl,
        callback: dict,
        upload_id: str = "",
    ):
        self.url = url
        # Normalize every supported path flavour to a plain string.
        if isinstance(path, PathLike):
            path = fsdecode(path)
        elif isinstance(path, URL):
            path = str(path)
        elif isinstance(path, SupportsGeturl):
            path = path.geturl()
        self.path = cast(str, path)
        self.callback = callback
        if not upload_id:
            upload_id = oss_multipart_upload_init(url)
        self.upload_id = upload_id
        # Response of `complete()`; None until the upload has been completed.
        self._result = None

    def __repr__(self, /) -> str:
        cls = type(self)
        mod = cls.__module__
        name = cls.__qualname__
        url = self.url
        path = self.path
        callback = self.callback
        upload_id = self.upload_id
        return f"{mod}.{name}({url=!r}, {path=!r}, {callback=!r}, {upload_id=!r})"

    @overload
    @classmethod
    def from_path(
        cls,
        /,
        path: str | PathLike | URL | SupportsGeturl,
        pid: int | str = 0,
        filename: str = "",
        filesha1: str = "",
        filesize: int = -1,
        user_id: int | str = "",
        user_key: str = "",
        endpoint: str = "http://oss-cn-shenzhen.aliyuncs.com",
        *,
        async_: Literal[False] = False,
        **request_kwargs,
    ) -> dict | P115MultipartUpload:
        ...
    @overload
    @classmethod
    def from_path(
        cls,
        /,
        path: str | PathLike | URL | SupportsGeturl,
        pid: int | str = 0,
        filename: str = "",
        filesha1: str = "",
        filesize: int = -1,
        user_id: int | str = "",
        user_key: str = "",
        endpoint: str = "http://oss-cn-shenzhen.aliyuncs.com",
        *,
        async_: Literal[True],
        **request_kwargs,
    ) -> Coroutine[Any, Any, dict | P115MultipartUpload]:
        ...
    @classmethod
    def from_path(
        cls,
        /,
        path: str | PathLike | URL | SupportsGeturl,
        pid: int | str = 0,
        filename: str = "",
        filesha1: str = "",
        filesize: int = -1,
        user_id: int | str = "",
        user_key: str = "",
        endpoint: str = "http://oss-cn-shenzhen.aliyuncs.com",
        *,
        async_: Literal[False, True] = False,
        **request_kwargs,
    ) -> dict | P115MultipartUpload | Coroutine[Any, Any, dict | P115MultipartUpload]:
        """Prepare an upload and obtain the necessary information; the instant upload may already succeed.

        :param path: path of the file to upload
        :param pid: id or pickcode of the directory to upload into
        :param filename: file name, determined automatically if empty
        :param filesha1: sha1 hash of the file, computed automatically if empty
        :param filesize: file size, computed automatically if negative
        :param user_id: user id
        :param user_key: key of the user
        :param endpoint: upload destination URL
        :param async_: whether to run asynchronously
        :param request_kwargs: other request arguments; when `user_id` and
            `user_key` are not both given, it must contain ``headers`` with
            either an ``authorization`` or a ``cookie`` entry

        :return: the response of a successful instant upload, or a pending multipart upload object

        :raises TypeError: neither credentials nor ``headers`` were given
        :raises ValueError: ``headers`` has neither ``authorization`` nor ``cookie``
        """
        def gen_step():
            if user_id and user_key:
                resp = yield upload_file_init(
                    file=path,
                    pid=to_id(pid),
                    filename=filename,
                    filesha1=filesha1,
                    filesize=filesize,
                    user_id=user_id,
                    user_key=user_key,
                    endpoint=endpoint,
                    async_=async_,
                    **request_kwargs,
                )
            else:
                try:
                    headers = request_kwargs["headers"]
                except KeyError as e:
                    raise TypeError(f"{cls.from_path!r} missing 1 required keyword-only argument: 'headers'") from e
                # Lower-case the header names so the lookups below are case-insensitive.
                headers = request_kwargs["headers"] = dict_map(headers or (), key=str.lower)
                if "authorization" in headers:
                    resp = yield upload_file_init(
                        file=path,
                        pid=to_id(pid),
                        filename=filename,
                        filesha1=filesha1,
                        filesize=filesize,
                        endpoint=endpoint,
                        async_=async_,
                        **request_kwargs,
                    )
                elif "cookie" in headers:
                    # Build a client from the cookie only to obtain user_id / user_key.
                    client = P115Client(headers["cookie"])
                    resp = yield upload_file_init(
                        file=path,
                        pid=to_id(pid),
                        filename=filename,
                        filesha1=filesha1,
                        filesize=filesize,
                        user_id=client.user_id,
                        user_key=client.user_key,
                        endpoint=endpoint,
                        async_=async_,
                        **request_kwargs,
                    )
                else:
                    raise ValueError("please provide the request header: 'authorization' or 'cookie'")
            check_response(resp)
            if resp["reuse"]:
                return resp
            data = resp["data"]
            url = data["url"]
            upload_id = yield oss_multipart_upload_init(
                url,
                async_=async_,
                **request_kwargs,
            )
            return cls(url, path, data["callback"], upload_id)
        return run_gen_step(gen_step, async_)

    @property
    def completed(self, /) -> bool:
        "Whether the upload has been completed."
        return bool(self._result)

    @property
    def result(self, /) -> None | dict:
        "The result after completion."
        return self._result

    @property
    def succeeded(self, /) -> bool:
        "Whether the result reports success."
        return bool(self._result and self._result["state"])

    @overload
    def complete(
        self,
        /,
        parts: None | list[dict] = None,
        *,
        async_: Literal[False] = False,
        **request_kwargs,
    ) -> dict:
        ...
    @overload
    def complete(
        self,
        /,
        parts: None | list[dict] = None,
        *,
        async_: Literal[True],
        **request_kwargs,
    ) -> Coroutine[Any, Any, dict]:
        ...
    def complete(
        self,
        /,
        parts: None | list[dict] = None,
        *,
        async_: Literal[False, True] = False,
        **request_kwargs,
    ) -> dict | Coroutine[Any, Any, dict]:
        """Complete the multipart upload.

        :param parts: list of part information; if None, it is fetched with `list_parts`
        :param async_: whether to run asynchronously
        :param request_kwargs: other request arguments

        :return: the API response

        :raises RuntimeError: the upload has already been completed
        """
        if self.completed:
            raise RuntimeError("already completed")
        def gen_step():
            nonlocal parts
            if parts is None:
                parts = yield self.list_parts(async_=async_, **request_kwargs)
            resp = self._result = yield oss_multipart_upload_complete(
                self.url,
                upload_id=self.upload_id,
                callback=self.callback,
                parts=parts,
                async_=async_,
                **request_kwargs,
            )
            return resp
        return run_gen_step(gen_step, async_)

    @overload
    def list_parts(
        self,
        /,
        async_: Literal[False] = False,
        **request_kwargs,
    ) -> list[dict]:
        ...
    @overload
    def list_parts(
        self,
        /,
        async_: Literal[True],
        **request_kwargs,
    ) -> Coroutine[Any, Any, list[dict]]:
        ...
    def list_parts(
        self,
        /,
        async_: Literal[False, True] = False,
        **request_kwargs,
    ) -> list[dict] | Coroutine[Any, Any, list[dict]]:
        """List the information of the parts uploaded so far.

        :param async_: whether to run asynchronously
        :param request_kwargs: other request arguments

        :return: list of part information
        """
        def gen_step():
            parts: list[dict] = []
            yield foreach(
                parts.append,
                oss_multipart_part_iter(
                    self.url,
                    self.upload_id,
                    async_=async_,
                    **request_kwargs,
                ),
            )
            return parts
        return run_gen_step(gen_step, async_)

    @overload
    def upload_url(
        self,
        /,
        part_number: int = 1,
        *,
        async_: Literal[False] = False,
        **request_kwargs,
    ) -> dict:
        ...
    @overload
    def upload_url(
        self,
        /,
        part_number: int = 1,
        *,
        async_: Literal[True],
        **request_kwargs,
    ) -> Coroutine[Any, Any, dict]:
        ...
    def upload_url(
        self,
        /,
        part_number: int = 1,
        *,
        async_: Literal[False, True] = False,
        **request_kwargs,
    ) -> dict | Coroutine[Any, Any, dict]:
        """Get the URL and request headers for uploading one part.

        .. caution::
            This only obtains the upload URL and request headers, it does not perform the actual upload.

        :param part_number: part number (starting from 1)
        :param async_: whether to run asynchronously
        :param request_kwargs: other request arguments

        :return: the dict returned by ``oss_multipart_upload_url``, with the
            upload token added under the key "token"
        """
        def gen_step():
            from p115oss.api import _upload_token
            token = yield _upload_token(async_=async_)
            result = yield oss_multipart_upload_url(
                self.url,
                upload_id=self.upload_id,
                part_number=part_number,
                async_=async_, # type: ignore
                **request_kwargs,
            )
            result["token"] = token
            return result
        return run_gen_step(gen_step, async_)

    @overload
    def iter_upload(
        self,
        /,
        partsize: int = 10485760,
        reporthook: None | Callable[[int], Any] = None,
        opener: None | Callable[[str, int], SupportsRead | Iterable[Buffer]] = None,
        *,
        async_: Literal[False] = False,
        **request_kwargs,
    ) -> Iterator[dict]:
        ...
    @overload
    def iter_upload(
        self,
        /,
        partsize: int = 10485760,
        reporthook: None | Callable[[int], Any] = None,
        opener: None | Callable[[str, int], SupportsRead | Awaitable[SupportsRead] | Iterable[Buffer] | AsyncIterable[Buffer]] = None,
        *,
        async_: Literal[True],
        **request_kwargs,
    ) -> AsyncIterator[dict]:
        ...
    def iter_upload(
        self,
        /,
        partsize: int = 10485760,
        reporthook: None | Callable[[int], Any] = None,
        opener: (
            None |
            Callable[[str, int], SupportsRead | Iterable[Buffer]] |
            Callable[[str, int], SupportsRead | Awaitable[SupportsRead] | Iterable[Buffer] | AsyncIterable[Buffer]]
        ) = None,
        *,
        async_: Literal[False, True] = False,
        **request_kwargs,
    ) -> Iterator[dict] | AsyncIterator[dict]:
        """Upload the parts one by one.

        .. attention::
            Nothing is committed once the parts are uploaded, call ``.complete()`` manually.

        .. note::
            - Any request module can be used: pass the ``request`` argument
            - Any progress bar can be used: pass the ``reporthook`` argument
            - Any file opener can be used: pass the ``opener`` argument

        .. note::
            To outsource the upload to any other tool, call ``upload_url(part_number)``
            to get the upload URL and signed request headers of a given part,

            or call ``iter_upload_url(part_number_start)`` to create an iterator that,
            starting from some part number, produces a series of upload URLs and signed request headers.

        :param partsize: part size
        :param reporthook: callback, usable for counting the uploaded amount of data
            or for showing a progress bar; it is first called with the size already uploaded
        :param opener: opens the file path (local path or URL) starting at the given
            position; if None, a default way is used
        :param async_: whether to run asynchronously
        :param request_kwargs: other request arguments

        :return: iterator producing the information of each part just uploaded
        """
        def gen_step():
            # Resume after whatever has been uploaded already.
            parts = yield self.list_parts(async_=async_, **request_kwargs)
            skipsize = sum(p["Size"] for p in parts)
            if reporthook is not None:
                reporthook(skipsize)
            path = self.path
            if opener is not None:
                file = opener(path, skipsize)
                if async_ and isawaitable(file):
                    file = yield file
            elif path.startswith(("http://", "https://")):
                if async_:
                    from httpfile import AsyncHTTPFileReader
                    async def process():
                        return await AsyncHTTPFileReader.new(path, start=skipsize)
                    file = yield process()
                else:
                    from httpfile import HTTPFileReader
                    file = HTTPFileReader(path, start=skipsize)
            else:
                file = open(path, "rb")
                if skipsize:
                    # Do not leak the handle if seeking fails, the `finally`
                    # below is not in effect yet.
                    try:
                        file.seek(skipsize)
                    except BaseException:
                        file.close()
                        raise
            file = cast(SupportsRead | Iterable[Buffer] | AsyncIterable[Buffer], file)
            try:
                yield YieldFrom(oss_multipart_upload_part_iter(
                    url=self.url,
                    file=file, # type: ignore
                    upload_id=self.upload_id,
                    partsize=partsize,
                    part_number_start=len(parts)+1,
                    reporthook=reporthook,
                    async_=async_, # type: ignore
                    **request_kwargs,
                ))
            finally:
                # Close the source, whichever closing protocol it offers.
                if async_:
                    if hasattr(file, "aclose"):
                        yield file.aclose()
                    elif hasattr(file, "close"):
                        ret = file.close()
                        if isawaitable(ret):
                            yield ret
                elif hasattr(file, "close"):
                    file.close()
        return run_gen_step_iter(gen_step, async_)

    def iter_upload_url(
        self,
        /,
        part_number_start: int = 1,
        headers: None | dict[str, str] = None,
    ) -> Iterator[dict]:
        """Get upload URLs and request headers one by one.

        .. caution::
            This only obtains upload URLs and request headers, it does not perform the actual
            upload; it does not determine how many parts there are either, but generates endlessly.

        :param part_number_start: the part number to start from, counting from 1
            (values <= 0 are treated as 1)
        :param headers: default request headers, which will be extended

        :return: iterator producing upload URLs and (signed) request headers
        """
        if part_number_start <= 0:
            part_number_start = 1
        get_url = self.upload_url
        for part_number in count(part_number_start):
            yield get_url(part_number, headers=headers)
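# TODO: add a helper for transferring (uploading) a local directory to a directory on the drive, allowing a custom progress-bar callback
# TODO: add a helper for transferring between two 115 drives, allowing a custom progress-bar callback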