Used ES instead of get_by_name (#8288)

Co-authored-by: Onkar Ravgan <onkarravgan@Onkars-MacBook-Pro.local>
This commit is contained in:
Onkar Ravgan 2022-10-20 18:16:39 +05:30 committed by GitHub
parent 36bd428616
commit 2a85ea4747
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -13,7 +13,7 @@ DBT source methods.
""" """
import traceback import traceback
from datetime import datetime from datetime import datetime
from typing import Dict, Iterable, List, Optional from typing import Dict, Iterable, List, Optional, Union
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest
@ -53,6 +53,7 @@ from metadata.generated.schema.type.tagLabel import (
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
from metadata.utils import fqn from metadata.utils import fqn
from metadata.utils.elasticsearch import get_entity_from_es_result
from metadata.utils.logger import ingestion_logger from metadata.utils.logger import ingestion_logger
logger = ingestion_logger() logger = ingestion_logger()
@ -259,9 +260,12 @@ class DBTMixin:
""" """
logger.info("Processing DBT lineage and Descriptions") logger.info("Processing DBT lineage and Descriptions")
for data_model_name, data_model in self.data_models.items(): for data_model_name, data_model in self.data_models.items():
to_es_result = self.metadata.es_search_from_fqn(
to_entity: Table = self.metadata.get_by_name( entity_type=Table,
entity=Table, fqn=data_model_name fqn_search_string=data_model_name,
)
to_entity: Optional[Union[Table, List[Table]]] = get_entity_from_es_result(
entity_list=to_es_result, fetch_multiple_entities=False
) )
if to_entity: if to_entity:
try: try:
@ -292,8 +296,14 @@ class DBTMixin:
# Create Lineage from DBT # Create Lineage from DBT
for upstream_node in data_model.upstream: for upstream_node in data_model.upstream:
try: try:
from_entity: Table = self.metadata.get_by_name( from_es_result = self.metadata.es_search_from_fqn(
entity=Table, fqn=upstream_node entity_type=Table,
fqn_search_string=upstream_node,
)
from_entity: Optional[
Union[Table, List[Table]]
] = get_entity_from_es_result(
entity_list=from_es_result, fetch_multiple_entities=False
) )
if from_entity and to_entity: if from_entity and to_entity:
yield AddLineageRequest( yield AddLineageRequest(