# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import os
import queue
import threading
import uuid
import requests
from etcd3gw import exceptions
from etcd3gw.lease import Lease
from etcd3gw.lock import Lock
from etcd3gw.utils import _decode
from etcd3gw.utils import _encode
from etcd3gw.utils import _increment_last_byte
from etcd3gw.utils import DEFAULT_TIMEOUT
from etcd3gw import watch
_SORT_ORDER = ['none', 'ascend', 'descend']
_SORT_TARGET = ['key', 'version', 'create', 'mod', 'value']
_EXCEPTIONS_BY_CODE = {
requests.codes['internal_server_error']: exceptions.InternalServerError,
requests.codes['service_unavailable']: exceptions.ConnectionFailedError,
requests.codes['request_timeout']: exceptions.ConnectionTimeoutError,
requests.codes['gateway_timeout']: exceptions.ConnectionTimeoutError,
requests.codes['precondition_failed']: exceptions.PreconditionFailedError,
}
DEFAULT_API_PATH = os.getenv('ETCD3GW_API_PATH')
[docs]
class Etcd3Client(object):
def __init__(self, host='localhost', port=2379, protocol="http",
ca_cert=None, cert_key=None, cert_cert=None, timeout=None,
api_path=DEFAULT_API_PATH):
"""Construct an client to talk to etcd3's grpc-gateway's /v3 HTTP API
:param host:
:param port:
:param protocol:
"""
self.host = host
self.port = port
self.protocol = protocol
self.session = requests.Session()
self.timeout = timeout
if ca_cert is not None:
self.session.verify = ca_cert
if cert_cert is not None and cert_key is not None:
self.session.cert = (cert_cert, cert_key)
self._api_path = api_path
@property
def api_path(self):
if self._api_path is not None:
return self._api_path
self._discover_api_path()
return self._api_path
@property
def base_url(self):
host = ('[' + self.host + ']' if (self.host.find(':') != -1)
else self.host)
return self.protocol + '://' + host + ':' + str(self.port)
def _discover_api_path(self):
"""Discover api version and set api_path
"""
resp = self._request('get', self.base_url + '/version')
try:
version_str = resp['etcdserver']
except KeyError:
raise exceptions.ApiVersionDiscoveryFailedError(
'Malformed response from version API')
try:
version = tuple(int(part) for part in version_str.split('.', 2))
except ValueError:
raise exceptions.ApiVersionDiscoveryFailedError(
'Failed to parse etcd cluster version: %s' % version_str)
# NOTE(tkajinam): https://etcd.io/docs/v3.5/dev-guide/api_grpc_gateway/
# explains mapping between etcd version and available
# api versions
if version >= (3, 4):
self._api_path = '/v3/'
elif version >= (3, 3):
self._api_path = '/v3beta/'
else:
self._api_path = '/v3alpha/'
[docs]
def get_url(self, path):
"""Construct a full url to the v3 API given a specific path
:param path:
:return: url
"""
return self.base_url + self.api_path + path.lstrip("/")
def _request(self, method, *args, **kwargs):
"""helper method for HTTP requests
:param args:
:param kwargs:
:return: json response
"""
try:
resp = getattr(self.session, method)(*args, timeout=self.timeout,
**kwargs)
if resp.status_code in _EXCEPTIONS_BY_CODE:
raise _EXCEPTIONS_BY_CODE[resp.status_code](
resp.text,
resp.reason
)
if resp.status_code != requests.codes['ok']:
raise exceptions.Etcd3Exception(resp.text, resp.reason)
except requests.exceptions.Timeout as ex:
raise exceptions.ConnectionTimeoutError(str(ex))
except requests.exceptions.ConnectionError as ex:
raise exceptions.ConnectionFailedError(str(ex))
return resp.json()
[docs]
def post(self, *args, **kwargs):
"""helper method for HTTP POST
:param args:
:param kwargs:
:return: json response
"""
return self._request('post', *args, **kwargs)
[docs]
def status(self):
"""Status gets the status of the etcd cluster member.
:return: json response
"""
return self.post(self.get_url("/maintenance/status"),
json={})
[docs]
def members(self):
"""Lists all the members in the cluster.
:return: json response
"""
result = self.post(self.get_url("/cluster/member/list"),
json={})
return result['members']
[docs]
def lease(self, ttl=DEFAULT_TIMEOUT):
"""Create a Lease object given a timeout
:param ttl: timeout
:return: Lease object
"""
result = self.post(self.get_url("/lease/grant"),
json={"TTL": ttl, "ID": 0})
return Lease(int(result['ID']), client=self)
[docs]
def lock(self, id=None, ttl=DEFAULT_TIMEOUT):
"""Create a Lock object given an ID and timeout
:param id: ID for the lock, creates a new uuid if not provided
:param ttl: timeout
:return: Lock object
"""
if id is None:
id = str(uuid.uuid4())
return Lock(id, ttl=ttl, client=self)
[docs]
def create(self, key, value, lease=None):
"""Atomically create the given key only if the key doesn't exist.
This verifies that the create_revision of a key equales to 0, then
creates the key with the value.
This operation takes place in a transaction.
:param key: key in etcd to create
:param value: value of the key
:type value: bytes or string
:param lease: lease to connect with, optional
:returns: status of transaction, ``True`` if the create was
successful, ``False`` otherwise
:rtype: bool
"""
base64_key = _encode(key)
base64_value = _encode(value)
txn = {
'compare': [{
'key': base64_key,
'result': 'EQUAL',
'target': 'CREATE',
'create_revision': 0
}],
'success': [{
'request_put': {
'key': base64_key,
'value': base64_value,
}
}],
'failure': []
}
if lease:
txn['success'][0]['request_put']['lease'] = lease.id
result = self.transaction(txn)
if 'succeeded' in result:
return result['succeeded']
return False
[docs]
def put(self, key, value, lease=None):
"""Put puts the given key into the key-value store.
A put request increments the revision of the key-value store
and generates one event in the event history.
:param key:
:param value:
:param lease:
:return: boolean
"""
payload = {
"key": _encode(key),
"value": _encode(value)
}
if lease:
payload['lease'] = lease.id
self.post(self.get_url("/kv/put"), json=payload)
return True
[docs]
def get(self, key, metadata=False, sort_order=None,
sort_target=None, **kwargs):
"""Range gets the keys in the range from the key-value store.
:param key:
:param metadata:
:param sort_order: 'ascend' or 'descend' or None
:param sort_target: 'key' or 'version' or 'create' or 'mod' or 'value'
:param kwargs:
:return:
"""
try:
order = 0
if sort_order:
order = _SORT_ORDER.index(sort_order)
except ValueError:
raise ValueError('sort_order must be one of "ascend" or "descend"')
try:
target = 0
if sort_target:
target = _SORT_TARGET.index(sort_target)
except ValueError:
raise ValueError('sort_target must be one of "key", '
'"version", "create", "mod" or "value"')
payload = {
"key": _encode(key),
"sort_order": order,
"sort_target": target,
}
payload.update(kwargs)
result = self.post(self.get_url("/kv/range"),
json=payload)
if 'kvs' not in result:
return []
if metadata:
def value_with_metadata(item):
item['key'] = _decode(item['key'])
value = _decode(item.pop('value', ''))
return value, item
return [value_with_metadata(item) for item in result['kvs']]
return [_decode(item.get('value', '')) for item in result['kvs']]
[docs]
def get_all(self, sort_order=None, sort_target='key'):
"""Get all keys currently stored in etcd.
:returns: sequence of (value, metadata) tuples
"""
return self.get(
key=_encode(b'\0'),
metadata=True,
sort_order=sort_order,
sort_target=sort_target,
range_end=_encode(b'\0'),
)
[docs]
def get_prefix(self, key_prefix, sort_order=None, sort_target=None):
"""Get a range of keys with a prefix.
:param sort_order: 'ascend' or 'descend' or None
:param key_prefix: first key in range
:returns: sequence of (value, metadata) tuples
"""
return self.get(key_prefix,
metadata=True,
range_end=_encode(_increment_last_byte(key_prefix)),
sort_order=sort_order,
sort_target=sort_target)
[docs]
def replace(self, key, initial_value, new_value):
"""Atomically replace the value of a key with a new value.
This compares the current value of a key, then replaces it with a new
value if it is equal to a specified value. This operation takes place
in a transaction.
:param key: key in etcd to replace
:param initial_value: old value to replace
:type initial_value: bytes or string
:param new_value: new value of the key
:type new_value: bytes or string
:returns: status of transaction, ``True`` if the replace was
successful, ``False`` otherwise
:rtype: bool
"""
base64_key = _encode(key)
base64_initial_value = _encode(initial_value)
base64_new_value = _encode(new_value)
txn = {
'compare': [{
'key': base64_key,
'result': 'EQUAL',
'target': 'VALUE',
'value': base64_initial_value
}],
'success': [{
'request_put': {
'key': base64_key,
'value': base64_new_value,
}
}],
'failure': []
}
result = self.transaction(txn)
if 'succeeded' in result:
return result['succeeded']
return False
[docs]
def delete(self, key, **kwargs):
"""DeleteRange deletes the given range from the key-value store.
A delete request increments the revision of the key-value store and
generates a delete event in the event history for every deleted key.
:param key:
:param kwargs:
:return:
"""
payload = {
"key": _encode(key),
}
payload.update(kwargs)
result = self.post(self.get_url("/kv/deleterange"),
json=payload)
if 'deleted' in result:
return True
return False
[docs]
def delete_prefix(self, key_prefix):
"""Delete a range of keys with a prefix in etcd."""
return self.delete(
key_prefix, range_end=_encode(_increment_last_byte(key_prefix)))
[docs]
def transaction(self, txn):
"""Txn processes multiple requests in a single transaction.
A txn request increments the revision of the key-value store and
generates events with the same revision for every completed request.
It is not allowed to modify the same key several times within one txn.
:param txn:
:return:
"""
return self.post(self.get_url("/kv/txn"),
data=json.dumps(txn))
[docs]
def watch(self, key, **kwargs):
"""Watch a key.
:param key: key to watch
:returns: tuple of ``events_iterator`` and ``cancel``.
Use ``events_iterator`` to get the events of key changes
and ``cancel`` to cancel the watch request
"""
event_queue = queue.Queue()
def callback(event):
event_queue.put(event)
w = watch.Watcher(self, key, callback, **kwargs)
canceled = threading.Event()
def cancel():
canceled.set()
event_queue.put(None)
w.stop()
def iterator():
while not canceled.is_set():
event = event_queue.get()
if event is None:
canceled.set()
if not canceled.is_set():
yield event
return iterator(), cancel
[docs]
def watch_prefix(self, key_prefix, **kwargs):
"""The same as ``watch``, but watches a range of keys with a prefix."""
kwargs['range_end'] = \
_increment_last_byte(key_prefix)
return self.watch(key_prefix, **kwargs)
[docs]
def watch_once(self, key, timeout=None, **kwargs):
"""Watch a key and stops after the first event.
:param key: key to watch
:param timeout: (optional) timeout in seconds.
:returns: event
"""
event_queue = queue.Queue()
def callback(event):
event_queue.put(event)
w = watch.Watcher(self, key, callback, **kwargs)
try:
return event_queue.get(timeout=timeout)
except queue.Empty:
raise exceptions.WatchTimedOut()
finally:
w.stop()
[docs]
def watch_prefix_once(self, key_prefix, timeout=None, **kwargs):
"""Watches a range of keys with a prefix, similar to watch_once"""
kwargs['range_end'] = \
_increment_last_byte(key_prefix)
return self.watch_once(key_prefix, timeout=timeout, **kwargs)
[docs]
def client(host='localhost', port=2379,
ca_cert=None, cert_key=None, cert_cert=None,
timeout=None, protocol="http", api_path=DEFAULT_API_PATH):
"""Return an instance of an Etcd3Client."""
return Etcd3Client(host=host,
port=port,
ca_cert=ca_cert,
cert_key=cert_key,
cert_cert=cert_cert,
timeout=timeout,
api_path=api_path,
protocol=protocol)