diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py index e74abafff91..d0d1542b2b2 100644 --- a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py @@ -15,7 +15,7 @@ DBT source methods. import traceback from copy import deepcopy from datetime import datetime -from typing import Any, Iterable, List, Optional, Union +from typing import Any, Iterable, List, Optional from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest @@ -63,6 +63,7 @@ from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.models.patch_request import PatchedEntity, PatchRequest from metadata.ingestion.models.table_metadata import ColumnDescription +from metadata.ingestion.ometa.client import APIError from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser from metadata.ingestion.source.database.database_service import DataModelLink @@ -98,7 +99,7 @@ from metadata.utils.elasticsearch import get_entity_from_es_result from metadata.utils.entity_link import get_table_fqn from metadata.utils.logger import ingestion_logger from metadata.utils.tag_utils import get_ometa_tag_and_classification, get_tag_labels -from metadata.utils.time_utils import convert_timestamp_to_milliseconds +from metadata.utils.time_utils import datetime_to_timestamp logger = ingestion_logger() @@ -326,11 +327,14 @@ class DbtSource(DbtServiceSource): None, ) - def _add_dbt_freshness_test_from_sources( + def add_dbt_sources( self, key: str, manifest_node, manifest_entities, dbt_objects: DbtObjects - ): - # in dbt manifest sources node name is table/view name (not test name like with test nodes) - # so in order for the test creation to be named precisely I am amending manifest node name within it's deepcopy + ) -> None: + """ + Method to append dbt test cases based on sources file for later processing + In dbt manifest sources node name is table/view name (not test name like with test nodes) + So in order for the test creation to be named precisely I am amending manifest node name within it's deepcopy + """ manifest_node_new = deepcopy(manifest_node) manifest_node_new.name = manifest_node_new.name + "_freshness" @@ -350,17 +354,57 @@ class DbtSource(DbtServiceSource): DbtCommonEnum.RESULTS.value ] = freshness_test_result - def add_dbt_sources( - self, key: str, manifest_node, manifest_entities, dbt_objects: DbtObjects - ) -> None: - """ - Method to append dbt test cases based on sources file for later processing - """ - self._add_dbt_freshness_test_from_sources( - key, manifest_node, manifest_entities, dbt_objects - ) + def _get_table_entity(self, table_fqn) -> Optional[Table]: + def search_table(fqn_search_string: str) -> Optional[Table]: + table_entities = get_entity_from_es_result( + entity_list=self.metadata.es_search_from_fqn( + entity_type=Table, + fqn_search_string=fqn_search_string, + fields="sourceHash", + ), + fetch_multiple_entities=True, + ) + logger.debug( + f"Found table entities from {fqn_search_string}: {table_entities}" + ) + return next(iter(filter(None, table_entities)), None) - # pylint: disable=too-many-locals, too-many-branches, too-many-statements + try: + table_entity = search_table(table_fqn) + if table_entity: + return table_entity + + if self.source_config.searchAcrossDatabases: + logger.warning( + f"Table {table_fqn} not found under service: {self.config.serviceName}." + "Trying to find table across services" + ) + _, database_name, schema_name, table_name = fqn.split(table_fqn) + table_fqn = fqn.build( + self.metadata, + entity_type=Table, + service_name="*", + database_name=database_name, + schema_name=schema_name, + table_name=table_name, + ) + table_entity = search_table(table_fqn) + if table_entity: + return table_entity + + logger.warning( + f"Unable to find the table '{table_fqn}' in OpenMetadata. " + "Please check if the table exists and is ingested in OpenMetadata. " + "Also, ensure the name, database, and schema of the manifest node" + "match the table present in OpenMetadata." + ) + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning(f"Failed to get table entity from OpenMetadata: {exc}") + + return None + + # pylint: disable=too-many-locals, too-many-branches def yield_data_models( self, dbt_objects: DbtObjects ) -> Iterable[Either[DataModelLink]]: @@ -471,28 +515,19 @@ class DbtSource(DbtServiceSource): dbt_compiled_query = get_dbt_compiled_query(manifest_node) dbt_raw_query = get_dbt_raw_query(manifest_node) - # Get the table entity from ES table_fqn = fqn.build( self.metadata, entity_type=Table, - service_name="*", + service_name=self.config.serviceName, database_name=get_corrected_name(manifest_node.database), schema_name=get_corrected_name(manifest_node.schema_), table_name=model_name, ) - table_entity: Optional[ - Union[Table, List[Table]] - ] = get_entity_from_es_result( - entity_list=self.metadata.es_search_from_fqn( - entity_type=Table, - fqn_search_string=table_fqn, - fields="sourceHash", - ), - fetch_multiple_entities=False, - ) - - if table_entity: + if table_entity := self._get_table_entity(table_fqn=table_fqn): + logger.debug( + f"Using Table Entity for datamodel: {table_entity}" + ) data_model_link = DataModelLink( table_entity=table_entity, datamodel=DataModel( @@ -523,13 +558,7 @@ class DbtSource(DbtServiceSource): ) yield Either(right=data_model_link) self.context.get().data_model_links.append(data_model_link) - else: - logger.warning( - f"Unable to find the table '{table_fqn}' in OpenMetadata" - "Please check if the table exists and is ingested in OpenMetadata" - "Also name, database, schema of the manifest node matches with the table present " - "in OpenMetadata" - ) + except Exception as exc: yield Either( left=StackTraceError( @@ -573,22 +602,14 @@ class DbtSource(DbtServiceSource): parent_fqn = fqn.build( self.metadata, entity_type=Table, - service_name="*", + service_name=self.config.serviceName, database_name=get_corrected_name(parent_node.database), schema_name=get_corrected_name(parent_node.schema_), table_name=table_name, ) # check if the parent table exists in OM before adding it to the upstream list - parent_table_entity: Optional[ - Union[Table, List[Table]] - ] = get_entity_from_es_result( - entity_list=self.metadata.es_search_from_fqn( - entity_type=Table, fqn_search_string=parent_fqn - ), - fetch_multiple_entities=False, - ) - if parent_table_entity: + if self._get_table_entity(table_fqn=parent_fqn): upstream_nodes.append(parent_fqn) except Exception as exc: # pylint: disable=broad-except logger.debug(traceback.format_exc()) @@ -601,22 +622,14 @@ class DbtSource(DbtServiceSource): parent_fqn = fqn.build( self.metadata, entity_type=Table, - service_name="*", + service_name=self.config.serviceName, database_name=get_corrected_name(dbt_node.database), schema_name=get_corrected_name(dbt_node.schema_), table_name=dbt_node.name, ) # check if the parent table exists in OM before adding it to the upstream list - parent_table_entity: Optional[ - Union[Table, List[Table]] - ] = get_entity_from_es_result( - entity_list=self.metadata.es_search_from_fqn( - entity_type=Table, fqn_search_string=parent_fqn - ), - fetch_multiple_entities=False, - ) - if parent_table_entity: + if self._get_table_entity(table_fqn=parent_fqn): upstream_nodes.append(parent_fqn) return upstream_nodes @@ -708,14 +721,8 @@ class DbtSource(DbtServiceSource): for upstream_node in data_model_link.datamodel.upstream: try: - from_es_result = self.metadata.es_search_from_fqn( - entity_type=Table, - fqn_search_string=upstream_node, - ) - from_entity: Optional[ - Union[Table, List[Table]] - ] = get_entity_from_es_result( - entity_list=from_es_result, fetch_multiple_entities=False + from_entity: Optional[Table] = self._get_table_entity( + table_fqn=upstream_node ) if from_entity and to_entity: yield Either( @@ -989,6 +996,7 @@ class DbtSource(DbtServiceSource): self.metadata, entity_link_str ) table_fqn = get_table_fqn(entity_link_str) + logger.debug(f"Table fqn found: {table_fqn}") source_elements = table_fqn.split(fqn.FQN_SEPARATOR) test_case_fqn = fqn.build( self.metadata, @@ -1024,6 +1032,7 @@ class DbtSource(DbtServiceSource): owners=None, ) ) + logger.debug(f"Test case Already Exists: {test_case_fqn}") except Exception as err: # pylint: disable=broad-except yield Either( left=StackTraceError( @@ -1084,9 +1093,7 @@ class DbtSource(DbtServiceSource): # Create the test case result object test_case_result = TestCaseResult( - timestamp=Timestamp( - convert_timestamp_to_milliseconds(dbt_timestamp.timestamp()) - ), + timestamp=Timestamp(datetime_to_timestamp(dbt_timestamp)), testCaseStatus=test_case_status, testResultValue=[ TestResultValue( @@ -1113,10 +1120,17 @@ class DbtSource(DbtServiceSource): else None, test_case_name=manifest_node.name, ) - self.metadata.add_test_case_results( - test_results=test_case_result, - test_case_fqn=test_case_fqn, - ) + + logger.debug(f"Adding test case results to {test_case_fqn} ") + try: + self.metadata.add_test_case_results( + test_results=test_case_result, + test_case_fqn=test_case_fqn, + ) + except APIError as err: + if err.code != 409: + raise APIError(err) from err + except Exception as err: # pylint: disable=broad-except logger.debug(traceback.format_exc()) logger.error( diff --git a/openmetadata-docs/content/v1.4.x/main-concepts/metadata-standard/schemas/metadataIngestion/dbtPipeline.md b/openmetadata-docs/content/v1.4.x/main-concepts/metadata-standard/schemas/metadataIngestion/dbtPipeline.md index 0b27c51b5dd..ab655037461 100644 --- a/openmetadata-docs/content/v1.4.x/main-concepts/metadata-standard/schemas/metadataIngestion/dbtPipeline.md +++ b/openmetadata-docs/content/v1.4.x/main-concepts/metadata-standard/schemas/metadataIngestion/dbtPipeline.md @@ -11,6 +11,7 @@ slug: /main-concepts/metadata-standard/schemas/metadataingestion/dbtpipeline - **`type`**: Pipeline type. Refer to *#/definitions/dbtConfigType*. Default: `DBT`. - **`dbtConfigSource`**: Available sources to fetch DBT catalog and manifest files. +- **`searchAcrossDatabases`** *(boolean)*: Optional configuration to search across databases for tables or not. Default: `False`. - **`dbtUpdateDescriptions`** *(boolean)*: Optional configuration to update the description from DBT or not. Default: `False`. - **`dbtUpdateOwners`** *(boolean)*: Optional configuration to update the owner from DBT or not. Default: `False`. - **`includeTags`** *(boolean)*: Optional configuration to toggle the tags ingestion. Default: `True`. diff --git a/openmetadata-docs/content/v1.6.x-SNAPSHOT/main-concepts/metadata-standard/schemas/metadataIngestion/dbtPipeline.md b/openmetadata-docs/content/v1.6.x-SNAPSHOT/main-concepts/metadata-standard/schemas/metadataIngestion/dbtPipeline.md index fff8d454a04..d98baf90b00 100644 --- a/openmetadata-docs/content/v1.6.x-SNAPSHOT/main-concepts/metadata-standard/schemas/metadataIngestion/dbtPipeline.md +++ b/openmetadata-docs/content/v1.6.x-SNAPSHOT/main-concepts/metadata-standard/schemas/metadataIngestion/dbtPipeline.md @@ -11,6 +11,7 @@ slug: /main-concepts/metadata-standard/schemas/metadataingestion/dbtpipeline - **`type`**: Pipeline type. Refer to *#/definitions/dbtConfigType*. Default: `DBT`. - **`dbtConfigSource`**: Available sources to fetch DBT catalog and manifest files. +- **`searchAcrossDatabases`** *(boolean)*: Optional configuration to search across databases for tables or not. Default: `False`. - **`dbtUpdateDescriptions`** *(boolean)*: Optional configuration to update the description from DBT or not. Default: `False`. - **`dbtUpdateOwners`** *(boolean)*: Optional configuration to update the owners from DBT or not. Default: `False`. - **`includeTags`** *(boolean)*: Optional configuration to toggle the tags ingestion. Default: `True`. diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/dbtPipeline.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/dbtPipeline.json index 47ee12e8440..55b9a9c56c7 100644 --- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/dbtPipeline.json +++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/dbtPipeline.json @@ -43,6 +43,11 @@ } ] }, + "searchAcrossDatabases": { + "description": "Optional configuration to search across databases for tables or not", + "type": "boolean", + "default": false + }, "dbtUpdateDescriptions": { "description": "Optional configuration to update the description from DBT or not", "type": "boolean", diff --git a/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Database/workflows/dbt.md b/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Database/workflows/dbt.md index 0185b6dab7c..c03a86d9049 100644 --- a/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Database/workflows/dbt.md +++ b/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Database/workflows/dbt.md @@ -336,6 +336,16 @@ $$section Set the `Enable Debug Log` toggle to set the logging level of the process to debug. You can check these logs in the Ingestion tab of the service and dig deeper into any errors you might find. $$ + +$$section +### Search Tables Across Databases $(id="searchAcrossDatabases") + +Option to search across database services for tables or not for processing dbt metadata ingestion. +If this option is enabled, OpenMetadata will first search for tables within the same database service if tables are not found it will search across all database services. + +If the option is disabled, the search will be limited to the tables within the same database service. +$$ + $$section ### Update Descriptions $(id="dbtUpdateDescriptions") diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/dbtPipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/dbtPipeline.ts index 9c2a4144a2a..eca19940b8c 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/dbtPipeline.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/dbtPipeline.ts @@ -46,6 +46,10 @@ export interface DbtPipeline { * Regex to only fetch tables or databases that matches the pattern. */ schemaFilterPattern?: FilterPattern; + /** + * Optional configuration to search across databases for tables or not + */ + searchAcrossDatabases?: boolean; /** * Regex exclude tables or databases that matches the pattern. */