diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/dashboard/powerBIConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/dashboard/powerBIConnection.json
index 98b40e60dff..1c2ce9f1521 100644
--- a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/dashboard/powerBIConnection.json
+++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/dashboard/powerBIConnection.json
@@ -31,15 +31,16 @@
       "type": "string",
       "format": "password"
     },
-    "credentials": {
-      "title": "Credentials",
-      "description": "Credentials for PowerBI.",
+    "tenantId": {
+      "title": "Tenant ID",
+      "description": "Tenant ID for PowerBI.",
       "type": "string"
     },
-    "redirectURI": {
-      "title": "Redirect URI",
-      "description": "Dashboard redirect URI for the PowerBI service.",
-      "type": "string"
+    "authorityURI": {
+      "title": "Authority URI",
+      "description": "Authority URI for the PowerBI service.",
+      "type": "string",
+      "default": "https://login.microsoftonline.com/"
     },
     "hostPort": {
       "title": "Host and Port",
@@ -55,7 +56,7 @@
       "items": {
         "type": "string"
       },
-      "default": null
+      "default": ["https://analysis.windows.net/powerbi/api/.default"]
     },
     "supportsMetadataExtraction": {
       "title": "Supports Metadata Extraction",
@@ -63,5 +64,5 @@
     }
   },
   "additionalProperties": false,
-  "required": ["hostPort", "clientId", "clientSecret"]
+  "required": ["clientId", "clientSecret", "tenantId"]
 }
diff --git a/ingestion/examples/workflows/powerbi.yaml b/ingestion/examples/workflows/powerbi.yaml
index 6ce08c30c3e..cf0749a755c 100644
--- a/ingestion/examples/workflows/powerbi.yaml
+++ b/ingestion/examples/workflows/powerbi.yaml
@@ -1,19 +1,27 @@
 source:
   type: powerbi
-  serviceName: local_powerbi
+  serviceName: local_power11
   serviceConnection:
     config:
       clientId: client_id
       clientSecret: client_secret
-      redirectURI: http://localhost:8585/callback
-      hostPort: https://analysis.windows.net/powerbi
+      tenantId: tenant_id
       scope:
-        - scope
-        - https://analysis.windows.net/powerbi/api/App.Read.All
-      credentials: path
+        - https://analysis.windows.net/powerbi/api/.default
       type: PowerBI
   sourceConfig:
-    config: {}
+    config:
+      chartFilterPattern:
+        includes:
+          - Gross Margin %
+          - Total Defect*
+          - "Number"
+        excludes:
+          - Total Revenue
+      dashboardFilterPattern:
+        includes:
+          - Supplier Quality Analysis Sample
+          - "Customer"
 sink:
   type: metadata-rest
   config: {}
diff --git a/ingestion/setup.py b/ingestion/setup.py
index 765b7eb1c5a..a0dc87efe31 100644
--- a/ingestion/setup.py
+++ b/ingestion/setup.py
@@ -111,7 +111,7 @@ plugins: Dict[str, Set[str]] = {
     "mssql-odbc": {"pyodbc"},
     "mysql": {"pymysql>=1.0.2"},
     "oracle": {"cx_Oracle"},
-    "powerbi": {"python-power-bi==0.1.2"},
+    "powerbi": {"msal==1.17.0"},
     "presto": {"pyhive~=0.6.3"},
     "trino": {"trino[sqlalchemy]"},
     "postgres": {"pymysql>=1.0.2", "psycopg2-binary", "GeoAlchemy2"},
diff --git a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_source.py b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_source.py
index d2e7292426f..022c836be2c 100644
--- a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_source.py
+++ b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_source.py
@@ -16,6 +16,7 @@ from metadata.ingestion.api.common import Entity
 from metadata.ingestion.api.source import Source, SourceStatus
 from metadata.ingestion.models.table_metadata import Chart, Dashboard
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.ingestion.source.database.common_db_source import SQLSourceStatus
 from metadata.utils.connections import get_connection
 from metadata.utils.filters import filter_by_dashboard
 from metadata.utils.logger import ingestion_logger
@@ -57,13 +58,13 @@ class DashboardSourceService(Source, ABC):
     @abstractmethod
     def process_charts(self) -> Optional[Iterable[Chart]]:
         """
-        Metod to fetch Charts
+        Method to fetch Charts
         """
 
     @abstractmethod
     def fetch_dashboard_charts(self, dashboard: Any) -> Optional[Iterable[Chart]]:
         """
-        Metod to fetch charts linked to dashboard
+        Method to fetch charts linked to dashboard
         """
 
     @abstractmethod
@@ -80,18 +81,17 @@ class DashboardSourceService(Source, ABC):
         self.source_config: DashboardServiceMetadataPipeline = (
             self.config.sourceConfig.config
         )
-        self.connection = get_connection(self.service_connection)
         self.client = self.connection.client
         self.service = self.metadata.get_service_or_create(
             entity=DashboardService, config=config
         )
-        self.status = SourceStatus()
+        self.status = SQLSourceStatus()
         self.metadata_client = OpenMetadata(self.metadata_config)
 
     def next_record(self) -> Iterable[Entity]:
         yield from self.process_dashboards()
-        yield from self.process_charts()
+        yield from self.process_charts() or []
 
     def process_dashboards(
         self,
diff --git a/ingestion/src/metadata/ingestion/source/dashboard/metabase.py b/ingestion/src/metadata/ingestion/source/dashboard/metabase.py
index dfa3fd25874..bbed41e3c2d 100644
--- a/ingestion/src/metadata/ingestion/source/dashboard/metabase.py
+++ b/ingestion/src/metadata/ingestion/source/dashboard/metabase.py
@@ -204,7 +204,7 @@ class MetabaseSource(DashboardSourceService):
             resp_tables = self.req_get(f"/api/table/{chart_details['table_id']}")
             if resp_tables.status_code == 200:
                 table = resp_tables.json()
-                table_fqn = f"{self.service_connection.dbServiceName}.{table['schema']}.{table['name']}"
+                table_fqn = f"{self.source_config.dbServiceName}.{table['schema']}.{table['name']}"
                 dashboard_fqn = f"{self.service.name}.{quote(dashboard_name)}"
                 table_entity = metadata.get_by_name(entity=Table, fqn=table_fqn)
                 chart_entity = metadata.get_by_name(
diff --git a/ingestion/src/metadata/ingestion/source/dashboard/powerbi.py b/ingestion/src/metadata/ingestion/source/dashboard/powerbi.py
index 15f43818c5d..1f69f302cda 100644
--- a/ingestion/src/metadata/ingestion/source/dashboard/powerbi.py
+++ b/ingestion/src/metadata/ingestion/source/dashboard/powerbi.py
@@ -12,43 +12,42 @@
 import traceback
 import uuid
-from typing import Iterable, List, Optional
-
-from powerbi.client import PowerBiClient
+from typing import Any, Iterable, List, Optional
 
 from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
+from metadata.generated.schema.entity.data.dashboard import (
+    Dashboard as LineageDashboard,
+)
+from metadata.generated.schema.entity.data.database import Database
 from metadata.generated.schema.entity.services.connections.dashboard.powerBIConnection import (
     PowerBIConnection,
 )
 from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
     OpenMetadataConnection,
 )
-from metadata.generated.schema.entity.services.dashboardService import DashboardService
 from metadata.generated.schema.metadataIngestion.workflow import (
     Source as WorkflowSource,
 )
+from metadata.generated.schema.type.entityLineage import EntitiesEdge
 from metadata.generated.schema.type.entityReference import EntityReference
-from metadata.ingestion.api.common import Entity
-from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
+from metadata.ingestion.api.source import InvalidSourceException
 from metadata.ingestion.models.table_metadata import Chart, Dashboard
-from metadata.ingestion.ometa.ometa_api import OpenMetadata
-from metadata.utils.filters import filter_by_chart, filter_by_dashboard
+from metadata.ingestion.source.dashboard.dashboard_source import DashboardSourceService
+from metadata.utils import fqn
+from metadata.utils.filters import filter_by_chart
 from metadata.utils.logger import ingestion_logger
 
 logger = ingestion_logger()
 
 
-class PowerbiSource(Source[Entity]):
-    """Powerbi entity class
-
+class PowerbiSource(DashboardSourceService):
+    """PowerBi entity class
     Args:
         config:
         metadata_config:
     Attributes:
         config:
         metadata_config:
-        status:
-        dashboard_service:
         charts:
     """
@@ -59,18 +58,9 @@
     ):
         super().__init__(config, metadata_config)
 
-        self.client = PowerBiClient(
-            client_id=self.service_connection_config.clientId,
-            client_secret=self.service_connection_config.clientSecret.get_secret_value(),
-            scope=self.service_connection_config.scope,
-            redirect_uri=self.service_connection_config.redirectURI,
-            credentials=self.service_connection_config.credentials,
-        )
-
     @classmethod
     def create(cls, config_dict, metadata_config: OpenMetadataConnection):
         """Instantiate object
-
         Args:
             config_dict:
             metadata_config:
@@ -89,70 +79,112 @@
         """
         Get List of all dashboards
         """
-        self.dashboard_service = self.client.dashboards()
-        dashboard_list = self.dashboard_service.get_dashboards()
-        return dashboard_list.get("value")
+        self.dashboards = self.client.fetch_dashboards().get("value")
+        return self.dashboards
 
     def get_dashboard_name(self, dashboard_details: dict) -> str:
         """
         Get Dashboard Name
         """
-        return dashboard_details["id"]
+        return dashboard_details["displayName"]
 
     def get_dashboard_details(self, dashboard: dict) -> dict:
         """
         Get Dashboard Details
         """
-        return self.dashboard_service.get_dashboard(dashboard["id"])
+        return dashboard
 
     def get_dashboard_entity(self, dashboard_details: dict) -> Dashboard:
         """
         Method to Get Dashboard Entity, Dashboard Charts & Lineage
         """
         yield from self.fetch_dashboard_charts(dashboard_details)
-        return Dashboard(
+        yield Dashboard(
             name=dashboard_details["id"],
             url=dashboard_details["webUrl"],
             displayName=dashboard_details["displayName"],
             description="",
             charts=self.charts,
-            service=EntityReference(
-                id=self.dashboard_service.id, type="dashboardService"
-            ),
+            service=EntityReference(id=self.service.id, type="dashboardService"),
         )
 
-    def get_lineage(
-        self, datasource_list: List, dashboard_name: str
-    ) -> AddLineageRequest:
+    def get_lineage(self, dashboard_details: Any) -> Optional[AddLineageRequest]:
         """
         Get lineage between dashboard and data sources
         """
-        logger.info("Lineage not implemented for Looker")
+        try:
+            charts = self.client.fetch_charts(dashboard_id=dashboard_details["id"]).get(
+                "value"
+            )
+            for chart in charts:
+                dataset_id = chart.get("datasetId")
+                if dataset_id:
+                    data_sources = self.client.fetch_data_sources(dataset_id=dataset_id)
+                    for data_source in data_sources.get("value"):
+                        database_name = data_source.get("connectionDetails").get(
+                            "database"
+                        )
 
-    def fetch_charts(self) -> Iterable[Chart]:
+                        from_fqn = fqn.build(
+                            self.metadata,
+                            entity_type=Database,
+                            service_name=self.source_config.dbServiceName,
+                            database_name=database_name,
+                        )
+                        from_entity = self.metadata.get_by_name(
+                            entity=Database,
+                            fqn=from_fqn,
+                        )
+                        to_fqn = fqn.build(
+                            self.metadata,
+                            entity_type=LineageDashboard,
+                            service_name=self.config.serviceName,
+                            dashboard_name=dashboard_details["id"],
+                        )
+                        to_entity = self.metadata.get_by_name(
+                            entity=LineageDashboard,
+                            fqn=to_fqn,
+                        )
+                        if from_entity and to_entity:
+                            lineage = AddLineageRequest(
+                                edge=EntitiesEdge(
+                                    fromEntity=EntityReference(
+                                        id=from_entity.id.__root__, type="database"
+                                    ),
+                                    toEntity=EntityReference(
+                                        id=to_entity.id.__root__, type="dashboard"
+                                    ),
+                                )
+                            )
+                            yield lineage
+        except Exception as err:  # pylint: disable=broad-except
+            logger.debug(traceback.format_exc())
+            logger.error(err)
+
+    def process_charts(self) -> Iterable[Chart]:
         """
-        Metod to fetch Charts
+        Method to fetch Charts
         """
-        logger.info("Fetch Charts Not implemented for Looker")
+        logger.info("Fetch Charts Not implemented for PowerBi")
 
     def fetch_dashboard_charts(self, dashboard_details: dict) -> Iterable[Chart]:
         """Get chart method
         Args:
-            charts:
+            dashboard_details:
         Returns:
             Iterable[Chart]
         """
         self.charts = []
-        charts = self.dashboard_service.get_tiles(
-            dashboard_id=dashboard_details["id"]
-        ).get("value")
+        charts = self.client.fetch_charts(dashboard_id=dashboard_details["id"]).get(
+            "value"
+        )
         for chart in charts:
             try:
                 if filter_by_chart(
                     self.source_config.chartFilterPattern, chart["title"]
                 ):
-                    self.status.failure(
+                    self.status.filter(
                         chart["title"], "Filtered out using Chart filter pattern"
                     )
                     continue
@@ -164,12 +196,12 @@
                     chart_type="",
                     url=chart["embedUrl"],
                     service=EntityReference(
-                        id=self.dashboard_service.id, type="dashboardService"
+                        id=self.service.id, type="dashboardService"
                     ),
                 )
                 self.charts.append(chart["id"])
                 self.status.scanned(chart["title"])
             except Exception as err:  # pylint: disable=broad-except
                 logger.debug(traceback.format_exc())
-                logger.error(repr(err))
+                logger.error(err)
                 self.status.failure(chart["title"], repr(err))
diff --git a/ingestion/src/metadata/ingestion/source/database/sqlalchemy_source.py b/ingestion/src/metadata/ingestion/source/database/sqlalchemy_source.py
index 9eb8afa5d99..48ee6df0dca 100644
--- a/ingestion/src/metadata/ingestion/source/database/sqlalchemy_source.py
+++ b/ingestion/src/metadata/ingestion/source/database/sqlalchemy_source.py
@@ -46,13 +46,13 @@ class SqlAlchemySource(Source, ABC):
     @abstractmethod
     def get_databases(self) -> Iterable[Inspector]:
         """
-        Method Yields Inspector objects for each aviable database
+        Method Yields Inspector objects for each available database
         """
 
     @abstractmethod
     def get_schemas(self, inspector: Inspector) -> str:
         """
-        Method Yields schemas aviable in database
+        Method Yields schemas available in database
         """
 
     @abstractmethod
@@ -80,7 +80,7 @@
     @abstractmethod
     def get_data_model(self, database: str, schema: str, table_name: str) -> DataModel:
         """
-        Method to fetch data modles
+        Method to fetch data models
         """
 
     @abstractmethod
@@ -134,7 +134,7 @@
 
     def get_database_entity(self) -> Database:
         """
-        Method to get database enetity from db name
+        Method to get database entity from db name
         """
         return Database(
             name=self._get_database_name(),
@@ -145,7 +145,7 @@
 
     def get_schema_entity(self, schema: str, database: Database) -> DatabaseSchema:
         """
-        Method to get DatabaseSchema enetity from schema name and database entity
+        Method to get DatabaseSchema entity from schema name and database entity
         """
         return DatabaseSchema(
             name=schema,
@@ -157,7 +157,7 @@
 
     def next_record(self) -> Iterable[Entity]:
         """
-        Method to fetch all tables, views & mark deleet tables
+        Method to fetch all tables, views & mark delete tables
         """
         for inspector in self.get_databases():
             for schema in self.get_schemas(inspector):
diff --git a/ingestion/src/metadata/utils/connection_clients.py b/ingestion/src/metadata/utils/connection_clients.py
index d0464b68e7b..2c6d6e914b4 100644
--- a/ingestion/src/metadata/utils/connection_clients.py
+++ b/ingestion/src/metadata/utils/connection_clients.py
@@ -72,3 +72,9 @@ class SupersetClient:
 class TableauClient:
     def __init__(self, client) -> None:
         self.client = client
+
+
+@dataclass
+class PowerBiClient:
+    def __init__(self, client) -> None:
+        self.client = client
diff --git a/ingestion/src/metadata/utils/connections.py b/ingestion/src/metadata/utils/connections.py
index 278c7dd483e..c5e9fc5fe12 100644
--- a/ingestion/src/metadata/utils/connections.py
+++ b/ingestion/src/metadata/utils/connections.py
@@ -32,6 +32,9 @@ from metadata.generated.schema.entity.services.connections.connectionBasicType i
 from metadata.generated.schema.entity.services.connections.dashboard.metabaseConnection import (
     MetabaseConnection,
 )
+from metadata.generated.schema.entity.services.connections.dashboard.powerBIConnection import (
+    PowerBIConnection,
+)
 from metadata.generated.schema.entity.services.connections.dashboard.redashConnection import (
     RedashConnection,
 )
@@ -72,6 +75,7 @@ from metadata.utils.connection_clients import (
     GlueClient,
     KafkaClient,
     MetabaseClient,
+    PowerBiClient,
     RedashClient,
     SalesforceClient,
     SupersetClient,
@@ -518,3 +522,20 @@ def _(connection: TableauClient) -> None:
         raise SourceConnectionException(
             f"Unknown error connecting with {connection} - {err}."
         )
+
+
+@get_connection.register
+def _(connection: PowerBIConnection, verbose: bool = False):
+    from metadata.utils.powerbi_client import PowerBiApiClient
+
+    return PowerBiClient(PowerBiApiClient(connection))
+
+
+@test_connection.register
+def _(connection: PowerBiClient) -> None:
+    try:
+        connection.client.fetch_dashboards()
+    except Exception as err:
+        raise SourceConnectionException(
+            f"Unknown error connecting with {connection} - {err}."
+        )
diff --git a/ingestion/src/metadata/utils/powerbi_client.py b/ingestion/src/metadata/utils/powerbi_client.py
new file mode 100644
index 00000000000..e920fe40a75
--- /dev/null
+++ b/ingestion/src/metadata/utils/powerbi_client.py
@@ -0,0 +1,130 @@
+#  Copyright 2021 Collate
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#  http://www.apache.org/licenses/LICENSE-2.0
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+"""
+REST Auth & Client for PowerBi
+"""
+import traceback
+from typing import Tuple
+
+import msal
+
+from metadata.ingestion.ometa.client import REST, ClientConfig
+from metadata.utils.logger import utils_logger
+
+logger = utils_logger()
+
+
+class PowerBiApiClient:
+    client: REST
+
+    def __init__(self, config):
+        self.config = config
+        self.msal_client = msal.ConfidentialClientApplication(
+            client_id=self.config.clientId,
+            client_credential=self.config.clientSecret.get_secret_value(),
+            authority=self.config.authorityURI + self.config.tenantId,
+        )
+        self.auth_token = self.get_auth_token()
+        client_config = ClientConfig(
+            base_url="https://api.powerbi.com",
+            api_version="v1.0",
+            auth_token=lambda: self.auth_token,
+            auth_header="Authorization",
+            allow_redirects=True,
+        )
+        self.client = REST(client_config)
+
+    def get_auth_token(self) -> Tuple[str, str]:
+        """
+        Method to generate PowerBi access token
+        """
+        logger.info("Generating PowerBi access token")
+
+        auth_response = self.msal_client.acquire_token_silent(
+            scopes=self.config.scope, account=None
+        )
+
+        if not auth_response:
+            logger.info("Token does not exist in the cache. Getting a new token.")
+            auth_response = self.msal_client.acquire_token_for_client(
+                scopes=self.config.scope
+            )
+
+        if not auth_response.get("access_token"):
+            logger.error(
+                "Failed to generate the PowerBi access token. Please check provided config"
+            )
+            raise Exception(
+                "Failed to generate the PowerBi access token. Please check provided config"
+            )
+
+        logger.info("PowerBi Access Token generated successfully")
+        access_token = auth_response.get("access_token")
+        expiry = auth_response.get("expires_in")
+
+        return access_token, expiry
+
+    def fetch_charts(self, dashboard_id: str) -> dict:
+        """Get charts method
+
+        Args:
+            dashboard_id:
+        Returns:
+            dict
+        """
+        try:
+            response = self.client.get(f"/myorg/admin/dashboards/{dashboard_id}/tiles")
+            return response
+        except Exception as err:  # pylint: disable=broad-except
+            logger.error(err)
+            logger.debug(traceback.format_exc())
+
+    def fetch_dashboards(self) -> dict:
+        """Get dashboards method
+
+        Returns:
+            dict
+        """
+        try:
+            response = self.client.get(f"/myorg/admin/dashboards")
+            return response
+        except Exception as err:  # pylint: disable=broad-except
+            logger.error(err)
+            logger.debug(traceback.format_exc())
+
+    def fetch_datasets(self) -> dict:
+        """Get datasets method
+
+        Returns:
+            dict
+        """
+        try:
+            response = self.client.get(f"/myorg/admin/datasets")
+            return response
+        except Exception as err:  # pylint: disable=broad-except
+            logger.error(err)
+            logger.debug(traceback.format_exc())
+
+    def fetch_data_sources(self, dataset_id: str) -> dict:
+        """Get dataset by id method
+        Args:
+            dataset_id:
+        Returns:
+            dict
+        """
+        try:
+            response = self.client.get(
+                f"/myorg/admin/datasets/{dataset_id}/datasources"
+            )
+            return response
+        except Exception as err:  # pylint: disable=broad-except
+            logger.error(err)
+            logger.debug(traceback.format_exc())
diff --git a/ingestion/tests/unit/source/test_source_parsing.py b/ingestion/tests/unit/source/test_source_parsing.py
index d2853c0ca6a..f88e6d3e78c 100644
--- a/ingestion/tests/unit/source/test_source_parsing.py
+++ b/ingestion/tests/unit/source/test_source_parsing.py
@@ -546,17 +546,18 @@ def test_powerbi():
             "config": {
                 "clientId": "client_id",
                 "clientSecret": "client_secret",
-                "redirectURI": "http://localhost:8585/callback",
-                "hostPort": "https://analysis.windows.net/powerbi",
-                "scope": [
-                    "scope",
-                    "https://analysis.windows.net/powerbi/api/App.Read.All",
-                ],
-                "credentials": "path",
+                "tenantId": "tenant_id",
+                "scope": ["https://analysis.windows.net/powerbi/api/.default"],
                 "type": "PowerBI",
             }
         },
-        "sourceConfig": {"config": {}},
+        "sourceConfig": {
+            "config": {
+                "dashboardFilterPattern": {
+                    "includes": ["Supplier Quality Analysis Sample"]
+                }
+            }
+        },
     }
 
     config: WorkflowSource = WorkflowSource.parse_obj(source)
diff --git a/openmetadata-core/src/main/resources/json/schema/entity/services/connections/dashboard/powerBIConnection.json b/openmetadata-core/src/main/resources/json/schema/entity/services/connections/dashboard/powerBIConnection.json
index 98b40e60dff..1c2ce9f1521 100644
--- a/openmetadata-core/src/main/resources/json/schema/entity/services/connections/dashboard/powerBIConnection.json
+++ b/openmetadata-core/src/main/resources/json/schema/entity/services/connections/dashboard/powerBIConnection.json
@@ -31,15 +31,16 @@
       "type": "string",
       "format": "password"
     },
-    "credentials": {
-      "title": "Credentials",
-      "description": "Credentials for PowerBI.",
+    "tenantId": {
+      "title": "Tenant ID",
+      "description": "Tenant ID for PowerBI.",
       "type": "string"
     },
-    "redirectURI": {
-      "title": "Redirect URI",
-      "description": "Dashboard redirect URI for the PowerBI service.",
-      "type": "string"
+    "authorityURI": {
+      "title": "Authority URI",
+      "description": "Authority URI for the PowerBI service.",
+      "type": "string",
+      "default": "https://login.microsoftonline.com/"
     },
     "hostPort": {
       "title": "Host and Port",
@@ -55,7 +56,7 @@
       "items": {
         "type": "string"
      },
-      "default": null
+      "default": ["https://analysis.windows.net/powerbi/api/.default"]
     },
     "supportsMetadataExtraction": {
       "title": "Supports Metadata Extraction",
@@ -63,5 +64,5 @@
     }
   },
   "additionalProperties": false,
-  "required": ["hostPort", "clientId", "clientSecret"]
+  "required": ["clientId", "clientSecret", "tenantId"]
 }