feat(GE): add option to disable sql parsing, use default parser (#4377)

Authored by mayurinehate on 2022-03-11 07:06:59 +05:30, committed by GitHub
parent cf7cefe203
commit 3ea72869f3
4 changed files with 20 additions and 9 deletions

View File

@@ -47,6 +47,7 @@ This integration does not support
 - `retry_status_codes` (optional): Retry HTTP request also on these status codes.
 - `retry_max_times` (optional): Maximum times to retry if HTTP request fails. The delay between retries is increased exponentially.
 - `extra_headers` (optional): Extra headers which will be added to the datahub request.
+- `parse_table_names_from_sql` (defaults to false): The integration can use an SQL parser to try to parse the datasets being asserted. This parsing is disabled by default, but can be enabled by setting `parse_table_names_from_sql: True`. The parser is based on the [`sqllineage`](https://pypi.org/project/sqllineage/) package.
 
 ## Learn more
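For illustration, the same option expressed as a hypothetical Python `action_list` entry for a Great Expectations checkpoint; it mirrors the YAML test config at the end of this commit, and the `name` and `module_name` keys are assumptions not shown in this diff:

```python
# Hypothetical sketch only, not part of the commit.
datahub_action = {
    "name": "datahub_action",  # assumed action name
    "action": {
        "module_name": "datahub.integrations.great_expectations.action",  # assumed module path
        "class_name": "DataHubValidationAction",
        "server_url": "http://localhost:8080",
        "graceful_exceptions": False,
        "parse_table_names_from_sql": True,  # the new flag; defaults to False
    },
}
```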

View File

@@ -51,7 +51,7 @@ from datahub.metadata.com.linkedin.pegasus2avro.assertion import (
 from datahub.metadata.com.linkedin.pegasus2avro.common import DataPlatformInstance
 from datahub.metadata.com.linkedin.pegasus2avro.events.metadata import ChangeType
 from datahub.metadata.schema_classes import PartitionSpecClass, PartitionTypeClass
-from datahub.utilities.sql_parser import MetadataSQLSQLParser
+from datahub.utilities.sql_parser import DefaultSQLParser
 
 logger = logging.getLogger(__name__)
@@ -71,6 +71,7 @@ class DataHubValidationAction(ValidationAction):
         retry_status_codes: Optional[List[int]] = None,
         retry_max_times: Optional[int] = None,
         extra_headers: Optional[Dict[str, str]] = None,
+        parse_table_names_from_sql: bool = False,
     ):
         super().__init__(data_context)
         self.server_url = server_url
@@ -82,6 +83,7 @@ class DataHubValidationAction(ValidationAction):
         self.retry_status_codes = retry_status_codes
         self.retry_max_times = retry_max_times
         self.extra_headers = extra_headers
+        self.parse_table_names_from_sql = parse_table_names_from_sql
 
     def _run(
         self,
@@ -598,6 +600,12 @@ class DataHubValidationAction(ValidationAction):
                 }
             )
         elif isinstance(ge_batch_spec, RuntimeQueryBatchSpec):
+            if not self.parse_table_names_from_sql:
+                warn(
+                    "Enable parse_table_names_from_sql in DatahubValidationAction config "
+                    "to try to parse the tables being asserted from SQL query"
+                )
+                return []
             query = data_asset.batches[
                 batch_identifier
             ].batch_request.runtime_parameters["query"]
@@ -610,11 +618,12 @@ class DataHubValidationAction(ValidationAction):
                 query=query,
                 customProperties=batchSpecProperties,
             )
-            tables = MetadataSQLSQLParser(query).get_tables()
+            tables = DefaultSQLParser(query).get_tables()
             if len(set(tables)) != 1:
                 warn(
                     "DataHubValidationAction does not support cross dataset assertions."
                 )
+                return []
             for table in tables:
                 dataset_urn = make_dataset_urn_from_sqlalchemy_uri(
                     sqlalchemy_uri,
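As a quick illustration of the parser call this file now uses, a minimal sketch (not part of the commit); the query is made up, and it assumes `acryl-datahub` is installed so that `DefaultSQLParser` is importable:

```python
# Illustration only; the example query is invented.
from datahub.utilities.sql_parser import DefaultSQLParser

query = "SELECT id, amount FROM public.payments WHERE amount > 0"
tables = DefaultSQLParser(query).get_tables()

# The action above proceeds only when exactly one table is referenced;
# otherwise it warns about cross dataset assertions and now returns [].
print(tables)
```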

View File

@@ -5,15 +5,15 @@ import unittest.mock
 from abc import ABCMeta, abstractmethod
 from typing import List, Set
 
+import sqlparse
+from networkx import DiGraph
+from sqllineage.core import LineageAnalyzer
 from sqllineage.core.holders import Column, SQLLineageHolder
 
-try:
-    import sqlparse
-    from networkx import DiGraph
-    from sql_metadata import Parser as MetadataSQLParser
-    from sqllineage.core import LineageAnalyzer
-    import datahub.utilities.sqllineage_patch
+import datahub.utilities.sqllineage_patch
+
+try:
+    from sql_metadata import Parser as MetadataSQLParser
 except ImportError:
     pass
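The hunk above makes `sqlparse`, `networkx`, and `sqllineage` unconditional imports and keeps only `sql_metadata` behind the try/except. A small self-contained sketch of that optional-import pattern follows; the `None` fallback is an assumption for illustration, while the actual module simply passes on `ImportError`:

```python
# Generic sketch of the optional-dependency pattern, not the module's exact code.
try:
    from sql_metadata import Parser as MetadataSQLParser
except ImportError:  # sql_metadata is an optional dependency
    MetadataSQLParser = None  # hypothetical fallback so callers can check availability

if MetadataSQLParser is not None:
    # sql-metadata's Parser exposes the referenced tables, e.g. ['b']
    print(MetadataSQLParser("SELECT a FROM b").tables)
```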

View File

@@ -23,6 +23,7 @@ action_list:
       class_name: DataHubValidationAction
       server_url: http://localhost:8080
       graceful_exceptions: False
+      parse_table_names_from_sql: True
       platform_instance_map:
         my_postgresql_datasource: postgres1
         runtime_postgres_datasource: postgres1