2022-08-16 10:07:47 +05:30
|
|
|
import os
|
2024-11-22 20:32:24 +05:30
|
|
|
import pathlib
|
2021-02-03 17:37:09 -08:00
|
|
|
import subprocess
|
2021-04-13 17:30:24 -07:00
|
|
|
import time
|
2024-11-22 20:32:24 +05:30
|
|
|
from pathlib import Path
|
2021-02-03 17:37:09 -08:00
|
|
|
|
2021-04-01 12:15:05 -07:00
|
|
|
import pytest
|
2021-02-03 17:37:09 -08:00
|
|
|
|
2024-11-22 20:32:24 +05:30
|
|
|
from datahub.ingestion.source.sql.mssql.job_models import StoredProcedure
|
|
|
|
from datahub.ingestion.source.sql.mssql.stored_procedure_lineage import (
|
|
|
|
generate_procedure_lineage,
|
|
|
|
)
|
|
|
|
from datahub.sql_parsing.schema_resolver import SchemaResolver
|
2021-12-16 23:07:38 -05:00
|
|
|
from tests.test_helpers import mce_helpers
|
|
|
|
from tests.test_helpers.click_helpers import run_datahub_cmd
|
2023-10-31 21:28:38 -07:00
|
|
|
from tests.test_helpers.docker_helpers import cleanup_image, wait_for_port
|
2021-02-18 14:47:49 -08:00
|
|
|
|
|
|
|
|
2022-08-16 10:07:47 +05:30
|
|
|
@pytest.fixture(scope="module")
def mssql_runner(docker_compose_runner, pytestconfig):
    """Start the SQL Server compose stack, load ``setup.sql``, and yield the services.

    The fixture is module-scoped, so the stack is brought up once for all
    tests in this module. Once the ``with`` block has exited, the SQL Server
    image is removed.

    Yields:
        The ``docker_services`` object produced by ``docker_compose_runner``.
    """
    test_resources_dir = pytestconfig.rootpath / "tests/integration/sql_server"
    with docker_compose_runner(
        test_resources_dir / "docker-compose.yml", "sql-server"
    ) as docker_services:
        # Wait for SQL Server to be ready. We wait an extra couple seconds, as the port being available
        # does not mean the server is accepting connections.
        # TODO: find a better way to check for liveness.
        wait_for_port(docker_services, "testsqlserver", 1433)
        time.sleep(5)

        # Run the setup.sql file to populate the database.
        # An argument list (no shell) avoids shell quoting of the password.
        command = [
            "docker",
            "exec",
            "testsqlserver",
            "/opt/mssql-tools18/bin/sqlcmd",
            "-C",
            "-S",
            "localhost",
            "-U",
            "sa",
            "-P",
            "test!Password",
            "-d",
            "master",
            "-i",
            "/setup/setup.sql",
        ]
        ret = subprocess.run(command, capture_output=True, text=True)
        # Surface sqlcmd's own output; otherwise a failed setup is undiagnosable.
        assert ret.returncode == 0, (
            f"setup.sql failed with exit code {ret.returncode}\n"
            f"stdout:\n{ret.stdout}\n"
            f"stderr:\n{ret.stderr}"
        )
        yield docker_services

    # The image is pretty large, so we remove it after the test.
    cleanup_image("mcr.microsoft.com/mssql/server")
|
|
|
|
|
2021-12-16 23:07:38 -05:00
|
|
|
|
2022-08-16 10:07:47 +05:30
|
|
|
# Directory whose entries are used to parametrize test_mssql_ingest. The path is
# relative, so it is resolved against the directory pytest is launched from.
SOURCE_FILES_PATH = "./tests/integration/sql_server/source_files"
# os.listdir() returns entries in arbitrary order; sort them so the
# parametrized test order is the same on every run.
config_file = sorted(os.listdir(SOURCE_FILES_PATH))
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.parametrize("config_file", config_file)
@pytest.mark.integration
def test_mssql_ingest(mssql_runner, pytestconfig, tmp_path, mock_time, config_file):
    """Run one ingestion config and compare its output with the matching golden file.

    ``config_file`` is the file name of a config under ``source_files/``; the
    golden file is ``golden_files/golden_mces_<config stem>.json``.
    """
    test_resources_dir = pytestconfig.rootpath / "tests/integration/sql_server"

    # Run the metadata ingestion pipeline.
    config_file_path = (test_resources_dir / f"source_files/{config_file}").resolve()
    run_datahub_cmd(
        ["ingest", "-c", f"{config_file_path}"], tmp_path=tmp_path, check_result=True
    )

    # Swap only the file extension. A plain str.replace("yml", "json") would
    # also rewrite any "yml" that appears elsewhere in the file name.
    golden_file_name = "golden_mces_" + Path(config_file).with_suffix(".json").name

    # Verify the output.
    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=tmp_path / "mssql_mces.json",
        golden_path=test_resources_dir / "golden_files" / golden_file_name,
        # These custom properties are excluded from the comparison
        # (presumably because they differ between runs).
        ignore_paths=[
            r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['job_id'\]",
            r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['date_created'\]",
            r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['date_modified'\]",
        ],
    )
|
2024-11-22 20:32:24 +05:30
|
|
|
|
|
|
|
|
|
|
|
# Directory of stored-procedure SQL files, one per parametrized test case.
PROCEDURE_SQLS_DIR = pathlib.Path(__file__).parent / "procedures"
# Directory of the expected-output (golden) JSON files for those procedures.
PROCEDURES_GOLDEN_DIR = pathlib.Path(__file__).parent / "golden_files/procedures/"
# Path.iterdir() yields entries in arbitrary order; sort the names so the
# parametrized test order is the same on every run.
procedure_sqls = sorted(sql_file.name for sql_file in PROCEDURE_SQLS_DIR.iterdir())
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.parametrize("procedure_sql_file", procedure_sqls)
@pytest.mark.integration
def test_stored_procedure_lineage(procedure_sql_file: str) -> None:
    """Generate lineage for one stored-procedure SQL file and compare with its golden file.

    ``procedure_sql_file`` is a file name inside ``PROCEDURE_SQLS_DIR``; the
    golden file has the same name with a ``.json`` suffix and lives in
    ``PROCEDURES_GOLDEN_DIR``.
    """
    procedure_code = (PROCEDURE_SQLS_DIR / procedure_sql_file).read_text()

    # Procedure file is named as <db>.<schema>.<procedure_name>
    name_parts = procedure_sql_file.split(".")
    db, schema, name = name_parts[0], name_parts[1], name_parts[2]

    stored_procedure = StoredProcedure(
        db=db,
        schema=schema,
        name=name,
        flow=None,  # type: ignore # flow is not used in this test
        code=procedure_code,
    )
    data_job_urn = f"urn:li:dataJob:(urn:li:dataFlow:(mssql,{db}.{schema}.stored_procedures,PROD),{name})"

    resolver = SchemaResolver(platform="mssql")

    # Materialize the lineage stream before handing it to the golden check.
    lineage_mcps = list(
        generate_procedure_lineage(
            schema_resolver=resolver,
            procedure=stored_procedure,
            procedure_job_urn=data_job_urn,
            # Any table whose name contains "temp" (case-insensitive) counts as temporary.
            is_temp_table=lambda name: "temp" in name.lower(),
        )
    )

    expected_golden = PROCEDURES_GOLDEN_DIR / Path(procedure_sql_file).with_suffix(
        ".json"
    )
    mce_helpers.check_goldens_stream(
        outputs=lineage_mcps,
        golden_path=expected_golden,
    )
|