#14235: adding dialect based on connection type to LineageParser (#14249)

* Fix #14235: adding dialect based on connection type to LineageParser

* Fix: formating changes

* Update ingestion/src/metadata/ingestion/source/dashboard/metabase/metadata.py

Co-authored-by: Mayur Singal <39544459+ulixius9@users.noreply.github.com>

* style: fix indentation errors

* Fix pytest

---------

Co-authored-by: LucasGarcia07 <lucas.junqueira@hurb.com>
Co-authored-by: ulixius9 <mayursingal9@gmail.com>
This commit is contained in:
Lucas Garcia 2023-12-08 11:19:59 -03:00 committed by GitHub
parent ceb750f7e9
commit fe06b5cbb2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 31 additions and 2 deletions

View File

@ -26,11 +26,13 @@ from metadata.generated.schema.entity.services.connections.dashboard.metabaseCon
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection, OpenMetadataConnection,
) )
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource, Source as WorkflowSource,
) )
from metadata.ingestion.api.models import Either, StackTraceError from metadata.ingestion.api.models import Either, StackTraceError
from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
from metadata.ingestion.lineage.parser import LineageParser from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.lineage.sql_lineage import search_table_entities from metadata.ingestion.lineage.sql_lineage import search_table_entities
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
@ -257,6 +259,9 @@ class MetabaseSource(DashboardServiceSource):
) )
) )
def _get_database_service(self, db_service_name: str):
return self.metadata.get_by_name(DatabaseService, db_service_name)
def _yield_lineage_from_query( def _yield_lineage_from_query(
self, chart_details: MetabaseChart, db_service_name: str, dashboard_name: str self, chart_details: MetabaseChart, db_service_name: str, dashboard_name: str
) -> Iterable[Either[AddLineageRequest]]: ) -> Iterable[Either[AddLineageRequest]]:
@ -275,7 +280,15 @@ class MetabaseSource(DashboardServiceSource):
database_name = database.details.db if database and database.details else None database_name = database.details.db if database and database.details else None
lineage_parser = LineageParser(query) db_service = self._get_database_service(db_service_name)
lineage_parser = LineageParser(
query,
ConnectionTypeDialectMapper.dialect_of(db_service.serviceType.value)
if db_service
else None,
)
for table in lineage_parser.source_tables: for table in lineage_parser.source_tables:
database_schema_name, table = fqn.split(str(table))[-2:] database_schema_name, table = fqn.split(str(table))[-2:]
database_schema_name = self.check_database_schema_name(database_schema_name) database_schema_name = self.check_database_schema_name(database_schema_name)

View File

@ -30,6 +30,11 @@ from metadata.generated.schema.entity.services.dashboardService import (
DashboardService, DashboardService,
DashboardServiceType, DashboardServiceType,
) )
from metadata.generated.schema.entity.services.databaseService import (
DatabaseConnection,
DatabaseService,
DatabaseServiceType,
)
from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig, OpenMetadataWorkflowConfig,
) )
@ -59,6 +64,14 @@ MOCK_DASHBOARD_SERVICE = DashboardService(
serviceType=DashboardServiceType.Metabase, serviceType=DashboardServiceType.Metabase,
) )
MOCK_DATABASE_SERVICE = DatabaseService(
id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb",
fullyQualifiedName=FullyQualifiedEntityName(__root__="mock_mysql"),
name="mock_mysql",
connection=DatabaseConnection(),
serviceType=DatabaseServiceType.Mysql,
)
Mock_DATABASE_SCHEMA = "my_schema" Mock_DATABASE_SCHEMA = "my_schema"
Mock_DATABASE_SCHEMA_DEFAULT = "<default>" Mock_DATABASE_SCHEMA_DEFAULT = "<default>"
@ -133,7 +146,7 @@ MOCK_CHARTS = [
name="chart2", name="chart2",
id="2", id="2",
dataset_query=DatasetQuery( dataset_query=DatasetQuery(
type="native", native=Native(query="select * from table") type="native", native=Native(query="select * from test_table")
), ),
display="chart2", display="chart2",
) )
@ -269,6 +282,9 @@ class MetabaseUnitTest(TestCase):
@patch.object(fqn, "build", return_value=None) @patch.object(fqn, "build", return_value=None)
@patch.object(OpenMetadata, "get_by_name", return_value=EXAMPLE_DASHBOARD) @patch.object(OpenMetadata, "get_by_name", return_value=EXAMPLE_DASHBOARD)
@patch.object(MetabaseMetadata, "search_table_entities", return_value=EXAMPLE_TABLE) @patch.object(MetabaseMetadata, "search_table_entities", return_value=EXAMPLE_TABLE)
@patch.object(
MetabaseSource, "_get_database_service", return_value=MOCK_DATABASE_SERVICE
)
def test_yield_lineage(self, *_): def test_yield_lineage(self, *_):
""" """
Function to test out lineage Function to test out lineage