Skip to main content
Both runtimes expose two related (but different) runtime stores:
  • pending_event_queue: events accepted by the bus but not yet started by the runloop
  • event_history: events the bus knows about (pending, started, and completed until trimmed)
If you were looking for pending_events_queue, the runtime field is pending_event_queue in both Python and TypeScript.

What each store is for

StorePurposeTypical contents
pending_event_queueScheduling bufferevents waiting their turn to start
event_historyObservability + lookuprecent pending/started/completed events, bounded by history settings
The key difference: queue is “what still needs to start”, history is “what this bus has seen”.

Retention config options

OptionMeaning
max_history_sizeMax number of events retained in event_history (null/None means unbounded, 0 means keep only in-flight visibility).
max_history_dropIf true, accept new events and trim oldest history entries when over limit. If false, reject new events at the limit (for max_history_size > 0).

Event lifecycle: queue -> history -> trim

  1. Emit:
    • Event is accepted.
    • Event is added to event_history.
    • Event is enqueued into pending_event_queue.
  2. Runloop begins processing:
    • Event is removed from pending_event_queue.
    • Event stays in event_history while handlers run.
  3. Completion:
    • Event is marked completed.
    • Event may remain in event_history or be dropped based on retention settings.
  4. Trimming:
    • max_history_size and max_history_drop determine whether old history is removed or new emits are rejected.

Trimming behavior by mode

  • max_history_size = None/null: no automatic history limit.
  • max_history_size = 0: completed events are removed immediately; only pending/in-flight visibility remains.
  • max_history_size > 0 and max_history_drop = false: bus rejects new emits once history reaches the limit.
  • max_history_size > 0 and max_history_drop = true: bus trims oldest history entries (prefers completed first; can drop uncompleted entries under extreme pressure).
Both runtimes follow this policy. Internally, trim timing is implementation-specific (eager vs amortized cleanup), but externally the semantics above are the contract to rely on.

Common configurations

from bubus import EventBus

bounded_drop = EventBus(max_history_size=100, max_history_drop=True)
bounded_reject = EventBus(max_history_size=100, max_history_drop=False)
unbounded = EventBus(max_history_size=None)
in_flight_only = EventBus(max_history_size=0)

Inspecting queue vs history at runtime

event = bus.emit(MyEvent())
pending_count = bus.pending_event_queue.qsize() if bus.pending_event_queue else 0
history_count = len(bus.event_history)
print('pending_event_queue=', pending_count, 'event_history=', history_count)
# pending_event_queue= 1 event_history= 1

await event
pending_after = bus.pending_event_queue.qsize() if bus.pending_event_queue else 0
history_after = len(bus.event_history)
print('after completion -> pending_event_queue=', pending_after, 'event_history=', history_after)
# after completion -> pending_event_queue= 0 event_history= 1