异步Socket通信
概述:
KBEngine 提供了一套文件描述符注册接口(registerFileDescriptor 系列API),允许开发者在 interfaces 进程(或其他单线程 app)中安全地使用原生 socket 进行异步网络通信,而不会阻塞主线程。
由于 interfaces 是单线程服务器,如果直接使用 Python 的同步 socket(如 socket.recv()),会卡住主线程导致整个进程无法响应其他请求。这套 API 通过将 socket 的文件描述符(fd)注册到引擎的事件循环中,在有事件到达时通过回调函数通知脚本层,从而实现完全异步非阻塞的网络 I/O。
这套 API 非常适合以下场景:
- 在
interfaces中启动 HTTP/TCP 服务器,接收第三方平台的回调通知(如充值回调、账号验证回调)。 - 与第三方平台建立长连接进行双向通信。
- 实现自定义的 TCP/UDP 协议服务。
TIP
文件描述符注册接口在 KBEngine 模块中,所有 app 进程均可使用,但在 interfaces 中最为常见。
API 介绍:
文件描述符注册接口由以下 5 个核心函数组成,分为监听类、读取类和写入类三种操作:
| 类别 | 函数 | 说明 |
|---|---|---|
| 监听 | registerAcceptFileDescriptor | 注册一个监听 socket 的 fd |
| 监听 | deregisterAcceptFileDescriptor | 取消注册监听 socket 的 fd |
| 读取 | registerReadDataFileDescriptor | 注册一个已连接 socket 的 fd |
| 读取 | deregisterReadDataFileDescriptor | 取消注册已连接 socket 的 fd |
| 写入 | writeFileDescriptor | 向已注册的 socket fd 写入数据 |
registerAcceptFileDescriptor
def KBEngine.registerAcceptFileDescriptor(fileno, callback):
将一个处于监听状态的 socket 的文件描述符注册到引擎的完成端口,当有新客户端连接到达时,引擎会回调指定的 callback 函数。
参数
| 参数名 | 参数类型 | 是否必要 | 介绍 |
|---|---|---|---|
| fileno | int | 必须 | 监听 socket 的文件描述符(socket.fileno())。 |
| callback | function | 必须 | 新连接到达时的回调函数,签名见下方。 |
回调函数签名
def callback(listenerFD, clientFD, errorCode):| 参数 | 类型 | 介绍 |
|---|---|---|
| listenerFD | int | 触发事件的监听 socket 的文件描述符。 |
| clientFD | int | 新接入的客户端 socket 的文件描述符,可用于后续读取/写入操作。 |
| errorCode | int | 错误码,0 表示成功,非 0 表示有错误发生。 |
deregisterAcceptFileDescriptor
def KBEngine.deregisterAcceptFileDescriptor(fileno):
将之前注册的监听 socket 文件描述符从引擎的完成端口中移除。通常在关闭服务器时调用。
参数
| 参数名 | 参数类型 | 是否必要 | 介绍 |
|---|---|---|---|
| fileno | int | 必须 | 监听 socket 的文件描述符(socket.fileno())。 |
registerReadDataFileDescriptor
def KBEngine.registerReadDataFileDescriptor(fileno, callback):
将一个已连接的 socket 的文件描述符注册到引擎的完成端口,当该 socket 上有数据可读时,引擎会回调指定的 callback 函数。
WARNING
必须在 registerAcceptFileDescriptor 的回调中拿到 clientFD 后,才能调用此函数进行注册。
参数
| 参数名 | 参数类型 | 是否必要 | 介绍 |
|---|---|---|---|
| fileno | int | 必须 | 已连接 socket 的文件描述符。 |
| callback | function | 必须 | 数据到达时的回调函数,签名见下方。 |
回调函数签名
def callback(fd, data, errorCode):| 参数 | 类型 | 介绍 |
|---|---|---|
| fd | int | 触发可读事件的 socket 文件描述符。 |
| data | bytes | 读取到的数据。当 len(data) == 0 时表示对方已断开连接。 |
| errorCode | int | 错误码,0 表示成功,非 0 表示有错误发生(此时应关闭该连接)。 |
deregisterReadDataFileDescriptor
def KBEngine.deregisterReadDataFileDescriptor(fileno):
将之前注册的已连接 socket 文件描述符从引擎的完成端口中移除。通常在关闭客户端连接时调用。
参数
| 参数名 | 参数类型 | 是否必要 | 介绍 |
|---|---|---|---|
| fileno | int | 必须 | 已连接 socket 的文件描述符。 |
writeFileDescriptor
def KBEngine.writeFileDescriptor(fileno, data, callback):
向一个已注册读取的 socket 文件描述符写入数据。写入完成后引擎会回调指定的 callback 函数。
WARNING
调用此函数前,目标 fd 必须已通过 registerReadDataFileDescriptor 注册。
参数
| 参数名 | 参数类型 | 是否必要 | 介绍 |
|---|---|---|---|
| fileno | int | 必须 | 目标 socket 的文件描述符。 |
| data | bytes | 必须 | 要发送的数据。 |
| callback | function | 必须 | 写入完成时的回调函数,签名见下方。 |
回调函数签名
def callback(fd, bytesWritten, errorCode):| 参数 | 类型 | 介绍 |
|---|---|---|
| fd | int | 触发写入完成事件的 socket 文件描述符。 |
| bytesWritten | int | 实际写入的字节数。 |
| errorCode | int | 错误码,0 表示成功,非 0 表示有错误发生。 |
API 调用流程:
使用文件描述符注册接口的典型流程如下:
1. 创建监听 socket(socket.socket + bind + listen)
↓
2. registerAcceptFileDescriptor(listenerFD, onAccept)
↓
3. 新连接到达 → onAccept(listenerFD, clientFD, errorCode) 被回调
↓
4. registerReadDataFileDescriptor(clientFD, onRead)
↓
5. 数据到达 → onRead(fd, data, errorCode) 被回调
↓
6. writeFileDescriptor(fd, response, onWriteComplete)
↓
7. 写入完成 → onWriteComplete(fd, bytesWritten, errorCode) 被回调
↓
8. deregisterReadDataFileDescriptor(fd) + socket.close()完整例子:HTTP 回调服务器
以下是一个完整的示例,展示如何在 interfaces 进程中启动一个 HTTP 服务器,用于接收第三方平台的回调通知。
这个例子由两部分组成:
- Poller.py:封装了 socket 监听、连接管理、数据读取和 HTTP 响应的完整逻辑。
- kbemain.py:
interfaces的入口脚本,在onInterfaceAppReady中启动 Poller。
1. Poller.py
import socket
import KBEngine
from KBEDebug import *
class Poller:
def __init__(self):
self._listener = None
self._clients = {}
def start(self, addr, port):
"""
启动 TCP 监听服务器
"""
self._listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._listener.bind((addr, port))
self._listener.listen(128)
# 将监听 socket 注册到引擎的完成端口
KBEngine.registerAcceptFileDescriptor(
self._listener.fileno(), self.onAccept
)
INFO_MSG("Poller::start: listen %s:%s" % (addr, port))
def stop(self):
"""
停止 TCP 监听服务器
"""
if self._listener:
# 取消监听 socket 的注册
KBEngine.deregisterAcceptFileDescriptor(self._listener.fileno())
self._listener.close()
self._listener = None
# 关闭所有客户端连接
for fd in list(self._clients.keys()):
self.closeClient(fd)
def onAccept(self, listenerFD, clientFD, errorCode):
"""
新客户端连接到达的回调
@param listenerFD: 监听 socket 的 fd
@param clientFD: 新客户端 socket 的 fd
@param errorCode: 错误码,0 表示成功
"""
if errorCode != 0:
ERROR_MSG("Poller::onAccept: listenerFD=%i error=%i"
% (listenerFD, errorCode))
return
# 通过 fd 包装一个 Python socket 对象
try:
sock = socket.socket(fileno=clientFD)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
except Exception:
ERROR_MSG("Poller::onAccept: wrap client socket failed, "
"clientFD=%i" % clientFD)
return
# 记录客户端信息
self._clients[clientFD] = {
"socket": sock,
"buffer": bytearray(),
"responded": False,
}
# 将客户端 socket 注册到引擎的完成端口,监听数据到达
KBEngine.registerReadDataFileDescriptor(clientFD, self.onRead)
DEBUG_MSG("Poller::onAccept: new clientFD=%i" % clientFD)
def onRead(self, fd, data, errorCode):
"""
客户端数据到达的回调
@param fd: 触发事件的 socket fd
@param data: 读取到的数据(bytes)
@param errorCode: 错误码,0 表示成功
"""
client = self._clients.get(fd)
if client is None:
return
if errorCode != 0:
ERROR_MSG("Poller::onRead: fd=%i error=%i" % (fd, errorCode))
self.closeClient(fd)
return
# len(data) == 0 表示对方关闭了连接
if len(data) == 0:
DEBUG_MSG("Poller::onRead: fd=%i disconnect" % fd)
self.closeClient(fd)
return
# 防止重复处理(本例中每个连接只处理一次请求)
if client["responded"]:
return
# 累积数据到缓冲区
client["buffer"].extend(data)
DEBUG_MSG("Poller::onRead: fd=%i dataSize=%i totalSize=%i"
% (fd, len(data), len(client["buffer"])))
# 简单判断 HTTP 请求头是否接收完整
if b"\r\n\r\n" not in client["buffer"]:
return
client["responded"] = True
self.processData(fd, bytes(client["buffer"]))
def processData(self, fd, data):
"""
处理收到的 HTTP 请求并发送响应
@param fd: 客户端 socket 的 fd
@param data: 完整的 HTTP 请求数据
"""
# 此处可以解析 data,根据请求内容做不同的业务处理
# 例如:解析第三方平台的充值回调参数,调用 KBEngine.chargeResponse 等
body = b"Hello KBEngine completion API\n"
response = (
b"HTTP/1.1 200 OK\r\n"
b"Content-Type: text/plain; charset=utf-8\r\n"
b"Content-Length: " + str(len(body)).encode("ascii") + b"\r\n"
b"Connection: close\r\n"
b"\r\n" +
body
)
# 通过引擎异步发送响应数据
KBEngine.writeFileDescriptor(fd, response, self.onWriteComplete)
def onWriteComplete(self, fd, bytesWritten, errorCode):
"""
数据写入完成的回调
@param fd: 目标 socket 的 fd
@param bytesWritten: 实际写入的字节数
@param errorCode: 错误码,0 表示成功
"""
if errorCode != 0:
ERROR_MSG("Poller::onWriteComplete: fd=%i error=%i"
% (fd, errorCode))
else:
DEBUG_MSG("Poller::onWriteComplete: fd=%i bytesWritten=%i"
% (fd, bytesWritten))
# 写入完成后关闭客户端连接
self.closeClient(fd)
def closeClient(self, fd):
"""
关闭客户端连接并清理资源
@param fd: 客户端 socket 的 fd
"""
client = self._clients.pop(fd, None)
if client is None:
return
# 取消客户端 socket 的读取注册
KBEngine.deregisterReadDataFileDescriptor(fd)
# 关闭 socket
try:
client["socket"].close()
except Exception:
pass
DEBUG_MSG("Poller::closeClient: fd=%i" % fd)2. kbemain.py(interfaces 入口脚本)
# -*- coding: utf-8 -*-
import os
import KBEngine
from KBEDebug import *
from Poller import Poller
g_poller = Poller()
def onInterfaceAppReady():
"""
KBEngine method.
interfaces 已经准备好了
"""
INFO_MSG('onInterfaceAppReady: bootstrapGroupIndex=%s, '
'bootstrapGlobalIndex=%s' %
(os.getenv("KBE_BOOTIDX_GROUP"),
os.getenv("KBE_BOOTIDX_GLOBAL")))
# 启动 HTTP 回调服务器,监听 127.0.0.1:30040
g_poller.start("127.0.0.1", 30040)
def onInterfaceAppShutDown():
"""
KBEngine method.
这个 interfaces 被关闭前的回调函数
"""
INFO_MSG('onInterfaceAppShutDown()')
g_poller.stop()3. 运行效果
启动服务器后,在命令行中使用 curl 测试:
curl http://127.0.0.1:30040/返回:
Hello KBEngine completion API引擎日志输出类似:
INFO: Poller::start: listen 127.0.0.1:30040
DEBUG: Poller::onAccept: new clientFD=123
DEBUG: Poller::onRead: fd=123 dataSize=78 totalSize=78
DEBUG: Poller::onWriteComplete: fd=123 bytesWritten=130
DEBUG: Poller::closeClient: fd=123注意事项:
仅在单线程 App 中必要:
interfaces是单线程的,因此必须使用文件描述符注册接口来避免阻塞。对于baseapp/cellapp,可以使用asyncio(参考 asyncio 支持)。正确管理 fd 生命周期:
- 监听 socket 应在进程关闭时调用
deregisterAcceptFileDescriptor取消注册。 - 每个客户端 socket 在关闭前必须调用
deregisterReadDataFileDescriptor取消注册,否则引擎会继续尝试读取已关闭的 fd。
- 监听 socket 应在进程关闭时调用
fd 包装 socket:在
onAccept中拿到的clientFD是一个整数,需要使用socket.socket(fileno=clientFD)将其包装为 Python 的 socket 对象,才能进行后续操作(如setsockopt、close等)。错误处理:每次回调都应检查
errorCode,非0时表示发生了错误,应当关闭对应的连接。数据边界:TCP 是流式协议,
onRead回调中收到的数据可能不完整。例子中使用b"\r\n\r\n"判断 HTTP 请求头是否接收完整,实际开发中需要根据具体协议处理粘包/拆包问题。与 HTTP 库的关系:如果只是需要对外发送 HTTP 请求,建议使用引擎内置的
KBEngine.urlopen()异步 HTTP 请求,无需自己封装 socket。文件描述符注册接口更适合需要接收外部主动推送的场景(如第三方平台的回调通知)。
