Use ES call to get table entity (#9273)

* Use ES call to get table entity

* Address PR comments
This commit is contained in:
Nahuel 2022-12-13 19:32:48 +01:00 committed by GitHub
parent 288137d16b
commit 0eb49c6100
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 67 additions and 49 deletions

View File

@ -69,10 +69,8 @@ class YamlConfigurationMechanism(ConfigurationMechanism):
try:
config = yaml.safe_load(config_fp)
return config
except yaml.error.YAMLError:
raise ConfigurationError(
f"YAML Configuration file [{config_fp}] is not a valid YAML"
)
except yaml.error.YAMLError as exc:
raise ConfigurationError(f"YAML Configuration file is not valid \n {exc}")
class JsonConfigurationMechanism(ConfigurationMechanism):
@ -84,10 +82,8 @@ class JsonConfigurationMechanism(ConfigurationMechanism):
try:
config = json.load(config_fp)
return config
except json.decoder.JSONDecodeError:
raise ConfigurationError(
f"JSON Configuration file [{config_fp}] is not a valid JSON"
)
except json.decoder.JSONDecodeError as exc:
raise ConfigurationError(f"JSON Configuration file is not valid \n {exc}")
def load_config_file(config_file: pathlib.Path) -> dict:

View File

@ -25,6 +25,7 @@ from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
from metadata.utils.fqn import build_es_fqn_search_string
from metadata.utils.logger import utils_logger
from metadata.utils.lru_cache import LRUCache
@ -64,20 +65,33 @@ def search_table_entities(
if search_tuple in search_cache:
return search_cache.get(search_tuple)
try:
table_fqns = fqn.build(
metadata,
entity_type=Table,
service_name=service_name,
database_name=database,
schema_name=database_schema,
table_name=table,
fetch_multiple_entities=True,
)
table_entities: Optional[List[Table]] = []
for table_fqn in table_fqns or []:
table_entity: Table = metadata.get_by_name(Table, fqn=table_fqn)
if table_entity:
table_entities.append(table_entity)
# search on ES first
fqn_search_string = build_es_fqn_search_string(
database, database_schema, service_name, table
)
es_result_entities = metadata.es_search_from_fqn(
entity_type=Table,
fqn_search_string=fqn_search_string,
)
if es_result_entities:
table_entities = es_result_entities
else:
# build fqns without searching on ES
table_fqns = fqn.build(
metadata,
entity_type=Table,
service_name=service_name,
database_name=database,
schema_name=database_schema,
table_name=table,
fetch_multiple_entities=True,
skip_es_search=True,
)
for table_fqn in table_fqns or []:
table_entity: Table = metadata.get_by_name(Table, fqn=table_fqn)
if table_entity:
table_entities.append(table_entity)
search_cache.put(search_tuple, table_entities)
return table_entities
except Exception as exc:
@ -124,15 +138,6 @@ def get_table_entities_from_query(
database=database_name,
database_schema=database_schema,
table=table,
) or (
table
and search_table_entities(
metadata=metadata,
service_name=service_name,
database=database_name,
database_schema=database_schema,
table=table.upper(),
)
)
if table_entities:
@ -144,15 +149,6 @@ def get_table_entities_from_query(
database=database_query,
database_schema=schema_query,
table=table,
) or (
table
and search_table_entities(
metadata=metadata,
service_name=service_name,
database=database_query,
database_schema=schema_query,
table=table.upper(),
)
)
if table_entities:

View File

@ -132,6 +132,7 @@ def _(
schema_name: Optional[str],
table_name: str,
fetch_multiple_entities: bool = False,
skip_es_search: bool = False,
) -> Union[Optional[str], Optional[List[str]]]:
"""
Building logic for tables
@ -142,19 +143,19 @@ def _(
:param table_name: Table name
:return:
"""
if not service_name or not table_name:
raise FQNBuildingException(
f"Service Name and Table Name should be informed, but got service=`{service_name}`, table=`{table_name}`"
fqn_search_string = build_es_fqn_search_string(
database_name, schema_name, service_name, table_name
)
es_result = (
metadata.es_search_from_fqn(
entity_type=Table,
fqn_search_string=fqn_search_string,
)
fqn_search_string = _build(
service_name, database_name or "*", schema_name or "*", table_name
if not skip_es_search
else None
)
es_result = metadata.es_search_from_fqn(
entity_type=Table,
fqn_search_string=fqn_search_string,
)
entity: Optional[Union[Table, List[Table]]] = get_entity_from_es_result(
entity_list=es_result, fetch_multiple_entities=fetch_multiple_entities
)
@ -450,3 +451,28 @@ def split_test_case_fqn(test_case_fqn: str) -> Dict[str, Optional[str]]:
) = details
return SplitTestCaseFqn(service, database, schema, table, column, test_case)
def build_es_fqn_search_string(
database_name: str, schema_name, service_name, table_name
) -> str:
"""
Builds FQN search string for ElasticSearch
Args:
service_name: service name to filter
database_name: DB name or None
schema_name: schema name or None
table_name: table name
Returns:
FQN search string
"""
if not service_name or not table_name:
raise FQNBuildingException(
f"Service Name and Table Name should be informed, but got service=`{service_name}`, table=`{table_name}`"
)
fqn_search_string = _build(
service_name, database_name or "*", schema_name or "*", table_name
)
return fqn_search_string