With the default options, all events and handlers on a bus are processed in strict serial order, which keeps execution order predictable and makes consistency easy to reason about. This follows from three defaults:
  • event_concurrency='bus-serial'
  • event_handler_concurrency='serial'
  • event_handler_completion='all'
On a single bus, that means event N+1 never starts before event N is complete, even if the handlers for event N+1 are “faster”. As you scale, you can relax these guarantees. See Concurrency Control in the sidebar for all modes and tradeoffs.
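To make the guarantee concrete, here is a minimal conceptual sketch of strict serial FIFO processing using only the standard library. It does not use bubus and is not how bubus is implemented; `serial_worker` and `run_serial_demo` are hypothetical names chosen for illustration. It only shows why a single consumer that awaits each job to completion cannot let a fast job overtake a slow one.

```python
import asyncio

async def serial_worker(queue: asyncio.Queue, trace: list[str]) -> None:
    """Process queued jobs one at a time, in FIFO order (toy model, not bubus)."""
    while True:
        name, delay_s = await queue.get()
        trace.append(f'start:{name}')
        await asyncio.sleep(delay_s)  # stands in for the job's handler
        trace.append(f'end:{name}')
        queue.task_done()

async def run_serial_demo() -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    trace: list[str] = []
    worker = asyncio.create_task(serial_worker(queue, trace))

    # Enqueue a slow job first, then a fast one.
    queue.put_nowait(('slow', 0.02))
    queue.put_nowait(('fast', 0.001))

    await queue.join()  # wait until every queued job has been processed
    worker.cancel()     # the worker loops forever, so stop it explicitly
    try:
        await worker
    except asyncio.CancelledError:
        pass
    return trace

serial_trace = asyncio.run(run_serial_demo())
print(serial_trace)
# ['start:slow', 'end:slow', 'start:fast', 'end:fast']
```

Because the worker does not call `queue.get()` again until the current job has finished, the fast job cannot start early no matter how short its delay is.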

Variable handler runtimes still stay FIFO

import asyncio
from bubus import BaseEvent, EventBus

class JobEvent(BaseEvent):
    order: int
    delay_s: float

bus = EventBus('FifoBus')
started_order: list[int] = []
completed_order: list[int] = []

async def on_job(event: JobEvent) -> None:
    started_order.append(event.order)
    await asyncio.sleep(event.delay_s)
    completed_order.append(event.order)

bus.on(JobEvent, on_job)

emitted = [
    bus.emit(JobEvent(order=0, delay_s=0.030)),
    bus.emit(JobEvent(order=1, delay_s=0.001)),
    bus.emit(JobEvent(order=2, delay_s=0.020)),
]

# Top-level await: run this inside an async function or an asyncio REPL (python -m asyncio).
await bus.wait_until_idle()

print(started_order)
# [0, 1, 2]
print(completed_order)
# [0, 1, 2]
print([event.event_started_at is not None for event in emitted])
# [True, True, True]
print([event.event_completed_at is not None for event in emitted])
# [True, True, True]
print(emitted[0].event_started_at <= emitted[1].event_started_at <= emitted[2].event_started_at)
# True
print(emitted[0].event_completed_at <= emitted[1].event_completed_at <= emitted[2].event_completed_at)
# True

Ambiguous case: slow then fast still runs serially

Even if you emit a slow event and then a fast event immediately afterward, the fast one does not overtake the slow one on the same bus under the defaults.

import asyncio
from bubus import BaseEvent, EventBus

class SlowEvent(BaseEvent):
    name: str

class FastEvent(BaseEvent):
    name: str

bus = EventBus('FifoBus')
trace: list[str] = []

async def on_slow(event: SlowEvent) -> None:
    trace.append(f'start:{event.event_type}:{event.name}')
    await asyncio.sleep(0.040)
    trace.append(f'end:{event.event_type}:{event.name}')

async def on_fast(event: FastEvent) -> None:
    trace.append(f'start:{event.event_type}:{event.name}')
    await asyncio.sleep(0.001)
    trace.append(f'end:{event.event_type}:{event.name}')

bus.on(SlowEvent, on_slow)
bus.on(FastEvent, on_fast)

slow = bus.emit(SlowEvent(name='slow-a'))
fast = bus.emit(FastEvent(name='fast-b'))
# Top-level await: run this inside an async function or an asyncio REPL (python -m asyncio).
await bus.wait_until_idle()

print(trace)
# ['start:SlowEvent:slow-a', 'end:SlowEvent:slow-a', 'start:FastEvent:fast-b', 'end:FastEvent:fast-b']
print(slow.event_completed_at <= fast.event_started_at)
# True
tree_lines = [
    line for line in bus.log_tree().splitlines()
    if 'SlowEvent#' in line or 'FastEvent#' in line
]
print(tree_lines)
# ['├── SlowEvent#6aa1 [14:09:10.120 (0.040s)]', '└── FastEvent#6aa2 [14:09:10.161 (0.001s)]']

Important exception: awaited child events

Inside a running handler, if you emit and await a child event, that child can jump the queue and run immediately, which gives RPC-style behavior. This is the intentional exception to plain FIFO queue order. See Immediate Execution (RPC-style) for exact behavior and mode interactions.
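The queue-jump idea can be illustrated with a small toy model in plain asyncio. This is a sketch under stated assumptions, not the bubus API: `ToyBus`, `emit`, `await_now`, and `run_until_idle` are hypothetical names, and the real mode interactions are described on the Immediate Execution page. The sketch only shows the ordering effect: a child that the parent handler awaits runs before a sibling that was queued earlier.

```python
import asyncio
from collections import deque

class ToyBus:
    """Hypothetical toy model of a FIFO bus with awaited-child queue-jumping."""

    def __init__(self) -> None:
        self.pending: deque = deque()
        self.trace: list[str] = []

    def emit(self, name, handler):
        """Queue an event at the back of the FIFO and return its entry."""
        entry = (name, handler)
        self.pending.append(entry)
        return entry

    async def _run(self, entry) -> None:
        name, handler = entry
        self.trace.append(f'start:{name}')
        await handler(self)
        self.trace.append(f'end:{name}')

    async def await_now(self, entry) -> None:
        """Pull an already-queued entry out of the FIFO and run it immediately."""
        self.pending.remove(entry)
        await self._run(entry)

    async def run_until_idle(self) -> None:
        while self.pending:
            await self._run(self.pending.popleft())

async def on_child(bus: ToyBus) -> None:
    await asyncio.sleep(0.001)

async def on_sibling(bus: ToyBus) -> None:
    await asyncio.sleep(0.001)

async def on_parent(bus: ToyBus) -> None:
    # The child is queued behind 'sibling', but awaiting it makes it run now.
    child = bus.emit('child', on_child)
    await bus.await_now(child)

async def run_queue_jump_demo() -> list[str]:
    bus = ToyBus()
    bus.emit('parent', on_parent)
    bus.emit('sibling', on_sibling)
    await bus.run_until_idle()
    return bus.trace

queue_jump_trace = asyncio.run(run_queue_jump_demo())
print(queue_jump_trace)
# ['start:parent', 'start:child', 'end:child', 'end:parent', 'start:sibling', 'end:sibling']
```

In this model the sibling still waits for the parent to finish, so FIFO order holds for everything except the awaited child.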