feat(ingestion/redshift): CLL support in redshift (#8921)

This commit is contained in:
siddiquebagwan-gslab 2023-10-11 08:54:08 +05:30 committed by GitHub
parent dfcea2441e
commit 10a190470e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 233 additions and 80 deletions

View File

@@ -132,6 +132,10 @@ class RedshiftConfig(
description="Whether `schema_pattern` is matched against fully qualified schema name `<database>.<schema>`.", description="Whether `schema_pattern` is matched against fully qualified schema name `<database>.<schema>`.",
) )
# Toggle for column-level lineage (CLL): when True, the Redshift source also
# emits fine-grained (column -> column) lineage parsed from query SQL.
extract_column_level_lineage: bool = Field(
    default=True, description="Whether to extract column level lineage."
)
@root_validator(pre=True) @root_validator(pre=True)
def check_email_is_set_on_usage(cls, values): def check_email_is_set_on_usage(cls, values):
if values.get("include_usage_statistics"): if values.get("include_usage_statistics"):

View File

@@ -9,10 +9,12 @@ from urllib.parse import urlparse
import humanfriendly import humanfriendly
import redshift_connector import redshift_connector
from sqllineage.runner import LineageRunner
import datahub.emitter.mce_builder as builder
import datahub.utilities.sqlglot_lineage as sqlglot_l
from datahub.emitter import mce_builder from datahub.emitter import mce_builder
from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.aws.s3_util import strip_s3_prefix from datahub.ingestion.source.aws.s3_util import strip_s3_prefix
from datahub.ingestion.source.redshift.common import get_db_name from datahub.ingestion.source.redshift.common import get_db_name
from datahub.ingestion.source.redshift.config import LineageMode, RedshiftConfig from datahub.ingestion.source.redshift.config import LineageMode, RedshiftConfig
@ -28,13 +30,19 @@ from datahub.ingestion.source.redshift.report import RedshiftReport
from datahub.ingestion.source.state.redundant_run_skip_handler import ( from datahub.ingestion.source.state.redundant_run_skip_handler import (
RedundantLineageRunSkipHandler, RedundantLineageRunSkipHandler,
) )
from datahub.metadata.com.linkedin.pegasus2avro.dataset import UpstreamLineage from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
FineGrainedLineage,
FineGrainedLineageDownstreamType,
FineGrainedLineageUpstreamType,
UpstreamLineage,
)
from datahub.metadata.schema_classes import ( from datahub.metadata.schema_classes import (
DatasetLineageTypeClass, DatasetLineageTypeClass,
UpstreamClass, UpstreamClass,
UpstreamLineageClass, UpstreamLineageClass,
) )
from datahub.utilities import memory_footprint from datahub.utilities import memory_footprint
from datahub.utilities.urns import dataset_urn
logger: logging.Logger = logging.getLogger(__name__) logger: logging.Logger = logging.getLogger(__name__)
@@ -56,13 +64,14 @@ class LineageCollectorType(Enum):
@dataclass(frozen=True, eq=True) @dataclass(frozen=True, eq=True)
class LineageDataset: class LineageDataset:
platform: LineageDatasetPlatform platform: LineageDatasetPlatform
path: str urn: str
@dataclass() @dataclass()
class LineageItem: class LineageItem:
dataset: LineageDataset dataset: LineageDataset
upstreams: Set[LineageDataset] upstreams: Set[LineageDataset]
cll: Optional[List[sqlglot_l.ColumnLineageInfo]]
collector_type: LineageCollectorType collector_type: LineageCollectorType
dataset_lineage_type: str = field(init=False) dataset_lineage_type: str = field(init=False)
@ -83,10 +92,12 @@ class RedshiftLineageExtractor:
self, self,
config: RedshiftConfig, config: RedshiftConfig,
report: RedshiftReport, report: RedshiftReport,
context: PipelineContext,
redundant_run_skip_handler: Optional[RedundantLineageRunSkipHandler] = None, redundant_run_skip_handler: Optional[RedundantLineageRunSkipHandler] = None,
): ):
self.config = config self.config = config
self.report = report self.report = report
self.context = context
self._lineage_map: Dict[str, LineageItem] = defaultdict() self._lineage_map: Dict[str, LineageItem] = defaultdict()
self.redundant_run_skip_handler = redundant_run_skip_handler self.redundant_run_skip_handler = redundant_run_skip_handler
@@ -121,33 +132,37 @@ class RedshiftLineageExtractor:
return path return path
def _get_sources_from_query(self, db_name: str, query: str) -> List[LineageDataset]: def _get_sources_from_query(
self, db_name: str, query: str
) -> Tuple[List[LineageDataset], Optional[List[sqlglot_l.ColumnLineageInfo]]]:
sources: List[LineageDataset] = list() sources: List[LineageDataset] = list()
parser = LineageRunner(query) parsed_result: Optional[
sqlglot_l.SqlParsingResult
] = sqlglot_l.create_lineage_sql_parsed_result(
query=query,
platform=LineageDatasetPlatform.REDSHIFT.value,
platform_instance=self.config.platform_instance,
database=db_name,
schema=str(self.config.default_schema),
graph=self.context.graph,
env=self.config.env,
)
for table in parser.source_tables: if parsed_result is None:
split = str(table).split(".") logger.debug(f"native query parsing failed for {query}")
if len(split) == 3: return sources, None
db_name, source_schema, source_table = split
elif len(split) == 2:
source_schema, source_table = split
else:
raise ValueError(
f"Invalid table name {table} in query {query}. "
f"Expected format: [db_name].[schema].[table] or [schema].[table] or [table]."
)
if source_schema == "<default>": logger.debug(f"parsed_result = {parsed_result}")
source_schema = str(self.config.default_schema)
for table_urn in parsed_result.in_tables:
source = LineageDataset( source = LineageDataset(
platform=LineageDatasetPlatform.REDSHIFT, platform=LineageDatasetPlatform.REDSHIFT,
path=f"{db_name}.{source_schema}.{source_table}", urn=table_urn,
) )
sources.append(source) sources.append(source)
return sources return sources, parsed_result.column_lineage
def _build_s3_path_from_row(self, filename: str) -> str: def _build_s3_path_from_row(self, filename: str) -> str:
path = filename.strip() path = filename.strip()
@ -165,9 +180,11 @@ class RedshiftLineageExtractor:
source_table: Optional[str], source_table: Optional[str],
ddl: Optional[str], ddl: Optional[str],
filename: Optional[str], filename: Optional[str],
) -> List[LineageDataset]: ) -> Tuple[List[LineageDataset], Optional[List[sqlglot_l.ColumnLineageInfo]]]:
sources: List[LineageDataset] = list() sources: List[LineageDataset] = list()
# Source # Source
cll: Optional[List[sqlglot_l.ColumnLineageInfo]] = None
if ( if (
lineage_type lineage_type
in { in {
@ -177,7 +194,7 @@ class RedshiftLineageExtractor:
and ddl is not None and ddl is not None
): ):
try: try:
sources = self._get_sources_from_query(db_name=db_name, query=ddl) sources, cll = self._get_sources_from_query(db_name=db_name, query=ddl)
except Exception as e: except Exception as e:
logger.warning( logger.warning(
f"Error parsing query {ddl} for getting lineage. Error was {e}." f"Error parsing query {ddl} for getting lineage. Error was {e}."
@ -192,22 +209,38 @@ class RedshiftLineageExtractor:
"Only s3 source supported with copy. The source was: {path}." "Only s3 source supported with copy. The source was: {path}."
) )
self.report.num_lineage_dropped_not_support_copy_path += 1 self.report.num_lineage_dropped_not_support_copy_path += 1
return sources return sources, cll
path = strip_s3_prefix(self._get_s3_path(path)) path = strip_s3_prefix(self._get_s3_path(path))
urn = make_dataset_urn_with_platform_instance(
platform=platform.value,
name=path,
env=self.config.env,
platform_instance=self.config.platform_instance_map.get(
platform.value
)
if self.config.platform_instance_map is not None
else None,
)
elif source_schema is not None and source_table is not None: elif source_schema is not None and source_table is not None:
platform = LineageDatasetPlatform.REDSHIFT platform = LineageDatasetPlatform.REDSHIFT
path = f"{db_name}.{source_schema}.{source_table}" path = f"{db_name}.{source_schema}.{source_table}"
urn = make_dataset_urn_with_platform_instance(
platform=platform.value,
platform_instance=self.config.platform_instance,
name=path,
env=self.config.env,
)
else: else:
return [] return [], cll
sources = [ sources = [
LineageDataset( LineageDataset(
platform=platform, platform=platform,
path=path, urn=urn,
) )
] ]
return sources return sources, cll
def _populate_lineage_map( def _populate_lineage_map(
self, self,
@ -231,6 +264,7 @@ class RedshiftLineageExtractor:
:rtype: None :rtype: None
""" """
try: try:
cll: Optional[List[sqlglot_l.ColumnLineageInfo]] = None
raw_db_name = database raw_db_name = database
alias_db_name = get_db_name(self.config) alias_db_name = get_db_name(self.config)
@ -243,7 +277,7 @@ class RedshiftLineageExtractor:
if not target: if not target:
continue continue
sources = self._get_sources( sources, cll = self._get_sources(
lineage_type, lineage_type,
alias_db_name, alias_db_name,
source_schema=lineage_row.source_schema, source_schema=lineage_row.source_schema,
@ -251,6 +285,7 @@ class RedshiftLineageExtractor:
ddl=lineage_row.ddl, ddl=lineage_row.ddl,
filename=lineage_row.filename, filename=lineage_row.filename,
) )
target.cll = cll
target.upstreams.update( target.upstreams.update(
self._get_upstream_lineages( self._get_upstream_lineages(
@ -262,20 +297,16 @@ class RedshiftLineageExtractor:
) )
# Merging downstreams if dataset already exists and has downstreams # Merging downstreams if dataset already exists and has downstreams
if target.dataset.path in self._lineage_map: if target.dataset.urn in self._lineage_map:
self._lineage_map[ self._lineage_map[target.dataset.urn].upstreams = self._lineage_map[
target.dataset.path target.dataset.urn
].upstreams = self._lineage_map[ ].upstreams.union(target.upstreams)
target.dataset.path
].upstreams.union(
target.upstreams
)
else: else:
self._lineage_map[target.dataset.path] = target self._lineage_map[target.dataset.urn] = target
logger.debug( logger.debug(
f"Lineage[{target}]:{self._lineage_map[target.dataset.path]}" f"Lineage[{target}]:{self._lineage_map[target.dataset.urn]}"
) )
except Exception as e: except Exception as e:
self.warn( self.warn(
@ -308,17 +339,34 @@ class RedshiftLineageExtractor:
target_platform = LineageDatasetPlatform.S3 target_platform = LineageDatasetPlatform.S3
# Following call requires 'filename' key in lineage_row # Following call requires 'filename' key in lineage_row
target_path = self._build_s3_path_from_row(lineage_row.filename) target_path = self._build_s3_path_from_row(lineage_row.filename)
urn = make_dataset_urn_with_platform_instance(
platform=target_platform.value,
name=target_path,
env=self.config.env,
platform_instance=self.config.platform_instance_map.get(
target_platform.value
)
if self.config.platform_instance_map is not None
else None,
)
except ValueError as e: except ValueError as e:
self.warn(logger, "non-s3-lineage", str(e)) self.warn(logger, "non-s3-lineage", str(e))
return None return None
else: else:
target_platform = LineageDatasetPlatform.REDSHIFT target_platform = LineageDatasetPlatform.REDSHIFT
target_path = f"{alias_db_name}.{lineage_row.target_schema}.{lineage_row.target_table}" target_path = f"{alias_db_name}.{lineage_row.target_schema}.{lineage_row.target_table}"
urn = make_dataset_urn_with_platform_instance(
platform=target_platform.value,
platform_instance=self.config.platform_instance,
name=target_path,
env=self.config.env,
)
return LineageItem( return LineageItem(
dataset=LineageDataset(platform=target_platform, path=target_path), dataset=LineageDataset(platform=target_platform, urn=urn),
upstreams=set(), upstreams=set(),
collector_type=lineage_type, collector_type=lineage_type,
cll=None,
) )
def _get_upstream_lineages( def _get_upstream_lineages(
@ -331,11 +379,22 @@ class RedshiftLineageExtractor:
targe_source = [] targe_source = []
for source in sources: for source in sources:
if source.platform == LineageDatasetPlatform.REDSHIFT: if source.platform == LineageDatasetPlatform.REDSHIFT:
db, schema, table = source.path.split(".") qualified_table_name = dataset_urn.DatasetUrn.create_from_string(
source.urn
).get_entity_id()[1]
db, schema, table = qualified_table_name.split(".")
if db == raw_db_name: if db == raw_db_name:
db = alias_db_name db = alias_db_name
path = f"{db}.{schema}.{table}" path = f"{db}.{schema}.{table}"
source = LineageDataset(platform=source.platform, path=path) source = LineageDataset(
platform=source.platform,
urn=make_dataset_urn_with_platform_instance(
platform=LineageDatasetPlatform.REDSHIFT.value,
platform_instance=self.config.platform_instance,
name=path,
env=self.config.env,
),
)
# Filtering out tables which does not exist in Redshift # Filtering out tables which does not exist in Redshift
# It was deleted in the meantime or query parser did not capture well the table name # It was deleted in the meantime or query parser did not capture well the table name
@ -345,7 +404,7 @@ class RedshiftLineageExtractor:
or not any(table == t.name for t in all_tables[db][schema]) or not any(table == t.name for t in all_tables[db][schema])
): ):
logger.debug( logger.debug(
f"{source.path} missing table, dropping from lineage.", f"{source.urn} missing table, dropping from lineage.",
) )
self.report.num_lineage_tables_dropped += 1 self.report.num_lineage_tables_dropped += 1
continue continue
@@ -433,36 +492,73 @@ class RedshiftLineageExtractor:
memory_footprint.total_size(self._lineage_map) memory_footprint.total_size(self._lineage_map)
) )
def make_fine_grained_lineage_class(
    self, lineage_item: LineageItem, dataset_urn: str
) -> List[FineGrainedLineage]:
    """Build fine-grained (column-level) lineage aspects for one dataset.

    Converts the sqlglot column-lineage info stored on ``lineage_item.cll``
    into ``FineGrainedLineage`` objects whose downstream fields live on
    ``dataset_urn``.

    :param lineage_item: lineage entry whose ``cll`` attribute holds the
        parsed column lineage (may be ``None`` when SQL parsing failed)
    :param dataset_urn: URN of the downstream dataset the columns belong to
    :return: list of fine-grained lineage aspects; empty when CLL is disabled
        via ``extract_column_level_lineage`` or no column lineage was parsed
    """
    fine_grained_lineages: List[FineGrainedLineage] = []

    # Honour the config switch, and bail out when the parser produced no
    # column-level lineage for this item.
    if (
        self.config.extract_column_level_lineage is False
        or lineage_item.cll is None
    ):
        logger.debug("CLL extraction is disabled")
        return fine_grained_lineages

    logger.debug("Extracting column level lineage")

    cll: List[sqlglot_l.ColumnLineageInfo] = lineage_item.cll

    for cll_info in cll:
        # Downstream side: a single schema-field URN on the target dataset,
        # or empty when the parser could not resolve the output column.
        downstream = (
            [builder.make_schema_field_urn(dataset_urn, cll_info.downstream.column)]
            if cll_info.downstream is not None
            and cll_info.downstream.column is not None
            else []
        )

        # Upstream side: one schema-field URN per contributing source column.
        upstreams = [
            builder.make_schema_field_urn(column_ref.table, column_ref.column)
            for column_ref in cll_info.upstreams
        ]

        fine_grained_lineages.append(
            FineGrainedLineage(
                downstreamType=FineGrainedLineageDownstreamType.FIELD,
                downstreams=downstream,
                upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
                upstreams=upstreams,
            )
        )

    logger.debug(f"Created fine_grained_lineage for {dataset_urn}")

    return fine_grained_lineages
def get_lineage( def get_lineage(
self, self,
table: Union[RedshiftTable, RedshiftView], table: Union[RedshiftTable, RedshiftView],
dataset_urn: str, dataset_urn: str,
schema: RedshiftSchema, schema: RedshiftSchema,
) -> Optional[Tuple[UpstreamLineageClass, Dict[str, str]]]: ) -> Optional[Tuple[UpstreamLineageClass, Dict[str, str]]]:
dataset_key = mce_builder.dataset_urn_to_key(dataset_urn)
if dataset_key is None:
return None
upstream_lineage: List[UpstreamClass] = [] upstream_lineage: List[UpstreamClass] = []
if dataset_key.name in self._lineage_map: cll_lineage: List[FineGrainedLineage] = []
item = self._lineage_map[dataset_key.name]
if dataset_urn in self._lineage_map:
item = self._lineage_map[dataset_urn]
for upstream in item.upstreams: for upstream in item.upstreams:
upstream_table = UpstreamClass( upstream_table = UpstreamClass(
dataset=make_dataset_urn_with_platform_instance( dataset=upstream.urn,
upstream.platform.value,
upstream.path,
platform_instance=self.config.platform_instance_map.get(
upstream.platform.value
)
if self.config.platform_instance_map
else None,
env=self.config.env,
),
type=item.dataset_lineage_type, type=item.dataset_lineage_type,
) )
upstream_lineage.append(upstream_table) upstream_lineage.append(upstream_table)
cll_lineage = self.make_fine_grained_lineage_class(
lineage_item=item,
dataset_urn=dataset_urn,
)
tablename = table.name tablename = table.name
if table.type == "EXTERNAL_TABLE": if table.type == "EXTERNAL_TABLE":
# external_db_params = schema.option # external_db_params = schema.option
@ -489,7 +585,12 @@ class RedshiftLineageExtractor:
else: else:
return None return None
return UpstreamLineage(upstreams=upstream_lineage), {} return (
UpstreamLineage(
upstreams=upstream_lineage, fineGrainedLineages=cll_lineage or None
),
{},
)
def report_status(self, step: str, status: bool) -> None: def report_status(self, step: str, status: bool) -> None:
if self.redundant_run_skip_handler: if self.redundant_run_skip_handler:

View File

@ -881,6 +881,7 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
self.lineage_extractor = RedshiftLineageExtractor( self.lineage_extractor = RedshiftLineageExtractor(
config=self.config, config=self.config,
report=self.report, report=self.report,
context=self.ctx,
redundant_run_skip_handler=self.redundant_lineage_run_skip_handler, redundant_run_skip_handler=self.redundant_lineage_run_skip_handler,
) )

View File

@ -1,6 +1,8 @@
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.redshift.config import RedshiftConfig from datahub.ingestion.source.redshift.config import RedshiftConfig
from datahub.ingestion.source.redshift.lineage import RedshiftLineageExtractor from datahub.ingestion.source.redshift.lineage import RedshiftLineageExtractor
from datahub.ingestion.source.redshift.report import RedshiftReport from datahub.ingestion.source.redshift.report import RedshiftReport
from datahub.utilities.sqlglot_lineage import ColumnLineageInfo, DownstreamColumnRef
def test_get_sources_from_query(): def test_get_sources_from_query():
@ -10,14 +12,20 @@ def test_get_sources_from_query():
test_query = """ test_query = """
select * from my_schema.my_table select * from my_schema.my_table
""" """
lineage_extractor = RedshiftLineageExtractor(config, report) lineage_extractor = RedshiftLineageExtractor(
lineage_datasets = lineage_extractor._get_sources_from_query( config, report, PipelineContext(run_id="foo")
)
lineage_datasets, _ = lineage_extractor._get_sources_from_query(
db_name="test", query=test_query db_name="test", query=test_query
) )
assert len(lineage_datasets) == 1 assert len(lineage_datasets) == 1
lineage = lineage_datasets[0] lineage = lineage_datasets[0]
assert lineage.path == "test.my_schema.my_table"
assert (
lineage.urn
== "urn:li:dataset:(urn:li:dataPlatform:redshift,test.my_schema.my_table,PROD)"
)
def test_get_sources_from_query_with_only_table_name(): def test_get_sources_from_query_with_only_table_name():
@ -27,14 +35,20 @@ def test_get_sources_from_query_with_only_table_name():
test_query = """ test_query = """
select * from my_table select * from my_table
""" """
lineage_extractor = RedshiftLineageExtractor(config, report) lineage_extractor = RedshiftLineageExtractor(
lineage_datasets = lineage_extractor._get_sources_from_query( config, report, PipelineContext(run_id="foo")
)
lineage_datasets, _ = lineage_extractor._get_sources_from_query(
db_name="test", query=test_query db_name="test", query=test_query
) )
assert len(lineage_datasets) == 1 assert len(lineage_datasets) == 1
lineage = lineage_datasets[0] lineage = lineage_datasets[0]
assert lineage.path == "test.public.my_table"
assert (
lineage.urn
== "urn:li:dataset:(urn:li:dataPlatform:redshift,test.public.my_table,PROD)"
)
def test_get_sources_from_query_with_database(): def test_get_sources_from_query_with_database():
@ -44,14 +58,20 @@ def test_get_sources_from_query_with_database():
test_query = """ test_query = """
select * from test.my_schema.my_table select * from test.my_schema.my_table
""" """
lineage_extractor = RedshiftLineageExtractor(config, report) lineage_extractor = RedshiftLineageExtractor(
lineage_datasets = lineage_extractor._get_sources_from_query( config, report, PipelineContext(run_id="foo")
)
lineage_datasets, _ = lineage_extractor._get_sources_from_query(
db_name="test", query=test_query db_name="test", query=test_query
) )
assert len(lineage_datasets) == 1 assert len(lineage_datasets) == 1
lineage = lineage_datasets[0] lineage = lineage_datasets[0]
assert lineage.path == "test.my_schema.my_table"
assert (
lineage.urn
== "urn:li:dataset:(urn:li:dataPlatform:redshift,test.my_schema.my_table,PROD)"
)
def test_get_sources_from_query_with_non_default_database(): def test_get_sources_from_query_with_non_default_database():
@ -61,14 +81,20 @@ def test_get_sources_from_query_with_non_default_database():
test_query = """ test_query = """
select * from test2.my_schema.my_table select * from test2.my_schema.my_table
""" """
lineage_extractor = RedshiftLineageExtractor(config, report) lineage_extractor = RedshiftLineageExtractor(
lineage_datasets = lineage_extractor._get_sources_from_query( config, report, PipelineContext(run_id="foo")
)
lineage_datasets, _ = lineage_extractor._get_sources_from_query(
db_name="test", query=test_query db_name="test", query=test_query
) )
assert len(lineage_datasets) == 1 assert len(lineage_datasets) == 1
lineage = lineage_datasets[0] lineage = lineage_datasets[0]
assert lineage.path == "test2.my_schema.my_table"
assert (
lineage.urn
== "urn:li:dataset:(urn:li:dataPlatform:redshift,test2.my_schema.my_table,PROD)"
)
def test_get_sources_from_query_with_only_table(): def test_get_sources_from_query_with_only_table():
@ -78,27 +104,48 @@ def test_get_sources_from_query_with_only_table():
test_query = """ test_query = """
select * from my_table select * from my_table
""" """
lineage_extractor = RedshiftLineageExtractor(config, report) lineage_extractor = RedshiftLineageExtractor(
lineage_datasets = lineage_extractor._get_sources_from_query( config, report, PipelineContext(run_id="foo")
)
lineage_datasets, _ = lineage_extractor._get_sources_from_query(
db_name="test", query=test_query db_name="test", query=test_query
) )
assert len(lineage_datasets) == 1 assert len(lineage_datasets) == 1
lineage = lineage_datasets[0] lineage = lineage_datasets[0]
assert lineage.path == "test.public.my_table"
assert (
lineage.urn
== "urn:li:dataset:(urn:li:dataPlatform:redshift,test.public.my_table,PROD)"
)
def test_get_sources_from_query_with_four_part_table_should_throw_exception(): def test_cll():
config = RedshiftConfig(host_port="localhost:5439", database="test") config = RedshiftConfig(host_port="localhost:5439", database="test")
report = RedshiftReport() report = RedshiftReport()
test_query = """ test_query = """
select * from database.schema.my_table.test select a,b,c from db.public.customer inner join db.public.order on db.public.customer.id = db.public.order.customer_id
""" """
lineage_extractor = RedshiftLineageExtractor(config, report) lineage_extractor = RedshiftLineageExtractor(
try: config, report, PipelineContext(run_id="foo")
lineage_extractor._get_sources_from_query(db_name="test", query=test_query) )
except ValueError: _, cll = lineage_extractor._get_sources_from_query(db_name="db", query=test_query)
pass
assert f"{test_query} should have thrown a ValueError exception but it didn't" assert cll == [
ColumnLineageInfo(
downstream=DownstreamColumnRef(table=None, column="a"),
upstreams=[],
logic=None,
),
ColumnLineageInfo(
downstream=DownstreamColumnRef(table=None, column="b"),
upstreams=[],
logic=None,
),
ColumnLineageInfo(
downstream=DownstreamColumnRef(table=None, column="c"),
upstreams=[],
logic=None,
),
]