feat(ingest/powerbi): add timeouts for m-query parsing (#11753)

Harshal Sheth 2024-10-30 11:40:45 -07:00 committed by GitHub
parent 799c452056
commit 143fc011fa
14 changed files with 201 additions and 56 deletions

View File

@ -276,6 +276,10 @@ s3_base = {
*path_spec_common,
}
threading_timeout_common = {
"stopit==1.1.2",
}
abs_base = {
"azure-core==1.29.4",
"azure-identity>=1.17.1",
@ -492,7 +496,14 @@ plugins: Dict[str, Set[str]] = {
"trino": sql_common | trino,
"starburst-trino-usage": sql_common | usage_common | trino,
"nifi": {"requests", "packaging", "requests-gssapi"},
"powerbi": microsoft_common | {"lark[regex]==1.1.4", "sqlparse"} | sqlglot_lib,
"powerbi": (
(
microsoft_common
| {"lark[regex]==1.1.4", "sqlparse", "more-itertools"}
| sqlglot_lib
| threading_timeout_common
)
),
"powerbi-report-server": powerbi_report_server,
"vertica": sql_common | {"vertica-sqlalchemy-dialect[vertica-python]==0.0.8.2"},
"unity-catalog": databricks | sql_common | sqllineage_lib,

View File

@ -175,7 +175,7 @@ class BigQuerySchemaApi:
def get_query_result(self, query: str) -> RowIterator:
def _should_retry(exc: BaseException) -> bool:
logger.debug(f"Exception occured for job query. Reason: {exc}")
logger.debug(f"Exception occurred for job query. Reason: {exc}")
# Jobs sometimes fail with transient errors.
# This is not currently handled by the python-bigquery client.
# https://github.com/googleapis/python-bigquery/issues/23
@ -197,7 +197,7 @@ class BigQuerySchemaApi:
def get_projects(self, max_results_per_page: int = 100) -> List[BigqueryProject]:
def _should_retry(exc: BaseException) -> bool:
logger.debug(
f"Exception occured for project.list api. Reason: {exc}. Retrying api request..."
f"Exception occurred for project.list api. Reason: {exc}. Retrying api request..."
)
self.report.num_list_projects_retry_request += 1
return True

View File

@ -19,6 +19,7 @@ from datahub.ingestion.source.state.stale_entity_removal_handler import (
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
)
from datahub.utilities.lossy_collections import LossyList
logger = logging.getLogger(__name__)
@ -176,11 +177,18 @@ class SupportedDataPlatform(Enum):
@dataclass
class PowerBiDashboardSourceReport(StaleEntityRemovalSourceReport):
all_workspace_count: int = 0
filtered_workspace_names: LossyList[str] = dataclass_field(
default_factory=LossyList
)
filtered_workspace_types: LossyList[str] = dataclass_field(
default_factory=LossyList
)
dashboards_scanned: int = 0
charts_scanned: int = 0
filtered_dashboards: List[str] = dataclass_field(default_factory=list)
filtered_charts: List[str] = dataclass_field(default_factory=list)
number_of_workspaces: int = 0
def report_dashboards_scanned(self, count: int = 1) -> None:
self.dashboards_scanned += count
@ -194,9 +202,6 @@ class PowerBiDashboardSourceReport(StaleEntityRemovalSourceReport):
def report_charts_dropped(self, view: str) -> None:
self.filtered_charts.append(view)
def report_number_of_workspaces(self, number_of_workspaces: int) -> None:
self.number_of_workspaces = number_of_workspaces
def default_for_dataset_type_mapping() -> Dict[str, str]:
dict_: dict = {}
@ -331,7 +336,7 @@ class PowerBiDashboardSourceConfig(
)
workspace_id_as_urn_part: bool = pydantic.Field(
default=False,
description="It is recommended to set this to True only if you have legacy workspaces based on Office 365 groups, as those workspaces can have identical names."
description="It is recommended to set this to True only if you have legacy workspaces based on Office 365 groups, as those workspaces can have identical names. "
"To maintain backward compatibility, this is set to False which uses workspace name",
)
# Enable/Disable extracting ownership information of Dashboard
@ -371,8 +376,8 @@ class PowerBiDashboardSourceConfig(
# any existing tags defined to those entities
extract_endorsements_to_tags: bool = pydantic.Field(
default=False,
description="Whether to extract endorsements to tags, note that this may overwrite existing tags. Admin API "
"access is required is this setting is enabled",
description="Whether to extract endorsements to tags, note that this may overwrite existing tags. "
"Admin API access is required if this setting is enabled.",
)
filter_dataset_endorsements: AllowDenyPattern = pydantic.Field(
default=AllowDenyPattern.allow_all(),

View File

@ -1,6 +1,7 @@
import functools
import importlib.resources as pkg_resource
import logging
import os
from typing import Dict, List
import lark
@ -19,9 +20,12 @@ from datahub.ingestion.source.powerbi.m_query.data_classes import (
TRACE_POWERBI_MQUERY_PARSER,
)
from datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes import Table
from datahub.utilities.threading_timeout import TimeoutException, threading_timeout
logger = logging.getLogger(__name__)
_M_QUERY_PARSE_TIMEOUT = int(os.getenv("DATAHUB_POWERBI_M_QUERY_PARSE_TIMEOUT", 60))
@functools.lru_cache(maxsize=1)
def get_lark_parser() -> Lark:
@ -41,7 +45,8 @@ def _parse_expression(expression: str) -> Tree:
expression = expression.replace("\u00a0", " ")
logger.debug(f"Parsing expression = {expression}")
parse_tree: Tree = lark_parser.parse(expression)
with threading_timeout(_M_QUERY_PARSE_TIMEOUT):
parse_tree: Tree = lark_parser.parse(expression)
if TRACE_POWERBI_MQUERY_PARSER:
logger.debug(parse_tree.pretty())
@ -83,17 +88,26 @@ def get_upstream_tables(
context=f"table-full-name={table.full_name}, expression={table.expression}, message={message}",
)
return []
except KeyboardInterrupt:
raise
except TimeoutException:
reporter.warning(
title="M-Query Parsing Timeout",
message=f"M-Query parsing timed out after {_M_QUERY_PARSE_TIMEOUT} seconds. Lineage for this table will not be extracted.",
context=f"table-full-name={table.full_name}, expression={table.expression}",
)
return []
except (
BaseException
) as e: # TODO: Debug why BaseException is needed here and below.
if isinstance(e, lark.exceptions.UnexpectedCharacters):
title = "Unexpected Character Found"
error_type = "Unexpected Character Error"
else:
title = "Unknown Parsing Error"
error_type = "Unknown Parsing Error"
reporter.warning(
title=title,
message="Unknown parsing error",
title="Unable to extract lineage from M-Query expression",
message=f"Got an '{error_type}' while parsing the expression. Lineage will be missing for this table.",
context=f"table-full-name={table.full_name}, expression={table.expression}",
exc=e,
)
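
For context, a minimal standalone sketch of the pattern this file now follows: the parse timeout is read once from the DATAHUB_POWERBI_M_QUERY_PARSE_TIMEOUT environment variable (defaulting to 60 seconds), a stopit-backed context manager bounds the lark parse, and a timeout is reported as missing lineage rather than failing the run. The grammar below is a toy stand-in, not the real M-Query grammar, and the function name is illustrative only.

import os

from lark import Lark
from stopit import ThreadingTimeout, TimeoutException

# Timeout in seconds, read once at import time; 60s is the default.
_PARSE_TIMEOUT = int(os.getenv("DATAHUB_POWERBI_M_QUERY_PARSE_TIMEOUT", 60))

# Toy grammar standing in for the bundled M-Query grammar.
_parser = Lark(r"""
    start: WORD+
    %import common.WORD
    %import common.WS
    %ignore WS
""")

def parse_expression(expression: str):
    try:
        # swallow_exc=False makes stopit re-raise TimeoutException in this thread.
        with ThreadingTimeout(_PARSE_TIMEOUT, swallow_exc=False):
            return _parser.parse(expression)
    except TimeoutException:
        # The source logs a warning and skips lineage for this table.
        return None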

View File

@ -473,8 +473,9 @@ class MQueryResolver(AbstractDataAccessMQueryResolver, ABC):
)
if v_statement is None:
self.reporter.report_warning(
f"{self.table.full_name}-variable-statement",
f"output variable ({current_identifier}) statement not found in table expression",
title="Unable to extract lineage from M-Query expression",
message="Lineage will be incomplete.",
context=f"table-full-name={self.table.full_name}: output-variable={current_identifier} not found in table expression",
)
return None

View File

@ -7,6 +7,8 @@ import logging
from datetime import datetime
from typing import Iterable, List, Optional, Tuple, Union
import more_itertools
import datahub.emitter.mce_builder as builder
import datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes as powerbi_data_classes
from datahub.emitter.mcp import MetadataChangeProposalWrapper
@ -795,6 +797,11 @@ class Mapper:
container_key=self.workspace_key,
name=workspace.name,
sub_types=[workspace.type],
extra_properties={
"workspace_id": workspace.id,
"workspace_name": workspace.name,
"workspace_type": workspace.type,
},
)
return container_work_units
@ -1256,20 +1263,33 @@ class PowerBiDashboardSource(StatefulIngestionSourceBase, TestableSource):
def get_allowed_workspaces(self) -> List[powerbi_data_classes.Workspace]:
all_workspaces = self.powerbi_client.get_workspaces()
allowed_wrk = [
workspace
for workspace in all_workspaces
if self.source_config.workspace_id_pattern.allowed(workspace.id)
and workspace.type in self.source_config.workspace_type_filter
]
logger.info(f"Number of workspaces = {len(all_workspaces)}")
self.reporter.report_number_of_workspaces(len(all_workspaces))
logger.info(f"Number of allowed workspaces = {len(allowed_wrk)}")
logger.debug(f"Workspaces = {all_workspaces}")
self.reporter.all_workspace_count = len(all_workspaces)
logger.debug(
f"All workspaces: {[workspace.format_name_for_logger() for workspace in all_workspaces]}"
)
return allowed_wrk
allowed_workspaces = []
for workspace in all_workspaces:
if not self.source_config.workspace_id_pattern.allowed(workspace.id):
self.reporter.filtered_workspace_names.append(
f"{workspace.id} - {workspace.name}"
)
continue
elif workspace.type not in self.source_config.workspace_type_filter:
self.reporter.filtered_workspace_types.append(
f"{workspace.id} - {workspace.name} (type = {workspace.type})"
)
continue
else:
allowed_workspaces.append(workspace)
logger.info(f"Number of allowed workspaces = {len(allowed_workspaces)}")
logger.debug(
f"Allowed workspaces: {[workspace.format_name_for_logger() for workspace in allowed_workspaces]}"
)
return allowed_workspaces
def validate_dataset_type_mapping(self):
powerbi_data_platforms: List[str] = [
@ -1480,16 +1500,10 @@ class PowerBiDashboardSource(StatefulIngestionSourceBase, TestableSource):
# Fetch PowerBi workspace for given workspace identifier
allowed_workspaces = self.get_allowed_workspaces()
workspaces_len = len(allowed_workspaces)
batch_size = (
self.source_config.scan_batch_size
) # 100 is the maximum allowed for powerbi scan
num_batches = (workspaces_len + batch_size - 1) // batch_size
batches = [
allowed_workspaces[i * batch_size : (i + 1) * batch_size]
for i in range(num_batches)
]
batches = more_itertools.chunked(
allowed_workspaces, self.source_config.scan_batch_size
)
for batch_workspaces in batches:
for workspace in self.powerbi_client.fill_workspaces(
batch_workspaces, self.reporter
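
Aside on the batching change above: more_itertools.chunked lazily yields lists of at most the given size, replacing the manual index arithmetic that computed num_batches by hand. A tiny illustration with made-up workspace names:

import more_itertools

workspaces = ["ws-a", "ws-b", "ws-c", "ws-d", "ws-e"]
# chunked() yields successive lists of at most `n` items; the last batch may be smaller.
for batch in more_itertools.chunked(workspaces, 2):
    print(batch)  # ['ws-a', 'ws-b'], then ['ws-c', 'ws-d'], then ['ws-e']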

View File

@ -96,6 +96,9 @@ class Workspace:
instance=platform_instance,
)
def format_name_for_logger(self) -> str:
return f"{self.name} ({self.id})"
@dataclass
class DataSource:

View File

@ -811,7 +811,7 @@ class AdminAPIResolver(DataResolverBase):
res.raise_for_status()
if res.json()[Constant.STATUS].upper() == Constant.SUCCEEDED:
logger.info(f"Scan result is available for scan id({scan_id})")
logger.debug(f"Scan result is available for scan id({scan_id})")
return True
if retry == max_retry:
@ -898,8 +898,8 @@ class AdminAPIResolver(DataResolverBase):
return users
def get_scan_result(self, scan_id: str) -> Optional[dict]:
logger.info("Fetching scan result")
logger.info(f"{Constant.SCAN_ID}={scan_id}")
logger.debug("Fetching scan result")
logger.debug(f"{Constant.SCAN_ID}={scan_id}")
scan_result_get_endpoint = AdminAPIResolver.API_ENDPOINTS[
Constant.SCAN_RESULT_GET
]

View File

@ -303,7 +303,7 @@ class PowerBiAPI:
)
return None
logger.info("Waiting for scan to complete")
logger.debug("Waiting for scan to complete")
if (
self.__admin_api_resolver.wait_for_scan_to_complete(
scan_id=scan_id, timeout=self.__config.scan_timeout
@ -355,22 +355,32 @@ class PowerBiAPI:
logger.debug("Processing scan result for datasets")
for dataset_dict in datasets:
dataset_instance: PowerBIDataset = self._get_resolver().get_dataset(
workspace=workspace,
dataset_id=dataset_dict[Constant.ID],
)
dataset_id = dataset_dict[Constant.ID]
try:
dataset_instance = self._get_resolver().get_dataset(
workspace=workspace,
dataset_id=dataset_id,
)
if dataset_instance is None:
continue
except Exception as e:
self.reporter.warning(
title="Unable to fetch dataset details",
message="Skipping this dataset due to the error. Metadata will be incomplete.",
context=f"workspace={workspace.name}, dataset-id={dataset_id}",
exc=e,
)
continue
# fetch + set dataset parameters
try:
dataset_parameters = self._get_resolver().get_dataset_parameters(
workspace_id=workspace.id,
dataset_id=dataset_dict[Constant.ID],
dataset_id=dataset_id,
)
dataset_instance.parameters = dataset_parameters
except Exception as e:
logger.info(
f"Unable to fetch dataset parameters for {dataset_dict[Constant.ID]}: {e}"
)
logger.info(f"Unable to fetch dataset parameters for {dataset_id}: {e}")
if self.__config.extract_endorsements_to_tags:
dataset_instance.tags = self._parse_endorsement(
@ -564,8 +574,7 @@ class PowerBiAPI:
)
else:
logger.info(
"Skipping endorsements tag as extract_endorsements_to_tags is set to "
"false "
"Skipping endorsements tag as extract_endorsements_to_tags is not enabled"
)
self._populate_app_details(
@ -641,6 +650,9 @@ class PowerBiAPI:
def fill_workspaces(
self, workspaces: List[Workspace], reporter: PowerBiDashboardSourceReport
) -> Iterable[Workspace]:
logger.info(
f"Fetching initial metadata for workspaces: {[workspace.format_name_for_logger() for workspace in workspaces]}"
)
workspaces = self._fill_metadata_from_scan_result(workspaces=workspaces)
# First try to fill the admin detail as some regular metadata contains lineage to admin metadata

View File

@ -0,0 +1,42 @@
import contextlib
import functools
import platform
from typing import ContextManager
from stopit import ThreadingTimeout as _ThreadingTimeout, TimeoutException
__all__ = ["threading_timeout", "TimeoutException"]
@functools.lru_cache(maxsize=1)
def _is_cpython() -> bool:
"""Check if we're running on CPython."""
return platform.python_implementation() == "CPython"
def threading_timeout(timeout: float) -> ContextManager[None]:
"""A timeout context manager that uses stopit's ThreadingTimeout underneath.
This is only supported on CPython.
That's because stopit.ThreadingTimeout uses a CPython-internal method to raise
an exception (the timeout error) in another thread. See stopit.threadstop.async_raise.
Reference: https://github.com/glenfant/stopit
Args:
timeout: The timeout in seconds. If <= 0, no timeout is applied.
Raises:
RuntimeError: If the timeout is not supported on the current Python implementation.
TimeoutException: If the timeout is exceeded.
"""
if timeout <= 0:
return contextlib.nullcontext()
if not _is_cpython():
raise RuntimeError(
f"Timeout is only supported on CPython, not {platform.python_implementation()}"
)
return _ThreadingTimeout(timeout, swallow_exc=False)
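
A short usage sketch of the new helper, mirroring the unit tests added below: a non-positive timeout disables the guard, and on CPython a TimeoutException is raised inside the block once the limit is exceeded.

import time

from datahub.utilities.threading_timeout import TimeoutException, threading_timeout

try:
    with threading_timeout(0.5):
        time.sleep(2)  # stand-in for a long-running parse
except TimeoutException:
    print("work exceeded the 0.5s budget")

with threading_timeout(0):  # timeout <= 0 means no limit is applied
    pass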

View File

@ -8,7 +8,10 @@
"json": {
"customProperties": {
"platform": "powerbi",
"workspace": "demo-workspace"
"workspace": "demo-workspace",
"workspace_id": "64ED5CAD-7C10-4684-8180-826122881108",
"workspace_name": "demo-workspace",
"workspace_type": "Workspace"
},
"name": "demo-workspace"
}
@ -3957,7 +3960,10 @@
"json": {
"customProperties": {
"platform": "powerbi",
"workspace": "second-demo-workspace"
"workspace": "second-demo-workspace",
"workspace_id": "64ED5CAD-7C22-4684-8180-826122881108",
"workspace_name": "second-demo-workspace",
"workspace_type": "Workspace"
},
"name": "second-demo-workspace"
}

View File

@ -8,7 +8,10 @@
"json": {
"customProperties": {
"platform": "powerbi",
"workspace": "64ED5CAD-7C10-4684-8180-826122881108"
"workspace": "64ED5CAD-7C10-4684-8180-826122881108",
"workspace_id": "64ED5CAD-7C10-4684-8180-826122881108",
"workspace_name": "demo-workspace",
"workspace_type": "Workspace"
},
"name": "demo-workspace"
}

View File

@ -8,7 +8,10 @@
"json": {
"customProperties": {
"platform": "powerbi",
"workspace": "Jane Smith Workspace"
"workspace": "Jane Smith Workspace",
"workspace_id": "90E9E256-3D6D-4D38-86C8-6CCCBD8C170C",
"workspace_name": "Jane Smith Workspace",
"workspace_type": "PersonalGroup"
},
"name": "Jane Smith Workspace"
}

View File

@ -0,0 +1,31 @@
import time
import pytest
from datahub.utilities.threading_timeout import TimeoutException, threading_timeout
def test_timeout_no_timeout():
# Should complete without raising an exception
with threading_timeout(1.0):
time.sleep(0.1)
def test_timeout_raises():
# Should raise TimeoutException
with pytest.raises(TimeoutException):
with threading_timeout(0.1):
time.sleep(0.5)
def test_timeout_early_exit():
# Test that context manager handles other exceptions properly
with pytest.raises(ValueError):
with threading_timeout(1.0):
raise ValueError("Early exit")
def test_timeout_zero():
# Should not raise an exception
with threading_timeout(0.0):
pass