retry_base.py

# Copyright 2017 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Shared classes and functions for retrying requests.

:class:`_BaseRetry` is the base class for :class:`Retry`, :class:`AsyncRetry`,
:class:`StreamingRetry`, and :class:`AsyncStreamingRetry`.
"""

from __future__ import annotations

import logging
import random
import time

from enum import Enum
from typing import Any, Callable, Optional, TYPE_CHECKING

import requests.exceptions

from google.api_core import exceptions
from google.auth import exceptions as auth_exceptions

if TYPE_CHECKING:
    import sys

    if sys.version_info >= (3, 11):
        from typing import Self
    else:
        from typing_extensions import Self

_DEFAULT_INITIAL_DELAY = 1.0  # seconds
_DEFAULT_MAXIMUM_DELAY = 60.0  # seconds
_DEFAULT_DELAY_MULTIPLIER = 2.0
_DEFAULT_DEADLINE = 60.0 * 2.0  # seconds
_LOGGER = logging.getLogger("google.api_core.retry")


def if_exception_type(
    *exception_types: type[Exception],
) -> Callable[[Exception], bool]:
    """Creates a predicate to check if the exception is of a given type.

    Args:
        exception_types (Sequence[:func:`type`]): The exception types to check
            for.

    Returns:
        Callable[Exception]: A predicate that returns True if the provided
            exception is of the given type(s).
    """

    def if_exception_type_predicate(exception: Exception) -> bool:
        """Bound predicate for checking an exception type."""
        return isinstance(exception, exception_types)

    return if_exception_type_predicate


# pylint: disable=invalid-name
# Pylint sees this as a constant, but it is also an alias that should be
# considered a function.
if_transient_error = if_exception_type(
    exceptions.InternalServerError,
    exceptions.TooManyRequests,
    exceptions.ServiceUnavailable,
    requests.exceptions.ConnectionError,
    requests.exceptions.ChunkedEncodingError,
    auth_exceptions.TransportError,
)
"""A predicate that checks if an exception is a transient API error.

The following server errors are considered transient:

- :class:`google.api_core.exceptions.InternalServerError` - HTTP 500, gRPC
    ``INTERNAL(13)`` and its subclasses.
- :class:`google.api_core.exceptions.TooManyRequests` - HTTP 429
- :class:`google.api_core.exceptions.ServiceUnavailable` - HTTP 503
- :class:`requests.exceptions.ConnectionError`
- :class:`requests.exceptions.ChunkedEncodingError` - The server declared
    chunked encoding but sent an invalid chunk.
- :class:`google.auth.exceptions.TransportError` - Used to indicate an
    error occurred during an HTTP request.
"""
# pylint: enable=invalid-name


def exponential_sleep_generator(
    initial: float, maximum: float, multiplier: float = _DEFAULT_DELAY_MULTIPLIER
):
    """Generates sleep intervals based on the exponential back-off algorithm.

    This implements the `Truncated Exponential Back-off`_ algorithm.

    .. _Truncated Exponential Back-off:
        https://cloud.google.com/storage/docs/exponential-backoff

    Args:
        initial (float): The minimum amount of time to delay. This must
            be greater than 0.
        maximum (float): The maximum amount of time to delay.
        multiplier (float): The multiplier applied to the delay.

    Yields:
        float: successive sleep intervals.
    """
    max_delay = min(initial, maximum)
    while True:
        yield random.uniform(0.0, max_delay)
        max_delay = min(max_delay * multiplier, maximum)
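
# An illustrative sketch (not part of the original module) of how the
# generator above behaves. Each value is drawn with random.uniform, so the
# printed numbers vary between runs; only the growing upper bound is
# deterministic (1s, 2s, 4s, 8s, then capped at 10s for these arguments).
def _example_sleep_intervals():
    import itertools

    delays = exponential_sleep_generator(initial=1.0, maximum=10.0, multiplier=2.0)
    for attempt, delay in enumerate(itertools.islice(delays, 5), start=1):
        print(f"attempt {attempt}: sleeping up to {delay:.2f}s")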

class RetryFailureReason(Enum):
    """
    The cause of a failed retry, used when building exceptions.
    """

    TIMEOUT = 0
    NON_RETRYABLE_ERROR = 1


def build_retry_error(
    exc_list: list[Exception],
    reason: RetryFailureReason,
    timeout_val: float | None,
    **kwargs: Any,
) -> tuple[Exception, Exception | None]:
    """
    Default exception_factory implementation.

    Returns a RetryError if the failure is due to a timeout, otherwise
    returns the last exception encountered.

    Args:
      - exc_list: list of exceptions that occurred during the retry
      - reason: reason for the retry failure. Can be TIMEOUT or NON_RETRYABLE_ERROR
      - timeout_val: the original timeout value for the retry (in seconds),
          for use in the exception message

    Returns:
      - tuple: a tuple of the exception to be raised, and the cause exception if any
    """
    if reason == RetryFailureReason.TIMEOUT:
        # return RetryError with the most recent exception as the cause
        src_exc = exc_list[-1] if exc_list else None
        timeout_val_str = f"of {timeout_val:0.1f}s " if timeout_val is not None else ""
        return (
            exceptions.RetryError(
                f"Timeout {timeout_val_str}exceeded",
                src_exc,
            ),
            src_exc,
        )
    elif exc_list:
        # return most recent exception encountered
        return exc_list[-1], None
    else:
        # no exceptions were given in exc_list. Raise generic RetryError
        return exceptions.RetryError("Unknown error", None), None


def _retry_error_helper(
    exc: Exception,
    deadline: float | None,
    next_sleep: float,
    error_list: list[Exception],
    predicate_fn: Callable[[Exception], bool],
    on_error_fn: Callable[[Exception], None] | None,
    exc_factory_fn: Callable[
        [list[Exception], RetryFailureReason, float | None],
        tuple[Exception, Exception | None],
    ],
    original_timeout: float | None,
):
    """
    Shared logic for handling an error for all retry implementations.

    - Raises an error on timeout or non-retryable error
    - Calls on_error_fn if provided
    - Logs the error

    Args:
       - exc: the exception that was raised
       - deadline: the deadline for the retry, calculated as a diff from time.monotonic()
       - next_sleep: the next sleep interval
       - error_list: the list of exceptions that have been raised so far
       - predicate_fn: takes `exc` and returns true if the operation should be retried
       - on_error_fn: callback to execute when a retryable error occurs
       - exc_factory_fn: callback used to build the exception to be raised on terminal failure
       - original_timeout: the original timeout value for the retry (in seconds),
           to be passed to the exception factory for building an error message
    """
    error_list.append(exc)
    if not predicate_fn(exc):
        final_exc, source_exc = exc_factory_fn(
            error_list,
            RetryFailureReason.NON_RETRYABLE_ERROR,
            original_timeout,
        )
        raise final_exc from source_exc
    if on_error_fn is not None:
        on_error_fn(exc)
    if deadline is not None and time.monotonic() + next_sleep > deadline:
        final_exc, source_exc = exc_factory_fn(
            error_list,
            RetryFailureReason.TIMEOUT,
            original_timeout,
        )
        raise final_exc from source_exc
    _LOGGER.debug(
        "Retrying due to {}, sleeping {:.1f}s ...".format(error_list[-1], next_sleep)
    )
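
# A small sketch (not part of the original module) of what build_retry_error
# returns for the two failure reasons defined above.
def _example_build_retry_error():
    last_exc = ValueError("attempt failed")

    # Timeout: the helper wraps the most recent exception in a RetryError and
    # also returns it as the cause, so callers can `raise exc from cause`.
    exc, cause = build_retry_error([last_exc], RetryFailureReason.TIMEOUT, 120.0)
    assert isinstance(exc, exceptions.RetryError) and cause is last_exc

    # Non-retryable error: the most recent exception is returned unchanged.
    exc, cause = build_retry_error(
        [last_exc], RetryFailureReason.NON_RETRYABLE_ERROR, None
    )
    assert exc is last_exc and cause is None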

class _BaseRetry(object):
    """
    Base class for retry configuration objects. This class is intended to
    capture retry and backoff configuration that is common to both synchronous
    and asynchronous retries, for both unary and streaming RPCs. It is not
    intended to be instantiated directly, but rather to be subclassed by the
    various retry configuration classes.
    """

    def __init__(
        self,
        predicate: Callable[[Exception], bool] = if_transient_error,
        initial: float = _DEFAULT_INITIAL_DELAY,
        maximum: float = _DEFAULT_MAXIMUM_DELAY,
        multiplier: float = _DEFAULT_DELAY_MULTIPLIER,
        timeout: Optional[float] = _DEFAULT_DEADLINE,
        on_error: Optional[Callable[[Exception], Any]] = None,
        **kwargs: Any,
    ) -> None:
        self._predicate = predicate
        self._initial = initial
        self._multiplier = multiplier
        self._maximum = maximum
        self._timeout = kwargs.get("deadline", timeout)
        self._deadline = self._timeout
        self._on_error = on_error

    def __call__(self, *args, **kwargs) -> Any:
        raise NotImplementedError("Not implemented in base class")

    @property
    def deadline(self) -> float | None:
        """
        DEPRECATED: use ``timeout`` instead. Refer to the ``Retry`` class
        documentation for details.
        """
        return self._timeout

    @property
    def timeout(self) -> float | None:
        return self._timeout

    def with_deadline(self, deadline: float | None) -> Self:
        """Return a copy of this retry with the given timeout.

        DEPRECATED: use :meth:`with_timeout` instead. Refer to the ``Retry``
        class documentation for details.

        Args:
            deadline (float|None): How long to keep retrying, in seconds.
                If None, no timeout is enforced.

        Returns:
            Retry: A new retry instance with the given timeout.
        """
        return self.with_timeout(deadline)

    def with_timeout(self, timeout: float | None) -> Self:
        """Return a copy of this retry with the given timeout.

        Args:
            timeout (float): How long to keep retrying, in seconds. If None,
                no timeout will be enforced.

        Returns:
            Retry: A new retry instance with the given timeout.
        """
        return type(self)(
            predicate=self._predicate,
            initial=self._initial,
            maximum=self._maximum,
            multiplier=self._multiplier,
            timeout=timeout,
            on_error=self._on_error,
        )

    def with_predicate(self, predicate: Callable[[Exception], bool]) -> Self:
        """Return a copy of this retry with the given predicate.

        Args:
            predicate (Callable[Exception]): A callable that should return
                ``True`` if the given exception is retryable.

        Returns:
            Retry: A new retry instance with the given predicate.
        """
        return type(self)(
            predicate=predicate,
            initial=self._initial,
            maximum=self._maximum,
            multiplier=self._multiplier,
            timeout=self._timeout,
            on_error=self._on_error,
        )

    def with_delay(
        self,
        initial: Optional[float] = None,
        maximum: Optional[float] = None,
        multiplier: Optional[float] = None,
    ) -> Self:
        """Return a copy of this retry with the given delay options.

        Args:
            initial (float): The minimum amount of time to delay (in seconds).
                This must be greater than 0. If None, the current value is used.
            maximum (float): The maximum amount of time to delay (in seconds).
                If None, the current value is used.
            multiplier (float): The multiplier applied to the delay. If None,
                the current value is used.

        Returns:
            Retry: A new retry instance with the given delay options.
        """
        return type(self)(
            predicate=self._predicate,
            initial=initial if initial is not None else self._initial,
            maximum=maximum if maximum is not None else self._maximum,
            multiplier=multiplier if multiplier is not None else self._multiplier,
            timeout=self._timeout,
            on_error=self._on_error,
        )

    def __str__(self) -> str:
        return (
            "<{} predicate={}, initial={:.1f}, maximum={:.1f}, "
            "multiplier={:.1f}, timeout={}, on_error={}>".format(
                type(self).__name__,
                self._predicate,
                self._initial,
                self._maximum,
                self._multiplier,
                self._timeout,  # timeout can be None, thus no {:.1f}
                self._on_error,
            )
        )
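
# A brief sketch (not part of the original module) of the copy-on-write
# helpers defined above, using the concrete Retry subclass exported from
# google.api_core.retry (defined in retry_unary.py below). The import is
# deferred to avoid a circular import at module load time.
def _example_with_helpers():
    from google.api_core.retry import Retry

    base = Retry(initial=1.0, maximum=10.0, timeout=120.0)
    # Each with_* call returns a new instance; the original is left untouched.
    shorter = base.with_timeout(30.0)
    only_503 = shorter.with_predicate(
        if_exception_type(exceptions.ServiceUnavailable)
    )
    assert base.timeout == 120.0 and shorter.timeout == 30.0
    return only_503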
""" return type(self)( predicate=self._predicate, initial=initial if initial is not None else self._initial, maximum=maximum if maximum is not None else self._maximum, multiplier=multiplier if multiplier is not None else self._multiplier, timeout=self._timeout, on_error=self._on_error, ) def __str__(self) -> str: return ( "<{} predicate={}, initial={:.1f}, maximum={:.1f}, " "multiplier={:.1f}, timeout={}, on_error={}>".format( type(self).__name__, self._predicate, self._initial, self._maximum, self._multiplier, self._timeout, # timeout can be None, thus no {:.1f} self._on_error, ) ) retry_unary.py 0000644 00000032032 15030251505 0007475 0 ustar 00 # Copyright 2017 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Helpers for retrying functions with exponential back-off. The :class:`Retry` decorator can be used to retry functions that raise exceptions using exponential backoff. Because a exponential sleep algorithm is used, the retry is limited by a `timeout`. The timeout determines the window in which retries will be attempted. This is used instead of total number of retries because it is difficult to ascertain the amount of time a function can block when using total number of retries and exponential backoff. By default, this decorator will retry transient API errors (see :func:`if_transient_error`). For example: .. code-block:: python @retry.Retry() def call_flaky_rpc(): return client.flaky_rpc() # Will retry flaky_rpc() if it raises transient API errors. result = call_flaky_rpc() You can pass a custom predicate to retry on different exceptions, such as waiting for an eventually consistent item to be available: .. code-block:: python @retry.Retry(predicate=if_exception_type(exceptions.NotFound)) def check_if_exists(): return client.does_thing_exist() is_available = check_if_exists() Some client library methods apply retry automatically. These methods can accept a ``retry`` parameter that allows you to configure the behavior: .. code-block:: python my_retry = retry.Retry(timeout=60) result = client.some_method(retry=my_retry) """ from __future__ import annotations import functools import sys import time import inspect import warnings from typing import Any, Callable, Iterable, TypeVar, TYPE_CHECKING from google.api_core.retry.retry_base import _BaseRetry from google.api_core.retry.retry_base import _retry_error_helper from google.api_core.retry.retry_base import exponential_sleep_generator from google.api_core.retry.retry_base import build_retry_error from google.api_core.retry.retry_base import RetryFailureReason if TYPE_CHECKING: if sys.version_info >= (3, 10): from typing import ParamSpec else: from typing_extensions import ParamSpec _P = ParamSpec("_P") # target function call parameters _R = TypeVar("_R") # target function returned value _ASYNC_RETRY_WARNING = "Using the synchronous google.api_core.retry.Retry with asynchronous calls may lead to unexpected results. Please use google.api_core.retry_async.AsyncRetry instead." 

def retry_target(
    target: Callable[_P, _R],
    predicate: Callable[[Exception], bool],
    sleep_generator: Iterable[float],
    timeout: float | None = None,
    on_error: Callable[[Exception], None] | None = None,
    exception_factory: Callable[
        [list[Exception], RetryFailureReason, float | None],
        tuple[Exception, Exception | None],
    ] = build_retry_error,
    **kwargs,
):
    """Call a function and retry if it fails.

    This is the lowest-level retry helper. Generally, you'll use the
    higher-level retry helper :class:`Retry`.

    Args:
        target(Callable): The function to call and retry. This must be a
            nullary function - apply arguments with `functools.partial`.
        predicate (Callable[Exception]): A callable used to determine if an
            exception raised by the target should be considered retryable.
            It should return True to retry or False otherwise.
        sleep_generator (Iterable[float]): An infinite iterator that determines
            how long to sleep between retries.
        timeout (Optional[float]): How long to keep retrying the target.
            Note: timeout is only checked before initiating a retry, so the
            target may run past the timeout value as long as it is healthy.
        on_error (Optional[Callable[Exception]]): If given, the on_error
            callback will be called with each retryable exception raised by
            the target. Any error raised by this function will *not* be caught.
        exception_factory: A function that is called when the retryable reaches
            a terminal failure state, used to construct an exception to be
            raised. It takes a list of all exceptions encountered, a
            retry.RetryFailureReason enum indicating the failure cause, and the
            original timeout value as arguments. It should return a tuple of
            the exception to be raised, along with the cause exception if any.
            The default implementation will raise a RetryError on timeout, or
            the last exception encountered otherwise.
        deadline (float): DEPRECATED: use ``timeout`` instead. For backward
            compatibility, if specified it will override the ``timeout``
            parameter.

    Returns:
        Any: the return value of the target function.

    Raises:
        ValueError: If the sleep generator stops yielding values.
        Exception: a custom exception specified by the exception_factory if
            provided. If no exception_factory is provided:
                google.api_core.RetryError: If the timeout is exceeded while
                    retrying.
                Exception: If the target raises an error that isn't retryable.
    """
    timeout = kwargs.get("deadline", timeout)

    deadline = time.monotonic() + timeout if timeout is not None else None
    error_list: list[Exception] = []

    for sleep in sleep_generator:
        try:
            result = target()
            if inspect.isawaitable(result):
                warnings.warn(_ASYNC_RETRY_WARNING)
            return result

        # pylint: disable=broad-except
        # This function explicitly must deal with broad exceptions.
        except Exception as exc:
            # defer to shared logic for handling errors
            _retry_error_helper(
                exc,
                deadline,
                sleep,
                error_list,
                predicate,
                on_error,
                exception_factory,
                timeout,
            )
            # if exception not raised, sleep before next attempt
            time.sleep(sleep)

    raise ValueError("Sleep generator stopped yielding sleep values.")
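
# A minimal sketch (not part of the original module) of calling retry_target
# directly. ``flaky_echo`` is a hypothetical stand-in for a real RPC: it fails
# twice with a retryable error, then succeeds on the third attempt.
def _example_retry_target():
    attempts = {"count": 0}

    def flaky_echo(message):
        attempts["count"] += 1
        if attempts["count"] < 3:
            raise ConnectionError("transient network hiccup")
        return message

    # retry_target expects a nullary callable; bind arguments with functools.partial.
    return retry_target(
        functools.partial(flaky_echo, "hello"),
        predicate=lambda exc: isinstance(exc, ConnectionError),
        sleep_generator=exponential_sleep_generator(0.1, 1.0),
        timeout=10.0,
    )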

class Retry(_BaseRetry):
    """Exponential retry decorator for unary synchronous RPCs.

    This class is a decorator used to add retry or polling behavior to an RPC
    call.

    Although the default behavior is to retry transient API errors, a
    different predicate can be provided to retry other exceptions.

    There are two important concepts that retry/polling behavior may operate
    on, Deadline and Timeout, which need to be properly defined for the
    correct usage of this class and the rest of the library.

    Deadline: a fixed point in time by which a certain operation must
    terminate. For example, if a certain operation has a deadline
    "2022-10-18T23:30:52.123Z" it must terminate (successfully or with an
    error) by that time, regardless of when it was started or whether it was
    started at all.

    Timeout: the maximum duration of time after which a certain operation
    must terminate (successfully or with an error). The countdown begins right
    after an operation was started. For example, if an operation was started
    at 09:24:00 with a timeout of 75 seconds, it must terminate no later than
    09:25:15.

    Unfortunately, in the past this class (and the api-core library as a
    whole) has not properly distinguished the concepts of "timeout" and
    "deadline", and the ``deadline`` parameter has meant ``timeout``. That is
    why ``deadline`` has been deprecated and ``timeout`` should be used
    instead. If the ``deadline`` parameter is set, it will override the
    ``timeout`` parameter. In other words, ``retry.deadline`` should be
    treated as just a deprecated alias for ``retry.timeout``.

    Said another way, it is safe to assume that this class and the rest of
    this library operate in terms of timeouts (not deadlines) unless the
    usage of deadline semantics is explicitly noted.

    It is also important to understand the three most common applications of
    the Timeout concept in the context of this library.

    Usually the generic Timeout term may stand for one of the following actual
    timeouts: RPC Timeout, Retry Timeout, or Polling Timeout.

    RPC Timeout: a value supplied by the client to the server so that the
    server side knows the maximum amount of time it is expected to spend
    handling that specific RPC. For example, in the case of gRPC transport,
    RPC Timeout is represented by setting the "grpc-timeout" header in the
    HTTP2 request. The `timeout` property of this class normally never
    represents the RPC Timeout as it is handled separately by the
    ``google.api_core.timeout`` module of this library.

    Retry Timeout: this is the most common meaning of the ``timeout`` property
    of this class, and defines how long a certain RPC may be retried in case
    the server returns an error.

    Polling Timeout: defines how long the client side is allowed to call the
    polling RPC repeatedly to check the status of a long-running operation.
    Each polling RPC is expected to succeed (its errors are supposed to be
    handled by the retry logic). The decision as to whether a new polling
    attempt needs to be made is based not on the RPC status code but on the
    status of the returned operation. In other words: we will poll a
    long-running operation until the operation is done or the polling timeout
    expires. Each poll will inform us of the status of the operation. The poll
    consists of an RPC to the server that may itself be retried as per the
    poll-specific retry settings in case of errors. The operation-level retry
    settings do NOT apply to polling-RPC retries.

    With the actual timeout types being defined above, the client libraries
    often refer to just Timeout without clarifying which type specifically
    that is. In that case the actual timeout type (sometimes also referred to
    as Logical Timeout) can be determined from the context. If it is a unary
    rpc call (i.e. a regular one) Timeout usually stands for the RPC Timeout
    (if provided directly as a standalone value) or Retry Timeout (if provided
    as the ``retry.timeout`` property of the unary RPC's retry config). For
    ``Operation`` or ``PollingFuture`` in general Timeout stands for Polling
    Timeout.
    Args:
        predicate (Callable[Exception]): A callable that should return
            ``True`` if the given exception is retryable.
        initial (float): The minimum amount of time to delay in seconds. This
            must be greater than 0.
        maximum (float): The maximum amount of time to delay in seconds.
        multiplier (float): The multiplier applied to the delay.
        timeout (Optional[float]): How long to keep retrying, in seconds.
            Note: timeout is only checked before initiating a retry, so the
            target may run past the timeout value as long as it is healthy.
        on_error (Callable[Exception]): A function to call while processing
            a retryable exception. Any error raised by this function will
            *not* be caught.
        deadline (float): DEPRECATED: use `timeout` instead. For backward
            compatibility, if specified it will override the ``timeout``
            parameter.
    """

    def __call__(
        self,
        func: Callable[_P, _R],
        on_error: Callable[[Exception], Any] | None = None,
    ) -> Callable[_P, _R]:
        """Wrap a callable with retry behavior.

        Args:
            func (Callable): The callable to add retry behavior to.
            on_error (Optional[Callable[Exception]]): If given, the on_error
                callback will be called with each retryable exception raised
                by the wrapped function. Any error raised by this function
                will *not* be caught. If on_error was specified in the
                constructor, this value will be ignored.

        Returns:
            Callable: A callable that will invoke ``func`` with retry behavior.
        """
        if self._on_error is not None:
            on_error = self._on_error

        @functools.wraps(func)
        def retry_wrapped_func(*args: _P.args, **kwargs: _P.kwargs) -> _R:
            """A wrapper that calls target function with retry."""
            target = functools.partial(func, *args, **kwargs)
            sleep_generator = exponential_sleep_generator(
                self._initial, self._maximum, multiplier=self._multiplier
            )
            return retry_target(
                target,
                self._predicate,
                sleep_generator,
                timeout=self._timeout,
                on_error=on_error,
            )

        return retry_wrapped_func
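
# A minimal sketch (not part of the original module) of the decorator form.
# The default predicate retries transient API errors; a custom predicate is
# used here so the example stays self-contained.
def _example_retry_decorator():
    attempts = {"count": 0}

    @Retry(
        predicate=lambda exc: isinstance(exc, ConnectionError),
        initial=0.1,
        maximum=1.0,
        timeout=10.0,
    )
    def flaky_lookup(key):
        attempts["count"] += 1
        if attempts["count"] < 2:
            raise ConnectionError("transient network hiccup")
        return f"value-for-{key}"

    return flaky_lookup("user#123")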

__pycache__/__init__.cpython-310.pyc

Compiled CPython 3.10 bytecode cache of retry/__init__.py (binary payload
omitted). The readable strings show the module docstring "Retry implementation
for Google API client libraries." and re-exports of: exponential_sleep_generator,
if_exception_type, if_transient_error, build_retry_error, RetryFailureReason,
Retry, retry_target, AsyncRetry, StreamingRetry, retry_target_stream,
AsyncStreamingRetry, retry_target_async, retry_target_stream_async,
datetime_helpers, and exceptions.

__pycache__/retry_streaming.cpython-310.pyc

Compiled CPython 3.10 bytecode cache of retry/retry_streaming.py, compiled from
/usr/local/CyberCP/lib/python3.10/site-packages/google/api_core/retry/retry_streaming.py
(binary payload omitted; the archive entry is truncated partway through the
StreamingRetry class). The readable docstrings embedded in the bytecode are
reproduced below.

Module docstring: "Generator wrapper for retryable streaming RPCs."

retry_target_stream docstring:

    Create a generator wrapper that retries the wrapped stream if it fails.

    This is the lowest-level retry helper. Generally, you'll use the
    higher-level retry helper :class:`Retry`.

    Args:
        target: The generator function to call and retry.
        predicate: A callable used to determine if an exception raised by the
            target should be considered retryable. It should return True to
            retry or False otherwise.
        sleep_generator: An infinite iterator that determines how long to
            sleep between retries.
        timeout: How long to keep retrying the target. Note: timeout is only
            checked before initiating a retry, so the target may run past the
            timeout value as long as it is healthy.
        on_error: If given, the on_error callback will be called with each
            retryable exception raised by the target. Any error raised by this
            function will *not* be caught.
        exception_factory: A function that is called when the retryable
            reaches a terminal failure state, used to construct an exception
            to be raised. It takes a list of all exceptions encountered, a
            retry.RetryFailureReason enum indicating the failure cause, and
            the original timeout value as arguments. It should return a tuple
            of the exception to be raised, along with the cause exception if
            any. The default implementation will raise a RetryError on
            timeout, or the last exception encountered otherwise.
        init_args: Positional arguments to pass to the target function.
        init_kwargs: Keyword arguments to pass to the target function.

    Returns:
        Generator: A retryable generator that wraps the target generator
            function.

    Raises:
        ValueError: If the sleep generator stops yielding values.
        Exception: a custom exception specified by the exception_factory if
            provided. If no exception_factory is provided:
                google.api_core.RetryError: If the timeout is exceeded while
                    retrying.
                Exception: If the target raises an error that isn't retryable.

StreamingRetry docstring:

    Exponential retry decorator for streaming synchronous RPCs.

    This class returns a Generator when called, which wraps the target stream
    in retry logic. If any exception is raised by the target, the entire
    stream will be retried within the wrapper.

    Although the default behavior is to retry transient API errors, a
    different predicate can be provided to retry other exceptions.

    Important Note: when a stream encounters a retryable error, it will
    silently construct a fresh iterator instance in the background and
    continue yielding (likely duplicate) values as if no error occurred. This
    is the most general way to retry a stream, but it often is not the desired
    behavior. Example: iter([1, 2, 1/0]) -> [1, 2, 1, 2, ...]

    There are two ways to build more advanced retry logic for streams:

    1. Wrap the target

        Use a ``target`` that maintains state between retries, and creates a
        different generator on each retry call. For example, you can wrap a
        network call in a function that modifies the request based on what has
        already been returned:

        .. code-block:: python

            def attempt_with_modified_request(target, request, seen_items=[]):
                # remove seen items from request on each attempt
                new_request = modify_request(request, seen_items)
                new_generator = target(new_request)
                for item in new_generator:
                    yield item
                    seen_items.append(item)

            retry_wrapped_fn = StreamingRetry()(attempt_with_modified_request)
            retryable_generator = retry_wrapped_fn(target, request)

    2. Wrap the retry generator

        Alternatively, you can wrap the retryable generator itself before
        passing it to the end-user to add a filter on the stream. For example,
        you can keep track of the items that were successfully yielded in
        previous retry attempts, and only yield new items when the new attempt
        surpasses the previous ones:

        .. code-block:: python

            def retryable_with_filter(target):
                stream_idx = 0
                # reset stream_idx when the stream is retried
                def on_error(e):
                    nonlocal stream_idx
                    stream_idx = 0
                # build retryable
                retryable_gen = StreamingRetry(...)(target)
                # keep track of what has been yielded out of filter
                seen_items = []
                for item in retryable_gen():
                    if stream_idx >= len(seen_items):
                        seen_items.append(item)
                        yield item
                    elif item != seen_items[stream_idx]:
                        raise ValueError("Stream differs from last attempt")
                    stream_idx += 1

            filter_retry_wrapped = retryable_with_filter(target)

    Args:
        predicate (Callable[Exception]): A callable that should return
            ``True`` if the given exception is retryable.
        initial (float): The minimum amount of time to delay in seconds. This
            must be greater than 0.
        maximum (float): The maximum amount of time to delay in seconds.
        multiplier (float): The multiplier applied to the delay.
        timeout (float): How long to keep retrying, in seconds. Note: timeout
            is only checked before initiating a retry, so the target may run
            past the timeout value as long as it is healthy.
        on_error (Callable[Exception]): A function to call while processing a
            retryable exception. Any error raised by this function will *not*
            be caught.
        deadline (float): DEPRECATED: use `timeout` instead. For backward
            compatibility, if specified it will override the ``timeout``
            parameter.
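
A minimal usage sketch for StreamingRetry (the class documented in the bytecode
above), assuming it is imported from google.api_core.retry. Per the docstring's
caveat, a retryable error re-creates the whole stream, so already-yielded values
may be repeated.

    from google.api_core.retry import StreamingRetry

    attempts = {"count": 0}

    def read_rows():
        # Stand-in for a server stream: drops once, then completes.
        attempts["count"] += 1
        yield "row-1"
        if attempts["count"] < 2:
            raise ConnectionError("stream dropped")
        yield "row-2"

    retryable_rows = StreamingRetry(
        predicate=lambda exc: isinstance(exc, ConnectionError),
        initial=0.1,
        maximum=1.0,
        timeout=10.0,
    )(read_rows)

    print(list(retryable_rows()))  # e.g. ['row-1', 'row-1', 'row-2']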