Install bubus, define one typed event, register a handler, and emit the event. Repository example files:

Install

pip install bubus

First event

import asyncio
from bubus import BaseEvent, EventBus

# BaseEvent[dict]: handlers for this event are expected to return a dict.
class CreateUserEvent(BaseEvent[dict]):
    email: str

async def on_create_user(event: CreateUserEvent) -> dict:
    user = await your_create_user_logic(event.email)
    return {'user_id': user['id']}

async def main() -> None:
    bus = EventBus('MyAuthEventBus')
    bus.on(CreateUserEvent, on_create_user)

    result = await bus.emit(CreateUserEvent(email='user@example.com')).event_result()
    print(result)
    # {'user_id': 'some-user-uuid'}

asyncio.run(main())
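
The handler above calls `your_create_user_logic`, which the quickstart leaves for you to supply. To run the example end to end, one option is a stand-in like the sketch below. The in-memory `_users` dict and the UUID-based `id` are assumptions made for illustration; they are not part of bubus.

```python
import asyncio
import uuid

# Hypothetical in-memory store standing in for a real database.
_users: dict[str, dict] = {}

async def your_create_user_logic(email: str) -> dict:
    """Create a user record for an email, or return the existing one."""
    await asyncio.sleep(0)  # yield to the event loop, as real I/O would
    if email not in _users:
        _users[email] = {'id': str(uuid.uuid4()), 'email': email}
    return _users[email]
```

Define this function above `on_create_user` in the same file so the handler can find it when the event is emitted.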

Next steps