From 4714f46f11a949fac21d36a4bcf711fcc33ba870 Mon Sep 17 00:00:00 2001 From: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com> Date: Thu, 6 Mar 2025 19:37:18 +0530 Subject: [PATCH] feat(ingest/redshift): support for datashares lineage (#12660) Co-authored-by: Harshal Sheth --- .../docs/sources/redshift/README.md | 0 .../docs/sources/redshift/redshift_pre.md | 104 ++++ .../api/entities/common/serialized_value.py | 7 +- .../ingestion/source/redshift/config.py | 4 + .../ingestion/source/redshift/datashares.py | 236 +++++++++ .../ingestion/source/redshift/lineage.py | 8 +- .../ingestion/source/redshift/lineage_v2.py | 11 +- .../ingestion/source/redshift/profile.py | 2 +- .../ingestion/source/redshift/query.py | 158 ++++-- .../ingestion/source/redshift/redshift.py | 113 ++-- .../source/redshift/redshift_schema.py | 172 ++++++- .../ingestion/source/redshift/report.py | 3 + .../unit/redshift/test_redshift_datashares.py | 486 ++++++++++++++++++ 13 files changed, 1183 insertions(+), 121 deletions(-) delete mode 100644 metadata-ingestion/docs/sources/redshift/README.md create mode 100644 metadata-ingestion/docs/sources/redshift/redshift_pre.md create mode 100644 metadata-ingestion/src/datahub/ingestion/source/redshift/datashares.py create mode 100644 metadata-ingestion/tests/unit/redshift/test_redshift_datashares.py diff --git a/metadata-ingestion/docs/sources/redshift/README.md b/metadata-ingestion/docs/sources/redshift/README.md deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/metadata-ingestion/docs/sources/redshift/redshift_pre.md b/metadata-ingestion/docs/sources/redshift/redshift_pre.md new file mode 100644 index 0000000000..ad54de6811 --- /dev/null +++ b/metadata-ingestion/docs/sources/redshift/redshift_pre.md @@ -0,0 +1,104 @@ +### Prerequisites + +This source needs to access system tables that require extra permissions. +To grant these permissions, please alter your datahub Redshift user the following way: +```sql +ALTER USER datahub_user WITH SYSLOG ACCESS UNRESTRICTED; +GRANT SELECT ON pg_catalog.svv_table_info to datahub_user; +GRANT SELECT ON pg_catalog.svl_user_info to datahub_user; +``` + +To ingest datashares lineage, ingestion user for both producer and consumer namespace would need alter/share +access to datashare. See [svv_datashares](https://docs.aws.amazon.com/redshift/latest/dg/r_SVV_DATASHARES.html) +docs for more information. +```sql +GRANT SHARE ON to datahub_user +``` + +:::note + +Giving a user unrestricted access to system tables gives the user visibility to data generated by other users. For example, STL_QUERY and STL_QUERYTEXT contain the full text of INSERT, UPDATE, and DELETE statements. + +::: + +### Concept mapping +| Source Concept | DataHub Concept | Notes | +| -------------- | --------------------------------------------------------- | ------------------ | +| `"redshift"` | [Data Platform](../../metamodel/entities/dataPlatform.md) | | +| Database | [Container](../../metamodel/entities/container.md) | Subtype `Database` | +| Schema | [Container](../../metamodel/entities/container.md) | Subtype `Schema` | +| Table | [Dataset](../../metamodel/entities/dataset.md) | Subtype `Table` | +| View | [Dataset](../../metamodel/entities/dataset.md) | Subtype `View` | + + +### Ingestion of multiple redshift databases, namespaces + +- If multiple databases are present in the Redshift namespace (or provisioned cluster), +you would need to set up a separate ingestion per database. + +- Ingestion recipes of all databases in a particular redshift namespace should use same platform instance. + +- If you've multiple redshift namespaces that you want to ingest within DataHub, it is highly recommended that +you specify a platform_instance equivalent to namespace in recipe. It can be same as namespace id or other +human readable name however it should be unique across all your redshift namespaces. + + +### Lineage + +There are multiple lineage collector implementations as Redshift does not support table lineage out of the box. + +#### stl_scan_based +The stl_scan based collector uses Redshift's [stl_insert](https://docs.aws.amazon.com/redshift/latest/dg/r_STL_INSERT.html) and [stl_scan](https://docs.aws.amazon.com/redshift/latest/dg/r_STL_SCAN.html) system tables to +discover lineage between tables. +Pros: +- Fast +- Reliable + +Cons: +- Does not work with Spectrum/external tables because those scans do not show up in stl_scan table. +- If a table is depending on a view then the view won't be listed as dependency. Instead the table will be connected with the view's dependencies. + +#### sql_based +The sql_based based collector uses Redshift's [stl_insert](https://docs.aws.amazon.com/redshift/latest/dg/r_STL_INSERT.html) to discover all the insert queries +and uses sql parsing to discover the dependencies. + +Pros: +- Works with Spectrum tables +- Views are connected properly if a table depends on it + +Cons: +- Slow. +- Less reliable as the query parser can fail on certain queries + +#### mixed +Using both collector above and first applying the sql based and then the stl_scan based one. + +Pros: +- Works with Spectrum tables +- Views are connected properly if a table depends on it +- A bit more reliable than the sql_based one only + +Cons: +- Slow +- May be incorrect at times as the query parser can fail on certain queries + +:::note + +The redshift stl redshift tables which are used for getting data lineage retain at most seven days of log history, and sometimes closer to 2-5 days. This means you cannot extract lineage from queries issued outside that window. + +::: + +### Datashares Lineage +This is enabled by default, can be disabled via setting `include_share_lineage: False` + +It is mandatory to run redshift ingestion of datashare producer namespace at least once so that lineage +shows up correctly after datashare consumer namespace is ingested. + +### Profiling +Profiling runs sql queries on the redshift cluster to get statistics about the tables. To be able to do that, the user needs to have read access to the tables that should be profiled. + +If you don't want to grant read access to the tables you can enable table level profiling which will get table statistics without reading the data. +```yaml +profiling: + profile_table_level_only: true +``` \ No newline at end of file diff --git a/metadata-ingestion/src/datahub/api/entities/common/serialized_value.py b/metadata-ingestion/src/datahub/api/entities/common/serialized_value.py index 0f13ea04ab..feed42cd2f 100644 --- a/metadata-ingestion/src/datahub/api/entities/common/serialized_value.py +++ b/metadata-ingestion/src/datahub/api/entities/common/serialized_value.py @@ -1,6 +1,6 @@ import json import logging -from typing import Dict, Optional, Type, Union +from typing import Dict, Optional, Type, TypeVar, Union from avrogen.dict_wrapper import DictWrapper from pydantic import BaseModel @@ -13,6 +13,7 @@ logger = logging.getLogger(__name__) _REMAPPED_SCHEMA_TYPES = { k.replace("pegasus2avro.", ""): v for k, v in SCHEMA_TYPES.items() } +T = TypeVar("T", bound=BaseModel) class SerializedResourceValue(BaseModel): @@ -83,8 +84,8 @@ class SerializedResourceValue(BaseModel): ) def as_pydantic_object( - self, model_type: Type[BaseModel], validate_schema_ref: bool = False - ) -> BaseModel: + self, model_type: Type[T], validate_schema_ref: bool = False + ) -> T: """ Parse the blob into a Pydantic-defined Python object based on the schema type and schema ref. diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py index 932ada0a90..80b834edb3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py @@ -128,6 +128,10 @@ class RedshiftConfig( default=True, description="Whether lineage should be collected from copy commands", ) + include_share_lineage: bool = Field( + default=True, + description="Whether lineage should be collected from datashares", + ) include_usage_statistics: bool = Field( default=False, diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/datashares.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/datashares.py new file mode 100644 index 0000000000..a4e7d509fd --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/datashares.py @@ -0,0 +1,236 @@ +from typing import Dict, Iterable, List, Optional, Union + +from pydantic import BaseModel + +from datahub.api.entities.platformresource.platform_resource import ( + ElasticPlatformResourceQuery, + PlatformResource, + PlatformResourceKey, + PlatformResourceSearchFields, +) +from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.graph.client import DataHubGraph +from datahub.ingestion.source.redshift.config import RedshiftConfig +from datahub.ingestion.source.redshift.redshift_schema import ( + InboundDatashare, + OutboundDatashare, + PartialInboundDatashare, + RedshiftTable, + RedshiftView, +) +from datahub.ingestion.source.redshift.report import RedshiftReport +from datahub.sql_parsing.sql_parsing_aggregator import KnownLineageMapping +from datahub.utilities.search_utils import LogicalOperator + + +class OutboundSharePlatformResource(BaseModel): + namespace: str + platform_instance: Optional[str] + env: str + source_database: str + share_name: str + + def get_key(self) -> str: + return f"{self.namespace}.{self.share_name}" + + +PLATFORM_RESOURCE_TYPE = "OUTBOUND_DATASHARE" + + +class RedshiftDatasharesHelper: + """ + Redshift datashares lineage generation relies on PlatformResource entity + to identify the producer namespace and its platform_instance and env + + Ingestion of any database in namespace will + A. generate PlatformResource entity for all outbound shares in namespace. + B. generate lineage with upstream tables from another namespace, if the database + is created from an inbound share + + """ + + def __init__( + self, + config: RedshiftConfig, + report: RedshiftReport, + graph: Optional[DataHubGraph], + ): + self.platform = "redshift" + self.config = config + self.report = report + self.graph = graph + + def to_platform_resource( + self, shares: List[OutboundDatashare] + ) -> Iterable[MetadataChangeProposalWrapper]: + if not shares: + self.report.outbound_shares_count = 0 + return + + self.report.outbound_shares_count = len(shares) + # Producer namespace will be current namespace for all + # outbound data shares + + for share in shares: + producer_namespace = share.producer_namespace + try: + platform_resource_key = PlatformResourceKey( + platform=self.platform, + platform_instance=self.config.platform_instance, + resource_type=PLATFORM_RESOURCE_TYPE, + primary_key=share.get_key(), + ) + + value = OutboundSharePlatformResource( + namespace=producer_namespace, + platform_instance=self.config.platform_instance, + env=self.config.env, + source_database=share.source_database, + share_name=share.share_name, + ) + + platform_resource = PlatformResource.create( + key=platform_resource_key, + value=value, + secondary_keys=[share.share_name, share.producer_namespace], + ) + + yield from platform_resource.to_mcps() + + except Exception as exc: + self.report.warning( + title="Downstream lineage to outbound datashare may not work", + message="Failed to generate platform resource for outbound datashares", + context=f"Namespace {share.producer_namespace} Share {share.share_name}", + exc=exc, + ) + + def generate_lineage( + self, + share: Union[InboundDatashare, PartialInboundDatashare], + tables: Dict[str, List[Union[RedshiftTable, RedshiftView]]], + ) -> Iterable[KnownLineageMapping]: + upstream_share = self.find_upstream_share(share) + + if not upstream_share: + return + + for schema in tables: + for table in tables[schema]: + dataset_urn = self.gen_dataset_urn( + f"{share.consumer_database}.{schema}.{table.name}", + self.config.platform_instance, + self.config.env, + ) + + upstream_dataset_urn = self.gen_dataset_urn( + f"{upstream_share.source_database}.{schema}.{table.name}", + upstream_share.platform_instance, + upstream_share.env, + ) + + yield KnownLineageMapping( + upstream_urn=upstream_dataset_urn, downstream_urn=dataset_urn + ) + + def find_upstream_share( + self, share: Union[InboundDatashare, PartialInboundDatashare] + ) -> Optional[OutboundSharePlatformResource]: + if not self.graph: + self.report.warning( + title="Upstream lineage of inbound datashare will be missing", + message="Missing datahub graph. Either use the datahub-rest sink or " + "set the top-level datahub_api config in the recipe", + ) + else: + resources = self.get_platform_resources(self.graph, share) + + if len(resources) == 0 or ( + not any( + [ + resource.resource_info is not None + and resource.resource_info.resource_type + == PLATFORM_RESOURCE_TYPE + for resource in resources + ] + ) + ): + self.report.info( + title="Upstream lineage of inbound datashare will be missing", + message="Missing platform resource for share. " + "Setup redshift ingestion for namespace if not already done. If ingestion is setup, " + "check whether ingestion user has ALTER/SHARE permission to share.", + context=share.get_description(), + ) + else: + # Ideally we should get only one resource as primary key is namespace+share + # and type is "OUTBOUND_DATASHARE" + for resource in resources: + try: + assert ( + resource.resource_info is not None + and resource.resource_info.value is not None + ) + return resource.resource_info.value.as_pydantic_object( + OutboundSharePlatformResource, True + ) + except Exception as e: + self.report.warning( + title="Upstream lineage of inbound datashare will be missing", + message="Failed to parse platform resource for outbound datashare", + context=share.get_description(), + exc=e, + ) + + return None + + def get_platform_resources( + self, + graph: DataHubGraph, + share: Union[InboundDatashare, PartialInboundDatashare], + ) -> List[PlatformResource]: + # NOTE: ideally we receive InboundDatashare and not PartialInboundDatashare. + # however due to varchar(128) type of database table that captures datashare options + # we may receive only partial information about inbound share + # Alternate option to get InboundDatashare using svv_datashares requires superuser + if isinstance(share, PartialInboundDatashare): + return list( + PlatformResource.search_by_filters( + graph, + ElasticPlatformResourceQuery.create_from() + .group(LogicalOperator.AND) + .add_field_match( + PlatformResourceSearchFields.RESOURCE_TYPE, + PLATFORM_RESOURCE_TYPE, + ) + .add_field_match( + PlatformResourceSearchFields.PLATFORM, self.platform + ) + .add_field_match( + PlatformResourceSearchFields.SECONDARY_KEYS, + share.share_name, + ) + .add_wildcard( + PlatformResourceSearchFields.SECONDARY_KEYS.field_name, + f"{share.producer_namespace_prefix}*", + ) + .end(), + ) + ) + return list( + PlatformResource.search_by_key( + graph, key=share.get_key(), primary=True, is_exact=True + ) + ) + + # TODO: Refactor and move to new RedshiftIdentifierBuilder class + def gen_dataset_urn( + self, datahub_dataset_name: str, platform_instance: Optional[str], env: str + ) -> str: + return make_dataset_urn_with_platform_instance( + platform=self.platform, + name=datahub_dataset_name, + platform_instance=platform_instance, + env=env, + ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py index 4b3d238a13..3b9b069902 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py @@ -813,9 +813,13 @@ class RedshiftLineageExtractor: ) tablename = table.name - if table.type == "EXTERNAL_TABLE": + if ( + table.is_external_table + and schema.is_external_schema + and schema.external_platform + ): # external_db_params = schema.option - upstream_platform = schema.type.lower() + upstream_platform = schema.external_platform.lower() catalog_upstream = UpstreamClass( mce_builder.make_dataset_urn_with_platform_instance( upstream_platform, diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py index 4b84c25965..1f43d0c3bf 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py @@ -401,11 +401,14 @@ class RedshiftSqlLineageV2(Closeable): ) -> None: for schema_name, tables in all_tables[self.database].items(): for table in tables: - if table.type == "EXTERNAL_TABLE": - schema = db_schemas[self.database][schema_name] - + schema = db_schemas[self.database][schema_name] + if ( + table.is_external_table + and schema.is_external_schema + and schema.external_platform + ): # external_db_params = schema.option - upstream_platform = schema.type.lower() + upstream_platform = schema.external_platform.lower() table_urn = mce_builder.make_dataset_urn_with_platform_instance( self.platform, diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py index 6f611fa674..4ef7906267 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py @@ -48,7 +48,7 @@ class RedshiftProfiler(GenericProfiler): if not self.config.schema_pattern.allowed(schema): continue for table in tables[db].get(schema, {}): - if table.type == "EXTERNAL_TABLE": + if table.is_external_table: if not self.config.profiling.profile_external_tables: # Case 1: If user did not tell us to profile external tables, simply log this. self.report.profiling_skipped_other[schema] += 1 diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py index 62f7d0a390..0c101dbb5e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py @@ -31,40 +31,62 @@ class RedshiftCommonQuery: AND (datname <> ('template1')::name) """ - list_schemas: str = """SELECT distinct n.nspname AS "schema_name", - 'local' as schema_type, - null as schema_owner_name, - '' as schema_option, - null as external_database - FROM pg_catalog.pg_class c - LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace - JOIN pg_catalog.pg_user u ON u.usesysid = c.relowner - WHERE c.relkind IN ('r','v','m','S','f') - AND n.nspname !~ '^pg_' - AND n.nspname != 'information_schema' -UNION ALL -SELECT schemaname as schema_name, - CASE s.eskind - WHEN '1' THEN 'GLUE' - WHEN '2' THEN 'HIVE' - WHEN '3' THEN 'POSTGRES' - WHEN '4' THEN 'REDSHIFT' - ELSE 'OTHER' - END as schema_type, - -- setting user_name to null as we don't use it now now and it breaks backward compatibility due to additional permission need - -- usename as schema_owner_name, - null as schema_owner_name, - esoptions as schema_option, - databasename as external_database + # NOTE: although schema owner id is available in tables, we do not use it + # as getting username from id requires access to pg_catalog.pg_user_info + # which is available only to superusers. + # NOTE: Need union here instead of using svv_all_schemas, in order to get + # external platform related lineage + # NOTE: Using database_name filter for svv_redshift_schemas, as otherwise + # schemas from other shared databases also show up. + @staticmethod + def list_schemas(database: str) -> str: + return f""" + SELECT + schema_name, + schema_type, + schema_option, + cast(null as varchar(256)) as external_platform, + cast(null as varchar(256)) as external_database + FROM svv_redshift_schemas + WHERE database_name = '{database}' + AND schema_name != 'pg_catalog' and schema_name != 'information_schema' + UNION ALL + SELECT + schemaname as schema_name, + 'external' as schema_type, + esoptions as schema_option, + CASE s.eskind + WHEN '1' THEN 'GLUE' + WHEN '2' THEN 'HIVE' + WHEN '3' THEN 'POSTGRES' + WHEN '4' THEN 'REDSHIFT' + ELSE 'OTHER' + END as external_platform, + databasename as external_database FROM SVV_EXTERNAL_SCHEMAS as s - -- inner join pg_catalog.pg_user_info as i on i.usesysid = s.esowner ORDER BY SCHEMA_NAME; """ + @staticmethod + def get_database_details(database): + return f"""\ + select + database_name, + database_type, + database_options + from svv_redshift_databases + where database_name='{database}';""" + + # NOTE: although table owner id is available in tables, we do not use it + # as getting username from id requires access to pg_catalog.pg_user_info + # which is available only to superusers. + # NOTE: Tables from shared database are not available in pg_catalog.pg_class @staticmethod def list_tables( - skip_external_tables: bool = False, + skip_external_tables: bool = False, is_shared_database: bool = False ) -> str: + # NOTE: it looks like description is available only in pg_description + # So this remains preferrred way tables_query = """ SELECT CASE c.relkind WHEN 'r' THEN 'TABLE' @@ -83,8 +105,6 @@ SELECT schemaname as schema_name, WHEN 8 THEN 'ALL' END AS "diststyle", c.relowner AS "owner_id", - -- setting user_name to null as we don't use it now now and it breaks backward compatibility due to additional permission need - -- u.usename AS "owner_name", null as "owner_name", TRIM(TRAILING ';' FROM pg_catalog.pg_get_viewdef (c.oid,TRUE)) AS "view_definition", pg_catalog.array_to_string(c.relacl,'\n') AS "privileges", @@ -98,11 +118,11 @@ SELECT schemaname as schema_name, LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace LEFT JOIN pg_class_info as ci on c.oid = ci.reloid LEFT JOIN pg_catalog.pg_description pgd ON pgd.objsubid = 0 AND pgd.objoid = c.oid - -- JOIN pg_catalog.pg_user u ON u.usesysid = c.relowner WHERE c.relkind IN ('r','v','m','S','f') AND n.nspname !~ '^pg_' AND n.nspname != 'information_schema' """ + external_tables_query = """ SELECT 'EXTERNAL_TABLE' as tabletype, NULL AS "schema_oid", @@ -125,13 +145,62 @@ SELECT schemaname as schema_name, ORDER BY "schema", "relname" """ - if skip_external_tables: + shared_database_tables_query = """ + SELECT table_type as tabletype, + NULL AS "schema_oid", + schema_name AS "schema", + NULL AS "rel_oid", + table_name AS "relname", + NULL as "creation_time", + NULL AS "diststyle", + table_owner AS "owner_id", + NULL AS "owner_name", + NULL AS "view_definition", + table_acl AS "privileges", + NULL as "location", + NULL as parameters, + NULL as input_format, + NULL As output_format, + NULL as serde_parameters, + NULL as table_description + FROM svv_redshift_tables + ORDER BY "schema", + "relname" +""" + if is_shared_database: + return shared_database_tables_query + elif skip_external_tables: return tables_query else: return f"{tables_query} UNION {external_tables_query}" - # Why is this unused. Is this a bug? - list_columns: str = """ + @staticmethod + def list_columns(is_shared_database: bool = False) -> str: + if is_shared_database: + return """ + SELECT + schema_name as "schema", + table_name as "table_name", + column_name as "name", + encoding as "encode", + -- Spectrum represents data types differently. + -- Standardize, so we can infer types. + data_type AS "type", + distkey as "distkey", + sortkey as "sortkey", + (case when is_nullable = 'no' then TRUE else FALSE end) as "notnull", + null as "comment", + null as "adsrc", + ordinal_position as "attnum", + data_type AS "format_type", + column_default as "default", + null as "schema_oid", + null as "table_oid" + FROM SVV_REDSHIFT_COLUMNS + WHERE 1 and schema = '{schema_name}' + ORDER BY "schema", "table_name", "attnum" +""" + return """ SELECT n.nspname as "schema", c.relname as "table_name", @@ -362,6 +431,29 @@ ORDER BY target_schema, target_table, filename ) -> str: raise NotImplementedError + @staticmethod + def list_outbound_datashares() -> str: + return """SELECT \ + share_type, \ + share_name, \ + trim(producer_namespace) as producer_namespace, \ + source_database \ + FROM svv_datashares + WHERE share_type='OUTBOUND'\ + """ + + @staticmethod + def get_inbound_datashare(database: str) -> str: + return f"""SELECT \ + share_type, \ + share_name, \ + trim(producer_namespace) as producer_namespace, \ + consumer_database \ + FROM svv_datashares + WHERE share_type='INBOUND' + AND consumer_database= '{database}'\ + """ + class RedshiftProvisionedQuery(RedshiftCommonQuery): @staticmethod diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index cce282c710..5a35700188 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -33,7 +33,10 @@ from datahub.ingestion.api.source import ( TestableSource, TestConnectionReport, ) -from datahub.ingestion.api.source_helpers import create_dataset_props_patch_builder +from datahub.ingestion.api.source_helpers import ( + auto_workunit, + create_dataset_props_patch_builder, +) from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.glossary.classification_mixin import ( ClassificationHandler, @@ -45,6 +48,7 @@ from datahub.ingestion.source.common.subtypes import ( DatasetSubTypes, ) from datahub.ingestion.source.redshift.config import RedshiftConfig +from datahub.ingestion.source.redshift.datashares import RedshiftDatasharesHelper from datahub.ingestion.source.redshift.exception import handle_redshift_exceptions_yield from datahub.ingestion.source.redshift.lineage import RedshiftLineageExtractor from datahub.ingestion.source.redshift.lineage_v2 import RedshiftSqlLineageV2 @@ -52,6 +56,7 @@ from datahub.ingestion.source.redshift.profile import RedshiftProfiler from datahub.ingestion.source.redshift.redshift_data_reader import RedshiftDataReader from datahub.ingestion.source.redshift.redshift_schema import ( RedshiftColumn, + RedshiftDatabase, RedshiftDataDictionary, RedshiftSchema, RedshiftTable, @@ -150,76 +155,6 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource): - Table, row, and column statistics via optional SQL profiling - Table lineage - Usage statistics - - ### Prerequisites - - This source needs to access system tables that require extra permissions. - To grant these permissions, please alter your datahub Redshift user the following way: - ```sql - ALTER USER datahub_user WITH SYSLOG ACCESS UNRESTRICTED; - GRANT SELECT ON pg_catalog.svv_table_info to datahub_user; - GRANT SELECT ON pg_catalog.svl_user_info to datahub_user; - ``` - - :::note - - Giving a user unrestricted access to system tables gives the user visibility to data generated by other users. For example, STL_QUERY and STL_QUERYTEXT contain the full text of INSERT, UPDATE, and DELETE statements. - - ::: - - ### Lineage - - There are multiple lineage collector implementations as Redshift does not support table lineage out of the box. - - #### stl_scan_based - The stl_scan based collector uses Redshift's [stl_insert](https://docs.aws.amazon.com/redshift/latest/dg/r_STL_INSERT.html) and [stl_scan](https://docs.aws.amazon.com/redshift/latest/dg/r_STL_SCAN.html) system tables to - discover lineage between tables. - Pros: - - Fast - - Reliable - - Cons: - - Does not work with Spectrum/external tables because those scans do not show up in stl_scan table. - - If a table is depending on a view then the view won't be listed as dependency. Instead the table will be connected with the view's dependencies. - - #### sql_based - The sql_based based collector uses Redshift's [stl_insert](https://docs.aws.amazon.com/redshift/latest/dg/r_STL_INSERT.html) to discover all the insert queries - and uses sql parsing to discover the dependencies. - - Pros: - - Works with Spectrum tables - - Views are connected properly if a table depends on it - - Cons: - - Slow. - - Less reliable as the query parser can fail on certain queries - - #### mixed - Using both collector above and first applying the sql based and then the stl_scan based one. - - Pros: - - Works with Spectrum tables - - Views are connected properly if a table depends on it - - A bit more reliable than the sql_based one only - - Cons: - - Slow - - May be incorrect at times as the query parser can fail on certain queries - - :::note - - The redshift stl redshift tables which are used for getting data lineage retain at most seven days of log history, and sometimes closer to 2-5 days. This means you cannot extract lineage from queries issued outside that window. - - ::: - - ### Profiling - Profiling runs sql queries on the redshift cluster to get statistics about the tables. To be able to do that, the user needs to have read access to the tables that should be profiled. - - If you don't want to grant read access to the tables you can enable table level profiling which will get table statistics without reading the data. - ```yaml - profiling: - profile_table_level_only: true - ``` """ # TODO: Replace with standardized types in sql_types.py @@ -330,6 +265,9 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource): self.config: RedshiftConfig = config self.report: RedshiftReport = RedshiftReport() self.classification_handler = ClassificationHandler(self.config, self.report) + self.datashares_helper = RedshiftDatasharesHelper( + self.config, self.report, self.ctx.graph + ) self.platform = "redshift" self.domain_registry = None if self.config.domain: @@ -361,6 +299,7 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource): is_serverless=self.config.is_serverless ) + self.db: Optional[RedshiftDatabase] = None self.db_tables: Dict[str, Dict[str, List[RedshiftTable]]] = {} self.db_views: Dict[str, Dict[str, List[RedshiftView]]] = {} self.db_schemas: Dict[str, Dict[str, RedshiftSchema]] = {} @@ -424,6 +363,11 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource): database = self.config.database logger.info(f"Processing db {database}") + + self.db = self.data_dictionary.get_database_details(connection, database) + self.report.is_shared_database = ( + self.db is not None and self.db.is_shared_database + ) with self.report.new_stage(METADATA_EXTRACTION): self.db_tables[database] = defaultdict() self.db_views[database] = defaultdict() @@ -563,7 +507,9 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource): schema_columns: Dict[str, Dict[str, List[RedshiftColumn]]] = {} schema_columns[schema.name] = self.data_dictionary.get_columns_for_schema( - conn=connection, schema=schema + conn=connection, + schema=schema, + is_shared_database=self.report.is_shared_database, ) if self.config.include_tables: @@ -887,6 +833,7 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource): tables, views = self.data_dictionary.get_tables_and_views( conn=connection, skip_external_tables=self.config.skip_external_tables, + is_shared_database=self.report.is_shared_database, ) for schema in tables: if not is_schema_allowed( @@ -1029,6 +976,28 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource): database: str, lineage_extractor: RedshiftSqlLineageV2, ) -> Iterable[MetadataWorkUnit]: + if self.config.include_share_lineage: + outbound_shares = self.data_dictionary.get_outbound_datashares(connection) + yield from auto_workunit( + self.datashares_helper.to_platform_resource(list(outbound_shares)) + ) + + if self.db and self.db.is_shared_database: + inbound_share = self.db.get_inbound_share() + if inbound_share is None: + self.report.warning( + title="Upstream lineage of inbound datashare will be missing", + message="Database options do not contain sufficient information", + context=f"Database: {database}, Options {self.db.options}", + ) + else: + for known_lineage in self.datashares_helper.generate_lineage( + inbound_share, self.get_all_tables()[database] + ): + lineage_extractor.aggregator.add(known_lineage) + + # TODO: distinguish between definition level lineage and audit log based lineage + # definition level lineage should never be skipped if not self._should_ingest_lineage(): return diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_schema.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_schema.py index 594f88dd52..73456d445d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_schema.py @@ -1,7 +1,8 @@ import logging +import re from dataclasses import dataclass, field from datetime import datetime, timezone -from typing import Dict, Iterable, List, Optional, Tuple +from typing import Dict, Iterable, List, Optional, Tuple, Union import redshift_connector @@ -41,6 +42,10 @@ class RedshiftTable(BaseTable): serde_parameters: Optional[str] = None last_altered: Optional[datetime] = None + @property + def is_external_table(self) -> bool: + return self.type == "EXTERNAL_TABLE" + @dataclass class RedshiftView(BaseTable): @@ -51,6 +56,10 @@ class RedshiftView(BaseTable): size_in_bytes: Optional[int] = None rows_count: Optional[int] = None + @property + def is_external_table(self) -> bool: + return self.type == "EXTERNAL_TABLE" + @dataclass class RedshiftSchema: @@ -59,8 +68,102 @@ class RedshiftSchema: type: str owner: Optional[str] = None option: Optional[str] = None + external_platform: Optional[str] = None external_database: Optional[str] = None + @property + def is_external_schema(self) -> bool: + return self.type == "external" + + +@dataclass +class PartialInboundDatashare: + share_name: str + producer_namespace_prefix: str + consumer_database: str + + def get_description(self) -> str: + return ( + f"Namespace Prefix {self.producer_namespace_prefix} Share {self.share_name}" + ) + + +@dataclass +class OutboundDatashare: + share_name: str + producer_namespace: str + source_database: str + + def get_key(self) -> str: + return f"{self.producer_namespace}.{self.share_name}" + + +@dataclass +class InboundDatashare: + share_name: str + producer_namespace: str + consumer_database: str + + def get_key(self) -> str: + return f"{self.producer_namespace}.{self.share_name}" + + def get_description(self) -> str: + return f"Namespace {self.producer_namespace} Share {self.share_name}" + + +@dataclass +class RedshiftDatabase: + name: str + type: str + options: Optional[str] = None + + @property + def is_shared_database(self) -> bool: + return self.type == "shared" + + # NOTE: ideally options are in form + # {"datashare_name":"xxx","datashare_producer_account":"1234","datashare_producer_namespace":"yyy"} + # however due to varchar(128) type of database table that captures options + # we may receive only partial information about inbound share + def get_inbound_share( + self, + ) -> Optional[Union[InboundDatashare, PartialInboundDatashare]]: + if not self.is_shared_database or not self.options: + return None + + # Convert into single regex ?? + share_name_match = re.search(r'"datashare_name"\s*:\s*"([^"]*)"', self.options) + namespace_match = re.search( + r'"datashare_producer_namespace"\s*:\s*"([^"]*)"', self.options + ) + partial_namespace_match = re.search( + r'"datashare_producer_namespace"\s*:\s*"([^"]*)$', self.options + ) + + if not share_name_match: + # We will always at least get share name + return None + + share_name = share_name_match.group(1) + if namespace_match: + return InboundDatashare( + share_name=share_name, + producer_namespace=namespace_match.group(1), + consumer_database=self.name, + ) + elif partial_namespace_match: + return PartialInboundDatashare( + share_name=share_name, + producer_namespace_prefix=partial_namespace_match.group(1), + consumer_database=self.name, + ) + else: + return PartialInboundDatashare( + share_name=share_name, + producer_namespace_prefix="", + consumer_database=self.name, + ) + @dataclass class RedshiftExtraTableMeta: @@ -141,13 +244,31 @@ class RedshiftDataDictionary: return [db[0] for db in dbs] + @staticmethod + def get_database_details( + conn: redshift_connector.Connection, database: str + ) -> Optional[RedshiftDatabase]: + cursor = RedshiftDataDictionary.get_query_result( + conn, + RedshiftCommonQuery.get_database_details(database), + ) + + row = cursor.fetchone() + if row is None: + return None + return RedshiftDatabase( + name=database, + type=row[1], + options=row[2], + ) + @staticmethod def get_schemas( conn: redshift_connector.Connection, database: str ) -> List[RedshiftSchema]: cursor = RedshiftDataDictionary.get_query_result( conn, - RedshiftCommonQuery.list_schemas.format(database_name=database), + RedshiftCommonQuery.list_schemas(database), ) schemas = cursor.fetchall() @@ -158,8 +279,8 @@ class RedshiftDataDictionary: database=database, name=schema[field_names.index("schema_name")], type=schema[field_names.index("schema_type")], - owner=schema[field_names.index("schema_owner_name")], option=schema[field_names.index("schema_option")], + external_platform=schema[field_names.index("external_platform")], external_database=schema[field_names.index("external_database")], ) for schema in schemas @@ -203,6 +324,7 @@ class RedshiftDataDictionary: self, conn: redshift_connector.Connection, skip_external_tables: bool = False, + is_shared_database: bool = False, ) -> Tuple[Dict[str, List[RedshiftTable]], Dict[str, List[RedshiftView]]]: tables: Dict[str, List[RedshiftTable]] = {} views: Dict[str, List[RedshiftView]] = {} @@ -213,7 +335,10 @@ class RedshiftDataDictionary: cur = RedshiftDataDictionary.get_query_result( conn, - RedshiftCommonQuery.list_tables(skip_external_tables=skip_external_tables), + RedshiftCommonQuery.list_tables( + skip_external_tables=skip_external_tables, + is_shared_database=is_shared_database, + ), ) field_names = [i[0] for i in cur.description] db_tables = cur.fetchall() @@ -358,11 +483,15 @@ class RedshiftDataDictionary: @staticmethod def get_columns_for_schema( - conn: redshift_connector.Connection, schema: RedshiftSchema + conn: redshift_connector.Connection, + schema: RedshiftSchema, + is_shared_database: bool = False, ) -> Dict[str, List[RedshiftColumn]]: cursor = RedshiftDataDictionary.get_query_result( conn, - RedshiftCommonQuery.list_columns.format(schema_name=schema.name), + RedshiftCommonQuery.list_columns( + is_shared_database=is_shared_database + ).format(schema_name=schema.name), ) table_columns: Dict[str, List[RedshiftColumn]] = {} @@ -508,3 +637,34 @@ class RedshiftDataDictionary: start_time=row[field_names.index("start_time")], ) rows = cursor.fetchmany() + + @staticmethod + def get_outbound_datashares( + conn: redshift_connector.Connection, + ) -> Iterable[OutboundDatashare]: + cursor = conn.cursor() + cursor.execute(RedshiftCommonQuery.list_outbound_datashares()) + for item in cursor.fetchall(): + yield OutboundDatashare( + share_name=item[1], + producer_namespace=item[2], + source_database=item[3], + ) + + # NOTE: this is not used right now as it requires superuser privilege + # We can use this in future if the permissions are lowered. + @staticmethod + def get_inbound_datashare( + conn: redshift_connector.Connection, + database: str, + ) -> Optional[InboundDatashare]: + cursor = conn.cursor() + cursor.execute(RedshiftCommonQuery.get_inbound_datashare(database)) + item = cursor.fetchone() + if item: + return InboundDatashare( + share_name=item[1], + producer_namespace=item[2], + consumer_database=item[3], + ) + return None diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/report.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/report.py index 2748f2a588..047df69555 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/report.py @@ -60,5 +60,8 @@ class RedshiftReport( sql_aggregator: Optional[SqlAggregatorReport] = None lineage_phases_timer: Dict[str, PerfTimer] = field(default_factory=dict) + is_shared_database: bool = False + outbound_shares_count: Optional[int] = None + def report_dropped(self, key: str) -> None: self.filtered.append(key) diff --git a/metadata-ingestion/tests/unit/redshift/test_redshift_datashares.py b/metadata-ingestion/tests/unit/redshift/test_redshift_datashares.py new file mode 100644 index 0000000000..3167341f4a --- /dev/null +++ b/metadata-ingestion/tests/unit/redshift/test_redshift_datashares.py @@ -0,0 +1,486 @@ +import json +from typing import Dict, List, Union +from unittest.mock import patch + +from datahub.api.entities.platformresource.platform_resource import ( + PlatformResource, + PlatformResourceKey, +) +from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph +from datahub.ingestion.source.redshift.config import RedshiftConfig +from datahub.ingestion.source.redshift.datashares import ( + InboundDatashare, + OutboundDatashare, + OutboundSharePlatformResource, + RedshiftDatasharesHelper, + RedshiftTable, + RedshiftView, +) +from datahub.ingestion.source.redshift.redshift_schema import PartialInboundDatashare +from datahub.ingestion.source.redshift.report import RedshiftReport +from datahub.metadata.schema_classes import ( + PlatformResourceInfoClass, + SerializedValueClass, + SerializedValueContentTypeClass, +) +from datahub.sql_parsing.sql_parsing_aggregator import KnownLineageMapping + + +def get_redshift_config(): + return RedshiftConfig( + host_port="localhost:5439", + database="XXXXXXX", + username="XXXXXXXXX", + password="XXXX_password", + platform_instance="consumer_instance", + ) + + +def get_datahub_graph(): + """ + Mock DataHubGraph instance for testing purposes. + """ + graph = DataHubGraph(DatahubClientConfig(server="xxxx")) + return graph + + +class TestDatasharesHelper: + def test_generate_lineage_success(self): + """ + Test generate_lineage method when share and graph exist, resources are found, + and upstream namespace and database are successfully identified. + """ + # Setup + config = get_redshift_config() + report = RedshiftReport() + graph = get_datahub_graph() + helper = RedshiftDatasharesHelper(config, report, graph) + + # Mock input data + share = InboundDatashare( + producer_namespace="producer_namespace", + share_name="test_share", + consumer_database="consumer_db", + ) + tables: Dict[str, List[Union[RedshiftTable, RedshiftView]]] = { + "schema1": [ + RedshiftTable(name="table1", comment=None, created=None), + RedshiftTable(name="table2", comment=None, created=None), + ], + "schema2": [RedshiftTable(name="table3", comment=None, created=None)], + } + + # Mock PlatformResource.search_by_key + def mock_search_by_key(*args, **kwargs): + resource = PlatformResource.create( + key=PlatformResourceKey( + platform="redshift", + platform_instance="producer_instance", + resource_type="OUTBOUND_DATASHARE", + primary_key="producer_namespace.some_share", + ), + value=OutboundSharePlatformResource( + namespace="producer_namespace", + platform_instance="producer_instance", + env="PROD", + source_database="producer_db", + share_name="test_share", + ), + ) + + return [resource] + + with patch.object(PlatformResource, "search_by_key") as mocked_method: + mocked_method.side_effect = mock_search_by_key + result = list(helper.generate_lineage(share, tables)) + # Assert + assert len(result) == 3 + expected_mappings = [ + KnownLineageMapping( + upstream_urn="urn:li:dataset:(urn:li:dataPlatform:redshift,producer_instance.producer_db.schema1.table1,PROD)", + downstream_urn="urn:li:dataset:(urn:li:dataPlatform:redshift,consumer_instance.consumer_db.schema1.table1,PROD)", + ), + KnownLineageMapping( + upstream_urn="urn:li:dataset:(urn:li:dataPlatform:redshift,producer_instance.producer_db.schema1.table2,PROD)", + downstream_urn="urn:li:dataset:(urn:li:dataPlatform:redshift,consumer_instance.consumer_db.schema1.table2,PROD)", + ), + KnownLineageMapping( + upstream_urn="urn:li:dataset:(urn:li:dataPlatform:redshift,producer_instance.producer_db.schema2.table3,PROD)", + downstream_urn="urn:li:dataset:(urn:li:dataPlatform:redshift,consumer_instance.consumer_db.schema2.table3,PROD)", + ), + ] + assert result == expected_mappings + + def test_generate_lineage_success_partial_inbound_share(self): + """ + Test generate_lineage method when share and graph exist, resources are found, + and upstream namespace and database are successfully identified. + """ + # Setup + config = get_redshift_config() + report = RedshiftReport() + graph = get_datahub_graph() + helper = RedshiftDatasharesHelper(config, report, graph) + + # Mock input data + share = PartialInboundDatashare( + producer_namespace_prefix="producer_na", + share_name="test_share", + consumer_database="consumer_db", + ) + tables: Dict[str, List[Union[RedshiftTable, RedshiftView]]] = { + "schema1": [ + RedshiftTable(name="table1", comment=None, created=None), + RedshiftTable(name="table2", comment=None, created=None), + ], + "schema2": [RedshiftTable(name="table3", comment=None, created=None)], + } + + # Mock PlatformResource.search_by_key + def mock_search_by_filters(*args, **kwargs): + resource = PlatformResource.create( + key=PlatformResourceKey( + platform="redshift", + platform_instance="producer_instance", + resource_type="OUTBOUND_DATASHARE", + primary_key="producer_namespace.some_share", + ), + value=OutboundSharePlatformResource( + namespace="producer_namespace", + platform_instance="producer_instance", + env="PROD", + source_database="producer_db", + share_name="test_share", + ), + ) + + return [resource] + + with patch.object(PlatformResource, "search_by_filters") as mocked_method: + mocked_method.side_effect = mock_search_by_filters + result = list(helper.generate_lineage(share, tables)) + # Assert + assert len(result) == 3 + expected_mappings = [ + KnownLineageMapping( + upstream_urn="urn:li:dataset:(urn:li:dataPlatform:redshift,producer_instance.producer_db.schema1.table1,PROD)", + downstream_urn="urn:li:dataset:(urn:li:dataPlatform:redshift,consumer_instance.consumer_db.schema1.table1,PROD)", + ), + KnownLineageMapping( + upstream_urn="urn:li:dataset:(urn:li:dataPlatform:redshift,producer_instance.producer_db.schema1.table2,PROD)", + downstream_urn="urn:li:dataset:(urn:li:dataPlatform:redshift,consumer_instance.consumer_db.schema1.table2,PROD)", + ), + KnownLineageMapping( + upstream_urn="urn:li:dataset:(urn:li:dataPlatform:redshift,producer_instance.producer_db.schema2.table3,PROD)", + downstream_urn="urn:li:dataset:(urn:li:dataPlatform:redshift,consumer_instance.consumer_db.schema2.table3,PROD)", + ), + ] + assert result == expected_mappings + + def test_generate_lineage_missing_graph_reports_warning(self): + """ + Test generate_lineage when share is provided but graph is not available. + + This test verifies that the method correctly handles the case where an InboundDatashare + is provided, but the DataHubGraph is not available. It should set the + self.is_shared_database flag to True and log a warning about missing upstream lineage. + """ + # Setup + config = get_redshift_config() + report = RedshiftReport() + graph = None + helper = RedshiftDatasharesHelper(config, report, graph) + + share = InboundDatashare( + producer_namespace="test_namespace", + share_name="test_share", + consumer_database="test_db", + ) + tables: Dict[str, List[Union[RedshiftTable, RedshiftView]]] = {} + + # Execute + list(helper.generate_lineage(share, tables)) + + # Assert + assert len(report.warnings) == 1 + + assert ( + list(report.warnings)[0].title + == "Upstream lineage of inbound datashare will be missing" + ) + assert "Missing datahub graph" in list(report.warnings)[0].message + + def test_generate_lineage_missing_producer_platform_resource(self): + """ + Test generate_lineage when share is provided, graph exists, but no resources are found. + + This test verifies that the method handles the case where an inbound datashare is provided, + the DataHubGraph is available, but no platform resources are found for the producer namespace. + It should result in a warning being reported and no lineage mappings being generated. + """ + # Setup + config = get_redshift_config() + report = RedshiftReport() + graph = get_datahub_graph() + helper = RedshiftDatasharesHelper(config, report, graph) + + # Create a mock InboundDatashare + share = InboundDatashare( + share_name="test_share", + producer_namespace="test_namespace", + consumer_database="test_db", + ) + + # Create mock tables + tables: Dict[str, List[Union[RedshiftTable, RedshiftView]]] = { + "schema1": [RedshiftTable(name="table1", created=None, comment=None)] + } + + # Mock the PlatformResource.search_by_key to return an empty list + with patch.object(PlatformResource, "search_by_key") as mocked_method: + mocked_method.return_value = [] + result = list(helper.generate_lineage(share, tables)) + + # Assertions + assert len(result) == 0, "No lineage mappings should be generated" + assert len(report.infos) == 1 + assert ( + list(report.infos)[0].title + == "Upstream lineage of inbound datashare will be missing" + ) + assert "Missing platform resource" in list(report.infos)[0].message + + def test_generate_lineage_malformed_share_platform_resource(self): + """ + Test generate_lineage method when share and graph exist, resources are found, + but upstream_share is None due to error in parsing resource info. + + This test verifies that the method handles the case where the upstream namespace is found, + but we failed to parse the value. + It should result in a warning being reported and no lineage mappings being generated. + """ + # Setup + config = get_redshift_config() + report = RedshiftReport() + graph = get_datahub_graph() + helper = RedshiftDatasharesHelper(config, report, graph) + + share = InboundDatashare( + producer_namespace="produer_namespace", + share_name="test_share", + consumer_database="consumer_db", + ) + tables: Dict[str, List[Union[RedshiftTable, RedshiftView]]] = { + "schema1": [RedshiftTable(name="table1", comment=None, created=None)] + } + + # Mock PlatformResource.search_by_key to return a resource + def mock_search_by_key(*args, **kwargs): + resource = PlatformResource.create( + key=PlatformResourceKey( + platform="redshift", + platform_instance="producer_instance", + resource_type="OUTBOUND_DATASHARE", + primary_key="producer_namespace.some_share", + ), + value={ + "namespace": "producer_namespace", + "platform_instance": "producer_instance", + "env": "PROD", + "outbound_share_name_to_source_database": {}, # Empty dict to simulate missing share + }, + ) + + return [resource] + + with patch.object(PlatformResource, "search_by_key") as mocked_method: + mocked_method.side_effect = mock_search_by_key + result = list(helper.generate_lineage(share, tables)) + + # Assert + assert len(result) == 0 + assert len(report.warnings) == 1 + assert ( + list(report.warnings)[0].title + == "Upstream lineage of inbound datashare will be missing" + ) + assert ( + "Failed to parse platform resource for outbound datashare" + in list(report.warnings)[0].message + ) + + def test_generate_lineage_shared_database_with_no_tables(self): + """ + Test generate_lineage with valid share but empty tables dictionary. + """ + config = get_redshift_config() + report = RedshiftReport() + graph = get_datahub_graph() # Mock or actual instance + helper = RedshiftDatasharesHelper(config, report, graph) + + share = InboundDatashare( + producer_namespace="producer_namespace", + consumer_database="db", + share_name="share", + ) + tables: Dict[str, List[Union[RedshiftTable, RedshiftView]]] = {} + + with patch.object(PlatformResource, "search_by_key") as mocked_method: + mocked_method.return_value = [] + result = list(helper.generate_lineage(share, tables)) + + assert len(result) == 0 + + def test_to_platform_resource_success(self): + """ + Test the to_platform_resource method when shares list is not empty. + + This test verifies that the method correctly processes a non-empty list of OutboundDatashare objects, + generates the appropriate PlatformResource, and yields the expected MetadataChangeProposalWrapper objects. + It also checks that the outbound_shares_count in the report is set correctly. + """ + # Setup + config = RedshiftConfig( + host_port="test_host", + database="test_db", + username="test_user", + password="test_pass", + platform_instance="test_instance", + ) + report = RedshiftReport() + helper = RedshiftDatasharesHelper(config, report, None) + + # Create test data + shares = [ + OutboundDatashare( + producer_namespace="test_namespace", + share_name="share1", + source_database="db1", + ), + OutboundDatashare( + producer_namespace="test_namespace", + share_name="share2", + source_database="db2", + ), + ] + + # Execute the method + result = list(helper.to_platform_resource(shares)) + + # Assertions + assert len(result) > 0, ( + "Expected at least one MetadataChangeProposalWrapper to be yielded" + ) + assert report.outbound_shares_count == 2, ( + "Expected outbound_shares_count to be 2" + ) + + # Check the content of the first MetadataChangeProposalWrapper + first_mcp = result[0] + assert first_mcp.entityType == "platformResource", ( + "Expected entityType to be platformResource" + ) + assert first_mcp.aspectName == "platformResourceInfo", ( + "Expected aspectName to be platformResourceInfo" + ) + + info = first_mcp.aspect + assert isinstance(info, PlatformResourceInfoClass) + + assert info.resourceType == "OUTBOUND_DATASHARE" + assert info.primaryKey == "test_namespace.share1" + + assert isinstance(info.value, SerializedValueClass) + assert info.value.contentType == SerializedValueContentTypeClass.JSON + assert info.value.blob == json.dumps( + { + "namespace": "test_namespace", + "platform_instance": "test_instance", + "env": "PROD", + "source_database": "db1", + "share_name": "share1", + }, + ).encode("utf-8") + + # Check the content of the first MetadataChangeProposalWrapper + fourth_mcp = result[3] + assert fourth_mcp.entityType == "platformResource", ( + "Expected entityType to be platformResource" + ) + assert fourth_mcp.aspectName == "platformResourceInfo", ( + "Expected aspectName to be platformResourceInfo" + ) + + info = fourth_mcp.aspect + assert isinstance(info, PlatformResourceInfoClass) + + assert info.resourceType == "OUTBOUND_DATASHARE" + assert info.primaryKey == "test_namespace.share2" + + assert isinstance(info.value, SerializedValueClass) + assert info.value.contentType == SerializedValueContentTypeClass.JSON + assert info.value.blob == json.dumps( + { + "namespace": "test_namespace", + "platform_instance": "test_instance", + "env": "PROD", + "source_database": "db2", + "share_name": "share2", + }, + ).encode("utf-8") + + def test_to_platform_resource_edge_case_single_share(self): + """ + Test the to_platform_resource method with a single share. + This edge case should still produce a valid result. + """ + config = get_redshift_config() + report = RedshiftReport() + helper = RedshiftDatasharesHelper(config, report, None) + + share = OutboundDatashare( + producer_namespace="test", share_name="share1", source_database="db1" + ) + + result = list(helper.to_platform_resource([share])) + + assert len(result) > 0 + assert report.outbound_shares_count == 1 + + def test_to_platform_resource_empty_input(self): + """ + Test the to_platform_resource method with an empty list of shares. + This should set the outbound_shares_count to 0 and return an empty iterable. + """ + config = get_redshift_config() + report = RedshiftReport() + helper = RedshiftDatasharesHelper(config, report, None) + + result = list(helper.to_platform_resource([])) + + assert len(result) == 0 + assert report.outbound_shares_count == 0 + + def test_to_platform_resource_exception_handling(self): + """ + Test the exception handling in the to_platform_resource method. + This should catch the exception and add a warning to the report. + """ + config = get_redshift_config() + report = RedshiftReport() + helper = RedshiftDatasharesHelper(config, report, None) + + # Create a share with invalid data to trigger an exception + invalid_share = OutboundDatashare( + producer_namespace=None, # type:ignore + share_name="x", + source_database="y", + ) + + list(helper.to_platform_resource([invalid_share])) + + assert len(report.warnings) == 1 + assert ( + list(report.warnings)[0].title + == "Downstream lineage to outbound datashare may not work" + )