diff --git a/ingestion/src/metadata/ingestion/api/source.py b/ingestion/src/metadata/ingestion/api/source.py index 9da8964dd4b..46b8eee83a7 100644 --- a/ingestion/src/metadata/ingestion/api/source.py +++ b/ingestion/src/metadata/ingestion/api/source.py @@ -32,8 +32,10 @@ class InvalidSourceException(Exception): class SourceStatus(Status): records = 0 - warnings: List[str] = field(default_factory=list) + success: List[str] = field(default_factory=list) failures: List[str] = field(default_factory=list) + warnings: List[str] = field(default_factory=list) + filtered: List[str] = field(default_factory=list) def scanned(self, record: Any) -> None: self.records += 1 @@ -44,6 +46,9 @@ class SourceStatus(Status): def failure(self, key: str, reason: str) -> None: self.failures.append({key: reason}) + def filter(self, key: str, reason: str) -> None: + self.filtered.append({key: reason}) + class Source(Closeable, Generic[Entity], metaclass=ABCMeta): @classmethod diff --git a/ingestion/src/metadata/ingestion/models/table_metadata.py b/ingestion/src/metadata/ingestion/models/table_metadata.py index f71a4b8a4d2..6fe52c29cc9 100644 --- a/ingestion/src/metadata/ingestion/models/table_metadata.py +++ b/ingestion/src/metadata/ingestion/models/table_metadata.py @@ -10,7 +10,7 @@ # limitations under the License. 
-from typing import Any, Dict, List, Optional +from typing import List, Optional from pydantic import BaseModel @@ -220,39 +220,3 @@ class GlossaryTermESDocument(BaseModel): suggest: List[dict] last_updated_timestamp: Optional[int] doc_as_upsert: bool = True - - -class DashboardOwner(BaseModel): - """Dashboard owner""" - - username: str - first_name: str - last_name: str - - -class Chart(BaseModel): - """Chart""" - - name: str - displayName: str - description: str - chart_type: str - url: str - owners: List[DashboardOwner] = None - lastModified: int = None - datasource_fqn: str = None - service: EntityReference - custom_props: Dict[Any, Any] = None - - -class Dashboard(BaseModel): - """Dashboard""" - - name: str - displayName: str - description: str - url: str - owners: List[DashboardOwner] = None - charts: List[str] - service: EntityReference - lastModified: int = None diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py index 629cadacde3..c0a73842f55 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py @@ -33,7 +33,6 @@ from metadata.generated.schema.api.policies.createPolicy import CreatePolicyRequ from metadata.generated.schema.api.teams.createRole import CreateRoleRequest from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest from metadata.generated.schema.api.teams.createUser import CreateUserRequest -from metadata.generated.schema.entity.data.chart import ChartType from metadata.generated.schema.entity.data.location import Location from metadata.generated.schema.entity.data.pipeline import Pipeline from metadata.generated.schema.entity.data.table import Table @@ -50,7 +49,7 @@ from metadata.ingestion.models.ometa_policy import OMetaPolicy from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable from metadata.ingestion.models.ometa_tag_category import OMetaTagAndCategory 
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus -from metadata.ingestion.models.table_metadata import Chart, Dashboard, DeleteTable +from metadata.ingestion.models.table_metadata import DeleteTable from metadata.ingestion.models.table_tests import OMetaTableTest from metadata.ingestion.models.user import OMetaUserProfile from metadata.ingestion.ometa.client import APIError @@ -66,20 +65,6 @@ logger = ingestion_logger() # Allow types from the generated pydantic models T = TypeVar("T", bound=BaseModel) -om_chart_type_dict = { - "line": ChartType.Line, - "table": ChartType.Table, - "dist_bar": ChartType.Bar, - "bar": ChartType.Bar, - "big_number": ChartType.Line, - "histogram": ChartType.Histogram, - "big_number_total": ChartType.Line, - "dual_line": ChartType.Line, - "line_multi": ChartType.Line, - "treemap": ChartType.Area, - "box_plot": ChartType.Bar, -} - class MetadataRestSinkConfig(ConfigModel): api_endpoint: str = None @@ -117,9 +102,9 @@ class MetadataRestSink(Sink[Entity]): self.write_tables(record) elif isinstance(record, Topic): self.write_topics(record) - elif isinstance(record, Chart): + elif isinstance(record, CreateChartRequest): self.write_charts(record) - elif isinstance(record, Dashboard): + elif isinstance(record, CreateDashboardRequest): self.write_dashboards(record) elif isinstance(record, Location): self.write_locations(record) @@ -297,27 +282,9 @@ class MetadataRestSink(Sink[Entity]): logger.error(err) self.status.failure(f"Topic: {topic.name}") - def write_charts(self, chart: Chart): + def write_charts(self, chart: CreateChartRequest): try: - om_chart_type = ChartType.Other - if ( - chart.chart_type is not None - and chart.chart_type in om_chart_type_dict.keys() - ): - om_chart_type = om_chart_type_dict[chart.chart_type] - - chart_request = CreateChartRequest( - name=chart.name, - displayName=chart.displayName, - description=chart.description, - chartType=om_chart_type, - chartUrl=chart.url, - service=chart.service, - 
) - created_chart = self.metadata.create_or_update(chart_request) - self.charts_dict[chart.name] = EntityReference( - id=created_chart.id, type="chart" - ) + created_chart = self.metadata.create_or_update(chart) logger.info(f"Successfully ingested chart {created_chart.displayName}") self.status.records_written(f"Chart: {created_chart.displayName}") except (APIError, ValidationError) as err: @@ -325,19 +292,9 @@ class MetadataRestSink(Sink[Entity]): logger.error(err) self.status.failure(f"Chart: {chart.displayName}") - def write_dashboards(self, dashboard: Dashboard): + def write_dashboards(self, dashboard: CreateDashboardRequest): try: - charts = self._get_chart_references(dashboard) - - dashboard_request = CreateDashboardRequest( - name=dashboard.name, - displayName=dashboard.displayName, - description=dashboard.description, - dashboardUrl=dashboard.url, - charts=charts, - service=dashboard.service, - ) - created_dashboard = self.metadata.create_or_update(dashboard_request) + created_dashboard = self.metadata.create_or_update(dashboard) logger.info( f"Successfully ingested dashboard {created_dashboard.displayName}" ) @@ -347,13 +304,6 @@ class MetadataRestSink(Sink[Entity]): logger.error(err) self.status.failure(f"Dashboard {dashboard.name}") - def _get_chart_references(self, dashboard: Dashboard) -> []: - chart_references = [] - for chart_id in dashboard.charts: - if chart_id in self.charts_dict.keys(): - chart_references.append(self.charts_dict[chart_id]) - return chart_references - def write_locations(self, location: Location): try: created_location = self._create_location(location) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_source.py b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_source.py index 5831af0e702..8c577e4e983 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_source.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_source.py @@ -1,6 +1,9 @@ from abc import ABC, 
abstractmethod +from dataclasses import dataclass from typing import Any, Iterable, List, Optional, Union +from metadata.generated.schema.api.data.createChart import CreateChartRequest +from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, @@ -14,9 +17,7 @@ from metadata.generated.schema.metadataIngestion.workflow import ( ) from metadata.ingestion.api.common import Entity from metadata.ingestion.api.source import Source, SourceStatus -from metadata.ingestion.models.table_metadata import Chart, Dashboard from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.ingestion.source.database.common_db_source import SQLSourceStatus from metadata.utils.connections import get_connection from metadata.utils.filters import filter_by_dashboard from metadata.utils.logger import ingestion_logger @@ -24,6 +25,21 @@ from metadata.utils.logger import ingestion_logger logger = ingestion_logger() +@dataclass +class DashboardSourceStatus(SourceStatus): + """ + Reports the source status after ingestion + """ + + def scanned(self, record: str) -> None: + self.success.append(record) + logger.info(f"Scanned: {record}") + + def filter(self, record: str, err: str) -> None: + self.filtered.append(record) + logger.warning(f"Filtered {record}: {err}") + + class DashboardSourceService(Source, ABC): @abstractmethod def get_dashboards_list(self) -> Optional[List[Any]]: @@ -44,7 +60,7 @@ class DashboardSourceService(Source, ABC): """ @abstractmethod - def get_dashboard_entity(self, dashboard_details: Any) -> Dashboard: + def get_dashboard_entity(self, dashboard_details: Any) -> CreateDashboardRequest: """ Method to Get Dashboard Entity """ @@ -56,7 +72,9 @@ class DashboardSourceService(Source, ABC): """ @abstractmethod - def 
fetch_dashboard_charts(self, dashboard: Any) -> Optional[Iterable[Chart]]: + def fetch_dashboard_charts( + self, dashboard: Any + ) -> Optional[Iterable[CreateChartRequest]]: """ Method to fetch charts linked to dashboard """ @@ -80,7 +98,7 @@ class DashboardSourceService(Source, ABC): self.service = self.metadata.get_service_or_create( entity=DashboardService, config=config ) - self.status = SQLSourceStatus() + self.status = DashboardSourceStatus() self.metadata_client = OpenMetadata(self.metadata_config) def next_record(self) -> Iterable[Entity]: @@ -88,7 +106,7 @@ class DashboardSourceService(Source, ABC): def process_dashboards( self, - ) -> Iterable[Union[Dashboard, Chart, AddLineageRequest]]: + ) -> Iterable[Union[CreateDashboardRequest, CreateChartRequest, AddLineageRequest]]: """Get dashboard method""" for dashboard in self.get_dashboards_list(): try: @@ -102,6 +120,7 @@ class DashboardSourceService(Source, ABC): "Dashboard Pattern not Allowed", ) continue + yield from self.fetch_dashboard_charts(dashboard_details) or [] yield from self.get_dashboard_entity(dashboard_details) if self.source_config.dbServiceName: yield from self.get_lineage(dashboard_details) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/looker.py b/ingestion/src/metadata/ingestion/source/dashboard/looker.py index 92bcfe1a0e1..2ba7f4e1e6a 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/looker.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/looker.py @@ -15,6 +15,8 @@ from typing import Any, Iterable, List, Optional import looker_sdk +from metadata.generated.schema.api.data.createChart import CreateChartRequest +from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.services.connections.dashboard.lookerConnection import ( LookerConnection, @@ -28,9 +30,9 @@ from 
metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.common import Entity from metadata.ingestion.api.source import InvalidSourceException -from metadata.ingestion.models.table_metadata import Chart, Dashboard from metadata.ingestion.source.dashboard.dashboard_source import DashboardSourceService -from metadata.utils.filters import filter_by_chart, filter_by_dashboard +from metadata.utils.filters import filter_by_chart +from metadata.utils.helpers import get_chart_entities_from_id, get_standard_chart_type from metadata.utils.logger import ingestion_logger logger = ingestion_logger() @@ -104,17 +106,20 @@ class LookerSource(DashboardSourceService): ] return self.client.dashboard(dashboard_id=dashboard.id, fields=",".join(fields)) - def get_dashboard_entity(self, dashboard_details) -> Dashboard: + def get_dashboard_entity(self, dashboard_details: Any) -> CreateDashboardRequest: """ Method to Get Dashboard Entity """ - yield from self.fetch_dashboard_charts(dashboard_details) - yield Dashboard( + yield CreateDashboardRequest( name=dashboard_details.id, displayName=dashboard_details.title, description=dashboard_details.description or "", - charts=self.chart_names, - url=f"/dashboards/{dashboard_details.id}", + charts=get_chart_entities_from_id( + chart_ids=self.chart_names, + metadata=self.metadata, + service_name=self.config.serviceName, + ), + dashboardUrl=f"/dashboards/{dashboard_details.id}", service=EntityReference(id=self.service.id, type="dashboardService"), ) @@ -125,7 +130,9 @@ class LookerSource(DashboardSourceService): logger.info("Lineage not implemented for Looker") return None - def fetch_dashboard_charts(self, dashboard_details) -> Optional[Iterable[Chart]]: + def fetch_dashboard_charts( + self, dashboard_details + ) -> Optional[Iterable[CreateChartRequest]]: """ Metod to fetch charts linked to dashboard """ @@ -137,14 +144,14 @@ class 
LookerSource(DashboardSourceService): chart_filter_pattern=self.source_config.chartFilterPattern, chart_name=dashboard_elements.id, ): - self.status.failure(dashboard_elements.id, "Chart filtered out") + self.status.filter(dashboard_elements.id, "Chart filtered out") continue - om_dashboard_elements = Chart( + om_dashboard_elements = CreateChartRequest( name=dashboard_elements.id, displayName=dashboard_elements.title or "", description="", - chart_type=dashboard_elements.type, - url=f"/dashboard_elements/{dashboard_elements.id}", + chartType=get_standard_chart_type(dashboard_elements.type).value, + chartUrl=f"/dashboard_elements/{dashboard_elements.id}", service=EntityReference( id=self.service.id, type="dashboardService" ), diff --git a/ingestion/src/metadata/ingestion/source/dashboard/metabase.py b/ingestion/src/metadata/ingestion/source/dashboard/metabase.py index f048ca68bf3..7a2bfa042c6 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/metabase.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/metabase.py @@ -11,11 +11,12 @@ """Metabase source module""" import traceback -import uuid from typing import Iterable, List, Optional import requests +from metadata.generated.schema.api.data.createChart import CreateChartRequest +from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.dashboard import ( Dashboard as LineageDashboard, @@ -33,14 +34,17 @@ from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.source import InvalidSourceException -from metadata.ingestion.models.table_metadata import Chart, Dashboard from metadata.ingestion.ometa.ometa_api import OpenMetadata from 
metadata.ingestion.source.dashboard.dashboard_source import DashboardSourceService from metadata.ingestion.source.database.common_db_source import SQLSourceStatus from metadata.utils import fqn from metadata.utils.connections import get_connection from metadata.utils.filters import filter_by_chart -from metadata.utils.helpers import replace_special_with +from metadata.utils.helpers import ( + get_chart_entities_from_id, + get_standard_chart_type, + replace_special_with, +) from metadata.utils.logger import ingestion_logger HEADERS = {"Content-Type": "application/json", "Accept": "*/*"} @@ -122,7 +126,7 @@ class MetabaseSource(DashboardSourceService): resp_dashboard = self.req_get(f"/api/dashboard/{dashboard['id']}") return resp_dashboard.json() - def get_dashboard_entity(self, dashboard_details: dict) -> Dashboard: + def get_dashboard_entity(self, dashboard_details: dict) -> CreateDashboardRequest: """ Method to Get Dashboard Entity """ @@ -130,27 +134,28 @@ class MetabaseSource(DashboardSourceService): f"/dashboard/{dashboard_details['id']}-" f"{replace_special_with(raw=dashboard_details['name'].lower(), replacement='-')}" ) - - yield from self.fetch_dashboard_charts(dashboard_details) - yield Dashboard( - id=uuid.uuid4(), + yield CreateDashboardRequest( name=dashboard_details["name"], - url=dashboard_url, + dashboardUrl=dashboard_url, displayName=dashboard_details["name"], - description=dashboard_details["description"] - if dashboard_details["description"] is not None - else "", - charts=self.charts, + description=dashboard_details.get("description", ""), + charts=get_chart_entities_from_id( + chart_ids=self.charts, + metadata=self.metadata, + service_name=self.config.serviceName, + ), service=EntityReference(id=self.service.id, type="dashboardService"), ) - def fetch_dashboard_charts(self, dashboard_details: dict) -> Iterable[Chart]: + def fetch_dashboard_charts( + self, dashboard_details: dict + ) -> Iterable[CreateChartRequest]: """Get chart method Args: 
dashboard_details: Returns: - Iterable[Chart] + Iterable[CreateChartRequest] """ charts = dashboard_details["ordered_cards"] for chart in charts: @@ -171,15 +176,14 @@ class MetabaseSource(DashboardSourceService): chart_details["name"], "Chart Pattern not allowed" ) continue - yield Chart( - id=uuid.uuid4(), + yield CreateChartRequest( name=chart_details["name"], displayName=chart_details["name"], - description=chart_details["description"] - if chart_details["description"] is not None - else "", - chart_type=str(chart_details["display"]), - url=chart_url, + description=chart_details.get("description", ""), + chartType=get_standard_chart_type( + str(chart_details["display"]) + ).value, + chartUrl=chart_url, service=EntityReference( id=self.service.id, type="dashboardService" ), @@ -256,31 +260,3 @@ class MetabaseSource(DashboardSourceService): return requests.get( self.service_connection.hostPort + path, headers=self.metabase_session ) - - def get_card_detail(self, card_list): - # TODO: Need to handle card lineage - metadata = OpenMetadata(self.metadata_config) - for card in card_list: - try: - card_details = card["card"] - if not card_details.get("id"): - continue - card_detail_resp = self.req_get(f"/api/card/{card_details['id']}") - if card_detail_resp.status_code == 200: - raw_query = ( - card_details.get("dataset_query", {}) - .get("native", {}) - .get("query", "") - ) - except Exception as e: - logger.error(repr(e)) - - def get_cards(self): - """Get cards method""" - resp_dashboards = self.req_get("/api/dashboard") - if resp_dashboards.status_code == 200: - for dashboard in resp_dashboards.json(): - resp_dashboard = self.req_get(f"/api/dashboard/{dashboard['id']}") - dashboard_details = resp_dashboard.json() - card_list = dashboard_details["ordered_cards"] - self.get_card_detail(card_list) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/powerbi.py b/ingestion/src/metadata/ingestion/source/dashboard/powerbi.py index 4b003f2eeda..c8ffad5e958 100644 
--- a/ingestion/src/metadata/ingestion/source/dashboard/powerbi.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/powerbi.py @@ -11,13 +11,13 @@ """PowerBI source module""" import traceback -import uuid from typing import Any, Iterable, List, Optional +from metadata.generated.schema.api.data.createChart import CreateChartRequest +from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest -from metadata.generated.schema.entity.data.dashboard import ( - Dashboard as LineageDashboard, -) +from metadata.generated.schema.entity.data.chart import ChartType +from metadata.generated.schema.entity.data.dashboard import Dashboard from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.services.connections.dashboard.powerBIConnection import ( PowerBIConnection, @@ -31,10 +31,10 @@ from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.source import InvalidSourceException -from metadata.ingestion.models.table_metadata import Chart, Dashboard from metadata.ingestion.source.dashboard.dashboard_source import DashboardSourceService from metadata.utils import fqn from metadata.utils.filters import filter_by_chart +from metadata.utils.helpers import get_chart_entities_from_id from metadata.utils.logger import ingestion_logger logger = ingestion_logger() @@ -94,18 +94,21 @@ class PowerbiSource(DashboardSourceService): """ return dashboard - def get_dashboard_entity(self, dashboard_details: dict) -> Dashboard: + def get_dashboard_entity(self, dashboard_details: dict) -> CreateDashboardRequest: """ Method to Get Dashboard Entity, Dashboard Charts & Lineage """ - yield from self.fetch_dashboard_charts(dashboard_details) - yield Dashboard( + 
yield CreateDashboardRequest( name=dashboard_details["id"], # PBI has no hostPort property. All URL details are present in the webUrl property. - url=dashboard_details["webUrl"], + dashboardUrl=dashboard_details["webUrl"], displayName=dashboard_details["displayName"], description="", - charts=self.charts, + charts=get_chart_entities_from_id( + chart_ids=self.charts, + metadata=self.metadata, + service_name=self.config.serviceName, + ), service=EntityReference(id=self.service.id, type="dashboardService"), ) @@ -138,12 +141,12 @@ class PowerbiSource(DashboardSourceService): ) to_fqn = fqn.build( self.metadata, - entity_type=LineageDashboard, + entity_type=Dashboard, service_name=self.config.serviceName, dashboard_name=dashboard_details["id"], ) to_entity = self.metadata.get_by_name( - entity=LineageDashboard, + entity=Dashboard, fqn=to_fqn, ) if from_entity and to_entity: @@ -162,7 +165,9 @@ class PowerbiSource(DashboardSourceService): logger.debug(traceback.format_exc()) logger.error(err) - def fetch_dashboard_charts(self, dashboard_details: dict) -> Iterable[Chart]: + def fetch_dashboard_charts( + self, dashboard_details: dict + ) -> Iterable[CreateChartRequest]: """Get chart method Args: dashboard_details: @@ -179,18 +184,15 @@ class PowerbiSource(DashboardSourceService): if filter_by_chart( self.source_config.chartFilterPattern, chart["title"] ): - self.status.filter( - chart["title"], "Filtered out using Chart filter pattern" - ) + self.status.filter(chart["title"], "Chart Pattern not Allowed") continue - yield Chart( - id=uuid.uuid4(), + yield CreateChartRequest( name=chart["id"], displayName=chart["title"], description="", - chart_type="", # Fix this with https://github.com/open-metadata/OpenMetadata/issues/1673 + chartType=ChartType.Other.value, # PBI has no hostPort property. All URL details are present in the webUrl property. 
- url=chart["embedUrl"], + chartUrl=chart["embedUrl"], service=EntityReference( id=self.service.id, type="dashboardService" ), diff --git a/ingestion/src/metadata/ingestion/source/dashboard/redash.py b/ingestion/src/metadata/ingestion/source/dashboard/redash.py index 114957e2d5b..96b2886ebec 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/redash.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/redash.py @@ -9,14 +9,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -import uuid from dataclasses import dataclass, field from typing import Dict, Iterable, List, Optional from sql_metadata import Parser +from metadata.generated.schema.api.data.createChart import CreateChartRequest +from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest -from metadata.generated.schema.entity.data.chart import Chart from metadata.generated.schema.entity.data.dashboard import ( Dashboard as Lineage_Dashboard, ) @@ -33,10 +33,9 @@ from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.common import Entity from metadata.ingestion.api.source import InvalidSourceException, SourceStatus -from metadata.ingestion.models.table_metadata import Chart as ModelChart -from metadata.ingestion.models.table_metadata import Dashboard from metadata.ingestion.source.dashboard.dashboard_source import DashboardSourceService from metadata.utils import fqn +from metadata.utils.helpers import get_chart_entities_from_id, get_standard_chart_type from metadata.utils.logger import ingestion_logger from metadata.utils.sql_lineage import search_table_entities @@ -100,24 +99,25 @@ class RedashSource(DashboardSourceService): """ return self.client.get_dashboard(dashboard["slug"]) - def get_dashboard_entity(self, 
dashboard_details: dict) -> Dashboard: + def get_dashboard_entity(self, dashboard_details: dict) -> CreateDashboardRequest: """ Method to Get Dashboard Entity """ - yield from self.fetch_dashboard_charts(dashboard_details) self.status.item_scanned_status() dashboard_description = "" for widgets in dashboard_details.get("widgets", []): dashboard_description = widgets.get("text") - yield Dashboard( - id=uuid.uuid4(), + yield CreateDashboardRequest( name=dashboard_details.get("id"), displayName=dashboard_details["name"], description=dashboard_description if dashboard_details else "", - charts=self.dashboards_to_charts[dashboard_details.get("id")], - usageSummary=None, + charts=get_chart_entities_from_id( + chart_ids=self.dashboards_to_charts[dashboard_details.get("id")], + metadata=self.metadata, + service_name=self.config.serviceName, + ), service=EntityReference(id=self.service.id, type="dashboardService"), - url=f"/dashboard/{dashboard_details.get('slug', '')}", + dashboardUrl=f"/dashboard/{dashboard_details.get('slug', '')}", ) def get_lineage( @@ -137,7 +137,6 @@ class RedashSource(DashboardSourceService): table_list = Parser(visualization["query"]["query"]) for table in table_list.tables: dataabase_schema = None - print(table) if "." 
in table: dataabase_schema, table = fqn.split(table)[-2:] table_entities = search_table_entities( @@ -172,7 +171,7 @@ class RedashSource(DashboardSourceService): def fetch_dashboard_charts( self, dashboard_details: dict - ) -> Optional[Iterable[Chart]]: + ) -> Optional[Iterable[CreateChartRequest]]: """ Metod to fetch charts linked to dashboard """ @@ -180,14 +179,16 @@ class RedashSource(DashboardSourceService): for widgets in dashboard_details.get("widgets", []): visualization = widgets.get("visualization") self.dashboards_to_charts[dashboard_details.get("id")].append(widgets["id"]) - yield ModelChart( + yield CreateChartRequest( name=widgets["id"], displayName=visualization["query"]["name"] if visualization and visualization["query"] else "", - chart_type=visualization["type"] if visualization else "", + chartType=get_standard_chart_type( + visualization["type"] if visualization else "" + ), service=EntityReference(id=self.service.id, type="dashboardService"), - url=f"/dashboard/{dashboard_details.get('slug', '')}", + chartUrl=f"/dashboard/{dashboard_details.get('slug', '')}", description=visualization["description"] if visualization else "", ) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/superset.py b/ingestion/src/metadata/ingestion/source/dashboard/superset.py index 68932678f35..c4c74be843e 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/superset.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/superset.py @@ -18,6 +18,8 @@ from typing import List, Optional import dateutil.parser as dateparser +from metadata.generated.schema.api.data.createChart import CreateChartRequest +from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.dashboard import ( Dashboard as Lineage_Dashboard, @@ -38,9 +40,9 @@ from metadata.generated.schema.metadataIngestion.workflow import ( from 
metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.source import InvalidSourceException, SourceStatus -from metadata.ingestion.models.table_metadata import Chart, Dashboard, DashboardOwner from metadata.ingestion.source.dashboard.dashboard_source import DashboardSourceService from metadata.utils import fqn +from metadata.utils.helpers import get_chart_entities_from_id, get_standard_chart_type from metadata.utils.logger import ingestion_logger logger = ingestion_logger() @@ -84,26 +86,6 @@ def get_filter_name(filter_obj): return f"{clause} {column} {operator} {comparator}" -def get_owners(owners_obj): - """ - Get owner - - Args: - owners_obj: - Returns: - list - """ - owners = [] - for owner in owners_obj: - dashboard_owner = DashboardOwner( - first_name=owner["first_name"], - last_name=owner["last_name"], - username=owner["username"], - ) - owners.append(dashboard_owner) - return owners - - class SupersetSource(DashboardSourceService): """ Superset source class @@ -186,24 +168,21 @@ class SupersetSource(DashboardSourceService): """ return dashboard - def get_dashboard_entity(self, dashboard_details: dict) -> Dashboard: + def get_dashboard_entity(self, dashboard_details: dict) -> CreateDashboardRequest: """ Method to Get Dashboard Entity """ - yield from self.fetch_dashboard_charts(dashboard_details) - last_modified = ( - dateparser.parse(dashboard_details.get("changed_on_utc", "now")).timestamp() - * 1000 - ) - yield Dashboard( + yield CreateDashboardRequest( name=dashboard_details["id"], displayName=dashboard_details["dashboard_title"], description="", - url=dashboard_details["url"], - owners=get_owners(dashboard_details["owners"]), - charts=self.charts, + dashboardUrl=dashboard_details["url"], + charts=get_chart_entities_from_id( + chart_ids=self.charts, + metadata=self.metadata, + service_name=self.config.serviceName, + ), 
service=EntityReference(id=self.service.id, type="dashboardService"), - lastModified=last_modified, ) def _get_charts_of_dashboard(self, dashboard_details: dict) -> List[str]: @@ -296,41 +275,19 @@ class SupersetSource(DashboardSourceService): return None # pylint: disable=too-many-locals - def _build_chart(self, chart_json: dict) -> Chart: + def _build_chart(self, chart_json: dict) -> CreateChartRequest: chart_id = chart_json["id"] - last_modified = ( - dateparser.parse(chart_json.get("changed_on_utc", "now")).timestamp() * 1000 - ) params = json.loads(chart_json["params"]) - metrics = [ - get_metric_name(metric) - for metric in (params.get("metrics", []) or [params.get("metric")]) - ] - filters = [ - get_filter_name(filter_obj) - for filter_obj in params.get("adhoc_filters", []) - ] group_bys = params.get("groupby", []) or [] if isinstance(group_bys, str): group_bys = [group_bys] - custom_properties = { - "Metrics": ", ".join(metrics), - "Filters": ", ".join(filters), - "Dimensions": ", ".join(group_bys), - } - chart = Chart( + chart = CreateChartRequest( name=chart_id, displayName=chart_json["slice_name"], description="", - chart_type=chart_json["viz_type"], - url=chart_json["url"], - owners=get_owners(chart_json["owners"]), - datasource_fqn=self._get_datasource_fqn(chart_json["datasource_id"]) - if chart_json["datasource_id"] - else None, - lastModified=last_modified, + chartType=get_standard_chart_type(chart_json["viz_type"]), + chartUrl=chart_json["url"], service=EntityReference(id=self.service.id, type="dashboardService"), - custom_props=custom_properties, ) yield chart diff --git a/ingestion/src/metadata/ingestion/source/dashboard/tableau.py b/ingestion/src/metadata/ingestion/source/dashboard/tableau.py index 8c431efa337..036727351a4 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/tableau.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/tableau.py @@ -12,8 +12,7 @@ Tableau source module """ import traceback -import uuid -from typing 
import Iterable, List, Optional +from typing import Iterable, List, Optional, Union import dateutil.parser as dateparser from tableau_api_lib.utils.querying import ( @@ -22,7 +21,14 @@ from tableau_api_lib.utils.querying import ( get_workbooks_dataframe, ) +from metadata.generated.schema.api.data.createChart import CreateChartRequest +from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest +from metadata.generated.schema.api.tags.createTag import CreateTagRequest +from metadata.generated.schema.api.tags.createTagCategory import ( + CreateTagCategoryRequest, +) +from metadata.generated.schema.api.teams.createUser import CreateUserRequest from metadata.generated.schema.entity.data.dashboard import ( Dashboard as LineageDashboard, ) @@ -33,20 +39,24 @@ from metadata.generated.schema.entity.services.connections.dashboard.tableauConn from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) +from metadata.generated.schema.entity.tags.tagCategory import Tag +from metadata.generated.schema.entity.teams.user import User from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityReference import EntityReference +from metadata.generated.schema.type.tagLabel import TagLabel from metadata.ingestion.api.source import InvalidSourceException, SourceStatus -from metadata.ingestion.models.table_metadata import Chart, Dashboard, DashboardOwner +from metadata.ingestion.models.ometa_tag_category import OMetaTagAndCategory from metadata.ingestion.source.dashboard.dashboard_source import DashboardSourceService from metadata.utils import fqn from metadata.utils.filters import filter_by_chart -from metadata.utils.fqn import FQN_SEPARATOR +from metadata.utils.helpers 
import get_chart_entities_from_id, get_standard_chart_type from metadata.utils.logger import ingestion_logger logger = ingestion_logger() +TABLEAU_TAG_CATEGORY = "TableauTags" class TableauSource(DashboardSourceService): @@ -86,26 +96,6 @@ class TableauSource(DashboardSourceService): ) return cls(config, metadata_config) - @staticmethod - def get_owner(owner) -> List[DashboardOwner]: - """Get dashboard owner - - Args: - owner: - Returns: - List[DashboardOwner] - """ - parts = owner["fullName"].split(" ") - first_name = " ".join(parts[: len(owner) // 2]) - last_name = " ".join(parts[len(owner) // 2 :]) - return [ - DashboardOwner( - first_name=first_name, - last_name=last_name, - username=owner["name"], - ) - ] - def get_dashboards_list(self) -> Optional[List[dict]]: """ Get List of all dashboards @@ -128,26 +118,84 @@ class TableauSource(DashboardSourceService): """ return dashboard - def get_dashboard_entity(self, dashboard_details: dict) -> Dashboard: + def get_dashboard_owner(self, owner: dict) -> Optional[EntityReference]: + """Get dashboard owner + + Args: + owner: + Returns: + Optional[EntityReference] + """ + try: + user_request = CreateUserRequest( + name=owner["name"], displayName=owner["fullName"], email=owner["email"] + ) + created_user: User = self.metadata.create_or_update(user_request) + return EntityReference( + id=created_user.id.__root__, + type="user", + ) + except Exception as err: + logger.error(err) + + def create_tags(self, entity_tags: dict) -> OMetaTagAndCategory: + """ + Fetch Dashboard Tags + """ + if entity_tags.get("tag"): + for tag in entity_tags["tag"]: + tag_category = OMetaTagAndCategory( + category_name=CreateTagCategoryRequest( + name=TABLEAU_TAG_CATEGORY, + description="Tags associates with amundsen entities", + categoryType="Descriptive", + ), + category_details=CreateTagRequest( + name=tag["label"], description="Amundsen Table Tag" + ), + ) + yield tag_category + logger.info(f"Tag Category {tag_category}, Primary Tag {tag} 
Ingested") + + def get_tag_lables(self, tags: dict) -> Optional[List[TagLabel]]: + if tags.get("tag"): + return [ + TagLabel( + tagFQN=fqn.build( + self.metadata, + Tag, + tag_category_name=TABLEAU_TAG_CATEGORY, + tag_name=tag["label"], + ), + labelType="Automated", + state="Suggested", + source="Tag", + ) + for tag in tags["tag"] + ] + return [] + + def get_dashboard_entity( + self, dashboard_details: dict + ) -> Union[CreateDashboardRequest, Optional[OMetaTagAndCategory]]: """ Method to Get Dashboard Entity """ - yield from self.fetch_dashboard_charts(dashboard_details) dashboard_tag = dashboard_details.get("tags") - tag_labels = [] - if hasattr(dashboard_tag, "tag"): - tag_labels = [tag["label"] for tag in dashboard_tag["tag"]] - yield Dashboard( - id=uuid.uuid4(), + yield from self.create_tags(dashboard_tag) + yield CreateDashboardRequest( name=dashboard_details.get("name"), displayName=dashboard_details.get("name"), description="", - owner=self.get_owner(self.owner), - charts=self.charts, - tags=tag_labels, - url=dashboard_details.get("webpageUrl"), + owner=self.get_dashboard_owner(self.owner), + charts=get_chart_entities_from_id( + chart_ids=self.charts, + metadata=self.metadata, + service_name=self.config.serviceName, + ), + tags=self.get_tag_lables(dashboard_tag), + dashboardUrl=dashboard_details.get("webpageUrl"), service=EntityReference(id=self.service.id, type="dashboardService"), - last_modified=dateparser.parse(self.chart["updatedAt"]).timestamp() * 1000, ) def get_lineage(self, dashboard_details: dict) -> Optional[AddLineageRequest]: @@ -207,7 +255,7 @@ class TableauSource(DashboardSourceService): def fetch_dashboard_charts( self, dashboard_details: dict - ) -> Optional[Iterable[Chart]]: + ) -> Optional[Iterable[CreateChartRequest]]: """ Method to fetch charts linked to dashboard """ @@ -233,24 +281,19 @@ class TableauSource(DashboardSourceService): f"views/{self.all_dashboard_details['workbook'][index]['name']}/" 
f"{self.all_dashboard_details['viewUrlName'][index]}" ) - chart_last_modified = self.all_dashboard_details["updatedAt"][index] - tag_labels = [] - if hasattr(chart_tags, "tag"): - for tag in chart_tags["tag"]: - tag_labels.append(tag["label"]) - yield Chart( - id=uuid.uuid4(), + yield from self.create_tags(chart_tags) + yield CreateChartRequest( name=chart_id, displayName=chart_name, description="", - chart_type=self.all_dashboard_details["sheetType"][index], - url=chart_url, - owners=self.get_owner( + chartType=get_standard_chart_type( + self.all_dashboard_details["sheetType"][index] + ), + chartUrl=chart_url, + owner=self.get_dashboard_owner( self.all_dashboard_details["owner"][index] ), - datasource_fqn=chart_url.replace("/", FQN_SEPARATOR), - last_modified=dateparser.parse(chart_last_modified).timestamp() - * 1000, + tags=self.get_tag_lables(chart_tags), service=EntityReference( id=self.service.id, type="dashboardService" ), diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py index abce8d8e4b3..26faa85363c 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py @@ -13,9 +13,9 @@ Generic source to build SQL connectors. 
""" import traceback -from dataclasses import dataclass, field +from dataclasses import dataclass from datetime import datetime -from typing import Iterable, List, Optional, Tuple +from typing import Iterable, Optional, Tuple from sqlalchemy.engine import Connection from sqlalchemy.engine.base import Engine @@ -56,11 +56,6 @@ class SQLSourceStatus(SourceStatus): Reports the source status after ingestion """ - success: List[str] = field(default_factory=list) - failures: List[str] = field(default_factory=list) - warnings: List[str] = field(default_factory=list) - filtered: List[str] = field(default_factory=list) - def scanned(self, record: str) -> None: self.success.append(record) logger.info(f"Table Scanned: {record}") diff --git a/ingestion/src/metadata/ingestion/source/database/sample_data.py b/ingestion/src/metadata/ingestion/source/database/sample_data.py index e1a21bf3485..6778888926f 100644 --- a/ingestion/src/metadata/ingestion/source/database/sample_data.py +++ b/ingestion/src/metadata/ingestion/source/database/sample_data.py @@ -21,6 +21,8 @@ from typing import Any, Dict, Iterable, List, Union from pydantic import ValidationError +from metadata.generated.schema.api.data.createChart import CreateChartRequest +from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest from metadata.generated.schema.api.data.createMlModel import CreateMlModelRequest from metadata.generated.schema.api.data.createTopic import CreateTopicRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest @@ -29,6 +31,7 @@ from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest from metadata.generated.schema.api.teams.createUser import CreateUserRequest from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest +from metadata.generated.schema.entity.data.dashboard import Dashboard from 
metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.location import Location, LocationType @@ -65,11 +68,14 @@ from metadata.ingestion.api.common import Entity from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus -from metadata.ingestion.models.table_metadata import Chart, Dashboard from metadata.ingestion.models.table_tests import OMetaTableTest from metadata.ingestion.models.user import OMetaUserProfile from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.utils.helpers import get_storage_service_or_create +from metadata.utils.helpers import ( + get_chart_entities_from_id, + get_standard_chart_type, + get_storage_service_or_create, +) from metadata.utils.logger import ingestion_logger logger = ingestion_logger() @@ -479,15 +485,15 @@ class SampleDataSource(Source[Entity]): self.status.scanned("topic", create_topic.name.__root__) yield create_topic - def ingest_charts(self) -> Iterable[Chart]: + def ingest_charts(self) -> Iterable[CreateChartRequest]: for chart in self.charts["charts"]: try: - chart_ev = Chart( + chart_ev = CreateChartRequest( name=chart["name"], displayName=chart["displayName"], description=chart["description"], - chart_type=chart["chartType"], - url=chart["chartUrl"], + chartType=get_standard_chart_type(chart["chartType"]).value, + chartUrl=chart["chartUrl"], service=EntityReference( id=self.dashboard_service.id, type="dashboardService" ), @@ -497,22 +503,26 @@ class SampleDataSource(Source[Entity]): except ValidationError as err: logger.error(err) - def ingest_dashboards(self) -> Iterable[Dashboard]: + def ingest_dashboards(self) -> Iterable[CreateDashboardRequest]: for dashboard in self.dashboards["dashboards"]: 
- dashboard_ev = Dashboard( + dashboard_ev = CreateDashboardRequest( name=dashboard["name"], displayName=dashboard["displayName"], description=dashboard["description"], - url=dashboard["dashboardUrl"], - charts=dashboard["charts"], + dashboardUrl=dashboard["dashboardUrl"], + charts=get_chart_entities_from_id( + dashboard["charts"], + self.metadata, + self.dashboard_service.name.__root__, + ), service=EntityReference( id=self.dashboard_service.id, type="dashboardService" ), ) - self.status.scanned("dashboard", dashboard_ev.name) + self.status.scanned("dashboard", dashboard_ev.name.__root__) yield dashboard_ev - def ingest_pipelines(self) -> Iterable[Dashboard]: + def ingest_pipelines(self) -> Iterable[Pipeline]: for pipeline in self.pipelines["pipelines"]: pipeline_ev = Pipeline( id=uuid.uuid4(), diff --git a/ingestion/src/metadata/ingestion/source/metadata/amundsen.py b/ingestion/src/metadata/ingestion/source/metadata/amundsen.py index efc6656bb59..1e33f7ef606 100644 --- a/ingestion/src/metadata/ingestion/source/metadata/amundsen.py +++ b/ingestion/src/metadata/ingestion/source/metadata/amundsen.py @@ -17,6 +17,8 @@ from typing import Iterable, List, Optional from pydantic import SecretStr from metadata.config.common import ConfigModel +from metadata.generated.schema.api.data.createChart import CreateChartRequest +from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest from metadata.generated.schema.api.tags.createTag import CreateTagRequest from metadata.generated.schema.api.tags.createTagCategory import ( CreateTagCategoryRequest, @@ -49,13 +51,16 @@ from metadata.ingestion.api.common import Entity from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable from metadata.ingestion.models.ometa_tag_category import OMetaTagAndCategory -from metadata.ingestion.models.table_metadata import Chart, Dashboard from 
metadata.ingestion.models.user import OMetaUserProfile from metadata.ingestion.ometa.client import APIError from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils import fqn from metadata.utils.column_type_parser import ColumnTypeParser -from metadata.utils.helpers import get_dashboard_service_or_create +from metadata.utils.helpers import ( + get_chart_entities_from_id, + get_dashboard_service_or_create, + get_standard_chart_type, +) from metadata.utils.logger import ingestion_logger from metadata.utils.neo4j_helper import Neo4JConfig, Neo4jHelper from metadata.utils.sql_queries import ( @@ -307,12 +312,16 @@ class AmundsenSource(Source[Entity]): self.metadata_config, ) self.status.scanned(dashboard["name"]) - yield Dashboard( + yield CreateDashboardRequest( name=dashboard["name"], displayName=dashboard["name"], description="", - url=dashboard["url"], - charts=dashboard["chart_ids"], + dashboardUrl=dashboard["url"], + charts=get_chart_entities_from_id( + chart_ids=dashboard["chart_ids"], + metadata=self.metadata, + service_name=service_entity.name.__root__, + ), service=EntityReference(id=service_entity.id, type="dashboardService"), ) except Exception as e: @@ -329,19 +338,18 @@ class AmundsenSource(Source[Entity]): {"username": "test", "hostPort": "http://localhost:8088"}, self.metadata_config, ) - for (name, chart_id, chart_type, url) in zip( dashboard["chart_names"], dashboard["chart_ids"], dashboard["chart_types"], dashboard["chart_urls"], ): - chart = Chart( + chart = CreateChartRequest( name=chart_id, displayName=name, description="", - chart_url=url, - chart_type=chart_type, + chartUrl=url, + chartType=get_standard_chart_type(chart_type).value, service=EntityReference(id=service_entity.id, type="dashboardService"), ) self.status.scanned(name) diff --git a/ingestion/src/metadata/utils/elasticsearch.py b/ingestion/src/metadata/utils/elasticsearch.py index 5c15d0ba545..69f21a3d51f 100644 --- 
a/ingestion/src/metadata/utils/elasticsearch.py +++ b/ingestion/src/metadata/utils/elasticsearch.py @@ -16,13 +16,13 @@ from typing import Dict, List, Optional, TypeVar from pydantic import BaseModel +from metadata.generated.schema.entity.data.dashboard import Dashboard from metadata.generated.schema.entity.data.glossary import Glossary from metadata.generated.schema.entity.data.pipeline import Pipeline from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.data.topic import Topic from metadata.generated.schema.entity.teams.team import Team from metadata.generated.schema.entity.teams.user import User -from metadata.ingestion.models.table_metadata import Dashboard from metadata.utils.logger import utils_logger logger = utils_logger() diff --git a/ingestion/src/metadata/utils/fqn.py b/ingestion/src/metadata/utils/fqn.py index 7b630d3789c..299fc7f57c5 100644 --- a/ingestion/src/metadata/utils/fqn.py +++ b/ingestion/src/metadata/utils/fqn.py @@ -25,6 +25,7 @@ from pydantic import BaseModel from metadata.antlr.split_listener import SplitListener from metadata.generated.antlr.FqnLexer import FqnLexer from metadata.generated.antlr.FqnParser import FqnParser +from metadata.generated.schema.entity.data.chart import Chart from metadata.generated.schema.entity.data.dashboard import Dashboard from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema @@ -207,6 +208,20 @@ def _( return _build(service_name, dashboard_name) +@fqn_build_registry.add(Chart) +def _( + _: OpenMetadata, # ES Index not necessary for chart FQN building + *, + service_name: str, + chart_name: str, +) -> str: + if not service_name or not chart_name: + raise FQNBuildingException( + f"Args should be informed, but got service=`{service_name}`, chart=`{chart_name}``" + ) + return _build(service_name, chart_name) + + @fqn_build_registry.add(Tag) def _( _: OpenMetadata, # ES
Index not necessary for Tag FQN building diff --git a/ingestion/src/metadata/utils/helpers.py b/ingestion/src/metadata/utils/helpers.py index ac36fcb7d16..9cf5a535931 100644 --- a/ingestion/src/metadata/utils/helpers.py +++ b/ingestion/src/metadata/utils/helpers.py @@ -10,7 +10,7 @@ # limitations under the License. import re from datetime import datetime, timedelta -from typing import Any, Dict, Iterable, Optional +from typing import Any, Dict, Iterable, List, Optional from metadata.generated.schema.api.services.createDashboardService import ( CreateDashboardServiceRequest, @@ -24,6 +24,7 @@ from metadata.generated.schema.api.services.createMessagingService import ( from metadata.generated.schema.api.services.createStorageService import ( CreateStorageServiceRequest, ) +from metadata.generated.schema.entity.data.chart import Chart, ChartType from metadata.generated.schema.entity.services.dashboardService import DashboardService from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.entity.services.messagingService import MessagingService @@ -31,11 +32,35 @@ from metadata.generated.schema.entity.services.storageService import StorageServ from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) +from metadata.generated.schema.type.entityReference import ( + EntityReference, + EntityReferenceList, +) from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.utils import fqn from metadata.utils.logger import utils_logger logger = utils_logger() +om_chart_type_dict = { + "line": ChartType.Line, + "big_number": ChartType.Line, + "big_number_total": ChartType.Line, + "dual_line": ChartType.Line, + "line_multi": ChartType.Line, + "table": ChartType.Table, + "dist_bar": ChartType.Bar, + "bar": ChartType.Bar, + "box_plot": ChartType.BoxPlot, + "boxplot": ChartType.BoxPlot, + "histogram": ChartType.Histogram, + "treemap": ChartType.Area, + "area": 
ChartType.Area, + "pie": ChartType.Pie, + "text": ChartType.Text, + "scatter": ChartType.Scatter, +} + def get_start_and_end(duration): today = datetime.utcnow() @@ -204,3 +229,29 @@ def replace_special_with(raw: str, replacement: str) -> str: :return: clean string """ return re.sub(r"[^a-zA-Z0-9]", replacement, raw) + + +def get_standard_chart_type(raw_chart_type: str) -> str: + """ + Get standard chart type supported by OpenMetadata based on raw chart type input + :param raw_chart_type: raw chart type to be standardized + :return: standard chart type + """ + return om_chart_type_dict.get(raw_chart_type.lower(), ChartType.Other) + + +def get_chart_entities_from_id( + chart_ids: List[str], metadata: OpenMetadata, service_name: str +) -> List[EntityReferenceList]: + entities = [] + for id in chart_ids: + chart: Chart = metadata.get_by_name( + entity=Chart, + fqn=fqn.build( + metadata, Chart, chart_name=str(id), service_name=service_name + ), + ) + if chart: + entity = EntityReference(id=chart.id, type="chart") + entities.append(entity) + return entities