mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-11-02 19:48:17 +00:00
Tableau Lineage Implementation (#2850)
* Tableau Lineage Implementation * Handling Code Smell
This commit is contained in:
parent
5ecf980ab0
commit
0e34028d05
@ -19,11 +19,21 @@ from typing import Iterable, List, Optional
|
||||
import dateutil.parser as dateparser
|
||||
from pydantic import SecretStr
|
||||
from tableau_api_lib import TableauServerConnection
|
||||
from tableau_api_lib.utils.querying import get_views_dataframe, get_workbooks_dataframe
|
||||
from tableau_api_lib.utils.querying import (
|
||||
get_views_dataframe,
|
||||
get_workbook_connections_dataframe,
|
||||
get_workbooks_dataframe,
|
||||
)
|
||||
|
||||
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
|
||||
from metadata.generated.schema.entity.data.dashboard import (
|
||||
Dashboard as Dashboard_Entity,
|
||||
)
|
||||
from metadata.generated.schema.entity.data.table import Table
|
||||
from metadata.generated.schema.entity.services.dashboardService import (
|
||||
DashboardServiceType,
|
||||
)
|
||||
from metadata.generated.schema.type.entityLineage import EntitiesEdge
|
||||
from metadata.generated.schema.type.entityReference import EntityReference
|
||||
from metadata.ingestion.api.common import (
|
||||
ConfigModel,
|
||||
@ -33,6 +43,7 @@ from metadata.ingestion.api.common import (
|
||||
)
|
||||
from metadata.ingestion.api.source import Source, SourceStatus
|
||||
from metadata.ingestion.models.table_metadata import Chart, Dashboard, DashboardOwner
|
||||
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
|
||||
from metadata.utils.helpers import get_dashboard_service_or_create
|
||||
|
||||
@ -49,6 +60,7 @@ class TableauSourceConfig(ConfigModel):
|
||||
env: Optional[str] = "tableau_prod"
|
||||
site_name: str
|
||||
site_url: str
|
||||
db_service_name: Optional[str] = None
|
||||
service_name: str
|
||||
service_type: str = DashboardServiceType.Tableau.value
|
||||
personal_access_token_name: Optional[str] = None
|
||||
@ -97,6 +109,7 @@ class TableauSource(Source[Entity]):
|
||||
metadata_config=metadata_config,
|
||||
)
|
||||
self.status = SourceStatus()
|
||||
self.metadata_client = OpenMetadata(self.metadata_config)
|
||||
self.dashboards = get_workbooks_dataframe(self.client).to_dict()
|
||||
self.all_dashboard_details = get_views_dataframe(self.client).to_dict()
|
||||
|
||||
@ -172,12 +185,44 @@ class TableauSource(Source[Entity]):
|
||||
)
|
||||
]
|
||||
|
||||
def get_lineage(self, datasource_list, dashboard_name) -> AddLineageRequest:
|
||||
for datasource in datasource_list:
|
||||
try:
|
||||
table_fqdn = datasource.split("(")[1].split(")")[0]
|
||||
dashboard_fqdn = f"{self.config.service_name}.{dashboard_name}"
|
||||
table_fqdn = f"{self.config.db_service_name}.{table_fqdn}"
|
||||
table_entity = self.metadata_client.get_by_name(
|
||||
entity=Table, fqdn=table_fqdn
|
||||
)
|
||||
dashboard_entity = self.metadata_client.get_by_name(
|
||||
entity=Dashboard_Entity, fqdn=dashboard_fqdn
|
||||
)
|
||||
if table_entity and dashboard_entity:
|
||||
lineage = AddLineageRequest(
|
||||
edge=EntitiesEdge(
|
||||
fromEntity=EntityReference(
|
||||
id=table_entity.id.__root__, type="table"
|
||||
),
|
||||
toEntity=EntityReference(
|
||||
id=dashboard_entity.id.__root__, type="dashboard"
|
||||
),
|
||||
)
|
||||
)
|
||||
yield lineage
|
||||
except (Exception, IndexError) as err:
|
||||
logger.error(err)
|
||||
|
||||
def _get_tableau_dashboard(self) -> Dashboard:
|
||||
for index in range(len(self.dashboards["id"])):
|
||||
dashboard_id = self.dashboards["id"][index]
|
||||
dashboard_name = self.dashboards["name"][index]
|
||||
dashboard_tag = self.dashboards["tags"][index]
|
||||
dashboard_url = self.dashboards["webpageUrl"][index]
|
||||
datasource_list = (
|
||||
get_workbook_connections_dataframe(self.client, dashboard_id)
|
||||
.get("datasource_name")
|
||||
.tolist()
|
||||
)
|
||||
tag_labels = []
|
||||
if hasattr(dashboard_tag, "tag"):
|
||||
for tag in dashboard_tag["tag"]:
|
||||
@ -202,6 +247,8 @@ class TableauSource(Source[Entity]):
|
||||
service=EntityReference(id=self.service.id, type="dashboardService"),
|
||||
last_modified=dateparser.parse(chart["updatedAt"]).timestamp() * 1000,
|
||||
)
|
||||
if self.config.db_service_name:
|
||||
yield from self.get_lineage(datasource_list, dashboard_id)
|
||||
|
||||
def _get_tableau_charts(self):
|
||||
for index in range(len(self.all_dashboard_details["id"])):
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user