65 lines
1.9 KiB
Python

import pathlib
from pathlib import Path
import pytest
from datahub.ingestion.source.sql.stored_procedures.base import (
BaseProcedure,
generate_procedure_lineage,
)
from datahub.sql_parsing.schema_resolver import SchemaResolver
from datahub.testing import mce_helpers
PROCEDURE_SQLS_DIR = pathlib.Path(__file__).parent / "procedures"
PROCEDURES_GOLDEN_DIR = pathlib.Path(__file__).parent / "golden_files"
platforms = [folder.name for folder in PROCEDURE_SQLS_DIR.iterdir() if folder.is_dir()]
procedure_sqls = [
(platform, sql_file.name, f"{platform}/{sql_file.name}")
for platform in platforms
for sql_file in PROCEDURE_SQLS_DIR.glob(f"{platform}/*.sql")
]
@pytest.mark.parametrize("platform, sql_file_name, procedure_sql_file", procedure_sqls)
@pytest.mark.integration
def test_stored_procedure_lineage(
platform: str, sql_file_name: str, procedure_sql_file: str
) -> None:
sql_file_path = PROCEDURE_SQLS_DIR / procedure_sql_file
procedure_code = sql_file_path.read_text()
name = sql_file_name
db = "default_db"
schema = "default_schema"
procedure = BaseProcedure(
name=name,
procedure_definition=procedure_code,
created=None,
last_altered=None,
comment=None,
argument_signature=None,
return_type=None,
language="SQL",
extra_properties=None,
)
data_job_urn = f"urn:li:dataJob:(urn:li:dataFlow:({platform},{db}.{schema}.stored_procedures,PROD),{name})"
schema_resolver = SchemaResolver(platform=platform)
mcps = list(
generate_procedure_lineage(
schema_resolver=schema_resolver,
procedure=procedure,
procedure_job_urn=data_job_urn,
default_db=db,
default_schema=schema,
)
)
mce_helpers.check_goldens_stream(
outputs=mcps,
golden_path=(
PROCEDURES_GOLDEN_DIR / Path(procedure_sql_file).with_suffix(".json")
),
)