04.拉取websocket消息

115 网盘可以用 websocket 拉取部分消息,例如文件删除、离线任务重试等。只不过能获得的消息类型有限,覆盖范围极小,所以暂时没有什么利用价值。

1. 使用 websocket-client 拉取

#!/usr/bin/env python3
# encoding: utf-8

__author__ = "ChenyangGao <https://chenyanggao.github.io>"
__all__ = ["connect_to_websocket"]
__doc__ = """115 websocket信息收集:用 websocket-client 模块

https://websocket-client.readthedocs.io/en/latest/
"""

from collections.abc import Callable
from _thread import start_new_thread
from time import sleep

from p115 import check_response, P115Client
from orjson import dumps, loads
from websocket import enableTrace, WebSocket, WebSocketApp, WebSocketConnectionClosedException


def connect_to_websocket(
    client: str | P115Client, 
    collect: Callable = lambda message: print(f"Received: {message}"), 
    session_id: int = 0, 
    sequence_id_from: int = 1, 
    ping_interval: int = 40, 
    *, 
    debug: bool = False, 
):
    """获取 115 的 websocket 消息

    :param client: 115 的客户端或 cookies
    :param collect: 自定义的消息收集函数
    :param session_id: websocket 的会话 id
    :param sequence_id_from: 会话从此序列 id 开始
    :param ping_interval: 两次 ping 之间的时间间隔
    :param debug: 是否开启调试
    """
    enableTrace(debug)

    sequence_id = sequence_id_from + 1
    max_id = 0

    def send_ping_periodically(websocket: WebSocket, /):
        nonlocal sequence_id
        while websocket.sock:
            sleep(ping_interval)
            try:
                websocket.send(dumps({"cmd_key":2,"sequence_id":sequence_id}))
                sequence_id += 1
            except WebSocketConnectionClosedException:
                print("WebSocket connection closed, stopping ping thread.")
                break
            except Exception as e:
                print(f"Error sending ping: {e}")
                break

    def on_open(websocket: WebSocket, /):
        nonlocal sequence_id
        print(f"Connected to {uri}")
        websocket.send(dumps({
            "body": '{"id":0,"type":"115"}', 
            "cmd_key": 16781314, 
            "sequence_id": sequence_id, 
        }))
        sequence_id += 1
        start_new_thread(send_ping_periodically, (websocket,))

    def on_message(websocket: WebSocket, raw_message, /):
        nonlocal sequence_id, max_id
        message = loads(raw_message)
        if "pong" in message:
            return
        if body := message.get("body"):
            message["body"] = loads(body)
        collect(message)
        match message.get("type"):
            case 116:
                websocket.send(dumps({
                    "body": '{"id":%s,"type":"115"}' % max_id, 
                    "cmd_key": 16781314, 
                    "sequence_id": sequence_id, 
                }))
                sequence_id += 1
            case 511:
                max_id = message["body"]["data"].get("max_id") or max_id

    def on_error(websocket: WebSocket, error, /):
        print(f"Error: {error}")

    def on_close(websocket: WebSocket, close_status_code, close_msg, /):
        print(f"Connection closed: {close_status_code} - {close_msg}")

    if isinstance(client, str):
        client = P115Client(client, check_for_relogin=True)

    if session_id:
        data = {
            "server": "ws.115.com:8280", 
            "session_id": session_id, 
            "user_id": client.user_id, 
        }
    else:
        resp = client.msg_get_websocket_host()
        check_response(resp)
        data = resp["data"]
    data["sequence_id"] = sequence_id_from
    uri = "wss://{server}/?uid={user_id}&client_version=1&client_type=5&sequence_id={sequence_id}&session={session_id}".format_map(data)
    websocket = WebSocketApp(
        uri, 
        header={"cookie": client.cookies_str}, 
        on_open=on_open, 
        on_message=on_message, 
        on_error=on_error, 
        on_close=on_close, 
    )
    websocket.run_forever()


if __name__ == "__main__":
    from pathlib import Path

    client = P115Client(Path("~/115-cookies.txt").expanduser())
    connect_to_websocket(client)

2. 使用 websockets 拉取

#!/usr/bin/env python3
# encoding: utf-8

__author__ = "ChenyangGao <https://chenyanggao.github.io>"
__all__ = ["connect_to_websocket"]
__doc__ = """115 websocket 信息收集:用 websockets 模块

https://websockets.readthedocs.io/en/latest/
"""

from asyncio import sleep as async_sleep, create_task
from collections.abc import Callable
from _thread import start_new_thread
from time import sleep
from typing import Literal

from iterutils import run_gen_step, with_iter_next
from orjson import dumps, loads
from p115client import check_response, P115Client
from websockets.exceptions import ConnectionClosed, ConnectionClosedOK, ConnectionClosedError
from websockets.asyncio.client import connect as async_connect, ClientConnection as AsyncClientConnection
from websockets.sync.client import connect, ClientConnection


def connect_to_websocket(
    client: str | P115Client, 
    collect: Callable = lambda message: print(f"Received: {message}"), 
    session_id: int = 0, 
    sequence_id_from: int = 1, 
    ping_interval: int = 40, 
    *, 
    async_: Literal[False, True] = False, 
):
    """获取 115 的 websocket 消息

    :param client: 115 的客户端或 cookies
    :param collect: 自定义的消息收集函数
    :param session_id: websocket 的会话 id
    :param sequence_id_from: 会话从此序列 id 开始
    :param ping_interval: 两次 ping 之间的时间间隔
    :param async_: 是否异步
    """
    if isinstance(client, str):
        client = P115Client(client, check_for_relogin=True)
    def gen_step():
        sequence_id = sequence_id_from + 1
        max_id = 0
        if session_id:
            data = {
                "server": "ws.115.com:8280", 
                "session_id": session_id, 
                "user_id": client.user_id, 
            }
        else:
            resp = yield client.msg_get_websocket_host(async_=async_)
            check_response(resp)
            data = resp["data"]
        data["sequence_id"] = sequence_id_from
        uri = "wss://{server}?uid={user_id}&client_version=1&client_type=5&sequence_id={sequence_id}&session={session_id}".format_map(resp["data"])
        websocket: ClientConnection | AsyncClientConnection
        if async_:
            websocket = yield async_connect(uri, additional_headers={"cookie": client.cookies_str})
        else:
            websocket = connect(uri, additional_headers={"cookie": client.cookies_str})
        try:
            print(f"Connected to {uri}")
            yield websocket.send(dumps({
                "body": '{"id":0,"type":"115"}', 
                "cmd_key": 16781314, 
                "sequence_id": sequence_id, 
            }))
            sequence_id += 1
            if async_:
                async def asend_ping_periodically(websocket, /):
                    nonlocal sequence_id
                    while not websocket.close_code:
                        await async_sleep(ping_interval)
                        try:
                            await websocket.send(dumps({"cmd_key": 2, "sequence_id": sequence_id}))
                            sequence_id += 1
                        except ConnectionClosed:
                            print("WebSocket connection closed, stopping ping thread.")
                            break
                        except Exception as e:
                            print(f"Error sending ping: {e}")
                            break
                background_ping = create_task(asend_ping_periodically(websocket))
            else:
                def send_ping_periodically(websocket, /):
                    nonlocal sequence_id
                    while not websocket.close_code:
                        sleep(ping_interval)
                        try:
                            websocket.send(dumps({"cmd_key": 2, "sequence_id": sequence_id}))
                            sequence_id += 1
                        except ConnectionClosed:
                            print("WebSocket connection closed, stopping ping thread.")
                            break
                        except Exception as e:
                            print(f"Error sending ping: {e}")
                            break
                start_new_thread(send_ping_periodically, (websocket,))
            with with_iter_next(websocket) as do_next:
                while True:
                    message = loads((yield do_next()))
                    if "pong" in message:
                        continue
                    if body := message.get("body"):
                        message["body"] = loads(body)
                    yield collect(message)
                    match message.get("type"):
                        case 116:
                            yield websocket.send(dumps({
                                "body": '{"id":%s,"type":"115"}' % max_id, 
                                "cmd_key": 16781314, 
                                "sequence_id": sequence_id, 
                            }))
                            sequence_id += 1
                        case 511:
                            max_id = message["body"]["data"].get("max_id") or max_id
        except ConnectionClosedOK:
            print("Connection closed gracefully by server.")
        except ConnectionClosedError as e:
            print(f"Connection closed with error: {e}")
        except Exception as e:
            print(f"An unexpected error occurred: {e}")
        finally:
            yield websocket.close()
    return run_gen_step(gen_step, async_=async_)


if __name__ == "__main__":
    from pathlib import Path

    client = P115Client(Path("~/115-cookies.txt").expanduser())
    connect_to_websocket(client)