refactor(ingest/powerbi): organize code within the module based on responsibilities (#11924)

sid-acryl 2024-11-27 23:02:24 +05:30 committed by GitHub
parent 36afa5cfa3
commit 7bf7673735
11 changed files with 1042 additions and 974 deletions

View File

@ -773,7 +773,7 @@ entry_points = {
"trino = datahub.ingestion.source.sql.trino:TrinoSource",
"starburst-trino-usage = datahub.ingestion.source.usage.starburst_trino_usage:TrinoUsageSource",
"nifi = datahub.ingestion.source.nifi:NifiSource",
"powerbi = datahub.ingestion.source.powerbi:PowerBiDashboardSource",
"powerbi = datahub.ingestion.source.powerbi.powerbi:PowerBiDashboardSource",
"powerbi-report-server = datahub.ingestion.source.powerbi_report_server:PowerBiReportServerDashboardSource",
"iceberg = datahub.ingestion.source.iceberg.iceberg:IcebergSource",
"vertica = datahub.ingestion.source.sql.vertica:VerticaSource",

View File

@ -1 +0,0 @@
from datahub.ingestion.source.powerbi.powerbi import PowerBiDashboardSource

View File

@ -173,7 +173,7 @@ class SupportedDataPlatform(Enum):
datahub_data_platform_name="redshift",
)
DATABRICK_SQL = DataPlatformPair(
DATABRICKS_SQL = DataPlatformPair(
powerbi_data_platform_name="Databricks", datahub_data_platform_name="databricks"
)
@ -313,8 +313,8 @@ class PowerBiDashboardSourceConfig(
" Note: This field works in conjunction with 'workspace_type_filter' and both must be considered when filtering workspaces.",
)
# Dataset type mapping PowerBI support many type of data-sources. Here user need to define what type of PowerBI
# DataSource need to be mapped to corresponding DataHub Platform DataSource. For example PowerBI `Snowflake` is
# Dataset type mapping: PowerBI supports many types of data sources. Here the user needs to define which PowerBI
# DataSource type should be mapped to the corresponding DataHub platform DataSource. For example, PowerBI `Snowflake` is
# mapped to DataHub `snowflake`, PowerBI `PostgreSQL` is mapped to DataHub `postgres`, and so on.
dataset_type_mapping: Union[
Dict[str, str], Dict[str, PlatformDetail]
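# Illustrative only (not part of the commit): the simple Dict[str, str] form of
# dataset_type_mapping described in the comment above might look like
#   {"Snowflake": "snowflake", "PostgreSQL": "postgres"}
# with PowerBI data-source names as keys and DataHub platform names as values.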

View File

@ -1,10 +1,14 @@
import os
from abc import ABC
from dataclasses import dataclass
from typing import Any, Dict, Optional
from enum import Enum
from typing import Any, Dict, List, Optional
from lark import Tree
from datahub.ingestion.source.powerbi.config import DataPlatformPair
from datahub.sql_parsing.sqlglot_lineage import ColumnLineageInfo
TRACE_POWERBI_MQUERY_PARSER = os.getenv("DATAHUB_TRACE_POWERBI_MQUERY_PARSER", False)
@ -30,7 +34,7 @@ class IdentifierAccessor(AbstractIdentifierAccessor):
"[Schema="public",Item="order_date"]" is "items" in ItemSelector. Data of items varies as per DataSource
"public_order_date" is in "next" of ItemSelector. The "next" will be None if this identifier is leaf i.e. table
"public_order_date" is in "next" of ItemSelector. The "next" will be None if this identifier is leaf i.e., table
"""
@ -53,3 +57,31 @@ class ReferencedTable:
database: str
schema: str
table: str
@dataclass
class DataPlatformTable:
data_platform_pair: DataPlatformPair
urn: str
@dataclass
class Lineage:
upstreams: List[DataPlatformTable]
column_lineage: List[ColumnLineageInfo]
@staticmethod
def empty() -> "Lineage":
return Lineage(upstreams=[], column_lineage=[])
class FunctionName(Enum):
NATIVE_QUERY = "Value.NativeQuery"
POSTGRESQL_DATA_ACCESS = "PostgreSQL.Database"
ORACLE_DATA_ACCESS = "Oracle.Database"
SNOWFLAKE_DATA_ACCESS = "Snowflake.Databases"
MSSQL_DATA_ACCESS = "Sql.Database"
DATABRICK_DATA_ACCESS = "Databricks.Catalogs"
GOOGLE_BIGQUERY_DATA_ACCESS = "GoogleBigQuery.Database"
AMAZON_REDSHIFT_DATA_ACCESS = "AmazonRedshift.Database"
DATABRICK_MULTI_CLOUD_DATA_ACCESS = "DatabricksMultiCloud.Catalogs"
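# A hedged usage sketch of the data classes above (the import and values below are
# illustrative, not part of this module): callers build a Lineage from DataPlatformTable
# entries, or fall back to Lineage.empty() when nothing could be resolved.
from datahub.ingestion.source.powerbi.config import SupportedDataPlatform

no_upstreams = Lineage.empty()  # upstreams=[], column_lineage=[]
single_upstream = Lineage(
    upstreams=[
        DataPlatformTable(
            data_platform_pair=SupportedDataPlatform.SNOWFLAKE.value,
            urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,db.schema.table,PROD)",
        )
    ],
    column_lineage=[],
)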

View File

@ -7,6 +7,7 @@ from typing import Dict, List
import lark
from lark import Lark, Tree
import datahub.ingestion.source.powerbi.m_query.data_classes
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.powerbi.config import (
PowerBiDashboardSourceConfig,
@ -65,7 +66,7 @@ def get_upstream_tables(
ctx: PipelineContext,
config: PowerBiDashboardSourceConfig,
parameters: Dict[str, str] = {},
) -> List[resolver.Lineage]:
) -> List[datahub.ingestion.source.powerbi.m_query.data_classes.Lineage]:
if table.expression is None:
logger.debug(f"There is no M-Query expression in table {table.full_name}")
return []
@ -127,12 +128,14 @@ def get_upstream_tables(
reporter.m_query_parse_successes += 1
try:
lineage: List[resolver.Lineage] = resolver.MQueryResolver(
lineage: List[
datahub.ingestion.source.powerbi.m_query.data_classes.Lineage
] = resolver.MQueryResolver(
table=table,
parse_tree=parse_tree,
reporter=reporter,
parameters=parameters,
).resolve_to_data_platform_table_list(
).resolve_to_lineage(
ctx=ctx,
config=config,
platform_instance_resolver=platform_instance_resolver,

View File

@ -0,0 +1,920 @@
import logging
from abc import ABC, abstractmethod
from enum import Enum
from typing import Dict, List, Optional, Tuple, Type, Union, cast
from lark import Tree
from datahub.emitter import mce_builder as builder
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.powerbi.config import (
Constant,
DataBricksPlatformDetail,
DataPlatformPair,
PlatformDetail,
PowerBiDashboardSourceConfig,
PowerBiDashboardSourceReport,
PowerBIPlatformDetail,
SupportedDataPlatform,
)
from datahub.ingestion.source.powerbi.dataplatform_instance_resolver import (
AbstractDataPlatformInstanceResolver,
)
from datahub.ingestion.source.powerbi.m_query import native_sql_parser, tree_function
from datahub.ingestion.source.powerbi.m_query.data_classes import (
AbstractIdentifierAccessor,
DataAccessFunctionDetail,
DataPlatformTable,
FunctionName,
IdentifierAccessor,
Lineage,
ReferencedTable,
)
from datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes import Table
from datahub.sql_parsing.sqlglot_lineage import SqlParsingResult
logger = logging.getLogger(__name__)
def get_next_item(items: List[str], item: str) -> Optional[str]:
if item in items:
try:
index = items.index(item)
return items[index + 1]
except IndexError:
logger.debug(f'item: "{item}" is the last element in the list; no next item in {items}')
return None
def urn_to_lowercase(value: str, flag: bool) -> str:
if flag is True:
return value.lower()
return value
def make_urn(
config: PowerBiDashboardSourceConfig,
platform_instance_resolver: AbstractDataPlatformInstanceResolver,
data_platform_pair: DataPlatformPair,
server: str,
qualified_table_name: str,
) -> str:
platform_detail: PlatformDetail = platform_instance_resolver.get_platform_instance(
PowerBIPlatformDetail(
data_platform_pair=data_platform_pair,
data_platform_server=server,
)
)
return builder.make_dataset_urn_with_platform_instance(
platform=data_platform_pair.datahub_data_platform_name,
platform_instance=platform_detail.platform_instance,
env=platform_detail.env,
name=urn_to_lowercase(
qualified_table_name, config.convert_lineage_urns_to_lowercase
),
)
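# A hedged illustration (not part of the module) of what make_urn above produces: given
# SupportedDataPlatform.SNOWFLAKE.value, server "example.snowflakecomputing.com",
# qualified_table_name "LIBRARY.PUBLIC.BOOK_ISSUE", no platform_instance mapping for the
# server, and convert_lineage_urns_to_lowercase=True, the resulting URN would be roughly:
#   urn:li:dataset:(urn:li:dataPlatform:snowflake,library.public.book_issue,PROD)
# A configured platform_instance for the server would be included in the URN by
# make_dataset_urn_with_platform_instance.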
class AbstractLineage(ABC):
"""
Base class that shares common functionality among the different data platforms supported by M-Query parsing.
To create a qualified table name we need to parse the M-Query data-access functions (https://learn.microsoft.com/en-us/powerquery-m/accessing-data-functions);
each data-access function has a defined pattern for accessing the database name, schema name and table name. For example, see the M-Query below.
let
Source = Sql.Database("localhost", "library"),
dbo_book_issue = Source{[Schema="dbo",Item="book_issue"]}[Data]
in
dbo_book_issue
This is MSSQL M-Query, and Sql.Database is the data-access function for MSSQL. When this function is present, the database name is available in the second argument of the first statement, and the schema name and table name are available in the second statement. The second statement can be repeated to access different tables from MSSQL.
TwoStepDataAccessPattern extends AbstractLineage and provides the common functionality for data platforms that follow the M-Query pattern above.
The data-access function varies per data platform (for example, MySQL.Database for MySQL, PostgreSQL.Database for Postgres and Oracle.Database for Oracle), and the number of statements needed to
find the database name, schema name and table name also varies per data platform.
Value.NativeQuery is the function used to execute a native query inside M-Query; for example, see the M-Query below.
let
Source = Value.NativeQuery(AmazonRedshift.Database("redshift-url","dev"), "select * from dev.public.category", null, [EnableFolding=true])
in
Source
In this M-Query the database name is available in the first argument, and the remaining details, i.e. the schema and table, are available in the native query itself.
NativeQueryLineage extends AbstractLineage to support native-query parsing for Redshift, Snowflake and Databricks.
"""
ctx: PipelineContext
table: Table
config: PowerBiDashboardSourceConfig
reporter: PowerBiDashboardSourceReport
platform_instance_resolver: AbstractDataPlatformInstanceResolver
def __init__(
self,
ctx: PipelineContext,
table: Table,
config: PowerBiDashboardSourceConfig,
reporter: PowerBiDashboardSourceReport,
platform_instance_resolver: AbstractDataPlatformInstanceResolver,
) -> None:
super().__init__()
self.ctx = ctx
self.table = table
self.config = config
self.reporter = reporter
self.platform_instance_resolver = platform_instance_resolver
@abstractmethod
def create_lineage(
self, data_access_func_detail: DataAccessFunctionDetail
) -> Lineage:
pass
@abstractmethod
def get_platform_pair(self) -> DataPlatformPair:
pass
@staticmethod
def get_db_detail_from_argument(
arg_list: Tree,
) -> Tuple[Optional[str], Optional[str]]:
arguments: List[str] = tree_function.strip_char_from_list(
values=tree_function.remove_whitespaces_from_list(
tree_function.token_values(arg_list)
),
)
if len(arguments) < 2:
logger.debug(f"Expected minimum 2 arguments, but got {len(arguments)}")
return None, None
return arguments[0], arguments[1]
@staticmethod
def create_reference_table(
arg_list: Tree,
table_detail: Dict[str, str],
) -> Optional[ReferencedTable]:
arguments: List[str] = tree_function.strip_char_from_list(
values=tree_function.remove_whitespaces_from_list(
tree_function.token_values(arg_list)
),
)
logger.debug(f"Processing arguments {arguments}")
if (
len(arguments)
>= 4 # [0] is warehouse FQDN.
# [1] is endpoint, we are not using it.
# [2] is "Catalog" key
# [3] is catalog's value
):
return ReferencedTable(
warehouse=arguments[0],
catalog=arguments[3],
# As per observation, the database and catalog names are the same in M-Query
database=table_detail["Database"]
if table_detail.get("Database")
else arguments[3],
schema=table_detail["Schema"],
table=table_detail.get("Table") or table_detail["View"],
)
elif len(arguments) == 2:
return ReferencedTable(
warehouse=arguments[0],
database=table_detail["Database"],
schema=table_detail["Schema"],
table=table_detail.get("Table") or table_detail["View"],
catalog=None,
)
return None
def parse_custom_sql(
self, query: str, server: str, database: Optional[str], schema: Optional[str]
) -> Lineage:
dataplatform_tables: List[DataPlatformTable] = []
platform_detail: PlatformDetail = (
self.platform_instance_resolver.get_platform_instance(
PowerBIPlatformDetail(
data_platform_pair=self.get_platform_pair(),
data_platform_server=server,
)
)
)
query = native_sql_parser.remove_drop_statement(
native_sql_parser.remove_special_characters(query)
)
parsed_result: Optional[
"SqlParsingResult"
] = native_sql_parser.parse_custom_sql(
ctx=self.ctx,
query=query,
platform=self.get_platform_pair().datahub_data_platform_name,
platform_instance=platform_detail.platform_instance,
env=platform_detail.env,
database=database,
schema=schema,
)
if parsed_result is None:
self.reporter.info(
title=Constant.SQL_PARSING_FAILURE,
message="Fail to parse native sql present in PowerBI M-Query",
context=f"table-name={self.table.full_name}, sql={query}",
)
return Lineage.empty()
if parsed_result.debug_info and parsed_result.debug_info.table_error:
self.reporter.warning(
title=Constant.SQL_PARSING_FAILURE,
message="Fail to parse native sql present in PowerBI M-Query",
context=f"table-name={self.table.full_name}, error={parsed_result.debug_info.table_error},sql={query}",
)
return Lineage.empty()
for urn in parsed_result.in_tables:
dataplatform_tables.append(
DataPlatformTable(
data_platform_pair=self.get_platform_pair(),
urn=urn,
)
)
logger.debug(f"Native Query parsed result={parsed_result}")
logger.debug(f"Generated dataplatform_tables={dataplatform_tables}")
return Lineage(
upstreams=dataplatform_tables,
column_lineage=(
parsed_result.column_lineage
if parsed_result.column_lineage is not None
else []
),
)
class AmazonRedshiftLineage(AbstractLineage):
def get_platform_pair(self) -> DataPlatformPair:
return SupportedDataPlatform.AMAZON_REDSHIFT.value
def create_lineage(
self, data_access_func_detail: DataAccessFunctionDetail
) -> Lineage:
logger.debug(
f"Processing AmazonRedshift data-access function detail {data_access_func_detail}"
)
server, db_name = self.get_db_detail_from_argument(
data_access_func_detail.arg_list
)
if db_name is None or server is None:
return Lineage.empty()  # Return empty lineage
schema_name: str = cast(
IdentifierAccessor, data_access_func_detail.identifier_accessor
).items["Name"]
table_name: str = cast(
IdentifierAccessor,
cast(IdentifierAccessor, data_access_func_detail.identifier_accessor).next,
).items["Name"]
qualified_table_name: str = f"{db_name}.{schema_name}.{table_name}"
urn = make_urn(
config=self.config,
platform_instance_resolver=self.platform_instance_resolver,
data_platform_pair=self.get_platform_pair(),
server=server,
qualified_table_name=qualified_table_name,
)
return Lineage(
upstreams=[
DataPlatformTable(
data_platform_pair=self.get_platform_pair(),
urn=urn,
)
],
column_lineage=[],
)
class OracleLineage(AbstractLineage):
def get_platform_pair(self) -> DataPlatformPair:
return SupportedDataPlatform.ORACLE.value
@staticmethod
def _get_server_and_db_name(value: str) -> Tuple[Optional[str], Optional[str]]:
error_message: str = (
f"The target argument ({value}) should in the format of <host-name>:<port>/<db-name>["
".<domain>]"
)
splitter_result: List[str] = value.split("/")
if len(splitter_result) != 2:
logger.debug(error_message)
return None, None
db_name = splitter_result[1].split(".")[0]
return tree_function.strip_char_from_list([splitter_result[0]])[0], db_name
def create_lineage(
self, data_access_func_detail: DataAccessFunctionDetail
) -> Lineage:
logger.debug(
f"Processing Oracle data-access function detail {data_access_func_detail}"
)
arguments: List[str] = tree_function.remove_whitespaces_from_list(
tree_function.token_values(data_access_func_detail.arg_list)
)
server, db_name = self._get_server_and_db_name(arguments[0])
if db_name is None or server is None:
return Lineage.empty()
schema_name: str = cast(
IdentifierAccessor, data_access_func_detail.identifier_accessor
).items["Schema"]
table_name: str = cast(
IdentifierAccessor,
cast(IdentifierAccessor, data_access_func_detail.identifier_accessor).next,
).items["Name"]
qualified_table_name: str = f"{db_name}.{schema_name}.{table_name}"
urn = make_urn(
config=self.config,
platform_instance_resolver=self.platform_instance_resolver,
data_platform_pair=self.get_platform_pair(),
server=server,
qualified_table_name=qualified_table_name,
)
return Lineage(
upstreams=[
DataPlatformTable(
data_platform_pair=self.get_platform_pair(),
urn=urn,
)
],
column_lineage=[],
)
class DatabricksLineage(AbstractLineage):
def form_qualified_table_name(
self,
table_reference: ReferencedTable,
data_platform_pair: DataPlatformPair,
) -> str:
platform_detail: PlatformDetail = (
self.platform_instance_resolver.get_platform_instance(
PowerBIPlatformDetail(
data_platform_pair=data_platform_pair,
data_platform_server=table_reference.warehouse,
)
)
)
metastore: Optional[str] = None
qualified_table_name: str = f"{table_reference.database}.{table_reference.schema}.{table_reference.table}"
if isinstance(platform_detail, DataBricksPlatformDetail):
metastore = platform_detail.metastore
if metastore is not None:
return f"{metastore}.{qualified_table_name}"
return qualified_table_name
def create_lineage(
self, data_access_func_detail: DataAccessFunctionDetail
) -> Lineage:
logger.debug(
f"Processing Databrick data-access function detail {data_access_func_detail}"
)
table_detail: Dict[str, str] = {}
temp_accessor: Optional[
Union[IdentifierAccessor, AbstractIdentifierAccessor]
] = data_access_func_detail.identifier_accessor
while temp_accessor:
if isinstance(temp_accessor, IdentifierAccessor):
# Condition to handle databricks M-query pattern where table, schema and database all are present in
# the same invoke statement
if all(
element in temp_accessor.items
for element in ["Item", "Schema", "Catalog"]
):
table_detail["Schema"] = temp_accessor.items["Schema"]
table_detail["Table"] = temp_accessor.items["Item"]
else:
table_detail[temp_accessor.items["Kind"]] = temp_accessor.items[
"Name"
]
if temp_accessor.next is not None:
temp_accessor = temp_accessor.next
else:
break
else:
logger.debug(
"expecting instance to be IdentifierAccessor, please check if parsing is done properly"
)
return Lineage.empty()
table_reference = self.create_reference_table(
arg_list=data_access_func_detail.arg_list,
table_detail=table_detail,
)
if table_reference:
qualified_table_name: str = self.form_qualified_table_name(
table_reference=table_reference,
data_platform_pair=self.get_platform_pair(),
)
urn = make_urn(
config=self.config,
platform_instance_resolver=self.platform_instance_resolver,
data_platform_pair=self.get_platform_pair(),
server=table_reference.warehouse,
qualified_table_name=qualified_table_name,
)
return Lineage(
upstreams=[
DataPlatformTable(
data_platform_pair=self.get_platform_pair(),
urn=urn,
)
],
column_lineage=[],
)
return Lineage.empty()
def get_platform_pair(self) -> DataPlatformPair:
return SupportedDataPlatform.DATABRICKS_SQL.value
class TwoStepDataAccessPattern(AbstractLineage, ABC):
"""
These are the DataSources for which PowerBI Desktop generates a default M-Query of the following pattern:
let
Source = Sql.Database("localhost", "library"),
dbo_book_issue = Source{[Schema="dbo",Item="book_issue"]}[Data]
in
dbo_book_issue
"""
def two_level_access_pattern(
self, data_access_func_detail: DataAccessFunctionDetail
) -> Lineage:
logger.debug(
f"Processing {self.get_platform_pair().powerbi_data_platform_name} data-access function detail {data_access_func_detail}"
)
server, db_name = self.get_db_detail_from_argument(
data_access_func_detail.arg_list
)
if server is None or db_name is None:
return Lineage.empty()  # Return empty lineage
schema_name: str = cast(
IdentifierAccessor, data_access_func_detail.identifier_accessor
).items["Schema"]
table_name: str = cast(
IdentifierAccessor, data_access_func_detail.identifier_accessor
).items["Item"]
qualified_table_name: str = f"{db_name}.{schema_name}.{table_name}"
logger.debug(
f"Platform({self.get_platform_pair().datahub_data_platform_name}) qualified_table_name= {qualified_table_name}"
)
urn = make_urn(
config=self.config,
platform_instance_resolver=self.platform_instance_resolver,
data_platform_pair=self.get_platform_pair(),
server=server,
qualified_table_name=qualified_table_name,
)
return Lineage(
upstreams=[
DataPlatformTable(
data_platform_pair=self.get_platform_pair(),
urn=urn,
)
],
column_lineage=[],
)
class PostgresLineage(TwoStepDataAccessPattern):
def create_lineage(
self, data_access_func_detail: DataAccessFunctionDetail
) -> Lineage:
return self.two_level_access_pattern(data_access_func_detail)
def get_platform_pair(self) -> DataPlatformPair:
return SupportedDataPlatform.POSTGRES_SQL.value
class MSSqlLineage(TwoStepDataAccessPattern):
# https://learn.microsoft.com/en-us/sql/relational-databases/security/authentication-access/ownership-and-user-schema-separation?view=sql-server-ver16
DEFAULT_SCHEMA = "dbo" # Default schema name in MS-SQL is dbo
def get_platform_pair(self) -> DataPlatformPair:
return SupportedDataPlatform.MS_SQL.value
def create_urn_using_old_parser(
self, query: str, db_name: str, server: str
) -> List[DataPlatformTable]:
dataplatform_tables: List[DataPlatformTable] = []
tables: List[str] = native_sql_parser.get_tables(query)
for parsed_table in tables:
# components: List[str] = [v.strip("[]") for v in parsed_table.split(".")]
components = [v.strip("[]") for v in parsed_table.split(".")]
if len(components) == 3:
database, schema, table = components
elif len(components) == 2:
schema, table = components
database = db_name
elif len(components) == 1:
(table,) = components
database = db_name
schema = MSSqlLineage.DEFAULT_SCHEMA
else:
self.reporter.warning(
title="Invalid table format",
message="The advanced SQL lineage feature (enable_advance_lineage_sql_construct) is disabled. Please either enable this feature or ensure the table is referenced as <db-name>.<schema-name>.<table-name> in the SQL.",
context=f"table-name={self.table.full_name}",
)
continue
qualified_table_name = f"{database}.{schema}.{table}"
urn = make_urn(
config=self.config,
platform_instance_resolver=self.platform_instance_resolver,
data_platform_pair=self.get_platform_pair(),
server=server,
qualified_table_name=qualified_table_name,
)
dataplatform_tables.append(
DataPlatformTable(
data_platform_pair=self.get_platform_pair(),
urn=urn,
)
)
logger.debug(f"Generated upstream tables = {dataplatform_tables}")
return dataplatform_tables
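# A hedged worked example of the component back-filling above (inputs are illustrative):
#   "library.dbo.book_issue" -> database="library", schema="dbo", table="book_issue"
#   "dbo.book_issue"         -> database=db_name,   schema="dbo", table="book_issue"
#   "book_issue"             -> database=db_name,   schema=DEFAULT_SCHEMA ("dbo"), table="book_issue"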
def create_lineage(
self, data_access_func_detail: DataAccessFunctionDetail
) -> Lineage:
arguments: List[str] = tree_function.strip_char_from_list(
values=tree_function.remove_whitespaces_from_list(
tree_function.token_values(data_access_func_detail.arg_list)
),
)
server, database = self.get_db_detail_from_argument(
data_access_func_detail.arg_list
)
if server is None or database is None:
return Lineage.empty()  # Return empty lineage
assert server
assert database  # to silence the linter
query: Optional[str] = get_next_item(arguments, "Query")
if query:
if self.config.enable_advance_lineage_sql_construct is False:
# Use previous parser to generate URN to keep backward compatibility
return Lineage(
upstreams=self.create_urn_using_old_parser(
query=query,
db_name=database,
server=server,
),
column_lineage=[],
)
return self.parse_custom_sql(
query=query,
database=database,
server=server,
schema=MSSqlLineage.DEFAULT_SCHEMA,
)
# It is a regular case of MS-SQL
logger.debug("Handling with regular case")
return self.two_level_access_pattern(data_access_func_detail)
class ThreeStepDataAccessPattern(AbstractLineage, ABC):
def get_datasource_server(
self, arguments: List[str], data_access_func_detail: DataAccessFunctionDetail
) -> str:
return tree_function.strip_char_from_list([arguments[0]])[0]
def create_lineage(
self, data_access_func_detail: DataAccessFunctionDetail
) -> Lineage:
logger.debug(
f"Processing {self.get_platform_pair().datahub_data_platform_name} function detail {data_access_func_detail}"
)
arguments: List[str] = tree_function.remove_whitespaces_from_list(
tree_function.token_values(data_access_func_detail.arg_list)
)
# First is database name
db_name: str = data_access_func_detail.identifier_accessor.items["Name"] # type: ignore
# Second is schema name
schema_name: str = cast(
IdentifierAccessor, data_access_func_detail.identifier_accessor.next # type: ignore
).items["Name"]
# Third is table name
table_name: str = cast(
IdentifierAccessor, data_access_func_detail.identifier_accessor.next.next # type: ignore
).items["Name"]
qualified_table_name: str = f"{db_name}.{schema_name}.{table_name}"
logger.debug(
f"{self.get_platform_pair().datahub_data_platform_name} qualified_table_name {qualified_table_name}"
)
server: str = self.get_datasource_server(arguments, data_access_func_detail)
urn = make_urn(
config=self.config,
platform_instance_resolver=self.platform_instance_resolver,
data_platform_pair=self.get_platform_pair(),
server=server,
qualified_table_name=qualified_table_name,
)
return Lineage(
upstreams=[
DataPlatformTable(
data_platform_pair=self.get_platform_pair(),
urn=urn,
)
],
column_lineage=[],
)
class SnowflakeLineage(ThreeStepDataAccessPattern):
def get_platform_pair(self) -> DataPlatformPair:
return SupportedDataPlatform.SNOWFLAKE.value
class GoogleBigQueryLineage(ThreeStepDataAccessPattern):
def get_platform_pair(self) -> DataPlatformPair:
return SupportedDataPlatform.GOOGLE_BIGQUERY.value
def get_datasource_server(
self, arguments: List[str], data_access_func_detail: DataAccessFunctionDetail
) -> str:
# In Google BigQuery the server is the project name
# Conditional expression to silence the linter; it is not going to be None
return (
data_access_func_detail.identifier_accessor.items["Name"]
if data_access_func_detail.identifier_accessor is not None
else ""
)
class NativeQueryLineage(AbstractLineage):
SUPPORTED_NATIVE_QUERY_DATA_PLATFORM: dict = {
SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name: SupportedDataPlatform.SNOWFLAKE,
SupportedDataPlatform.AMAZON_REDSHIFT.value.powerbi_data_platform_name: SupportedDataPlatform.AMAZON_REDSHIFT,
SupportedDataPlatform.DatabricksMultiCloud_SQL.value.powerbi_data_platform_name: SupportedDataPlatform.DatabricksMultiCloud_SQL,
}
current_data_platform: SupportedDataPlatform = SupportedDataPlatform.SNOWFLAKE
def get_platform_pair(self) -> DataPlatformPair:
return self.current_data_platform.value
@staticmethod
def is_native_parsing_supported(data_access_function_name: str) -> bool:
return (
data_access_function_name
in NativeQueryLineage.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM
)
def create_urn_using_old_parser(self, query: str, server: str) -> Lineage:
dataplatform_tables: List[DataPlatformTable] = []
tables: List[str] = native_sql_parser.get_tables(query)
for qualified_table_name in tables:
if len(qualified_table_name.split(".")) != 3:
logger.debug(
f"Skipping table {qualified_table_name} as it is not as per qualified_table_name format"
)
continue
urn = make_urn(
config=self.config,
platform_instance_resolver=self.platform_instance_resolver,
data_platform_pair=self.get_platform_pair(),
server=server,
qualified_table_name=qualified_table_name,
)
dataplatform_tables.append(
DataPlatformTable(
data_platform_pair=self.get_platform_pair(),
urn=urn,
)
)
logger.debug(f"Generated dataplatform_tables {dataplatform_tables}")
return Lineage(
upstreams=dataplatform_tables,
column_lineage=[],
)
def get_db_name(self, data_access_tokens: List[str]) -> Optional[str]:
if (
data_access_tokens[0]
!= SupportedDataPlatform.DatabricksMultiCloud_SQL.value.powerbi_data_platform_name
):
return None
database: Optional[str] = get_next_item(data_access_tokens, "Database")
if (
database and database != Constant.M_QUERY_NULL
): # database name is explicitly set
return database
return get_next_item( # database name is set in Name argument
data_access_tokens, "Name"
) or get_next_item( # If both above arguments are not available, then try Catalog
data_access_tokens, "Catalog"
)
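# A hedged illustration of the fallback order implemented above (token lists are made up):
#   [..., "Database", "analytics", ...]                                    -> "analytics"
#   [..., "Database", <M_QUERY_NULL>, ..., "Name", "analytics", ...]       -> "analytics"
#   [..., "Catalog", "analytics", ...] (no usable Database or Name tokens) -> "analytics"
# In each case the first token must be the DatabricksMultiCloud data-platform name,
# otherwise get_db_name returns None.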
def create_lineage(
self, data_access_func_detail: DataAccessFunctionDetail
) -> Lineage:
t1: Tree = cast(
Tree, tree_function.first_arg_list_func(data_access_func_detail.arg_list)
)
flat_argument_list: List[Tree] = tree_function.flat_argument_list(t1)
if len(flat_argument_list) != 2:
logger.debug(
f"Expecting 2 argument, actual argument count is {len(flat_argument_list)}"
)
logger.debug(f"Flat argument list = {flat_argument_list}")
return Lineage.empty()
data_access_tokens: List[str] = tree_function.remove_whitespaces_from_list(
tree_function.token_values(flat_argument_list[0])
)
if not self.is_native_parsing_supported(data_access_tokens[0]):
logger.debug(
f"Unsupported native-query data-platform = {data_access_tokens[0]}"
)
logger.debug(
f"NativeQuery is supported only for {self.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM}"
)
return Lineage.empty()
if len(data_access_tokens) < 3:
logger.debug(
f"Server is not available in argument list for data-platform {data_access_tokens[0]}. Returning empty "
"list"
)
return Lineage.empty()
self.current_data_platform = self.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM[
data_access_tokens[0]
]
# The second argument of Value.NativeQuery is the native query
sql_query: str = tree_function.strip_char_from_list(
values=tree_function.remove_whitespaces_from_list(
tree_function.token_values(flat_argument_list[1])
),
)[
0
]  # Remove any whitespace and double-quote characters
server = tree_function.strip_char_from_list([data_access_tokens[2]])[0]
if self.config.enable_advance_lineage_sql_construct is False:
# Use previous parser to generate URN to keep backward compatibility
return self.create_urn_using_old_parser(
query=sql_query,
server=server,
)
database_name: Optional[str] = self.get_db_name(data_access_tokens)
return self.parse_custom_sql(
query=sql_query,
server=server,
database=database_name,
schema=None,
)
class SupportedPattern(Enum):
DATABRICKS_QUERY = (
DatabricksLineage,
FunctionName.DATABRICK_DATA_ACCESS,
)
DATABRICKS_MULTI_CLOUD = (
DatabricksLineage,
FunctionName.DATABRICK_MULTI_CLOUD_DATA_ACCESS,
)
POSTGRES_SQL = (
PostgresLineage,
FunctionName.POSTGRESQL_DATA_ACCESS,
)
ORACLE = (
OracleLineage,
FunctionName.ORACLE_DATA_ACCESS,
)
SNOWFLAKE = (
SnowflakeLineage,
FunctionName.SNOWFLAKE_DATA_ACCESS,
)
MS_SQL = (
MSSqlLineage,
FunctionName.MSSQL_DATA_ACCESS,
)
GOOGLE_BIG_QUERY = (
GoogleBigQueryLineage,
FunctionName.GOOGLE_BIGQUERY_DATA_ACCESS,
)
AMAZON_REDSHIFT = (
AmazonRedshiftLineage,
FunctionName.AMAZON_REDSHIFT_DATA_ACCESS,
)
NATIVE_QUERY = (
NativeQueryLineage,
FunctionName.NATIVE_QUERY,
)
def handler(self) -> Type[AbstractLineage]:
return self.value[0]
def function_name(self) -> str:
return self.value[1].value
@staticmethod
def get_function_names() -> List[str]:
functions: List[str] = []
for supported_resolver in SupportedPattern:
functions.append(supported_resolver.function_name())
return functions
@staticmethod
def get_pattern_handler(function_name: str) -> Optional["SupportedPattern"]:
logger.debug(f"Looking for pattern-handler for {function_name}")
for supported_resolver in SupportedPattern:
if function_name == supported_resolver.function_name():
return supported_resolver
logger.debug(f"pattern-handler not found for function_name {function_name}")
return None
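# A hedged usage sketch (the helper below is illustrative, not part of this module): callers
# such as the MQueryResolver look up the handler class for a data-access function name and
# instantiate it with the shared pipeline context before asking it for lineage.
def _example_build_handler(
    function_name: str,
    ctx: PipelineContext,
    table: Table,
    config: PowerBiDashboardSourceConfig,
    reporter: PowerBiDashboardSourceReport,
    platform_instance_resolver: AbstractDataPlatformInstanceResolver,
) -> Optional[AbstractLineage]:
    pattern = SupportedPattern.get_pattern_handler(function_name)
    if pattern is None:
        return None  # Unsupported data-access function
    return pattern.handler()(
        ctx=ctx,
        table=table,
        config=config,
        reporter=reporter,
        platform_instance_resolver=platform_instance_resolver,
    )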

View File

@ -1,7 +1,7 @@
import logging
from typing import Optional, Tuple
from datahub.ingestion.source.powerbi.m_query import resolver
import datahub.ingestion.source.powerbi.m_query.data_classes
logger = logging.getLogger(__name__)
@ -14,12 +14,18 @@ def validate_parse_tree(
:param native_query_enabled: Whether the user wants to extract lineage from native queries
:return: True or False.
"""
function_names = [fun.value for fun in resolver.FunctionName]
function_names = [
fun.value
for fun in datahub.ingestion.source.powerbi.m_query.data_classes.FunctionName
]
if not any(fun in expression for fun in function_names):
return False, "DataAccess function is not present in M-Query expression."
if native_query_enabled is False:
if resolver.FunctionName.NATIVE_QUERY.value in function_names:
if (
datahub.ingestion.source.powerbi.m_query.data_classes.FunctionName.NATIVE_QUERY.value
in function_names
):
return (
False,
"Lineage extraction from native query is disabled. Enable native_query_parsing in recipe",

View File

@ -10,6 +10,7 @@ from typing import Iterable, List, Optional, Tuple, Union
import more_itertools
import datahub.emitter.mce_builder as builder
import datahub.ingestion.source.powerbi.m_query.data_classes
import datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes as powerbi_data_classes
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import ContainerKey, gen_containers
@ -42,12 +43,13 @@ from datahub.ingestion.source.powerbi.config import (
Constant,
PowerBiDashboardSourceConfig,
PowerBiDashboardSourceReport,
SupportedDataPlatform,
)
from datahub.ingestion.source.powerbi.dataplatform_instance_resolver import (
AbstractDataPlatformInstanceResolver,
create_dataplatform_instance_resolver,
)
from datahub.ingestion.source.powerbi.m_query import parser, resolver
from datahub.ingestion.source.powerbi.m_query import parser
from datahub.ingestion.source.powerbi.rest_api_wrapper.powerbi_api import PowerBiAPI
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
@ -182,7 +184,9 @@ class Mapper:
return [schema_mcp]
def make_fine_grained_lineage_class(
self, lineage: resolver.Lineage, dataset_urn: str
self,
lineage: datahub.ingestion.source.powerbi.m_query.data_classes.Lineage,
dataset_urn: str,
) -> List[FineGrainedLineage]:
fine_grained_lineages: List[FineGrainedLineage] = []
@ -234,7 +238,9 @@ class Mapper:
upstream: List[UpstreamClass] = []
cll_lineage: List[FineGrainedLineage] = []
upstream_lineage: List[resolver.Lineage] = parser.get_upstream_tables(
upstream_lineage: List[
datahub.ingestion.source.powerbi.m_query.data_classes.Lineage
] = parser.get_upstream_tables(
table=table,
reporter=self.__reporter,
platform_instance_resolver=self.__dataplatform_instance_resolver,
@ -1294,7 +1300,7 @@ class PowerBiDashboardSource(StatefulIngestionSourceBase, TestableSource):
def validate_dataset_type_mapping(self):
powerbi_data_platforms: List[str] = [
data_platform.value.powerbi_data_platform_name
for data_platform in resolver.SupportedDataPlatform
for data_platform in SupportedDataPlatform
]
for key in self.source_config.dataset_type_mapping.keys():
@ -1481,7 +1487,7 @@ class PowerBiDashboardSource(StatefulIngestionSourceBase, TestableSource):
def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
# Since modified_workspaces is not idempotent, workunit processors are run later for each workspace_id
# This will result in creating checkpoint for each workspace_id
# This will result in creating a checkpoint for each workspace_id
if self.source_config.modified_since:
return [] # Handle these in get_workunits_internal
else:
@ -1492,7 +1498,7 @@ class PowerBiDashboardSource(StatefulIngestionSourceBase, TestableSource):
def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
"""
Datahub Ingestion framework invoke this method
Datahub Ingestion framework invokes this method
"""
logger.info("PowerBi plugin execution is started")
# Validate dataset type mapping

View File

@ -7,6 +7,7 @@ from unittest.mock import MagicMock, patch
import pytest
from lark import Tree
import datahub.ingestion.source.powerbi.m_query.data_classes
import datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes as powerbi_data_classes
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import StructuredLogLevel
@ -18,8 +19,11 @@ from datahub.ingestion.source.powerbi.dataplatform_instance_resolver import (
AbstractDataPlatformInstanceResolver,
create_dataplatform_instance_resolver,
)
from datahub.ingestion.source.powerbi.m_query import parser, resolver, tree_function
from datahub.ingestion.source.powerbi.m_query.resolver import DataPlatformTable, Lineage
from datahub.ingestion.source.powerbi.m_query import parser, tree_function
from datahub.ingestion.source.powerbi.m_query.data_classes import (
DataPlatformTable,
Lineage,
)
pytestmark = pytest.mark.integration_batch_2
@ -62,7 +66,9 @@ M_QUERIES = [
]
def get_data_platform_tables_with_dummy_table(q: str) -> List[resolver.Lineage]:
def get_data_platform_tables_with_dummy_table(
q: str,
) -> List[datahub.ingestion.source.powerbi.m_query.data_classes.Lineage]:
table: powerbi_data_classes.Table = powerbi_data_classes.Table(
columns=[],
measures=[],
@ -759,7 +765,9 @@ def test_sqlglot_parser():
}
)
lineage: List[resolver.Lineage] = parser.get_upstream_tables(
lineage: List[
datahub.ingestion.source.powerbi.m_query.data_classes.Lineage
] = parser.get_upstream_tables(
table,
reporter,
ctx=ctx,
@ -806,7 +814,9 @@ def test_sqlglot_parser():
def test_databricks_multi_cloud():
q = M_QUERIES[25]
lineage: List[resolver.Lineage] = get_data_platform_tables_with_dummy_table(q=q)
lineage: List[
datahub.ingestion.source.powerbi.m_query.data_classes.Lineage
] = get_data_platform_tables_with_dummy_table(q=q)
assert len(lineage) == 1
@ -823,7 +833,9 @@ def test_databricks_multi_cloud():
def test_databricks_catalog_pattern_1():
q = M_QUERIES[26]
lineage: List[resolver.Lineage] = get_data_platform_tables_with_dummy_table(q=q)
lineage: List[
datahub.ingestion.source.powerbi.m_query.data_classes.Lineage
] = get_data_platform_tables_with_dummy_table(q=q)
assert len(lineage) == 1
@ -892,7 +904,9 @@ def test_sqlglot_parser_2():
}
)
lineage: List[resolver.Lineage] = parser.get_upstream_tables(
lineage: List[
datahub.ingestion.source.powerbi.m_query.data_classes.Lineage
] = parser.get_upstream_tables(
table,
reporter,
ctx=ctx,
@ -951,7 +965,9 @@ def test_databricks_regular_case_with_view():
def test_snowflake_double_double_quotes():
q = M_QUERIES[30]
lineage: List[resolver.Lineage] = get_data_platform_tables_with_dummy_table(q=q)
lineage: List[
datahub.ingestion.source.powerbi.m_query.data_classes.Lineage
] = get_data_platform_tables_with_dummy_table(q=q)
assert len(lineage) == 1
@ -968,7 +984,9 @@ def test_snowflake_double_double_quotes():
def test_databricks_multicloud():
q = M_QUERIES[31]
lineage: List[resolver.Lineage] = get_data_platform_tables_with_dummy_table(q=q)
lineage: List[
datahub.ingestion.source.powerbi.m_query.data_classes.Lineage
] = get_data_platform_tables_with_dummy_table(q=q)
assert len(lineage) == 1
@ -985,7 +1003,9 @@ def test_databricks_multicloud():
def test_snowflake_multi_function_call():
q = M_QUERIES[32]
lineage: List[resolver.Lineage] = get_data_platform_tables_with_dummy_table(q=q)
lineage: List[
datahub.ingestion.source.powerbi.m_query.data_classes.Lineage
] = get_data_platform_tables_with_dummy_table(q=q)
assert len(lineage) == 1
@ -1002,7 +1022,9 @@ def test_snowflake_multi_function_call():
def test_mssql_drop_with_select():
q = M_QUERIES[33]
lineage: List[resolver.Lineage] = get_data_platform_tables_with_dummy_table(q=q)
lineage: List[
datahub.ingestion.source.powerbi.m_query.data_classes.Lineage
] = get_data_platform_tables_with_dummy_table(q=q)
assert len(lineage) == 1
@ -1062,7 +1084,9 @@ def test_empty_string_in_m_query():
# TRIM(TRIM(TRIM(AGENT_NAME, '\"\"'), '+'), '\\'') is in Query
q = "let\n Source = Value.NativeQuery(Snowflake.Databases(\"bu10758.ap-unknown-2.fakecomputing.com\",\"operations_analytics_warehouse_prod\",[Role=\"OPERATIONS_ANALYTICS_MEMBER\"]){[Name=\"OPERATIONS_ANALYTICS\"]}[Data], \"select #(lf)UPPER(REPLACE(AGENT_NAME,'-','')) AS CLIENT_DIRECTOR,#(lf)TRIM(TRIM(TRIM(AGENT_NAME, '\"\"'), '+'), '\\'') AS TRIM_AGENT_NAME,#(lf)TIER,#(lf)UPPER(MANAGER),#(lf)TEAM_TYPE,#(lf)DATE_TARGET,#(lf)MONTHID,#(lf)TARGET_TEAM,#(lf)SELLER_EMAIL,#(lf)concat((UPPER(REPLACE(AGENT_NAME,'-',''))), MONTHID) as AGENT_KEY,#(lf)UNIT_TARGET AS SME_Quota,#(lf)AMV_TARGET AS Revenue_Quota,#(lf)SERVICE_QUOTA,#(lf)BL_TARGET,#(lf)SOFTWARE_QUOTA as Software_Quota#(lf)#(lf)from OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT_TARGETS inner join OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT #(lf)#(lf)where YEAR_TARGET >= 2022#(lf)and TEAM_TYPE = 'Accounting'#(lf)and TARGET_TEAM = 'Enterprise'#(lf)AND TIER = 'Client Director'\", null, [EnableFolding=true])\nin\n Source"
lineage: List[resolver.Lineage] = get_data_platform_tables_with_dummy_table(q=q)
lineage: List[
datahub.ingestion.source.powerbi.m_query.data_classes.Lineage
] = get_data_platform_tables_with_dummy_table(q=q)
assert len(lineage) == 1
@ -1084,7 +1108,9 @@ def test_double_quotes_in_alias():
# SELECT CAST(sales_date AS DATE) AS \"\"Date\"\" in query
q = 'let \n Source = Sql.Database("abc.com", "DB", [Query="SELECT CAST(sales_date AS DATE) AS ""Date"",#(lf) SUM(cshintrpret) / 60.0 AS ""Total Order All Items"",#(lf)#(tab)#(tab)#(tab) SUM(cshintrpret) / 60.0 - LAG(SUM(cshintrpret) / 60.0, 1) OVER (ORDER BY CAST(sales_date AS DATE)) AS ""Total minute difference"",#(lf)#(tab)#(tab)#(tab) SUM(sale_price) / 60.0 - LAG(SUM(sale_price) / 60.0, 1) OVER (ORDER BY CAST(sales_date AS DATE)) AS ""Normal minute difference""#(lf) FROM [DB].[dbo].[sales_t]#(lf) WHERE sales_date >= GETDATE() - 365#(lf) GROUP BY CAST(sales_date AS DATE),#(lf)#(tab)#(tab)CAST(sales_date AS TIME);"]) \n in \n Source'
lineage: List[resolver.Lineage] = get_data_platform_tables_with_dummy_table(q=q)
lineage: List[
datahub.ingestion.source.powerbi.m_query.data_classes.Lineage
] = get_data_platform_tables_with_dummy_table(q=q)
assert len(lineage) == 1

View File

@ -8,9 +8,7 @@ from datahub.ingestion.source.powerbi.config import (
from datahub.ingestion.source.powerbi.dataplatform_instance_resolver import (
ResolvePlatformInstanceFromDatasetTypeMapping,
)
from datahub.ingestion.source.powerbi.m_query.resolver import (
MSSqlDataPlatformTableCreator,
)
from datahub.ingestion.source.powerbi.m_query.pattern_handler import MSSqlLineage
from datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes import Table
@ -27,7 +25,7 @@ def creator():
full_name="db.schema.test_table",
)
return MSSqlDataPlatformTableCreator(
return MSSqlLineage(
ctx=PipelineContext(run_id="test-run-id"),
table=table,
reporter=PowerBiDashboardSourceReport(),