datahub/metadata-ingestion/tests/unit/test_redshift_lineage.py
2023-10-10 20:24:08 -07:00

152 lines
4.5 KiB
Python

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.redshift.config import RedshiftConfig
from datahub.ingestion.source.redshift.lineage import RedshiftLineageExtractor
from datahub.ingestion.source.redshift.report import RedshiftReport
from datahub.utilities.sqlglot_lineage import ColumnLineageInfo, DownstreamColumnRef
def test_get_sources_from_query():
    """Check the source URN for a schema-qualified table reference.

    The query names ``my_schema.my_table`` and is parsed with
    ``db_name="test"``; exactly one lineage dataset is expected, carrying the
    URN for ``test.my_schema.my_table``.
    """
    config = RedshiftConfig(host_port="localhost:5439", database="test")
    report = RedshiftReport()
    # The literal's contents stay flush-left so the query text is unchanged.
    test_query = """
select * from my_schema.my_table
"""
    lineage_extractor = RedshiftLineageExtractor(
        config, report, PipelineContext(run_id="foo")
    )
    # Only the first element of the returned pair is checked in this test.
    lineage_datasets, _ = lineage_extractor._get_sources_from_query(
        db_name="test", query=test_query
    )
    assert len(lineage_datasets) == 1
    lineage = lineage_datasets[0]
    assert (
        lineage.urn
        == "urn:li:dataset:(urn:li:dataPlatform:redshift,test.my_schema.my_table,PROD)"
    )
def test_get_sources_from_query_with_only_table_name():
    """Check the source URN for a bare table name with no schema or database.

    The query names only ``my_table`` and is parsed with ``db_name="test"``;
    exactly one lineage dataset is expected, carrying the URN for
    ``test.public.my_table``.
    """
    config = RedshiftConfig(host_port="localhost:5439", database="test")
    report = RedshiftReport()
    # The literal's contents stay flush-left so the query text is unchanged.
    test_query = """
select * from my_table
"""
    lineage_extractor = RedshiftLineageExtractor(
        config, report, PipelineContext(run_id="foo")
    )
    # Only the first element of the returned pair is checked in this test.
    lineage_datasets, _ = lineage_extractor._get_sources_from_query(
        db_name="test", query=test_query
    )
    assert len(lineage_datasets) == 1
    lineage = lineage_datasets[0]
    assert (
        lineage.urn
        == "urn:li:dataset:(urn:li:dataPlatform:redshift,test.public.my_table,PROD)"
    )
def test_get_sources_from_query_with_database():
    """Check the source URN for a fully qualified ``db.schema.table`` reference.

    The query names ``test.my_schema.my_table`` and is parsed with
    ``db_name="test"`` (the same database); exactly one lineage dataset is
    expected, carrying the URN for ``test.my_schema.my_table``.
    """
    config = RedshiftConfig(host_port="localhost:5439", database="test")
    report = RedshiftReport()
    # The literal's contents stay flush-left so the query text is unchanged.
    test_query = """
select * from test.my_schema.my_table
"""
    lineage_extractor = RedshiftLineageExtractor(
        config, report, PipelineContext(run_id="foo")
    )
    # Only the first element of the returned pair is checked in this test.
    lineage_datasets, _ = lineage_extractor._get_sources_from_query(
        db_name="test", query=test_query
    )
    assert len(lineage_datasets) == 1
    lineage = lineage_datasets[0]
    assert (
        lineage.urn
        == "urn:li:dataset:(urn:li:dataPlatform:redshift,test.my_schema.my_table,PROD)"
    )
def test_get_sources_from_query_with_non_default_database():
    """Check that an explicit database in the query is kept in the source URN.

    The query names ``test2.my_schema.my_table`` while ``db_name="test"`` is
    passed; exactly one lineage dataset is expected, and its URN must use
    ``test2`` rather than ``test``.
    """
    config = RedshiftConfig(host_port="localhost:5439", database="test")
    report = RedshiftReport()
    # The literal's contents stay flush-left so the query text is unchanged.
    test_query = """
select * from test2.my_schema.my_table
"""
    lineage_extractor = RedshiftLineageExtractor(
        config, report, PipelineContext(run_id="foo")
    )
    # Only the first element of the returned pair is checked in this test.
    lineage_datasets, _ = lineage_extractor._get_sources_from_query(
        db_name="test", query=test_query
    )
    assert len(lineage_datasets) == 1
    lineage = lineage_datasets[0]
    assert (
        lineage.urn
        == "urn:li:dataset:(urn:li:dataPlatform:redshift,test2.my_schema.my_table,PROD)"
    )
def test_get_sources_from_query_with_only_table():
    """Check the source URN when the query references only a table name.

    The query names only ``my_table`` and is parsed with ``db_name="test"``;
    exactly one lineage dataset is expected, carrying the URN for
    ``test.public.my_table``.

    NOTE(review): the body is identical to
    ``test_get_sources_from_query_with_only_table_name`` in this module;
    consider consolidating the two.
    """
    config = RedshiftConfig(host_port="localhost:5439", database="test")
    report = RedshiftReport()
    # The literal's contents stay flush-left so the query text is unchanged.
    test_query = """
select * from my_table
"""
    lineage_extractor = RedshiftLineageExtractor(
        config, report, PipelineContext(run_id="foo")
    )
    # Only the first element of the returned pair is checked in this test.
    lineage_datasets, _ = lineage_extractor._get_sources_from_query(
        db_name="test", query=test_query
    )
    assert len(lineage_datasets) == 1
    lineage = lineage_datasets[0]
    assert (
        lineage.urn
        == "urn:li:dataset:(urn:li:dataPlatform:redshift,test.public.my_table,PROD)"
    )
def test_cll():
    """Check the second value returned by ``_get_sources_from_query`` for a join.

    The query selects ``a``, ``b`` and ``c`` from an inner join of
    ``db.public.customer`` and ``db.public.order``. The expected result is one
    ``ColumnLineageInfo`` per selected column, in select order, each with no
    downstream table, an empty ``upstreams`` list and no ``logic``.
    """
    config = RedshiftConfig(host_port="localhost:5439", database="test")
    report = RedshiftReport()
    # The literal's contents stay flush-left so the query text is unchanged.
    test_query = """
select a,b,c from db.public.customer inner join db.public.order on db.public.customer.id = db.public.order.customer_id
"""
    lineage_extractor = RedshiftLineageExtractor(
        config, report, PipelineContext(run_id="foo")
    )
    # The first element of the returned pair is not checked in this test.
    _, cll = lineage_extractor._get_sources_from_query(db_name="db", query=test_query)
    assert cll == [
        ColumnLineageInfo(
            downstream=DownstreamColumnRef(table=None, column="a"),
            upstreams=[],
            logic=None,
        ),
        ColumnLineageInfo(
            downstream=DownstreamColumnRef(table=None, column="b"),
            upstreams=[],
            logic=None,
        ),
        ColumnLineageInfo(
            downstream=DownstreamColumnRef(table=None, column="c"),
            upstreams=[],
            logic=None,
        ),
    ]