You can forward events across multiple buses while preserving event path metadata and loop safety.

Why multiple buses are useful

Multiple buses let you separate concerns and tune runtime behavior per boundary:
  • service-local bus for business logic with strict ordering and useful history
  • transport/relay bus focused on throughput and forwarding (little or no history retention)
  • specialized buses for domains that need different timeout or concurrency policies
This is especially useful in microservice-style designs, where each component has different consistency and observability needs.

Example: service buses with different policies

In this example:
  • AuthBus is strict and debuggable: event_concurrency='bus-serial', event_handler_concurrency='serial', max_history_size=100
  • RelayBus is a transport forwarder: event_concurrency='parallel', max_history_size=0
  • BillingBus is another service bus with its own settings
from bubus import BaseEvent, EventBus

class UserCreatedEvent(BaseEvent[str]):
    user_id: str

class AuthService:
    def __init__(self) -> None:
        self.bus = EventBus(
            'AuthBus',
            event_concurrency='bus-serial',
            event_handler_concurrency='serial',
            max_history_size=100,
        )
        self.bus.on(UserCreatedEvent, self.on_user_created)

    async def on_user_created(self, event: UserCreatedEvent) -> str:
        return f'auth-ok:{event.user_id}'

class RelayService:
    def __init__(self) -> None:
        self.bus = EventBus(
            'RelayBus',
            event_concurrency='parallel',
            max_history_size=0,
        )

class BillingService:
    def __init__(self) -> None:
        self.bus = EventBus(
            'BillingBus',
            event_concurrency='bus-serial',
            event_handler_concurrency='serial',
            max_history_size=100,
        )
        self.bus.on(UserCreatedEvent, self.on_user_created)

    async def on_user_created(self, event: UserCreatedEvent) -> str:
        return f'billing-ok:{event.user_id}'

auth = AuthService()
relay = RelayService()
billing = BillingService()

auth.bus.on('*', relay.bus.emit)
relay.bus.on('*', billing.bus.emit)

# the awaits below assume an async context (an async function or an asyncio REPL)
result = await auth.bus.emit(UserCreatedEvent(user_id='u-a8d1')).event_result()
print(result)
# auth-ok:u-a8d1

root = auth.bus.emit(UserCreatedEvent(user_id='u-a8d1'))
await root
print(root.event_path)
# ['AuthBus#a8d1', 'RelayBus#3f2c', 'BillingBus#b91e']

Uni-directional and bi-directional forwarding

Forwarding can be one-way or two-way depending on your topology.
  • Uni-directional: one producer bus forwards to one consumer bus.
  • Bi-directional: both buses forward to each other (common for peer sync).
left = EventBus('LeftBus')
right = EventBus('RightBus')

# uni-directional
left.on('*', right.emit)

# bi-directional (add reverse path)
right.on('*', left.emit)
Loop prevention still applies in both modes: if an event has already visited a bus (tracked in event_path), forwarding it back to that bus is a no-op, and the event is not re-processed there.

How loop prevention works (event_path)

Loop prevention is automatic and based on event_path:
  1. Each bus appends its own label (for example AuthBus#a8d1) to event_path when it first sees an event.
  2. When a forwarding handler passes the event to another bus, that bus checks whether its own label is already in event_path.
  3. If yes, forwarding to that bus is skipped (no-op), so cycles terminate naturally.
This means you can wire cyclic topologies without infinite forwarding loops.
from bubus import BaseEvent, EventBus

class PingEvent(BaseEvent):
    message: str

bus_a = EventBus('BusA')
bus_b = EventBus('BusB')
bus_c = EventBus('BusC')

# cycle: A -> B -> C -> A
bus_a.on('*', bus_b.emit)
bus_b.on('*', bus_c.emit)
bus_c.on('*', bus_a.emit)

event = bus_a.emit(PingEvent(message='hello'))
await event

print(event.event_path)
# ['BusA#a8d1', 'BusB#3f2c', 'BusC#b91e']
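
To make the three steps above concrete without depending on the library, here is a minimal sketch of path-based loop prevention in plain Python. ToyEvent and ToyBus are hypothetical stand-ins written for this illustration, not bubus classes, and the sketch is synchronous and ignores queues, handlers, and the #suffix on bus labels.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ToyEvent:
    """Hypothetical stand-in for an event; only the path matters here."""
    message: str
    event_path: list[str] = field(default_factory=list)


class ToyBus:
    """Hypothetical synchronous bus that models only the event_path check."""

    def __init__(self, label: str) -> None:
        self.label = label
        self.targets: list[ToyBus] = []
        self.seen: list[str] = []  # messages this bus actually processed

    def forward_to(self, other: ToyBus) -> None:
        self.targets.append(other)

    def emit(self, event: ToyEvent) -> ToyEvent:
        # Steps 2-3: a bus whose label is already in the path does nothing.
        if self.label in event.event_path:
            return event
        # Step 1: record this bus the first time it sees the event.
        event.event_path.append(self.label)
        self.seen.append(event.message)
        for target in self.targets:
            target.emit(event)
        return event


bus_a, bus_b, bus_c = ToyBus('BusA'), ToyBus('BusB'), ToyBus('BusC')

# cycle: A -> B -> C -> A
bus_a.forward_to(bus_b)
bus_b.forward_to(bus_c)
bus_c.forward_to(bus_a)

event = bus_a.emit(ToyEvent(message='hello'))
print(event.event_path)
# ['BusA', 'BusB', 'BusC']
```

The cycle terminates because the final hop from BusC back to BusA finds BusA already in the path, so each bus processes the event exactly once.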

Parent-child tracking across forwarded flows

Parent-child tracking also works across forwarded flows:
  • if a forwarded event is handled on a downstream bus and that handler emits a child event, the child still links back to the parent via event_parent_id
  • nested descendants emitted on downstream buses keep that lineage as they continue through forwarding
  • this remains true for both queue-jumped children (await child) and normally queued children (emitted but not immediately awaited)
See Parent-Child Tracking for a deeper walkthrough and tree-log example. See Immediate Execution (RPC-style) for queue-jump execution behavior.
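
As a rough illustration of why lineage can survive forwarding, the sketch below links a child to whichever event is being handled at the moment the child is emitted, independent of the bus involved. This is one possible mechanism, assumed here for illustration: ToyEvent, ToyBus, the context variable, and the InvoiceRequested event name are all hypothetical, and the sketch does not model queueing or queue-jumping.

```python
from __future__ import annotations

import contextvars
import itertools
from dataclasses import dataclass, field
from typing import Optional

_next_id = itertools.count(1)
# Hypothetical: the event whose handler is currently running, if any.
_current_event = contextvars.ContextVar('current_event', default=None)


@dataclass
class ToyEvent:
    name: str
    event_id: int = field(default_factory=lambda: next(_next_id))
    event_parent_id: Optional[int] = None


class ToyBus:
    def __init__(self, label: str) -> None:
        self.label = label
        self.handlers = []

    def on(self, handler) -> None:
        self.handlers.append(handler)

    def emit(self, event: ToyEvent) -> ToyEvent:
        parent = _current_event.get()
        # Link a new event to the event being handled right now.
        # A forwarded event is its own "current" event, so it is left alone.
        if parent is not None and parent is not event and event.event_parent_id is None:
            event.event_parent_id = parent.event_id
        token = _current_event.set(event)
        try:
            for handler in self.handlers:
                handler(event)
        finally:
            _current_event.reset(token)
        return event


auth, relay, billing = ToyBus('AuthBus'), ToyBus('RelayBus'), ToyBus('BillingBus')
auth.on(relay.emit)
relay.on(billing.emit)

children = []


def on_user_created(event: ToyEvent) -> None:
    # Only react to the forwarded root event, then emit a child downstream.
    if event.name == 'UserCreated':
        children.append(billing.emit(ToyEvent('InvoiceRequested')))


billing.on(on_user_created)

root = auth.emit(ToyEvent('UserCreated'))
child = children[0]
print(child.event_parent_id == root.event_id)
# True
```

The child is emitted two hops away from the bus where the root started, yet its event_parent_id still points at the root, because the link is derived from the handler context and not from the bus.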

Bridges are forwarding with transport

Bridges are fundamentally the same forwarding pattern, with serialization and a remote transport in the middle. See Bridges Overview for HTTP/Redis/NATS/Postgres/socket/file-backed bridge options and setup patterns.
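
The idea of "forwarding with serialization in the middle" can be sketched with the standard library alone. Everything here is an assumption made for illustration: the JSON wire format, the send and receive helpers, and the in-memory queue standing in for a real transport are hypothetical and do not reflect the actual bridge implementations. The sketch also assumes event_path travels with the payload so the receiving side can apply the same path check.

```python
import json
import queue
from typing import Any, Dict

# Hypothetical in-memory stand-in for an HTTP/Redis/NATS/... transport.
wire = queue.Queue()


def send(event: Dict[str, Any]) -> None:
    """Local side: serialize the event and hand it to the transport."""
    wire.put(json.dumps(event))


def receive(local_label: str) -> Dict[str, Any]:
    """Remote side: deserialize, then apply the usual path check."""
    event = json.loads(wire.get())
    if local_label not in event['event_path']:
        event['event_path'].append(local_label)
    return event


outgoing = {
    'event_type': 'UserCreatedEvent',
    'user_id': 'u-a8d1',
    'event_path': ['AuthBus#a8d1'],
}
send(outgoing)
incoming = receive('BillingBus#b91e')
print(incoming['event_path'])
# ['AuthBus#a8d1', 'BillingBus#b91e']
```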