# Python middleware integrations for EventBus lifecycle hooks.
from bubus import EventBus
from bubus.middlewares import (
WALEventBusMiddleware,
LoggerEventBusMiddleware,
SQLiteHistoryMirrorMiddleware,
OtelTracingMiddleware,
)
# Create the bus named 'MyBus' with four middleware instances attached.
# The list order below is the order in which the instances are constructed
# and handed to EventBus.
# NOTE(review): whether EventBus also *invokes* the middlewares in list
# order is not visible here — confirm against the bubus documentation.
bus = EventBus(
    name='MyBus',
    middlewares=[
        # Each of the first three receives a file path as its only argument;
        # presumably that is the file the middleware writes to — verify.
        SQLiteHistoryMirrorMiddleware('./events.sqlite3'),
        WALEventBusMiddleware('./events.jsonl'),
        LoggerEventBusMiddleware('./events.log'),
        # Constructed with no arguments, i.e. whatever defaults it defines.
        OtelTracingMiddleware(),
    ],
)