Fix #5338: Dashboard Connector's to return create entities (#5341)

Fix #5338: Dashboard Connector's to return create entities (#5341)
This commit is contained in:
Mayur Singal 2022-06-08 14:33:48 +05:30 committed by GitHub
parent f7438f5469
commit b15ce6f52b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 340 additions and 337 deletions

View File

@ -32,8 +32,10 @@ class InvalidSourceException(Exception):
class SourceStatus(Status):
records = 0
warnings: List[str] = field(default_factory=list)
success: List[str] = field(default_factory=list)
failures: List[str] = field(default_factory=list)
warnings: List[str] = field(default_factory=list)
filtered: List[str] = field(default_factory=list)
def scanned(self, record: Any) -> None:
self.records += 1
@ -44,6 +46,9 @@ class SourceStatus(Status):
def failure(self, key: str, reason: str) -> None:
self.failures.append({key: reason})
def filter(self, key: str, reason: str) -> None:
self.filtered.append({key: reason})
class Source(Closeable, Generic[Entity], metaclass=ABCMeta):
@classmethod

View File

@ -10,7 +10,7 @@
# limitations under the License.
from typing import Any, Dict, List, Optional
from typing import List, Optional
from pydantic import BaseModel
@ -220,39 +220,3 @@ class GlossaryTermESDocument(BaseModel):
suggest: List[dict]
last_updated_timestamp: Optional[int]
doc_as_upsert: bool = True
class DashboardOwner(BaseModel):
"""Dashboard owner"""
username: str
first_name: str
last_name: str
class Chart(BaseModel):
"""Chart"""
name: str
displayName: str
description: str
chart_type: str
url: str
owners: List[DashboardOwner] = None
lastModified: int = None
datasource_fqn: str = None
service: EntityReference
custom_props: Dict[Any, Any] = None
class Dashboard(BaseModel):
"""Dashboard"""
name: str
displayName: str
description: str
url: str
owners: List[DashboardOwner] = None
charts: List[str]
service: EntityReference
lastModified: int = None

View File

@ -33,7 +33,6 @@ from metadata.generated.schema.api.policies.createPolicy import CreatePolicyRequ
from metadata.generated.schema.api.teams.createRole import CreateRoleRequest
from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest
from metadata.generated.schema.api.teams.createUser import CreateUserRequest
from metadata.generated.schema.entity.data.chart import ChartType
from metadata.generated.schema.entity.data.location import Location
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.table import Table
@ -50,7 +49,7 @@ from metadata.ingestion.models.ometa_policy import OMetaPolicy
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.models.ometa_tag_category import OMetaTagAndCategory
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.models.table_metadata import Chart, Dashboard, DeleteTable
from metadata.ingestion.models.table_metadata import DeleteTable
from metadata.ingestion.models.table_tests import OMetaTableTest
from metadata.ingestion.models.user import OMetaUserProfile
from metadata.ingestion.ometa.client import APIError
@ -66,20 +65,6 @@ logger = ingestion_logger()
# Allow types from the generated pydantic models
T = TypeVar("T", bound=BaseModel)
om_chart_type_dict = {
"line": ChartType.Line,
"table": ChartType.Table,
"dist_bar": ChartType.Bar,
"bar": ChartType.Bar,
"big_number": ChartType.Line,
"histogram": ChartType.Histogram,
"big_number_total": ChartType.Line,
"dual_line": ChartType.Line,
"line_multi": ChartType.Line,
"treemap": ChartType.Area,
"box_plot": ChartType.Bar,
}
class MetadataRestSinkConfig(ConfigModel):
api_endpoint: str = None
@ -117,9 +102,9 @@ class MetadataRestSink(Sink[Entity]):
self.write_tables(record)
elif isinstance(record, Topic):
self.write_topics(record)
elif isinstance(record, Chart):
elif isinstance(record, CreateChartRequest):
self.write_charts(record)
elif isinstance(record, Dashboard):
elif isinstance(record, CreateDashboardRequest):
self.write_dashboards(record)
elif isinstance(record, Location):
self.write_locations(record)
@ -297,27 +282,9 @@ class MetadataRestSink(Sink[Entity]):
logger.error(err)
self.status.failure(f"Topic: {topic.name}")
def write_charts(self, chart: Chart):
def write_charts(self, chart: CreateChartRequest):
try:
om_chart_type = ChartType.Other
if (
chart.chart_type is not None
and chart.chart_type in om_chart_type_dict.keys()
):
om_chart_type = om_chart_type_dict[chart.chart_type]
chart_request = CreateChartRequest(
name=chart.name,
displayName=chart.displayName,
description=chart.description,
chartType=om_chart_type,
chartUrl=chart.url,
service=chart.service,
)
created_chart = self.metadata.create_or_update(chart_request)
self.charts_dict[chart.name] = EntityReference(
id=created_chart.id, type="chart"
)
created_chart = self.metadata.create_or_update(chart)
logger.info(f"Successfully ingested chart {created_chart.displayName}")
self.status.records_written(f"Chart: {created_chart.displayName}")
except (APIError, ValidationError) as err:
@ -325,19 +292,9 @@ class MetadataRestSink(Sink[Entity]):
logger.error(err)
self.status.failure(f"Chart: {chart.displayName}")
def write_dashboards(self, dashboard: Dashboard):
def write_dashboards(self, dashboard: CreateDashboardRequest):
try:
charts = self._get_chart_references(dashboard)
dashboard_request = CreateDashboardRequest(
name=dashboard.name,
displayName=dashboard.displayName,
description=dashboard.description,
dashboardUrl=dashboard.url,
charts=charts,
service=dashboard.service,
)
created_dashboard = self.metadata.create_or_update(dashboard_request)
created_dashboard = self.metadata.create_or_update(dashboard)
logger.info(
f"Successfully ingested dashboard {created_dashboard.displayName}"
)
@ -347,13 +304,6 @@ class MetadataRestSink(Sink[Entity]):
logger.error(err)
self.status.failure(f"Dashboard {dashboard.name}")
def _get_chart_references(self, dashboard: Dashboard) -> []:
chart_references = []
for chart_id in dashboard.charts:
if chart_id in self.charts_dict.keys():
chart_references.append(self.charts_dict[chart_id])
return chart_references
def write_locations(self, location: Location):
try:
created_location = self._create_location(location)

View File

@ -1,6 +1,9 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Iterable, List, Optional, Union
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
@ -14,9 +17,7 @@ from metadata.generated.schema.metadataIngestion.workflow import (
)
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.models.table_metadata import Chart, Dashboard
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.common_db_source import SQLSourceStatus
from metadata.utils.connections import get_connection
from metadata.utils.filters import filter_by_dashboard
from metadata.utils.logger import ingestion_logger
@ -24,6 +25,21 @@ from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@dataclass
class DashboardSourceStatus(SourceStatus):
"""
Reports the source status after ingestion
"""
def scanned(self, record: str) -> None:
self.success.append(record)
logger.info(f"Scanned: {record}")
def filter(self, record: str, err: str) -> None:
self.filtered.append(record)
logger.warning(f"Filtered {record}: {err}")
class DashboardSourceService(Source, ABC):
@abstractmethod
def get_dashboards_list(self) -> Optional[List[Any]]:
@ -44,7 +60,7 @@ class DashboardSourceService(Source, ABC):
"""
@abstractmethod
def get_dashboard_entity(self, dashboard_details: Any) -> Dashboard:
def get_dashboard_entity(self, dashboard_details: Any) -> CreateDashboardRequest:
"""
Method to Get Dashboard Entity
"""
@ -56,7 +72,9 @@ class DashboardSourceService(Source, ABC):
"""
@abstractmethod
def fetch_dashboard_charts(self, dashboard: Any) -> Optional[Iterable[Chart]]:
def fetch_dashboard_charts(
self, dashboard: Any
) -> Optional[Iterable[CreateChartRequest]]:
"""
Method to fetch charts linked to dashboard
"""
@ -80,7 +98,7 @@ class DashboardSourceService(Source, ABC):
self.service = self.metadata.get_service_or_create(
entity=DashboardService, config=config
)
self.status = SQLSourceStatus()
self.status = DashboardSourceStatus()
self.metadata_client = OpenMetadata(self.metadata_config)
def next_record(self) -> Iterable[Entity]:
@ -88,7 +106,7 @@ class DashboardSourceService(Source, ABC):
def process_dashboards(
self,
) -> Iterable[Union[Dashboard, Chart, AddLineageRequest]]:
) -> Iterable[Union[CreateDashboardRequest, CreateChartRequest, AddLineageRequest]]:
"""Get dashboard method"""
for dashboard in self.get_dashboards_list():
try:
@ -102,6 +120,7 @@ class DashboardSourceService(Source, ABC):
"Dashboard Pattern not Allowed",
)
continue
yield from self.fetch_dashboard_charts(dashboard_details) or []
yield from self.get_dashboard_entity(dashboard_details)
if self.source_config.dbServiceName:
yield from self.get_lineage(dashboard_details)

View File

@ -15,6 +15,8 @@ from typing import Any, Iterable, List, Optional
import looker_sdk
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.services.connections.dashboard.lookerConnection import (
LookerConnection,
@ -28,9 +30,9 @@ from metadata.generated.schema.metadataIngestion.workflow import (
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.models.table_metadata import Chart, Dashboard
from metadata.ingestion.source.dashboard.dashboard_source import DashboardSourceService
from metadata.utils.filters import filter_by_chart, filter_by_dashboard
from metadata.utils.filters import filter_by_chart
from metadata.utils.helpers import get_chart_entities_from_id, get_standard_chart_type
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@ -104,17 +106,20 @@ class LookerSource(DashboardSourceService):
]
return self.client.dashboard(dashboard_id=dashboard.id, fields=",".join(fields))
def get_dashboard_entity(self, dashboard_details) -> Dashboard:
def get_dashboard_entity(self, dashboard_details: Any) -> CreateDashboardRequest:
"""
Method to Get Dashboard Entity
"""
yield from self.fetch_dashboard_charts(dashboard_details)
yield Dashboard(
yield CreateDashboardRequest(
name=dashboard_details.id,
displayName=dashboard_details.title,
description=dashboard_details.description or "",
charts=self.chart_names,
url=f"/dashboards/{dashboard_details.id}",
charts=get_chart_entities_from_id(
chart_ids=self.chart_names,
metadata=self.metadata,
service_name=self.config.serviceName,
),
dashboardUrl=f"/dashboards/{dashboard_details.id}",
service=EntityReference(id=self.service.id, type="dashboardService"),
)
@ -125,7 +130,9 @@ class LookerSource(DashboardSourceService):
logger.info("Lineage not implemented for Looker")
return None
def fetch_dashboard_charts(self, dashboard_details) -> Optional[Iterable[Chart]]:
def fetch_dashboard_charts(
self, dashboard_details
) -> Optional[Iterable[CreateChartRequest]]:
"""
Metod to fetch charts linked to dashboard
"""
@ -137,14 +144,14 @@ class LookerSource(DashboardSourceService):
chart_filter_pattern=self.source_config.chartFilterPattern,
chart_name=dashboard_elements.id,
):
self.status.failure(dashboard_elements.id, "Chart filtered out")
self.status.filter(dashboard_elements.id, "Chart filtered out")
continue
om_dashboard_elements = Chart(
om_dashboard_elements = CreateChartRequest(
name=dashboard_elements.id,
displayName=dashboard_elements.title or "",
description="",
chart_type=dashboard_elements.type,
url=f"/dashboard_elements/{dashboard_elements.id}",
chartType=get_standard_chart_type(dashboard_elements.type).value,
chartUrl=f"/dashboard_elements/{dashboard_elements.id}",
service=EntityReference(
id=self.service.id, type="dashboardService"
),

View File

@ -11,11 +11,12 @@
"""Metabase source module"""
import traceback
import uuid
from typing import Iterable, List, Optional
import requests
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.dashboard import (
Dashboard as LineageDashboard,
@ -33,14 +34,17 @@ from metadata.generated.schema.metadataIngestion.workflow import (
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.models.table_metadata import Chart, Dashboard
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.dashboard_source import DashboardSourceService
from metadata.ingestion.source.database.common_db_source import SQLSourceStatus
from metadata.utils import fqn
from metadata.utils.connections import get_connection
from metadata.utils.filters import filter_by_chart
from metadata.utils.helpers import replace_special_with
from metadata.utils.helpers import (
get_chart_entities_from_id,
get_standard_chart_type,
replace_special_with,
)
from metadata.utils.logger import ingestion_logger
HEADERS = {"Content-Type": "application/json", "Accept": "*/*"}
@ -122,7 +126,7 @@ class MetabaseSource(DashboardSourceService):
resp_dashboard = self.req_get(f"/api/dashboard/{dashboard['id']}")
return resp_dashboard.json()
def get_dashboard_entity(self, dashboard_details: dict) -> Dashboard:
def get_dashboard_entity(self, dashboard_details: dict) -> CreateDashboardRequest:
"""
Method to Get Dashboard Entity
"""
@ -130,27 +134,28 @@ class MetabaseSource(DashboardSourceService):
f"/dashboard/{dashboard_details['id']}-"
f"{replace_special_with(raw=dashboard_details['name'].lower(), replacement='-')}"
)
yield from self.fetch_dashboard_charts(dashboard_details)
yield Dashboard(
id=uuid.uuid4(),
yield CreateDashboardRequest(
name=dashboard_details["name"],
url=dashboard_url,
dashboardUrl=dashboard_url,
displayName=dashboard_details["name"],
description=dashboard_details["description"]
if dashboard_details["description"] is not None
else "",
charts=self.charts,
description=dashboard_details.get("description", ""),
charts=get_chart_entities_from_id(
chart_ids=self.charts,
metadata=self.metadata,
service_name=self.config.serviceName,
),
service=EntityReference(id=self.service.id, type="dashboardService"),
)
def fetch_dashboard_charts(self, dashboard_details: dict) -> Iterable[Chart]:
def fetch_dashboard_charts(
self, dashboard_details: dict
) -> Iterable[CreateChartRequest]:
"""Get chart method
Args:
dashboard_details:
Returns:
Iterable[Chart]
Iterable[CreateChartRequest]
"""
charts = dashboard_details["ordered_cards"]
for chart in charts:
@ -171,15 +176,14 @@ class MetabaseSource(DashboardSourceService):
chart_details["name"], "Chart Pattern not allowed"
)
continue
yield Chart(
id=uuid.uuid4(),
yield CreateChartRequest(
name=chart_details["name"],
displayName=chart_details["name"],
description=chart_details["description"]
if chart_details["description"] is not None
else "",
chart_type=str(chart_details["display"]),
url=chart_url,
description=chart_details.get("description", ""),
chartType=get_standard_chart_type(
str(chart_details["display"])
).value,
chartUrl=chart_url,
service=EntityReference(
id=self.service.id, type="dashboardService"
),
@ -256,31 +260,3 @@ class MetabaseSource(DashboardSourceService):
return requests.get(
self.service_connection.hostPort + path, headers=self.metabase_session
)
def get_card_detail(self, card_list):
# TODO: Need to handle card lineage
metadata = OpenMetadata(self.metadata_config)
for card in card_list:
try:
card_details = card["card"]
if not card_details.get("id"):
continue
card_detail_resp = self.req_get(f"/api/card/{card_details['id']}")
if card_detail_resp.status_code == 200:
raw_query = (
card_details.get("dataset_query", {})
.get("native", {})
.get("query", "")
)
except Exception as e:
logger.error(repr(e))
def get_cards(self):
"""Get cards method"""
resp_dashboards = self.req_get("/api/dashboard")
if resp_dashboards.status_code == 200:
for dashboard in resp_dashboards.json():
resp_dashboard = self.req_get(f"/api/dashboard/{dashboard['id']}")
dashboard_details = resp_dashboard.json()
card_list = dashboard_details["ordered_cards"]
self.get_card_detail(card_list)

View File

@ -11,13 +11,13 @@
"""PowerBI source module"""
import traceback
import uuid
from typing import Any, Iterable, List, Optional
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.dashboard import (
Dashboard as LineageDashboard,
)
from metadata.generated.schema.entity.data.chart import ChartType
from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.services.connections.dashboard.powerBIConnection import (
PowerBIConnection,
@ -31,10 +31,10 @@ from metadata.generated.schema.metadataIngestion.workflow import (
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.models.table_metadata import Chart, Dashboard
from metadata.ingestion.source.dashboard.dashboard_source import DashboardSourceService
from metadata.utils import fqn
from metadata.utils.filters import filter_by_chart
from metadata.utils.helpers import get_chart_entities_from_id
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@ -94,18 +94,21 @@ class PowerbiSource(DashboardSourceService):
"""
return dashboard
def get_dashboard_entity(self, dashboard_details: dict) -> Dashboard:
def get_dashboard_entity(self, dashboard_details: dict) -> CreateDashboardRequest:
"""
Method to Get Dashboard Entity, Dashboard Charts & Lineage
"""
yield from self.fetch_dashboard_charts(dashboard_details)
yield Dashboard(
yield CreateDashboardRequest(
name=dashboard_details["id"],
# PBI has no hostPort property. All URL details are present in the webUrl property.
url=dashboard_details["webUrl"],
dashboardUrl=dashboard_details["webUrl"],
displayName=dashboard_details["displayName"],
description="",
charts=self.charts,
charts=get_chart_entities_from_id(
chart_ids=self.charts,
metadata=self.metadata,
service_name=self.config.serviceName,
),
service=EntityReference(id=self.service.id, type="dashboardService"),
)
@ -138,12 +141,12 @@ class PowerbiSource(DashboardSourceService):
)
to_fqn = fqn.build(
self.metadata,
entity_type=LineageDashboard,
entity_type=Dashboard,
service_name=self.config.serviceName,
dashboard_name=dashboard_details["id"],
)
to_entity = self.metadata.get_by_name(
entity=LineageDashboard,
entity=Dashboard,
fqn=to_fqn,
)
if from_entity and to_entity:
@ -162,7 +165,9 @@ class PowerbiSource(DashboardSourceService):
logger.debug(traceback.format_exc())
logger.error(err)
def fetch_dashboard_charts(self, dashboard_details: dict) -> Iterable[Chart]:
def fetch_dashboard_charts(
self, dashboard_details: dict
) -> Iterable[CreateChartRequest]:
"""Get chart method
Args:
dashboard_details:
@ -179,18 +184,15 @@ class PowerbiSource(DashboardSourceService):
if filter_by_chart(
self.source_config.chartFilterPattern, chart["title"]
):
self.status.filter(
chart["title"], "Filtered out using Chart filter pattern"
)
self.status.filter(chart["title"], "Chart Pattern not Allowed")
continue
yield Chart(
id=uuid.uuid4(),
yield CreateChartRequest(
name=chart["id"],
displayName=chart["title"],
description="",
chart_type="", # Fix this with https://github.com/open-metadata/OpenMetadata/issues/1673
chartType=ChartType.Other.value,
# PBI has no hostPort property. All URL details are present in the webUrl property.
url=chart["embedUrl"],
chartUrl=chart["embedUrl"],
service=EntityReference(
id=self.service.id, type="dashboardService"
),

View File

@ -9,14 +9,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
from dataclasses import dataclass, field
from typing import Dict, Iterable, List, Optional
from sql_metadata import Parser
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.chart import Chart
from metadata.generated.schema.entity.data.dashboard import (
Dashboard as Lineage_Dashboard,
)
@ -33,10 +33,9 @@ from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import InvalidSourceException, SourceStatus
from metadata.ingestion.models.table_metadata import Chart as ModelChart
from metadata.ingestion.models.table_metadata import Dashboard
from metadata.ingestion.source.dashboard.dashboard_source import DashboardSourceService
from metadata.utils import fqn
from metadata.utils.helpers import get_chart_entities_from_id, get_standard_chart_type
from metadata.utils.logger import ingestion_logger
from metadata.utils.sql_lineage import search_table_entities
@ -100,24 +99,25 @@ class RedashSource(DashboardSourceService):
"""
return self.client.get_dashboard(dashboard["slug"])
def get_dashboard_entity(self, dashboard_details: dict) -> Dashboard:
def get_dashboard_entity(self, dashboard_details: dict) -> CreateDashboardRequest:
"""
Method to Get Dashboard Entity
"""
yield from self.fetch_dashboard_charts(dashboard_details)
self.status.item_scanned_status()
dashboard_description = ""
for widgets in dashboard_details.get("widgets", []):
dashboard_description = widgets.get("text")
yield Dashboard(
id=uuid.uuid4(),
yield CreateDashboardRequest(
name=dashboard_details.get("id"),
displayName=dashboard_details["name"],
description=dashboard_description if dashboard_details else "",
charts=self.dashboards_to_charts[dashboard_details.get("id")],
usageSummary=None,
charts=get_chart_entities_from_id(
chart_ids=self.dashboards_to_charts[dashboard_details.get("id")],
metadata=self.metadata,
service_name=self.config.serviceName,
),
service=EntityReference(id=self.service.id, type="dashboardService"),
url=f"/dashboard/{dashboard_details.get('slug', '')}",
dashboardUrl=f"/dashboard/{dashboard_details.get('slug', '')}",
)
def get_lineage(
@ -137,7 +137,6 @@ class RedashSource(DashboardSourceService):
table_list = Parser(visualization["query"]["query"])
for table in table_list.tables:
dataabase_schema = None
print(table)
if "." in table:
dataabase_schema, table = fqn.split(table)[-2:]
table_entities = search_table_entities(
@ -172,7 +171,7 @@ class RedashSource(DashboardSourceService):
def fetch_dashboard_charts(
self, dashboard_details: dict
) -> Optional[Iterable[Chart]]:
) -> Optional[Iterable[CreateChartRequest]]:
"""
Metod to fetch charts linked to dashboard
"""
@ -180,14 +179,16 @@ class RedashSource(DashboardSourceService):
for widgets in dashboard_details.get("widgets", []):
visualization = widgets.get("visualization")
self.dashboards_to_charts[dashboard_details.get("id")].append(widgets["id"])
yield ModelChart(
yield CreateChartRequest(
name=widgets["id"],
displayName=visualization["query"]["name"]
if visualization and visualization["query"]
else "",
chart_type=visualization["type"] if visualization else "",
chartType=get_standard_chart_type(
visualization["type"] if visualization else ""
),
service=EntityReference(id=self.service.id, type="dashboardService"),
url=f"/dashboard/{dashboard_details.get('slug', '')}",
chartUrl=f"/dashboard/{dashboard_details.get('slug', '')}",
description=visualization["description"] if visualization else "",
)

View File

@ -18,6 +18,8 @@ from typing import List, Optional
import dateutil.parser as dateparser
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.dashboard import (
Dashboard as Lineage_Dashboard,
@ -38,9 +40,9 @@ from metadata.generated.schema.metadataIngestion.workflow import (
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import InvalidSourceException, SourceStatus
from metadata.ingestion.models.table_metadata import Chart, Dashboard, DashboardOwner
from metadata.ingestion.source.dashboard.dashboard_source import DashboardSourceService
from metadata.utils import fqn
from metadata.utils.helpers import get_chart_entities_from_id, get_standard_chart_type
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@ -84,26 +86,6 @@ def get_filter_name(filter_obj):
return f"{clause} {column} {operator} {comparator}"
def get_owners(owners_obj):
"""
Get owner
Args:
owners_obj:
Returns:
list
"""
owners = []
for owner in owners_obj:
dashboard_owner = DashboardOwner(
first_name=owner["first_name"],
last_name=owner["last_name"],
username=owner["username"],
)
owners.append(dashboard_owner)
return owners
class SupersetSource(DashboardSourceService):
"""
Superset source class
@ -186,24 +168,21 @@ class SupersetSource(DashboardSourceService):
"""
return dashboard
def get_dashboard_entity(self, dashboard_details: dict) -> Dashboard:
def get_dashboard_entity(self, dashboard_details: dict) -> CreateDashboardRequest:
"""
Method to Get Dashboard Entity
"""
yield from self.fetch_dashboard_charts(dashboard_details)
last_modified = (
dateparser.parse(dashboard_details.get("changed_on_utc", "now")).timestamp()
* 1000
)
yield Dashboard(
yield CreateDashboardRequest(
name=dashboard_details["id"],
displayName=dashboard_details["dashboard_title"],
description="",
url=dashboard_details["url"],
owners=get_owners(dashboard_details["owners"]),
charts=self.charts,
dashboardUrl=dashboard_details["url"],
charts=get_chart_entities_from_id(
chart_ids=self.charts,
metadata=self.metadata,
service_name=self.config.serviceName,
),
service=EntityReference(id=self.service.id, type="dashboardService"),
lastModified=last_modified,
)
def _get_charts_of_dashboard(self, dashboard_details: dict) -> List[str]:
@ -296,41 +275,19 @@ class SupersetSource(DashboardSourceService):
return None
# pylint: disable=too-many-locals
def _build_chart(self, chart_json: dict) -> Chart:
def _build_chart(self, chart_json: dict) -> CreateChartRequest:
chart_id = chart_json["id"]
last_modified = (
dateparser.parse(chart_json.get("changed_on_utc", "now")).timestamp() * 1000
)
params = json.loads(chart_json["params"])
metrics = [
get_metric_name(metric)
for metric in (params.get("metrics", []) or [params.get("metric")])
]
filters = [
get_filter_name(filter_obj)
for filter_obj in params.get("adhoc_filters", [])
]
group_bys = params.get("groupby", []) or []
if isinstance(group_bys, str):
group_bys = [group_bys]
custom_properties = {
"Metrics": ", ".join(metrics),
"Filters": ", ".join(filters),
"Dimensions": ", ".join(group_bys),
}
chart = Chart(
chart = CreateChartRequest(
name=chart_id,
displayName=chart_json["slice_name"],
description="",
chart_type=chart_json["viz_type"],
url=chart_json["url"],
owners=get_owners(chart_json["owners"]),
datasource_fqn=self._get_datasource_fqn(chart_json["datasource_id"])
if chart_json["datasource_id"]
else None,
lastModified=last_modified,
chartType=get_standard_chart_type(chart_json["viz_type"]),
chartUrl=chart_json["url"],
service=EntityReference(id=self.service.id, type="dashboardService"),
custom_props=custom_properties,
)
yield chart

View File

@ -12,8 +12,7 @@
Tableau source module
"""
import traceback
import uuid
from typing import Iterable, List, Optional
from typing import Iterable, List, Optional, Union
import dateutil.parser as dateparser
from tableau_api_lib.utils.querying import (
@ -22,7 +21,14 @@ from tableau_api_lib.utils.querying import (
get_workbooks_dataframe,
)
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.api.tags.createTag import CreateTagRequest
from metadata.generated.schema.api.tags.createTagCategory import (
CreateTagCategoryRequest,
)
from metadata.generated.schema.api.teams.createUser import CreateUserRequest
from metadata.generated.schema.entity.data.dashboard import (
Dashboard as LineageDashboard,
)
@ -33,20 +39,24 @@ from metadata.generated.schema.entity.services.connections.dashboard.tableauConn
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.tags.tagCategory import Tag
from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.tagLabel import TagLabel
from metadata.ingestion.api.source import InvalidSourceException, SourceStatus
from metadata.ingestion.models.table_metadata import Chart, Dashboard, DashboardOwner
from metadata.ingestion.models.ometa_tag_category import OMetaTagAndCategory
from metadata.ingestion.source.dashboard.dashboard_source import DashboardSourceService
from metadata.utils import fqn
from metadata.utils.filters import filter_by_chart
from metadata.utils.fqn import FQN_SEPARATOR
from metadata.utils.helpers import get_chart_entities_from_id, get_standard_chart_type
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
TABLEAU_TAG_CATEGORY = "TableauTags"
class TableauSource(DashboardSourceService):
@ -86,26 +96,6 @@ class TableauSource(DashboardSourceService):
)
return cls(config, metadata_config)
@staticmethod
def get_owner(owner) -> List[DashboardOwner]:
"""Get dashboard owner
Args:
owner:
Returns:
List[DashboardOwner]
"""
parts = owner["fullName"].split(" ")
first_name = " ".join(parts[: len(owner) // 2])
last_name = " ".join(parts[len(owner) // 2 :])
return [
DashboardOwner(
first_name=first_name,
last_name=last_name,
username=owner["name"],
)
]
def get_dashboards_list(self) -> Optional[List[dict]]:
"""
Get List of all dashboards
@ -128,26 +118,84 @@ class TableauSource(DashboardSourceService):
"""
return dashboard
def get_dashboard_entity(self, dashboard_details: dict) -> Dashboard:
def get_dashboard_owner(self, owner: dict) -> Optional[EntityReference]:
"""Get dashboard owner
Args:
owner:
Returns:
Optional[EntityReference]
"""
try:
user_request = CreateUserRequest(
name=owner["name"], displayName=owner["fullName"], email=owner["email"]
)
created_user: User = self.metadata.create_or_update(user_request)
return EntityReference(
id=created_user.id.__root__,
type="user",
)
except Exception as err:
logger.error(err)
def create_tags(self, entity_tags: dict) -> OMetaTagAndCategory:
"""
Fetch Dashboard Tags
"""
if entity_tags.get("tag"):
for tag in entity_tags["tag"]:
tag_category = OMetaTagAndCategory(
category_name=CreateTagCategoryRequest(
name=TABLEAU_TAG_CATEGORY,
description="Tags associates with amundsen entities",
categoryType="Descriptive",
),
category_details=CreateTagRequest(
name=tag["label"], description="Amundsen Table Tag"
),
)
yield tag_category
logger.info(f"Tag Category {tag_category}, Primary Tag {tag} Ingested")
def get_tag_lables(self, tags: dict) -> Optional[List[TagLabel]]:
if tags.get("tag"):
return [
TagLabel(
tagFQN=fqn.build(
self.metadata,
Tag,
tag_category_name=TABLEAU_TAG_CATEGORY,
tag_name=tag["label"],
),
labelType="Automated",
state="Suggested",
source="Tag",
)
for tag in tags["tag"]
]
return []
def get_dashboard_entity(
self, dashboard_details: dict
) -> Union[CreateDashboardRequest, Optional[OMetaTagAndCategory]]:
"""
Method to Get Dashboard Entity
"""
yield from self.fetch_dashboard_charts(dashboard_details)
dashboard_tag = dashboard_details.get("tags")
tag_labels = []
if hasattr(dashboard_tag, "tag"):
tag_labels = [tag["label"] for tag in dashboard_tag["tag"]]
yield Dashboard(
id=uuid.uuid4(),
yield from self.create_tags(dashboard_tag)
yield CreateDashboardRequest(
name=dashboard_details.get("name"),
displayName=dashboard_details.get("name"),
description="",
owner=self.get_owner(self.owner),
charts=self.charts,
tags=tag_labels,
url=dashboard_details.get("webpageUrl"),
owner=self.get_dashboard_owner(self.owner),
charts=get_chart_entities_from_id(
chart_ids=self.charts,
metadata=self.metadata,
service_name=self.config.serviceName,
),
tags=self.get_tag_lables(dashboard_tag),
dashboardUrl=dashboard_details.get("webpageUrl"),
service=EntityReference(id=self.service.id, type="dashboardService"),
last_modified=dateparser.parse(self.chart["updatedAt"]).timestamp() * 1000,
)
def get_lineage(self, dashboard_details: dict) -> Optional[AddLineageRequest]:
@ -207,7 +255,7 @@ class TableauSource(DashboardSourceService):
def fetch_dashboard_charts(
self, dashboard_details: dict
) -> Optional[Iterable[Chart]]:
) -> Optional[Iterable[CreateChartRequest]]:
"""
Method to fetch charts linked to dashboard
"""
@ -233,24 +281,19 @@ class TableauSource(DashboardSourceService):
f"views/{self.all_dashboard_details['workbook'][index]['name']}/"
f"{self.all_dashboard_details['viewUrlName'][index]}"
)
chart_last_modified = self.all_dashboard_details["updatedAt"][index]
tag_labels = []
if hasattr(chart_tags, "tag"):
for tag in chart_tags["tag"]:
tag_labels.append(tag["label"])
yield Chart(
id=uuid.uuid4(),
yield from self.create_tags(chart_tags)
yield CreateChartRequest(
name=chart_id,
displayName=chart_name,
description="",
chart_type=self.all_dashboard_details["sheetType"][index],
url=chart_url,
owners=self.get_owner(
chartType=get_standard_chart_type(
self.all_dashboard_details["sheetType"][index]
),
chartUrl=chart_url,
owner=self.get_dashboard_owner(
self.all_dashboard_details["owner"][index]
),
datasource_fqn=chart_url.replace("/", FQN_SEPARATOR),
last_modified=dateparser.parse(chart_last_modified).timestamp()
* 1000,
tags=self.get_tag_lables(chart_tags),
service=EntityReference(
id=self.service.id, type="dashboardService"
),

View File

@ -13,9 +13,9 @@ Generic source to build SQL connectors.
"""
import traceback
from dataclasses import dataclass, field
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable, List, Optional, Tuple
from typing import Iterable, Optional, Tuple
from sqlalchemy.engine import Connection
from sqlalchemy.engine.base import Engine
@ -56,11 +56,6 @@ class SQLSourceStatus(SourceStatus):
Reports the source status after ingestion
"""
success: List[str] = field(default_factory=list)
failures: List[str] = field(default_factory=list)
warnings: List[str] = field(default_factory=list)
filtered: List[str] = field(default_factory=list)
def scanned(self, record: str) -> None:
self.success.append(record)
logger.info(f"Table Scanned: {record}")

View File

@ -21,6 +21,8 @@ from typing import Any, Dict, Iterable, List, Union
from pydantic import ValidationError
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.api.data.createMlModel import CreateMlModelRequest
from metadata.generated.schema.api.data.createTopic import CreateTopicRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
@ -29,6 +31,7 @@ from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest
from metadata.generated.schema.api.teams.createUser import CreateUserRequest
from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest
from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest
from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.location import Location, LocationType
@ -65,11 +68,14 @@ from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.models.table_metadata import Chart, Dashboard
from metadata.ingestion.models.table_tests import OMetaTableTest
from metadata.ingestion.models.user import OMetaUserProfile
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.helpers import get_storage_service_or_create
from metadata.utils.helpers import (
get_chart_entities_from_id,
get_standard_chart_type,
get_storage_service_or_create,
)
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@ -479,15 +485,15 @@ class SampleDataSource(Source[Entity]):
self.status.scanned("topic", create_topic.name.__root__)
yield create_topic
def ingest_charts(self) -> Iterable[Chart]:
def ingest_charts(self) -> Iterable[CreateChartRequest]:
for chart in self.charts["charts"]:
try:
chart_ev = Chart(
chart_ev = CreateChartRequest(
name=chart["name"],
displayName=chart["displayName"],
description=chart["description"],
chart_type=chart["chartType"],
url=chart["chartUrl"],
chartType=get_standard_chart_type(chart["chartType"]).value,
chartUrl=chart["chartUrl"],
service=EntityReference(
id=self.dashboard_service.id, type="dashboardService"
),
@ -497,22 +503,26 @@ class SampleDataSource(Source[Entity]):
except ValidationError as err:
logger.error(err)
def ingest_dashboards(self) -> Iterable[Dashboard]:
def ingest_dashboards(self) -> Iterable[CreateDashboardRequest]:
for dashboard in self.dashboards["dashboards"]:
dashboard_ev = Dashboard(
dashboard_ev = CreateDashboardRequest(
name=dashboard["name"],
displayName=dashboard["displayName"],
description=dashboard["description"],
url=dashboard["dashboardUrl"],
charts=dashboard["charts"],
dashboardUrl=dashboard["dashboardUrl"],
charts=get_chart_entities_from_id(
dashboard["charts"],
self.metadata,
self.dashboard_service.name.__root__,
),
service=EntityReference(
id=self.dashboard_service.id, type="dashboardService"
),
)
self.status.scanned("dashboard", dashboard_ev.name)
self.status.scanned("dashboard", dashboard_ev.name.__root__)
yield dashboard_ev
def ingest_pipelines(self) -> Iterable[Dashboard]:
def ingest_pipelines(self) -> Iterable[Pipeline]:
for pipeline in self.pipelines["pipelines"]:
pipeline_ev = Pipeline(
id=uuid.uuid4(),

View File

@ -17,6 +17,8 @@ from typing import Iterable, List, Optional
from pydantic import SecretStr
from metadata.config.common import ConfigModel
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.api.tags.createTag import CreateTagRequest
from metadata.generated.schema.api.tags.createTagCategory import (
CreateTagCategoryRequest,
@ -49,13 +51,16 @@ from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.models.ometa_tag_category import OMetaTagAndCategory
from metadata.ingestion.models.table_metadata import Chart, Dashboard
from metadata.ingestion.models.user import OMetaUserProfile
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
from metadata.utils.column_type_parser import ColumnTypeParser
from metadata.utils.helpers import get_dashboard_service_or_create
from metadata.utils.helpers import (
get_chart_entities_from_id,
get_dashboard_service_or_create,
get_standard_chart_type,
)
from metadata.utils.logger import ingestion_logger
from metadata.utils.neo4j_helper import Neo4JConfig, Neo4jHelper
from metadata.utils.sql_queries import (
@ -307,12 +312,16 @@ class AmundsenSource(Source[Entity]):
self.metadata_config,
)
self.status.scanned(dashboard["name"])
yield Dashboard(
yield CreateDashboardRequest(
name=dashboard["name"],
displayName=dashboard["name"],
description="",
url=dashboard["url"],
charts=dashboard["chart_ids"],
dashboardUrl=dashboard["url"],
charts=get_chart_entities_from_id(
chart_ids=dashboard["chart_ids"],
metadata=self.metadata,
service_name=service_entity.name.__root__,
),
service=EntityReference(id=service_entity.id, type="dashboardService"),
)
except Exception as e:
@ -329,19 +338,18 @@ class AmundsenSource(Source[Entity]):
{"username": "test", "hostPort": "http://localhost:8088"},
self.metadata_config,
)
for (name, chart_id, chart_type, url) in zip(
dashboard["chart_names"],
dashboard["chart_ids"],
dashboard["chart_types"],
dashboard["chart_urls"],
):
chart = Chart(
chart = CreateChartRequest(
name=chart_id,
displayName=name,
description="",
chart_url=url,
chart_type=chart_type,
chartUrl=url,
chartType=get_standard_chart_type(chart_type).value,
service=EntityReference(id=service_entity.id, type="dashboardService"),
)
self.status.scanned(name)

View File

@ -16,13 +16,13 @@ from typing import Dict, List, Optional, TypeVar
from pydantic import BaseModel
from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.glossary import Glossary
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.data.topic import Topic
from metadata.generated.schema.entity.teams.team import Team
from metadata.generated.schema.entity.teams.user import User
from metadata.ingestion.models.table_metadata import Dashboard
from metadata.utils.logger import utils_logger
logger = utils_logger()

View File

@ -25,6 +25,7 @@ from pydantic import BaseModel
from metadata.antlr.split_listener import SplitListener
from metadata.generated.antlr.FqnLexer import FqnLexer
from metadata.generated.antlr.FqnParser import FqnParser
from metadata.generated.schema.entity.data.chart import Chart
from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
@ -207,6 +208,20 @@ def _(
return _build(service_name, dashboard_name)
@fqn_build_registry.add(Chart)
def _(
_: OpenMetadata, # ES Index not necessary for dashboard FQN building
*,
service_name: str,
chart_name: str,
) -> str:
if not service_name or not chart_name:
raise FQNBuildingException(
f"Args should be informed, but got service=`{service_name}`, chart=`{chart_name}``"
)
return _build(service_name, chart_name)
@fqn_build_registry.add(Tag)
def _(
_: OpenMetadata, # ES Index not necessary for Tag FQN building

View File

@ -10,7 +10,7 @@
# limitations under the License.
import re
from datetime import datetime, timedelta
from typing import Any, Dict, Iterable, Optional
from typing import Any, Dict, Iterable, List, Optional
from metadata.generated.schema.api.services.createDashboardService import (
CreateDashboardServiceRequest,
@ -24,6 +24,7 @@ from metadata.generated.schema.api.services.createMessagingService import (
from metadata.generated.schema.api.services.createStorageService import (
CreateStorageServiceRequest,
)
from metadata.generated.schema.entity.data.chart import Chart, ChartType
from metadata.generated.schema.entity.services.dashboardService import DashboardService
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.messagingService import MessagingService
@ -31,11 +32,35 @@ from metadata.generated.schema.entity.services.storageService import StorageServ
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityReference import (
EntityReference,
EntityReferenceList,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
from metadata.utils.logger import utils_logger
logger = utils_logger()
om_chart_type_dict = {
"line": ChartType.Line,
"big_number": ChartType.Line,
"big_number_total": ChartType.Line,
"dual_line": ChartType.Line,
"line_multi": ChartType.Line,
"table": ChartType.Table,
"dist_bar": ChartType.Bar,
"bar": ChartType.Bar,
"box_plot": ChartType.BoxPlot,
"boxplot": ChartType.BoxPlot,
"histogram": ChartType.Histogram,
"treemap": ChartType.Area,
"area": ChartType.Area,
"pie": ChartType.Pie,
"text": ChartType.Text,
"scatter": ChartType.Scatter,
}
def get_start_and_end(duration):
today = datetime.utcnow()
@ -204,3 +229,29 @@ def replace_special_with(raw: str, replacement: str) -> str:
:return: clean string
"""
return re.sub(r"[^a-zA-Z0-9]", replacement, raw)
def get_standard_chart_type(raw_chart_type: str) -> str:
"""
Get standard chart type supported by OpenMetadata based on raw chart type input
:param raw_chart_type: raw chart type to be standardize
:return: standard chart type
"""
return om_chart_type_dict.get(raw_chart_type.lower(), ChartType.Other)
def get_chart_entities_from_id(
chart_ids: List[str], metadata: OpenMetadata, service_name: str
) -> List[EntityReferenceList]:
entities = []
for id in chart_ids:
chart: Chart = metadata.get_by_name(
entity=Chart,
fqn=fqn.build(
metadata, Chart, chart_name=str(id), service_name=service_name
),
)
if chart:
entity = EntityReference(id=chart.id, type="chart")
entities.append(entity)
return entities