Skip to content

异步Socket通信

概述:

KBEngine 提供了一套文件描述符注册接口(registerFileDescriptor 系列API),允许开发者在 interfaces 进程(或其他单线程 app)中安全地使用原生 socket 进行异步网络通信,而不会阻塞主线程。

由于 interfaces 是单线程服务器,如果直接使用 Python 的同步 socket(如 socket.recv()),会卡住主线程导致整个进程无法响应其他请求。这套 API 通过将 socket 的文件描述符(fd)注册到引擎的事件循环中,在有事件到达时通过回调函数通知脚本层,从而实现完全异步非阻塞的网络 I/O。

这套 API 非常适合以下场景:

  • interfaces 中启动 HTTP/TCP 服务器,接收第三方平台的回调通知(如充值回调、账号验证回调)。
  • 与第三方平台建立长连接进行双向通信。
  • 实现自定义的 TCP/UDP 协议服务。

TIP

文件描述符注册接口在 KBEngine 模块中,所有 app 进程均可使用,但在 interfaces 中最为常见。

API 介绍:

文件描述符注册接口由以下 5 个核心函数组成,分为监听类、读取类和写入类三种操作:

类别函数说明
监听registerAcceptFileDescriptor注册一个监听 socket 的 fd
监听deregisterAcceptFileDescriptor取消注册监听 socket 的 fd
读取registerReadDataFileDescriptor注册一个已连接 socket 的 fd
读取deregisterReadDataFileDescriptor取消注册已连接 socket 的 fd
写入writeFileDescriptor向已注册的 socket fd 写入数据

registerAcceptFileDescriptor

def KBEngine.registerAcceptFileDescriptor(fileno, callback):

将一个处于监听状态的 socket 的文件描述符注册到引擎的完成端口,当有新客户端连接到达时,引擎会回调指定的 callback 函数。

参数

参数名参数类型是否必要介绍
filenoint必须监听 socket 的文件描述符(socket.fileno())。
callbackfunction必须新连接到达时的回调函数,签名见下方。

回调函数签名

python
def callback(listenerFD, clientFD, errorCode):
参数类型介绍
listenerFDint触发事件的监听 socket 的文件描述符。
clientFDint新接入的客户端 socket 的文件描述符,可用于后续读取/写入操作。
errorCodeint错误码,0 表示成功,非 0 表示有错误发生。

deregisterAcceptFileDescriptor

def KBEngine.deregisterAcceptFileDescriptor(fileno):

将之前注册的监听 socket 文件描述符从引擎的完成端口中移除。通常在关闭服务器时调用。

参数

参数名参数类型是否必要介绍
filenoint必须监听 socket 的文件描述符(socket.fileno())。

registerReadDataFileDescriptor

def KBEngine.registerReadDataFileDescriptor(fileno, callback):

将一个已连接的 socket 的文件描述符注册到引擎的完成端口,当该 socket 上有数据可读时,引擎会回调指定的 callback 函数。

WARNING

必须在 registerAcceptFileDescriptor 的回调中拿到 clientFD 后,才能调用此函数进行注册。

参数

参数名参数类型是否必要介绍
filenoint必须已连接 socket 的文件描述符。
callbackfunction必须数据到达时的回调函数,签名见下方。

回调函数签名

python
def callback(fd, data, errorCode):
参数类型介绍
fdint触发可读事件的 socket 文件描述符。
databytes读取到的数据。当 len(data) == 0 时表示对方已断开连接。
errorCodeint错误码,0 表示成功,非 0 表示有错误发生(此时应关闭该连接)。

deregisterReadDataFileDescriptor

def KBEngine.deregisterReadDataFileDescriptor(fileno):

将之前注册的已连接 socket 文件描述符从引擎的完成端口中移除。通常在关闭客户端连接时调用。

参数

参数名参数类型是否必要介绍
filenoint必须已连接 socket 的文件描述符。

writeFileDescriptor

def KBEngine.writeFileDescriptor(fileno, data, callback):

向一个已注册读取的 socket 文件描述符写入数据。写入完成后引擎会回调指定的 callback 函数。

WARNING

调用此函数前,目标 fd 必须已通过 registerReadDataFileDescriptor 注册。

参数

参数名参数类型是否必要介绍
filenoint必须目标 socket 的文件描述符。
databytes必须要发送的数据。
callbackfunction必须写入完成时的回调函数,签名见下方。

回调函数签名

python
def callback(fd, bytesWritten, errorCode):
参数类型介绍
fdint触发写入完成事件的 socket 文件描述符。
bytesWrittenint实际写入的字节数。
errorCodeint错误码,0 表示成功,非 0 表示有错误发生。

API 调用流程:

使用文件描述符注册接口的典型流程如下:

1. 创建监听 socket(socket.socket + bind + listen)

2. registerAcceptFileDescriptor(listenerFD, onAccept)

3. 新连接到达 → onAccept(listenerFD, clientFD, errorCode) 被回调

4. registerReadDataFileDescriptor(clientFD, onRead)

5. 数据到达 → onRead(fd, data, errorCode) 被回调

6. writeFileDescriptor(fd, response, onWriteComplete)

7. 写入完成 → onWriteComplete(fd, bytesWritten, errorCode) 被回调

8. deregisterReadDataFileDescriptor(fd) + socket.close()

完整例子:HTTP 回调服务器

以下是一个完整的示例,展示如何在 interfaces 进程中启动一个 HTTP 服务器,用于接收第三方平台的回调通知。

这个例子由两部分组成:

  • Poller.py:封装了 socket 监听、连接管理、数据读取和 HTTP 响应的完整逻辑。
  • kbemain.pyinterfaces 的入口脚本,在 onInterfaceAppReady 中启动 Poller。

1. Poller.py

python
import socket
import KBEngine
from KBEDebug import *


class Poller:
    def __init__(self):
        self._listener = None
        self._clients = {}

    def start(self, addr, port):
        """
        启动 TCP 监听服务器
        """
        self._listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._listener.bind((addr, port))
        self._listener.listen(128)

        # 将监听 socket 注册到引擎的完成端口
        KBEngine.registerAcceptFileDescriptor(
            self._listener.fileno(), self.onAccept
        )
        INFO_MSG("Poller::start: listen %s:%s" % (addr, port))

    def stop(self):
        """
        停止 TCP 监听服务器
        """
        if self._listener:
            # 取消监听 socket 的注册
            KBEngine.deregisterAcceptFileDescriptor(self._listener.fileno())
            self._listener.close()
            self._listener = None

        # 关闭所有客户端连接
        for fd in list(self._clients.keys()):
            self.closeClient(fd)

    def onAccept(self, listenerFD, clientFD, errorCode):
        """
        新客户端连接到达的回调
        @param listenerFD: 监听 socket 的 fd
        @param clientFD: 新客户端 socket 的 fd
        @param errorCode: 错误码,0 表示成功
        """
        if errorCode != 0:
            ERROR_MSG("Poller::onAccept: listenerFD=%i error=%i"
                      % (listenerFD, errorCode))
            return

        # 通过 fd 包装一个 Python socket 对象
        try:
            sock = socket.socket(fileno=clientFD)
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        except Exception:
            ERROR_MSG("Poller::onAccept: wrap client socket failed, "
                      "clientFD=%i" % clientFD)
            return

        # 记录客户端信息
        self._clients[clientFD] = {
            "socket": sock,
            "buffer": bytearray(),
            "responded": False,
        }

        # 将客户端 socket 注册到引擎的完成端口,监听数据到达
        KBEngine.registerReadDataFileDescriptor(clientFD, self.onRead)
        DEBUG_MSG("Poller::onAccept: new clientFD=%i" % clientFD)

    def onRead(self, fd, data, errorCode):
        """
        客户端数据到达的回调
        @param fd: 触发事件的 socket fd
        @param data: 读取到的数据(bytes)
        @param errorCode: 错误码,0 表示成功
        """
        client = self._clients.get(fd)
        if client is None:
            return

        if errorCode != 0:
            ERROR_MSG("Poller::onRead: fd=%i error=%i" % (fd, errorCode))
            self.closeClient(fd)
            return

        # len(data) == 0 表示对方关闭了连接
        if len(data) == 0:
            DEBUG_MSG("Poller::onRead: fd=%i disconnect" % fd)
            self.closeClient(fd)
            return

        # 防止重复处理(本例中每个连接只处理一次请求)
        if client["responded"]:
            return

        # 累积数据到缓冲区
        client["buffer"].extend(data)
        DEBUG_MSG("Poller::onRead: fd=%i dataSize=%i totalSize=%i"
                  % (fd, len(data), len(client["buffer"])))

        # 简单判断 HTTP 请求头是否接收完整
        if b"\r\n\r\n" not in client["buffer"]:
            return

        client["responded"] = True
        self.processData(fd, bytes(client["buffer"]))

    def processData(self, fd, data):
        """
        处理收到的 HTTP 请求并发送响应
        @param fd: 客户端 socket 的 fd
        @param data: 完整的 HTTP 请求数据
        """
        # 此处可以解析 data,根据请求内容做不同的业务处理
        # 例如:解析第三方平台的充值回调参数,调用 KBEngine.chargeResponse 等

        body = b"Hello KBEngine completion API\n"
        response = (
            b"HTTP/1.1 200 OK\r\n"
            b"Content-Type: text/plain; charset=utf-8\r\n"
            b"Content-Length: " + str(len(body)).encode("ascii") + b"\r\n"
            b"Connection: close\r\n"
            b"\r\n" +
            body
        )

        # 通过引擎异步发送响应数据
        KBEngine.writeFileDescriptor(fd, response, self.onWriteComplete)

    def onWriteComplete(self, fd, bytesWritten, errorCode):
        """
        数据写入完成的回调
        @param fd: 目标 socket 的 fd
        @param bytesWritten: 实际写入的字节数
        @param errorCode: 错误码,0 表示成功
        """
        if errorCode != 0:
            ERROR_MSG("Poller::onWriteComplete: fd=%i error=%i"
                      % (fd, errorCode))
        else:
            DEBUG_MSG("Poller::onWriteComplete: fd=%i bytesWritten=%i"
                      % (fd, bytesWritten))

        # 写入完成后关闭客户端连接
        self.closeClient(fd)

    def closeClient(self, fd):
        """
        关闭客户端连接并清理资源
        @param fd: 客户端 socket 的 fd
        """
        client = self._clients.pop(fd, None)
        if client is None:
            return

        # 取消客户端 socket 的读取注册
        KBEngine.deregisterReadDataFileDescriptor(fd)

        # 关闭 socket
        try:
            client["socket"].close()
        except Exception:
            pass

        DEBUG_MSG("Poller::closeClient: fd=%i" % fd)

2. kbemain.py(interfaces 入口脚本)

python
# -*- coding: utf-8 -*-
import os
import KBEngine
from KBEDebug import *
from Poller import Poller

g_poller = Poller()

def onInterfaceAppReady():
    """
    KBEngine method.
    interfaces 已经准备好了
    """
    INFO_MSG('onInterfaceAppReady: bootstrapGroupIndex=%s, '
             'bootstrapGlobalIndex=%s' %
             (os.getenv("KBE_BOOTIDX_GROUP"),
              os.getenv("KBE_BOOTIDX_GLOBAL")))

    # 启动 HTTP 回调服务器,监听 127.0.0.1:30040
    g_poller.start("127.0.0.1", 30040)

def onInterfaceAppShutDown():
    """
    KBEngine method.
    这个 interfaces 被关闭前的回调函数
    """
    INFO_MSG('onInterfaceAppShutDown()')
    g_poller.stop()

3. 运行效果

启动服务器后,在命令行中使用 curl 测试:

bash
curl http://127.0.0.1:30040/

返回:

Hello KBEngine completion API

引擎日志输出类似:

INFO: Poller::start: listen 127.0.0.1:30040
DEBUG: Poller::onAccept: new clientFD=123
DEBUG: Poller::onRead: fd=123 dataSize=78 totalSize=78
DEBUG: Poller::onWriteComplete: fd=123 bytesWritten=130
DEBUG: Poller::closeClient: fd=123

注意事项:

  1. 仅在单线程 App 中必要interfaces 是单线程的,因此必须使用文件描述符注册接口来避免阻塞。对于 baseapp / cellapp,可以使用 asyncio(参考 asyncio 支持)。

  2. 正确管理 fd 生命周期

    • 监听 socket 应在进程关闭时调用 deregisterAcceptFileDescriptor 取消注册。
    • 每个客户端 socket 在关闭前必须调用 deregisterReadDataFileDescriptor 取消注册,否则引擎会继续尝试读取已关闭的 fd。
  3. fd 包装 socket:在 onAccept 中拿到的 clientFD 是一个整数,需要使用 socket.socket(fileno=clientFD) 将其包装为 Python 的 socket 对象,才能进行后续操作(如 setsockoptclose 等)。

  4. 错误处理:每次回调都应检查 errorCode,非 0 时表示发生了错误,应当关闭对应的连接。

  5. 数据边界:TCP 是流式协议,onRead 回调中收到的数据可能不完整。例子中使用 b"\r\n\r\n" 判断 HTTP 请求头是否接收完整,实际开发中需要根据具体协议处理粘包/拆包问题。

  6. 与 HTTP 库的关系:如果只是需要对外发送 HTTP 请求,建议使用引擎内置的 KBEngine.urlopen() 异步 HTTP 请求,无需自己封装 socket。文件描述符注册接口更适合需要接收外部主动推送的场景(如第三方平台的回调通知)。