"""
Threadsafe pure-Python implementation of the Circuit Breaker pattern, described
by Michael T. Nygard in his book 'Release It!'.
For more information on this and other patterns and best practices, buy the
book at http://pragprog.com/titles/mnee/release-it
"""
import asyncio
import inspect
from datetime import datetime
from functools import wraps
from typing import List, Callable, Union, Awaitable
from pybreaker import CircuitBreaker, STATE_CLOSED, STATE_HALF_OPEN, STATE_OPEN
from infuse.breaker.storages import (
CircuitBreakerStorage,
CircuitAioRedisStorage,
CircuitAioMemoryStorage,
)
from infuse.breaker.states import (
AioCircuitClosedState,
AioCircuitHalfOpenState,
AioCircuitOpenState,
AioCircuitBreakerState,
)
__all__ = (
"AioCircuitBreaker",
"CircuitBreakerStorage",
"CircuitAioRedisStorage",
"CircuitAioMemoryStorage",
"AioCircuitClosedState",
"AioCircuitHalfOpenState",
"AioCircuitOpenState",
"AioCircuitBreakerState",
)
class AioCircuitBreaker(CircuitBreaker):
    """
    Asyncio flavour of the pybreaker ``CircuitBreaker``.

    More abstractly, circuit breakers exist to allow one subsystem to fail
    without destroying the entire system.
    This is done by wrapping dangerous operations (typically integration
    points) with a component that can circumvent calls when the system is
    not healthy.
    This pattern is described by Michael T. Nygard in his book 'Release It!'.

    Instances should be created with ``await AioCircuitBreaker.initialize(...)``
    rather than by calling the class directly: the constructor leaves
    ``self._state`` holding an un-awaited state object (see ``initialize``).

    NOTE(review): every mutating method takes ``self._lock`` (inherited from
    ``CircuitBreaker``, presumably a re-entrant threading lock) and keeps it
    held across ``await`` points - confirm this is the intended locking model.
    """

    @classmethod
    async def initialize(
        cls,
        fail_max: int = 5,
        reset_timeout: int = 60,
        exclude: List[Exception] = None,
        listeners: List = None,
        state_storage: CircuitBreakerStorage = None,
        name: str = None,
    ):
        """
        Asynchronous alternate constructor.

        All arguments are forwarded unchanged to the ``CircuitBreaker``
        constructor, except that a falsy ``state_storage`` is replaced by a
        fresh ``CircuitAioMemoryStorage`` starting in the closed state.

        Returns the fully initialised breaker.
        """
        self = cls(
            fail_max=fail_max,
            reset_timeout=reset_timeout,
            exclude=exclude,
            listeners=listeners,
            state_storage=state_storage
            or CircuitAioMemoryStorage(STATE_CLOSED),
            name=name,
        )
        # CircuitBreaker's constructor calls our (async) _create_new_state,
        # so self._state is still an awaitable at this point; resolve it.
        self._state = await self._state
        return self

    @property
    async def fail_counter(self) -> int:
        """
        Returns the current number of consecutive failures.

        This is an awaitable property: use ``await breaker.fail_counter``.
        """
        return await self._state_storage.counter

    async def _create_new_state(
        self,
        new_state: Union[str, Awaitable],
        prev_state: AioCircuitBreakerState = None,
        notify: bool = False,
    ) -> AioCircuitBreakerState:
        """
        Return state object from state string, i.e.,
        'closed' -> <AioCircuitClosedState>

        ``new_state`` may also be an awaitable resolving to the state string;
        it is awaited first. ``prev_state`` and ``notify`` are passed through
        to the state class' ``initialize``.

        Raises ``ValueError`` if the (resolved) state string is not one of the
        known states.
        """
        state_map = {
            STATE_CLOSED: AioCircuitClosedState,
            STATE_OPEN: AioCircuitOpenState,
            STATE_HALF_OPEN: AioCircuitHalfOpenState,
        }
        if inspect.isawaitable(new_state):
            new_state = await new_state

        # Only the lookup is guarded: a KeyError raised while the state object
        # initialises itself must not be misreported as an unknown state.
        try:
            state_cls = state_map[new_state]
        except KeyError:
            msg = "Unknown state {!r}, valid states: {}"
            raise ValueError(
                msg.format(new_state, ", ".join(state_map))
            ) from None

        return await state_cls.initialize(
            self, prev_state=prev_state, notify=notify
        )

    @property
    async def state(self) -> AioCircuitBreakerState:
        """
        Returns the current state of this circuit breaker.

        The state name held by the storage is authoritative: when it differs
        from the cached state object, the cached object is rebuilt to match.
        If the storage reports no state at all (``None``), the breaker falls
        back to half-open.

        This is an awaitable property: use ``await breaker.state``.
        """
        stored_name = await self.current_state
        if stored_name != self._state.name or stored_name is None:
            target = STATE_HALF_OPEN if stored_name is None else stored_name
            await self.set_state(target)
        return self._state

    async def set_state(self, state_str: str) -> None:
        """
        Replace the cached state object with a new one built from
        ``state_str``, passing the previous state along and requesting
        notification (``notify=True``).

        Only the in-memory state object is changed here; the storage is not
        written to by this method.
        """
        with self._lock:
            self._state = await self._create_new_state(
                state_str, prev_state=self._state, notify=True
            )

    @property
    async def current_state(self) -> str:
        """
        Returns a string that identifies this circuit breaker's state, i.e.,
        'closed', 'open', 'half-open'.

        The value is read from the storage; both plain and awaitable
        ``state`` attributes are supported.
        """
        stored = self._state_storage.state
        if inspect.isawaitable(stored):
            stored = await stored
        return stored

    async def _inc_counter(self):
        """
        Increments the counter of failed calls.
        """
        await self._state_storage.increment_counter()

    async def call(self, func, *args, **kwargs):
        """
        Calls async `func` with the given `args` and `kwargs` according to
        the rules implemented by the current state of this circuit breaker,
        and returns whatever the state's ``call`` returns.
        """
        with self._lock:
            state = await self.state
            return await state.call(func, *args, **kwargs)

    async def open(self) -> None:
        """
        Opens the circuit, e.g., the following calls will immediately fail
        until timeout elapses.

        Updates the cached state object, the stored state and the stored
        "opened at" timestamp concurrently.
        """
        with self._lock:
            await asyncio.gather(
                self.set_state(STATE_OPEN),
                self._state_storage.set_state(STATE_OPEN),
                # Naive UTC timestamp. NOTE(review): datetime.utcnow() is
                # deprecated since Python 3.12; switching to an aware
                # datetime needs the code that reads opened_at to agree.
                self._state_storage.set_opened_at(datetime.utcnow()),
            )

    async def half_open(self) -> None:
        """
        Half-opens the circuit, e.g. lets the following call pass through and
        opens the circuit if the call fails (or closes the circuit if the call
        succeeds).
        """
        with self._lock:
            await asyncio.gather(
                self.set_state(STATE_HALF_OPEN),
                self._state_storage.set_state(STATE_HALF_OPEN),
            )

    async def close(self) -> None:
        """
        Closes the circuit, e.g. lets the following calls execute as usual.
        """
        with self._lock:
            await asyncio.gather(
                self.set_state(STATE_CLOSED),
                self._state_storage.set_state(STATE_CLOSED),
            )

    def __call__(self, *call_args, **call_kwargs) -> Callable:
        """
        Decorator interface: returns a coroutine wrapper that calls the
        decorated function through ``self.call``, i.e. according to the rules
        implemented by the current state of this circuit breaker.

        Works both bare (``@breaker``) and called (``@breaker()``). Keyword
        arguments are accepted but not used.
        """

        def _outer_wrapper(func):
            @wraps(func)
            async def _inner_wrapper(*args, **kwargs):
                return await self.call(func, *args, **kwargs)

            return _inner_wrapper

        # Bare usage: the decorated function arrives as the positional arg.
        if call_args:
            return _outer_wrapper(*call_args)
        return _outer_wrapper