fix(GE): fix dependencies for GE DataHubValidationAction, logic for s… (#4347)

Authored by mayurinehate on 2022-03-09 00:50:09 +05:30, committed by GitHub
parent 1d9becad1e
commit e19b12030e
2 changed files with 24 additions and 9 deletions


@@ -101,9 +101,7 @@ snowflake_common = {
     "cryptography",
 }
 
-microsoft_common = {
-    "msal==1.16.0"
-}
+microsoft_common = {"msal==1.16.0"}
 
 data_lake_base = {
     *aws_common,
@@ -129,7 +127,7 @@ plugins: Dict[str, Set[str]] = {
     "airflow": {
         "apache-airflow >= 1.10.2",
     },
-    "great-expectations": sql_common,
+    "great-expectations": sql_common | {"sqllineage==1.3.3"},
     # Source plugins
     # PyAthena is pinned with exact version because we use private method in PyAthena
     "athena": sql_common | {"PyAthena[SQLAlchemy]==2.4.1"},
@@ -188,7 +186,7 @@ plugins: Dict[str, Set[str]] = {
     "trino": sql_common | {"trino"},
     "starburst-trino-usage": sql_common | {"trino"},
     "nifi": {"requests", "packaging"},
-    "powerbi": {"orderedset"} | microsoft_common
+    "powerbi": {"orderedset"} | microsoft_common,
 }
 
 all_exclude_plugins: Set[str] = {


@@ -25,6 +25,7 @@ from great_expectations.execution_engine.sqlalchemy_execution_engine import (
     SqlAlchemyExecutionEngine,
 )
 from great_expectations.validator.validator import Validator
+from sqlalchemy.engine.base import Connection, Engine
 from sqlalchemy.engine.url import make_url
 
 import datahub.emitter.mce_builder as builder
@@ -550,13 +551,20 @@ class DataHubValidationAction(ValidationAction):
                     data_asset.active_batch_definition.datasource_name
                 ),
             }
 
+            sqlalchemy_uri = None
+            if isinstance(data_asset.execution_engine.engine, Engine):
+                sqlalchemy_uri = data_asset.execution_engine.engine.url
+            # For Snowflake, sqlalchemy_execution_engine.engine is actually an instance of Connection
+            elif isinstance(data_asset.execution_engine.engine, Connection):
+                sqlalchemy_uri = data_asset.execution_engine.engine.engine.url
+
             if isinstance(ge_batch_spec, SqlAlchemyDatasourceBatchSpec):
                 # e.g. ConfiguredAssetSqlDataConnector with splitter_method or sampling_method
                 schema_name = ge_batch_spec.get("schema_name")
                 table_name = ge_batch_spec.get("table_name")
                 dataset_urn = make_dataset_urn_from_sqlalchemy_uri(
-                    data_asset.execution_engine.engine.url,
+                    sqlalchemy_uri,
                     schema_name,
                     table_name,
                     self.env,
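The added branch handles the fact that the connection URL lives in different places depending on whether the GE execution engine wraps a SQLAlchemy Engine or a Connection (as with Snowflake). A self-contained sketch of the same resolution, exercised against a throwaway SQLite engine; the GE execution engine itself is out of scope here:

from sqlalchemy import create_engine
from sqlalchemy.engine.base import Connection, Engine

def resolve_sqlalchemy_url(engine_or_conn):
    # Plain Engine: the URL is directly on the object.
    if isinstance(engine_or_conn, Engine):
        return engine_or_conn.url
    # Connection: hop to the parent Engine first, i.e. .engine.url.
    if isinstance(engine_or_conn, Connection):
        return engine_or_conn.engine.url
    return None

engine = create_engine("sqlite://")
assert resolve_sqlalchemy_url(engine) == engine.url
with engine.connect() as conn:
    assert resolve_sqlalchemy_url(conn) == engine.url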
@@ -609,7 +617,7 @@ class DataHubValidationAction(ValidationAction):
                 )
                 for table in tables:
                     dataset_urn = make_dataset_urn_from_sqlalchemy_uri(
-                        data_asset.execution_engine.engine.url,
+                        sqlalchemy_uri,
                         None,
                         table,
                         self.env,
@@ -640,7 +648,7 @@ class DataHubValidationAction(ValidationAction):
     def get_platform_instance(self, datasource_name):
         if self.platform_instance_map and datasource_name in self.platform_instance_map:
             return self.platform_instance_map[datasource_name]
-        if datasource_name:
+        if self.platform_instance_map:
             warn(
                 f"Datasource {datasource_name} is not present in platform_instance_map"
             )
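The condition fix means the warning fires only when a platform_instance_map was configured but lacks this datasource, rather than for every named datasource. A standalone sketch of the corrected lookup; the free-function form and example names are illustrative:

from typing import Dict, Optional
from warnings import warn

def get_platform_instance(
    platform_instance_map: Optional[Dict[str, str]], datasource_name: str
) -> Optional[str]:
    if platform_instance_map and datasource_name in platform_instance_map:
        return platform_instance_map[datasource_name]
    # Warn only when a map exists but is missing this datasource;
    # with no map configured at all, stay silent.
    if platform_instance_map:
        warn(f"Datasource {datasource_name} is not present in platform_instance_map")
    return None

assert get_platform_instance(None, "my_postgres") is None            # no warning
assert get_platform_instance({"my_postgres": "pg1"}, "my_postgres") == "pg1"
get_platform_instance({"other": "x"}, "my_postgres")                 # emits the warning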
@@ -680,7 +688,15 @@ def make_dataset_urn_from_sqlalchemy_uri(
                 for {data_platform}."
             )
             return None
-        schema_name = "{}.{}".format(url_instance.database, schema_name)
+        # If the data platform is Snowflake, we artificially lowercase the database name,
+        # because DataHub also does this during ingestion.
+        # Ref: https://github.com/linkedin/datahub/blob/master/metadata-ingestion%2Fsrc%2Fdatahub%2Fingestion%2Fsource%2Fsql%2Fsnowflake.py#L272
+        schema_name = "{}.{}".format(
+            url_instance.database.lower()
+            if data_platform == "snowflake"
+            else url_instance.database,
+            schema_name,
+        )
     elif data_platform == "bigquery":
         if url_instance.host is None or url_instance.database is None:
             warn(
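The new formatting lowercases only the database portion of the qualified schema name, and only for Snowflake; the schema's own casing is untouched. A short sketch of the same expression against a parsed URL (the account and database names are made up):

from sqlalchemy.engine.url import make_url

url_instance = make_url("snowflake://user:pass@my_account/MY_DB")
data_platform = url_instance.get_backend_name()  # "snowflake"
schema_name = "PUBLIC"

# Lowercase the database only for Snowflake, matching the diff above.
schema_name = "{}.{}".format(
    url_instance.database.lower()
    if data_platform == "snowflake"
    else url_instance.database,
    schema_name,
)
print(schema_name)  # my_db.PUBLIC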
@@ -705,6 +721,7 @@ def make_dataset_urn_from_sqlalchemy_uri(
         platform_instance=platform_instance,
         env=env,
     )
+    return dataset_urn
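The added return is the substantive fix here: the function previously built the URN and then fell off the end, so every caller received an implicit None. A reduced before/after illustration; the URN literal is hypothetical:

def make_urn_before_fix():
    dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:postgres,db.schema.table,PROD)"
    # no return: the caller gets an implicit None

def make_urn_after_fix():
    dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:postgres,db.schema.table,PROD)"
    return dataset_urn

assert make_urn_before_fix() is None
assert make_urn_after_fix() is not None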