Fixed Metabase Lineage (#5270)

* Fixed Metabase Lineage

* removed get chained calls

Co-authored-by: Onkar Ravgan <onkarravgan@Onkars-MacBook-Pro.local>
This commit is contained in:
Onkar Ravgan 2022-06-02 20:11:17 +05:30 committed by GitHub
parent a5ff3a3499
commit 93ae064ffd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -13,12 +13,13 @@
import traceback import traceback
import uuid import uuid
from typing import Iterable, List, Optional from typing import Iterable, List, Optional
from urllib.parse import quote
import requests import requests
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.dashboard import Dashboard as Model_Dashboard from metadata.generated.schema.entity.data.dashboard import (
Dashboard as LineageDashboard,
)
from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.dashboard.metabaseConnection import ( from metadata.generated.schema.entity.services.connections.dashboard.metabaseConnection import (
MetabaseConnection, MetabaseConnection,
@ -26,21 +27,17 @@ from metadata.generated.schema.entity.services.connections.dashboard.metabaseCon
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection, OpenMetadataConnection,
) )
from metadata.generated.schema.entity.services.dashboardService import DashboardService
from metadata.generated.schema.metadataIngestion.dashboardServiceMetadataPipeline import (
DashboardServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource, Source as WorkflowSource,
) )
from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.models.table_metadata import Chart, Dashboard from metadata.ingestion.models.table_metadata import Chart, Dashboard
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.dashboard_source import DashboardSourceService from metadata.ingestion.source.dashboard.dashboard_source import DashboardSourceService
from metadata.ingestion.source.database.common_db_source import SQLSourceStatus from metadata.ingestion.source.database.common_db_source import SQLSourceStatus
from metadata.utils import fqn
from metadata.utils.connections import get_connection from metadata.utils.connections import get_connection
from metadata.utils.filters import filter_by_chart from metadata.utils.filters import filter_by_chart
from metadata.utils.logger import ingestion_logger from metadata.utils.logger import ingestion_logger
@ -195,7 +192,6 @@ class MetabaseSource(DashboardSourceService):
dashboard_details["ordered_cards"], dashboard_details["ordered_cards"],
dashboard_details["name"], dashboard_details["name"],
) )
metadata = OpenMetadata(self.metadata_config)
for chart in chart_list: for chart in chart_list:
try: try:
chart_details = chart["card"] chart_details = chart["card"]
@ -204,27 +200,43 @@ class MetabaseSource(DashboardSourceService):
resp_tables = self.req_get(f"/api/table/{chart_details['table_id']}") resp_tables = self.req_get(f"/api/table/{chart_details['table_id']}")
if resp_tables.status_code == 200: if resp_tables.status_code == 200:
table = resp_tables.json() table = resp_tables.json()
table_fqn = f"{self.source_config.dbServiceName}.{table['schema']}.{table['name']}" from_fqn = fqn.build(
dashboard_fqn = f"{self.service.name}.{quote(dashboard_name)}" self.metadata,
table_entity = metadata.get_by_name(entity=Table, fqn=table_fqn) entity_type=Table,
chart_entity = metadata.get_by_name( service_name=self.source_config.dbServiceName,
entity=Model_Dashboard, fqn=dashboard_fqn database_name=table["db"]["details"]["db"],
schema_name=table.get("schema"),
table_name=table.get("display_name"),
) )
logger.debug("from entity %s", table_entity) from_entity = self.metadata.get_by_name(
if table_entity and chart_entity: entity=Table,
fqn=from_fqn,
)
to_fqn = fqn.build(
self.metadata,
entity_type=LineageDashboard,
service_name=self.config.serviceName,
dashboard_name=dashboard_name,
)
to_entity = self.metadata.get_by_name(
entity=LineageDashboard,
fqn=to_fqn,
)
if from_entity and to_entity:
lineage = AddLineageRequest( lineage = AddLineageRequest(
edge=EntitiesEdge( edge=EntitiesEdge(
fromEntity=EntityReference( fromEntity=EntityReference(
id=table_entity.id.__root__, type="table" id=from_entity.id.__root__, type="table"
), ),
toEntity=EntityReference( toEntity=EntityReference(
id=chart_entity.id.__root__, type="dashboard" id=to_entity.id.__root__, type="dashboard"
), ),
) )
) )
yield lineage yield lineage
except Exception as err: # pylint: disable=broad-except,unused-variable except Exception as err: # pylint: disable=broad-except,unused-variable
logger.error(traceback.format_exc()) logger.debug(traceback.format_exc())
logger.error(err)
def req_get(self, path): def req_get(self, path):
"""Send get request method """Send get request method