MINOR: fix mssql integration test (#17923)

* change tag for sql server due to https://github.com/microsoft/mssql-docker/issues/441 (or some similar issue)

* use 2022-latest

* fixed mssql tests

* format

* used new columns

* use the custom sql server
This commit is contained in:
Imri Paran 2024-09-20 08:52:40 +02:00 committed by GitHub
parent f4ce0e8693
commit d09bca26f6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 94 additions and 37 deletions

View File

@ -370,7 +370,7 @@ test = {
VERSIONS["grpc-tools"], VERSIONS["grpc-tools"],
VERSIONS["neo4j"], VERSIONS["neo4j"],
"testcontainers==3.7.1;python_version<'3.9'", "testcontainers==3.7.1;python_version<'3.9'",
"testcontainers==4.8.0;python_version>='3.9'", "testcontainers==4.8.1;python_version>='3.9'",
"minio==7.2.5", "minio==7.2.5",
*plugins["mlflow"], *plugins["mlflow"],
*plugins["datalake-s3"], *plugins["datalake-s3"],

View File

@ -1,5 +1,6 @@
import os import os
import shutil import shutil
import tempfile
import pytest import pytest
from sqlalchemy import create_engine, text from sqlalchemy import create_engine, text
@ -22,24 +23,55 @@ from metadata.generated.schema.entity.services.databaseService import (
from ..conftest import ingestion_config as base_ingestion_config from ..conftest import ingestion_config as base_ingestion_config
@pytest.fixture(scope="module") @pytest.fixture(scope="session")
def mssql_container(tmp_path_factory): def db_name():
container = SqlServerContainer( return "AdventureWorksLT2022"
"mcr.microsoft.com/mssql/server:2017-latest", dbname="AdventureWorks"
class CustomSqlServerContainer(SqlServerContainer):
    """SQL Server test container that rebuilds its image with a ``/data`` directory.

    Before starting, a small Dockerfile derived from ``self.image`` is built
    and tagged with the same name as ``self.image``. The Dockerfile creates
    ``/data`` as root and changes its owner to the ``mssql`` user.
    """

    def start(self) -> "DbContainer":
        """Build the derived image and then start the container.

        The Dockerfile is written into a private, automatically removed
        temporary directory rather than a fixed path under the system temp
        dir, so concurrent runs cannot overwrite each other's build context
        and nothing is left behind afterwards.

        Returns:
            Whatever the parent ``start()`` returns.
        """
        dockerfile = f"""
FROM {self.image}
USER root
RUN mkdir -p /data
RUN chown mssql /data
USER mssql
"""
        with tempfile.TemporaryDirectory(prefix="mssql") as build_dir:
            dockerfile_path = os.path.join(build_dir, "Dockerfile")
            with open(dockerfile_path, "w", encoding="utf-8") as dockerfile_file:
                dockerfile_file.write(dockerfile)
            # The build is tagged with the base image's own name.
            # NOTE(review): presumably so the parent start() picks up the
            # derived image without further changes -- confirm.
            self.get_docker_client().build(build_dir, tag=self.image)
        return super().start()

    def _configure(self) -> None:
        """Apply the parent configuration and also export ``SQL_SA_PASSWORD``."""
        super()._configure()
        # NOTE(review): presumably the variable name this image version
        # reads for the SA password -- confirm against the image docs.
        self.with_env("SQL_SA_PASSWORD", self.password)
@pytest.fixture(scope="session")
def mssql_container(tmp_path_factory, db_name):
container = CustomSqlServerContainer(
"mcr.microsoft.com/mssql/server:2022-latest", dbname="master"
) )
data_dir = tmp_path_factory.mktemp("data") data_dir = tmp_path_factory.mktemp("data")
shutil.copy( shutil.copy(
os.path.join(os.path.dirname(__file__), "data", "AdventureWorks2017.bak"), os.path.join(os.path.dirname(__file__), "data", f"{db_name}.bak"),
str(data_dir), str(data_dir),
) )
with open(data_dir / "install.sql", "w") as f: with open(data_dir / "install.sql", "w") as f:
f.write( f.write(
""" f"""
USE [master] USE [master]
RESTORE DATABASE [AdventureWorks] RESTORE FILELISTONLY
FROM DISK = '/data/AdventureWorks2017.bak' FROM DISK = '/data/{db_name}.bak';
WITH MOVE 'AdventureWorks2017' TO '/var/opt/mssql/data/AdventureWorks.mdf', GO
MOVE 'AdventureWorks2017_log' TO '/var/opt/mssql/data/AdventureWorks_log.ldf'
RESTORE DATABASE [{db_name}]
FROM DISK = '/data/{db_name}.bak'
WITH MOVE '{db_name}_Data' TO '/var/opt/mssql/data/{db_name}.mdf',
MOVE '{db_name}_Log' TO '/var/opt/mssql/data/{db_name}.ldf';
GO GO
""" """
) )
@ -49,17 +81,22 @@ GO
copy_dir_to_container(str(data_dir), docker_container, "/data") copy_dir_to_container(str(data_dir), docker_container, "/data")
res = docker_container.exec_run( res = docker_container.exec_run(
[ [
"/opt/mssql-tools/bin/sqlcmd", "bash",
"-S", "-c",
"localhost", " ".join(
"-U", [
container.username, "/opt/mssql-tools*/bin/sqlcmd",
"-P", "-U",
container.password, container.username,
"-d", "-P",
"master", f"'{container.password}'",
"-i", "-d",
"/data/install.sql", "master",
"-i",
"/data/install.sql",
"-C",
]
),
] ]
) )
if res[0] != 0: if res[0] != 0:
@ -72,7 +109,7 @@ GO
transaciton = conn.begin() transaciton = conn.begin()
conn.execute( conn.execute(
text( text(
"SELECT * INTO AdventureWorks.HumanResources.DepartmenCopy FROM AdventureWorks.HumanResources.Department;" f"SELECT * INTO {db_name}.SalesLT.CustomerCopy FROM {db_name}.SalesLT.Customer;"
) )
) )
transaciton.commit() transaciton.commit()
@ -91,7 +128,7 @@ def scheme(request):
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def create_service_request(mssql_container, scheme, tmp_path_factory): def create_service_request(mssql_container, scheme, tmp_path_factory, db_name):
return CreateDatabaseServiceRequest( return CreateDatabaseServiceRequest(
name="docker_test_" + tmp_path_factory.mktemp("mssql").name + "_" + scheme.name, name="docker_test_" + tmp_path_factory.mktemp("mssql").name + "_" + scheme.name,
serviceType=DatabaseServiceType.Mssql, serviceType=DatabaseServiceType.Mssql,
@ -101,7 +138,7 @@ def create_service_request(mssql_container, scheme, tmp_path_factory):
password=mssql_container.password, password=mssql_container.password,
hostPort="localhost:" hostPort="localhost:"
+ mssql_container.get_exposed_port(mssql_container.port), + mssql_container.get_exposed_port(mssql_container.port),
database="AdventureWorks", database=db_name,
scheme=scheme, scheme=scheme,
ingestAllDatabases=True, ingestAllDatabases=True,
connectionOptions={ connectionOptions={
@ -115,12 +152,17 @@ def create_service_request(mssql_container, scheme, tmp_path_factory):
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def ingestion_config( def ingestion_config(
db_service, tmp_path_factory, workflow_config, sink_config, base_ingestion_config db_service,
tmp_path_factory,
workflow_config,
sink_config,
base_ingestion_config,
db_name,
): ):
base_ingestion_config["source"]["sourceConfig"]["config"][ base_ingestion_config["source"]["sourceConfig"]["config"][
"databaseFilterPattern" "databaseFilterPattern"
] = { ] = {
"includes": ["TestDB", "AdventureWorks"], "includes": ["TestDB", db_name],
} }
return base_ingestion_config return base_ingestion_config

View File

@ -27,7 +27,7 @@ def language_config(mssql_container, request):
@pytest.fixture() @pytest.fixture()
def lineage_config(language_config, db_service, workflow_config, sink_config): def lineage_config(language_config, db_service, workflow_config, sink_config, db_name):
return { return {
"source": { "source": {
"type": "mssql-lineage", "type": "mssql-lineage",
@ -35,7 +35,7 @@ def lineage_config(language_config, db_service, workflow_config, sink_config):
"sourceConfig": { "sourceConfig": {
"config": { "config": {
"type": "DatabaseLineage", "type": "DatabaseLineage",
"databaseFilterPattern": {"includes": ["TestDB", "AdventureWorks"]}, "databaseFilterPattern": {"includes": ["TestDB", db_name]},
}, },
}, },
}, },
@ -52,13 +52,14 @@ def test_lineage(
lineage_config, lineage_config,
db_service, db_service,
metadata, metadata,
db_name,
): ):
search_cache.clear() search_cache.clear()
run_workflow(MetadataWorkflow, ingestion_config) run_workflow(MetadataWorkflow, ingestion_config)
run_workflow(MetadataWorkflow, lineage_config) run_workflow(MetadataWorkflow, lineage_config)
department_table = metadata.get_by_name( department_table = metadata.get_by_name(
Table, Table,
f"{db_service.fullyQualifiedName.root}.AdventureWorks.HumanResources.Department", f"{db_service.fullyQualifiedName.root}.{db_name}.SalesLT.Customer",
nullable=False, nullable=False,
) )
lineage = metadata.get_lineage_by_id(Table, department_table.id.root) lineage = metadata.get_lineage_by_id(Table, department_table.id.root)

View File

@ -15,15 +15,29 @@ def test_ingest_metadata(
ingestion_config, ingestion_config,
db_service, db_service,
metadata, metadata,
db_name,
): ):
run_workflow(MetadataWorkflow, ingestion_config) run_workflow(MetadataWorkflow, ingestion_config)
table: Table = metadata.get_by_name( table: Table = metadata.get_by_name(
Table, Table,
f"{db_service.fullyQualifiedName.root}.AdventureWorks.HumanResources.Department", f"{db_service.fullyQualifiedName.root}.{db_name}.SalesLT.Customer",
) )
assert table is not None assert table is not None
assert table.columns[0].name.root == "DepartmentID" assert [c.name.root for c in table.columns] == [
"CustomerID",
"NameStyle",
"Title",
"FirstName",
"MiddleName",
"LastName",
"Suffix",
"CompanyName",
"SalesPerson",
"EmailAddress",
"Phone",
"PasswordHash",
"PasswordSalt",
"rowguid",
"ModifiedDate",
]
assert table.columns[0].constraint == Constraint.PRIMARY_KEY assert table.columns[0].constraint == Constraint.PRIMARY_KEY
assert table.columns[1].name.root == "Name"
assert table.columns[2].name.root == "GroupName"
assert table.columns[3].name.root == "ModifiedDate"

View File

@ -10,7 +10,7 @@ if not sys.version_info >= (3, 9):
@pytest.fixture() @pytest.fixture()
def usage_config(db_service, workflow_config): def usage_config(db_service, workflow_config, db_name):
return { return {
"source": { "source": {
"type": "mssql-usage", "type": "mssql-usage",
@ -19,7 +19,7 @@ def usage_config(db_service, workflow_config):
"config": { "config": {
"queryLogDuration": 2, "queryLogDuration": 2,
"resultLimit": 1000, "resultLimit": 1000,
"databaseFilterPattern": {"includes": ["TestDB", "AdventureWorks"]}, "databaseFilterPattern": {"includes": ["TestDB", db_name]},
}, },
}, },
}, },