Source code for stestr.repository.sql

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Persistent storage of test results."""
from __future__ import print_function

import datetime
import io
import os.path
import re
import subprocess
import sys

import sqlalchemy
from sqlalchemy import orm
import subunit.v2
from subunit2sql.db import api as db_api
from subunit2sql import read_subunit
from subunit2sql import shell
from subunit2sql import write_subunit
import testtools

from stestr.repository import abstract as repository
from stestr import utils


def atomicish_rename(source, target):
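    # os.rename() cannot atomically replace an existing target on non-POSIX
    # platforms (e.g. Windows), so remove the target first there; on POSIX the
    # rename below atomically replaces it.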
    if os.name != "posix" and os.path.exists(target):
        os.remove(target)
    os.rename(source, target)


class RepositoryFactory(repository.AbstractRepositoryFactory):
    def initialise(klass, url):
        """Create a repository at url/path."""
        print("WARNING: The SQL repository type is still experimental. You "
              "might encounter issues while using it.", file=sys.stderr)
        result = Repository(url)
        # TODO(mtreinish): Figure out the python api to run the migrations for
        # setting up the schema.
        proc = subprocess.Popen(['subunit2sql-db-manage',
                                 '--database-connection', url,
                                 'upgrade', 'head'],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        out, err = proc.communicate()
        sys.stdout.write(str(out))
        sys.stderr.write(str(err))
        return result

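    # NOTE: illustrative only (editorial addition, not upstream code). The
    # subprocess call in initialise() above is roughly equivalent to running
    # the following from a shell, with a hypothetical SQLite URL:
    #
    #     subunit2sql-db-manage \
    #         --database-connection sqlite:////tmp/stestr_example.sqlite \
    #         upgrade head
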
    def open(self, url):
        repo = Repository(url)
        # To test the repository's existence call get_ids_for_all_tests()
        # if it raises an OperationalError that means the DB doesn't exist or
        # it couldn't connect, either way the repository was not found.
        try:
            session = repo.session_factory()
            db_api.get_ids_for_all_tests(session=session)
            session.close()
        except sqlalchemy.exc.OperationalError:
            raise repository.RepositoryNotFound(url)
        return repo


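# NOTE: illustrative sketch (editorial addition, not upstream code). A SQL
# repository is created or opened through the factory above; the URL is a
# standard SQLAlchemy connection string. With a hypothetical SQLite database
# the flow looks roughly like:
#
#     factory = RepositoryFactory()
#     repo = factory.initialise('sqlite:////tmp/stestr_example.sqlite')
#     # later, to reopen the same database:
#     repo = factory.open('sqlite:////tmp/stestr_example.sqlite')
#
# initialise() shells out to subunit2sql-db-manage, so that command must be
# available on PATH for schema creation to succeed.

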
class Repository(repository.AbstractRepository):
    """subunit2sql based storage of test results.

    This repository stores each stream in a subunit2sql DB. Refer to the
    subunit2sql documentation for more details.
    """

    def __init__(self, url):
        """Create a subunit2sql-based repository object for the repo at 'url'.

        :param url: The database connection URL for the repository.
        """
        self.base = url
        self.engine = sqlalchemy.create_engine(url)
        self.session_factory = orm.sessionmaker(bind=self.engine)

    # TODO(mtreinish): We need to add a subunit2sql api to get the count
    def count(self):
        super(Repository, self).count()

    def _get_latest_run(self):
        session = self.session_factory()
        latest_run = db_api.get_latest_run(session)
        session.close()
        if not latest_run:
            raise KeyError("No tests in repository")
        return latest_run

    def latest_id(self):
        return self._get_latest_run().uuid

    def get_failing(self):
        latest_run = self._get_latest_run()
        session = self.session_factory()
        failed_test_runs = db_api.get_test_runs_by_status_for_run_ids(
            'fail', [latest_run.id], session=session)
        session.close()
        return _Subunit2SqlRun(self.base, None, test_runs=failed_test_runs)

    def get_test_run(self, run_id):
        return _Subunit2SqlRun(self.base, run_id)

    def _get_inserter(self, partial, run_id=None, metadata=None):
        return _SqlInserter(self, partial, run_id, metadata)

    def _get_test_times(self, test_ids):
        result = {}
        # TODO(mtreinish): after subunit2sql adds a bulk query for getting
        # multiple tests by test_id at once remove the for loop
        session = self.session_factory()
        for test_id in test_ids:
            stripped_test_id = utils.cleanup_test_name(test_id)
            test = db_api.get_test_by_test_id(stripped_test_id,
                                              session=session)
            if test:
                # NOTE(mtreinish): We need to make sure the test_id with attrs
                # is used in the output dict, otherwise the scheduler won't
                # see it
                result[test_id] = test.run_time
        session.close()
        return result

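    # NOTE: illustrative only (editorial addition, not upstream code).
    # _get_test_times() above keys its result by the full test id, including
    # any attribute suffix, so the scheduler can match it; the hypothetical
    # test names below show the rough shape of the returned mapping:
    #
    #     {'test_foo.TestFoo.test_bar[smoke]': 1.234,
    #      'test_foo.TestFoo.test_baz': 0.057}
    #
    # where the values are the stored run times from the subunit2sql DB.
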
    def find_metadata(self, metadata):
        session = self.session_factory()
        runs = db_api.get_runs_by_key_value('stestr_run_meta', metadata,
                                            session=session)
        return [x.uuid for x in runs]


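# NOTE: illustrative sketch (editorial addition, not upstream code). Runs are
# read back through the Repository methods above, which wrap their results in
# the _Subunit2SqlRun class defined below. Roughly:
#
#     run = repo.get_test_run(run_uuid)         # a specific run by UUID
#     failing = repo.get_failing()              # failures from the latest run
#     stream = run.get_subunit_stream()         # subunit v2 byte stream
#     case = run.get_test()                     # parsed stream result case
#     uuids = repo.find_metadata('some-label')  # runs stored with that
#                                               # 'stestr_run_meta' value

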
class _Subunit2SqlRun(repository.AbstractTestRun):
    """A test run that was inserted into the repository."""

    def __init__(self, url, run_id, test_runs=None):
        engine = sqlalchemy.create_engine(url)
        self.session_factory = orm.sessionmaker(bind=engine)
        self._run_id = run_id
        self._test_runs = test_runs

    def get_id(self):
        return self._run_id

    def get_subunit_stream(self):
        stream = io.BytesIO()
        if self._run_id:
            session = self.session_factory()
            test_runs = db_api.get_tests_run_dicts_from_run_id(self._run_id,
                                                               session)
            session.close()
        else:
            test_runs = self._test_runs
        output = subunit.v2.StreamResultToBytes(stream)
        output.startTestRun()
        for test_id in test_runs:
            test = test_runs[test_id]
            # NOTE(mtreinish): test_run_metadata is not guaranteed to be
            # present for the test run
            metadata = test.get('metadata', None)
            write_subunit.write_test(output, test['start_time'],
                                     test['stop_time'], test['status'],
                                     test_id, metadata)
        output.stopTestRun()
        stream.seek(0)
        return stream

    def get_test(self):
        stream = self.get_subunit_stream()
        case = subunit.ByteStreamToStreamResult(stream)
        return case

    def get_metadata(self):
        if self._run_id:
            session = self.session_factory()
            metadata = db_api.get_run_metadata(self._run_id, session=session)
            for meta in metadata:
                if meta.key == 'stestr_run_meta':
                    return meta.value
        return None


class _SqlInserter(repository.AbstractTestRun):
    """Insert test results into a sql repository."""

    def __init__(self, repository, partial=False, run_id=None, metadata=None):
        self._repository = repository
        self.partial = partial
        self._subunit = None
        self._run_id = run_id
        self._metadata = metadata
        # Create a new session factory
        self.engine = sqlalchemy.create_engine(self._repository.base)
        self.session_factory = orm.sessionmaker(bind=self.engine,
                                                autocommit=True)

    def startTestRun(self):
        self._subunit = io.BytesIO()
        self.subunit_stream = subunit.v2.StreamResultToBytes(self._subunit)
        self.hook = testtools.CopyStreamResult([
            testtools.StreamToDict(self._handle_test),
            self.subunit_stream])
        self.hook.startTestRun()
        self.start_time = datetime.datetime.utcnow()
        session = self.session_factory()
        if not self._run_id:
            self.run = db_api.create_run(session=session)
            if self._metadata:
                db_api.add_run_metadata({'stestr_run_meta': self._metadata},
                                        self.run.id, session=session)
            self._run_id = self.run.uuid
        else:
            int_id = db_api.get_run_id_from_uuid(self._run_id,
                                                 session=session)
            self.run = db_api.get_run_by_id(int_id, session=session)
        session.close()
        self.totals = {}

    def _update_test(self, test_dict, session, start_time, stop_time):
        test_id = utils.cleanup_test_name(test_dict['id'])
        db_test = db_api.get_test_by_test_id(test_id, session)
        if not db_test:
            if test_dict['status'] == 'success':
                success = 1
                fails = 0
            elif test_dict['status'] == 'fail':
                fails = 1
                success = 0
            else:
                fails = 0
                success = 0
            run_time = read_subunit.get_duration(start_time, stop_time)
            db_test = db_api.create_test(test_id, (success + fails), success,
                                         fails, run_time, session)
        else:
            test_dict['start_time'] = start_time
            test_dict['end_time'] = stop_time
            test_values = shell.increment_counts(db_test, test_dict)
            # If skipped nothing to update
            if test_values:
                db_api.update_test(test_values, db_test.id, session)
        return db_test

    def _get_attrs(self, test_id):
        attr_regex = re.compile(r'\[(.*)\]')
        matches = attr_regex.search(test_id)
        attrs = None
        if matches:
            attrs = matches.group(1)
        return attrs

    def _handle_test(self, test_dict):
        start, end = test_dict.pop('timestamps')
        if test_dict['status'] == 'exists' or None in (start, end):
            return
        elif test_dict['id'] == 'process-returncode':
            return
        session = self.session_factory()
        try:
            # Update the run counts
            if test_dict['status'] not in self.totals:
                self.totals[test_dict['status']] = 1
            else:
                self.totals[test_dict['status']] += 1
            values = {}
            if test_dict['status'] in ('success', 'xfail'):
                values['passes'] = self.totals['success']
            elif test_dict['status'] in ('fail', 'uxsuccess'):
                values['fails'] = self.totals['fail']
            elif test_dict['status'] == 'skip':
                values['skips'] = self.totals['skip']
            db_api.update_run(values, self.run.id, session=session)
            # Update the test totals
            db_test = self._update_test(test_dict, session, start, end)
            # Add the test run
            test_run = db_api.create_test_run(db_test.id, self.run.id,
                                              test_dict['status'],
                                              start, end, session)
            metadata = {}
            attrs = self._get_attrs(test_dict['id'])
            if attrs:
                metadata['attrs'] = attrs
            if test_dict.get('tags', None):
                metadata['tags'] = ",".join(test_dict['tags'])
            if metadata:
                db_api.add_test_run_metadata(
                    metadata, test_run.id, session)
            # TODO(mtreinish): Add attachments support to the DB.
            session.close()
        except Exception:
            session.rollback()
            raise

    def stopTestRun(self):
        self.hook.stopTestRun()
        stop_time = datetime.datetime.utcnow()
        self._subunit.seek(0)
        values = {}
        values['run_time'] = read_subunit.get_duration(self.start_time,
                                                       stop_time)
        session = self.session_factory()
        db_api.update_run(values, self.run.id, session=session)
        session.close()

    def status(self, *args, **kwargs):
        self.hook.status(*args, **kwargs)

    def get_id(self):
        return self._run_id


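# NOTE: illustrative sketch (editorial addition, not upstream code). The
# _SqlInserter above implements the testtools StreamResult API, so results are
# pushed into it between startTestRun() and stopTestRun(). It is normally
# obtained through the repository's public get_inserter() wrapper rather than
# instantiated directly. Loading an existing subunit v2 file (hypothetical
# name) would look roughly like:
#
#     inserter = repo.get_inserter()
#     with open('results.subunit', 'rb') as source:
#         case = subunit.ByteStreamToStreamResult(source)
#         inserter.startTestRun()
#         case.run(inserter)        # events are handled by _handle_test()
#         inserter.stopTestRun()
#     run_uuid = inserter.get_id()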