PowerBI - Schema Refactoring (#3960)

* Added PowerBI refactoring

* Update powerbi.py

* Update powerbi.py

* Modified filters
This commit is contained in:
Ayush Shah 2022-04-08 12:08:41 -07:00 committed by GitHub
parent 21d8ba72a8
commit 839d152cc3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 88 additions and 52 deletions

View File

@ -25,7 +25,8 @@
},
"clientSecret": {
"description": "clientSecret for the PowerBI.",
"type": "string"
"type": "string",
"format": "password"
},
"credentials": {
"description": "Credentials for the PowerBI.",

View File

@ -1,27 +1,36 @@
{
"source": {
"type": "powerbi",
"config": {
"client_id": "client_id",
"client_secret": "client_secret",
"service_name": "local_powerbi",
"redirect_uri": "http://localhost:8585/callback",
"scope": [
"scope",
"https://analysis.windows.net/powerbi/api/App.Read.All"
],
"credentials": "path"
"serviceName": "local_powerbi",
"serviceConnection": {
"config": {
"clientId": "client_id",
"clientSecret": "client_secret",
"redirectURI": "http://localhost:8585/callback",
"dashboardURL": "https://analysis.windows.net/powerbi",
"scope": [
"scope",
"https://analysis.windows.net/powerbi/api/App.Read.All"
],
"credentials": "path",
"type": "PowerBI"
}
},
"sourceConfig": {
"config":
{
}
}
},
"sink": {
"type": "metadata-rest",
"config": {}
},
"metadata_server": {
"type": "metadata-server",
"config": {
"api_endpoint": "http://localhost:8585/api",
"auth_provider_type": "no-auth"
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "no-auth"
}
}
}

View File

@ -13,40 +13,32 @@
import logging
import traceback
import uuid
from typing import Iterable, List
from typing import Iterable
from powerbi.client import PowerBiClient
from pydantic.types import SecretStr
from metadata.generated.schema.entity.services.connections.dashboard.powerBIConnection import (
PowerBIConnection,
)
from metadata.generated.schema.entity.services.dashboardService import (
DashboardServiceType,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataServerConfig,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import ConfigModel, Entity, IncludeFilterPattern
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
from metadata.ingestion.models.table_metadata import Chart, Dashboard
from metadata.utils.filters import filter_by_chart, filter_by_dashboard
from metadata.utils.helpers import get_dashboard_service_or_create
logger: logging.Logger = logging.getLogger(__name__)
class PowerbiSourceConfig(ConfigModel):
"""Powerbi pydantic config model"""
client_id: str
client_secret: SecretStr
service_name: str
scope: List[str] = []
redirect_uri: str
credentials: str
dashboard_url: str = "https://analysis.windows.net/powerbi"
dashboard_filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
chart_filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
class PowerbiSource(Source[Entity]):
"""Powerbi entity class
@ -61,33 +53,29 @@ class PowerbiSource(Source[Entity]):
charts:
"""
config: PowerbiSourceConfig
metadata_config: OpenMetadataServerConfig
status: SourceStatus
def __init__(
self,
config: PowerbiSourceConfig,
config: WorkflowSource,
metadata_config: OpenMetadataServerConfig,
):
super().__init__()
self.config = config
self.source_config = self.config.sourceConfig.config
self.service_connection_config = config.serviceConnection.__root__.config
self.metadata_config = metadata_config
self.status = SourceStatus()
self.dashboard_service = get_dashboard_service_or_create(
config.service_name,
self.config.serviceName,
DashboardServiceType.PowerBI.name,
config.client_id,
config.client_secret.get_secret_value(),
config.dashboard_url,
self.service_connection_config.dict(),
metadata_config,
)
self.client = PowerBiClient(
client_id=self.config.client_id,
client_secret=self.config.client_secret.get_secret_value(),
scope=self.config.scope,
redirect_uri=self.config.redirect_uri,
credentials=self.config.credentials,
client_id=self.service_connection_config.clientId,
client_secret=self.service_connection_config.clientSecret.get_secret_value(),
scope=self.service_connection_config.scope,
redirect_uri=self.service_connection_config.redirectURI,
credentials=self.service_connection_config.credentials,
)
@classmethod
@ -100,7 +88,12 @@ class PowerbiSource(Source[Entity]):
Returns:
PowerBiSource
"""
config = PowerbiSourceConfig.parse_obj(config_dict)
config = WorkflowSource.parse_obj(config_dict)
connection: PowerBIConnection = config.serviceConnection.__root__.config
if not isinstance(connection, PowerBIConnection):
raise InvalidSourceException(
f"Expected PowerBIConnection, but got {connection}"
)
return cls(config, metadata_config)
def next_record(self) -> Iterable[Entity]:
@ -115,7 +108,9 @@ class PowerbiSource(Source[Entity]):
"""
for chart in charts:
try:
if not self.config.chart_filter_pattern.included(chart["title"]):
if not filter_by_chart(
self.source_config.chartFilterPattern, chart["title"]
):
self.status.failures(
chart["title"], "Filtered out using Chart filter pattern"
)
@ -146,8 +141,9 @@ class PowerbiSource(Source[Entity]):
try:
dashboard_details = dashboard_service.get_dashboard(dashboard_id["id"])
self.charts = []
if not self.config.dashboard_filter_pattern.included(
dashboard_details["displayName"]
if not filter_by_dashboard(
self.source_config.dashboardFilterPattern,
dashboard_details["displayName"],
):
self.status.failures(
dashboard_details["displayName"],

View File

@ -101,3 +101,33 @@ def filter_by_table(
:return: True for filtering, False otherwise
"""
return _filter(table_filter_pattern, table_name)
def filter_by_chart(
chart_filter_pattern: Optional[FilterPattern], chart_name: str
) -> bool:
"""
Return True if the schema needs to be filtered, False otherwise
Include takes precedence over exclude
:param chart_filter_pattern: Model defining chart filtering logic
:param chart_name: table chart name
:return: True for filtering, False otherwise
"""
return _filter(chart_filter_pattern, chart_name)
def filter_by_dashboard(
dashboard_filter_pattern: Optional[FilterPattern], dashboard_name: str
) -> bool:
"""
Return True if the schema needs to be filtered, False otherwise
Include takes precedence over exclude
:param dashboard_filter_pattern: Model defining dashboard filtering logic
:param dashboard_name: table dashboard name
:return: True for filtering, False otherwise
"""
return _filter(dashboard_filter_pattern, dashboard_name)