Fixed dbt logging (#10977)

* Fixed dbt logging

* Fixed pytests

* Fixed pycheckstyle
This commit is contained in:
Onkar Ravgan 2023-04-10 15:34:25 +05:30 committed by GitHub
parent 9f2b10f6e2
commit 3c32658d7a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 29 additions and 28 deletions

View File

@@ -167,6 +167,9 @@ class MetadataRestSink(Sink[Entity]):
logger.debug( logger.debug(
f"Successfully ingested DataModel for {table.fullyQualifiedName.__root__}" f"Successfully ingested DataModel for {table.fullyQualifiedName.__root__}"
) )
self.status.records_written(
f"DataModel: {table.fullyQualifiedName.__root__}"
)
else: else:
logger.warning("Unable to ingest datamodel") logger.warning("Unable to ingest datamodel")

View File

@@ -42,18 +42,15 @@ logger = ingestion_logger()
class DbtServiceTopology(ServiceTopology): class DbtServiceTopology(ServiceTopology):
""" """
Defines the hierarchy in Database Services. Defines the hierarchy in dbt Services.
service -> db -> schema -> table. dbt files -> dbt tags -> data models -> descriptions -> lineage -> tests.
We could have a topology validator. We can only consume
data that has been produced by any parent node.
""" """
root = TopologyNode( root = TopologyNode(
producer="get_dbt_files", producer="get_dbt_files",
stages=[ stages=[
NodeStage( NodeStage(
type_=OMetaTagAndClassification, type_=DbtFiles,
processor="validate_dbt_files", processor="validate_dbt_files",
ack_sink=False, ack_sink=False,
nullable=True, nullable=True,

View File

@@ -229,7 +229,7 @@ class DbtSource(DbtServiceSource):  # pylint: disable=too-many-public-methods
required_catalog_key in catalog_column required_catalog_key in catalog_column
for required_catalog_key in REQUIRED_CATALOG_KEYS for required_catalog_key in REQUIRED_CATALOG_KEYS
): ):
logger.info(f"Successfully Validated DBT Column: {catalog_key}") logger.debug(f"Successfully Validated DBT Column: {catalog_key}")
else: else:
logger.warning( logger.warning(
f"Error validating DBT Column: {catalog_key}\n" f"Error validating DBT Column: {catalog_key}\n"
@@ -241,7 +241,7 @@ class DbtSource(DbtServiceSource):  # pylint: disable=too-many-public-methods
Method to validate DBT files Method to validate DBT files
""" """
# Validate the Manifest File # Validate the Manifest File
logger.info("Validating Manifest File") logger.debug("Validating Manifest File")
if self.source_config.dbtConfigSource and dbt_files.dbt_manifest: if self.source_config.dbtConfigSource and dbt_files.dbt_manifest:
manifest_entities = { manifest_entities = {
@@ -264,7 +264,7 @@ class DbtSource(DbtServiceSource):  # pylint: disable=too-many-public-methods
required_key in manifest_node required_key in manifest_node
for required_key in REQUIRED_MANIFEST_KEYS for required_key in REQUIRED_MANIFEST_KEYS
): ):
logger.info(f"Successfully Validated DBT Node: {key}") logger.debug(f"Successfully Validated DBT Node: {key}")
else: else:
logger.warning( logger.warning(
f"Error validating DBT Node: {key}\n" f"Error validating DBT Node: {key}\n"
@@ -296,7 +296,7 @@ class DbtSource(DbtServiceSource):  # pylint: disable=too-many-public-methods
**dbt_objects.dbt_manifest.nodes, **dbt_objects.dbt_manifest.nodes,
**dbt_objects.dbt_manifest.sources, **dbt_objects.dbt_manifest.sources,
} }
logger.info("Processing DBT Tags") logger.debug("Processing DBT Tags")
dbt_tags_list = [] dbt_tags_list = []
for key, manifest_node in manifest_entities.items(): for key, manifest_node in manifest_entities.items():
try: try:
@@ -368,7 +368,7 @@ class DbtSource(DbtServiceSource):  # pylint: disable=too-many-public-methods
Yield the data models Yield the data models
""" """
if self.source_config.dbtConfigSource and dbt_objects.dbt_manifest: if self.source_config.dbtConfigSource and dbt_objects.dbt_manifest:
logger.info("Parsing DBT Data Models") logger.debug("Parsing DBT Data Models")
manifest_entities = { manifest_entities = {
**dbt_objects.dbt_manifest.nodes, **dbt_objects.dbt_manifest.nodes,
**dbt_objects.dbt_manifest.sources, **dbt_objects.dbt_manifest.sources,
@@ -402,7 +402,7 @@ class DbtSource(DbtServiceSource):  # pylint: disable=too-many-public-methods
if manifest_node.resource_type.value in [ if manifest_node.resource_type.value in [
item.value for item in SkipResourceTypeEnum item.value for item in SkipResourceTypeEnum
]: ]:
logger.info(f"Skipping DBT node: {key}.") logger.debug(f"Skipping DBT node: {key}.")
continue continue
model_name = ( model_name = (
@@ -410,7 +410,7 @@ class DbtSource(DbtServiceSource):  # pylint: disable=too-many-public-methods
if hasattr(manifest_node, "alias") and manifest_node.alias if hasattr(manifest_node, "alias") and manifest_node.alias
else manifest_node.name else manifest_node.name
) )
logger.info(f"Processing DBT node: {model_name}") logger.debug(f"Processing DBT node: {model_name}")
catalog_node = None catalog_node = None
if dbt_objects.dbt_catalog: if dbt_objects.dbt_catalog:
@@ -478,8 +478,8 @@ class DbtSource(DbtServiceSource):  # pylint: disable=too-many-public-methods
else: else:
logger.warning( logger.warning(
f"Unable to find the table '{table_fqn}' in OpenMetadata" f"Unable to find the table '{table_fqn}' in OpenMetadata"
f"Please check if the table exists is ingested in OpenMetadata" f"Please check if the table exists and is ingested in OpenMetadata"
f"And name, database, schema of the manifest node matches with the table present in OpenMetadata" # pylint: disable=line-too-long f"Also name, database, schema of the manifest node matches with the table present in OpenMetadata" # pylint: disable=line-too-long
) )
except Exception as exc: except Exception as exc:
logger.debug(traceback.format_exc()) logger.debug(traceback.format_exc())
@@ -550,7 +550,7 @@ class DbtSource(DbtServiceSource):  # pylint: disable=too-many-public-methods
manifest_columns = manifest_node.columns manifest_columns = manifest_node.columns
for key, manifest_column in manifest_columns.items(): for key, manifest_column in manifest_columns.items():
try: try:
logger.info(f"Processing DBT column: {key}") logger.debug(f"Processing DBT column: {key}")
# If catalog file is passed pass the column information from catalog file # If catalog file is passed pass the column information from catalog file
catalog_column = None catalog_column = None
if catalog_node and catalog_node.columns: if catalog_node and catalog_node.columns:
@@ -585,7 +585,7 @@ class DbtSource(DbtServiceSource):  # pylint: disable=too-many-public-methods
), ),
) )
) )
logger.info(f"Successfully processed DBT column: {key}") logger.debug(f"Successfully processed DBT column: {key}")
except Exception as exc: # pylint: disable=broad-except except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc()) logger.debug(traceback.format_exc())
logger.warning(f"Failed to parse DBT column {column_name}: {exc}") logger.warning(f"Failed to parse DBT column {column_name}: {exc}")
@@ -599,7 +599,7 @@ class DbtSource(DbtServiceSource):  # pylint: disable=too-many-public-methods
Method to process DBT lineage from upstream nodes Method to process DBT lineage from upstream nodes
""" """
to_entity: Table = data_model_link.table_entity to_entity: Table = data_model_link.table_entity
logger.info( logger.debug(
f"Processing DBT lineage for: {to_entity.fullyQualifiedName.__root__}" f"Processing DBT lineage for: {to_entity.fullyQualifiedName.__root__}"
) )
@@ -641,7 +641,7 @@ class DbtSource(DbtServiceSource):  # pylint: disable=too-many-public-methods
Method to process DBT lineage from queries Method to process DBT lineage from queries
""" """
to_entity: Table = data_model_link.table_entity to_entity: Table = data_model_link.table_entity
logger.info( logger.debug(
f"Processing DBT Query lineage for: {to_entity.fullyQualifiedName.__root__}" f"Processing DBT Query lineage for: {to_entity.fullyQualifiedName.__root__}"
) )
@@ -680,7 +680,7 @@ class DbtSource(DbtServiceSource):  # pylint: disable=too-many-public-methods
Method to process DBT descriptions using patch APIs Method to process DBT descriptions using patch APIs
""" """
table_entity: Table = data_model_link.table_entity table_entity: Table = data_model_link.table_entity
logger.info( logger.debug(
f"Processing DBT Descriptions for: {table_entity.fullyQualifiedName.__root__}" f"Processing DBT Descriptions for: {table_entity.fullyQualifiedName.__root__}"
) )
if table_entity: if table_entity:
@@ -720,7 +720,7 @@ class DbtSource(DbtServiceSource):  # pylint: disable=too-many-public-methods
manifest_node = dbt_test.get(DbtCommonEnum.MANIFEST_NODE.value) manifest_node = dbt_test.get(DbtCommonEnum.MANIFEST_NODE.value)
if manifest_node: if manifest_node:
test_name = manifest_node.name test_name = manifest_node.name
logger.info(f"Processing DBT Tests Suite for node: {test_name}") logger.debug(f"Processing DBT Tests Suite for node: {test_name}")
test_suite_name = manifest_node.meta.get( test_suite_name = manifest_node.meta.get(
DbtCommonEnum.TEST_SUITE_NAME.value, DbtCommonEnum.TEST_SUITE_NAME.value,
DbtCommonEnum.DBT_TEST_SUITE.value, DbtCommonEnum.DBT_TEST_SUITE.value,
@@ -749,7 +749,7 @@ class DbtSource(DbtServiceSource):  # pylint: disable=too-many-public-methods
try: try:
manifest_node = dbt_test.get(DbtCommonEnum.MANIFEST_NODE.value) manifest_node = dbt_test.get(DbtCommonEnum.MANIFEST_NODE.value)
if manifest_node: if manifest_node:
logger.info( logger.debug(
f"Processing DBT Tests Suite Definition for node: {manifest_node.name}" f"Processing DBT Tests Suite Definition for node: {manifest_node.name}"
) )
check_test_definition_exists = self.metadata.get_by_name( check_test_definition_exists = self.metadata.get_by_name(
@@ -782,7 +782,7 @@ class DbtSource(DbtServiceSource):  # pylint: disable=too-many-public-methods
try: try:
manifest_node = dbt_test.get(DbtCommonEnum.MANIFEST_NODE.value) manifest_node = dbt_test.get(DbtCommonEnum.MANIFEST_NODE.value)
if manifest_node: if manifest_node:
logger.info( logger.debug(
f"Processing DBT Test Case Definition for node: {manifest_node.name}" f"Processing DBT Test Case Definition for node: {manifest_node.name}"
) )
entity_link_list = self.generate_entity_link(dbt_test) entity_link_list = self.generate_entity_link(dbt_test)
@@ -817,7 +817,7 @@ class DbtSource(DbtServiceSource):  # pylint: disable=too-many-public-methods
# Process the Test Status # Process the Test Status
manifest_node = dbt_test.get(DbtCommonEnum.MANIFEST_NODE.value) manifest_node = dbt_test.get(DbtCommonEnum.MANIFEST_NODE.value)
if manifest_node: if manifest_node:
logger.info( logger.debug(
f"Processing DBT Test Case Results for node: {manifest_node.name}" f"Processing DBT Test Case Results for node: {manifest_node.name}"
) )
dbt_test_result = dbt_test.get(DbtCommonEnum.RESULTS.value) dbt_test_result = dbt_test.get(DbtCommonEnum.RESULTS.value)

View File

@@ -193,7 +193,7 @@ def _(config: DbtCloudConfig):  # pylint: disable=too-many-locals
f"/accounts/{account_id}/runs/{run_id}/artifacts/{DBT_CATALOG_FILE_NAME}" f"/accounts/{account_id}/runs/{run_id}/artifacts/{DBT_CATALOG_FILE_NAME}"
) )
except Exception as exc: except Exception as exc:
logger.info( logger.debug(
f"dbt catalog file not found, skipping the catalog file: {exc}" f"dbt catalog file not found, skipping the catalog file: {exc}"
) )
logger.debug(traceback.format_exc()) logger.debug(traceback.format_exc())
@@ -207,7 +207,7 @@ def _(config: DbtCloudConfig):  # pylint: disable=too-many-locals
f"/accounts/{account_id}/runs/{run_id}/artifacts/{DBT_RUN_RESULTS_FILE_NAME}" f"/accounts/{account_id}/runs/{run_id}/artifacts/{DBT_RUN_RESULTS_FILE_NAME}"
) )
except Exception as exc: except Exception as exc:
logger.info( logger.debug(
f"dbt run_results file not found, skipping dbt tests: {exc}" f"dbt run_results file not found, skipping dbt tests: {exc}"
) )
logger.debug(traceback.format_exc()) logger.debug(traceback.format_exc())

View File

@@ -45,6 +45,7 @@ mock_dbt_config = {
}, },
"sink": {"type": "metadata-rest", "config": {}}, "sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": { "workflowConfig": {
"loggerLevel": "DEBUG",
"openMetadataServerConfig": { "openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api", "hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata", "authProvider": "openmetadata",
@@ -56,7 +57,7 @@ mock_dbt_config = {
"r3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3u" "r3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3u"
"d-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" "d-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
}, },
} },
}, },
} }
@@ -447,7 +448,7 @@ class DbtUnitTest(TestCase):
return dbt_files, dbt_objects return dbt_files, dbt_objects
def check_dbt_validate(self, dbt_files, expected_records): def check_dbt_validate(self, dbt_files, expected_records):
with self.assertLogs() as captured: with self.assertLogs(level="DEBUG") as captured:
self.dbt_source_obj.validate_dbt_files(dbt_files=dbt_files) self.dbt_source_obj.validate_dbt_files(dbt_files=dbt_files)
self.assertEqual(len(captured.records), expected_records) self.assertEqual(len(captured.records), expected_records)
for record in captured.records: for record in captured.records: