Source code for tooz.coordination

# -*- coding: utf-8 -*-
#
#    Copyright (C) 2016 Red Hat, Inc.
#    Copyright (C) 2013-2014 eNovance Inc. All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import abc
import collections
import enum
import logging
import threading

from oslo_utils import excutils
from oslo_utils import netutils
from oslo_utils import timeutils
import six
from stevedore import driver

import tooz
from tooz import _retry

LOG = logging.getLogger(__name__)


TOOZ_BACKENDS_NAMESPACE = "tooz.backends"


[docs]class Characteristics(enum.Enum): """Attempts to describe the characteristic that a driver supports.""" DISTRIBUTED_ACROSS_THREADS = 'DISTRIBUTED_ACROSS_THREADS' """Coordinator components when used by multiple **threads** work the same as if those components were only used by a single thread.""" DISTRIBUTED_ACROSS_PROCESSES = 'DISTRIBUTED_ACROSS_PROCESSES' """Coordinator components when used by multiple **processes** work the same as if those components were only used by a single thread.""" DISTRIBUTED_ACROSS_HOSTS = 'DISTRIBUTED_ACROSS_HOSTS' """Coordinator components when used by multiple **hosts** work the same as if those components were only used by a single thread.""" NON_TIMEOUT_BASED = 'NON_TIMEOUT_BASED' """The driver has the following property: * Its operations are not based on the timeout of other clients, but on some other more robust mechanisms. """ LINEARIZABLE = 'LINEARIZABLE' """The driver has the following properties: * Ensures each operation must take place before its completion time. * Any operation invoked subsequently must take place after the invocation and by extension, after the original operation itself. """ SEQUENTIAL = 'SEQUENTIAL' """The driver has the following properties: * Operations can take effect before or after completion – but all operations retain the constraint that operations from any given process must take place in that processes order. """ CAUSAL = 'CAUSAL' """The driver has the following properties: * Does **not** have to enforce the order of every operation from a process, perhaps, only causally related operations must occur in order. """ SERIALIZABLE = 'SERIALIZABLE' """The driver has the following properties: * The history of **all** operations is equivalent to one that took place in some single atomic order but with unknown invocation and completion times - it places no bounds on time or order. """ SAME_VIEW_UNDER_PARTITIONS = 'SAME_VIEW_UNDER_PARTITIONS' """When a client is connected to a server and that server is partitioned from a group of other servers it will (somehow) have the same view of data as a client connected to a server on the other side of the partition (typically this is accomplished by write availability being lost and therefore nothing can change). """ SAME_VIEW_ACROSS_CLIENTS = 'SAME_VIEW_ACROSS_CLIENTS' """A client connected to one server will *always* have the same view every other client will have (no matter what server those other clients are connected to). Typically this is a sacrifice in write availability because before a write can be acknowledged it must be acknowledged by *all* servers in a cluster (so that all clients that are connected to those servers read the exact *same* thing). """
class Hooks(list): def run(self, *args, **kwargs): return list(map(lambda cb: cb(*args, **kwargs), self)) class Event(object): """Base class for events.""" class MemberJoinedGroup(Event): """A member joined a group event.""" def __init__(self, group_id, member_id): self.group_id = group_id self.member_id = member_id class MemberLeftGroup(Event): """A member left a group event.""" def __init__(self, group_id, member_id): self.group_id = group_id self.member_id = member_id class LeaderElected(Event): """A leader as been elected.""" def __init__(self, group_id, member_id): self.group_id = group_id self.member_id = member_id class Heart(object): """Coordination drivers main liveness pump (its heart).""" def __init__(self, driver, thread_cls=threading.Thread, event_cls=threading.Event): self._thread_cls = thread_cls self._dead = event_cls() self._finished = event_cls() self._finished.set() self._runner = None self._driver = driver self._beats = 0 @property def beats(self): """How many times the heart has beaten.""" return self._beats def is_alive(self): """Returns if the heart is beating.""" return not (self._runner is None or not self._runner.is_alive() or self._finished.is_set()) @excutils.forever_retry_uncaught_exceptions def _beat_forever_until_stopped(self): """Inner beating loop.""" try: while not self._dead.is_set(): with timeutils.StopWatch() as w: wait_until_next_beat = self._driver.heartbeat() ran_for = w.elapsed() if ran_for > wait_until_next_beat: LOG.warning( "Heartbeating took too long to execute (it ran for" " %0.2f seconds which is %0.2f seconds longer than" " the next heartbeat idle time). This may cause" " timeouts (in locks, leadership, ...) to" " happen (which will not end well).", ran_for, ran_for - wait_until_next_beat) self._beats += 1 # NOTE(harlowja): use the event object for waiting and # not a sleep function since doing that will allow this code # to terminate early if stopped via the stop() method vs # having to wait until the sleep function returns. self._dead.wait(wait_until_next_beat) finally: self._finished.set() def start(self, thread_cls=None): """Starts the heart beating thread (noop if already started).""" if not self.is_alive(): self._finished.clear() self._dead.clear() self._beats = 0 if thread_cls is None: thread_cls = self._thread_cls self._runner = thread_cls(target=self._beat_forever_until_stopped) self._runner.daemon = True self._runner.start() def stop(self): """Requests the heart beating thread to stop beating.""" self._dead.set() def wait(self, timeout=None): """Wait up to given timeout for the heart beating thread to stop.""" self._finished.wait(timeout) return self._finished.is_set() @six.add_metaclass(abc.ABCMeta)
[docs]class CoordinationDriver(object): requires_beating = False """ Usage requirement that if true requires that the :py:meth:`~.heartbeat` be called periodically (at a given rate) to avoid locks, sessions and other from being automatically closed/discarded by the coordinators backing store. """ CHARACTERISTICS = () """ Tuple of :py:class:`~tooz.coordination.Characteristics` introspectable enum member(s) that can be used to interogate how this driver works. """ def __init__(self): self._started = False self._hooks_join_group = collections.defaultdict(Hooks) self._hooks_leave_group = collections.defaultdict(Hooks) self._hooks_elected_leader = collections.defaultdict(Hooks) # A cache for group members self._group_members = collections.defaultdict(set) self.requires_beating = ( CoordinationDriver.heartbeat != self.__class__.heartbeat ) self.heart = Heart(self) def _has_hooks_for_group(self, group_id): return (len(self._hooks_join_group[group_id]) + len(self._hooks_leave_group[group_id])) @staticmethod
[docs] def run_watchers(timeout=None): """Run the watchers callback. This may also activate :py:meth:`.run_elect_coordinator` (depending on driver implementation). """ raise tooz.NotImplemented
@staticmethod
[docs] def run_elect_coordinator(): """Try to leader elect this coordinator & activate hooks on success.""" raise tooz.NotImplemented
@abc.abstractmethod
[docs] def watch_join_group(self, group_id, callback): """Call a function when group_id sees a new member joined. The callback functions will be executed when `run_watchers` is called. :param group_id: The group id to watch :param callback: The function to execute when a member joins this group """ self._hooks_join_group[group_id].append(callback)
@abc.abstractmethod
[docs] def unwatch_join_group(self, group_id, callback): """Stop executing a function when a group_id sees a new member joined. :param group_id: The group id to unwatch :param callback: The function that was executed when a member joined this group """ try: self._hooks_join_group[group_id].remove(callback) except ValueError: raise WatchCallbackNotFound(group_id, callback) if (not self._has_hooks_for_group(group_id) and group_id in self._group_members): del self._group_members[group_id]
@abc.abstractmethod
[docs] def watch_leave_group(self, group_id, callback): """Call a function when group_id sees a new member leaving. The callback functions will be executed when `run_watchers` is called. :param group_id: The group id to watch :param callback: The function to execute when a member leaves this group """ self._hooks_leave_group[group_id].append(callback)
@abc.abstractmethod
[docs] def unwatch_leave_group(self, group_id, callback): """Stop executing a function when a group_id sees a new member leaving. :param group_id: The group id to unwatch :param callback: The function that was executed when a member left this group """ try: self._hooks_leave_group[group_id].remove(callback) except ValueError: raise WatchCallbackNotFound(group_id, callback) if (not self._has_hooks_for_group(group_id) and group_id in self._group_members): del self._group_members[group_id] if not self._hooks_leave_group[group_id]: del self._hooks_leave_group[group_id]
@abc.abstractmethod
[docs] def watch_elected_as_leader(self, group_id, callback): """Call a function when member gets elected as leader. The callback functions will be executed when `run_watchers` is called. :param group_id: The group id to watch :param callback: The function to execute when a member leaves this group """ self._hooks_elected_leader[group_id].append(callback)
@abc.abstractmethod
[docs] def unwatch_elected_as_leader(self, group_id, callback): """Call a function when member gets elected as leader. The callback functions will be executed when `run_watchers` is called. :param group_id: The group id to watch :param callback: The function to execute when a member leaves this group """ try: self._hooks_elected_leader[group_id].remove(callback) except ValueError: raise WatchCallbackNotFound(group_id, callback) if not self._hooks_elected_leader[group_id]: del self._hooks_elected_leader[group_id]
@staticmethod
[docs] def stand_down_group_leader(group_id): """Stand down as the group leader if we are. :param group_id: The group where we don't want to be a leader anymore """ raise tooz.NotImplemented
@property def is_started(self): return self._started
[docs] def start(self, start_heart=False): """Start the service engine. If needed, the establishment of a connection to the servers is initiated. """ if self._started: raise ToozError( "Can not start a driver which has not been stopped") self._start() if self.requires_beating and start_heart: self.heart.start() self._started = True # Tracks which group are joined self._joined_groups = set()
def _start(self): pass
[docs] def stop(self): """Stop the service engine. If needed, the connection to servers is closed and the client will disappear from all joined groups. """ if not self._started: raise ToozError("Can not stop a driver which has not been started") if self.heart.is_alive(): self.heart.stop() self.heart.wait() leaving = [self.leave_group(group) for group in self._joined_groups] for leave in leaving: try: leave.get() except ToozError: # Whatever happens, ignore. Maybe we got booted out/never # existed in the first place, or something is down, but we just # want to call _stop after whatever happens to not leak any # connection. pass self._stop() self._started = False
def _stop(self): pass @staticmethod
[docs] def create_group(group_id): """Request the creation of a group asynchronously. :param group_id: the id of the group to create :type group_id: str :returns: None :rtype: CoordAsyncResult """ raise tooz.NotImplemented
@staticmethod
[docs] def get_groups(): """Return the list composed by all groups ids asynchronously. :returns: the list of all created group ids :rtype: CoordAsyncResult """ raise tooz.NotImplemented
@staticmethod
[docs] def join_group(group_id, capabilities=b""): """Join a group and establish group membership asynchronously. :param group_id: the id of the group to join :type group_id: str :param capabilities: the capabilities of the joined member :type capabilities: object (typically str) :returns: None :rtype: CoordAsyncResult """ raise tooz.NotImplemented
@_retry.retry()
[docs] def join_group_create(self, group_id): """Join a group and create it if necessary. If the group cannot be joined because it does not exist, it is created before being joined. This function will keep retrying until it can create the group and join it. Since nothing is transactional, it may have to retry several times if another member is creating/deleting the group at the same time. :param group_id: Identifier of the group to join and create """ req = self.join_group(group_id) try: req.get() except GroupNotCreated: req = self.create_group(group_id) try: req.get() except GroupAlreadyExist: # The group might have been created in the meantime, ignore pass # Now retry to join the group raise _retry.TryAgain
@staticmethod
[docs] def leave_group(group_id): """Leave a group asynchronously. :param group_id: the id of the group to leave :type group_id: str :returns: None :rtype: CoordAsyncResult """ raise tooz.NotImplemented
@staticmethod
[docs] def delete_group(group_id): """Delete a group asynchronously. :param group_id: the id of the group to leave :type group_id: str :returns: Result :rtype: CoordAsyncResult """ raise tooz.NotImplemented
@staticmethod
[docs] def get_members(group_id): """Return the set of all members ids of the specified group. :returns: set of all created group ids :rtype: CoordAsyncResult """ raise tooz.NotImplemented
@staticmethod
[docs] def get_member_capabilities(group_id, member_id): """Return the capabilities of a member asynchronously. :param group_id: the id of the group of the member :type group_id: str :param member_id: the id of the member :type member_id: str :returns: capabilities of a member :rtype: CoordAsyncResult """ raise tooz.NotImplemented
@staticmethod
[docs] def get_member_info(group_id, member_id): """Return the statistics and capabilities of a member asynchronously. :param group_id: the id of the group of the member :type group_id: str :param member_id: the id of the member :type member_id: str :returns: capabilities and statistics of a member :rtype: CoordAsyncResult """ raise tooz.NotImplemented
@staticmethod
[docs] def update_capabilities(group_id, capabilities): """Update member capabilities in the specified group. :param group_id: the id of the group of the current member :type group_id: str :param capabilities: the capabilities of the updated member :type capabilities: object (typically str) :returns: None :rtype: CoordAsyncResult """ raise tooz.NotImplemented
@staticmethod
[docs] def get_leader(group_id): """Return the leader for a group. :param group_id: the id of the group: :returns: the leader :rtype: CoordAsyncResult """ raise tooz.NotImplemented
@staticmethod
[docs] def get_lock(name): """Return a distributed lock. This is a exclusive lock, a second call to acquire() will block or return False. :param name: The lock name that is used to identify it across all nodes. """ raise tooz.NotImplemented
@staticmethod
[docs] def heartbeat(): """Update member status to indicate it is still alive. Method to run once in a while to be sure that the member is not dead and is still an active member of a group. :return: The number of seconds to wait before sending a new heartbeat. """ pass
@six.add_metaclass(abc.ABCMeta) class CoordAsyncResult(object): """Representation of an asynchronous task. Every call API returns an CoordAsyncResult object on which the result or the status of the task can be requested. """ @abc.abstractmethod def get(self, timeout=10): """Retrieve the result of the corresponding asynchronous call. :param timeout: block until the timeout expire. :type timeout: float """ @abc.abstractmethod def done(self): """Returns True if the task is done, False otherwise.""" class _RunWatchersMixin(object): """Mixin to share the *mostly* common ``run_watchers`` implementation.""" def run_watchers(self, timeout=None): with timeutils.StopWatch(duration=timeout) as w: known_groups = self.get_groups().get( timeout=w.leftover(return_none=True)) result = [] for group_id in known_groups: try: group_members_fut = self.get_members(group_id) group_members = group_members_fut.get( timeout=w.leftover(return_none=True)) except GroupNotCreated: group_members = set() if (group_id in self._joined_groups and self._member_id not in group_members): self._joined_groups.discard(group_id) old_group_members = self._group_members.get(group_id, set()) for member_id in (group_members - old_group_members): result.extend( self._hooks_join_group[group_id].run( MemberJoinedGroup(group_id, member_id))) for member_id in (old_group_members - group_members): result.extend( self._hooks_leave_group[group_id].run( MemberLeftGroup(group_id, member_id))) self._group_members[group_id] = group_members return result def get_coordinator(backend_url, member_id, characteristics=frozenset(), **kwargs): """Initialize and load the backend. :param backend_url: the backend URL to use :type backend: str :param member_id: the id of the member :type member_id: str :param characteristics: set :type characteristics: set of :py:class:`.Characteristics` that will be matched to the requested driver (this **will** become a **required** parameter in a future tooz version) :param kwargs: additional coordinator options (these take precedence over options of the **same** name found in the ``backend_url`` arguments query string) """ parsed_url = netutils.urlsplit(backend_url) parsed_qs = six.moves.urllib.parse.parse_qs(parsed_url.query) if kwargs: options = {} for (k, v) in six.iteritems(kwargs): options[k] = [v] for (k, v) in six.iteritems(parsed_qs): if k not in options: options[k] = v else: options = parsed_qs d = driver.DriverManager( namespace=TOOZ_BACKENDS_NAMESPACE, name=parsed_url.scheme, invoke_on_load=True, invoke_args=(member_id, parsed_url, options)).driver characteristics = set(characteristics) driver_characteristics = set(getattr(d, 'CHARACTERISTICS', set())) missing_characteristics = characteristics - driver_characteristics if missing_characteristics: raise ToozDriverChosenPoorly("Desired characteristics %s" " is not a strict subset of driver" " characteristics %s, %s" " characteristics were not found" % (characteristics, driver_characteristics, missing_characteristics)) return d
[docs]class ToozError(Exception): """Exception raised when an internal error occurs. Raised for instance in case of server internal error. :ivar cause: the cause of the exception being raised, when not none this will itself be an exception instance, this is useful for creating a chain of exceptions for versions of python where this is not yet implemented/supported natively. """ def __init__(self, message, cause=None): super(ToozError, self).__init__(message) self.cause = cause
class ToozDriverChosenPoorly(ToozError): """Raised when a driver does not match desired characteristics."""
[docs]class ToozConnectionError(ToozError): """Exception raised when the client cannot connect to the server."""
[docs]class OperationTimedOut(ToozError): """Exception raised when an operation times out."""
class LockAcquireFailed(ToozError): """Exception raised when a lock acquire fails in a context manager."""
[docs]class GroupNotCreated(ToozError): """Exception raised when the caller request an nonexistent group.""" def __init__(self, group_id): self.group_id = group_id super(GroupNotCreated, self).__init__( "Group %s does not exist" % group_id)
[docs]class GroupAlreadyExist(ToozError): """Exception raised trying to create an already existing group.""" def __init__(self, group_id): self.group_id = group_id super(GroupAlreadyExist, self).__init__( "Group %s already exists" % group_id)
[docs]class MemberAlreadyExist(ToozError): """Exception raised trying to join a group already joined.""" def __init__(self, group_id, member_id): self.group_id = group_id self.member_id = member_id super(MemberAlreadyExist, self).__init__( "Member %s has already joined %s" % (member_id, group_id))
[docs]class MemberNotJoined(ToozError): """Exception raised trying to access a member not in a group.""" def __init__(self, group_id, member_id): self.group_id = group_id self.member_id = member_id super(MemberNotJoined, self).__init__("Member %s has not joined %s" % (member_id, group_id))
[docs]class GroupNotEmpty(ToozError): "Exception raised when the caller try to delete a group with members." def __init__(self, group_id): self.group_id = group_id super(GroupNotEmpty, self).__init__("Group %s is not empty" % group_id)
class WatchCallbackNotFound(ToozError): """Exception raised when unwatching a group. Raised when the caller tries to unwatch a group with a callback that does not exist. """ def __init__(self, group_id, callback): self.group_id = group_id self.callback = callback super(WatchCallbackNotFound, self).__init__( 'Callback %s is not registered on group %s' % (callback.__name__, group_id)) class SerializationError(ToozError): "Exception raised when serialization or deserialization breaks."
[docs]def raise_with_cause(exc_cls, message, *args, **kwargs): """Helper to raise + chain exceptions (when able) and associate a *cause*. **For internal usage only.** NOTE(harlowja): Since in py3.x exceptions can be chained (due to :pep:`3134`) we should try to raise the desired exception with the given *cause*. :param exc_cls: the :py:class:`~tooz.coordination.ToozError` class to raise. :param message: the text/str message that will be passed to the exceptions constructor as its first positional argument. :param args: any additional positional arguments to pass to the exceptions constructor. :param kwargs: any additional keyword arguments to pass to the exceptions constructor. """ if not issubclass(exc_cls, ToozError): raise ValueError("Subclass of tooz error is required") excutils.raise_with_cause(exc_cls, message, *args, **kwargs)

Project Source