feat(ingest/redshift): lineage for external schema created from redshift (#12826)

This commit is contained in:
Mayuri Nehate 2025-03-11 17:43:34 +05:30 committed by GitHub
parent dd371130c1
commit 31df9c4e67
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 160 additions and 30 deletions

View File

@ -814,8 +814,8 @@ class RedshiftLineageExtractor:
tablename = table.name tablename = table.name
if ( if (
table.is_external_table table.is_external_table()
and schema.is_external_schema and schema.is_external_schema()
and schema.external_platform and schema.external_platform
): ):
# external_db_params = schema.option # external_db_params = schema.option

View File

@ -403,8 +403,8 @@ class RedshiftSqlLineageV2(Closeable):
for table in tables: for table in tables:
schema = db_schemas[self.database][schema_name] schema = db_schemas[self.database][schema_name]
if ( if (
table.is_external_table table.is_external_table()
and schema.is_external_schema and schema.is_external_schema()
and schema.external_platform and schema.external_platform
): ):
# external_db_params = schema.option # external_db_params = schema.option
@ -416,14 +416,26 @@ class RedshiftSqlLineageV2(Closeable):
platform_instance=self.config.platform_instance, platform_instance=self.config.platform_instance,
env=self.config.env, env=self.config.env,
) )
upstream_urn = mce_builder.make_dataset_urn_with_platform_instance( if upstream_platform == self.platform:
upstream_platform, upstream_schema = schema.get_upstream_schema_name() or "public"
f"{schema.external_database}.{table.name}", upstream_dataset_name = (
platform_instance=( f"{schema.external_database}.{upstream_schema}.{table.name}"
)
upstream_platform_instance = self.config.platform_instance
else:
upstream_dataset_name = (
f"{schema.external_database}.{table.name}"
)
upstream_platform_instance = (
self.config.platform_instance_map.get(upstream_platform) self.config.platform_instance_map.get(upstream_platform)
if self.config.platform_instance_map if self.config.platform_instance_map
else None else None
), )
upstream_urn = mce_builder.make_dataset_urn_with_platform_instance(
upstream_platform,
upstream_dataset_name,
platform_instance=upstream_platform_instance,
env=self.config.env, env=self.config.env,
) )

View File

@ -48,7 +48,7 @@ class RedshiftProfiler(GenericProfiler):
if not self.config.schema_pattern.allowed(schema): if not self.config.schema_pattern.allowed(schema):
continue continue
for table in tables[db].get(schema, {}): for table in tables[db].get(schema, {}):
if table.is_external_table: if table.is_external_table() or self.report.is_shared_database:
if not self.config.profiling.profile_external_tables: if not self.config.profiling.profile_external_tables:
# Case 1: If user did not tell us to profile external tables, simply log this. # Case 1: If user did not tell us to profile external tables, simply log this.
self.report.profiling_skipped_other[schema] += 1 self.report.profiling_skipped_other[schema] += 1

View File

@ -83,7 +83,9 @@ class RedshiftCommonQuery:
# NOTE: Tables from shared database are not available in pg_catalog.pg_class # NOTE: Tables from shared database are not available in pg_catalog.pg_class
@staticmethod @staticmethod
def list_tables( def list_tables(
skip_external_tables: bool = False, is_shared_database: bool = False database: str,
skip_external_tables: bool = False,
is_shared_database: bool = False,
) -> str: ) -> str:
# NOTE: it looks like description is available only in pg_description # NOTE: it looks like description is available only in pg_description
# So this remains preferred way # So this remains preferred way
@ -123,7 +125,7 @@ class RedshiftCommonQuery:
AND n.nspname != 'information_schema' AND n.nspname != 'information_schema'
""" """
external_tables_query = """ external_tables_query = f"""
SELECT 'EXTERNAL_TABLE' as tabletype, SELECT 'EXTERNAL_TABLE' as tabletype,
NULL AS "schema_oid", NULL AS "schema_oid",
schemaname AS "schema", schemaname AS "schema",
@ -142,10 +144,11 @@ class RedshiftCommonQuery:
serde_parameters, serde_parameters,
NULL as table_description NULL as table_description
FROM pg_catalog.svv_external_tables FROM pg_catalog.svv_external_tables
WHERE redshift_database_name='{database}'
ORDER BY "schema", ORDER BY "schema",
"relname" "relname"
""" """
shared_database_tables_query = """ shared_database_tables_query = f"""
SELECT table_type as tabletype, SELECT table_type as tabletype,
NULL AS "schema_oid", NULL AS "schema_oid",
schema_name AS "schema", schema_name AS "schema",
@ -164,6 +167,7 @@ class RedshiftCommonQuery:
NULL as serde_parameters, NULL as serde_parameters,
NULL as table_description NULL as table_description
FROM svv_redshift_tables FROM svv_redshift_tables
WHERE database_name='{database}'
ORDER BY "schema", ORDER BY "schema",
"relname" "relname"
""" """
@ -175,9 +179,11 @@ class RedshiftCommonQuery:
return f"{tables_query} UNION {external_tables_query}" return f"{tables_query} UNION {external_tables_query}"
@staticmethod @staticmethod
def list_columns(is_shared_database: bool = False) -> str: def list_columns(
database_name: str, schema_name: str, is_shared_database: bool = False
) -> str:
if is_shared_database: if is_shared_database:
return """ return f"""
SELECT SELECT
schema_name as "schema", schema_name as "schema",
table_name as "table_name", table_name as "table_name",
@ -198,9 +204,10 @@ class RedshiftCommonQuery:
null as "table_oid" null as "table_oid"
FROM SVV_REDSHIFT_COLUMNS FROM SVV_REDSHIFT_COLUMNS
WHERE 1 and schema = '{schema_name}' WHERE 1 and schema = '{schema_name}'
AND database_name = '{database_name}'
ORDER BY "schema", "table_name", "attnum" ORDER BY "schema", "table_name", "attnum"
""" """
return """ return f"""
SELECT SELECT
n.nspname as "schema", n.nspname as "schema",
c.relname as "table_name", c.relname as "table_name",
@ -275,6 +282,7 @@ class RedshiftCommonQuery:
null as "table_oid" null as "table_oid"
FROM SVV_EXTERNAL_COLUMNS FROM SVV_EXTERNAL_COLUMNS
WHERE 1 and schema = '{schema_name}' WHERE 1 and schema = '{schema_name}'
AND redshift_database_name = '{database_name}'
ORDER BY "schema", "table_name", "attnum" ORDER BY "schema", "table_name", "attnum"
""" """

View File

@ -366,7 +366,7 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
self.db = self.data_dictionary.get_database_details(connection, database) self.db = self.data_dictionary.get_database_details(connection, database)
self.report.is_shared_database = ( self.report.is_shared_database = (
self.db is not None and self.db.is_shared_database self.db is not None and self.db.is_shared_database()
) )
with self.report.new_stage(METADATA_EXTRACTION): with self.report.new_stage(METADATA_EXTRACTION):
self.db_tables[database] = defaultdict() self.db_tables[database] = defaultdict()
@ -508,6 +508,7 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
schema_columns: Dict[str, Dict[str, List[RedshiftColumn]]] = {} schema_columns: Dict[str, Dict[str, List[RedshiftColumn]]] = {}
schema_columns[schema.name] = self.data_dictionary.get_columns_for_schema( schema_columns[schema.name] = self.data_dictionary.get_columns_for_schema(
conn=connection, conn=connection,
database=database,
schema=schema, schema=schema,
is_shared_database=self.report.is_shared_database, is_shared_database=self.report.is_shared_database,
) )
@ -829,9 +830,12 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
domain_config=self.config.domain, domain_config=self.config.domain,
) )
def cache_tables_and_views(self, connection, database): def cache_tables_and_views(
self, connection: redshift_connector.Connection, database: str
) -> None:
tables, views = self.data_dictionary.get_tables_and_views( tables, views = self.data_dictionary.get_tables_and_views(
conn=connection, conn=connection,
database=database,
skip_external_tables=self.config.skip_external_tables, skip_external_tables=self.config.skip_external_tables,
is_shared_database=self.report.is_shared_database, is_shared_database=self.report.is_shared_database,
) )
@ -982,7 +986,7 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
self.datashares_helper.to_platform_resource(list(outbound_shares)) self.datashares_helper.to_platform_resource(list(outbound_shares))
) )
if self.db and self.db.is_shared_database: if self.db and self.db.is_shared_database():
inbound_share = self.db.get_inbound_share() inbound_share = self.db.get_inbound_share()
if inbound_share is None: if inbound_share is None:
self.report.warning( self.report.warning(
@ -996,8 +1000,8 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
): ):
lineage_extractor.aggregator.add(known_lineage) lineage_extractor.aggregator.add(known_lineage)
# TODO: distinguish between definition level lineage and audit log based lineage # TODO: distinguish between definition level lineage and audit log based lineage.
# definition level lineage should never be skipped # Definition level lineage should never be skipped
if not self._should_ingest_lineage(): if not self._should_ingest_lineage():
return return

View File

@ -42,7 +42,6 @@ class RedshiftTable(BaseTable):
serde_parameters: Optional[str] = None serde_parameters: Optional[str] = None
last_altered: Optional[datetime] = None last_altered: Optional[datetime] = None
@property
def is_external_table(self) -> bool: def is_external_table(self) -> bool:
return self.type == "EXTERNAL_TABLE" return self.type == "EXTERNAL_TABLE"
@ -56,7 +55,6 @@ class RedshiftView(BaseTable):
size_in_bytes: Optional[int] = None size_in_bytes: Optional[int] = None
rows_count: Optional[int] = None rows_count: Optional[int] = None
@property
def is_external_table(self) -> bool: def is_external_table(self) -> bool:
return self.type == "EXTERNAL_TABLE" return self.type == "EXTERNAL_TABLE"
@ -71,10 +69,28 @@ class RedshiftSchema:
external_platform: Optional[str] = None external_platform: Optional[str] = None
external_database: Optional[str] = None external_database: Optional[str] = None
@property
def is_external_schema(self) -> bool: def is_external_schema(self) -> bool:
return self.type == "external" return self.type == "external"
def get_upstream_schema_name(self) -> Optional[str]:
    """Extract the upstream schema name from this external schema's option.

    For an external schema created from another Redshift database, the
    ``option`` field contains JSON-like text such as ``{"SCHEMA":"tickit"}``.

    Returns:
        Optional[str]: The referenced upstream schema name, or None when this
        is not an external schema, no option text is present, or the option
        does not contain a ``"SCHEMA"`` entry.
    """
    if not (self.is_external_schema() and self.option):
        return None
    # For external schema on redshift, option is in form
    # {"SCHEMA":"tickit"}
    match = re.search(r'"SCHEMA"\s*:\s*"([^"]*)"', self.option)
    return match.group(1) if match else None
@dataclass @dataclass
class PartialInboundDatashare: class PartialInboundDatashare:
@ -117,7 +133,6 @@ class RedshiftDatabase:
type: str type: str
options: Optional[str] = None options: Optional[str] = None
@property
def is_shared_database(self) -> bool: def is_shared_database(self) -> bool:
return self.type == "shared" return self.type == "shared"
@ -128,7 +143,7 @@ class RedshiftDatabase:
def get_inbound_share( def get_inbound_share(
self, self,
) -> Optional[Union[InboundDatashare, PartialInboundDatashare]]: ) -> Optional[Union[InboundDatashare, PartialInboundDatashare]]:
if not self.is_shared_database or not self.options: if not self.is_shared_database() or not self.options:
return None return None
# Convert into single regex ?? # Convert into single regex ??
@ -323,6 +338,7 @@ class RedshiftDataDictionary:
def get_tables_and_views( def get_tables_and_views(
self, self,
conn: redshift_connector.Connection, conn: redshift_connector.Connection,
database: str,
skip_external_tables: bool = False, skip_external_tables: bool = False,
is_shared_database: bool = False, is_shared_database: bool = False,
) -> Tuple[Dict[str, List[RedshiftTable]], Dict[str, List[RedshiftView]]]: ) -> Tuple[Dict[str, List[RedshiftTable]], Dict[str, List[RedshiftView]]]:
@ -336,6 +352,7 @@ class RedshiftDataDictionary:
cur = RedshiftDataDictionary.get_query_result( cur = RedshiftDataDictionary.get_query_result(
conn, conn,
RedshiftCommonQuery.list_tables( RedshiftCommonQuery.list_tables(
database=database,
skip_external_tables=skip_external_tables, skip_external_tables=skip_external_tables,
is_shared_database=is_shared_database, is_shared_database=is_shared_database,
), ),
@ -484,14 +501,17 @@ class RedshiftDataDictionary:
@staticmethod @staticmethod
def get_columns_for_schema( def get_columns_for_schema(
conn: redshift_connector.Connection, conn: redshift_connector.Connection,
database: str,
schema: RedshiftSchema, schema: RedshiftSchema,
is_shared_database: bool = False, is_shared_database: bool = False,
) -> Dict[str, List[RedshiftColumn]]: ) -> Dict[str, List[RedshiftColumn]]:
cursor = RedshiftDataDictionary.get_query_result( cursor = RedshiftDataDictionary.get_query_result(
conn, conn,
RedshiftCommonQuery.list_columns( RedshiftCommonQuery.list_columns(
is_shared_database=is_shared_database database_name=database,
).format(schema_name=schema.name), schema_name=schema.name,
is_shared_database=is_shared_database,
),
) )
table_columns: Dict[str, List[RedshiftColumn]] = {} table_columns: Dict[str, List[RedshiftColumn]] = {}

View File

@ -16,7 +16,10 @@ from datahub.ingestion.source.redshift.datashares import (
RedshiftTable, RedshiftTable,
RedshiftView, RedshiftView,
) )
from datahub.ingestion.source.redshift.redshift_schema import PartialInboundDatashare from datahub.ingestion.source.redshift.redshift_schema import (
PartialInboundDatashare,
RedshiftDatabase,
)
from datahub.ingestion.source.redshift.report import RedshiftReport from datahub.ingestion.source.redshift.report import RedshiftReport
from datahub.metadata.schema_classes import ( from datahub.metadata.schema_classes import (
PlatformResourceInfoClass, PlatformResourceInfoClass,
@ -484,3 +487,47 @@ class TestDatasharesHelper:
list(report.warnings)[0].title list(report.warnings)[0].title
== "Downstream lineage to outbound datashare may not work" == "Downstream lineage to outbound datashare may not work"
) )
def test_database_get_inbound_datashare_success(self):
    # A "shared" database whose options JSON is complete should parse into
    # a fully-populated InboundDatashare.
    shared_db = RedshiftDatabase(
        name="db",
        type="shared",
        options='{"datashare_name":"xxx","datashare_producer_account":"1234","datashare_producer_namespace":"yyy"}',
    )
    expected = InboundDatashare(
        share_name="xxx",
        producer_namespace="yyy",
        consumer_database="db",
    )
    assert shared_db.get_inbound_share() == expected
def test_database_get_partial_inbound_datashare_success(self):
    # Options text truncated mid-namespace: only a namespace prefix can be
    # recovered, so a PartialInboundDatashare is returned instead.
    truncated_db = RedshiftDatabase(
        name="db",
        type="shared",
        options='{"datashare_name":"xxx","datashare_producer_account":"1234","datashare_producer_namespace":"yy',
    )
    expected = PartialInboundDatashare(
        share_name="xxx",
        producer_namespace_prefix="yy",
        consumer_database="db",
    )
    assert truncated_db.get_inbound_share() == expected
def test_database_no_inbound_datashare(self):
    # A plain local database never exposes an inbound datashare.
    local_db = RedshiftDatabase(name="db", type="local", options=None)
    assert local_db.get_inbound_share() is None
def test_shared_database_no_inbound_datashare(self):
    # Even a shared database yields no datashare when options are absent.
    optionless_db = RedshiftDatabase(name="db", type="shared", options=None)
    assert optionless_db.get_inbound_share() is None

View File

@ -15,7 +15,10 @@ from datahub.ingestion.source.redshift.lineage import (
RedshiftLineageExtractor, RedshiftLineageExtractor,
parse_alter_table_rename, parse_alter_table_rename,
) )
from datahub.ingestion.source.redshift.redshift_schema import TempTableRow from datahub.ingestion.source.redshift.redshift_schema import (
RedshiftSchema,
TempTableRow,
)
from datahub.ingestion.source.redshift.report import RedshiftReport from datahub.ingestion.source.redshift.report import RedshiftReport
from datahub.metadata.schema_classes import NumberTypeClass, SchemaFieldDataTypeClass from datahub.metadata.schema_classes import NumberTypeClass, SchemaFieldDataTypeClass
from datahub.sql_parsing.schema_resolver import SchemaResolver from datahub.sql_parsing.schema_resolver import SchemaResolver
@ -794,3 +797,39 @@ def test_collapse_temp_recursive_cll_lineage_with_circular_reference():
assert len(datasets) == 1 assert len(datasets) == 1
# Here we are only interested in whether it fails or not # Here we are only interested in whether it fails or not
def test_external_schema_get_upstream_schema_success():
    # An external schema whose option carries {"SCHEMA": ...} should expose
    # that value as the upstream schema name.
    ext_schema = RedshiftSchema(
        name="schema",
        database="XXXXXXXX",
        type="external",
        option='{"SCHEMA":"sales_schema"}',
        external_platform="redshift",
    )
    assert ext_schema.get_upstream_schema_name() == "sales_schema"
def test_external_schema_no_upstream_schema():
    # Without any option text there is no upstream schema to report.
    ext_schema = RedshiftSchema(
        name="schema",
        database="XXXXXXXX",
        type="external",
        option=None,
        external_platform="redshift",
    )
    assert ext_schema.get_upstream_schema_name() is None
def test_local_schema_no_upstream_schema():
    # A non-external (local) schema ignores its option text entirely.
    local_schema = RedshiftSchema(
        name="schema",
        database="XXXXXXXX",
        type="local",
        option='{"some_other_option":"x"}',
        external_platform=None,
    )
    assert local_schema.get_upstream_schema_name() is None