feat(ingestion/powerbi): support multiple tables as upstream in native SQL parsing (#8592)

siddiquebagwan-gslab 2023-08-23 14:38:58 +05:30 committed by GitHub
parent 4116716a15
commit 8ee58af0c2
10 changed files with 714 additions and 281 deletions

View File

@@ -121,6 +121,12 @@ class DataPlatformPair:
    powerbi_data_platform_name: str

+@dataclass
+class PowerBIPlatformDetail:
+    data_platform_pair: DataPlatformPair
+    data_platform_server: str
+
class SupportedDataPlatform(Enum):
    POSTGRES_SQL = DataPlatformPair(
        powerbi_data_platform_name="PostgreSQL", datahub_data_platform_name="postgres"
@ -382,6 +388,15 @@ class PowerBiDashboardSourceConfig(
description="The instance of the platform that all assets produced by this recipe belong to", description="The instance of the platform that all assets produced by this recipe belong to",
) )
# Enable advanced SQL construct parsing
enable_advance_lineage_sql_construct: bool = pydantic.Field(
default=False,
description="Whether to enable parsing of advanced native SQL constructs such as joins and sub-queries. "
"native_query_parsing must also be enabled along with this flag. "
"convert_lineage_urns_to_lowercase is enabled by default; if you disabled it in a previous ingestion run, enabling this option may break lineage, "
"because it generates upstream dataset URNs in lowercase.",
)
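For illustration, here is a minimal sketch (not part of this diff) of how the new flag is expected to be combined with native_query_parsing when building the source config; the credential values below are placeholders:

    from datahub.ingestion.source.powerbi.config import PowerBiDashboardSourceConfig

    config = PowerBiDashboardSourceConfig.parse_obj(
        {
            "tenant_id": "fake-tenant",  # placeholder credentials
            "client_id": "fake-client",
            "client_secret": "fake-secret",
            "native_query_parsing": True,  # required for the advanced SQL parser to run
            "enable_advance_lineage_sql_construct": True,
            "convert_lineage_urns_to_lowercase": True,  # default; keep it consistent across runs
        }
    )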
@validator("dataset_type_mapping") @validator("dataset_type_mapping")
@classmethod @classmethod
def map_data_platform(cls, value): def map_data_platform(cls, value):

View File

@ -5,8 +5,8 @@ from typing import Union
from datahub.ingestion.source.powerbi.config import ( from datahub.ingestion.source.powerbi.config import (
PlatformDetail, PlatformDetail,
PowerBiDashboardSourceConfig, PowerBiDashboardSourceConfig,
PowerBIPlatformDetail,
) )
from datahub.ingestion.source.powerbi.m_query.resolver import DataPlatformTable
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -14,7 +14,7 @@ logger = logging.getLogger(__name__)
class AbstractDataPlatformInstanceResolver(ABC): class AbstractDataPlatformInstanceResolver(ABC):
@abstractmethod @abstractmethod
def get_platform_instance( def get_platform_instance(
self, dataplatform_table: DataPlatformTable self, data_platform_detail: PowerBIPlatformDetail
) -> PlatformDetail: ) -> PlatformDetail:
pass pass
@ -32,10 +32,10 @@ class ResolvePlatformInstanceFromDatasetTypeMapping(
BaseAbstractDataPlatformInstanceResolver BaseAbstractDataPlatformInstanceResolver
): ):
def get_platform_instance( def get_platform_instance(
self, dataplatform_table: DataPlatformTable self, data_platform_detail: PowerBIPlatformDetail
) -> PlatformDetail: ) -> PlatformDetail:
platform: Union[str, PlatformDetail] = self.config.dataset_type_mapping[ platform: Union[str, PlatformDetail] = self.config.dataset_type_mapping[
dataplatform_table.data_platform_pair.powerbi_data_platform_name data_platform_detail.data_platform_pair.powerbi_data_platform_name
] ]
if isinstance(platform, PlatformDetail): if isinstance(platform, PlatformDetail):
@ -48,13 +48,13 @@ class ResolvePlatformInstanceFromServerToPlatformInstance(
BaseAbstractDataPlatformInstanceResolver BaseAbstractDataPlatformInstanceResolver
): ):
def get_platform_instance( def get_platform_instance(
self, dataplatform_table: DataPlatformTable self, data_platform_detail: PowerBIPlatformDetail
) -> PlatformDetail: ) -> PlatformDetail:
return ( return (
self.config.server_to_platform_instance[ self.config.server_to_platform_instance[
dataplatform_table.datasource_server data_platform_detail.data_platform_server
] ]
if dataplatform_table.datasource_server if data_platform_detail.data_platform_server
in self.config.server_to_platform_instance in self.config.server_to_platform_instance
else PlatformDetail.parse_obj({}) else PlatformDetail.parse_obj({})
) )

View File

@ -1,8 +1,12 @@
import logging import logging
from typing import List from typing import List, Optional
import sqlparse import sqlparse
import datahub.utilities.sqlglot_lineage as sqlglot_l
from datahub.ingestion.api.common import PipelineContext
from datahub.utilities.sqlglot_lineage import SqlParsingResult
SPECIAL_CHARACTERS = ["#(lf)", "(lf)"] SPECIAL_CHARACTERS = ["#(lf)", "(lf)"]
logger = logging.getLogger() logger = logging.getLogger()
@ -45,3 +49,30 @@ def get_tables(native_query: str) -> List[str]:
from_index = from_index + 1 from_index = from_index + 1
return tables return tables
def parse_custom_sql(
ctx: PipelineContext,
query: str,
schema: Optional[str],
database: Optional[str],
platform: str,
env: str,
platform_instance: Optional[str],
) -> Optional["SqlParsingResult"]:
logger.debug("Using sqlglot_lineage to parse custom sql")
sql_query = remove_special_characters(query)
logger.debug(f"Parsing sql={sql_query}")
return sqlglot_l.create_lineage_sql_parsed_result(
query=sql_query,
schema=schema,
database=database,
platform=platform,
platform_instance=platform_instance,
env=env,
graph=ctx.graph,
)
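A hypothetical call sketch for the new helper (the ctx value is assumed to come from the surrounding pipeline); it returns None when sqlglot cannot parse the query:

    result = parse_custom_sql(
        ctx=ctx,
        query="select * #(lf)from mics.public.order_date",  # special characters are stripped first
        schema="public",
        database="mics",
        platform="postgres",
        env="PROD",
        platform_instance=None,
    )
    if result is not None:
        print(result.in_tables)  # upstream dataset URNs discovered by sqlglot_lineage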

View File

@ -6,7 +6,14 @@ from typing import Dict, List
import lark import lark
from lark import Lark, Tree from lark import Lark, Tree
from datahub.ingestion.source.powerbi.config import PowerBiDashboardSourceReport from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.powerbi.config import (
PowerBiDashboardSourceConfig,
PowerBiDashboardSourceReport,
)
from datahub.ingestion.source.powerbi.dataplatform_instance_resolver import (
AbstractDataPlatformInstanceResolver,
)
from datahub.ingestion.source.powerbi.m_query import resolver, validator from datahub.ingestion.source.powerbi.m_query import resolver, validator
from datahub.ingestion.source.powerbi.m_query.data_classes import ( from datahub.ingestion.source.powerbi.m_query.data_classes import (
TRACE_POWERBI_MQUERY_PARSER, TRACE_POWERBI_MQUERY_PARSER,
@ -45,7 +52,9 @@ def _parse_expression(expression: str) -> Tree:
def get_upstream_tables( def get_upstream_tables(
table: Table, table: Table,
reporter: PowerBiDashboardSourceReport, reporter: PowerBiDashboardSourceReport,
native_query_enabled: bool = True, platform_instance_resolver: AbstractDataPlatformInstanceResolver,
ctx: PipelineContext,
config: PowerBiDashboardSourceConfig,
parameters: Dict[str, str] = {}, parameters: Dict[str, str] = {},
) -> List[resolver.DataPlatformTable]: ) -> List[resolver.DataPlatformTable]:
if table.expression is None: if table.expression is None:
@ -58,7 +67,7 @@ def get_upstream_tables(
parse_tree: Tree = _parse_expression(table.expression) parse_tree: Tree = _parse_expression(table.expression)
valid, message = validator.validate_parse_tree( valid, message = validator.validate_parse_tree(
parse_tree, native_query_enabled=native_query_enabled parse_tree, native_query_enabled=config.native_query_parsing
) )
if valid is False: if valid is False:
assert message is not None assert message is not None
@ -84,7 +93,11 @@ def get_upstream_tables(
parse_tree=parse_tree, parse_tree=parse_tree,
reporter=reporter, reporter=reporter,
parameters=parameters, parameters=parameters,
).resolve_to_data_platform_table_list() ).resolve_to_data_platform_table_list(
ctx=ctx,
config=config,
platform_instance_resolver=platform_instance_resolver,
)
except BaseException as e: except BaseException as e:
reporter.report_warning(table.full_name, "Failed to process m-query expression") reporter.report_warning(table.full_name, "Failed to process m-query expression")

View File

@ -6,11 +6,19 @@ from typing import Any, Dict, List, Optional, Tuple, Type, Union, cast
from lark import Tree from lark import Tree
import datahub.emitter.mce_builder as builder
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.powerbi.config import ( from datahub.ingestion.source.powerbi.config import (
DataPlatformPair, DataPlatformPair,
PlatformDetail,
PowerBiDashboardSourceConfig,
PowerBiDashboardSourceReport, PowerBiDashboardSourceReport,
PowerBIPlatformDetail,
SupportedDataPlatform, SupportedDataPlatform,
) )
from datahub.ingestion.source.powerbi.dataplatform_instance_resolver import (
AbstractDataPlatformInstanceResolver,
)
from datahub.ingestion.source.powerbi.m_query import native_sql_parser, tree_function from datahub.ingestion.source.powerbi.m_query import native_sql_parser, tree_function
from datahub.ingestion.source.powerbi.m_query.data_classes import ( from datahub.ingestion.source.powerbi.m_query.data_classes import (
TRACE_POWERBI_MQUERY_PARSER, TRACE_POWERBI_MQUERY_PARSER,
@ -19,19 +27,98 @@ from datahub.ingestion.source.powerbi.m_query.data_classes import (
IdentifierAccessor, IdentifierAccessor,
) )
from datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes import Table from datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes import Table
from datahub.utilities.sqlglot_lineage import SqlParsingResult
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@dataclass
class DataPlatformTable:
-    name: str
-    full_name: str
-    datasource_server: str
    data_platform_pair: DataPlatformPair
+    urn: str
def urn_to_lowercase(value: str, flag: bool) -> str:
if flag is True:
return value.lower()
return value
def urn_creator(
config: PowerBiDashboardSourceConfig,
platform_instance_resolver: AbstractDataPlatformInstanceResolver,
data_platform_pair: DataPlatformPair,
server: str,
qualified_table_name: str,
) -> str:
platform_detail: PlatformDetail = platform_instance_resolver.get_platform_instance(
PowerBIPlatformDetail(
data_platform_pair=data_platform_pair,
data_platform_server=server,
)
)
return builder.make_dataset_urn_with_platform_instance(
platform=data_platform_pair.datahub_data_platform_name,
platform_instance=platform_detail.platform_instance,
env=platform_detail.env,
name=urn_to_lowercase(
qualified_table_name, config.convert_lineage_urns_to_lowercase
),
)
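Illustrative only: assuming a default config (convert_lineage_urns_to_lowercase enabled, no platform_instance or env overrides) and a config and platform_instance_resolver built as in the updated tests below, the helper should produce roughly the following URN for a Snowflake table:

    urn = urn_creator(
        config=config,
        platform_instance_resolver=platform_instance_resolver,
        data_platform_pair=SupportedDataPlatform.SNOWFLAKE.value,
        server="bu10758.ap-unknown-2.fakecomputing.com",
        qualified_table_name="PBI_TEST.TEST.TESTTABLE",
    )
    # urn == "urn:li:dataset:(urn:li:dataPlatform:snowflake,pbi_test.test.testtable,PROD)"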
class AbstractDataPlatformTableCreator(ABC):
"""
Base class that shares common functionality across data platforms for M-Query parsing.

To build a qualified table name we need to parse the M-Query data-access functions (https://learn.microsoft.com/en-us/powerquery-m/accessing-data-functions), and each data-access function follows a defined pattern for accessing the database name, schema name and table name. For example, see the M-Query below:

let
Source = Sql.Database("localhost", "library"),
dbo_book_issue = Source{[Schema="dbo",Item="book_issue"]}[Data]
in
dbo_book_issue

This is an MSSQL M-Query, and Sql.Database is the data-access function for MSSQL. If this function is present in the M-Query, the database name is available in the second argument of the first statement, and the schema name and table name are available in the second statement. The second statement can be repeated to access different tables from MSSQL.

DefaultTwoStepDataAccessSources extends AbstractDataPlatformTableCreator and provides the common functionality for data platforms that follow the above M-Query pattern. The data-access function varies per data platform (for example MySQL.Database for MySQL, PostgreSQL.Database for Postgres and Oracle.Database for Oracle), and the number of statements needed to find the database name, schema name and table name also varies per data platform.

Value.NativeQuery is one of the functions used to execute a native query inside M-Query, for example:

let
Source = Value.NativeQuery(AmazonRedshift.Database("redshift-url","dev"), "select * from dev.public.category", null, [EnableFolding=true])
in
Source

In this M-Query the database name is available in the first argument, and the rest of the detail, i.e. schema and table, is available in the native query itself.

NativeQueryDataPlatformTableCreator extends AbstractDataPlatformTableCreator to support Redshift and Snowflake native query parsing.
"""
ctx: PipelineContext
config: PowerBiDashboardSourceConfig
platform_instance_resolver: AbstractDataPlatformInstanceResolver
def __init__(
self,
ctx: PipelineContext,
config: PowerBiDashboardSourceConfig,
platform_instance_resolver: AbstractDataPlatformInstanceResolver,
) -> None:
super().__init__()
self.ctx = ctx
self.config = config
self.platform_instance_resolver = platform_instance_resolver
@abstractmethod @abstractmethod
def create_dataplatform_tables( def create_dataplatform_tables(
self, data_access_func_detail: DataAccessFunctionDetail self, data_access_func_detail: DataAccessFunctionDetail
@ -58,6 +145,49 @@ class AbstractDataPlatformTableCreator(ABC):
return arguments[0], arguments[1] return arguments[0], arguments[1]
def parse_custom_sql(
self, query: str, server: str, database: Optional[str], schema: Optional[str]
) -> List[DataPlatformTable]:
dataplatform_tables: List[DataPlatformTable] = []
platform_detail: PlatformDetail = (
self.platform_instance_resolver.get_platform_instance(
PowerBIPlatformDetail(
data_platform_pair=self.get_platform_pair(),
data_platform_server=server,
)
)
)
parsed_result: Optional[
"SqlParsingResult"
] = native_sql_parser.parse_custom_sql(
ctx=self.ctx,
query=query,
platform=self.get_platform_pair().datahub_data_platform_name,
platform_instance=platform_detail.platform_instance,
env=platform_detail.env,
database=database,
schema=schema,
)
if parsed_result is None:
logger.debug("Failed to parse query")
return dataplatform_tables
for urn in parsed_result.in_tables:
dataplatform_tables.append(
DataPlatformTable(
data_platform_pair=self.get_platform_pair(),
urn=urn,
)
)
logger.debug(f"Generated dataplatform_tables={dataplatform_tables}")
return dataplatform_tables
class AbstractDataAccessMQueryResolver(ABC): class AbstractDataAccessMQueryResolver(ABC):
table: Table table: Table
@ -80,11 +210,29 @@ class AbstractDataAccessMQueryResolver(ABC):
self.data_access_functions = SupportedResolver.get_function_names() self.data_access_functions = SupportedResolver.get_function_names()
@abstractmethod @abstractmethod
def resolve_to_data_platform_table_list(self) -> List[DataPlatformTable]: def resolve_to_data_platform_table_list(
self,
ctx: PipelineContext,
config: PowerBiDashboardSourceConfig,
platform_instance_resolver: AbstractDataPlatformInstanceResolver,
) -> List[DataPlatformTable]:
pass pass
class MQueryResolver(AbstractDataAccessMQueryResolver, ABC):
"""
This class parses the M-Query recursively to generate DataAccessFunctionDetail instances (see method create_data_access_functional_detail).

It contains the generic code to process M-Query tokens and create DataAccessFunctionDetail instances. Once a DataAccessFunctionDetail instance is initialized, MQueryResolver generates the DataPlatformTable with the help of AbstractDataPlatformTableCreator (see method resolve_to_data_platform_table_list).

Classes that extend AbstractDataPlatformTableCreator know how to convert a generated DataAccessFunctionDetail instance into the DataPlatformTable of their respective data platform.
"""
def get_item_selector_tokens( def get_item_selector_tokens(
self, self,
expression_tree: Tree, expression_tree: Tree,
@ -318,9 +466,15 @@ class MQueryResolver(AbstractDataAccessMQueryResolver, ABC):
return table_links return table_links
def resolve_to_data_platform_table_list(self) -> List[DataPlatformTable]: def resolve_to_data_platform_table_list(
self,
ctx: PipelineContext,
config: PowerBiDashboardSourceConfig,
platform_instance_resolver: AbstractDataPlatformInstanceResolver,
) -> List[DataPlatformTable]:
data_platform_tables: List[DataPlatformTable] = [] data_platform_tables: List[DataPlatformTable] = []
# Find the output variable, since we backtrack through the M-Query from it
output_variable: Optional[str] = tree_function.get_output_variable( output_variable: Optional[str] = tree_function.get_output_variable(
self.parse_tree self.parse_tree
) )
@ -332,12 +486,14 @@ class MQueryResolver(AbstractDataAccessMQueryResolver, ABC):
) )
return data_platform_tables return data_platform_tables
# Parse the M-Query using output_variable as the root of the tree and create DataAccessFunctionDetail instances
table_links: List[ table_links: List[
DataAccessFunctionDetail DataAccessFunctionDetail
] = self.create_data_access_functional_detail(output_variable) ] = self.create_data_access_functional_detail(output_variable)
# Each item is data-access function # Each item is data-access function
for f_detail in table_links: for f_detail in table_links:
# Check whether the data-access function found in the M-Query is supported
supported_resolver = SupportedResolver.get_resolver( supported_resolver = SupportedResolver.get_resolver(
f_detail.data_access_function_name f_detail.data_access_function_name
) )
@ -351,8 +507,14 @@ class MQueryResolver(AbstractDataAccessMQueryResolver, ABC):
) )
continue continue
# From the supported_resolver enum, get the corresponding resolver (for example AmazonRedshift, Snowflake, Oracle or NativeQuery),
# create an instance of it, and pass the additional information needed to generate the URN
table_full_name_creator: AbstractDataPlatformTableCreator = ( table_full_name_creator: AbstractDataPlatformTableCreator = (
supported_resolver.get_table_full_name_creator()() supported_resolver.get_table_full_name_creator()(
ctx=ctx,
config=config,
platform_instance_resolver=platform_instance_resolver,
)
) )
data_platform_tables.extend( data_platform_tables.extend(
@ -393,18 +555,24 @@ class DefaultTwoStepDataAccessSources(AbstractDataPlatformTableCreator, ABC):
IdentifierAccessor, data_access_func_detail.identifier_accessor IdentifierAccessor, data_access_func_detail.identifier_accessor
).items["Item"] ).items["Item"]
full_table_name: str = f"{db_name}.{schema_name}.{table_name}" qualified_table_name: str = f"{db_name}.{schema_name}.{table_name}"
logger.debug( logger.debug(
f"Platform({self.get_platform_pair().datahub_data_platform_name}) full_table_name= {full_table_name}" f"Platform({self.get_platform_pair().datahub_data_platform_name}) qualified_table_name= {qualified_table_name}"
)
urn = urn_creator(
config=self.config,
platform_instance_resolver=self.platform_instance_resolver,
data_platform_pair=self.get_platform_pair(),
server=server,
qualified_table_name=qualified_table_name,
) )
return [ return [
DataPlatformTable( DataPlatformTable(
name=table_name,
full_name=full_table_name,
datasource_server=server,
data_platform_pair=self.get_platform_pair(), data_platform_pair=self.get_platform_pair(),
urn=urn,
) )
] ]
@ -420,9 +588,48 @@ class PostgresDataPlatformTableCreator(DefaultTwoStepDataAccessSources):
class MSSqlDataPlatformTableCreator(DefaultTwoStepDataAccessSources): class MSSqlDataPlatformTableCreator(DefaultTwoStepDataAccessSources):
# https://learn.microsoft.com/en-us/sql/relational-databases/security/authentication-access/ownership-and-user-schema-separation?view=sql-server-ver16
DEFAULT_SCHEMA = "dbo" # Default schema name in MS-SQL is dbo
def get_platform_pair(self) -> DataPlatformPair: def get_platform_pair(self) -> DataPlatformPair:
return SupportedDataPlatform.MS_SQL.value return SupportedDataPlatform.MS_SQL.value
def create_urn_using_old_parser(
self, query: str, db_name: str, server: str
) -> List[DataPlatformTable]:
dataplatform_tables: List[DataPlatformTable] = []
tables: List[str] = native_sql_parser.get_tables(query)
for table in tables:
schema_and_table: List[str] = table.split(".")
if len(schema_and_table) == 1:
# Schema name is not present; fall back to the default schema
schema_and_table.insert(0, MSSqlDataPlatformTableCreator.DEFAULT_SCHEMA)
qualified_table_name = (
f"{db_name}.{schema_and_table[0]}.{schema_and_table[1]}"
)
urn = urn_creator(
config=self.config,
platform_instance_resolver=self.platform_instance_resolver,
data_platform_pair=self.get_platform_pair(),
server=server,
qualified_table_name=qualified_table_name,
)
dataplatform_tables.append(
DataPlatformTable(
data_platform_pair=self.get_platform_pair(),
urn=urn,
)
)
logger.debug(f"Generated upstream tables = {dataplatform_tables}")
return dataplatform_tables
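A sketch of the backward-compatible path, assuming creator is an MSSqlDataPlatformTableCreator built with the default config: a table referenced without a schema in the native query should be qualified with the default dbo schema before the URN is built.

    tables = creator.create_urn_using_old_parser(
        query="SELECT * FROM book_issue", db_name="library", server="localhost"
    )
    # tables[0].urn == "urn:li:dataset:(urn:li:dataPlatform:mssql,library.dbo.book_issue,PROD)"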
def create_dataplatform_tables( def create_dataplatform_tables(
self, data_access_func_detail: DataAccessFunctionDetail self, data_access_func_detail: DataAccessFunctionDetail
) -> List[DataPlatformTable]: ) -> List[DataPlatformTable]:
@ -442,28 +649,20 @@ class MSSqlDataPlatformTableCreator(DefaultTwoStepDataAccessSources):
logger.debug("Unsupported case is found. Second index is not the Query") logger.debug("Unsupported case is found. Second index is not the Query")
return dataplatform_tables return dataplatform_tables
-        db_name: str = arguments[1]
-
-        tables: List[str] = native_sql_parser.get_tables(arguments[3])
-        for table in tables:
-            schema_and_table: List[str] = table.split(".")
-            if len(schema_and_table) == 1:
-                # schema name is not present. Default schema name in MS-SQL is dbo
-                # https://learn.microsoft.com/en-us/sql/relational-databases/security/authentication-access/ownership-and-user-schema-separation?view=sql-server-ver16
-                schema_and_table.insert(0, "dbo")
-
-            dataplatform_tables.append(
-                DataPlatformTable(
-                    name=schema_and_table[1],
-                    full_name=f"{db_name}.{schema_and_table[0]}.{schema_and_table[1]}",
-                    datasource_server=arguments[0],
-                    data_platform_pair=self.get_platform_pair(),
-                )
-            )
-
-        logger.debug("MS-SQL full-table-names %s", dataplatform_tables)
-
-        return dataplatform_tables
+        if self.config.enable_advance_lineage_sql_construct is False:
+            # Use previous parser to generate URN to keep backward compatibility
+            return self.create_urn_using_old_parser(
+                query=arguments[3],
+                db_name=arguments[1],
+                server=arguments[0],
+            )
+
+        return self.parse_custom_sql(
+            query=arguments[3],
+            database=arguments[1],
+            server=arguments[0],
+            schema=MSSqlDataPlatformTableCreator.DEFAULT_SCHEMA,
+        )
class OracleDataPlatformTableCreator(AbstractDataPlatformTableCreator): class OracleDataPlatformTableCreator(AbstractDataPlatformTableCreator):
@ -510,12 +709,20 @@ class OracleDataPlatformTableCreator(AbstractDataPlatformTableCreator):
cast(IdentifierAccessor, data_access_func_detail.identifier_accessor).next, cast(IdentifierAccessor, data_access_func_detail.identifier_accessor).next,
).items["Name"] ).items["Name"]
qualified_table_name: str = f"{db_name}.{schema_name}.{table_name}"
urn = urn_creator(
config=self.config,
platform_instance_resolver=self.platform_instance_resolver,
data_platform_pair=self.get_platform_pair(),
server=server,
qualified_table_name=qualified_table_name,
)
return [ return [
DataPlatformTable( DataPlatformTable(
name=table_name,
full_name=f"{db_name}.{schema_name}.{table_name}",
datasource_server=server,
data_platform_pair=self.get_platform_pair(), data_platform_pair=self.get_platform_pair(),
urn=urn,
) )
] ]
@ -547,14 +754,28 @@ class DatabrickDataPlatformTableCreator(AbstractDataPlatformTableCreator):
db_name: str = value_dict["Database"] db_name: str = value_dict["Database"]
schema_name: str = value_dict["Schema"] schema_name: str = value_dict["Schema"]
table_name: str = value_dict["Table"] table_name: str = value_dict["Table"]
qualified_table_name: str = f"{db_name}.{schema_name}.{table_name}"
server, _ = self.get_db_detail_from_argument(data_access_func_detail.arg_list) server, _ = self.get_db_detail_from_argument(data_access_func_detail.arg_list)
if server is None:
logger.info(
f"server information is not available for {qualified_table_name}. Skipping upstream table"
)
return []
urn = urn_creator(
config=self.config,
platform_instance_resolver=self.platform_instance_resolver,
data_platform_pair=self.get_platform_pair(),
server=server,
qualified_table_name=qualified_table_name,
)
return [ return [
DataPlatformTable( DataPlatformTable(
name=table_name,
full_name=f"{db_name}.{schema_name}.{table_name}",
datasource_server=server if server else "",
data_platform_pair=self.get_platform_pair(), data_platform_pair=self.get_platform_pair(),
urn=urn,
) )
] ]
@ -589,20 +810,26 @@ class DefaultThreeStepDataAccessSources(AbstractDataPlatformTableCreator, ABC):
IdentifierAccessor, data_access_func_detail.identifier_accessor.next.next # type: ignore IdentifierAccessor, data_access_func_detail.identifier_accessor.next.next # type: ignore
).items["Name"] ).items["Name"]
full_table_name: str = f"{db_name}.{schema_name}.{table_name}" qualified_table_name: str = f"{db_name}.{schema_name}.{table_name}"
logger.debug( logger.debug(
f"{self.get_platform_pair().datahub_data_platform_name} full-table-name {full_table_name}" f"{self.get_platform_pair().datahub_data_platform_name} qualified_table_name {qualified_table_name}"
)
server: str = self.get_datasource_server(arguments, data_access_func_detail)
urn = urn_creator(
config=self.config,
platform_instance_resolver=self.platform_instance_resolver,
data_platform_pair=self.get_platform_pair(),
server=server,
qualified_table_name=qualified_table_name,
) )
return [ return [
DataPlatformTable( DataPlatformTable(
name=table_name,
full_name=full_table_name,
datasource_server=self.get_datasource_server(
arguments, data_access_func_detail
),
data_platform_pair=self.get_platform_pair(), data_platform_pair=self.get_platform_pair(),
urn=urn,
) )
] ]
@ -654,12 +881,20 @@ class AmazonRedshiftDataPlatformTableCreator(AbstractDataPlatformTableCreator):
cast(IdentifierAccessor, data_access_func_detail.identifier_accessor).next, cast(IdentifierAccessor, data_access_func_detail.identifier_accessor).next,
).items["Name"] ).items["Name"]
qualified_table_name: str = f"{db_name}.{schema_name}.{table_name}"
urn = urn_creator(
config=self.config,
platform_instance_resolver=self.platform_instance_resolver,
data_platform_pair=self.get_platform_pair(),
server=server,
qualified_table_name=qualified_table_name,
)
return [ return [
DataPlatformTable( DataPlatformTable(
name=table_name,
full_name=f"{db_name}.{schema_name}.{table_name}",
datasource_server=server,
data_platform_pair=self.get_platform_pair(), data_platform_pair=self.get_platform_pair(),
urn=urn,
) )
] ]
@ -681,6 +916,39 @@ class NativeQueryDataPlatformTableCreator(AbstractDataPlatformTableCreator):
in NativeQueryDataPlatformTableCreator.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM in NativeQueryDataPlatformTableCreator.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM
) )
def create_urn_using_old_parser(
self, query: str, server: str
) -> List[DataPlatformTable]:
dataplatform_tables: List[DataPlatformTable] = []
tables: List[str] = native_sql_parser.get_tables(query)
for qualified_table_name in tables:
if len(qualified_table_name.split(".")) != 3:
logger.debug(
f"Skipping table {qualified_table_name} as it is not as per qualified_table_name format"
)
continue
urn = urn_creator(
config=self.config,
platform_instance_resolver=self.platform_instance_resolver,
data_platform_pair=self.get_platform_pair(),
server=server,
qualified_table_name=qualified_table_name,
)
dataplatform_tables.append(
DataPlatformTable(
data_platform_pair=self.get_platform_pair(),
urn=urn,
)
)
logger.debug(f"Generated dataplatform_tables {dataplatform_tables}")
return dataplatform_tables
def create_dataplatform_tables( def create_dataplatform_tables(
self, data_access_func_detail: DataAccessFunctionDetail self, data_access_func_detail: DataAccessFunctionDetail
) -> List[DataPlatformTable]: ) -> List[DataPlatformTable]:
@ -727,25 +995,21 @@ class NativeQueryDataPlatformTableCreator(AbstractDataPlatformTableCreator):
0 0
] # Remove any whitespaces and double quotes character ] # Remove any whitespaces and double quotes character
-        for table in native_sql_parser.get_tables(sql_query):
-            if len(table.split(".")) != 3:
-                logger.debug(
-                    f"Skipping table {table} as it is not as per full_table_name format"
-                )
-                continue
-
-            dataplatform_tables.append(
-                DataPlatformTable(
-                    name=table.split(".")[2],
-                    full_name=table,
-                    datasource_server=tree_function.strip_char_from_list(
-                        [data_access_tokens[2]]
-                    )[0],
-                    data_platform_pair=self.get_platform_pair(),
-                )
-            )
-
-        return dataplatform_tables
+        server = tree_function.strip_char_from_list([data_access_tokens[2]])[0]
+
+        if self.config.enable_advance_lineage_sql_construct is False:
+            # Use previous parser to generate URN to keep backward compatibility
+            return self.create_urn_using_old_parser(
+                query=sql_query,
+                server=server,
+            )
+
+        return self.parse_custom_sql(
+            query=sql_query,
+            server=server,
+            database=None,  # database and schema is available inside custom sql as per PowerBI Behavior
+            schema=None,
+        )
class FunctionName(Enum): class FunctionName(Enum):

View File

@ -28,7 +28,6 @@ from datahub.ingestion.source.common.subtypes import (
) )
from datahub.ingestion.source.powerbi.config import ( from datahub.ingestion.source.powerbi.config import (
Constant, Constant,
PlatformDetail,
PowerBiDashboardSourceConfig, PowerBiDashboardSourceConfig,
PowerBiDashboardSourceReport, PowerBiDashboardSourceReport,
) )
@ -96,10 +95,12 @@ class Mapper:
def __init__( def __init__(
self, self,
ctx: PipelineContext,
config: PowerBiDashboardSourceConfig, config: PowerBiDashboardSourceConfig,
reporter: PowerBiDashboardSourceReport, reporter: PowerBiDashboardSourceReport,
dataplatform_instance_resolver: AbstractDataPlatformInstanceResolver, dataplatform_instance_resolver: AbstractDataPlatformInstanceResolver,
): ):
self.__ctx = ctx
self.__config = config self.__config = config
self.__reporter = reporter self.__reporter = reporter
self.__dataplatform_instance_resolver = dataplatform_instance_resolver self.__dataplatform_instance_resolver = dataplatform_instance_resolver
@ -172,43 +173,40 @@ class Mapper:
# table.dataset should always be set, but we check it just in case. # table.dataset should always be set, but we check it just in case.
parameters = table.dataset.parameters if table.dataset else {} parameters = table.dataset.parameters if table.dataset else {}
-        upstreams: List[UpstreamClass] = []
-        upstream_tables: List[resolver.DataPlatformTable] = parser.get_upstream_tables(
-            table, self.__reporter, parameters=parameters
-        )
-        logger.debug(
-            f"PowerBI virtual table {table.full_name} and it's upstream dataplatform tables = {upstream_tables}"
-        )
-        for upstream_table in upstream_tables:
-            if (
-                upstream_table.data_platform_pair.powerbi_data_platform_name
-                not in self.__config.dataset_type_mapping.keys()
-            ):
-                logger.debug(
-                    f"Skipping upstream table for {ds_urn}. The platform {upstream_table.data_platform_pair.powerbi_data_platform_name} is not part of dataset_type_mapping",
-                )
-                continue
-            platform_detail: PlatformDetail = (
-                self.__dataplatform_instance_resolver.get_platform_instance(
-                    upstream_table
-                )
-            )
-            upstream_urn = builder.make_dataset_urn_with_platform_instance(
-                platform=upstream_table.data_platform_pair.datahub_data_platform_name,
-                platform_instance=platform_detail.platform_instance,
-                env=platform_detail.env,
-                name=self.lineage_urn_to_lowercase(upstream_table.full_name),
-            )
-            upstream_table_class = UpstreamClass(
-                upstream_urn,
-                DatasetLineageTypeClass.TRANSFORMED,
-            )
-            upstreams.append(upstream_table_class)
-        if len(upstreams) > 0:
-            upstream_lineage = UpstreamLineageClass(upstreams=upstreams)
+        upstream: List[UpstreamClass] = []
+
+        upstream_dpts: List[resolver.DataPlatformTable] = parser.get_upstream_tables(
+            table=table,
+            reporter=self.__reporter,
+            platform_instance_resolver=self.__dataplatform_instance_resolver,
+            ctx=self.__ctx,
+            config=self.__config,
+            parameters=parameters,
+        )
+
+        logger.debug(
+            f"PowerBI virtual table {table.full_name} and it's upstream dataplatform tables = {upstream_dpts}"
+        )
+
+        for upstream_dpt in upstream_dpts:
+            if (
+                upstream_dpt.data_platform_pair.powerbi_data_platform_name
+                not in self.__config.dataset_type_mapping.keys()
+            ):
+                logger.debug(
+                    f"Skipping upstream table for {ds_urn}. The platform {upstream_dpt.data_platform_pair.powerbi_data_platform_name} is not part of dataset_type_mapping",
+                )
+                continue
+
+            upstream_table_class = UpstreamClass(
+                upstream_dpt.urn,
+                DatasetLineageTypeClass.TRANSFORMED,
+            )
+
+            upstream.append(upstream_table_class)
+
+        if len(upstream) > 0:
+            upstream_lineage = UpstreamLineageClass(upstreams=upstream)
logger.debug(f"Dataset urn = {ds_urn} and its lineage = {upstream_lineage}") logger.debug(f"Dataset urn = {ds_urn} and its lineage = {upstream_lineage}")
mcp = MetadataChangeProposalWrapper( mcp = MetadataChangeProposalWrapper(
entityType=Constant.DATASET, entityType=Constant.DATASET,
@ -1107,7 +1105,9 @@ class PowerBiDashboardSource(StatefulIngestionSourceBase):
) # Exit pipeline as we are not able to connect to PowerBI API Service. This exit will avoid raising ) # Exit pipeline as we are not able to connect to PowerBI API Service. This exit will avoid raising
# unwanted stacktrace on console # unwanted stacktrace on console
self.mapper = Mapper(config, self.reporter, self.dataplatform_instance_resolver) self.mapper = Mapper(
ctx, config, self.reporter, self.dataplatform_instance_resolver
)
# Create and register the stateful ingestion use-case handler. # Create and register the stateful ingestion use-case handler.
self.stale_entity_removal_handler = StaleEntityRemovalHandler.create( self.stale_entity_removal_handler = StaleEntityRemovalHandler.create(

View File

@ -31,6 +31,7 @@ from tableauserverclient import (
from tableauserverclient.server.endpoint.exceptions import NonXMLResponseError from tableauserverclient.server.endpoint.exceptions import NonXMLResponseError
import datahub.emitter.mce_builder as builder import datahub.emitter.mce_builder as builder
import datahub.utilities.sqlglot_lineage as sqlglot_l
from datahub.configuration.common import ( from datahub.configuration.common import (
AllowDenyPattern, AllowDenyPattern,
ConfigModel, ConfigModel,
@ -136,12 +137,7 @@ from datahub.metadata.schema_classes import (
ViewPropertiesClass, ViewPropertiesClass,
) )
from datahub.utilities import config_clean from datahub.utilities import config_clean
from datahub.utilities.sqlglot_lineage import ( from datahub.utilities.sqlglot_lineage import ColumnLineageInfo, SqlParsingResult
ColumnLineageInfo,
SchemaResolver,
SqlParsingResult,
sqlglot_lineage,
)
logger: logging.Logger = logging.getLogger(__name__) logger: logging.Logger = logging.getLogger(__name__)
@ -1585,42 +1581,14 @@ class TableauSource(StatefulIngestionSourceBase):
f"Overridden info upstream_db={upstream_db}, platform_instance={platform_instance}, platform={platform}" f"Overridden info upstream_db={upstream_db}, platform_instance={platform_instance}, platform={platform}"
) )
-        parsed_result: Optional["SqlParsingResult"] = None
-        try:
-            schema_resolver = (
-                self.ctx.graph._make_schema_resolver(
-                    platform=platform,
-                    platform_instance=platform_instance,
-                    env=env,
-                )
-                if self.ctx.graph is not None
-                else SchemaResolver(
-                    platform=platform,
-                    platform_instance=platform_instance,
-                    env=env,
-                    graph=None,
-                )
-            )
-
-            if schema_resolver.graph is None:
-                logger.warning(
-                    "Column Level Lineage extraction would not work as DataHub graph client is None."
-                )
-
-            parsed_result = sqlglot_lineage(
-                query,
-                schema_resolver=schema_resolver,
-                default_db=upstream_db,
-            )
-        except Exception as e:
-            self.report.report_warning(
-                key="csql-lineage",
-                reason=f"Unable to retrieve lineage from query. "
-                f"Query: {query} "
-                f"Reason: {str(e)} ",
-            )
-
-        return parsed_result
+        return sqlglot_l.create_lineage_sql_parsed_result(
+            query=query,
+            database=upstream_db,
+            platform=platform,
+            platform_instance=platform_instance,
+            env=env,
+            graph=self.ctx.graph,
+        )
def _create_lineage_from_unsupported_csql( def _create_lineage_from_unsupported_csql(
self, csql_urn: str, csql: dict self, csql_urn: str, csql: dict

View File

@ -825,3 +825,43 @@ def sqlglot_lineage(
table_error=e, table_error=e,
), ),
) )
def create_lineage_sql_parsed_result(
query: str,
database: Optional[str],
platform: str,
platform_instance: Optional[str],
env: str,
schema: Optional[str] = None,
graph: Optional[DataHubGraph] = None,
) -> Optional["SqlParsingResult"]:
parsed_result: Optional["SqlParsingResult"] = None
try:
schema_resolver = (
graph._make_schema_resolver(
platform=platform,
platform_instance=platform_instance,
env=env,
)
if graph is not None
else SchemaResolver(
platform=platform,
platform_instance=platform_instance,
env=env,
graph=None,
)
)
parsed_result = sqlglot_lineage(
query,
schema_resolver=schema_resolver,
default_db=database,
default_schema=schema,
)
except Exception as e:
logger.debug(f"Fail to prase query {query}", exc_info=e)
logger.warning("Fail to parse custom SQL")
return parsed_result
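A minimal usage sketch for the extracted helper; without a DataHub graph client it falls back to schema-less resolution, so only table-level lineage (in_tables / out_tables) is reliable:

    result = create_lineage_sql_parsed_result(
        query="select * from dev.public.category",
        database="dev",
        platform="redshift",
        platform_instance=None,
        env="PROD",
        graph=None,
    )
    if result is not None:
        print(result.in_tables)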

View File

@ -1,17 +1,22 @@
import logging import logging
import sys import sys
from typing import List from typing import List, Tuple
import pytest import pytest
from lark import Tree from lark import Tree
import datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes as powerbi_data_classes import datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes as powerbi_data_classes
from datahub.ingestion.source.powerbi.config import PowerBiDashboardSourceReport from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.powerbi.m_query import parser, tree_function from datahub.ingestion.source.powerbi.config import (
from datahub.ingestion.source.powerbi.m_query.resolver import ( PowerBiDashboardSourceConfig,
DataPlatformTable, PowerBiDashboardSourceReport,
SupportedDataPlatform,
) )
from datahub.ingestion.source.powerbi.dataplatform_instance_resolver import (
AbstractDataPlatformInstanceResolver,
create_dataplatform_instance_resolver,
)
from datahub.ingestion.source.powerbi.m_query import parser, tree_function
from datahub.ingestion.source.powerbi.m_query.resolver import DataPlatformTable
M_QUERIES = [ M_QUERIES = [
'let\n Source = Snowflake.Databases("bu10758.ap-unknown-2.fakecomputing.com","PBI_TEST_WAREHOUSE_PROD",[Role="PBI_TEST_MEMBER"]),\n PBI_TEST_Database = Source{[Name="PBI_TEST",Kind="Database"]}[Data],\n TEST_Schema = PBI_TEST_Database{[Name="TEST",Kind="Schema"]}[Data],\n TESTTABLE_Table = TEST_Schema{[Name="TESTTABLE",Kind="Table"]}[Data]\nin\n TESTTABLE_Table', 'let\n Source = Snowflake.Databases("bu10758.ap-unknown-2.fakecomputing.com","PBI_TEST_WAREHOUSE_PROD",[Role="PBI_TEST_MEMBER"]),\n PBI_TEST_Database = Source{[Name="PBI_TEST",Kind="Database"]}[Data],\n TEST_Schema = PBI_TEST_Database{[Name="TEST",Kind="Schema"]}[Data],\n TESTTABLE_Table = TEST_Schema{[Name="TESTTABLE",Kind="Table"]}[Data]\nin\n TESTTABLE_Table',
@ -38,9 +43,31 @@ M_QUERIES = [
'let\n Source = AmazonRedshift.Database("redshift-url","dev"),\n public = Source{[Name="public"]}[Data],\n category1 = public{[Name="category"]}[Data]\nin\n category1', 'let\n Source = AmazonRedshift.Database("redshift-url","dev"),\n public = Source{[Name="public"]}[Data],\n category1 = public{[Name="category"]}[Data]\nin\n category1',
'let\n Source = Value.NativeQuery(AmazonRedshift.Database("redshift-url","dev"), "select * from dev.public.category", null, [EnableFolding=true]) \n in Source', 'let\n Source = Value.NativeQuery(AmazonRedshift.Database("redshift-url","dev"), "select * from dev.public.category", null, [EnableFolding=true]) \n in Source',
'let\n Source = Databricks.Catalogs("adb-123.azuredatabricks.net", "/sql/1.0/endpoints/12345dc91aa25844", [Catalog=null, Database=null]),\n hive_metastore_Database = Source{[Name="hive_metastore",Kind="Database"]}[Data],\n sandbox_revenue_Schema = hive_metastore_Database{[Name="sandbox_revenue",Kind="Schema"]}[Data],\n public_consumer_price_index_Table = sandbox_revenue_Schema{[Name="public_consumer_price_index",Kind="Table"]}[Data],\n #"Renamed Columns" = Table.RenameColumns(public_consumer_price_index_Table,{{"Country", "country"}, {"Metric", "metric"}}),\n #"Inserted Year" = Table.AddColumn(#"Renamed Columns", "ID", each Date.Year([date_id]) + Date.Month([date_id]), Text.Type),\n #"Added Custom" = Table.AddColumn(#"Inserted Year", "Custom", each Text.Combine({Number.ToText(Date.Year([date_id])), Number.ToText(Date.Month([date_id])), [country]})),\n #"Removed Columns" = Table.RemoveColumns(#"Added Custom",{"ID"}),\n #"Renamed Columns1" = Table.RenameColumns(#"Removed Columns",{{"Custom", "ID"}}),\n #"Filtered Rows" = Table.SelectRows(#"Renamed Columns1", each ([metric] = "Consumer Price Index") and (not Number.IsNaN([value])))\nin\n #"Filtered Rows"', 'let\n Source = Databricks.Catalogs("adb-123.azuredatabricks.net", "/sql/1.0/endpoints/12345dc91aa25844", [Catalog=null, Database=null]),\n hive_metastore_Database = Source{[Name="hive_metastore",Kind="Database"]}[Data],\n sandbox_revenue_Schema = hive_metastore_Database{[Name="sandbox_revenue",Kind="Schema"]}[Data],\n public_consumer_price_index_Table = sandbox_revenue_Schema{[Name="public_consumer_price_index",Kind="Table"]}[Data],\n #"Renamed Columns" = Table.RenameColumns(public_consumer_price_index_Table,{{"Country", "country"}, {"Metric", "metric"}}),\n #"Inserted Year" = Table.AddColumn(#"Renamed Columns", "ID", each Date.Year([date_id]) + Date.Month([date_id]), Text.Type),\n #"Added Custom" = Table.AddColumn(#"Inserted Year", "Custom", each Text.Combine({Number.ToText(Date.Year([date_id])), Number.ToText(Date.Month([date_id])), [country]})),\n #"Removed Columns" = Table.RemoveColumns(#"Added Custom",{"ID"}),\n #"Renamed Columns1" = Table.RenameColumns(#"Removed Columns",{{"Custom", "ID"}}),\n #"Filtered Rows" = Table.SelectRows(#"Renamed Columns1", each ([metric] = "Consumer Price Index") and (not Number.IsNaN([value])))\nin\n #"Filtered Rows"',
"let\n Source = Value.NativeQuery(Snowflake.Databases(\"bu10758.ap-unknown-2.fakecomputing.com\",\"operations_analytics_warehouse_prod\",[Role=\"OPERATIONS_ANALYTICS_MEMBER\"]){[Name=\"OPERATIONS_ANALYTICS\"]}[Data], \"select #(lf)UPPER(REPLACE(AGENT_NAME,'-','')) AS CLIENT_DIRECTOR,#(lf)TIER,#(lf)UPPER(MANAGER),#(lf)TEAM_TYPE,#(lf)DATE_TARGET,#(lf)MONTHID,#(lf)TARGET_TEAM,#(lf)SELLER_EMAIL,#(lf)concat((UPPER(REPLACE(AGENT_NAME,'-',''))), MONTHID) as AGENT_KEY,#(lf)UNIT_TARGET AS SME_Quota,#(lf)AMV_TARGET AS Revenue_Quota,#(lf)SERVICE_QUOTA,#(lf)BL_TARGET,#(lf)SOFTWARE_QUOTA as Software_Quota#(lf)#(lf)from OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT_TARGETS inner join OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT #(lf)#(lf)where YEAR_TARGET >= 2022#(lf)and TEAM_TYPE = 'Accounting'#(lf)and TARGET_TEAM = 'Enterprise'#(lf)AND TIER = 'Client Director'\", null, [EnableFolding=true])\nin\n Source",
] ]
def get_default_instances(
override_config: dict = {},
) -> Tuple[
PipelineContext, PowerBiDashboardSourceConfig, AbstractDataPlatformInstanceResolver
]:
config: PowerBiDashboardSourceConfig = PowerBiDashboardSourceConfig.parse_obj(
{
"tenant_id": "fake",
"client_id": "foo",
"client_secret": "bar",
**override_config,
}
)
platform_instance_resolver: AbstractDataPlatformInstanceResolver = (
create_dataplatform_instance_resolver(config)
)
return PipelineContext(run_id="fake"), config, platform_instance_resolver
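The newly added join query (the last entry appended to M_QUERIES above) exercises the multi-table upstream behaviour this PR adds. A sketch of how it should behave with the new flag, assuming the helpers defined in this module:

    ctx, config, platform_instance_resolver = get_default_instances(
        override_config={"enable_advance_lineage_sql_construct": True}
    )
    table = powerbi_data_classes.Table(
        columns=[],
        measures=[],
        expression=M_QUERIES[-1],  # the Snowflake native query containing an inner join
        name="virtual_table",
        full_name="OrderDataSet.virtual_table",
    )
    upstreams = parser.get_upstream_tables(
        table,
        PowerBiDashboardSourceReport(),
        ctx=ctx,
        config=config,
        platform_instance_resolver=platform_instance_resolver,
    )
    # Both joined tables (V_SME_UNIT_TARGETS and V_SME_UNIT) should now appear as upstreams.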
@pytest.mark.integration @pytest.mark.integration
def test_parse_m_query1(): def test_parse_m_query1():
expression: str = M_QUERIES[0] expression: str = M_QUERIES[0]
@ -145,20 +172,20 @@ def test_snowflake_regular_case():
reporter = PowerBiDashboardSourceReport() reporter = PowerBiDashboardSourceReport()
ctx, config, platform_instance_resolver = get_default_instances()
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
table, reporter table,
reporter,
ctx=ctx,
config=config,
platform_instance_resolver=platform_instance_resolver,
) )
assert len(data_platform_tables) == 1 assert len(data_platform_tables) == 1
assert data_platform_tables[0].name == "TESTTABLE"
assert data_platform_tables[0].full_name == "PBI_TEST.TEST.TESTTABLE"
assert ( assert (
data_platform_tables[0].datasource_server data_platform_tables[0].urn
== "bu10758.ap-unknown-2.fakecomputing.com" == "urn:li:dataset:(urn:li:dataPlatform:snowflake,pbi_test.test.testtable,PROD)"
)
assert (
data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
== SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name
) )
@ -174,17 +201,21 @@ def test_postgres_regular_case():
) )
reporter = PowerBiDashboardSourceReport() reporter = PowerBiDashboardSourceReport()
ctx, config, platform_instance_resolver = get_default_instances()
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
table, reporter table,
reporter,
ctx=ctx,
config=config,
platform_instance_resolver=platform_instance_resolver,
) )
assert len(data_platform_tables) == 1 assert len(data_platform_tables) == 1
assert data_platform_tables[0].name == "order_date"
assert data_platform_tables[0].full_name == "mics.public.order_date"
assert data_platform_tables[0].datasource_server == "localhost"
assert ( assert (
data_platform_tables[0].data_platform_pair.powerbi_data_platform_name data_platform_tables[0].urn
== SupportedDataPlatform.POSTGRES_SQL.value.powerbi_data_platform_name == "urn:li:dataset:(urn:li:dataPlatform:postgres,mics.public.order_date,PROD)"
) )
@ -200,19 +231,21 @@ def test_databricks_regular_case():
) )
reporter = PowerBiDashboardSourceReport() reporter = PowerBiDashboardSourceReport()
ctx, config, platform_instance_resolver = get_default_instances()
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
table, reporter table,
reporter,
ctx=ctx,
config=config,
platform_instance_resolver=platform_instance_resolver,
) )
assert len(data_platform_tables) == 1 assert len(data_platform_tables) == 1
assert data_platform_tables[0].name == "public_consumer_price_index"
assert ( assert (
data_platform_tables[0].full_name data_platform_tables[0].urn
== "hive_metastore.sandbox_revenue.public_consumer_price_index" == "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.sandbox_revenue.public_consumer_price_index,PROD)"
)
assert (
data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
== SupportedDataPlatform.DATABRICK_SQL.value.powerbi_data_platform_name
) )
@ -228,17 +261,21 @@ def test_oracle_regular_case():
) )
reporter = PowerBiDashboardSourceReport() reporter = PowerBiDashboardSourceReport()
ctx, config, platform_instance_resolver = get_default_instances()
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
table, reporter table,
reporter,
ctx=ctx,
config=config,
platform_instance_resolver=platform_instance_resolver,
) )
assert len(data_platform_tables) == 1 assert len(data_platform_tables) == 1
assert data_platform_tables[0].name == "EMPLOYEES"
assert data_platform_tables[0].full_name == "salesdb.HR.EMPLOYEES"
assert data_platform_tables[0].datasource_server == "localhost:1521"
assert ( assert (
data_platform_tables[0].data_platform_pair.powerbi_data_platform_name data_platform_tables[0].urn
== SupportedDataPlatform.ORACLE.value.powerbi_data_platform_name == "urn:li:dataset:(urn:li:dataPlatform:oracle,salesdb.hr.employees,PROD)"
) )
@ -255,17 +292,20 @@ def test_mssql_regular_case():
reporter = PowerBiDashboardSourceReport() reporter = PowerBiDashboardSourceReport()
ctx, config, platform_instance_resolver = get_default_instances()
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
table, reporter table,
reporter,
ctx=ctx,
config=config,
platform_instance_resolver=platform_instance_resolver,
) )
assert len(data_platform_tables) == 1 assert len(data_platform_tables) == 1
assert data_platform_tables[0].name == "book_issue"
assert data_platform_tables[0].full_name == "library.dbo.book_issue"
assert data_platform_tables[0].datasource_server == "localhost"
assert ( assert (
data_platform_tables[0].data_platform_pair.powerbi_data_platform_name data_platform_tables[0].urn
== SupportedDataPlatform.MS_SQL.value.powerbi_data_platform_name == "urn:li:dataset:(urn:li:dataPlatform:mssql,library.dbo.book_issue,PROD)"
) )
@ -280,14 +320,16 @@ def test_mssql_with_query():
M_QUERIES[11], M_QUERIES[11],
] ]
expected_tables = [ expected_tables = [
"COMMOPSDB.dbo.V_OIP_ENT_2022", "urn:li:dataset:(urn:li:dataPlatform:mssql,commopsdb.dbo.v_oip_ent_2022,PROD)",
"COMMOPSDB.dbo.V_INVOICE_BOOKING_2022", "urn:li:dataset:(urn:li:dataPlatform:mssql,commopsdb.dbo.v_invoice_booking_2022,PROD)",
"COMMOPSDB.dbo.V_ARR_ADDS", "urn:li:dataset:(urn:li:dataPlatform:mssql,commopsdb.dbo.v_arr_adds,PROD)",
"COMMOPSDB.dbo.V_PS_CD_RETENTION", "urn:li:dataset:(urn:li:dataPlatform:mssql,commopsdb.dbo.v_ps_cd_retention,PROD)",
"COMMOPSDB.dbo.V_TPV_LEADERBOARD", "urn:li:dataset:(urn:li:dataPlatform:mssql,commopsdb.dbo.v_tpv_leaderboard,PROD)",
"COMMOPSDB.dbo.V_ENTERPRISE_INVOICED_REVENUE", "urn:li:dataset:(urn:li:dataPlatform:mssql,commopsdb.dbo.v_enterprise_invoiced_revenue,PROD)",
] ]
ctx, config, platform_instance_resolver = get_default_instances()
for index, query in enumerate(mssql_queries): for index, query in enumerate(mssql_queries):
table: powerbi_data_classes.Table = powerbi_data_classes.Table( table: powerbi_data_classes.Table = powerbi_data_classes.Table(
columns=[], columns=[],
@ -299,17 +341,15 @@ def test_mssql_with_query():
reporter = PowerBiDashboardSourceReport() reporter = PowerBiDashboardSourceReport()
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
table, reporter, native_query_enabled=False table,
reporter,
ctx=ctx,
config=config,
platform_instance_resolver=platform_instance_resolver,
) )
assert len(data_platform_tables) == 1 assert len(data_platform_tables) == 1
assert data_platform_tables[0].name == expected_tables[index].split(".")[2] assert data_platform_tables[0].urn == expected_tables[index]
assert data_platform_tables[0].full_name == expected_tables[index]
assert data_platform_tables[0].datasource_server == "AUPRDWHDB"
assert (
data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
== SupportedDataPlatform.MS_SQL.value.powerbi_data_platform_name
)
@pytest.mark.integration @pytest.mark.integration
@ -322,12 +362,14 @@ def test_snowflake_native_query():
] ]
expected_tables = [ expected_tables = [
"OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_APS_SME_UNITS_V4", "urn:li:dataset:(urn:li:dataPlatform:snowflake,operations_analytics.transformed_prod.v_aps_sme_units_v4,PROD)",
"OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT_TARGETS", "urn:li:dataset:(urn:li:dataPlatform:snowflake,operations_analytics.transformed_prod.v_sme_unit_targets,PROD)",
"OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT_TARGETS", "urn:li:dataset:(urn:li:dataPlatform:snowflake,operations_analytics.transformed_prod.v_sme_unit_targets,PROD)",
"OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT_TARGETS", "urn:li:dataset:(urn:li:dataPlatform:snowflake,operations_analytics.transformed_prod.v_sme_unit_targets,PROD)",
] ]
ctx, config, platform_instance_resolver = get_default_instances()
for index, query in enumerate(snowflake_queries): for index, query in enumerate(snowflake_queries):
table: powerbi_data_classes.Table = powerbi_data_classes.Table( table: powerbi_data_classes.Table = powerbi_data_classes.Table(
columns=[], columns=[],
@ -339,20 +381,15 @@ def test_snowflake_native_query():
reporter = PowerBiDashboardSourceReport() reporter = PowerBiDashboardSourceReport()
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
table, reporter table,
reporter,
ctx=ctx,
config=config,
platform_instance_resolver=platform_instance_resolver,
) )
assert len(data_platform_tables) == 1 assert len(data_platform_tables) == 1
assert data_platform_tables[0].name == expected_tables[index].split(".")[2] assert data_platform_tables[0].urn == expected_tables[index]
assert data_platform_tables[0].full_name == expected_tables[index]
assert (
data_platform_tables[0].datasource_server
== "bu10758.ap-unknown-2.fakecomputing.com"
)
assert (
data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
== SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name
)
def test_google_bigquery_1(): def test_google_bigquery_1():
@ -363,16 +400,20 @@ def test_google_bigquery_1():
) )
reporter = PowerBiDashboardSourceReport() reporter = PowerBiDashboardSourceReport()
ctx, config, platform_instance_resolver = get_default_instances()
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
table, reporter, native_query_enabled=False table,
reporter,
ctx=ctx,
config=config,
platform_instance_resolver=platform_instance_resolver,
) )
assert len(data_platform_tables) == 1 assert len(data_platform_tables) == 1
assert data_platform_tables[0].name == table.full_name.split(".")[2]
assert data_platform_tables[0].full_name == table.full_name
assert data_platform_tables[0].datasource_server == "seraphic-music-344307"
assert ( assert (
data_platform_tables[0].data_platform_pair.powerbi_data_platform_name data_platform_tables[0].urn
== SupportedDataPlatform.GOOGLE_BIGQUERY.value.powerbi_data_platform_name == "urn:li:dataset:(urn:li:dataPlatform:bigquery,seraphic-music-344307.school_dataset.first,PROD)"
) )
@ -387,23 +428,24 @@ def test_google_bigquery_2():
) )
reporter = PowerBiDashboardSourceReport() reporter = PowerBiDashboardSourceReport()
ctx, config, platform_instance_resolver = get_default_instances()
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
table, table,
reporter, reporter,
native_query_enabled=False,
parameters={ parameters={
"Parameter - Source": "my-test-project", "Parameter - Source": "my-test-project",
"My bq project": "gcp_billing", "My bq project": "gcp_billing",
}, },
ctx=ctx,
config=config,
platform_instance_resolver=platform_instance_resolver,
) )
assert len(data_platform_tables) == 1 assert len(data_platform_tables) == 1
assert data_platform_tables[0].name == table.full_name.split(".")[2]
assert data_platform_tables[0].full_name == table.full_name
assert data_platform_tables[0].datasource_server == "my-test-project"
assert ( assert (
data_platform_tables[0].data_platform_pair.powerbi_data_platform_name data_platform_tables[0].urn
== SupportedDataPlatform.GOOGLE_BIGQUERY.value.powerbi_data_platform_name == "urn:li:dataset:(urn:li:dataPlatform:bigquery,my-test-project.gcp_billing.gcp_table,PROD)"
) )
@ -416,23 +458,24 @@ def test_for_each_expression_1():
reporter = PowerBiDashboardSourceReport() reporter = PowerBiDashboardSourceReport()
ctx, config, platform_instance_resolver = get_default_instances()
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
table, table,
reporter, reporter,
native_query_enabled=False,
parameters={ parameters={
"Parameter - Source": "my-test-project", "Parameter - Source": "my-test-project",
"My bq project": "gcp_billing", "My bq project": "gcp_billing",
}, },
ctx=ctx,
config=config,
platform_instance_resolver=platform_instance_resolver,
) )
assert len(data_platform_tables) == 1 assert len(data_platform_tables) == 1
assert data_platform_tables[0].name == table.full_name.split(".")[2]
assert data_platform_tables[0].datasource_server == "my-test-project"
assert data_platform_tables[0].full_name == table.full_name
assert ( assert (
data_platform_tables[0].data_platform_pair.powerbi_data_platform_name data_platform_tables[0].urn
== SupportedDataPlatform.GOOGLE_BIGQUERY.value.powerbi_data_platform_name == "urn:li:dataset:(urn:li:dataPlatform:bigquery,my-test-project.universal.d_wh_date,PROD)"
) )
@ -445,22 +488,23 @@ def test_for_each_expression_2():
reporter = PowerBiDashboardSourceReport() reporter = PowerBiDashboardSourceReport()
ctx, config, platform_instance_resolver = get_default_instances()
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables( data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
table, table,
reporter, reporter,
native_query_enabled=False,
parameters={ parameters={
"dwh-prod": "originally-not-a-variable-ref-and-not-resolved", "dwh-prod": "originally-not-a-variable-ref-and-not-resolved",
}, },
ctx=ctx,
config=config,
platform_instance_resolver=platform_instance_resolver,
) )
assert len(data_platform_tables) == 1 assert len(data_platform_tables) == 1
assert data_platform_tables[0].name == table.full_name.split(".")[2]
assert data_platform_tables[0].full_name == table.full_name
assert data_platform_tables[0].datasource_server == "dwh-prod"
assert ( assert (
data_platform_tables[0].data_platform_pair.powerbi_data_platform_name data_platform_tables[0].urn
== SupportedDataPlatform.GOOGLE_BIGQUERY.value.powerbi_data_platform_name == "urn:li:dataset:(urn:li:dataPlatform:bigquery,dwh-prod.gcp_billing.d_gcp_custom_label,PROD)"
) )
@@ -476,8 +520,14 @@ def test_native_query_disabled():
    reporter = PowerBiDashboardSourceReport()
+    ctx, config, platform_instance_resolver = get_default_instances()
+    config.native_query_parsing = False
    data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
-        table, reporter, native_query_enabled=False
+        table,
+        reporter,
+        ctx=ctx,
+        config=config,
+        platform_instance_resolver=platform_instance_resolver,
    )
    assert len(data_platform_tables) == 0
@@ -493,26 +543,25 @@ def test_multi_source_table():
    )
    reporter = PowerBiDashboardSourceReport()
+    ctx, config, platform_instance_resolver = get_default_instances()
    data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
-        table, reporter, native_query_enabled=False
+        table,
+        reporter,
+        ctx=ctx,
+        config=config,
+        platform_instance_resolver=platform_instance_resolver,
    )
    assert len(data_platform_tables) == 2
-    assert data_platform_tables[0].full_name == "mics.public.order_date"
-    assert data_platform_tables[0].datasource_server == "localhost"
    assert (
-        data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
-        == SupportedDataPlatform.POSTGRES_SQL.value.powerbi_data_platform_name
-    )
-    assert data_platform_tables[1].full_name == "GSL_TEST_DB.PUBLIC.SALES_ANALYST_VIEW"
-    assert (
-        data_platform_tables[1].datasource_server
-        == "ghh48144.snowflakefakecomputing.com"
+        data_platform_tables[0].urn
+        == "urn:li:dataset:(urn:li:dataPlatform:postgres,mics.public.order_date,PROD)"
    )
    assert (
-        data_platform_tables[1].data_platform_pair.powerbi_data_platform_name
-        == SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name
+        data_platform_tables[1].urn
+        == "urn:li:dataset:(urn:li:dataPlatform:snowflake,gsl_test_db.public.sales_analyst_view,PROD)"
    )
@@ -521,36 +570,33 @@ def test_table_combine():
    table: powerbi_data_classes.Table = powerbi_data_classes.Table(
        columns=[],
        measures=[],
-        expression=M_QUERIES[16],  # 1st index has the native query
+        expression=M_QUERIES[16],
        name="virtual_order_table",
        full_name="OrderDataSet.virtual_order_table",
    )
    reporter = PowerBiDashboardSourceReport()
+    ctx, config, platform_instance_resolver = get_default_instances()
    data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
-        table, reporter
+        table,
+        reporter,
+        ctx=ctx,
+        config=config,
+        platform_instance_resolver=platform_instance_resolver,
    )
    assert len(data_platform_tables) == 2
-    assert data_platform_tables[0].full_name == "GSL_TEST_DB.PUBLIC.SALES_FORECAST"
    assert (
-        data_platform_tables[0].datasource_server
-        == "ghh48144.snowflakefakecomputing.com"
-    )
-    assert (
-        data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
-        == SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name
+        data_platform_tables[0].urn
+        == "urn:li:dataset:(urn:li:dataPlatform:snowflake,gsl_test_db.public.sales_forecast,PROD)"
    )
-    assert data_platform_tables[1].full_name == "GSL_TEST_DB.PUBLIC.SALES_ANALYST"
    assert (
-        data_platform_tables[1].datasource_server
-        == "ghh48144.snowflakefakecomputing.com"
-    )
-    assert (
-        data_platform_tables[1].data_platform_pair.powerbi_data_platform_name
-        == SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name
+        data_platform_tables[1].urn
+        == "urn:li:dataset:(urn:li:dataPlatform:snowflake,gsl_test_db.public.sales_analyst,PROD)"
    )
@@ -574,8 +620,14 @@ def test_expression_is_none():
    reporter = PowerBiDashboardSourceReport()
+    ctx, config, platform_instance_resolver = get_default_instances()
    data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
-        table, reporter
+        table,
+        reporter,
+        ctx=ctx,
+        config=config,
+        platform_instance_resolver=platform_instance_resolver,
    )
    assert len(data_platform_tables) == 0
@@ -589,15 +641,20 @@ def test_redshift_regular_case():
    )
    reporter = PowerBiDashboardSourceReport()
+    ctx, config, platform_instance_resolver = get_default_instances()
    data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
-        table, reporter, native_query_enabled=False
+        table,
+        reporter,
+        ctx=ctx,
+        config=config,
+        platform_instance_resolver=platform_instance_resolver,
    )
    assert len(data_platform_tables) == 1
-    assert data_platform_tables[0].name == table.full_name.split(".")[2]
-    assert data_platform_tables[0].full_name == table.full_name
    assert (
-        data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
-        == SupportedDataPlatform.AMAZON_REDSHIFT.value.powerbi_data_platform_name
+        data_platform_tables[0].urn
+        == "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.category,PROD)"
    )
@@ -609,13 +666,60 @@ def test_redshift_native_query():
    )
    reporter = PowerBiDashboardSourceReport()
+    ctx, config, platform_instance_resolver = get_default_instances()
+    config.native_query_parsing = True
    data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
-        table, reporter, native_query_enabled=True
+        table,
+        reporter,
+        ctx=ctx,
+        config=config,
+        platform_instance_resolver=platform_instance_resolver,
    )
    assert len(data_platform_tables) == 1
-    assert data_platform_tables[0].name == table.full_name.split(".")[2]
-    assert data_platform_tables[0].full_name == table.full_name
    assert (
-        data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
-        == SupportedDataPlatform.AMAZON_REDSHIFT.value.powerbi_data_platform_name
+        data_platform_tables[0].urn
+        == "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.category,PROD)"
    )
+
+
+def test_sqlglot_parser():
+    table: powerbi_data_classes.Table = powerbi_data_classes.Table(
+        expression=M_QUERIES[24],
+        name="SALES_TARGET",
+        full_name="dev.public.sales",
+    )
+    reporter = PowerBiDashboardSourceReport()
+    ctx, config, platform_instance_resolver = get_default_instances(
+        override_config={
+            "server_to_platform_instance": {
+                "bu10758.ap-unknown-2.fakecomputing.com": {
+                    "platform_instance": "sales_deployment",
+                    "env": "PROD",
+                }
+            },
+            "native_query_parsing": True,
+            "enable_advance_lineage_sql_construct": True,
+        }
+    )
+    data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
+        table,
+        reporter,
+        ctx=ctx,
+        config=config,
+        platform_instance_resolver=platform_instance_resolver,
+    )
+    assert len(data_platform_tables) == 2
+    assert (
+        data_platform_tables[0].urn
+        == "urn:li:dataset:(urn:li:dataPlatform:snowflake,sales_deployment.operations_analytics.transformed_prod.v_sme_unit,PROD)"
+    )
+    assert (
+        data_platform_tables[1].urn
+        == "urn:li:dataset:(urn:li:dataPlatform:snowflake,sales_deployment.operations_analytics.transformed_prod.v_sme_unit_targets,PROD)"
+    )
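The URNs asserted in test_sqlglot_parser above follow DataHub's standard dataset URN layout, with the configured platform_instance prefixed onto the fully qualified table name. A minimal sketch of that relationship using DataHub's stock URN builder; the helper import comes from the datahub library itself, and the literal values simply mirror the assertions above:

# Sketch: reproduce one of the asserted URNs with DataHub's standard URN builder.
# platform_instance corresponds to the server_to_platform_instance entry in override_config.
from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance

urn = make_dataset_urn_with_platform_instance(
    platform="snowflake",
    name="operations_analytics.transformed_prod.v_sme_unit",
    platform_instance="sales_deployment",
    env="PROD",
)
assert urn == (
    "urn:li:dataset:(urn:li:dataPlatform:snowflake,"
    "sales_deployment.operations_analytics.transformed_prod.v_sme_unit,PROD)"
)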
View File
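The tableau test hunk below switches from patching the sqlglot_lineage function directly to patching the imported module alias sqlglot_l and stubbing a single function on it. A small illustration of that unittest.mock pattern; the patch target is the one used in the hunk, while the stubbed return value here is only a placeholder, not a real SqlParsingResult:

# Illustration of the patching pattern used below: replace the module alias and stub
# one attribute on it, so the code under test never runs the real SQL parser.
from unittest import mock

with mock.patch("datahub.ingestion.source.tableau.sqlglot_l") as sqlglot_lineage:
    # Any call to sqlglot_l.create_lineage_sql_parsed_result(...) inside the patched
    # scope now returns this placeholder instead of invoking sqlglot.
    sqlglot_lineage.create_lineage_sql_parsed_result.return_value = "stub-result"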
@@ -791,11 +791,9 @@ def test_tableau_unsupported_csql(mock_datahub_graph):
        database_override_map={"production database": "prod"}
    )
-    with mock.patch(
-        "datahub.ingestion.source.tableau.sqlglot_lineage"
-    ) as sqlglot_lineage:
-        sqlglot_lineage.return_value = SqlParsingResult(  # type:ignore
+    with mock.patch("datahub.ingestion.source.tableau.sqlglot_l") as sqlglot_lineage:
+        sqlglot_lineage.create_lineage_sql_parsed_result.return_value = SqlParsingResult(  # type:ignore
            in_tables=[
                "urn:li:dataset:(urn:li:dataPlatform:bigquery,my_bigquery_project.invent_dw.userdetail,PROD)"
            ],