04. 拉取 websocket 消息
115 网盘可以用 websocket 拉取部分消息,例如文件删除、离线任务重试等。只不过能获得的消息类型有限,覆盖范围极小,所以暂时没有什么利用价值。
1. 使用 websocket-client 拉取
#!/usr/bin/env python3
# encoding: utf-8
# Module metadata: author, exported names, and the module description.
# The description is bound to ``__doc__`` explicitly because it is not the
# first statement of the file, so a bare string here would not be picked up
# as the module docstring.
__author__ = "ChenyangGao <https://chenyanggao.github.io>"
__all__ = ["connect_to_websocket"]
__doc__ = """115 websocket信息收集:用 websocket-client 模块
https://websocket-client.readthedocs.io/en/latest/
"""
from collections.abc import Callable
from _thread import start_new_thread
from time import sleep
from p115 import check_response, P115Client
from orjson import dumps, loads
from websocket import enableTrace, WebSocket, WebSocketApp, WebSocketConnectionClosedException
def connect_to_websocket(
client: str | P115Client,
collect: Callable = lambda message: print(f"Received: {message}"),
session_id: int = 0,
sequence_id_from: int = 1,
ping_interval: int = 40,
*,
debug: bool = False,
):
"""获取 115 的 websocket 消息
:param client: 115 的客户端或 cookies
:param collect: 自定义的消息收集函数
:param session_id: websocket 的会话 id
:param sequence_id_from: 会话从此序列 id 开始
:param ping_interval: 两次 ping 之间的时间间隔
:param debug: 是否开启调试
"""
enableTrace(debug)
sequence_id = sequence_id_from + 1
max_id = 0
def send_ping_periodically(websocket: WebSocket, /):
nonlocal sequence_id
while websocket.sock:
sleep(ping_interval)
try:
websocket.send(dumps({"cmd_key":2,"sequence_id":sequence_id}))
sequence_id += 1
except WebSocketConnectionClosedException:
print("WebSocket connection closed, stopping ping thread.")
break
except Exception as e:
print(f"Error sending ping: {e}")
break
def on_open(websocket: WebSocket, /):
nonlocal sequence_id
print(f"Connected to {uri}")
websocket.send(dumps({
"body": '{"id":0,"type":"115"}',
"cmd_key": 16781314,
"sequence_id": sequence_id,
}))
sequence_id += 1
start_new_thread(send_ping_periodically, (websocket,))
def on_message(websocket: WebSocket, raw_message, /):
nonlocal sequence_id, max_id
message = loads(raw_message)
if "pong" in message:
return
if body := message.get("body"):
message["body"] = loads(body)
collect(message)
match message.get("type"):
case 116:
websocket.send(dumps({
"body": '{"id":%s,"type":"115"}' % max_id,
"cmd_key": 16781314,
"sequence_id": sequence_id,
}))
sequence_id += 1
case 511:
max_id = message["body"]["data"].get("max_id") or max_id
def on_error(websocket: WebSocket, error, /):
print(f"Error: {error}")
def on_close(websocket: WebSocket, close_status_code, close_msg, /):
print(f"Connection closed: {close_status_code} - {close_msg}")
if isinstance(client, str):
client = P115Client(client, check_for_relogin=True)
if session_id:
data = {
"server": "ws.115.com:8280",
"session_id": session_id,
"user_id": client.user_id,
}
else:
resp = client.msg_get_websocket_host()
check_response(resp)
data = resp["data"]
data["sequence_id"] = sequence_id_from
uri = "wss://{server}/?uid={user_id}&client_version=1&client_type=5&sequence_id={sequence_id}&session={session_id}".format_map(data)
websocket = WebSocketApp(
uri,
header={"cookie": client.cookies_str},
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close,
)
websocket.run_forever()
if __name__ == "__main__":
    from pathlib import Path

    # Build the client from the cookies file in the home directory, then block
    # while the default ``collect`` prints every received message.
    client = P115Client(Path("~/115-cookies.txt").expanduser())
    connect_to_websocket(client)
2. 使用 websockets 拉取
#!/usr/bin/env python3
# encoding: utf-8
# Module metadata: author, exported names, and the module description.
# The description is bound to ``__doc__`` explicitly because it is not the
# first statement of the file, so a bare string here would not be picked up
# as the module docstring.
__author__ = "ChenyangGao <https://chenyanggao.github.io>"
__all__ = ["connect_to_websocket"]
__doc__ = """115 websocket 信息收集:用 websockets 模块
https://websockets.readthedocs.io/en/latest/
"""
from asyncio import sleep as async_sleep, create_task
from collections.abc import Callable
from _thread import start_new_thread
from time import sleep
from typing import Literal
from iterutils import run_gen_step, with_iter_next
from orjson import dumps, loads
from p115client import check_response, P115Client
from websockets.exceptions import ConnectionClosed, ConnectionClosedOK, ConnectionClosedError
from websockets.asyncio.client import connect as async_connect, ClientConnection as AsyncClientConnection
from websockets.sync.client import connect, ClientConnection
def connect_to_websocket(
client: str | P115Client,
collect: Callable = lambda message: print(f"Received: {message}"),
session_id: int = 0,
sequence_id_from: int = 1,
ping_interval: int = 40,
*,
async_: Literal[False, True] = False,
):
"""获取 115 的 websocket 消息
:param client: 115 的客户端或 cookies
:param collect: 自定义的消息收集函数
:param session_id: websocket 的会话 id
:param sequence_id_from: 会话从此序列 id 开始
:param ping_interval: 两次 ping 之间的时间间隔
:param async_: 是否异步
"""
if isinstance(client, str):
client = P115Client(client, check_for_relogin=True)
def gen_step():
sequence_id = sequence_id_from + 1
max_id = 0
if session_id:
data = {
"server": "ws.115.com:8280",
"session_id": session_id,
"user_id": client.user_id,
}
else:
resp = yield client.msg_get_websocket_host(async_=async_)
check_response(resp)
data = resp["data"]
data["sequence_id"] = sequence_id_from
uri = "wss://{server}?uid={user_id}&client_version=1&client_type=5&sequence_id={sequence_id}&session={session_id}".format_map(resp["data"])
websocket: ClientConnection | AsyncClientConnection
if async_:
websocket = yield async_connect(uri, additional_headers={"cookie": client.cookies_str})
else:
websocket = connect(uri, additional_headers={"cookie": client.cookies_str})
try:
print(f"Connected to {uri}")
yield websocket.send(dumps({
"body": '{"id":0,"type":"115"}',
"cmd_key": 16781314,
"sequence_id": sequence_id,
}))
sequence_id += 1
if async_:
async def asend_ping_periodically(websocket, /):
nonlocal sequence_id
while not websocket.close_code:
await async_sleep(ping_interval)
try:
await websocket.send(dumps({"cmd_key": 2, "sequence_id": sequence_id}))
sequence_id += 1
except ConnectionClosed:
print("WebSocket connection closed, stopping ping thread.")
break
except Exception as e:
print(f"Error sending ping: {e}")
break
background_ping = create_task(asend_ping_periodically(websocket))
else:
def send_ping_periodically(websocket, /):
nonlocal sequence_id
while not websocket.close_code:
sleep(ping_interval)
try:
websocket.send(dumps({"cmd_key": 2, "sequence_id": sequence_id}))
sequence_id += 1
except ConnectionClosed:
print("WebSocket connection closed, stopping ping thread.")
break
except Exception as e:
print(f"Error sending ping: {e}")
break
start_new_thread(send_ping_periodically, (websocket,))
with with_iter_next(websocket) as do_next:
while True:
message = loads((yield do_next()))
if "pong" in message:
continue
if body := message.get("body"):
message["body"] = loads(body)
yield collect(message)
match message.get("type"):
case 116:
yield websocket.send(dumps({
"body": '{"id":%s,"type":"115"}' % max_id,
"cmd_key": 16781314,
"sequence_id": sequence_id,
}))
sequence_id += 1
case 511:
max_id = message["body"]["data"].get("max_id") or max_id
except ConnectionClosedOK:
print("Connection closed gracefully by server.")
except ConnectionClosedError as e:
print(f"Connection closed with error: {e}")
except Exception as e:
print(f"An unexpected error occurred: {e}")
finally:
yield websocket.close()
return run_gen_step(gen_step, async_=async_)
if __name__ == "__main__":
    from pathlib import Path

    # Build the client from the cookies file in the home directory, then run
    # with the blocking client (``async_`` defaults to False) while the
    # default ``collect`` prints every received message.
    client = P115Client(Path("~/115-cookies.txt").expanduser())
    connect_to_websocket(client)