feat(ingestion): powerbi # uniquely identify multiple instances of the same platform (#7632)

Co-authored-by: MohdSiddiqueBagwan <mohdsiddique.bagwan@gslab.com>
Co-authored-by: John Joyce <john@acryl.io>
mohdsiddique 2023-03-21 21:57:29 +05:30 committed by GitHub
parent 7efac2215d
commit 6d6d59141e
9 changed files with 1440 additions and 188 deletions

View File

@ -3,30 +3,42 @@ source:
config:
# Your Power BI tenant identifier
tenant_id: a949d688-67c0-4bf1-a344-e939411c6c0a
# Azure AD Application identifier
client_id: foo
# Azure AD App client secret
client_secret: bar
# Ingest elements of the below PowerBI workspaces into DataHub
workspace_id_pattern:
allow:
- 4bd10256-e999-45dd-8e56-571c77153a5f
deny:
# The environment that the workspace's datasets belong to (PROD, DEV, QA, STAGE)
env: DEV
# Azure AD Application identifier
client_id: foo
# Azure AD App client secret
client_secret: bar
# Enable / Disable ingestion of user information for dashboards
# Enable / Disable ingestion of ownership information for dashboards
extract_ownership: true
# Enable/Disable extracting workspace information to DataHub containers
extract_workspaces_to_containers: true
# Enable / Disable ingestion of endorsements.
# Please notice that this may overwrite any existing tags defined to ingested entities!
extract_endorsements_to_tags: false
# dataset_type_mapping is a fixed mapping of PowerBI datasource types to the equivalent DataHub "data platform" dataset
dataset_type_mapping:
PostgreSql: postgres
Oracle: oracle
Sql: mssql
GoogleBigQuery: bigquery
# Optional -- this mapping is only required to configure a platform instance for upstream tables
# A mapping of the PowerBI datasource's server, i.e. host[:port], to a data platform instance.
# :port is optional and only needed if your datasource server is running on a non-standard port.
# For Google BigQuery, the datasource's server is the BigQuery project name.
server_to_platform_instance:
ap-south-1.snowflakecomputing.com:
platform_instance: operational_instance
env: DEV
oracle-server:1920:
platform_instance: high_performance_production_unit
env: PROD
big-query-sales-project:
platform_instance: sn-2
env: QA
sink:
# sink configs
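
For context, a minimal sketch of what a matched server_to_platform_instance entry buys you: the resolved platform_instance and env feed straight into the upstream dataset URN, via the same builder helper the connector calls later in this diff. The upstream table name below is illustrative only.

import datahub.emitter.mce_builder as builder

# Entry from the recipe above: tables resolved through
# ap-south-1.snowflakecomputing.com pick up this instance and env.
urn = builder.make_dataset_urn_with_platform_instance(
    platform="snowflake",
    platform_instance="operational_instance",
    env="DEV",
    name="pbi_test.test.testtable",  # illustrative upstream table
)
print(urn)
# urn:li:dataset:(urn:li:dataPlatform:snowflake,operational_instance.pbi_test.test.testtable,DEV)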

View File

@ -1,5 +1,6 @@
import logging
from dataclasses import dataclass, field as dataclass_field
from enum import Enum
from typing import Dict, List, Optional, Union
import pydantic
@ -7,7 +8,8 @@ from pydantic import validator
from pydantic.class_validators import root_validator
import datahub.emitter.mce_builder as builder
from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated
from datahub.configuration.source_common import DEFAULT_ENV, DatasetSourceConfigMixin
from datahub.ingestion.source.common.subtypes import BIAssetSubTypes
from datahub.ingestion.source.state.stale_entity_removal_handler import (
@ -105,6 +107,40 @@ class Constant:
DATASET_WEB_URL = "datasetWebUrl"
@dataclass
class DataPlatformPair:
datahub_data_platform_name: str
powerbi_data_platform_name: str
class SupportedDataPlatform(Enum):
POSTGRES_SQL = DataPlatformPair(
powerbi_data_platform_name="PostgreSQL", datahub_data_platform_name="postgres"
)
ORACLE = DataPlatformPair(
powerbi_data_platform_name="Oracle", datahub_data_platform_name="oracle"
)
SNOWFLAKE = DataPlatformPair(
powerbi_data_platform_name="Snowflake", datahub_data_platform_name="snowflake"
)
MS_SQL = DataPlatformPair(
powerbi_data_platform_name="Sql", datahub_data_platform_name="mssql"
)
GOOGLE_BIGQUERY = DataPlatformPair(
powerbi_data_platform_name="GoogleBigQuery",
datahub_data_platform_name="bigquery",
)
AMAZON_REDSHIFT = DataPlatformPair(
powerbi_data_platform_name="AmazonRedshift",
datahub_data_platform_name="redshift",
)
@dataclass
class PowerBiDashboardSourceReport(StaleEntityRemovalSourceReport):
dashboards_scanned: int = 0
@ -129,15 +165,26 @@ class PowerBiDashboardSourceReport(StaleEntityRemovalSourceReport):
self.number_of_workspaces = number_of_workspaces
@dataclass
class PlatformDetail:
def default_for_dataset_type_mapping() -> Dict[str, str]:
dict_: dict = {}
for item in SupportedDataPlatform:
dict_[
item.value.powerbi_data_platform_name
] = item.value.datahub_data_platform_name
return dict_
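# For reference, a sketch of the default this factory yields, derived from the
# SupportedDataPlatform members above:
# {
#     "PostgreSQL": "postgres",
#     "Oracle": "oracle",
#     "Snowflake": "snowflake",
#     "Sql": "mssql",
#     "GoogleBigQuery": "bigquery",
#     "AmazonRedshift": "redshift",
# }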
class PlatformDetail(ConfigModel):
platform_instance: Optional[str] = pydantic.Field(
default=None,
description="DataHub platform instance name. It should be same as you have used in ingestion receipe of DataHub platform ingestion source of particular platform",
description="DataHub platform instance name. To generate correct urn for upstream dataset, this should match "
"with platform instance name used in ingestion"
"recipe of other datahub sources.",
)
env: str = pydantic.Field(
default=DEFAULT_ENV,
description="The environment that all assets produced by DataHub platform ingestion source belong to",
description="The environment that the platform is located in. It is default to PROD",
)
@ -157,7 +204,9 @@ class PowerBiDashboardSourceConfig(
tenant_id: str = pydantic.Field(description="PowerBI tenant identifier")
# PowerBi workspace identifier
workspace_id: Optional[str] = pydantic.Field(
description="[deprecated] Use workspace_id_pattern instead", default=None
default=None,
description="[deprecated] Use workspace_id_pattern instead",
hidden_from_docs=True,
)
# PowerBi workspace identifier
workspace_id_pattern: AllowDenyPattern = pydantic.Field(
@ -171,7 +220,23 @@ class PowerBiDashboardSourceConfig(
dataset_type_mapping: Union[
Dict[str, str], Dict[str, PlatformDetail]
] = pydantic.Field(
description="Mapping of PowerBI datasource type to DataHub supported data-sources. See Quickstart Recipe for mapping"
default_factory=default_for_dataset_type_mapping,
description="[deprecated] Use server_to_platform_instance instead. Mapping of PowerBI datasource type to "
"DataHub supported datasources."
"You can configured platform instance for dataset lineage. "
"See Quickstart Recipe for mapping",
hidden_from_docs=True,
)
# PowerBI datasource's server to platform instance mapping
server_to_platform_instance: Dict[str, PlatformDetail] = pydantic.Field(
default={},
description="A mapping of PowerBI datasource's server i.e host[:port] to Data platform instance."
" :port is optional and only needed if your datasource server is running on non-standard port."
"For Google BigQuery the datasource's server is google bigquery project name",
)
# deprecated warning
_dataset_type_mapping = pydantic_field_deprecated(
"dataset_type_mapping", "server_to_platform_instance"
)
# Azure app client identifier
client_id: str = pydantic.Field(description="Azure app client identifier")
@ -259,14 +324,28 @@ class PowerBiDashboardSourceConfig(
if workspace_id_pattern == AllowDenyPattern.allow_all() and workspace_id:
logger.warning(
"workspace_id_pattern is not set but workspace_id is set, setting workspace_id as workspace_id_pattern. workspace_id will be deprecated, please use workspace_id_pattern instead."
"workspace_id_pattern is not set but workspace_id is set, setting workspace_id as "
"workspace_id_pattern. workspace_id will be deprecated, please use workspace_id_pattern instead."
)
values["workspace_id_pattern"] = AllowDenyPattern(
allow=[f"^{workspace_id}$"]
)
elif workspace_id_pattern != AllowDenyPattern.allow_all() and workspace_id:
logger.warning(
"workspace_id will be ignored in favour of workspace_id_pattern. workspace_id will be deprecated, please use workspace_id_pattern only."
"workspace_id will be ignored in favour of workspace_id_pattern. workspace_id will be deprecated, "
"please use workspace_id_pattern only."
)
values.pop("workspace_id")
return values
@root_validator(pre=True)
def raise_error_for_dataset_type_mapping(cls, values: Dict) -> Dict:
if (
values.get("dataset_type_mapping") is not None
and values.get("server_to_platform_instance") is not None
):
raise ValueError(
"dataset_type_mapping is deprecated. Use server_to_platform_instance only."
)
return values
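
A minimal sketch of the new guard in action, assuming the remaining config fields all carry defaults; the field values are placeholders:

import pydantic
from datahub.ingestion.source.powerbi.config import PowerBiDashboardSourceConfig

try:
    PowerBiDashboardSourceConfig.parse_obj(
        {
            "tenant_id": "t",
            "client_id": "c",
            "client_secret": "s",
            "dataset_type_mapping": {"PostgreSQL": "postgres"},
            "server_to_platform_instance": {
                "host:5432": {"platform_instance": "pi", "env": "PROD"}
            },
        }
    )
except pydantic.ValidationError as e:
    # dataset_type_mapping is deprecated. Use server_to_platform_instance only.
    print(e)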

View File

@ -0,0 +1,75 @@
import logging
from abc import ABC, abstractmethod
from typing import Union
from datahub.ingestion.source.powerbi.config import (
PlatformDetail,
PowerBiDashboardSourceConfig,
)
from datahub.ingestion.source.powerbi.m_query.resolver import DataPlatformTable
logger = logging.getLogger(__name__)
class AbstractDataPlatformInstanceResolver(ABC):
@abstractmethod
def get_platform_instance(
self, dataplatform_table: DataPlatformTable
) -> PlatformDetail:
pass
class BaseAbstractDataPlatformInstanceResolver(
AbstractDataPlatformInstanceResolver, ABC
):
config: PowerBiDashboardSourceConfig
def __init__(self, config):
self.config = config
class ResolvePlatformInstanceFromDatasetTypeMapping(
BaseAbstractDataPlatformInstanceResolver
):
def get_platform_instance(
self, dataplatform_table: DataPlatformTable
) -> PlatformDetail:
platform: Union[str, PlatformDetail] = self.config.dataset_type_mapping[
dataplatform_table.data_platform_pair.powerbi_data_platform_name
]
if isinstance(platform, PlatformDetail):
return platform
return PlatformDetail.parse_obj({})
class ResolvePlatformInstanceFromServerToPlatformInstance(
BaseAbstractDataPlatformInstanceResolver
):
def get_platform_instance(
self, dataplatform_table: DataPlatformTable
) -> PlatformDetail:
return (
self.config.server_to_platform_instance[
dataplatform_table.datasource_server
]
if dataplatform_table.datasource_server
in self.config.server_to_platform_instance
else PlatformDetail.parse_obj({})
)
def create_dataplatform_instance_resolver(
config: PowerBiDashboardSourceConfig,
) -> AbstractDataPlatformInstanceResolver:
if config.server_to_platform_instance:
logger.debug(
"Creating resolver to resolve platform instance from server_to_platform_instance"
)
return ResolvePlatformInstanceFromServerToPlatformInstance(config)
logger.debug(
"Creating resolver to resolve platform instance from dataset_type_mapping"
)
return ResolvePlatformInstanceFromDatasetTypeMapping(config)
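
A short usage sketch of the factory above; `config` and `table` are assumed to come from a parsed recipe and the M-Query resolver respectively:

from datahub.ingestion.source.powerbi.config import (
    PlatformDetail,
    PowerBiDashboardSourceConfig,
)
from datahub.ingestion.source.powerbi.dataplatform_instance_resolver import (
    create_dataplatform_instance_resolver,
)
from datahub.ingestion.source.powerbi.m_query.resolver import DataPlatformTable


def resolve_platform_detail(
    config: PowerBiDashboardSourceConfig, table: DataPlatformTable
) -> PlatformDetail:
    # server_to_platform_instance wins when set; otherwise dataset_type_mapping.
    # An unmatched server/platform yields an empty PlatformDetail
    # (no instance, default env).
    resolver = create_dataplatform_instance_resolver(config)
    return resolver.get_platform_instance(table)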

View File

@ -6,7 +6,11 @@ from typing import Any, Dict, List, Optional, Tuple, Type, Union, cast
from lark import Tree
from datahub.ingestion.source.powerbi.config import PowerBiDashboardSourceReport
from datahub.ingestion.source.powerbi.config import (
DataPlatformPair,
PowerBiDashboardSourceReport,
SupportedDataPlatform,
)
from datahub.ingestion.source.powerbi.m_query import native_sql_parser, tree_function
from datahub.ingestion.source.powerbi.m_query.data_classes import (
TRACE_POWERBI_MQUERY_PARSER,
@ -18,52 +22,19 @@ from datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes import Table
logger = logging.getLogger(__name__)
@dataclass
class DataPlatformPair:
datahub_data_platform_name: str
powerbi_data_platform_name: str
@dataclass
class DataPlatformTable:
name: str
full_name: str
datasource_server: str
data_platform_pair: DataPlatformPair
class SupportedDataPlatform(Enum):
POSTGRES_SQL = DataPlatformPair(
powerbi_data_platform_name="PostgreSQL", datahub_data_platform_name="postgres"
)
ORACLE = DataPlatformPair(
powerbi_data_platform_name="Oracle", datahub_data_platform_name="oracle"
)
SNOWFLAKE = DataPlatformPair(
powerbi_data_platform_name="Snowflake", datahub_data_platform_name="snowflake"
)
MS_SQL = DataPlatformPair(
powerbi_data_platform_name="Sql", datahub_data_platform_name="mssql"
)
GOOGLE_BIGQUERY = DataPlatformPair(
powerbi_data_platform_name="GoogleBigQuery",
datahub_data_platform_name="bigquery",
)
AMAZON_REDSHIFT = DataPlatformPair(
powerbi_data_platform_name="AmazonRedshift",
datahub_data_platform_name="redshift",
)
class AbstractTableFullNameCreator(ABC):
class AbstractDataPlatformTableCreator(ABC):
@abstractmethod
def get_full_table_names(
def create_dataplatform_tables(
self, data_access_func_detail: DataAccessFunctionDetail
) -> List[str]:
) -> List[DataPlatformTable]:
pass
@abstractmethod
@ -71,19 +42,20 @@ class AbstractTableFullNameCreator(ABC):
pass
@staticmethod
def get_db_name_from_second_argument(arg_list: Tree) -> Optional[str]:
def get_db_detail_from_argument(
arg_list: Tree,
) -> Tuple[Optional[str], Optional[str]]:
arguments: List[str] = tree_function.strip_char_from_list(
values=tree_function.remove_whitespaces_from_list(
tree_function.token_values(arg_list)
),
char='"',
)
if len(arguments) < 2:
logger.debug(f"Expected minimum 2 arguments, but got {len(arguments)}")
return None
return None, None
return arguments[1]
return arguments[0], arguments[1]
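# Example (matching the Sql.Database calls in the tests below): an argument
# list parsed from Sql.Database("AUPRDWHDB", "COMMOPSDB", ...) returns the
# (server, database) pair ("AUPRDWHDB", "COMMOPSDB").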
class AbstractDataAccessMQueryResolver(ABC):
@ -140,7 +112,6 @@ class MQueryResolver(AbstractDataAccessMQueryResolver, ABC):
cast(Tree, item_selector), parameters=self.parameters
)
),
'"',
)
identifier: List[str] = tree_function.token_values(
cast(Tree, identifier_tree)
@ -382,25 +353,18 @@ class MQueryResolver(AbstractDataAccessMQueryResolver, ABC):
)
continue
table_full_name_creator: AbstractTableFullNameCreator = (
table_full_name_creator: AbstractDataPlatformTableCreator = (
supported_resolver.get_table_full_name_creator()()
)
for table_full_name in table_full_name_creator.get_full_table_names(
f_detail
):
data_platform_tables.append(
DataPlatformTable(
name=table_full_name.split(".")[-1],
full_name=table_full_name,
data_platform_pair=table_full_name_creator.get_platform_pair(),
)
)
data_platform_tables.extend(
table_full_name_creator.create_dataplatform_tables(f_detail)
)
return data_platform_tables
class DefaultTwoStepDataAccessSources(AbstractTableFullNameCreator, ABC):
class DefaultTwoStepDataAccessSources(AbstractDataPlatformTableCreator, ABC):
"""
These are the data sources for which PowerBI Desktop generates a default M-Query of the following pattern
let
@ -412,19 +376,16 @@ class DefaultTwoStepDataAccessSources(AbstractTableFullNameCreator, ABC):
def two_level_access_pattern(
self, data_access_func_detail: DataAccessFunctionDetail
) -> List[str]:
full_table_names: List[str] = []
) -> List[DataPlatformTable]:
logger.debug(
f"Processing {self.get_platform_pair().powerbi_data_platform_name} function detail {data_access_func_detail}"
f"Processing {self.get_platform_pair().powerbi_data_platform_name} data-access function detail {data_access_func_detail}"
)
db_name: Optional[str] = self.get_db_name_from_second_argument(
server, db_name = self.get_db_detail_from_argument(
data_access_func_detail.arg_list
)
if db_name is None:
logger.debug("db_name not found in expression")
return full_table_names # Return empty list
if server is None or db_name is None:
return [] # Return empty list
schema_name: str = cast(
IdentifierAccessor, data_access_func_detail.identifier_accessor
@ -434,38 +395,44 @@ class DefaultTwoStepDataAccessSources(AbstractTableFullNameCreator, ABC):
IdentifierAccessor, data_access_func_detail.identifier_accessor
).items["Item"]
full_table_names.append(f"{db_name}.{schema_name}.{table_name}")
full_table_name: str = f"{db_name}.{schema_name}.{table_name}"
logger.debug(
f"Platform({self.get_platform_pair().datahub_data_platform_name}) full-table-names = {full_table_names}"
f"Platform({self.get_platform_pair().datahub_data_platform_name}) full_table_name= {full_table_name}"
)
return full_table_names
return [
DataPlatformTable(
name=table_name,
full_name=full_table_name,
datasource_server=server,
data_platform_pair=self.get_platform_pair(),
)
]
class PostgresTableFullNameCreator(DefaultTwoStepDataAccessSources):
def get_full_table_names(
class PostgresDataPlatformTableCreator(DefaultTwoStepDataAccessSources):
def create_dataplatform_tables(
self, data_access_func_detail: DataAccessFunctionDetail
) -> List[str]:
) -> List[DataPlatformTable]:
return self.two_level_access_pattern(data_access_func_detail)
def get_platform_pair(self) -> DataPlatformPair:
return SupportedDataPlatform.POSTGRES_SQL.value
class MSSqlTableFullNameCreator(DefaultTwoStepDataAccessSources):
class MSSqlDataPlatformTableCreator(DefaultTwoStepDataAccessSources):
def get_platform_pair(self) -> DataPlatformPair:
return SupportedDataPlatform.MS_SQL.value
def get_full_table_names(
def create_dataplatform_tables(
self, data_access_func_detail: DataAccessFunctionDetail
) -> List[str]:
full_table_names: List[str] = []
) -> List[DataPlatformTable]:
dataplatform_tables: List[DataPlatformTable] = []
arguments: List[str] = tree_function.strip_char_from_list(
values=tree_function.remove_whitespaces_from_list(
tree_function.token_values(data_access_func_detail.arg_list)
),
char='"',
)
if len(arguments) == 2:
@ -475,9 +442,10 @@ class MSSqlTableFullNameCreator(DefaultTwoStepDataAccessSources):
if len(arguments) >= 4 and arguments[2] != "Query":
logger.debug("Unsupported case is found. Second index is not the Query")
return full_table_names
return dataplatform_tables
db_name: str = arguments[1]
tables: List[str] = native_sql_parser.get_tables(arguments[3])
for table in tables:
schema_and_table: List[str] = table.split(".")
@ -486,34 +454,42 @@ class MSSqlTableFullNameCreator(DefaultTwoStepDataAccessSources):
# https://learn.microsoft.com/en-us/sql/relational-databases/security/authentication-access/ownership-and-user-schema-separation?view=sql-server-ver16
schema_and_table.insert(0, "dbo")
full_table_names.append(
f"{db_name}.{schema_and_table[0]}.{schema_and_table[1]}"
dataplatform_tables.append(
DataPlatformTable(
name=schema_and_table[1],
full_name=f"{db_name}.{schema_and_table[0]}.{schema_and_table[1]}",
datasource_server=arguments[0],
data_platform_pair=self.get_platform_pair(),
)
)
logger.debug("MS-SQL full-table-names %s", full_table_names)
logger.debug("MS-SQL full-table-names %s", dataplatform_tables)
return full_table_names
return dataplatform_tables
class OracleTableFullNameCreator(AbstractTableFullNameCreator):
class OracleDataPlatformTableCreator(AbstractDataPlatformTableCreator):
def get_platform_pair(self) -> DataPlatformPair:
return SupportedDataPlatform.ORACLE.value
def _get_db_name(self, value: str) -> Optional[str]:
error_message: str = f"The target argument ({value}) should in the format of <host-name>:<port>/<db-name>[.<domain>]"
@staticmethod
def _get_server_and_db_name(value: str) -> Tuple[Optional[str], Optional[str]]:
error_message: str = (
f"The target argument ({value}) should in the format of <host-name>:<port>/<db-name>["
".<domain>]"
)
splitter_result: List[str] = value.split("/")
if len(splitter_result) != 2:
logger.debug(error_message)
return None
return None, None
db_name = splitter_result[1].split(".")[0]
return db_name
return tree_function.strip_char_from_list([splitter_result[0]])[0], db_name
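# Sketch: a target argument such as "oracle-server:1920/salesdb.domain.com"
# (illustrative, following the format in the error message above) splits into
# the server "oracle-server:1920" and the db name "salesdb".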
def get_full_table_names(
def create_dataplatform_tables(
self, data_access_func_detail: DataAccessFunctionDetail
) -> List[str]:
full_table_names: List[str] = []
) -> List[DataPlatformTable]:
logger.debug(
f"Processing Oracle data-access function detail {data_access_func_detail}"
@ -523,9 +499,10 @@ class OracleTableFullNameCreator(AbstractTableFullNameCreator):
tree_function.token_values(data_access_func_detail.arg_list)
)
db_name: Optional[str] = self._get_db_name(arguments[0])
if db_name is None:
return full_table_names
server, db_name = self._get_server_and_db_name(arguments[0])
if db_name is None or server is None:
return []
schema_name: str = cast(
IdentifierAccessor, data_access_func_detail.identifier_accessor
@ -536,19 +513,32 @@ class OracleTableFullNameCreator(AbstractTableFullNameCreator):
cast(IdentifierAccessor, data_access_func_detail.identifier_accessor).next,
).items["Name"]
full_table_names.append(f"{db_name}.{schema_name}.{table_name}")
return full_table_names
return [
DataPlatformTable(
name=table_name,
full_name=f"{db_name}.{schema_name}.{table_name}",
datasource_server=server,
data_platform_pair=self.get_platform_pair(),
)
]
class DefaultThreeStepDataAccessSources(AbstractTableFullNameCreator, ABC):
def get_full_table_names(
class DefaultThreeStepDataAccessSources(AbstractDataPlatformTableCreator, ABC):
def get_datasource_server(
self, arguments: List[str], data_access_func_detail: DataAccessFunctionDetail
) -> str:
return tree_function.strip_char_from_list([arguments[0]])[0]
def create_dataplatform_tables(
self, data_access_func_detail: DataAccessFunctionDetail
) -> List[str]:
) -> List[DataPlatformTable]:
logger.debug(
f"Processing {self.get_platform_pair().datahub_data_platform_name} function detail {data_access_func_detail}"
)
arguments: List[str] = tree_function.remove_whitespaces_from_list(
tree_function.token_values(data_access_func_detail.arg_list)
)
# First is database name
db_name: str = data_access_func_detail.identifier_accessor.items["Name"] # type: ignore
# Second is schema name
@ -566,37 +556,56 @@ class DefaultThreeStepDataAccessSources(AbstractTableFullNameCreator, ABC):
f"{self.get_platform_pair().datahub_data_platform_name} full-table-name {full_table_name}"
)
return [full_table_name]
return [
DataPlatformTable(
name=table_name,
full_name=full_table_name,
datasource_server=self.get_datasource_server(
arguments, data_access_func_detail
),
data_platform_pair=self.get_platform_pair(),
)
]
class SnowflakeTableFullNameCreator(DefaultThreeStepDataAccessSources):
class SnowflakeDataPlatformTableCreator(DefaultThreeStepDataAccessSources):
def get_platform_pair(self) -> DataPlatformPair:
return SupportedDataPlatform.SNOWFLAKE.value
class GoogleBigQueryTableFullNameCreator(DefaultThreeStepDataAccessSources):
class GoogleBigQueryDataPlatformTableCreator(DefaultThreeStepDataAccessSources):
def get_platform_pair(self) -> DataPlatformPair:
return SupportedDataPlatform.GOOGLE_BIGQUERY.value
def get_datasource_server(
self, arguments: List[str], data_access_func_detail: DataAccessFunctionDetail
) -> str:
# In Google BigQuery, the server is the project name
# condition to silence lint; it is not going to be None
return (
data_access_func_detail.identifier_accessor.items["Name"]
if data_access_func_detail.identifier_accessor is not None
else str()
)
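# e.g. the project name "big-query-sales-project" from the sample recipe is
# what gets matched against the server_to_platform_instance keys.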
class AmazonRedshiftFullNameCreator(AbstractTableFullNameCreator):
class AmazonRedshiftDataPlatformTableCreator(AbstractDataPlatformTableCreator):
def get_platform_pair(self) -> DataPlatformPair:
return SupportedDataPlatform.AMAZON_REDSHIFT.value
def get_full_table_names(
def create_dataplatform_tables(
self, data_access_func_detail: DataAccessFunctionDetail
) -> List[str]:
full_table_names: List[str] = []
) -> List[DataPlatformTable]:
logger.debug(
f"Processing AmazonRedshift data-access function detail {data_access_func_detail}"
)
db_name: Optional[str] = self.get_db_name_from_second_argument(
server, db_name = self.get_db_detail_from_argument(
data_access_func_detail.arg_list
)
if db_name is None:
return full_table_names # Return empty list
if db_name is None or server is None:
return [] # Return empty list
schema_name: str = cast(
IdentifierAccessor, data_access_func_detail.identifier_accessor
@ -607,12 +616,17 @@ class AmazonRedshiftFullNameCreator(AbstractTableFullNameCreator):
cast(IdentifierAccessor, data_access_func_detail.identifier_accessor).next,
).items["Name"]
full_table_names.append(f"{db_name}.{schema_name}.{table_name}")
return full_table_names
return [
DataPlatformTable(
name=table_name,
full_name=f"{db_name}.{schema_name}.{table_name}",
datasource_server=server,
data_platform_pair=self.get_platform_pair(),
)
]
class NativeQueryTableFullNameCreator(AbstractTableFullNameCreator):
class NativeQueryDataPlatformTableCreator(AbstractDataPlatformTableCreator):
SUPPORTED_NATIVE_QUERY_DATA_PLATFORM: dict = {
SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name: SupportedDataPlatform.SNOWFLAKE,
SupportedDataPlatform.AMAZON_REDSHIFT.value.powerbi_data_platform_name: SupportedDataPlatform.AMAZON_REDSHIFT,
@ -626,13 +640,13 @@ class NativeQueryTableFullNameCreator(AbstractTableFullNameCreator):
def is_native_parsing_supported(data_access_function_name: str) -> bool:
return (
data_access_function_name
in NativeQueryTableFullNameCreator.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM
in NativeQueryDataPlatformTableCreator.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM
)
def get_full_table_names(
def create_dataplatform_tables(
self, data_access_func_detail: DataAccessFunctionDetail
) -> List[str]:
full_table_names: List[str] = []
) -> List[DataPlatformTable]:
dataplatform_tables: List[DataPlatformTable] = []
t1: Tree = cast(
Tree, tree_function.first_arg_list_func(data_access_func_detail.arg_list)
)
@ -643,11 +657,11 @@ class NativeQueryTableFullNameCreator(AbstractTableFullNameCreator):
f"Expecting 2 argument, actual argument count is {len(flat_argument_list)}"
)
logger.debug(f"Flat argument list = {flat_argument_list}")
return full_table_names
return dataplatform_tables
data_access_tokens: List[str] = tree_function.remove_whitespaces_from_list(
tree_function.token_values(flat_argument_list[0])
)
if not self.is_native_parsing_supported(data_access_tokens[0]):
logger.debug(
f"Unsupported native-query data-platform = {data_access_tokens[0]}"
@ -655,7 +669,13 @@ class NativeQueryTableFullNameCreator(AbstractTableFullNameCreator):
logger.debug(
f"NativeQuery is supported only for {self.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM}"
)
return full_table_names
if len(data_access_tokens) < 3:
logger.debug(
f"Server is not available in argument list for data-platform {data_access_tokens[0]}. Returning empty "
"list"
)
return dataplatform_tables
self.current_data_platform = self.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM[
data_access_tokens[0]
@ -665,7 +685,6 @@ class NativeQueryTableFullNameCreator(AbstractTableFullNameCreator):
values=tree_function.remove_whitespaces_from_list(
tree_function.token_values(flat_argument_list[1])
),
char='"',
)[
0
] # Remove any whitespaces and double quotes character
@ -677,9 +696,18 @@ class NativeQueryTableFullNameCreator(AbstractTableFullNameCreator):
)
continue
full_table_names.append(table)
dataplatform_tables.append(
DataPlatformTable(
name=table.split(".")[2],
full_name=table,
datasource_server=tree_function.strip_char_from_list(
[data_access_tokens[2]]
)[0],
data_platform_pair=self.get_platform_pair(),
)
)
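# The server token still carries its double quotes at this point, hence the
# strip_char_from_list call; for the Snowflake native queries in the tests
# below it resolves to a host such as
# "bu20658.ap-southeast-2.snowflakecomputing.com" (illustrative, from the
# test data).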
return full_table_names
return dataplatform_tables
class FunctionName(Enum):
@ -694,41 +722,41 @@ class FunctionName(Enum):
class SupportedResolver(Enum):
POSTGRES_SQL = (
PostgresTableFullNameCreator,
PostgresDataPlatformTableCreator,
FunctionName.POSTGRESQL_DATA_ACCESS,
)
ORACLE = (
OracleTableFullNameCreator,
OracleDataPlatformTableCreator,
FunctionName.ORACLE_DATA_ACCESS,
)
SNOWFLAKE = (
SnowflakeTableFullNameCreator,
SnowflakeDataPlatformTableCreator,
FunctionName.SNOWFLAKE_DATA_ACCESS,
)
MS_SQL = (
MSSqlTableFullNameCreator,
MSSqlDataPlatformTableCreator,
FunctionName.MSSQL_DATA_ACCESS,
)
GOOGLE_BIG_QUERY = (
GoogleBigQueryTableFullNameCreator,
GoogleBigQueryDataPlatformTableCreator,
FunctionName.GOOGLE_BIGQUERY_DATA_ACCESS,
)
AMAZON_REDSHIFT = (
AmazonRedshiftFullNameCreator,
AmazonRedshiftDataPlatformTableCreator,
FunctionName.AMAZON_REDSHIFT_DATA_ACCESS,
)
NATIVE_QUERY = (
NativeQueryTableFullNameCreator,
NativeQueryDataPlatformTableCreator,
FunctionName.NATIVE_QUERY,
)
def get_table_full_name_creator(self) -> Type[AbstractTableFullNameCreator]:
def get_table_full_name_creator(self) -> Type[AbstractDataPlatformTableCreator]:
return self.value[0]
def get_function_name(self) -> str:

View File

@ -120,7 +120,7 @@ def remove_whitespaces_from_list(values: List[str]) -> List[str]:
return result
def strip_char_from_list(values: List[str], char: str) -> List[str]:
def strip_char_from_list(values: List[str], char: str = '"') -> List[str]:
result: List[str] = []
for item in values:
result.append(item.strip(char))
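# Example: strip_char_from_list(['"AUPRDWHDB"']) now defaults to stripping the
# surrounding double quotes and returns ['AUPRDWHDB']; callers in this diff
# drop their explicit char='"' argument accordingly.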

View File

@ -5,11 +5,10 @@
#########################################################
import logging
from typing import Iterable, List, Optional, Tuple, Union, cast
from typing import Iterable, List, Optional, Tuple
import datahub.emitter.mce_builder as builder
import datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes as powerbi_data_classes
from datahub.configuration.source_common import DEFAULT_ENV
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import gen_containers
from datahub.ingestion.api.common import PipelineContext
@ -30,6 +29,10 @@ from datahub.ingestion.source.powerbi.config import (
PowerBiDashboardSourceConfig,
PowerBiDashboardSourceReport,
)
from datahub.ingestion.source.powerbi.dataplatform_instance_resolver import (
AbstractDataPlatformInstanceResolver,
create_dataplatform_instance_resolver,
)
from datahub.ingestion.source.powerbi.m_query import parser, resolver
from datahub.ingestion.source.powerbi.rest_api_wrapper.powerbi_api import PowerBiAPI
from datahub.ingestion.source.state.sql_common_state import (
@ -95,9 +98,11 @@ class Mapper:
self,
config: PowerBiDashboardSourceConfig,
reporter: PowerBiDashboardSourceReport,
dataplatform_instance_resolver: AbstractDataPlatformInstanceResolver,
):
self.__config = config
self.__reporter = reporter
self.__dataplatform_instance_resolver = dataplatform_instance_resolver
@staticmethod
def urn_to_lowercase(value: str, flag: bool) -> str:
@ -157,7 +162,9 @@ class Mapper:
upstream_tables: List[resolver.DataPlatformTable] = parser.get_upstream_tables(
table, self.__reporter, parameters=parameters
)
logger.debug(
f"PowerBI virtual table {table.full_name} and it's upstream dataplatform tables = {upstream_tables}"
)
for upstream_table in upstream_tables:
if (
upstream_table.data_platform_pair.powerbi_data_platform_name
@ -168,27 +175,15 @@ class Mapper:
)
continue
platform: Union[str, PlatformDetail] = self.__config.dataset_type_mapping[
upstream_table.data_platform_pair.powerbi_data_platform_name
]
platform_name: str = (
upstream_table.data_platform_pair.datahub_data_platform_name
platform_detail: PlatformDetail = (
self.__dataplatform_instance_resolver.get_platform_instance(
upstream_table
)
)
platform_instance_name: Optional[str] = None
platform_env: str = DEFAULT_ENV
# Determine if PlatformDetail is provided
if isinstance(platform, PlatformDetail):
platform_instance_name = cast(
PlatformDetail, platform
).platform_instance
platform_env = cast(PlatformDetail, platform).env
upstream_urn = builder.make_dataset_urn_with_platform_instance(
platform=platform_name,
platform_instance=platform_instance_name,
env=platform_env,
platform=upstream_table.data_platform_pair.datahub_data_platform_name,
platform_instance=platform_detail.platform_instance,
env=platform_detail.env,
name=self.lineage_urn_to_lowercase(upstream_table.full_name),
)
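# e.g. with platform_detail resolved from the sample recipe's entry for
# ap-south-1.snowflakecomputing.com, this yields urns of the form (sketch):
# urn:li:dataset:(urn:li:dataPlatform:snowflake,operational_instance.<db>.<schema>.<table>,DEV)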
@ -200,9 +195,7 @@ class Mapper:
if len(upstreams) > 0:
upstream_lineage = UpstreamLineageClass(upstreams=upstreams)
logger.debug(
f"DataSet Lineage = {ds_urn} and its lineage = {upstream_lineage}"
)
logger.debug(f"Dataset urn = {ds_urn} and its lineage = {upstream_lineage}")
mcp = MetadataChangeProposalWrapper(
entityType=Constant.DATASET,
changeType=ChangeTypeClass.UPSERT,
@ -906,6 +899,7 @@ class PowerBiDashboardSource(StatefulIngestionSourceBase):
source_config: PowerBiDashboardSourceConfig
reporter: PowerBiDashboardSourceReport
dataplatform_instance_resolver: AbstractDataPlatformInstanceResolver
accessed_dashboards: int = 0
platform: str = "powerbi"
@ -913,15 +907,19 @@ class PowerBiDashboardSource(StatefulIngestionSourceBase):
super(PowerBiDashboardSource, self).__init__(config, ctx)
self.source_config = config
self.reporter = PowerBiDashboardSourceReport()
self.dataplatform_instance_resolver = create_dataplatform_instance_resolver(
self.source_config
)
try:
self.powerbi_client = PowerBiAPI(self.source_config)
except Exception as e:
logger.warning(e)
exit(
1
) # Exit pipeline as we are not able to connect to PowerBI API Service. This exit will avoid raising unwanted stacktrace on console
) # Exit the pipeline, as we are not able to connect to the PowerBI API Service. This exit avoids printing an
# unwanted stack trace on the console
self.mapper = Mapper(config, self.reporter)
self.mapper = Mapper(config, self.reporter, self.dataplatform_instance_resolver)
# Create and register the stateful ingestion use-case handler.
self.stale_entity_removal_handler = StaleEntityRemovalHandler(
@ -962,6 +960,10 @@ class PowerBiDashboardSource(StatefulIngestionSourceBase):
if key not in powerbi_data_platforms:
raise ValueError(f"PowerBI DataPlatform {key} is not supported")
logger.debug(
f"Dataset lineage would get ingested for data-platform = {self.source_config.dataset_type_mapping}"
)
def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
"""
DataHub ingestion framework invokes this method

View File

@ -0,0 +1,861 @@
[
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.public_issue_history,DEV)",
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"json": {
"customProperties": {},
"name": "public issue_history",
"description": "Library dataset description",
"tags": []
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.public_issue_history,DEV)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.SNOWFLAKE_TESTTABLE,DEV)",
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"json": {
"customProperties": {},
"name": "SNOWFLAKE_TESTTABLE",
"description": "Library dataset description",
"tags": []
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.SNOWFLAKE_TESTTABLE,DEV)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.SNOWFLAKE_TESTTABLE,DEV)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"json": {
"upstreams": [
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake_production_instance.pbi_test.test.testtable,PROD)",
"type": "TRANSFORMED"
}
]
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.snowflake_native-query,DEV)",
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"json": {
"customProperties": {},
"name": "snowflake native-query",
"description": "Library dataset description",
"tags": []
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.snowflake_native-query,DEV)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.snowflake_native-query,DEV)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"json": {
"upstreams": [
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,operations_analytics.transformed_prod.v_aps_sme_units_v4,PROD)",
"type": "TRANSFORMED"
}
]
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.big-query-with-parameter,DEV)",
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"json": {
"customProperties": {},
"name": "big-query-with-parameter",
"description": "Library dataset description",
"tags": []
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.big-query-with-parameter,DEV)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.big-query-with-parameter,DEV)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"json": {
"upstreams": [
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-computing-dev-account.my-test-project.universal.d_wh_date,QA)",
"type": "TRANSFORMED"
}
]
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.snowflake_native-query-with-join,DEV)",
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"json": {
"customProperties": {},
"name": "snowflake native-query-with-join",
"description": "Library dataset description",
"tags": []
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.snowflake_native-query-with-join,DEV)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.snowflake_native-query-with-join,DEV)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"json": {
"upstreams": [
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,gsl_test_db.public.sales_analyst,PROD)",
"type": "TRANSFORMED"
},
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,gsl_test_db.public.sales_forecast,PROD)",
"type": "TRANSFORMED"
}
]
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.job-history,DEV)",
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"json": {
"customProperties": {},
"name": "job-history",
"description": "Library dataset description",
"tags": []
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.job-history,DEV)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.job-history,DEV)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"json": {
"upstreams": [
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:oracle,oracle-sales-instance.salesdb.hr.employees,PROD)",
"type": "TRANSFORMED"
}
]
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.postgres_test_table,DEV)",
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"json": {
"customProperties": {},
"name": "postgres_test_table",
"description": "Library dataset description",
"tags": []
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.postgres_test_table,DEV)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.postgres_test_table,DEV)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"json": {
"upstreams": [
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,mics.public.order_date,PROD)",
"type": "TRANSFORMED"
}
]
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,hr_pbi_test.dbo_book_issue,DEV)",
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"json": {
"customProperties": {},
"name": "dbo_book_issue",
"description": "hr pbi test description",
"tags": []
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,hr_pbi_test.dbo_book_issue,DEV)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,hr_pbi_test.dbo_book_issue,DEV)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"json": {
"upstreams": [
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:mssql,library.dbo.book_issue,PROD)",
"type": "TRANSFORMED"
}
]
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,hr_pbi_test.ms_sql_native_table,DEV)",
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"json": {
"customProperties": {},
"name": "ms_sql_native_table",
"description": "hr pbi test description",
"tags": []
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,hr_pbi_test.ms_sql_native_table,DEV)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,hr_pbi_test.ms_sql_native_table,DEV)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"json": {
"upstreams": [
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:mssql,commopsdb.dbo.v_ps_cd_retention,PROD)",
"type": "TRANSFORMED"
}
]
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "corpuser",
"entityUrn": "urn:li:corpuser:users.User1@foo.com",
"changeType": "UPSERT",
"aspectName": "corpUserKey",
"aspect": {
"json": {
"username": "User1@foo.com"
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "corpuser",
"entityUrn": "urn:li:corpuser:users.User2@foo.com",
"changeType": "UPSERT",
"aspectName": "corpUserKey",
"aspect": {
"json": {
"username": "User2@foo.com"
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "chart",
"entityUrn": "urn:li:chart:(powerbi,charts.B8E293DC-0C83-4AA0-9BB9-0A8738DF24A0)",
"changeType": "UPSERT",
"aspectName": "chartInfo",
"aspect": {
"json": {
"customProperties": {
"createdFrom": "Dataset",
"datasetId": "05169CD2-E713-41E6-9600-1D8066D95445",
"datasetWebUrl": "http://localhost/groups/64ED5CAD-7C10-4684-8180-826122881108/datasets/05169CD2-E713-41E6-9600-1D8066D95445/details"
},
"title": "test_tile",
"description": "test_tile",
"lastModified": {
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
}
},
"inputs": [
{
"string": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.public_issue_history,DEV)"
},
{
"string": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.SNOWFLAKE_TESTTABLE,DEV)"
},
{
"string": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.snowflake_native-query,DEV)"
},
{
"string": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.big-query-with-parameter,DEV)"
},
{
"string": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.snowflake_native-query-with-join,DEV)"
},
{
"string": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.job-history,DEV)"
},
{
"string": "urn:li:dataset:(urn:li:dataPlatform:powerbi,library-dataset.postgres_test_table,DEV)"
}
]
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "chart",
"entityUrn": "urn:li:chart:(powerbi,charts.B8E293DC-0C83-4AA0-9BB9-0A8738DF24A0)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "chart",
"entityUrn": "urn:li:chart:(powerbi,charts.B8E293DC-0C83-4AA0-9BB9-0A8738DF24A0)",
"changeType": "UPSERT",
"aspectName": "chartKey",
"aspect": {
"json": {
"dashboardTool": "powerbi",
"chartId": "powerbi.linkedin.com/charts/B8E293DC-0C83-4AA0-9BB9-0A8738DF24A0"
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "chart",
"entityUrn": "urn:li:chart:(powerbi,charts.B8E293DC-0C83-4AA0-9BB9-0A8738DF24A0)",
"changeType": "UPSERT",
"aspectName": "browsePaths",
"aspect": {
"json": {
"paths": [
"/powerbi/demo-workspace"
]
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "chart",
"entityUrn": "urn:li:chart:(powerbi,charts.23212598-23b5-4980-87cc-5fc0ecd84385)",
"changeType": "UPSERT",
"aspectName": "chartInfo",
"aspect": {
"json": {
"customProperties": {
"createdFrom": "Dataset",
"datasetId": "ba0130a1-5b03-40de-9535-b34e778ea6ed",
"datasetWebUrl": "http://localhost/groups/64ED5CAD-7C10-4684-8180-826122881108/datasets/ba0130a1-5b03-40de-9535-b34e778ea6ed/details"
},
"title": "yearly_sales",
"description": "yearly_sales",
"lastModified": {
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
}
},
"inputs": [
{
"string": "urn:li:dataset:(urn:li:dataPlatform:powerbi,hr_pbi_test.dbo_book_issue,DEV)"
},
{
"string": "urn:li:dataset:(urn:li:dataPlatform:powerbi,hr_pbi_test.ms_sql_native_table,DEV)"
}
]
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "chart",
"entityUrn": "urn:li:chart:(powerbi,charts.23212598-23b5-4980-87cc-5fc0ecd84385)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "chart",
"entityUrn": "urn:li:chart:(powerbi,charts.23212598-23b5-4980-87cc-5fc0ecd84385)",
"changeType": "UPSERT",
"aspectName": "chartKey",
"aspect": {
"json": {
"dashboardTool": "powerbi",
"chartId": "powerbi.linkedin.com/charts/23212598-23b5-4980-87cc-5fc0ecd84385"
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "chart",
"entityUrn": "urn:li:chart:(powerbi,charts.23212598-23b5-4980-87cc-5fc0ecd84385)",
"changeType": "UPSERT",
"aspectName": "browsePaths",
"aspect": {
"json": {
"paths": [
"/powerbi/demo-workspace"
]
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "dashboard",
"entityUrn": "urn:li:dashboard:(powerbi,dashboards.7D668CAD-7FFC-4505-9215-655BCA5BEBAE)",
"changeType": "UPSERT",
"aspectName": "browsePaths",
"aspect": {
"json": {
"paths": [
"/powerbi/demo-workspace"
]
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "dashboard",
"entityUrn": "urn:li:dashboard:(powerbi,dashboards.7D668CAD-7FFC-4505-9215-655BCA5BEBAE)",
"changeType": "UPSERT",
"aspectName": "dashboardInfo",
"aspect": {
"json": {
"customProperties": {
"chartCount": "2",
"workspaceName": "demo-workspace",
"workspaceId": "64ED5CAD-7C10-4684-8180-826122881108"
},
"title": "test_dashboard",
"description": "Description of test dashboard",
"charts": [
"urn:li:chart:(powerbi,charts.B8E293DC-0C83-4AA0-9BB9-0A8738DF24A0)",
"urn:li:chart:(powerbi,charts.23212598-23b5-4980-87cc-5fc0ecd84385)"
],
"datasets": [],
"lastModified": {
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
}
},
"dashboardUrl": "https://localhost/dashboards/web/1"
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "dashboard",
"entityUrn": "urn:li:dashboard:(powerbi,dashboards.7D668CAD-7FFC-4505-9215-655BCA5BEBAE)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "dashboard",
"entityUrn": "urn:li:dashboard:(powerbi,dashboards.7D668CAD-7FFC-4505-9215-655BCA5BEBAE)",
"changeType": "UPSERT",
"aspectName": "dashboardKey",
"aspect": {
"json": {
"dashboardTool": "powerbi",
"dashboardId": "powerbi.linkedin.com/dashboards/7D668CAD-7FFC-4505-9215-655BCA5BEBAE"
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "dashboard",
"entityUrn": "urn:li:dashboard:(powerbi,dashboards.7D668CAD-7FFC-4505-9215-655BCA5BEBAE)",
"changeType": "UPSERT",
"aspectName": "ownership",
"aspect": {
"json": {
"owners": [
{
"owner": "urn:li:corpuser:users.User1@foo.com",
"type": "NONE"
},
{
"owner": "urn:li:corpuser:users.User2@foo.com",
"type": "NONE"
}
],
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
}
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "corpuser",
"entityUrn": "urn:li:corpuser:users.User1@foo.com",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
},
{
"entityType": "corpuser",
"entityUrn": "urn:li:corpuser:users.User2@foo.com",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test"
}
}
]
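
For reference, the first Snowflake upstream URN in the golden file above can be reproduced with the same builder call the connector uses; a minimal sketch:

import datahub.emitter.mce_builder as builder

urn = builder.make_dataset_urn_with_platform_instance(
    platform="snowflake",
    platform_instance="snowflake_production_instance",
    env="PROD",
    name="pbi_test.test.testtable",
)
assert urn == (
    "urn:li:dataset:(urn:li:dataPlatform:snowflake,"
    "snowflake_production_instance.pbi_test.test.testtable,PROD)"
)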

View File

@ -14,23 +14,23 @@ from datahub.ingestion.source.powerbi.m_query.resolver import (
)
M_QUERIES = [
'let\n Source = Snowflake.Databases("bu20658.ap-southeast-2.snowflakecomputing.com","PBI_TEST_WAREHOUSE_PROD",[Role="PBI_TEST_MEMBER"]),\n PBI_TEST_Database = Source{[Name="PBI_TEST",Kind="Database"]}[Data],\n TEST_Schema = PBI_TEST_Database{[Name="TEST",Kind="Schema"]}[Data],\n TESTTABLE_Table = TEST_Schema{[Name="TESTTABLE",Kind="Table"]}[Data]\nin\n TESTTABLE_Table',
'let\n Source = Value.NativeQuery(Snowflake.Databases("bu20658.ap-southeast-2.snowflakecomputing.com","operations_analytics_warehouse_prod",[Role="OPERATIONS_ANALYTICS_MEMBER"]){[Name="OPERATIONS_ANALYTICS"]}[Data], "SELECT#(lf)concat((UPPER(REPLACE(SELLER,\'-\',\'\'))), MONTHID) as AGENT_KEY,#(lf)concat((UPPER(REPLACE(CLIENT_DIRECTOR,\'-\',\'\'))), MONTHID) as CD_AGENT_KEY,#(lf) *#(lf)FROM#(lf)OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_APS_SME_UNITS_V4", null, [EnableFolding=true]),\n #"ADDed Conditional Column" = Table.AddColumn(Source, "SME Units ENT", each if [DEAL_TYPE] = "SME Unit" then [UNIT] else 0),\n #"Added Conditional Column1" = Table.AddColumn(#"Added Conditional Column", "Banklink Units", each if [DEAL_TYPE] = "Banklink" then [UNIT] else 0),\n #"Removed Columns" = Table.RemoveColumns(#"Added Conditional Column1",{"Banklink Units"}),\n #"Added Custom" = Table.AddColumn(#"Removed Columns", "Banklink Units", each if [DEAL_TYPE] = "Banklink" and [SALES_TYPE] = "3 - Upsell"\nthen [UNIT]\n\nelse if [SALES_TYPE] = "Adjusted BL Migration"\nthen [UNIT]\n\nelse 0),\n #"Added Custom1" = Table.AddColumn(#"Added Custom", "SME Units in $ (*$361)", each if [DEAL_TYPE] = "SME Unit" \nand [SALES_TYPE] <> "4 - Renewal"\n then [UNIT] * 361\nelse 0),\n #"Added Custom2" = Table.AddColumn(#"Added Custom1", "Banklink in $ (*$148)", each [Banklink Units] * 148)\nin\n #"Added Custom2"',
'let\n Source = Value.NativeQuery(Snowflake.Databases("bu20 658.ap-southeast-2.snowflakecomputing.com","operations_analytics_warehouse_prod",[Role="OPERATIONS_ANALYTICS_MEMBER"]){[Name="OPERATIONS_ANALYTICS"]}[Data], "select #(lf)UPPER(REPLACE(AGENT_NAME,\'-\',\'\')) AS Agent,#(lf)TIER,#(lf)UPPER(MANAGER),#(lf)TEAM_TYPE,#(lf)DATE_TARGET,#(lf)MONTHID,#(lf)TARGET_TEAM,#(lf)SELLER_EMAIL,#(lf)concat((UPPER(REPLACE(AGENT_NAME,\'-\',\'\'))), MONTHID) as AGENT_KEY,#(lf)UNIT_TARGET AS SME_Quota,#(lf)AMV_TARGET AS Revenue_Quota,#(lf)SERVICE_QUOTA,#(lf)BL_TARGET,#(lf)SOFTWARE_QUOTA as Software_Quota#(lf)#(lf)from OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT_TARGETS#(lf)#(lf)where YEAR_TARGET >= 2022#(lf)and TEAM_TYPE = \'Accounting\'#(lf)and TARGET_TEAM = \'Enterprise\'", null, [EnableFolding=true]),\n #"Added Conditional Column" = Table.AddColumn(Source, "Has PS Software Quota?", each if [TIER] = "Expansion (Medium)" then "Yes" else if [TIER] = "Acquisition" then "Yes" else "No")\nin\n #"Added Conditional Column"',
'let\n Source = Snowflake.Databases("bu10758.ap-unknown-2.fakecomputing.com","PBI_TEST_WAREHOUSE_PROD",[Role="PBI_TEST_MEMBER"]),\n PBI_TEST_Database = Source{[Name="PBI_TEST",Kind="Database"]}[Data],\n TEST_Schema = PBI_TEST_Database{[Name="TEST",Kind="Schema"]}[Data],\n TESTTABLE_Table = TEST_Schema{[Name="TESTTABLE",Kind="Table"]}[Data]\nin\n TESTTABLE_Table',
'let\n Source = Value.NativeQuery(Snowflake.Databases("bu10758.ap-unknown-2.fakecomputing.com","operations_analytics_warehouse_prod",[Role="OPERATIONS_ANALYTICS_MEMBER"]){[Name="OPERATIONS_ANALYTICS"]}[Data], "SELECT#(lf)concat((UPPER(REPLACE(SELLER,\'-\',\'\'))), MONTHID) as AGENT_KEY,#(lf)concat((UPPER(REPLACE(CLIENT_DIRECTOR,\'-\',\'\'))), MONTHID) as CD_AGENT_KEY,#(lf) *#(lf)FROM#(lf)OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_APS_SME_UNITS_V4", null, [EnableFolding=true]),\n #"ADDed Conditional Column" = Table.AddColumn(Source, "SME Units ENT", each if [DEAL_TYPE] = "SME Unit" then [UNIT] else 0),\n #"Added Conditional Column1" = Table.AddColumn(#"Added Conditional Column", "Banklink Units", each if [DEAL_TYPE] = "Banklink" then [UNIT] else 0),\n #"Removed Columns" = Table.RemoveColumns(#"Added Conditional Column1",{"Banklink Units"}),\n #"Added Custom" = Table.AddColumn(#"Removed Columns", "Banklink Units", each if [DEAL_TYPE] = "Banklink" and [SALES_TYPE] = "3 - Upsell"\nthen [UNIT]\n\nelse if [SALES_TYPE] = "Adjusted BL Migration"\nthen [UNIT]\n\nelse 0),\n #"Added Custom1" = Table.AddColumn(#"Added Custom", "SME Units in $ (*$361)", each if [DEAL_TYPE] = "SME Unit" \nand [SALES_TYPE] <> "4 - Renewal"\n then [UNIT] * 361\nelse 0),\n #"Added Custom2" = Table.AddColumn(#"Added Custom1", "Banklink in $ (*$148)", each [Banklink Units] * 148)\nin\n #"Added Custom2"',
'let\n Source = Value.NativeQuery(Snowflake.Databases("bu10758.ap-unknown-2.fakecomputing.com","operations_analytics_warehouse_prod",[Role="OPERATIONS_ANALYTICS_MEMBER"]){[Name="OPERATIONS_ANALYTICS"]}[Data], "select #(lf)UPPER(REPLACE(AGENT_NAME,\'-\',\'\')) AS Agent,#(lf)TIER,#(lf)UPPER(MANAGER),#(lf)TEAM_TYPE,#(lf)DATE_TARGET,#(lf)MONTHID,#(lf)TARGET_TEAM,#(lf)SELLER_EMAIL,#(lf)concat((UPPER(REPLACE(AGENT_NAME,\'-\',\'\'))), MONTHID) as AGENT_KEY,#(lf)UNIT_TARGET AS SME_Quota,#(lf)AMV_TARGET AS Revenue_Quota,#(lf)SERVICE_QUOTA,#(lf)BL_TARGET,#(lf)SOFTWARE_QUOTA as Software_Quota#(lf)#(lf)from OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT_TARGETS#(lf)#(lf)where YEAR_TARGET >= 2022#(lf)and TEAM_TYPE = \'Accounting\'#(lf)and TARGET_TEAM = \'Enterprise\'", null, [EnableFolding=true]),\n #"Added Conditional Column" = Table.AddColumn(Source, "Has PS Software Quota?", each if [TIER] = "Expansion (Medium)" then "Yes" else if [TIER] = "Acquisition" then "Yes" else "No")\nin\n #"Added Conditional Column"',
'let\n Source = Sql.Database("AUPRDWHDB", "COMMOPSDB", [Query="select *#(lf),concat((UPPER(REPLACE(CLIENT_MANAGER_QUOTED,\'-\',\'\'))), MONTHID) as AGENT_KEY#(lf),concat((UPPER(REPLACE(CLIENT_DIRECTOR,\'-\',\'\'))), MONTHID) as CD_AGENT_KEY#(lf)#(lf)from V_OIP_ENT_2022"]),\n #"Added Custom" = Table.AddColumn(Source, "OIP in $(*$350)", each [SALES_INVOICE_AMOUNT] * 350),\n #"Changed Type" = Table.TransformColumnTypes(#"Added Custom",{{"OIP in $(*$350)", type number}})\nin\n #"Changed Type"',
'let\n Source = Sql.Database("AUPRDWHDB", "COMMOPSDB", [Query="Select *,#(lf)#(lf)concat((UPPER(REPLACE(CLIENT_MANAGER_QUOTED,\'-\',\'\'))), #(lf)LEFT(CAST(DTE AS DATE),4)+LEFT(RIGHT(CAST(DTE AS DATE),5),2)) AS AGENT_KEY,#(lf)concat((UPPER(REPLACE(CLIENT_DIRECTOR,\'-\',\'\'))), #(lf)LEFT(CAST(DTE AS DATE),4)+LEFT(RIGHT(CAST(DTE AS DATE),5),2)) AS CD_AGENT_KEY#(lf)#(lf)from V_INVOICE_BOOKING_2022"]),\n #"Changed Type" = Table.TransformColumnTypes(Source,{{"CLIENT_ID", Int64.Type}}),\n #"Added Conditional Column" = Table.AddColumn(#"Changed Type", "PS Software (One-Off)", each if Text.Contains([REVENUE_TYPE], "Software") then [Inv_Amt] else if Text.Contains([REVENUE_TYPE], "Tax Seminar") then [Inv_Amt] else 0),\n #"Filtered Rows" = Table.SelectRows(#"Added Conditional Column", each true),\n #"Duplicated Column" = Table.DuplicateColumn(#"Filtered Rows", "CLIENT_ID", "CLIENT_ID - Copy"),\n #"Changed Type1" = Table.TransformColumnTypes(#"Duplicated Column",{{"CLIENT_ID - Copy", type text}}),\n #"Renamed Columns" = Table.RenameColumns(#"Changed Type1",{{"CLIENT_ID - Copy", "CLIENT_ID for Filter"}})\nin\n #"Renamed Columns"',
'let\n Source = Sql.Database("AUPRDWHDB", "COMMOPSDB", [Query="SELECT *,#(lf)concat((UPPER(REPLACE(CLIENT_MANAGER_CLOSING_MONTH,\'-\',\'\'))), #(lf)LEFT(CAST(MONTH_DATE AS DATE),4)+LEFT(RIGHT(CAST(MONTH_DATE AS DATE),5),2)) AS AGENT_KEY#(lf)#(lf)FROM dbo.V_ARR_ADDS"]),\n #"Changed Type" = Table.TransformColumnTypes(Source,{{"MONTH_DATE", type date}}),\n #"Added Custom" = Table.AddColumn(#"Changed Type", "Month", each Date.Month([MONTH_DATE]))\nin\n #"Added Custom"',
"let\n Source = Value.NativeQuery(Snowflake.Databases(\"bu20658.ap-southeast-2.snowflakecomputing.com\",\"operations_analytics_warehouse_prod\",[Role=\"OPERATIONS_ANALYTICS_MEMBER\"]){[Name=\"OPERATIONS_ANALYTICS\"]}[Data], \"select #(lf)UPPER(REPLACE(AGENT_NAME,'-','')) AS CLIENT_DIRECTOR,#(lf)TIER,#(lf)UPPER(MANAGER),#(lf)TEAM_TYPE,#(lf)DATE_TARGET,#(lf)MONTHID,#(lf)TARGET_TEAM,#(lf)SELLER_EMAIL,#(lf)concat((UPPER(REPLACE(AGENT_NAME,'-',''))), MONTHID) as AGENT_KEY,#(lf)UNIT_TARGET AS SME_Quota,#(lf)AMV_TARGET AS Revenue_Quota,#(lf)SERVICE_QUOTA,#(lf)BL_TARGET,#(lf)SOFTWARE_QUOTA as Software_Quota#(lf)#(lf)from OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT_TARGETS#(lf)#(lf)where YEAR_TARGET >= 2022#(lf)and TEAM_TYPE = 'Accounting'#(lf)and TARGET_TEAM = 'Enterprise'#(lf)AND TIER = 'Client Director'\", null, [EnableFolding=true])\nin\n Source",
"let\n Source = Value.NativeQuery(Snowflake.Databases(\"bu10758.ap-unknown-2.fakecomputing.com\",\"operations_analytics_warehouse_prod\",[Role=\"OPERATIONS_ANALYTICS_MEMBER\"]){[Name=\"OPERATIONS_ANALYTICS\"]}[Data], \"select #(lf)UPPER(REPLACE(AGENT_NAME,'-','')) AS CLIENT_DIRECTOR,#(lf)TIER,#(lf)UPPER(MANAGER),#(lf)TEAM_TYPE,#(lf)DATE_TARGET,#(lf)MONTHID,#(lf)TARGET_TEAM,#(lf)SELLER_EMAIL,#(lf)concat((UPPER(REPLACE(AGENT_NAME,'-',''))), MONTHID) as AGENT_KEY,#(lf)UNIT_TARGET AS SME_Quota,#(lf)AMV_TARGET AS Revenue_Quota,#(lf)SERVICE_QUOTA,#(lf)BL_TARGET,#(lf)SOFTWARE_QUOTA as Software_Quota#(lf)#(lf)from OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT_TARGETS#(lf)#(lf)where YEAR_TARGET >= 2022#(lf)and TEAM_TYPE = 'Accounting'#(lf)and TARGET_TEAM = 'Enterprise'#(lf)AND TIER = 'Client Director'\", null, [EnableFolding=true])\nin\n Source",
'let\n Source = Sql.Database("AUPRDWHDB", "COMMOPSDB", [Query="select *,#(lf)concat((UPPER(REPLACE(CLIENT_DIRECTOR,\'-\',\'\'))), MONTH_WID) as CD_AGENT_KEY,#(lf)concat((UPPER(REPLACE(CLIENT_MANAGER_CLOSING_MONTH,\'-\',\'\'))), MONTH_WID) as AGENT_KEY#(lf)#(lf)from V_PS_CD_RETENTION", CommandTimeout=#duration(0, 1, 30, 0)]),\n #"Changed Type" = Table.TransformColumnTypes(Source,{{"mth_date", type date}}),\n #"Added Custom" = Table.AddColumn(#"Changed Type", "Month", each Date.Month([mth_date])),\n #"Added Custom1" = Table.AddColumn(#"Added Custom", "TPV Opening", each if [Month] = 1 then [TPV_AMV_OPENING]\nelse if [Month] = 2 then 0\nelse if [Month] = 3 then 0\nelse if [Month] = 4 then [TPV_AMV_OPENING]\nelse if [Month] = 5 then 0\nelse if [Month] = 6 then 0\nelse if [Month] = 7 then [TPV_AMV_OPENING]\nelse if [Month] = 8 then 0\nelse if [Month] = 9 then 0\nelse if [Month] = 10 then [TPV_AMV_OPENING]\nelse if [Month] = 11 then 0\nelse if [Month] = 12 then 0\n\nelse 0)\nin\n #"Added Custom1"',
'let\n Source = Sql.Database("AUPRDWHDB", "COMMOPSDB", [Query="select#(lf)CLIENT_ID,#(lf)PARTNER_ACCOUNT_NAME,#(lf)CM_CLOSING_MNTH_COUNTRY,#(lf)MONTH_WID,#(lf)PS_DELETES,#(lf)CLIENT_MANAGER_CLOSING_MONTH,#(lf)SME_DELETES,#(lf)TPV_AMV_OPENING,#(lf)concat((UPPER(REPLACE(CLIENT_MANAGER_CLOSING_MONTH,\'-\',\'\'))), MONTH_WID) as AGENT_KEY#(lf)#(lf)from V_TPV_LEADERBOARD", CommandTimeout=#duration(0, 1, 30, 0)]),\n #"Changed Type" = Table.TransformColumnTypes(Source,{{"MONTH_WID", type text}}),\n #"Added Custom" = Table.AddColumn(#"Changed Type", "MONTH_DATE", each Date.FromText(\nText.Range([MONTH_WID], 0,4) & "-" &\nText.Range([MONTH_WID], 4,2)\n)),\n #"Added Custom2" = Table.AddColumn(#"Added Custom", "Month", each Date.Month([MONTH_DATE])),\n #"Added Custom1" = Table.AddColumn(#"Added Custom2", "TPV Opening", each if [Month] = 1 then [TPV_AMV_OPENING]\nelse if [Month] = 2 then 0\nelse if [Month] = 3 then 0\nelse if [Month] = 4 then [TPV_AMV_OPENING]\nelse if [Month] = 5 then 0\nelse if [Month] = 6 then 0\nelse if [Month] = 7 then [TPV_AMV_OPENING]\nelse if [Month] = 8 then 0\nelse if [Month] = 9 then 0\nelse if [Month] = 10 then [TPV_AMV_OPENING]\nelse if [Month] = 11 then 0\nelse if [Month] = 12 then 0\n\nelse 0)\nin\n #"Added Custom1"',
'let\n Source = Snowflake.Databases("bu20658.ap-southeast-2.snowflakecomputing.com","OPERATIONS_ANALYTICS_WAREHOUSE_PROD",[Role="OPERATIONS_ANALYTICS_MEMBER_AD"]),\n OPERATIONS_ANALYTICS_Database = Source{[Name="OPERATIONS_ANALYTICS",Kind="Database"]}[Data],\n TEST_Schema = OPERATIONS_ANALYTICS_Database{[Name="TEST",Kind="Schema"]}[Data],\n LZ_MIGRATION_DOWNLOAD_View = TEST_Schema{[Name="LZ_MIGRATION_DOWNLOAD",Kind="View"]}[Data],\n #"Changed Type" = Table.TransformColumnTypes(LZ_MIGRATION_DOWNLOAD_View,{{"MIGRATION_MONTH_ID", type text}}),\n #"Added Custom" = Table.AddColumn(#"Changed Type", "Migration Month", each Date.FromText(\nText.Range([MIGRATION_MONTH_ID], 0,4) & "-" & \nText.Range([MIGRATION_MONTH_ID], 4,2) \n)),\n #"Changed Type1" = Table.TransformColumnTypes(#"Added Custom",{{"Migration Month", type date}})\nin\n #"Changed Type1"',
"let\n Source = Value.NativeQuery(Snowflake.Databases(\"bu20658.ap-southeast-2.snowflakecomputing.com\",\"operations_analytics_warehouse_prod\",[Role=\"OPERATIONS_ANALYTICS_MEMBER\"]){[Name=\"OPERATIONS_ANALYTICS\"]}[Data], \"select *,#(lf)UPPER(REPLACE(AGENT_NAME,'-','')) AS Agent,#(lf)concat((UPPER(REPLACE(AGENT_NAME,'-',''))), MONTHID) as AGENT_KEY#(lf)#(lf)from OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT_TARGETS#(lf)#(lf)where YEAR_TARGET >= 2022#(lf)and TEAM_TYPE = 'Industries'#(lf)and TARGET_TEAM = 'Enterprise'\", null, [EnableFolding=true])\nin\n Source",
'let\n Source = Snowflake.Databases("bu10758.ap-unknown-2.fakecomputing.com","OPERATIONS_ANALYTICS_WAREHOUSE_PROD",[Role="OPERATIONS_ANALYTICS_MEMBER_AD"]),\n OPERATIONS_ANALYTICS_Database = Source{[Name="OPERATIONS_ANALYTICS",Kind="Database"]}[Data],\n TEST_Schema = OPERATIONS_ANALYTICS_Database{[Name="TEST",Kind="Schema"]}[Data],\n LZ_MIGRATION_DOWNLOAD_View = TEST_Schema{[Name="LZ_MIGRATION_DOWNLOAD",Kind="View"]}[Data],\n #"Changed Type" = Table.TransformColumnTypes(LZ_MIGRATION_DOWNLOAD_View,{{"MIGRATION_MONTH_ID", type text}}),\n #"Added Custom" = Table.AddColumn(#"Changed Type", "Migration Month", each Date.FromText(\nText.Range([MIGRATION_MONTH_ID], 0,4) & "-" & \nText.Range([MIGRATION_MONTH_ID], 4,2) \n)),\n #"Changed Type1" = Table.TransformColumnTypes(#"Added Custom",{{"Migration Month", type date}})\nin\n #"Changed Type1"',
"let\n Source = Value.NativeQuery(Snowflake.Databases(\"bu10758.ap-unknown-2.fakecomputing.com\",\"operations_analytics_warehouse_prod\",[Role=\"OPERATIONS_ANALYTICS_MEMBER\"]){[Name=\"OPERATIONS_ANALYTICS\"]}[Data], \"select *,#(lf)UPPER(REPLACE(AGENT_NAME,'-','')) AS Agent,#(lf)concat((UPPER(REPLACE(AGENT_NAME,'-',''))), MONTHID) as AGENT_KEY#(lf)#(lf)from OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_SME_UNIT_TARGETS#(lf)#(lf)where YEAR_TARGET >= 2022#(lf)and TEAM_TYPE = 'Industries'#(lf)and TARGET_TEAM = 'Enterprise'\", null, [EnableFolding=true])\nin\n Source",
'let\n Source = Sql.Database("AUPRDWHDB", "COMMOPSDB", [Query="Select#(lf)*,#(lf)concat((UPPER(REPLACE(SALES_SPECIALIST,\'-\',\'\'))),#(lf)LEFT(CAST(INVOICE_DATE AS DATE),4)+LEFT(RIGHT(CAST(INVOICE_DATE AS DATE),5),2)) AS AGENT_KEY,#(lf)CASE#(lf) WHEN CLASS = \'Software\' and (NOT(PRODUCT in (\'ADV\', \'Adv\') and left(ACCOUNT_ID,2)=\'10\') #(lf) or V_ENTERPRISE_INVOICED_REVENUE.TYPE = \'Manual Adjustment\') THEN INVOICE_AMOUNT#(lf) WHEN V_ENTERPRISE_INVOICED_REVENUE.TYPE IN (\'Recurring\',\'0\') THEN INVOICE_AMOUNT#(lf) ELSE 0#(lf)END as SOFTWARE_INV#(lf)#(lf)from V_ENTERPRISE_INVOICED_REVENUE", CommandTimeout=#duration(0, 1, 30, 0)]),\n #"Added Conditional Column" = Table.AddColumn(Source, "Services", each if [CLASS] = "Services" then [INVOICE_AMOUNT] else 0),\n #"Added Custom" = Table.AddColumn(#"Added Conditional Column", "Advanced New Sites", each if [PRODUCT] = "ADV"\nor [PRODUCT] = "Adv"\nthen [NEW_SITE]\nelse 0)\nin\n #"Added Custom"',
'let\n Source = Snowflake.Databases("xaa48144.snowflakecomputing.com","GSL_TEST_WH",[Role="ACCOUNTADMIN"]),\n Source2 = PostgreSQL.Database("localhost", "mics"),\n public_order_date = Source2{[Schema="public",Item="order_date"]}[Data],\n GSL_TEST_DB_Database = Source{[Name="GSL_TEST_DB",Kind="Database"]}[Data],\n PUBLIC_Schema = GSL_TEST_DB_Database{[Name="PUBLIC",Kind="Schema"]}[Data],\n SALES_ANALYST_VIEW_View = PUBLIC_Schema{[Name="SALES_ANALYST_VIEW",Kind="View"]}[Data],\n two_source_table = Table.Combine({public_order_date, SALES_ANALYST_VIEW_View})\n in\n two_source_table',
'let\n Source = Snowflake.Databases("ghh48144.snowflakefakecomputing.com","GSL_TEST_WH",[Role="ACCOUNTADMIN"]),\n Source2 = PostgreSQL.Database("localhost", "mics"),\n public_order_date = Source2{[Schema="public",Item="order_date"]}[Data],\n GSL_TEST_DB_Database = Source{[Name="GSL_TEST_DB",Kind="Database"]}[Data],\n PUBLIC_Schema = GSL_TEST_DB_Database{[Name="PUBLIC",Kind="Schema"]}[Data],\n SALES_ANALYST_VIEW_View = PUBLIC_Schema{[Name="SALES_ANALYST_VIEW",Kind="View"]}[Data],\n two_source_table = Table.Combine({public_order_date, SALES_ANALYST_VIEW_View})\n in\n two_source_table',
'let\n Source = PostgreSQL.Database("localhost" , "mics" ),\n public_order_date = Source{[Schema="public",Item="order_date"]}[Data] \n in \n public_order_date',
'let\n Source = Oracle.Database("localhost:1521/salesdb.GSLAB.COM", [HierarchicalNavigation=true]), HR = Source{[Schema="HR"]}[Data], EMPLOYEES1 = HR{[Name="EMPLOYEES"]}[Data] \n in EMPLOYEES1',
'let\n Source = Sql.Database("localhost", "library"),\n dbo_book_issue = Source{[Schema="dbo",Item="book_issue"]}[Data]\n in dbo_book_issue',
'let\n Source = Snowflake.Databases("xaa48144.snowflakecomputing.com","GSL_TEST_WH",[Role="ACCOUNTADMIN"]),\n GSL_TEST_DB_Database = Source{[Name="GSL_TEST_DB",Kind="Database"]}[Data],\n PUBLIC_Schema = GSL_TEST_DB_Database{[Name="PUBLIC",Kind="Schema"]}[Data],\n SALES_FORECAST_Table = PUBLIC_Schema{[Name="SALES_FORECAST",Kind="Table"]}[Data],\n SALES_ANALYST_Table = PUBLIC_Schema{[Name="SALES_ANALYST",Kind="Table"]}[Data],\n RESULT = Table.Combine({SALES_FORECAST_Table, SALES_ANALYST_Table})\n\nin\n RESULT',
'let\n Source = Snowflake.Databases("ghh48144.snowflakefakecomputing.com","GSL_TEST_WH",[Role="ACCOUNTADMIN"]),\n GSL_TEST_DB_Database = Source{[Name="GSL_TEST_DB",Kind="Database"]}[Data],\n PUBLIC_Schema = GSL_TEST_DB_Database{[Name="PUBLIC",Kind="Schema"]}[Data],\n SALES_FORECAST_Table = PUBLIC_Schema{[Name="SALES_FORECAST",Kind="Table"]}[Data],\n SALES_ANALYST_Table = PUBLIC_Schema{[Name="SALES_ANALYST",Kind="Table"]}[Data],\n RESULT = Table.Combine({SALES_FORECAST_Table, SALES_ANALYST_Table})\n\nin\n RESULT',
'let\n Source = GoogleBigQuery.Database(),\n #"seraphic-music-344307" = Source{[Name="seraphic-music-344307"]}[Data],\n school_dataset_Schema = #"seraphic-music-344307"{[Name="school_dataset",Kind="Schema"]}[Data],\n first_Table = school_dataset_Schema{[Name="first",Kind="Table"]}[Data]\nin\n first_Table',
'let \nSource = GoogleBigQuery.Database([BillingProject = #"Parameter - Source"]),\n#"gcp-project" = Source{[Name=#"Parameter - Source"]}[Data],\ngcp_billing_Schema = #"gcp-project"{[Name=#"My bq project",Kind="Schema"]}[Data],\nF_GCP_COST_Table = gcp_billing_Schema{[Name="GCP_TABLE",Kind="Table"]}[Data]\nin\nF_GCP_COST_Table',
'let\n Source = GoogleBigQuery.Database([BillingProject = #"Parameter - Source"]),\n#"gcp-project" = Source{[Name=#"Parameter - Source"]}[Data],\nuniversal_Schema = #"gcp-project"{[Name="universal",Kind="Schema"]}[Data],\nD_WH_DATE_Table = universal_Schema{[Name="D_WH_DATE",Kind="Table"]}[Data],\n#"Filtered Rows" = Table.SelectRows(D_WH_DATE_Table, each [D_DATE] > #datetime(2019, 9, 10, 0, 0, 0)),\n#"Filtered Rows1" = Table.SelectRows(#"Filtered Rows", each DateTime.IsInPreviousNHours([D_DATE], 87600))\n in \n#"Filtered Rows1"',
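The expressions above are the parser fixtures whose extracted datasource_server values the test hunks below assert on. As a rough sketch of what that extraction amounts to, the following is an illustrative regex over two of the datasource call shapes; it is not the connector's actual M-expression parser, which also handles Oracle, BigQuery, ports, and native queries:

# A minimal illustrative sketch, NOT the connector's real M parser: pull the
# datasource server out of a Snowflake.Databases(...) or Sql.Database(...) call,
# i.e. the value the datasource_server assertions below check.
import re

def guess_datasource_server(m_expression: str) -> str:
    match = re.search(
        r'(?:Snowflake\.Databases|Sql\.Database)\(\s*"([^"]+)"', m_expression
    )
    if match is None:
        raise ValueError("no recognizable datasource call in expression")
    return match.group(1)

# e.g. for the first query above this returns "bu10758.ap-unknown-2.fakecomputing.com"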
@ -149,6 +149,10 @@ def test_snowflake_regular_case():
assert len(data_platform_tables) == 1
assert data_platform_tables[0].name == "TESTTABLE"
assert data_platform_tables[0].full_name == "PBI_TEST.TEST.TESTTABLE"
assert (
data_platform_tables[0].datasource_server
== "bu10758.ap-unknown-2.fakecomputing.com"
)
assert (
data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
== SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name
@ -172,6 +176,7 @@ def test_postgres_regular_case():
assert len(data_platform_tables) == 1
assert data_platform_tables[0].name == "order_date"
assert data_platform_tables[0].full_name == "mics.public.order_date"
assert data_platform_tables[0].datasource_server == "localhost"
assert (
data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
== SupportedDataPlatform.POSTGRES_SQL.value.powerbi_data_platform_name
@ -195,6 +200,7 @@ def test_oracle_regular_case():
assert len(data_platform_tables) == 1
assert data_platform_tables[0].name == "EMPLOYEES"
assert data_platform_tables[0].full_name == "salesdb.HR.EMPLOYEES"
assert data_platform_tables[0].datasource_server == "localhost:1521"
assert (
data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
== SupportedDataPlatform.ORACLE.value.powerbi_data_platform_name
@ -219,6 +225,7 @@ def test_mssql_regular_case():
assert len(data_platform_tables) == 1
assert data_platform_tables[0].name == "book_issue"
assert data_platform_tables[0].full_name == "library.dbo.book_issue"
assert data_platform_tables[0].datasource_server == "localhost"
assert (
data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
== SupportedDataPlatform.MS_SQL.value.powerbi_data_platform_name
@ -259,6 +266,7 @@ def test_mssql_with_query():
assert len(data_platform_tables) == 1
assert data_platform_tables[0].name == expected_tables[index].split(".")[2]
assert data_platform_tables[0].full_name == expected_tables[index]
assert data_platform_tables[0].datasource_server == "AUPRDWHDB"
assert (
data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
== SupportedDataPlatform.MS_SQL.value.powerbi_data_platform_name
@ -296,6 +304,10 @@ def test_snowflake_native_query():
assert len(data_platform_tables) == 1
assert data_platform_tables[0].name == expected_tables[index].split(".")[2]
assert data_platform_tables[0].full_name == expected_tables[index]
assert (
data_platform_tables[0].datasource_server
== "bu10758.ap-unknown-2.fakecomputing.com"
)
assert (
data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
== SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name
@ -316,6 +328,7 @@ def test_google_bigquery_1():
assert len(data_platform_tables) == 1
assert data_platform_tables[0].name == table.full_name.split(".")[2]
assert data_platform_tables[0].full_name == table.full_name
assert data_platform_tables[0].datasource_server == "seraphic-music-344307"
assert (
data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
== SupportedDataPlatform.GOOGLE_BIGQUERY.value.powerbi_data_platform_name
@ -346,6 +359,7 @@ def test_google_bigquery_2():
assert len(data_platform_tables) == 1
assert data_platform_tables[0].name == table.full_name.split(".")[2]
assert data_platform_tables[0].full_name == table.full_name
assert data_platform_tables[0].datasource_server == "my-test-project"
assert (
data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
== SupportedDataPlatform.GOOGLE_BIGQUERY.value.powerbi_data_platform_name
@ -373,6 +387,7 @@ def test_for_each_expression_1():
assert len(data_platform_tables) == 1
assert data_platform_tables[0].name == table.full_name.split(".")[2]
assert data_platform_tables[0].datasource_server == "my-test-project"
assert data_platform_tables[0].full_name == table.full_name
assert (
data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
@ -401,6 +416,7 @@ def test_for_each_expression_2():
assert len(data_platform_tables) == 1
assert data_platform_tables[0].name == table.full_name.split(".")[2]
assert data_platform_tables[0].full_name == table.full_name
assert data_platform_tables[0].datasource_server == "dwh-prod"
assert (
data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
== SupportedDataPlatform.GOOGLE_BIGQUERY.value.powerbi_data_platform_name
@ -439,12 +455,17 @@ def test_multi_source_table():
assert len(data_platform_tables) == 2
assert data_platform_tables[0].full_name == "mics.public.order_date"
assert data_platform_tables[0].datasource_server == "localhost"
assert (
data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
== SupportedDataPlatform.POSTGRES_SQL.value.powerbi_data_platform_name
)
assert data_platform_tables[1].full_name == "GSL_TEST_DB.PUBLIC.SALES_ANALYST_VIEW"
assert (
data_platform_tables[1].datasource_server
== "ghh48144.snowflakefakecomputing.com"
)
assert (
data_platform_tables[1].data_platform_pair.powerbi_data_platform_name
== SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name
@ -467,12 +488,20 @@ def test_table_combine():
assert len(data_platform_tables) == 2
assert data_platform_tables[0].full_name == "GSL_TEST_DB.PUBLIC.SALES_FORECAST"
assert (
data_platform_tables[0].datasource_server
== "ghh48144.snowflakefakecomputing.com"
)
assert (
data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
== SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name
)
assert data_platform_tables[1].full_name == "GSL_TEST_DB.PUBLIC.SALES_ANALYST"
assert (
data_platform_tables[1].datasource_server
== "ghh48144.snowflakefakecomputing.com"
)
assert (
data_platform_tables[1].data_platform_pair.powerbi_data_platform_name
== SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name
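The datasource_server values asserted in these hunks are exactly what the server_to_platform_instance recipe option keys on. A minimal sketch of the downstream effect, assuming DataHub's standard dataset-URN helper and made-up mapping values (not taken from these tests):

import datahub.emitter.mce_builder as builder

# Illustrative only: resolve a datasource server against a
# server_to_platform_instance-style mapping and build the dataset URN that
# upstream lineage would point at.
server_to_platform_instance = {
    "ghh48144.snowflakefakecomputing.com": {"platform_instance": "sn-2", "env": "QA"},
}

instance = server_to_platform_instance["ghh48144.snowflakefakecomputing.com"]
urn = builder.make_dataset_urn_with_platform_instance(
    platform="snowflake",
    name="gsl_test_db.public.sales_analyst",
    platform_instance=instance["platform_instance"],
    env=instance["env"],
)
# -> urn:li:dataset:(urn:li:dataPlatform:snowflake,sn-2.gsl_test_db.public.sales_analyst,QA)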
View File
@ -1,12 +1,17 @@
import logging
import sys
from typing import Any, Dict
from typing import Any, Dict, cast
from unittest import mock
import pytest
from freezegun import freeze_time
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.powerbi.config import (
PowerBiDashboardSourceConfig,
SupportedDataPlatform,
)
from datahub.ingestion.source.powerbi.powerbi import PowerBiDashboardSource
from tests.test_helpers import mce_helpers
FROZEN_TIME = "2022-02-03 07:00:00"
@ -1022,3 +1027,164 @@ def test_workspace_container(
output_path=tmp_path / "powerbi_container_mces.json",
golden_path=f"{test_resources_dir}/{mce_out_file}",
)
def dataset_type_mapping_set_to_all_platform(pipeline: Pipeline) -> None:
source_config: PowerBiDashboardSourceConfig = cast(
PowerBiDashboardSource, pipeline.source
).source_config
assert source_config.dataset_type_mapping is not None
# Generate default dataset_type_mapping and compare it with source_config.dataset_type_mapping
default_dataset_type_mapping: dict = {}
for item in SupportedDataPlatform:
default_dataset_type_mapping[
item.value.powerbi_data_platform_name
] = item.value.datahub_data_platform_name
assert default_dataset_type_mapping == source_config.dataset_type_mapping
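For reference, the default mapping this helper expects would look like the following sketch; only the pairs named in this commit are shown, and the real dict has one entry per SupportedDataPlatform member:

# Illustrative subset of the generated default dataset_type_mapping
{
    "PostgreSQL": "postgres",
    "Oracle": "oracle",
    "Snowflake": "snowflake",
    # ...remaining SupportedDataPlatform members
}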
@freeze_time(FROZEN_TIME)
@mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca)
@pytest.mark.integration
def test_dataset_type_mapping_should_set_to_all(
mock_msal, pytestconfig, tmp_path, mock_time, requests_mock
):
"""
Here we don't need to run the pipeline; we only need to verify that dataset_type_mapping defaults to a mapping covering all supported data platforms.
"""
register_mock_api(request_mock=requests_mock)
new_config: dict = {**default_source_config()}
del new_config["dataset_type_mapping"]
pipeline = Pipeline.create(
{
"run_id": "powerbi-test",
"source": {
"type": "powerbi",
"config": {
**new_config,
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/powerbi_lower_case_urn_mces.json",
},
},
}
)
dataset_type_mapping_set_to_all_platform(pipeline)
@freeze_time(FROZEN_TIME)
@mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca)
@pytest.mark.integration
def test_dataset_type_mapping_error(
mock_msal, pytestconfig, tmp_path, mock_time, requests_mock
):
"""
Here we don't need to run the pipeline; we only need to verify that an error is raised when both dataset_type_mapping and server_to_platform_instance are set, since the two options are mutually exclusive.
"""
register_mock_api(request_mock=requests_mock)
with pytest.raises(Exception) as exc_info:
Pipeline.create(
{
"run_id": "powerbi-test",
"source": {
"type": "powerbi",
"config": {
**default_source_config(),
"server_to_platform_instance": {
"localhost": {
"platform_instance": "test",
}
},
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/powerbi_lower_case_urn_mces.json",
},
},
}
)
assert (
"dataset_type_mapping is deprecated. Use server_to_platform_instance only."
in str(exc_info.value)
)
@freeze_time(FROZEN_TIME)
@mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca)
def test_server_to_platform_map(
mock_msal, pytestconfig, tmp_path, mock_time, requests_mock
):
enable_logging()
test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi"
new_config: dict = {
**default_source_config(),
"extract_lineage": True,
"convert_lineage_urns_to_lowercase": True,
}
del new_config["dataset_type_mapping"]
new_config["server_to_platform_instance"] = {
"hp123rt5.ap-southeast-2.fakecomputing.com": {
"platform_instance": "snowflake_production_instance",
"env": "PROD",
},
"my-test-project": {
"platform_instance": "bigquery-computing-dev-account",
"env": "QA",
},
"localhost:1521": {"platform_instance": "oracle-sales-instance", "env": "PROD"},
}
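# Illustration (the values above are test fixtures): an upstream table whose
# datasource server resolves to "localhost:1521" would be ingested under
# platform_instance "oracle-sales-instance" with env "PROD"; servers missing from
# this map are still ingested, just without a platform_instance (see the note at
# the end of this test).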
register_mock_api(request_mock=requests_mock)
output_path: str = f"{tmp_path}/powerbi_server_to_platform_instance_mces.json"
pipeline = Pipeline.create(
{
"run_id": "powerbi-test",
"source": {
"type": "powerbi",
"config": new_config,
},
"sink": {
"type": "file",
"config": {
"filename": output_path,
},
},
}
)
pipeline.run()
pipeline.raise_from_status()
golden_file_path: str = (
f"{test_resources_dir}/golden_test_server_to_platform_instance.json"
)
mce_helpers.check_golden_file(
pytestconfig,
output_path=output_path,
golden_path=golden_file_path,
)
# Since the server_to_platform_instance map is provided, the deprecated dataset_type_mapping
# should default to all supported platforms, so that upstream lineage is still processed
# for servers that have no entry in the server_to_platform_instance map
dataset_type_mapping_set_to_all_platform(pipeline)