Fix #19313 - dbt ingestion picks up wrong service to patch metadata (#19337)

* Fix #19313 - dbt ingestion picks up wrong service to patch metadata

* py format and linting

* Added a flag for searching across databases and docs

* py format
This commit is contained in:
Suman Maharana 2025-01-15 11:45:32 +05:30 committed by SumanMaharana
parent 9dbdb424d9
commit 6224b54900
6 changed files with 107 additions and 72 deletions

View File

@ -15,7 +15,7 @@ DBT source methods.
import traceback
from copy import deepcopy
from datetime import datetime
from typing import Any, Iterable, List, Optional, Union
from typing import Any, Iterable, List, Optional
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest
@ -63,6 +63,7 @@ from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.models.patch_request import PatchedEntity, PatchRequest
from metadata.ingestion.models.table_metadata import ColumnDescription
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
from metadata.ingestion.source.database.database_service import DataModelLink
@ -98,7 +99,7 @@ from metadata.utils.elasticsearch import get_entity_from_es_result
from metadata.utils.entity_link import get_table_fqn
from metadata.utils.logger import ingestion_logger
from metadata.utils.tag_utils import get_ometa_tag_and_classification, get_tag_labels
from metadata.utils.time_utils import convert_timestamp_to_milliseconds
from metadata.utils.time_utils import datetime_to_timestamp
logger = ingestion_logger()
@ -326,11 +327,14 @@ class DbtSource(DbtServiceSource):
None,
)
def _add_dbt_freshness_test_from_sources(
def add_dbt_sources(
self, key: str, manifest_node, manifest_entities, dbt_objects: DbtObjects
):
# in dbt manifest sources node name is table/view name (not test name like with test nodes)
# so in order for the test creation to be named precisely I am amending manifest node name within it's deepcopy
) -> None:
"""
Method to append dbt test cases based on sources file for later processing
In dbt manifest sources node name is table/view name (not test name like with test nodes)
So in order for the test creation to be named precisely I am amending manifest node name within it's deepcopy
"""
manifest_node_new = deepcopy(manifest_node)
manifest_node_new.name = manifest_node_new.name + "_freshness"
@ -350,17 +354,57 @@ class DbtSource(DbtServiceSource):
DbtCommonEnum.RESULTS.value
] = freshness_test_result
def add_dbt_sources(
self, key: str, manifest_node, manifest_entities, dbt_objects: DbtObjects
) -> None:
"""
Method to append dbt test cases based on sources file for later processing
"""
self._add_dbt_freshness_test_from_sources(
key, manifest_node, manifest_entities, dbt_objects
)
def _get_table_entity(self, table_fqn) -> Optional[Table]:
def search_table(fqn_search_string: str) -> Optional[Table]:
table_entities = get_entity_from_es_result(
entity_list=self.metadata.es_search_from_fqn(
entity_type=Table,
fqn_search_string=fqn_search_string,
fields="sourceHash",
),
fetch_multiple_entities=True,
)
logger.debug(
f"Found table entities from {fqn_search_string}: {table_entities}"
)
return next(iter(filter(None, table_entities)), None)
# pylint: disable=too-many-locals, too-many-branches, too-many-statements
try:
table_entity = search_table(table_fqn)
if table_entity:
return table_entity
if self.source_config.searchAcrossDatabases:
logger.warning(
f"Table {table_fqn} not found under service: {self.config.serviceName}."
"Trying to find table across services"
)
_, database_name, schema_name, table_name = fqn.split(table_fqn)
table_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name="*",
database_name=database_name,
schema_name=schema_name,
table_name=table_name,
)
table_entity = search_table(table_fqn)
if table_entity:
return table_entity
logger.warning(
f"Unable to find the table '{table_fqn}' in OpenMetadata. "
"Please check if the table exists and is ingested in OpenMetadata. "
"Also, ensure the name, database, and schema of the manifest node"
"match the table present in OpenMetadata."
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Failed to get table entity from OpenMetadata: {exc}")
return None
# pylint: disable=too-many-locals, too-many-branches
def yield_data_models(
self, dbt_objects: DbtObjects
) -> Iterable[Either[DataModelLink]]:
@ -471,28 +515,19 @@ class DbtSource(DbtServiceSource):
dbt_compiled_query = get_dbt_compiled_query(manifest_node)
dbt_raw_query = get_dbt_raw_query(manifest_node)
# Get the table entity from ES
table_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name="*",
service_name=self.config.serviceName,
database_name=get_corrected_name(manifest_node.database),
schema_name=get_corrected_name(manifest_node.schema_),
table_name=model_name,
)
table_entity: Optional[
Union[Table, List[Table]]
] = get_entity_from_es_result(
entity_list=self.metadata.es_search_from_fqn(
entity_type=Table,
fqn_search_string=table_fqn,
fields="sourceHash",
),
fetch_multiple_entities=False,
)
if table_entity:
if table_entity := self._get_table_entity(table_fqn=table_fqn):
logger.debug(
f"Using Table Entity for datamodel: {table_entity}"
)
data_model_link = DataModelLink(
table_entity=table_entity,
datamodel=DataModel(
@ -523,13 +558,7 @@ class DbtSource(DbtServiceSource):
)
yield Either(right=data_model_link)
self.context.get().data_model_links.append(data_model_link)
else:
logger.warning(
f"Unable to find the table '{table_fqn}' in OpenMetadata"
"Please check if the table exists and is ingested in OpenMetadata"
"Also name, database, schema of the manifest node matches with the table present "
"in OpenMetadata"
)
except Exception as exc:
yield Either(
left=StackTraceError(
@ -573,22 +602,14 @@ class DbtSource(DbtServiceSource):
parent_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name="*",
service_name=self.config.serviceName,
database_name=get_corrected_name(parent_node.database),
schema_name=get_corrected_name(parent_node.schema_),
table_name=table_name,
)
# check if the parent table exists in OM before adding it to the upstream list
parent_table_entity: Optional[
Union[Table, List[Table]]
] = get_entity_from_es_result(
entity_list=self.metadata.es_search_from_fqn(
entity_type=Table, fqn_search_string=parent_fqn
),
fetch_multiple_entities=False,
)
if parent_table_entity:
if self._get_table_entity(table_fqn=parent_fqn):
upstream_nodes.append(parent_fqn)
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
@ -601,22 +622,14 @@ class DbtSource(DbtServiceSource):
parent_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name="*",
service_name=self.config.serviceName,
database_name=get_corrected_name(dbt_node.database),
schema_name=get_corrected_name(dbt_node.schema_),
table_name=dbt_node.name,
)
# check if the parent table exists in OM before adding it to the upstream list
parent_table_entity: Optional[
Union[Table, List[Table]]
] = get_entity_from_es_result(
entity_list=self.metadata.es_search_from_fqn(
entity_type=Table, fqn_search_string=parent_fqn
),
fetch_multiple_entities=False,
)
if parent_table_entity:
if self._get_table_entity(table_fqn=parent_fqn):
upstream_nodes.append(parent_fqn)
return upstream_nodes
@ -708,14 +721,8 @@ class DbtSource(DbtServiceSource):
for upstream_node in data_model_link.datamodel.upstream:
try:
from_es_result = self.metadata.es_search_from_fqn(
entity_type=Table,
fqn_search_string=upstream_node,
)
from_entity: Optional[
Union[Table, List[Table]]
] = get_entity_from_es_result(
entity_list=from_es_result, fetch_multiple_entities=False
from_entity: Optional[Table] = self._get_table_entity(
table_fqn=upstream_node
)
if from_entity and to_entity:
yield Either(
@ -989,6 +996,7 @@ class DbtSource(DbtServiceSource):
self.metadata, entity_link_str
)
table_fqn = get_table_fqn(entity_link_str)
logger.debug(f"Table fqn found: {table_fqn}")
source_elements = table_fqn.split(fqn.FQN_SEPARATOR)
test_case_fqn = fqn.build(
self.metadata,
@ -1024,6 +1032,7 @@ class DbtSource(DbtServiceSource):
owners=None,
)
)
logger.debug(f"Test case Already Exists: {test_case_fqn}")
except Exception as err: # pylint: disable=broad-except
yield Either(
left=StackTraceError(
@ -1084,9 +1093,7 @@ class DbtSource(DbtServiceSource):
# Create the test case result object
test_case_result = TestCaseResult(
timestamp=Timestamp(
convert_timestamp_to_milliseconds(dbt_timestamp.timestamp())
),
timestamp=Timestamp(datetime_to_timestamp(dbt_timestamp)),
testCaseStatus=test_case_status,
testResultValue=[
TestResultValue(
@ -1113,10 +1120,17 @@ class DbtSource(DbtServiceSource):
else None,
test_case_name=manifest_node.name,
)
self.metadata.add_test_case_results(
test_results=test_case_result,
test_case_fqn=test_case_fqn,
)
logger.debug(f"Adding test case results to {test_case_fqn} ")
try:
self.metadata.add_test_case_results(
test_results=test_case_result,
test_case_fqn=test_case_fqn,
)
except APIError as err:
if err.code != 409:
raise APIError(err) from err
except Exception as err: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.error(

View File

@ -11,6 +11,7 @@ slug: /main-concepts/metadata-standard/schemas/metadataingestion/dbtpipeline
- **`type`**: Pipeline type. Refer to *#/definitions/dbtConfigType*. Default: `DBT`.
- **`dbtConfigSource`**: Available sources to fetch DBT catalog and manifest files.
- **`searchAcrossDatabases`** *(boolean)*: Optional configuration to search across databases for tables or not. Default: `False`.
- **`dbtUpdateDescriptions`** *(boolean)*: Optional configuration to update the description from DBT or not. Default: `False`.
- **`dbtUpdateOwners`** *(boolean)*: Optional configuration to update the owner from DBT or not. Default: `False`.
- **`includeTags`** *(boolean)*: Optional configuration to toggle the tags ingestion. Default: `True`.

View File

@ -11,6 +11,7 @@ slug: /main-concepts/metadata-standard/schemas/metadataingestion/dbtpipeline
- **`type`**: Pipeline type. Refer to *#/definitions/dbtConfigType*. Default: `DBT`.
- **`dbtConfigSource`**: Available sources to fetch DBT catalog and manifest files.
- **`searchAcrossDatabases`** *(boolean)*: Optional configuration to search across databases for tables or not. Default: `False`.
- **`dbtUpdateDescriptions`** *(boolean)*: Optional configuration to update the description from DBT or not. Default: `False`.
- **`dbtUpdateOwners`** *(boolean)*: Optional configuration to update the owners from DBT or not. Default: `False`.
- **`includeTags`** *(boolean)*: Optional configuration to toggle the tags ingestion. Default: `True`.

View File

@ -43,6 +43,11 @@
}
]
},
"searchAcrossDatabases": {
"description": "Optional configuration to search across databases for tables or not",
"type": "boolean",
"default": false
},
"dbtUpdateDescriptions": {
"description": "Optional configuration to update the description from DBT or not",
"type": "boolean",

View File

@ -336,6 +336,16 @@ $$section
Set the `Enable Debug Log` toggle to set the logging level of the process to debug. You can check these logs in the Ingestion tab of the service and dig deeper into any errors you might find.
$$
$$section
### Search Tables Across Databases $(id="searchAcrossDatabases")
Option to enable or disable searching across database services for tables when processing dbt metadata ingestion.
If this option is enabled, OpenMetadata will first search for tables within the same database service; if the tables are not found there, it will search across all database services.
If the option is disabled, the search will be limited to the tables within the same database service.
$$
$$section
### Update Descriptions $(id="dbtUpdateDescriptions")

View File

@ -46,6 +46,10 @@ export interface DbtPipeline {
* Regex to only fetch tables or databases that matches the pattern.
*/
schemaFilterPattern?: FilterPattern;
/**
* Optional configuration to search across databases for tables or not
*/
searchAcrossDatabases?: boolean;
/**
* Regex exclude tables or databases that matches the pattern.
*/