Middlewares can observe event lifecycle transitions, emit auto events, and persist or trace runtime behavior.

Quick setup

from bubus import EventBus
from bubus.middlewares import (
    WALEventBusMiddleware,
    LoggerEventBusMiddleware,
    SQLiteHistoryMirrorMiddleware,
    OtelTracingMiddleware,
)

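bus = EventBus(
    name='MyBus',
    middlewares=[
        # Mirror event history into a SQLite database file.
        SQLiteHistoryMirrorMiddleware('./events.sqlite3'),
        # Append events to a JSONL write-ahead log.
        WALEventBusMiddleware('./events.jsonl'),
        # Write log output to a plain-text file.
        LoggerEventBusMiddleware('./events.log'),
        # Trace runtime behavior with OpenTelemetry.
        OtelTracingMiddleware(),
    ],
)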
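
Conceptually, a middleware is an object the bus notifies at each event lifecycle transition. The sketch below illustrates that pattern with a self-contained toy bus. It does not use bubus, and every name in it (ToyEvent, ToyMiddleware, JsonlRecorder, ToyBus, on_event_change, and the status strings) is a hypothetical stand-in rather than the library's actual API.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class ToyEvent:
    """Hypothetical event type; not a bubus class."""
    event_type: str
    payload: dict[str, Any] = field(default_factory=dict)


class ToyMiddleware:
    """Hypothetical base class: the toy bus calls this hook on every status change."""

    def on_event_change(self, event: ToyEvent, status: str) -> None:
        pass


class JsonlRecorder(ToyMiddleware):
    """Records one JSON line per lifecycle transition (kept in memory for the demo)."""

    def __init__(self) -> None:
        self.lines: list[str] = []

    def on_event_change(self, event: ToyEvent, status: str) -> None:
        self.lines.append(json.dumps({"type": event.event_type, "status": status}))


class ToyBus:
    """Minimal synchronous bus that notifies middlewares around handler execution."""

    def __init__(self, middlewares: list[ToyMiddleware]) -> None:
        self.middlewares = list(middlewares)
        self.handlers: dict[str, list[Callable[[ToyEvent], None]]] = {}

    def on(self, event_type: str, handler: Callable[[ToyEvent], None]) -> None:
        self.handlers.setdefault(event_type, []).append(handler)

    def _notify(self, event: ToyEvent, status: str) -> None:
        # Middlewares are notified in the order they were registered.
        for middleware in self.middlewares:
            middleware.on_event_change(event, status)

    def dispatch(self, event: ToyEvent) -> None:
        # Assumed lifecycle for this sketch: pending -> started -> completed.
        self._notify(event, "pending")
        self._notify(event, "started")
        for handler in self.handlers.get(event.event_type, []):
            handler(event)
        self._notify(event, "completed")


recorder = JsonlRecorder()
toy_bus = ToyBus(middlewares=[recorder])
toy_bus.on("UserCreated", lambda event: None)
toy_bus.dispatch(ToyEvent("UserCreated", {"id": 1}))
```

After the dispatch, recorder.lines holds one JSON line per transition. A file-backed recorder would append those lines to disk instead of keeping them in a list.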

Middleware pages