#!/usr/bin/env python3
# encoding: utf-8
# Names exported by a star-import of this module.
# NOTE(review): "share_extract_payload" has no definition in the part of the
# file visible here -- confirm it is defined elsewhere in the module.
# "is_valid_pickcode" is re-exported from the `p115pickcode` import below.
__all__ = [
    "complete_url", "reduce_image_url_layers", "max_image_quality",
    "load_final_image", "share_extract_payload", "unescape_115_charref",
    "determine_part_size", "to_cdn_url", "is_valid_id", "is_valid_sha1",
    "is_valid_name", "is_valid_pickcode", "posix_escape_name", "lock_as_async",
    "call_with_lock",
]
# Module docstring (runtime string, kept verbatim). English: "This module
# provides some utility functions and does not depend on the implementation
# in p115client.client".
__doc__ = "这个模块提供了一些工具函数,且不依赖于 p115client.client 中的实现"
from asyncio import sleep as async_sleep
from collections.abc import Callable, Coroutine, Mapping, Sequence
from contextlib import asynccontextmanager, AbstractAsyncContextManager
from http import HTTPStatus
from inspect import isawaitable, iscoroutinefunction
from re import compile as re_compile
from string import ascii_uppercase, digits, hexdigits
from typing import (
cast, overload, Any, AsyncContextManager, ContextManager, Final,
Literal, NotRequired, TypedDict,
)
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit
from iterutils import run_gen_step
from p115pickcode import is_valid_pickcode
from urllib3_future_request import request
from yarl import URL
# Table for ``str.translate`` that percent-escapes "?" and "#" (-> "%3F" and
# "%23") so a path appended to a URL cannot start a query string or fragment.
URL_PATH_TRANSTAB: Final = {b: f"%{b:X}" for b in b"?#"}
# Substitution helper for character references of the form
# "[" + "\x02" + <decimal digits> + "]"; group 1 captures the digits.
CRE_115_CHARREF_sub: Final = re_compile("\\[\x02([0-9]+)\\]").sub
# Searches a string for "<share_code>" followed by "-", "?password=" or "?"
# and a 4-character "<receive_code>"; the match must begin at the start of the
# string or right after a "/".
CRE_SHARE_LINK_search: Final = re_compile(r"(?:^|(?<=/))(?P<share_code>[a-z0-9]+)(?:-|\?password=|\?)(?P<receive_code>[a-z0-9]{4})(?!==)\b").search
# Matches a string ending in "/err/<digits>.jpg"; group 1 captures the digits.
# The dot is escaped so that only a literal ".jpg" suffix is accepted.
CRE_ERR_JPG_search: Final = re_compile(r"/err/([0-9]+)\.jpg$").search
class SharePayload(TypedDict):
share_code: str
receive_code: NotRequired[None | str]
[docs]
def complete_url(
path: str | Callable[[], str] = "",
/,
base_url: str | Callable[[], str] = "",
app: str | Callable[[], str] = "",
domain: str | Callable[[], str] = "",
as_query: bool = False,
query: Mapping[str, Any] | Sequence[tuple[str, Any]] = (),
) -> str:
"""完整 HTTP Web 接口 URL
:param path: 请求路径
:param base_url: 请求基地址,例如 `https://webapi.115.com`
:param app: 使用此设备 app 的接口
:param domain: 域,拼接位置根据 `base_url` 和 `as_query` 确定
- 如果 `base_url` 为空,那么 `base_url` 会被处理为 `http://{domain}.115.com`
- 如果 `as_query` 为 False,那么拼接到 `base_url` 之后
- 如果 `as_query` 为 True,那么拼接到 `query` 之中
:param as_query: 是否把 `path` 参数视为查询参数
- 如果为 False,则拼接到 `base_url` 之后
- 如果为 True,则拼接到 `query` 之中
:param query: 其它查询参数
:return: 接口 URL
.. note::
大概有以下几种接口 URL
- `https://115.com{path}?{query}`
- `https://{domain}.115.com{path}?{query}`
- `https://115cdn.com/{domain}{path}?{query}`
- `https://115vod.com/{domain}{path}?{query}`
- `https://f.115.com/api/proxy/115?domain={domain}&path={path}&{query}`
- `https://n.115.com/api/proxy/115?domain={domain}&path={path}&{query}`
其中 `https://f.115.com` 和 `https://n.115.com` 可以拼接查询参数 `domain`,默认值是 "webapi"
"""
if callable(path):
path = path()
if callable(base_url):
base_url = base_url()
if callable(app):
app = app()
if callable(domain):
domain = domain()
if path and not path.startswith("/"):
path = "/" + path
if base_url:
if base_url.endswith(("://115cdn.com", "://115vod.com")):
as_query = False
if not domain:
domain = "site"
else:
as_query = False
if app or path.startswith("/open/"):
base_url = "https://proapi.115.com"
elif domain:
if domain in ("web.api", "pro.api"):
base_url = f"http://{domain}.115.com"
else:
base_url = f"https://{domain}.115.com"
else:
base_url = "https://webapi.115.com"
if app in ("windows", "mac", "linux"):
app = "os_" + app
if app and not path.startswith("/open/"):
if app in (
"ios", "115ios", "ipad", "115ipad",
"android", "115android", "harmony",
"wechatmini", "alipaymini", "tv", "apple_tv",
"os_windows", "os_mac", "os_linux",
):
pass
elif app.endswith("ios"):
app = "ios"
elif app.endswith("ipad"):
app = "ipad"
elif app.endswith("android"):
app = "android"
else:
app = "android"
path = "/" + app + path
url = base_url
if as_query:
query = dict(query)
if domain:
query["domain"] = domain
if path:
query["path"] = path
else:
if domain:
url += "/" + domain
if path:
url += path.translate(URL_PATH_TRANSTAB)
if query_string := urlencode(query):
sep = "&" if "?" in url else "?"
url += sep + query_string
return url
[docs]
def reduce_image_url_layers(
url: str,
/,
size: str | int = "",
) -> str:
"""从图片的缩略图链接中提取信息,以减少一次 302 访问
:param url: 图片缩略图链接
:param size: 图片规格大小,如果为 0,则是原图大小
:return: 提取后的图片缩略图链接
"""
urlp = urlsplit(url)
if urlp.hostname not in ("thumb.115.com", "thumbapi.115.com"):
return url
sha1, _, size0 = urlp.path.rsplit("/")[-1].partition("_")
if size == "":
size = size0 or "0"
return f"https://imgjump.115.com/?sha1={sha1}&size={size}&{urlp.query}"
[docs]
def max_image_quality(url: str, /) -> str:
    """Adjust an image URL so that it requests the highest quality.

    The first matching rule is applied:

    - a ``x-oss-process`` query parameter is removed
    - on ``imgjump.115.com`` the ``size`` parameter is set to "0"
    - on ``thumb.115.com`` / ``thumbapi.115.com`` the URL is rewritten to
      ``imgjump.115.com`` with ``sha1`` taken from the last path segment and
      ``size`` set to "0"
    - for a path ending in "/imgload" the parameter ``i`` is set to "1"
    - otherwise the URL is returned unchanged

    :param url: image (thumbnail) URL

    :return: the adjusted URL
    """
    parts = urlsplit(url)
    params = dict(parse_qsl(parts.query))
    host = parts.hostname
    if "x-oss-process" in params:
        params.pop("x-oss-process")
    elif host == "imgjump.115.com":
        params["size"] = "0"
    elif host in ("thumb.115.com", "thumbapi.115.com"):
        filename = parts.path.rpartition("/")[2]
        params["sha1"] = filename.partition("_")[0]
        params["size"] = "0"
        return "https://imgjump.115.com/?" + urlencode(params)
    elif parts.path.endswith("/imgload"):
        params["i"] = "1"
    else:
        return url
    rebuilt = parts._replace(query=urlencode(params))
    return urlunsplit(rebuilt)
# Synchronous form: the result is returned directly.
@overload
def load_final_image(
    url: str,
    async_: Literal[False] = False,
    request = request,
) -> HTTPStatus | str: ...
# Asynchronous form: a coroutine producing the same result is returned.
@overload
def load_final_image(
    url: str,
    async_: Literal[True],
    request = request,
) -> Coroutine[Any, Any, HTTPStatus | str]: ...
[docs]
def load_final_image(
    url: str,
    async_: Literal[False, True] = False,
    request = request,
) -> HTTPStatus | str | Coroutine[Any, Any, HTTPStatus | str]:
    """Follow 3XX redirects one at a time to obtain the final image URL.

    :param url: image URL
    :param async_: whether to run asynchronously (a coroutine is returned)
    :param request: callable that performs the HTTP request; it is invoked as
        ``request(url, "HEAD", follow_redirects=False, async_=async_)`` and
        its response must expose ``headers["location"]``

    :return: the final image URL; if a redirect target (or an
        ``x-oss-process`` URL path) ends in ``/err/<code>.jpg``, the
        ``HTTPStatus`` for ``<code>`` is returned instead
    """
    def gen_step():
        nonlocal url
        while True:
            urlp = urlsplit(url)
            query = dict(parse_qsl(urlp.query))
            if urlp.path.endswith("/imgload") or query.get("ct") == "imgload":
                # Take one redirect hop manually and inspect its target.
                resp = yield request(url, "HEAD", follow_redirects=False, async_=async_)
                url = resp.headers["location"]
                if m := CRE_ERR_JPG_search(url):
                    return HTTPStatus(int(m[1]))
                # Skip the thumb -> imgjump hop when the target is a thumb URL.
                url = reduce_image_url_layers(url)
            elif "x-oss-process" in query:
                if m := CRE_ERR_JPG_search(urlp.path):
                    return HTTPStatus(int(m[1]))
                # Dropping the processing parameter yields the final URL.
                del query["x-oss-process"]
                return urlunsplit(urlp._replace(query=urlencode(query)))
            elif urlp.hostname in ("thumb.115.com", "thumbapi.115.com"):
                # Last path segment is "<sha1>_<size>"; rewrite to imgjump.
                query["sha1"], _, query["size"] = urlp.path.rsplit("/")[-1].partition("_")
                url = "https://imgjump.115.com/?" + urlencode(query)
            elif urlp.hostname == "imgjump.115.com":
                resp = yield request(url, "HEAD", follow_redirects=False, async_=async_)
                url = resp.headers["location"]
                if m := CRE_ERR_JPG_search(url):
                    return HTTPStatus(int(m[1]))
            else:
                # No known redirecting form is left: this is the final URL.
                return url
    return run_gen_step(gen_step, async_)
[docs]
def unescape_115_charref(s: str, /) -> str:
    r"""Decode 115-style character references in a string.

    Each reference has the form ``[\x02<digits>]`` and is replaced by the
    character whose code point is the decimal number ``<digits>``.

    :example:

        .. code:: python

            unescape_115_charref("[\x02128074]0号:优质资源") == "👊0号:优质资源"
    """
    def to_char(match) -> str:
        # Group 1 holds the decimal code point.
        return chr(int(match[1]))
    return CRE_115_CHARREF_sub(to_char, s)
[docs]
def determine_part_size(
    size: int,
    min_part_size: int = 1024 * 1024 * 10,
    max_part_count: int = 10 ** 4,
) -> int:
    """Determine the part size for a multipart upload.

    Starting from ``min_part_size``, the part size is doubled until the data
    fits into at most ``max_part_count`` parts.

    :param size: size of the data
    :param min_part_size: the part size the caller would like to use
    :param max_part_count: maximum number of parts

    :return: the part size; ``size`` itself when ``size <= min_part_size``

    :raises ValueError: if the data has to be split and ``min_part_size`` or
        ``max_part_count`` is not positive
    """
    if size <= min_part_size:
        return size
    # Validated only once splitting is actually required, so the early return
    # above keeps its behaviour. A non-positive ``min_part_size`` could never
    # grow by doubling (the loop below would not terminate).
    if min_part_size <= 0:
        raise ValueError(f"min_part_size must be positive, got {min_part_size!r}")
    if max_part_count <= 0:
        raise ValueError(f"max_part_count must be positive, got {max_part_count!r}")
    # Ceiling division: smallest part size that keeps the count within limit.
    n = -(-size // max_part_count)
    part_size = min_part_size
    while part_size < n:
        part_size <<= 1
    return part_size
[docs]
def to_cdn_url(
    url: str,
    /,
    host: str = "115cdn.com",
) -> str:
    """Try to convert ``url`` into a link under a CDN host.

    ``https://115.com/...`` becomes ``https://{host}/site/...`` and
    ``https://{prefix}.115.com/...`` becomes ``https://{host}/{prefix}/...``.
    The URL is returned unchanged when it cannot be converted: no host, a host
    that is not a direct sub-domain of ``115.com``, or the ``proapi``
    sub-domain.

    :param url: the link to convert
    :param host: target host, e.g. "115cdn.com"; a scheme prefix such as
        "https://115vod.com" is accepted, in which case that scheme is applied
        to the result as well

    :return: the converted link
    """
    # ``with_host`` takes a bare host name, so split off an optional scheme.
    scheme, sep, bare_host = host.partition("://")
    if not sep:
        scheme, bare_host = "", host
    bare_host = bare_host.rstrip("/")
    urlp = URL(url)
    original_host = urlp.host
    if original_host == "115.com":
        prefix = "site"
    elif (
        not original_host
        or not original_host.endswith(".115.com")
        or len(original_host.split(".", 3)) > 3
    ):
        return url
    else:
        prefix = original_host.partition(".")[0]
        if not prefix or prefix == "proapi":
            return url
    # NOTE(review): yarl's ``with_path`` is documented to drop the query and
    # fragment by default -- confirm that discarding them here is intended.
    converted = urlp.with_host(bare_host).with_path("/" + prefix + urlp.path)
    if scheme:
        converted = converted.with_scheme(scheme)
    return str(converted)
def is_valid_id(id: int | str, /) -> bool:
if isinstance(id, int):
return id >= 0
if id == "0":
return True
return len(id) > 0 and not (id.startswith("0") or id.strip(digits))
def is_valid_sha1(sha1, /) -> bool:
    """Check whether ``sha1`` looks like a SHA-1 digest.

    Two spellings are accepted: 40 hexadecimal characters, or 32 characters
    from the base32 alphabet (A-Z and 2-7, case-insensitive). Anything that is
    not a ``str`` is rejected.
    """
    # Reject non-ASCII text up front: ``str.upper`` maps some non-ASCII
    # characters (e.g. "ſ" -> "S") into the base32 alphabet, which would let
    # them pass the 32-character check below.
    if not isinstance(sha1, str) or not sha1.isascii():
        return False
    if len(sha1) == 32:
        return not sha1.upper().lstrip(ascii_uppercase + "234567")
    return len(sha1) == 40 and not sha1.lstrip(hexdigits)
def is_valid_name(name: str, /) -> bool:
    """Check that ``name`` contains neither ">" nor "/"."""
    return ">" not in name and "/" not in name
[docs]
def posix_escape_name(name: str, /, repl: str = "|") -> str:
    """Replace every "/" in a file name with another string.

    .. note::
        The default replacement "|" follows `alist`.
        More generally, ":" could be used, following MacOSX.

    :param name: file name
    :param repl: the replacement for "/"

    :return: the name after replacement
    """
    # Splitting on "/" and re-joining with ``repl`` swaps each separator.
    pieces = name.split("/")
    return repl.join(pieces)
@asynccontextmanager
async def lock_as_async(lock, check_interval: float = 0.001):
    """Use a lock with a non-blocking ``acquire`` as an async context manager.

    The lock is polled with ``lock.acquire(False)``; between attempts the
    coroutine sleeps, so other tasks on the event loop keep running. The lock
    is released when the ``async with`` block exits.

    :param lock: object providing ``acquire(blocking)`` and ``release()``
    :param check_interval: seconds to wait between attempts; a value ``<= 0``
        retries as soon as possible while still yielding to the event loop
    """
    acquire = lock.acquire
    # Always await between attempts: spinning without yielding would block
    # the event loop, and deadlock if the holder is a task on the same loop.
    delay = check_interval if check_interval > 0 else 0
    while not acquire(False):
        await async_sleep(delay)
    try:
        yield
    finally:
        lock.release()
@overload
def call_with_lock[**Args, R](
lock: ContextManager,
func: Callable[Args, R],
/,
*args: Args.args,
**kwds: Args.kwargs,
) -> R:
...
@overload
def call_with_lock[**Args, R](
lock: AsyncContextManager,
func: Callable[Args, Coroutine[Any, Any, R]] | Callable[Args, R],
/,
*args: Args.args,
**kwds: Args.kwargs,
) -> Coroutine[Any, Any, R]:
...
def call_with_lock[**Args, R](
lock: ContextManager | AsyncContextManager,
func: Callable[Args, Coroutine[Any, Any, R]] | Callable[Args, R],
/,
*args: Args.args,
**kwds: Args.kwargs,
) -> R | Coroutine[Any, Any, R]:
async def async_call(func, /, *args, **kwds):
if isinstance(lock, AbstractAsyncContextManager):
alock = lock
else:
alock = lock_as_async(lock)
async with alock:
ret = func(*args, **kwds)
if isawaitable(ret):
ret = await ret
return ret
if isinstance(lock, AbstractAsyncContextManager) or iscoroutinefunction(func):
return async_call(func, *args, **kwds)
else:
with lock:
return func(*args, **kwds)