fix(ingest): presto-on-hive - Adding catalog name to the presto on hive urn (#6024)

Co-authored-by: Shirshanka Das <shirshanka@apache.org>
Authored by Tamas Nemeth on 2022-10-19 18:28:33 +02:00; committed by GitHub
parent e54f376f0a
commit 0545f3cb38
6 changed files with 5014 additions and 2455 deletions


@@ -121,6 +121,11 @@ class PrestoOnHiveConfig(BasicSQLAlchemyConfig):
         description="Dataset Subtype name to be 'Table' or 'View' Valid options: ['True', 'False']",
     )
+    include_catalog_name_in_ids: bool = Field(
+        default=False,
+        description="Add the Presto catalog name (e.g. hive) to the generated dataset urns. `urn:li:dataset:(urn:li:dataPlatform:hive,hive.user.logging_events,PROD)` versus `urn:li:dataset:(urn:li:dataPlatform:hive,user.logging_events,PROD)`",
+    )
     def get_sql_alchemy_url(self, uri_opts: Optional[Dict[str, Any]] = None) -> str:
         if not ((self.host_port and self.scheme) or self.sqlalchemy_uri):
             raise ValueError("host_port and schema or connect_uri required.")
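To illustrate the new option, here is a minimal sketch (not code from this commit) of how the catalog prefix changes the dataset name that ends up in the urn. make_dataset_urn is DataHub's standard urn builder; the catalog, schema, and table values are placeholders.

from datahub.emitter.mce_builder import make_dataset_urn

def build_dataset_name(catalog: str, schema: str, table: str, include_catalog: bool) -> str:
    # Mirror the flag's behaviour: prefix the schema with the Presto catalog name when enabled.
    schema_name = f"{catalog}.{schema}" if include_catalog else schema
    return f"{schema_name}.{table}"

# Flag off (default): urn:li:dataset:(urn:li:dataPlatform:hive,user.logging_events,PROD)
print(make_dataset_urn("hive", build_dataset_name("hive", "user", "logging_events", False)))

# Flag on: urn:li:dataset:(urn:li:dataPlatform:hive,hive.user.logging_events,PROD)
print(make_dataset_urn("hive", build_dataset_name("hive", "user", "logging_events", True)))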
@@ -407,9 +412,17 @@ class PrestoOnHiveSource(SQLAlchemySource):
         iter_res = self._alchemy_client.execute_query(statement)
         for key, group in groupby(iter_res, self._get_table_key):
-            dataset_name = self.get_identifier(
-                schema=key.schema, entity=key.table, inspector=inspector
+            db_name = self.get_db_name(inspector)
+            schema_name = (
+                f"{db_name}.{key.schema}"
+                if self.config.include_catalog_name_in_ids
+                else key.schema
+            )
+            dataset_name = self.get_identifier(
+                schema=schema_name, entity=key.table, inspector=inspector
             )
             self.report.report_entity_scanned(dataset_name, ent_type="table")
             if not sql_config.table_pattern.allowed(dataset_name):
@@ -521,8 +534,14 @@ class PrestoOnHiveSource(SQLAlchemySource):
         iter_res = self._alchemy_client.execute_query(statement)
         for key, group in groupby(iter_res, self._get_table_key):
+            db_name = self.get_db_name(inspector)
+            schema_name = (
+                f"{db_name}.{key.schema}"
+                if self.config.include_catalog_name_in_ids
+                else key.schema
+            )
             dataset_name = self.get_identifier(
-                schema=key.schema, entity=key.table, inspector=inspector
+                schema=schema_name, entity=key.table, inspector=inspector
             )
             columns = list(group)
@@ -553,8 +572,16 @@ class PrestoOnHiveSource(SQLAlchemySource):
         iter_res = self._alchemy_client.execute_query(statement)
         for row in iter_res:
+            db_name = self.get_db_name(inspector)
+            schema_name = (
+                f"{db_name}.{row['schema']}"
+                if self.config.include_catalog_name_in_ids
+                else row["schema"]
+            )
             dataset_name = self.get_identifier(
-                schema=row["schema"], entity=row["name"], inspector=inspector
+                schema=schema_name,
+                entity=row["name"],
+                inspector=inspector,
             )
             columns, view_definition = self._get_presto_view_column_metadata(

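The same catalog-prefix construction now appears in the table, column, and view loops above. As a design note, a small helper could capture it in one place; the function below is a hypothetical sketch, not part of this commit, and its name and call site are assumptions.

def schema_with_catalog(schema: str, db_name: str, include_catalog_name_in_ids: bool) -> str:
    # db_name is the Presto catalog returned by get_db_name(inspector); it is
    # prefixed onto the schema only when the config flag is enabled.
    return f"{db_name}.{schema}" if include_catalog_name_in_ids else schema

# Hypothetical call site inside the loops above:
# schema_name = schema_with_catalog(key.schema, self.get_db_name(inspector),
#                                   self.config.include_catalog_name_in_ids)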

@@ -53,10 +53,12 @@ def loaded_presto_on_hive(presto_on_hive_runner):
 @freeze_time(FROZEN_TIME)
 @pytest.mark.integration_batch_1
 @pytest.mark.parametrize(
-    "mode,use_catalog_subtype,use_dataset_pascalcase_subtype,test_suffix",
+    "mode,use_catalog_subtype,use_dataset_pascalcase_subtype,include_catalog_name_in_ids,test_suffix",
     [
-        ("hive", False, False, "_1"),
-        ("presto-on-hive", True, True, "_2"),
+        ("hive", False, False, False, "_1"),
+        ("presto-on-hive", True, True, False, "_2"),
+        ("hive", False, False, True, "_3"),
+        ("presto-on-hive", True, True, True, "_4"),
     ],
 )
 def test_presto_on_hive_ingest(
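The expanded matrix above exercises the new flag in both hive and presto-on-hive modes. For reference, enabling it outside the tests, via DataHub's Python pipeline API, looks roughly like the sketch below; the scheme, host_port, database, and credential values are placeholders, and the console sink is used only for illustration.

from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "presto-on-hive",
            "config": {
                "scheme": "mysql+pymysql",      # metastore backend driver (placeholder)
                "host_port": "localhost:3306",  # Hive metastore database (placeholder)
                "database": "metastore",        # placeholder
                "username": "user",             # placeholder
                "password": "pass",             # placeholder
                "include_catalog_name_in_ids": True,  # flag added in this change
            },
        },
        "sink": {"type": "console"},
    }
)
pipeline.run()
pipeline.raise_from_status()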
@@ -68,6 +70,7 @@ def test_presto_on_hive_ingest(
     mode,
     use_catalog_subtype,
     use_dataset_pascalcase_subtype,
+    include_catalog_name_in_ids,
     test_suffix,
 ):
@@ -91,6 +94,7 @@ def test_presto_on_hive_ingest(
                 "scheme": "postgresql+psycopg2",
                 "include_views": True,
                 "include_tables": True,
+                "include_catalog_name_in_ids": include_catalog_name_in_ids,
                 "schema_pattern": {"allow": ["^public"]},
                 "mode": mode,
                 "use_catalog_subtype": use_catalog_subtype,