From 3a3f248edd2bbcbb8f9a17f74a4aa5ca40cd4945 Mon Sep 17 00:00:00 2001 From: Steve Russo <64294847+sjrusso8@users.noreply.github.com> Date: Thu, 5 Jan 2023 08:14:12 -0500 Subject: [PATCH] Issue 5592: Add Databricks Pipeline Connector (#9554) * feat: update databricks_client * feat: create pipelineconnection & connection * test: add unit test * feat: add examples file * fix: remove unused imports * style: fix python style * fix: code review adjustments * fix: remove unneeded schema ref * fix: format requirements * fix: change context calls * test: add pipeline_status test * format: pylint format change Co-authored-by: Pere Miquel Brull --- .../workflows/databricks_pipeline.yaml | 20 ++ .../source/database/databricks/client.py | 90 +++++- .../pipeline/databrickspipeline/__init__.py | 0 .../pipeline/databrickspipeline/connection.py | 37 +++ .../pipeline/databrickspipeline/metadata.py | 231 +++++++++++++++ .../datasets/databricks_pipeline_history.json | 174 ++++++++++++ .../databricks_pipeline_resource.json | 120 ++++++++ .../pipeline/test_databricks_pipeline.py | 265 ++++++++++++++++++ .../databricksPipelineConnection.json | 55 ++++ .../entity/services/pipelineService.json | 21 +- 10 files changed, 1005 insertions(+), 8 deletions(-) create mode 100644 ingestion/src/metadata/examples/workflows/databricks_pipeline.yaml create mode 100644 ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/__init__.py create mode 100644 ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/connection.py create mode 100644 ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/metadata.py create mode 100644 ingestion/tests/unit/resources/datasets/databricks_pipeline_history.json create mode 100644 ingestion/tests/unit/resources/datasets/databricks_pipeline_resource.json create mode 100644 ingestion/tests/unit/topology/pipeline/test_databricks_pipeline.py create mode 100644 openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/databricksPipelineConnection.json diff --git a/ingestion/src/metadata/examples/workflows/databricks_pipeline.yaml b/ingestion/src/metadata/examples/workflows/databricks_pipeline.yaml new file mode 100644 index 00000000000..00b59a2e808 --- /dev/null +++ b/ingestion/src/metadata/examples/workflows/databricks_pipeline.yaml @@ -0,0 +1,20 @@ +source: + type: databrickspipeline + serviceName: DatabricksPipeline + serviceConnection: + config: + type: DatabricksPipeline + token: + hostPort: localhost:443 + connectionArguments: + http_path: + sourceConfig: + config: + type: PipelineMetadata +sink: + type: metadata-rest + config: {} +workflowConfig: + openMetadataServerConfig: + hostPort: http://localhost:8585/api + authProvider: no-auth \ No newline at end of file diff --git a/ingestion/src/metadata/ingestion/source/database/databricks/client.py b/ingestion/src/metadata/ingestion/source/database/databricks/client.py index 11116a31fee..764622360d6 100644 --- a/ingestion/src/metadata/ingestion/source/database/databricks/client.py +++ b/ingestion/src/metadata/ingestion/source/database/databricks/client.py @@ -36,10 +36,14 @@ class DatabricksClient: def __init__(self, config: DatabricksConnection): self.config = config - base_url, _ = self.config.hostPort.split(":") + base_url, *_ = self.config.hostPort.split(":") api_version = "/api/2.0" + job_api_version = "/api/2.1" auth_token = self.config.token.get_secret_value() - self.base_url = f"https://{base_url}{api_version}/sql/history/queries" + self.base_query_url = 
f"https://{base_url}{api_version}/sql/history/queries" + self.base_job_url = f"https://{base_url}{job_api_version}/jobs" + self.jobs_list_url = f"{self.base_job_url}/list" + self.jobs_run_list_url = f"{self.base_job_url}/runs/list" self.headers = { "Authorization": f"Bearer {auth_token}", "Content-Type": "application/json", @@ -75,7 +79,7 @@ class DatabricksClient: } response = self.client.get( - self.base_url, + self.base_query_url, data=json.dumps(data), headers=self.headers, timeout=10, @@ -103,7 +107,7 @@ class DatabricksClient: if result[-1]["execution_end_time_ms"] <= end_time: response = self.client.get( - self.base_url, + self.base_query_url, data=json.dumps(data), headers=self.headers, timeout=10, @@ -122,3 +126,81 @@ class DatabricksClient: query_text.startswith(QUERY_WITH_DBT) or query_text.startswith(QUERY_WITH_OM_VERSION) ) + + def list_jobs(self) -> List[dict]: + """ + Method returns List all the created jobs in a Databricks Workspace + """ + job_list = [] + try: + data = {"limit": 25, "expand_tasks": True, "offset": 0} + + response = self.client.get( + self.jobs_list_url, + data=json.dumps(data), + headers=self.headers, + timeout=10, + ).json() + + job_list.extend(response["jobs"]) + + while response["has_more"]: + + data["offset"] = len(response["jobs"]) + + response = self.client.get( + self.jobs_list_url, + data=json.dumps(data), + headers=self.headers, + timeout=10, + ).json() + + job_list.extend(response["jobs"]) + + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.error(exc) + + return job_list + + def get_job_runs(self, job_id) -> List[dict]: + """ + Method returns List of all runs for a job by the specified job_id + """ + job_runs = [] + try: + params = { + "job_id": job_id, + "active_only": "false", + "completed_only": "true", + "run_type": "JOB_RUN", + "expand_tasks": "true", + } + + response = self.client.get( + self.jobs_run_list_url, + params=params, + headers=self.headers, + timeout=10, + ).json() + + job_runs.extend(response["runs"]) + + while response["has_more"]: + + params.update({"start_time_to": response["runs"][-1]["start_time"]}) + + response = self.client.get( + self.jobs_run_list_url, + params=params, + headers=self.headers, + timeout=10, + ).json() + + job_runs.extend(response["runs"]) + + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.error(exc) + + return job_runs diff --git a/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/__init__.py b/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/connection.py b/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/connection.py new file mode 100644 index 00000000000..2dcd9b49f84 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/connection.py @@ -0,0 +1,37 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +Source connection handler +""" +from metadata.generated.schema.entity.services.connections.pipeline.databricksPipelineConnection import ( + DatabricksPipelineConnection, +) +from metadata.ingestion.connections.test_connections import SourceConnectionException +from metadata.ingestion.source.database.databricks.client import DatabricksClient + + +def get_connection(connection: DatabricksPipelineConnection) -> DatabricksClient: + """ + Create connection + """ + return DatabricksClient(connection) + + +def test_connection(client: DatabricksClient) -> None: + """ + Test connection + """ + try: + client.list_jobs() + except Exception as exc: + msg = f"Unknown error connecting with {client}: {exc}." + raise SourceConnectionException(msg) from exc diff --git a/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/metadata.py new file mode 100644 index 00000000000..5d4d7f1db2c --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/metadata.py @@ -0,0 +1,231 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Databricks pipeline source to extract metadata +""" + +import traceback +from typing import Any, Iterable, List, Optional + +from pydantic import ValidationError + +from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest +from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest +from metadata.generated.schema.entity.data.pipeline import ( + PipelineStatus, + StatusType, + Task, + TaskStatus, +) +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) +from metadata.generated.schema.entity.services.connections.pipeline.databricksPipelineConnection import ( + DatabricksPipelineConnection, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.api.source import InvalidSourceException +from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus +from metadata.ingestion.source.database.databricks.client import DatabricksClient +from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() + + +STATUS_MAP = { + "SUCCESS": StatusType.Successful, + "FAILED": StatusType.Failed, + "TIMEOUT": StatusType.Failed, + "CANCELED": StatusType.Failed, + "PENDING": StatusType.Pending, + "RUNNING": StatusType.Pending, + "TERMINATING": StatusType.Pending, + "SKIPPED": StatusType.Failed, + "INTERNAL_ERROR": StatusType.Failed, +} + + +class DatabrickspipelineSource(PipelineServiceSource): + """ + Implements the necessary methods ot extract + Pipeline metadata from Databricks Jobs API + """ + + def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection): + 
super().__init__(config, metadata_config) + self.connection = self.config.serviceConnection.__root__.config + self.client = DatabricksClient(self.connection) + + @classmethod + def create(cls, config_dict, metadata_config: OpenMetadataConnection): + """Create class instance""" + config: WorkflowSource = WorkflowSource.parse_obj(config_dict) + connection: DatabricksPipelineConnection = ( + config.serviceConnection.__root__.config + ) + if not isinstance(connection, DatabricksPipelineConnection): + raise InvalidSourceException( + f"Expected DatabricksPipelineConnection, but got {connection}" + ) + return cls(config, metadata_config) + + def get_pipelines_list(self) -> Iterable[dict]: + """ + Get List of all pipelines + """ + for workflow in self.client.list_jobs(): + yield workflow + + def get_pipeline_name(self, pipeline_details: dict) -> str: + """ + Get Pipeline Name + """ + return pipeline_details["settings"]["name"] + + def yield_pipeline(self, pipeline_details: Any) -> Iterable[CreatePipelineRequest]: + """ + Method to Get Pipeline Entity + """ + self.context.job_id_list = [] + try: + yield CreatePipelineRequest( + name=pipeline_details["job_id"], + displayName=pipeline_details["settings"]["name"], + description=pipeline_details["settings"]["name"], + tasks=self.get_tasks(pipeline_details), + service=EntityReference( + id=self.context.pipeline_service.id.__root__, type="pipelineService" + ), + ) + + except TypeError as err: + logger.debug(traceback.format_exc()) + logger.warning( + f"Error building Databricks Pipeline information from {pipeline_details}." + f" There might be Databricks Jobs API version incompatibilities - {err}" + ) + except ValidationError as err: + logger.debug(traceback.format_exc()) + logger.warning( + f"Error building pydantic model for {pipeline_details} - {err}" + ) + except Exception as err: + logger.debug(traceback.format_exc()) + logger.warning(f"Wild error ingesting pipeline {pipeline_details} - {err}") + + def get_tasks(self, pipeline_details: dict) -> List[Task]: + task_list = [] + self.append_context(key="job_id_list", value=pipeline_details["job_id"]) + + downstream_tasks = self.get_downstream_tasks( + pipeline_details["settings"]["tasks"] + ) + for task in pipeline_details["settings"]["tasks"]: + task_list.append( + Task( + name=task["task_key"], + displayName=task["task_key"], + taskType=self.get_task_type(task), + downstreamTasks=downstream_tasks.get(task["task_key"], []), + ) + ) + + return task_list + + def get_task_type(self, task): + task_key = "undefined_task_type" + for key in task.keys(): + if key.endswith("_task"): + task_key = key + + return task_key + + def get_downstream_tasks(self, workflow): + task_key_list = [task["task_key"] for task in workflow] + + dependent_tasks = self.get_dependent_tasks(workflow) + + downstream_tasks = {} + + for task_key, task_depend_ons in dependent_tasks.items(): + if task_depend_ons: + for task in task_depend_ons: + if task in downstream_tasks: + downstream_tasks[task].append(task_key) + else: + downstream_tasks[task] = [task_key] + + for task in task_key_list: + if task not in downstream_tasks: + downstream_tasks[task] = [] + + return downstream_tasks + + def get_dependent_tasks(self, workflow): + dependent_tasks = {} + + for task in workflow: + depends_on = task.get("depends_on") + if depends_on: + dependent_tasks[task["task_key"]] = [v["task_key"] for v in depends_on] + else: + dependent_tasks[task["task_key"]] = None + + return dependent_tasks + + def yield_pipeline_status(self, pipeline_details) -> 
Iterable[OMetaPipelineStatus]: + + for job_id in self.context.job_id_list: + try: + runs = self.client.get_job_runs(job_id=job_id) + for attempt in runs: + for task_run in attempt["tasks"]: + task_status = [] + task_status.append( + TaskStatus( + name=task_run["task_key"], + executionStatus=STATUS_MAP.get( + task_run["state"].get("result_state"), + StatusType.Failed, + ), + startTime=task_run["start_time"], + endTime=task_run["end_time"], + logLink=task_run["run_page_url"], + ) + ) + pipeline_status = PipelineStatus( + taskStatus=task_status, + timestamp=attempt["start_time"], + executionStatus=STATUS_MAP.get( + attempt["state"].get("result_state"), + StatusType.Failed, + ), + ) + yield OMetaPipelineStatus( + pipeline_fqn=self.context.pipeline.fullyQualifiedName.__root__, + pipeline_status=pipeline_status, + ) + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.error(f"Failed to yield pipeline status: {exc}") + + def yield_pipeline_lineage_details( + self, pipeline_details: Any + ) -> Optional[Iterable[AddLineageRequest]]: + """ + Get lineage between pipeline and data sources + """ + logger.info("Lineage is not yet supported on Databicks Pipelines") diff --git a/ingestion/tests/unit/resources/datasets/databricks_pipeline_history.json b/ingestion/tests/unit/resources/datasets/databricks_pipeline_history.json new file mode 100644 index 00000000000..bf9ec937cc0 --- /dev/null +++ b/ingestion/tests/unit/resources/datasets/databricks_pipeline_history.json @@ -0,0 +1,174 @@ +[ + { + "job_id": 325697581681107, + "run_id": 820960, + "creator_user_name": "user_name", + "number_in_job": 820960, + "original_attempt_run_id": 820960, + "state": { + "life_cycle_state": "TERMINATED", + "result_state": "SUCCESS", + "state_message": "", + "user_cancelled_or_timedout": false + }, + "schedule": { + "quartz_cron_expression": "30 1/2 * * * ?", + "timezone_id": "UTC", + "pause_status": "UNPAUSED" + }, + "start_time": 1672691730552, + "setup_duration": 51000, + "execution_duration": 11000, + "cleanup_duration": 0, + "end_time": 1672691793780, + "trigger": "PERIODIC", + "run_name": "OpenMetadata Databricks Workflow", + "run_page_url": "https://workspace.azuredatabricks.net/?o=workspace_id#job/325697581681107/run/820960", + "run_type": "JOB_RUN", + "tasks": [ + { + "run_id": 821029, + "task_key": "one_task", + "notebook_task": { + "notebook_path": "/Users/user_name/notebook", + "source": "WORKSPACE" + }, + "job_cluster_key": "Shared_job_cluster", + "state": { + "life_cycle_state": "TERMINATED", + "result_state": "SUCCESS", + "state_message": "", + "user_cancelled_or_timedout": false + }, + "run_page_url": "https://workspace.azuredatabricks.net/?o=workspace_id#job/325697581681107/run/821029", + "start_time": 1672691730568, + "setup_duration": 51000, + "execution_duration": 11000, + "cleanup_duration": 0, + "end_time": 1672691793559, + "cluster_instance": { + "cluster_id": "0102-203531-uh2t6efy", + "spark_context_id": "3018173046193599096" + }, + "attempt_number": 0 + } + ], + "job_clusters": [ + { + "job_cluster_key": "Shared_job_cluster", + "new_cluster": { + "cluster_name": "", + "spark_version": "11.3.x-scala2.12", + "spark_conf": { + "spark.databricks.delta.preview.enabled": "true", + "spark.master": "local[*, 4]", + "spark.databricks.cluster.profile": "singleNode" + }, + "azure_attributes": { + "first_on_demand": 1, + "availability": "ON_DEMAND_AZURE", + "spot_bid_max_price": -1.0 + }, + "node_type_id": "Standard_DS3_v2", + "custom_tags": { + "ResourceClass": "SingleNode" + }, + 
"spark_env_vars": { + "PYSPARK_PYTHON": "/databricks/python3/bin/python3" + }, + "enable_elastic_disk": true, + "data_security_mode": "SINGLE_USER", + "runtime_engine": "STANDARD", + "num_workers": 0 + } + } + ], + "format": "MULTI_TASK" + }, + { + "job_id": 325697581681107, + "run_id": 820788, + "creator_user_name": "user_name", + "number_in_job": 820788, + "original_attempt_run_id": 820788, + "state": { + "life_cycle_state": "TERMINATED", + "result_state": "FAILED ", + "state_message": "", + "user_cancelled_or_timedout": false + }, + "schedule": { + "quartz_cron_expression": "30 1/2 * * * ?", + "timezone_id": "UTC", + "pause_status": "UNPAUSED" + }, + "start_time": 1672691610525, + "setup_duration": 55000, + "execution_duration": 12000, + "cleanup_duration": 0, + "end_time": 1672691677852, + "trigger": "PERIODIC", + "run_name": "OpenMetadata Databricks Workflow", + "run_page_url": "https://workspace.azuredatabricks.net/?o=workspace_id#job/325697581681107/run/820788", + "run_type": "JOB_RUN", + "tasks": [ + { + "run_id": 820956, + "task_key": "one_task", + "notebook_task": { + "notebook_path": "/Users/user_name/notebook", + "source": "WORKSPACE" + }, + "job_cluster_key": "Shared_job_cluster", + "state": { + "life_cycle_state": "TERMINATED", + "result_state": "FAILED", + "state_message": "", + "user_cancelled_or_timedout": false + }, + "run_page_url": "https://workspace.azuredatabricks.net/?o=workspace_id#job/325697581681107/run/820956", + "start_time": 1672691610544, + "setup_duration": 55000, + "execution_duration": 12000, + "cleanup_duration": 0, + "end_time": 1672691677696, + "cluster_instance": { + "cluster_id": "0102-203335-kfts9a0p", + "spark_context_id": "3081931546313825347" + }, + "attempt_number": 0 + } + ], + "job_clusters": [ + { + "job_cluster_key": "Shared_job_cluster", + "new_cluster": { + "cluster_name": "", + "spark_version": "11.3.x-scala2.12", + "spark_conf": { + "spark.databricks.delta.preview.enabled": "true", + "spark.master": "local[*, 4]", + "spark.databricks.cluster.profile": "singleNode" + }, + "azure_attributes": { + "first_on_demand": 1, + "availability": "ON_DEMAND_AZURE", + "spot_bid_max_price": -1.0 + }, + "node_type_id": "Standard_DS3_v2", + "custom_tags": { + "ResourceClass": "SingleNode" + }, + "spark_env_vars": { + "PYSPARK_PYTHON": "/databricks/python3/bin/python3" + }, + "enable_elastic_disk": true, + "data_security_mode": "SINGLE_USER", + "runtime_engine": "STANDARD", + "num_workers": 0 + } + } + ], + "format": "MULTI_TASK" + } +] \ No newline at end of file diff --git a/ingestion/tests/unit/resources/datasets/databricks_pipeline_resource.json b/ingestion/tests/unit/resources/datasets/databricks_pipeline_resource.json new file mode 100644 index 00000000000..61db60cc81f --- /dev/null +++ b/ingestion/tests/unit/resources/datasets/databricks_pipeline_resource.json @@ -0,0 +1,120 @@ +[ + { + "job_id": 606358633757175, + "creator_user_name": "user_name", + "settings": { + "name": "OpenMetadata Databricks Workflow", + "email_notifications": { + "no_alert_for_skipped_runs": false + }, + "timeout_seconds": 0, + "schedule": { + "quartz_cron_expression": "16 1/2 * * * ?", + "timezone_id": "UTC", + "pause_status": "PAUSED" + }, + "max_concurrent_runs": 3, + "tasks": [ + { + "task_key": "task_1", + "notebook_task": { + "notebook_path": "/Users/user_name/notebook", + "source": "WORKSPACE" + }, + "job_cluster_key": "cluster_name", + "timeout_seconds": 0, + "email_notifications": {} + }, + { + "task_key": "task_2", + "depends_on": [ + { + "task_key": "task_1" + } + 
], + "spark_python_task": { + "python_file": "dbfs:/user/python_script" + }, + "job_cluster_key": "cluster_name", + "timeout_seconds": 0, + "email_notifications": {} + }, + { + "task_key": "task_3", + "depends_on": [ + { + "task_key": "task_1" + } + ], + "python_wheel_task": { + "package_name": "my_package", + "entry_point": "run_job" + }, + "job_cluster_key": "cluster_name", + "timeout_seconds": 0, + "email_notifications": {} + }, + { + "task_key": "task_4", + "depends_on": [ + { + "task_key": "task_1" + } + ], + "pipeline_task": { + "pipeline_id": "4068beca-753c-46ed-9a64-370108ffcd8f", + "full_refresh": false + }, + "timeout_seconds": 0, + "email_notifications": {} + }, + { + "task_key": "task_5", + "depends_on": [ + { + "task_key": "task_3" + }, + { + "task_key": "task_4" + } + ], + "sql_task": { + "query": { + "query_id": "032a8806-bd62-4594-b222-43a973290210" + }, + "warehouse_id": "21e75a911f658d1b" + }, + "timeout_seconds": 0, + "email_notifications": {} + } + ], + "job_clusters": [ + { + "job_cluster_key": "cluster_name", + "new_cluster": { + "cluster_name": "", + "spark_version": "11.3.x-scala2.12", + "spark_conf": { + "spark.databricks.delta.preview.enabled": "true" + }, + "azure_attributes": { + "first_on_demand": 1, + "availability": "ON_DEMAND_AZURE", + "spot_bid_max_price": -1.0 + }, + "node_type_id": "Standard_DS3_v2", + "spark_env_vars": { + "PYSPARK_PYTHON": "/databricks/python3/bin/python3" + }, + "enable_elastic_disk": true, + "data_security_mode": "SINGLE_USER", + "runtime_engine": "STANDARD", + "num_workers": 8 + } + } + ], + "format": "MULTI_TASK" + }, + "created_time": 1663513966931 + } +] \ No newline at end of file diff --git a/ingestion/tests/unit/topology/pipeline/test_databricks_pipeline.py b/ingestion/tests/unit/topology/pipeline/test_databricks_pipeline.py new file mode 100644 index 00000000000..742984f7403 --- /dev/null +++ b/ingestion/tests/unit/topology/pipeline/test_databricks_pipeline.py @@ -0,0 +1,265 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
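# A short worked example, grounded in the mock job definition above
# (databricks_pipeline_resource.json): DatabrickspipelineSource.get_downstream_tasks
# inverts the upstream "depends_on" entries into per-task downstream lists, which is
# exactly what the Task fixtures asserted in the tests below expect.
#
#     tasks = mock_data[0]["settings"]["tasks"]      # task_1 ... task_5 as defined above
#     self.databricks.get_downstream_tasks(tasks)
#     # -> {"task_1": ["task_2", "task_3", "task_4"],
#     #     "task_3": ["task_5"], "task_4": ["task_5"],
#     #     "task_2": [], "task_5": []}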
+""" +Databricks Pipeline utils tests +""" + +import json +from pathlib import Path +from unittest import TestCase +from unittest.mock import patch + +from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest +from metadata.generated.schema.entity.data.pipeline import ( + Pipeline, + PipelineStatus, + StatusType, + Task, + TaskStatus, +) +from metadata.generated.schema.entity.services.pipelineService import ( + PipelineConnection, + PipelineService, + PipelineServiceType, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + OpenMetadataWorkflowConfig, +) +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus +from metadata.ingestion.source.pipeline.databrickspipeline.metadata import ( + DatabrickspipelineSource, +) +from metadata.utils.ansi import print_ansi_encoded_string + +mock_file_path = ( + Path(__file__).parent.parent.parent + / "resources/datasets/databricks_pipeline_resource.json" +) +with open(mock_file_path) as file: + mock_data: dict = json.load(file) + +mock_file_path = ( + Path(__file__).parent.parent.parent + / "resources/datasets/databricks_pipeline_history.json" +) +with open(mock_file_path) as file: + mock_history_data: dict = json.load(file) + + +mock_databricks_config = { + "source": { + "type": "DatabricksPipeline", + "serviceName": "DatabricksPipeline", + "serviceConnection": { + "config": { + "type": "DatabricksPipeline", + "token": "random_token", + "hostPort": "localhost:443", + "connectionArguments": { + "http_path": "sql/1.0/endpoints/path", + }, + } + }, + "sourceConfig": {"config": {"type": "PipelineMetadata"}}, + }, + "sink": {"type": "metadata-rest", "config": {}}, + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "openmetadata", + "securityConfig": { + "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGc" + "iOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE" + "2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXB" + "iEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fN" + "r3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3u" + "d-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" + }, + } + }, +} + +MOCK_PIPELINE_SERVICE = PipelineService( + id="85811038-099a-11ed-861d-0242ac120002", + name="databricks_pipeline_test", + connection=PipelineConnection(), + serviceType=PipelineServiceType.DatabricksPipeline, +) + +MOCK_PIPELINE = Pipeline( + id="2aaa012e-099a-11ed-861d-0242ac120002", + name="606358633757175", + fullyQualifiedName="databricks_pipeline_source.606358633757175", + displayName="OpenMetadata Databricks Workflow", + tasks=[ + Task( + name="task_1", + displayName="task_1", + taskType="notebook_task", + downstreamTasks=["task_2", "task_3", "task_4"], + ), + Task( + name="task_2", + displayName="task_2", + taskType="spark_python_task", + downstreamTasks=[], + ), + Task( + name="task_3", + displayName="task_3", + taskType="python_wheel_task", + downstreamTasks=["task_5"], + ), + Task( + name="task_4", + displayName="task_4", + taskType="pipeline_task", + downstreamTasks=["task_5"], + ), + Task( + name="task_5", + displayName="task_5", + taskType="sql_task", + 
downstreamTasks=[], + ), + ], + service=EntityReference( + id="85811038-099a-11ed-861d-0242ac120002", type="pipelineService" + ), +) + +EXPECTED_CREATED_PIPELINES = CreatePipelineRequest( + name="606358633757175", + displayName="OpenMetadata Databricks Workflow", + description="OpenMetadata Databricks Workflow", + tasks=[ + Task( + name="task_1", + displayName="task_1", + taskType="notebook_task", + downstreamTasks=["task_2", "task_3", "task_4"], + ), + Task( + name="task_2", + displayName="task_2", + taskType="spark_python_task", + downstreamTasks=[], + ), + Task( + name="task_3", + displayName="task_3", + taskType="python_wheel_task", + downstreamTasks=["task_5"], + ), + Task( + name="task_4", + displayName="task_4", + taskType="pipeline_task", + downstreamTasks=["task_5"], + ), + Task( + name="task_5", + displayName="task_5", + taskType="sql_task", + downstreamTasks=[], + ), + ], + service=EntityReference( + id="85811038-099a-11ed-861d-0242ac120002", type="pipelineService" + ), +) + + +EXPECTED_PIPELINE_STATUS = [ + OMetaPipelineStatus( + pipeline_fqn="databricks_pipeline_source.606358633757175", + pipeline_status=PipelineStatus( + executionStatus=StatusType.Successful.value, + taskStatus=[ + TaskStatus( + name="one_task", + executionStatus=StatusType.Successful.value, + startTime=1672691730568, + endTime=1672691793559, + logLink="https://workspace.azuredatabricks.net/?o=workspace_id#job/325697581681107/run/821029", + ) + ], + timestamp=1672691730552, + ), + ), + OMetaPipelineStatus( + pipeline_fqn="databricks_pipeline_source.606358633757175", + pipeline_status=PipelineStatus( + executionStatus=StatusType.Failed.value, + taskStatus=[ + TaskStatus( + name="one_task", + executionStatus=StatusType.Failed.value, + startTime=1672691610544, + endTime=1672691677696, + logLink="https://workspace.azuredatabricks.net/?o=workspace_id#job/325697581681107/run/820956", + ) + ], + timestamp=1672691610525, + ), + ), +] + + +class DatabricksPipelineTests(TestCase): + """ + Implements the necessary methods to extract + Databricks Pipeline test + """ + + maxDiff = None + + @patch( + "metadata.ingestion.source.pipeline.pipeline_service.PipelineServiceSource.test_connection" + ) + def __init__(self, methodName, test_connection) -> None: + super().__init__(methodName) + print_ansi_encoded_string(message="init") + test_connection.return_value = False + config = OpenMetadataWorkflowConfig.parse_obj(mock_databricks_config) + + self.databricks = DatabrickspipelineSource.create( + mock_databricks_config["source"], + config.workflowConfig.openMetadataServerConfig, + ) + self.databricks.context.__dict__["pipeline"] = MOCK_PIPELINE + self.databricks.context.__dict__["pipeline_service"] = MOCK_PIPELINE_SERVICE + self.databricks.context.__dict__["job_id_list"] = [ + mock_history_data[0]["job_id"] + ] + + @patch( + "metadata.ingestion.source.database.databricks.client.DatabricksClient.list_jobs" + ) + def test_get_pipelines_list(self, list_jobs): + list_jobs.return_value = mock_data + results = list(self.databricks.get_pipelines_list()) + self.assertEqual(mock_data, results) + + def test_yield_pipeline(self): + pipelines = list(self.databricks.yield_pipeline(mock_data[0]))[0] + self.assertEqual(pipelines, EXPECTED_CREATED_PIPELINES) + + @patch( + "metadata.ingestion.source.database.databricks.client.DatabricksClient.get_job_runs" + ) + def test_yield_pipeline_status(self, get_job_runs): + get_job_runs.return_value = mock_history_data + pipeline_status = list( + 
self.databricks.yield_pipeline_status(mock_history_data[0]["job_id"]) + ) + self.assertEqual(pipeline_status, EXPECTED_PIPELINE_STATUS) diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/databricksPipelineConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/databricksPipelineConnection.json new file mode 100644 index 00000000000..2ea179143a8 --- /dev/null +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/databricksPipelineConnection.json @@ -0,0 +1,55 @@ +{ + "$id": "https://open-metadata.org/schema/entity/services/connections/pipeline/DatabricksPipelineConnection.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "DatabricksPipelineConnection", + "description": "Databricks Connection Config", + "type": "object", + "javaType": "org.openmetadata.schema.services.connections.pipeline.DatabricksPipelineConnection", + "definitions": { + "databricksType": { + "description": "Service type.", + "type": "string", + "enum": [ + "DatabricksPipeline" + ], + "default": "DatabricksPipeline" + } + }, + "properties": { + "type": { + "title": "Service Type", + "description": "Service Type", + "$ref": "#/definitions/databricksType", + "default": "DatabricksPipeline" + }, + "hostPort": { + "title": "Host and Port", + "description": "Host and port of the Databricks service.", + "type": "string" + }, + "token": { + "title": "Token", + "description": "Generated Token to connect to Databricks.", + "type": "string", + "format": "password" + }, + "httpPath": { + "title": "Http Path", + "description": "Databricks compute resources URL.", + "type": "string" + }, + "connectionArguments": { + "title": "Connection Arguments", + "$ref": "../connectionBasicType.json#/definitions/connectionArguments" + }, + "supportsMetadataExtraction": { + "title": "Supports Metadata Extraction", + "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction" + } + }, + "additionalProperties": false, + "required": [ + "hostPort", + "token" + ] +} \ No newline at end of file diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/pipelineService.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/pipelineService.json index 887a0532292..86b2ceb8bff 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/services/pipelineService.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/pipelineService.json @@ -13,7 +13,9 @@ "pipelineServiceType": { "description": "Type of pipeline service - Airflow or Prefect.", "type": "string", - "javaInterfaces": ["org.openmetadata.schema.EnumInterface"], + "javaInterfaces": [ + "org.openmetadata.schema.EnumInterface" + ], "enum": [ "Airflow", "GluePipeline", @@ -22,7 +24,8 @@ "Dagster", "Nifi", "DomoPipeline", - "CustomPipeline" + "CustomPipeline", + "DatabricksPipeline" ], "javaEnums": [ { @@ -48,6 +51,9 @@ }, { "name": "CustomPipeline" + }, + { + "name": "DatabricksPipeline" } ] }, @@ -85,6 +91,9 @@ }, { "$ref": "./connections/pipeline/customPipelineConnection.json" + }, + { + "$ref": "./connections/pipeline/databricksPipelineConnection.json" } ] } @@ -162,6 +171,10 @@ "default": false } }, - "required": ["id", "name", "serviceType"], + "required": [ + "id", + "name", + "serviceType" + ], "additionalProperties": false -} +} \ No newline at end of file
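For reference, the databricks_pipeline.yaml example at the top of this patch can also be run programmatically. The snippet below is a minimal sketch, assuming the Workflow runner exposed by the ingestion package at the time of this change (metadata.ingestion.api.workflow.Workflow) and the example YAML saved locally with real hostPort and token values filled in.

import yaml

from metadata.ingestion.api.workflow import Workflow  # assumed import path for this release line

# Load the example workflow definition shipped with this patch
with open("databricks_pipeline.yaml") as yaml_file:
    workflow_config = yaml.safe_load(yaml_file)

# Build the workflow, run the pipeline metadata ingestion, and report status
workflow = Workflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
workflow.stop()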