最新下载
热门教程
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
Python长连接服务中asyncio.Queue内存泄漏问题如何解决
时间:2026-05-28 09:10:01 编辑:袖梨 来源:一聚教程网
在使用Python asyncio构建长连接服务时,内存泄漏问题常常困扰开发者。本文将深入分析问题根源,并提供一套行之有效的解决方案。
问题背景
在基于 asyncio 的长连接服务(如 SSE 推送、WebSocket、实时仪表盘)中,事件总线是最常见的基础设施之一。其核心运作模式可以概括为:
# 订阅者注册一个 asyncio.Queue
queue = event_bus.subscribe("task:progress")# 发布者往所有匹配的 Queue 里 put_nowait
event_bus.emit(Event("task:progress", data={"percent": 80}))
实际生产环境中,订阅者的生命周期管理往往面临诸多挑战。前端断连、浏览器关闭、SSE连接超时或客户端忘记调用unsubscribe等情况,都会导致订阅队列成为"孤儿队列"。这些队列仍然保留在订阅列表中,emit()操作会持续向其塞入数据,最终导致内存持续增长直至OOM。
这就是长连接服务中典型的隐形内存泄漏问题:并非代码逻辑错误,而是缺乏对异常断开情况的兜底处理机制。
常见错误方案分析
方案 A:依赖客户端主动 unsubscribe
# SSE 端点
async def sse_endpoint(request):
queue = event_bus.subscribe("task:progress")
try:
while True:
event = await queue.get()
yield sse_format(event)
finally:
event_bus.unsubscribe(queue)
该方案存在明显缺陷:finally仅在协程正常退出时执行。当客户端直接断开TCP连接时,服务端的queue.get()可能仍处于await状态,协程不会立即取消。即便框架执行清理,也存在时间窗口问题——连接断开与协程取消之间的短暂时间内,emit()可能已经向失效队列塞入了多个事件。
方案 B:给 Queue 设 maxlen
queue = asyncio.Queue(maxsize=100)
此方案的限制在于:maxsize仅控制put操作的阻塞行为,而非内存使用。当使用put_nowait时,超出限制会直接抛出QueueFull异常,开发者不得不选择丢弃事件或让emit操作失败。
方案 C:弱引用 WeakRef
self._subscribers.append(weakref.ref(queue))
该方案存在逻辑矛盾:asyncio.Queue一旦失去强引用就会立即被GC回收,但队列本身需要由EventBus持有引用。弱引用在此场景下无法满足需求。
高效解决方案
核心思路是让队列主动报告存活状态,同时系统定期检查闲置队列。
关键实现步骤
第一步:包装 Queue,记录最后活跃时间
class _TrackedQueue(asyncio.Queue[Event]):
"""带追踪信息的 Queue,记录最后消费时间和订阅参数""" __slots__ = ("last_access", "_sub_event_type", "_sub_task_id") def __init__(
self,
event_type: str | None = None,
task_id: uuid.UUID | None = None,
) -> None:
super().__init__()
self.last_access: float = time.monotonic()
self._sub_event_type = event_type
self._sub_task_id = task_id def touch(self) -> None:
self.last_access = time.monotonic()
设计亮点:
设计点说明继承 asyncio.Queue对消费方完全透明,await queue.get() 无需任何改动__slots__避免每个实例创建 __dict__,大量 Queue 场景下节省内存last_access 用 monotonic不受系统时间回调影响,比 time.time() 更安全保存订阅参数清理时需要知道"这个 Queue 挂在哪个列表里",否则要从所有列表里线性搜索
第二步:全局注册表 + 统一清理入口
class EventBus:
def __init__(self) -> None:
self._subscribers: dict[str, list[_TrackedQueue]] = defaultdict(list)
self._global_subscribers: list[_TrackedQueue] = []
self._task_subscribers: dict[uuid.UUID, list[_TrackedQueue]] = defaultdict(list)
self._all_queues: set[_TrackedQueue] =
相关文章
- POKI.免费游戏免下载畅玩-POKI.免费游戏全端同步 05-28
- laravel 实现阿里云oss文件上传功能的示例 05-28
- 删除PHP数组中的重复元素的实现代码 05-28
- DBeaver怎样连接HBase数据库 05-28
- ubuntu下securecrt脚本录制方法详解 05-28
- 《梦幻西游》挤线器推荐-光速挤线器稳定高效 05-28