Skip to main content
[!WARNING] EventBusMiddleware is a Python-only feature. The TypeScript package does not currently expose middleware hooks.
EventBusMiddleware is the base class for custom middleware.

Constructor params

None.

Setup with EventBus

from bubus import EventBus
from bubus.middlewares import EventBusMiddleware

class AnalyticsMiddleware(EventBusMiddleware):
    async def on_event_result_change(self, eventbus, event, event_result, status):
        if status == 'completed':
            print(event.event_type, event_result.handler_name)
            # SomeEvent on_some_event

bus = EventBus('AppBus', middlewares=[AnalyticsMiddleware()])

Behavior

  • on_event_change(eventbus, event, status) runs on event lifecycle transitions.
  • on_event_result_change(eventbus, event, event_result, status) runs on handler result transitions.
  • on_handler_change(eventbus, handler, registered) runs when handlers are added/removed.
  • Override only the hooks you need.