事件总线(Event Bus)指南

February 2, 2026 · View on GitHub

事件总线是 Coral 框架的核心组件,负责处理事件的发布、订阅和分发。它采用异步设计,支持中间件和批处理,为插件提供了灵活的事件处理机制。

基本使用

导入事件总线

from Coral import event_bus

订阅事件

from Coral.protocol import MessageEvent

# 订阅消息事件
@event_bus.subscribe(MessageEvent)
async def handle_message(event: MessageEvent):
    """处理消息事件"""
    if "你好" in event.message.to_plain_text():
        return "你好!我是Coral机器人"

发布事件

from Coral.protocol import MessageEvent, UserInfo, MessageChain, MessageSegment

# 创建事件
event = MessageEvent(
    event_id="msg_123",
    platform="qq",
    self_id="1000000",
    message=MessageChain([MessageSegment.text("你好")]),
    user=UserInfo(platform="qq", user_id="123456789")
)

# 发布事件
await event_bus.publish(event)

事件类型

Coral 支持多种事件类型:

1. 消息事件(MessageEvent)

from Coral.protocol import MessageEvent

@event_bus.subscribe(MessageEvent)
async def handle_message(event: MessageEvent):
    # 检查是否为私聊
    if event.is_private():
        return "这是私聊消息"
    
    # 检查是否被@
    if event.to_me():
        return "您提到了我"
    
    # 获取消息文本
    text = event.message.to_plain_text()
    return f"收到消息:{text}"

2. 通知事件(NoticeEvent)

from Coral.protocol import NoticeEvent

@event_bus.subscribe(NoticeEvent)
async def handle_notice(event: NoticeEvent):
    if event.type == "group_increase":
        return f"欢迎新成员 {event.user.nickname} 加入群聊!"
    elif event.type == "friend_add":
        return f"已添加新好友:{event.user.nickname}"

3. 命令事件(CommandEvent)

from Coral.protocol import CommandEvent

@event_bus.subscribe(CommandEvent)
async def handle_command(event: CommandEvent):
    if event.command == "weather":
        city = event.args[0] if event.args else "北京"
        return f"正在查询{city}的天气..."

4. 自定义事件

from Coral.protocol import GenericEvent
from dataclasses import dataclass

# 定义自定义事件
@dataclass
class CustomEvent:
    platform: str
    name: str
    data: dict

# 订阅自定义事件
@event_bus.subscribe(CustomEvent)
async def handle_custom(event: CustomEvent):
    return f"处理自定义事件:{event.name},数据:{event.data}"

中间件系统

事件总线支持中间件,可以在事件处理前后执行自定义逻辑:

创建中间件

async def logging_middleware(event):
    """日志记录中间件"""
    import logging
    logger = logging.getLogger(__name__)
    
    logger.info(f"处理事件:{type(event).__name__}")
    
    # 修改事件(可选)
    if hasattr(event, 'platform'):
        event.platform = event.platform.upper()
    
    # 返回事件继续处理,返回None则终止
    return event

async def auth_middleware(event):
    """权限检查中间件"""
    from Coral import perm_system
    
    # 检查用户权限
    if hasattr(event, 'user'):
        has_perm = perm_system.check_perm(
            ["chat.allow"], 
            event.user.user_id,
            event.group.group_id if hasattr(event, 'group') and event.group else None
        )
        
        if not has_perm:
            logger.warning(f"用户 {event.user.user_id} 无权限")
            return None  # 终止事件处理
    
    return event

注册中间件

# 添加中间件到事件总线
event_bus.add_middleware(logging_middleware)
event_bus.add_middleware(auth_middleware)

# 中间件按添加顺序执行

优先级系统

事件订阅支持优先级控制,数字越大优先级越高:

# 高优先级处理器(先执行)
@event_bus.subscribe(MessageEvent, priority=10)
async def high_priority_handler(event: MessageEvent):
    if "紧急" in event.message.to_plain_text():
        return "紧急消息已处理"
    return None  # 返回None让其他处理器继续处理

# 低优先级处理器(后执行)
@event_bus.subscribe(MessageEvent, priority=1)
async def low_priority_handler(event: MessageEvent):
    return "这是默认回复"

结果处理

返回类型支持

事件处理器可以返回多种类型的结果:

@event_bus.subscribe(MessageEvent)
async def handle_event(event: MessageEvent):
    # 1. 返回字符串(自动转换为MessageRequest)
    return "Hello, World!"
    
    # 2. 返回MessageRequest对象
    from Coral.protocol import MessageRequest, MessageChain, MessageSegment
    return MessageRequest(
        platform=event.platform,
        event_id=event.event_id,
        self_id=event.self_id,
        message=MessageChain([MessageSegment.text("Hello!")]),
        user=event.user
    )
    
    # 3. 返回列表(多个结果)
    return [
        "第一条消息",
        MessageRequest(...),  # 第二条消息
    ]
    
    # 4. 返回None(不发送回复)
    if event.user.user_id == "admin":
        return None  # 管理员消息不回复

结果队列

事件总线使用异步队列处理结果,避免阻塞:

# 获取队列状态
queue_size = event_bus.get_queue_size()
is_full = event_bus.is_queue_full()

print(f"当前队列大小:{queue_size}")
print(f"队列是否已满:{is_full}")

性能监控

事件总线提供性能指标:

# 获取性能指标
metrics = event_bus.get_metrics()

print(f"已处理事件数:{metrics.total_events_processed}")
print(f"已处理结果数:{metrics.total_results_processed}")
print(f"平均事件处理时间:{metrics.avg_event_processing_time:.4f}s")
print(f"平均结果处理时间:{metrics.avg_result_processing_time:.4f}s")
print(f"最大队列大小:{metrics.max_queue_size}")
print(f"当前队列大小:{metrics.current_queue_size}")
print(f"错误总数:{metrics.total_errors}")

高级用法

批量事件处理

import asyncio
from Coral.protocol import MessageEvent

async def batch_process_events(events: list[MessageEvent]):
    """批量处理事件"""
    tasks = []
    for event in events:
        task = event_bus.publish(event)
        tasks.append(task)
    
    # 并发处理所有事件
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results

事件过滤

from Coral import filters

# 使用过滤器组合
@event_bus.subscribe(MessageEvent)
async def filtered_handler(event: MessageEvent):
    # 检查消息是否包含关键词
    if not filters.contains("重要")(event):
        return None
    
    # 检查是否来自管理员
    if not filters.from_user(["admin1", "admin2"])(event):
        return None
    
    return "处理重要管理员消息"

错误处理

@event_bus.subscribe(MessageEvent)
async def safe_handler(event: MessageEvent):
    try:
        # 可能出错的代码
        result = some_risky_operation(event)
        return result
    except Exception as e:
        import logging
        logger = logging.getLogger(__name__)
        logger.error(f"事件处理错误:{e}", exc_info=True)
        
        # 返回错误信息或None
        return f"处理出错:{str(e)}"

最佳实践

1. 合理使用优先级

# 安全检查和权限验证使用高优先级
@event_bus.subscribe(MessageEvent, priority=10)
async def security_check(event: MessageEvent):
    if not is_safe_message(event):
        return None  # 不安全消息,终止处理

# 业务逻辑使用中等优先级
@event_bus.subscribe(MessageEvent, priority=5)
async def business_logic(event: MessageEvent):
    return process_business(event)

# 日志记录使用低优先级
@event_bus.subscribe(MessageEvent, priority=1)
async def logging_handler(event: MessageEvent):
    log_message(event)
    return None  # 不返回结果,只记录日志

2. 避免阻塞操作

# 错误示例:同步阻塞操作
@event_bus.subscribe(MessageEvent)
async def bad_handler(event: MessageEvent):
    import time
    time.sleep(5)  # ❌ 同步阻塞,会阻塞整个事件总线
    return "处理完成"

# 正确示例:异步非阻塞操作
@event_bus.subscribe(MessageEvent)
async def good_handler(event: MessageEvent):
    await asyncio.sleep(5)  # ✅ 异步等待,不会阻塞
    return "处理完成"

3. 合理处理返回值

@event_bus.subscribe(MessageEvent)
async def smart_handler(event: MessageEvent):
    # 根据情况决定是否返回结果
    if should_reply(event):
        return "这是回复"
    else:
        return None  # 不回复,让其他处理器处理
    
    # 或者返回多个结果
    if needs_multiple_replies(event):
        return [
            "第一条消息",
            "第二条消息",
            MessageRequest(...)  # 复杂消息
        ]

4. 使用中间件复用逻辑

# 创建可复用的中间件
async def rate_limit_middleware(event):
    """速率限制中间件"""
    user_id = event.user.user_id
    current_time = time.time()
    
    # 检查用户请求频率
    if is_rate_limited(user_id, current_time):
        logger.warning(f"用户 {user_id} 触发速率限制")
        return None  # 终止处理
    
    return event

# 注册到事件总线
event_bus.add_middleware(rate_limit_middleware)

故障排除

常见问题

  1. 事件未触发

    # 检查事件类型是否正确
    print(f"订阅的事件类型:{MessageEvent}")
    print(f"发布的事件类型:{type(event)}")
    
    # 检查事件总线是否已初始化
    # 在插件初始化时调用
    await event_bus.initialize()
    
  2. 结果未发送

    # 检查返回值类型
    # 字符串会自动转换,其他类型需要符合协议
    
    # 检查队列状态
    print(f"队列大小:{event_bus.get_queue_size()}")
    print(f"性能指标:{event_bus.get_metrics()}")
    
  3. 性能问题

    # 监控性能指标
    metrics = event_bus.get_metrics()
    if metrics.avg_event_processing_time > 1.0:
        logger.warning("事件处理时间过长")
    
    if metrics.current_queue_size > 100:
        logger.warning("队列积压严重")
    

调试技巧

import logging

# 启用调试日志
logging.basicConfig(level=logging.DEBUG)

# 添加调试中间件
async def debug_middleware(event):
    print(f"[DEBUG] 处理事件:{type(event).__name__}")
    print(f"[DEBUG] 事件数据:{event}")
    return event

event_bus.add_middleware(debug_middleware)

相关资源


最后更新:2026-01-31
文档版本:v1.0.0