retry#

streaming.base.util.retry(exc_class=<class 'Exception'>, clean_up_fn=None, num_attempts=3, initial_backoff=1.0, max_jitter=0.5)[source]#

Decorator to retry a function with backoff and jitter.

Attempts are spaced out with initial_backoff * 2**num_attempts + random.random() * max_jitter seconds.

Example

from streaming.base.util import retry

num_tries = 0

def clean_up():
    # Do clean up stuff here
    print("cleaning up")

@retry(RuntimeError, clean_up_fn=clean_up, num_attempts=3, initial_backoff=0.1)
def flaky_function():
    global num_tries
    if num_tries < 2:
        num_tries += 1
        raise RuntimeError("Called too soon!")
    return "Third time's a charm."

print(flaky_function())
cleaning up
cleaning up
Third time's a charm.
Parameters
  • exc_class (Type[Exception] | Sequence[Type[Exception]]], optional) – The exception class or classes to retry. Defaults to Exception.

  • clean_up_fn (Callable[[], None], optional) – A function to call after each failed attempt before retrying. Defaults to None.

  • num_attempts (int, optional) – The total number of attempts to make. Defaults to 3.

  • initial_backoff (float, optional) – The initial backoff, in seconds. Defaults to 1.0.

  • max_jitter (float, optional) –

    The maximum amount of random jitter to add. Defaults to 0.5.

    Increasing the max_jitter can help prevent overloading a resource when multiple processes in parallel are calling the same underlying function.