diff --git a/metadata-ingestion/integration_docs/great-expectations.md b/metadata-ingestion/integration_docs/great-expectations.md
index b017d4bffa..6d8b475cc8 100644
--- a/metadata-ingestion/integration_docs/great-expectations.md
+++ b/metadata-ingestion/integration_docs/great-expectations.md
@@ -47,7 +47,8 @@ This integration does not support
 - `retry_status_codes` (optional): Retry HTTP request also on these status codes.
 - `retry_max_times` (optional): Maximum times to retry if HTTP request fails. The delay between retries is increased exponentially.
 - `extra_headers` (optional): Extra headers which will be added to the datahub request.
-
+- `parse_table_names_from_sql` (defaults to false): The integration can use an SQL parser to try to parse the datasets being asserted. This parsing is disabled by default, but can be enabled by setting `parse_table_names_from_sql: True`. The parser is based on the [`sqllineage`](https://pypi.org/project/sqllineage/) package.
+
 ## Learn more
 
 To see the Great Expectations in action, check out [this demo](https://www.loom.com/share/d781c9f0b270477fb5d6b0c26ef7f22d) from the Feb 2022 townhall.
\ No newline at end of file
diff --git a/metadata-ingestion/src/datahub/integrations/great_expectations/action.py b/metadata-ingestion/src/datahub/integrations/great_expectations/action.py
index 9c97fa1c5b..a19b7d7b22 100644
--- a/metadata-ingestion/src/datahub/integrations/great_expectations/action.py
+++ b/metadata-ingestion/src/datahub/integrations/great_expectations/action.py
@@ -51,7 +51,7 @@ from datahub.metadata.com.linkedin.pegasus2avro.assertion import (
 from datahub.metadata.com.linkedin.pegasus2avro.common import DataPlatformInstance
 from datahub.metadata.com.linkedin.pegasus2avro.events.metadata import ChangeType
 from datahub.metadata.schema_classes import PartitionSpecClass, PartitionTypeClass
-from datahub.utilities.sql_parser import MetadataSQLSQLParser
+from datahub.utilities.sql_parser import DefaultSQLParser
 
 logger = logging.getLogger(__name__)
 
@@ -71,6 +71,7 @@ class DataHubValidationAction(ValidationAction):
         retry_status_codes: Optional[List[int]] = None,
         retry_max_times: Optional[int] = None,
         extra_headers: Optional[Dict[str, str]] = None,
+        parse_table_names_from_sql: bool = False,
     ):
         super().__init__(data_context)
         self.server_url = server_url
@@ -82,6 +83,7 @@ class DataHubValidationAction(ValidationAction):
         self.retry_status_codes = retry_status_codes
         self.retry_max_times = retry_max_times
         self.extra_headers = extra_headers
+        self.parse_table_names_from_sql = parse_table_names_from_sql
 
     def _run(
         self,
@@ -598,6 +600,12 @@ class DataHubValidationAction(ValidationAction):
                 }
             )
         elif isinstance(ge_batch_spec, RuntimeQueryBatchSpec):
+            if not self.parse_table_names_from_sql:
+                warn(
+                    "Enable parse_table_names_from_sql in DataHubValidationAction config "
+                    "to try to parse the tables being asserted from the SQL query"
+                )
+                return []
             query = data_asset.batches[
                 batch_identifier
             ].batch_request.runtime_parameters["query"]
@@ -610,11 +618,12 @@ class DataHubValidationAction(ValidationAction):
                 query=query,
                 customProperties=batchSpecProperties,
             )
-            tables = MetadataSQLSQLParser(query).get_tables()
+            tables = DefaultSQLParser(query).get_tables()
             if len(set(tables)) != 1:
                 warn(
                     "DataHubValidationAction does not support cross dataset assertions."
                 )
+                return []
             for table in tables:
                 dataset_urn = make_dataset_urn_from_sqlalchemy_uri(
                     sqlalchemy_uri,
diff --git a/metadata-ingestion/src/datahub/utilities/sql_parser.py b/metadata-ingestion/src/datahub/utilities/sql_parser.py
index 8492bd16b9..21fa422b85 100644
--- a/metadata-ingestion/src/datahub/utilities/sql_parser.py
+++ b/metadata-ingestion/src/datahub/utilities/sql_parser.py
@@ -5,15 +5,15 @@ import unittest.mock
 from abc import ABCMeta, abstractmethod
 from typing import List, Set
 
+import sqlparse
+from networkx import DiGraph
+from sqllineage.core import LineageAnalyzer
 from sqllineage.core.holders import Column, SQLLineageHolder
 
-try:
-    import sqlparse
-    from networkx import DiGraph
-    from sql_metadata import Parser as MetadataSQLParser
-    from sqllineage.core import LineageAnalyzer
+import datahub.utilities.sqllineage_patch
 
-    import datahub.utilities.sqllineage_patch
+try:
+    from sql_metadata import Parser as MetadataSQLParser
 except ImportError:
     pass
 
diff --git a/metadata-ingestion/tests/integration/great-expectations/setup/great_expectations/checkpoints/test_checkpoint.yml b/metadata-ingestion/tests/integration/great-expectations/setup/great_expectations/checkpoints/test_checkpoint.yml
index 1c5a8ebcca..7b12c5f2cf 100644
--- a/metadata-ingestion/tests/integration/great-expectations/setup/great_expectations/checkpoints/test_checkpoint.yml
+++ b/metadata-ingestion/tests/integration/great-expectations/setup/great_expectations/checkpoints/test_checkpoint.yml
@@ -23,6 +23,7 @@ action_list:
       class_name: DataHubValidationAction
       server_url: http://localhost:8080
       graceful_exceptions: False
+      parse_table_names_from_sql: True
      platform_instance_map:
         my_postgresql_datasource: postgres1
         runtime_postgres_datasource: postgres1
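
Reviewer note: below is a minimal sketch of the code path this diff gates behind `parse_table_names_from_sql`. `DefaultSQLParser` and `get_tables()` are taken from the diff above; the query string and table name are hypothetical, and the exact form of the returned names may vary with the installed `sqllineage` version.

```python
# Minimal sketch, assuming datahub is installed with its SQL-lineage
# dependencies (sqllineage, sqlparse, networkx). The query is a
# hypothetical example, not taken from the test fixtures.
from datahub.utilities.sql_parser import DefaultSQLParser

query = "SELECT id, amount FROM public.payments WHERE amount > 0"

# get_tables() returns the tables referenced by the query. The action
# emits assertion metadata only when exactly one distinct table is
# found, since cross-dataset assertions are not supported.
tables = DefaultSQLParser(query).get_tables()

if len(set(tables)) != 1:
    raise ValueError("cross dataset assertions are not supported")
print(tables)  # expected: a single-element list naming public.payments
```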