Backpressure in bubus is history-policy based, not queue-capacity based.
  • emit() enqueues synchronously and returns immediately.
  • Pending queues are unbounded in both runtimes.
  • Overload behavior is controlled by max_history_size + max_history_drop.

1) If I emit 1,000,000 events, will errors be raised?

Error conditions

| Runtime | Condition | What is raised |
| --- | --- | --- |
| Python | emit() called with no running event loop | RuntimeError ("emit() called but no event loop is running") |
| Python | max_history_size > 0, max_history_drop=False, and history already at limit | RuntimeError ("history limit reached") |
| TypeScript | emit() with max_history_size > 0, max_history_drop=false, and history already at limit | Error (message contains "history limit reached") |
| Both | Process runs out of memory under extreme load | Runtime/VM OOM failure (not a bus-specific exception type) |
Queue capacity never raises: pending queues are unbounded, so backpressure is enforced only through the history policy. max_history_size=0 is a special case in both runtimes: it does not trigger history-limit rejection and keeps only in-flight visibility. With max_history_drop=true, emit() never rejects on history size; under sustained overload, old uncompleted entries can be dropped and a warning is logged.
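The decision emit() makes under this policy can be modeled in a few lines. This is a simplified, hypothetical sketch, not bubus source code; the name check_history_pressure is invented for illustration:

```python
def check_history_pressure(history_len: int, max_history_size, max_history_drop: bool) -> str:
    """Toy model of emit()'s history-pressure decision for a new event."""
    if max_history_size is None or max_history_size == 0:
        return "accept"              # unbounded history, or in-flight-only visibility
    if history_len < max_history_size:
        return "accept"              # still under the limit
    if max_history_drop:
        return "accept_and_trim"     # trim oldest entries instead of rejecting
    raise RuntimeError("history limit reached")

# Reject policy: at the limit with drop disabled, emit() raises
try:
    check_history_pressure(10_000, 10_000, max_history_drop=False)
except RuntimeError as e:
    print(e)  # history limit reached

# Drop policy: same state, but the bus trims instead of rejecting
print(check_history_pressure(10_000, 10_000, max_history_drop=True))  # accept_and_trim
```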

Reject vs drop behavior

from bubus import EventBus

# Reject new emits once history reaches N
reject_bus = EventBus(max_history_size=10_000, max_history_drop=False)

# Never reject on history size; trim oldest history entries instead
drop_bus = EventBus(max_history_size=10_000, max_history_drop=True)

2) If 1,000,000 events complete, how many are kept?

Let N = max_history_size.
| Setting | Events retained after bus becomes idle | Notes |
| --- | --- | --- |
| N = None / null | All completed events (so up to 1,000,000) | History is unbounded. |
| N > 0, max_history_drop = false | Up to N | New emits are rejected once history reaches N. |
| N > 0, max_history_drop = true | Bounded to N at steady state | Oldest history entries are removed first. |
| N = 0 | 0 completed events retained | Only pending/in-flight visibility is kept; completed entries are dropped. |
Python nuance: in heavy bursts with max_history_drop=True, cleanup is amortized, so history can temporarily exceed N before converging back to <= N. For the broader retention model, see Event History Store.
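The amortized-cleanup nuance can be seen with a toy model. This is a hypothetical simulation (trimming runs every cleanup_interval emits, an assumed batching scheme), not the actual Python implementation:

```python
def simulate_burst(num_events: int, max_history_size: int, cleanup_interval: int):
    """Track history length when trimming is amortized rather than per-emit."""
    history: list[int] = []
    peak = 0
    for i in range(num_events):
        history.append(i)
        peak = max(peak, len(history))
        if (i + 1) % cleanup_interval == 0:
            del history[:-max_history_size]  # amortized trim down to the limit
    del history[:-max_history_size]          # final trim once the burst ends
    return peak, len(history)

peak, final = simulate_burst(1_000, max_history_size=100, cleanup_interval=250)
print(peak, final)  # 350 100 -- history briefly exceeded N, then converged to N
```

The peak (350) shows the transient overshoot; the final length (100) shows convergence back to <= N once the bus goes idle.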

3) How RAM usage scales

At a high level, memory grows with:
  • pending queue depth,
  • retained history size,
  • per-event handler/result payload size.
A practical model is: RAM ~= O(pending_event_queue) + O(event_history) + O(event_results and payloads)
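As a back-of-the-envelope check, that model can be turned into arithmetic. The per-event byte costs below are placeholders you would measure for your own payloads, not bubus constants:

```python
def estimate_ram_bytes(pending_events: int, history_events: int,
                       avg_event_bytes: int, avg_result_bytes: int) -> int:
    """RAM ~= O(pending queue) + O(history) + O(results and payloads)."""
    queue_cost = pending_events * avg_event_bytes    # events waiting to be processed
    history_cost = history_events * avg_event_bytes  # retained history entries
    result_cost = history_events * avg_result_bytes  # handler results attached to history
    return queue_cost + history_cost + result_cost

# Bounded history (N=10_000) during a 1M-event run: memory tracks queue depth + N,
# not the total number of events ever emitted (example figures are illustrative)
bounded = estimate_ram_bytes(pending_events=5_000, history_events=10_000,
                             avg_event_bytes=2_048, avg_result_bytes=512)
print(bounded)  # 35840000 bytes (~34 MiB)
```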

Measured slopes from perf suites

  • The Python README matrix reports scenario-dependent peak RSS slopes between roughly 0.025 KB/event and 8.024 KB/event.
  • The TypeScript README matrix reports scenario- and runtime-dependent peak RSS slopes between roughly 0.1 KB/event and 7.9 KB/event.
  • The TypeScript README notes those KB/event values are measured during active processing with history aggressively bounded (max_history_size=1 in the perf harnesses).
Treat those numbers as slope indicators during active throughput, not as exact long-term retention multipliers for your payloads. Operationally:
  • bounded history (N finite) keeps steady-state memory bounded by queue depth + N,
  • unbounded history (N=None/null) makes retained RAM grow roughly linearly with total completed events.

4) Queue vs history lifecycle (exact behavior)

Events do not “move from queue to history.” They are added to history at emit() time, and can exist in both structures while pending.

Python timeline (emit)

  1. Validate pressure policy.
  2. Enqueue into pending_event_queue.
  3. Add same event object to event_history.
  4. Runloop dequeues event (queue.get()), then executes handlers.
  5. Event remains in event_history as pending -> started -> completed unless trimmed/removed by history policy.
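The coexistence of the two structures can be demonstrated with a minimal stand-in: an asyncio.Queue plus a list, mirroring the timeline above. This is a toy model of the lifecycle, not bubus code:

```python
import asyncio

async def main():
    pending_event_queue: asyncio.Queue = asyncio.Queue()  # step 2: pending queue
    event_history: list = []                              # step 3: history list

    event = {"name": "MyEvent", "status": "pending"}
    pending_event_queue.put_nowait(event)  # enqueued at emit() time
    event_history.append(event)            # the SAME object also enters history

    # The event is now in both structures while pending
    assert pending_event_queue.qsize() == 1 and event in event_history

    dequeued = await pending_event_queue.get()  # step 4: runloop dequeues
    dequeued["status"] = "completed"            # handlers run, status advances

    # Step 5: gone from the queue, still in history with its final status
    print(pending_event_queue.qsize(), event_history[0]["status"])  # 0 completed

asyncio.run(main())
```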

TypeScript timeline (emit)

  1. Validate pressure policy.
  2. Add event to event_history.
  3. Apply trimHistory().
  4. Push event into pending_event_queue.
  5. Runloop shifts from queue and executes handlers.
  6. Event remains in event_history unless trimmed/removed by policy.
So yes:
  • an event can be in both pending_event_queue and event_history at the same time,
  • event_history can contain pending events (not only started/completed events).

Observe both structures directly

event = bus.emit(MyEvent())
pending_count = bus.pending_event_queue.qsize() if bus.pending_event_queue else 0
history_count = len(bus.event_history)
print('pending_event_queue=', pending_count, 'event_history=', history_count)
# pending_event_queue= 1 event_history= 1