Tableau lineage using Graphql (#7603)

* Added graphql query

* Added tableau lineage with graphql

Co-authored-by: Onkar Ravgan <onkarravgan@Onkars-MacBook-Pro.local>
This commit is contained in:
Onkar Ravgan 2022-09-21 11:14:32 +05:30 committed by GitHub
parent bb66105545
commit 254b0b64d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -11,6 +11,7 @@
"""
Tableau source module
"""
import json
import traceback
from typing import Iterable, List, Optional
@ -58,6 +59,27 @@ from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
TABLEAU_TAG_CATEGORY = "TableauTags"
TABLEAU_LINEAGE_GRAPHQL_QUERY = """
{
workbooks {
id
luid
name
upstreamTables{
name
schema
upstreamDatabases{
name
}
referencedByQueries{
name
query
}
}
}
}
"""
class TableauSource(DashboardServiceSource):
@ -75,6 +97,7 @@ class TableauSource(DashboardServiceSource):
self.workbooks = {}
self.tags = []
self.owner = {}
self.workboook_datasources = {}
def prepare(self):
# Restructuring the api response for workbooks
@ -109,6 +132,14 @@ class TableauSource(DashboardServiceSource):
owner = get_all_user_fields(self.client)
self.owner = {user["id"]: user for user in owner}
# Fetch Datasource information for lineage
graphql_query_result = self.client.metadata_graphql_query(
query=TABLEAU_LINEAGE_GRAPHQL_QUERY
)
self.workboook_datasources = json.loads(graphql_query_result.text)["data"].get(
"workbooks"
)
return super().prepare()
@classmethod
@ -223,38 +254,43 @@ class TableauSource(DashboardServiceSource):
"""
Get lineage between dashboard and data sources
"""
datasource_list = (
get_workbook_connections_dataframe(self.client, dashboard_details.get("id"))
.get("datasource_name")
.tolist()
)
dashboard_name = dashboard_details.get("name")
dashboard_id = dashboard_details.get("id")
data_source = next(
(
data_source
for data_source in self.workboook_datasources
if data_source.get("luid") == dashboard_id
),
None,
)
to_fqn = fqn.build(
self.metadata,
entity_type=LineageDashboard,
service_name=self.config.serviceName,
dashboard_name=dashboard_name,
dashboard_name=dashboard_id,
)
to_entity = self.metadata.get_by_name(
entity=LineageDashboard,
fqn=to_fqn,
)
for datasource in datasource_list:
try:
schema_and_table_name = (
datasource.split("(")[1].split(")")[0].split(".")
)
schema_name = schema_and_table_name[0]
table_name = schema_and_table_name[1]
try:
upstream_tables = data_source.get("upstreamTables")
for upstream_table in upstream_tables:
database_schema_table = fqn.split_table_name(upstream_table.get("name"))
database_name = database_schema_table.get("database")
schema_name = database_schema_table.get("database_schema")
table_name = database_schema_table.get("table")
from_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name=db_service_name,
schema_name=schema_name,
schema_name=schema_name
if schema_name
else upstream_table.get("schema"),
table_name=table_name,
database_name=None,
database_name=database_name,
)
from_entity = self.metadata.get_by_name(
entity=Table,
@ -272,11 +308,11 @@ class TableauSource(DashboardServiceSource):
)
)
yield lineage
except (Exception, IndexError) as err:
logger.debug(traceback.format_exc())
logger.error(
f"Error to yield dashboard lineage details for DB service name [{db_service_name}]: {err}"
)
except (Exception, IndexError) as err:
logger.debug(traceback.format_exc())
logger.error(
f"Error to yield dashboard lineage details for DB service name [{db_service_name}]: {err}"
)
def yield_dashboard_chart(
self, dashboard_details: dict