Skip to main content
SQLiteHistoryMirrorMiddleware records event and handler-result snapshots into SQLite for queryable audit history.

Constructor params

  • db_path: SQLite file path

Setup with EventBus

from bubus import EventBus
from bubus.middlewares import SQLiteHistoryMirrorMiddleware

bus = EventBus(
    name='AppBus',
    middlewares=[SQLiteHistoryMirrorMiddleware('./events.sqlite3')],
)

Behavior

  • Records event lifecycle snapshots into events_log.
  • Records handler result snapshots into event_results_log.
  • Stores serialized payload JSON plus key metadata (event id/type, handler id/name, phase/status).
  • Uses WAL mode and thread-safe connection access for concurrent writes.