From b643206bba7ad5ee66e3cc6f930fb836768ae361 Mon Sep 17 00:00:00 2001 From: Mayur Singal <39544459+ulixius9@users.noreply.github.com> Date: Fri, 15 Mar 2024 00:52:41 +0530 Subject: [PATCH] Fix #11905: Automated lineage between external table and container snowflake (#15537) --- .../ingestion/ometa/mixins/es_mixin.py | 85 +++++++++++++++++-- .../source/database/database_service.py | 10 +++ .../source/database/snowflake/metadata.py | 70 +++++++++++++++ .../source/database/snowflake/queries.py | 4 + .../ingestion/source/storage/s3/metadata.py | 25 +++++- .../ingestion/source/storage/s3/models.py | 4 + .../unit/topology/storage/test_storage.py | 2 + .../service/jdbi3/ContainerRepository.java | 1 + .../resources/storages/ContainerResource.java | 1 + .../search/indexes/ContainerIndex.java | 1 + .../en/container_index_mapping.json | 3 + .../json/schema/api/data/createContainer.json | 4 + .../json/schema/entity/data/container.json | 4 + .../json/schema/type/entityLineage.json | 2 +- .../ui/src/constants/Lineage.constants.ts | 1 + 15 files changed, 206 insertions(+), 11 deletions(-) diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py index 0e2540b1fd6..afbca655de4 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py @@ -24,6 +24,7 @@ from requests.utils import quote from metadata.generated.schema.api.createEventPublisherJob import ( CreateEventPublisherJob, ) +from metadata.generated.schema.entity.data.container import Container from metadata.generated.schema.entity.data.query import Query from metadata.generated.schema.system.eventPublisherJob import EventPublisherResult from metadata.ingestion.ometa.client import REST, APIError @@ -45,7 +46,7 @@ class ESMixin(Generic[T]): client: REST fqdn_search = ( - "/search/fieldQuery?fieldName=fullyQualifiedName&fieldValue={fqn}&from={from_}" + "/search/fieldQuery?fieldName={field_name}&fieldValue={field_value}&from={from_}" "&size={size}&index={index}" ) @@ -105,17 +106,83 @@ class ESMixin(Generic[T]): fields: Optional[str] = None, ) -> Optional[List[T]]: """ - Given a service_name and some filters, search for entities using ES + Given a service name and filters, search for entities using Elasticsearch. - :param entity_type: Entity to look for - :param fqn_search_string: string used to search by FQN. E.g., service.*.schema.table - :param from_count: Records to expect - :param size: Number of records - :param fields: Comma separated list of fields to be returned - :return: List of entities + Args: + entity_type (Type[T]): The type of entity to look for. + fqn_search_string (str): The string used to search by fully qualified name (FQN). + Example: "service.*.schema.table". + from_count (int): The starting index of the search results. + size (int): The maximum number of records to return. + fields (Optional[str]): Comma-separated list of fields to be returned. + + Returns: + Optional[List[T]]: A list of entities that match the search criteria, or None if no entities are found. + """ + return self._es_search_entity( + entity_type=entity_type, + field_value=fqn_search_string, + field_name="fullyQualifiedName", + from_count=from_count, + size=size, + fields=fields, + ) + + def es_search_container_by_path( + self, + full_path: str, + from_count: int = 0, + size: int = 10, + fields: Optional[str] = None, + ) -> Optional[List[Container]]: + """ + Given a service name and filters, search for containers using Elasticsearch. + + Args: + entity_type (Type[T]): The type of entity to look for. + full_path (str): The string used to search by full path. + from_count (int): The starting index of the search results. + size (int): The maximum number of records to return. + fields (Optional[str]): Comma-separated list of fields to be returned. + + Returns: + Optional[List[Container]]: A list of containers that match the search criteria, or None if no entities are found. + """ + return self._es_search_entity( + entity_type=Container, + field_value=full_path, + field_name="fullPath", + from_count=from_count, + size=size, + fields=fields, + ) + + def _es_search_entity( + self, + entity_type: Type[T], + field_value: str, + field_name: str, + from_count: int = 0, + size: int = 10, + fields: Optional[str] = None, + ) -> Optional[List[T]]: + """ + Search for entities using Elasticsearch. + + Args: + entity_type (Type[T]): The type of entity to look for. + field_value (str): The value to search for in the specified field. + field_name (str): The name of the field to search in. + from_count (int, optional): The starting index of the search results. Defaults to 0. + size (int, optional): The maximum number of search results to return. Defaults to 10. + fields (Optional[str], optional): Comma-separated list of fields to be returned. Defaults to None. + + Returns: + Optional[List[T]]: A list of entities that match the search criteria, or None if no entities are found. """ query_string = self.fqdn_search.format( - fqn=fqn_search_string, + field_name=field_name, + field_value=field_value, from_=from_count, size=size, index=ES_INDEX_MAP[entity_type.__name__], # Fail if not exists diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index 84865698de6..f06ab8eebbd 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -170,6 +170,11 @@ class DatabaseServiceTopology(ServiceTopology): consumer=["database_service", "database", "database_schema"], use_cache=True, ), + NodeStage( + type_=AddLineageRequest, + processor="yield_external_table_lineage", + nullable=True, + ), NodeStage( type_=OMetaLifeCycleData, processor="yield_life_cycle_data", @@ -578,6 +583,11 @@ class DatabaseServiceSource( Get the life cycle data of the table """ + def yield_external_table_lineage(self, _) -> Iterable[Either[AddLineageRequest]]: + """ + Process external table lineage + """ + def test_connection(self) -> None: test_connection_fn = get_test_connection_fn(self.service_connection) test_connection_fn(self.metadata, self.connection_obj, self.service_connection) diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py index c79a5257a09..a1480464c9b 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py @@ -24,12 +24,14 @@ from sqlparse.sql import Function, Identifier from metadata.generated.schema.api.data.createStoredProcedure import ( CreateStoredProcedureRequest, ) +from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.storedProcedure import StoredProcedureCode from metadata.generated.schema.entity.data.table import ( PartitionColumnDetails, PartitionIntervalTypes, + Table, TablePartition, TableType, ) @@ -43,6 +45,8 @@ from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) from metadata.generated.schema.type.basic import EntityName, SourceUrl +from metadata.generated.schema.type.entityLineage import EntitiesEdge +from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification @@ -67,6 +71,7 @@ from metadata.ingestion.source.database.snowflake.queries import ( SNOWFLAKE_GET_CURRENT_ACCOUNT, SNOWFLAKE_GET_DATABASE_COMMENTS, SNOWFLAKE_GET_DATABASES, + SNOWFLAKE_GET_EXTERNAL_LOCATIONS, SNOWFLAKE_GET_ORGANIZATION_NAME, SNOWFLAKE_GET_SCHEMA_COMMENTS, SNOWFLAKE_GET_STORED_PROCEDURE_QUERIES, @@ -141,6 +146,7 @@ class SnowflakeSource( self.partition_details = {} self.schema_desc_map = {} self.database_desc_map = {} + self.external_location_map = {} self._account: Optional[str] = None self._org_name: Optional[str] = None @@ -199,16 +205,28 @@ class SnowflakeSource( ] = row.CLUSTERING_KEY def set_schema_description_map(self) -> None: + self.schema_desc_map.clear() results = self.engine.execute(SNOWFLAKE_GET_SCHEMA_COMMENTS).all() for row in results: self.schema_desc_map[(row.DATABASE_NAME, row.SCHEMA_NAME)] = row.COMMENT def set_database_description_map(self) -> None: + self.database_desc_map.clear() if not self.database_desc_map: results = self.engine.execute(SNOWFLAKE_GET_DATABASE_COMMENTS).all() for row in results: self.database_desc_map[row.DATABASE_NAME] = row.COMMENT + def set_external_location_map(self, database_name: str) -> None: + self.external_location_map.clear() + results = self.engine.execute( + SNOWFLAKE_GET_EXTERNAL_LOCATIONS.format(database_name=database_name) + ).all() + self.external_location_map = { + (row.database_name, row.schema_name, row.name): row.location + for row in results + } + def get_schema_description(self, schema_name: str) -> Optional[str]: """ Method to fetch the schema description @@ -238,6 +256,7 @@ class SnowflakeSource( self.set_partition_details() self.set_schema_description_map() self.set_database_description_map() + self.set_external_location_map(configured_db) yield configured_db else: for new_database in self.get_database_names_raw(): @@ -263,6 +282,7 @@ class SnowflakeSource( self.set_partition_details() self.set_schema_description_map() self.set_database_description_map() + self.set_external_location_map(new_database) yield new_database except Exception as exc: logger.debug(traceback.format_exc()) @@ -606,3 +626,53 @@ class SnowflakeSource( ) return queries_dict + + def yield_external_table_lineage( + self, table_name_and_type: Tuple[str, str] + ) -> Iterable[AddLineageRequest]: + """ + Yield external table lineage + """ + table_name, table_type = table_name_and_type + location = self.external_location_map.get( + (self.context.database, self.context.database_schema, table_name) + ) + if table_type == TableType.External and location: + location_entity = self.metadata.es_search_container_by_path( + full_path=location + ) + + table_fqn = fqn.build( + self.metadata, + entity_type=Table, + service_name=self.context.database_service, + database_name=self.context.database, + schema_name=self.context.database_schema, + table_name=table_name, + skip_es_search=True, + ) + table_entity = self.metadata.es_search_from_fqn( + entity_type=Table, + fqn_search_string=table_fqn, + ) + + if ( + location_entity + and location_entity[0] + and table_entity + and table_entity[0] + ): + yield Either( + right=AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference( + id=location_entity[0].id, + type="container", + ), + toEntity=EntityReference( + id=table_entity[0].id, + type="table", + ), + ) + ) + ) diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/queries.py b/ingestion/src/metadata/ingestion/source/database/snowflake/queries.py index 2006e7e3220..3052156c663 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/queries.py @@ -115,6 +115,10 @@ SNOWFLAKE_GET_DATABASE_COMMENTS = """ select DATABASE_NAME,COMMENT from information_schema.databases """ +SNOWFLAKE_GET_EXTERNAL_LOCATIONS = """ +SHOW EXTERNAL TABLES IN DATABASE "{database_name}" +""" + SNOWFLAKE_TEST_FETCH_TAG = """ select TAG_NAME from snowflake.account_usage.tag_references limit 1 """ diff --git a/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py b/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py index 863fb26c9fc..067aa33b729 100644 --- a/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py +++ b/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py @@ -187,6 +187,7 @@ class S3Source(StorageServiceSource): parent=container_details.parent, sourceUrl=container_details.sourceUrl, fileFormats=container_details.file_formats, + fullPath=container_details.fullPath, ) yield Either(right=container_request) self.register_record(container_request=container_request) @@ -213,9 +214,12 @@ class S3Source(StorageServiceSource): client=self.s3_client, ) if columns: + prefix = ( + f"{KEY_SEPARATOR}{metadata_entry.dataPath.strip(KEY_SEPARATOR)}" + ) return S3ContainerDetails( name=metadata_entry.dataPath.strip(KEY_SEPARATOR), - prefix=f"{KEY_SEPARATOR}{metadata_entry.dataPath.strip(KEY_SEPARATOR)}", + prefix=prefix, creation_date=bucket_response.creation_date.isoformat(), number_of_objects=self._fetch_metric( bucket_name=bucket_name, metric=S3Metric.NUMBER_OF_OBJECTS @@ -228,6 +232,7 @@ class S3Source(StorageServiceSource): isPartitioned=metadata_entry.isPartitioned, columns=columns ), parent=parent, + fullPath=self._get_full_path(bucket_name, prefix), sourceUrl=self._get_object_source_url( bucket_name=bucket_name, prefix=metadata_entry.dataPath.strip(KEY_SEPARATOR), @@ -336,9 +341,27 @@ class S3Source(StorageServiceSource): ), file_formats=[], data_model=None, + fullPath=self._get_full_path(bucket_name=bucket_response.name), sourceUrl=self._get_bucket_source_url(bucket_name=bucket_response.name), ) + def _clean_path(self, path: str) -> str: + return path.strip(KEY_SEPARATOR) + + def _get_full_path(self, bucket_name: str, prefix: str = None) -> Optional[str]: + """ + Method to get the full path of the file + """ + if bucket_name is None: + return None + + full_path = f"s3://{self._clean_path(bucket_name)}" + + if prefix: + full_path += f"/{self._clean_path(prefix)}" + + return full_path + def _get_sample_file_path( self, bucket_name: str, metadata_entry: MetadataEntry ) -> Optional[str]: diff --git a/ingestion/src/metadata/ingestion/source/storage/s3/models.py b/ingestion/src/metadata/ingestion/source/storage/s3/models.py index fe8d5f97d82..a7b5fc14fa8 100644 --- a/ingestion/src/metadata/ingestion/source/storage/s3/models.py +++ b/ingestion/src/metadata/ingestion/source/storage/s3/models.py @@ -78,3 +78,7 @@ class S3ContainerDetails(BaseModel): sourceUrl: Optional[basic.SourceUrl] = Field( None, description="Source URL of the container." ) + + fullPath: Optional[str] = Field( + None, description="Full path of the container/file." + ) diff --git a/ingestion/tests/unit/topology/storage/test_storage.py b/ingestion/tests/unit/topology/storage/test_storage.py index 3920821dd32..75a988ee60a 100644 --- a/ingestion/tests/unit/topology/storage/test_storage.py +++ b/ingestion/tests/unit/topology/storage/test_storage.py @@ -236,6 +236,7 @@ class StorageUnitTest(TestCase): sourceUrl=SourceUrl( __root__="https://s3.console.aws.amazon.com/s3/buckets/test_bucket?region=us-east-1&tab=objects" ), + fullPath="s3://test_bucket", ), self.object_store_source._generate_unstructured_container( bucket_response=bucket_response @@ -280,6 +281,7 @@ class StorageUnitTest(TestCase): sourceUrl=SourceUrl( __root__="https://s3.console.aws.amazon.com/s3/buckets/test_bucket?region=us-east-1&prefix=transactions/&showversions=false" ), + fullPath="s3://test_bucket/transactions", ), self.object_store_source._generate_container_details( S3BucketResponse( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ContainerRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ContainerRepository.java index 86289933e59..61c1b097953 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ContainerRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ContainerRepository.java @@ -294,6 +294,7 @@ public class ContainerRepository extends EntityRepository { recordChange( "size", original.getSize(), updated.getSize(), false, EntityUtil.objectMatch, false); recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl()); + recordChange("fullPath", original.getFullPath(), updated.getFullPath()); recordChange("retentionPeriod", original.getRetentionPeriod(), updated.getRetentionPeriod()); recordChange("sourceHash", original.getSourceHash(), updated.getSourceHash()); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/storages/ContainerResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/storages/ContainerResource.java index dbca7eac928..4b26470f7f8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/storages/ContainerResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/storages/ContainerResource.java @@ -523,6 +523,7 @@ public class ContainerResource extends EntityResource