Created
November 3, 2018 16:44
-
-
Save cdent/a668cca4cd11aa14bd9eec21bef140fe to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import contextlib | |
import functools | |
import testtools | |
from alembic import script | |
import fixtures | |
import mock | |
from oslo_concurrency.fixture import lockutils as concurrency | |
from oslo_config import fixture as config_fixture | |
from oslo_db import exception as db_exc | |
from oslo_db.sqlalchemy import enginefacade | |
from oslo_db.sqlalchemy import provision | |
from oslo_db.sqlalchemy import test_fixtures | |
from oslo_db.sqlalchemy import test_migrations | |
from oslo_log import log as logging | |
from oslotest import base as test_base | |
from placement import conf | |
from placement.db.sqlalchemy import migration | |
from placement.db.sqlalchemy import models | |
from placement import db_api | |
""" | |
All the magic in the OpportunisticDBTestMixin was making it very hard to figure
out what was going on with test failures. I had figured out that at least | |
some of the time a different database name was being used, and this may | |
have been confusing the use of the [placement_database]/connection. So | |
I started trying to scale things back to something that actually works. | |
This now passes in a way that I expect it to but it has some caveats: | |
* It uses its own checking to see if the required database is around. | |
If not, it calls skipTest. | |
* Because it always uses the same database name, there are concurrency | |
problems which have to be guarded with a lock. | |
* That lock currently lives in /tmp, but presumably that's just not good
enough.
I suspect the magic available from the oslo_db fixtures is very handy | |
if you grok how it works and if you are doing something normal with | |
your db, but that's not what is going on here. The placement context | |
manager is a bit different in how it configures itself, thus not normal. | |
And I'm _really_ struggling to tease out the relationships between the | |
vast numbers of classes involved in oslo_db..test_{fixtures,migrations}. | |
""" | |
# Placement configuration object. The test setUp methods in this module
# override [placement_database]/connection and [oslo_concurrency]/lock_path
# on it through a config fixture.
CONF = conf.CONF
# Fixed database name appended to every URL built by generate_url(). Because
# all tests share this name they have to be serialized with a lock.
DB_NAME = 'openstack_citest'
LOG = logging.getLogger(__name__)
@contextlib.contextmanager
def patch_with_engine(engine):
    """Make ``enginefacade.writer.get_engine`` return ``engine`` temporarily.

    The patch is installed when the ``with`` block is entered and removed
    again when it exits.

    :param engine: the object the patched ``get_engine`` should return.
    """
    patcher = mock.patch.object(
        enginefacade.writer, 'get_engine', return_value=engine)
    with patcher:
        yield
def generate_url(driver):
    """Make a database URL to be used with the opportunistic tests.

    :param driver: backend name handed to ``provision.BackendImpl.impl``
                   (the test classes in this module pass ``'mysql'`` or
                   ``'postgresql'``).
    :returns: the backend's opportunistic URL with its final path component
              replaced by ``DB_NAME``.
    """
    # NOTE(cdent): Because of the way we need to configure the
    # [placement_database]/connection, we need to have a predictable
    # database URL.
    opportunistic_url = provision.BackendImpl.impl(
        driver).create_opportunistic_driver_url()
    # Cut at the last '/', which removes either an existing database name
    # or a bare trailing slash, then append our own name.
    prefix = opportunistic_url[:opportunistic_url.rindex('/')]
    return '/'.join((prefix, DB_NAME))
class WalkVersionsMixin(object):
    """Mixin that upgrades a database one alembic revision at a time.

    The class it is mixed into must provide ``self.migration_api`` (an
    object exposing ``upgrade`` and ``version``) and the ``assertIsNone`` /
    ``assertEqual`` helpers of a test case.
    """

    def _walk_versions(self, engine=None, alembic_cfg=None):
        """Step an unversioned database through every known revision.

        This checks that the schema itself upgrades successfully. Each step
        is delegated to ``_migrate_up`` with ``with_data=True`` so that any
        ``_pre_upgrade_###`` / ``_check_###`` hooks on the test are run.

        :param engine: engine that ``enginefacade.writer.get_engine`` is
                       patched to return while walking.
        :param alembic_cfg: alembic configuration used to locate the
                            migration scripts and passed to the migration
                            API.
        """
        with patch_with_engine(engine):
            scripts = script.ScriptDirectory.from_config(alembic_cfg)

            # The database must not be under version control yet.
            self.assertIsNone(self.migration_api.version(alembic_cfg))

            # Apply the revisions in the opposite order to the one
            # walk_revisions() yields them in (presumably it iterates from
            # head down to base -- confirm against the alembic docs).
            ordered = list(scripts.walk_revisions())
            ordered.reverse()
            for rev in ordered:
                self._migrate_up(
                    engine, alembic_cfg, rev.revision, with_data=True)

    def _migrate_up(self, engine, config, version, with_data=False):
        """Migrate up to a new version of the db.

        We allow for data insertion and post checks at every
        migration version with special _pre_upgrade_### and
        _check_### functions in the main test.

        :param engine: engine handed to the optional hooks.
        :param config: alembic configuration passed to the migration API.
        :param version: revision identifier to upgrade to.
        :param with_data: when true, call ``_pre_upgrade_<version>`` before
                          and ``_check_<version>`` after the upgrade if the
                          test defines them.
        :raises Exception: whatever the upgrade, the assertion or a hook
                           raised, re-raised after being logged.
        """
        # NOTE(sdague): try block is here because it's impossible to debug
        # where a failed data migration happens otherwise
        try:
            data = None
            if with_data:
                seed = getattr(self, "_pre_upgrade_%s" % version, None)
                if seed:
                    data = seed(engine)

            self.migration_api.upgrade(version, config=config)
            self.assertEqual(version, self.migration_api.version(config))

            # Look the check hook up only after the upgrade has succeeded.
            verify = None
            if with_data:
                verify = getattr(self, "_check_%s" % version, None)
            if verify:
                verify(engine, data)
        except Exception:
            details = {'version': version, 'engine': engine}
            LOG.error("Failed to migrate to version %(version)s on engine "
                      "%(engine)s", details)
            raise
class MigrationCheckersMixin(object):
    """Prepare a real database and run the migration tests against it.

    Classes using this mixin must define a ``DRIVER`` attribute naming the
    database backend, and must also provide ``_walk_versions`` (see
    ``WalkVersionsMixin``) and the test case API used below.
    """

    def setUp(self):
        """Point placement at the shared test database, or skip the test.

        Sets ``self.engine``, ``self.config`` and ``self.migration_api``
        and registers cleanups that drop everything the test created and
        then dispose of the engine.
        """
        conf_fixture = self.useFixture(config_fixture.Config(CONF))
        # find the right path for this
        # NOTE: every user of the 'test_mig' lock has to agree on this
        # directory, otherwise the tests no longer exclude each other.
        conf_fixture.config(lock_path='/tmp', group='oslo_concurrency')
        # All tests use the same database name, so run them one at a time.
        self.useFixture(concurrency.LockFixture('test_mig'))
        url = generate_url(self.DRIVER)
        conf_fixture.config(group='placement_database', connection=url)
        db_api.configure(CONF)
        try:
            self.engine = db_api.get_placement_engine()
        except db_exc.DBNonExistentDatabase:
            self.skipTest('%s not available' % self.DRIVER)
        # Cleanups run in reverse order of registration. Register the
        # dispose first so that it runs last, after drop_all_objects() has
        # finished using the engine; otherwise the drop would reconnect
        # through a fresh pool that nobody disposes of.
        self.addCleanup(self.engine.dispose)
        self.config = migration._alembic_config()
        self.migration_api = migration
        super(MigrationCheckersMixin, self).setUp()
        backend = provision.Backend(self.engine.name, self.engine.url)
        self.addCleanup(
            functools.partial(backend.drop_all_objects, self.engine))

    def test_walk_versions(self):
        # Upgrade through every revision, one step at a time.
        self._walk_versions(self.engine, self.config)

    # # Leaving this here as a sort of template for when we do migration tests.
    # def _check_fb3f10dd262e(self, engine, data):
    #     nodes_tbl = db_utils.get_table(engine, 'nodes')
    #     col_names = [column.name for column in nodes_tbl.c]
    #     self.assertIn('fault', col_names)
    #     self.assertIsInstance(nodes_tbl.c.fault.type,
    #                           sqlalchemy.types.String)

    def test_upgrade_and_version(self):
        # After upgrading to head the database must report a version.
        self.migration_api.upgrade('head')
        self.assertIsNotNone(self.migration_api.version())

    def test_upgrade_twice(self):
        # Start with the empty version
        # NOTE(review): '158782c7f38c' is presumably the first revision in
        # the migration repository -- confirm against the versions directory.
        self.migration_api.upgrade('158782c7f38c')
        v1 = self.migration_api.version()
        # Now upgrade to head
        self.migration_api.upgrade('head')
        v2 = self.migration_api.version()
        self.assertNotEqual(v1, v2)
class TestMigrationsMySQL(
        MigrationCheckersMixin, WalkVersionsMixin, test_base.BaseTestCase):
    """Run the migration checks with the 'mysql' driver."""

    DRIVER = 'mysql'
class TestMigrationsPostgresql(
        MigrationCheckersMixin, WalkVersionsMixin, test_base.BaseTestCase):
    """Run the migration checks with the 'postgresql' driver."""

    DRIVER = 'postgresql'
class ModelsMigrationSyncMixin(object):
    """Supply the hooks used by ``test_migrations.ModelsMigrationsSync``.

    Classes using this mixin must define a ``DRIVER`` attribute naming the
    database backend. They combine it with ``ModelsMigrationsSync``, which
    is expected to call ``get_metadata``, ``get_engine`` and ``db_sync``.
    """

    def setUp(self):
        """Point placement at the shared test database under a lock."""
        conf_fixture = self.useFixture(config_fixture.Config(CONF))
        # NOTE: every user of the 'test_mig' lock has to agree on this
        # directory, otherwise the tests no longer exclude each other.
        conf_fixture.config(lock_path='/tmp', group='oslo_concurrency')
        # All tests use the same database name, so run them one at a time.
        self.useFixture(concurrency.LockFixture('test_mig'))
        url = generate_url(self.DRIVER)
        # Use the module logger rather than writing to stderr on every
        # setUp, so the output follows the configured log level.
        LOG.debug('Database URL for %s is %s', self.DRIVER, url)
        conf_fixture.config(group='placement_database', connection=url)
        db_api.configure(CONF)
        super(ModelsMigrationSyncMixin, self).setUp()

    def get_metadata(self):
        """Return the metadata of the placement models."""
        return models.BASE.metadata

    def get_engine(self):
        """Return the placement engine, skipping the test if there is no db.

        ``skipTest`` raises, so ``None`` is never returned to the caller.
        """
        try:
            return db_api.get_placement_engine()
        except db_exc.DBNonExistentDatabase:
            self.skipTest('%s not available' % self.DRIVER)

    def db_sync(self, engine):
        """Upgrade the database to the latest revision.

        ``engine`` is accepted but not used here: ``migration.upgrade`` is
        called without it.
        """
        migration.upgrade('head')
class ModelsMigrationsSyncMysql(
        ModelsMigrationSyncMixin,
        test_migrations.ModelsMigrationsSync,
        test_base.BaseTestCase):
    """Run the oslo.db ModelsMigrationsSync checks with the 'mysql' driver."""

    DRIVER = 'mysql'
class ModelsMigrationsSyncPostgresql(
        ModelsMigrationSyncMixin,
        test_migrations.ModelsMigrationsSync,
        test_base.BaseTestCase):
    """Run the oslo.db ModelsMigrationsSync checks with 'postgresql'."""

    DRIVER = 'postgresql'
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment