Fix #4696 - Only store URL suffixes when extracting metadata (#5281)

* Only store URL suffix

* Fix test service name
This commit is contained in:
Pere Miquel Brull 2022-06-03 11:43:40 +02:00 committed by GitHub
parent 49fa5c4f7f
commit 9138c70dee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 98 additions and 82 deletions

View File

@ -23,9 +23,8 @@
"$ref": "../../entity/data/chart.json#/definitions/chartType"
},
"chartUrl": {
"description": "Chart URL, pointing to its own Service URL",
"type": "string",
"format": "uri"
"description": "Chart URL suffix from its service.",
"type": "string"
},
"tables": {
"description": "Link to tables used in this chart.",

View File

@ -20,9 +20,8 @@
"$ref": "../../type/basic.json#/definitions/markdown"
},
"dashboardUrl": {
"description": "Dashboard URL",
"type": "string",
"format": "uri"
"description": "Dashboard URL suffix from its service.",
"type": "string"
},
"charts": {
"description": "All the charts included in this Dashboard.",

View File

@ -94,9 +94,8 @@
"$ref": "#/definitions/chartType"
},
"chartUrl": {
"description": "Chart URL, pointing to its own Service URL.",
"type": "string",
"format": "uri"
"description": "Chart URL suffix from its service.",
"type": "string"
},
"href": {
"description": "Link to the resource corresponding to this entity.",

View File

@ -40,9 +40,8 @@
"type": "string"
},
"dashboardUrl": {
"description": "Dashboard URL.",
"type": "string",
"format": "uri"
"description": "Dashboard URL suffix from its service.",
"type": "string"
},
"charts": {
"description": "All the charts included in this Dashboard.",

View File

@ -155,12 +155,9 @@ def create_or_update_pipeline( # pylint: disable=too-many-locals
:param metadata: OpenMetadata API client
:return: PipelineEntity
"""
pipeline_service_url = conf.get("webserver", "base_url")
dag_url = f"{pipeline_service_url}/tree?dag_id={dag.dag_id}"
task_url = (
f"{pipeline_service_url}/taskinstance/list/"
+ f"?flt1_dag_id_equals={dag.dag_id}&_flt_3_task_id={operator.task_id}"
)
dag_url = f"/tree?dag_id={dag.dag_id}"
task_url = f"/taskinstance/list/?flt1_dag_id_equals={dag.dag_id}&_flt_3_task_id={operator.task_id}"
dag_start_date = dag.start_date.isoformat() if dag.start_date else None
task_start_date = (
task_instance.start_date.isoformat() if task_instance.start_date else None

View File

@ -104,7 +104,7 @@ class LookerSource(DashboardSourceService):
]
return self.client.dashboard(dashboard_id=dashboard.id, fields=",".join(fields))
def get_dashboard_entity(self, dashboard_details: object) -> Dashboard:
def get_dashboard_entity(self, dashboard_details) -> Dashboard:
"""
Method to Get Dashboard Entity
"""
@ -114,25 +114,25 @@ class LookerSource(DashboardSourceService):
displayName=dashboard_details.title,
description=dashboard_details.description or "",
charts=self.chart_names,
url=f"{self.service_connection.hostPort}/dashboards/{dashboard_details.id}",
url=f"/dashboards/{dashboard_details.id}",
service=EntityReference(id=self.service.id, type="dashboardService"),
)
def get_lineage(self, dashboard_details: object) -> AddLineageRequest:
def get_lineage(self, dashboard_details) -> Optional[AddLineageRequest]:
"""
Get lineage between dashboard and data sources.

Lineage extraction is not implemented for Looker, so this logs a
message and always returns None.
"""
logger.info("Lineage not implemented for Looker")
return None
def process_charts(self) -> Optional[Iterable[Chart]]:
"""
Process the charts of the dashboard.

Chart fetching is not implemented for Looker, so this logs a
message and always returns None.
"""
logger.info("Fetch Charts Not implemented for Looker")
return None
def fetch_dashboard_charts(
self, dashboard_details: object
) -> Optional[Iterable[Chart]]:
def fetch_dashboard_charts(self, dashboard_details) -> Optional[Iterable[Chart]]:
"""
Method to fetch charts linked to a dashboard
"""
@ -151,7 +151,7 @@ class LookerSource(DashboardSourceService):
displayName=dashboard_elements.title or "",
description="",
chart_type=dashboard_elements.type,
url=f"{self.service_connection.hostPort}/dashboard_elements/{dashboard_elements.id}",
url=f"/dashboard_elements/{dashboard_elements.id}",
service=EntityReference(
id=self.service.id, type="dashboardService"
),

View File

@ -40,6 +40,7 @@ from metadata.ingestion.source.database.common_db_source import SQLSourceStatus
from metadata.utils import fqn
from metadata.utils.connections import get_connection
from metadata.utils.filters import filter_by_chart
from metadata.utils.helpers import replace_special_with
from metadata.utils.logger import ingestion_logger
HEADERS = {"Content-Type": "application/json", "Accept": "*/*"}
@ -128,11 +129,16 @@ class MetabaseSource(DashboardSourceService):
"""
Method to Get Dashboard Entity
"""
dashboard_url = (
f"/dashboard/{dashboard_details['id']}-"
f"{replace_special_with(raw=dashboard_details['name'].lower(), replacement='-')}"
)
yield from self.fetch_dashboard_charts(dashboard_details)
yield Dashboard(
id=uuid.uuid4(),
name=dashboard_details["name"],
url=self.service_connection.hostPort,
url=dashboard_url,
displayName=dashboard_details["name"],
description=dashboard_details["description"]
if dashboard_details["description"] is not None
@ -145,7 +151,7 @@ class MetabaseSource(DashboardSourceService):
"""Get chart method
Args:
charts:
dashboard_details:
Returns:
Iterable[Chart]
"""
@ -153,6 +159,12 @@ class MetabaseSource(DashboardSourceService):
for chart in charts:
try:
chart_details = chart["card"]
chart_url = (
f"/question/{chart_details['id']}-"
f"{replace_special_with(raw=chart_details['name'].lower(), replacement='-')}"
)
if not ("name" in chart_details):
continue
if filter_by_chart(
@ -170,7 +182,7 @@ class MetabaseSource(DashboardSourceService):
if chart_details["description"] is not None
else "",
chart_type=str(chart_details["display"]),
url=self.service_connection.hostPort,
url=chart_url,
service=EntityReference(
id=self.service.id, type="dashboardService"
),

View File

@ -101,6 +101,7 @@ class PowerbiSource(DashboardSourceService):
yield from self.fetch_dashboard_charts(dashboard_details)
yield Dashboard(
name=dashboard_details["id"],
# PBI has no hostPort property. All URL details are present in the webUrl property.
url=dashboard_details["webUrl"],
displayName=dashboard_details["displayName"],
description="",
@ -193,7 +194,8 @@ class PowerbiSource(DashboardSourceService):
name=chart["id"],
displayName=chart["title"],
description="",
chart_type="",
chart_type="", # Fix this with https://github.com/open-metadata/OpenMetadata/issues/1673
# PBI has no hostPort property. All URL details are present in the webUrl property.
url=chart["embedUrl"],
service=EntityReference(
id=self.service.id, type="dashboardService"

View File

@ -103,7 +103,6 @@ class RedashSource(DashboardSourceService):
if dashboard_id is not None:
self.status.item_scanned_status()
dashboard_data = self.client.get_dashboard(dashboard_id)
dashboard_url = f"{self.service_connection.hostPort}/dashboard/{dashboard_data.get('slug', '')}"
dashboard_description = ""
for widgets in dashboard_data.get("widgets", []):
dashboard_description = widgets.get("text")
@ -115,7 +114,7 @@ class RedashSource(DashboardSourceService):
charts=self.dashboards_to_charts[dashboard_id],
usageSummary=None,
service=EntityReference(id=self.service.id, type="dashboardService"),
url=dashboard_url,
url=f"/dashboard/{dashboard_data.get('slug', '')}",
)
def get_lineage(self, dashboard_details: dict) -> Optional[AddLineageRequest]:
@ -123,6 +122,7 @@ class RedashSource(DashboardSourceService):
Get lineage between dashboard and data sources
"""
logger.info("Lineage not implemented for redash")
return None
def process_charts(self) -> Optional[Iterable[Chart]]:
"""
@ -173,9 +173,7 @@ class RedashSource(DashboardSourceService):
service=EntityReference(
id=self.service.id, type="dashboardService"
),
url=(
f"{self.service_connection.hostPort}/dashboard/{dashboard_data.get('slug', '')}"
),
url=f"/dashboard/{dashboard_data.get('slug', '')}",
description=visualization["description"] if visualization else "",
)

View File

@ -217,22 +217,16 @@ class SupersetSource(DashboardSourceService):
Method to Get Dashboard Entity
"""
self.fetch_dashboard_charts(dashboard_details)
dashboard_id = dashboard_details["id"]
name = dashboard_details["dashboard_title"]
dashboard_url = (
f"{self.service_connection.hostPort[:-1]}{dashboard_details['url']}"
)
last_modified = (
dateparser.parse(dashboard_details.get("changed_on_utc", "now")).timestamp()
* 1000
)
owners = get_owners(dashboard_details["owners"])
yield Dashboard(
name=dashboard_id,
displayName=name,
name=dashboard_details["id"],
displayName=dashboard_details["dashboard_title"],
description="",
url=dashboard_url,
owners=owners,
url=dashboard_details["url"],
owners=get_owners(dashboard_details["owners"]),
charts=self.charts,
service=EntityReference(id=self.service.id, type="dashboardService"),
lastModified=last_modified,
@ -243,6 +237,7 @@ class SupersetSource(DashboardSourceService):
Get lineage between dashboard and data sources
"""
logger.info("Lineage not implemented for superset")
return None
def fetch_dashboard_charts(self, dashboard_details: dict) -> None:
"""
@ -316,15 +311,9 @@ class SupersetSource(DashboardSourceService):
# pylint: disable=too-many-locals
def _build_chart(self, chart_json: dict) -> Chart:
chart_id = chart_json["id"]
name = chart_json["slice_name"]
last_modified = (
dateparser.parse(chart_json.get("changed_on_utc", "now")).timestamp() * 1000
)
chart_type = chart_json["viz_type"]
chart_url = f"{self.service_connection.hostPort}{chart_json['url']}"
datasource_id = chart_json["datasource_id"]
datasource_fqn = self._get_datasource_from_id(datasource_id)
owners = get_owners(chart_json["owners"])
params = json.loads(chart_json["params"])
metrics = [
get_metric_name(metric)
@ -345,12 +334,12 @@ class SupersetSource(DashboardSourceService):
chart = Chart(
name=chart_id,
displayName=name,
displayName=chart_json["slice_name"],
description="",
chart_type=chart_type,
url=chart_url,
owners=owners,
datasource_fqn=datasource_fqn,
chart_type=chart_json["viz_type"],
url=chart_json["url"],
owners=get_owners(chart_json["owners"]),
datasource_fqn=self._get_datasource_from_id(chart_json["datasource_id"]),
lastModified=last_modified,
service=EntityReference(id=self.service.id, type="dashboardService"),
custom_props=custom_properties,

View File

@ -136,22 +136,19 @@ class TableauSource(DashboardSourceService):
Method to Get Dashboard Entity
"""
self.fetch_dashboard_charts(dashboard_details)
dashboard_id = dashboard_details.get("id")
dashboard_name = dashboard_details.get("name")
dashboard_tag = dashboard_details.get("tags")
dashboard_url = dashboard_details.get("webpageUrl")
tag_labels = []
if hasattr(dashboard_tag, "tag"):
tag_labels = [tag["label"] for tag in dashboard_tag["tag"]]
yield Dashboard(
id=uuid.uuid4(),
name=dashboard_id,
displayName=dashboard_name,
name=dashboard_details.get("id"),
displayName=dashboard_details.get("name"),
description="",
owner=self.get_owner(self.owner),
charts=self.charts,
tags=tag_labels,
url=dashboard_url,
url=dashboard_details.get("webpageUrl"),
service=EntityReference(id=self.service.id, type="dashboardService"),
last_modified=dateparser.parse(self.chart["updatedAt"]).timestamp() * 1000,
)
@ -203,28 +200,24 @@ class TableauSource(DashboardSourceService):
if filter_by_chart(self.source_config.chartFilterPattern, chart_name):
self.status.failure(chart_name, "Chart Pattern not allowed")
continue
chart_id = self.all_dashboard_details["id"][index]
chart_tags = self.all_dashboard_details["tags"][index]
chart_type = self.all_dashboard_details["sheetType"][index]
chart_url = (
f"{self.service_connection.hostPort}/#/site/{self.service_connection.siteName}"
f"/#/site/{self.service_connection.siteName}"
f"{self.all_dashboard_details['contentUrl'][index]}"
)
chart_owner = self.all_dashboard_details["owner"][index]
chart_datasource_fqn = chart_url.replace("/", FQN_SEPARATOR)
chart_last_modified = self.all_dashboard_details["updatedAt"][index]
tag_labels = []
if hasattr(chart_tags, "tag"):
for tag in chart_tags["tag"]:
tag_labels.append(tag["label"])
yield Chart(
name=chart_id,
name=self.all_dashboard_details["id"][index],
displayName=chart_name,
description="",
chart_type=chart_type,
chart_type=self.all_dashboard_details["sheetType"][index],
url=chart_url,
owners=self.get_owner(chart_owner),
datasource_fqn=chart_datasource_fqn,
owners=self.get_owner(self.all_dashboard_details["owner"][index]),
datasource_fqn=chart_url.replace("/", FQN_SEPARATOR),
last_modified=dateparser.parse(chart_last_modified).timestamp()
* 1000,
service=EntityReference(

View File

@ -8,7 +8,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from datetime import datetime, timedelta
from typing import Any, Dict, Iterable, Optional
@ -21,16 +21,12 @@ from metadata.generated.schema.api.services.createDatabaseService import (
from metadata.generated.schema.api.services.createMessagingService import (
CreateMessagingServiceRequest,
)
from metadata.generated.schema.api.services.createPipelineService import (
CreatePipelineServiceRequest,
)
from metadata.generated.schema.api.services.createStorageService import (
CreateStorageServiceRequest,
)
from metadata.generated.schema.entity.services.dashboardService import DashboardService
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.messagingService import MessagingService
from metadata.generated.schema.entity.services.pipelineService import PipelineService
from metadata.generated.schema.entity.services.storageService import StorageService
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
@ -198,3 +194,13 @@ def get_raw_extract_iter(alchemy_helper) -> Iterable[Dict[str, Any]]:
rows = alchemy_helper.execute_query()
for row in rows:
yield row
def replace_special_with(raw: str, replacement: str) -> str:
    """
    Replace every non-alphanumeric character in a string with the
    given replacement string.

    Note: any character outside [a-zA-Z0-9] is replaced — including
    spaces, underscores and unicode letters — not only "special"
    punctuation.

    :param raw: raw string to clean
    :param replacement: string used in place of each replaced character
    :return: cleaned string
    """
    return re.sub(r"[^a-zA-Z0-9]", replacement, raw)

View File

@ -17,8 +17,9 @@ Other airflow versions require a different way to
mock the DAG and Task runs.
"""
import os
from datetime import datetime, timedelta
from unittest import TestCase
from unittest import TestCase, mock
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
@ -193,6 +194,11 @@ class AirflowLineageTest(TestCase):
self.assertIsNone(get_xlets(self.dag.get_task("task3"), "_inlets"))
self.assertIsNone(get_xlets(self.dag.get_task("task3"), "_outlets"))
@mock.patch.dict(
os.environ,
{"AIRFLOW__LINEAGE__AIRFLOW_SERVICE_NAME": "int_airflow"},
clear=True,
)
def test_lineage(self):
"""
Test end to end
@ -214,16 +220,21 @@ class AirflowLineageTest(TestCase):
)
self.assertIsNotNone(
self.metadata.get_by_name(entity=Pipeline, fqn="local_airflow_3.lineage")
self.metadata.get_by_name(entity=Pipeline, fqn="int_airflow.lineage")
)
lineage = self.metadata.get_lineage_by_name(
entity=Pipeline, fqn="local_airflow_3.lineage"
entity=Pipeline, fqn="int_airflow.lineage"
)
nodes = {node["id"] for node in lineage["nodes"]}
self.assertIn(str(self.table.id.__root__), nodes)
@mock.patch.dict(
os.environ,
{"AIRFLOW__LINEAGE__AIRFLOW_SERVICE_NAME": "int_airflow"},
clear=True,
)
def test_lineage_task_group(self):
"""
Test end to end for task groups.
@ -298,14 +309,26 @@ class AirflowLineageTest(TestCase):
},
)
pipeline = self.metadata.get_by_name(
entity=Pipeline, fqn="local_airflow_3.task_group_lineage", fields=["tasks"]
pipeline: Pipeline = self.metadata.get_by_name(
entity=Pipeline, fqn="int_airflow.task_group_lineage", fields=["tasks"]
)
self.assertIsNotNone(pipeline)
self.assertIn("group1.task1", {task.name for task in pipeline.tasks})
self.assertIn("group1.task2", {task.name for task in pipeline.tasks})
self.assertIn("end", {task.name for task in pipeline.tasks})
# Validate URL building
self.assertEqual("/tree?dag_id=task_group_lineage", pipeline.pipelineUrl)
self.assertIn(
"/taskinstance/list/?flt1_dag_id_equals=task_group_lineage&_flt_3_task_id=end",
{task.taskUrl for task in pipeline.tasks},
)
@mock.patch.dict(
os.environ,
{"AIRFLOW__LINEAGE__AIRFLOW_SERVICE_NAME": "int_airflow"},
clear=True,
)
def test_clean_tasks(self):
"""
Check that we can safely remove tasks from a Pipeline
@ -360,7 +383,7 @@ class AirflowLineageTest(TestCase):
)
pipeline = self.metadata.get_by_name(
entity=Pipeline, fqn="local_airflow_3.clean_test", fields=["tasks"]
entity=Pipeline, fqn="int_airflow.clean_test", fields=["tasks"]
)
self.assertIsNotNone(pipeline)
self.assertIn("task1", {task.name for task in pipeline.tasks})
@ -415,7 +438,7 @@ class AirflowLineageTest(TestCase):
)
pipeline: Pipeline = self.metadata.get_by_name(
entity=Pipeline, fqn="local_airflow_3.clean_test", fields=["tasks"]
entity=Pipeline, fqn="int_airflow.clean_test", fields=["tasks"]
)
self.assertIsNotNone(pipeline)
self.assertIn("task1", {task.name for task in pipeline.tasks})