feat(ingest/powerbi): improve reporting around m-query parser (#11763)

Harshal Sheth 2024-10-31 16:27:45 -07:00 committed by GitHub
parent 4634bbcae0
commit e609ff810d
9 changed files with 79 additions and 19 deletions

View File

@@ -278,6 +278,9 @@ s3_base = {
 threading_timeout_common = {
     "stopit==1.1.2",
+    # stopit uses pkg_resources internally, which means there's an implied
+    # dependency on setuptools.
+    "setuptools",
 }

 abs_base = {
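Note on the new dependency above: the failure it guards against is easy to see. A minimal sketch, assuming an environment where setuptools is not installed:

# Illustrative only: a library that does `import pkg_resources` at import time
# (as the comment says stopit does) breaks when setuptools is absent.
try:
    import pkg_resources  # noqa: F401  # pkg_resources is shipped by setuptools
except ModuleNotFoundError as e:
    raise RuntimeError("setuptools must be declared as an explicit dependency") from e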

View File

@@ -20,6 +20,7 @@ from datahub.ingestion.source.state.stateful_ingestion_base import (
     StatefulIngestionConfigBase,
 )
 from datahub.utilities.lossy_collections import LossyList
+from datahub.utilities.perf_timer import PerfTimer

 logger = logging.getLogger(__name__)
@@ -190,6 +191,15 @@ class PowerBiDashboardSourceReport(StaleEntityRemovalSourceReport):
     filtered_dashboards: List[str] = dataclass_field(default_factory=list)
     filtered_charts: List[str] = dataclass_field(default_factory=list)

+    m_query_parse_timer: PerfTimer = dataclass_field(default_factory=PerfTimer)
+    m_query_parse_attempts: int = 0
+    m_query_parse_successes: int = 0
+    m_query_parse_timeouts: int = 0
+    m_query_parse_validation_errors: int = 0
+    m_query_parse_unexpected_character_errors: int = 0
+    m_query_parse_unknown_errors: int = 0
+    m_query_resolver_errors: int = 0
+
     def report_dashboards_scanned(self, count: int = 1) -> None:
         self.dashboards_scanned += count
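For context, a minimal sketch of how these counters could be rolled up after an ingestion run. `summarize_m_query_parsing` is a hypothetical helper and not part of this change; only the field names come from the diff above.

# Hypothetical summary helper; `report` is assumed to be a populated
# PowerBiDashboardSourceReport after a run.
def summarize_m_query_parsing(report: "PowerBiDashboardSourceReport") -> str:
    attempts = report.m_query_parse_attempts
    failures = (
        report.m_query_parse_timeouts
        + report.m_query_parse_validation_errors
        + report.m_query_parse_unexpected_character_errors
        + report.m_query_parse_unknown_errors
    )
    rate = report.m_query_parse_successes / attempts if attempts else 0.0
    return (
        f"m-query parsing: {attempts} attempts, "
        f"{report.m_query_parse_successes} successes ({rate:.0%}), "
        f"{failures} failures, "
        f"{report.m_query_parse_timer.elapsed_seconds():.1f}s total"
    )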

View File

@@ -74,7 +74,9 @@ def get_upstream_tables(
     )

     try:
-        parse_tree: Tree = _parse_expression(table.expression)
+        with reporter.m_query_parse_timer:
+            reporter.m_query_parse_attempts += 1
+            parse_tree: Tree = _parse_expression(table.expression)

         valid, message = validator.validate_parse_tree(
             parse_tree, native_query_enabled=config.native_query_parsing
@@ -87,10 +89,12 @@ def get_upstream_tables(
                 message="DataAccess function is not present in M-Query expression",
                 context=f"table-full-name={table.full_name}, expression={table.expression}, message={message}",
             )
+            reporter.m_query_parse_validation_errors += 1
             return []
     except KeyboardInterrupt:
         raise
     except TimeoutException:
+        reporter.m_query_parse_timeouts += 1
         reporter.warning(
             title="M-Query Parsing Timeout",
             message=f"M-Query parsing timed out after {_M_QUERY_PARSE_TIMEOUT} seconds. Lineage for this table will not be extracted.",
@@ -102,8 +106,10 @@ def get_upstream_tables(
     ) as e:  # TODO: Debug why BaseException is needed here and below.
         if isinstance(e, lark.exceptions.UnexpectedCharacters):
             error_type = "Unexpected Character Error"
+            reporter.m_query_parse_unexpected_character_errors += 1
         else:
             error_type = "Unknown Parsing Error"
+            reporter.m_query_parse_unknown_errors += 1

         reporter.warning(
             title="Unable to extract lineage from M-Query expression",
@@ -112,10 +118,10 @@ def get_upstream_tables(
             exc=e,
         )
         return []

-    lineage: List[resolver.Lineage] = []
+    reporter.m_query_parse_successes += 1

     try:
-        lineage = resolver.MQueryResolver(
+        lineage: List[resolver.Lineage] = resolver.MQueryResolver(
             table=table,
             parse_tree=parse_tree,
             reporter=reporter,
@@ -126,14 +132,14 @@ def get_upstream_tables(
             platform_instance_resolver=platform_instance_resolver,
         )
+        return lineage
     except BaseException as e:
+        reporter.m_query_resolver_errors += 1
         reporter.warning(
             title="Unknown M-Query Pattern",
             message="Encountered a unknown M-Query Expression",
             context=f"table-full-name={table.full_name}, expression={table.expression}, message={e}",
             exc=e,
         )
+        return []
-        logger.debug(f"Stack trace for {table.full_name}:", exc_info=e)
-    return lineage
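The `with reporter.m_query_parse_timer:` block relies on PerfTimer accumulating elapsed time across re-entries (asserted by the new `test_perf_timer_reuse` below), so the report ends up with total parse time across all tables. A minimal sketch of that pattern, with the actual `_parse_expression` call stubbed out:

from datahub.utilities.perf_timer import PerfTimer

timer = PerfTimer()
attempts = 0
for expression in ("let Source = ...", "let Source2 = ..."):  # stand-in expressions
    with timer:
        attempts += 1
        # _parse_expression(expression) would run here
print(f"{attempts} parse attempts took {timer.elapsed_seconds():.2f}s in total")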

View File

@@ -356,8 +356,9 @@ class MQueryResolver(AbstractDataAccessMQueryResolver, ABC):
         )
         if arg_list is None:
             self.reporter.report_warning(
-                f"{self.table.full_name}-arg-list",
-                f"Argument list not found for data-access-function {data_access_func}",
+                title="M-Query Resolver Error",
+                message="Unable to extract lineage from parsed M-Query expression (missing argument list)",
+                context=f"{self.table.full_name}: argument list not found for data-access-function {data_access_func}",
             )
             return None
@@ -377,8 +378,9 @@ class MQueryResolver(AbstractDataAccessMQueryResolver, ABC):
                 f"Function invocation without argument in expression = {invoke_expression.pretty()}"
             )
             self.reporter.report_warning(
-                f"{self.table.full_name}-variable-statement",
-                "Function invocation without argument",
+                title="M-Query Resolver Error",
+                message="Unable to extract lineage from parsed M-Query expression (function invocation without argument)",
+                context=f"{self.table.full_name}: function invocation without argument",
             )
             return None
@@ -403,8 +405,9 @@ class MQueryResolver(AbstractDataAccessMQueryResolver, ABC):
                 f"Either list_expression or type_expression is not found = {invoke_expression.pretty()}"
             )
             self.reporter.report_warning(
-                f"{self.table.full_name}-variable-statement",
-                "Function argument expression is not supported",
+                title="M-Query Resolver Error",
+                message="Unable to extract lineage from parsed M-Query expression (function argument expression is not supported)",
+                context=f"{self.table.full_name}: function argument expression is not supported",
             )
             return None

View File

@@ -90,7 +90,9 @@ class SchemaResolver(Closeable, SchemaResolverInterface):
             )[0][0]
         )

-    def get_urn_for_table(self, table: _TableName, lower: bool = False) -> str:
+    def get_urn_for_table(
+        self, table: _TableName, lower: bool = False, mixed: bool = False
+    ) -> str:
         # TODO: Validate that this is the correct 2/3 layer hierarchy for the platform.
         table_name = ".".join(
@@ -101,7 +103,10 @@ class SchemaResolver(Closeable, SchemaResolverInterface):
         if lower:
             table_name = table_name.lower()
-            platform_instance = platform_instance.lower() if platform_instance else None
+            if not mixed:
+                platform_instance = (
+                    platform_instance.lower() if platform_instance else None
+                )

         if self.platform == "bigquery":
             # Normalize shard numbers and other BigQuery weirdness.
@@ -131,6 +136,20 @@ class SchemaResolver(Closeable, SchemaResolverInterface):
         if schema_info:
             return urn_lower, schema_info

+        # Our treatment of platform instances when lowercasing urns
+        # is inconsistent. In some places (e.g. Snowflake), we lowercase
+        # the table names but not the platform instance. In other places
+        # (e.g. Databricks), we lowercase everything because it happens
+        # via the automatic lowercasing helper.
+        # See https://github.com/datahub-project/datahub/pull/8928.
+        # While we have this sort of inconsistency, we should also
+        # check the mixed case urn, as a last resort.
+        urn_mixed = self.get_urn_for_table(table, lower=True, mixed=True)
+        if urn_mixed not in {urn, urn_lower}:
+            schema_info = self._resolve_schema_info(urn_mixed)
+            if schema_info:
+                return urn_mixed, schema_info
+
         if self._prefers_urn_lower():
             return urn_lower, None
         else:
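Net effect of the change above: schema lookup now tries up to three urn casings. A minimal sketch of the lookup order, assuming `schema_resolver` and `table` are in scope as in the test further down; the example urn value is taken from that test:

# Illustrative only -- not the actual resolve method.
urn = schema_resolver.get_urn_for_table(table)                                  # as-written casing
urn_lower = schema_resolver.get_urn_for_table(table, lower=True)                # everything lowercased
urn_mixed = schema_resolver.get_urn_for_table(table, lower=True, mixed=True)    # table lowercased, platform instance casing kept
# e.g. urn_mixed == "urn:li:dataset:(urn:li:dataPlatform:mssql,Uppercased-Instance.database.dataset.table,PROD)"
# Schema info is resolved for urn first, then urn_lower, and only as a last resort urn_mixed.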

View File

@@ -294,6 +294,9 @@ class BatchPartitionExecutor(Closeable):
         def _handle_batch_completion(
             batch: List[_BatchPartitionWorkItem], future: Future
         ) -> None:
+            nonlocal workers_available
+            workers_available += 1
+
             with clearinghouse_state_lock:
                 for item in batch:
                     keys_no_longer_in_flight.add(item.key)
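The `nonlocal` declaration above is what makes the counter increment work: without it, the augmented assignment would create a new local variable inside the callback and raise UnboundLocalError. A minimal standalone illustration (not the executor code itself):

def make_pool(size: int):
    workers_available = size

    def on_done() -> None:
        nonlocal workers_available  # rebind the enclosing variable, not a new local
        workers_available += 1

    def count() -> int:
        return workers_available

    return on_done, count

on_done, count = make_pool(2)
on_done()
assert count() == 3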

View File

@@ -9,7 +9,6 @@ logger: logging.Logger = logging.getLogger(__name__)

 class PerfTimer(AbstractContextManager):
     """
     A context manager that gives easy access to elapsed time for performance measurement.
-
     """

     def __init__(self) -> None:

View File

@@ -41,6 +41,11 @@ def test_get_urn_for_table_lowercase():
         == "urn:li:dataset:(urn:li:dataPlatform:mssql,uppercased-instance.database.dataset.table,PROD)"
     )

+    assert (
+        schema_resolver.get_urn_for_table(table=table, lower=True, mixed=True)
+        == "urn:li:dataset:(urn:li:dataPlatform:mssql,Uppercased-Instance.database.dataset.table,PROD)"
+    )
+

 def test_get_urn_for_table_not_lower_should_keep_capital_letters():
     schema_resolver = SchemaResolver(

View File

@@ -8,7 +8,7 @@ from datahub.utilities.perf_timer import PerfTimer

 approx = partial(pytest.approx, rel=2e-2)


-def test_perf_timer_simple():
+def test_perf_timer_simple() -> None:
     with PerfTimer() as timer:
         time.sleep(0.4)
         assert approx(timer.elapsed_seconds()) == 0.4
@@ -16,7 +16,7 @@ def test_perf_timer_simple():
     assert approx(timer.elapsed_seconds()) == 0.4


-def test_perf_timer_paused_timer():
+def test_perf_timer_paused_timer() -> None:
     with PerfTimer() as current_timer:
         time.sleep(0.5)
         assert approx(current_timer.elapsed_seconds()) == 0.5
@@ -29,7 +29,7 @@ def test_perf_timer_paused_timer():
     assert approx(current_timer.elapsed_seconds()) == 0.7


-def test_generator_with_paused_timer():
+def test_generator_with_paused_timer() -> None:
     n = 4

     def generator_function():
@@ -46,3 +46,15 @@ def test_generator_with_paused_timer():
     seq = generator_function()
     list([i for i in seq])

     assert approx(outer_timer.elapsed_seconds()) == 1 + 0.2 * n + 0.2 * n
+
+
+def test_perf_timer_reuse() -> None:
+    timer = PerfTimer()
+
+    with timer:
+        time.sleep(0.2)
+
+    with timer:
+        time.sleep(0.3)
+
+    assert approx(timer.elapsed_seconds()) == 0.5
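The new test exercises reuse of a single timer across separate `with` blocks. A minimal sketch of a context-manager timer with those accumulation semantics (not DataHub's actual PerfTimer implementation, which also supports pausing):

import time
from contextlib import AbstractContextManager
from typing import Optional


class AccumulatingTimer(AbstractContextManager):
    def __init__(self) -> None:
        self._total = 0.0
        self._start: Optional[float] = None

    def __enter__(self) -> "AccumulatingTimer":
        self._start = time.perf_counter()
        return self

    def __exit__(self, *exc) -> None:
        # Add the just-finished interval to the running total.
        assert self._start is not None
        self._total += time.perf_counter() - self._start
        self._start = None

    def elapsed_seconds(self) -> float:
        # Include the in-progress interval if called inside a `with` block.
        if self._start is not None:
            return self._total + (time.perf_counter() - self._start)
        return self._total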