Source code for infuse.breaker.states

from datetime import datetime, timedelta

from inspect import isawaitable
from typing import Callable, Any

from pybreaker import (
    STATE_OPEN,
    STATE_HALF_OPEN,
    STATE_CLOSED,
    CircuitBreakerState,
    CircuitBreakerError,
)


[docs]class AioCircuitBreakerState(CircuitBreakerState): """ Asyncio implementation for the behavior needed by all circuit breaker states. """
[docs] @classmethod async def initialize(cls, cb, prev_state: str = None, notify: bool = False): self = cls(cb, prev_state, notify) await self._initialize(cb, prev_state, notify) return self
async def _initialize(self, cb, prev_state: str, notify: bool): """ Override this method to initialize async state. """ pass async def _handle_error(self, exc: Exception, reraise: bool = True): """ Handles a failed call to the guarded operation. :param reraise: If true, raises the error, else passes. """ if self._breaker.is_system_error(exc): await self._breaker._inc_counter() for listener in self._breaker.listeners: listener.failure(self._breaker, exc) await self.on_failure(exc) else: await self._handle_success() if reraise: raise exc async def _handle_success(self) -> None: """ Handles a successful call to the guarded operation. """ await self._breaker._state_storage.reset_counter() await self.on_success() for listener in self._breaker.listeners: listener.success(self._breaker)
[docs] async def call(self, func: Callable, *args, **kwargs): """ Calls async `func` with the given `args` and `kwargs`, and updates the circuit breaker state according to the result. Return a closure to prevent import errors when using without tornado present """ ret = None await self.before_call(func, *args, **kwargs) for listener in self._breaker.listeners: listener.before_call(self._breaker, func, *args, **kwargs) try: ret = func(*args, **kwargs) if isawaitable(ret): ret = await ret except BaseException as e: await self._handle_error(e) else: await self._handle_success() return ret
[docs] async def before_call(self, func: Callable, *args, **kwargs): """ Override this method to be notified before a call to the guarded operation is attempted. """ pass
[docs] async def on_success(self): """ Override this method to be notified when a call to the guarded operation succeeds. """ pass
[docs] async def on_failure(self, exc: Exception): """ Override this method to be notified when a call to the guarded operation fails. """ pass
[docs]class AioCircuitClosedState(AioCircuitBreakerState): """ In the normal "closed" state, the circuit breaker executes operations as usual. If the call succeeds, nothing happens. If it fails, however, the circuit breaker makes a note of the failure. Once the number of failures exceeds a threshold, the circuit breaker trips and "opens" the circuit. """ def __init__(self, cb, prev_state: str = None, notify: bool = False): """ Moves the given circuit breaker `cb` to the "closed" state. """ super(AioCircuitClosedState, self).__init__(cb, STATE_CLOSED) # self._breaker._state_storage.reset_counter() if notify: for listener in self._breaker.listeners: listener.state_change(self._breaker, prev_state, self) async def _initialize(self, cb, prev_state: str, notify: bool) -> None: if notify: await self._breaker._state_storage.reset_counter()
[docs] async def on_failure(self, exc: Exception) -> None: """ Moves the circuit breaker to the "open" state once the failures threshold is reached. :raises CircuitBreakerError: If the failure threshold has been reached. """ counter = await self._breaker._state_storage.counter if counter >= self._breaker.fail_max: await self._breaker.open() error_msg = "Failures threshold reached, circuit breaker opened" raise CircuitBreakerError(error_msg)
[docs]class AioCircuitOpenState(AioCircuitBreakerState): """ When the circuit is "open", calls to the circuit breaker fail immediately, without any attempt to execute the real operation. This is indicated by the ``CircuitBreakerError`` exception. After a suitable amount of time, the circuit breaker decides that the operation has a chance of succeeding, so it goes into the "half-open" state. """ def __init__(self, cb, prev_state: str = None, notify: bool = False): """ Moves the given circuit breaker `cb` to the "open" state. """ super(AioCircuitOpenState, self).__init__(cb, STATE_OPEN) # self._breaker._state_storage.opened_at = datetime.utcnow() if notify: for listener in self._breaker.listeners: listener.state_change(self._breaker, prev_state, self)
[docs] async def before_call(self, func: Callable, *args, **kwargs) -> Any: """ After the timeout elapses, move the circuit breaker to the "half-open" state; otherwise, raises ``CircuitBreakerError`` without any attempt to execute the real operation. :raises CircuitBreakerError: If timeout has not elapsed. """ timeout = timedelta(seconds=self._breaker.reset_timeout) opened_at = await self._breaker._state_storage.opened_at if opened_at and datetime.utcnow() < opened_at + timeout: error_msg = "Timeout not elapsed yet, circuit breaker still open" raise CircuitBreakerError(error_msg) else: await self._breaker.half_open() return await self._breaker.call(func, *args, **kwargs)
[docs] async def call(self, func: Callable, *args, **kwargs) -> Any: """ Delegate the call to before_call, if the time out is not elapsed it will throw an exception, otherwise we get the results from the call performed after the state is switch to half-open """ return await self.before_call(func, *args, **kwargs)
[docs]class AioCircuitHalfOpenState(AioCircuitBreakerState): """ In the "half-open" state, the next call to the circuit breaker is allowed to execute the dangerous operation. Should the call succeed, the circuit breaker resets and returns to the "closed" state. If this trial call fails, however, the circuit breaker returns to the "open" state until another timeout elapses. """ def __init__(self, cb, prev_state: str = None, notify: bool = False): """ Moves the given circuit breaker `cb` to the "half-open" state. """ super(AioCircuitHalfOpenState, self).__init__(cb, STATE_HALF_OPEN) if notify: for listener in self._breaker._listeners: listener.state_change(self._breaker, prev_state, self)
[docs] async def on_failure(self, exc: Exception) -> None: """ Opens the circuit breaker. :raises CircuitBreakerError: "Trial call failed, circuit breaker opened" """ await self._breaker.open() raise CircuitBreakerError("Trial call failed, circuit breaker opened")
[docs] async def on_success(self) -> None: """ Closes the circuit breaker. """ await self._breaker.close()