Skip to main content
EventBus is the central runtime for handler registration, event emit, history lookup, and lifecycle control.

EventBus(...)

EventBus(
    name: str | None = None,
    event_concurrency: Literal['global-serial', 'bus-serial', 'parallel'] | str | None = None,
    event_handler_concurrency: Literal['serial', 'parallel'] | str = 'serial',
    event_handler_completion: Literal['all', 'first'] | str = 'all',
    max_history_size: int | None = 50,
    max_history_drop: bool = False,
    event_timeout: float | None = 60.0,
    event_slow_timeout: float | None = 300.0,
    event_handler_slow_timeout: float | None = 30.0,
    event_handler_detect_file_paths: bool = True,
    middlewares: Sequence[EventBusMiddleware] | None = None,
)

Shared configuration semantics

OptionDescription
nameHuman-readable bus name used in logs/labels.
event_concurrencyEvent scheduling policy across queue processing (global-serial, bus-serial, parallel).
event_handler_concurrencyHow handlers for one event execute (serial vs parallel).
event_handler_completionCompletion mode (all waits for all handlers, first resolves on first successful result).
event_timeoutDefault outer timeout budget for event/handler execution.
event_slow_timeoutSlow-event warning threshold.
event_handler_slow_timeoutSlow-handler warning threshold.
event_handler_detect_file_pathsWhether to capture source path metadata for handlers.
max_history_sizeMaximum retained history (null = unbounded, 0 = keep only in-flight).
max_history_dropIf true, drop oldest history entries when full; if false, reject new emits at limit.

Runtime state

Both implementations expose equivalent runtime state:
  • Bus identity: id, name, label
  • Registered handlers and indexes
  • Event history and pending queue
  • In-flight tracking
  • Locking/concurrency runtime objects

on(...)

Registers a handler for an event key (EventClass, event type string, or '*').
bus.on(UserEvent, handler)
bus.on('UserEvent', handler)
bus.on('*', wildcard_handler)

off(...)

Unregisters handlers by event key, handler function/reference, or handler id.
bus.off(UserEvent, handler)
bus.off(UserEvent)   # remove all handlers for UserEvent
bus.off('*')         # remove all wildcard handlers

emit(...)

emit(...) enqueues synchronously and returns the pending event immediately.
event = bus.emit(MyEvent(data='x'))
result = await event.event_result()

find(...)

find(...) supports history lookup, optional future waiting, predicate filtering, and parent/child scoping.
event = await bus.find(ResponseEvent)  # history lookup by default
future = await bus.find(ResponseEvent, past=False, future=5)
child = await bus.find(ChildEvent, child_of=parent_event, future=5)

Lifecycle helpers

Wait for idle

await bus.wait_until_idle()
await bus.wait_until_idle(timeout=5)

Parent/child relationship checks

bus.event_is_child_of(child_event, parent_event)
bus.event_is_parent_of(parent_event, child_event)

Serialization and teardown

await bus.stop(timeout=1.0)
await bus.stop(clear=True)

Timeout and precedence

Shared precedence model:
  1. Handler override
  2. Event override
  3. Bus default
Effective handler timeout is capped by event timeout when both are set.