The zaqar.storage.base module

Implements the DriverBase abstract class for Zaqar storage drivers.

class Capabilities

Bases: enum.IntEnum

Enum of storage capabilities.

AOD = <Capabilities.AOD: 4>
CLAIMS = <Capabilities.CLAIMS: 2>
DURABILITY = <Capabilities.DURABILITY: 3>
FIFO = <Capabilities.FIFO: 1>
HIGH_THROUGHPUT = <Capabilities.HIGH_THROUGHPUT: 5>
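
The values listed above correspond to an IntEnum along the following lines. This is a minimal sketch reconstructed from the listing, not the module's source; the `supports` helper is hypothetical and only illustrates how a list of capabilities might be checked.

```python
import enum


class Capabilities(enum.IntEnum):
    """Mirror of the capability values listed above."""
    FIFO = 1
    CLAIMS = 2
    DURABILITY = 3
    AOD = 4
    HIGH_THROUGHPUT = 5


def supports(driver_capabilities, required):
    """Hypothetical helper: True if every required capability is offered."""
    return set(required) <= set(driver_capabilities)


offered = [Capabilities.FIFO, Capabilities.CLAIMS]
print(supports(offered, [Capabilities.FIFO]))        # True
print(supports(offered, [Capabilities.DURABILITY]))  # False
```
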
class CatalogueBase(driver)

Bases: zaqar.storage.base.ControllerBase

A controller for managing the catalogue.

The catalogue is responsible for maintaining a mapping from project.queue entries to their pool.

delete(project, queue)

Removes this entry from the catalogue.

Parameters:
  • project (six.text_type) – The namespace to search for this queue
  • queue (six.text_type) – The queue name to remove
drop_all()

Drops all catalogue entries from storage.

exists(project, queue)

Determines whether the given queue exists under project.

Parameters:
  • project (six.text_type) – Namespace to check.
  • queue (six.text_type) – Particular queue to check for
Returns:

True if the queue exists under this project

Return type:

bool

get(project, queue)

Returns the pool identifier for the given queue.

Parameters:
  • project (six.text_type) – Namespace to search for the given queue
  • queue (six.text_type) – The name of the queue to search for
Returns:

{'pool': ...}

Return type:

dict

Raises:

QueueNotMapped

insert(project, queue, pool)

Creates a new catalogue entry, or updates it if it already exists.

Parameters:
  • project (six.text_type) – Namespace to insert the given queue into
  • queue (six.text_type) – The name of the queue to insert
  • pool (six.text_type) – pool identifier to associate this queue with
list(project)

Get a list of queues from the catalogue.

Parameters:project (six.text_type) – The project to use when filtering through queue entries.
Returns:[{'project': ..., 'queue': ..., 'pool': ...},]
Return type:[dict]
update(project, queue, pools=None)

Updates the pool identifier for this queue.

Parameters:
  • project (six.text_type) – Namespace to search
  • queue (six.text_type) – The name of the queue
  • pools (six.text_type) – The name of the pool where this project/queue lives.
Raises:

QueueNotMapped
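
The catalogue contract described above can be illustrated with a dict-backed stand-in. This is a sketch under stated assumptions, not a Zaqar driver: `InMemoryCatalogue` is hypothetical, `QueueNotMapped` is redefined locally rather than imported, and `delete` silently ignoring a missing entry is an assumption the listing does not confirm.

```python
class QueueNotMapped(Exception):
    """Local stand-in for the exception named above."""


class InMemoryCatalogue:
    """Hypothetical dict-backed catalogue keyed by (project, queue)."""

    def __init__(self):
        self._entries = {}

    def insert(self, project, queue, pool):
        # Creates the entry, or overwrites it if it already exists.
        self._entries[(project, queue)] = pool

    def exists(self, project, queue):
        return (project, queue) in self._entries

    def get(self, project, queue):
        try:
            return {'pool': self._entries[(project, queue)]}
        except KeyError:
            raise QueueNotMapped(queue, project) from None

    def update(self, project, queue, pools=None):
        if not self.exists(project, queue):
            raise QueueNotMapped(queue, project)
        if pools is not None:
            self._entries[(project, queue)] = pools

    def delete(self, project, queue):
        # Assumption: removing an unknown entry is not an error.
        self._entries.pop((project, queue), None)

    def list(self, project):
        return [{'project': p, 'queue': q, 'pool': pool}
                for (p, q), pool in self._entries.items() if p == project]

    def drop_all(self):
        self._entries.clear()


catalogue = InMemoryCatalogue()
catalogue.insert('proj', 'orders', 'pool-a')
print(catalogue.get('proj', 'orders'))  # {'pool': 'pool-a'}
```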

class Claim(driver)

Bases: zaqar.storage.base.ControllerBase

create(queue, metadata, project=None, limit=10)

Base method for creating a claim.

Parameters:
  • queue – Name of the queue this claim belongs to.
  • metadata – Claim’s parameters to be stored.
  • project – Project id
  • limit – (Default 10) Max number of messages to claim.
Returns:

(Claim ID, claimed messages)

delete(queue, claim_id, project=None)

Base method for deleting a claim.

Parameters:
  • queue – Name of the queue this claim belongs to.
  • claim_id – Claim to be deleted
  • project – Project id
get(queue, claim_id, project=None)

Base method for getting a claim.

Parameters:
  • queue – Name of the queue this claim belongs to.
  • claim_id – The claim id
  • project – Project id
Returns:

(Claim’s metadata, claimed messages)

Raises:

DoesNotExist

update(queue, claim_id, metadata, project=None)

Base method for updating a claim.

Parameters:
  • queue – Name of the queue this claim belongs to.
  • claim_id – Claim to be updated
  • metadata – Claim’s parameters to be updated.
  • project – Project id
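
The claim methods above can be pictured with a small in-memory stand-in. This is only a sketch: `InMemoryClaims` is hypothetical, `DoesNotExist` is defined locally, the `ttl` and `grace` metadata keys are illustrative, and returning messages to the unclaimed pool on `delete` is an assumption about driver behavior rather than something the listing states.

```python
import uuid


class DoesNotExist(Exception):
    """Local stand-in for the exception named above."""


class InMemoryClaims:
    """Hypothetical claim controller over a single queue's messages."""

    def __init__(self, messages):
        self._unclaimed = list(messages)
        self._claims = {}   # claim_id -> (metadata, claimed messages)

    def create(self, queue, metadata, project=None, limit=10):
        claimed = self._unclaimed[:limit]
        del self._unclaimed[:limit]
        claim_id = uuid.uuid4().hex
        self._claims[claim_id] = (dict(metadata), claimed)
        return claim_id, claimed

    def get(self, queue, claim_id, project=None):
        try:
            return self._claims[claim_id]
        except KeyError:
            raise DoesNotExist(claim_id) from None

    def update(self, queue, claim_id, metadata, project=None):
        stored_metadata, _ = self.get(queue, claim_id, project)
        stored_metadata.update(metadata)

    def delete(self, queue, claim_id, project=None):
        _, claimed = self._claims.pop(claim_id, ({}, []))
        # Assumption: released messages become claimable again.
        self._unclaimed[:0] = claimed


claims = InMemoryClaims(['m1', 'm2', 'm3'])
claim_id, claimed = claims.create('demo', {'ttl': 60, 'grace': 30}, limit=2)
print(claimed)  # ['m1', 'm2']
```
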
class ControlDriverBase(conf, cache)

Bases: zaqar.storage.base.DriverBase

Interface definition for control plane storage drivers.

Storage drivers that work at the control plane layer allow one to modify aspects of the functionality of the system. This is ideal for administrative purposes.

Allows access to the pool registry through a catalogue and a pool controller.

Parameters:
  • conf (oslo_config.ConfigOpts) – Configuration containing options for this driver.
  • cache (dogpile.cache.region.CacheRegion) – Cache instance to use for reducing latency for certain lookups.
catalogue_controller

Returns the driver’s catalogue controller.

close()

Close connections to the backend.

flavors_controller

Returns storage’s flavor management controller.

pools_controller

Returns storage’s pool management controller.

queue_controller

Returns the driver’s queue controller.

class ControllerBase(driver)

Bases: object

Top-level class for controllers.

Parameters:driver – Instance of the driver instantiating this controller.
class DataDriverBase(conf, cache, control_driver)

Bases: zaqar.storage.base.DriverBase

Interface definition for storage drivers.

Data plane storage drivers are responsible for implementing the core functionality of the system.

Connection information and driver-specific options are loaded from the config file or the pool catalog.

Parameters:
  • conf (oslo_config.ConfigOpts) – Configuration containing options for this driver.
  • cache (dogpile.cache.region.CacheRegion) – Cache instance to use for reducing latency for certain lookups.
BASE_CAPABILITIES = []
capabilities

Returns storage’s capabilities.

claim_controller

Returns the driver’s claim controller.

close()

Close connections to the backend.

gc()

Perform manual garbage collection of claims and messages.

This method can be overridden in order to provide a trigger that can be called by so-called “garbage collection” scripts that are required by some drivers.

By default, this method does nothing.

health()

Return the health status of the service.

is_alive()

Check whether the storage is ready.

message_controller

Returns the driver’s message controller.

queue_controller

Returns the driver’s queue controller.

subscription_controller

Returns the driver’s subscription controller.

class DriverBase(conf, cache)

Bases: object

Base class for both data and control plane drivers.

Parameters:
  • conf (oslo_config.ConfigOpts) – Configuration containing options for this driver.
  • cache (dogpile.cache.region.CacheRegion) – Cache instance to use for reducing latency for certain lookups.
class FlavorsBase(driver)

Bases: zaqar.storage.base.ControllerBase

A controller for managing flavors.

create(name, pool, project=None, capabilities=None)

Registers a flavor entry.

Parameters:
  • name (six.text_type) – The name of this flavor
  • project (six.text_type) – Project this flavor belongs to.
  • pool (six.text_type) – The name of the pool to use for this flavor.
  • capabilities (dict) – Flavor capabilities
delete(name, project=None)

Removes a flavor entry.

Parameters:
  • name (six.text_type) – The name of this flavor
  • project (six.text_type) – Project this flavor belongs to.
Return type:

None

drop_all()

Deletes all flavors from storage.

exists(name, project=None)

Verifies whether the flavor exists.

Parameters:
  • name (six.text_type) – The name of this flavor
  • project (six.text_type) – Project this flavor belongs to.
Returns:

True if the flavor exists

Return type:

bool

get(name, project=None)

Returns a single flavor entry.

Parameters:
  • name (six.text_type) – The name of this flavor
  • project (six.text_type) – Project this flavor belongs to.
Return type:

{}

Raises:

FlavorDoesNotExist if not found

list(project=None, marker=None, limit=10)

Lists all registered flavors.

Parameters:
  • project (six.text_type) – Project this flavor belongs to.
  • marker (six.text_type) – used to determine which flavor to start with
  • limit (int) – (Default 10) Max number of results to return
Returns:

A list of flavors - name, project, flavor

Return type:

[{}]

update(name, project=None, **kwargs)

Updates the pool and/or capabilities of this flavor.

Parameters:
  • name (text) – Name of the flavor
  • project (six.text_type) – Project this flavor belongs to.
  • kwargs (dict) – one of: pool, capabilities
Raises:

FlavorDoesNotExist

class Message(driver)

Bases: zaqar.storage.base.ControllerBase

This class is responsible for managing message CRUD.

bulk_delete(queue, message_ids, project=None)

Base method for deleting multiple messages.

Parameters:
  • queue – Name of the queue to delete the messages from.
  • message_ids – A sequence of message IDs to be deleted.
  • project – Project id
bulk_get(queue, message_ids, project=None)

Base method for getting multiple messages.

Parameters:
  • queue – Name of the queue to get the message from.
  • project – Project id
  • message_ids – A sequence of message IDs.
Returns:

An iterable, yielding dicts containing message details

delete(queue, message_id, project=None, claim=None)

Base method for deleting a single message.

Parameters:
  • queue – Name of the queue to delete the message from.
  • message_id – Message to be deleted
  • project – Project id
  • claim – Claim this message belongs to. When specified, claim must be valid and message_id must belong to it.
first(queue, project=None, sort=1)

Get first message in the queue (including claimed).

Parameters:
  • queue – Name of the queue to list
  • sort – (Default 1) Sort order for the listing. Pass 1 for ascending (oldest message first), or -1 for descending (newest message first).
Returns:

First message in the queue, or None if the queue is empty

get(queue, message_id, project=None)

Base method for getting a message.

Parameters:
  • queue – Name of the queue to get the message from.
  • project – Project id
  • message_id – Message ID
Returns:

Dictionary containing message data

Raises:

DoesNotExist

list(queue, project=None, marker=None, limit=10, echo=False, client_uuid=None, include_claimed=False)

Base method for listing messages.

Parameters:
  • queue – Name of the queue to get the message from.
  • project – Project id
  • marker – Tail identifier
  • limit (Maybe int) – (Default 10) Max number of messages to return.
  • echo – (Default False) Boolean expressing whether or not this client should receive its own messages.
  • client_uuid – A UUID object. Required when echo=False.
  • include_claimed (bool) – (Default False) Whether to include claimed messages in the listing.
Returns:

An iterator giving a sequence of messages and the marker of the next page.

pop(queue, limit, project=None)

Base method for popping messages.

Parameters:
  • queue – Name of the queue to pop message from.
  • limit – Number of messages to pop.
  • project – Project id
post(queue, messages, client_uuid, project=None)

Base method for posting one or more messages.

Implementations of this method must preserve the order of the incoming messages: the returned list of IDs follows the same order as the messages that were posted.

Parameters:
  • queue – Name of the queue to post message to.
  • messages – Messages to post to queue, an iterable yielding 1 or more elements. An empty iterable results in undefined behavior.
  • client_uuid – A UUID object.
  • project – Project id
Returns:

List of message ids
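
The ordering guarantee of post() and the two-part result of list() can be sketched with an in-memory store. Everything here is an assumption for illustration: `InMemoryMessages` is hypothetical, it ignores the queue and project arguments and claims entirely, and it uses increasing integers as message IDs so that the marker is simply the last ID returned.

```python
import itertools
import uuid


class InMemoryMessages:
    """Hypothetical list-backed message store for a single queue."""

    def __init__(self):
        self._messages = []              # kept in arrival order
        self._ids = itertools.count(1)   # monotonically increasing IDs

    def post(self, queue, messages, client_uuid, project=None):
        ids = []
        for body in messages:
            message_id = str(next(self._ids))
            self._messages.append(
                {'id': message_id, 'body': body, 'client_uuid': client_uuid})
            ids.append(message_id)       # same order as the input
        return ids

    def list(self, queue, project=None, marker=None, limit=10,
             echo=False, client_uuid=None, include_claimed=False):
        start = int(marker) if marker else 0
        page = [m for m in self._messages
                if int(m['id']) > start
                and (echo or m['client_uuid'] != client_uuid)][:limit]
        # First item: the messages. Second item: marker for the next page.
        yield iter(page)
        yield page[-1]['id'] if page else marker


store = InMemoryMessages()
producer, consumer = uuid.uuid4(), uuid.uuid4()
ids = store.post('demo', ['a', 'b', 'c'], client_uuid=producer)

pages = store.list('demo', limit=2, client_uuid=consumer)
first_page = [m['body'] for m in next(pages)]
marker = next(pages)
print(ids, first_page, marker)  # ['1', '2', '3'] ['a', 'b'] 2
```

Passing the returned marker back into list() resumes the listing after the last message already seen.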

class PoolsBase(driver)

Bases: zaqar.storage.base.ControllerBase

A controller for managing pools.

capabilities(group=None, name=None)

Gets the set of capabilities for this group/name

Parameters:
  • group (six.text_type) – The pool group to get capabilities for
  • name (six.text_type) – The pool name to get capabilities for
create(name, weight, uri, group=None, options=None)

Registers a pool entry.

Parameters:
  • name (six.text_type) – The name of this pool
  • weight (int) – the likelihood that this pool will be used
  • uri (six.text_type) – A URI that can be used by a storage client (e.g., pymongo) to access this pool.
  • group (six.text_type) – The group of this pool
  • options (dict) – Options used to configure this pool
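
Because weight is described as the likelihood that a pool will be used, one way to picture it is a weighted random choice. The sketch below is an interpretation, not Zaqar's selection logic: `pick_pool` is a hypothetical helper, and the pool names and URIs are made up.

```python
import random


def pick_pool(pools, rng=random):
    """Hypothetical helper: choose one pool name, weighted by 'weight'."""
    names = [p['name'] for p in pools]
    weights = [p['weight'] for p in pools]
    return rng.choices(names, weights=weights, k=1)[0]


pools = [
    {'name': 'fast', 'weight': 90, 'uri': 'mongodb://fast.example:27017'},
    {'name': 'slow', 'weight': 10, 'uri': 'mongodb://slow.example:27017'},
]

rng = random.Random(0)   # seeded only to make the demo repeatable
counts = {'fast': 0, 'slow': 0}
for _ in range(1000):
    counts[pick_pool(pools, rng)] += 1
print(counts)
```

Over many draws, the pool with the larger weight is picked proportionally more often.
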
delete(name)

Removes a pool entry.

Parameters:name (six.text_type) – The name of this pool
Return type:None
drop_all()

Deletes all pools from storage.

exists(name)

Verifies whether the pool exists.

Parameters:name (six.text_type) – The name of this pool
Returns:True if the pool exists
Return type:bool
get(name, detailed=False)

Returns a single pool entry.

Parameters:
  • name (six.text_type) – The name of this pool
  • detailed (bool) – Should the options data be included?
Returns:

weight, uri, and options for this pool

Return type:

{}

Raises:

PoolDoesNotExist if not found

get_pools_by_group(group=None, detailed=False)

Returns a pool list filtered by given pool group.

Parameters:
  • group (six.text_type) – The group to filter on. None returns pools that are not assigned to any pool group.
  • detailed (bool) – Should the options data be included?
Returns:

weight, uri, and options for this pool

Return type:

{}

Raises:

PoolDoesNotExist if not found

list(marker=None, limit=10, detailed=False)

Lists all registered pools.

Parameters:
  • marker (six.text_type) – used to determine which pool to start with
  • limit (int) – (Default 10) Max number of results to return
  • detailed (bool) – whether to include options
Returns:

A list of pools - name, weight, uri

Return type:

[{}]

update(name, **kwargs)

Updates the weight, uri, and/or options of this pool.

Parameters:
  • name (text) – Name of the pool
  • kwargs (dict) – one of: uri, weight, options
Raises:

PoolDoesNotExist

class Queue(driver)

Bases: zaqar.storage.base.ControllerBase

This class is responsible for managing queues.

Queue operations include CRUD, monitoring, etc.

Storage driver implementations of this class should be capable of handling high workloads and huge numbers of queues.

create(name, metadata=None, project=None)

Base method for queue creation.

Parameters:
  • name – The queue name
  • project – Project id
Returns:

True if a queue was created and False if it was updated.

delete(name, project=None)

Base method for deleting a queue.

Parameters:
  • name – The queue name
  • project – Project id
exists(name, project=None)

Base method for testing queue existence.

Parameters:
  • name – The queue name
  • project – Project id
Returns:

True if a queue exists and False if it does not.

get(name, project=None)

Base method for queue metadata retrieval.

Parameters:
  • name – The queue name
  • project – Project id
Returns:

Dictionary containing queue metadata

Raises:

DoesNotExist

get_metadata(name, project=None)

Base method for queue metadata retrieval.

Parameters:
  • name – The queue name
  • project – Project id
Returns:

Dictionary containing queue metadata

Raises:

DoesNotExist

list(project=None, marker=None, limit=10, detailed=False)

Base method for listing queues.

Parameters:
  • project – Project id
  • marker – The last queue name
  • limit – (Default 10) Max number of queues to return
  • detailed – Whether metadata is included
Returns:

An iterator giving a sequence of queues and the marker of the next page.

set_metadata(name, metadata, project=None)

Base method for updating a queue’s metadata.

Parameters:
  • name – The queue name
  • metadata – Queue metadata as a dict
  • project – Project id
Raises:

DoesNotExist

stats(name, project=None)

Base method for queue stats.

Parameters:
  • name – The queue name
  • project – Project id
Returns:

Dictionary with the queue stats
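
The return values described for the queue methods above can be illustrated with a dict-backed stand-in. This is a sketch, not a driver: `InMemoryQueues` is hypothetical, `DoesNotExist` is defined locally, and treating the marker as the last queue name in sorted order is an assumption based on the parameter description.

```python
class DoesNotExist(Exception):
    """Local stand-in for the exception named above."""


class InMemoryQueues:
    """Hypothetical dict-backed queue controller."""

    def __init__(self):
        self._queues = {}   # (project, name) -> metadata dict

    def create(self, name, metadata=None, project=None):
        created = (project, name) not in self._queues
        self._queues[(project, name)] = dict(metadata or {})
        return created      # True if created, False if updated

    def exists(self, name, project=None):
        return (project, name) in self._queues

    def get_metadata(self, name, project=None):
        try:
            return self._queues[(project, name)]
        except KeyError:
            raise DoesNotExist(name, project) from None

    def set_metadata(self, name, metadata, project=None):
        if not self.exists(name, project):
            raise DoesNotExist(name, project)
        self._queues[(project, name)] = dict(metadata)

    def list(self, project=None, marker=None, limit=10, detailed=False):
        names = sorted(n for (p, n) in self._queues
                       if p == project and (marker is None or n > marker))
        page = names[:limit]

        def entry(name):
            item = {'name': name}
            if detailed:
                item['metadata'] = self._queues[(project, name)]
            return item

        # First item: the queues. Second item: marker for the next page.
        yield (entry(n) for n in page)
        yield page[-1] if page else marker


queues = InMemoryQueues()
print(queues.create('alpha'))  # True
print(queues.create('alpha'))  # False
```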

class Subscription(driver)

Bases: zaqar.storage.base.ControllerBase

This class is responsible for managing notification subscriptions.

confirm(queue, subscription_id, project=None, confirmed=True)

Base method for confirming a subscription.

Parameters:
  • queue (six.text_type) – Name of the queue subscription belongs to.
  • subscription_id (six.text_type) – ID of the subscription to be confirmed.
  • project (six.text_type) – Project id
  • confirmed (boolean) – True to confirm the subscription, False to cancel its confirmation.

create(queue, subscriber, ttl, options, project=None)

Create a new subscription.

Parameters:
  • queue (six.text_type) – The source queue for notifications
  • subscriber (six.text_type) – The subscriber URI
  • ttl (int) – Time to live for this subscription
  • options (dict) – Options used to configure this subscription
  • project (six.text_type) – Project id
Returns:

True if a subscription was created and False if creation failed.

Return type:

boolean

delete(queue, subscription_id, project=None)

Base method for deleting a subscription.

Parameters:
  • queue (six.text_type) – Name of the queue subscription belongs to.
  • subscription_id (six.text_type) – ID of the subscription to be deleted.
  • project (six.text_type) – Project id
exists(queue, subscription_id, project=None)

Base method for testing subscription existence.

Parameters:
  • queue (six.text_type) – Name of the queue subscription belongs to.
  • subscription_id (six.text_type) – ID of subscription
  • project (six.text_type) – Project id
Returns:

True if a subscription exists and False if it does not.

get(queue, subscription_id, project=None)

Returns a single subscription entry.

Parameters:
  • queue (six.text_type) – Name of the queue subscription belongs to.
  • subscription_id (six.text_type) – ID of this subscription
  • project (six.text_type) – Project this subscription belongs to.
Returns:

Dictionary containing subscription data

Return type:

{}

Raises:

SubscriptionDoesNotExist if not found

get_with_subscriber(queue, subscriber, project=None)

Base method for getting a subscription by its subscriber.

Parameters:
  • queue (six.text_type) – Name of the queue subscription belongs to.
  • subscriber (six.text_type) – URI of the subscriber to be notified.
  • project (six.text_type) – Project id
Returns:

Dictionary containing subscription data

Return type:

dict

list(queue, project=None, marker=None, limit=10)

Base method for listing subscriptions.

Parameters:
  • queue (six.text_type) – Name of the queue to get the subscriptions from.
  • project (six.text_type) – Project this subscription belongs to.
  • marker (six.text_type) – used to determine which subscription to start with
  • limit (int) – (Default 10) Max number of results to return
Returns:

An iterator giving a sequence of subscriptions and the marker of the next page.

Return type:

[{}]

update(queue, subscription_id, project=None, **kwargs)

Updates the source, subscriber, ttl, and/or options of this subscription.

Parameters:
  • queue (six.text_type) – Name of the queue subscription belongs to.
  • subscription_id (six.text_type) – ID of the subscription
  • kwargs (dict) – one of: source, subscriber, ttl, options
Raises:
  • SubscriptionDoesNotExist – if the subscription is not found
  • SubscriptionAlreadyExists – if the update would create a duplicate subscription
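
The two error conditions of update() can be sketched with an in-memory stand-in. This is an illustration under assumptions: `InMemorySubscriptions` is hypothetical, both exceptions are defined locally, and treating two subscriptions with the same queue, subscriber, and project as duplicates is an assumption the listing does not spell out.

```python
import uuid


class SubscriptionDoesNotExist(Exception):
    """Local stand-in for the exception named above."""


class SubscriptionAlreadyExists(Exception):
    """Local stand-in for the exception named above."""


class InMemorySubscriptions:
    """Hypothetical dict-backed subscription controller."""

    def __init__(self):
        self._subs = {}   # subscription_id -> subscription dict

    def _find(self, queue, subscriber, project):
        for sub in self._subs.values():
            if (sub['source'], sub['subscriber'], sub['project']) == (
                    queue, subscriber, project):
                return sub
        return None

    def create(self, queue, subscriber, ttl, options, project=None):
        if self._find(queue, subscriber, project) is not None:
            return False          # duplicate: creation fails
        sub_id = uuid.uuid4().hex
        self._subs[sub_id] = {'id': sub_id, 'source': queue,
                              'subscriber': subscriber, 'ttl': ttl,
                              'options': dict(options), 'project': project}
        return True

    def get_with_subscriber(self, queue, subscriber, project=None):
        sub = self._find(queue, subscriber, project)
        if sub is None:
            raise SubscriptionDoesNotExist(subscriber)
        return sub

    def update(self, queue, subscription_id, project=None, **kwargs):
        sub = self._subs.get(subscription_id)
        if sub is None or (sub['source'], sub['project']) != (queue, project):
            raise SubscriptionDoesNotExist(subscription_id)
        allowed = ('source', 'subscriber', 'ttl', 'options')
        changes = {k: v for k, v in kwargs.items() if k in allowed}
        candidate = dict(sub, **changes)
        clash = self._find(candidate['source'], candidate['subscriber'],
                           project)
        if clash is not None and clash['id'] != subscription_id:
            raise SubscriptionAlreadyExists(candidate['subscriber'])
        sub.update(changes)


subs = InMemorySubscriptions()
print(subs.create('demo', 'http://a.example/hook', 3600, {}))  # True
print(subs.create('demo', 'http://a.example/hook', 3600, {}))  # False
```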