issue-19892: parse powerbi table source (#20141)

harshsoni2024 committed via GitHub on 2025-03-12 12:59:29 +05:30
parent 831a9ba0c4
commit 826279608f
3 changed files with 233 additions and 3 deletions

ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py

@@ -10,6 +10,7 @@
 # limitations under the License.
 """PowerBI source module"""
+import re
 import traceback
 from typing import Any, Iterable, List, Optional, Union
@@ -616,6 +617,87 @@ class PowerbiSource(DashboardServiceSource):
             logger.debug(f"Error to get data_model_column_fqn {exc}")
             logger.debug(traceback.format_exc())
 
+    def _parse_redshift_source(self, source_expression: str) -> Optional[dict]:
+        try:
+            db_match = re.search(
+                r'AmazonRedshift\.Database\("[^"]+","([^"]+)"\)', source_expression
+            )
+            if not db_match:
+                # not a valid redshift source
+                return None
+            schema_table_match = re.findall(r'\[Name="([^"]+)"\]', source_expression)
+            database = db_match.group(1) if db_match else None
+            schema = table = None
+            if isinstance(schema_table_match, list):
+                schema = schema_table_match[0] if len(schema_table_match) > 0 else None
+                table = schema_table_match[1] if len(schema_table_match) > 1 else None
+            if (
+                schema and table
+            ):  # at least 2 entities, schema and table, should be fetched
+                return {"database": database, "schema": schema, "table": table}
+            return None
+        except Exception as exc:
+            logger.debug(f"Error parsing redshift table source: {exc}")
+            logger.debug(traceback.format_exc())
+            return None
+
+    def _parse_snowflake_source(self, source_expression: str) -> Optional[dict]:
+        try:
+            if "Snowflake.Databases" not in source_expression:
+                # not a valid snowflake expression
+                return None
+            db_match = re.search(
+                r'\[Name="([^"]+)",Kind="Database"\]', source_expression
+            )
+            schema_match = re.search(
+                r'\[Name="([^"]+)",Kind="Schema"\]', source_expression
+            )
+            view_match = re.search(r'\[Name="([^"]+)",Kind="View"\]', source_expression)
+            table_match = re.search(
+                r'\[Name="([^"]+)",Kind="Table"\]', source_expression
+            )
+            database = db_match.group(1) if db_match else None
+            schema = schema_match.group(1) if schema_match else None
+            view = view_match.group(1) if view_match else None
+            table = table_match.group(1) if table_match else None
+            if schema and (
+                table or view
+            ):  # at least schema and a table or view should be fetched
+                return {
+                    "database": database,
+                    "schema": schema,
+                    "table": table if table else view,
+                }
+            return None
+        except Exception as exc:
+            logger.debug(f"Error parsing snowflake table source: {exc}")
+            logger.debug(traceback.format_exc())
+            return None
+
+    def _parse_table_info_from_source_exp(self, table: PowerBiTable) -> dict:
+        try:
+            if not isinstance(table.source, list):
+                return {}
+            source_expression = table.source[0].expression
+            # parse snowflake source
+            table_info = self._parse_snowflake_source(source_expression)
+            if isinstance(table_info, dict):
+                return table_info
+            # parse redshift source
+            table_info = self._parse_redshift_source(source_expression)
+            if isinstance(table_info, dict):
+                return table_info
+            return {}
+        except Exception as exc:
+            logger.debug(f"Error parsing table source: {exc}")
+            logger.debug(traceback.format_exc())
+            return {}
+
     def _get_table_and_datamodel_lineage(
         self,
         db_service_name: Optional[str],
@@ -626,11 +708,12 @@ class PowerbiSource(DashboardServiceSource):
         Method to create lineage between table and datamodels
         """
         try:
+            table_info = self._parse_table_info_from_source_exp(table)
             fqn_search_string = build_es_fqn_search_string(
-                database_name=None,
-                schema_name=None,
+                database_name=table_info.get("database"),
+                schema_name=table_info.get("schema"),
                 service_name=db_service_name or "*",
-                table_name=table.name,
+                table_name=table_info.get("table") or table.name,
             )
             table_entity = self.metadata.search_in_any_service(
                 entity_type=Table,

ingestion/src/metadata/ingestion/source/dashboard/powerbi/models.py

@@ -98,6 +98,14 @@ class PowerBiColumns(BaseModel):
     description: Optional[str] = None
 
 
+class PowerBITableSource(BaseModel):
+    """
+    PowerBI Table Source
+    """
+
+    expression: str
+
+
 class PowerBiTable(BaseModel):
     """
     PowerBI Table Model
@@ -107,6 +115,7 @@ class PowerBiTable(BaseModel):
     name: str
     columns: Optional[List[PowerBiColumns]] = None
     description: Optional[str] = None
+    source: Optional[List[PowerBITableSource]] = None
 
 
 class TablesResponse(BaseModel):
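
A quick sketch of what the extended table model carries, assuming pydantic v2 (the new test file below calls model_validate, which is v2 API). The classes here are local stand-ins mirroring the diff, not imports from the connector:

from typing import List, Optional

from pydantic import BaseModel


class PowerBITableSource(BaseModel):
    expression: str


class PowerBiTable(BaseModel):
    name: str
    source: Optional[List[PowerBITableSource]] = None


# A tables payload shaped like the fields the model now captures
payload = {
    "name": "customers_clean",
    "source": [
        {"expression": 'let Source = Snowflake.Databases("host","WH") in Source'}
    ],
}

table = PowerBiTable.model_validate(payload)
print(table.source[0].expression)  # raw M expression, ready for the parsers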

ingestion/tests/unit/ (new test file)

@@ -0,0 +1,138 @@
+from unittest import TestCase
+from unittest.mock import patch
+
+import pytest
+
+from metadata.generated.schema.metadataIngestion.workflow import (
+    OpenMetadataWorkflowConfig,
+)
+from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.ingestion.source.dashboard.powerbi.metadata import PowerbiSource
+
+MOCK_REDSHIFT_EXP = """
+let
+    Source = AmazonRedshift.Database("redshift-cluster.redshift.amazonaws.com:5439","dev"),
+    demo_dbt_jaffle = Source{[Name="demo_dbt_jaffle"]}[Data],
+    customers_clean1 = demo_dbt_jaffle{[Name="customers_clean"]}[Data]
+in
+    customers_clean1
+"""
+
+MOCK_REDSHIFT_EXP_INVALID = """
+let
+    Source = Database("redshift-cluster.redshift.amazonaws.com:5439","dev"),
+    demo_dbt_jaffle = Source{[Name="demo_dbt_jaffle"]}[Data],
+    customers_clean1 = demo_dbt_jaffle{[Name="customers_clean"]}[Data]
+in
+    customers_clean1
+"""
+
+MOCK_REDSHIFT_EXP_INVALID_V2 = """
+let
+    Source = AmazonRedshift.Database("redshift-cluster.redshift.amazonaws.com:5439","dev"),
+    customers_clean1 = demo_dbt_jaffle{[Name="customers_clean"]}[Data]
+in
+    customers_clean1
+"""
+
+EXPECTED_REDSHIFT_RESULT = {
+    "database": "dev",
+    "schema": "demo_dbt_jaffle",
+    "table": "customers_clean",
+}
+
+MOCK_SNOWFLAKE_EXP = """let
+    Source = Snowflake.Databases("abcd-123.snowflakecomputing.com","COMPUTE_WH"),
+    DEMO_STAGE_Database = Source{[Name="DEMO_STAGE",Kind="Database"]}[Data],
+    PUBLIC_Schema = DEMO_STAGE_Database{[Name="PUBLIC",Kind="Schema"]}[Data],
+    STG_CUSTOMERS_View = PUBLIC_Schema{[Name="STG_CUSTOMERS",Kind="View"]}[Data]
+in
+    STG_CUSTOMERS_View"""
+
+MOCK_SNOWFLAKE_EXP_INVALID = """let
+    Source = Snowflake("abcd-123.snowflakecomputing.com","COMPUTE_WH"),
+    DEMO_STAGE_Database = Source{[Name="DEMO_STAGE",Kind="Database"]}[Data],
+in
+    STG_CUSTOMERS_View"""
+
+EXPECTED_SNOWFLAKE_RESULT = {
+    "database": "DEMO_STAGE",
+    "schema": "PUBLIC",
+    "table": "STG_CUSTOMERS",
+}
+
+mock_config = {
+    "source": {
+        "type": "powerbi",
+        "serviceName": "mock_powerbi",
+        "serviceConnection": {
+            "config": {
+                "type": "PowerBI",
+                "clientId": "client_id",
+                "clientSecret": "secret",
+                "tenantId": "tenant_id",
+            },
+        },
+        "sourceConfig": {"config": {"type": "DashboardMetadata"}},
+    },
+    "sink": {"type": "metadata-rest", "config": {}},
+    "workflowConfig": {
+        "loggerLevel": "DEBUG",
+        "openMetadataServerConfig": {
+            "hostPort": "http://localhost:8585/api",
+            "authProvider": "openmetadata",
+            "securityConfig": {
+                "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGc"
+                "iOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE"
+                "2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXB"
+                "iEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fN"
+                "r3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3u"
+                "d-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
+            },
+        },
+    },
+}
+
+
+class PowerBIUnitTest(TestCase):
+    """
+    Unit tests for the PowerBI dashboard source
+    """
+
+    @patch(
+        "metadata.ingestion.source.dashboard.dashboard_service.DashboardServiceSource.test_connection"
+    )
+    @patch("metadata.ingestion.source.dashboard.powerbi.connection.get_connection")
+    def __init__(self, methodName, get_connection, test_connection) -> None:
+        super().__init__(methodName)
+        get_connection.return_value = False
+        test_connection.return_value = False
+        self.config = OpenMetadataWorkflowConfig.model_validate(mock_config)
+        self.powerbi: PowerbiSource = PowerbiSource.create(
+            mock_config["source"],
+            OpenMetadata(self.config.workflowConfig.openMetadataServerConfig),
+        )
+
+    @pytest.mark.order(1)
+    def test_parse_database_source(self):
+        # valid redshift source
+        result = self.powerbi._parse_redshift_source(MOCK_REDSHIFT_EXP)
+        self.assertEqual(result, EXPECTED_REDSHIFT_RESULT)
+
+        # invalid redshift source (no AmazonRedshift.Database call)
+        result = self.powerbi._parse_redshift_source(MOCK_REDSHIFT_EXP_INVALID)
+        self.assertEqual(result, None)
+
+        # invalid redshift source (only one navigation step, no table)
+        result = self.powerbi._parse_redshift_source(MOCK_REDSHIFT_EXP_INVALID_V2)
+        self.assertEqual(result, None)
+
+        # valid snowflake source
+        result = self.powerbi._parse_snowflake_source(MOCK_SNOWFLAKE_EXP)
+        self.assertEqual(result, EXPECTED_SNOWFLAKE_RESULT)
+
+        # invalid snowflake source (no Snowflake.Databases call)
+        result = self.powerbi._parse_snowflake_source(MOCK_SNOWFLAKE_EXP_INVALID)
+        self.assertEqual(result, None)
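
To make the positional assumption in _parse_redshift_source explicit, a small standalone sketch using the same illustrative M snippet as MOCK_REDSHIFT_EXP:

import re

EXPRESSION = '''let
    Source = AmazonRedshift.Database("redshift-cluster.redshift.amazonaws.com:5439","dev"),
    demo_dbt_jaffle = Source{[Name="demo_dbt_jaffle"]}[Data],
    customers_clean1 = demo_dbt_jaffle{[Name="customers_clean"]}[Data]
in
    customers_clean1'''

# The database is the second argument of the AmazonRedshift.Database(...) call
db_match = re.search(r'AmazonRedshift\.Database\("[^"]+","([^"]+)"\)', EXPRESSION)

# Schema and table are positional: the first [Name="..."] step navigates into
# the schema, the second into the table. An expression with a single step,
# like MOCK_REDSHIFT_EXP_INVALID_V2, parses no table and the method returns None.
names = re.findall(r'\[Name="([^"]+)"\]', EXPRESSION)

print(db_match.group(1), names)
# dev ['demo_dbt_jaffle', 'customers_clean']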