diff --git a/metadata-ingestion/docs/sources/powerbi/powerbi_recipe.yml b/metadata-ingestion/docs/sources/powerbi/powerbi_recipe.yml index 125ce28952..d63963a608 100644 --- a/metadata-ingestion/docs/sources/powerbi/powerbi_recipe.yml +++ b/metadata-ingestion/docs/sources/powerbi/powerbi_recipe.yml @@ -3,30 +3,42 @@ source: config: # Your Power BI tenant identifier tenant_id: a949d688-67c0-4bf1-a344-e939411c6c0a + + # Azure AD Application identifier + client_id: foo + # Azure AD App client secret + client_secret: bar + # Ingest elements of below PowerBi Workspace into Datahub workspace_id_pattern: allow: - 4bd10256-e999-45dd-8e56-571c77153a5f deny: - # Workspaces dataset environments (PROD, DEV, QA, STAGE) - env: DEV - # Azure AD Application identifier - client_id: foo - # Azure AD App client secret - client_secret: bar - # Enable / Disable ingestion of user information for dashboards + + # Enable / Disable ingestion of ownership information for dashboards extract_ownership: true + # Enable/Disable extracting workspace information to DataHub containers extract_workspaces_to_containers: true + # Enable / Disable ingestion of endorsements. # Please notice that this may overwrite any existing tags defined to ingested entities! extract_endorsements_to_tags: false - # dataset_type_mapping is fixed mapping of Power BI datasources type to equivalent Datahub "data platform" dataset - dataset_type_mapping: - PostgreSql: postgres - Oracle: oracle - Sql: mssql - GoogleBigQuery: bigquery + + # Optional -- This mapping is optional and only required to configure platform-instance for upstream tables + # A mapping of PowerBI datasource's server i.e host[:port] to data platform instance. + # :port is optional and only needed if your datasource server is running on non-standard port. 
+ # For Google BigQuery the datasource's server is google bigquery project name + server_to_platform_instance: + ap-south-1.snowflakecomputing.com: + platform_instance: operational_instance + env: DEV + oracle-server:1920: + platform_instance: high_performance_production_unit + env: PROD + big-query-sales-project: + platform_instance: sn-2 + env: QA sink: # sink configs diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py index e721f6d38a..905a098c5f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py @@ -1,5 +1,6 @@ import logging from dataclasses import dataclass, field as dataclass_field +from enum import Enum from typing import Dict, List, Optional, Union import pydantic @@ -7,7 +8,8 @@ from pydantic import validator from pydantic.class_validators import root_validator import datahub.emitter.mce_builder as builder -from datahub.configuration.common import AllowDenyPattern +from datahub.configuration.common import AllowDenyPattern, ConfigModel +from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated from datahub.configuration.source_common import DEFAULT_ENV, DatasetSourceConfigMixin from datahub.ingestion.source.common.subtypes import BIAssetSubTypes from datahub.ingestion.source.state.stale_entity_removal_handler import ( @@ -105,6 +107,40 @@ class Constant: DATASET_WEB_URL = "datasetWebUrl" +@dataclass +class DataPlatformPair: + datahub_data_platform_name: str + powerbi_data_platform_name: str + + +class SupportedDataPlatform(Enum): + POSTGRES_SQL = DataPlatformPair( + powerbi_data_platform_name="PostgreSQL", datahub_data_platform_name="postgres" + ) + + ORACLE = DataPlatformPair( + powerbi_data_platform_name="Oracle", datahub_data_platform_name="oracle" + ) + + SNOWFLAKE = DataPlatformPair( + powerbi_data_platform_name="Snowflake", datahub_data_platform_name="snowflake" + ) + + MS_SQL = DataPlatformPair( + powerbi_data_platform_name="Sql", datahub_data_platform_name="mssql" + ) + + GOOGLE_BIGQUERY = DataPlatformPair( + powerbi_data_platform_name="GoogleBigQuery", + datahub_data_platform_name="bigquery", + ) + + AMAZON_REDSHIFT = DataPlatformPair( + powerbi_data_platform_name="AmazonRedshift", + datahub_data_platform_name="redshift", + ) + + @dataclass class PowerBiDashboardSourceReport(StaleEntityRemovalSourceReport): dashboards_scanned: int = 0 @@ -129,15 +165,26 @@ class PowerBiDashboardSourceReport(StaleEntityRemovalSourceReport): self.number_of_workspaces = number_of_workspaces -@dataclass -class PlatformDetail: +def default_for_dataset_type_mapping() -> Dict[str, str]: + dict_: dict = {} + for item in SupportedDataPlatform: + dict_[ + item.value.powerbi_data_platform_name + ] = item.value.datahub_data_platform_name + + return dict_ + + +class PlatformDetail(ConfigModel): platform_instance: Optional[str] = pydantic.Field( default=None, - description="DataHub platform instance name. It should be same as you have used in ingestion receipe of DataHub platform ingestion source of particular platform", + description="DataHub platform instance name. 
To generate correct urn for upstream dataset, this should match " + "with platform instance name used in ingestion" + "recipe of other datahub sources.", ) env: str = pydantic.Field( default=DEFAULT_ENV, - description="The environment that all assets produced by DataHub platform ingestion source belong to", + description="The environment that the platform is located in. It is default to PROD", ) @@ -157,7 +204,9 @@ class PowerBiDashboardSourceConfig( tenant_id: str = pydantic.Field(description="PowerBI tenant identifier") # PowerBi workspace identifier workspace_id: Optional[str] = pydantic.Field( - description="[deprecated] Use workspace_id_pattern instead", default=None + default=None, + description="[deprecated] Use workspace_id_pattern instead", + hidden_from_docs=True, ) # PowerBi workspace identifier workspace_id_pattern: AllowDenyPattern = pydantic.Field( @@ -171,7 +220,23 @@ class PowerBiDashboardSourceConfig( dataset_type_mapping: Union[ Dict[str, str], Dict[str, PlatformDetail] ] = pydantic.Field( - description="Mapping of PowerBI datasource type to DataHub supported data-sources. See Quickstart Recipe for mapping" + default_factory=default_for_dataset_type_mapping, + description="[deprecated] Use server_to_platform_instance instead. Mapping of PowerBI datasource type to " + "DataHub supported datasources." + "You can configured platform instance for dataset lineage. " + "See Quickstart Recipe for mapping", + hidden_from_docs=True, + ) + # PowerBI datasource's server to platform instance mapping + server_to_platform_instance: Dict[str, PlatformDetail] = pydantic.Field( + default={}, + description="A mapping of PowerBI datasource's server i.e host[:port] to Data platform instance." + " :port is optional and only needed if your datasource server is running on non-standard port." + "For Google BigQuery the datasource's server is google bigquery project name", + ) + # deprecated warning + _dataset_type_mapping = pydantic_field_deprecated( + "dataset_type_mapping", "server_to_platform_instance" ) # Azure app client identifier client_id: str = pydantic.Field(description="Azure app client identifier") @@ -259,14 +324,28 @@ class PowerBiDashboardSourceConfig( if workspace_id_pattern == AllowDenyPattern.allow_all() and workspace_id: logger.warning( - "workspace_id_pattern is not set but workspace_id is set, setting workspace_id as workspace_id_pattern. workspace_id will be deprecated, please use workspace_id_pattern instead." + "workspace_id_pattern is not set but workspace_id is set, setting workspace_id as " + "workspace_id_pattern. workspace_id will be deprecated, please use workspace_id_pattern instead." ) values["workspace_id_pattern"] = AllowDenyPattern( allow=[f"^{workspace_id}$"] ) elif workspace_id_pattern != AllowDenyPattern.allow_all() and workspace_id: logger.warning( - "workspace_id will be ignored in favour of workspace_id_pattern. workspace_id will be deprecated, please use workspace_id_pattern only." + "workspace_id will be ignored in favour of workspace_id_pattern. workspace_id will be deprecated, " + "please use workspace_id_pattern only." ) values.pop("workspace_id") return values + + @root_validator(pre=True) + def raise_error_for_dataset_type_mapping(cls, values: Dict) -> Dict: + if ( + values.get("dataset_type_mapping") is not None + and values.get("server_to_platform_instance") is not None + ): + raise ValueError( + "dataset_type_mapping is deprecated. Use server_to_platform_instance only." 
+ ) + + return values diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/dataplatform_instance_resolver.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/dataplatform_instance_resolver.py new file mode 100644 index 0000000000..396da2d79e --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/dataplatform_instance_resolver.py @@ -0,0 +1,75 @@ +import logging +from abc import ABC, abstractmethod +from typing import Union + +from datahub.ingestion.source.powerbi.config import ( + PlatformDetail, + PowerBiDashboardSourceConfig, +) +from datahub.ingestion.source.powerbi.m_query.resolver import DataPlatformTable + +logger = logging.getLogger(__name__) + + +class AbstractDataPlatformInstanceResolver(ABC): + @abstractmethod + def get_platform_instance( + self, dataplatform_table: DataPlatformTable + ) -> PlatformDetail: + pass + + +class BaseAbstractDataPlatformInstanceResolver( + AbstractDataPlatformInstanceResolver, ABC +): + config: PowerBiDashboardSourceConfig + + def __init__(self, config): + self.config = config + + +class ResolvePlatformInstanceFromDatasetTypeMapping( + BaseAbstractDataPlatformInstanceResolver +): + def get_platform_instance( + self, dataplatform_table: DataPlatformTable + ) -> PlatformDetail: + platform: Union[str, PlatformDetail] = self.config.dataset_type_mapping[ + dataplatform_table.data_platform_pair.powerbi_data_platform_name + ] + + if isinstance(platform, PlatformDetail): + return platform + + return PlatformDetail.parse_obj({}) + + +class ResolvePlatformInstanceFromServerToPlatformInstance( + BaseAbstractDataPlatformInstanceResolver +): + def get_platform_instance( + self, dataplatform_table: DataPlatformTable + ) -> PlatformDetail: + return ( + self.config.server_to_platform_instance[ + dataplatform_table.datasource_server + ] + if dataplatform_table.datasource_server + in self.config.server_to_platform_instance + else PlatformDetail.parse_obj({}) + ) + + +def create_dataplatform_instance_resolver( + config: PowerBiDashboardSourceConfig, +) -> AbstractDataPlatformInstanceResolver: + if config.server_to_platform_instance: + logger.debug( + "Creating resolver to resolve platform instance from server_to_platform_instance" + ) + return ResolvePlatformInstanceFromServerToPlatformInstance(config) + + logger.debug( + "Creating resolver to resolve platform instance from dataset_type_mapping" + ) + return ResolvePlatformInstanceFromDatasetTypeMapping(config) diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py index 82968708a9..e9ae87251a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py @@ -6,7 +6,11 @@ from typing import Any, Dict, List, Optional, Tuple, Type, Union, cast from lark import Tree -from datahub.ingestion.source.powerbi.config import PowerBiDashboardSourceReport +from datahub.ingestion.source.powerbi.config import ( + DataPlatformPair, + PowerBiDashboardSourceReport, + SupportedDataPlatform, +) from datahub.ingestion.source.powerbi.m_query import native_sql_parser, tree_function from datahub.ingestion.source.powerbi.m_query.data_classes import ( TRACE_POWERBI_MQUERY_PARSER, @@ -18,52 +22,19 @@ from datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes import Table logger = logging.getLogger(__name__) -@dataclass -class DataPlatformPair: - datahub_data_platform_name: str 
- powerbi_data_platform_name: str - - @dataclass class DataPlatformTable: name: str full_name: str + datasource_server: str data_platform_pair: DataPlatformPair -class SupportedDataPlatform(Enum): - POSTGRES_SQL = DataPlatformPair( - powerbi_data_platform_name="PostgreSQL", datahub_data_platform_name="postgres" - ) - - ORACLE = DataPlatformPair( - powerbi_data_platform_name="Oracle", datahub_data_platform_name="oracle" - ) - - SNOWFLAKE = DataPlatformPair( - powerbi_data_platform_name="Snowflake", datahub_data_platform_name="snowflake" - ) - - MS_SQL = DataPlatformPair( - powerbi_data_platform_name="Sql", datahub_data_platform_name="mssql" - ) - - GOOGLE_BIGQUERY = DataPlatformPair( - powerbi_data_platform_name="GoogleBigQuery", - datahub_data_platform_name="bigquery", - ) - - AMAZON_REDSHIFT = DataPlatformPair( - powerbi_data_platform_name="AmazonRedshift", - datahub_data_platform_name="redshift", - ) - - -class AbstractTableFullNameCreator(ABC): +class AbstractDataPlatformTableCreator(ABC): @abstractmethod - def get_full_table_names( + def create_dataplatform_tables( self, data_access_func_detail: DataAccessFunctionDetail - ) -> List[str]: + ) -> List[DataPlatformTable]: pass @abstractmethod @@ -71,19 +42,20 @@ class AbstractTableFullNameCreator(ABC): pass @staticmethod - def get_db_name_from_second_argument(arg_list: Tree) -> Optional[str]: + def get_db_detail_from_argument( + arg_list: Tree, + ) -> Tuple[Optional[str], Optional[str]]: arguments: List[str] = tree_function.strip_char_from_list( values=tree_function.remove_whitespaces_from_list( tree_function.token_values(arg_list) ), - char='"', ) if len(arguments) < 2: logger.debug(f"Expected minimum 2 arguments, but got {len(arguments)}") - return None + return None, None - return arguments[1] + return arguments[0], arguments[1] class AbstractDataAccessMQueryResolver(ABC): @@ -140,7 +112,6 @@ class MQueryResolver(AbstractDataAccessMQueryResolver, ABC): cast(Tree, item_selector), parameters=self.parameters ) ), - '"', ) identifier: List[str] = tree_function.token_values( cast(Tree, identifier_tree) @@ -382,25 +353,18 @@ class MQueryResolver(AbstractDataAccessMQueryResolver, ABC): ) continue - table_full_name_creator: AbstractTableFullNameCreator = ( + table_full_name_creator: AbstractDataPlatformTableCreator = ( supported_resolver.get_table_full_name_creator()() ) - for table_full_name in table_full_name_creator.get_full_table_names( - f_detail - ): - data_platform_tables.append( - DataPlatformTable( - name=table_full_name.split(".")[-1], - full_name=table_full_name, - data_platform_pair=table_full_name_creator.get_platform_pair(), - ) - ) + data_platform_tables.extend( + table_full_name_creator.create_dataplatform_tables(f_detail) + ) return data_platform_tables -class DefaultTwoStepDataAccessSources(AbstractTableFullNameCreator, ABC): +class DefaultTwoStepDataAccessSources(AbstractDataPlatformTableCreator, ABC): """ These are the DataSource for which PowerBI Desktop generates default M-Query of following pattern let @@ -412,19 +376,16 @@ class DefaultTwoStepDataAccessSources(AbstractTableFullNameCreator, ABC): def two_level_access_pattern( self, data_access_func_detail: DataAccessFunctionDetail - ) -> List[str]: - full_table_names: List[str] = [] - + ) -> List[DataPlatformTable]: logger.debug( - f"Processing {self.get_platform_pair().powerbi_data_platform_name} function detail {data_access_func_detail}" + f"Processing {self.get_platform_pair().powerbi_data_platform_name} data-access function detail {data_access_func_detail}" ) - 
db_name: Optional[str] = self.get_db_name_from_second_argument( + server, db_name = self.get_db_detail_from_argument( data_access_func_detail.arg_list ) - if db_name is None: - logger.debug("db_name not found in expression") - return full_table_names # Return empty list + if server is None or db_name is None: + return [] # Return empty list schema_name: str = cast( IdentifierAccessor, data_access_func_detail.identifier_accessor @@ -434,38 +395,44 @@ class DefaultTwoStepDataAccessSources(AbstractTableFullNameCreator, ABC): IdentifierAccessor, data_access_func_detail.identifier_accessor ).items["Item"] - full_table_names.append(f"{db_name}.{schema_name}.{table_name}") + full_table_name: str = f"{db_name}.{schema_name}.{table_name}" logger.debug( - f"Platform({self.get_platform_pair().datahub_data_platform_name}) full-table-names = {full_table_names}" + f"Platform({self.get_platform_pair().datahub_data_platform_name}) full_table_name= {full_table_name}" ) - return full_table_names + return [ + DataPlatformTable( + name=table_name, + full_name=full_table_name, + datasource_server=server, + data_platform_pair=self.get_platform_pair(), + ) + ] -class PostgresTableFullNameCreator(DefaultTwoStepDataAccessSources): - def get_full_table_names( +class PostgresDataPlatformTableCreator(DefaultTwoStepDataAccessSources): + def create_dataplatform_tables( self, data_access_func_detail: DataAccessFunctionDetail - ) -> List[str]: + ) -> List[DataPlatformTable]: return self.two_level_access_pattern(data_access_func_detail) def get_platform_pair(self) -> DataPlatformPair: return SupportedDataPlatform.POSTGRES_SQL.value -class MSSqlTableFullNameCreator(DefaultTwoStepDataAccessSources): +class MSSqlDataPlatformTableCreator(DefaultTwoStepDataAccessSources): def get_platform_pair(self) -> DataPlatformPair: return SupportedDataPlatform.MS_SQL.value - def get_full_table_names( + def create_dataplatform_tables( self, data_access_func_detail: DataAccessFunctionDetail - ) -> List[str]: - full_table_names: List[str] = [] + ) -> List[DataPlatformTable]: + dataplatform_tables: List[DataPlatformTable] = [] arguments: List[str] = tree_function.strip_char_from_list( values=tree_function.remove_whitespaces_from_list( tree_function.token_values(data_access_func_detail.arg_list) ), - char='"', ) if len(arguments) == 2: @@ -475,9 +442,10 @@ class MSSqlTableFullNameCreator(DefaultTwoStepDataAccessSources): if len(arguments) >= 4 and arguments[2] != "Query": logger.debug("Unsupported case is found. 
Second index is not the Query") - return full_table_names + return dataplatform_tables db_name: str = arguments[1] + tables: List[str] = native_sql_parser.get_tables(arguments[3]) for table in tables: schema_and_table: List[str] = table.split(".") @@ -486,34 +454,42 @@ class MSSqlTableFullNameCreator(DefaultTwoStepDataAccessSources): # https://learn.microsoft.com/en-us/sql/relational-databases/security/authentication-access/ownership-and-user-schema-separation?view=sql-server-ver16 schema_and_table.insert(0, "dbo") - full_table_names.append( - f"{db_name}.{schema_and_table[0]}.{schema_and_table[1]}" + dataplatform_tables.append( + DataPlatformTable( + name=schema_and_table[1], + full_name=f"{db_name}.{schema_and_table[0]}.{schema_and_table[1]}", + datasource_server=arguments[0], + data_platform_pair=self.get_platform_pair(), + ) ) - logger.debug("MS-SQL full-table-names %s", full_table_names) + logger.debug("MS-SQL full-table-names %s", dataplatform_tables) - return full_table_names + return dataplatform_tables -class OracleTableFullNameCreator(AbstractTableFullNameCreator): +class OracleDataPlatformTableCreator(AbstractDataPlatformTableCreator): def get_platform_pair(self) -> DataPlatformPair: return SupportedDataPlatform.ORACLE.value - def _get_db_name(self, value: str) -> Optional[str]: - error_message: str = f"The target argument ({value}) should in the format of :/[.]" + @staticmethod + def _get_server_and_db_name(value: str) -> Tuple[Optional[str], Optional[str]]: + error_message: str = ( + f"The target argument ({value}) should in the format of :/[" + ".]" + ) splitter_result: List[str] = value.split("/") if len(splitter_result) != 2: logger.debug(error_message) - return None + return None, None db_name = splitter_result[1].split(".")[0] - return db_name + return tree_function.strip_char_from_list([splitter_result[0]])[0], db_name - def get_full_table_names( + def create_dataplatform_tables( self, data_access_func_detail: DataAccessFunctionDetail - ) -> List[str]: - full_table_names: List[str] = [] + ) -> List[DataPlatformTable]: logger.debug( f"Processing Oracle data-access function detail {data_access_func_detail}" @@ -523,9 +499,10 @@ class OracleTableFullNameCreator(AbstractTableFullNameCreator): tree_function.token_values(data_access_func_detail.arg_list) ) - db_name: Optional[str] = self._get_db_name(arguments[0]) - if db_name is None: - return full_table_names + server, db_name = self._get_server_and_db_name(arguments[0]) + + if db_name is None or server is None: + return [] schema_name: str = cast( IdentifierAccessor, data_access_func_detail.identifier_accessor @@ -536,19 +513,32 @@ class OracleTableFullNameCreator(AbstractTableFullNameCreator): cast(IdentifierAccessor, data_access_func_detail.identifier_accessor).next, ).items["Name"] - full_table_names.append(f"{db_name}.{schema_name}.{table_name}") - - return full_table_names + return [ + DataPlatformTable( + name=table_name, + full_name=f"{db_name}.{schema_name}.{table_name}", + datasource_server=server, + data_platform_pair=self.get_platform_pair(), + ) + ] -class DefaultThreeStepDataAccessSources(AbstractTableFullNameCreator, ABC): - def get_full_table_names( +class DefaultThreeStepDataAccessSources(AbstractDataPlatformTableCreator, ABC): + def get_datasource_server( + self, arguments: List[str], data_access_func_detail: DataAccessFunctionDetail + ) -> str: + return tree_function.strip_char_from_list([arguments[0]])[0] + + def create_dataplatform_tables( self, data_access_func_detail: DataAccessFunctionDetail - ) -> 
List[str]: - + ) -> List[DataPlatformTable]: logger.debug( f"Processing {self.get_platform_pair().datahub_data_platform_name} function detail {data_access_func_detail}" ) + + arguments: List[str] = tree_function.remove_whitespaces_from_list( + tree_function.token_values(data_access_func_detail.arg_list) + ) # First is database name db_name: str = data_access_func_detail.identifier_accessor.items["Name"] # type: ignore # Second is schema name @@ -566,37 +556,56 @@ class DefaultThreeStepDataAccessSources(AbstractTableFullNameCreator, ABC): f"{self.get_platform_pair().datahub_data_platform_name} full-table-name {full_table_name}" ) - return [full_table_name] + return [ + DataPlatformTable( + name=table_name, + full_name=full_table_name, + datasource_server=self.get_datasource_server( + arguments, data_access_func_detail + ), + data_platform_pair=self.get_platform_pair(), + ) + ] -class SnowflakeTableFullNameCreator(DefaultThreeStepDataAccessSources): +class SnowflakeDataPlatformTableCreator(DefaultThreeStepDataAccessSources): def get_platform_pair(self) -> DataPlatformPair: return SupportedDataPlatform.SNOWFLAKE.value -class GoogleBigQueryTableFullNameCreator(DefaultThreeStepDataAccessSources): +class GoogleBigQueryDataPlatformTableCreator(DefaultThreeStepDataAccessSources): def get_platform_pair(self) -> DataPlatformPair: return SupportedDataPlatform.GOOGLE_BIGQUERY.value + def get_datasource_server( + self, arguments: List[str], data_access_func_detail: DataAccessFunctionDetail + ) -> str: + # In Google BigQuery server is project-name + # condition to silent lint, it is not going to be None + return ( + data_access_func_detail.identifier_accessor.items["Name"] + if data_access_func_detail.identifier_accessor is not None + else str() + ) -class AmazonRedshiftFullNameCreator(AbstractTableFullNameCreator): + +class AmazonRedshiftDataPlatformTableCreator(AbstractDataPlatformTableCreator): def get_platform_pair(self) -> DataPlatformPair: return SupportedDataPlatform.AMAZON_REDSHIFT.value - def get_full_table_names( + def create_dataplatform_tables( self, data_access_func_detail: DataAccessFunctionDetail - ) -> List[str]: - full_table_names: List[str] = [] + ) -> List[DataPlatformTable]: logger.debug( f"Processing AmazonRedshift data-access function detail {data_access_func_detail}" ) - db_name: Optional[str] = self.get_db_name_from_second_argument( + server, db_name = self.get_db_detail_from_argument( data_access_func_detail.arg_list ) - if db_name is None: - return full_table_names # Return empty list + if db_name is None or server is None: + return [] # Return empty list schema_name: str = cast( IdentifierAccessor, data_access_func_detail.identifier_accessor @@ -607,12 +616,17 @@ class AmazonRedshiftFullNameCreator(AbstractTableFullNameCreator): cast(IdentifierAccessor, data_access_func_detail.identifier_accessor).next, ).items["Name"] - full_table_names.append(f"{db_name}.{schema_name}.{table_name}") - - return full_table_names + return [ + DataPlatformTable( + name=table_name, + full_name=f"{db_name}.{schema_name}.{table_name}", + datasource_server=server, + data_platform_pair=self.get_platform_pair(), + ) + ] -class NativeQueryTableFullNameCreator(AbstractTableFullNameCreator): +class NativeQueryDataPlatformTableCreator(AbstractDataPlatformTableCreator): SUPPORTED_NATIVE_QUERY_DATA_PLATFORM: dict = { SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name: SupportedDataPlatform.SNOWFLAKE, SupportedDataPlatform.AMAZON_REDSHIFT.value.powerbi_data_platform_name: 
SupportedDataPlatform.AMAZON_REDSHIFT, @@ -626,13 +640,13 @@ class NativeQueryTableFullNameCreator(AbstractTableFullNameCreator): def is_native_parsing_supported(data_access_function_name: str) -> bool: return ( data_access_function_name - in NativeQueryTableFullNameCreator.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM + in NativeQueryDataPlatformTableCreator.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM ) - def get_full_table_names( + def create_dataplatform_tables( self, data_access_func_detail: DataAccessFunctionDetail - ) -> List[str]: - full_table_names: List[str] = [] + ) -> List[DataPlatformTable]: + dataplatform_tables: List[DataPlatformTable] = [] t1: Tree = cast( Tree, tree_function.first_arg_list_func(data_access_func_detail.arg_list) ) @@ -643,11 +657,11 @@ class NativeQueryTableFullNameCreator(AbstractTableFullNameCreator): f"Expecting 2 argument, actual argument count is {len(flat_argument_list)}" ) logger.debug(f"Flat argument list = {flat_argument_list}") - return full_table_names - + return dataplatform_tables data_access_tokens: List[str] = tree_function.remove_whitespaces_from_list( tree_function.token_values(flat_argument_list[0]) ) + if not self.is_native_parsing_supported(data_access_tokens[0]): logger.debug( f"Unsupported native-query data-platform = {data_access_tokens[0]}" @@ -655,7 +669,13 @@ class NativeQueryTableFullNameCreator(AbstractTableFullNameCreator): logger.debug( f"NativeQuery is supported only for {self.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM}" ) - return full_table_names + + if len(data_access_tokens[0]) < 3: + logger.debug( + f"Server is not available in argument list for data-platform {data_access_tokens[0]}. Returning empty " + "list" + ) + return dataplatform_tables self.current_data_platform = self.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM[ data_access_tokens[0] @@ -665,7 +685,6 @@ class NativeQueryTableFullNameCreator(AbstractTableFullNameCreator): values=tree_function.remove_whitespaces_from_list( tree_function.token_values(flat_argument_list[1]) ), - char='"', )[ 0 ] # Remove any whitespaces and double quotes character @@ -677,9 +696,18 @@ class NativeQueryTableFullNameCreator(AbstractTableFullNameCreator): ) continue - full_table_names.append(table) + dataplatform_tables.append( + DataPlatformTable( + name=table.split(".")[2], + full_name=table, + datasource_server=tree_function.strip_char_from_list( + [data_access_tokens[2]] + )[0], + data_platform_pair=self.get_platform_pair(), + ) + ) - return full_table_names + return dataplatform_tables class FunctionName(Enum): @@ -694,41 +722,41 @@ class FunctionName(Enum): class SupportedResolver(Enum): POSTGRES_SQL = ( - PostgresTableFullNameCreator, + PostgresDataPlatformTableCreator, FunctionName.POSTGRESQL_DATA_ACCESS, ) ORACLE = ( - OracleTableFullNameCreator, + OracleDataPlatformTableCreator, FunctionName.ORACLE_DATA_ACCESS, ) SNOWFLAKE = ( - SnowflakeTableFullNameCreator, + SnowflakeDataPlatformTableCreator, FunctionName.SNOWFLAKE_DATA_ACCESS, ) MS_SQL = ( - MSSqlTableFullNameCreator, + MSSqlDataPlatformTableCreator, FunctionName.MSSQL_DATA_ACCESS, ) GOOGLE_BIG_QUERY = ( - GoogleBigQueryTableFullNameCreator, + GoogleBigQueryDataPlatformTableCreator, FunctionName.GOOGLE_BIGQUERY_DATA_ACCESS, ) AMAZON_REDSHIFT = ( - AmazonRedshiftFullNameCreator, + AmazonRedshiftDataPlatformTableCreator, FunctionName.AMAZON_REDSHIFT_DATA_ACCESS, ) NATIVE_QUERY = ( - NativeQueryTableFullNameCreator, + NativeQueryDataPlatformTableCreator, FunctionName.NATIVE_QUERY, ) - def get_table_full_name_creator(self) -> 
Type[AbstractTableFullNameCreator]: + def get_table_full_name_creator(self) -> Type[AbstractDataPlatformTableCreator]: return self.value[0] def get_function_name(self) -> str: diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/tree_function.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/tree_function.py index a571691683..f6c65eac7c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/tree_function.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/tree_function.py @@ -120,7 +120,7 @@ def remove_whitespaces_from_list(values: List[str]) -> List[str]: return result -def strip_char_from_list(values: List[str], char: str) -> List[str]: +def strip_char_from_list(values: List[str], char: str = '"') -> List[str]: result: List[str] = [] for item in values: result.append(item.strip(char)) diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py index 16ac63a050..5371a86487 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py @@ -5,11 +5,10 @@ ######################################################### import logging -from typing import Iterable, List, Optional, Tuple, Union, cast +from typing import Iterable, List, Optional, Tuple import datahub.emitter.mce_builder as builder import datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes as powerbi_data_classes -from datahub.configuration.source_common import DEFAULT_ENV from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.mcp_builder import gen_containers from datahub.ingestion.api.common import PipelineContext @@ -30,6 +29,10 @@ from datahub.ingestion.source.powerbi.config import ( PowerBiDashboardSourceConfig, PowerBiDashboardSourceReport, ) +from datahub.ingestion.source.powerbi.dataplatform_instance_resolver import ( + AbstractDataPlatformInstanceResolver, + create_dataplatform_instance_resolver, +) from datahub.ingestion.source.powerbi.m_query import parser, resolver from datahub.ingestion.source.powerbi.rest_api_wrapper.powerbi_api import PowerBiAPI from datahub.ingestion.source.state.sql_common_state import ( @@ -95,9 +98,11 @@ class Mapper: self, config: PowerBiDashboardSourceConfig, reporter: PowerBiDashboardSourceReport, + dataplatform_instance_resolver: AbstractDataPlatformInstanceResolver, ): self.__config = config self.__reporter = reporter + self.__dataplatform_instance_resolver = dataplatform_instance_resolver @staticmethod def urn_to_lowercase(value: str, flag: bool) -> str: @@ -157,7 +162,9 @@ class Mapper: upstream_tables: List[resolver.DataPlatformTable] = parser.get_upstream_tables( table, self.__reporter, parameters=parameters ) - + logger.debug( + f"PowerBI virtual table {table.full_name} and it's upstream dataplatform tables = {upstream_tables}" + ) for upstream_table in upstream_tables: if ( upstream_table.data_platform_pair.powerbi_data_platform_name @@ -168,27 +175,15 @@ class Mapper: ) continue - platform: Union[str, PlatformDetail] = self.__config.dataset_type_mapping[ - upstream_table.data_platform_pair.powerbi_data_platform_name - ] - - platform_name: str = ( - upstream_table.data_platform_pair.datahub_data_platform_name + platform_detail: PlatformDetail = ( + self.__dataplatform_instance_resolver.get_platform_instance( + upstream_table + ) ) - - platform_instance_name: Optional[str] = None - 
platform_env: str = DEFAULT_ENV - # Determine if PlatformDetail is provided - if isinstance(platform, PlatformDetail): - platform_instance_name = cast( - PlatformDetail, platform - ).platform_instance - platform_env = cast(PlatformDetail, platform).env - upstream_urn = builder.make_dataset_urn_with_platform_instance( - platform=platform_name, - platform_instance=platform_instance_name, - env=platform_env, + platform=upstream_table.data_platform_pair.datahub_data_platform_name, + platform_instance=platform_detail.platform_instance, + env=platform_detail.env, name=self.lineage_urn_to_lowercase(upstream_table.full_name), ) @@ -200,9 +195,7 @@ class Mapper: if len(upstreams) > 0: upstream_lineage = UpstreamLineageClass(upstreams=upstreams) - logger.debug( - f"DataSet Lineage = {ds_urn} and its lineage = {upstream_lineage}" - ) + logger.debug(f"Dataset urn = {ds_urn} and its lineage = {upstream_lineage}") mcp = MetadataChangeProposalWrapper( entityType=Constant.DATASET, changeType=ChangeTypeClass.UPSERT, @@ -906,6 +899,7 @@ class PowerBiDashboardSource(StatefulIngestionSourceBase): source_config: PowerBiDashboardSourceConfig reporter: PowerBiDashboardSourceReport + dataplatform_instance_resolver: AbstractDataPlatformInstanceResolver accessed_dashboards: int = 0 platform: str = "powerbi" @@ -913,15 +907,19 @@ class PowerBiDashboardSource(StatefulIngestionSourceBase): super(PowerBiDashboardSource, self).__init__(config, ctx) self.source_config = config self.reporter = PowerBiDashboardSourceReport() + self.dataplatform_instance_resolver = create_dataplatform_instance_resolver( + self.source_config + ) try: self.powerbi_client = PowerBiAPI(self.source_config) except Exception as e: logger.warning(e) exit( 1 - ) # Exit pipeline as we are not able to connect to PowerBI API Service. This exit will avoid raising unwanted stacktrace on console + ) # Exit pipeline as we are not able to connect to PowerBI API Service. This exit will avoid raising + # unwanted stacktrace on console - self.mapper = Mapper(config, self.reporter) + self.mapper = Mapper(config, self.reporter, self.dataplatform_instance_resolver) # Create and register the stateful ingestion use-case handler. 
self.stale_entity_removal_handler = StaleEntityRemovalHandler( @@ -962,6 +960,10 @@ class PowerBiDashboardSource(StatefulIngestionSourceBase): if key not in powerbi_data_platforms: raise ValueError(f"PowerBI DataPlatform {key} is not supported") + logger.debug( + f"Dataset lineage would get ingested for data-platform = {self.source_config.dataset_type_mapping}" + ) + def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: """ Datahub Ingestion framework invoke this method diff --git a/metadata-ingestion/tests/integration/powerbi/golden_test_server_to_platform_instance.json b/metadata-ingestion/tests/integration/powerbi/golden_test_server_to_platform_instance.json new file mode 100644 index 0000000000..0e8cacd748 --- /dev/null +++ b/metadata-ingestion/tests/integration/powerbi/golden_test_server_to_platform_instance.json @@ -0,0 +1,861 @@ +[ +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.public_issue_history,DEV)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": {}, + "name": "public issue_history", + "description": "Library dataset description", + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.public_issue_history,DEV)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.SNOWFLAKE_TESTTABLE,DEV)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": {}, + "name": "SNOWFLAKE_TESTTABLE", + "description": "Library dataset description", + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.SNOWFLAKE_TESTTABLE,DEV)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.SNOWFLAKE_TESTTABLE,DEV)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake_production_instance.pbi_test.test.testtable,PROD)", + "type": "TRANSFORMED" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.snowflake_native-query,DEV)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": {}, + "name": "snowflake native-query", + "description": "Library dataset description", + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.snowflake_native-query,DEV)", + 
"changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.snowflake_native-query,DEV)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,operations_analytics.transformed_prod.v_aps_sme_units_v4,PROD)", + "type": "TRANSFORMED" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.big-query-with-parameter,DEV)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": {}, + "name": "big-query-with-parameter", + "description": "Library dataset description", + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.big-query-with-parameter,DEV)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.big-query-with-parameter,DEV)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-computing-dev-account.my-test-project.universal.d_wh_date,QA)", + "type": "TRANSFORMED" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.snowflake_native-query-with-join,DEV)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": {}, + "name": "snowflake native-query-with-join", + "description": "Library dataset description", + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.snowflake_native-query-with-join,DEV)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.snowflake_native-query-with-join,DEV)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,gsl_test_db.public.sales_analyst,PROD)", + "type": "TRANSFORMED" + }, + { + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "dataset": 
"urn:li:dataset:(urn:li:dataPlatform:snowflake,gsl_test_db.public.sales_forecast,PROD)", + "type": "TRANSFORMED" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.job-history,DEV)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": {}, + "name": "job-history", + "description": "Library dataset description", + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.job-history,DEV)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.job-history,DEV)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:oracle,oracle-sales-instance.salesdb.hr.employees,PROD)", + "type": "TRANSFORMED" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.postgres_test_table,DEV)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": {}, + "name": "postgres_test_table", + "description": "Library dataset description", + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.postgres_test_table,DEV)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.postgres_test_table,DEV)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,mics.public.order_date,PROD)", + "type": "TRANSFORMED" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,hr_pbi_test.dbo_book_issue,DEV)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": {}, + "name": "dbo_book_issue", + "description": "hr pbi test description", + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,hr_pbi_test.dbo_book_issue,DEV)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + 
"entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,hr_pbi_test.dbo_book_issue,DEV)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:mssql,library.dbo.book_issue,PROD)", + "type": "TRANSFORMED" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,hr_pbi_test.ms_sql_native_table,DEV)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": {}, + "name": "ms_sql_native_table", + "description": "hr pbi test description", + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,hr_pbi_test.ms_sql_native_table,DEV)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,hr_pbi_test.ms_sql_native_table,DEV)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:mssql,commopsdb.dbo.v_ps_cd_retention,PROD)", + "type": "TRANSFORMED" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "corpuser", + "entityUrn": "urn:li:corpuser:users.User1@foo.com", + "changeType": "UPSERT", + "aspectName": "corpUserKey", + "aspect": { + "json": { + "username": "User1@foo.com" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "corpuser", + "entityUrn": "urn:li:corpuser:users.User2@foo.com", + "changeType": "UPSERT", + "aspectName": "corpUserKey", + "aspect": { + "json": { + "username": "User2@foo.com" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "chart", + "entityUrn": "urn:li:chart:(powerbi,charts.B8E293DC-0C83-4AA0-9BB9-0A8738DF24A0)", + "changeType": "UPSERT", + "aspectName": "chartInfo", + "aspect": { + "json": { + "customProperties": { + "createdFrom": "Dataset", + "datasetId": "05169CD2-E713-41E6-9600-1D8066D95445", + "datasetWebUrl": "http://localhost/groups/64ED5CAD-7C10-4684-8180-826122881108/datasets/05169CD2-E713-41E6-9600-1D8066D95445/details" + }, + "title": "test_tile", + "description": "test_tile", + "lastModified": { + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + } + }, + "inputs": [ + { + "string": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.public_issue_history,DEV)" + }, + { + "string": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.SNOWFLAKE_TESTTABLE,DEV)" + }, + { + "string": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.snowflake_native-query,DEV)" + }, + { + "string": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.big-query-with-parameter,DEV)" + }, + { + "string": 
"urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.snowflake_native-query-with-join,DEV)" + }, + { + "string": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.job-history,DEV)" + }, + { + "string": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.postgres_test_table,DEV)" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "chart", + "entityUrn": "urn:li:chart:(powerbi,charts.B8E293DC-0C83-4AA0-9BB9-0A8738DF24A0)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "chart", + "entityUrn": "urn:li:chart:(powerbi,charts.B8E293DC-0C83-4AA0-9BB9-0A8738DF24A0)", + "changeType": "UPSERT", + "aspectName": "chartKey", + "aspect": { + "json": { + "dashboardTool": "powerbi", + "chartId": "powerbi.linkedin.com/charts/B8E293DC-0C83-4AA0-9BB9-0A8738DF24A0" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "chart", + "entityUrn": "urn:li:chart:(powerbi,charts.B8E293DC-0C83-4AA0-9BB9-0A8738DF24A0)", + "changeType": "UPSERT", + "aspectName": "browsePaths", + "aspect": { + "json": { + "paths": [ + "/powerbi/demo-workspace" + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "chart", + "entityUrn": "urn:li:chart:(powerbi,charts.23212598-23b5-4980-87cc-5fc0ecd84385)", + "changeType": "UPSERT", + "aspectName": "chartInfo", + "aspect": { + "json": { + "customProperties": { + "createdFrom": "Dataset", + "datasetId": "ba0130a1-5b03-40de-9535-b34e778ea6ed", + "datasetWebUrl": "http://localhost/groups/64ED5CAD-7C10-4684-8180-826122881108/datasets/ba0130a1-5b03-40de-9535-b34e778ea6ed/details" + }, + "title": "yearly_sales", + "description": "yearly_sales", + "lastModified": { + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + } + }, + "inputs": [ + { + "string": "urn:li:dataset:(urn:li:dataPlatform:powerbi,hr_pbi_test.dbo_book_issue,DEV)" + }, + { + "string": "urn:li:dataset:(urn:li:dataPlatform:powerbi,hr_pbi_test.ms_sql_native_table,DEV)" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "chart", + "entityUrn": "urn:li:chart:(powerbi,charts.23212598-23b5-4980-87cc-5fc0ecd84385)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "chart", + "entityUrn": "urn:li:chart:(powerbi,charts.23212598-23b5-4980-87cc-5fc0ecd84385)", + "changeType": "UPSERT", + "aspectName": "chartKey", + "aspect": { + "json": { + "dashboardTool": "powerbi", + "chartId": "powerbi.linkedin.com/charts/23212598-23b5-4980-87cc-5fc0ecd84385" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "chart", + "entityUrn": "urn:li:chart:(powerbi,charts.23212598-23b5-4980-87cc-5fc0ecd84385)", + "changeType": "UPSERT", + "aspectName": "browsePaths", + "aspect": { + "json": { + "paths": [ + "/powerbi/demo-workspace" + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dashboard", + "entityUrn": 
"urn:li:dashboard:(powerbi,dashboards.7D668CAD-7FFC-4505-9215-655BCA5BEBAE)", + "changeType": "UPSERT", + "aspectName": "browsePaths", + "aspect": { + "json": { + "paths": [ + "/powerbi/demo-workspace" + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dashboard", + "entityUrn": "urn:li:dashboard:(powerbi,dashboards.7D668CAD-7FFC-4505-9215-655BCA5BEBAE)", + "changeType": "UPSERT", + "aspectName": "dashboardInfo", + "aspect": { + "json": { + "customProperties": { + "chartCount": "2", + "workspaceName": "demo-workspace", + "workspaceId": "64ED5CAD-7C10-4684-8180-826122881108" + }, + "title": "test_dashboard", + "description": "Description of test dashboard", + "charts": [ + "urn:li:chart:(powerbi,charts.B8E293DC-0C83-4AA0-9BB9-0A8738DF24A0)", + "urn:li:chart:(powerbi,charts.23212598-23b5-4980-87cc-5fc0ecd84385)" + ], + "datasets": [], + "lastModified": { + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + } + }, + "dashboardUrl": "https://localhost/dashboards/web/1" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dashboard", + "entityUrn": "urn:li:dashboard:(powerbi,dashboards.7D668CAD-7FFC-4505-9215-655BCA5BEBAE)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dashboard", + "entityUrn": "urn:li:dashboard:(powerbi,dashboards.7D668CAD-7FFC-4505-9215-655BCA5BEBAE)", + "changeType": "UPSERT", + "aspectName": "dashboardKey", + "aspect": { + "json": { + "dashboardTool": "powerbi", + "dashboardId": "powerbi.linkedin.com/dashboards/7D668CAD-7FFC-4505-9215-655BCA5BEBAE" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "dashboard", + "entityUrn": "urn:li:dashboard:(powerbi,dashboards.7D668CAD-7FFC-4505-9215-655BCA5BEBAE)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [ + { + "owner": "urn:li:corpuser:users.User1@foo.com", + "type": "NONE" + }, + { + "owner": "urn:li:corpuser:users.User2@foo.com", + "type": "NONE" + } + ], + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + } + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "corpuser", + "entityUrn": "urn:li:corpuser:users.User1@foo.com", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +}, +{ + "entityType": "corpuser", + "entityUrn": "urn:li:corpuser:users.User2@foo.com", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/powerbi/test_m_parser.py b/metadata-ingestion/tests/integration/powerbi/test_m_parser.py index 9fc4a31656..dd67f3479e 100644 --- a/metadata-ingestion/tests/integration/powerbi/test_m_parser.py +++ b/metadata-ingestion/tests/integration/powerbi/test_m_parser.py @@ -14,23 +14,23 @@ from datahub.ingestion.source.powerbi.m_query.resolver import ( ) M_QUERIES = [ - 'let\n Source = 
Snowflake.Databases("bu20658.ap-southeast-2.snowflakecomputing.com","PBI_TEST_WAREHOUSE_PROD",[Role="PBI_TEST_MEMBER"]),\n PBI_TEST_Database = Source{[Name="PBI_TEST",Kind="Database"]}[Data],\n TEST_Schema = PBI_TEST_Database{[Name="TEST",Kind="Schema"]}[Data],\n TESTTABLE_Table = TEST_Schema{[Name="TESTTABLE",Kind="Table"]}[Data]\nin\n TESTTABLE_Table', - 'let\n Source = Value.NativeQuery(Snowflake.Databases("bu20658.ap-southeast-2.snowflakecomputing.com","operations_analytics_warehouse_prod",[Role="OPERATIONS_ANALYTICS_MEMBER"]){[Name="OPERATIONS_ANALYTICS"]}[Data], "SELECT#(lf)concat((UPPER(REPLACE(SELLER,\'-\',\'\'))), MONTHID) as AGENT_KEY,#(lf)concat((UPPER(REPLACE(CLIENT_DIRECTOR,\'-\',\'\'))), MONTHID) as CD_AGENT_KEY,#(lf) *#(lf)FROM#(lf)OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_APS_SME_UNITS_V4", null, [EnableFolding=true]),\n #"ADDed Conditional Column" = Table.AddColumn(Source, "SME Units ENT", each if [DEAL_TYPE] = "SME Unit" then [UNIT] else 0),\n #"Added Conditional Column1" = Table.AddColumn(#"Added Conditional Column", "Banklink Units", each if [DEAL_TYPE] = "Banklink" then [UNIT] else 0),\n #"Removed Columns" = Table.RemoveColumns(#"Added Conditional Column1",{"Banklink Units"}),\n #"Added Custom" = Table.AddColumn(#"Removed Columns", "Banklink Units", each if [DEAL_TYPE] = "Banklink" and [SALES_TYPE] = "3 - Upsell"\nthen [UNIT]\n\nelse if [SALES_TYPE] = "Adjusted BL Migration"\nthen [UNIT]\n\nelse 0),\n #"Added Custom1" = Table.AddColumn(#"Added Custom", "SME Units in $ (*$361)", each if [DEAL_TYPE] = "SME Unit" \nand [SALES_TYPE] <> "4 - Renewal"\n then [UNIT] * 361\nelse 0),\n #"Added Custom2" = Table.AddColumn(#"Added Custom1", "Banklink in $ (*$148)", each [Banklink Units] * 148)\nin\n #"Added Custom2"', - 'let\n Source = Value.NativeQuery(Snowflake.Databases("bu20 658.ap-southeast-2.snowflakecomputing.com","operations_analytics_warehouse_prod",[Role="OPERATIONS_ANALYTICS_MEMBER"]){[Name="OPERATIONS_ANALYTICS"]}[Data], "select #(lf)UPPER(REPLACE(AGENT_NAME,\'-\',\'\')) AS Agent,#(lf)TIER,#(lf)UPPER(MANAGER),#(lf)TEAM_TYPE,#(lf)DATE_TARGET,#(lf)MONTHID,#(lf)TARGET_TEAM,#(lf)SELLER_EMAIL,#(lf)concat((UPPER(REPLACE(AGENT_NAME,\'-\',\'\'))), MONTHID) as AGENT_KEY,#(lf)UNIT_TARGET AS SME_Quota,#(lf)AMV_TARGET AS Revenue_Quota,#(lf)SERVICE_QUOTA,#(lf)BL_TARGET,#(lf)SOFTWARE_QUOTA as Software_Quota#(lf)#(lf)from OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT_TARGETS#(lf)#(lf)where YEAR_TARGET >= 2022#(lf)and TEAM_TYPE = \'Accounting\'#(lf)and TARGET_TEAM = \'Enterprise\'", null, [EnableFolding=true]),\n #"Added Conditional Column" = Table.AddColumn(Source, "Has PS Software Quota?", each if [TIER] = "Expansion (Medium)" then "Yes" else if [TIER] = "Acquisition" then "Yes" else "No")\nin\n #"Added Conditional Column"', + 'let\n Source = Snowflake.Databases("bu10758.ap-unknown-2.fakecomputing.com","PBI_TEST_WAREHOUSE_PROD",[Role="PBI_TEST_MEMBER"]),\n PBI_TEST_Database = Source{[Name="PBI_TEST",Kind="Database"]}[Data],\n TEST_Schema = PBI_TEST_Database{[Name="TEST",Kind="Schema"]}[Data],\n TESTTABLE_Table = TEST_Schema{[Name="TESTTABLE",Kind="Table"]}[Data]\nin\n TESTTABLE_Table', + 'let\n Source = Value.NativeQuery(Snowflake.Databases("bu10758.ap-unknown-2.fakecomputing.com","operations_analytics_warehouse_prod",[Role="OPERATIONS_ANALYTICS_MEMBER"]){[Name="OPERATIONS_ANALYTICS"]}[Data], "SELECT#(lf)concat((UPPER(REPLACE(SELLER,\'-\',\'\'))), MONTHID) as AGENT_KEY,#(lf)concat((UPPER(REPLACE(CLIENT_DIRECTOR,\'-\',\'\'))), MONTHID) as CD_AGENT_KEY,#(lf) 
*#(lf)FROM#(lf)OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_APS_SME_UNITS_V4", null, [EnableFolding=true]),\n #"ADDed Conditional Column" = Table.AddColumn(Source, "SME Units ENT", each if [DEAL_TYPE] = "SME Unit" then [UNIT] else 0),\n #"Added Conditional Column1" = Table.AddColumn(#"Added Conditional Column", "Banklink Units", each if [DEAL_TYPE] = "Banklink" then [UNIT] else 0),\n #"Removed Columns" = Table.RemoveColumns(#"Added Conditional Column1",{"Banklink Units"}),\n #"Added Custom" = Table.AddColumn(#"Removed Columns", "Banklink Units", each if [DEAL_TYPE] = "Banklink" and [SALES_TYPE] = "3 - Upsell"\nthen [UNIT]\n\nelse if [SALES_TYPE] = "Adjusted BL Migration"\nthen [UNIT]\n\nelse 0),\n #"Added Custom1" = Table.AddColumn(#"Added Custom", "SME Units in $ (*$361)", each if [DEAL_TYPE] = "SME Unit" \nand [SALES_TYPE] <> "4 - Renewal"\n then [UNIT] * 361\nelse 0),\n #"Added Custom2" = Table.AddColumn(#"Added Custom1", "Banklink in $ (*$148)", each [Banklink Units] * 148)\nin\n #"Added Custom2"', + 'let\n Source = Value.NativeQuery(Snowflake.Databases("bu10758.ap-unknown-2.fakecomputing.com","operations_analytics_warehouse_prod",[Role="OPERATIONS_ANALYTICS_MEMBER"]){[Name="OPERATIONS_ANALYTICS"]}[Data], "select #(lf)UPPER(REPLACE(AGENT_NAME,\'-\',\'\')) AS Agent,#(lf)TIER,#(lf)UPPER(MANAGER),#(lf)TEAM_TYPE,#(lf)DATE_TARGET,#(lf)MONTHID,#(lf)TARGET_TEAM,#(lf)SELLER_EMAIL,#(lf)concat((UPPER(REPLACE(AGENT_NAME,\'-\',\'\'))), MONTHID) as AGENT_KEY,#(lf)UNIT_TARGET AS SME_Quota,#(lf)AMV_TARGET AS Revenue_Quota,#(lf)SERVICE_QUOTA,#(lf)BL_TARGET,#(lf)SOFTWARE_QUOTA as Software_Quota#(lf)#(lf)from OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT_TARGETS#(lf)#(lf)where YEAR_TARGET >= 2022#(lf)and TEAM_TYPE = \'Accounting\'#(lf)and TARGET_TEAM = \'Enterprise\'", null, [EnableFolding=true]),\n #"Added Conditional Column" = Table.AddColumn(Source, "Has PS Software Quota?", each if [TIER] = "Expansion (Medium)" then "Yes" else if [TIER] = "Acquisition" then "Yes" else "No")\nin\n #"Added Conditional Column"', 'let\n Source = Sql.Database("AUPRDWHDB", "COMMOPSDB", [Query="select *#(lf),concat((UPPER(REPLACE(CLIENT_MANAGER_QUOTED,\'-\',\'\'))), MONTHID) as AGENT_KEY#(lf),concat((UPPER(REPLACE(CLIENT_DIRECTOR,\'-\',\'\'))), MONTHID) as CD_AGENT_KEY#(lf)#(lf)from V_OIP_ENT_2022"]),\n #"Added Custom" = Table.AddColumn(Source, "OIP in $(*$350)", each [SALES_INVOICE_AMOUNT] * 350),\n #"Changed Type" = Table.TransformColumnTypes(#"Added Custom",{{"OIP in $(*$350)", type number}})\nin\n #"Changed Type"', 'let\n Source = Sql.Database("AUPRDWHDB", "COMMOPSDB", [Query="Select *,#(lf)#(lf)concat((UPPER(REPLACE(CLIENT_MANAGER_QUOTED,\'-\',\'\'))), #(lf)LEFT(CAST(DTE AS DATE),4)+LEFT(RIGHT(CAST(DTE AS DATE),5),2)) AS AGENT_KEY,#(lf)concat((UPPER(REPLACE(CLIENT_DIRECTOR,\'-\',\'\'))), #(lf)LEFT(CAST(DTE AS DATE),4)+LEFT(RIGHT(CAST(DTE AS DATE),5),2)) AS CD_AGENT_KEY#(lf)#(lf)from V_INVOICE_BOOKING_2022"]),\n #"Changed Type" = Table.TransformColumnTypes(Source,{{"CLIENT_ID", Int64.Type}}),\n #"Added Conditional Column" = Table.AddColumn(#"Changed Type", "PS Software (One-Off)", each if Text.Contains([REVENUE_TYPE], "Software") then [Inv_Amt] else if Text.Contains([REVENUE_TYPE], "Tax Seminar") then [Inv_Amt] else 0),\n #"Filtered Rows" = Table.SelectRows(#"Added Conditional Column", each true),\n #"Duplicated Column" = Table.DuplicateColumn(#"Filtered Rows", "CLIENT_ID", "CLIENT_ID - Copy"),\n #"Changed Type1" = Table.TransformColumnTypes(#"Duplicated Column",{{"CLIENT_ID - Copy", type text}}),\n #"Renamed Columns" = 
Table.RenameColumns(#"Changed Type1",{{"CLIENT_ID - Copy", "CLIENT_ID for Filter"}})\nin\n #"Renamed Columns"', 'let\n Source = Sql.Database("AUPRDWHDB", "COMMOPSDB", [Query="SELECT *,#(lf)concat((UPPER(REPLACE(CLIENT_MANAGER_CLOSING_MONTH,\'-\',\'\'))), #(lf)LEFT(CAST(MONTH_DATE AS DATE),4)+LEFT(RIGHT(CAST(MONTH_DATE AS DATE),5),2)) AS AGENT_KEY#(lf)#(lf)FROM dbo.V_ARR_ADDS"]),\n #"Changed Type" = Table.TransformColumnTypes(Source,{{"MONTH_DATE", type date}}),\n #"Added Custom" = Table.AddColumn(#"Changed Type", "Month", each Date.Month([MONTH_DATE]))\nin\n #"Added Custom"', - "let\n Source = Value.NativeQuery(Snowflake.Databases(\"bu20658.ap-southeast-2.snowflakecomputing.com\",\"operations_analytics_warehouse_prod\",[Role=\"OPERATIONS_ANALYTICS_MEMBER\"]){[Name=\"OPERATIONS_ANALYTICS\"]}[Data], \"select #(lf)UPPER(REPLACE(AGENT_NAME,'-','')) AS CLIENT_DIRECTOR,#(lf)TIER,#(lf)UPPER(MANAGER),#(lf)TEAM_TYPE,#(lf)DATE_TARGET,#(lf)MONTHID,#(lf)TARGET_TEAM,#(lf)SELLER_EMAIL,#(lf)concat((UPPER(REPLACE(AGENT_NAME,'-',''))), MONTHID) as AGENT_KEY,#(lf)UNIT_TARGET AS SME_Quota,#(lf)AMV_TARGET AS Revenue_Quota,#(lf)SERVICE_QUOTA,#(lf)BL_TARGET,#(lf)SOFTWARE_QUOTA as Software_Quota#(lf)#(lf)from OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT_TARGETS#(lf)#(lf)where YEAR_TARGET >= 2022#(lf)and TEAM_TYPE = 'Accounting'#(lf)and TARGET_TEAM = 'Enterprise'#(lf)AND TIER = 'Client Director'\", null, [EnableFolding=true])\nin\n Source", + "let\n Source = Value.NativeQuery(Snowflake.Databases(\"bu10758.ap-unknown-2.fakecomputing.com\",\"operations_analytics_warehouse_prod\",[Role=\"OPERATIONS_ANALYTICS_MEMBER\"]){[Name=\"OPERATIONS_ANALYTICS\"]}[Data], \"select #(lf)UPPER(REPLACE(AGENT_NAME,'-','')) AS CLIENT_DIRECTOR,#(lf)TIER,#(lf)UPPER(MANAGER),#(lf)TEAM_TYPE,#(lf)DATE_TARGET,#(lf)MONTHID,#(lf)TARGET_TEAM,#(lf)SELLER_EMAIL,#(lf)concat((UPPER(REPLACE(AGENT_NAME,'-',''))), MONTHID) as AGENT_KEY,#(lf)UNIT_TARGET AS SME_Quota,#(lf)AMV_TARGET AS Revenue_Quota,#(lf)SERVICE_QUOTA,#(lf)BL_TARGET,#(lf)SOFTWARE_QUOTA as Software_Quota#(lf)#(lf)from OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT_TARGETS#(lf)#(lf)where YEAR_TARGET >= 2022#(lf)and TEAM_TYPE = 'Accounting'#(lf)and TARGET_TEAM = 'Enterprise'#(lf)AND TIER = 'Client Director'\", null, [EnableFolding=true])\nin\n Source", 'let\n Source = Sql.Database("AUPRDWHDB", "COMMOPSDB", [Query="select *,#(lf)concat((UPPER(REPLACE(CLIENT_DIRECTOR,\'-\',\'\'))), MONTH_WID) as CD_AGENT_KEY,#(lf)concat((UPPER(REPLACE(CLIENT_MANAGER_CLOSING_MONTH,\'-\',\'\'))), MONTH_WID) as AGENT_KEY#(lf)#(lf)from V_PS_CD_RETENTION", CommandTimeout=#duration(0, 1, 30, 0)]),\n #"Changed Type" = Table.TransformColumnTypes(Source,{{"mth_date", type date}}),\n #"Added Custom" = Table.AddColumn(#"Changed Type", "Month", each Date.Month([mth_date])),\n #"Added Custom1" = Table.AddColumn(#"Added Custom", "TPV Opening", each if [Month] = 1 then [TPV_AMV_OPENING]\nelse if [Month] = 2 then 0\nelse if [Month] = 3 then 0\nelse if [Month] = 4 then [TPV_AMV_OPENING]\nelse if [Month] = 5 then 0\nelse if [Month] = 6 then 0\nelse if [Month] = 7 then [TPV_AMV_OPENING]\nelse if [Month] = 8 then 0\nelse if [Month] = 9 then 0\nelse if [Month] = 10 then [TPV_AMV_OPENING]\nelse if [Month] = 11 then 0\nelse if [Month] = 12 then 0\n\nelse 0)\nin\n #"Added Custom1"', 'let\n Source = Sql.Database("AUPRDWHDB", "COMMOPSDB", 
[Query="select#(lf)CLIENT_ID,#(lf)PARTNER_ACCOUNT_NAME,#(lf)CM_CLOSING_MNTH_COUNTRY,#(lf)MONTH_WID,#(lf)PS_DELETES,#(lf)CLIENT_MANAGER_CLOSING_MONTH,#(lf)SME_DELETES,#(lf)TPV_AMV_OPENING,#(lf)concat((UPPER(REPLACE(CLIENT_MANAGER_CLOSING_MONTH,\'-\',\'\'))), MONTH_WID) as AGENT_KEY#(lf)#(lf)from V_TPV_LEADERBOARD", CommandTimeout=#duration(0, 1, 30, 0)]),\n #"Changed Type" = Table.TransformColumnTypes(Source,{{"MONTH_WID", type text}}),\n #"Added Custom" = Table.AddColumn(#"Changed Type", "MONTH_DATE", each Date.FromText(\nText.Range([MONTH_WID], 0,4) & "-" &\nText.Range([MONTH_WID], 4,2)\n)),\n #"Added Custom2" = Table.AddColumn(#"Added Custom", "Month", each Date.Month([MONTH_DATE])),\n #"Added Custom1" = Table.AddColumn(#"Added Custom2", "TPV Opening", each if [Month] = 1 then [TPV_AMV_OPENING]\nelse if [Month] = 2 then 0\nelse if [Month] = 3 then 0\nelse if [Month] = 4 then [TPV_AMV_OPENING]\nelse if [Month] = 5 then 0\nelse if [Month] = 6 then 0\nelse if [Month] = 7 then [TPV_AMV_OPENING]\nelse if [Month] = 8 then 0\nelse if [Month] = 9 then 0\nelse if [Month] = 10 then [TPV_AMV_OPENING]\nelse if [Month] = 11 then 0\nelse if [Month] = 12 then 0\n\nelse 0)\nin\n #"Added Custom1"', - 'let\n Source = Snowflake.Databases("bu20658.ap-southeast-2.snowflakecomputing.com","OPERATIONS_ANALYTICS_WAREHOUSE_PROD",[Role="OPERATIONS_ANALYTICS_MEMBER_AD"]),\n OPERATIONS_ANALYTICS_Database = Source{[Name="OPERATIONS_ANALYTICS",Kind="Database"]}[Data],\n TEST_Schema = OPERATIONS_ANALYTICS_Database{[Name="TEST",Kind="Schema"]}[Data],\n LZ_MIGRATION_DOWNLOAD_View = TEST_Schema{[Name="LZ_MIGRATION_DOWNLOAD",Kind="View"]}[Data],\n #"Changed Type" = Table.TransformColumnTypes(LZ_MIGRATION_DOWNLOAD_View,{{"MIGRATION_MONTH_ID", type text}}),\n #"Added Custom" = Table.AddColumn(#"Changed Type", "Migration Month", each Date.FromText(\nText.Range([MIGRATION_MONTH_ID], 0,4) & "-" & \nText.Range([MIGRATION_MONTH_ID], 4,2) \n)),\n #"Changed Type1" = Table.TransformColumnTypes(#"Added Custom",{{"Migration Month", type date}})\nin\n #"Changed Type1"', - "let\n Source = Value.NativeQuery(Snowflake.Databases(\"bu20658.ap-southeast-2.snowflakecomputing.com\",\"operations_analytics_warehouse_prod\",[Role=\"OPERATIONS_ANALYTICS_MEMBER\"]){[Name=\"OPERATIONS_ANALYTICS\"]}[Data], \"select *,#(lf)UPPER(REPLACE(AGENT_NAME,'-','')) AS Agent,#(lf)concat((UPPER(REPLACE(AGENT_NAME,'-',''))), MONTHID) as AGENT_KEY#(lf)#(lf)from OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT_TARGETS#(lf)#(lf)where YEAR_TARGET >= 2022#(lf)and TEAM_TYPE = 'Industries'#(lf)and TARGET_TEAM = 'Enterprise'\", null, [EnableFolding=true])\nin\n Source", + 'let\n Source = Snowflake.Databases("bu10758.ap-unknown-2.fakecomputing.com","OPERATIONS_ANALYTICS_WAREHOUSE_PROD",[Role="OPERATIONS_ANALYTICS_MEMBER_AD"]),\n OPERATIONS_ANALYTICS_Database = Source{[Name="OPERATIONS_ANALYTICS",Kind="Database"]}[Data],\n TEST_Schema = OPERATIONS_ANALYTICS_Database{[Name="TEST",Kind="Schema"]}[Data],\n LZ_MIGRATION_DOWNLOAD_View = TEST_Schema{[Name="LZ_MIGRATION_DOWNLOAD",Kind="View"]}[Data],\n #"Changed Type" = Table.TransformColumnTypes(LZ_MIGRATION_DOWNLOAD_View,{{"MIGRATION_MONTH_ID", type text}}),\n #"Added Custom" = Table.AddColumn(#"Changed Type", "Migration Month", each Date.FromText(\nText.Range([MIGRATION_MONTH_ID], 0,4) & "-" & \nText.Range([MIGRATION_MONTH_ID], 4,2) \n)),\n #"Changed Type1" = Table.TransformColumnTypes(#"Added Custom",{{"Migration Month", type date}})\nin\n #"Changed Type1"', + "let\n Source = 
Value.NativeQuery(Snowflake.Databases(\"bu10758.ap-unknown-2.fakecomputing.com\",\"operations_analytics_warehouse_prod\",[Role=\"OPERATIONS_ANALYTICS_MEMBER\"]){[Name=\"OPERATIONS_ANALYTICS\"]}[Data], \"select *,#(lf)UPPER(REPLACE(AGENT_NAME,'-','')) AS Agent,#(lf)concat((UPPER(REPLACE(AGENT_NAME,'-',''))), MONTHID) as AGENT_KEY#(lf)#(lf)from OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT_TARGETS#(lf)#(lf)where YEAR_TARGET >= 2022#(lf)and TEAM_TYPE = 'Industries'#(lf)and TARGET_TEAM = 'Enterprise'\", null, [EnableFolding=true])\nin\n Source", 'let\n Source = Sql.Database("AUPRDWHDB", "COMMOPSDB", [Query="Select#(lf)*,#(lf)concat((UPPER(REPLACE(SALES_SPECIALIST,\'-\',\'\'))),#(lf)LEFT(CAST(INVOICE_DATE AS DATE),4)+LEFT(RIGHT(CAST(INVOICE_DATE AS DATE),5),2)) AS AGENT_KEY,#(lf)CASE#(lf) WHEN CLASS = \'Software\' and (NOT(PRODUCT in (\'ADV\', \'Adv\') and left(ACCOUNT_ID,2)=\'10\') #(lf) or V_ENTERPRISE_INVOICED_REVENUE.TYPE = \'Manual Adjustment\') THEN INVOICE_AMOUNT#(lf) WHEN V_ENTERPRISE_INVOICED_REVENUE.TYPE IN (\'Recurring\',\'0\') THEN INVOICE_AMOUNT#(lf) ELSE 0#(lf)END as SOFTWARE_INV#(lf)#(lf)from V_ENTERPRISE_INVOICED_REVENUE", CommandTimeout=#duration(0, 1, 30, 0)]),\n #"Added Conditional Column" = Table.AddColumn(Source, "Services", each if [CLASS] = "Services" then [INVOICE_AMOUNT] else 0),\n #"Added Custom" = Table.AddColumn(#"Added Conditional Column", "Advanced New Sites", each if [PRODUCT] = "ADV"\nor [PRODUCT] = "Adv"\nthen [NEW_SITE]\nelse 0)\nin\n #"Added Custom"', - 'let\n Source = Snowflake.Databases("xaa48144.snowflakecomputing.com","GSL_TEST_WH",[Role="ACCOUNTADMIN"]),\n Source2 = PostgreSQL.Database("localhost", "mics"),\n public_order_date = Source2{[Schema="public",Item="order_date"]}[Data],\n GSL_TEST_DB_Database = Source{[Name="GSL_TEST_DB",Kind="Database"]}[Data],\n PUBLIC_Schema = GSL_TEST_DB_Database{[Name="PUBLIC",Kind="Schema"]}[Data],\n SALES_ANALYST_VIEW_View = PUBLIC_Schema{[Name="SALES_ANALYST_VIEW",Kind="View"]}[Data],\n two_source_table = Table.Combine({public_order_date, SALES_ANALYST_VIEW_View})\n in\n two_source_table', + 'let\n Source = Snowflake.Databases("ghh48144.snowflakefakecomputing.com","GSL_TEST_WH",[Role="ACCOUNTADMIN"]),\n Source2 = PostgreSQL.Database("localhost", "mics"),\n public_order_date = Source2{[Schema="public",Item="order_date"]}[Data],\n GSL_TEST_DB_Database = Source{[Name="GSL_TEST_DB",Kind="Database"]}[Data],\n PUBLIC_Schema = GSL_TEST_DB_Database{[Name="PUBLIC",Kind="Schema"]}[Data],\n SALES_ANALYST_VIEW_View = PUBLIC_Schema{[Name="SALES_ANALYST_VIEW",Kind="View"]}[Data],\n two_source_table = Table.Combine({public_order_date, SALES_ANALYST_VIEW_View})\n in\n two_source_table', 'let\n Source = PostgreSQL.Database("localhost" , "mics" ),\n public_order_date = Source{[Schema="public",Item="order_date"]}[Data] \n in \n public_order_date', 'let\n Source = Oracle.Database("localhost:1521/salesdb.GSLAB.COM", [HierarchicalNavigation=true]), HR = Source{[Schema="HR"]}[Data], EMPLOYEES1 = HR{[Name="EMPLOYEES"]}[Data] \n in EMPLOYEES1', 'let\n Source = Sql.Database("localhost", "library"),\n dbo_book_issue = Source{[Schema="dbo",Item="book_issue"]}[Data]\n in dbo_book_issue', - 'let\n Source = Snowflake.Databases("xaa48144.snowflakecomputing.com","GSL_TEST_WH",[Role="ACCOUNTADMIN"]),\n GSL_TEST_DB_Database = Source{[Name="GSL_TEST_DB",Kind="Database"]}[Data],\n PUBLIC_Schema = GSL_TEST_DB_Database{[Name="PUBLIC",Kind="Schema"]}[Data],\n SALES_FORECAST_Table = PUBLIC_Schema{[Name="SALES_FORECAST",Kind="Table"]}[Data],\n 
SALES_ANALYST_Table = PUBLIC_Schema{[Name="SALES_ANALYST",Kind="Table"]}[Data],\n RESULT = Table.Combine({SALES_FORECAST_Table, SALES_ANALYST_Table})\n\nin\n RESULT', + 'let\n Source = Snowflake.Databases("ghh48144.snowflakefakecomputing.com","GSL_TEST_WH",[Role="ACCOUNTADMIN"]),\n GSL_TEST_DB_Database = Source{[Name="GSL_TEST_DB",Kind="Database"]}[Data],\n PUBLIC_Schema = GSL_TEST_DB_Database{[Name="PUBLIC",Kind="Schema"]}[Data],\n SALES_FORECAST_Table = PUBLIC_Schema{[Name="SALES_FORECAST",Kind="Table"]}[Data],\n SALES_ANALYST_Table = PUBLIC_Schema{[Name="SALES_ANALYST",Kind="Table"]}[Data],\n RESULT = Table.Combine({SALES_FORECAST_Table, SALES_ANALYST_Table})\n\nin\n RESULT', 'let\n Source = GoogleBigQuery.Database(),\n #"seraphic-music-344307" = Source{[Name="seraphic-music-344307"]}[Data],\n school_dataset_Schema = #"seraphic-music-344307"{[Name="school_dataset",Kind="Schema"]}[Data],\n first_Table = school_dataset_Schema{[Name="first",Kind="Table"]}[Data]\nin\n first_Table', 'let \nSource = GoogleBigQuery.Database([BillingProject = #"Parameter - Source"]),\n#"gcp-project" = Source{[Name=#"Parameter - Source"]}[Data],\ngcp_billing_Schema = #"gcp-project"{[Name=#"My bq project",Kind="Schema"]}[Data],\nF_GCP_COST_Table = gcp_billing_Schema{[Name="GCP_TABLE",Kind="Table"]}[Data]\nin\nF_GCP_COST_Table', 'let\n Source = GoogleBigQuery.Database([BillingProject = #"Parameter - Source"]),\n#"gcp-project" = Source{[Name=#"Parameter - Source"]}[Data],\nuniversal_Schema = #"gcp-project"{[Name="universal",Kind="Schema"]}[Data],\nD_WH_DATE_Table = universal_Schema{[Name="D_WH_DATE",Kind="Table"]}[Data],\n#"Filtered Rows" = Table.SelectRows(D_WH_DATE_Table, each [D_DATE] > #datetime(2019, 9, 10, 0, 0, 0)),\n#"Filtered Rows1" = Table.SelectRows(#"Filtered Rows", each DateTime.IsInPreviousNHours([D_DATE], 87600))\n in \n#"Filtered Rows1"', @@ -149,6 +149,10 @@ def test_snowflake_regular_case(): assert len(data_platform_tables) == 1 assert data_platform_tables[0].name == "TESTTABLE" assert data_platform_tables[0].full_name == "PBI_TEST.TEST.TESTTABLE" + assert ( + data_platform_tables[0].datasource_server + == "bu10758.ap-unknown-2.fakecomputing.com" + ) assert ( data_platform_tables[0].data_platform_pair.powerbi_data_platform_name == SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name @@ -172,6 +176,7 @@ def test_postgres_regular_case(): assert len(data_platform_tables) == 1 assert data_platform_tables[0].name == "order_date" assert data_platform_tables[0].full_name == "mics.public.order_date" + assert data_platform_tables[0].datasource_server == "localhost" assert ( data_platform_tables[0].data_platform_pair.powerbi_data_platform_name == SupportedDataPlatform.POSTGRES_SQL.value.powerbi_data_platform_name @@ -195,6 +200,7 @@ def test_oracle_regular_case(): assert len(data_platform_tables) == 1 assert data_platform_tables[0].name == "EMPLOYEES" assert data_platform_tables[0].full_name == "salesdb.HR.EMPLOYEES" + assert data_platform_tables[0].datasource_server == "localhost:1521" assert ( data_platform_tables[0].data_platform_pair.powerbi_data_platform_name == SupportedDataPlatform.ORACLE.value.powerbi_data_platform_name @@ -219,6 +225,7 @@ def test_mssql_regular_case(): assert len(data_platform_tables) == 1 assert data_platform_tables[0].name == "book_issue" assert data_platform_tables[0].full_name == "library.dbo.book_issue" + assert data_platform_tables[0].datasource_server == "localhost" assert ( data_platform_tables[0].data_platform_pair.powerbi_data_platform_name == 
SupportedDataPlatform.MS_SQL.value.powerbi_data_platform_name @@ -259,6 +266,7 @@ def test_mssql_with_query(): assert len(data_platform_tables) == 1 assert data_platform_tables[0].name == expected_tables[index].split(".")[2] assert data_platform_tables[0].full_name == expected_tables[index] + assert data_platform_tables[0].datasource_server == "AUPRDWHDB" assert ( data_platform_tables[0].data_platform_pair.powerbi_data_platform_name == SupportedDataPlatform.MS_SQL.value.powerbi_data_platform_name @@ -296,6 +304,10 @@ def test_snowflake_native_query(): assert len(data_platform_tables) == 1 assert data_platform_tables[0].name == expected_tables[index].split(".")[2] assert data_platform_tables[0].full_name == expected_tables[index] + assert ( + data_platform_tables[0].datasource_server + == "bu10758.ap-unknown-2.fakecomputing.com" + ) assert ( data_platform_tables[0].data_platform_pair.powerbi_data_platform_name == SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name @@ -316,6 +328,7 @@ def test_google_bigquery_1(): assert len(data_platform_tables) == 1 assert data_platform_tables[0].name == table.full_name.split(".")[2] assert data_platform_tables[0].full_name == table.full_name + assert data_platform_tables[0].datasource_server == "seraphic-music-344307" assert ( data_platform_tables[0].data_platform_pair.powerbi_data_platform_name == SupportedDataPlatform.GOOGLE_BIGQUERY.value.powerbi_data_platform_name @@ -346,6 +359,7 @@ def test_google_bigquery_2(): assert len(data_platform_tables) == 1 assert data_platform_tables[0].name == table.full_name.split(".")[2] assert data_platform_tables[0].full_name == table.full_name + assert data_platform_tables[0].datasource_server == "my-test-project" assert ( data_platform_tables[0].data_platform_pair.powerbi_data_platform_name == SupportedDataPlatform.GOOGLE_BIGQUERY.value.powerbi_data_platform_name @@ -373,6 +387,7 @@ def test_for_each_expression_1(): assert len(data_platform_tables) == 1 assert data_platform_tables[0].name == table.full_name.split(".")[2] + assert data_platform_tables[0].datasource_server == "my-test-project" assert data_platform_tables[0].full_name == table.full_name assert ( data_platform_tables[0].data_platform_pair.powerbi_data_platform_name @@ -401,6 +416,7 @@ def test_for_each_expression_2(): assert len(data_platform_tables) == 1 assert data_platform_tables[0].name == table.full_name.split(".")[2] assert data_platform_tables[0].full_name == table.full_name + assert data_platform_tables[0].datasource_server == "dwh-prod" assert ( data_platform_tables[0].data_platform_pair.powerbi_data_platform_name == SupportedDataPlatform.GOOGLE_BIGQUERY.value.powerbi_data_platform_name @@ -439,12 +455,17 @@ def test_multi_source_table(): assert len(data_platform_tables) == 2 assert data_platform_tables[0].full_name == "mics.public.order_date" + assert data_platform_tables[0].datasource_server == "localhost" assert ( data_platform_tables[0].data_platform_pair.powerbi_data_platform_name == SupportedDataPlatform.POSTGRES_SQL.value.powerbi_data_platform_name ) assert data_platform_tables[1].full_name == "GSL_TEST_DB.PUBLIC.SALES_ANALYST_VIEW" + assert ( + data_platform_tables[1].datasource_server + == "ghh48144.snowflakefakecomputing.com" + ) assert ( data_platform_tables[1].data_platform_pair.powerbi_data_platform_name == SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name @@ -467,12 +488,20 @@ def test_table_combine(): assert len(data_platform_tables) == 2 assert data_platform_tables[0].full_name == 
"GSL_TEST_DB.PUBLIC.SALES_FORECAST" + assert ( + data_platform_tables[0].datasource_server + == "ghh48144.snowflakefakecomputing.com" + ) assert ( data_platform_tables[0].data_platform_pair.powerbi_data_platform_name == SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name ) assert data_platform_tables[1].full_name == "GSL_TEST_DB.PUBLIC.SALES_ANALYST" + assert ( + data_platform_tables[1].datasource_server + == "ghh48144.snowflakefakecomputing.com" + ) assert ( data_platform_tables[1].data_platform_pair.powerbi_data_platform_name == SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name diff --git a/metadata-ingestion/tests/integration/powerbi/test_powerbi.py b/metadata-ingestion/tests/integration/powerbi/test_powerbi.py index 0361206d27..8cfd6f21de 100644 --- a/metadata-ingestion/tests/integration/powerbi/test_powerbi.py +++ b/metadata-ingestion/tests/integration/powerbi/test_powerbi.py @@ -1,12 +1,17 @@ import logging import sys -from typing import Any, Dict +from typing import Any, Dict, cast from unittest import mock import pytest from freezegun import freeze_time from datahub.ingestion.run.pipeline import Pipeline +from datahub.ingestion.source.powerbi.config import ( + PowerBiDashboardSourceConfig, + SupportedDataPlatform, +) +from datahub.ingestion.source.powerbi.powerbi import PowerBiDashboardSource from tests.test_helpers import mce_helpers FROZEN_TIME = "2022-02-03 07:00:00" @@ -1022,3 +1027,164 @@ def test_workspace_container( output_path=tmp_path / "powerbi_container_mces.json", golden_path=f"{test_resources_dir}/{mce_out_file}", ) + + +def dataset_type_mapping_set_to_all_platform(pipeline: Pipeline) -> None: + source_config: PowerBiDashboardSourceConfig = cast( + PowerBiDashboardSource, pipeline.source + ).source_config + + assert source_config.dataset_type_mapping is not None + + # Generate default dataset_type_mapping and compare it with source_config.dataset_type_mapping + default_dataset_type_mapping: dict = {} + for item in SupportedDataPlatform: + default_dataset_type_mapping[ + item.value.powerbi_data_platform_name + ] = item.value.datahub_data_platform_name + + assert default_dataset_type_mapping == source_config.dataset_type_mapping + + +@freeze_time(FROZEN_TIME) +@mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) +@pytest.mark.integration +def test_dataset_type_mapping_should_set_to_all( + mock_msal, pytestconfig, tmp_path, mock_time, requests_mock +): + """ + Here we don't need to run the pipeline. We need to verify dataset_type_mapping is set to default dataplatform + """ + register_mock_api(request_mock=requests_mock) + + new_config: dict = {**default_source_config()} + + del new_config["dataset_type_mapping"] + + pipeline = Pipeline.create( + { + "run_id": "powerbi-test", + "source": { + "type": "powerbi", + "config": { + **new_config, + }, + }, + "sink": { + "type": "file", + "config": { + "filename": f"{tmp_path}/powerbi_lower_case_urn_mces.json", + }, + }, + } + ) + + dataset_type_mapping_set_to_all_platform(pipeline) + + +@freeze_time(FROZEN_TIME) +@mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) +@pytest.mark.integration +def test_dataset_type_mapping_error( + mock_msal, pytestconfig, tmp_path, mock_time, requests_mock +): + """ + Here we don't need to run the pipeline. 
We need to verify if both dataset_type_mapping and server_to_platform_instance + are set then value error should get raised + """ + register_mock_api(request_mock=requests_mock) + + try: + Pipeline.create( + { + "run_id": "powerbi-test", + "source": { + "type": "powerbi", + "config": { + **default_source_config(), + "server_to_platform_instance": { + "localhost": { + "platform_instance": "test", + } + }, + }, + }, + "sink": { + "type": "file", + "config": { + "filename": f"{tmp_path}/powerbi_lower_case_urn_mces.json", + }, + }, + } + ) + except Exception as e: + assert ( + "dataset_type_mapping is deprecated. Use server_to_platform_instance only." + in str(e) + ) + + +@freeze_time(FROZEN_TIME) +@mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) +def test_server_to_platform_map( + mock_msal, pytestconfig, tmp_path, mock_time, requests_mock +): + enable_logging() + + test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" + new_config: dict = { + **default_source_config(), + "extract_lineage": True, + "convert_lineage_urns_to_lowercase": True, + } + + del new_config["dataset_type_mapping"] + + new_config["server_to_platform_instance"] = { + "hp123rt5.ap-southeast-2.fakecomputing.com": { + "platform_instance": "snowflake_production_instance", + "env": "PROD", + }, + "my-test-project": { + "platform_instance": "bigquery-computing-dev-account", + "env": "QA", + }, + "localhost:1521": {"platform_instance": "oracle-sales-instance", "env": "PROD"}, + } + + register_mock_api(request_mock=requests_mock) + + output_path: str = f"{tmp_path}/powerbi_server_to_platform_instance_mces.json" + + pipeline = Pipeline.create( + { + "run_id": "powerbi-test", + "source": { + "type": "powerbi", + "config": new_config, + }, + "sink": { + "type": "file", + "config": { + "filename": output_path, + }, + }, + } + ) + + pipeline.run() + pipeline.raise_from_status() + golden_file_path: str = ( + f"{test_resources_dir}/golden_test_server_to_platform_instance.json" + ) + + mce_helpers.check_golden_file( + pytestconfig, + output_path=output_path, + golden_path=golden_file_path, + ) + # As server_to_platform_instance map is provided, the old dataset_type_mapping + # should be set to all supported platform + # to process all available upstream lineage even if mapping for platform instance is + # not provided in server_to_platform_instance map + dataset_type_mapping_set_to_all_platform(pipeline)
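
Note: the final comment above describes the intended behavior of the new server_to_platform_instance config that test_server_to_platform_map exercises. As a rough illustration only (not the actual resolver code in datahub/ingestion/source/powerbi), the sketch below shows how a datasource server such as "localhost:1521" or a BigQuery project name could be looked up to obtain a platform instance and environment, falling back to defaults when no entry exists so that unmapped upstreams are still processed. The PlatformDetailSketch container and lookup_platform_detail helper are hypothetical names introduced for this example.

# Illustrative sketch only -- assumes a config shaped like the
# server_to_platform_instance block used in test_server_to_platform_map above.
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class PlatformDetailSketch:
    """Minimal stand-in for the PlatformDetail config model."""

    platform_instance: Optional[str] = None
    env: str = "PROD"


def lookup_platform_detail(
    server_to_platform_instance: Dict[str, PlatformDetailSketch],
    datasource_server: str,
) -> PlatformDetailSketch:
    # Fall back to an empty detail (no platform_instance, default env)
    # when the datasource server has no explicit mapping.
    return server_to_platform_instance.get(datasource_server, PlatformDetailSketch())


if __name__ == "__main__":
    mapping = {
        "localhost:1521": PlatformDetailSketch("oracle-sales-instance", "PROD"),
        "my-test-project": PlatformDetailSketch("bigquery-computing-dev-account", "QA"),
    }
    # Matches the Oracle entry from the test config above.
    print(lookup_platform_detail(mapping, "localhost:1521"))
    # Unmapped servers are still processed, just without a platform_instance.
    print(lookup_platform_detail(mapping, "bu10758.ap-unknown-2.fakecomputing.com"))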