feat(ingest/powerbi): support PowerBI parameter references (#7523)

Harshal Sheth 2023-03-09 14:14:55 -05:00 committed by GitHub
parent 6a4c061651
commit 91a8e5ea16
11 changed files with 206 additions and 57 deletions

View File

@ -75,6 +75,7 @@ class Constant:
HTTP_RESPONSE_STATUS_CODE = "HttpResponseStatusCode"
NAME = "name"
DISPLAY_NAME = "displayName"
CURRENT_VALUE = "currentValue"
ORDER = "order"
IDENTIFIER = "identifier"
EMAIL_ADDRESS = "emailAddress"

View File

@ -1,9 +1,12 @@
import os
from abc import ABC
from dataclasses import dataclass
from typing import Any, Dict, Optional
from lark import Tree
TRACE_POWERBI_MQUERY_PARSER = os.getenv("DATAHUB_TRACE_POWERBI_MQUERY_PARSER", False)
class AbstractIdentifierAccessor(ABC): # To pass lint
pass
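
One caveat with the new flag: os.getenv returns the raw string whenever the variable is set, so any non-empty value (even "false") enables tracing. A minimal sketch of how such a flag can gate expensive debug rendering (trace_pretty is a hypothetical helper, not part of this commit):

    import logging
    import os

    logger = logging.getLogger(__name__)

    # Unset -> False; any non-empty string value -> truthy.
    TRACE_POWERBI_MQUERY_PARSER = os.getenv("DATAHUB_TRACE_POWERBI_MQUERY_PARSER", False)

    def trace_pretty(render):
        # Guard the call itself: render() may be expensive (e.g. Tree.pretty()),
        # so skip it entirely unless tracing was requested.
        if TRACE_POWERBI_MQUERY_PARSER:
            logger.debug(render())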

View File

@ -1,13 +1,16 @@
import functools
import importlib.resources as pkg_resource
import logging
from typing import List, cast
from typing import Dict, List, Optional
import lark
from lark import Lark, Tree
from datahub.ingestion.source.powerbi.config import PowerBiDashboardSourceReport
from datahub.ingestion.source.powerbi.m_query import resolver, validator
from datahub.ingestion.source.powerbi.m_query.data_classes import (
TRACE_POWERBI_MQUERY_PARSER,
)
from datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes import Table
logger = logging.getLogger(__name__)
@ -26,13 +29,14 @@ def get_lark_parser() -> Lark:
def _parse_expression(expression: str) -> Tree:
lark_parser: Lark = get_lark_parser()
parse_tree: Tree = lark_parser.parse(expression)
# Replace U+00a0 NO-BREAK SPACE with a normal space.
# Sometimes PowerBI returns expressions with this character and it breaks the parser.
expression = expression.replace("\u00a0", " ")
logger.debug(f"Parsing expression = {expression}")
parse_tree: Tree = lark_parser.parse(expression)
if (
logger.level == logging.DEBUG
): # Guard condition to avoid heavy pretty() function call
if TRACE_POWERBI_MQUERY_PARSER:
logger.debug(parse_tree.pretty())
return parse_tree
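
The U+00A0 normalization is the key fix here: PowerBI sometimes emits NO-BREAK SPACE characters in expressions, and the M grammar used by the parser does not treat them as whitespace, so the parse would fail. A self-contained sketch of the same normalize-then-parse pattern, using a toy grammar in place of the real M lexical grammar:

    import lark

    # Toy grammar standing in for the real M lexical grammar (illustrative only).
    _parser = lark.Lark(
        'start: WORD (" " WORD)*\n%import common.WORD',
        start="start",
    )

    def parse_normalized(expression: str) -> lark.Tree:
        # Replace U+00A0 NO-BREAK SPACE with a normal space before parsing.
        return _parser.parse(expression.replace("\u00a0", " "))

    tree = parse_normalized("let\u00a0Source")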
@ -42,11 +46,14 @@ def get_upstream_tables(
table: Table,
reporter: PowerBiDashboardSourceReport,
native_query_enabled: bool = True,
parameters: Optional[Dict[str, str]] = None,
) -> List[resolver.DataPlatformTable]:
if table.expression is None:
logger.debug(f"Expression is none for table {table.full_name}")
return []
parameters = parameters or {}
try:
parse_tree: Tree = _parse_expression(table.expression)
@ -54,28 +61,34 @@ def get_upstream_tables(
parse_tree, native_query_enabled=native_query_enabled
)
if valid is False:
logger.debug(f"Validation failed: {cast(str, message)}")
reporter.report_warning(table.full_name, cast(str, message))
assert message is not None
logger.debug(f"Validation failed: {message}")
reporter.report_warning(table.full_name, message)
return []
except BaseException as e: # TODO: Debug why BaseException is needed here and below.
if isinstance(e, lark.exceptions.UnexpectedCharacters):
message = "Unsupported m-query expression"
else:
message = "Failed to parse m-query expression"
reporter.report_warning(table.full_name, message)
logger.info(f"{message} for table {table.full_name}: {str(e)}")
logger.debug(f"Stack trace for {table.full_name}:", exc_info=e)
return []
try:
return resolver.MQueryResolver(
table=table,
parse_tree=parse_tree,
reporter=reporter,
).resolve_to_data_platform_table_list() # type: ignore
parameters=parameters,
).resolve_to_data_platform_table_list()
except BaseException as e:
if isinstance(e, lark.exceptions.UnexpectedCharacters):
logger.info(f"Unsupported m-query expression for table {table.full_name}")
reporter.report_warning(
table.full_name, "Failed to parse m-query expression"
)
else:
logger.info(f"Failed to parse expression: {str(e)}")
logger.debug(
f"Fail to parse expression for table {table.full_name}: {table.expression}",
exc_info=e,
)
reporter.report_warning(table.full_name, "Failed to process m-query expression")
logger.info(
f"Failed to process m-query expression for table {table.full_name}: {str(e)}"
)
logger.debug(f"Stack trace for {table.full_name}:", exc_info=e)
return []
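
Note the `parameters = parameters or {}` line: the new argument defaults to None rather than to a mutable `{}` default, and is normalized once per call. The same idiom in isolation (resolve is a hypothetical helper):

    from typing import Dict, Optional

    def resolve(name: str, parameters: Optional[Dict[str, str]] = None) -> str:
        # Normalize None to an empty dict at call time; a mutable `= {}`
        # default would be shared between calls.
        parameters = parameters or {}
        return parameters.get(name, name)

    assert resolve("Parameter - Source") == "Parameter - Source"
    assert resolve("Parameter - Source", {"Parameter - Source": "my-project"}) == "my-project"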

View File

@ -67,6 +67,7 @@ class AbstractTableFullNameCreator(ABC):
class AbstractDataAccessMQueryResolver(ABC):
table: Table
parse_tree: Tree
parameters: Dict[str, str]
reporter: PowerBiDashboardSourceReport
data_access_functions: List[str]
@ -75,10 +76,12 @@ class AbstractDataAccessMQueryResolver(ABC):
table: Table,
parse_tree: Tree,
reporter: PowerBiDashboardSourceReport,
parameters: Dict[str, str],
):
self.table = table
self.parse_tree = parse_tree
self.reporter = reporter
self.parameters = parameters
self.data_access_functions = SupportedResolver.get_function_names()
@abstractmethod
@ -87,8 +90,8 @@ class AbstractDataAccessMQueryResolver(ABC):
class MQueryResolver(AbstractDataAccessMQueryResolver, ABC):
@staticmethod
def get_item_selector_tokens(
self,
expression_tree: Tree,
) -> Tuple[Optional[str], Optional[Dict[str, str]]]:
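
get_item_selector_tokens also changes from a @staticmethod to an instance method, since it now has to read self.parameters. A stripped-down illustration of why the receiver is needed (MiniResolver is illustrative, not the real MQueryResolver):

    from typing import Dict, List

    class MiniResolver:
        # The method can no longer be a @staticmethod once it needs
        # per-instance state (self.parameters).
        def __init__(self, parameters: Dict[str, str]) -> None:
            self.parameters = parameters

        def resolve_tokens(self, tokens: List[str]) -> List[str]:
            return [self.parameters.get(t, t) for t in tokens]

    resolver = MiniResolver({"Parameter - Source": "my-test-project"})
    assert resolver.resolve_tokens(["Parameter - Source", "PUBLIC"]) == ["my-test-project", "PUBLIC"]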
@ -111,7 +114,9 @@ class MQueryResolver(AbstractDataAccessMQueryResolver, ABC):
# remove whitespaces and quotes from token
tokens: List[str] = tree_function.strip_char_from_list(
tree_function.remove_whitespaces_from_list(
tree_function.token_values(cast(Tree, item_selector))
tree_function.token_values(
cast(Tree, item_selector), parameters=self.parameters
)
),
'"',
)
@ -287,9 +292,7 @@ class MQueryResolver(AbstractDataAccessMQueryResolver, ABC):
if result is None:
return None # No need to process some un-expected grammar found while processing invoke_expression
if isinstance(result, DataAccessFunctionDetail):
cast(
DataAccessFunctionDetail, result
).identifier_accessor = identifier_accessor
result.identifier_accessor = identifier_accessor
table_links.append(result) # Link of a table is completed
identifier_accessor = (
None # reset the identifier_accessor for other table

View File

@ -1,9 +1,13 @@
import logging
from functools import partial
from typing import Any, List, Optional, Union, cast
from typing import Any, Dict, List, Optional, Union, cast
from lark import Token, Tree
from datahub.ingestion.source.powerbi.m_query.data_classes import (
TRACE_POWERBI_MQUERY_PARSER,
)
logger = logging.getLogger(__name__)
@ -28,13 +32,14 @@ def get_variable_statement(parse_tree: Tree, variable: str) -> Optional[Tree]:
for tree in _filter:
values: List[str] = token_values(tree.children[0])
actual_value: str = "".join(strip_char_from_list(values, " "))
if TRACE_POWERBI_MQUERY_PARSER:
logger.debug(f"Actual Value = {actual_value}")
logger.debug(f"Expected Value = {variable}")
if actual_value.lower() == variable.lower():
return tree
logger.info(f"Provided variable({variable}) not found in variable rule")
logger.debug(f"Provided variable({variable}) not found in variable rule")
return None
@ -65,19 +70,48 @@ def get_first_rule(tree: Tree, rule: str) -> Optional[Tree]:
return expression_tree
def token_values(tree: Tree) -> List[str]:
def token_values(tree: Tree, parameters: Optional[Dict[str, str]] = None) -> List[str]:
"""
:param tree: Tree to traverse
:param parameters: If parameters is not None, it will try to resolve identifier variable references
using the values in 'parameters'.
:return: List of leaf token data
"""
values: List[str] = []
def internal(node: Union[Tree, Token]) -> None:
if isinstance(node, Token):
if (
parameters is not None
and isinstance(node, Tree)
and node.data == "identifier"
and node.children[0].data == "quoted_identifier"
):
# This is the case where they reference a variable using
# the `#"Name of variable"` syntax.
identifier = node.children[0].children[0]
assert isinstance(identifier, Token)
# For quoted_identifier, ref will have quotes around it.
# However, we'll probably need to expand this to all identifier types,
# which are not required to have quotes (using make_function_name).
ref = identifier.value
if ref.startswith('"') and ref[1:-1] in parameters:
resolved = parameters[ref[1:-1]]
values.append(resolved)
elif ref in parameters:
resolved = parameters[ref]
values.append(resolved)
else:
# If we can't resolve, fall back to the name of the variable.
logger.debug(f"Unable to resolve parameter reference to {ref}")
values.append(ref)
elif isinstance(node, Token):
# This means we're probably looking at a literal.
values.append(cast(Token, node).value)
return
else:
for child in node.children:
internal(child)
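
The heart of the change is this resolution step: when traversal hits a quoted identifier of the form #"Name", the name (with or without its surrounding quotes) is looked up in the parameters map, falling back to the variable's own name when no match exists. The same logic extracted into a standalone sketch (resolve_identifier is hypothetical):

    from typing import Dict

    def resolve_identifier(ref: str, parameters: Dict[str, str]) -> str:
        # For quoted identifiers like #"Parameter - Source", ref arrives with
        # its quotes intact, so strip them before the lookup.
        if ref.startswith('"') and ref.endswith('"') and ref[1:-1] in parameters:
            return parameters[ref[1:-1]]
        if ref in parameters:
            return parameters[ref]
        # Unresolvable references fall back to the variable's own name.
        return ref

    params = {"Parameter - Source": "my-test-project"}
    assert resolve_identifier('"Parameter - Source"', params) == "my-test-project"
    assert resolve_identifier("UnknownVar", params) == "UnknownVar"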
@ -120,6 +154,7 @@ def get_all_function_name(tree: Tree) -> List[str]:
_filter: Any = tree.find_data("invoke_expression")
for node in _filter:
if TRACE_POWERBI_MQUERY_PARSER:
logger.debug(f"Tree = {node.pretty}")
primary_expression_node: Optional[Tree] = first_primary_expression_func(node)
if primary_expression_node is None:
@ -153,7 +188,6 @@ first_item_selector_func = partial(get_first_rule, rule="item_selector")
first_arg_list_func = partial(get_first_rule, rule="argument_list")
first_identifier_func = partial(get_first_rule, rule="identifier")
first_primary_expression_func = partial(get_first_rule, rule="primary_expression")
first_identifier_func = partial(get_first_rule, rule="identifier")
first_invoke_expression_func = partial(get_first_rule, rule="invoke_expression")
first_type_expression_func = partial(get_first_rule, rule="type_expression")
first_list_expression_func = partial(get_first_rule, rule="list_expression")
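
These one-liners are all functools.partial bindings over get_first_rule; the commit also drops a duplicated first_identifier_func binding. A minimal sketch of the pattern against lark's real Tree.find_data API:

    from functools import partial
    from lark import Token, Tree

    def get_first_rule(tree: Tree, rule: str):
        # find_data() yields every subtree whose .data matches `rule`; take
        # the first one, or None when the rule does not occur.
        return next(tree.find_data(rule), None)

    first_identifier_func = partial(get_first_rule, rule="identifier")
    t = Tree("start", [Tree("identifier", [Token("NAME", "Source")])])
    assert first_identifier_func(t) is t.children[0]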

View File

@ -1,3 +1,9 @@
// MODIFICATIONS:
// - The let_expression definition has added the whitespace rule instead of the required newline.
// This allows the parser to be less strict about whitespace.
// - Add inline whitespace to item_selection and optional_item_selection.
lexical_unit: lexical_elements?
lexical_elements: lexical_element
@ -361,9 +367,9 @@ field_name: generalized_identifier
item_access_expression: item_selection
| optional_item_selection
item_selection: primary_expression "{" item_selector "}"
item_selection: primary_expression whitespace? "{" item_selector "}"
optional_item_selection: primary_expression "{" item_selector "}" "?"
optional_item_selection: primary_expression whitespace? "{" item_selector "}" "?"
item_selector: expression
@ -426,7 +432,7 @@ each_expression: "each" WS_INLINE each_expression_body
each_expression_body: function_body
let_expression: "let" NEWLINE WS_INLINE? variable_list WS_INLINE? NEWLINE? in_expression
let_expression: "let" whitespace variable_list WS_INLINE? NEWLINE? in_expression
in_expression: "in" NEWLINE? WS_INLINE? NEWLINE? expression
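
Both grammar edits have the same flavor: insert an optional (or more permissive) whitespace rule where the original grammar was stricter than real-world M expressions. A toy lark grammar showing the effect of the optional whitespace slot (not the actual PowerBI lexical grammar):

    from lark import Lark

    grammar = r"""
    item_selection: NAME ws? "{" NAME "}"
    ws: WS_INLINE
    %import common.CNAME -> NAME
    %import common.WS_INLINE
    """

    parser = Lark(grammar, start="item_selection")
    parser.parse("Source{Schema}")   # parsed before the change
    parser.parse("Source {Schema}")  # parses only with the optional ws slot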

View File

@ -150,9 +150,12 @@ class Mapper:
) -> List[MetadataChangeProposalWrapper]:
mcps: List[MetadataChangeProposalWrapper] = []
# table.dataset should always be set, but we check it just in case.
parameters = table.dataset.parameters if table.dataset else None
upstreams: List[UpstreamClass] = []
upstream_tables: List[resolver.DataPlatformTable] = parser.get_upstream_tables(
table, self.__reporter
table, self.__reporter, parameters=parameters
)
for upstream_table in upstream_tables:

View File

@ -55,6 +55,9 @@ class Table:
full_name: str
expression: Optional[str]
# Pointer to the parent dataset.
dataset: Optional["PowerBIDataset"] = None
@dataclass
class PowerBIDataset:
@ -62,6 +65,8 @@ class PowerBIDataset:
name: Optional[str]
webUrl: Optional[str]
workspace_id: str
parameters: Optional[Dict[str, str]]
# Table in datasets
tables: List["Table"]
tags: List[str]
@ -186,6 +191,7 @@ def new_powerbi_dataset(workspace_id: str, raw_instance: dict) -> PowerBIDataset
if raw_instance.get("webUrl") is not None
else None,
workspace_id=workspace_id,
parameters=None,
tables=[],
tags=[],
)
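
The new dataset field gives each Table a back-pointer to its parent PowerBIDataset, which is how the mapper above reaches the dataset's parameters. Trimmed-down dataclasses showing the shape and the None-safe access (names suffixed Sketch to mark them as illustrative):

    from dataclasses import dataclass
    from typing import Dict, Optional

    @dataclass
    class PowerBIDatasetSketch:
        workspace_id: str
        parameters: Optional[Dict[str, str]] = None

    @dataclass
    class TableSketch:
        full_name: str
        # Back-pointer to the parent dataset, as added by this commit.
        dataset: Optional[PowerBIDatasetSketch] = None

    table = TableSketch(full_name="db.schema.table")
    # dataset should always be set, but guard anyway, mirroring the mapper.
    parameters = table.dataset.parameters if table.dataset else None
    assert parameters is None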

View File

@ -2,7 +2,7 @@ import logging
import math
from abc import ABC, abstractmethod
from time import sleep
from typing import Any, List, Optional
from typing import Any, Dict, List, Optional
import msal
import requests
@ -99,6 +99,11 @@ class DataResolverBase(ABC):
) -> Optional[PowerBIDataset]:
pass
def get_dataset_parameters(
self, workspace_id: str, dataset_id: str
) -> Optional[Dict[str, str]]:
return None
@abstractmethod
def get_users(self, workspace_id: str, entity: str, entity_id: str) -> List[User]:
pass
@ -111,7 +116,6 @@ class DataResolverBase(ABC):
def get_access_token(self):
if self.__access_token is not None:
logger.debug("Returning the cached access token")
return self.__access_token
logger.info("Generating PowerBi access token")
@ -394,6 +398,38 @@ class RegularAPIResolver(DataResolverBase):
# None/details
return new_powerbi_dataset(workspace_id, response_dict)
def get_dataset_parameters(
self, workspace_id: str, dataset_id: str
) -> Optional[Dict[str, str]]:
dataset_get_endpoint: str = RegularAPIResolver.API_ENDPOINTS[
Constant.DATASET_GET
]
dataset_get_endpoint = dataset_get_endpoint.format(
POWERBI_BASE_URL=DataResolverBase.BASE_URL,
WORKSPACE_ID=workspace_id,
DATASET_ID=dataset_id,
)
logger.debug(f"Request to dataset URL={dataset_get_endpoint}")
params_get_endpoint = dataset_get_endpoint + "/parameters"
params_response = self._request_session.get(
params_get_endpoint,
headers=self.get_authorization_header(),
)
params_response.raise_for_status()
params_dict = params_response.json()
params_values: Optional[List] = params_dict.get(Constant.VALUE)
if params_values:
logger.debug(f"dataset {dataset_id} parameters = {params_values}")
return {
value[Constant.NAME]: value[Constant.CURRENT_VALUE]
for value in params_values
}
else:
logger.debug(f"dataset {dataset_id} has no parameters")
return {}
def get_groups_endpoint(self) -> str:
return DataResolverBase.BASE_URL
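
The new method appends "/parameters" to the dataset URL and flattens the response's "value" array into a name -> currentValue dict. A standalone sketch of the same call; the route shape follows the public PowerBI "Get Parameters In Group" REST endpoint, and the signature here is simplified relative to the resolver method:

    from typing import Dict

    import requests

    def get_dataset_parameters(workspace_id: str, dataset_id: str, token: str) -> Dict[str, str]:
        url = (
            f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}"
            f"/datasets/{dataset_id}/parameters"
        )
        response = requests.get(url, headers={"Authorization": f"Bearer {token}"})
        response.raise_for_status()
        values = response.json().get("value") or []
        # Each entry carries "name" and "currentValue"; map one to the other.
        return {v["name"]: v["currentValue"] for v in values}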
@ -525,7 +561,7 @@ class AdminAPIResolver(DataResolverBase):
logger.debug(f"Hitting URL={scan_get_endpoint}")
retry = 1
while True:
logger.info(f"retry = {retry}")
logger.debug(f"retry = {retry}")
res = self._request_session.get(
scan_get_endpoint,
headers=self.get_authorization_header(),
@ -546,7 +582,7 @@ class AdminAPIResolver(DataResolverBase):
)
break
logger.info(
logger.debug(
f"Waiting to check for scan job completion for {minimum_sleep_seconds} seconds."
)
sleep(minimum_sleep_seconds)
@ -562,7 +598,7 @@ class AdminAPIResolver(DataResolverBase):
max_retry: int = AdminAPIResolver._calculate_max_retry(
minimum_sleep_seconds, timeout
)
logger.info(f"Max trial {max_retry}")
# logger.info(f"Max trial {max_retry}")
scan_get_endpoint = AdminAPIResolver.API_ENDPOINTS[Constant.SCAN_GET]
scan_get_endpoint = scan_get_endpoint.format(

View File

@ -277,6 +277,18 @@ class PowerBiAPI:
dataset_id=dataset_dict[Constant.ID],
)
# fetch + set dataset parameters
try:
dataset_parameters = self._get_resolver().get_dataset_parameters(
workspace_id=scan_result[Constant.ID],
dataset_id=dataset_dict[Constant.ID],
)
dataset_instance.parameters = dataset_parameters
except Exception as e:
logger.info(
f"Unable to fetch dataset parameters for {dataset_dict[Constant.ID]}: {e}"
)
if self.__config.extract_endorsements_to_tags:
dataset_instance.tags = self._parse_endorsement(
dataset_dict.get(Constant.ENDORSEMENT_DETAIL, None)
@ -305,6 +317,7 @@ class PowerBiAPI:
table[Constant.NAME].replace(" ", "_"),
),
expression=expression,
dataset=dataset_instance,
)
)
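
Parameter fetching is deliberately best-effort: any exception is logged at info level and the dataset is still ingested, just without parameter resolution. The same guard distilled (fetch_parameters_best_effort is a hypothetical wrapper):

    import logging

    logger = logging.getLogger(__name__)

    def fetch_parameters_best_effort(resolver, workspace_id: str, dataset_id: str):
        # A failure here should not abort ingestion of the dataset itself.
        try:
            return resolver.get_dataset_parameters(
                workspace_id=workspace_id, dataset_id=dataset_id
            )
        except Exception as e:
            logger.info(f"Unable to fetch dataset parameters for {dataset_id}: {e}")
            return None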

View File

@ -32,6 +32,7 @@ M_QUERIES = [
'let\n Source = Sql.Database("localhost", "library"),\n dbo_book_issue = Source{[Schema="dbo",Item="book_issue"]}[Data]\n in dbo_book_issue',
'let\n Source = Snowflake.Databases("xaa48144.snowflakecomputing.com","GSL_TEST_WH",[Role="ACCOUNTADMIN"]),\n GSL_TEST_DB_Database = Source{[Name="GSL_TEST_DB",Kind="Database"]}[Data],\n PUBLIC_Schema = GSL_TEST_DB_Database{[Name="PUBLIC",Kind="Schema"]}[Data],\n SALES_FORECAST_Table = PUBLIC_Schema{[Name="SALES_FORECAST",Kind="Table"]}[Data],\n SALES_ANALYST_Table = PUBLIC_Schema{[Name="SALES_ANALYST",Kind="Table"]}[Data],\n RESULT = Table.Combine({SALES_FORECAST_Table, SALES_ANALYST_Table})\n\nin\n RESULT',
'let\n Source = GoogleBigQuery.Database(),\n #"seraphic-music-344307" = Source{[Name="seraphic-music-344307"]}[Data],\n school_dataset_Schema = #"seraphic-music-344307"{[Name="school_dataset",Kind="Schema"]}[Data],\n first_Table = school_dataset_Schema{[Name="first",Kind="Table"]}[Data]\nin\n first_Table',
'let \nSource = GoogleBigQuery.Database([BillingProject = #"Parameter - Source"]),\n#"gcp-project" = Source{[Name=#"Parameter - Source"]}[Data],\ngcp_billing_Schema = #"gcp-project"{[Name=#"My bq project",Kind="Schema"]}[Data],\nF_GCP_COST_Table = gcp_billing_Schema{[Name="GCP_TABLE",Kind="Table"]}[Data]\nin\nF_GCP_COST_Table',
]
@ -297,7 +298,7 @@ def test_snowflake_native_query():
)
def test_google_big_query():
def test_google_bigquery_1():
table: powerbi_data_classes.Table = powerbi_data_classes.Table(
expression=M_QUERIES[17],
name="first",
@ -317,6 +318,36 @@ def test_google_big_query():
)
def test_google_bigquery_2():
# The main purpose of this test is actually to validate that we're handling parameter
# references correctly.
table: powerbi_data_classes.Table = powerbi_data_classes.Table(
expression=M_QUERIES[18],
name="gcp_table",
full_name="my-test-project.gcp_billing.GCP_TABLE",
)
reporter = PowerBiDashboardSourceReport()
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
table,
reporter,
native_query_enabled=False,
parameters={
"Parameter - Source": "my-test-project",
"My bq project": "gcp_billing",
},
)
assert len(data_platform_tables) == 1
assert data_platform_tables[0].name == table.full_name.split(".")[2]
assert data_platform_tables[0].full_name == table.full_name
assert (
data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
== SupportedDataPlatform.GOOGLE_BIGQUERY.value.powerbi_data_platform_name
)
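
test_google_bigquery_2 exercises M_QUERIES[18], where both the BigQuery project and the schema are parameter references (#"Parameter - Source" and #"My bq project"); with the supplied mapping they resolve into the expected three-part name. The substitution, worked by hand:

    parameters = {
        "Parameter - Source": "my-test-project",
        "My bq project": "gcp_billing",
    }
    project = parameters["Parameter - Source"]  # resolves #"Parameter - Source"
    schema = parameters["My bq project"]        # resolves #"My bq project"
    assert f"{project}.{schema}.GCP_TABLE" == "my-test-project.gcp_billing.GCP_TABLE"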
@pytest.mark.integration
def test_native_query_disabled():
table: powerbi_data_classes.Table = powerbi_data_classes.Table(