Fixed Redash Source Issues (#10570)

* Improved redash source

* Added docs

* Addressed review comments
This commit is contained in:
Onkar Ravgan 2023-03-14 23:00:49 +05:30 committed by GitHub
parent 48512ae334
commit 93e554ae67
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 214 additions and 17 deletions

View File

@ -193,7 +193,7 @@ plugins: Dict[str, Set[str]] = {
"presto": {*COMMONS["hive"]}, "presto": {*COMMONS["hive"]},
"pymssql": {"pymssql==2.2.5"}, "pymssql": {"pymssql==2.2.5"},
"quicksight": {VERSIONS["boto3"]}, "quicksight": {VERSIONS["boto3"]},
"redash": {"redash-toolbelt~=0.1"}, "redash": {"packaging==21.3"},
"redpanda": {*COMMONS["kafka"]}, "redpanda": {*COMMONS["kafka"]},
"redshift": { "redshift": {
"sqlalchemy-redshift~=0.8", "sqlalchemy-redshift~=0.8",

View File

@ -137,7 +137,8 @@ def get_api_version(api_version: str) -> str:
Returns: Returns:
str str
""" """
api_version = api_version or os.environ.get("APCA_API_VERSION") if api_version is None:
api_version = os.environ.get("APCA_API_VERSION")
if api_version is None: if api_version is None:
api_version = "v1" api_version = "v1"

View File

@ -221,7 +221,7 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
""" """
Method to fetch dashboard tags Method to fetch dashboard tags
""" """
return # Dashboard does not support fetching tags except Tableau return # Dashboard does not support fetching tags except Tableau and Redash
def yield_dashboard_usage( def yield_dashboard_usage(
self, *args, **kwargs # pylint: disable=W0613 self, *args, **kwargs # pylint: disable=W0613

View File

@ -0,0 +1,66 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
REST Auth & Client for Redash
"""
from metadata.ingestion.ometa.client import REST, ClientConfig
from metadata.utils.logger import utils_logger
logger = utils_logger()
class RedashApiClient:
    """
    REST Auth & Client for Redash

    Thin wrapper around the generic ``REST`` client. The underlying client is
    configured with the connection's ``hostPort`` as base URL, an empty API
    version, and the connection's API key sent through the ``Authorization``
    header using the ``Key`` token mode.
    """

    client: REST

    def __init__(self, config):
        """
        Build the REST client from a Redash connection configuration.

        Args:
            config: connection object exposing ``hostPort`` and a secret
                ``apiKey`` (read through ``get_secret_value()``).
        """
        self.config = config
        client_config = ClientConfig(
            base_url=config.hostPort,
            api_version="",
            access_token=config.apiKey.get_secret_value(),
            auth_header="Authorization",
            auth_token_mode="Key",
            allow_redirects=True,
        )
        self.client = REST(client_config)

    def dashboards(self, page=1, page_size=25):
        """GET api/dashboards

        Args:
            page: page number to request.
            page_size: number of items requested per page.

        Returns:
            Whatever the REST client returns for the request; `paginate`
            expects it to expose ``results``, ``page``, ``page_size`` and
            ``count`` keys.
        """
        params_data = {"page": page, "page_size": page_size}
        return self.client.get(path="api/dashboards", data=params_data)

    def get_dashboard(self, slug):
        """GET api/dashboards/<slug>

        Args:
            slug: dashboard slug used to build the request path.
        """
        # The API changed from redash v9 onwards
        # legacy=true allows us to get the results in the old way
        return self.client.get(
            f"api/dashboards/{slug}?legacy=true",
        )

    def paginate(self, resource, page=1, page_size=25, **kwargs):
        """Load all items of a paginated resource

        Args:
            resource: callable accepting ``page`` and ``page_size`` keyword
                arguments (plus ``kwargs``) and returning a mapping with
                ``results``, ``page``, ``page_size`` and ``count`` keys,
                e.g. `dashboards`.
            page: first page to request.
            page_size: number of items requested per page.
            **kwargs: forwarded unchanged to every ``resource`` call.

        Returns:
            A single list with the ``results`` of every page, in page order.
        """
        items = []
        # Iterate instead of recursing: one stack frame and one growing list,
        # regardless of how many pages the server holds.
        while True:
            response = resource(page=page, page_size=page_size, **kwargs)
            results = response["results"] or []
            items.extend(results)

            reached_end = (
                response["page"] * response["page_size"] >= response["count"]
            )
            # An empty page means there is nothing further to read, even if
            # the reported count claims otherwise; stop rather than loop forever.
            if reached_end or not results:
                return items

            page += 1

View File

@ -12,26 +12,26 @@
""" """
Source connection handler Source connection handler
""" """
from redash_toolbelt import Redash
from metadata.generated.schema.entity.services.connections.dashboard.redashConnection import ( from metadata.generated.schema.entity.services.connections.dashboard.redashConnection import (
RedashConnection, RedashConnection,
) )
from metadata.ingestion.connections.test_connections import SourceConnectionException from metadata.ingestion.connections.test_connections import SourceConnectionException
from metadata.ingestion.source.dashboard.redash.client import RedashApiClient
def get_connection(connection: RedashConnection) -> Redash: def get_connection(connection: RedashConnection) -> RedashApiClient:
""" """
Create connection Create connection
""" """
try: try:
return Redash(connection.hostPort, connection.apiKey.get_secret_value()) return RedashApiClient(connection)
except Exception as exc: except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}." msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg) from exc raise SourceConnectionException(msg) from exc
def test_connection(client: Redash, _) -> None: def test_connection(client: RedashApiClient, _) -> None:
""" """
Test connection Test connection
""" """

View File

@ -14,9 +14,16 @@ Redash source module
import traceback import traceback
from typing import Iterable, List, Optional from typing import Iterable, List, Optional
from packaging import version
from metadata.generated.schema.api.classification.createClassification import (
CreateClassificationRequest,
)
from metadata.generated.schema.api.classification.createTag import CreateTagRequest
from metadata.generated.schema.api.data.createChart import CreateChartRequest from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.classification.tag import Tag
from metadata.generated.schema.entity.data.chart import Chart from metadata.generated.schema.entity.data.chart import Chart
from metadata.generated.schema.entity.data.dashboard import ( from metadata.generated.schema.entity.data.dashboard import (
Dashboard as LineageDashboard, Dashboard as LineageDashboard,
@ -31,8 +38,16 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource, Source as WorkflowSource,
) )
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.tagLabel import (
LabelType,
State,
TagLabel,
TagSource,
)
from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.lineage.parser import LineageParser from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource
from metadata.utils import fqn from metadata.utils import fqn
from metadata.utils.filters import filter_by_chart from metadata.utils.filters import filter_by_chart
@ -41,12 +56,26 @@ from metadata.utils.logger import ingestion_logger
logger = ingestion_logger() logger = ingestion_logger()
# Name of the classification under which Redash dashboard tags are created
# and under which tag FQNs are built.
REDASH_TAG_CATEGORY = "RedashTags"
# Dashboard URLs are built from the dashboard id when the configured Redash
# version is strictly greater than this one, and from the slug otherwise.
INCOMPATIBLE_REDASH_VERSION = "8.0.0"
class RedashSource(DashboardServiceSource): class RedashSource(DashboardServiceSource):
""" """
Redash Source Class Redash Source Class
""" """
def __init__(
    self,
    config: WorkflowSource,
    metadata_config: OpenMetadataConnection,
):
    """
    Initialise the parent source and the per-run caches.

    Args:
        config: workflow source configuration, forwarded to the parent class.
        metadata_config: OpenMetadata connection, forwarded to the parent class.
    """
    super().__init__(config, metadata_config)
    # dashboard_list is populated in `prepare`; tags holds the tag names that
    # must be created before the final entities are yielded.
    self.dashboard_list, self.tags = [], []
@classmethod @classmethod
def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection): def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection):
config: WorkflowSource = WorkflowSource.parse_obj(config_dict) config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
@ -57,12 +86,64 @@ class RedashSource(DashboardServiceSource):
) )
return cls(config, metadata_config) return cls(config, metadata_config)
def prepare(self):
    """
    Fetch the paginated list of dashboards and tags

    Stores every dashboard returned by the client in ``self.dashboard_list``
    and appends each distinct tag found on them to ``self.tags``. A tag shared
    by several dashboards is recorded once, in first-seen order, so that it is
    not requested for creation repeatedly.
    """
    self.dashboard_list = self.client.paginate(self.client.dashboards)

    # Collecting all the tags, skipping the ones already recorded
    seen = set(self.tags)
    for dashboard in self.dashboard_list:
        # "tags" may be missing or None on a dashboard
        for tag in dashboard.get("tags") or []:
            if tag not in seen:
                seen.add(tag)
                self.tags.append(tag)
def yield_tag(self, *_, **__) -> Iterable[OMetaTagAndClassification]:
    """
    Fetch Dashboard Tags

    Yields one classification/tag creation request per distinct entry of
    ``self.tags``, all under the ``REDASH_TAG_CATEGORY`` classification.
    Errors raised while handling a tag are logged and do not stop the
    remaining tags from being processed.
    """
    # dict.fromkeys drops repeated tag names while keeping first-seen order,
    # so the same tag is not requested more than once.
    for tag in dict.fromkeys(self.tags):
        try:
            classification = OMetaTagAndClassification(
                classification_request=CreateClassificationRequest(
                    name=REDASH_TAG_CATEGORY,
                    description="Tags associates with redash entities",
                ),
                tag_request=CreateTagRequest(
                    classification=REDASH_TAG_CATEGORY,
                    name=tag,
                    description="Redash Tag",
                ),
            )
            yield classification
            logger.info(f"Classification {REDASH_TAG_CATEGORY}, Tag {tag} Ingested")
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.warning(f"Error ingesting tag {tag}: {exc}")
def get_tag_labels(self, tags: Optional[List[str]]) -> Optional[List[TagLabel]]:
    """
    Build the tag labels for a list of Redash tag names.

    Args:
        tags: tag names taken from a dashboard; may be None or empty.

    Returns:
        One ``TagLabel`` per tag name, in input order, or None when no tags
        were given.
    """
    if not tags:
        return None

    labels = []
    for tag_name in tags:
        tag_fqn = fqn.build(
            self.metadata,
            Tag,
            classification_name=REDASH_TAG_CATEGORY,
            tag_name=tag_name,
        )
        labels.append(
            TagLabel(
                tagFQN=tag_fqn,
                labelType=LabelType.Automated.value,
                state=State.Suggested.value,
                source=TagSource.Classification.value,
            )
        )
    return labels
def get_dashboards_list(self) -> Optional[List[dict]]: def get_dashboards_list(self) -> Optional[List[dict]]:
""" """
Get List of all dashboards Get List of all dashboards
""" """
dashboard_info = self.client.dashboards()
return dashboard_info["results"] return self.dashboard_list
def get_dashboard_name(self, dashboard: dict) -> str: def get_dashboard_name(self, dashboard: dict) -> str:
""" """
@ -76,6 +157,45 @@ class RedashSource(DashboardServiceSource):
""" """
return self.client.get_dashboard(dashboard["slug"]) return self.client.get_dashboard(dashboard["slug"])
def get_owner_details(self, dashboard_details) -> Optional[EntityReference]:
    """Resolve the dashboard owner from the e-mail of its ``user`` entry.

    Args:
        dashboard_details: dashboard payload; the owner is looked up through
            ``dashboard_details["user"]["email"]`` when present.
    Returns:
        Optional[EntityReference]: reference to the matching user, or None
        when the payload has no user e-mail or no user is found for it.
    """
    owner_info = dashboard_details.get("user")
    if not owner_info:
        return None

    email = owner_info.get("email")
    if not email:
        return None

    user = self.metadata.get_user_by_email(email)
    if not user:
        return None
    return EntityReference(id=user.id.__root__, type="user")
def process_owner(self, dashboard_details) -> Optional[LineageDashboard]:
    """
    Patch the owner of the dashboard currently held in the context.

    The owner is only patched when one can be resolved from
    ``dashboard_details`` and the source config has ``overrideOwner`` set.
    Any error is logged and swallowed; nothing is returned.
    """
    try:
        owner = self.get_owner_details(dashboard_details=dashboard_details)
        should_patch = owner and self.source_config.overrideOwner
        if not should_patch:
            return None
        self.metadata.patch_owner(
            entity=LineageDashboard,
            entity_id=self.context.dashboard.id,
            owner=owner,
            force=True,
        )
    except Exception as exc:
        logger.debug(traceback.format_exc())
        logger.warning(f"Error processing owner for {dashboard_details}: {exc}")
    return None
def get_dashboard_url(self, dashboard_details: dict) -> str:
    """
    Build the relative URL of a dashboard.

    The dashboard ``id`` is used when the configured ``redashVersion`` is
    strictly greater than ``INCOMPATIBLE_REDASH_VERSION``; otherwise the
    ``slug`` is used. A missing key yields an empty path segment.
    """
    configured = version.parse(self.service_connection.redashVersion)
    threshold = version.parse(INCOMPATIBLE_REDASH_VERSION)
    key = "id" if configured > threshold else "slug"
    return f"/dashboards/{dashboard_details.get(key, '')}"
def yield_dashboard( def yield_dashboard(
self, dashboard_details: dict self, dashboard_details: dict
) -> Iterable[CreateDashboardRequest]: ) -> Iterable[CreateDashboardRequest]:
@ -84,8 +204,9 @@ class RedashSource(DashboardServiceSource):
""" """
try: try:
dashboard_description = "" dashboard_description = ""
for widgets in dashboard_details.get("widgets", []): for widgets in dashboard_details.get("widgets") or []:
dashboard_description = widgets.get("text") dashboard_description = widgets.get("text")
yield CreateDashboardRequest( yield CreateDashboardRequest(
name=dashboard_details["id"], name=dashboard_details["id"],
displayName=dashboard_details.get("name"), displayName=dashboard_details.get("name"),
@ -100,7 +221,8 @@ class RedashSource(DashboardServiceSource):
for chart in self.context.charts for chart in self.context.charts
], ],
service=self.context.dashboard_service.fullyQualifiedName.__root__, service=self.context.dashboard_service.fullyQualifiedName.__root__,
dashboardUrl=f"/dashboard/{dashboard_details.get('slug', '')}", dashboardUrl=self.get_dashboard_url(dashboard_details),
tags=self.get_tag_labels(dashboard_details.get("tags")),
) )
self.status.scanned(dashboard_details["name"]) self.status.scanned(dashboard_details["name"])
@ -127,7 +249,7 @@ class RedashSource(DashboardServiceSource):
entity=LineageDashboard, entity=LineageDashboard,
fqn=to_fqn, fqn=to_fqn,
) )
for widgets in dashboard_details.get("widgets", []): for widgets in dashboard_details.get("widgets") or []:
try: try:
visualization = widgets.get("visualization") visualization = widgets.get("visualization")
if not visualization: if not visualization:
@ -165,7 +287,7 @@ class RedashSource(DashboardServiceSource):
""" """
Metod to fetch charts linked to dashboard Metod to fetch charts linked to dashboard
""" """
for widgets in dashboard_details.get("widgets", []): for widgets in dashboard_details.get("widgets") or []:
try: try:
visualization = widgets.get("visualization") visualization = widgets.get("visualization")
chart_display_name = str( chart_display_name = str(
@ -185,7 +307,7 @@ class RedashSource(DashboardServiceSource):
visualization["type"] if visualization else "" visualization["type"] if visualization else ""
), ),
service=self.context.dashboard_service.fullyQualifiedName.__root__, service=self.context.dashboard_service.fullyQualifiedName.__root__,
chartUrl=f"/dashboard/{dashboard_details.get('slug', '')}", chartUrl=self.get_dashboard_url(dashboard_details),
description=visualization["description"] if visualization else "", description=visualization["description"] if visualization else "",
) )
except Exception as exc: except Exception as exc:
@ -193,6 +315,3 @@ class RedashSource(DashboardServiceSource):
logger.warning( logger.warning(
f"Error to yield dashboard chart for widget_id: {widgets['id']} and {dashboard_details}: {exc}" f"Error to yield dashboard chart for widget_id: {widgets['id']} and {dashboard_details}: {exc}"
) )
def close(self):
self.client.session.close()

View File

@ -55,6 +55,7 @@ source:
hostPort: http://localhost:5000 hostPort: http://localhost:5000
apiKey: api_key apiKey: api_key
username: random username: random
redashVersion: 10.0.0
sourceConfig: sourceConfig:
config: config:
type: DashboardMetadata type: DashboardMetadata
@ -91,6 +92,7 @@ workflowConfig:
- **hostPort**: URL to the Redash instance. - **hostPort**: URL to the Redash instance.
- **username**: Specify the User to connect to Redash. It should have enough privileges to read all the metadata. - **username**: Specify the User to connect to Redash. It should have enough privileges to read all the metadata.
- **apiKey**: API key of the redash instance to access. - **apiKey**: API key of the redash instance to access.
- **Redash Version**: (Default: 10.0.0) Redash version of your redash instance. Enter the numerical value from the [Redash Releases](https://github.com/getredash/redash/releases) page.
#### Source Configuration - Source Config #### Source Configuration - Source Config

View File

@ -55,6 +55,7 @@ source:
hostPort: http://localhost:5000 hostPort: http://localhost:5000
apiKey: api_key apiKey: api_key
username: random username: random
redashVersion: 10.0.0
sourceConfig: sourceConfig:
config: config:
type: DashboardMetadata type: DashboardMetadata
@ -91,6 +92,7 @@ workflowConfig:
- **hostPort**: URL to the Redash instance. - **hostPort**: URL to the Redash instance.
- **username**: Specify the User to connect to Redash. It should have enough privileges to read all the metadata. - **username**: Specify the User to connect to Redash. It should have enough privileges to read all the metadata.
- **apiKey**: API key of the redash instance to access. - **apiKey**: API key of the redash instance to access.
- **Redash Version**: (Default: 10.0.0) Redash version of your redash instance. Enter the numerical value from the [Redash Releases](https://github.com/getredash/redash/releases) page.
#### Source Configuration - Source Config #### Source Configuration - Source Config

View File

@ -132,6 +132,7 @@ the changes.
- **Host and Port**: URL to the Redash instance. - **Host and Port**: URL to the Redash instance.
- **Username**: Specify the User to connect to Redash. It should have enough privileges to read all the metadata. - **Username**: Specify the User to connect to Redash. It should have enough privileges to read all the metadata.
- **API Key**: API key of the redash instance to access. - **API Key**: API key of the redash instance to access.
- **Redash Version**: (Default: 10.0.0) Redash version of your redash instance. Enter the numerical value from the [Redash Releases](https://github.com/getredash/redash/releases) page.
### 6. Configure Metadata Ingestion ### 6. Configure Metadata Ingestion

Binary file not shown.

Before

Width:  |  Height:  |  Size: 66 KiB

After

Width:  |  Height:  |  Size: 161 KiB

View File

@ -39,6 +39,12 @@
"type": "string", "type": "string",
"format": "password" "format": "password"
}, },
"redashVersion": {
"title": "Redash Version",
"description": "Version of the Redash instance",
"type": "string",
"default": "10.0.0"
},
"supportsMetadataExtraction": { "supportsMetadataExtraction": {
"title": "Supports Metadata Extraction", "title": "Supports Metadata Extraction",
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction" "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"