If you like events in theory but hate the day-to-day developer experience, this page is for you. Common pain points:
  • calling boilerplate (emit + await completion + unwrap result) for every request
  • eventual consistency anxiety (“will my response event arrive?”)
  • duplicating signatures across schemas, handlers, and implementation functions
The goal here is to keep event architecture benefits without forcing painful calling patterns.

1) Pain: call-site boilerplate

Without help, every request repeats the same three steps at the call site: emit the event, await its completion, and unwrap the result. events_suck.wrap(...) gives you a method-shaped client API (client.create(...)) while still routing every call through events.
# Without wrap: valid, but noisy at every call site
event = bus.emit(CreateUserEvent(name='bob', age=45))
user_id = await event.event_result()

# With wrap: looks like normal async function calls
SDKClient = events_suck.wrap('SDKClient', {'create': CreateUserEvent, 'update': UpdateUserEvent})
client = SDKClient(bus=bus)
user_id = await client.create(name='bob', age=45, nickname='bobby')
updated = await client.update(id=user_id, age=46, source='sync')

Minimal end-to-end wrap(...) wiring

from bubus import BaseEvent, EventBus, events_suck

class CreateUserEvent(BaseEvent[str]):
    name: str
    age: int

class UpdateUserEvent(BaseEvent[bool]):
    id: str
    age: int | None = None

class UserService:
    def __init__(self) -> None:
        self.users: dict[str, dict[str, int | str]] = {}

    async def on_create(self, event: CreateUserEvent) -> str:
        user_id = f'user-{event.age}'
        self.users[user_id] = {'id': user_id, 'name': event.name, 'age': event.age}
        return user_id

    async def on_update(self, event: UpdateUserEvent) -> bool:
        if event.id not in self.users:
            return False
        if event.age is not None:
            self.users[event.id]['age'] = event.age
        return True

bus = EventBus('SDKBus')
service = UserService()

bus.on(CreateUserEvent, service.on_create)
bus.on(UpdateUserEvent, service.on_update)

SDKClient = events_suck.wrap('SDKClient', {
    'create': CreateUserEvent,
    'update': UpdateUserEvent,
})
client = SDKClient(bus=bus)

# The awaits below must run inside an async function (or a REPL with top-level await).
user_id = await client.create(name='bob', age=45, nickname='bobby')
updated = await client.update(id=user_id, age=46, source='sync')
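
To make the idea of a method-shaped client less magical, here is a rough stdlib-only sketch of what a wrap-style factory could do. It is not how bubus implements events_suck.wrap(...); ToyBus, make_client, and CreateUser are hypothetical names invented for this illustration, and the toy bus calls a single handler directly instead of queueing anything.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass
class CreateUser:
    """Toy event type (a plain dataclass, not a bubus BaseEvent)."""
    name: str
    age: int


class ToyBus:
    """Hypothetical in-process bus: one async handler per event type."""

    def __init__(self) -> None:
        self._handlers: dict[type, Callable[[Any], Awaitable[Any]]] = {}

    def on(self, event_type: type, handler: Callable[[Any], Awaitable[Any]]) -> None:
        self._handlers[event_type] = handler

    async def request(self, event: Any) -> Any:
        # Dispatch to the registered handler and hand back its return value.
        return await self._handlers[type(event)](event)


def make_client(class_name: str, methods: dict[str, type]) -> type:
    """Build a class with one async method per entry in `methods`."""

    def init(self: Any, bus: ToyBus) -> None:
        self.bus = bus

    def make_method(event_type: type) -> Callable[..., Awaitable[Any]]:
        async def method(self: Any, **fields: Any) -> Any:
            # Keyword arguments become event fields; the call still goes through the bus.
            return await self.bus.request(event_type(**fields))
        return method

    namespace: dict[str, Any] = {'__init__': init}
    for method_name, event_type in methods.items():
        namespace[method_name] = make_method(event_type)
    return type(class_name, (), namespace)


async def demo() -> str:
    bus = ToyBus()

    async def on_create(event: CreateUser) -> str:
        return f'user-{event.name}-{event.age}'

    bus.on(CreateUser, on_create)
    ToyClient = make_client('ToyClient', {'create': CreateUser})
    client = ToyClient(bus=bus)
    return await client.create(name='bob', age=45)


print(asyncio.run(demo()))  # prints: user-bob-45
```

The call site reads like a normal async method call, but the handler is still the only place that knows how a user gets created.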

2) Pain: eventual consistency headaches

If your mental model is “I called something, I need a result now,” pure fire-and-forget event flows can feel stressful. Two patterns reduce that stress:
  • request/response on one bus with direct return values (event_result / first())
  • immediate execution for nested calls inside handlers (RPC-style queue-jump)
These patterns feel function-like for in-process flows. If you later move a step across process/network boundaries (bridges), treat that edge as eventually consistent again. For details, see the Immediate Execution (RPC-style) docs.

Nested request/response with immediate execution

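from bubus import BaseEvent, EventBus

class CheckoutEvent(BaseEvent[str]):
    order_id: str

class ChargeCardEvent(BaseEvent[str]):
    order_id: str

async def on_checkout(event: CheckoutEvent) -> str:
    # Emit the child event from inside the parent handler.
    child = event.bus.emit(ChargeCardEvent(order_id=event.order_id))
    await child                       # immediate path while inside handler
    receipt_id = await child.event_result()
    return receipt_id

async def on_charge(event: ChargeCardEvent) -> str:
    return f'receipt-{event.order_id}'

bus = EventBus('CheckoutBus')  # bus name is illustrative
bus.on(CheckoutEvent, on_checkout)
bus.on(ChargeCardEvent, on_charge)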
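
If "queue-jump" sounds abstract, the following stdlib-only sketch shows the ordering it is meant to produce. This is a toy model, not bubus internals: ToyQueueBus, ToyEvent, run_now, and drain are hypothetical names, and the real library triggers the immediate path by awaiting the child event rather than through an explicit method.

```python
import asyncio
from collections import deque
from typing import Any, Awaitable, Callable


class ToyEvent:
    def __init__(self, name: str, payload: str) -> None:
        self.name = name
        self.payload = payload
        self.result: Any = None
        self.done = False


Handler = Callable[['ToyQueueBus', ToyEvent], Awaitable[Any]]


class ToyQueueBus:
    """Hypothetical FIFO bus with a single worker loop (drain)."""

    def __init__(self) -> None:
        self.handlers: dict[str, Handler] = {}
        self.queue: deque[ToyEvent] = deque()
        self.order: list[str] = []  # log of the order events were processed in

    def on(self, name: str, handler: Handler) -> None:
        self.handlers[name] = handler

    def emit(self, name: str, payload: str) -> ToyEvent:
        event = ToyEvent(name, payload)
        self.queue.append(event)  # normally waits its turn behind earlier events
        return event

    async def _run(self, event: ToyEvent) -> None:
        self.order.append(event.name)
        event.result = await self.handlers[event.name](self, event)
        event.done = True

    async def run_now(self, event: ToyEvent) -> Any:
        # Queue-jump: pull the still-pending event out of the FIFO and run it right away.
        if not event.done and event in self.queue:
            self.queue.remove(event)
            await self._run(event)
        return event.result

    async def drain(self) -> None:
        while self.queue:
            await self._run(self.queue.popleft())


async def on_checkout(bus: ToyQueueBus, event: ToyEvent) -> str:
    child = bus.emit('charge', event.payload)
    return await bus.run_now(child)  # the parent gets the child's result immediately


async def on_charge(bus: ToyQueueBus, event: ToyEvent) -> str:
    return f'receipt-{event.payload}'


async def on_audit(bus: ToyQueueBus, event: ToyEvent) -> str:
    return 'logged'


async def demo() -> tuple[str, list[str]]:
    bus = ToyQueueBus()
    bus.on('checkout', on_checkout)
    bus.on('charge', on_charge)
    bus.on('audit', on_audit)
    checkout = bus.emit('checkout', 'order-1')
    bus.emit('audit', 'order-1')  # queued before the child charge event exists
    await bus.drain()
    return checkout.result, bus.order


print(asyncio.run(demo()))  # prints: ('receipt-order-1', ['checkout', 'charge', 'audit'])
```

Without the jump, the charge event would sit behind the audit event while the only worker is still busy with checkout, so the parent could never finish. Running the child ahead of the queue is what makes the nested call behave like a function call.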

3) Pain: defining signatures multiple times

You can keep one source of truth for payload shapes and reuse it in implementation code.

Python: @validate_call + make_events(...) + make_handler(...)

Use implementation function signatures as the source of truth, then generate event classes from them.
from bubus import EventBus, events_suck
from pydantic import validate_call

@validate_call
def create_user(id: str | None, name: str, age: int) -> str:
    return f'{name}-{age}'

@validate_call
def update_user(id: str, age: int | None = None, **extra) -> bool:
    return True

events = events_suck.make_events({
    'UserCreateEvent': create_user,
    'UserUpdateEvent': update_user,
})

bus = EventBus('LegacyBus')
bus.on(events.UserCreateEvent, events_suck.make_handler(create_user))
bus.on(events.UserUpdateEvent, events_suck.make_handler(update_user))

UserClient = events_suck.wrap('UserClient', {'create': events.UserCreateEvent, 'update': events.UserUpdateEvent})
client = UserClient(bus=bus)

TypeScript: zod schema + z.infer shared with implementation

Keep the schema as the source of truth, infer implementation input types from it, and reuse the same shape in BaseEvent.extend(...).
import { BaseEvent, EventBus, events_suck } from 'bubus'
import { z } from 'zod'

const CreateUserInputSchema = z.object({
  id: z.string().nullable().optional(),
  name: z.string(),
  age: z.number(),
})
type CreateUserInput = z.infer<typeof CreateUserInputSchema>

const UserCreateEvent = BaseEvent.extend('UserCreateEvent', {
  ...CreateUserInputSchema.shape,
  event_result_type: z.string(),
})

const bus = new EventBus('LegacyBus')
const create_user = async (input: CreateUserInput): Promise<string> => `${input.name}-${input.age}`

bus.on(UserCreateEvent, ({ id, name, age }) => create_user({ id, name, age }))

const UserClient = events_suck.wrap('UserClient', {
  create: UserCreateEvent,
})

const client = new UserClient(bus)
const id = await client.create({ id: null, name: 'bob', age: 45 })

Migration playbook

  1. Start with wrap(...) to clean up call-site boilerplate first.
  2. Use immediate execution patterns where you need function-call-like request/response behavior.
  3. Consolidate types with @validate_call (Python) or z.infer (TypeScript) to avoid signature drift.
  4. Add timeouts/retry policies where needed, instead of forcing eventual-consistency semantics everywhere.
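
Step 4 can be as small as a helper around the awaited call. The sketch below uses only asyncio; call_with_retry and flaky_charge are hypothetical names for illustration, and this page does not describe any bubus-specific timeout or retry settings, so none are assumed here.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar('T')


async def call_with_retry(
    make_call: Callable[[], Awaitable[T]],
    *,
    timeout: float,
    attempts: int,
    backoff: float = 0.0,
) -> T:
    """Await make_call() with a per-attempt timeout, retrying when an attempt times out."""
    last_error: Exception | None = None
    for attempt in range(1, attempts + 1):
        try:
            # wait_for cancels the pending call if it exceeds the timeout.
            return await asyncio.wait_for(make_call(), timeout=timeout)
        except asyncio.TimeoutError as error:
            last_error = error
            if attempt < attempts:
                await asyncio.sleep(backoff * attempt)  # simple linear backoff
    raise TimeoutError(f'no response after {attempts} attempts') from last_error


async def demo() -> tuple[str, int]:
    calls = 0

    async def flaky_charge() -> str:
        nonlocal calls
        calls += 1
        if calls == 1:
            await asyncio.sleep(1.0)  # simulates a response that never arrives in time
        return 'receipt-42'

    receipt = await call_with_retry(flaky_charge, timeout=0.05, attempts=3)
    return receipt, calls


print(asyncio.run(demo()))  # prints: ('receipt-42', 2)
```

Retrying is only safe when the underlying operation is idempotent; a card charge that may have succeeded before the timeout needs a deduplication key or similar guard before you wrap it like this.
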
You do not need to choose between clean DX and events. You can keep method-shaped APIs and adopt event internals incrementally.