diff --git a/catalog-rest-service/src/main/resources/json/schema/api/data/createChart.json b/catalog-rest-service/src/main/resources/json/schema/api/data/createChart.json index daf86c0f730..2ca1cf36106 100644 --- a/catalog-rest-service/src/main/resources/json/schema/api/data/createChart.json +++ b/catalog-rest-service/src/main/resources/json/schema/api/data/createChart.json @@ -23,9 +23,8 @@ "$ref": "../../entity/data/chart.json#/definitions/chartType" }, "chartUrl": { - "description": "Chart URL, pointing to its own Service URL", - "type": "string", - "format": "uri" + "description": "Chart URL suffix from its service.", + "type": "string" }, "tables": { "description": "Link to tables used in this chart.", diff --git a/catalog-rest-service/src/main/resources/json/schema/api/data/createDashboard.json b/catalog-rest-service/src/main/resources/json/schema/api/data/createDashboard.json index e27c4495db3..6622c9d7a57 100644 --- a/catalog-rest-service/src/main/resources/json/schema/api/data/createDashboard.json +++ b/catalog-rest-service/src/main/resources/json/schema/api/data/createDashboard.json @@ -20,9 +20,8 @@ "$ref": "../../type/basic.json#/definitions/markdown" }, "dashboardUrl": { - "description": "Dashboard URL", - "type": "string", - "format": "uri" + "description": "Dashboard URL suffix from its service.", + "type": "string" }, "charts": { "description": "All the charts included in this Dashboard.", diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/data/chart.json b/catalog-rest-service/src/main/resources/json/schema/entity/data/chart.json index f9bc4edd03f..7709080258e 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/data/chart.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/data/chart.json @@ -94,9 +94,8 @@ "$ref": "#/definitions/chartType" }, "chartUrl": { - "description": "Chart URL, pointing to its own Service URL.", - "type": "string", - "format": "uri" + "description": "Chart URL suffix from its service.", + "type": "string" }, "href": { "description": "Link to the resource corresponding to this entity.", diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/data/dashboard.json b/catalog-rest-service/src/main/resources/json/schema/entity/data/dashboard.json index de952ccc7d2..c4619b25270 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/data/dashboard.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/data/dashboard.json @@ -40,9 +40,8 @@ "type": "string" }, "dashboardUrl": { - "description": "Dashboard URL.", - "type": "string", - "format": "uri" + "description": "Dashboard URL suffix from its service.", + "type": "string" }, "charts": { "description": "All the charts included in this Dashboard.", diff --git a/ingestion/src/airflow_provider_openmetadata/lineage/utils.py b/ingestion/src/airflow_provider_openmetadata/lineage/utils.py index a56a809ad40..b968ec70e83 100644 --- a/ingestion/src/airflow_provider_openmetadata/lineage/utils.py +++ b/ingestion/src/airflow_provider_openmetadata/lineage/utils.py @@ -155,12 +155,9 @@ def create_or_update_pipeline( # pylint: disable=too-many-locals :param metadata: OpenMetadata API client :return: PipelineEntity """ - pipeline_service_url = conf.get("webserver", "base_url") - dag_url = f"{pipeline_service_url}/tree?dag_id={dag.dag_id}" - task_url = ( - f"{pipeline_service_url}/taskinstance/list/" - + f"?flt1_dag_id_equals={dag.dag_id}&_flt_3_task_id={operator.task_id}" - ) + dag_url = f"/tree?dag_id={dag.dag_id}" + task_url = f"/taskinstance/list/?flt1_dag_id_equals={dag.dag_id}&_flt_3_task_id={operator.task_id}" + dag_start_date = dag.start_date.isoformat() if dag.start_date else None task_start_date = ( task_instance.start_date.isoformat() if task_instance.start_date else None diff --git a/ingestion/src/metadata/ingestion/source/dashboard/looker.py b/ingestion/src/metadata/ingestion/source/dashboard/looker.py index ca62d816c8a..15e3c90ce02 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/looker.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/looker.py @@ -104,7 +104,7 @@ class LookerSource(DashboardSourceService): ] return self.client.dashboard(dashboard_id=dashboard.id, fields=",".join(fields)) - def get_dashboard_entity(self, dashboard_details: object) -> Dashboard: + def get_dashboard_entity(self, dashboard_details) -> Dashboard: """ Method to Get Dashboard Entity """ @@ -114,25 +114,25 @@ class LookerSource(DashboardSourceService): displayName=dashboard_details.title, description=dashboard_details.description or "", charts=self.chart_names, - url=f"{self.service_connection.hostPort}/dashboards/{dashboard_details.id}", + url=f"/dashboards/{dashboard_details.id}", service=EntityReference(id=self.service.id, type="dashboardService"), ) - def get_lineage(self, dashboard_details: object) -> AddLineageRequest: + def get_lineage(self, dashboard_details) -> Optional[AddLineageRequest]: """ Get lineage between dashboard and data sources """ logger.info("Lineage not implemented for Looker") + return None def process_charts(self) -> Optional[Iterable[Chart]]: """ Get lineage between dashboard and data sources """ logger.info("Fetch Charts Not implemented for Looker") + return None - def fetch_dashboard_charts( - self, dashboard_details: object - ) -> Optional[Iterable[Chart]]: + def fetch_dashboard_charts(self, dashboard_details) -> Optional[Iterable[Chart]]: """ Metod to fetch charts linked to dashboard """ @@ -151,7 +151,7 @@ class LookerSource(DashboardSourceService): displayName=dashboard_elements.title or "", description="", chart_type=dashboard_elements.type, - url=f"{self.service_connection.hostPort}/dashboard_elements/{dashboard_elements.id}", + url=f"/dashboard_elements/{dashboard_elements.id}", service=EntityReference( id=self.service.id, type="dashboardService" ), diff --git a/ingestion/src/metadata/ingestion/source/dashboard/metabase.py b/ingestion/src/metadata/ingestion/source/dashboard/metabase.py index e044df4980b..5f2c5d39a03 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/metabase.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/metabase.py @@ -40,6 +40,7 @@ from metadata.ingestion.source.database.common_db_source import SQLSourceStatus from metadata.utils import fqn from metadata.utils.connections import get_connection from metadata.utils.filters import filter_by_chart +from metadata.utils.helpers import replace_special_with from metadata.utils.logger import ingestion_logger HEADERS = {"Content-Type": "application/json", "Accept": "*/*"} @@ -128,11 +129,16 @@ class MetabaseSource(DashboardSourceService): """ Method to Get Dashboard Entity """ + dashboard_url = ( + f"/dashboard/{dashboard_details['id']}-" + f"{replace_special_with(raw=dashboard_details['name'].lower(), replacement='-')}" + ) + yield from self.fetch_dashboard_charts(dashboard_details) yield Dashboard( id=uuid.uuid4(), name=dashboard_details["name"], - url=self.service_connection.hostPort, + url=dashboard_url, displayName=dashboard_details["name"], description=dashboard_details["description"] if dashboard_details["description"] is not None @@ -145,7 +151,7 @@ class MetabaseSource(DashboardSourceService): """Get chart method Args: - charts: + dashboard_details: Returns: Iterable[Chart] """ @@ -153,6 +159,12 @@ class MetabaseSource(DashboardSourceService): for chart in charts: try: chart_details = chart["card"] + + chart_url = ( + f"/question/{chart_details['id']}-" + f"{replace_special_with(raw=chart_details['name'].lower(), replacement='-')}" + ) + if not ("name" in chart_details): continue if filter_by_chart( @@ -170,7 +182,7 @@ class MetabaseSource(DashboardSourceService): if chart_details["description"] is not None else "", chart_type=str(chart_details["display"]), - url=self.service_connection.hostPort, + url=chart_url, service=EntityReference( id=self.service.id, type="dashboardService" ), diff --git a/ingestion/src/metadata/ingestion/source/dashboard/powerbi.py b/ingestion/src/metadata/ingestion/source/dashboard/powerbi.py index 1f69f302cda..6441acdad40 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/powerbi.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/powerbi.py @@ -101,6 +101,7 @@ class PowerbiSource(DashboardSourceService): yield from self.fetch_dashboard_charts(dashboard_details) yield Dashboard( name=dashboard_details["id"], + # PBI has no hostPort property. All URL details are present in the webUrl property. url=dashboard_details["webUrl"], displayName=dashboard_details["displayName"], description="", @@ -193,7 +194,8 @@ class PowerbiSource(DashboardSourceService): name=chart["id"], displayName=chart["title"], description="", - chart_type="", + chart_type="", # Fix this with https://github.com/open-metadata/OpenMetadata/issues/1673 + # PBI has no hostPort property. All URL details are present in the webUrl property. url=chart["embedUrl"], service=EntityReference( id=self.service.id, type="dashboardService" diff --git a/ingestion/src/metadata/ingestion/source/dashboard/redash.py b/ingestion/src/metadata/ingestion/source/dashboard/redash.py index 70edbcadd60..6f7581169ab 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/redash.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/redash.py @@ -103,7 +103,6 @@ class RedashSource(DashboardSourceService): if dashboard_id is not None: self.status.item_scanned_status() dashboard_data = self.client.get_dashboard(dashboard_id) - dashboard_url = f"{self.service_connection.hostPort}/dashboard/{dashboard_data.get('slug', '')}" dashboard_description = "" for widgets in dashboard_data.get("widgets", []): dashboard_description = widgets.get("text") @@ -115,7 +114,7 @@ class RedashSource(DashboardSourceService): charts=self.dashboards_to_charts[dashboard_id], usageSummary=None, service=EntityReference(id=self.service.id, type="dashboardService"), - url=dashboard_url, + url=f"/dashboard/{dashboard_data.get('slug', '')}", ) def get_lineage(self, dashboard_details: dict) -> Optional[AddLineageRequest]: @@ -123,6 +122,7 @@ class RedashSource(DashboardSourceService): Get lineage between dashboard and data sources """ logger.info("Lineage not implemented for redash") + return None def process_charts(self) -> Optional[Iterable[Chart]]: """ @@ -173,9 +173,7 @@ class RedashSource(DashboardSourceService): service=EntityReference( id=self.service.id, type="dashboardService" ), - url=( - f"{self.service_connection.hostPort}/dashboard/{dashboard_data.get('slug', '')}" - ), + url=f"/dashboard/{dashboard_data.get('slug', '')}", description=visualization["description"] if visualization else "", ) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/superset.py b/ingestion/src/metadata/ingestion/source/dashboard/superset.py index 437bcb3946a..e9903ec42c1 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/superset.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/superset.py @@ -217,22 +217,16 @@ class SupersetSource(DashboardSourceService): Method to Get Dashboard Entity """ self.fetch_dashboard_charts(dashboard_details) - dashboard_id = dashboard_details["id"] - name = dashboard_details["dashboard_title"] - dashboard_url = ( - f"{self.service_connection.hostPort[:-1]}{dashboard_details['url']}" - ) last_modified = ( dateparser.parse(dashboard_details.get("changed_on_utc", "now")).timestamp() * 1000 ) - owners = get_owners(dashboard_details["owners"]) yield Dashboard( - name=dashboard_id, - displayName=name, + name=dashboard_details["id"], + displayName=dashboard_details["dashboard_title"], description="", - url=dashboard_url, - owners=owners, + url=dashboard_details["url"], + owners=get_owners(dashboard_details["owners"]), charts=self.charts, service=EntityReference(id=self.service.id, type="dashboardService"), lastModified=last_modified, @@ -243,6 +237,7 @@ class SupersetSource(DashboardSourceService): Get lineage between dashboard and data sources """ logger.info("Lineage not implemented for superset") + return None def fetch_dashboard_charts(self, dashboard_details: dict) -> None: """ @@ -316,15 +311,9 @@ class SupersetSource(DashboardSourceService): # pylint: disable=too-many-locals def _build_chart(self, chart_json: dict) -> Chart: chart_id = chart_json["id"] - name = chart_json["slice_name"] last_modified = ( dateparser.parse(chart_json.get("changed_on_utc", "now")).timestamp() * 1000 ) - chart_type = chart_json["viz_type"] - chart_url = f"{self.service_connection.hostPort}{chart_json['url']}" - datasource_id = chart_json["datasource_id"] - datasource_fqn = self._get_datasource_from_id(datasource_id) - owners = get_owners(chart_json["owners"]) params = json.loads(chart_json["params"]) metrics = [ get_metric_name(metric) @@ -345,12 +334,12 @@ class SupersetSource(DashboardSourceService): chart = Chart( name=chart_id, - displayName=name, + displayName=chart_json["slice_name"], description="", - chart_type=chart_type, - url=chart_url, - owners=owners, - datasource_fqn=datasource_fqn, + chart_type=chart_json["viz_type"], + url=chart_json["url"], + owners=get_owners(chart_json["owners"]), + datasource_fqn=self._get_datasource_from_id(chart_json["datasource_id"]), lastModified=last_modified, service=EntityReference(id=self.service.id, type="dashboardService"), custom_props=custom_properties, diff --git a/ingestion/src/metadata/ingestion/source/dashboard/tableau.py b/ingestion/src/metadata/ingestion/source/dashboard/tableau.py index ce0c5e03edc..96df3549ed5 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/tableau.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/tableau.py @@ -136,22 +136,19 @@ class TableauSource(DashboardSourceService): Method to Get Dashboard Entity """ self.fetch_dashboard_charts(dashboard_details) - dashboard_id = dashboard_details.get("id") - dashboard_name = dashboard_details.get("name") dashboard_tag = dashboard_details.get("tags") - dashboard_url = dashboard_details.get("webpageUrl") tag_labels = [] if hasattr(dashboard_tag, "tag"): tag_labels = [tag["label"] for tag in dashboard_tag["tag"]] yield Dashboard( id=uuid.uuid4(), - name=dashboard_id, - displayName=dashboard_name, + name=dashboard_details.get("id"), + displayName=dashboard_details.get("name"), description="", owner=self.get_owner(self.owner), charts=self.charts, tags=tag_labels, - url=dashboard_url, + url=dashboard_details.get("webpageUrl"), service=EntityReference(id=self.service.id, type="dashboardService"), last_modified=dateparser.parse(self.chart["updatedAt"]).timestamp() * 1000, ) @@ -203,28 +200,24 @@ class TableauSource(DashboardSourceService): if filter_by_chart(self.source_config.chartFilterPattern, chart_name): self.status.failure(chart_name, "Chart Pattern not allowed") continue - chart_id = self.all_dashboard_details["id"][index] chart_tags = self.all_dashboard_details["tags"][index] - chart_type = self.all_dashboard_details["sheetType"][index] chart_url = ( - f"{self.service_connection.hostPort}/#/site/{self.service_connection.siteName}" + f"/#/site/{self.service_connection.siteName}" f"{self.all_dashboard_details['contentUrl'][index]}" ) - chart_owner = self.all_dashboard_details["owner"][index] - chart_datasource_fqn = chart_url.replace("/", FQN_SEPARATOR) chart_last_modified = self.all_dashboard_details["updatedAt"][index] tag_labels = [] if hasattr(chart_tags, "tag"): for tag in chart_tags["tag"]: tag_labels.append(tag["label"]) yield Chart( - name=chart_id, + name=self.all_dashboard_details["id"][index], displayName=chart_name, description="", - chart_type=chart_type, + chart_type=self.all_dashboard_details["sheetType"][index], url=chart_url, - owners=self.get_owner(chart_owner), - datasource_fqn=chart_datasource_fqn, + owners=self.get_owner(self.all_dashboard_details["owner"][index]), + datasource_fqn=chart_url.replace("/", FQN_SEPARATOR), last_modified=dateparser.parse(chart_last_modified).timestamp() * 1000, service=EntityReference( diff --git a/ingestion/src/metadata/utils/helpers.py b/ingestion/src/metadata/utils/helpers.py index e27f0f69646..ac36fcb7d16 100644 --- a/ingestion/src/metadata/utils/helpers.py +++ b/ingestion/src/metadata/utils/helpers.py @@ -8,7 +8,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import re from datetime import datetime, timedelta from typing import Any, Dict, Iterable, Optional @@ -21,16 +21,12 @@ from metadata.generated.schema.api.services.createDatabaseService import ( from metadata.generated.schema.api.services.createMessagingService import ( CreateMessagingServiceRequest, ) -from metadata.generated.schema.api.services.createPipelineService import ( - CreatePipelineServiceRequest, -) from metadata.generated.schema.api.services.createStorageService import ( CreateStorageServiceRequest, ) from metadata.generated.schema.entity.services.dashboardService import DashboardService from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.entity.services.messagingService import MessagingService -from metadata.generated.schema.entity.services.pipelineService import PipelineService from metadata.generated.schema.entity.services.storageService import StorageService from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, @@ -198,3 +194,13 @@ def get_raw_extract_iter(alchemy_helper) -> Iterable[Dict[str, Any]]: rows = alchemy_helper.execute_query() for row in rows: yield row + + +def replace_special_with(raw: str, replacement: str) -> str: + """ + Replace special characters in a string by a hyphen + :param raw: raw string to clean + :param replacement: string used to replace + :return: clean string + """ + return re.sub(r"[^a-zA-Z0-9]", replacement, raw) diff --git a/ingestion/tests/integration/lineage/airflow/test_airflow_lineage.py b/ingestion/tests/integration/lineage/airflow/test_airflow_lineage.py index 0d5c71aeeb5..298980c702b 100644 --- a/ingestion/tests/integration/lineage/airflow/test_airflow_lineage.py +++ b/ingestion/tests/integration/lineage/airflow/test_airflow_lineage.py @@ -17,8 +17,9 @@ Other airflow versions require a different way to mock the DAG and Task runs. """ +import os from datetime import datetime, timedelta -from unittest import TestCase +from unittest import TestCase, mock # The DAG object; we'll need this to instantiate a DAG from airflow import DAG @@ -193,6 +194,11 @@ class AirflowLineageTest(TestCase): self.assertIsNone(get_xlets(self.dag.get_task("task3"), "_inlets")) self.assertIsNone(get_xlets(self.dag.get_task("task3"), "_outlets")) + @mock.patch.dict( + os.environ, + {"AIRFLOW__LINEAGE__AIRFLOW_SERVICE_NAME": "int_airflow"}, + clear=True, + ) def test_lineage(self): """ Test end to end @@ -214,16 +220,21 @@ class AirflowLineageTest(TestCase): ) self.assertIsNotNone( - self.metadata.get_by_name(entity=Pipeline, fqn="local_airflow_3.lineage") + self.metadata.get_by_name(entity=Pipeline, fqn="int_airflow.lineage") ) lineage = self.metadata.get_lineage_by_name( - entity=Pipeline, fqn="local_airflow_3.lineage" + entity=Pipeline, fqn="int_airflow.lineage" ) nodes = {node["id"] for node in lineage["nodes"]} self.assertIn(str(self.table.id.__root__), nodes) + @mock.patch.dict( + os.environ, + {"AIRFLOW__LINEAGE__AIRFLOW_SERVICE_NAME": "int_airflow"}, + clear=True, + ) def test_lineage_task_group(self): """ Test end to end for task groups. @@ -298,14 +309,26 @@ class AirflowLineageTest(TestCase): }, ) - pipeline = self.metadata.get_by_name( - entity=Pipeline, fqn="local_airflow_3.task_group_lineage", fields=["tasks"] + pipeline: Pipeline = self.metadata.get_by_name( + entity=Pipeline, fqn="int_airflow.task_group_lineage", fields=["tasks"] ) self.assertIsNotNone(pipeline) self.assertIn("group1.task1", {task.name for task in pipeline.tasks}) self.assertIn("group1.task2", {task.name for task in pipeline.tasks}) self.assertIn("end", {task.name for task in pipeline.tasks}) + # Validate URL building + self.assertEqual("/tree?dag_id=task_group_lineage", pipeline.pipelineUrl) + self.assertIn( + "/taskinstance/list/?flt1_dag_id_equals=task_group_lineage&_flt_3_task_id=end", + {task.taskUrl for task in pipeline.tasks}, + ) + + @mock.patch.dict( + os.environ, + {"AIRFLOW__LINEAGE__AIRFLOW_SERVICE_NAME": "int_airflow"}, + clear=True, + ) def test_clean_tasks(self): """ Check that we can safely remove tasks from a Pipeline @@ -360,7 +383,7 @@ class AirflowLineageTest(TestCase): ) pipeline = self.metadata.get_by_name( - entity=Pipeline, fqn="local_airflow_3.clean_test", fields=["tasks"] + entity=Pipeline, fqn="int_airflow.clean_test", fields=["tasks"] ) self.assertIsNotNone(pipeline) self.assertIn("task1", {task.name for task in pipeline.tasks}) @@ -415,7 +438,7 @@ class AirflowLineageTest(TestCase): ) pipeline: Pipeline = self.metadata.get_by_name( - entity=Pipeline, fqn="local_airflow_3.clean_test", fields=["tasks"] + entity=Pipeline, fqn="int_airflow.clean_test", fields=["tasks"] ) self.assertIsNotNone(pipeline) self.assertIn("task1", {task.name for task in pipeline.tasks})