#-*- coding:utf-8 -*-
"""
Threadsafe pure-Python implementation of the Circuit Breaker pattern, described
by Michael T. Nygard in his book 'Release It!'.
For more information on this and other patterns and best practices, buy the
book at http://pragprog.com/titles/mnee/release-it
"""
import types
import time
import calendar
import logging
from datetime import datetime, timedelta
from functools import wraps
import threading
import six
import sys
try:
from tornado import gen
HAS_TORNADO_SUPPORT = True
except ImportError:
HAS_TORNADO_SUPPORT = False
try:
from redis.exceptions import RedisError
HAS_REDIS_SUPPORT = True
except ImportError:
HAS_REDIS_SUPPORT = False
__all__ = (
'CircuitBreaker', 'CircuitBreakerListener', 'CircuitBreakerError',
'CircuitMemoryStorage', 'CircuitRedisStorage', 'STATE_OPEN', 'STATE_CLOSED',
'STATE_HALF_OPEN',)
STATE_OPEN = 'open'
STATE_CLOSED = 'closed'
STATE_HALF_OPEN = 'half-open'
class CircuitBreaker(object):
"""
More abstractly, circuit breakers exists to allow one subsystem to fail
without destroying the entire system.
This is done by wrapping dangerous operations (typically integration points)
with a component that can circumvent calls when the system is not healthy.
This pattern is described by Michael T. Nygard in his book 'Release It!'.
"""
def __init__(self, fail_max=5, reset_timeout=60, exclude=None,
listeners=None, state_storage=None, name=None):
"""
Creates a new circuit breaker with the given parameters.
"""
self._lock = threading.RLock()
self._state_storage = state_storage or CircuitMemoryStorage(STATE_CLOSED)
self._state = self._create_new_state(self.current_state)
self._fail_max = fail_max
self._reset_timeout = reset_timeout
self._excluded_exceptions = list(exclude or [])
self._listeners = list(listeners or [])
self._name = name
@property
def fail_counter(self):
"""
Returns the current number of consecutive failures.
"""
return self._state_storage.counter
@property
def fail_max(self):
"""
Returns the maximum number of failures tolerated before the circuit is
opened.
"""
return self._fail_max
@fail_max.setter
def fail_max(self, number):
"""
Sets the maximum `number` of failures tolerated before the circuit is
opened.
"""
self._fail_max = number
@property
def reset_timeout(self):
"""
Once this circuit breaker is opened, it should remain opened until the
timeout period, in seconds, elapses.
"""
return self._reset_timeout
@reset_timeout.setter
def reset_timeout(self, timeout):
"""
Sets the `timeout` period, in seconds, this circuit breaker should be
kept open.
"""
self._reset_timeout = timeout
def _create_new_state(self, new_state, prev_state=None, notify=False):
"""
Return state object from state string, i.e.,
'closed' -> <CircuitClosedState>
"""
state_map = {
STATE_CLOSED: CircuitClosedState,
STATE_OPEN: CircuitOpenState,
STATE_HALF_OPEN: CircuitHalfOpenState,
}
try:
cls = state_map[new_state]
return cls(self, prev_state=prev_state, notify=notify)
except KeyError:
msg = "Unknown state {!r}, valid states: {}"
raise ValueError(msg.format(new_state, ', '.join(state_map)))
@property
def state(self):
"""
Update (if needed) and returns the cached state object.
"""
# Ensure cached state is up-to-date
if self.current_state != self._state.name:
# If cached state is out-of-date, that means that it was likely
# changed elsewhere (e.g. another process instance). We still send
# out a notification, informing others that this particular circuit
# breaker instance noticed the changed circuit.
self.state = self.current_state
return self._state
@state.setter
def state(self, state_str):
"""
Set cached state and notify listeners of newly cached state.
"""
with self._lock:
self._state = self._create_new_state(
state_str, prev_state=self._state, notify=True)
@property
def current_state(self):
"""
Returns a string that identifies the state of the circuit breaker as
reported by the _state_storage. i.e., 'closed', 'open', 'half-open'.
"""
return self._state_storage.state
@property
def excluded_exceptions(self):
"""
Returns the list of excluded exceptions, e.g., exceptions that should
not be considered system errors by this circuit breaker.
"""
return tuple(self._excluded_exceptions)
def add_excluded_exception(self, exception):
"""
Adds an exception to the list of excluded exceptions.
"""
with self._lock:
self._excluded_exceptions.append(exception)
def add_excluded_exceptions(self, *exceptions):
"""
Adds exceptions to the list of excluded exceptions.
"""
for exc in exceptions:
self.add_excluded_exception(exc)
def remove_excluded_exception(self, exception):
"""
Removes an exception from the list of excluded exceptions.
"""
with self._lock:
self._excluded_exceptions.remove(exception)
def _inc_counter(self):
"""
Increments the counter of failed calls.
"""
self._state_storage.increment_counter()
def is_system_error(self, exception):
"""
Returns whether the exception `exception` is considered a signal of
system malfunction. Business exceptions should not cause this circuit
breaker to open.
"""
exception_type = type(exception)
for exclusion in self._excluded_exceptions:
if type(exclusion) is type:
if issubclass(exception_type, exclusion):
return False
elif callable(exclusion):
if exclusion(exception):
return False
return True
def call(self, func, *args, **kwargs):
"""
Calls `func` with the given `args` and `kwargs` according to the rules
implemented by the current state of this circuit breaker.
"""
with self._lock:
return self.state.call(func, *args, **kwargs)
def call_async(self, func, *args, **kwargs):
"""
Calls async `func` with the given `args` and `kwargs` according to the rules
implemented by the current state of this circuit breaker.
Return a closure to prevent import errors when using without tornado present
"""
@gen.coroutine
def wrapped():
with self._lock:
ret = yield self.state.call_async(func, *args, **kwargs)
raise gen.Return(ret)
return wrapped()
def open(self):
"""
Opens the circuit, e.g., the following calls will immediately fail
until timeout elapses.
"""
with self._lock:
self._state_storage.opened_at = datetime.utcnow()
self.state = self._state_storage.state = STATE_OPEN
def half_open(self):
"""
Half-opens the circuit, e.g. lets the following call pass through and
opens the circuit if the call fails (or closes the circuit if the call
succeeds).
"""
with self._lock:
self.state = self._state_storage.state = STATE_HALF_OPEN
def close(self):
"""
Closes the circuit, e.g. lets the following calls execute as usual.
"""
with self._lock:
self.state = self._state_storage.state = STATE_CLOSED
def __call__(self, *call_args, **call_kwargs):
"""
Returns a wrapper that calls the function `func` according to the rules
implemented by the current state of this circuit breaker.
Optionally takes the keyword argument `__pybreaker_call_coroutine`,
which will will call `func` as a Tornado co-routine.
"""
call_async = call_kwargs.pop('__pybreaker_call_async', False)
if call_async and not HAS_TORNADO_SUPPORT:
raise ImportError('No module named tornado')
def _outer_wrapper(func):
@wraps(func)
def _inner_wrapper(*args, **kwargs):
if call_async:
return self.call_async(func, *args, **kwargs)
return self.call(func, *args, **kwargs)
return _inner_wrapper
if call_args:
return _outer_wrapper(*call_args)
return _outer_wrapper
@property
def listeners(self):
"""
Returns the registered listeners as a tuple.
"""
return tuple(self._listeners)
def add_listener(self, listener):
"""
Registers a listener for this circuit breaker.
"""
with self._lock:
self._listeners.append(listener)
def add_listeners(self, *listeners):
"""
Registers listeners for this circuit breaker.
"""
for listener in listeners:
self.add_listener(listener)
def remove_listener(self, listener):
"""
Unregisters a listener of this circuit breaker.
"""
with self._lock:
self._listeners.remove(listener)
@property
def name(self):
"""
Returns the name of this circuit breaker. Useful for logging.
"""
return self._name
@name.setter
def name(self, name):
"""
Set the name of this circuit breaker.
"""
self._name = name
[docs]class CircuitBreakerStorage(object):
"""
Defines the underlying storage for a circuit breaker - the underlying
implementation should be in a subclass that overrides the method this
class defines.
"""
def __init__(self, name):
"""
Creates a new instance identified by `name`.
"""
self._name = name
@property
def name(self):
"""
Returns a human friendly name that identifies this state.
"""
return self._name
@property
def state(self):
"""
Override this method to retrieve the current circuit breaker state.
"""
pass
@state.setter
def state(self, state):
"""
Override this method to set the current circuit breaker state.
"""
pass
[docs] def increment_counter(self):
"""
Override this method to increase the failure counter by one.
"""
pass
[docs] def reset_counter(self):
"""
Override this method to set the failure counter to zero.
"""
pass
@property
def counter(self):
"""
Override this method to retrieve the current value of the failure counter.
"""
pass
@property
def opened_at(self):
"""
Override this method to retrieve the most recent value of when the
circuit was opened.
"""
pass
@opened_at.setter
def opened_at(self, datetime):
"""
Override this method to set the most recent value of when the circuit
was opened.
"""
pass
[docs]class CircuitMemoryStorage(CircuitBreakerStorage):
"""
Implements a `CircuitBreakerStorage` in local memory.
"""
def __init__(self, state):
"""
Creates a new instance with the given `state`.
"""
super(CircuitMemoryStorage, self).__init__('memory')
self._fail_counter = 0
self._opened_at = None
self._state = state
@property
def state(self):
"""
Returns the current circuit breaker state.
"""
return self._state
@state.setter
def state(self, state):
"""
Set the current circuit breaker state to `state`.
"""
self._state = state
[docs] def increment_counter(self):
"""
Increases the failure counter by one.
"""
self._fail_counter += 1
[docs] def reset_counter(self):
"""
Sets the failure counter to zero.
"""
self._fail_counter = 0
@property
def counter(self):
"""
Returns the current value of the failure counter.
"""
return self._fail_counter
@property
def opened_at(self):
"""
Returns the most recent value of when the circuit was opened.
"""
return self._opened_at
@opened_at.setter
def opened_at(self, datetime):
"""
Sets the most recent value of when the circuit was opened to
`datetime`.
"""
self._opened_at = datetime
class CircuitRedisStorage(CircuitBreakerStorage):
"""
Implements a `CircuitBreakerStorage` using redis.
"""
BASE_NAMESPACE = 'pybreaker'
logger = logging.getLogger(__name__)
def __init__(self, state, redis_object, namespace=None, fallback_circuit_state=STATE_CLOSED):
"""
Creates a new instance with the given `state` and `redis` object. The
redis object should be similar to pyredis' StrictRedis class. If there
are any connection issues with redis, the `fallback_circuit_state` is
used to determine the state of the circuit.
"""
# Module does not exist, so this feature is not available
if not HAS_REDIS_SUPPORT:
raise ImportError("CircuitRedisStorage can only be used if the required dependencies exist")
super(CircuitRedisStorage, self).__init__('redis')
try:
self.RedisError = __import__('redis').exceptions.RedisError
except ImportError:
# Module does not exist, so this feature is not available
raise ImportError("CircuitRedisStorage can only be used if 'redis' is available")
self._redis = redis_object
self._namespace_name = namespace
self._fallback_circuit_state = fallback_circuit_state
self._initial_state = str(state)
self._initialize_redis_state(self._initial_state)
def _initialize_redis_state(self, state):
self._redis.setnx(self._namespace('fail_counter'), 0)
self._redis.setnx(self._namespace('state'), state)
@property
def state(self):
"""
Returns the current circuit breaker state.
If the circuit breaker state on Redis is missing, re-initialize it
with the fallback circuit state and reset the fail counter.
"""
try:
state_bytes = self._redis.get(self._namespace('state'))
except self.RedisError:
self.logger.error('RedisError: falling back to default circuit state', exc_info=True)
return self._fallback_circuit_state
state = self._fallback_circuit_state
if state_bytes is not None:
state = state_bytes.decode('utf-8')
else:
# state retrieved from redis was missing, so we re-initialize
# the circuit breaker state on redis
self._initialize_redis_state(self._fallback_circuit_state)
return state
@state.setter
def state(self, state):
"""
Set the current circuit breaker state to `state`.
"""
try:
self._redis.set(self._namespace('state'), str(state))
except self.RedisError:
self.logger.error('RedisError', exc_info=True)
pass
def increment_counter(self):
"""
Increases the failure counter by one.
"""
try:
self._redis.incr(self._namespace('fail_counter'))
except self.RedisError:
self.logger.error('RedisError', exc_info=True)
pass
def reset_counter(self):
"""
Sets the failure counter to zero.
"""
try:
self._redis.set(self._namespace('fail_counter'), 0)
except self.RedisError:
self.logger.error('RedisError', exc_info=True)
pass
@property
def counter(self):
"""
Returns the current value of the failure counter.
"""
try:
value = self._redis.get(self._namespace('fail_counter'))
if value:
return int(value)
else:
return 0
except self.RedisError:
self.logger.error('RedisError: Assuming no errors', exc_info=True)
return 0
@property
def opened_at(self):
"""
Returns a datetime object of the most recent value of when the circuit
was opened.
"""
try:
timestamp = self._redis.get(self._namespace('opened_at'))
if timestamp:
return datetime(*time.gmtime(int(timestamp))[:6])
except self.RedisError:
self.logger.error('RedisError', exc_info=True)
return None
@opened_at.setter
def opened_at(self, now):
"""
Atomically sets the most recent value of when the circuit was opened
to `now`. Stored in redis as a simple integer of unix epoch time.
To avoid timezone issues between different systems, the passed in
datetime should be in UTC.
"""
try:
key = self._namespace('opened_at')
def set_if_greater(pipe):
current_value = pipe.get(key)
next_value = int(calendar.timegm(now.timetuple()))
pipe.multi()
if not current_value or next_value > int(current_value):
pipe.set(key, next_value)
self._redis.transaction(set_if_greater, key)
except self.RedisError:
self.logger.error('RedisError', exc_info=True)
pass
def _namespace(self, key):
name_parts = [self.BASE_NAMESPACE, key]
if self._namespace_name:
name_parts.insert(0, self._namespace_name)
return ':'.join(name_parts)
class CircuitBreakerListener(object):
"""
Listener class used to plug code to a ``CircuitBreaker`` instance when
certain events happen.
"""
def before_call(self, cb, func, *args, **kwargs):
"""
This callback function is called before the circuit breaker `cb` calls
`fn`.
"""
pass
def failure(self, cb, exc):
"""
This callback function is called when a function called by the circuit
breaker `cb` fails.
"""
pass
def success(self, cb):
"""
This callback function is called when a function called by the circuit
breaker `cb` succeeds.
"""
pass
def state_change(self, cb, old_state, new_state):
"""
This callback function is called when the state of the circuit breaker
`cb` state changes.
"""
pass
class CircuitBreakerState(object):
"""
Implements the behavior needed by all circuit breaker states.
"""
def __init__(self, cb, name):
"""
Creates a new instance associated with the circuit breaker `cb` and
identified by `name`.
"""
self._breaker = cb
self._name = name
@property
def name(self):
"""
Returns a human friendly name that identifies this state.
"""
return self._name
def _handle_error(self, exc, reraise=True):
"""
Handles a failed call to the guarded operation.
"""
if self._breaker.is_system_error(exc):
self._breaker._inc_counter()
for listener in self._breaker.listeners:
listener.failure(self._breaker, exc)
self.on_failure(exc)
else:
self._handle_success()
if reraise:
raise exc
def _handle_success(self):
"""
Handles a successful call to the guarded operation.
"""
self._breaker._state_storage.reset_counter()
self.on_success()
for listener in self._breaker.listeners:
listener.success(self._breaker)
def call(self, func, *args, **kwargs):
"""
Calls `func` with the given `args` and `kwargs`, and updates the
circuit breaker state according to the result.
"""
ret = None
self.before_call(func, *args, **kwargs)
for listener in self._breaker.listeners:
listener.before_call(self._breaker, func, *args, **kwargs)
try:
ret = func(*args, **kwargs)
if isinstance(ret, types.GeneratorType):
return self.generator_call(ret)
except BaseException as e:
self._handle_error(e)
else:
self._handle_success()
return ret
def call_async(self, func, *args, **kwargs):
"""
Calls async `func` with the given `args` and `kwargs`, and updates the
circuit breaker state according to the result.
Return a closure to prevent import errors when using without tornado present
"""
@gen.coroutine
def wrapped():
ret = None
self.before_call(func, *args, **kwargs)
for listener in self._breaker.listeners:
listener.before_call(self._breaker, func, *args, **kwargs)
try:
ret = yield func(*args, **kwargs)
if isinstance(ret, types.GeneratorType):
raise gen.Return(self.generator_call(ret))
except BaseException as e:
self._handle_error(e)
else:
self._handle_success()
raise gen.Return(ret)
return wrapped()
def generator_call(self, wrapped_generator):
try:
value = yield next(wrapped_generator)
while True:
value = yield wrapped_generator.send(value)
except StopIteration:
self._handle_success()
return
except BaseException as e:
self._handle_error(e, reraise=False)
wrapped_generator.throw(e)
def before_call(self, func, *args, **kwargs):
"""
Override this method to be notified before a call to the guarded
operation is attempted.
"""
pass
def on_success(self):
"""
Override this method to be notified when a call to the guarded
operation succeeds.
"""
pass
def on_failure(self, exc):
"""
Override this method to be notified when a call to the guarded
operation fails.
"""
pass
class CircuitClosedState(CircuitBreakerState):
"""
In the normal "closed" state, the circuit breaker executes operations as
usual. If the call succeeds, nothing happens. If it fails, however, the
circuit breaker makes a note of the failure.
Once the number of failures exceeds a threshold, the circuit breaker trips
and "opens" the circuit.
"""
def __init__(self, cb, prev_state=None, notify=False):
"""
Moves the given circuit breaker `cb` to the "closed" state.
"""
super(CircuitClosedState, self).__init__(cb, STATE_CLOSED)
if notify:
# We only reset the counter if notify is True, otherwise the CircuitBreaker
# will lose it's failure count due to a second CircuitBreaker being created
# using the same _state_storage object, or if the _state_storage objects
# share a central source of truth (as would be the case with the redis
# storage).
self._breaker._state_storage.reset_counter()
for listener in self._breaker.listeners:
listener.state_change(self._breaker, prev_state, self)
def on_failure(self, exc):
"""
Moves the circuit breaker to the "open" state once the failures
threshold is reached.
"""
if self._breaker._state_storage.counter >= self._breaker.fail_max:
self._breaker.open()
error_msg = 'Failures threshold reached, circuit breaker opened'
six.reraise(CircuitBreakerError, CircuitBreakerError(error_msg), sys.exc_info()[2])
class CircuitOpenState(CircuitBreakerState):
"""
When the circuit is "open", calls to the circuit breaker fail immediately,
without any attempt to execute the real operation. This is indicated by the
``CircuitBreakerError`` exception.
After a suitable amount of time, the circuit breaker decides that the
operation has a chance of succeeding, so it goes into the "half-open" state.
"""
def __init__(self, cb, prev_state=None, notify=False):
"""
Moves the given circuit breaker `cb` to the "open" state.
"""
super(CircuitOpenState, self).__init__(cb, STATE_OPEN)
if notify:
for listener in self._breaker.listeners:
listener.state_change(self._breaker, prev_state, self)
def before_call(self, func, *args, **kwargs):
"""
After the timeout elapses, move the circuit breaker to the "half-open"
state; otherwise, raises ``CircuitBreakerError`` without any attempt
to execute the real operation.
"""
timeout = timedelta(seconds=self._breaker.reset_timeout)
opened_at = self._breaker._state_storage.opened_at
if opened_at and datetime.utcnow() < opened_at + timeout:
error_msg = 'Timeout not elapsed yet, circuit breaker still open'
raise CircuitBreakerError(error_msg)
else:
self._breaker.half_open()
return self._breaker.call(func, *args, **kwargs)
def call(self, func, *args, **kwargs):
"""
Delegate the call to before_call, if the time out is not elapsed it will throw an exception, otherwise we get
the results from the call performed after the state is switch to half-open
"""
return self.before_call(func, *args, **kwargs)
class CircuitHalfOpenState(CircuitBreakerState):
"""
In the "half-open" state, the next call to the circuit breaker is allowed
to execute the dangerous operation. Should the call succeed, the circuit
breaker resets and returns to the "closed" state. If this trial call fails,
however, the circuit breaker returns to the "open" state until another
timeout elapses.
"""
def __init__(self, cb, prev_state=None, notify=False):
"""
Moves the given circuit breaker `cb` to the "half-open" state.
"""
super(CircuitHalfOpenState, self).__init__(cb, STATE_HALF_OPEN)
if notify:
for listener in self._breaker._listeners:
listener.state_change(self._breaker, prev_state, self)
def on_failure(self, exc):
"""
Opens the circuit breaker.
"""
self._breaker.open()
error_msg = 'Trial call failed, circuit breaker opened'
six.reraise(CircuitBreakerError, CircuitBreakerError(error_msg), sys.exc_info()[2])
def on_success(self):
"""
Closes the circuit breaker.
"""
self._breaker.close()
class CircuitBreakerError(Exception):
"""
When calls to a service fails because the circuit is open, this error is
raised to allow the caller to handle this type of exception differently.
"""
pass