feat(ingest/powerbi): DatabricksMultiCloud native query support (#11756)

Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
Co-authored-by: Aseem Bansal <asmbansal2@gmail.com>
sid-acryl 2024-11-12 19:26:06 +05:30 committed by GitHub
parent 5094dabdf4
commit 6454ff30ab
13 changed files with 533 additions and 108 deletions

View File

@ -130,6 +130,8 @@ class Constant:
APP_SUB_TYPE = "App"
STATE = "state"
ACTIVE = "Active"
SQL_PARSING_FAILURE = "SQL Parsing Failure"
M_QUERY_NULL = '"null"'
@dataclass
@ -175,6 +177,11 @@ class SupportedDataPlatform(Enum):
powerbi_data_platform_name="Databricks", datahub_data_platform_name="databricks"
)
DatabricksMultiCloud_SQL = DataPlatformPair(
powerbi_data_platform_name="DatabricksMultiCloud",
datahub_data_platform_name="databricks",
)
@dataclass
class PowerBiDashboardSourceReport(StaleEntityRemovalSourceReport):
@ -199,6 +206,8 @@ class PowerBiDashboardSourceReport(StaleEntityRemovalSourceReport):
m_query_parse_unexpected_character_errors: int = 0
m_query_parse_unknown_errors: int = 0
m_query_resolver_errors: int = 0
m_query_resolver_no_lineage: int = 0
m_query_resolver_successes: int = 0
def report_dashboards_scanned(self, count: int = 1) -> None:
self.dashboards_scanned += count
@ -495,6 +504,18 @@ class PowerBiDashboardSourceConfig(
description="Whether to ingest workspace app. Requires DataHub server 0.14.2+.",
)
m_query_parse_timeout: int = pydantic.Field(
default=70,
description="Timeout for PowerBI M-query parsing in seconds. Table-level lineage is determined by analyzing the M-query expression. "
"Increase this value if you encounter the 'M-Query Parsing Timeout' message in the connector report.",
)
metadata_api_timeout: int = pydantic.Field(
default=30,
description="timeout in seconds for Metadata Rest Api.",
hidden_from_docs=True,
)
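A minimal sketch of how the two new timeout knobs could be set when constructing the source config. It assumes, based on the unit-test fixture at the end of this diff, that the client credential fields are the only other required ones; all values are illustrative placeholders.

from datahub.ingestion.source.powerbi.config import PowerBiDashboardSourceConfig

config = PowerBiDashboardSourceConfig(
    tenant_id="tenant-id",          # placeholder credentials
    client_id="client-id",
    client_secret="client-secret",
    m_query_parse_timeout=120,      # raise if "M-Query Parsing Timeout" warnings appear in the report
    metadata_api_timeout=60,        # default timeout (seconds) applied to PowerBI metadata REST calls
)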
@root_validator(skip_on_failure=True)
def validate_extract_column_level_lineage(cls, values: Dict) -> Dict:
flags = [

View File

@ -1,4 +1,5 @@
import logging
import re
from typing import List, Optional
import sqlparse
@ -9,14 +10,29 @@ from datahub.sql_parsing.sqlglot_lineage import (
create_lineage_sql_parsed_result,
)
SPECIAL_CHARACTERS = ["#(lf)", "(lf)", "#(tab)"]
# These are the PowerBI M-Query escape sequences for \n and \t
SPECIAL_CHARACTERS = {
"#(lf)": "\n",
"(lf)": "\n",
"#(tab)": "\t",
}
ANSI_ESCAPE_CHARACTERS = r"\x1b\[[0-9;]*m"
logger = logging.getLogger(__name__)
def remove_special_characters(native_query: str) -> str:
for char in SPECIAL_CHARACTERS:
native_query = native_query.replace(char, " ")
native_query = native_query.replace(char, SPECIAL_CHARACTERS[char])
ansi_escape_regx = re.compile(ANSI_ESCAPE_CHARACTERS)
native_query = ansi_escape_regx.sub("", native_query)
# Replace "" quotes by ". Sqlglot is not handling column name alias surrounded with two double quotes
native_query = native_query.replace('""', '"')
return native_query
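A rough usage sketch of the updated helper on a made-up fragment of the Snowflake native query exercised in the tests below: the #(lf) escape becomes a newline, ANSI escape sequences are stripped, and doubled double quotes collapse into one.

raw = 'select SALE_NO AS ""SaleNo""#(lf)from SL_OPERATIONS.SALE.REPORTS'
print(remove_special_characters(raw))
# select SALE_NO AS "SaleNo"
# from SL_OPERATIONS.SALE.REPORTS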
@ -53,6 +69,15 @@ def get_tables(native_query: str) -> List[str]:
return tables
def remove_drop_statement(query: str) -> str:
# Certain PowerBI M-Queries contain a combination of DROP and SELECT statements within SQL, causing SQLParser to fail on these queries.
# Therefore, these occurrences are being removed.
# Regular expression to match patterns like "DROP TABLE IF EXISTS #<identifier>;"
pattern = r"DROP TABLE IF EXISTS #\w+;?"
return re.sub(pattern, "", query)
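A small illustration of the new DROP-stripping step, using the same query shape exercised by test_mssql_drop_with_select later in this diff; only the temp-table DROP matching the pattern is removed, the SELECT is left intact.

q = "DROP TABLE IF EXISTS #KKR;\nSelect * from V_ENTERPRISE_INVOICED_REVENUE"
print(remove_drop_statement(q))
# (leading newline retained)
# Select * from V_ENTERPRISE_INVOICED_REVENUE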
def parse_custom_sql(
ctx: PipelineContext,
query: str,
@ -65,12 +90,10 @@ def parse_custom_sql(
logger.debug("Using sqlglot_lineage to parse custom sql")
sql_query = remove_special_characters(query)
logger.debug(f"Processing native query = {sql_query}")
logger.debug(f"Processing native query using DataHub Sql Parser = {query}")
return create_lineage_sql_parsed_result(
query=sql_query,
query=query,
default_schema=schema,
default_db=database,
platform=platform,

View File

@ -37,15 +37,19 @@ def get_lark_parser() -> Lark:
return Lark(grammar, start="let_expression", regex=True)
def _parse_expression(expression: str) -> Tree:
def _parse_expression(expression: str, parse_timeout: int = 60) -> Tree:
lark_parser: Lark = get_lark_parser()
# Replace U+00a0 NO-BREAK SPACE with a normal space.
# Sometimes PowerBI returns expressions with this character and it breaks the parser.
expression = expression.replace("\u00a0", " ")
# The parser resolves variable=null to variable='', so the tree ends up with an empty string.
# To distinguish between an empty string and null, replace =null with ="null"
expression = expression.replace("=null", '="null"')
logger.debug(f"Parsing expression = {expression}")
with threading_timeout(_M_QUERY_PARSE_TIMEOUT):
with threading_timeout(parse_timeout):
parse_tree: Tree = lark_parser.parse(expression)
if TRACE_POWERBI_MQUERY_PARSER:
@ -74,30 +78,33 @@ def get_upstream_tables(
)
try:
with reporter.m_query_parse_timer:
reporter.m_query_parse_attempts += 1
parse_tree: Tree = _parse_expression(table.expression)
valid, message = validator.validate_parse_tree(
parse_tree, native_query_enabled=config.native_query_parsing
table.expression, native_query_enabled=config.native_query_parsing
)
if valid is False:
assert message is not None
logger.debug(f"Validation failed: {message}")
reporter.info(
title="Unsupported M-Query",
message="DataAccess function is not present in M-Query expression",
title="Non-Data Platform Expression",
message=message,
context=f"table-full-name={table.full_name}, expression={table.expression}, message={message}",
)
reporter.m_query_parse_validation_errors += 1
return []
with reporter.m_query_parse_timer:
reporter.m_query_parse_attempts += 1
parse_tree: Tree = _parse_expression(
table.expression, parse_timeout=config.m_query_parse_timeout
)
except KeyboardInterrupt:
raise
except TimeoutException:
reporter.m_query_parse_timeouts += 1
reporter.warning(
title="M-Query Parsing Timeout",
message=f"M-Query parsing timed out after {_M_QUERY_PARSE_TIMEOUT} seconds. Lineage for this table will not be extracted.",
message=f"M-Query parsing timed out after {config.m_query_parse_timeout} seconds. Lineage for this table will not be extracted.",
context=f"table-full-name={table.full_name}, expression={table.expression}",
)
return []
@ -112,7 +119,7 @@ def get_upstream_tables(
reporter.m_query_parse_unknown_errors += 1
reporter.warning(
title="Unable to extract lineage from M-Query expression",
title="Unable to parse M-Query expression",
message=f"Got an '{error_type}' while parsing the expression. Lineage will be missing for this table.",
context=f"table-full-name={table.full_name}, expression={table.expression}",
exc=e,
@ -132,6 +139,10 @@ def get_upstream_tables(
platform_instance_resolver=platform_instance_resolver,
)
if lineage:
reporter.m_query_resolver_successes += 1
else:
reporter.m_query_resolver_no_lineage += 1
return lineage
except BaseException as e:

View File

@ -9,6 +9,7 @@ from lark import Tree
import datahub.emitter.mce_builder as builder
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.powerbi.config import (
Constant,
DataBricksPlatformDetail,
DataPlatformPair,
PlatformDetail,
@ -117,18 +118,24 @@ class AbstractDataPlatformTableCreator(ABC):
"""
ctx: PipelineContext
table: Table
config: PowerBiDashboardSourceConfig
reporter: PowerBiDashboardSourceReport
platform_instance_resolver: AbstractDataPlatformInstanceResolver
def __init__(
self,
ctx: PipelineContext,
table: Table,
config: PowerBiDashboardSourceConfig,
reporter: PowerBiDashboardSourceReport,
platform_instance_resolver: AbstractDataPlatformInstanceResolver,
) -> None:
super().__init__()
self.ctx = ctx
self.table = table
self.config = config
self.reporter = reporter
self.platform_instance_resolver = platform_instance_resolver
@abstractmethod
@ -214,6 +221,10 @@ class AbstractDataPlatformTableCreator(ABC):
)
)
query = native_sql_parser.remove_drop_statement(
native_sql_parser.remove_special_characters(query)
)
parsed_result: Optional[
"SqlParsingResult"
] = native_sql_parser.parse_custom_sql(
@ -227,7 +238,19 @@ class AbstractDataPlatformTableCreator(ABC):
)
if parsed_result is None:
logger.debug("Failed to parse query")
self.reporter.info(
title=Constant.SQL_PARSING_FAILURE,
message="Fail to parse native sql present in PowerBI M-Query",
context=f"table-name={self.table.full_name}, sql={query}",
)
return Lineage.empty()
if parsed_result.debug_info and parsed_result.debug_info.table_error:
self.reporter.warning(
title=Constant.SQL_PARSING_FAILURE,
message="Fail to parse native sql present in PowerBI M-Query",
context=f"table-name={self.table.full_name}, error={parsed_result.debug_info.table_error},sql={query}",
)
return Lineage.empty()
for urn in parsed_result.in_tables:
@ -290,8 +313,8 @@ class MQueryResolver(AbstractDataAccessMQueryResolver, ABC):
Once DataAccessFunctionDetail instance is initialized thereafter MQueryResolver generates the DataPlatformTable with the help of AbstractDataPlatformTableCreator
(see method resolve_to_data_platform_table_list).
Classes which extended from AbstractDataPlatformTableCreator knows how to convert generated DataAccessFunctionDetail instance
to respective DataPlatformTable instance as per dataplatform.
Classes which extended from AbstractDataPlatformTableCreator know how to convert generated DataAccessFunctionDetail instance
to the respective DataPlatformTable instance as per dataplatform.
"""
@ -343,6 +366,22 @@ class MQueryResolver(AbstractDataAccessMQueryResolver, ABC):
return argument_list
def take_first_argument(self, expression: Tree) -> Optional[Tree]:
# The function is not a data-access function; let's process the function argument
first_arg_tree: Optional[Tree] = tree_function.first_arg_list_func(expression)
if first_arg_tree is None:
logger.debug(
f"Function invocation without argument in expression = {expression.pretty()}"
)
self.reporter.report_warning(
f"{self.table.full_name}-variable-statement",
"Function invocation without argument",
)
return None
return first_arg_tree
def _process_invoke_expression(
self, invoke_expression: Tree
) -> Union[DataAccessFunctionDetail, List[str], None]:
@ -350,6 +389,9 @@ class MQueryResolver(AbstractDataAccessMQueryResolver, ABC):
data_access_func: str = tree_function.make_function_name(letter_tree)
# The invoke function is either a DataAccess function like PostgreSQL.Database(<argument-list>) or
# some other function like Table.AddColumn, Table.Combine, and so on
logger.debug(f"function-name: {data_access_func}")
if data_access_func in self.data_access_functions:
arg_list: Optional[Tree] = MQueryResolver.get_argument_list(
invoke_expression
@ -368,20 +410,8 @@ class MQueryResolver(AbstractDataAccessMQueryResolver, ABC):
identifier_accessor=None,
)
# function is not data-access function, lets process function argument
first_arg_tree: Optional[Tree] = tree_function.first_arg_list_func(
invoke_expression
)
first_arg_tree: Optional[Tree] = self.take_first_argument(invoke_expression)
if first_arg_tree is None:
logger.debug(
f"Function invocation without argument in expression = {invoke_expression.pretty()}"
)
self.reporter.report_warning(
title="M-Query Resolver Error",
message="Unable to extract lineage from parsed M-Query expression (function invocation without argument)",
context=f"{self.table.full_name}: function invocation without argument",
)
return None
flat_arg_list: List[Tree] = tree_function.flat_argument_list(first_arg_tree)
@ -390,6 +420,40 @@ class MQueryResolver(AbstractDataAccessMQueryResolver, ABC):
return None
first_argument: Tree = flat_arg_list[0] # take first argument only
# Detect nested function calls in the first argument
# M-Query's data transformation pipeline:
# 1. Functions typically operate on tables/columns
# 2. First argument must be either:
# - A table variable name (referencing data source)
# - Another function that eventually leads to a table
#
# Example of nested functions:
# #"Split Column by Delimiter2" = Table.SplitColumn(
# Table.TransformColumnTypes(#"Removed Columns1", "KB")
# )
#
# In this example:
# - The inner function Table.TransformColumnTypes takes #"Removed Columns1"
# (a table reference) as its first argument
# - Its result is then passed as the first argument to Table.SplitColumn
second_invoke_expression: Optional[
Tree
] = tree_function.first_invoke_expression_func(first_argument)
if second_invoke_expression:
# 1. The first argument is a function call
# 2. That function's first argument references the next table variable
first_arg_tree = self.take_first_argument(second_invoke_expression)
if first_arg_tree is None:
return None
flat_arg_list = tree_function.flat_argument_list(first_arg_tree)
if len(flat_arg_list) == 0:
logger.debug("flat_arg_list is zero")
return None
first_argument = flat_arg_list[0] # take first argument only
expression: Optional[Tree] = tree_function.first_list_expression_func(
first_argument
)
@ -478,7 +542,7 @@ class MQueryResolver(AbstractDataAccessMQueryResolver, ABC):
self.reporter.report_warning(
title="Unable to extract lineage from M-Query expression",
message="Lineage will be incomplete.",
context=f"table-full-name={self.table.full_name}: output-variable={current_identifier} not found in table expression",
context=f"table-full-name={self.table.full_name}, expression = {self.table.expression}, output-variable={current_identifier} not found in table expression",
)
return None
@ -579,7 +643,9 @@ class MQueryResolver(AbstractDataAccessMQueryResolver, ABC):
AbstractDataPlatformTableCreator
) = supported_resolver.get_table_full_name_creator()(
ctx=ctx,
table=self.table,
config=config,
reporter=self.reporter,
platform_instance_resolver=platform_instance_resolver,
)
@ -680,8 +746,10 @@ class MSSqlDataPlatformTableCreator(DefaultTwoStepDataAccessSources):
database = db_name
schema = MSSqlDataPlatformTableCreator.DEFAULT_SCHEMA
else:
logger.warning(
f"Unsupported table format found {parsed_table} in query {query}"
self.reporter.warning(
title="Invalid table format",
message="The advanced SQL lineage feature (enable_advance_lineage_sql_construct) is disabled. Please either enable this feature or ensure the table is referenced as <db-name>.<schema-name>.<table-name> in the SQL.",
context=f"table-name={self.table.full_name}",
)
continue
@ -715,7 +783,7 @@ class MSSqlDataPlatformTableCreator(DefaultTwoStepDataAccessSources):
)
if len(arguments) == 2:
# It is regular case of MS-SQL
# It is a regular case of MS-SQL
logger.debug("Handling with regular case")
return self.two_level_access_pattern(data_access_func_detail)
@ -1032,6 +1100,7 @@ class NativeQueryDataPlatformTableCreator(AbstractDataPlatformTableCreator):
SUPPORTED_NATIVE_QUERY_DATA_PLATFORM: dict = {
SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name: SupportedDataPlatform.SNOWFLAKE,
SupportedDataPlatform.AMAZON_REDSHIFT.value.powerbi_data_platform_name: SupportedDataPlatform.AMAZON_REDSHIFT,
SupportedDataPlatform.DatabricksMultiCloud_SQL.value.powerbi_data_platform_name: SupportedDataPlatform.DatabricksMultiCloud_SQL,
}
current_data_platform: SupportedDataPlatform = SupportedDataPlatform.SNOWFLAKE
@ -1079,6 +1148,34 @@ class NativeQueryDataPlatformTableCreator(AbstractDataPlatformTableCreator):
column_lineage=[],
)
def get_db_name(self, data_access_tokens: List[str]) -> Optional[str]:
if (
data_access_tokens[0]
!= SupportedDataPlatform.DatabricksMultiCloud_SQL.value.powerbi_data_platform_name
):
return None
try:
if "Database" in data_access_tokens:
index = data_access_tokens.index("Database")
if data_access_tokens[index + 1] != Constant.M_QUERY_NULL:
# Database name is explicitly set in the argument
return data_access_tokens[index + 1]
if "Name" in data_access_tokens:
index = data_access_tokens.index("Name")
# The next element is the value of Name; it is the database name
return data_access_tokens[index + 1]
if "Catalog" in data_access_tokens:
index = data_access_tokens.index("Catalog")
# The next element is the value of Catalog. In Databricks, a catalog can also be used in place of a database.
return data_access_tokens[index + 1]
except IndexError as e:
logger.debug("Database name is not available", exc_info=e)
return None
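A sketch of what get_db_name is expected to return for a DatabricksMultiCloud.Catalogs call like the one in M_QUERIES[31] below; the exact token list is an assumption about how tree_function flattens the arguments.

tokens = [
    "DatabricksMultiCloud", "Catalogs", "foo.com", "/sql/1.0/warehouses/423423ew",
    "Catalog", "sales_db", "Database", '"null"', "EnableAutomaticProxyDiscovery", '"null"',
]
# Database carries the literal "null" marker, so the helper falls through to Catalog
# and returns "sales_db" as the default database for SQL parsing.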
def create_lineage(
self, data_access_func_detail: DataAccessFunctionDetail
) -> Lineage:
@ -1093,6 +1190,7 @@ class NativeQueryDataPlatformTableCreator(AbstractDataPlatformTableCreator):
)
logger.debug(f"Flat argument list = {flat_argument_list}")
return Lineage.empty()
data_access_tokens: List[str] = tree_function.remove_whitespaces_from_list(
tree_function.token_values(flat_argument_list[0])
)
@ -1105,6 +1203,8 @@ class NativeQueryDataPlatformTableCreator(AbstractDataPlatformTableCreator):
f"NativeQuery is supported only for {self.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM}"
)
return Lineage.empty()
if len(data_access_tokens[0]) < 3:
logger.debug(
f"Server is not available in argument list for data-platform {data_access_tokens[0]}. Returning empty "
@ -1115,8 +1215,7 @@ class NativeQueryDataPlatformTableCreator(AbstractDataPlatformTableCreator):
self.current_data_platform = self.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM[
data_access_tokens[0]
]
# First argument is the query
# The First argument is the query
sql_query: str = tree_function.strip_char_from_list(
values=tree_function.remove_whitespaces_from_list(
tree_function.token_values(flat_argument_list[1])
@ -1134,10 +1233,12 @@ class NativeQueryDataPlatformTableCreator(AbstractDataPlatformTableCreator):
server=server,
)
database_name: Optional[str] = self.get_db_name(data_access_tokens)
return self.parse_custom_sql(
query=sql_query,
server=server,
database=None, # database and schema is available inside custom sql as per PowerBI Behavior
database=database_name,
schema=None,
)

View File

@ -46,8 +46,8 @@ def get_variable_statement(parse_tree: Tree, variable: str) -> Optional[Tree]:
def get_first_rule(tree: Tree, rule: str) -> Optional[Tree]:
"""
Lark library doesn't have advance search function.
This function will return the first tree of provided rule
Lark library doesn't have an advanced search function.
This function will return the first tree of the provided rule
:param tree: Tree to search for the expression rule
:return: Tree
"""
@ -99,7 +99,6 @@ def token_values(tree: Tree, parameters: Dict[str, str] = {}) -> List[str]:
logger.debug(f"Unable to resolve parameter reference to {ref}")
values.append(ref)
elif isinstance(node, Token):
# This means we're probably looking at a literal.
values.append(cast(Token, node).value)
return
else:
@ -120,10 +119,14 @@ def remove_whitespaces_from_list(values: List[str]) -> List[str]:
return result
def strip_char(value: str, char: str = '"') -> str:
return value.strip(char)
def strip_char_from_list(values: List[str], char: str = '"') -> List[str]:
result: List[str] = []
for item in values:
result.append(item.strip(char))
result.append(strip_char(item.strip(char), char=char))
return result

View File

@ -1,28 +1,28 @@
import logging
from typing import List, Optional, Tuple
from typing import Optional, Tuple
from lark import Tree
from datahub.ingestion.source.powerbi.m_query import resolver, tree_function
from datahub.ingestion.source.powerbi.m_query import resolver
logger = logging.getLogger(__name__)
def validate_parse_tree(
tree: Tree, native_query_enabled: bool = True
expression: str, native_query_enabled: bool = True
) -> Tuple[bool, Optional[str]]:
"""
:param tree: tree to validate as per functions supported by m_parser module
:param expression: M-Query expression to check for the presence of a supported data-access function
:param native_query_enabled: Whether the user wants to extract lineage from native queries
:return: first argument is False if validation is failed and second argument would contain the error message.
in the case of valid tree, the first argument is True and the second argument would be None.
:return: Tuple of (is_valid, message); message is None when the expression is valid.
"""
functions: List[str] = tree_function.get_all_function_name(tree)
if len(functions) == 0:
return False, "Function calls not found"
function_names = [fun.value for fun in resolver.FunctionName]
if not any(fun in expression for fun in function_names):
return False, "DataAccess function is not present in M-Query expression."
if native_query_enabled is False:
if resolver.FunctionName.NATIVE_QUERY.value in functions:
return False, "Lineage extraction from native query is disabled."
if resolver.FunctionName.NATIVE_QUERY.value in function_names:
return (
False,
"Lineage extraction from native query is disabled. Enable native_query_parsing in recipe",
)
return True, None
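A rough sketch of the reworked validator on a non-data-platform expression; M_QUERIES[34] ("LOAD_DATA(SOURCE)") in the test file below exercises exactly this path.

valid, message = validate_parse_tree("LOAD_DATA(SOURCE)")
assert valid is False
assert message == "DataAccess function is not present in M-Query expression."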

View File

@ -7,8 +7,20 @@
// whole relational_expression parse tree.
// - Added letter_character_and_decimal_digit phrase and updated keyword_or_identifier phrase
// - Added below pattern in argument_list
// | WS_INLINE? SQL_STRING
// | WS_INLINE? SQL_STRING "," argument_list
// | WS_INLINE? sql_string
// | WS_INLINE? sql_string "," argument_list
// - Added subtract_expression
// - Updated relational_expression; the updates are listed below
// | subtract_expression
// | subtract_expression "<" relational_expression
// | subtract_expression ">" relational_expression
// | subtract_expression "<=" relational_expression
// | subtract_expression ">=" relational_expression
// - Added empty_string
// - Updated argument_list, below are the updates
// | empty_string
// | empty_string "," argument_list
// - Added sql_string in any_literal
lexical_unit: lexical_elements?
@ -290,10 +302,16 @@ equality_expression: WS_INLINE? relational_expression
| relational_expression WS_INLINE? "<>" WS_INLINE? equality_expression
relational_expression: additive_expression
| subtract_expression
| additive_expression "<" relational_expression
| additive_expression ">" relational_expression
| additive_expression "<=" relational_expression
| additive_expression ">=" relational_expression
| subtract_expression "<" relational_expression
| subtract_expression ">" relational_expression
| subtract_expression "<=" relational_expression
| subtract_expression ">=" relational_expression
additive_expression: multiplicative_expression
| multiplicative_expression "+" additive_expression
@ -301,6 +319,11 @@ additive_expression: multiplicative_expression
| multiplicative_expression WS_INLINE? NEWLINE? WS_INLINE? "&" WS_INLINE? NEWLINE? WS_INLINE? additive_expression
subtract_expression: multiplicative_expression
| multiplicative_expression "-" additive_expression
| multiplicative_expression WS_INLINE? "_" WS_INLINE? additive_expression
| multiplicative_expression WS_INLINE? NEWLINE? WS_INLINE? "&" WS_INLINE? NEWLINE? WS_INLINE? additive_expression
multiplicative_expression: WS_INLINE? metadata_expression
| metadata_expression WS_INLINE? "*" WS_INLINE? multiplicative_expression
| metadata_expression "/" multiplicative_expression
@ -346,14 +369,23 @@ not_implemented_expression: "..."
invoke_expression: "#"? primary_expression "(" NEWLINE? argument_list? NEWLINE? ")"
SQL_STRING: /\"((?:[^\"\\]|\\[\"]|\"\"|\#\(lf\))+)\"/
empty_string: /"([^"]|\\")*"/
// SQL String specific rules
sql_content: /(?:[^\"\\]|\\[\"]|\"\"|\#\(lf\))+/
sql_string: "\"" sql_content "\""
argument_list: WS_INLINE? expression
| WS_INLINE? expression WS_INLINE? "," WS_INLINE? argument_list
| WS_INLINE? SQL_STRING
| WS_INLINE? SQL_STRING "," argument_list
| WS_INLINE? sql_string
| WS_INLINE? sql_string "," argument_list
| "\"" identifier "\""
| "\"" identifier "\"" "," argument_list
| "[" identifier "]"
| "[" identifier "]" "," argument_list
| empty_string
| empty_string "," argument_list
| WS_INLINE
| WS_INLINE? ESCAPED_STRING
| WS_INLINE? ESCAPED_STRING "," argument_list
@ -576,6 +608,7 @@ any_literal: record_literal
| number_literal
| text_literal
| null_literal
| sql_string
%import common.WORD

View File

@ -1512,7 +1512,7 @@ class PowerBiDashboardSource(StatefulIngestionSourceBase, TestableSource):
if self.source_config.modified_since:
# Because modified_workspaces is not idempotent, we checkpoint for each PowerBI workspace
# Because job_id is used as dictionary key, we have to set a new job_id
# Because job_id is used as a dictionary key, we have to set a new job_id
# Refer to https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py#L390
self.stale_entity_removal_handler.set_job_id(workspace.id)
self.state_provider.register_stateful_ingestion_usecase_handler(

View File

@ -56,6 +56,19 @@ def is_http_failure(response: Response, message: str) -> bool:
return True
class SessionWithTimeout(requests.Session):
timeout: int
def __init__(self, timeout, *args, **kwargs):
super().__init__(*args, **kwargs)
self.timeout = timeout
def request(self, method, url, **kwargs):
# Set the default timeout if none is provided
kwargs.setdefault("timeout", self.timeout)
return super().request(method, url, **kwargs)
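A brief usage sketch of SessionWithTimeout: every request issued through the session inherits the default timeout unless the caller passes an explicit one (the URL below is built from the myorg base URL already used by DataResolverBase).

session = SessionWithTimeout(timeout=30)
session.get("https://api.powerbi.com/v1.0/myorg/groups")             # uses timeout=30
session.get("https://api.powerbi.com/v1.0/myorg/groups", timeout=5)  # explicit value wins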
class DataResolverBase(ABC):
SCOPE: str = "https://analysis.windows.net/powerbi/api/.default"
MY_ORG_URL = "https://api.powerbi.com/v1.0/myorg"
@ -69,6 +82,7 @@ class DataResolverBase(ABC):
client_id: str,
client_secret: str,
tenant_id: str,
metadata_api_timeout: int,
):
self.__access_token: Optional[str] = None
self.__access_token_expiry_time: Optional[datetime] = None
@ -84,7 +98,9 @@ class DataResolverBase(ABC):
self.get_access_token()
logger.info(f"Connected to {self._get_authority_url()}")
self._request_session = requests.Session()
self._request_session = SessionWithTimeout(timeout=metadata_api_timeout)
# set re-try parameter for request_session
self._request_session.mount(
"https://",

View File

@ -69,12 +69,14 @@ class PowerBiAPI:
client_id=self.__config.client_id,
client_secret=self.__config.client_secret,
tenant_id=self.__config.tenant_id,
metadata_api_timeout=self.__config.metadata_api_timeout,
)
self.__admin_api_resolver = AdminAPIResolver(
client_id=self.__config.client_id,
client_secret=self.__config.client_secret,
tenant_id=self.__config.tenant_id,
metadata_api_timeout=self.__config.metadata_api_timeout,
)
self.reporter: PowerBiDashboardSourceReport = reporter
@ -91,6 +93,14 @@ class PowerBiAPI:
if isinstance(e, requests.exceptions.HTTPError):
logger.warning(f"HTTP status-code = {e.response.status_code}")
if isinstance(e, requests.exceptions.Timeout):
url: str = e.request.url if e.request else "URL not available"
self.reporter.warning(
title="Metadata API Timeout",
message=f"Metadata endpoints are not reachable. Check network connectivity to PowerBI Service.",
context=f"url={url}",
)
logger.debug(msg=message, exc_info=e)
return e
@ -253,7 +263,7 @@ class PowerBiAPI:
except:
self.log_http_error(message="Unable to fetch list of workspaces")
raise # we want this exception to bubble up
# raise # we want this exception to bubble up
workspaces = [
Workspace(

View File

@ -53,7 +53,10 @@ class PartitionExecutor(Closeable):
self.max_workers = max_workers
self.max_pending = max_pending
self._executor = ThreadPoolExecutor(max_workers=max_workers)
self._executor = ThreadPoolExecutor(
max_workers=max_workers,
thread_name_prefix=self.__class__.__name__,
)
# Each pending or executing request will acquire a permit from this semaphore.
self._semaphore = BoundedSemaphore(max_pending + max_workers)
@ -261,11 +264,17 @@ class BatchPartitionExecutor(Closeable):
self.min_process_interval = min_process_interval
assert self.max_workers > 1
# We add one here to account for the clearinghouse worker thread.
self._executor = ThreadPoolExecutor(max_workers=max_workers + 1)
self._executor = ThreadPoolExecutor(
# We add one here to account for the clearinghouse worker thread.
max_workers=max_workers + 1,
thread_name_prefix=self.__class__.__name__,
)
self._clearinghouse_started = False
# pending_count includes the length of the pending list, plus the
# number of items sitting in the clearinghouse's internal queue.
self._pending_count = BoundedSemaphore(max_pending)
self._pending: "queue.Queue[Optional[_BatchPartitionWorkItem]]" = queue.Queue(
maxsize=max_pending
)
@ -294,10 +303,10 @@ class BatchPartitionExecutor(Closeable):
def _handle_batch_completion(
batch: List[_BatchPartitionWorkItem], future: Future
) -> None:
nonlocal workers_available
workers_available += 1
with clearinghouse_state_lock:
nonlocal workers_available
workers_available += 1
for item in batch:
keys_no_longer_in_flight.add(item.key)
self._pending_count.release()

View File

@ -1,12 +1,15 @@
import logging
import sys
import time
from typing import List, Tuple
from unittest.mock import MagicMock, patch
import pytest
from lark import Tree
import datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes as powerbi_data_classes
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import StructuredLogLevel
from datahub.ingestion.source.powerbi.config import (
PowerBiDashboardSourceConfig,
PowerBiDashboardSourceReport,
@ -51,10 +54,38 @@ M_QUERIES = [
'let\n Source = DatabricksMultiCloud.Catalogs("abc.cloud.databricks.com", "/sql/gh2cfe3fe1d4c7cd", [Catalog="data_analysis", Database="summary", EnableAutomaticProxyDiscovery=null]),\n vips_data_summary_dev = Source{[Item="vips_data",Schema="summary",Catalog="data_analysis"]}[Data],\n #"Changed Type" = Table.TransformColumnTypes(vips_data_summary_dev,{{"vipstartDate", type date}, {"enteredDate", type datetime}, {"estDraftDate", type datetime}, {"estPublDate", type datetime}})\nin\n #"Changed Type"',
'let\n Source = Value.NativeQuery(Snowflake.Databases("0DD93C6BD5A6.snowflakecomputing.com","sales_analytics_warehouse_prod",[Role="sales_analytics_member_ad"]){[Name="ORDERING"]}[Data], "SELECT#(lf) DISTINCT#(lf) T5.PRESENTMENT_START_DATE#(lf),T5.PRESENTMENT_END_DATE#(lf),T5.DISPLAY_NAME#(lf),T5.NAME#(tab)#(lf),T5.PROMO_DISPLAY_NAME#(lf),T5.REGION#(lf),T5.ID#(lf),T5.WALKOUT#(lf),T6.DEAL_ID#(lf),T6.TYPE#(lf),T5.FREE_PERIOD#(lf),T6.PRICE_MODIFICATION#(lf)#(lf)FROM#(lf)#(lf)(#(lf) SELECT #(lf) T1.NAME#(lf),DATE(T1.CREATED_AT) as CREATED_AT#(lf),T1.PROMO_CODE#(lf),T1.STATUS#(lf),DATE(T1.UPDATED_AT) as UPDATED_AT#(lf),T1.ID#(lf),T1.DISPLAY_NAME as PROMO_DISPLAY_NAME#(lf),T4.*#(lf)FROM#(lf)(SELECT#(lf) DISTINCT#(lf) NAME#(lf),CREATED_AT#(lf),PROMO_CODE#(lf),STATUS#(lf),UPDATED_AT#(lf),ID#(lf),DISPLAY_NAME#(lf) FROM RAW.PROMOTIONS#(lf)#(lf)) T1#(lf)INNER JOIN#(lf)#(lf) (#(lf) SELECT #(lf) T3.PRODUCT_STATUS#(lf),T3.CODE#(lf),T3.REGION#(lf),T3.DISPLAY_ORDER_SEQUENCE#(lf),T3.PRODUCT_LINE_ID#(lf),T3.DISPLAY_NAME#(lf),T3.PRODUCT_TYPE#(lf),T3.ID as PROD_TBL_ID#(lf),T3.NAME as PROD_TBL_NAME#(lf),DATE(T2.PRESENTMENT_END_DATE) as PRESENTMENT_END_DATE#(lf),T2.PRICE_COMMITMENT_PERIOD#(lf),T2.NAME as SEAL_TBL_NAME#(lf),DATE(T2.CREATED_AT) as SEAL_TBL_CREATED_AT#(lf),T2.DESCRIPTION#(lf),T2.FREE_PERIOD#(lf),T2.WALKOUT#(lf),T2.PRODUCT_CAT_ID#(lf),T2.PROMOTION_ID#(lf),DATE(T2.PRESENTMENT_START_DATE) as PRESENTMENT_START_DATE#(lf),YEAR(T2.PRESENTMENT_START_DATE) as DEAL_YEAR_START#(lf),MONTH(T2.PRESENTMENT_START_DATE) as DEAL_MONTH_START#(lf),T2.DEAL_TYPE#(lf),DATE(T2.UPDATED_AT) as SEAL_TBL_UPDATED_AT#(lf),T2.ID as SEAL_TBL_ID#(lf),T2.STATUS as SEAL_TBL_STATUS#(lf)FROM#(lf)(SELECT#(lf) DISTINCT#(lf) PRODUCT_STATUS#(lf),CODE#(lf),REGION#(lf),DISPLAY_ORDER_SEQUENCE#(lf),PRODUCT_LINE_ID#(lf),DISPLAY_NAME#(lf),PRODUCT_TYPE#(lf),ID #(lf),NAME #(lf) FROM#(lf) RAW.PRODUCTS#(lf)#(lf)) T3#(lf)INNER JOIN#(lf)(#(lf) SELECT#(lf) DISTINCT#(lf) PRESENTMENT_END_DATE#(lf),PRICE_COMMITMENT_PERIOD#(lf),NAME#(lf),CREATED_AT#(lf),DESCRIPTION#(lf),FREE_PERIOD#(lf),WALKOUT#(lf),PRODUCT_CAT_ID#(lf),PROMOTION_ID#(lf),PRESENTMENT_START_DATE#(lf),DEAL_TYPE#(lf),UPDATED_AT#(lf),ID#(lf),STATUS#(lf) FROM#(lf) RAW.DEALS#(lf)#(lf)) T2#(lf)ON#(lf)T3.ID = T2.PRODUCT_CAT_ID #(lf)WHERE#(lf)T2.PRESENTMENT_START_DATE >= \'2015-01-01\'#(lf)AND#(lf)T2.STATUS = \'active\'#(lf)#(lf))T4#(lf)ON#(lf)T1.ID = T4.PROMOTION_ID#(lf))T5#(lf)INNER JOIN#(lf)RAW.PRICE_MODIFICATIONS T6#(lf)ON#(lf)T5.SEAL_TBL_ID = T6.DEAL_ID", null, [EnableFolding=true]) \n in \n Source',
'let\n Source = Databricks.Catalogs(#"hostname",#"http_path", null),\n edp_prod_Database = Source{[Name=#"catalog",Kind="Database"]}[Data],\n gold_Schema = edp_prod_Database{[Name=#"schema",Kind="Schema"]}[Data],\n pet_view = gold_Schema{[Name="pet_list",Kind="View"]}[Data],\n #"Filtered Rows" = Table.SelectRows(pet_view, each true),\n #"Removed Columns" = Table.RemoveColumns(#"Filtered Rows",{"created_timestmp"})\nin\n #"Removed Columns"',
'let\n Source = Value.NativeQuery(Snowflake.Databases("0DD93C6BD5A6.snowflakecomputing.com","sales_analytics_warehouse_prod",[Role="sales_analytics_member_ad"]){[Name="SL_OPERATIONS"]}[Data], "select SALE_NO AS ""SaleNo""#(lf) ,CODE AS ""Code""#(lf) ,ENDDATE AS ""end_date""#(lf) from SL_OPERATIONS.SALE.REPORTS#(lf) where ENDDATE > \'2024-02-03\'", null, [EnableFolding=true]),\n #"selected Row" = Table.SelectRows(Source)\nin\n #"selected Row"',
'let\n Source = Value.NativeQuery(Snowflake.Databases("0DD93C6BD5A6.snowflakecomputing.com","sales_analytics_warehouse_prod",[Role="sales_analytics_member_ad"]){[Name="SL_OPERATIONS"]}[Data], "select SALE_NO AS ""\x1b[4mSaleNo\x1b[0m""#(lf) ,CODE AS ""Code""#(lf) ,ENDDATE AS ""end_date""#(lf) from SL_OPERATIONS.SALE.REPORTS#(lf) where ENDDATE > \'2024-02-03\'", null, [EnableFolding=true]),\n #"selected Row" = Table.SelectRows(Source)\nin\n #"selected Row"',
'let\n Source = Value.NativeQuery(DatabricksMultiCloud.Catalogs("foo.com", "/sql/1.0/warehouses/423423ew", [Catalog="sales_db", Database=null, EnableAutomaticProxyDiscovery=null]){[Name="sales_db",Kind="Database"]}[Data], "select * from public.slae_history#(lf)where creation_timestamp >= getDate(-3)", null, [EnableFolding=true]),\n #"NewTable" = Table.TransformColumn(Source,{{"creation_timestamp", type date}})\nin\n #"NewTable"',
'let Source = Snowflake.Databases("example.snowflakecomputing.com","WAREHOUSE_NAME",[Role="CUSTOM_ROLE"]), DB_Source = Source{[Name="DATABASE_NAME",Kind="Database"]}[Data], SCHEMA_Source = DB_Source{[Name="SCHEMA_NAME",Kind="Schema"]}[Data], TABLE_Source = SCHEMA_Source{[Name="TABLE_NAME",Kind="View"]}[Data], #"Split Column by Time" = Table.SplitColumn(Table.TransformColumnTypes(TABLE_Source, {{"TIMESTAMP_COLUMN", type text}}, "en-GB"), "TIMESTAMP_COLUMN", Splitter.SplitTextByDelimiter(" ", QuoteStyle.Csv), {"TIMESTAMP_COLUMN.1", "TIMESTAMP_COLUMN.2"}), #"Added Custom" = Table.AddColumn(#"Split Column by Time", "SOB", each ([ENDTIME] - [STARTTIME]) * 60 * 60 * 24) in #"Added Custom"',
'let\n Source = Sql.Database("AUPRDWHDB", "COMMOPSDB", [Query="DROP TABLE IF EXISTS #KKR;#(lf)Select#(lf)*,#(lf)concat((UPPER(REPLACE(SALES_SPECIALIST,\'-\',\'\'))),#(lf)LEFT(CAST(INVOICE_DATE AS DATE),4)+LEFT(RIGHT(CAST(INVOICE_DATE AS DATE),5),2)) AS AGENT_KEY,#(lf)CASE#(lf) WHEN CLASS = \'Software\' and (NOT(PRODUCT in (\'ADV\', \'Adv\') and left(ACCOUNT_ID,2)=\'10\') #(lf) or V_ENTERPRISE_INVOICED_REVENUE.TYPE = \'Manual Adjustment\') THEN INVOICE_AMOUNT#(lf) WHEN V_ENTERPRISE_INVOICED_REVENUE.TYPE IN (\'Recurring\',\'0\') THEN INVOICE_AMOUNT#(lf) ELSE 0#(lf)END as SOFTWARE_INV#(lf)#(lf)from V_ENTERPRISE_INVOICED_REVENUE", CommandTimeout=#duration(0, 1, 30, 0)]),\n #"Added Conditional Column" = Table.AddColumn(Source, "Services", each if [CLASS] = "Services" then [INVOICE_AMOUNT] else 0),\n #"Added Custom" = Table.AddColumn(#"Added Conditional Column", "Advanced New Sites", each if [PRODUCT] = "ADV"\nor [PRODUCT] = "Adv"\nthen [NEW_SITE]\nelse 0)\nin\n #"Added Custom"',
"LOAD_DATA(SOURCE)",
]
def get_data_platform_tables_with_dummy_table(q: str) -> List[resolver.Lineage]:
table: powerbi_data_classes.Table = powerbi_data_classes.Table(
columns=[],
measures=[],
expression=q,
name="virtual_order_table",
full_name="OrderDataSet.virtual_order_table",
)
reporter = PowerBiDashboardSourceReport()
ctx, config, platform_instance_resolver = get_default_instances()
config.enable_advance_lineage_sql_construct = True
return parser.get_upstream_tables(
table,
reporter,
ctx=ctx,
config=config,
platform_instance_resolver=platform_instance_resolver,
)
def get_default_instances(
override_config: dict = {},
) -> Tuple[
@ -681,6 +712,7 @@ def test_redshift_regular_case():
def test_redshift_native_query():
table: powerbi_data_classes.Table = powerbi_data_classes.Table(
expression=M_QUERIES[22],
name="category",
@ -773,21 +805,13 @@ def test_sqlglot_parser():
def test_databricks_multi_cloud():
table: powerbi_data_classes.Table = powerbi_data_classes.Table(
expression=M_QUERIES[25],
name="category",
full_name="dev.public.category",
)
reporter = PowerBiDashboardSourceReport()
q = M_QUERIES[25]
ctx, config, platform_instance_resolver = get_default_instances()
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
table,
reporter,
ctx=ctx,
config=config,
platform_instance_resolver=platform_instance_resolver,
)[0].upstreams
lineage: List[resolver.Lineage] = get_data_platform_tables_with_dummy_table(q=q)
assert len(lineage) == 1
data_platform_tables = lineage[0].upstreams
assert len(data_platform_tables) == 1
@ -798,21 +822,13 @@ def test_databricks_multi_cloud():
def test_databricks_catalog_pattern_1():
table: powerbi_data_classes.Table = powerbi_data_classes.Table(
expression=M_QUERIES[26],
name="category",
full_name="dev.public.category",
)
reporter = PowerBiDashboardSourceReport()
q = M_QUERIES[26]
ctx, config, platform_instance_resolver = get_default_instances()
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
table,
reporter,
ctx=ctx,
config=config,
platform_instance_resolver=platform_instance_resolver,
)[0].upstreams
lineage: List[resolver.Lineage] = get_data_platform_tables_with_dummy_table(q=q)
assert len(lineage) == 1
data_platform_tables = lineage[0].upstreams
assert len(data_platform_tables) == 1
@ -936,6 +952,73 @@ def test_databricks_regular_case_with_view():
def test_snowflake_double_double_quotes():
q = M_QUERIES[30]
lineage: List[resolver.Lineage] = get_data_platform_tables_with_dummy_table(q=q)
assert len(lineage) == 1
data_platform_tables = lineage[0].upstreams
assert len(data_platform_tables) == 1
assert (
data_platform_tables[0].urn
== "urn:li:dataset:(urn:li:dataPlatform:snowflake,sl_operations.sale.reports,PROD)"
)
def test_databricks_multicloud():
q = M_QUERIES[31]
lineage: List[resolver.Lineage] = get_data_platform_tables_with_dummy_table(q=q)
assert len(lineage) == 1
data_platform_tables = lineage[0].upstreams
assert len(data_platform_tables) == 1
assert (
data_platform_tables[0].urn
== "urn:li:dataset:(urn:li:dataPlatform:databricks,sales_db.public.slae_history,PROD)"
)
def test_snowflake_multi_function_call():
q = M_QUERIES[32]
lineage: List[resolver.Lineage] = get_data_platform_tables_with_dummy_table(q=q)
assert len(lineage) == 1
data_platform_tables = lineage[0].upstreams
assert len(data_platform_tables) == 1
assert (
data_platform_tables[0].urn
== "urn:li:dataset:(urn:li:dataPlatform:snowflake,database_name.schema_name.table_name,PROD)"
)
def test_mssql_drop_with_select():
q = M_QUERIES[33]
lineage: List[resolver.Lineage] = get_data_platform_tables_with_dummy_table(q=q)
assert len(lineage) == 1
data_platform_tables = lineage[0].upstreams
assert len(data_platform_tables) == 1
assert (
data_platform_tables[0].urn
== "urn:li:dataset:(urn:li:dataPlatform:mssql,commopsdb.dbo.v_enterprise_invoiced_revenue,PROD)"
)
def test_unsupported_data_platform():
q = M_QUERIES[34]
table: powerbi_data_classes.Table = powerbi_data_classes.Table(
columns=[],
measures=[],
@ -948,16 +1031,119 @@ def test_snowflake_double_double_quotes():
ctx, config, platform_instance_resolver = get_default_instances()
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
config.enable_advance_lineage_sql_construct = True
assert (
parser.get_upstream_tables(
table,
reporter,
ctx=ctx,
config=config,
platform_instance_resolver=platform_instance_resolver,
)
== []
)
info_entries: dict = reporter._structured_logs._entries.get(
StructuredLogLevel.INFO, {}
) # type: ignore
is_entry_present: bool = False
for key, entry in info_entries.items():
if entry.title == "Non-Data Platform Expression":
is_entry_present = True
break
assert (
is_entry_present
), 'Info message "Non-Data Platform Expression" should be present in reporter'
def test_empty_string_in_m_query():
# TRIM(TRIM(TRIM(AGENT_NAME, '\"\"'), '+'), '\\'') is in Query
q = "let\n Source = Value.NativeQuery(Snowflake.Databases(\"bu10758.ap-unknown-2.fakecomputing.com\",\"operations_analytics_warehouse_prod\",[Role=\"OPERATIONS_ANALYTICS_MEMBER\"]){[Name=\"OPERATIONS_ANALYTICS\"]}[Data], \"select #(lf)UPPER(REPLACE(AGENT_NAME,'-','')) AS CLIENT_DIRECTOR,#(lf)TRIM(TRIM(TRIM(AGENT_NAME, '\"\"'), '+'), '\\'') AS TRIM_AGENT_NAME,#(lf)TIER,#(lf)UPPER(MANAGER),#(lf)TEAM_TYPE,#(lf)DATE_TARGET,#(lf)MONTHID,#(lf)TARGET_TEAM,#(lf)SELLER_EMAIL,#(lf)concat((UPPER(REPLACE(AGENT_NAME,'-',''))), MONTHID) as AGENT_KEY,#(lf)UNIT_TARGET AS SME_Quota,#(lf)AMV_TARGET AS Revenue_Quota,#(lf)SERVICE_QUOTA,#(lf)BL_TARGET,#(lf)SOFTWARE_QUOTA as Software_Quota#(lf)#(lf)from OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT_TARGETS inner join OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT #(lf)#(lf)where YEAR_TARGET >= 2022#(lf)and TEAM_TYPE = 'Accounting'#(lf)and TARGET_TEAM = 'Enterprise'#(lf)AND TIER = 'Client Director'\", null, [EnableFolding=true])\nin\n Source"
lineage: List[resolver.Lineage] = get_data_platform_tables_with_dummy_table(q=q)
assert len(lineage) == 1
data_platform_tables = lineage[0].upstreams
assert len(data_platform_tables) == 2
assert (
data_platform_tables[0].urn
== "urn:li:dataset:(urn:li:dataPlatform:snowflake,operations_analytics.transformed_prod.v_sme_unit,PROD)"
)
assert (
data_platform_tables[1].urn
== "urn:li:dataset:(urn:li:dataPlatform:snowflake,operations_analytics.transformed_prod.v_sme_unit_targets,PROD)"
)
def test_double_quotes_in_alias():
# SELECT CAST(sales_date AS DATE) AS \"\"Date\"\" in query
q = 'let \n Source = Sql.Database("abc.com", "DB", [Query="SELECT CAST(sales_date AS DATE) AS ""Date"",#(lf) SUM(cshintrpret) / 60.0 AS ""Total Order All Items"",#(lf)#(tab)#(tab)#(tab) SUM(cshintrpret) / 60.0 - LAG(SUM(cshintrpret) / 60.0, 1) OVER (ORDER BY CAST(sales_date AS DATE)) AS ""Total minute difference"",#(lf)#(tab)#(tab)#(tab) SUM(sale_price) / 60.0 - LAG(SUM(sale_price) / 60.0, 1) OVER (ORDER BY CAST(sales_date AS DATE)) AS ""Normal minute difference""#(lf) FROM [DB].[dbo].[sales_t]#(lf) WHERE sales_date >= GETDATE() - 365#(lf) GROUP BY CAST(sales_date AS DATE),#(lf)#(tab)#(tab)CAST(sales_date AS TIME);"]) \n in \n Source'
lineage: List[resolver.Lineage] = get_data_platform_tables_with_dummy_table(q=q)
assert len(lineage) == 1
data_platform_tables = lineage[0].upstreams
assert len(data_platform_tables) == 1
assert (
data_platform_tables[0].urn
== "urn:li:dataset:(urn:li:dataPlatform:mssql,db.dbo.sales_t,PROD)"
)
@patch("datahub.ingestion.source.powerbi.m_query.parser.get_lark_parser")
def test_m_query_timeout(mock_get_lark_parser):
q = 'let\n Source = Value.NativeQuery(Snowflake.Databases("0DD93C6BD5A6.snowflakecomputing.com","sales_analytics_warehouse_prod",[Role="sales_analytics_member_ad"]){[Name="SL_OPERATIONS"]}[Data], "select SALE_NO AS ""\x1b[4mSaleNo\x1b[0m""#(lf) ,CODE AS ""Code""#(lf) ,ENDDATE AS ""end_date""#(lf) from SL_OPERATIONS.SALE.REPORTS#(lf) where ENDDATE > \'2024-02-03\'", null, [EnableFolding=true]),\n #"selected Row" = Table.SelectRows(Source)\nin\n #"selected Row"'
table: powerbi_data_classes.Table = powerbi_data_classes.Table(
columns=[],
measures=[],
expression=q,
name="virtual_order_table",
full_name="OrderDataSet.virtual_order_table",
)
reporter = PowerBiDashboardSourceReport()
ctx, config, platform_instance_resolver = get_default_instances()
config.enable_advance_lineage_sql_construct = True
config.m_query_parse_timeout = 1
mock_lark_instance = MagicMock()
mock_get_lark_parser.return_value = mock_lark_instance
# sleep for 5 seconds to trigger timeout
mock_lark_instance.parse.side_effect = lambda expression: time.sleep(5)
parser.get_upstream_tables(
table,
reporter,
ctx=ctx,
config=config,
platform_instance_resolver=platform_instance_resolver,
)[0].upstreams
assert len(data_platform_tables) == 1
assert (
data_platform_tables[0].urn
== "urn:li:dataset:(urn:li:dataPlatform:snowflake,sl_operations.sale.reports,PROD)"
)
warn_entries: dict = reporter._structured_logs._entries.get(
StructuredLogLevel.WARN, {}
) # type: ignore
is_entry_present: bool = False
for key, entry in warn_entries.items():
if entry.title == "M-Query Parsing Timeout":
is_entry_present = True
break
assert (
is_entry_present
), 'Warning message "M-Query Parsing Timeout" should be present in reporter'

View File

@ -1,13 +1,17 @@
import pytest
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.powerbi.config import PowerBiDashboardSourceConfig
from datahub.ingestion.source.powerbi.config import (
PowerBiDashboardSourceConfig,
PowerBiDashboardSourceReport,
)
from datahub.ingestion.source.powerbi.dataplatform_instance_resolver import (
ResolvePlatformInstanceFromDatasetTypeMapping,
)
from datahub.ingestion.source.powerbi.m_query.resolver import (
MSSqlDataPlatformTableCreator,
)
from datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes import Table
@pytest.fixture
@ -17,8 +21,16 @@ def creator():
client_id="test-client-id",
client_secret="test-client-secret",
)
table = Table(
name="test_table",
full_name="db.schema.test_table",
)
return MSSqlDataPlatformTableCreator(
ctx=PipelineContext(run_id="test-run-id"),
table=table,
reporter=PowerBiDashboardSourceReport(),
config=config,
platform_instance_resolver=ResolvePlatformInstanceFromDatasetTypeMapping(
config