mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-09-04 14:43:11 +00:00
(cherry picked from commit 10e2e4afd47add74c1b15854030438289589fa09)
This commit is contained in:
parent
93c3b08fc7
commit
83f21e7407
@ -127,4 +127,4 @@ def mask_query(
|
|||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.debug(f"Failed to mask query with sqlfluff: {exc}")
|
logger.debug(f"Failed to mask query with sqlfluff: {exc}")
|
||||||
logger.debug(traceback.format_exc())
|
logger.debug(traceback.format_exc())
|
||||||
return query
|
return None
|
||||||
|
@ -338,7 +338,7 @@ class LineageParser:
|
|||||||
logger.debug(
|
logger.debug(
|
||||||
f"Can't extract table names when parsing JOIN information from {comparison}"
|
f"Can't extract table names when parsing JOIN information from {comparison}"
|
||||||
)
|
)
|
||||||
logger.debug(f"Query: {self.masked_query}")
|
logger.debug(f"Query: {self.masked_query or self.query}")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
left_table_column = TableColumn(table=table_left, column=column_left)
|
left_table_column = TableColumn(table=table_left, column=column_left)
|
||||||
@ -463,7 +463,7 @@ class LineageParser:
|
|||||||
|
|
||||||
self.masked_query = mask_query(self._clean_query, parser=lr_sqlparser)
|
self.masked_query = mask_query(self._clean_query, parser=lr_sqlparser)
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"Using sqlparse for lineage parsing for query: {self.masked_query}"
|
f"Using sqlparse for lineage parsing for query: {self.masked_query or self.query}"
|
||||||
)
|
)
|
||||||
return lr_sqlparser
|
return lr_sqlparser
|
||||||
|
|
||||||
|
@ -625,8 +625,8 @@ def get_lineage_by_query(
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
lineage_parser = LineageParser(query, dialect, timeout_seconds=timeout_seconds)
|
lineage_parser = LineageParser(query, dialect, timeout_seconds=timeout_seconds)
|
||||||
masked_query = lineage_parser.masked_query or query
|
masked_query = lineage_parser.masked_query
|
||||||
logger.debug(f"Running lineage with query: {masked_query}")
|
logger.debug(f"Running lineage with query: {masked_query or query}")
|
||||||
|
|
||||||
raw_column_lineage = lineage_parser.column_lineage
|
raw_column_lineage = lineage_parser.column_lineage
|
||||||
column_lineage.update(populate_column_lineage_map(raw_column_lineage))
|
column_lineage.update(populate_column_lineage_map(raw_column_lineage))
|
||||||
@ -697,7 +697,7 @@ def get_lineage_by_query(
|
|||||||
if not lineage_parser.query_parsing_success:
|
if not lineage_parser.query_parsing_success:
|
||||||
query_parsing_failures.add(
|
query_parsing_failures.add(
|
||||||
QueryParsingError(
|
QueryParsingError(
|
||||||
query=masked_query,
|
query=masked_query or query,
|
||||||
error=lineage_parser.query_parsing_failure_reason,
|
error=lineage_parser.query_parsing_failure_reason,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
@ -729,8 +729,10 @@ def get_lineage_via_table_entity(
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
lineage_parser = LineageParser(query, dialect, timeout_seconds=timeout_seconds)
|
lineage_parser = LineageParser(query, dialect, timeout_seconds=timeout_seconds)
|
||||||
masked_query = lineage_parser.masked_query or query
|
masked_query = lineage_parser.masked_query
|
||||||
logger.debug(f"Getting lineage via table entity using query: {masked_query}")
|
logger.debug(
|
||||||
|
f"Getting lineage via table entity using query: {masked_query or query}"
|
||||||
|
)
|
||||||
to_table_name = table_entity.name.root
|
to_table_name = table_entity.name.root
|
||||||
|
|
||||||
for from_table_name in lineage_parser.source_tables:
|
for from_table_name in lineage_parser.source_tables:
|
||||||
|
@ -42,6 +42,8 @@ class OMetaQueryMixin:
|
|||||||
return str(result.hexdigest())
|
return str(result.hexdigest())
|
||||||
|
|
||||||
def _get_or_create_query(self, query: CreateQueryRequest) -> Optional[Query]:
|
def _get_or_create_query(self, query: CreateQueryRequest) -> Optional[Query]:
|
||||||
|
if query.query.root is None:
|
||||||
|
return None
|
||||||
query_hash = self._get_query_hash(query=query.query.root)
|
query_hash = self._get_query_hash(query=query.query.root)
|
||||||
query_entity = self.get_by_name(entity=Query, fqn=query_hash)
|
query_entity = self.get_by_name(entity=Query, fqn=query_hash)
|
||||||
if query_entity is None:
|
if query_entity is None:
|
||||||
|
@ -153,7 +153,7 @@ class UsageSource(QueryParserSource, ABC):
|
|||||||
if query:
|
if query:
|
||||||
logger.debug(
|
logger.debug(
|
||||||
(
|
(
|
||||||
f"###### USAGE QUERY #######\n{mask_query(query, self.dialect.value)}"
|
f"###### USAGE QUERY #######\n{mask_query(query, self.dialect.value) or query}"
|
||||||
"\n##########################"
|
"\n##########################"
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user