# -*- coding: utf-8 -*-
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import contextlib
import errno
import io
import os
import shutil

import cachetools
import fasteners
from oslo_serialization import jsonutils
from oslo_utils import fileutils

from taskflow import exceptions as exc
from taskflow.persistence import path_based
from taskflow.utils import misc


@contextlib.contextmanager
def _storagefailure_wrapper():
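    """Translate filesystem errors into taskflow exception types.

    ``ENOENT`` failures are re-raised as ``exc.NotFound``; any other
    non-taskflow exception is re-raised as an ``exc.StorageFailure``.
    """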
try:
yield
except exc.TaskFlowException:
raise
except Exception as e:
if isinstance(e, (IOError, OSError)) and e.errno == errno.ENOENT:
exc.raise_with_cause(exc.NotFound,
'Item not found: %s' % e.filename,
cause=e)
else:
exc.raise_with_cause(exc.StorageFailure,
"Storage backend internal error", cause=e)


class DirBackend(path_based.PathBasedBackend):
"""A directory and file based backend.
This backend does *not* provide true transactional semantics. It does
guarantee that there will be no interprocess race conditions when
writing and reading by using a consistent hierarchy of file based locks.
Example configuration::
conf = {
"path": "/tmp/taskflow", # save data to this root directory
"max_cache_size": 1024, # keep up-to 1024 entries in memory
}
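
    A rough usage sketch (the path is illustrative and this assumes the
    standard connection ``upgrade``/``validate`` API)::

        backend = DirBackend({"path": "/tmp/taskflow"})
        with contextlib.closing(backend.get_connection()) as conn:
            conn.upgrade()   # create the book/flow/atom directories
            conn.validate()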
"""
    DEFAULT_FILE_ENCODING = 'utf-8'
    """
    Default encoding used when encoding text/unicode into binary (and when
    decoding binary back into text/unicode) for file contents.
    """

def __init__(self, conf):
super(DirBackend, self).__init__(conf)
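        # Use a bounded LRU cache when a maximum size was configured;
        # otherwise fall back to a plain (unbounded) dict.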
max_cache_size = self._conf.get('max_cache_size')
if max_cache_size is not None:
max_cache_size = int(max_cache_size)
if max_cache_size < 1:
raise ValueError("Maximum cache size must be greater than"
" or equal to one")
self.file_cache = cachetools.LRUCache(max_cache_size)
else:
self.file_cache = {}
self.encoding = self._conf.get('encoding', self.DEFAULT_FILE_ENCODING)
if not self._path:
raise ValueError("Empty path is disallowed")
self._path = os.path.abspath(self._path)
self.lock = fasteners.ReaderWriterLock()
def get_connection(self):
return Connection(self)
def close(self):
pass


class Connection(path_based.PathBasedConnection):
def _read_from(self, filename):
# This is very similar to the oslo-incubator fileutils module, but
# tweaked to not depend on a global cache, as well as tweaked to not
# pull-in the oslo logging module (which is a huge pile of code).
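        # A cached copy is reused until the file's modification time
        # changes, at which point its contents are re-read from disk.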
mtime = os.path.getmtime(filename)
cache_info = self.backend.file_cache.setdefault(filename, {})
if not cache_info or mtime > cache_info.get('mtime', 0):
with io.open(filename, 'r', encoding=self.backend.encoding) as fp:
cache_info['data'] = fp.read()
cache_info['mtime'] = mtime
return cache_info['data']
def _write_to(self, filename, contents):
contents = misc.binary_encode(contents,
encoding=self.backend.encoding)
with io.open(filename, 'wb') as fp:
fp.write(contents)
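        # Drop any cached copy so the next read reloads from disk.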
self.backend.file_cache.pop(filename, None)
@contextlib.contextmanager
def _path_lock(self, path):
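        """Hold an interprocess lock file stored inside the item's path."""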
lockfile = self._join_path(path, 'lock')
with fasteners.InterProcessLock(lockfile) as lock:
with _storagefailure_wrapper():
yield lock
def _join_path(self, *parts):
return os.path.join(*parts)
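
    # Each item lives in its own directory; its contents are stored as
    # JSON in a 'metadata' file, with reads and writes guarded by that
    # directory's interprocess lock.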
def _get_item(self, path):
with self._path_lock(path):
item_path = self._join_path(path, 'metadata')
return misc.decode_json(self._read_from(item_path))
def _set_item(self, path, value, transaction):
with self._path_lock(path):
item_path = self._join_path(path, 'metadata')
self._write_to(item_path, jsonutils.dumps(value))
def _del_tree(self, path, transaction):
with self._path_lock(path):
shutil.rmtree(path)
def _get_children(self, path):
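        # Children of the book path are real directories, while children
        # of other paths are symlinks (see _create_link), so select the
        # matching filter.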
if path == self.book_path:
filter_func = os.path.isdir
else:
filter_func = os.path.islink
with _storagefailure_wrapper():
return [child for child in os.listdir(path)
if filter_func(self._join_path(path, child))]
def _ensure_path(self, path):
with _storagefailure_wrapper():
fileutils.ensure_tree(path)
def _create_link(self, src_path, dest_path, transaction):
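        # Ignore EEXIST so that re-creating an existing link stays a
        # no-op (idempotent link creation).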
with _storagefailure_wrapper():
try:
os.symlink(src_path, dest_path)
except OSError as e:
if e.errno != errno.EEXIST:
raise
@contextlib.contextmanager
def _transaction(self):
"""This just wraps a global write-lock."""
lock = self.backend.lock.write_lock
with lock():
yield
def validate(self):
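        """Check that the directories this backend requires all exist."""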
with _storagefailure_wrapper():
for p in (self.flow_path, self.atom_path, self.book_path):
if not os.path.isdir(p):
                    raise RuntimeError("Missing required directory: %s"
                                       % (p,))