Node.js 原生实现JSON-RPC及长进程双向通信实践
Node.js 原生实现JSON-RPC及长进程双向通信实践
问题
由于负责的业务项目中原来的架构每个操作都 spawn 一个 Python大模型的 进程,执行完就退出。当 Agent 需要人机交互(help_needed)时,进程已经结束了,submitUserResponse 没法把响应传回去。
解决思路
改成长进程,Python 启动后不退出,通过 stdin/stdout 持续通信。
协议设计
用 JSON-RPC 风格,每行一个 JSON。
请求(JS → Python stdin):
{"jsonrpc":"2.0","id":"req_1","method":"get_llm_providers","params":{}}
响应(Python stdout → JS):
[RESPONSE]{"jsonrpc":"2.0","id":"req_1","result":{...}}
事件通知(Python → JS):
[EVENT]{"jsonrpc":"2.0","method":"help_needed","params":{"query":"需要什么帮助?"}}
加前缀 [RESPONSE] 和 [EVENT] 是为了和其他日志区分开。
Python 端实现
核心是一个死循环读 stdin:
class DaemonWrapper:
def __init__(self):
self.wrapper = BRTWrapper() # 复用同一个实例,状态保持
self.running = True
async def read_stdin_line(self):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, sys.stdin.readline)
async def run(self):
self.send_event('ready', {'message': '守护进程已就绪'})
while self.running:
line = await self.read_stdin_line()
if not line: # EOF
break
request = json.loads(line.strip())
asyncio.create_task(self.handle_request(request))
关键点:
-
run_in_executor把同步的readline变成异步,不阻塞事件循环 -
create_task处理请求,不阻塞主循环继续读 stdin -
BRTWrapper实例复用,asyncio.Event等状态都在
JS 端实现
spawn 进程后,监听 stdout 解析响应:
async startDaemon() {
this.daemonProcess = spawn('/bin/bash', [this.runScript, '--daemon'], {
stdio: ['pipe', 'pipe', 'pipe']
})
this.daemonProcess.stdout.on('data', (data) => {
this._handleDaemonOutput(data.toString())
})
}
_handleDaemonOutput(data) {
// 处理跨行数据
this.inputBuffer += data
const lines = this.inputBuffer.split('\n')
this.inputBuffer = lines.pop() || ''
for (const line of lines) {
if (line.startsWith('[RESPONSE]')) {
const response = JSON.parse(line.substring(10))
this._handleDaemonResponse(response)
} else if (line.startsWith('[EVENT]')) {
const event = JSON.parse(line.substring(7))
this._handleDaemonEvent(event)
}
}
}
发请求就是往 stdin 写:
async _sendRequest(method, params = {}) {
const requestId = `req_${++this.requestIdCounter}`
return new Promise((resolve, reject) => {
this.pendingRequests.set(requestId, { resolve, reject })
const request = { jsonrpc: '2.0', id: requestId, method, params }
this.daemonProcess.stdin.write(JSON.stringify(request) + '\n')
})
}
踩坑
1. stdout 数据分片
stdout 的 data 事件不保证按行来,可能一次收到半行,也可能一次收到好几行。必须用 buffer 拼接,按 \n 切分。
2. 长任务阻塞
python的大模型任务跑起来可能几分钟,不能阻塞 stdin 读取,否则 submitUserResponse 发过来收不到。用 create_task 把长任务丢后台。
3. stdin 是同步的
Python 的 sys.stdin.readline() 是同步阻塞的,直接 await 会卡住事件循环。必须用 run_in_executor 扔到线程池。
4. 进程清理
关闭时先尝试发 shutdown 命令优雅退出,超时后 SIGTERM,再不行 SIGKILL。Windows 用 taskkill。
效果
const client = new BundleBRTClient(bundlePath)
// 监听需要帮助事件
client.on('help_needed', async (data) => {
const answer = await promptUser(data.query)
await client.submitUserResponse(answer) // 传回同一个进程
})
await client.startBRTask('...') // 任务跑在后台
await client.getTaskStatus() // 复用同一进程
await client.close() // 清理
进程启动一次,后续所有调用都是 stdin/stdout 通信,状态保持,多轮对话可以正常工作。