From 3c32658d7a84cf07dbd69966f68ab533bf5933ab Mon Sep 17 00:00:00 2001 From: Onkar Ravgan Date: Mon, 10 Apr 2023 15:34:25 +0530 Subject: [PATCH] Fixed dbt logging (#10977) * Fixed dbt logging * Fixed pytests * Fixed pycheckstyle --- .../metadata/ingestion/sink/metadata_rest.py | 3 ++ .../source/database/dbt/dbt_service.py | 9 ++--- .../ingestion/source/database/dbt/metadata.py | 36 +++++++++---------- ingestion/src/metadata/utils/dbt_config.py | 4 +-- ingestion/tests/unit/test_dbt.py | 5 +-- 5 files changed, 29 insertions(+), 28 deletions(-) diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py index b2c702db10d..1b0a70ac88d 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py @@ -167,6 +167,9 @@ class MetadataRestSink(Sink[Entity]): logger.debug( f"Successfully ingested DataModel for {table.fullyQualifiedName.__root__}" ) + self.status.records_written( + f"DataModel: {table.fullyQualifiedName.__root__}" + ) else: logger.warning("Unable to ingest datamodel") diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/dbt_service.py b/ingestion/src/metadata/ingestion/source/database/dbt/dbt_service.py index 3c6bab0d68f..c0fe010f5e6 100644 --- a/ingestion/src/metadata/ingestion/source/database/dbt/dbt_service.py +++ b/ingestion/src/metadata/ingestion/source/database/dbt/dbt_service.py @@ -42,18 +42,15 @@ logger = ingestion_logger() class DbtServiceTopology(ServiceTopology): """ - Defines the hierarchy in Database Services. - service -> db -> schema -> table. - - We could have a topology validator. We can only consume - data that has been produced by any parent node. + Defines the hierarchy in dbt Services. + dbt files -> dbt tags -> data models -> descriptions -> lineage -> tests. """ root = TopologyNode( producer="get_dbt_files", stages=[ NodeStage( - type_=OMetaTagAndClassification, + type_=DbtFiles, processor="validate_dbt_files", ack_sink=False, nullable=True, diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py index 1c023167a8c..f30f1b25e02 100644 --- a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py @@ -229,7 +229,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods required_catalog_key in catalog_column for required_catalog_key in REQUIRED_CATALOG_KEYS ): - logger.info(f"Successfully Validated DBT Column: {catalog_key}") + logger.debug(f"Successfully Validated DBT Column: {catalog_key}") else: logger.warning( f"Error validating DBT Column: {catalog_key}\n" @@ -241,7 +241,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods Method to validate DBT files """ # Validate the Manifest File - logger.info("Validating Manifest File") + logger.debug("Validating Manifest File") if self.source_config.dbtConfigSource and dbt_files.dbt_manifest: manifest_entities = { @@ -264,7 +264,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods required_key in manifest_node for required_key in REQUIRED_MANIFEST_KEYS ): - logger.info(f"Successfully Validated DBT Node: {key}") + logger.debug(f"Successfully Validated DBT Node: {key}") else: logger.warning( f"Error validating DBT Node: {key}\n" @@ -296,7 +296,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods **dbt_objects.dbt_manifest.nodes, **dbt_objects.dbt_manifest.sources, } - logger.info("Processing DBT Tags") + logger.debug("Processing DBT Tags") dbt_tags_list = [] for key, manifest_node in manifest_entities.items(): try: @@ -368,7 +368,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods Yield the data models """ if self.source_config.dbtConfigSource and dbt_objects.dbt_manifest: - logger.info("Parsing DBT Data Models") + logger.debug("Parsing DBT Data Models") manifest_entities = { **dbt_objects.dbt_manifest.nodes, **dbt_objects.dbt_manifest.sources, @@ -402,7 +402,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods if manifest_node.resource_type.value in [ item.value for item in SkipResourceTypeEnum ]: - logger.info(f"Skipping DBT node: {key}.") + logger.debug(f"Skipping DBT node: {key}.") continue model_name = ( @@ -410,7 +410,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods if hasattr(manifest_node, "alias") and manifest_node.alias else manifest_node.name ) - logger.info(f"Processing DBT node: {model_name}") + logger.debug(f"Processing DBT node: {model_name}") catalog_node = None if dbt_objects.dbt_catalog: @@ -478,8 +478,8 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods else: logger.warning( f"Unable to find the table '{table_fqn}' in OpenMetadata" - f"Please check if the table exists is ingested in OpenMetadata" - f"And name, database, schema of the manifest node matches with the table present in OpenMetadata" # pylint: disable=line-too-long + f"Please check if the table exists and is ingested in OpenMetadata" + f"Also name, database, schema of the manifest node matches with the table present in OpenMetadata" # pylint: disable=line-too-long ) except Exception as exc: logger.debug(traceback.format_exc()) @@ -550,7 +550,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods manifest_columns = manifest_node.columns for key, manifest_column in manifest_columns.items(): try: - logger.info(f"Processing DBT column: {key}") + logger.debug(f"Processing DBT column: {key}") # If catalog file is passed pass the column information from catalog file catalog_column = None if catalog_node and catalog_node.columns: @@ -585,7 +585,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods ), ) ) - logger.info(f"Successfully processed DBT column: {key}") + logger.debug(f"Successfully processed DBT column: {key}") except Exception as exc: # pylint: disable=broad-except logger.debug(traceback.format_exc()) logger.warning(f"Failed to parse DBT column {column_name}: {exc}") @@ -599,7 +599,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods Method to process DBT lineage from upstream nodes """ to_entity: Table = data_model_link.table_entity - logger.info( + logger.debug( f"Processing DBT lineage for: {to_entity.fullyQualifiedName.__root__}" ) @@ -641,7 +641,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods Method to process DBT lineage from queries """ to_entity: Table = data_model_link.table_entity - logger.info( + logger.debug( f"Processing DBT Query lineage for: {to_entity.fullyQualifiedName.__root__}" ) @@ -680,7 +680,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods Method to process DBT descriptions using patch APIs """ table_entity: Table = data_model_link.table_entity - logger.info( + logger.debug( f"Processing DBT Descriptions for: {table_entity.fullyQualifiedName.__root__}" ) if table_entity: @@ -720,7 +720,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods manifest_node = dbt_test.get(DbtCommonEnum.MANIFEST_NODE.value) if manifest_node: test_name = manifest_node.name - logger.info(f"Processing DBT Tests Suite for node: {test_name}") + logger.debug(f"Processing DBT Tests Suite for node: {test_name}") test_suite_name = manifest_node.meta.get( DbtCommonEnum.TEST_SUITE_NAME.value, DbtCommonEnum.DBT_TEST_SUITE.value, @@ -749,7 +749,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods try: manifest_node = dbt_test.get(DbtCommonEnum.MANIFEST_NODE.value) if manifest_node: - logger.info( + logger.debug( f"Processing DBT Tests Suite Definition for node: {manifest_node.name}" ) check_test_definition_exists = self.metadata.get_by_name( @@ -782,7 +782,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods try: manifest_node = dbt_test.get(DbtCommonEnum.MANIFEST_NODE.value) if manifest_node: - logger.info( + logger.debug( f"Processing DBT Test Case Definition for node: {manifest_node.name}" ) entity_link_list = self.generate_entity_link(dbt_test) @@ -817,7 +817,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods # Process the Test Status manifest_node = dbt_test.get(DbtCommonEnum.MANIFEST_NODE.value) if manifest_node: - logger.info( + logger.debug( f"Processing DBT Test Case Results for node: {manifest_node.name}" ) dbt_test_result = dbt_test.get(DbtCommonEnum.RESULTS.value) diff --git a/ingestion/src/metadata/utils/dbt_config.py b/ingestion/src/metadata/utils/dbt_config.py index 356912d40f9..e4d0b259d2c 100644 --- a/ingestion/src/metadata/utils/dbt_config.py +++ b/ingestion/src/metadata/utils/dbt_config.py @@ -193,7 +193,7 @@ def _(config: DbtCloudConfig): # pylint: disable=too-many-locals f"/accounts/{account_id}/runs/{run_id}/artifacts/{DBT_CATALOG_FILE_NAME}" ) except Exception as exc: - logger.info( + logger.debug( f"dbt catalog file not found, skipping the catalog file: {exc}" ) logger.debug(traceback.format_exc()) @@ -207,7 +207,7 @@ def _(config: DbtCloudConfig): # pylint: disable=too-many-locals f"/accounts/{account_id}/runs/{run_id}/artifacts/{DBT_RUN_RESULTS_FILE_NAME}" ) except Exception as exc: - logger.info( + logger.debug( f"dbt run_results file not found, skipping dbt tests: {exc}" ) logger.debug(traceback.format_exc()) diff --git a/ingestion/tests/unit/test_dbt.py b/ingestion/tests/unit/test_dbt.py index 82ebf538100..f8c18e1011c 100644 --- a/ingestion/tests/unit/test_dbt.py +++ b/ingestion/tests/unit/test_dbt.py @@ -45,6 +45,7 @@ mock_dbt_config = { }, "sink": {"type": "metadata-rest", "config": {}}, "workflowConfig": { + "loggerLevel": "DEBUG", "openMetadataServerConfig": { "hostPort": "http://localhost:8585/api", "authProvider": "openmetadata", @@ -56,7 +57,7 @@ mock_dbt_config = { "r3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3u" "d-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" }, - } + }, }, } @@ -447,7 +448,7 @@ class DbtUnitTest(TestCase): return dbt_files, dbt_objects def check_dbt_validate(self, dbt_files, expected_records): - with self.assertLogs() as captured: + with self.assertLogs(level="DEBUG") as captured: self.dbt_source_obj.validate_dbt_files(dbt_files=dbt_files) self.assertEqual(len(captured.records), expected_records) for record in captured.records: