#!/usr/bin/env python3
# encoding: utf-8
__author__ = "ChenyangGao <https://chenyanggao.github.io>"
__all__ = [
"updatedb_initdb", "updatedb", "updatedb_life_iter",
"updatedb_history_iter", "P115QueryDB",
]
__doc__ = "这个模块提供了一些和更新数据库有关的函数"
from collections.abc import (
AsyncIterator, Callable, Collection, Coroutine, Iterable, Iterator, Sequence,
)
from math import inf
from os import PathLike
from sqlite3 import register_adapter, register_converter, Connection, Cursor
from time import time
from typing import cast, overload, Any, Literal
from warnings import warn
from asynctools import ensure_async
from errno2 import errno
from iter_collect import grouped_mapping
from iterutils import (
bfs_gen, chunked, foreach, run_gen_step, run_gen_step_iter,
with_iter_next, Yield,
)
from orjson import dumps, loads
from p115pickcode import to_id
from posixpatht import path_is_dir_form, escape, splits
from sqlitetools import connect, execute, find, query, upsert_items
from ..client import P115Client, P115Warning
from ..util import posix_escape_name
from .attr import get_ancestors, normalize_attr_simple
from .history import iter_history_list
from .iterdir import iterdir, iter_nodes_using_event, traverse_tree
from .life import iter_life_behavior_list
register_adapter(list, dumps)
register_adapter(dict, dumps)
register_converter("JSON", loads)
[docs]
def updatedb_initdb(con: Connection | Cursor, /) -> Cursor:
"""初始化数据库,然后返回游标
"""
sql = """\
-- 修改日志模式为 WAL (Write Ahead Log)
PRAGMA journal_mode = WAL;
-- data 表,用来保存数据
CREATE TABLE IF NOT EXISTS data (
id INTEGER NOT NULL PRIMARY KEY, -- 主键
parent_id INTEGER NOT NULL DEFAULT 0, -- 上级目录的 id
name TEXT NOT NULL, -- 名字
sha1 TEXT NOT NULL DEFAULT '', -- 文件的 sha1 哈希值
size INTEGER NOT NULL DEFAULT 0, -- 文件大小
pickcode TEXT NOT NULL DEFAULT '', -- 提取码,下载等操作时需要用到
is_dir INTEGER NOT NULL DEFAULT 1 CHECK(is_dir IN (0, 1)), -- 是否目录
is_alive INTEGER NOT NULL DEFAULT 1 CHECK(is_alive IN (0, 1)), -- 是否存活(存活即是不是删除状态)
extra BLOB DEFAULT NULL, -- 额外的数据
created_at TIMESTAMP DEFAULT (unixepoch('subsec')), -- 创建时间
updated_at TIMESTAMP DEFAULT (CAST(STRFTIME('%s', 'now') AS INTEGER)) -- 更新时间
);
-- life 表,用来保存操作事件
CREATE TABLE IF NOT EXISTS life (
id INTEGER NOT NULL PRIMARY KEY, -- 文件或目录的 id
data JSON NOT NULL, -- 数据
created_at TIMESTAMP DEFAULT (unixepoch('subsec')) -- 创建时间
);
-- history 表,用来保存历史记录
CREATE TABLE IF NOT EXISTS history (
id INTEGER NOT NULL PRIMARY KEY, -- 文件或目录的 id
data JSON NOT NULL, -- 数据
created_at TIMESTAMP DEFAULT (unixepoch('subsec')) -- 创建时间
);
-- 索引
CREATE INDEX IF NOT EXISTS idx_data_pid ON data(parent_id);
CREATE INDEX IF NOT EXISTS idx_data_utime ON data(updated_at);
-- data 表的记录发生更新,自动更新它的更新时间
CREATE TRIGGER IF NOT EXISTS trg_data_update
AFTER UPDATE ON data
FOR EACH ROW
BEGIN
SELECT CASE
WHEN NEW.updated_at < OLD.updated_at THEN RAISE(IGNORE)
END;
UPDATE data SET updated_at = CAST(STRFTIME('%s', 'now') AS INTEGER) WHERE id = NEW.id AND NEW.updated_at = OLD.updated_at;
END;
-- fs_event 表,用来保存文件系统变更(由 data 表触发)
CREATE TABLE IF NOT EXISTS fs_event (
id INTEGER PRIMARY KEY AUTOINCREMENT, -- 事件 id
event TEXT NOT NULL, -- 事件类型:add(增)、remove(删)、rename(改名)、move(移动)
file_id INTEGER NOT NULL, -- 文件或目录的 id,此 id 必在 `data` 表中
pid0 INTEGER NOT NULL DEFAULT -1, -- 变更前上级目录的 id
pid1 INTEGER NOT NULL DEFAULT -1, -- 变更后上级目录的 id
name0 TEXT NOT NULL DEFAULT '', -- 变更前的名字
name1 TEXT NOT NULL DEFAULT '', -- 变更后的名字
created_at TIMESTAMP DEFAULT (unixepoch('subsec')) -- 创建时间
);
-- data 表发生插入
CREATE TRIGGER IF NOT EXISTS trg_data_insert
AFTER INSERT ON data
FOR EACH ROW
BEGIN
INSERT INTO fs_event(event, file_id, pid1, name1) VALUES (
'add', NEW.id, NEW.parent_id, NEW.name
);
END;
-- data 表发生还原
CREATE TRIGGER IF NOT EXISTS trg_data_revoke
AFTER UPDATE ON data
FOR EACH ROW WHEN (NOT OLD.is_alive AND NEW.is_alive)
BEGIN
INSERT INTO fs_event(event, file_id, pid1, name1) VALUES (
'add', NEW.id, NEW.parent_id, NEW.name
);
END;
-- data 表发生移除
CREATE TRIGGER IF NOT EXISTS trg_data_remove
AFTER UPDATE ON data
FOR EACH ROW WHEN (OLD.is_alive AND NOT NEW.is_alive)
BEGIN
INSERT INTO fs_event(event, file_id, pid0, name0) VALUES (
'remove', OLD.id, OLD.parent_id, OLD.name
);
END;
-- data 表发生改名或移动
CREATE TRIGGER IF NOT EXISTS trg_data_change
AFTER UPDATE ON data
FOR EACH ROW WHEN (OLD.is_alive AND NEW.is_alive)
BEGIN
INSERT INTO fs_event(event, file_id, pid0, pid1, name0, name1)
SELECT
'move', OLD.id, OLD.parent_id, NEW.parent_id, OLD.name, OLD.name
WHERE OLD.parent_id != NEW.parent_id;
INSERT INTO fs_event(event, file_id, pid0, pid1, name0, name1)
SELECT * FROM (
SELECT
'rename', NEW.id, NEW.parent_id, NEW.parent_id, OLD.name, NEW.name
WHERE OLD.name != NEW.name
);
END;"""
return con.executescript(sql)
def wrap_async(func, async_: bool = False, /, threaded: bool = False):
if async_:
return ensure_async(func, threaded=threaded)
else:
return func
def _init_client(
client: str | PathLike | P115Client,
dbfile: None | str | PathLike | Connection | Cursor = None,
init_sql: None | str = None,
) -> tuple[P115Client, Connection | Cursor]:
if isinstance(client, (str, PathLike)):
client = P115Client(client, check_for_relogin=True)
if client.login_app() in ("web", "desktop", "aps"):
warn(
f'app within ("web", "desktop", "aps") is not recommended, it will be replaced by "apple_tv" cookies',
category=P115Warning,
)
client.login_another_app("apple_tv", replace=True)
if not dbfile:
dbfile = f"p115db-{client.user_id}.db"
if isinstance(dbfile, (Connection, Cursor)):
con = dbfile
else:
con = connect(dbfile, check_same_thread=False, timeout=inf)
if init_sql is None:
updatedb_initdb(con)
elif init_sql:
con.executescript(init_sql)
return client, con
def has_id(con: Connection | Cursor, id: int, /) -> int:
sql = "SELECT 1 FROM data WHERE id = ? AND is_alive"
return find(con, sql, (id,), default=0)
def event_normalize_attr(event: dict, /) -> dict:
sha1 = event["sha1"]
return {
"id": int(event["file_id"]),
"parent_id": int(event["parent_id"]),
"name": event["file_name"],
"sha1": sha1,
"size": int(event.get("file_size") or 0),
"pickcode": event["pick_code"],
"is_dir": not sha1,
"is_alive": event["type"] != 22,
"updated_at": int(event["create_time"]),
}
@overload
def load_missing_ancestors(
client: P115Client,
con: Connection | Cursor,
attrs: list[dict],
cooldown: float = 0.2,
app: str = "android",
*,
async_: Literal[False] = False,
**request_kwargs,
) -> list[dict]:
...
@overload
def load_missing_ancestors(
client: P115Client,
con: Connection | Cursor,
attrs: list[dict],
cooldown: float = 0.2,
app: str = "android",
*,
async_: Literal[True],
**request_kwargs,
) -> Coroutine[Any, Any, list[dict]]:
...
def load_missing_ancestors(
client: P115Client,
con: Connection | Cursor,
attrs: list[dict],
cooldown: float = 0.2,
app: str = "android",
*,
async_: Literal[False, True] = False,
**request_kwargs,
) -> list[dict] | Coroutine[Any, Any, list[dict]]:
def gen_step():
seen_ids: set[int] = {a["id"] for a in attrs}
ancestors: list[dict] = []
add_to_seen = seen_ids.add
add_ancestor = ancestors.append
def add(attr: dict, /):
add_to_seen(attr["id"])
add_ancestor(attr)
while pids := [
pid for a in attrs
if (pid := a["parent_id"]) and not (pid in seen_ids or has_id(con, pid))
]:
yield foreach(
add,
iter_nodes_using_event(
client,
pids,
type="doc",
normalize_attr=event_normalize_attr,
id_to_dirnode=...,
cooldown=cooldown,
app=app,
async_=async_,
**request_kwargs,
),
)
return ancestors
return run_gen_step(gen_step, async_)
@overload
def updatedb(
client: str | PathLike | P115Client,
dbfile: None | str | PathLike | Connection | Cursor = None,
cid: int | str = 0,
recursive: bool = True,
max_workers: None | int = None,
max_files: int | None = 0,
max_dirs: int | None = 0,
app: str = "android",
*,
async_: Literal[False] = False,
**request_kwargs,
) -> int:
...
@overload
def updatedb(
client: str | PathLike | P115Client,
dbfile: None | str | PathLike | Connection | Cursor = None,
cid: int | str = 0,
recursive: bool = True,
max_workers: None | int = None,
max_files: int | None = 0,
max_dirs: int | None = 0,
app: str = "android",
*,
async_: Literal[True],
**request_kwargs,
) -> Coroutine[Any, Any, int]:
...
[docs]
def updatedb(
client: str | PathLike | P115Client,
dbfile: None | str | PathLike | Connection | Cursor = None,
cid: int | str = 0,
recursive: bool = True,
max_workers: None | int = None,
max_files: int | None = 0,
max_dirs: int | None = 0,
app: str = "android",
*,
async_: Literal[False, True] = False,
**request_kwargs,
) -> int | Coroutine[Any, Any, int]:
"""对某个目录执行一次拉取,以更新 SQLite 数据
:param client: 115 网盘客户端对象
:param dbfile: 数据库文件路径,如果为 None,则自动确定
:param cid: 目录的 id 或 pickcode
:param recursive: 如果为 True,则拉取所有以之为祖先(先驱)节点的节点信息;否则,拉取所有以之为父(前驱)节点的节点信息
:param max_workers: 最大并发数,如果为 None 或 < 0 则自动确定,如果为 0 则单工作者惰性执行
:param max_files: 估计最大存在的文件数,<= 0 时则无限
:param max_dirs: 估计最大存在的目录数,<= 0 时则无限
:param app: 使用指定 app(设备)的接口
:param async_: 是否异步
:param request_kwargs: 其它请求参数
:return: 返回总共影响到数据行数,即所有 DML SQL 执行后,游标的 ``.rowcount`` 累加
"""
client, con = _init_client(client, dbfile)
upsert = wrap_async(upsert_items, async_, threaded=True)
cid = to_id(cid)
def gen_step():
total = 0
if recursive:
start_t = int(time())
try:
if cid and not has_id(con, cid):
ancestors = yield get_ancestors(
client,
cid,
id_to_dirnode=...,
ensure_file=False,
app=app,
async_=async_,
**request_kwargs,
)
if ancestors:
if ancestors[0]["id"] == 0:
ancestors = ancestors[1:]
if ancestors:
to_pickcode = client.to_pickcode
for a in ancestors:
a["pickcode"] = to_pickcode(a["id"], "fa")
total += (yield upsert(
con,
ancestors,
{"is_alive": 1},
commit=True,
)).rowcount
with with_iter_next(chunked(
traverse_tree(
client,
cid,
id_to_dirnode=...,
max_workers=max_workers,
max_files=max_files,
max_dirs=max_dirs,
app=app,
async_=async_,
**request_kwargs,
),
1000,
)) as get_next:
while True:
batch = yield get_next()
total += (yield upsert(con, batch, {"is_alive": 1}, commit=True)).rowcount
if cid:
clean_sql = f"""\
UPDATE data SET is_alive = 0 WHERE id in (
WITH ids(id) AS (
SELECT id FROM data WHERE parent_id = {cid} AND is_alive AND updated_at < :start_t
UNION ALL
SELECT data.id FROM ids JOIN data ON (ids.id = data.parent_id) WHERE is_alive AND updated_at < :start_t
)
SELECT id FROM ids
);"""
else:
clean_sql = "UPDATE data SET is_alive = 0 WHERE is_alive AND updated_at < :start_t"
total += (yield wrap_async(execute, async_, threaded=True)(
con,
clean_sql,
{"start_t": start_t},
commit=True,
)).rowcount
except FileNotFoundError:
if cid:
clean_sql = f"""\
UPDATE data SET is_alive = 0 WHERE id in (
WITH ids(id) AS (
SELECT id FROM data WHERE parent_id = {cid} AND is_alive
UNION ALL
SELECT data.id FROM ids JOIN data ON (ids.id = data.parent_id) WHERE is_alive
)
SELECT id FROM ids
);"""
else:
clean_sql = "UPDATE data SET is_alive = 0 WHERE is_alive"
total = (yield wrap_async(execute, async_, threaded=True)(
con,
clean_sql,
commit=True,
)).rowcount
else:
id_to_dirnode: dict[int, tuple[str, int]] = {}
seen_ids: set[int] = set()
try:
with with_iter_next(chunked(iterdir(
client,
cid,
normalize_attr=normalize_attr_simple,
id_to_dirnode=id_to_dirnode,
raise_for_changed_count=True,
app=app,
cooldown=0.5,
max_workers=max_workers,
async_=async_,
**request_kwargs,
), 1000)) as get_next:
while True:
batch = yield get_next()
total += (yield upsert(
con,
batch,
extras={"is_alive": 1},
fields=("id", "parent_id", "name", "sha1", "size", "pickcode", "is_dir"),
commit=True,
)).rowcount
seen_ids.update(a["id"] for a in batch)
if id_to_dirnode:
to_pickcode = client.to_pickcode
total += (yield upsert(
con,
[
{"id": id, "name": name, "parent_id": pid, "ancestors": to_pickcode(id, "fa")}
for id, (name, pid) in id_to_dirnode.items()
if id not in seen_ids
],
{"is_alive": 1},
commit=True,
)).rowcount
clean_sql = "UPDATE data SET is_alive = 0 WHERE is_alive and parent_id = ?"
if seen_ids:
clean_sql += " AND id NOT IN (%s)" % ",".join(map(str, seen_ids))
total += (yield wrap_async(execute, async_, threaded=True)(
con,
clean_sql,
(cid,),
commit=True,
)).rowcount
except FileNotFoundError:
clean_sql = "UPDATE data SET is_alive = 0 WHERE is_alive and parent_id = ?"
total = (yield wrap_async(execute, async_, threaded=True)(
con,
clean_sql,
(cid,),
commit=True,
)).rowcount
return total
return run_gen_step(gen_step, async_)
@overload
def updatedb_life_iter(
client: str | PathLike | P115Client,
dbfile: None | str | PathLike | Connection | Cursor = None,
from_id: int = -1,
from_time: float = 0,
cooldown: float = 0.2,
app: str = "android",
*,
async_: Literal[False] = False,
**request_kwargs,
) -> Iterator[list[dict]]:
...
@overload
def updatedb_life_iter(
client: str | PathLike | P115Client,
dbfile: None | str | PathLike | Connection | Cursor = None,
from_id: int = -1,
from_time: float = 0,
cooldown: float = 0.2,
app: str = "android",
*,
async_: Literal[True],
**request_kwargs,
) -> AsyncIterator[list[dict]]:
...
[docs]
def updatedb_life_iter(
client: str | PathLike | P115Client,
dbfile: None | str | PathLike | Connection | Cursor = None,
from_id: int = -1,
from_time: float = 0,
cooldown: float = 0.2,
app: str = "android",
*,
async_: Literal[False, True] = False,
**request_kwargs,
) -> Iterator[list[dict]] | AsyncIterator[list[dict]]:
"""持续采集 115 生活日志,以更新 SQLite 数据库
.. note::
当 ``from_id < 0`` 时,会从数据库获取最大 id 作为 ``from_id``,获取不到时设为 0。
当 ``from_id != 0`` 时,如果 from_time 为 0,则自动重设为 -1。
:param client: 115 网盘客户端对象
:param dbfile: 数据库文件路径,如果为 None,则自动确定
:param from_id: 开始的事件 id (不含),若 < 0 则是从数据库获取最大 id
:param from_time: 开始时间(含),若为 0 则从当前时间开始,若 < 0 则从最早开始
:param cooldown: 冷却时间,大于 0 时,两次接口调用之间至少间隔这么多秒
:param app: 使用指定 app(设备)的接口
:param async_: 是否异步
:param request_kwargs: 其它请求参数
:return: 迭代器,每次产生一批事件(从当前到上次截止)
.. code::
from time import sleep
from p115client import P115Client
from p115client.tool import updatedb_life_iter
client = P115Client.from_path()
for event_list in updatedb_life_iter(client):
if event_list:
print("采集到操作事件列表:", event_list)
else:
sleep(1)
"""
client, con = _init_client(client, dbfile)
def gen_step():
nonlocal from_id
if from_id < 0:
from_id = yield wrap_async(find, async_, threaded=True)(
con,
"SELECT MAX(id) FROM life",
default=0,
)
with with_iter_next(iter_life_behavior_list(
client,
from_id=from_id,
from_time=from_time,
ignore_types=(10,),
cooldown=cooldown,
app=app,
async_=async_,
**request_kwargs,
)) as get_next:
while True:
event_list = yield get_next()
event_list.reverse()
if attrs := list(map(event_normalize_attr, event_list)):
if news := [a for a in attrs if a["is_alive"]]:
attrs.extend((yield load_missing_ancestors(
client,
con,
news,
cooldown=cooldown,
app=app,
async_=async_,
**request_kwargs,
)))
yield wrap_async(upsert_items, async_, threaded=True)(
con, attrs, commit=True)
if event_list:
yield wrap_async(execute, async_, threaded=True)(
con,
"INSERT OR IGNORE INTO life(id, data) VALUES (?, ?)",
[(int(event["id"]), dumps(event)) for event in event_list],
commit=True,
)
yield Yield(event_list)
return run_gen_step_iter(gen_step, async_)
@overload
def updatedb_history_iter(
client: str | PathLike | P115Client,
dbfile: None | str | PathLike | Connection | Cursor = None,
from_id: int = -1,
from_time: float = 0,
cooldown: float = 0.2,
app: str = "android",
*,
async_: Literal[False] = False,
**request_kwargs,
) -> Iterator[list[dict]]:
...
@overload
def updatedb_history_iter(
client: str | PathLike | P115Client,
dbfile: None | str | PathLike | Connection | Cursor = None,
from_id: int = -1,
from_time: float = 0,
cooldown: float = 0.2,
app: str = "android",
*,
async_: Literal[True],
**request_kwargs,
) -> AsyncIterator[list[dict]]:
...
[docs]
def updatedb_history_iter(
client: str | PathLike | P115Client,
dbfile: None | str | PathLike | Connection | Cursor = None,
from_id: int = -1,
from_time: float = 0,
cooldown: float = 0.2,
app: str = "android",
*,
async_: Literal[False, True] = False,
**request_kwargs,
) -> Iterator[list[dict]] | AsyncIterator[list[dict]]:
"""持续采集 115 历史记录,以更新 SQLite 数据库
.. note::
当 ``from_id < 0`` 时,会从数据库获取最大 id 作为 ``from_id``,获取不到时设为 0。
当 ``from_id != 0`` 时,如果 from_time 为 0,则自动重设为 -1。
:param client: 115 网盘客户端对象
:param dbfile: 数据库文件路径,如果为 None,则自动确定
:param from_id: 开始的事件 id (不含),若 < 0 则是从数据库获取最大 id
:param from_time: 开始时间(含),若为 0 则从当前时间开始,若 < 0 则从最早开始
:param cooldown: 冷却时间,大于 0 时,两次接口调用之间至少间隔这么多秒
:param app: 使用指定 app(设备)的接口
:param async_: 是否异步
:param request_kwargs: 其它请求参数
:return: 迭代器,每次产生一批事件(从当前到上次截止)
.. code::
from time import sleep
from p115client import P115Client
from p115client.tool import updatedb_history_iter
client = P115Client.from_path()
for event_list in updatedb_history_iter(client):
if event_list:
print("采集到历史记录列表:", event_list)
else:
sleep(1)
"""
client, con = _init_client(client, dbfile)
def gen_step():
nonlocal from_id
if from_id < 0:
from_id = yield wrap_async(find, async_, threaded=True)(
con,
"SELECT MAX(id) FROM history",
default=0,
)
with with_iter_next(iter_history_list(
client,
from_id=from_id,
from_time=from_time,
ignore_types=(),
cooldown=cooldown,
app=app,
async_=async_,
**request_kwargs,
)) as get_next:
while True:
event_list = yield get_next()
event_list.reverse()
if attrs := list(map(event_normalize_attr, event_list)):
if news := [a for a in attrs if a["is_alive"]]:
attrs.extend((yield load_missing_ancestors(
client,
con,
news,
cooldown=cooldown,
app=app,
async_=async_,
**request_kwargs,
)))
yield wrap_async(upsert_items, async_, threaded=True)(
con, attrs, commit=True)
if event_list:
yield wrap_async(execute, async_, threaded=True)(
con,
"INSERT OR IGNORE INTO history(id, data) VALUES (?, ?)",
[(int(event["id"]), dumps(event)) for event in event_list],
commit=True,
)
yield Yield(event_list)
return run_gen_step_iter(gen_step, async_)
[docs]
class P115QueryDB:
"""封装了一些常用的数据库查询方法,针对 updatedb 产生的 SQLite 数据库
.. note::
默认情况下,只有 "id"、"parent_id"、"updated_at" 有索引,所以请自行添加其它需要的索引
"""
__slots__ = "con",
def __init__(self, con, /):
self.con = connect(con, check_same_thread=False, timeout=inf)
[docs]
def get_ancestors(
self,
id: int = 0,
/,
) -> list[dict]:
"""获取某个文件或目录的祖先节点信息,包括 "id"、"parent_id" 和 "name" 字段
:param self: P115QueryDB 实例或者数据库连接或游标
:param id: 当前节点的 id
:return: 当前节点的祖先节点列表,从根目录开始(id 为 0)直到当前节点
"""
ancestors = [{"id": 0, "parent_id": 0, "name": ""}]
if not id:
return ancestors
con: Any
if isinstance(self, P115QueryDB):
con = self.con
else:
con = self
ls = list(query(con, """\
WITH t AS (
SELECT id, parent_id, name FROM data WHERE id = ?
UNION ALL
SELECT data.id, data.parent_id, data.name FROM t JOIN data ON (t.parent_id = data.id)
)
SELECT id, parent_id, name FROM t;""", id))
if not ls:
raise FileNotFoundError(errno.ENOENT, id)
if ls[-1][1]:
raise ValueError(f"dangling id: {id}")
ancestors.extend(dict(zip(("id", "parent_id", "name"), record)) for record in reversed(ls))
return ancestors
[docs]
def get_attr(
self,
id: int = 0,
/,
) -> dict:
"""获取某个文件或目录的信息
:param self: P115QueryDB 实例或者数据库连接或游标
:param id: 当前节点的 id
:return: 当前节点的信息字典
"""
if not id:
return {
"id": 0, "parent_id": 0, "name": "", "sha1": "", "size": 0, "pickcode": "",
"is_dir": 1, "is_alive": 1, "extra": None, "created_at": 0, "updated_at": 0,
}
con: Any
if isinstance(self, P115QueryDB):
con = self.con
else:
con = self
return find(
con,
f"SELECT * FROM data WHERE id=? LIMIT 1",
id,
FileNotFoundError(errno.ENOENT, id),
row_factory="dict",
)
[docs]
def get_id(
self,
/,
pickcode: str = "",
sha1: str = "",
path: str | Sequence[str] = "",
parent_id: int = 0,
is_alive: bool = True,
) -> int:
"""查询匹配某个字段的文件或目录的 id
:param self: P115QueryDB 实例或者数据库连接或游标
:param pickcode: 当前节点的提取码,优先级高于 sha1
:param sha1: 当前节点的 sha1 校验哈希值,优先级高于 path
:param path: 当前节点的路径
:param parent_id: 仅用于 `path` 参数,用来限定搜索的顶层路径
:return: 当前节点的 id
"""
con: Any
if isinstance(self, P115QueryDB):
con = self.con
else:
con = self
insertion = " AND is_alive" if is_alive else ""
if pickcode:
return find(
con,
f"SELECT id FROM data WHERE pickcode=?{insertion} LIMIT 1",
pickcode,
default=FileNotFoundError(pickcode),
)
elif sha1:
return find(
con,
f"SELECT id FROM data WHERE sha1=?{insertion} LIMIT 1",
sha1,
default=FileNotFoundError(sha1),
)
elif path:
return P115QueryDB.path_to_id(
con,
path,
parent_id=parent_id,
is_alive=is_alive,
)
return 0
[docs]
def get_parent_id(
self,
id: int = 0,
/,
default: None | int = None,
) -> int:
"""获取某个 id 对应的父 id
:param self: P115QueryDB 实例或者数据库连接或游标
:param id: 当前节点的 id
:return: 当前节点的父 id
"""
if id == 0:
return 0
con: Any
if isinstance(self, P115QueryDB):
con = self.con
else:
con = self
sql = "SELECT parent_id FROM data WHERE id=?"
return find(con, sql, id, FileNotFoundError(errno.ENOENT, id) if default is None else default)
[docs]
def get_path(
self,
id: int = 0,
/,
) -> str:
"""获取某个文件或目录的路径
:param self: P115QueryDB 实例或者数据库连接或游标
:param id: 当前节点的 id
:return: 当前节点的路径
"""
if not id:
return "/"
ancestors = P115QueryDB.get_ancestors(self, id)
return "/".join(escape(a["name"]) for a in ancestors)
[docs]
def get_patht(
self,
id: int = 0,
/,
) -> list[str]:
"""获取某个文件或目录的路径节点元组
:param self: P115QueryDB 实例或者数据库连接或游标
:param id: 当前节点的 id
:return: 当前节点的路径节点元组
"""
if not id:
return [""]
ancestors = P115QueryDB.get_ancestors(self, id)
return [a["name"] for a in ancestors]
[docs]
def get_pickcode(
self,
/,
id: int = -1,
sha1: str = "",
path: str | Sequence[str] = "",
parent_id: int = 0,
is_alive: bool = True,
) -> str:
"""查询匹配某个字段的文件或目录的提取码
:param self: P115QueryDB 实例或者数据库连接或游标
:param id: 当前节点的 id,优先级高于 sha1
:param sha1: 当前节点的 sha1 校验哈希值,优先级高于 path
:param path: 当前节点的路径
:param parent_id: 仅用于 `path` 参数,用来限定搜索的顶层路径
:return: 当前节点的提取码
"""
con: Any
if isinstance(self, P115QueryDB):
con = self.con
else:
con = self
insertion = " AND is_alive" if is_alive else ""
if id >= 0:
if not id:
return ""
return find(
con,
f"SELECT pickcode FROM data WHERE id=?{insertion} LIMIT 1;",
id,
default=FileNotFoundError(id),
)
elif sha1:
return find(
con,
f"SELECT pickcode FROM data WHERE sha1=?{insertion} LIMIT 1;",
sha1,
default=FileNotFoundError(sha1),
)
else:
if path in ("", "/"):
return ""
id = P115QueryDB.path_to_id(con, path, parent_id=parent_id, is_alive=is_alive)
return P115QueryDB.get_pickcode(con, id)
[docs]
def get_sha1(
self,
/,
id: int = -1,
pickcode: str = "",
path: str | Sequence[str] = "",
parent_id: int = 0,
is_alive: bool = True,
) -> str:
"""查询匹配某个字段的文件的 sha1
:param self: P115QueryDB 实例或者数据库连接或游标
:param id: 当前节点的 id,优先级高于 pickcode
:param pickcode: 当前节点的提取码,优先级高于 path
:param path: 当前节点的路径
:param parent_id: 仅用于 `path` 参数,用来限定搜索的顶层路径
:return: 当前节点的 sha1 校验哈希值
"""
con: Any
if isinstance(self, P115QueryDB):
con = self.con
else:
con = self
insertion = " AND is_alive" if is_alive else ""
if id >= 0:
if not id:
return ""
return find(
con,
f"SELECT sha1 FROM data WHERE id=?{insertion} LIMIT 1;",
id,
default=FileNotFoundError(id),
)
elif pickcode:
return find(
con,
f"SELECT sha1 FROM data WHERE pickcode=?{insertion} LIMIT 1;",
pickcode,
default=FileNotFoundError(pickcode),
)
else:
if path in ("", "/"):
return ""
id = P115QueryDB.path_to_id(con, path, parent_id=parent_id, is_alive=is_alive)
return P115QueryDB.get_sha1(con, id)
[docs]
def has_id(
self,
id: int,
/,
is_alive: bool = True,
) -> int:
"""是否存在某个 id
:param self: P115QueryDB 实例或者数据库连接或游标
:param id: 当前节点的 id
:param is_alive: 是否存活
:return: 如果是 1,则是 True;如果是 0,则是 False
"""
if id == 0:
return 1
elif id < 0:
return 0
con: Any
if isinstance(self, P115QueryDB):
con = self.con
else:
con = self
sql = "SELECT 1 FROM data WHERE id=?"
if is_alive:
sql += " AND is_alive"
return find(con, sql, id, 0)
[docs]
def path_to_id(
self,
/,
path: str | Sequence[str] = "",
ensure_file: None | bool = None,
parent_id: int = 0,
is_alive: bool = True,
) -> int:
"""查询匹配某个路径的文件或目录的信息字典,只返回找到的第 1 个
:param self: P115QueryDB 实例或者数据库连接或游标
:param path: 路径
:param ensure_file: 是否文件
- 如果为 True,必须是文件
- 如果为 False,必须是目录
- 如果为 None,可以是文件或目录
:param parent_id: 顶层目录的 id
:return: 找到的第 1 个匹配的节点 id
"""
try:
return next(P115QueryDB.iter_id_to_path(
self,
path=path,
ensure_file=ensure_file,
parent_id=parent_id,
is_alive=is_alive,
))
except StopIteration:
raise FileNotFoundError(errno.ENOENT, path) from None
[docs]
def iter_children(
self,
parent_id: int = 0,
/,
fields: Collection[str] = (),
ensure_file: None | bool = None,
) -> Iterator[dict]:
"""获取某个目录之下的文件或目录的信息
.. caution::
当 ``fields`` 为空时,获取全部字段
:param self: P115QueryDB 实例或者数据库连接或游标
:param parent_id: 父目录的 id
:param fields: 需要获取的字段
:param ensure_file: 是否仅输出文件
- 如果为 True,仅输出文件
- 如果为 False,仅输出目录
- 如果为 None,全部输出
:return: 迭代器,产生一组信息的字典,大概包含如下字段(具体由你的数据库 ``data`` 表确定):
.. code:: python
(
"id", "parent_id", "name", "sha1", "size", "pickcode",
"is_dir", "is_alive", "extra", "created_at", "updated_at",
)
"""
con: Any
if isinstance(self, P115QueryDB):
con = self.con
else:
con = self
if fields:
sql = f"SELECT {','.join(fields)} FROM data WHERE parent_id=? AND is_alive"
else:
sql = "SELECT * FROM data WHERE parent_id=? AND is_alive"
if ensure_file is not None:
if ensure_file:
sql += " AND NOT is_dir"
else:
sql += " AND is_dir"
return query(con, sql, parent_id, row_factory="dict")
[docs]
def iter_count_dir(
self,
parent_id: int = 0,
/,
) -> Iterator[dict]:
"""迭代获取所有指定 id 下所有目录节点(包括自己)直属的文件数和目录数
:param self: P115QueryDB 实例或者数据库连接或游标
:param parent_id: 顶层目录的 id
:return: 迭代器,返回字典
.. code::
{
"id": int,
"parent_id": int,
"dir_count": int,
"file_count": int,
}
"""
con: Any
if isinstance(self, P115QueryDB):
con = self.con
else:
con = self
sql = """\
WITH t AS (
SELECT id, parent_id, is_dir FROM data WHERE parent_id = ? AND is_alive
UNION ALL
SELECT data.id, data.parent_id, data.is_dir FROM t JOIN data ON (t.id = data.parent_id) WHERE is_alive
), count AS (
SELECT parent_id AS id, SUM(is_dir) AS dir_count, SUM(NOT is_dir) AS file_count
FROM t
GROUP BY parent_id
)
SELECT data.parent_id, count.* FROM count JOIN data USING (id)"""
return query(con, sql, parent_id, row_factory="dict")
[docs]
def iter_count_tree(
self,
parent_id: int = 0,
/,
) -> Iterator[dict]:
"""迭代获取所有指定 id 下所有目录节点(包括自己)直属的文件数和目录数,以及子树下的文件数合计和目录数合计
:param self: P115QueryDB 实例或者数据库连接或游标
:param parent_id: 顶层目录的 id
:return: 迭代器,返回字典
.. code::
{
"id": int,
"parent_id": int,
"dir_count": int,
"file_count": int,
"tree_dir_count": int,
"tree_file_count": int,
}
"""
data = {a["id"]: a for a in P115QueryDB.iter_count_dir(self, parent_id)}
id_to_children = grouped_mapping((a["parent_id"], id) for id, a in data.items())
def calc(attr: dict, /) -> dict:
if children := id_to_children.get(attr["id"]):
for cid in children:
cattr = data[cid]
if "tree_dir_count" not in cattr:
calc(cattr)
attr["tree_dir_count"] = attr.get("tree_dir_count", 0) + 1 + cattr["tree_dir_count"]
attr["tree_file_count"] = attr.get("tree_file_count", attr["file_count"]) + cattr["tree_file_count"]
elif "tree_dir_count" not in attr:
attr["tree_dir_count"] = attr["dir_count"]
attr["tree_file_count"] = attr["file_count"]
return attr
return map(calc, data.values())
[docs]
def iter_dangling_ids(
self,
/,
) -> Iterator[int]:
"""罗列所有悬空的文件或目录的 id
.. note::
悬空的 id,即祖先节点中,存在一个节点,它的 parent_id 是悬空的
:param self: P115QueryDB 实例或者数据库连接或游标
:return: 迭代器,一组目录的 id
"""
con: Any
if isinstance(self, P115QueryDB):
con = self.con
else:
con = self
sql = """\
WITH dangling_ids(id) AS (
SELECT d1.id
FROM data AS d1 LEFT JOIN data AS d2 ON (d1.parent_id = d2.id)
WHERE d1.parent_id AND d2.id IS NULL
UNION ALL
SELECT data.id FROM dangling_ids JOIN data ON (dangling_ids.id = data.parent_id)
)
SELECT id FROM dangling_ids"""
return query(con, sql, row_factory="one")
[docs]
def iter_dangling_parent_ids(
self,
/,
) -> Iterator[int]:
"""罗列所有悬空的 parent_id
.. note::
悬空的 parent_id,即所有的 parent_id 中,,不为 0 且不在 `data` 表中的部分
:param self: P115QueryDB 实例或者数据库连接或游标
:return: 迭代器,一组目录的 id
"""
con: Any
if isinstance(self, P115QueryDB):
con = self.con
else:
con = self
sql = """\
SELECT
DISTINCT d1.parent_id
FROM
data AS d1 LEFT JOIN data AS d2 ON (d1.parent_id = d2.id)
WHERE
d1.parent_id AND d2.id IS NULL"""
return query(con, sql, row_factory="one")
@overload
def iter_descendants(
self,
parent_id: int = 0,
/,
min_depth: int = 1,
max_depth: int = -1,
*,
fields: str,
escape: None | bool | Callable[[str], str] = True,
ensure_file: None | bool = None,
topdown: None | bool = True,
) -> Iterator[Any]:
...
@overload
def iter_descendants(
self,
parent_id: int = 0,
/,
min_depth: int = 1,
max_depth: int = -1,
*,
fields: Collection[str] | bool = True,
escape: None | bool | Callable[[str], str] = True,
ensure_file: None | bool = None,
topdown: None | bool = True,
) -> Iterator[dict]:
...
[docs]
def iter_descendants(
self,
parent_id: int = 0,
/,
min_depth: int = 1,
max_depth: int = -1,
*,
fields: str | Collection[str] | bool = True,
escape: None | bool | Callable[[str], str] = True,
ensure_file: None | bool = None,
topdown: None | bool = True,
) -> Iterator:
"""获取某个目录之下的所有节点信息
.. caution::
当 ``fields`` 为空时,获取全部字段
:param self: P115QueryDB 实例或者数据库连接或游标
:param parent_id: 顶层目录的 id
:param min_depth: 最小深度
:param max_depth: 最大深度。如果小于 0,则无限深度
:param fields: 需要获取的字段
- 如果为 str,直接获取这个字段的值(不返回字典)
- 如果为 True,获取所有字段,且包括("ancestors", "path", "relpath", "depth"),返回字典
- 如果为 False,获取所有字段,但除了("ancestors", "path", "relpath", "depth"),返回字典
- 否则,获取所指定的这组字段,返回字典
:param escape: 对文件名进行转义
- 如果为 None,则不处理;否则,这个函数用来对文件名中某些符号进行转义,例如 "/" 等
- 如果为 True,则使用 `posixpatht.escape`,会对文件名中 "/",或单独出现的 "." 和 ".." 用 "\\" 进行转义
- 如果为 False,则使用 `posix_escape_name` 函数对名字进行转义,会把文件名中的 "/" 转换为 "|"
- 如果为 Callable,则用你所提供的调用,以或者转义后的名字
:param ensure_file: 是否仅输出文件
- 如果为 True,仅输出文件
- 如果为 False,仅输出目录
- 如果为 None,全部输出
:param topdown: 是否自顶向下深度优先遍历
- 如果为 True,则自顶向下深度优先遍历
- 如果为 False,则自底向上深度优先遍历
- 如果为 None,则自顶向下宽度优先遍历
:return: 迭代器,产生一组信息的字典,大概包含如下字段:
.. code:: python
(
# NOTE: 这些具体由你的数据库 ``data`` 表确定
"id", "parent_id", "name", "sha1", "size", "pickcode",
"is_dir", "is_alive", "extra", "created_at", "updated_at",
# NOTE: 这些是另外附加的字段
"ancestors", "path", "relpath", "depth",
)
"""
if 0 <= max_depth < min_depth:
return
con: Any
if isinstance(self, P115QueryDB):
con = self.con
else:
con = self
if isinstance(escape, bool):
if escape:
from posixpatht import escape
else:
escape = posix_escape_name
escape = cast(None | Callable[[str], str], escape)
field: str = ""
if isinstance(fields, bool) or not fields:
with_ancestors = with_path = with_relpath = with_depth = fields if isinstance(fields, bool) else True
children_fields = set()
else:
if isinstance(fields, str):
field = fields
fields = {field}
else:
fields = set(fields)
with_ancestors = "ancestors" in fields
with_path = "path" in fields
with_relpath = "relpath" in fields
with_depth = "depth" in fields
children_fields = fields - frozenset(("ancestors", "path", "relpath", "depth")) | {"id", "is_dir"}
if with_ancestors or with_path or with_relpath:
children_fields |= frozenset(("name", "parent_id"))
ancestors: list[dict] = []
dir_: str = ""
reldir: str = ""
if parent_id:
if with_ancestors or with_path:
ancestors = P115QueryDB.get_ancestors(con, parent_id)
if with_path:
if escape is None:
dir_ = "".join(a["name"] + "/" for a in ancestors)
else:
dir_ = "".join(escape(a["name"]) + "/" for a in ancestors)
else:
if with_ancestors:
ancestors = [{"id": 0, "parent_id": 0, "name": ""}]
if with_path:
dir_ = "/"
def may_yield(attr: dict, /):
if ensure_file is None or (attr["is_dir"] ^ ensure_file):
if field:
yield attr[field]
else:
yield attr
children_ensure_file = False if ensure_file is False else None
if topdown is None:
gen = bfs_gen((parent_id, 0, ancestors, dir_, reldir))
send: Callable = gen.send
for parent_id, depth, ancestors, dir_, reldir in gen:
depth += 1
will_step_in = max_depth < 0 or depth < max_depth
will_yield = min_depth <= depth and (max_depth < 0 or depth <= max_depth)
for attr in P115QueryDB.iter_children(
con,
parent_id,
fields=children_fields,
ensure_file=children_ensure_file,
):
if with_depth:
attr["depth"] = depth
if with_ancestors:
attr["ancestors"] = [
*ancestors,
{k: attr[k] for k in ("id", "parent_id", "name")},
]
if with_path or with_relpath:
name = attr["name"]
if escape is not None:
name = escape(name)
if with_path:
attr["path"] = dir_ + name
if with_relpath:
attr["relpath"] = reldir + name
if will_step_in and attr["is_dir"]:
send((
attr["id"],
depth,
attr["ancestors"] if with_ancestors else None,
attr["path"] + "/" if with_path else "",
attr["relpath"] + "/" if with_relpath else "",
))
if will_yield:
yield from may_yield(attr)
else:
cache: dict[Iterator, dict] = {}
stack: list[tuple[Iterator[dict], list[dict], str, str]] = [(
iter(tuple(P115QueryDB.iter_children(
con,
parent_id,
fields=children_fields,
ensure_file=children_ensure_file,
))), ancestors, dir_, reldir)]
depth = 0
while depth >= 0:
attrs, ancestors, dir_, reldir = stack[depth]
depth += 1
will_step_in = max_depth < 0 or depth < max_depth
will_yield = min_depth <= depth and (max_depth < 0 or depth <= max_depth)
for attr in attrs:
if with_depth:
attr["depth"] = depth
if with_ancestors:
attr["ancestors"] = [
*ancestors,
{k: attr[k] for k in ("id", "parent_id", "name")},
]
if with_path or with_relpath:
name = attr["name"]
if escape is not None:
name = escape(name)
if with_path:
attr["path"] = dir_ + name
if with_relpath:
attr["relpath"] = reldir + name
if will_yield and topdown:
yield from may_yield(attr)
if will_step_in and attr["is_dir"]:
attrs = iter(tuple(P115QueryDB.iter_children(
con,
attr["id"],
fields=children_fields,
ensure_file=children_ensure_file,
)))
quadruple = (
attrs,
attr["ancestors"] if with_ancestors else ancestors,
attr["path"] + "/" if with_path else "",
attr["relpath"] + "/" if with_relpath else "",
)
try:
stack[depth] = quadruple
except IndexError:
stack.append(quadruple)
if will_yield and not topdown:
cache[attrs] = attr
break
if will_yield and not topdown:
yield from may_yield(attr)
else:
if cache and attrs in cache:
yield from may_yield(cache.pop(attrs))
depth -= 2
@overload
def iter_descendants_bfs(
self,
parent_id: int = 0,
/,
*,
fields: str,
escape: None | bool | Callable[[str], str] = True,
) -> Iterator[Any]:
...
@overload
def iter_descendants_bfs(
self,
parent_id: int = 0,
/,
*,
fields: Collection[str] | bool = True,
escape: None | bool | Callable[[str], str] = True,
) -> Iterator[dict]:
...
[docs]
def iter_descendants_bfs(
self,
parent_id: int = 0,
/,
*,
fields: str | Collection[str] | bool = True,
escape: None | bool | Callable[[str], str] = True,
) -> Iterator:
"""获取某个目录之下的所有节点信息
.. caution::
当 ``fields`` 为空时,获取全部字段
:param self: P115QueryDB 实例或者数据库连接或游标
:param parent_id: 顶层目录的 id
:param fields: 需要获取的字段
- 如果为 str,直接获取这个字段的值(不返回字典)
- 如果为 True,获取所有字段,且包括("ancestors", "path", "relpath", "depth"),返回字典
- 如果为 False,获取所有字段,但除了("ancestors", "path", "relpath", "depth"),返回字典
- 否则,获取所指定的这组字段,返回字典
:param escape: 对文件名进行转义
- 如果为 None,则不处理;否则,这个函数用来对文件名中某些符号进行转义,例如 "/" 等
- 如果为 True,则使用 `posixpatht.escape`,会对文件名中 "/",或单独出现的 "." 和 ".." 用 "\\" 进行转义
- 如果为 False,则使用 `posix_escape_name` 函数对名字进行转义,会把文件名中的 "/" 转换为 "|"
- 如果为 Callable,则用你所提供的调用,以或者转义后的名字
:return: 迭代器,产生一组信息的字典,大概包含如下字段:
.. code:: python
(
# NOTE: 这些具体由你的数据库 ``data`` 表确定
"id", "parent_id", "name", "sha1", "size", "pickcode",
"is_dir", "is_alive", "extra", "created_at", "updated_at",
# NOTE: 这些是另外附加的字段
"ancestors", "path", "relpath", "depth",
)
"""
con: Any
if isinstance(self, P115QueryDB):
con = self.con
else:
con = self
if isinstance(escape, bool):
if escape:
from posixpatht import escape
else:
escape = posix_escape_name
escape = cast(None | Callable[[str], str], escape)
field: str = ""
if isinstance(fields, bool) or not fields:
with_depth = with_ancestors = with_path = with_relpath = fields if isinstance(fields, bool) else True
fields = "*"
fields1 = "*"
fields2 = "data.*"
else:
if isinstance(fields, str):
field = fields
fields = {field}
else:
fields = set(fields)
fields.add("id")
with_depth = "depth" in fields
with_ancestors = "ancestors" in fields
with_path = "path" in fields
with_relpath = "relpath" in fields
if with_depth or with_ancestors or with_path or with_relpath:
fields.add("parent_id")
fields.add("name")
fields -= frozenset(("depth", "ancestors", "path", "relpath"))
fields1 = ",".join(fields)
fields2 = ",".join("data." + f for f in fields)
sql = f"""\
WITH t AS (
SELECT {fields1} FROM data WHERE parent_id={parent_id:d} AND is_alive
UNION ALL
SELECT {fields2} FROM t JOIN data ON(t.id = data.parent_id) WHERE data.is_alive
) SELECT * FROM t"""
row_factory: str | Callable = "any"
if with_depth or with_ancestors or with_path or with_relpath:
if with_depth:
d_depth = {parent_id: 0}
if with_ancestors or with_path:
if parent_id:
ancestors = P115QueryDB.get_ancestors(con, parent_id)
if with_ancestors:
d_ancestors = {parent_id: ancestors}
if with_path:
if escape is None:
d_path = {parent_id: "".join(a["name"] + "/" for a in ancestors)}
else:
d_path = {parent_id: "".join(escape(a["name"]) + "/" for a in ancestors)}
else:
if with_ancestors:
d_ancestors = {0: [{"id": 0, "parent_id": 0, "name": ""}]}
if with_path:
d_path = {0: "/"}
if with_relpath:
d_relpath = {parent_id: ""}
cursor_fields: None | tuple[str, ...] = None
def row_factory(cursor, record, /):
nonlocal cursor_fields
if cursor_fields is None:
cursor_fields = tuple(f[0] for f in cursor.description)
attr = dict(zip(cursor_fields, record))
id, pid, name = attr["id"], attr["parent_id"], attr["name"]
if with_depth:
d_depth[id] = attr["depth"] = d_depth[pid] + 1
if with_ancestors:
attr["ancestors"] = [*d_ancestors[pid], {"id": id, "parent_id": pid, "name": name}]
if escape is not None and (with_path or with_relpath):
name = escape(name)
if with_path:
attr["path"] = d_path[pid] + name
if with_relpath:
attr["relpath"] = d_relpath[pid] + name
if attr.get("is_dir", True):
if with_ancestors:
d_ancestors[id] = attr["ancestors"]
if with_path:
d_path[id] = attr["path"] + "/"
if with_relpath:
d_relpath[id] = attr["relpath"] + "/"
if field:
return attr[field]
return attr
elif field:
row_factory = "one"
else:
row_factory = "dict"
return query(con, sql, row_factory=row_factory)
[docs]
def iter_dup_files(
self,
/,
) -> Iterator[dict]:
"""罗列所有重复文件
:param self: P115QueryDB 实例或者数据库连接或游标
:return: 迭代器,一组文件的信息
"""
con: Any
if isinstance(self, P115QueryDB):
con = self.con
else:
con = self
sql = f"""\
WITH stats AS (
SELECT
COUNT(1) OVER w AS total,
ROW_NUMBER() OVER w AS nth,
*
FROM data
WHERE NOT is_dir AND is_alive
WINDOW w AS (PARTITION BY sha1, size)
)
SELECT * FROM stats WHERE total > 1"""
return query(con, sql, row_factory="dict")
[docs]
def iter_existing_id(
self,
ids: Iterable[int],
/,
is_alive: bool = True,
) -> Iterator[int]:
"""筛选出一系列 id 中,在数据库中存在的
:param self: P115QueryDB 实例或者数据库连接或游标
:param ids: 一系列的节点 id
:param is_alive: 是否存活
:return: 一系列 id 中在数据库中存在的那些的迭代器(实际直接返回一个游标)
"""
con: Any
if isinstance(self, P115QueryDB):
con = self.con
else:
con = self
sql = "SELECT id FROM data WHERE id IN (%s)" % (",".join(map("%d".__mod__, ids)) or "NULL")
if is_alive:
sql += " AND is_alive"
return query(con, sql, row_factory="one")
[docs]
def iter_id_to_parent_id(
self,
ids: Iterable[int],
/,
recursive: bool = False,
) -> Iterator[tuple[int, int]]:
"""找出一系列 id 所对应的父 id,返回 ``(id, parent_id)`` 的 2 元组
:param self: P115QueryDB 实例或者数据库连接或游标
:param ids: 一系列的节点 id
:param recursive: 是否递归,如果为 True,则还会处理它们的祖先节点
:return: 迭代器,产生 ``(id, parent_id)`` 的 2 元组(实际直接返回一个游标)
"""
con: Any
if isinstance(self, P115QueryDB):
con = self.con
else:
con = self
s_ids = "(%s)" % (",".join(map(str, ids)) or "NULL")
if recursive:
sql = """\
WITH pairs AS (
SELECT id, parent_id FROM data WHERE id IN %s
UNION ALL
SELECT data.id, data.parent_id FROM pairs JOIN data ON (pairs.parent_id = data.id)
) SELECT * FROM pairs""" % s_ids
else:
sql = "SELECT id, parent_id FROM data WHERE id IN %s" % s_ids
return query(con, sql)
[docs]
def iter_id_to_path(
self,
/,
path: str | Sequence[str] = "",
ensure_file: None | bool = None,
parent_id: int = 0,
is_alive: bool = True,
) -> Iterator[int]:
"""查询匹配某个路径的文件或目录的信息字典
.. note::
同一个路径可以有多条对应的数据
:param self: P115QueryDB 实例或者数据库连接或游标
:param path: 路径
:param ensure_file: 是否文件
- 如果为 True,必须是文件
- 如果为 False,必须是目录
- 如果为 None,可以是文件或目录
:param parent_id: 顶层目录的 id
:return: 迭代器,产生一组匹配指定路径的(文件或目录)节点的 id
"""
con: Any
if isinstance(self, P115QueryDB):
con = self.con
else:
con = self
patht: Sequence[str]
if isinstance(path, str):
if ensure_file is None and path_is_dir_form(path):
ensure_file = False
patht, _ = splits("/" + path)
else:
patht = ("", *filter(None, path))
if not parent_id and len(patht) == 1:
return iter((0,))
insertion = " AND is_alive" if is_alive else ""
if len(patht) > 2:
sql = f"SELECT id FROM data WHERE parent_id=? AND name=?{insertion} AND is_dir LIMIT 1"
for name in patht[1:-1]:
parent_id = find(con, sql, (parent_id, name), default=-1)
if parent_id < 0:
return iter(())
sql = f"SELECT id FROM data WHERE parent_id=? AND name=?{insertion}"
if ensure_file is None:
sql += " ORDER BY is_dir DESC"
elif ensure_file:
sql += " AND NOT is_dir"
else:
sql += " AND is_dir LIMIT 1"
return query(con, sql, (parent_id, patht[-1]), row_factory="one")
[docs]
def iter_parent_id(
self,
ids: Iterable[int],
/,
) -> Iterator[int]:
"""找出一系列 id 所对应的父 id
:param self: P115QueryDB 实例或者数据库连接或游标
:param ids: 一系列的节点 id
:return: 它们的父 id 的迭代器(实际直接返回一个游标)
"""
con: Any
if isinstance(self, P115QueryDB):
con = self.con
else:
con = self
sql = "SELECT parent_id FROM data WHERE id IN (%s)" % (",".join(map("%d".__mod__, ids)) or "NULL")
return query(con, sql, row_factory="one")
[docs]
def select_na_ids(
self,
/,
) -> set[int]:
"""找出所有的失效节点和悬空节点的 id
.. note::
悬空节点,就是此节点有一个祖先节点的 parant_id,不为 0 且不在 `data` 表中
:param self: P115QueryDB 实例或者数据库连接或游标
:return: 一组悬空节点的 id 的集合
"""
con: Any
if isinstance(self, P115QueryDB):
con = self.con
else:
con = self
ok_ids: set[int] = set(query(con, "SELECT id FROM data WHERE NOT is_alive", row_factory="one"))
na_ids: set[int] = set()
d = dict(query(con, "SELECT id, parent_id FROM data WHERE is_alive"))
temp: list[int] = []
push = temp.append
clear = temp.clear
update_ok = ok_ids.update
update_na = na_ids.update
for k in d:
try:
push(k)
while k := d[k]:
if k in ok_ids:
update_ok(temp)
break
elif k in na_ids:
update_na(temp)
break
push(k)
else:
update_ok(temp)
except KeyError:
update_na(temp)
finally:
clear()
return na_ids