mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-12-16 10:08:08 +00:00
Bug: fix flaky timeout test (#10509)
This commit is contained in:
parent
34bf175ca1
commit
b94927b3ea
@ -51,13 +51,6 @@ logger = ingestion_logger()
|
|||||||
LINEAGE_PARSING_TIMEOUT = 10
|
LINEAGE_PARSING_TIMEOUT = 10
|
||||||
|
|
||||||
|
|
||||||
@timeout(seconds=LINEAGE_PARSING_TIMEOUT)
|
|
||||||
def get_sqlfluff_lineage_runner(query: str, dialect: str) -> LineageRunner:
|
|
||||||
lr_sqlfluff = LineageRunner(query, dialect=dialect)
|
|
||||||
lr_sqlfluff.get_column_lineage()
|
|
||||||
return lr_sqlfluff
|
|
||||||
|
|
||||||
|
|
||||||
class LineageParser:
|
class LineageParser:
|
||||||
"""
|
"""
|
||||||
Class that acts like a wrapper for the LineageRunner library usage
|
Class that acts like a wrapper for the LineageRunner library usage
|
||||||
@ -67,10 +60,17 @@ class LineageParser:
|
|||||||
query: str
|
query: str
|
||||||
_clean_query: str
|
_clean_query: str
|
||||||
|
|
||||||
def __init__(self, query: str, dialect: Dialect = Dialect.ANSI):
|
def __init__(
|
||||||
|
self,
|
||||||
|
query: str,
|
||||||
|
dialect: Dialect = Dialect.ANSI,
|
||||||
|
timeout_seconds: int = LINEAGE_PARSING_TIMEOUT,
|
||||||
|
):
|
||||||
self.query = query
|
self.query = query
|
||||||
self._clean_query = self.clean_raw_query(query)
|
self._clean_query = self.clean_raw_query(query)
|
||||||
self.parser = self._evaluate_best_parser(self._clean_query, dialect=dialect)
|
self.parser = self._evaluate_best_parser(
|
||||||
|
self._clean_query, dialect=dialect, timeout_seconds=timeout_seconds
|
||||||
|
)
|
||||||
|
|
||||||
@cached_property
|
@cached_property
|
||||||
def involved_tables(self) -> Optional[List[Table]]:
|
def involved_tables(self) -> Optional[List[Table]]:
|
||||||
@ -367,8 +367,14 @@ class LineageParser:
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _evaluate_best_parser(
|
def _evaluate_best_parser(
|
||||||
query: str, dialect: Dialect = Dialect.ANSI
|
query: str, dialect: Dialect, timeout_seconds: int
|
||||||
) -> LineageRunner:
|
) -> LineageRunner:
|
||||||
|
@timeout(seconds=timeout_seconds)
|
||||||
|
def get_sqlfluff_lineage_runner(qry: str, dlct: str) -> LineageRunner:
|
||||||
|
lr_dialect = LineageRunner(qry, dialect=dlct)
|
||||||
|
lr_dialect.get_column_lineage()
|
||||||
|
return lr_dialect
|
||||||
|
|
||||||
sqlfluff_count = 0
|
sqlfluff_count = 0
|
||||||
try:
|
try:
|
||||||
lr_sqlfluff = get_sqlfluff_lineage_runner(query, dialect.value)
|
lr_sqlfluff = get_sqlfluff_lineage_runner(query, dialect.value)
|
||||||
@ -382,7 +388,7 @@ class LineageParser:
|
|||||||
except TimeoutError:
|
except TimeoutError:
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"Lineage with SqlFluff failed for the [{dialect.value}] query: [{query}]: "
|
f"Lineage with SqlFluff failed for the [{dialect.value}] query: [{query}]: "
|
||||||
f"Parser has been running for more than {LINEAGE_PARSING_TIMEOUT} seconds."
|
f"Parser has been running for more than {timeout_seconds} seconds."
|
||||||
)
|
)
|
||||||
lr_sqlfluff = None
|
lr_sqlfluff = None
|
||||||
except Exception:
|
except Exception:
|
||||||
|
|||||||
@ -18,7 +18,6 @@ import inspect
|
|||||||
import os
|
import os
|
||||||
import platform
|
import platform
|
||||||
import signal
|
import signal
|
||||||
import traceback
|
|
||||||
from typing import Callable
|
from typing import Callable
|
||||||
|
|
||||||
from metadata.utils.constants import TEN_MIN
|
from metadata.utils.constants import TEN_MIN
|
||||||
@ -31,7 +30,6 @@ def _handle_timeout(signum, frame):
|
|||||||
"""
|
"""
|
||||||
Handler for signal timeout
|
Handler for signal timeout
|
||||||
"""
|
"""
|
||||||
logger.debug(traceback.print_stack(frame))
|
|
||||||
raise TimeoutError(f"[SIGNUM {signum}] {os.strerror(errno.ETIME)}")
|
raise TimeoutError(f"[SIGNUM {signum}] {os.strerror(errno.ETIME)}")
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -164,13 +164,15 @@ class SqlLineageTest(TestCase):
|
|||||||
# When
|
# When
|
||||||
with self.assertLogs(Loggers.INGESTION.value, level="DEBUG") as logger:
|
with self.assertLogs(Loggers.INGESTION.value, level="DEBUG") as logger:
|
||||||
LineageParser(
|
LineageParser(
|
||||||
query.format(values="\n".join(values)), dialect=Dialect.SNOWFLAKE
|
query.format(values="\n".join(values)),
|
||||||
|
dialect=Dialect.SNOWFLAKE,
|
||||||
|
timeout_seconds=1,
|
||||||
)
|
)
|
||||||
# Then
|
# Then
|
||||||
self.assertTrue(
|
self.assertTrue(
|
||||||
any(
|
any(
|
||||||
"Parser has been running for more than 10 seconds." in log
|
"Parser has been running for more than 1 seconds." in log
|
||||||
for log in logger.output
|
for log in logger.output
|
||||||
),
|
),
|
||||||
"Parser finished before the 10 expected seconds!",
|
"Parser finished before the 1 expected seconds!",
|
||||||
)
|
)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user