diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/dbt_config.py b/ingestion/src/metadata/ingestion/source/database/dbt/dbt_config.py index 170dbe58e6e..08afbafba0a 100644 --- a/ingestion/src/metadata/ingestion/source/database/dbt/dbt_config.py +++ b/ingestion/src/metadata/ingestion/source/database/dbt/dbt_config.py @@ -183,7 +183,8 @@ def _(config: DbtCloudConfig): # pylint: disable=too-many-locals last_run = runs_data[0] run_id = last_run["id"] logger.info( - f"Retrieved last successful run [{run_id}] finished {last_run['finished_at_humanized']} (duration: {last_run['duration_humanized']})" + f"Retrieved last successful run [{str(run_id)}]: " + f"Finished {str(last_run['finished_at_humanized'])} (duration: {str(last_run['duration_humanized'])})" ) try: logger.debug("Requesting [dbt_catalog]") diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/dbt_service.py b/ingestion/src/metadata/ingestion/source/database/dbt/dbt_service.py index 051427a2dc2..759fbd6808a 100644 --- a/ingestion/src/metadata/ingestion/source/database/dbt/dbt_service.py +++ b/ingestion/src/metadata/ingestion/source/database/dbt/dbt_service.py @@ -216,8 +216,7 @@ class DbtServiceSource(TopologyRunnerMixin, Source, ABC): """ Prepare the data models """ - for data_model_link in self.context.get().data_model_links: - yield data_model_link + yield from self.context.get().data_model_links @abstractmethod def create_dbt_lineage(self, data_model_link: DataModelLink) -> AddLineageRequest: diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py index fb1b8b812a2..b133ef7e750 100644 --- a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py @@ -13,7 +13,7 @@ DBT source methods. """ import traceback from datetime import datetime -from typing import Iterable, List, Optional, Union +from typing import Any, Iterable, List, Optional, Union from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest @@ -136,10 +136,9 @@ class DbtSource(DbtServiceSource): """ By default for DBT nothing is required to be prepared """ - pass def get_dbt_owner( - self, manifest_node: dict, catalog_node: Optional[dict] + self, manifest_node: Any, catalog_node: Optional[Any] ) -> Optional[EntityReference]: """ Returns dbt owner @@ -411,7 +410,6 @@ class DbtSource(DbtServiceSource): dbt_raw_query = get_dbt_raw_query(manifest_node) # Get the table entity from ES - # TODO: Change to get_by_name once the postgres case sensitive calls is fixed table_fqn = fqn.build( self.metadata, entity_type=Table, @@ -445,8 +443,9 @@ class DbtSource(DbtServiceSource): rawSql=SqlQuery(dbt_raw_query) if dbt_raw_query else None, - # SQL Is a required param for the DataModel - sql=SqlQuery(dbt_compiled_query or dbt_raw_query or ""), + sql=SqlQuery(dbt_compiled_query) + if dbt_compiled_query + else None, columns=self.parse_data_model_columns( manifest_node, catalog_node ), @@ -519,7 +518,6 @@ class DbtSource(DbtServiceSource): ) # check if the parent table exists in OM before adding it to the upstream list - # TODO: Change to get_by_name once the postgres case sensitive calls is fixed parent_table_entity: Optional[ Union[Table, List[Table]] ] = get_entity_from_es_result( @@ -539,7 +537,7 @@ class DbtSource(DbtServiceSource): return upstream_nodes def parse_data_model_columns( - self, manifest_node: dict, catalog_node: dict + self, manifest_node: Any, catalog_node: Any ) -> List[Column]: """ Method to parse the DBT columns @@ -549,7 +547,7 @@ class DbtSource(DbtServiceSource): for key, manifest_column in manifest_columns.items(): try: logger.debug(f"Processing DBT column: {key}") - # If catalog file is passed pass the column information from catalog file + # If catalog file is passed, pass the column information from catalog file catalog_column = None if catalog_node and catalog_node.columns: catalog_column = catalog_node.columns.get(key) @@ -665,44 +663,48 @@ class DbtSource(DbtServiceSource): """ Method to process DBT lineage from queries """ - to_entity: Table = data_model_link.table_entity - logger.debug( - f"Processing DBT Query lineage for: {to_entity.fullyQualifiedName.root}" - ) - - try: - source_elements = fqn.split(to_entity.fullyQualifiedName.root) - # remove service name from fqn to make it parseable in format db.schema.table - query_fqn = fqn._build( # pylint: disable=protected-access - *source_elements[-3:] + if data_model_link.datamodel.sql: + to_entity: Table = data_model_link.table_entity + logger.debug( + f"Processing DBT Query lineage for: {to_entity.fullyQualifiedName.root}" ) - query = f"create table {query_fqn} as {data_model_link.datamodel.sql.root}" - connection_type = str(self.config.serviceConnection.root.config.type.value) - dialect = ConnectionTypeDialectMapper.dialect_of(connection_type) - lineages = get_lineage_by_query( - self.metadata, - query=query, - service_name=source_elements[0], - database_name=source_elements[1], - schema_name=source_elements[2], - dialect=dialect, - timeout_seconds=self.source_config.parsingTimeoutLimit, - lineage_source=LineageSource.DbtLineage, - ) - for lineage_request in lineages or []: - yield lineage_request - except Exception as exc: # pylint: disable=broad-except - yield Either( - left=StackTraceError( - name=data_model_link.datamodel.sql.root, - error=( - f"Failed to parse the query {data_model_link.datamodel.sql.root}" - f" to capture lineage: {exc}" - ), - stackTrace=traceback.format_exc(), + try: + source_elements = fqn.split(to_entity.fullyQualifiedName.root) + # remove service name from fqn to make it parseable in format db.schema.table + query_fqn = fqn._build( # pylint: disable=protected-access + *source_elements[-3:] + ) + query = ( + f"create table {query_fqn} as {data_model_link.datamodel.sql.root}" + ) + connection_type = str( + self.config.serviceConnection.root.config.type.value + ) + dialect = ConnectionTypeDialectMapper.dialect_of(connection_type) + lineages = get_lineage_by_query( + self.metadata, + query=query, + service_name=source_elements[0], + database_name=source_elements[1], + schema_name=source_elements[2], + dialect=dialect, + timeout_seconds=self.source_config.parsingTimeoutLimit, + lineage_source=LineageSource.DbtLineage, + ) + yield from lineages or [] + + except Exception as exc: # pylint: disable=broad-except + yield Either( + left=StackTraceError( + name=data_model_link.datamodel.sql.root, + error=( + f"Failed to parse the query {data_model_link.datamodel.sql.root}" + f" to capture lineage: {exc}" + ), + stackTrace=traceback.format_exc(), + ) ) - ) def process_dbt_meta(self, manifest_meta): """ diff --git a/ingestion/src/metadata/utils/time_utils.py b/ingestion/src/metadata/utils/time_utils.py index d87f03a442d..55493e3d698 100644 --- a/ingestion/src/metadata/utils/time_utils.py +++ b/ingestion/src/metadata/utils/time_utils.py @@ -123,4 +123,4 @@ def convert_timestamp_to_milliseconds(timestamp: Union[int, float]) -> int: """ if len(str(round(timestamp))) == 13: return timestamp - return timestamp * 1000 + return round(timestamp * 1000) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java index df50aec27c4..04ab7a76682 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java @@ -612,7 +612,7 @@ public class TableRepository extends EntityRepository