From 21f8d4be034da22e298f3fe893227dfa65d6e4ba Mon Sep 17 00:00:00 2001
From: Aseem Bansal
Date: Thu, 26 May 2022 16:24:54 +0530
Subject: [PATCH] fix(redash): improve logging for debugging, add validation
 for dataset urn, some refactoring (#4997)

---
 metadata-ingestion/scripts/modeldocgen.py     | 10 ++++---
 .../src/datahub/emitter/mce_builder.py        | 23 +++++++++------
 .../src/datahub/ingestion/source/nifi.py      |  2 +-
 .../src/datahub/ingestion/source/redash.py    | 29 +++++++++++++------
 .../src/datahub/utilities/urns/dataset_urn.py | 26 ++++++++++++-----
 .../src/datahub/utilities/urns/urn.py         |  2 +-
 6 files changed, 61 insertions(+), 31 deletions(-)

diff --git a/metadata-ingestion/scripts/modeldocgen.py b/metadata-ingestion/scripts/modeldocgen.py
index 48bbee8056..3265d3570a 100644
--- a/metadata-ingestion/scripts/modeldocgen.py
+++ b/metadata-ingestion/scripts/modeldocgen.py
@@ -34,7 +34,6 @@ from datahub.metadata.schema_classes import (
     SchemaMetadataClass,
     StringTypeClass,
     SubTypesClass,
-    SystemMetadataClass,
     TagAssociationClass,
 )
 
@@ -43,6 +42,7 @@ logger = logging.getLogger(__name__)
 
 # TODO: Support generating docs for each event type in entity registry.
 
+
 def capitalize_first(something: str) -> str:
     return something[0:1].upper() + something[1:]
 
@@ -89,10 +89,12 @@ class AspectDefinition:
     schema: Optional[avro.schema.Schema] = None
     type: Optional[str] = None
 
+
 @dataclass
 class EventDefinition:
     name: str
 
+
 entity_registry: Dict[str, EntityDefinition] = {}
 
 
@@ -386,7 +388,7 @@ def generate_stitched_record(relnships_graph: RelationshipGraph) -> List[Any]:
         )
         foreign_keys: List[ForeignKeyConstraintClass] = []
         source_dataset_urn = make_dataset_urn(
-            platform=make_data_platform_urn("datahub"),
+            platform="datahub",
             name=f"{entity_display_name}",
         )
         for f_field in schema_fields:
@@ -444,7 +446,7 @@ def generate_stitched_record(relnships_graph: RelationshipGraph) -> List[Any]:
 
                 destination_entity_name = capitalize_first(entity_type)
                 foreign_dataset_urn = make_dataset_urn(
-                    platform=make_data_platform_urn("datahub"),
+                    platform="datahub",
                     name=destination_entity_name,
                 )
                 fkey = ForeignKeyConstraintClass(
@@ -478,7 +480,7 @@ def generate_stitched_record(relnships_graph: RelationshipGraph) -> List[Any]:
 
     dataset = DatasetSnapshotClass(
         urn=make_dataset_urn(
-            platform=make_data_platform_urn("datahub"),
+            platform="datahub",
             name=f"{entity_display_name}",
         ),
         aspects=[
diff --git a/metadata-ingestion/src/datahub/emitter/mce_builder.py b/metadata-ingestion/src/datahub/emitter/mce_builder.py
index 042f6cae0b..e620393370 100644
--- a/metadata-ingestion/src/datahub/emitter/mce_builder.py
+++ b/metadata-ingestion/src/datahub/emitter/mce_builder.py
@@ -33,6 +33,7 @@ from datahub.metadata.schema_classes import (
     UpstreamClass,
     UpstreamLineageClass,
 )
+from datahub.utilities.urns.dataset_urn import DatasetUrn
 
 logger = logging.getLogger(__name__)
 
@@ -67,9 +68,9 @@ def make_data_platform_urn(platform: str) -> str:
 
 
 def make_dataset_urn(platform: str, name: str, env: str = DEFAULT_ENV) -> str:
-    if DATASET_URN_TO_LOWER:
-        name = name.lower()
-    return f"urn:li:dataset:({make_data_platform_urn(platform)},{name},{env})"
+    return make_dataset_urn_with_platform_instance(
+        platform=platform, name=name, platform_instance=None, env=env
+    )
 
 
 def make_dataplatform_instance_urn(platform: str, instance: str) -> str:
@@ -82,12 +83,16 @@ def make_dataplatform_instance_urn(platform: str, instance: str) -> str:
 def make_dataset_urn_with_platform_instance(
     platform: str, name: str, platform_instance: Optional[str], env: str = DEFAULT_ENV
 ) -> str:
-    if platform_instance:
-        if DATASET_URN_TO_LOWER:
-            name = name.lower()
-        return f"urn:li:dataset:({make_data_platform_urn(platform)},{platform_instance}.{name},{env})"
-    else:
-        return make_dataset_urn(platform=platform, name=name, env=env)
+    if DATASET_URN_TO_LOWER:
+        name = name.lower()
+    return str(
+        DatasetUrn.create_from_ids(
+            platform_id=platform,
+            table_name=name,
+            env=env,
+            platform_instance=platform_instance,
+        )
+    )
 
 
 def make_schema_field_urn(parent_urn: str, field_path: str) -> str:
diff --git a/metadata-ingestion/src/datahub/ingestion/source/nifi.py b/metadata-ingestion/src/datahub/ingestion/source/nifi.py
index cadb34e6a9..bb8ac44355 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/nifi.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/nifi.py
@@ -193,7 +193,7 @@ class NifiProcessorProvenanceEventAnalyzer:
         s3_url = f"s3://{s3_bucket}/{s3_key}"
         s3_url = s3_url[: s3_url.rindex("/")]
         dataset_name = s3_url.replace("s3://", "").replace("/", ".")
-        platform = "urn:li:dataPlatform:s3"
+        platform = "s3"
         dataset_urn = builder.make_dataset_urn(platform, dataset_name, self.env)
         return ExternalDataset(
             platform,
diff --git a/metadata-ingestion/src/datahub/ingestion/source/redash.py b/metadata-ingestion/src/datahub/ingestion/source/redash.py
index c25707f119..70aeaa4ba5 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/redash.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/redash.py
@@ -332,11 +332,14 @@ class RedashSource(Source):
             f"Running Redash ingestion with parse_table_names_from_sql={self.parse_table_names_from_sql}"
         )
 
+    def error(self, log: logging.Logger, key: str, reason: str) -> None:
+        self.report.report_failure(key, reason)
+        log.error(f"{key} => {reason}")
+
     def test_connection(self) -> None:
         test_response = self.client._get(f"{self.config.connect_uri}/api")
         if test_response.status_code == 200:
             logger.info("Redash API connected successfully")
-            pass
         else:
             raise ValueError(f"Failed to connect to {self.config.connect_uri}/api")
 
@@ -349,9 +352,6 @@ class RedashSource(Source):
     def _import_sql_parser_cls(cls, sql_parser_path: str) -> Type[SQLParser]:
         assert "." in sql_parser_path, "sql_parser-path must contain a ."
         module_name, cls_name = sql_parser_path.rsplit(".", 1)
-        import sys
-
-        logger.debug(sys.path)
         parser_cls = getattr(importlib.import_module(module_name), cls_name)
         if not issubclass(parser_cls, SQLParser):
             raise ValueError(f"must be derived from {SQLParser}; got {parser_cls}")
@@ -421,6 +421,8 @@ class RedashSource(Source):
         platform = self._get_platform_based_on_datasource(data_source)
         database_name = self._get_database_name_based_on_datasource(data_source)
         data_source_syntax = data_source.get("syntax")
+        data_source_id = data_source.get("id")
+        query_id = sql_query_data.get("id")
 
         if database_name:
             query = sql_query_data.get("query", "")
@@ -439,8 +441,11 @@ class RedashSource(Source):
                     )
                 )
             except Exception as e:
-                logger.error(e)
-                logger.error(query)
+                self.error(
+                    logger,
+                    f"sql-parsing-query-{query_id}-datasource-{data_source_id}",
+                    f"exception {e} in parsing {query}",
+                )
 
         # make sure dataset_urns is not empty list
         return dataset_urns if len(dataset_urns) > 0 else None
@@ -540,6 +545,9 @@ class RedashSource(Source):
         dashboards_response = self.client.dashboards(1, PAGE_SIZE)
         total_dashboards = dashboards_response["count"]
         max_page = total_dashboards // PAGE_SIZE
+        logger.info(
+            f"/api/dashboards total count {total_dashboards} and max page {max_page}"
+        )
 
         while (
             current_dashboards_page <= max_page
@@ -549,7 +557,9 @@ class RedashSource(Source):
                 page=current_dashboards_page, page_size=PAGE_SIZE
             )
 
-            logger.info(f"/api/dashboards on page {current_dashboards_page}")
+            logger.info(
+                f"/api/dashboards on page {current_dashboards_page} / {max_page}"
+            )
 
             current_dashboards_page += 1
 
@@ -649,7 +659,7 @@ class RedashSource(Source):
 
         if datasource_urns is None:
             self.report.report_warning(
-                key=f"redash-chart-{viz_id}",
+                key=f"redash-chart-{viz_id}-datasource-{data_source_id}",
                 reason=f"data_source_type={data_source_type} not yet implemented. Setting inputs to None",
             )
 
@@ -673,6 +683,7 @@ class RedashSource(Source):
         _queries_response = self.client.queries(1, PAGE_SIZE)
         total_queries = _queries_response["count"]
         max_page = total_queries // PAGE_SIZE
+        logger.info(f"/api/queries total count {total_queries} and max page {max_page}")
 
         while (
             current_queries_page <= max_page
@@ -682,7 +693,7 @@ class RedashSource(Source):
                 page=current_queries_page, page_size=PAGE_SIZE
             )
 
-            logger.info(f"/api/queries on page {current_queries_page}")
+            logger.info(f"/api/queries on page {current_queries_page} / {max_page}")
 
             current_queries_page += 1
 
diff --git a/metadata-ingestion/src/datahub/utilities/urns/dataset_urn.py b/metadata-ingestion/src/datahub/utilities/urns/dataset_urn.py
index 6563c465a8..fc10323e72 100644
--- a/metadata-ingestion/src/datahub/utilities/urns/dataset_urn.py
+++ b/metadata-ingestion/src/datahub/utilities/urns/dataset_urn.py
@@ -1,4 +1,4 @@
-from typing import List, Set
+from typing import List, Optional, Set
 
 from datahub.metadata.schema_classes import FabricTypeClass
 from datahub.utilities.urns.data_platform_urn import DataPlatformUrn
@@ -55,13 +55,25 @@ class DatasetUrn(Urn):
 
     @classmethod
     def create_from_ids(
-        cls, platform_id: str, table_name: str, env: str
+        cls,
+        platform_id: str,
+        table_name: str,
+        env: str,
+        platform_instance: Optional[str] = None,
     ) -> "DatasetUrn":
-        entity_id: List[str] = [
-            str(DataPlatformUrn.create_from_id(platform_id)),
-            table_name,
-            env,
-        ]
+        entity_id: List[str]
+        if platform_instance:
+            entity_id = [
+                str(DataPlatformUrn.create_from_id(platform_id)),
+                f"{platform_instance}.{table_name}",
+                env,
+            ]
+        else:
+            entity_id = [
+                str(DataPlatformUrn.create_from_id(platform_id)),
+                table_name,
+                env,
+            ]
         return cls(DatasetUrn.ENTITY_TYPE, entity_id)
 
     @staticmethod
diff --git a/metadata-ingestion/src/datahub/utilities/urns/urn.py b/metadata-ingestion/src/datahub/utilities/urns/urn.py
index cfcf6f8501..7498cc1532 100644
--- a/metadata-ingestion/src/datahub/utilities/urns/urn.py
+++ b/metadata-ingestion/src/datahub/utilities/urns/urn.py
@@ -144,7 +144,7 @@ class Urn:
             return self._entity_id[0]
         result = ""
         for part in self._entity_id:
-            result = result + part + ","
+            result = result + str(part) + ","
         return f"({result[:-1]})"
 
     def __hash__(self) -> int:
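
Both dataset-URN builders now funnel through DatasetUrn.create_from_ids, so the
URN is built and validated in one place. A minimal sketch of the shapes the
builders produce, using a hypothetical hive dataset (the expected strings follow
the f-string templates this patch removes):

    from datahub.emitter.mce_builder import (
        make_dataset_urn,
        make_dataset_urn_with_platform_instance,
    )

    # Without a platform instance the URN is (platform, name, env):
    assert (
        make_dataset_urn(platform="hive", name="db.table", env="PROD")
        == "urn:li:dataset:(urn:li:dataPlatform:hive,db.table,PROD)"
    )

    # With a platform instance, the instance is dot-prefixed onto the name:
    assert (
        make_dataset_urn_with_platform_instance(
            platform="hive", name="db.table", platform_instance="cluster1", env="PROD"
        )
        == "urn:li:dataset:(urn:li:dataPlatform:hive,cluster1.db.table,PROD)"
    )

Because the shared path builds a DataPlatformUrn from the raw platform id,
callers should pass the bare id ("s3"), not a pre-built urn:li:dataPlatform:...
string; that is presumably what the one-line nifi.py fix above corrects.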
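
The new RedashSource.error helper makes each SQL-parsing failure visible in both
the ingestion report and the logs, keyed by query and datasource id. A standalone
sketch of the pattern (SourceReport is the real DataHub class; the example key
and reason are illustrative):

    import logging

    from datahub.ingestion.api.source import SourceReport

    logger = logging.getLogger(__name__)
    report = SourceReport()


    def error(log: logging.Logger, key: str, reason: str) -> None:
        # Record the failure on the ingestion report (surfaces in the run summary)
        report.report_failure(key, reason)
        # and mirror the same key => reason pair into the log for debugging.
        log.error(f"{key} => {reason}")


    # One key now identifies both the query and its datasource:
    error(logger, "sql-parsing-query-42-datasource-7", "exception ... in parsing SELECT 1")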
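
The pagination changes log the total count and the derived max page before the
crawl starts, and tag each per-page line with "N / max_page". A worked example of
the arithmetic behind those log lines (the PAGE_SIZE value here is illustrative,
not taken from the source):

    PAGE_SIZE = 25

    total_dashboards = 100  # the "count" field returned by /api/dashboards
    max_page = total_dashboards // PAGE_SIZE  # floor division -> 4
    print(f"/api/dashboards total count {total_dashboards} and max page {max_page}")

    # The loop condition shown in the patch (current page <= max_page) then
    # logs e.g. "/api/dashboards on page 3 / 4" on each fetch, so a stalled
    # or truncated crawl is visible at a glance.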
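
The one-line urn.py change wraps each entity-id part in str() when serializing.
This matters because parts can be objects (such as a nested DataPlatformUrn)
rather than plain strings, and concatenating a non-str part onto a string raises
TypeError. A self-contained sketch of the behavior, using a simplified stand-in
class (MiniUrn is hypothetical, not the real Urn):

    from typing import Any, List


    class MiniUrn:
        """Simplified stand-in for datahub's Urn; illustrative only."""

        def __init__(self, entity_type: str, entity_id: List[Any]):
            self._entity_type = entity_type
            self._entity_id = entity_id

        def get_entity_id_as_string(self) -> str:
            if len(self._entity_id) == 1:
                return str(self._entity_id[0])
            result = ""
            for part in self._entity_id:
                # str(part) tolerates nested urn objects; `result + part`
                # would raise TypeError for any non-str part.
                result = result + str(part) + ","
            return f"({result[:-1]})"

        def __str__(self) -> str:
            return f"urn:li:{self._entity_type}:{self.get_entity_id_as_string()}"


    platform = MiniUrn("dataPlatform", ["hive"])
    dataset = MiniUrn("dataset", [platform, "db.table", "PROD"])
    assert str(dataset) == "urn:li:dataset:(urn:li:dataPlatform:hive,db.table,PROD)"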