tableau-connector-fix-added (#3946)

* Fixed Tableau source config

* Review changes

Co-authored-by: Onkar Ravgan <onkarravgan@Onkars-MacBook-Pro.local>
This commit is contained in:
Onkar Ravgan 2022-04-08 19:23:44 +05:30 committed by GitHub
parent bb76a2f11b
commit 7b26b5ce24
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 67 additions and 73 deletions

View File

@ -19,7 +19,7 @@
"$ref": "#/definitions/tableauType", "$ref": "#/definitions/tableauType",
"default": "Tableau" "default": "Tableau"
}, },
"server": { "hostPort": {
"description": "Tableau Server", "description": "Tableau Server",
"type": "string" "type": "string"
}, },
@ -40,11 +40,6 @@
"description": "Tableau Site Name", "description": "Tableau Site Name",
"type": "string" "type": "string"
}, },
"siteURL": {
"description": "Tableau Site URL",
"type": "string",
"format": "uri"
},
"personalAccessTokenName": { "personalAccessTokenName": {
"description": "Personal Access Token Name", "description": "Personal Access Token Name",
"type": "string" "type": "string"
@ -53,6 +48,10 @@
"description": "Personal Access Token Secret", "description": "Personal Access Token Secret",
"type": "string" "type": "string"
}, },
"env": {
"description": "Tableau Environment Name",
"type": "string"
},
"supportedPipelineTypes": { "supportedPipelineTypes": {
"description": "Supported Metadata Extraction Pipelines.", "description": "Supported Metadata Extraction Pipelines.",
"type": "string", "type": "string",

View File

@ -1,28 +1,30 @@
{ {
"source": { "source": {
"type": "tableau", "type": "tableau",
"serviceName": "local_tableau",
"serviceConnection": {
"config": { "config": {
"personal_access_token_secret": "personal_access_token_secret", "type": "Tableau",
"personal_access_token_name": "personal_access_token_name",
"username": "username", "username": "username",
"password": "password", "password": "password",
"service_name": "local_tableau", "env": "tableau_prod",
"server": "server_address", "hostPort": "localhost",
"site_name": "site_name", "siteName": "site_name",
"site_url": "site_url", "apiVersion": "api_version",
"api_version": "api version", "personalAccessTokenName": "personal_access_token_name",
"env": "env" "personalAccessTokenSecret": "personal_access_token_secret"
} }
}, },
"sourceConfig": {"config": {"enableDataProfiler": false}}
},
"sink": { "sink": {
"type": "metadata-rest", "type": "metadata-rest",
"config": {} "config": {}
}, },
"metadata_server": { "workflowConfig": {
"type": "metadata-server", "openMetadataServerConfig": {
"config": { "hostPort": "http://localhost:8585/api",
"api_endpoint": "http://localhost:8585/api", "authProvider": "no-auth"
"auth_provider_type": "no-auth"
} }
} }
} }

View File

@ -14,10 +14,9 @@ Tableau source module
import logging import logging
import uuid import uuid
from typing import Iterable, List, Optional from typing import Iterable, List
import dateutil.parser as dateparser import dateutil.parser as dateparser
from pydantic import SecretStr
from tableau_api_lib import TableauServerConnection from tableau_api_lib import TableauServerConnection
from tableau_api_lib.utils.querying import ( from tableau_api_lib.utils.querying import (
get_views_dataframe, get_views_dataframe,
@ -31,16 +30,22 @@ from metadata.generated.schema.entity.data.dashboard import (
Dashboard as Dashboard_Entity, Dashboard as Dashboard_Entity,
) )
from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.dashboard.tableauConnection import (
TableauConnection,
)
from metadata.generated.schema.entity.services.dashboardService import ( from metadata.generated.schema.entity.services.dashboardService import (
DashboardServiceType, DashboardServiceType,
) )
from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataServerConfig, OpenMetadataServerConfig,
) )
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import ConfigModel, Entity, IncludeFilterPattern from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import Source, SourceStatus from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
from metadata.ingestion.models.table_metadata import Chart, Dashboard, DashboardOwner from metadata.ingestion.models.table_metadata import Chart, Dashboard, DashboardOwner
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.helpers import get_dashboard_service_or_create from metadata.utils.helpers import get_dashboard_service_or_create
@ -48,25 +53,6 @@ from metadata.utils.helpers import get_dashboard_service_or_create
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class TableauSourceConfig(ConfigModel):
"""Tableau pydantic source model"""
username: Optional[str] = None
password: Optional[SecretStr] = None
server: str
api_version: str
env: Optional[str] = "tableau_prod"
site_name: str
site_url: str
db_service_name: Optional[str] = None
service_name: str
service_type: str = DashboardServiceType.Tableau.value
personal_access_token_name: Optional[str] = None
personal_access_token_secret: Optional[str] = None
dashboard_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
chart_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
class TableauSource(Source[Entity]): class TableauSource(Source[Entity]):
"""Tableau source entity class """Tableau source entity class
@ -83,25 +69,24 @@ class TableauSource(Source[Entity]):
all_dashboard_details: all_dashboard_details:
""" """
config: TableauSourceConfig config: WorkflowSource
metadata_config: OpenMetadataServerConfig metadata_config: OpenMetadataServerConfig
status: SourceStatus status: SourceStatus
def __init__( def __init__(
self, self,
config: TableauSourceConfig, config: WorkflowSource,
metadata_config: OpenMetadataServerConfig, metadata_config: OpenMetadataServerConfig,
): ):
super().__init__() super().__init__()
self.config = config self.config = config
self.metadata_config = metadata_config self.metadata_config = metadata_config
self.connection_config = self.config.serviceConnection.__root__.config
self.client = self.tableau_client() self.client = self.tableau_client()
self.service = get_dashboard_service_or_create( self.service = get_dashboard_service_or_create(
service_name=config.service_name, service_name=config.serviceName,
dashboard_service_type=DashboardServiceType.Tableau.name, dashboard_service_type=DashboardServiceType.Tableau.name,
username=config.username, config=self.config.serviceConnection.__root__.dict(),
password=config.password.get_secret_value() if config.password else None,
dashboard_url=config.server,
metadata_config=metadata_config, metadata_config=metadata_config,
) )
self.status = SourceStatus() self.status = SourceStatus()
@ -115,31 +100,34 @@ class TableauSource(Source[Entity]):
Returns: Returns:
""" """
tableau_server_config = { tableau_server_config = {
f"{self.config.env}": { f"{self.connection_config.env}": {
"server": self.config.server, "server": self.connection_config.hostPort,
"api_version": self.config.api_version, "api_version": self.connection_config.apiVersion,
"site_name": self.config.site_name, "site_name": self.connection_config.siteName,
"site_url": self.config.site_url, "site_url": self.connection_config.siteName,
} }
} }
if self.config.username and self.config.password: if self.connection_config.username and self.connection_config.password:
tableau_server_config[self.config.env]["username"] = self.config.username tableau_server_config[self.connection_config.env][
tableau_server_config[self.config.env][ "username"
] = self.connection_config.username
tableau_server_config[self.connection_config.env][
"password" "password"
] = self.config.password.get_secret_value() ] = self.connection_config.password.get_secret_value()
elif ( elif (
self.config.personal_access_token_name self.connection_config.personalAccessTokenName
and self.config.personal_access_token_secret and self.connection_config.personalAccessTokenSecret
): ):
tableau_server_config[self.config.env][ tableau_server_config[self.connection_config.env][
"personal_access_token_name" "personal_access_token_name"
] = self.config.personal_access_token_name ] = self.connection_config.personalAccessTokenName
tableau_server_config[self.config.env][ tableau_server_config[self.connection_config.env][
"personal_access_token_secret" "personal_access_token_secret"
] = self.config.personal_access_token_secret ] = self.connection_config.personalAccessTokenSecret
try: try:
conn = TableauServerConnection( conn = TableauServerConnection(
config_json=tableau_server_config, env=self.config.env config_json=tableau_server_config,
env=self.connection_config.env,
) )
conn.sign_in().json() conn.sign_in().json()
except Exception as err: # pylint: disable=broad-except except Exception as err: # pylint: disable=broad-except
@ -148,7 +136,12 @@ class TableauSource(Source[Entity]):
@classmethod @classmethod
def create(cls, config_dict: dict, metadata_config: OpenMetadataServerConfig): def create(cls, config_dict: dict, metadata_config: OpenMetadataServerConfig):
config = TableauSourceConfig.parse_obj(config_dict) config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: TableauConnection = config.serviceConnection.__root__.config
if not isinstance(connection, TableauConnection):
raise InvalidSourceException(
f"Expected TableauConnection, but got {connection}"
)
return cls(config, metadata_config) return cls(config, metadata_config)
def prepare(self): def prepare(self):
@ -182,8 +175,8 @@ class TableauSource(Source[Entity]):
for datasource in datasource_list: for datasource in datasource_list:
try: try:
table_fqdn = datasource.split("(")[1].split(")")[0] table_fqdn = datasource.split("(")[1].split(")")[0]
dashboard_fqdn = f"{self.config.service_name}.{dashboard_name}" dashboard_fqdn = f"{self.config.serviceName}.{dashboard_name}"
table_fqdn = f"{self.config.db_service_name}.{table_fqdn}" table_fqdn = f"{self.config.serviceName}.{table_fqdn}"
table_entity = self.metadata_client.get_by_name( table_entity = self.metadata_client.get_by_name(
entity=Table, fqdn=table_fqdn entity=Table, fqdn=table_fqdn
) )
@ -240,7 +233,7 @@ class TableauSource(Source[Entity]):
service=EntityReference(id=self.service.id, type="dashboardService"), service=EntityReference(id=self.service.id, type="dashboardService"),
last_modified=dateparser.parse(chart["updatedAt"]).timestamp() * 1000, last_modified=dateparser.parse(chart["updatedAt"]).timestamp() * 1000,
) )
if self.config.db_service_name: if self.config.serviceName:
yield from self.get_lineage(datasource_list, dashboard_id) yield from self.get_lineage(datasource_list, dashboard_id)
def _get_tableau_charts(self): def _get_tableau_charts(self):
@ -250,7 +243,7 @@ class TableauSource(Source[Entity]):
chart_tags = self.all_dashboard_details["tags"][index] chart_tags = self.all_dashboard_details["tags"][index]
chart_type = self.all_dashboard_details["sheetType"][index] chart_type = self.all_dashboard_details["sheetType"][index]
chart_url = ( chart_url = (
f"{self.config.server}/#/site/{self.config.site_name}" f"{self.connection_config.hostPort}/#/site/{self.connection_config.siteName}"
f"{self.all_dashboard_details['contentUrl'][index]}" f"{self.all_dashboard_details['contentUrl'][index]}"
) )
chart_owner = self.all_dashboard_details["owner"][index] chart_owner = self.all_dashboard_details["owner"][index]