With event_concurrency='bus-serial', each bus processes at most one event at a time, while different buses can process their events concurrently.

Lifecycle impact

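  1. Events are enqueued per bus in FIFO order.
  2. Each bus holds its own event lock, so at most one event is active on a bus at any time.
  3. A busy bus does not block other buses.
  4. A child event that is emitted and awaited inside a handler can jump the queue on that same bus, running ahead of events already waiting there.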
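The first three steps can be modelled in plain asyncio. This is only a toy sketch under stated assumptions, not how bubus is implemented: `ToySerialBus`, `drain`, and `run_demo` are hypothetical names invented for illustration, and queue-jumping (step 4) is left out.

```python
import asyncio
from collections import deque


class ToySerialBus:
    """Hypothetical stand-in for a bus-serial bus; this is not bubus code."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.queue: deque[int] = deque()  # step 1: per-bus FIFO queue
        self.lock = asyncio.Lock()        # step 2: per-bus event lock
        self.started: list[int] = []      # order in which events began

    def emit(self, order: int) -> None:
        self.queue.append(order)

    async def drain(self, active: set[str], snapshots: list[set[str]]) -> None:
        while self.queue:
            order = self.queue.popleft()
            # The lock is held while exactly one event is active on this bus.
            async with self.lock:
                self.started.append(order)
                active.add(self.name)
                snapshots.append(set(active))  # which buses are busy right now
                await asyncio.sleep(0.01)      # simulated handler work
                active.discard(self.name)


async def run_demo() -> tuple[list[int], list[int], int]:
    bus_a, bus_b = ToySerialBus("a"), ToySerialBus("b")
    for i in range(4):
        bus_a.emit(i)
        bus_b.emit(i)
    active: set[str] = set()
    snapshots: list[set[str]] = []
    # step 3: both buses drain concurrently; neither waits on the other's lock.
    await asyncio.gather(
        bus_a.drain(active, snapshots),
        bus_b.drain(active, snapshots),
    )
    return bus_a.started, bus_b.started, max(len(s) for s in snapshots)


started_a, started_b, peak_busy_buses = asyncio.run(run_demo())
```

In this model each bus starts its events in emit order, and `peak_busy_buses` reaches 2 because the two buses overlap while each one stays serial internally.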

Execution order example

import asyncio
from bubus import BaseEvent, EventBus

class WorkEvent(BaseEvent):
    order: int
    source: str

bus_a = EventBus('BusSerialA', event_concurrency='bus-serial')
bus_b = EventBus('BusSerialB', event_concurrency='bus-serial')

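# Order in which each bus started handling its events.
starts_a: list[int] = []
starts_b: list[int] = []
# Handlers currently running across both buses, and the peak observed.
in_flight_global = 0
max_in_flight_global = 0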

async def handler_a(event: WorkEvent) -> None:
    global in_flight_global, max_in_flight_global
    in_flight_global += 1
    max_in_flight_global = max(max_in_flight_global, in_flight_global)
    starts_a.append(event.order)
    await asyncio.sleep(0.01)
    in_flight_global -= 1

async def handler_b(event: WorkEvent) -> None:
    global in_flight_global, max_in_flight_global
    in_flight_global += 1
    max_in_flight_global = max(max_in_flight_global, in_flight_global)
    starts_b.append(event.order)
    await asyncio.sleep(0.01)
    in_flight_global -= 1

bus_a.on(WorkEvent, handler_a)
bus_b.on(WorkEvent, handler_b)

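async def main() -> None:
    # Interleave emits across both buses; each bus keeps its own FIFO queue.
    for i in range(4):
        bus_a.emit(WorkEvent(order=i, source='a'))
        bus_b.emit(WorkEvent(order=i, source='b'))

    await bus_a.wait_until_idle()
    await bus_b.wait_until_idle()

    # Each bus started its events in emit order.
    assert starts_a == [0, 1, 2, 3]
    assert starts_b == [0, 1, 2, 3]
    # Handlers on different buses overlapped at least once.
    assert max_in_flight_global >= 2

asyncio.run(main())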

Notes

  • This is typically the best default for multi-bus systems whose buses are independent of one another.
  • It preserves per-bus determinism (events on one bus run one at a time, in FIFO order) while retaining cross-bus throughput.
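The throughput point can be illustrated by comparing lock scopes in plain asyncio. This is a rough sketch rather than bubus code: `peak_in_flight` and `compare` are hypothetical helpers, and the single shared lock is an assumed baseline used only for contrast.

```python
import asyncio


async def peak_in_flight(locks: dict[str, asyncio.Lock]) -> int:
    """Run 4 simulated events per bus; return the peak number running at once."""
    in_flight = 0
    peak = 0

    async def handle(bus_name: str) -> None:
        nonlocal in_flight, peak
        # An event may run only while it holds the lock assigned to its bus.
        async with locks[bus_name]:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # simulated handler work
            in_flight -= 1

    await asyncio.gather(*(handle(name) for name in ("a", "b") for _ in range(4)))
    return peak


async def compare() -> tuple[int, int]:
    # Baseline: both buses share one lock, so everything is serialized.
    shared = asyncio.Lock()
    one_lock_for_all = await peak_in_flight({"a": shared, "b": shared})
    # Per-bus locks: each bus is serial, but the two buses can overlap.
    lock_per_bus = await peak_in_flight({"a": asyncio.Lock(), "b": asyncio.Lock()})
    return one_lock_for_all, lock_per_bus


shared_peak, per_bus_peak = asyncio.run(compare())
print(shared_peak, per_bus_peak)
```

With one shared lock the peak stays at 1. With one lock per bus it reaches 2, which is the cross-bus overlap described above.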