mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-08-24 00:48:36 +00:00
parent
3d345f9b37
commit
9f2b10f6e2
@ -59,7 +59,7 @@ from metadata.ingestion.source.dashboard.dashboard_service import (
|
|||||||
)
|
)
|
||||||
from metadata.utils import fqn
|
from metadata.utils import fqn
|
||||||
from metadata.utils.filters import filter_by_chart
|
from metadata.utils.filters import filter_by_chart
|
||||||
from metadata.utils.helpers import get_standard_chart_type
|
from metadata.utils.helpers import clean_uri, get_standard_chart_type
|
||||||
from metadata.utils.logger import ingestion_logger
|
from metadata.utils.logger import ingestion_logger
|
||||||
|
|
||||||
logger = ingestion_logger()
|
logger = ingestion_logger()
|
||||||
@ -202,7 +202,7 @@ class LookerSource(DashboardServiceSource):
|
|||||||
)
|
)
|
||||||
for chart in self.context.charts
|
for chart in self.context.charts
|
||||||
],
|
],
|
||||||
dashboardUrl=f"/dashboards/{dashboard_details.id}",
|
dashboardUrl=f"{clean_uri(self.service_connection.hostPort)}/dashboards/{dashboard_details.id}",
|
||||||
service=self.context.dashboard_service.fullyQualifiedName.__root__,
|
service=self.context.dashboard_service.fullyQualifiedName.__root__,
|
||||||
)
|
)
|
||||||
yield dashboard_request
|
yield dashboard_request
|
||||||
@ -367,7 +367,7 @@ class LookerSource(DashboardServiceSource):
|
|||||||
displayName=chart.title or chart.id,
|
displayName=chart.title or chart.id,
|
||||||
description=self.build_chart_description(chart) or None,
|
description=self.build_chart_description(chart) or None,
|
||||||
chartType=get_standard_chart_type(chart.type).value,
|
chartType=get_standard_chart_type(chart.type).value,
|
||||||
chartUrl=f"/dashboard_elements/{chart.id}",
|
chartUrl=f"{clean_uri(self.service_connection.hostPort)}/dashboard_elements/{chart.id}",
|
||||||
service=self.context.dashboard_service.fullyQualifiedName.__root__,
|
service=self.context.dashboard_service.fullyQualifiedName.__root__,
|
||||||
)
|
)
|
||||||
self.status.scanned(chart.id)
|
self.status.scanned(chart.id)
|
||||||
|
@ -36,7 +36,11 @@ from metadata.ingestion.source.dashboard.dashboard_service import DashboardServi
|
|||||||
from metadata.ingestion.source.dashboard.metabase.models import Dashboard
|
from metadata.ingestion.source.dashboard.metabase.models import Dashboard
|
||||||
from metadata.utils import fqn
|
from metadata.utils import fqn
|
||||||
from metadata.utils.filters import filter_by_chart
|
from metadata.utils.filters import filter_by_chart
|
||||||
from metadata.utils.helpers import get_standard_chart_type, replace_special_with
|
from metadata.utils.helpers import (
|
||||||
|
clean_uri,
|
||||||
|
get_standard_chart_type,
|
||||||
|
replace_special_with,
|
||||||
|
)
|
||||||
from metadata.utils.logger import ingestion_logger
|
from metadata.utils.logger import ingestion_logger
|
||||||
|
|
||||||
logger = ingestion_logger()
|
logger = ingestion_logger()
|
||||||
@ -85,7 +89,8 @@ class MetabaseSource(DashboardServiceSource):
|
|||||||
Method to Get Dashboard Entity
|
Method to Get Dashboard Entity
|
||||||
"""
|
"""
|
||||||
dashboard_url = (
|
dashboard_url = (
|
||||||
f"/dashboard/{dashboard_details['id']}-"
|
f"{clean_uri(self.service_connection.hostPort)}/dashboard/"
|
||||||
|
f"{dashboard_details['id']}-"
|
||||||
f"{replace_special_with(raw=dashboard_details['name'].lower(), replacement='-')}"
|
f"{replace_special_with(raw=dashboard_details['name'].lower(), replacement='-')}"
|
||||||
)
|
)
|
||||||
dashboard_request = CreateDashboardRequest(
|
dashboard_request = CreateDashboardRequest(
|
||||||
@ -124,17 +129,18 @@ class MetabaseSource(DashboardServiceSource):
|
|||||||
if "id" not in chart_details:
|
if "id" not in chart_details:
|
||||||
continue
|
continue
|
||||||
chart_url = (
|
chart_url = (
|
||||||
f"/question/{chart_details['id']}-"
|
f"{clean_uri(self.service_connection.hostPort)}/question/"
|
||||||
|
f"{chart_details['id']}-"
|
||||||
f"{replace_special_with(raw=chart_details['name'].lower(), replacement='-')}"
|
f"{replace_special_with(raw=chart_details['name'].lower(), replacement='-')}"
|
||||||
)
|
)
|
||||||
|
|
||||||
if "name" not in chart_details:
|
|
||||||
continue
|
|
||||||
if filter_by_chart(
|
if filter_by_chart(
|
||||||
self.source_config.chartFilterPattern, chart_details["name"]
|
self.source_config.chartFilterPattern,
|
||||||
|
chart_details.get("name", chart_details["id"]),
|
||||||
):
|
):
|
||||||
self.status.filter(
|
self.status.filter(
|
||||||
chart_details["name"], "Chart Pattern not allowed"
|
chart_details.get("name", chart_details["id"]),
|
||||||
|
"Chart Pattern not allowed",
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
yield CreateChartRequest(
|
yield CreateChartRequest(
|
||||||
@ -147,7 +153,7 @@ class MetabaseSource(DashboardServiceSource):
|
|||||||
chartUrl=chart_url,
|
chartUrl=chart_url,
|
||||||
service=self.context.dashboard_service.fullyQualifiedName.__root__,
|
service=self.context.dashboard_service.fullyQualifiedName.__root__,
|
||||||
)
|
)
|
||||||
self.status.scanned(chart_details["name"])
|
self.status.scanned(chart_details.get("name", chart_details["id"]))
|
||||||
except Exception as exc: # pylint: disable=broad-except
|
except Exception as exc: # pylint: disable=broad-except
|
||||||
logger.debug(traceback.format_exc())
|
logger.debug(traceback.format_exc())
|
||||||
logger.warning(f"Error creating chart [{chart}]: {exc}")
|
logger.warning(f"Error creating chart [{chart}]: {exc}")
|
||||||
|
@ -301,20 +301,3 @@ class PowerBiApiClient:
|
|||||||
poll += 1
|
poll += 1
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def get_dashboard_url(self, workspace_id: str, dashboard_id: str) -> str:
|
|
||||||
"""
|
|
||||||
Method to build the dashboard url
|
|
||||||
"""
|
|
||||||
return f"/groups/{workspace_id}/dashboards/{dashboard_id}"
|
|
||||||
|
|
||||||
def get_chart_url(
|
|
||||||
self, report_id: Optional[str], workspace_id: str, dashboard_id: str
|
|
||||||
) -> str:
|
|
||||||
"""
|
|
||||||
Method to build the chart url
|
|
||||||
"""
|
|
||||||
chart_url_postfix = (
|
|
||||||
f"reports/{report_id}" if report_id else f"dashboards/{dashboard_id}"
|
|
||||||
)
|
|
||||||
return f"/groups/{workspace_id}/{chart_url_postfix}"
|
|
||||||
|
@ -33,6 +33,7 @@ from metadata.ingestion.source.dashboard.dashboard_service import DashboardServi
|
|||||||
from metadata.ingestion.source.dashboard.powerbi.models import Dataset, PowerBIDashboard
|
from metadata.ingestion.source.dashboard.powerbi.models import Dataset, PowerBIDashboard
|
||||||
from metadata.utils import fqn
|
from metadata.utils import fqn
|
||||||
from metadata.utils.filters import filter_by_chart, filter_by_dashboard
|
from metadata.utils.filters import filter_by_chart, filter_by_dashboard
|
||||||
|
from metadata.utils.helpers import clean_uri
|
||||||
from metadata.utils.logger import ingestion_logger
|
from metadata.utils.logger import ingestion_logger
|
||||||
|
|
||||||
logger = ingestion_logger()
|
logger = ingestion_logger()
|
||||||
@ -198,6 +199,29 @@ class PowerbiSource(DashboardServiceSource):
|
|||||||
"""
|
"""
|
||||||
return dashboard
|
return dashboard
|
||||||
|
|
||||||
|
def get_dashboard_url(self, workspace_id: str, dashboard_id: str) -> str:
|
||||||
|
"""
|
||||||
|
Method to build the dashboard url
|
||||||
|
"""
|
||||||
|
return (
|
||||||
|
f"{clean_uri(self.service_connection.hostPort)}/groups/"
|
||||||
|
f"{workspace_id}/dashboards/{dashboard_id}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def get_chart_url(
|
||||||
|
self, report_id: Optional[str], workspace_id: str, dashboard_id: str
|
||||||
|
) -> str:
|
||||||
|
"""
|
||||||
|
Method to build the chart url
|
||||||
|
"""
|
||||||
|
chart_url_postfix = (
|
||||||
|
f"reports/{report_id}" if report_id else f"dashboards/{dashboard_id}"
|
||||||
|
)
|
||||||
|
return (
|
||||||
|
f"{clean_uri(self.service_connection.hostPort)}/groups/"
|
||||||
|
f"{workspace_id}/{chart_url_postfix}"
|
||||||
|
)
|
||||||
|
|
||||||
def yield_dashboard(
|
def yield_dashboard(
|
||||||
self, dashboard_details: PowerBIDashboard
|
self, dashboard_details: PowerBIDashboard
|
||||||
) -> Iterable[CreateDashboardRequest]:
|
) -> Iterable[CreateDashboardRequest]:
|
||||||
@ -206,8 +230,7 @@ class PowerbiSource(DashboardServiceSource):
|
|||||||
"""
|
"""
|
||||||
dashboard_request = CreateDashboardRequest(
|
dashboard_request = CreateDashboardRequest(
|
||||||
name=dashboard_details.id,
|
name=dashboard_details.id,
|
||||||
# PBI has no hostPort property. Urls are built manually.
|
dashboardUrl=self.get_dashboard_url(
|
||||||
dashboardUrl=self.client.get_dashboard_url(
|
|
||||||
workspace_id=self.context.workspace.id,
|
workspace_id=self.context.workspace.id,
|
||||||
dashboard_id=dashboard_details.id,
|
dashboard_id=dashboard_details.id,
|
||||||
),
|
),
|
||||||
@ -297,8 +320,7 @@ class PowerbiSource(DashboardServiceSource):
|
|||||||
displayName=chart_display_name,
|
displayName=chart_display_name,
|
||||||
description="",
|
description="",
|
||||||
chartType=ChartType.Other.value,
|
chartType=ChartType.Other.value,
|
||||||
# PBI has no hostPort property. All URL details are present in the webUrl property.
|
chartUrl=self.get_chart_url(
|
||||||
chartUrl=self.client.get_chart_url(
|
|
||||||
report_id=chart.reportId,
|
report_id=chart.reportId,
|
||||||
workspace_id=self.context.workspace.id,
|
workspace_id=self.context.workspace.id,
|
||||||
dashboard_id=dashboard_details.id,
|
dashboard_id=dashboard_details.id,
|
||||||
|
@ -44,7 +44,7 @@ from metadata.ingestion.models.ometa_classification import OMetaTagAndClassifica
|
|||||||
from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource
|
from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource
|
||||||
from metadata.utils import fqn, tag_utils
|
from metadata.utils import fqn, tag_utils
|
||||||
from metadata.utils.filters import filter_by_chart
|
from metadata.utils.filters import filter_by_chart
|
||||||
from metadata.utils.helpers import get_standard_chart_type
|
from metadata.utils.helpers import clean_uri, get_standard_chart_type
|
||||||
from metadata.utils.logger import ingestion_logger
|
from metadata.utils.logger import ingestion_logger
|
||||||
|
|
||||||
logger = ingestion_logger()
|
logger = ingestion_logger()
|
||||||
@ -155,9 +155,15 @@ class RedashSource(DashboardServiceSource):
|
|||||||
if version.parse(self.service_connection.redashVersion) > version.parse(
|
if version.parse(self.service_connection.redashVersion) > version.parse(
|
||||||
INCOMPATIBLE_REDASH_VERSION
|
INCOMPATIBLE_REDASH_VERSION
|
||||||
):
|
):
|
||||||
dashboard_url = f"/dashboards/{dashboard_details.get('id', '')}"
|
dashboard_url = (
|
||||||
|
f"{clean_uri(self.service_connection.hostPort)}/dashboards"
|
||||||
|
f"/{dashboard_details.get('id', '')}"
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
dashboard_url = f"/dashboards/{dashboard_details.get('slug', '')}"
|
dashboard_url = (
|
||||||
|
f"{clean_uri(self.service_connection.hostPort)}/dashboards"
|
||||||
|
f"/{dashboard_details.get('slug', '')}"
|
||||||
|
)
|
||||||
return dashboard_url
|
return dashboard_url
|
||||||
|
|
||||||
def yield_dashboard(
|
def yield_dashboard(
|
||||||
|
@ -21,7 +21,7 @@ from metadata.generated.schema.entity.data.chart import Chart, ChartType
|
|||||||
from metadata.generated.schema.entity.data.table import Table
|
from metadata.generated.schema.entity.data.table import Table
|
||||||
from metadata.ingestion.source.dashboard.superset.mixin import SupersetSourceMixin
|
from metadata.ingestion.source.dashboard.superset.mixin import SupersetSourceMixin
|
||||||
from metadata.utils import fqn
|
from metadata.utils import fqn
|
||||||
from metadata.utils.helpers import get_standard_chart_type
|
from metadata.utils.helpers import clean_uri, get_standard_chart_type
|
||||||
from metadata.utils.logger import ingestion_logger
|
from metadata.utils.logger import ingestion_logger
|
||||||
|
|
||||||
logger = ingestion_logger()
|
logger = ingestion_logger()
|
||||||
@ -70,7 +70,7 @@ class SupersetAPISource(SupersetSourceMixin):
|
|||||||
name=dashboard_details["id"],
|
name=dashboard_details["id"],
|
||||||
displayName=dashboard_details["dashboard_title"],
|
displayName=dashboard_details["dashboard_title"],
|
||||||
description="",
|
description="",
|
||||||
dashboardUrl=dashboard_details["url"],
|
dashboardUrl=f"{clean_uri(self.service_connection.hostPort)}{dashboard_details['url']}",
|
||||||
charts=[
|
charts=[
|
||||||
fqn.build(
|
fqn.build(
|
||||||
self.metadata,
|
self.metadata,
|
||||||
@ -110,7 +110,7 @@ class SupersetAPISource(SupersetSourceMixin):
|
|||||||
chartType=get_standard_chart_type(
|
chartType=get_standard_chart_type(
|
||||||
chart_json.get("viz_type", ChartType.Other.value)
|
chart_json.get("viz_type", ChartType.Other.value)
|
||||||
),
|
),
|
||||||
chartUrl=chart_json.get("url"),
|
chartUrl=f"{clean_uri(self.service_connection.hostPort)}{chart_json.get('url')}",
|
||||||
service=self.context.dashboard_service.fullyQualifiedName.__root__,
|
service=self.context.dashboard_service.fullyQualifiedName.__root__,
|
||||||
)
|
)
|
||||||
yield chart
|
yield chart
|
||||||
|
@ -33,7 +33,7 @@ from metadata.ingestion.source.dashboard.superset.queries import (
|
|||||||
FETCH_DASHBOARDS,
|
FETCH_DASHBOARDS,
|
||||||
)
|
)
|
||||||
from metadata.utils import fqn
|
from metadata.utils import fqn
|
||||||
from metadata.utils.helpers import get_standard_chart_type
|
from metadata.utils.helpers import clean_uri, get_standard_chart_type
|
||||||
from metadata.utils.logger import ingestion_logger
|
from metadata.utils.logger import ingestion_logger
|
||||||
|
|
||||||
logger = ingestion_logger()
|
logger = ingestion_logger()
|
||||||
@ -76,7 +76,7 @@ class SupersetDBSource(SupersetSourceMixin):
|
|||||||
name=dashboard_details["id"],
|
name=dashboard_details["id"],
|
||||||
displayName=dashboard_details["dashboard_title"],
|
displayName=dashboard_details["dashboard_title"],
|
||||||
description="",
|
description="",
|
||||||
dashboardUrl=f"/superset/dashboard/{dashboard_details['id']}/",
|
dashboardUrl=f"{clean_uri(self.service_connection.hostPort)}/superset/dashboard/{dashboard_details['id']}/",
|
||||||
charts=[
|
charts=[
|
||||||
fqn.build(
|
fqn.build(
|
||||||
self.metadata,
|
self.metadata,
|
||||||
@ -116,7 +116,7 @@ class SupersetDBSource(SupersetSourceMixin):
|
|||||||
chartType=get_standard_chart_type(
|
chartType=get_standard_chart_type(
|
||||||
chart_json.get("viz_type", ChartType.Other.value)
|
chart_json.get("viz_type", ChartType.Other.value)
|
||||||
),
|
),
|
||||||
chartUrl=f"/explore/?slice_id={chart_json['id']}",
|
chartUrl=f"{clean_uri(self.service_connection.hostPort)}/explore/?slice_id={chart_json['id']}",
|
||||||
service=self.context.dashboard_service.fullyQualifiedName.__root__,
|
service=self.context.dashboard_service.fullyQualifiedName.__root__,
|
||||||
)
|
)
|
||||||
yield chart
|
yield chart
|
||||||
|
@ -89,7 +89,7 @@ class SupersetSourceMixin(DashboardServiceSource):
|
|||||||
|
|
||||||
def get_owner_details(self, dashboard_details: dict) -> EntityReference:
|
def get_owner_details(self, dashboard_details: dict) -> EntityReference:
|
||||||
for owner in dashboard_details.get("owners", []):
|
for owner in dashboard_details.get("owners", []):
|
||||||
user = self._get_user_by_email(owner["email"])
|
user = self._get_user_by_email(owner.get("email"))
|
||||||
if user:
|
if user:
|
||||||
return user
|
return user
|
||||||
if dashboard_details.get("email"):
|
if dashboard_details.get("email"):
|
||||||
|
@ -41,6 +41,7 @@ from metadata.ingestion.api.source import InvalidSourceException
|
|||||||
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
|
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
|
||||||
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
|
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
|
||||||
from metadata.utils import fqn
|
from metadata.utils import fqn
|
||||||
|
from metadata.utils.helpers import clean_uri
|
||||||
from metadata.utils.logger import ingestion_logger
|
from metadata.utils.logger import ingestion_logger
|
||||||
|
|
||||||
logger = ingestion_logger()
|
logger = ingestion_logger()
|
||||||
@ -103,7 +104,8 @@ class AirbyteSource(PipelineServiceSource):
|
|||||||
:return: Create Pipeline request with tasks
|
:return: Create Pipeline request with tasks
|
||||||
"""
|
"""
|
||||||
connection_url = (
|
connection_url = (
|
||||||
f"/workspaces/{pipeline_details.workspace.get('workspaceId')}"
|
f"{clean_uri(self.service_connection.hostPort)}/workspaces"
|
||||||
|
f"/{pipeline_details.workspace.get('workspaceId')}"
|
||||||
f"/connections/{pipeline_details.connection.get('connectionId')}"
|
f"/connections/{pipeline_details.connection.get('connectionId')}"
|
||||||
)
|
)
|
||||||
pipeline_request = CreatePipelineRequest(
|
pipeline_request = CreatePipelineRequest(
|
||||||
|
@ -46,7 +46,7 @@ from metadata.ingestion.connections.session import create_and_bind_session
|
|||||||
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
|
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
|
||||||
from metadata.ingestion.source.pipeline.airflow.lineage_parser import get_xlets_from_dag
|
from metadata.ingestion.source.pipeline.airflow.lineage_parser import get_xlets_from_dag
|
||||||
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
|
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
|
||||||
from metadata.utils.helpers import datetime_to_ts
|
from metadata.utils.helpers import clean_uri, datetime_to_ts
|
||||||
from metadata.utils.logger import ingestion_logger
|
from metadata.utils.logger import ingestion_logger
|
||||||
|
|
||||||
logger = ingestion_logger()
|
logger = ingestion_logger()
|
||||||
@ -277,7 +277,7 @@ class AirflowSource(PipelineServiceSource):
|
|||||||
return pipeline_details.dag_id
|
return pipeline_details.dag_id
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get_tasks_from_dag(dag: SerializedDAG) -> List[Task]:
|
def get_tasks_from_dag(dag: SerializedDAG, host_port: str) -> List[Task]:
|
||||||
"""
|
"""
|
||||||
Obtain the tasks from a SerializedDAG
|
Obtain the tasks from a SerializedDAG
|
||||||
:param dag: SerializedDAG
|
:param dag: SerializedDAG
|
||||||
@ -287,8 +287,10 @@ class AirflowSource(PipelineServiceSource):
|
|||||||
Task(
|
Task(
|
||||||
name=task.task_id,
|
name=task.task_id,
|
||||||
description=task.doc_md,
|
description=task.doc_md,
|
||||||
# Just the suffix
|
taskUrl=(
|
||||||
taskUrl=f"/taskinstance/list/?flt1_dag_id_equals={dag.dag_id}&_flt_3_task_id={task.task_id}",
|
f"{clean_uri(host_port)}/taskinstance/list/"
|
||||||
|
f"?flt1_dag_id_equals={dag.dag_id}&_flt_3_task_id={task.task_id}"
|
||||||
|
),
|
||||||
downstreamTasks=list(task.downstream_task_ids),
|
downstreamTasks=list(task.downstream_task_ids),
|
||||||
taskType=task.task_type,
|
taskType=task.task_type,
|
||||||
startDate=task.start_date.isoformat() if task.start_date else None,
|
startDate=task.start_date.isoformat() if task.start_date else None,
|
||||||
@ -304,7 +306,6 @@ class AirflowSource(PipelineServiceSource):
|
|||||||
:param data: from SQA query
|
:param data: from SQA query
|
||||||
:return: SerializedDAG
|
:return: SerializedDAG
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if isinstance(data, dict):
|
if isinstance(data, dict):
|
||||||
return SerializedDAG.from_dict(data)
|
return SerializedDAG.from_dict(data)
|
||||||
|
|
||||||
@ -324,11 +325,11 @@ class AirflowSource(PipelineServiceSource):
|
|||||||
pipeline_request = CreatePipelineRequest(
|
pipeline_request = CreatePipelineRequest(
|
||||||
name=pipeline_details.dag_id,
|
name=pipeline_details.dag_id,
|
||||||
description=dag.description,
|
description=dag.description,
|
||||||
pipelineUrl=f"/tree?dag_id={dag.dag_id}", # Just the suffix
|
pipelineUrl=f"{clean_uri(self.service_connection.hostPort)}/tree?dag_id={dag.dag_id}",
|
||||||
concurrency=dag.concurrency,
|
concurrency=dag.concurrency,
|
||||||
pipelineLocation=pipeline_details.fileloc,
|
pipelineLocation=pipeline_details.fileloc,
|
||||||
startDate=dag.start_date.isoformat() if dag.start_date else None,
|
startDate=dag.start_date.isoformat() if dag.start_date else None,
|
||||||
tasks=self.get_tasks_from_dag(dag),
|
tasks=self.get_tasks_from_dag(dag, self.service_connection.hostPort),
|
||||||
service=self.context.pipeline_service.fullyQualifiedName.__root__,
|
service=self.context.pipeline_service.fullyQualifiedName.__root__,
|
||||||
)
|
)
|
||||||
yield pipeline_request
|
yield pipeline_request
|
||||||
|
@ -26,13 +26,14 @@ from metadata.generated.schema.entity.services.connections.pipeline.dagsterConne
|
|||||||
from metadata.ingestion.connections.test_connections import test_connection_steps
|
from metadata.ingestion.connections.test_connections import test_connection_steps
|
||||||
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||||
from metadata.ingestion.source.pipeline.dagster.queries import TEST_QUERY_GRAPHQL
|
from metadata.ingestion.source.pipeline.dagster.queries import TEST_QUERY_GRAPHQL
|
||||||
|
from metadata.utils.helpers import clean_uri
|
||||||
|
|
||||||
|
|
||||||
def get_connection(connection: DagsterConnection) -> DagsterGraphQLClient:
|
def get_connection(connection: DagsterConnection) -> DagsterGraphQLClient:
|
||||||
"""
|
"""
|
||||||
Create connection
|
Create connection
|
||||||
"""
|
"""
|
||||||
url = connection.host
|
url = clean_uri(connection.host)
|
||||||
dagster_connection = DagsterGraphQLClient(
|
dagster_connection = DagsterGraphQLClient(
|
||||||
url,
|
url,
|
||||||
transport=RequestsHTTPTransport(
|
transport=RequestsHTTPTransport(
|
||||||
|
@ -31,6 +31,7 @@ from metadata.generated.schema.metadataIngestion.workflow import (
|
|||||||
from metadata.ingestion.api.source import InvalidSourceException
|
from metadata.ingestion.api.source import InvalidSourceException
|
||||||
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
|
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
|
||||||
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
|
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
|
||||||
|
from metadata.utils.helpers import clean_uri
|
||||||
from metadata.utils.logger import ingestion_logger
|
from metadata.utils.logger import ingestion_logger
|
||||||
|
|
||||||
logger = ingestion_logger()
|
logger = ingestion_logger()
|
||||||
@ -113,7 +114,7 @@ class NifiSource(PipelineServiceSource):
|
|||||||
Task(
|
Task(
|
||||||
name=processor.id_,
|
name=processor.id_,
|
||||||
displayName=processor.name,
|
displayName=processor.name,
|
||||||
taskUrl=processor.uri.replace(self.service_connection.hostPort, ""),
|
taskUrl=f"{clean_uri(self.service_connection.hostPort)}{processor.uri}",
|
||||||
taskType=processor.type_,
|
taskType=processor.type_,
|
||||||
downstreamTasks=self._get_downstream_tasks_from(
|
downstreamTasks=self._get_downstream_tasks_from(
|
||||||
source_id=processor.id_,
|
source_id=processor.id_,
|
||||||
@ -140,9 +141,7 @@ class NifiSource(PipelineServiceSource):
|
|||||||
pipeline_request = CreatePipelineRequest(
|
pipeline_request = CreatePipelineRequest(
|
||||||
name=pipeline_details.id_,
|
name=pipeline_details.id_,
|
||||||
displayName=pipeline_details.name,
|
displayName=pipeline_details.name,
|
||||||
pipelineUrl=pipeline_details.uri.replace(
|
pipelineUrl=f"{clean_uri(self.service_connection.hostPort)}{pipeline_details.uri}",
|
||||||
self.service_connection.hostPort, ""
|
|
||||||
),
|
|
||||||
tasks=self._get_tasks_from_details(pipeline_details),
|
tasks=self._get_tasks_from_details(pipeline_details),
|
||||||
service=self.context.pipeline_service.fullyQualifiedName.__root__,
|
service=self.context.pipeline_service.fullyQualifiedName.__root__,
|
||||||
)
|
)
|
||||||
|
@ -343,3 +343,12 @@ def format_large_string_numbers(number: Union[float, int]) -> str:
|
|||||||
constant_k = 1000.0
|
constant_k = 1000.0
|
||||||
magnitude = int(floor(log(abs(number), constant_k)))
|
magnitude = int(floor(log(abs(number), constant_k)))
|
||||||
return f"{number / constant_k**magnitude:.2f}{units[magnitude]}"
|
return f"{number / constant_k**magnitude:.2f}{units[magnitude]}"
|
||||||
|
|
||||||
|
|
||||||
|
def clean_uri(uri: str) -> str:
|
||||||
|
"""
|
||||||
|
if uri is like http://localhost:9000/
|
||||||
|
then remove the end / and
|
||||||
|
make it http://localhost:9000
|
||||||
|
"""
|
||||||
|
return uri[:-1] if uri.endswith("/") else uri
|
||||||
|
@ -287,7 +287,7 @@ class LookerUnitTest(TestCase):
|
|||||||
displayName="title1",
|
displayName="title1",
|
||||||
description="description",
|
description="description",
|
||||||
charts=[],
|
charts=[],
|
||||||
dashboardUrl="/dashboards/1",
|
dashboardUrl="https://my-looker.com/dashboards/1",
|
||||||
service=self.looker.context.dashboard_service.fullyQualifiedName.__root__,
|
service=self.looker.context.dashboard_service.fullyQualifiedName.__root__,
|
||||||
owner=None,
|
owner=None,
|
||||||
)
|
)
|
||||||
@ -419,7 +419,7 @@ class LookerUnitTest(TestCase):
|
|||||||
displayName="chart_title1",
|
displayName="chart_title1",
|
||||||
description="subtitle; Some body text; Some note",
|
description="subtitle; Some body text; Some note",
|
||||||
chartType=ChartType.Line,
|
chartType=ChartType.Line,
|
||||||
chartUrl="/dashboard_elements/chart_id1",
|
chartUrl="https://my-looker.com/dashboard_elements/chart_id1",
|
||||||
service=self.looker.context.dashboard_service.fullyQualifiedName.__root__,
|
service=self.looker.context.dashboard_service.fullyQualifiedName.__root__,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -146,7 +146,7 @@ EXPECTED_DASH = CreateDashboardRequest(
|
|||||||
name=14,
|
name=14,
|
||||||
displayName="My DASH",
|
displayName="My DASH",
|
||||||
description="",
|
description="",
|
||||||
dashboardUrl="/superset/dashboard/14/",
|
dashboardUrl="https://my-superset.com/superset/dashboard/14/",
|
||||||
charts=[chart.fullyQualifiedName for chart in EXPECTED_CHATRT_ENTITY],
|
charts=[chart.fullyQualifiedName for chart in EXPECTED_CHATRT_ENTITY],
|
||||||
service=EXPECTED_DASH_SERVICE.fullyQualifiedName,
|
service=EXPECTED_DASH_SERVICE.fullyQualifiedName,
|
||||||
)
|
)
|
||||||
@ -156,7 +156,7 @@ EXPECTED_CHART = CreateChartRequest(
|
|||||||
displayName="% Rural",
|
displayName="% Rural",
|
||||||
description="TEST DESCRIPTION",
|
description="TEST DESCRIPTION",
|
||||||
chartType=ChartType.Other.value,
|
chartType=ChartType.Other.value,
|
||||||
chartUrl="/explore/?slice_id=37",
|
chartUrl="https://my-superset.com/explore/?slice_id=37",
|
||||||
service=EXPECTED_DASH_SERVICE.fullyQualifiedName,
|
service=EXPECTED_DASH_SERVICE.fullyQualifiedName,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -11,6 +11,7 @@
|
|||||||
"""
|
"""
|
||||||
Test Airbyte using the topology
|
Test Airbyte using the topology
|
||||||
"""
|
"""
|
||||||
|
# pylint: disable=line-too-long
|
||||||
import json
|
import json
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from unittest import TestCase
|
from unittest import TestCase
|
||||||
@ -39,11 +40,12 @@ from metadata.ingestion.source.pipeline.airbyte.metadata import (
|
|||||||
AirbytePipelineDetails,
|
AirbytePipelineDetails,
|
||||||
AirbyteSource,
|
AirbyteSource,
|
||||||
)
|
)
|
||||||
|
from metadata.utils.constants import UTF_8
|
||||||
|
|
||||||
mock_file_path = (
|
mock_file_path = (
|
||||||
Path(__file__).parent.parent.parent / "resources/datasets/airbyte_dataset.json"
|
Path(__file__).parent.parent.parent / "resources/datasets/airbyte_dataset.json"
|
||||||
)
|
)
|
||||||
with open(mock_file_path) as file:
|
with open(mock_file_path, encoding=UTF_8) as file:
|
||||||
mock_data: dict = json.load(file)
|
mock_data: dict = json.load(file)
|
||||||
|
|
||||||
mock_airbyte_config = {
|
mock_airbyte_config = {
|
||||||
@ -72,8 +74,10 @@ EXPECTED_ARIBYTE_DETAILS = AirbytePipelineDetails(
|
|||||||
workspace=mock_data["workspace"][0], connection=mock_data["connection"][0]
|
workspace=mock_data["workspace"][0], connection=mock_data["connection"][0]
|
||||||
)
|
)
|
||||||
|
|
||||||
MOCK_CONNECTION_URI_PATH = "/workspaces/af5680ec-2687-4fe0-bd55-5ad5f020a603/connections/a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f"
|
MOCK_CONNECTION_URI_PATH = (
|
||||||
MOCK_LOG_URL = f"http://localhost:1234{MOCK_CONNECTION_URI_PATH}"
|
"http://localhost:1234/workspaces/af5680ec-2687-4fe0-bd55-5ad5f020a603/"
|
||||||
|
"connections/a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
EXPECTED_PIPELINE_STATUS = [
|
EXPECTED_PIPELINE_STATUS = [
|
||||||
@ -87,7 +91,7 @@ EXPECTED_PIPELINE_STATUS = [
|
|||||||
executionStatus=StatusType.Pending.value,
|
executionStatus=StatusType.Pending.value,
|
||||||
startTime=1655482894,
|
startTime=1655482894,
|
||||||
endTime=None,
|
endTime=None,
|
||||||
logLink=f"{MOCK_LOG_URL}/status",
|
logLink=f"{MOCK_CONNECTION_URI_PATH}/status",
|
||||||
)
|
)
|
||||||
],
|
],
|
||||||
timestamp=1655482894,
|
timestamp=1655482894,
|
||||||
@ -103,7 +107,7 @@ EXPECTED_PIPELINE_STATUS = [
|
|||||||
executionStatus=StatusType.Successful.value,
|
executionStatus=StatusType.Successful.value,
|
||||||
startTime=1655393914,
|
startTime=1655393914,
|
||||||
endTime=1655394054,
|
endTime=1655394054,
|
||||||
logLink=f"{MOCK_LOG_URL}/status",
|
logLink=f"{MOCK_CONNECTION_URI_PATH}/status",
|
||||||
)
|
)
|
||||||
],
|
],
|
||||||
timestamp=1655393914,
|
timestamp=1655393914,
|
||||||
|
@ -11,6 +11,7 @@
|
|||||||
"""
|
"""
|
||||||
Test nifi using the topology
|
Test nifi using the topology
|
||||||
"""
|
"""
|
||||||
|
# pylint: disable=line-too-long
|
||||||
import json
|
import json
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from unittest import TestCase
|
from unittest import TestCase
|
||||||
@ -34,17 +35,18 @@ from metadata.ingestion.source.pipeline.nifi.metadata import (
|
|||||||
NifiProcessorConnections,
|
NifiProcessorConnections,
|
||||||
NifiSource,
|
NifiSource,
|
||||||
)
|
)
|
||||||
|
from metadata.utils.constants import UTF_8
|
||||||
|
|
||||||
mock_file_path = (
|
mock_file_path = (
|
||||||
Path(__file__).parent.parent.parent / "resources/datasets/nifi_process_group.json"
|
Path(__file__).parent.parent.parent / "resources/datasets/nifi_process_group.json"
|
||||||
)
|
)
|
||||||
with open(mock_file_path) as file:
|
with open(mock_file_path, encoding=UTF_8) as file:
|
||||||
mock_data: dict = json.load(file)
|
mock_data: dict = json.load(file)
|
||||||
|
|
||||||
resources_mock_file_path = (
|
resources_mock_file_path = (
|
||||||
Path(__file__).parent.parent.parent / "resources/datasets/nifi_resources.json"
|
Path(__file__).parent.parent.parent / "resources/datasets/nifi_resources.json"
|
||||||
)
|
)
|
||||||
with open(mock_file_path) as file:
|
with open(mock_file_path, encoding=UTF_8) as file:
|
||||||
resources_mock_data: dict = json.load(file)
|
resources_mock_data: dict = json.load(file)
|
||||||
|
|
||||||
mock_nifi_config = {
|
mock_nifi_config = {
|
||||||
@ -108,19 +110,28 @@ EXPECTED_NIFI_DETAILS = NifiPipelineDetails(
|
|||||||
EXPECTED_CREATED_PIPELINES = CreatePipelineRequest(
|
EXPECTED_CREATED_PIPELINES = CreatePipelineRequest(
|
||||||
name="d3d6b945-0182-1000-d7e4-d81b8f79f310",
|
name="d3d6b945-0182-1000-d7e4-d81b8f79f310",
|
||||||
displayName="NiFi Flow",
|
displayName="NiFi Flow",
|
||||||
pipelineUrl="/nifi-api/flow/process-groups/d3d6b945-0182-1000-d7e4-d81b8f79f310",
|
pipelineUrl=(
|
||||||
|
"https://localhost:8443/nifi-api/flow/"
|
||||||
|
"process-groups/d3d6b945-0182-1000-d7e4-d81b8f79f310"
|
||||||
|
),
|
||||||
tasks=[
|
tasks=[
|
||||||
Task(
|
Task(
|
||||||
name="d3f023ac-0182-1000-8bbe-e2b00347fff8",
|
name="d3f023ac-0182-1000-8bbe-e2b00347fff8",
|
||||||
displayName="FetchFile",
|
displayName="FetchFile",
|
||||||
taskUrl="/nifi-api/processors/d3f023ac-0182-1000-8bbe-e2b00347fff8",
|
taskUrl=(
|
||||||
|
"https://localhost:8443/nifi-api/"
|
||||||
|
"processors/d3f023ac-0182-1000-8bbe-e2b00347fff8"
|
||||||
|
),
|
||||||
taskType="org.apache.nifi.processors.standard.FetchFile",
|
taskType="org.apache.nifi.processors.standard.FetchFile",
|
||||||
downstreamTasks=[],
|
downstreamTasks=[],
|
||||||
),
|
),
|
||||||
Task(
|
Task(
|
||||||
name="d3f1304d-0182-1000-f0f5-9a6927976941",
|
name="d3f1304d-0182-1000-f0f5-9a6927976941",
|
||||||
displayName="ListFile",
|
displayName="ListFile",
|
||||||
taskUrl="/nifi-api/processors/d3f1304d-0182-1000-f0f5-9a6927976941",
|
taskUrl=(
|
||||||
|
"https://localhost:8443/nifi-api/"
|
||||||
|
"processors/d3f1304d-0182-1000-f0f5-9a6927976941"
|
||||||
|
),
|
||||||
taskType="org.apache.nifi.processors.standard.ListFile",
|
taskType="org.apache.nifi.processors.standard.ListFile",
|
||||||
downstreamTasks=["d3f023ac-0182-1000-8bbe-e2b00347fff8"],
|
downstreamTasks=["d3f023ac-0182-1000-8bbe-e2b00347fff8"],
|
||||||
),
|
),
|
||||||
@ -141,19 +152,28 @@ MOCK_PIPELINE = Pipeline(
|
|||||||
name="d3d6b945-0182-1000-d7e4-d81b8f79f310",
|
name="d3d6b945-0182-1000-d7e4-d81b8f79f310",
|
||||||
fullyQualifiedName="nifi_source.d3d6b945-0182-1000-d7e4-d81b8f79f310",
|
fullyQualifiedName="nifi_source.d3d6b945-0182-1000-d7e4-d81b8f79f310",
|
||||||
displayName="NiFi Flow",
|
displayName="NiFi Flow",
|
||||||
pipelineUrl="/nifi-api/flow/process-groups/d3d6b945-0182-1000-d7e4-d81b8f79f310",
|
pipelineUrl=(
|
||||||
|
"https://localhost:8443/nifi-api/flow/"
|
||||||
|
"process-groups/d3d6b945-0182-1000-d7e4-d81b8f79f310"
|
||||||
|
),
|
||||||
tasks=[
|
tasks=[
|
||||||
Task(
|
Task(
|
||||||
name="d3f023ac-0182-1000-8bbe-e2b00347fff8",
|
name="d3f023ac-0182-1000-8bbe-e2b00347fff8",
|
||||||
displayName="FetchFile",
|
displayName="FetchFile",
|
||||||
taskUrl="/nifi-api/processors/d3f023ac-0182-1000-8bbe-e2b00347fff8",
|
taskUrl=(
|
||||||
|
"https://localhost:8443/nifi-api/processors/"
|
||||||
|
"d3f023ac-0182-1000-8bbe-e2b00347fff8"
|
||||||
|
),
|
||||||
taskType="org.apache.nifi.processors.standard.FetchFile",
|
taskType="org.apache.nifi.processors.standard.FetchFile",
|
||||||
downstreamTasks=[],
|
downstreamTasks=[],
|
||||||
),
|
),
|
||||||
Task(
|
Task(
|
||||||
name="d3f1304d-0182-1000-f0f5-9a6927976941",
|
name="d3f1304d-0182-1000-f0f5-9a6927976941",
|
||||||
displayName="ListFile",
|
displayName="ListFile",
|
||||||
taskUrl="/nifi-api/processors/d3f1304d-0182-1000-f0f5-9a6927976941",
|
taskUrl=(
|
||||||
|
"https://localhost:8443/nifi-api/processors/"
|
||||||
|
"d3f1304d-0182-1000-f0f5-9a6927976941"
|
||||||
|
),
|
||||||
taskType="org.apache.nifi.processors.standard.ListFile",
|
taskType="org.apache.nifi.processors.standard.ListFile",
|
||||||
downstreamTasks=["d3f023ac-0182-1000-8bbe-e2b00347fff8"],
|
downstreamTasks=["d3f023ac-0182-1000-8bbe-e2b00347fff8"],
|
||||||
),
|
),
|
||||||
@ -165,6 +185,10 @@ MOCK_PIPELINE = Pipeline(
|
|||||||
|
|
||||||
|
|
||||||
class NifiUnitTest(TestCase):
|
class NifiUnitTest(TestCase):
|
||||||
|
"""
|
||||||
|
Nifi unit tests
|
||||||
|
"""
|
||||||
|
|
||||||
@patch(
|
@patch(
|
||||||
"metadata.ingestion.source.pipeline.pipeline_service.PipelineServiceSource.test_connection"
|
"metadata.ingestion.source.pipeline.pipeline_service.PipelineServiceSource.test_connection"
|
||||||
)
|
)
|
||||||
|
@ -23,9 +23,7 @@ import java.util.Collections;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import org.openmetadata.schema.entity.data.Dashboard;
|
import org.openmetadata.schema.entity.data.Dashboard;
|
||||||
import org.openmetadata.schema.entity.services.DashboardService;
|
import org.openmetadata.schema.entity.services.DashboardService;
|
||||||
import org.openmetadata.schema.type.EntityReference;
|
import org.openmetadata.schema.type.*;
|
||||||
import org.openmetadata.schema.type.Include;
|
|
||||||
import org.openmetadata.schema.type.Relationship;
|
|
||||||
import org.openmetadata.service.Entity;
|
import org.openmetadata.service.Entity;
|
||||||
import org.openmetadata.service.jdbi3.CollectionDAO.EntityRelationshipRecord;
|
import org.openmetadata.service.jdbi3.CollectionDAO.EntityRelationshipRecord;
|
||||||
import org.openmetadata.service.resources.dashboards.DashboardResource;
|
import org.openmetadata.service.resources.dashboards.DashboardResource;
|
||||||
@ -37,6 +35,8 @@ public class DashboardRepository extends EntityRepository<Dashboard> {
|
|||||||
private static final String DASHBOARD_UPDATE_FIELDS = "owner,tags,charts,extension,followers,dataModels";
|
private static final String DASHBOARD_UPDATE_FIELDS = "owner,tags,charts,extension,followers,dataModels";
|
||||||
private static final String DASHBOARD_PATCH_FIELDS = "owner,tags,charts,extension,followers,dataModels";
|
private static final String DASHBOARD_PATCH_FIELDS = "owner,tags,charts,extension,followers,dataModels";
|
||||||
|
|
||||||
|
private static final String DASHBOARD_URL = "dashboardUrl";
|
||||||
|
|
||||||
public DashboardRepository(CollectionDAO dao) {
|
public DashboardRepository(CollectionDAO dao) {
|
||||||
super(
|
super(
|
||||||
DashboardResource.COLLECTION_PATH,
|
DashboardResource.COLLECTION_PATH,
|
||||||
@ -162,6 +162,7 @@ public class DashboardRepository extends EntityRepository<Dashboard> {
|
|||||||
"dataModels",
|
"dataModels",
|
||||||
listOrEmpty(updated.getDataModels()),
|
listOrEmpty(updated.getDataModels()),
|
||||||
listOrEmpty(original.getDataModels()));
|
listOrEmpty(original.getDataModels()));
|
||||||
|
updateDashboardUrl(original, updated);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void update(
|
private void update(
|
||||||
@ -179,5 +180,9 @@ public class DashboardRepository extends EntityRepository<Dashboard> {
|
|||||||
List<EntityReference> deleted = new ArrayList<>();
|
List<EntityReference> deleted = new ArrayList<>();
|
||||||
recordListChange(field, oriEntities, updEntities, added, deleted, EntityUtil.entityReferenceMatch);
|
recordListChange(field, oriEntities, updEntities, added, deleted, EntityUtil.entityReferenceMatch);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void updateDashboardUrl(Dashboard original, Dashboard updated) throws IOException {
|
||||||
|
recordChange(DASHBOARD_URL, original.getDashboardUrl(), updated.getDashboardUrl());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user