普通视图

发现新文章,点击刷新页面。
昨天 — 2025年12月28日首页

Node.js 原生实现JSON-RPC及长进程双向通信实践

作者 前端流一
2025年12月28日 18:22

Node.js 原生实现JSON-RPC及长进程双向通信实践

问题

由于负责的业务项目中原来的架构每个操作都 spawn 一个 Python大模型的 进程,执行完就退出。当 Agent 需要人机交互(help_needed)时,进程已经结束了,submitUserResponse 没法把响应传回去。

解决思路

改成长进程,Python 启动后不退出,通过 stdin/stdout 持续通信。

协议设计

用 JSON-RPC 风格,每行一个 JSON。

请求(JS → Python stdin):

{"jsonrpc":"2.0","id":"req_1","method":"get_llm_providers","params":{}}

响应(Python stdout → JS):

[RESPONSE]{"jsonrpc":"2.0","id":"req_1","result":{...}}

事件通知(Python → JS):

[EVENT]{"jsonrpc":"2.0","method":"help_needed","params":{"query":"需要什么帮助?"}}

加前缀 [RESPONSE][EVENT] 是为了和其他日志区分开。

Python 端实现

核心是一个死循环读 stdin:

class DaemonWrapper:
    def __init__(self):
        self.wrapper = BRTWrapper()  # 复用同一个实例,状态保持
        self.running = True
        
    async def read_stdin_line(self):
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, sys.stdin.readline)
        
    async def run(self):
        self.send_event('ready', {'message': '守护进程已就绪'})
        
        while self.running:
            line = await self.read_stdin_line()
            if not line:  # EOF
                break
            request = json.loads(line.strip())
            asyncio.create_task(self.handle_request(request))

关键点:

  1. run_in_executor 把同步的 readline 变成异步,不阻塞事件循环
  2. create_task 处理请求,不阻塞主循环继续读 stdin
  3. BRTWrapper 实例复用,asyncio.Event 等状态都在

JS 端实现

spawn 进程后,监听 stdout 解析响应:

async startDaemon() {
    this.daemonProcess = spawn('/bin/bash', [this.runScript, '--daemon'], {
        stdio: ['pipe', 'pipe', 'pipe']
    })
    
    this.daemonProcess.stdout.on('data', (data) => {
        this._handleDaemonOutput(data.toString())
    })
}

_handleDaemonOutput(data) {
    // 处理跨行数据
    this.inputBuffer += data
    const lines = this.inputBuffer.split('\n')
    this.inputBuffer = lines.pop() || ''
    
    for (const line of lines) {
        if (line.startsWith('[RESPONSE]')) {
            const response = JSON.parse(line.substring(10))
            this._handleDaemonResponse(response)
        } else if (line.startsWith('[EVENT]')) {
            const event = JSON.parse(line.substring(7))
            this._handleDaemonEvent(event)
        }
    }
}

发请求就是往 stdin 写:

async _sendRequest(method, params = {}) {
    const requestId = `req_${++this.requestIdCounter}`
    
    return new Promise((resolve, reject) => {
        this.pendingRequests.set(requestId, { resolve, reject })
        
        const request = { jsonrpc: '2.0', id: requestId, method, params }
        this.daemonProcess.stdin.write(JSON.stringify(request) + '\n')
    })
}

踩坑

1. stdout 数据分片

stdout 的 data 事件不保证按行来,可能一次收到半行,也可能一次收到好几行。必须用 buffer 拼接,按 \n 切分。

2. 长任务阻塞

python的大模型任务跑起来可能几分钟,不能阻塞 stdin 读取,否则 submitUserResponse 发过来收不到。用 create_task 把长任务丢后台。

3. stdin 是同步的

Python 的 sys.stdin.readline() 是同步阻塞的,直接 await 会卡住事件循环。必须用 run_in_executor 扔到线程池。

4. 进程清理

关闭时先尝试发 shutdown 命令优雅退出,超时后 SIGTERM,再不行 SIGKILL。Windows 用 taskkill。

效果

const client = new BundleBRTClient(bundlePath)

// 监听需要帮助事件
client.on('help_needed', async (data) => {
    const answer = await promptUser(data.query)
    await client.submitUserResponse(answer)  // 传回同一个进程
})

await client.startBRTask('...')  // 任务跑在后台
await client.getTaskStatus()          // 复用同一进程
await client.close()                  // 清理

进程启动一次,后续所有调用都是 stdin/stdout 通信,状态保持,多轮对话可以正常工作。

❌
❌