retry wraps async functions (including bus handlers) with a per-attempt timeout, retries with backoff, and optional semaphore-based concurrency control.

Signature

def retry(
    retry_after: float = 0,
    max_attempts: int = 1,
    timeout: float | None = None,
    retry_on_errors: list[type[Exception] | re.Pattern[str]] | tuple[type[Exception] | re.Pattern[str], ...] | None = None,
    retry_backoff_factor: float = 1.0,
    semaphore_limit: int | None = None,
    semaphore_name: str | Callable[..., str] | None = None,
    semaphore_lax: bool = True,
    semaphore_scope: Literal['multiprocess', 'global', 'class', 'instance'] = 'global',
    semaphore_timeout: float | None = None,
) -> Callable[[Callable[P, Coroutine[Any, Any, T]]], Callable[P, Coroutine[Any, Any, T]]]

Options

Option                 Description
max_attempts           Total attempts including the first call (1 disables retries).
retry_after            Base delay between retries, in seconds.
retry_backoff_factor   Delay multiplier applied after each failed attempt.
retry_on_errors        Optional matcher list to restrict which errors are retried.
timeout                Per-attempt timeout in seconds (None/undefined means no per-attempt timeout).
semaphore_limit        Max concurrent executions sharing the same semaphore.
semaphore_name         Semaphore key (string or function deriving a key from call args).
semaphore_scope        Semaphore sharing scope (global, class, instance; Python also supports multiprocess).
semaphore_timeout      Max wait time for semaphore acquisition before timeout/lax fallback.
semaphore_lax          If true, continue execution without semaphore limit when acquisition times out.
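
The interaction between semaphore_timeout and semaphore_lax can be illustrated with plain asyncio. This is a minimal sketch of the semantics described in the table, not the library's implementation: run_limited is a hypothetical helper, and the choice to re-raise the timeout when semaphore_lax is false is an assumption based on the "timeout/lax fallback" wording.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable


async def run_limited(
    sem: asyncio.Semaphore,
    fn: Callable[[], Awaitable[Any]],
    *,
    semaphore_timeout: float | None,
    semaphore_lax: bool,
) -> Any:
    """Hypothetical helper: run fn() under sem, with a bounded wait for a slot."""
    try:
        # Wait at most semaphore_timeout seconds for a slot (None waits forever).
        await asyncio.wait_for(sem.acquire(), timeout=semaphore_timeout)
    except asyncio.TimeoutError:
        if not semaphore_lax:
            raise  # assumption: strict mode surfaces the timeout to the caller
        # Lax mode: proceed without holding a slot, so nothing is released.
        return await fn()
    try:
        return await fn()
    finally:
        sem.release()
```

With semaphore_lax=True the limit degrades to a best-effort throttle, while semaphore_lax=False turns it into a hard cap at the cost of failed calls under contention.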

Example: Inline wrapper

from bubus import EventBus, BaseEvent
from bubus.retry import retry

class FetchEvent(BaseEvent[dict]):
    url: str

bus = EventBus('AppBus')

async def fetch_with_retry(event: FetchEvent) -> dict:
    # fetch_json is a placeholder for your own async HTTP helper
    return await fetch_json(event.url)

bus.on(
    FetchEvent,
    retry(max_attempts=3, retry_after=1, timeout=5)(fetch_with_retry),
)

Example: Decorated class method

from bubus.retry import retry

class ApiService:
    @retry(max_attempts=4, retry_after=1, timeout=10, semaphore_limit=2, semaphore_scope='class')
    async def get_user(self, user_id: str) -> dict:
        # call_remote_api is a placeholder for your own async API client call
        return await call_remote_api(user_id)

Behavior

  • Semaphore acquisition happens once per call, then all retry attempts run within that acquired slot.
  • Backoff delay per retry is: retry_after * retry_backoff_factor^(attempt - 1).
  • Retries stop immediately when the thrown error does not match retry_on_errors.
  • Bus/event timeouts act as outer execution budgets; the retry timeout option applies to each attempt individually.
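
The backoff formula and the per-attempt timeout can be sketched in plain asyncio. This is an illustration under stated assumptions rather than the library's code: backoff_delay and call_with_retry are hypothetical names, attempt is taken to be the 1-based number of the attempt that just failed, and no delay is slept after the final attempt.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable


def backoff_delay(attempt: int, retry_after: float, retry_backoff_factor: float) -> float:
    """Delay in seconds after the given failed attempt (1-based, an assumption)."""
    return retry_after * retry_backoff_factor ** (attempt - 1)


async def call_with_retry(
    fn: Callable[[], Awaitable[Any]],
    *,
    max_attempts: int = 1,
    retry_after: float = 0.0,
    retry_backoff_factor: float = 1.0,
    timeout: float | None = None,
) -> Any:
    """Hypothetical retry loop: each attempt gets its own timeout budget."""
    for attempt in range(1, max_attempts + 1):
        try:
            # timeout applies to this attempt only, not to the whole call.
            return await asyncio.wait_for(fn(), timeout=timeout)
        except Exception:
            if attempt == max_attempts:
                raise  # attempts exhausted: surface the last error
            await asyncio.sleep(backoff_delay(attempt, retry_after, retry_backoff_factor))


# With retry_after=1 and retry_backoff_factor=2.0, the first three delays are:
print([backoff_delay(a, 1, 2.0) for a in (1, 2, 3)])  # [1.0, 2.0, 4.0]
```

Because each attempt has its own timeout, the worst-case wall time of one call is roughly max_attempts * timeout plus the sum of the delays, which is why an outer bus/event timeout is still useful as an overall budget.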

Runtime differences

  • Python supports semaphore scope multiprocess in addition to global, class, and instance.
  • TypeScript supports global, class, and instance, and uses async-context re-entrancy tracking in Node/Bun to avoid same-semaphore nested deadlocks.
  • retry_on_errors matching differs slightly:
    • Python: exception classes or compiled regex patterns (matched against "ErrorClass: message").
    • TypeScript: error constructors, error-name strings, or regex patterns.
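
The Python matching rule can be sketched as follows. This is a minimal illustration, not the library's source: should_retry is a hypothetical helper, and using re.search (rather than a full or anchored match) and retrying every exception when retry_on_errors is None are both assumptions.

```python
from __future__ import annotations

import re
from typing import Sequence


def should_retry(
    error: Exception,
    retry_on_errors: Sequence[type[Exception] | re.Pattern[str]] | None,
) -> bool:
    """Hypothetical matcher: True if error is eligible for another attempt."""
    if retry_on_errors is None:
        return True  # assumption: no filter means every exception is retried
    # Regex patterns are tested against "ErrorClass: message".
    text = f"{type(error).__name__}: {error}"
    for matcher in retry_on_errors:
        if isinstance(matcher, re.Pattern):
            if matcher.search(text):  # assumption: search, not fullmatch
                return True
        elif isinstance(error, matcher):
            return True
    return False


print(should_retry(TimeoutError("slow upstream"), [TimeoutError]))             # True
print(should_retry(ValueError("rate limit hit"), [re.compile(r"rate limit")]))  # True
print(should_retry(KeyError("user_id"), [TimeoutError]))                        # False
```

Mixing classes and patterns in one list lets a caller retry on a whole exception type while also catching specific messages raised under a more generic type.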