Source code for stestr.repository.sql

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Persistent storage of test results."""


import datetime
import io
import os.path
import re
import subprocess
import sys

import six
import sqlalchemy
from sqlalchemy import orm
import subunit.v2
from subunit2sql.db import api as db_api
from subunit2sql import read_subunit
from subunit2sql import shell
from subunit2sql import write_subunit
import testtools

from stestr.repository import abstract as repository
from stestr import utils


def atomicish_rename(source, target):
    if os.name != "posix" and os.path.exists(target):
        os.remove(target)
    os.rename(source, target)


[docs]class RepositoryFactory(repository.AbstractRepositoryFactory):
[docs] def initialise(klass, url): """Create a repository at url/path.""" print("WARNING: The SQL repository type is still experimental. You " "might encounter issues while using it.") result = Repository(url) # TODO(mtreinish): Figure out the python api to run the migrations for # setting up the schema. proc = subprocess.Popen(['subunit2sql-db-manage', '--database-connection', url, 'upgrade', 'head'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = proc.communicate() sys.stdout.write(six.text_type(out)) sys.stderr.write(six.text_type(err)) return result
[docs] def open(self, url): repo = Repository(url) # To test the repository's existence call get_ids_for_all_tests() # if it raises an OperationalError that means the DB doesn't exist or # it couldn't connect, either way the repository was not found. try: session = repo.session_factory() db_api.get_ids_for_all_tests(session=session) session.close() except sqlalchemy.exc.OperationalError: raise repository.RepositoryNotFound(url) return repo
[docs]class Repository(repository.AbstractRepository): """subunit2sql based storage of test results. This repository stores each stream in a subunit2sql DB. Refer to the subunit2sql documentation for """ def __init__(self, url): """Create a subunit2sql-based repository object for the repo at 'url'. :param base: The path to the repository. """ self.base = url self.engine = sqlalchemy.create_engine(url) self.session_factory = orm.sessionmaker(bind=self.engine) # TODO(mtreinish): We need to add a subunit2sql api to get the count
[docs] def count(self): super(Repository, self).count()
def _get_latest_run(self): session = self.session_factory() latest_run = db_api.get_latest_run(session) session.close() if not latest_run: raise KeyError("No tests in repository") return latest_run
[docs] def latest_id(self): return self._get_latest_run().uuid
[docs] def get_failing(self): latest_run = self._get_latest_run() session = self.session_factory() failed_test_runs = db_api.get_test_runs_by_status_for_run_ids( 'fail', [latest_run.id], session=session) session.close() return _Subunit2SqlRun(self.base, None, test_runs=failed_test_runs)
[docs] def get_test_run(self, run_id): return _Subunit2SqlRun(self.base, run_id)
def _get_inserter(self, partial, run_id=None): return _SqlInserter(self, partial, run_id) def _get_test_times(self, test_ids): result = {} # TODO(mtreinish): after subunit2sql adds a bulk query for getting # multiple tests by test_id at once remove the for loop session = self.session_factory() for test_id in test_ids: stripped_test_id = utils.cleanup_test_name(test_id) test = db_api.get_test_by_test_id(stripped_test_id, session=session) if test: # NOTE(mtreinish): We need to make sure the test_id with attrs # is used in the output dict, otherwise the scheduler won't # see it result[test_id] = test.run_time session.close() return result
class _Subunit2SqlRun(repository.AbstractTestRun): """A test run that was inserted into the repository.""" def __init__(self, url, run_id, test_runs=None): engine = sqlalchemy.create_engine(url) self.session_factory = orm.sessionmaker(bind=engine) self._run_id = run_id self._test_runs = test_runs def get_id(self): return self._run_id def get_subunit_stream(self): stream = io.BytesIO() if self._run_id: session = self.session_factory() test_runs = db_api.get_tests_run_dicts_from_run_id(self._run_id, session) session.close() else: test_runs = self._test_runs output = subunit.v2.StreamResultToBytes(stream) output.startTestRun() for test_id in test_runs: test = test_runs[test_id] # NOTE(mtreinish): test_run_metadata is not guaranteed to be # present for the test run metadata = test.get('metadata', None) write_subunit.write_test(output, test['start_time'], test['stop_time'], test['status'], test_id, metadata) output.stopTestRun() stream.seek(0) return stream def get_test(self): stream = self.get_subunit_stream() case = subunit.ByteStreamToStreamResult(stream) return case class _SqlInserter(repository.AbstractTestRun): """Insert test results into a sql repository.""" def __init__(self, repository, partial=False, run_id=None): self._repository = repository self.partial = partial self._subunit = None self._run_id = run_id # Create a new session factory self.engine = sqlalchemy.create_engine(self._repository.base) self.session_factory = orm.sessionmaker(bind=self.engine, autocommit=True) def startTestRun(self): self._subunit = io.BytesIO() self.subunit_stream = subunit.v2.StreamResultToBytes(self._subunit) self.hook = testtools.CopyStreamResult([ testtools.StreamToDict(self._handle_test), self.subunit_stream]) self.hook.startTestRun() self.start_time = datetime.datetime.utcnow() session = self.session_factory() if not self._run_id: self.run = db_api.create_run(session=session) self._run_id = self.run.uuid else: int_id = db_api.get_run_id_from_uuid(self._run_id, session=session) self.run = db_api.get_run_by_id(int_id, session=session) session.close() self.totals = {} def _update_test(self, test_dict, session, start_time, stop_time): test_id = utils.cleanup_test_name(test_dict['id']) db_test = db_api.get_test_by_test_id(test_id, session) if not db_test: if test_dict['status'] == 'success': success = 1 fails = 0 elif test_dict['status'] == 'fail': fails = 1 success = 0 else: fails = 0 success = 0 run_time = read_subunit.get_duration(start_time, stop_time) db_test = db_api.create_test(test_id, (success + fails), success, fails, run_time, session) else: test_dict['start_time'] = start_time test_dict['end_time'] = stop_time test_values = shell.increment_counts(db_test, test_dict) # If skipped nothing to update if test_values: db_api.update_test(test_values, db_test.id, session) return db_test def _get_attrs(self, test_id): attr_regex = re.compile('\[(.*)\]') matches = attr_regex.search(test_id) attrs = None if matches: attrs = matches.group(1) return attrs def _handle_test(self, test_dict): start, end = test_dict.pop('timestamps') if test_dict['status'] == 'exists' or None in (start, end): return elif test_dict['id'] == 'process-returncode': return session = self.session_factory() try: # Update the run counts if test_dict['status'] not in self.totals: self.totals[test_dict['status']] = 1 else: self.totals[test_dict['status']] += 1 values = {} if test_dict['status'] in ('success', 'xfail'): values['passes'] = self.totals['success'] elif test_dict['status'] in ('fail', 'uxsuccess'): values['fails'] = self.totals['fail'] elif test_dict['status'] == 'skip': values['skips'] = self.totals['skip'] db_api.update_run(values, self.run.id, session=session) # Update the test totals db_test = self._update_test(test_dict, session, start, end) # Add the test run test_run = db_api.create_test_run(db_test.id, self.run.id, test_dict['status'], start, end, session) metadata = {} attrs = self._get_attrs(test_dict['id']) if attrs: metadata['attrs'] = attrs if test_dict.get('tags', None): metadata['tags'] = ",".join(test_dict['tags']) if metadata: db_api.add_test_run_metadata( metadata, test_run.id, session) # TODO(mtreinish): Add attachments support to the DB. session.close() except Exception: session.rollback() raise def stopTestRun(self): self.hook.stopTestRun() stop_time = datetime.datetime.utcnow() self._subunit.seek(0) values = {} values['run_time'] = read_subunit.get_duration(self.start_time, stop_time) session = self.session_factory() db_api.update_run(values, self.run.id, session=session) session.close() def status(self, *args, **kwargs): self.hook.status(*args, **kwargs) def get_id(self): return self._run_id