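"""Integration tests for the SQL Server (MSSQL) ingestion source: end-to-end
ingestion against a dockerized SQL Server, plus stored procedure lineage checks
against golden files."""
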
import os
import pathlib
import subprocess
import time
from pathlib import Path

import pytest

from datahub.ingestion.source.sql.mssql.job_models import StoredProcedure
from datahub.ingestion.source.sql.stored_procedures.base import (
    generate_procedure_lineage,
)
from datahub.sql_parsing.schema_resolver import SchemaResolver
from tests.test_helpers import mce_helpers
from tests.test_helpers.click_helpers import run_datahub_cmd
from tests.test_helpers.docker_helpers import cleanup_image, wait_for_port


@pytest.fixture(scope="module")
def mssql_runner(docker_compose_runner, pytestconfig):
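    """Spin up SQL Server via docker-compose, populate it with setup.sql, and clean up the image afterwards."""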
    test_resources_dir = pytestconfig.rootpath / "tests/integration/sql_server"
    with docker_compose_runner(
        test_resources_dir / "docker-compose.yml", "sql-server"
    ) as docker_services:
        # Wait for SQL Server to be ready. We wait a couple of extra seconds, since the
        # port being open does not mean the server is accepting connections yet.
        # TODO: find a better way to check for liveness.
        wait_for_port(docker_services, "testsqlserver", 1433)
        time.sleep(5)

        # Run the setup.sql file to populate the database.
        command = "docker exec testsqlserver /opt/mssql-tools18/bin/sqlcmd -C -S localhost -U sa -P 'test!Password' -d master -i /setup/setup.sql"
        ret = subprocess.run(command, shell=True, capture_output=True)
        assert ret.returncode == 0
        yield docker_services

    # The image is pretty large, so we remove it after the test.
    cleanup_image("mcr.microsoft.com/mssql/server")
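

# Each YAML recipe under source_files becomes one parametrized ingestion test case.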
SOURCE_FILES_PATH = "./tests/integration/sql_server/source_files"
config_files = os.listdir(SOURCE_FILES_PATH)


@pytest.mark.parametrize("config_file", config_files)
@pytest.mark.integration
def test_mssql_ingest(mssql_runner, pytestconfig, tmp_path, mock_time, config_file):
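    """Run an ingestion recipe against the live SQL Server container and compare the output to a golden file."""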
    test_resources_dir = pytestconfig.rootpath / "tests/integration/sql_server"

    # Run the metadata ingestion pipeline.
    config_file_path = (test_resources_dir / f"source_files/{config_file}").resolve()
    run_datahub_cmd(
        ["ingest", "-c", f"{config_file_path}"], tmp_path=tmp_path, check_result=True
    )

    # Verify the output against the golden file; job_id and the created/modified
    # timestamps vary between runs, so they are ignored in the comparison.
    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=tmp_path / "mssql_mces.json",
        golden_path=test_resources_dir
        / f"golden_files/golden_mces_{config_file.replace('yml', 'json')}",
        ignore_paths=[
            r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['job_id'\]",
            r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['date_created'\]",
            r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['date_modified'\]",
        ],
    )
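

# Lineage extraction tests for stored procedures: each .sql file under procedures/
# is parsed and the resulting aspects are compared to a matching golden JSON file.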
PROCEDURE_SQLS_DIR = pathlib.Path(__file__).parent / "procedures"
PROCEDURES_GOLDEN_DIR = pathlib.Path(__file__).parent / "golden_files/procedures/"
procedure_sqls = [sql_file.name for sql_file in PROCEDURE_SQLS_DIR.iterdir()]


@pytest.mark.parametrize("procedure_sql_file", procedure_sqls)
@pytest.mark.integration
def test_stored_procedure_lineage(procedure_sql_file: str) -> None:
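    """Generate lineage for a single stored procedure and compare it to its golden file."""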
    sql_file_path = PROCEDURE_SQLS_DIR / procedure_sql_file
    procedure_code = sql_file_path.read_text()

    # Procedure files are named <db>.<schema>.<procedure_name>.sql
    splits = procedure_sql_file.split(".")
    db = splits[0]
    schema = splits[1]
    name = splits[2]
    procedure = StoredProcedure(
        db=db,
        schema=schema,
        name=name,
        flow=None,  # type: ignore # flow is not used in this test
        code=procedure_code,
    )
    data_job_urn = f"urn:li:dataJob:(urn:li:dataFlow:(mssql,{db}.{schema}.stored_procedures,PROD),{name})"

    schema_resolver = SchemaResolver(platform="mssql")
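
    # Generate lineage MCPs from the procedure code; the is_temp_table heuristic
    # flags any table whose name contains "temp" as temporary.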
    mcps = list(
        generate_procedure_lineage(
            schema_resolver=schema_resolver,
            procedure=procedure.to_base_procedure(),
            procedure_job_urn=data_job_urn,
            is_temp_table=lambda table_name: "temp" in table_name.lower(),
            default_db=procedure.db,
            default_schema=procedure.schema,
        )
    )
    mce_helpers.check_goldens_stream(
        outputs=mcps,
        golden_path=(
            PROCEDURES_GOLDEN_DIR / Path(procedure_sql_file).with_suffix(".json")
        ),
    )