mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-27 09:58:14 +00:00
feat(ingestion): powerbi # Amazon Redshift lineage support (#7562)
Co-authored-by: MohdSiddiqueBagwan <mohdsiddique.bagwan@gslab.com> Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
This commit is contained in:
parent
5aed03a5e2
commit
fc8757d25e
@ -40,7 +40,7 @@ PowerBI Source supports M-Query expression for below listed PowerBI Data Sources
|
||||
4. Microsoft SQL Server
|
||||
5. Google BigQuery
|
||||
|
||||
Native SQL query parsing is only supported for `Snowflake` data-source and only first table from `FROM` clause will be ingested as upstream table. Advance SQL construct like JOIN and SUB-QUERIES in `FROM` clause are not supported.
|
||||
Native SQL query parsing is supported for `Snowflake` and `Amazon Redshift` data-sources and only first table from `FROM` clause will be ingested as upstream table. Advance SQL construct like JOIN and SUB-QUERIES in `FROM` clause are not supported.
|
||||
|
||||
For example refer below native SQL query. The table `OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_UNIT_TARGET` will be ingested as upstream table.
|
||||
|
||||
@ -70,6 +70,7 @@ let
|
||||
in
|
||||
#"Added Conditional Column"
|
||||
```
|
||||
Use full-table-name in `from` clause. For example dev.public.category
|
||||
|
||||
## M-Query Pattern Supported For Lineage Extraction
|
||||
Lets consider a M-Query which combine two PostgreSQL tables. Such M-Query can be written as per below patterns.
|
||||
|
||||
@ -47,11 +47,17 @@ class SupportedDataPlatform(Enum):
|
||||
MS_SQL = DataPlatformPair(
|
||||
powerbi_data_platform_name="Sql", datahub_data_platform_name="mssql"
|
||||
)
|
||||
|
||||
GOOGLE_BIGQUERY = DataPlatformPair(
|
||||
powerbi_data_platform_name="GoogleBigQuery",
|
||||
datahub_data_platform_name="bigquery",
|
||||
)
|
||||
|
||||
AMAZON_REDSHIFT = DataPlatformPair(
|
||||
powerbi_data_platform_name="AmazonRedshift",
|
||||
datahub_data_platform_name="redshift",
|
||||
)
|
||||
|
||||
|
||||
class AbstractTableFullNameCreator(ABC):
|
||||
@abstractmethod
|
||||
@ -64,6 +70,21 @@ class AbstractTableFullNameCreator(ABC):
|
||||
def get_platform_pair(self) -> DataPlatformPair:
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def get_db_name_from_second_argument(arg_list: Tree) -> Optional[str]:
|
||||
arguments: List[str] = tree_function.strip_char_from_list(
|
||||
values=tree_function.remove_whitespaces_from_list(
|
||||
tree_function.token_values(arg_list)
|
||||
),
|
||||
char='"',
|
||||
)
|
||||
|
||||
if len(arguments) < 2:
|
||||
logger.debug(f"Expected minimum 2 arguments, but got {len(arguments)}")
|
||||
return None
|
||||
|
||||
return arguments[1]
|
||||
|
||||
|
||||
class AbstractDataAccessMQueryResolver(ABC):
|
||||
table: Table
|
||||
@ -395,20 +416,15 @@ class DefaultTwoStepDataAccessSources(AbstractTableFullNameCreator, ABC):
|
||||
full_table_names: List[str] = []
|
||||
|
||||
logger.debug(
|
||||
f"Processing PostgreSQL data-access function detail {data_access_func_detail}"
|
||||
)
|
||||
arguments: List[str] = tree_function.strip_char_from_list(
|
||||
values=tree_function.remove_whitespaces_from_list(
|
||||
tree_function.token_values(data_access_func_detail.arg_list)
|
||||
),
|
||||
char='"',
|
||||
f"Processing {self.get_platform_pair().powerbi_data_platform_name} function detail {data_access_func_detail}"
|
||||
)
|
||||
|
||||
if len(arguments) != 2:
|
||||
logger.debug(f"Expected 2 arguments, but got {len(arguments)}")
|
||||
return full_table_names
|
||||
|
||||
db_name: str = arguments[1]
|
||||
db_name: Optional[str] = self.get_db_name_from_second_argument(
|
||||
data_access_func_detail.arg_list
|
||||
)
|
||||
if db_name is None:
|
||||
logger.debug("db_name not found in expression")
|
||||
return full_table_names # Return empty list
|
||||
|
||||
schema_name: str = cast(
|
||||
IdentifierAccessor, data_access_func_detail.identifier_accessor
|
||||
@ -563,9 +579,55 @@ class GoogleBigQueryTableFullNameCreator(DefaultThreeStepDataAccessSources):
|
||||
return SupportedDataPlatform.GOOGLE_BIGQUERY.value
|
||||
|
||||
|
||||
class NativeQueryTableFullNameCreator(AbstractTableFullNameCreator):
|
||||
class AmazonRedshiftFullNameCreator(AbstractTableFullNameCreator):
|
||||
def get_platform_pair(self) -> DataPlatformPair:
|
||||
return SupportedDataPlatform.SNOWFLAKE.value
|
||||
return SupportedDataPlatform.AMAZON_REDSHIFT.value
|
||||
|
||||
def get_full_table_names(
|
||||
self, data_access_func_detail: DataAccessFunctionDetail
|
||||
) -> List[str]:
|
||||
full_table_names: List[str] = []
|
||||
|
||||
logger.debug(
|
||||
f"Processing AmazonRedshift data-access function detail {data_access_func_detail}"
|
||||
)
|
||||
|
||||
db_name: Optional[str] = self.get_db_name_from_second_argument(
|
||||
data_access_func_detail.arg_list
|
||||
)
|
||||
if db_name is None:
|
||||
return full_table_names # Return empty list
|
||||
|
||||
schema_name: str = cast(
|
||||
IdentifierAccessor, data_access_func_detail.identifier_accessor
|
||||
).items["Name"]
|
||||
|
||||
table_name: str = cast(
|
||||
IdentifierAccessor,
|
||||
cast(IdentifierAccessor, data_access_func_detail.identifier_accessor).next,
|
||||
).items["Name"]
|
||||
|
||||
full_table_names.append(f"{db_name}.{schema_name}.{table_name}")
|
||||
|
||||
return full_table_names
|
||||
|
||||
|
||||
class NativeQueryTableFullNameCreator(AbstractTableFullNameCreator):
|
||||
SUPPORTED_NATIVE_QUERY_DATA_PLATFORM: dict = {
|
||||
SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name: SupportedDataPlatform.SNOWFLAKE,
|
||||
SupportedDataPlatform.AMAZON_REDSHIFT.value.powerbi_data_platform_name: SupportedDataPlatform.AMAZON_REDSHIFT,
|
||||
}
|
||||
current_data_platform: SupportedDataPlatform = SupportedDataPlatform.SNOWFLAKE
|
||||
|
||||
def get_platform_pair(self) -> DataPlatformPair:
|
||||
return self.current_data_platform.value
|
||||
|
||||
@staticmethod
|
||||
def is_native_parsing_supported(data_access_function_name: str) -> bool:
|
||||
return (
|
||||
data_access_function_name
|
||||
in NativeQueryTableFullNameCreator.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM
|
||||
)
|
||||
|
||||
def get_full_table_names(
|
||||
self, data_access_func_detail: DataAccessFunctionDetail
|
||||
@ -586,16 +648,18 @@ class NativeQueryTableFullNameCreator(AbstractTableFullNameCreator):
|
||||
data_access_tokens: List[str] = tree_function.remove_whitespaces_from_list(
|
||||
tree_function.token_values(flat_argument_list[0])
|
||||
)
|
||||
if (
|
||||
data_access_tokens[0]
|
||||
!= SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name
|
||||
):
|
||||
if not self.is_native_parsing_supported(data_access_tokens[0]):
|
||||
logger.debug(
|
||||
f"Provided native-query data-platform = {data_access_tokens[0]}"
|
||||
f"Unsupported native-query data-platform = {data_access_tokens[0]}"
|
||||
)
|
||||
logger.debug(
|
||||
f"NativeQuery is supported only for {self.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM}"
|
||||
)
|
||||
logger.debug("Only Snowflake is supported in NativeQuery")
|
||||
return full_table_names
|
||||
|
||||
self.current_data_platform = self.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM[
|
||||
data_access_tokens[0]
|
||||
]
|
||||
# First argument is the query
|
||||
sql_query: str = tree_function.strip_char_from_list(
|
||||
values=tree_function.remove_whitespaces_from_list(
|
||||
@ -625,6 +689,7 @@ class FunctionName(Enum):
|
||||
SNOWFLAKE_DATA_ACCESS = "Snowflake.Databases"
|
||||
MSSQL_DATA_ACCESS = "Sql.Database"
|
||||
GOOGLE_BIGQUERY_DATA_ACCESS = "GoogleBigQuery.Database"
|
||||
AMAZON_REDSHIFT_DATA_ACCESS = "AmazonRedshift.Database"
|
||||
|
||||
|
||||
class SupportedResolver(Enum):
|
||||
@ -652,6 +717,12 @@ class SupportedResolver(Enum):
|
||||
GoogleBigQueryTableFullNameCreator,
|
||||
FunctionName.GOOGLE_BIGQUERY_DATA_ACCESS,
|
||||
)
|
||||
|
||||
AMAZON_REDSHIFT = (
|
||||
AmazonRedshiftFullNameCreator,
|
||||
FunctionName.AMAZON_REDSHIFT_DATA_ACCESS,
|
||||
)
|
||||
|
||||
NATIVE_QUERY = (
|
||||
NativeQueryTableFullNameCreator,
|
||||
FunctionName.NATIVE_QUERY,
|
||||
|
||||
@ -35,6 +35,8 @@ M_QUERIES = [
|
||||
'let \nSource = GoogleBigQuery.Database([BillingProject = #"Parameter - Source"]),\n#"gcp-project" = Source{[Name=#"Parameter - Source"]}[Data],\ngcp_billing_Schema = #"gcp-project"{[Name=#"My bq project",Kind="Schema"]}[Data],\nF_GCP_COST_Table = gcp_billing_Schema{[Name="GCP_TABLE",Kind="Table"]}[Data]\nin\nF_GCP_COST_Table',
|
||||
'let\n Source = GoogleBigQuery.Database([BillingProject = #"Parameter - Source"]),\n#"gcp-project" = Source{[Name=#"Parameter - Source"]}[Data],\nuniversal_Schema = #"gcp-project"{[Name="universal",Kind="Schema"]}[Data],\nD_WH_DATE_Table = universal_Schema{[Name="D_WH_DATE",Kind="Table"]}[Data],\n#"Filtered Rows" = Table.SelectRows(D_WH_DATE_Table, each [D_DATE] > #datetime(2019, 9, 10, 0, 0, 0)),\n#"Filtered Rows1" = Table.SelectRows(#"Filtered Rows", each DateTime.IsInPreviousNHours([D_DATE], 87600))\n in \n#"Filtered Rows1"',
|
||||
'let\n Source = GoogleBigQuery.Database([BillingProject="dwh-prod"]),\ngcp_project = Source{[Name="dwh-prod"]}[Data],\ngcp_billing_Schema = gcp_project {[Name="gcp_billing",Kind="Schema"]}[Data],\nD_GCP_CUSTOM_LABEL_Table = gcp_billing_Schema{[Name="D_GCP_CUSTOM_LABEL",Kind="Table"]}[Data] \n in \n D_GCP_CUSTOM_LABEL_Table',
|
||||
'let\n Source = AmazonRedshift.Database("redshift-url","dev"),\n public = Source{[Name="public"]}[Data],\n category1 = public{[Name="category"]}[Data]\nin\n category1',
|
||||
'let\n Source = Value.NativeQuery(AmazonRedshift.Database("redshift-url","dev"), "select * from dev.public.category", null, [EnableFolding=true]) \n in Source',
|
||||
]
|
||||
|
||||
|
||||
@ -500,3 +502,43 @@ def test_expression_is_none():
|
||||
)
|
||||
|
||||
assert len(data_platform_tables) == 0
|
||||
|
||||
|
||||
def test_redshift_regular_case():
|
||||
table: powerbi_data_classes.Table = powerbi_data_classes.Table(
|
||||
expression=M_QUERIES[21],
|
||||
name="category",
|
||||
full_name="dev.public.category",
|
||||
)
|
||||
reporter = PowerBiDashboardSourceReport()
|
||||
|
||||
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
|
||||
table, reporter, native_query_enabled=False
|
||||
)
|
||||
assert len(data_platform_tables) == 1
|
||||
assert data_platform_tables[0].name == table.full_name.split(".")[2]
|
||||
assert data_platform_tables[0].full_name == table.full_name
|
||||
assert (
|
||||
data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
|
||||
== SupportedDataPlatform.AMAZON_REDSHIFT.value.powerbi_data_platform_name
|
||||
)
|
||||
|
||||
|
||||
def test_redshift_native_query():
|
||||
table: powerbi_data_classes.Table = powerbi_data_classes.Table(
|
||||
expression=M_QUERIES[22],
|
||||
name="category",
|
||||
full_name="dev.public.category",
|
||||
)
|
||||
reporter = PowerBiDashboardSourceReport()
|
||||
|
||||
data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
|
||||
table, reporter, native_query_enabled=True
|
||||
)
|
||||
assert len(data_platform_tables) == 1
|
||||
assert data_platform_tables[0].name == table.full_name.split(".")[2]
|
||||
assert data_platform_tables[0].full_name == table.full_name
|
||||
assert (
|
||||
data_platform_tables[0].data_platform_pair.powerbi_data_platform_name
|
||||
== SupportedDataPlatform.AMAZON_REDSHIFT.value.powerbi_data_platform_name
|
||||
)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user