From a6a766251765e14caad97e1bcadf8d38d9c88f90 Mon Sep 17 00:00:00 2001 From: Abhishek Pandey Date: Sat, 13 Aug 2022 15:18:14 +0530 Subject: [PATCH] dagster-connector-added (#6513) * dagster-connector-added * code-formatted * dagster-comments-removed * dagster-comments-removed * dagster-unittest-added * code-formatted-pr-comments-resolved * improved-code-quality * increased-py-test-time-out * improved-dagster-tests * improved-dagster-tests * reverted-pytests-timeout * dagster-pytest-improved * added-schema-in-omdcore * updated-schema-in-omd-core * ui-schema-updated-omd-core * updated-dagster-schema * dagter-schema-test-updated --- .../pipeline/dagsterConnection.json | 56 ++++++ .../entity/services/pipelineService.json | 8 +- ingestion-core/src/metadata/_version.py | 2 +- ingestion/examples/workflows/dagster.yaml | 26 +++ ingestion/setup.py | 1 + .../metadata/clients/connection_clients.py | 6 + .../ingestion/source/pipeline/dagster.py | 183 +++++++++++++++++ ingestion/src/metadata/utils/connections.py | 23 +++ .../resources/datasets/dagster_dataset.json | 85 ++++++++ .../unit/topology/pipeline/test_dagster.py | 186 ++++++++++++++++++ .../pipeline/dagsterConnection.json | 57 ++++++ .../entity/services/pipelineService.json | 8 +- .../ui/src/utils/PipelineServiceUtils.ts | 6 + 13 files changed, 644 insertions(+), 3 deletions(-) create mode 100644 catalog-rest-service/src/main/resources/json/schema/entity/services/connections/pipeline/dagsterConnection.json create mode 100644 ingestion/examples/workflows/dagster.yaml create mode 100644 ingestion/src/metadata/ingestion/source/pipeline/dagster.py create mode 100644 ingestion/tests/unit/resources/datasets/dagster_dataset.json create mode 100644 ingestion/tests/unit/topology/pipeline/test_dagster.py create mode 100644 openmetadata-core/src/main/resources/json/schema/entity/services/connections/pipeline/dagsterConnection.json diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/pipeline/dagsterConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/pipeline/dagsterConnection.json new file mode 100644 index 00000000000..948efe04178 --- /dev/null +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/pipeline/dagsterConnection.json @@ -0,0 +1,56 @@ +{ + "$id": "https://open-metadata.org/schema/entity/services/connections/pipeline/dagsterConnection.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "DagsterConnection", + "description": "Dagster Metadata Database Connection Config", + "type": "object", + "javaType": "org.openmetadata.catalog.services.connections.pipeline.DagsterConnection", + "definitions": { + "DagsterType": { + "description": "Service type.", + "type": "string", + "enum": ["Dagster"], + "default": "Dagster" + } + }, + "properties": { + "type": { + "title": "Service Type", + "description": "Service Type", + "$ref": "#/definitions/DagsterType", + "default": "Dagster" + }, + "hostPort": { + "title": "Host And Port", + "description": "Pipeline Service Management/UI URI.", + "type": "string", + "format": "uri" + }, + "numberOfStatus": { + "description": "Pipeline Service Number Of Status", + "type": "integer", + "default": "10" + }, + "dbConnection": { + "title": "Metadata Database Connection", + "description": "Underlying database connection. 
See https://docs.dagster.io/getting-started for supported backends.", + "oneOf": [ + { + "$ref": "../database/mysqlConnection.json" + }, + { + "$ref": "../database/postgresConnection.json" + }, + { + "$ref": "../database/sqliteConnection.json" + } + ] + }, + "supportsMetadataExtraction": { + "title": "Supports Metadata Extraction", + "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction" + } + }, + "additionalProperties": false, + "required": ["hostPort", "dbConnection"] +} diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/pipelineService.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/pipelineService.json index b68e4b615ea..67fb3bf2859 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/pipelineService.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/pipelineService.json @@ -14,7 +14,7 @@ "description": "Type of pipeline service - Airflow or Prefect.", "type": "string", "javaInterfaces": ["org.openmetadata.catalog.EnumInterface"], - "enum": ["Airflow", "Glue", "Airbyte", "Fivetran"], + "enum": ["Airflow", "Glue", "Airbyte", "Fivetran", "Dagster"], "javaEnums": [ { "name": "Airflow" @@ -27,6 +27,9 @@ }, { "name": "Fivetran" + }, + { + "name": "Dagster" } ] }, @@ -52,6 +55,9 @@ }, { "$ref": "./connections/pipeline/fivetranConnection.json" + }, + { + "$ref": "./connections/pipeline/dagsterConnection.json" } ] } diff --git a/ingestion-core/src/metadata/_version.py b/ingestion-core/src/metadata/_version.py index 99e8a03ec79..7965b611a9b 100644 --- a/ingestion-core/src/metadata/_version.py +++ b/ingestion-core/src/metadata/_version.py @@ -7,5 +7,5 @@ Provides metadata version information. from incremental import Version -__version__ = Version("metadata", 0, 12, 0, dev=10) +__version__ = Version("metadata", 0, 12, 0, dev=11) __all__ = ["__version__"] diff --git a/ingestion/examples/workflows/dagster.yaml b/ingestion/examples/workflows/dagster.yaml new file mode 100644 index 00000000000..115f464a941 --- /dev/null +++ b/ingestion/examples/workflows/dagster.yaml @@ -0,0 +1,26 @@ +source: + type: dagster + serviceName: dagster_source + serviceConnection: + config: + type: Dagster + hostPort: http://localhost:8080 + numberOfStatus: 10 + dbConnection: + type: Mysql + username: dagster_user + password: dagter_pass + databaseSchema: dagster_db + hostPort: localhost:3306 + sourceConfig: + config: + type: PipelineMetadata +sink: + type: metadata-rest + config: { } +workflowConfig: + loggerLevel: INFO + openMetadataServerConfig: + hostPort: http://localhost:8585/api + authProvider: no-auth + \ No newline at end of file diff --git a/ingestion/setup.py b/ingestion/setup.py index b8c018e0833..c23f7d1a1b8 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -70,6 +70,7 @@ plugins: Dict[str, Set[str]] = { "bigquery-usage": {"google-cloud-logging", "cachetools"}, "docker": {"python_on_whales==0.34.0"}, "backup": {"boto3~=1.19.12"}, + "dagster": {"pymysql>=1.0.2", "psycopg2-binary", "GeoAlchemy2"}, "datalake": { "google-cloud-storage==1.43.0", "pandas==1.3.5", diff --git a/ingestion/src/metadata/clients/connection_clients.py b/ingestion/src/metadata/clients/connection_clients.py index 724d0c18ac8..dd04befb47e 100644 --- a/ingestion/src/metadata/clients/connection_clients.py +++ b/ingestion/src/metadata/clients/connection_clients.py @@ -121,3 +121,9 @@ class MlflowClientWrapper: class FivetranClient: def __init__(self, client) -> None: self.client = client + + 
+@dataclass +class DagsterClient: + def __init__(self, client) -> None: + self.client = client diff --git a/ingestion/src/metadata/ingestion/source/pipeline/dagster.py b/ingestion/src/metadata/ingestion/source/pipeline/dagster.py new file mode 100644 index 00000000000..1af65d9bccc --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/pipeline/dagster.py @@ -0,0 +1,183 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Dagster source to extract metadata from OM UI +""" +import json +from collections.abc import Iterable +from typing import Dict, Iterable, Optional + +from sqlalchemy import text +from sqlalchemy.engine.base import Engine +from sqlalchemy.orm import Session + +from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest +from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest +from metadata.generated.schema.entity.data.pipeline import ( + Pipeline, + PipelineStatus, + StatusType, + Task, + TaskStatus, +) +from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) +from metadata.generated.schema.entity.services.connections.pipeline.dagsterConnection import ( + DagsterConnection, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) +from metadata.generated.schema.type.entityLineage import EntitiesEdge +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.api.source import InvalidSourceException +from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus +from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource +from metadata.utils.connections import ( + create_and_bind_session, + get_connection, + test_connection, +) +from metadata.utils.helpers import datetime_to_ts +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() + +STATUS_MAP = { + "success": StatusType.Successful.value, + "failed": StatusType.Failed.value, + "queued": StatusType.Pending.value, +} + + +class DagsterSource(PipelineServiceSource): + """ + Implements the necessary methods ot extract + Pipeline metadata from Dagster's metadata db + """ + + config: WorkflowSource + + def __init__( + self, + config: WorkflowSource, + metadata_config: OpenMetadataConnection, + ): + self._session = None + self.service_connection = config.serviceConnection.__root__.config + self.engine: Engine = get_connection(self.service_connection.dbConnection) + super().__init__(config, metadata_config) + # Create the connection to the database + + @classmethod + def create(cls, config_dict, metadata_config: OpenMetadataConnection): + config: WorkflowSource = WorkflowSource.parse_obj(config_dict) + connection: DagsterConnection = config.serviceConnection.__root__.config + if not isinstance(connection, DagsterConnection): + raise InvalidSourceException( + f"Expected 
DagsterConnection, but got {connection}" + ) + return cls(config, metadata_config) + + @property + def session(self) -> Session: + """ + Return the SQLAlchemy session from the engine + """ + if not self._session: + self._session = create_and_bind_session(self.engine) + + return self._session + + def get_run_list(self): + + run_list = self.session.execute("select * from runs") + return run_list + + def yield_pipeline(self, pipeline_details) -> Iterable[CreatePipelineRequest]: + """ + Convert a DAG into a Pipeline Entity + :param serialized_dag: SerializedDAG from dagster metadata DB + :return: Create Pipeline request with tasks + """ + + task_list = [{"name": row["pipeline_name"]} for row in self.get_run_list()] + run_body = json.loads(pipeline_details["run_body"]) + location_name = run_body["external_pipeline_origin"][ + "external_repository_origin" + ]["repository_location_origin"]["location_name"] + repository_name = run_body["external_pipeline_origin"][ + "external_repository_origin" + ]["repository_name"] + pipeline_url = f"/workspace/{repository_name}@{location_name}/jobs/{pipeline_details.pipeline_name}/" + yield CreatePipelineRequest( + name=pipeline_details.pipeline_name, + description=pipeline_details.pipeline_name, + pipelineUrl=pipeline_url, + pipelineLocation=location_name, + startDate=pipeline_details.create_timestamp, + tasks=task_list, + service=EntityReference( + id=self.context.pipeline_service.id.__root__, type="pipelineService" + ), + ) + + def yield_pipeline_status(self, pipeline_details) -> OMetaPipelineStatus: + run_list = self.get_run_list() + for run in run_list: + log_link = f"{self.service_connection.hostPort}/instance/runs/{run.run_id}" + task_status = TaskStatus( + name=run["pipeline_name"], + executionStatus=STATUS_MAP.get( + run["status"].lower(), StatusType.Pending.value + ), + startTime=datetime_to_ts(run[9]), + endTime=datetime_to_ts(run[10]), + logLink=log_link, + ) + pipeline_status = PipelineStatus( + taskStatus=[task_status], + executionStatus=STATUS_MAP.get( + run["status"].lower(), StatusType.Pending.value + ), + timestamp=run[10].timestamp(), + ) + yield OMetaPipelineStatus( + pipeline_fqn=self.context.pipeline.fullyQualifiedName.__root__, + pipeline_status=pipeline_status, + ) + + def yield_pipeline_lineage_details( + self, pipeline_details + ) -> Optional[Iterable[AddLineageRequest]]: + """ + Not implemented, as this connector does not create any lineage + """ + + def close(self): + self.session.close() + + def test_connection(self) -> None: + test_connection(self.engine) + + def get_pipelines_list(self) -> Dict: + + results = self.engine.execute(text("SELECT * from runs")) + for result in results: + yield result + + def get_pipeline_name(self, pipeline_details) -> str: + """ + Get Pipeline Name + """ + return pipeline_details["pipeline_name"] diff --git a/ingestion/src/metadata/utils/connections.py b/ingestion/src/metadata/utils/connections.py index acd5b55996d..e78f30ebef7 100644 --- a/ingestion/src/metadata/utils/connections.py +++ b/ingestion/src/metadata/utils/connections.py @@ -113,6 +113,9 @@ from metadata.generated.schema.entity.services.connections.pipeline.airflowConne from metadata.generated.schema.entity.services.connections.pipeline.backendConnection import ( BackendConnection, ) +from metadata.generated.schema.entity.services.connections.pipeline.dagsterConnection import ( + DagsterConnection, +) from metadata.generated.schema.entity.services.connections.pipeline.fivetranConnection import ( FivetranConnection, ) @@ -849,3 +852,23 @@ 
def _(_: BackendConnection, verbose: bool = False): with settings.Session() as session: return session.get_bind() + + +@test_connection.register +def _(connection: DagsterConnection) -> None: + try: + test_connection(connection.dbConnection) + except Exception as err: + raise SourceConnectionException( + f"Unknown error connecting with {connection} - {err}." + ) + + +@get_connection.register +def _(connection: DagsterConnection) -> None: + try: + return get_connection(connection.dbConnection) + except Exception as err: + raise SourceConnectionException( + f"Unknown error connecting with {connection} - {err}." + ) diff --git a/ingestion/tests/unit/resources/datasets/dagster_dataset.json b/ingestion/tests/unit/resources/datasets/dagster_dataset.json new file mode 100644 index 00000000000..8618ea23ead --- /dev/null +++ b/ingestion/tests/unit/resources/datasets/dagster_dataset.json @@ -0,0 +1,85 @@ +[ + 1, + "a6ebb16c-505f-446d-8642-171c3320ccef", + "db1439a4208a2059caa193e7cbd39d9de7292f2a", + "do_it_all_with_default_config", + null, + "SUCCESS", + { + "__class__": "PipelineRun", + "asset_selection": null, + "execution_plan_snapshot_id": "a9609467aa63cf0306dd462b6e760ee51aa5cc52", + "external_pipeline_origin": { + "__class__": "ExternalPipelineOrigin", + "external_repository_origin": { + "__class__": "ExternalRepositoryOrigin", + "repository_location_origin": { + "__class__": "ManagedGrpcPythonEnvRepositoryLocationOrigin", + "loadable_target_origin": { + "__class__": "LoadableTargetOrigin", + "attribute": null, + "executable_path": null, + "module_name": null, + "package_name": null, + "python_file": "/opt/dagster/app/cereal.py", + "working_directory": null + }, + "location_name": "cereal.py" + }, + "repository_name": "__repository__do_it_all_with_default_config" + }, + "pipeline_name": "do_it_all_with_default_config" + }, + "mode": "default", + "parent_run_id": null, + "pipeline_code_origin": { + "__class__": "PipelinePythonOrigin", + "pipeline_name": "do_it_all_with_default_config", + "repository_origin": { + "__class__": "RepositoryPythonOrigin", + "code_pointer": { + "__class__": "FileCodePointer", + "fn_name": "do_it_all_with_default_config", + "python_file": "/opt/dagster/app/cereal.py", + "working_directory": "/opt/dagster/app" + }, + "container_context": {}, + "container_image": null, + "entry_point": [ + "dagster" + ], + "executable_path": "/usr/local/bin/python" + } + }, + "pipeline_name": "do_it_all_with_default_config", + "pipeline_snapshot_id": "db1439a4208a2059caa193e7cbd39d9de7292f2a", + "root_run_id": null, + "run_config": { + "ops": { + "do_something": { + "config": { + "config_param": "stuff" + } + } + } + }, + "run_id": "a6ebb16c-505f-446d-8642-171c3320ccef", + "solid_selection": null, + "solids_to_execute": null, + "status": { + "__enum__": "PipelineRunStatus.SUCCESS" + }, + "step_keys_to_execute": null, + "tags": { + ".dagster/grpc_info": {"host": "localhost", "socket": "/tmp/tmp09vd0qsu"}, + "dagster/preset_name": "default", + "dagster/solid_selection": "*" + } + }, + null, + null, + "2022-07-25T05:55:02.292119Z", + "2022-07-25T08:34:31.425131Z", + 1659616628.404192, + 1659616630.256226 +] \ No newline at end of file diff --git a/ingestion/tests/unit/topology/pipeline/test_dagster.py b/ingestion/tests/unit/topology/pipeline/test_dagster.py new file mode 100644 index 00000000000..a7f0e6938ba --- /dev/null +++ b/ingestion/tests/unit/topology/pipeline/test_dagster.py @@ -0,0 +1,186 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the 
"License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Test Dagster using the topology +""" +import json +from pathlib import Path +from unittest import TestCase +from unittest.mock import patch + +from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest +from metadata.generated.schema.entity.data.pipeline import ( + Pipeline, + PipelineStatus, + StatusType, + Task, + TaskStatus, +) +from metadata.generated.schema.entity.services.pipelineService import ( + PipelineConnection, + PipelineService, + PipelineServiceType, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + OpenMetadataWorkflowConfig, +) +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus +from metadata.ingestion.source.pipeline.dagster import DagsterSource + +mock_file_path = ( + Path(__file__).parent.parent.parent / "resources/datasets/dagster_dataset.json" +) +with open(mock_file_path) as file: + mock_data: dict = json.load(file) + +mock_dagster_config = { + "source": { + "type": "dagster", + "serviceName": "dagster_source", + "serviceConnection": { + "config": { + "type": "Dagster", + "hostPort": "http://localhost:3000", + "dbConnection": { + "username": "dagster_user", + "password": "dagter_pass", + "databaseSchema": "dagster_db", + "hostPort": "localhost:3306", + }, + } + }, + "sourceConfig": {"config": {"type": "PipelineMetadata"}}, + }, + "sink": {"type": "metadata-rest", "config": {}}, + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "no-auth", + } + }, +} + + +EXPECTED_DAGSTER_DETAILS = mock_data[6]["pipeline_code_origin"] + +MOCK_CONNECTION_URI_PATH = "/workspace/__repository__do_it_all_with_default_config@cereal.py/jobs/do_it_all_with_default_config/" +MOCK_LOG_URL = ( + "http://localhost:8080/instance/runs/a6ebb16c-505f-446d-8642-171c3320ccef" +) + + +EXPECTED_PIPELINE_STATUS = [ + OMetaPipelineStatus( + pipeline_fqn="dagster_source.a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f", + pipeline_status=PipelineStatus( + executionStatus=StatusType.Pending.value, + taskStatus=[ + TaskStatus( + name="a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f", + executionStatus=StatusType.Pending.value, + startTime=1659616627124, + endTime=1659616635858, + logLink=f"{MOCK_LOG_URL}/status", + ) + ], + timestamp=1659616635858, + ), + ), + OMetaPipelineStatus( + pipeline_fqn="dagster_source.a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f", + pipeline_status=PipelineStatus( + executionStatus=StatusType.Successful.value, + taskStatus=[ + TaskStatus( + name="a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f", + executionStatus=StatusType.Successful.value, + startTime=1655393914, + endTime=1655394054, + logLink=f"{MOCK_LOG_URL}/status", + ) + ], + timestamp=1655393914, + ), + ), +] + + +EXPECTED_CREATED_PIPELINES = CreatePipelineRequest( + name="a6ebb16c-505f-446d-8642-171c3320ccef", + displayName="do_it_all_with_default_configs", + description="do_it_all_with_default_config", + pipelineUrl=MOCK_CONNECTION_URI_PATH, + tasks=[ + Task( + 
name="do_it_all_with_default_config", + displayName="", + description="", + taskUrl="", + ) + ], + service=EntityReference( + id="85811038-099a-11ed-861d-0242ac120002", type="pipelineService" + ), +) + +MOCK_PIPELINE_SERVICE = PipelineService( + id="86ff3c40-7c51-4ff5-9727-738cead28d9a", + name="dagster_source_test", + connection=PipelineConnection(), + serviceType=PipelineServiceType.Dagster, +) + + +MOCK_PIPELINE = Pipeline( + id="a58b1856-729c-493b-bc87-6d2269b43ec0", + name="do_it_all_with_default_config", + fullyQualifiedName="dagster_source.do_it_all_with_default_config", + displayName="do_it_all_with_default_config", + description="", + pipelineUrl=MOCK_CONNECTION_URI_PATH, + tasks=[ + Task( + name="a58b1856-729c-493b-bc87-6d2269b43ec0", + displayName="do_it_all_with_default_config", + description="", + taskUrl="", + ) + ], + service=EntityReference( + id="85811038-099a-11ed-861d-0242ac120002", type="pipelineService" + ), +) + + +class DagsterUnitTest(TestCase): + @patch("metadata.ingestion.source.pipeline.pipeline_service.test_connection") + @patch("sqlalchemy.engine.base.Engine.connect") + def __init__(self, methodName, mock_connect, test_connection) -> None: + super().__init__(methodName) + test_connection.return_value = False + + config = OpenMetadataWorkflowConfig.parse_obj(mock_dagster_config) + + self.dagster = DagsterSource.create( + mock_dagster_config["source"], + config.workflowConfig.openMetadataServerConfig, + ) + self.engine = mock_connect.return_value + self.dagster.context.__dict__["pipeline"] = MOCK_PIPELINE + self.dagster.context.__dict__["pipeline_service"] = MOCK_PIPELINE_SERVICE + + def test_pipeline_name(self): + + assert ( + self.dagster.get_pipeline_name(EXPECTED_DAGSTER_DETAILS) + == mock_data[6]["pipeline_code_origin"]["pipeline_name"] + ) diff --git a/openmetadata-core/src/main/resources/json/schema/entity/services/connections/pipeline/dagsterConnection.json b/openmetadata-core/src/main/resources/json/schema/entity/services/connections/pipeline/dagsterConnection.json new file mode 100644 index 00000000000..4695a10c714 --- /dev/null +++ b/openmetadata-core/src/main/resources/json/schema/entity/services/connections/pipeline/dagsterConnection.json @@ -0,0 +1,57 @@ +{ + "$id": "https://open-metadata.org/schema/entity/services/connections/pipeline/dagsterConnection.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "DagsterConnection", + "description": "Dagster Metadata Database Connection Config", + "type": "object", + "javaType": "org.openmetadata.catalog.services.connections.pipeline.DagsterConnection", + "definitions": { + "DagsterType": { + "description": "Service type.", + "type": "string", + "enum": ["Dagster"], + "default": "Dagster" + } + }, + "properties": { + "type": { + "title": "Service Type", + "description": "Service Type", + "$ref": "#/definitions/DagsterType", + "default": "Dagster" + }, + "hostPort": { + "title": "Host And Port", + "description": "Pipeline Service Management/UI URI.", + "type": "string", + "format": "uri" + }, + "numberOfStatus": { + "description": "Pipeline Service Number Of Status", + "type": "integer", + "default": "10" + }, + "connection": { + "title": "Metadata Database Connection", + "description": "Underlying database connection. 
See https://docs.dagster.io/getting-started for supported backends.", + "oneOf": [ + { + "$ref": "../database/mysqlConnection.json" + }, + { + "$ref": "../database/postgresConnection.json" + }, + { + "$ref": "../database/sqliteConnection.json" + } + ] + }, + "supportsMetadataExtraction": { + "title": "Supports Metadata Extraction", + "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction" + } + }, + "additionalProperties": false, + "required": ["hostPort", "connection"] + } + \ No newline at end of file diff --git a/openmetadata-core/src/main/resources/json/schema/entity/services/pipelineService.json b/openmetadata-core/src/main/resources/json/schema/entity/services/pipelineService.json index 9446d85ee0a..57bdb73fa98 100644 --- a/openmetadata-core/src/main/resources/json/schema/entity/services/pipelineService.json +++ b/openmetadata-core/src/main/resources/json/schema/entity/services/pipelineService.json @@ -13,7 +13,7 @@ "javaInterfaces": [ "org.openmetadata.core.entity.interfaces.EnumInterface" ], - "enum": ["Airflow", "Prefect", "Glue", "Generic", "Airbyte"], + "enum": ["Airflow", "Prefect", "Glue", "Generic", "Airbyte", "Dagster"], "javaEnums": [ { "name": "Airflow" @@ -29,6 +29,9 @@ }, { "name": "Airbyte" + }, + { + "name": "Dagster" } ] }, @@ -49,6 +52,9 @@ }, { "$ref": "./connections/pipeline/airbyteConnection.json" + }, + { + "$ref": "./connections/pipeline/dagsterConnection.json" } ] } diff --git a/openmetadata-ui/src/main/resources/ui/src/utils/PipelineServiceUtils.ts b/openmetadata-ui/src/main/resources/ui/src/utils/PipelineServiceUtils.ts index 755837edece..193725d5e70 100644 --- a/openmetadata-ui/src/main/resources/ui/src/utils/PipelineServiceUtils.ts +++ b/openmetadata-ui/src/main/resources/ui/src/utils/PipelineServiceUtils.ts @@ -16,6 +16,7 @@ import { COMMON_UI_SCHEMA } from '../constants/services.const'; import { PipelineServiceType } from '../generated/entity/services/pipelineService'; import airbyteConnection from '../jsons/connectionSchemas/connections/pipeline/airbyteConnection.json'; import airflowConnection from '../jsons/connectionSchemas/connections/pipeline/airflowConnection.json'; +import dagsterConnection from '../jsons/connectionSchemas/connections/pipeline/dagsterConnection.json'; import fivetranConnection from '../jsons/connectionSchemas/connections/pipeline/fivetranConnection.json'; import glueConnection from '../jsons/connectionSchemas/connections/pipeline/glueConnection.json'; @@ -42,6 +43,11 @@ export const getPipelineConfig = (type: PipelineServiceType) => { case PipelineServiceType.Fivetran: { schema = fivetranConnection; + break; + } + case PipelineServiceType.Dagster: { + schema = dagsterConnection; + break; } }
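
Reviewer note (not part of the patch): a minimal local smoke test for the new connector, assuming the 0.12-era ingestion Workflow API and that the dagster extra declared in setup.py is installed (for example: pip install "openmetadata-ingestion[dagster]"). The YAML path is the example config added by this patch; the Dagster metadata database and the OpenMetadata server it points at are assumed to be reachable.

    # Sketch only: runs the example Dagster workflow config end to end.
    # Assumes metadata.ingestion.api.workflow.Workflow, the standard ingestion entry point.
    import yaml

    from metadata.ingestion.api.workflow import Workflow

    with open("ingestion/examples/workflows/dagster.yaml") as config_file:
        workflow_config = yaml.safe_load(config_file)

    workflow = Workflow.create(workflow_config)  # builds DagsterSource and the metadata-rest sink
    workflow.execute()                           # get_pipelines_list -> yield_pipeline -> yield_pipeline_status
    workflow.raise_from_status()                 # fail if the source or sink recorded errors
    workflow.print_status()
    workflow.stop()

Equivalently, the same config can typically be run with the packaged CLI: metadata ingest -c ingestion/examples/workflows/dagster.yaml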