Fix #11905: Automated lineage between external table and container snowflake (#15537)

This commit is contained in:
Mayur Singal 2024-03-15 00:52:41 +05:30 committed by GitHub
parent 0006988143
commit b643206bba
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 206 additions and 11 deletions

View File

@ -24,6 +24,7 @@ from requests.utils import quote
from metadata.generated.schema.api.createEventPublisherJob import (
CreateEventPublisherJob,
)
from metadata.generated.schema.entity.data.container import Container
from metadata.generated.schema.entity.data.query import Query
from metadata.generated.schema.system.eventPublisherJob import EventPublisherResult
from metadata.ingestion.ometa.client import REST, APIError
@ -45,7 +46,7 @@ class ESMixin(Generic[T]):
client: REST
fqdn_search = (
"/search/fieldQuery?fieldName=fullyQualifiedName&fieldValue={fqn}&from={from_}"
"/search/fieldQuery?fieldName={field_name}&fieldValue={field_value}&from={from_}"
"&size={size}&index={index}"
)
@ -105,17 +106,83 @@ class ESMixin(Generic[T]):
fields: Optional[str] = None,
) -> Optional[List[T]]:
"""
Given a service_name and some filters, search for entities using ES
Given a service name and filters, search for entities using Elasticsearch.
:param entity_type: Entity to look for
:param fqn_search_string: string used to search by FQN. E.g., service.*.schema.table
:param from_count: Records to expect
:param size: Number of records
:param fields: Comma separated list of fields to be returned
:return: List of entities
Args:
entity_type (Type[T]): The type of entity to look for.
fqn_search_string (str): The string used to search by fully qualified name (FQN).
Example: "service.*.schema.table".
from_count (int): The starting index of the search results.
size (int): The maximum number of records to return.
fields (Optional[str]): Comma-separated list of fields to be returned.
Returns:
Optional[List[T]]: A list of entities that match the search criteria, or None if no entities are found.
"""
return self._es_search_entity(
entity_type=entity_type,
field_value=fqn_search_string,
field_name="fullyQualifiedName",
from_count=from_count,
size=size,
fields=fields,
)
def es_search_container_by_path(
self,
full_path: str,
from_count: int = 0,
size: int = 10,
fields: Optional[str] = None,
) -> Optional[List[Container]]:
"""
Given a service name and filters, search for containers using Elasticsearch.
Args:
entity_type (Type[T]): The type of entity to look for.
full_path (str): The string used to search by full path.
from_count (int): The starting index of the search results.
size (int): The maximum number of records to return.
fields (Optional[str]): Comma-separated list of fields to be returned.
Returns:
Optional[List[Container]]: A list of containers that match the search criteria, or None if no entities are found.
"""
return self._es_search_entity(
entity_type=Container,
field_value=full_path,
field_name="fullPath",
from_count=from_count,
size=size,
fields=fields,
)
def _es_search_entity(
self,
entity_type: Type[T],
field_value: str,
field_name: str,
from_count: int = 0,
size: int = 10,
fields: Optional[str] = None,
) -> Optional[List[T]]:
"""
Search for entities using Elasticsearch.
Args:
entity_type (Type[T]): The type of entity to look for.
field_value (str): The value to search for in the specified field.
field_name (str): The name of the field to search in.
from_count (int, optional): The starting index of the search results. Defaults to 0.
size (int, optional): The maximum number of search results to return. Defaults to 10.
fields (Optional[str], optional): Comma-separated list of fields to be returned. Defaults to None.
Returns:
Optional[List[T]]: A list of entities that match the search criteria, or None if no entities are found.
"""
query_string = self.fqdn_search.format(
fqn=fqn_search_string,
field_name=field_name,
field_value=field_value,
from_=from_count,
size=size,
index=ES_INDEX_MAP[entity_type.__name__], # Fail if not exists

View File

@ -170,6 +170,11 @@ class DatabaseServiceTopology(ServiceTopology):
consumer=["database_service", "database", "database_schema"],
use_cache=True,
),
NodeStage(
type_=AddLineageRequest,
processor="yield_external_table_lineage",
nullable=True,
),
NodeStage(
type_=OMetaLifeCycleData,
processor="yield_life_cycle_data",
@ -578,6 +583,11 @@ class DatabaseServiceSource(
Get the life cycle data of the table
"""
def yield_external_table_lineage(self, _) -> Iterable[Either[AddLineageRequest]]:
"""
Process external table lineage
"""
def test_connection(self) -> None:
test_connection_fn = get_test_connection_fn(self.service_connection)
test_connection_fn(self.metadata, self.connection_obj, self.service_connection)

View File

@ -24,12 +24,14 @@ from sqlparse.sql import Function, Identifier
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedureCode
from metadata.generated.schema.entity.data.table import (
PartitionColumnDetails,
PartitionIntervalTypes,
Table,
TablePartition,
TableType,
)
@ -43,6 +45,8 @@ from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.basic import EntityName, SourceUrl
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
@ -67,6 +71,7 @@ from metadata.ingestion.source.database.snowflake.queries import (
SNOWFLAKE_GET_CURRENT_ACCOUNT,
SNOWFLAKE_GET_DATABASE_COMMENTS,
SNOWFLAKE_GET_DATABASES,
SNOWFLAKE_GET_EXTERNAL_LOCATIONS,
SNOWFLAKE_GET_ORGANIZATION_NAME,
SNOWFLAKE_GET_SCHEMA_COMMENTS,
SNOWFLAKE_GET_STORED_PROCEDURE_QUERIES,
@ -141,6 +146,7 @@ class SnowflakeSource(
self.partition_details = {}
self.schema_desc_map = {}
self.database_desc_map = {}
self.external_location_map = {}
self._account: Optional[str] = None
self._org_name: Optional[str] = None
@ -199,16 +205,28 @@ class SnowflakeSource(
] = row.CLUSTERING_KEY
def set_schema_description_map(self) -> None:
self.schema_desc_map.clear()
results = self.engine.execute(SNOWFLAKE_GET_SCHEMA_COMMENTS).all()
for row in results:
self.schema_desc_map[(row.DATABASE_NAME, row.SCHEMA_NAME)] = row.COMMENT
def set_database_description_map(self) -> None:
self.database_desc_map.clear()
if not self.database_desc_map:
results = self.engine.execute(SNOWFLAKE_GET_DATABASE_COMMENTS).all()
for row in results:
self.database_desc_map[row.DATABASE_NAME] = row.COMMENT
def set_external_location_map(self, database_name: str) -> None:
self.external_location_map.clear()
results = self.engine.execute(
SNOWFLAKE_GET_EXTERNAL_LOCATIONS.format(database_name=database_name)
).all()
self.external_location_map = {
(row.database_name, row.schema_name, row.name): row.location
for row in results
}
def get_schema_description(self, schema_name: str) -> Optional[str]:
"""
Method to fetch the schema description
@ -238,6 +256,7 @@ class SnowflakeSource(
self.set_partition_details()
self.set_schema_description_map()
self.set_database_description_map()
self.set_external_location_map(configured_db)
yield configured_db
else:
for new_database in self.get_database_names_raw():
@ -263,6 +282,7 @@ class SnowflakeSource(
self.set_partition_details()
self.set_schema_description_map()
self.set_database_description_map()
self.set_external_location_map(new_database)
yield new_database
except Exception as exc:
logger.debug(traceback.format_exc())
@ -606,3 +626,53 @@ class SnowflakeSource(
)
return queries_dict
def yield_external_table_lineage(
self, table_name_and_type: Tuple[str, str]
) -> Iterable[AddLineageRequest]:
"""
Yield external table lineage
"""
table_name, table_type = table_name_and_type
location = self.external_location_map.get(
(self.context.database, self.context.database_schema, table_name)
)
if table_type == TableType.External and location:
location_entity = self.metadata.es_search_container_by_path(
full_path=location
)
table_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name=self.context.database_service,
database_name=self.context.database,
schema_name=self.context.database_schema,
table_name=table_name,
skip_es_search=True,
)
table_entity = self.metadata.es_search_from_fqn(
entity_type=Table,
fqn_search_string=table_fqn,
)
if (
location_entity
and location_entity[0]
and table_entity
and table_entity[0]
):
yield Either(
right=AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id=location_entity[0].id,
type="container",
),
toEntity=EntityReference(
id=table_entity[0].id,
type="table",
),
)
)
)

View File

@ -115,6 +115,10 @@ SNOWFLAKE_GET_DATABASE_COMMENTS = """
select DATABASE_NAME,COMMENT from information_schema.databases
"""
SNOWFLAKE_GET_EXTERNAL_LOCATIONS = """
SHOW EXTERNAL TABLES IN DATABASE "{database_name}"
"""
SNOWFLAKE_TEST_FETCH_TAG = """
select TAG_NAME from snowflake.account_usage.tag_references limit 1
"""

View File

@ -187,6 +187,7 @@ class S3Source(StorageServiceSource):
parent=container_details.parent,
sourceUrl=container_details.sourceUrl,
fileFormats=container_details.file_formats,
fullPath=container_details.fullPath,
)
yield Either(right=container_request)
self.register_record(container_request=container_request)
@ -213,9 +214,12 @@ class S3Source(StorageServiceSource):
client=self.s3_client,
)
if columns:
prefix = (
f"{KEY_SEPARATOR}{metadata_entry.dataPath.strip(KEY_SEPARATOR)}"
)
return S3ContainerDetails(
name=metadata_entry.dataPath.strip(KEY_SEPARATOR),
prefix=f"{KEY_SEPARATOR}{metadata_entry.dataPath.strip(KEY_SEPARATOR)}",
prefix=prefix,
creation_date=bucket_response.creation_date.isoformat(),
number_of_objects=self._fetch_metric(
bucket_name=bucket_name, metric=S3Metric.NUMBER_OF_OBJECTS
@ -228,6 +232,7 @@ class S3Source(StorageServiceSource):
isPartitioned=metadata_entry.isPartitioned, columns=columns
),
parent=parent,
fullPath=self._get_full_path(bucket_name, prefix),
sourceUrl=self._get_object_source_url(
bucket_name=bucket_name,
prefix=metadata_entry.dataPath.strip(KEY_SEPARATOR),
@ -336,9 +341,27 @@ class S3Source(StorageServiceSource):
),
file_formats=[],
data_model=None,
fullPath=self._get_full_path(bucket_name=bucket_response.name),
sourceUrl=self._get_bucket_source_url(bucket_name=bucket_response.name),
)
def _clean_path(self, path: str) -> str:
return path.strip(KEY_SEPARATOR)
def _get_full_path(self, bucket_name: str, prefix: str = None) -> Optional[str]:
"""
Method to get the full path of the file
"""
if bucket_name is None:
return None
full_path = f"s3://{self._clean_path(bucket_name)}"
if prefix:
full_path += f"/{self._clean_path(prefix)}"
return full_path
def _get_sample_file_path(
self, bucket_name: str, metadata_entry: MetadataEntry
) -> Optional[str]:

View File

@ -78,3 +78,7 @@ class S3ContainerDetails(BaseModel):
sourceUrl: Optional[basic.SourceUrl] = Field(
None, description="Source URL of the container."
)
fullPath: Optional[str] = Field(
None, description="Full path of the container/file."
)

View File

@ -236,6 +236,7 @@ class StorageUnitTest(TestCase):
sourceUrl=SourceUrl(
__root__="https://s3.console.aws.amazon.com/s3/buckets/test_bucket?region=us-east-1&tab=objects"
),
fullPath="s3://test_bucket",
),
self.object_store_source._generate_unstructured_container(
bucket_response=bucket_response
@ -280,6 +281,7 @@ class StorageUnitTest(TestCase):
sourceUrl=SourceUrl(
__root__="https://s3.console.aws.amazon.com/s3/buckets/test_bucket?region=us-east-1&prefix=transactions/&showversions=false"
),
fullPath="s3://test_bucket/transactions",
),
self.object_store_source._generate_container_details(
S3BucketResponse(

View File

@ -294,6 +294,7 @@ public class ContainerRepository extends EntityRepository<Container> {
recordChange(
"size", original.getSize(), updated.getSize(), false, EntityUtil.objectMatch, false);
recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl());
recordChange("fullPath", original.getFullPath(), updated.getFullPath());
recordChange("retentionPeriod", original.getRetentionPeriod(), updated.getRetentionPeriod());
recordChange("sourceHash", original.getSourceHash(), updated.getSourceHash());
}

View File

@ -523,6 +523,7 @@ public class ContainerResource extends EntityResource<Container, ContainerReposi
.withPrefix(create.getPrefix())
.withNumberOfObjects(create.getNumberOfObjects())
.withSize(create.getSize())
.withFullPath(create.getFullPath())
.withFileFormats(create.getFileFormats())
.withSourceUrl(create.getSourceUrl())
.withSourceHash(create.getSourceHash());

View File

@ -63,6 +63,7 @@ public record ContainerIndex(Container container) implements ColumnIndex {
doc.put("column_suggest", columnSuggest);
doc.put("entityType", Entity.CONTAINER);
doc.put("serviceType", container.getServiceType());
doc.put("fullPath", container.getFullPath());
doc.put("lineage", SearchIndex.getLineageData(container.getEntityReference()));
doc.put(
"fqnParts",

View File

@ -97,6 +97,9 @@
"sourceUrl": {
"type": "keyword"
},
"fullPath": {
"type": "keyword"
},
"lineage": {
"type" : "object"
},

View File

@ -76,6 +76,10 @@
"description": "Source URL of container.",
"$ref": "../../type/basic.json#/definitions/sourceUrl"
},
"fullPath": {
"description": "Full path of the container/file.",
"type": "string"
},
"domain" : {
"description": "Fully qualified name of the domain the Container belongs to.",
"type": "string"

View File

@ -169,6 +169,10 @@
"description": "Source URL of container.",
"$ref": "../../type/basic.json#/definitions/sourceUrl"
},
"fullPath": {
"description": "Full path of the container/file.",
"type": "string"
},
"domain" : {
"description": "Domain the Container belongs to. When not set, the Container inherits the domain from the storage service it belongs to.",
"$ref": "../../type/entityReference.json"

View File

@ -52,7 +52,7 @@
"source": {
"description": "Lineage type describes how a lineage was created.",
"type": "string",
"enum": ["Manual", "ViewLineage", "QueryLineage", "PipelineLineage", "DashboardLineage", "DbtLineage", "SparkLineage", "OpenLineage"],
"enum": ["Manual", "ViewLineage", "QueryLineage", "PipelineLineage", "DashboardLineage", "DbtLineage", "SparkLineage", "OpenLineage", "ExternalTableLineage"],
"default": "Manual"
}
}

View File

@ -84,4 +84,5 @@ export const LINEAGE_SOURCE: { [key in Source]: string } = {
[Source.SparkLineage]: 'Spark Lineage',
[Source.ViewLineage]: 'View Lineage',
[Source.OpenLineage]: 'OpenLineage',
[Source.ExternalTableLineage]: 'External Table Lineage',
};