Source code for tooz.coordination

# -*- coding: utf-8 -*-
#
#    Copyright (C) 2016 Red Hat, Inc.
#    Copyright (C) 2013-2014 eNovance Inc. All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import abc
import collections
import enum
import logging
import threading

from oslo_utils import excutils
from oslo_utils import netutils
from oslo_utils import timeutils
import six
from stevedore import driver

import tooz

# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)


# Namespace passed to stevedore's ``DriverManager`` by ``get_coordinator``
# when it loads the backend driver matching the URL scheme.
TOOZ_BACKENDS_NAMESPACE = "tooz.backends"


class Characteristics(enum.Enum):
    """Describes the characteristics that a driver may support."""

    #: Coordinator components used by multiple **threads** behave the same
    #: as if they were only used by a single thread.
    DISTRIBUTED_ACROSS_THREADS = 'DISTRIBUTED_ACROSS_THREADS'

    #: Coordinator components used by multiple **processes** behave the same
    #: as if they were only used by a single thread.
    DISTRIBUTED_ACROSS_PROCESSES = 'DISTRIBUTED_ACROSS_PROCESSES'

    #: Coordinator components used by multiple **hosts** behave the same as
    #: if they were only used by a single thread.
    DISTRIBUTED_ACROSS_HOSTS = 'DISTRIBUTED_ACROSS_HOSTS'

    #: The driver's operations do not depend on the timeout of other
    #: clients, but on some other more robust mechanism.
    NON_TIMEOUT_BASED = 'NON_TIMEOUT_BASED'

    #: The driver guarantees that:
    #:
    #: * each operation takes place before its completion time;
    #: * any operation invoked subsequently takes place after that
    #:   invocation and, by extension, after the original operation itself.
    LINEARIZABLE = 'LINEARIZABLE'

    #: Operations may take effect before or after completion, but operations
    #: issued by any given process take place in that process's order.
    SEQUENTIAL = 'SEQUENTIAL'

    #: The driver does **not** have to enforce the order of every operation
    #: from a process; perhaps only causally related operations must occur
    #: in order.
    CAUSAL = 'CAUSAL'

    #: The history of **all** operations is equivalent to one that took
    #: place in some single atomic order, with unknown invocation and
    #: completion times; no bounds are placed on time or order.
    SERIALIZABLE = 'SERIALIZABLE'

    #: A client connected to a server that is partitioned from a group of
    #: other servers will (somehow) have the same view of data as a client
    #: connected to a server on the other side of the partition (typically
    #: accomplished by write availability being lost, so nothing can
    #: change).
    SAME_VIEW_UNDER_PARTITIONS = 'SAME_VIEW_UNDER_PARTITIONS'

    #: A client connected to one server will *always* have the same view as
    #: every other client, no matter which server those clients are
    #: connected to. Typically this sacrifices write availability, because a
    #: write must be acknowledged by *all* servers in a cluster before it
    #: can be acknowledged to the client.
    SAME_VIEW_ACROSS_CLIENTS = 'SAME_VIEW_ACROSS_CLIENTS'
class Hooks(list):
    """A list of callbacks that can all be invoked with a single call."""

    def run(self, *args, **kwargs):
        """Call every registered callback with the given arguments.

        :param args: positional arguments forwarded to each callback
        :param kwargs: keyword arguments forwarded to each callback
        :returns: list of the callbacks' return values, in list order
        """
        return [cb(*args, **kwargs) for cb in self]


class Event(object):
    """Base class for events."""


class MemberJoinedGroup(Event):
    """A member joined a group event.

    :ivar group_id: the id of the group that was joined
    :ivar member_id: the id of the member that joined
    """

    def __init__(self, group_id, member_id):
        self.group_id = group_id
        self.member_id = member_id


class MemberLeftGroup(Event):
    """A member left a group event.

    :ivar group_id: the id of the group that was left
    :ivar member_id: the id of the member that left
    """

    def __init__(self, group_id, member_id):
        self.group_id = group_id
        self.member_id = member_id


class LeaderElected(Event):
    """A leader has been elected.

    :ivar group_id: the id of the group the election took place in
    :ivar member_id: the id of the elected member
    """

    def __init__(self, group_id, member_id):
        self.group_id = group_id
        self.member_id = member_id


class Heart(object):
    """Coordination drivers main liveness pump (its heart).

    Repeatedly calls the driver's ``heartbeat()`` method from a background
    daemon thread until :py:meth:`stop` is requested.

    :param driver: object whose ``heartbeat()`` method is called on each beat
    :param thread_cls: callable used to create the beating thread when
                       :py:meth:`start` is not given one
    :param event_cls: callable used to create the two internal events
                      (stop requested / loop finished)
    """

    def __init__(self, driver, thread_cls=threading.Thread,
                 event_cls=threading.Event):
        self._thread_cls = thread_cls
        self._dead = event_cls()
        self._finished = event_cls()
        # A heart that was never started counts as already finished.
        self._finished.set()
        self._runner = None
        self._driver = driver
        self._beats = 0

    @property
    def beats(self):
        """How many times the heart has beaten."""
        return self._beats

    def is_alive(self):
        """Returns if the heart is beating."""
        return not (self._runner is None
                    or not self._runner.is_alive()
                    or self._finished.is_set())

    @excutils.forever_retry_uncaught_exceptions
    def _beat_forever_until_stopped(self):
        """Inner beating loop.

        Calls the driver's ``heartbeat()`` and then waits for the number of
        seconds it returned, until the stop event is set. The finished event
        is always set on exit.
        """
        try:
            while not self._dead.is_set():
                with timeutils.StopWatch() as w:
                    wait_until_next_beat = self._driver.heartbeat()
                ran_for = w.elapsed()
                if ran_for > wait_until_next_beat:
                    LOG.warning(
                        "Heartbeating took too long to execute (it ran for"
                        " %0.2f seconds which is %0.2f seconds longer than"
                        " the next heartbeat idle time). This may cause"
                        " timeouts (in locks, leadership, ...) to"
                        " happen (which will not end well).", ran_for,
                        ran_for - wait_until_next_beat)
                self._beats += 1
                # NOTE(harlowja): use the event object for waiting and
                # not a sleep function since doing that will allow this code
                # to terminate early if stopped via the stop() method vs
                # having to wait until the sleep function returns.
                self._dead.wait(wait_until_next_beat)
        finally:
            self._finished.set()

    def start(self, thread_cls=None):
        """Starts the heart beating thread (noop if already started).

        :param thread_cls: optional callable used to create the thread; when
                           None the one given at construction time is used
        """
        if not self.is_alive():
            self._finished.clear()
            self._dead.clear()
            self._beats = 0
            if thread_cls is None:
                thread_cls = self._thread_cls
            self._runner = thread_cls(target=self._beat_forever_until_stopped)
            self._runner.daemon = True
            self._runner.start()

    def stop(self):
        """Requests the heart beating thread to stop beating."""
        self._dead.set()

    def wait(self, timeout=None):
        """Wait up to given timeout for the heart beating thread to stop.

        :param timeout: maximum number of seconds to wait (None waits
                        without a limit)
        :returns: True if the beating loop has finished, False otherwise
        """
        self._finished.wait(timeout)
        return self._finished.is_set()


@six.add_metaclass(abc.ABCMeta)
class CoordinationDriver(object):
    """Base class of the coordination drivers.

    Keeps track of the watch callbacks registered per group and owns the
    :py:class:`Heart` used to call :py:meth:`heartbeat` periodically.
    """

    requires_beating = False
    """
    Usage requirement that if true requires that the :py:meth:`~.heartbeat`
    be called periodically (at a given rate) to avoid locks, sessions and
    other from being automatically closed/discarded by the coordinators
    backing store.
    """

    CHARACTERISTICS = ()
    """
    Tuple of :py:class:`~tooz.coordination.Characteristics` introspectable
    enum member(s) that can be used to interrogate how this driver works.
    """

    def __init__(self):
        self._started = False
        self._hooks_join_group = collections.defaultdict(Hooks)
        self._hooks_leave_group = collections.defaultdict(Hooks)
        self._hooks_elected_leader = collections.defaultdict(Hooks)
        # A cache for group members
        self._group_members = collections.defaultdict(set)
        # Beating is required exactly when the concrete class overrides the
        # base (no-op) heartbeat implementation.
        self.requires_beating = (
            CoordinationDriver.heartbeat != self.__class__.heartbeat
        )
        self.heart = Heart(self)

    def _has_hooks_for_group(self, group_id):
        """Return the number of join + leave callbacks of a group.

        The hook mappings are queried with ``get`` (not indexed) so that
        asking about a group never creates empty entries in them.
        """
        return (len(self._hooks_join_group.get(group_id, ())) +
                len(self._hooks_leave_group.get(group_id, ())))

    @staticmethod
    def _remove_hook(hooks_by_group, group_id, callback):
        """Remove a callback from a group's hooks, pruning empty hook lists.

        :param hooks_by_group: one of the ``_hooks_*`` mappings
        :param group_id: the group the callback was registered on
        :param callback: the callback to remove
        :raises WatchCallbackNotFound: if the callback is not registered
        """
        try:
            # Do not index the defaultdict here: a failed removal must not
            # leave an empty entry behind for an unknown group.
            hooks_by_group.get(group_id, []).remove(callback)
        except ValueError:
            raise WatchCallbackNotFound(group_id, callback)
        if not hooks_by_group[group_id]:
            del hooks_by_group[group_id]

    @staticmethod
    def run_watchers(timeout=None):
        """Run the watchers callback.

        This may also activate :py:meth:`.run_elect_coordinator` (depending
        on driver implementation).

        The base implementation raises ``tooz.NotImplemented``.

        :param timeout: not used by the base implementation
        """
        raise tooz.NotImplemented

    @staticmethod
    def run_elect_coordinator():
        """Try to leader elect this coordinator & activate hooks on success.

        The base implementation raises ``tooz.NotImplemented``.
        """
        raise tooz.NotImplemented

    @abc.abstractmethod
    def watch_join_group(self, group_id, callback):
        """Call a function when group_id sees a new member joined.

        The callback functions will be executed when `run_watchers` is
        called.

        :param group_id: The group id to watch
        :param callback: The function to execute when a member joins this
                         group
        """
        self._hooks_join_group[group_id].append(callback)

    @abc.abstractmethod
    def unwatch_join_group(self, group_id, callback):
        """Stop executing a function when a group_id sees a new member joined.

        :param group_id: The group id to unwatch
        :param callback: The function that was executed when a member joined
                         this group
        :raises WatchCallbackNotFound: if the callback is not registered on
                                       the group
        """
        self._remove_hook(self._hooks_join_group, group_id, callback)
        # Once nothing watches the group anymore its cached member list is
        # dropped as well.
        if (not self._has_hooks_for_group(group_id) and
                group_id in self._group_members):
            del self._group_members[group_id]

    @abc.abstractmethod
    def watch_leave_group(self, group_id, callback):
        """Call a function when group_id sees a new member leaving.

        The callback functions will be executed when `run_watchers` is
        called.

        :param group_id: The group id to watch
        :param callback: The function to execute when a member leaves this
                         group
        """
        self._hooks_leave_group[group_id].append(callback)

    @abc.abstractmethod
    def unwatch_leave_group(self, group_id, callback):
        """Stop executing a function when a group_id sees a new member leaving.

        :param group_id: The group id to unwatch
        :param callback: The function that was executed when a member left
                         this group
        :raises WatchCallbackNotFound: if the callback is not registered on
                                       the group
        """
        self._remove_hook(self._hooks_leave_group, group_id, callback)
        # Once nothing watches the group anymore its cached member list is
        # dropped as well.
        if (not self._has_hooks_for_group(group_id) and
                group_id in self._group_members):
            del self._group_members[group_id]

    @abc.abstractmethod
    def watch_elected_as_leader(self, group_id, callback):
        """Call a function when member gets elected as leader.

        The callback functions will be executed when `run_watchers` is
        called.

        :param group_id: The group id to watch
        :param callback: The function to execute when a member is elected
                         as leader of this group
        """
        self._hooks_elected_leader[group_id].append(callback)

    @abc.abstractmethod
    def unwatch_elected_as_leader(self, group_id, callback):
        """Stop executing a function when member gets elected as leader.

        :param group_id: The group id to unwatch
        :param callback: The function that was executed when a member was
                         elected as leader of this group
        :raises WatchCallbackNotFound: if the callback is not registered on
                                       the group
        """
        self._remove_hook(self._hooks_elected_leader, group_id, callback)

    @staticmethod
    def stand_down_group_leader(group_id):
        """Stand down as the group leader if we are.

        The base implementation raises ``tooz.NotImplemented``.

        :param group_id: The group where we don't want to be a leader anymore
        """
        raise tooz.NotImplemented

    @property
    def is_started(self):
        """Whether :py:meth:`start` completed and :py:meth:`stop` did not."""
        return self._started

    def start(self, start_heart=False):
        """Start the service engine.

        If needed, the establishment of a connection to the servers
        is initiated.

        :param start_heart: when true (and the driver requires beating) the
                            heart beating thread is started as well
        :raises ToozError: if the driver is already started
        """
        if self._started:
            raise ToozError(
                "Can not start a driver which has not been stopped")
        self._start()
        if self.requires_beating and start_heart:
            self.heart.start()
        self._started = True

    def _start(self):
        """Driver specific start logic (does nothing by default)."""
        pass

    def stop(self):
        """Stop the service engine.

        If needed, the connection to servers is closed and the client will
        disappear from all joined groups.

        :raises ToozError: if the driver has not been started
        """
        if not self._started:
            raise ToozError("Can not stop a driver which has not been started")
        if self.heart.is_alive():
            self.heart.stop()
            self.heart.wait()
        self._stop()
        self._started = False

    def _stop(self):
        """Driver specific stop logic (does nothing by default)."""
        pass

    @staticmethod
    def create_group(group_id):
        """Request the creation of a group asynchronously.

        The base implementation raises ``tooz.NotImplemented``.

        :param group_id: the id of the group to create
        :type group_id: str
        :returns: None
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def get_groups():
        """Return the list composed by all groups ids asynchronously.

        The base implementation raises ``tooz.NotImplemented``.

        :returns: the list of all created group ids
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def join_group(group_id, capabilities=b""):
        """Join a group and establish group membership asynchronously.

        The base implementation raises ``tooz.NotImplemented``.

        :param group_id: the id of the group to join
        :type group_id: str
        :param capabilities: the capabilities of the joined member
        :type capabilities: object (typically str)
        :returns: None
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def leave_group(group_id):
        """Leave a group asynchronously.

        The base implementation raises ``tooz.NotImplemented``.

        :param group_id: the id of the group to leave
        :type group_id: str
        :returns: None
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def delete_group(group_id):
        """Delete a group asynchronously.

        The base implementation raises ``tooz.NotImplemented``.

        :param group_id: the id of the group to delete
        :type group_id: str
        :returns: Result
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def get_members(group_id):
        """Return the list of all members ids of the specified group.

        The base implementation raises ``tooz.NotImplemented``.

        :param group_id: the id of the group to list the members of
        :returns: list of all member ids of the group
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def get_member_capabilities(group_id, member_id):
        """Return the capabilities of a member asynchronously.

        The base implementation raises ``tooz.NotImplemented``.

        :param group_id: the id of the group of the member
        :type group_id: str
        :param member_id: the id of the member
        :type member_id: str
        :returns: capabilities of a member
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def get_member_info(group_id, member_id):
        """Return the statistics and capabilities of a member asynchronously.

        The base implementation raises ``tooz.NotImplemented``.

        :param group_id: the id of the group of the member
        :type group_id: str
        :param member_id: the id of the member
        :type member_id: str
        :returns: capabilities and statistics of a member
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def update_capabilities(group_id, capabilities):
        """Update member capabilities in the specified group.

        The base implementation raises ``tooz.NotImplemented``.

        :param group_id: the id of the group of the current member
        :type group_id: str
        :param capabilities: the capabilities of the updated member
        :type capabilities: object (typically str)
        :returns: None
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def get_leader(group_id):
        """Return the leader for a group.

        The base implementation raises ``tooz.NotImplemented``.

        :param group_id: the id of the group
        :returns: the leader
        :rtype: CoordAsyncResult
        """
        raise tooz.NotImplemented

    @staticmethod
    def get_lock(name):
        """Return a distributed lock.

        This is an exclusive lock, a second call to acquire() will block or
        return False.

        The base implementation raises ``tooz.NotImplemented``.

        :param name: The lock name that is used to identify it across all
                     nodes.
        """
        raise tooz.NotImplemented

    @staticmethod
    def heartbeat():
        """Update member status to indicate it is still alive.

        Method to run once in a while to be sure that the member is not dead
        and is still an active member of a group.

        The base implementation does nothing and returns None.

        :return: The number of seconds to wait before sending a new
                 heartbeat.
        """
        pass
@six.add_metaclass(abc.ABCMeta)
class CoordAsyncResult(object):
    """Handle on the outcome of an asynchronous call.

    Every API call hands back a CoordAsyncResult from which the result, or
    the completion status, of the underlying task can be obtained.
    """

    @abc.abstractmethod
    def get(self, timeout=10):
        """Fetch the result of the asynchronous call this object represents.

        :param timeout: how long to block before the timeout expires.
        :type timeout: float
        """

    @abc.abstractmethod
    def done(self):
        """Tell whether the task completed (True) or not (False)."""


class _RunWatchersMixin(object):
    """Shares the *mostly* common ``run_watchers`` logic between drivers."""

    def run_watchers(self, timeout=None):
        """Diff each group's membership with the cache and fire the hooks.

        :param timeout: overall duration given to the stop watch whose
                        leftover is passed to every asynchronous ``get``
        :returns: list of the values returned by the executed callbacks
        """
        with timeutils.StopWatch(duration=timeout) as watch:
            all_groups = self.get_groups().get(
                timeout=watch.leftover(return_none=True))
            outcomes = []
            for group_id in all_groups:
                try:
                    current = self.get_members(group_id).get(
                        timeout=watch.leftover(return_none=True))
                except GroupNotCreated:
                    current = set()
                else:
                    current = set(current)
                # This member is recorded as joined but is no longer listed:
                # forget the stale local membership.
                if group_id in self._joined_groups:
                    if self._member_id not in current:
                        self._joined_groups.discard(group_id)
                previous = self._group_members.get(group_id, set())
                for member_id in current - previous:
                    fire = self._hooks_join_group[group_id].run
                    outcomes.extend(
                        fire(MemberJoinedGroup(group_id, member_id)))
                for member_id in previous - current:
                    fire = self._hooks_leave_group[group_id].run
                    outcomes.extend(
                        fire(MemberLeftGroup(group_id, member_id)))
                self._group_members[group_id] = current
            return outcomes


def get_coordinator(backend_url, member_id,
                    characteristics=frozenset(), **kwargs):
    """Load the backend driver selected by the URL and instantiate it.

    :param backend_url: the backend URL to use
    :type backend_url: str
    :param member_id: the id of the member
    :type member_id: str
    :param characteristics: set
    :type characteristics: set of :py:class:`.Characteristics` that will
                           be matched to the requested driver (this **will**
                           become a **required** parameter in a future tooz
                           version)
    :param kwargs: additional coordinator options (these take precedence over
                   options of the **same** name found in the ``backend_url``
                   arguments query string)
    :returns: the driver instance loaded for the URL scheme
    :raises ToozDriverChosenPoorly: if the driver lacks any of the requested
                                    characteristics
    """
    parsed_url = netutils.urlsplit(backend_url)
    parsed_qs = six.moves.urllib.parse.parse_qs(parsed_url.query)
    if not kwargs:
        options = parsed_qs
    else:
        # Keyword options are wrapped in a list, like the parsed query
        # string values, and win over query string options of the same name.
        options = dict((key, [value])
                       for (key, value) in six.iteritems(kwargs))
        for (key, value) in six.iteritems(parsed_qs):
            options.setdefault(key, value)
    manager = driver.DriverManager(
        namespace=TOOZ_BACKENDS_NAMESPACE,
        name=parsed_url.scheme,
        invoke_on_load=True,
        invoke_args=(member_id, parsed_url, options))
    d = manager.driver
    wanted = set(characteristics)
    provided = set(getattr(d, 'CHARACTERISTICS', set()))
    missing = wanted - provided
    if not missing:
        return d
    raise ToozDriverChosenPoorly("Desired characteristics %s"
                                 " is not a strict subset of driver"
                                 " characteristics %s, %s"
                                 " characteristics were not found"
                                 % (wanted, provided, missing))
class ToozError(Exception):
    """Base tooz exception, raised when an internal error occurs.

    Raised for instance in case of server internal error.

    :ivar cause: the exception that caused this one to be raised (or None);
                 kept on the instance so that a chain of exceptions can be
                 built on versions of python that do not implement/support
                 exception chaining natively.
    """

    def __init__(self, message, cause=None):
        self.cause = cause
        super(ToozError, self).__init__(message)
class ToozDriverChosenPoorly(ToozError):
    """Raised when the loaded driver lacks the desired characteristics."""
class ToozConnectionError(ToozError):
    """Raised when the client is unable to connect to the server."""
class OperationTimedOut(ToozError):
    """Raised when an operation does not complete before its timeout."""
class LockAcquireFailed(ToozError):
    """Raised when acquiring a lock fails inside a context manager."""
class GroupNotCreated(ToozError):
    """Raised when the caller requests a group that does not exist.

    :ivar group_id: the id of the missing group
    """

    def __init__(self, group_id):
        message = "Group %s does not exist" % group_id
        super(GroupNotCreated, self).__init__(message)
        self.group_id = group_id
class GroupAlreadyExist(ToozError):
    """Raised when trying to create a group that already exists.

    :ivar group_id: the id of the already existing group
    """

    def __init__(self, group_id):
        message = "Group %s already exists" % group_id
        super(GroupAlreadyExist, self).__init__(message)
        self.group_id = group_id
class MemberAlreadyExist(ToozError):
    """Raised when trying to join a group that was already joined.

    :ivar group_id: the id of the group
    :ivar member_id: the id of the member that had already joined
    """

    def __init__(self, group_id, member_id):
        message = "Member %s has already joined %s" % (member_id, group_id)
        super(MemberAlreadyExist, self).__init__(message)
        self.group_id = group_id
        self.member_id = member_id
class MemberNotJoined(ToozError):
    """Raised when trying to access a member that is not in a group.

    :ivar group_id: the id of the group
    :ivar member_id: the id of the member that has not joined
    """

    def __init__(self, group_id, member_id):
        message = "Member %s has not joined %s" % (member_id, group_id)
        super(MemberNotJoined, self).__init__(message)
        self.group_id = group_id
        self.member_id = member_id
class GroupNotEmpty(ToozError):
    """Raised when the caller tries to delete a group that has members.

    :ivar group_id: the id of the non-empty group
    """

    def __init__(self, group_id):
        message = "Group %s is not empty" % group_id
        super(GroupNotEmpty, self).__init__(message)
        self.group_id = group_id
class WatchCallbackNotFound(ToozError):
    """Exception raised when unwatching a group.

    Raised when the caller tries to unwatch a group with a callback that
    does not exist.

    :ivar group_id: the id of the group that was being unwatched
    :ivar callback: the callback that is not registered on the group
    """

    def __init__(self, group_id, callback):
        self.group_id = group_id
        self.callback = callback
        # Not every callable has a ``__name__`` (``functools.partial``
        # objects and instances defining ``__call__`` do not); fall back to
        # the repr so that building the message never fails with an
        # AttributeError that would hide this exception.
        try:
            callback_name = callback.__name__
        except AttributeError:
            callback_name = repr(callback)
        super(WatchCallbackNotFound, self).__init__(
            'Callback %s is not registered on group %s' %
            (callback_name, group_id))


class SerializationError(ToozError):
    """Exception raised when serialization or deserialization breaks."""
def raise_with_cause(exc_cls, message, *args, **kwargs):
    """Raise a tooz exception chained (when able) to a *cause*.

    **For internal usage only.**

    NOTE(harlowja): Since in py3.x exceptions can be chained (due to
    :pep:`3134`) we should try to raise the desired exception with the given
    *cause*.

    :param exc_cls: the :py:class:`~tooz.coordination.ToozError` class
                    to raise.
    :param message: the text/str message that will be passed to
                    the exceptions constructor as its first positional
                    argument.
    :param args: any additional positional arguments to pass to the
                 exceptions constructor.
    :param kwargs: any additional keyword arguments to pass to the
                   exceptions constructor.
    :raises ValueError: if ``exc_cls`` is not a subclass of
                        :py:class:`~tooz.coordination.ToozError`
    """
    is_tooz_error = issubclass(exc_cls, ToozError)
    if not is_tooz_error:
        raise ValueError("Subclass of tooz error is required")
    # The actual raising (and chaining) is delegated to oslo.utils.
    excutils.raise_with_cause(exc_cls, message, *args, **kwargs)

Project Source