Skip to main content

bubus: Production-ready multi-language event bus

bubus logo DeepWiki: Python PyPI - Version NPM Version GitHub License Bubus is an in-memory event bus for async Python and TypeScript (Node and browser environments), built for predictable event-driven workflows with strong typing and consistent cross-language behavior. Core strengths:
  • Typed event payloads and typed handler return values
  • Deterministic queue semantics with configurable concurrency
  • Nested event lineage tracking (event_parent_id / event_path)
  • Event forwarding, bridge transports, and middleware integration

Minimal usage

from bubus import BaseEvent, EventBus

class SomeEvent(BaseEvent):
    some_data: int

async def on_some_event(event: SomeEvent) -> None:
    print(event.some_data)
    # 132

bus = EventBus('MyBus')
bus.on(SomeEvent, on_some_event)

await bus.emit(SomeEvent(some_data=132))
See Quickstart for installation and first full example.

Repository examples

Runnable end-to-end examples (Python + TypeScript) live in the repo: