mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-08-15 04:26:59 +00:00
dagster-connector-added (#6513)
* dagster-connector-added * code-formatted * dagster-comments-removed * dagster-comments-removed * dagster-unittest-added * code-formatted-pr-comments-resolved * improved-code-quality * increased-py-test-time-out * improved-dagster-tests * improved-dagster-tests * reverted-pytests-timeout * dagster-pytest-improved * added-schema-in-omdcore * updated-schema-in-omd-core * ui-schema-updated-omd-core * updated-dagster-schema * dagter-schema-test-updated
This commit is contained in:
parent
80ca224c39
commit
a6a7662517
@ -0,0 +1,56 @@
|
|||||||
|
{
|
||||||
|
"$id": "https://open-metadata.org/schema/entity/services/connections/pipeline/dagsterConnection.json",
|
||||||
|
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||||
|
"title": "DagsterConnection",
|
||||||
|
"description": "Dagster Metadata Database Connection Config",
|
||||||
|
"type": "object",
|
||||||
|
"javaType": "org.openmetadata.catalog.services.connections.pipeline.DagsterConnection",
|
||||||
|
"definitions": {
|
||||||
|
"DagsterType": {
|
||||||
|
"description": "Service type.",
|
||||||
|
"type": "string",
|
||||||
|
"enum": ["Dagster"],
|
||||||
|
"default": "Dagster"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"properties": {
|
||||||
|
"type": {
|
||||||
|
"title": "Service Type",
|
||||||
|
"description": "Service Type",
|
||||||
|
"$ref": "#/definitions/DagsterType",
|
||||||
|
"default": "Dagster"
|
||||||
|
},
|
||||||
|
"hostPort": {
|
||||||
|
"title": "Host And Port",
|
||||||
|
"description": "Pipeline Service Management/UI URI.",
|
||||||
|
"type": "string",
|
||||||
|
"format": "uri"
|
||||||
|
},
|
||||||
|
"numberOfStatus": {
|
||||||
|
"description": "Pipeline Service Number Of Status",
|
||||||
|
"type": "integer",
|
||||||
|
"default": "10"
|
||||||
|
},
|
||||||
|
"dbConnection": {
|
||||||
|
"title": "Metadata Database Connection",
|
||||||
|
"description": "Underlying database connection. See https://docs.dagster.io/getting-started for supported backends.",
|
||||||
|
"oneOf": [
|
||||||
|
{
|
||||||
|
"$ref": "../database/mysqlConnection.json"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"$ref": "../database/postgresConnection.json"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"$ref": "../database/sqliteConnection.json"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"supportsMetadataExtraction": {
|
||||||
|
"title": "Supports Metadata Extraction",
|
||||||
|
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"additionalProperties": false,
|
||||||
|
"required": ["hostPort", "dbConnection"]
|
||||||
|
}
|
@ -14,7 +14,7 @@
|
|||||||
"description": "Type of pipeline service - Airflow or Prefect.",
|
"description": "Type of pipeline service - Airflow or Prefect.",
|
||||||
"type": "string",
|
"type": "string",
|
||||||
"javaInterfaces": ["org.openmetadata.catalog.EnumInterface"],
|
"javaInterfaces": ["org.openmetadata.catalog.EnumInterface"],
|
||||||
"enum": ["Airflow", "Glue", "Airbyte", "Fivetran"],
|
"enum": ["Airflow", "Glue", "Airbyte", "Fivetran", "Dagster"],
|
||||||
"javaEnums": [
|
"javaEnums": [
|
||||||
{
|
{
|
||||||
"name": "Airflow"
|
"name": "Airflow"
|
||||||
@ -27,6 +27,9 @@
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
"name": "Fivetran"
|
"name": "Fivetran"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Dagster"
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
@ -52,6 +55,9 @@
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
"$ref": "./connections/pipeline/fivetranConnection.json"
|
"$ref": "./connections/pipeline/fivetranConnection.json"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"$ref": "./connections/pipeline/dagsterConnection.json"
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
@ -7,5 +7,5 @@ Provides metadata version information.
|
|||||||
|
|
||||||
from incremental import Version
|
from incremental import Version
|
||||||
|
|
||||||
__version__ = Version("metadata", 0, 12, 0, dev=10)
|
__version__ = Version("metadata", 0, 12, 0, dev=11)
|
||||||
__all__ = ["__version__"]
|
__all__ = ["__version__"]
|
||||||
|
26
ingestion/examples/workflows/dagster.yaml
Normal file
26
ingestion/examples/workflows/dagster.yaml
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
source:
|
||||||
|
type: dagster
|
||||||
|
serviceName: dagster_source
|
||||||
|
serviceConnection:
|
||||||
|
config:
|
||||||
|
type: Dagster
|
||||||
|
hostPort: http://localhost:8080
|
||||||
|
numberOfStatus: 10
|
||||||
|
dbConnection:
|
||||||
|
type: Mysql
|
||||||
|
username: dagster_user
|
||||||
|
password: dagter_pass
|
||||||
|
databaseSchema: dagster_db
|
||||||
|
hostPort: localhost:3306
|
||||||
|
sourceConfig:
|
||||||
|
config:
|
||||||
|
type: PipelineMetadata
|
||||||
|
sink:
|
||||||
|
type: metadata-rest
|
||||||
|
config: { }
|
||||||
|
workflowConfig:
|
||||||
|
loggerLevel: INFO
|
||||||
|
openMetadataServerConfig:
|
||||||
|
hostPort: http://localhost:8585/api
|
||||||
|
authProvider: no-auth
|
||||||
|
|
@ -70,6 +70,7 @@ plugins: Dict[str, Set[str]] = {
|
|||||||
"bigquery-usage": {"google-cloud-logging", "cachetools"},
|
"bigquery-usage": {"google-cloud-logging", "cachetools"},
|
||||||
"docker": {"python_on_whales==0.34.0"},
|
"docker": {"python_on_whales==0.34.0"},
|
||||||
"backup": {"boto3~=1.19.12"},
|
"backup": {"boto3~=1.19.12"},
|
||||||
|
"dagster": {"pymysql>=1.0.2", "psycopg2-binary", "GeoAlchemy2"},
|
||||||
"datalake": {
|
"datalake": {
|
||||||
"google-cloud-storage==1.43.0",
|
"google-cloud-storage==1.43.0",
|
||||||
"pandas==1.3.5",
|
"pandas==1.3.5",
|
||||||
|
@ -121,3 +121,9 @@ class MlflowClientWrapper:
|
|||||||
class FivetranClient:
|
class FivetranClient:
|
||||||
def __init__(self, client) -> None:
|
def __init__(self, client) -> None:
|
||||||
self.client = client
|
self.client = client
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class DagsterClient:
|
||||||
|
def __init__(self, client) -> None:
|
||||||
|
self.client = client
|
||||||
|
183
ingestion/src/metadata/ingestion/source/pipeline/dagster.py
Normal file
183
ingestion/src/metadata/ingestion/source/pipeline/dagster.py
Normal file
@ -0,0 +1,183 @@
|
|||||||
|
# Copyright 2021 Collate
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
"""
|
||||||
|
Dagster source to extract metadata from OM UI
|
||||||
|
"""
|
||||||
|
import json
|
||||||
|
from collections.abc import Iterable
|
||||||
|
from typing import Dict, Iterable, Optional
|
||||||
|
|
||||||
|
from sqlalchemy import text
|
||||||
|
from sqlalchemy.engine.base import Engine
|
||||||
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
|
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
|
||||||
|
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
|
||||||
|
from metadata.generated.schema.entity.data.pipeline import (
|
||||||
|
Pipeline,
|
||||||
|
PipelineStatus,
|
||||||
|
StatusType,
|
||||||
|
Task,
|
||||||
|
TaskStatus,
|
||||||
|
)
|
||||||
|
from metadata.generated.schema.entity.data.table import Table
|
||||||
|
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
|
||||||
|
OpenMetadataConnection,
|
||||||
|
)
|
||||||
|
from metadata.generated.schema.entity.services.connections.pipeline.dagsterConnection import (
|
||||||
|
DagsterConnection,
|
||||||
|
)
|
||||||
|
from metadata.generated.schema.metadataIngestion.workflow import (
|
||||||
|
Source as WorkflowSource,
|
||||||
|
)
|
||||||
|
from metadata.generated.schema.type.entityLineage import EntitiesEdge
|
||||||
|
from metadata.generated.schema.type.entityReference import EntityReference
|
||||||
|
from metadata.ingestion.api.source import InvalidSourceException
|
||||||
|
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
|
||||||
|
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
|
||||||
|
from metadata.utils.connections import (
|
||||||
|
create_and_bind_session,
|
||||||
|
get_connection,
|
||||||
|
test_connection,
|
||||||
|
)
|
||||||
|
from metadata.utils.helpers import datetime_to_ts
|
||||||
|
from metadata.utils.logger import ingestion_logger
|
||||||
|
|
||||||
|
logger = ingestion_logger()
|
||||||
|
|
||||||
|
STATUS_MAP = {
|
||||||
|
"success": StatusType.Successful.value,
|
||||||
|
"failed": StatusType.Failed.value,
|
||||||
|
"queued": StatusType.Pending.value,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class DagsterSource(PipelineServiceSource):
|
||||||
|
"""
|
||||||
|
Implements the necessary methods ot extract
|
||||||
|
Pipeline metadata from Dagster's metadata db
|
||||||
|
"""
|
||||||
|
|
||||||
|
config: WorkflowSource
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
config: WorkflowSource,
|
||||||
|
metadata_config: OpenMetadataConnection,
|
||||||
|
):
|
||||||
|
self._session = None
|
||||||
|
self.service_connection = config.serviceConnection.__root__.config
|
||||||
|
self.engine: Engine = get_connection(self.service_connection.dbConnection)
|
||||||
|
super().__init__(config, metadata_config)
|
||||||
|
# Create the connection to the database
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
|
||||||
|
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
|
||||||
|
connection: DagsterConnection = config.serviceConnection.__root__.config
|
||||||
|
if not isinstance(connection, DagsterConnection):
|
||||||
|
raise InvalidSourceException(
|
||||||
|
f"Expected DagsterConnection, but got {connection}"
|
||||||
|
)
|
||||||
|
return cls(config, metadata_config)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def session(self) -> Session:
|
||||||
|
"""
|
||||||
|
Return the SQLAlchemy session from the engine
|
||||||
|
"""
|
||||||
|
if not self._session:
|
||||||
|
self._session = create_and_bind_session(self.engine)
|
||||||
|
|
||||||
|
return self._session
|
||||||
|
|
||||||
|
def get_run_list(self):
|
||||||
|
|
||||||
|
run_list = self.session.execute("select * from runs")
|
||||||
|
return run_list
|
||||||
|
|
||||||
|
def yield_pipeline(self, pipeline_details) -> Iterable[CreatePipelineRequest]:
|
||||||
|
"""
|
||||||
|
Convert a DAG into a Pipeline Entity
|
||||||
|
:param serialized_dag: SerializedDAG from dagster metadata DB
|
||||||
|
:return: Create Pipeline request with tasks
|
||||||
|
"""
|
||||||
|
|
||||||
|
task_list = [{"name": row["pipeline_name"]} for row in self.get_run_list()]
|
||||||
|
run_body = json.loads(pipeline_details["run_body"])
|
||||||
|
location_name = run_body["external_pipeline_origin"][
|
||||||
|
"external_repository_origin"
|
||||||
|
]["repository_location_origin"]["location_name"]
|
||||||
|
repository_name = run_body["external_pipeline_origin"][
|
||||||
|
"external_repository_origin"
|
||||||
|
]["repository_name"]
|
||||||
|
pipeline_url = f"/workspace/{repository_name}@{location_name}/jobs/{pipeline_details.pipeline_name}/"
|
||||||
|
yield CreatePipelineRequest(
|
||||||
|
name=pipeline_details.pipeline_name,
|
||||||
|
description=pipeline_details.pipeline_name,
|
||||||
|
pipelineUrl=pipeline_url,
|
||||||
|
pipelineLocation=location_name,
|
||||||
|
startDate=pipeline_details.create_timestamp,
|
||||||
|
tasks=task_list,
|
||||||
|
service=EntityReference(
|
||||||
|
id=self.context.pipeline_service.id.__root__, type="pipelineService"
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
def yield_pipeline_status(self, pipeline_details) -> OMetaPipelineStatus:
|
||||||
|
run_list = self.get_run_list()
|
||||||
|
for run in run_list:
|
||||||
|
log_link = f"{self.service_connection.hostPort}/instance/runs/{run.run_id}"
|
||||||
|
task_status = TaskStatus(
|
||||||
|
name=run["pipeline_name"],
|
||||||
|
executionStatus=STATUS_MAP.get(
|
||||||
|
run["status"].lower(), StatusType.Pending.value
|
||||||
|
),
|
||||||
|
startTime=datetime_to_ts(run[9]),
|
||||||
|
endTime=datetime_to_ts(run[10]),
|
||||||
|
logLink=log_link,
|
||||||
|
)
|
||||||
|
pipeline_status = PipelineStatus(
|
||||||
|
taskStatus=[task_status],
|
||||||
|
executionStatus=STATUS_MAP.get(
|
||||||
|
run["status"].lower(), StatusType.Pending.value
|
||||||
|
),
|
||||||
|
timestamp=run[10].timestamp(),
|
||||||
|
)
|
||||||
|
yield OMetaPipelineStatus(
|
||||||
|
pipeline_fqn=self.context.pipeline.fullyQualifiedName.__root__,
|
||||||
|
pipeline_status=pipeline_status,
|
||||||
|
)
|
||||||
|
|
||||||
|
def yield_pipeline_lineage_details(
|
||||||
|
self, pipeline_details
|
||||||
|
) -> Optional[Iterable[AddLineageRequest]]:
|
||||||
|
"""
|
||||||
|
Not implemented, as this connector does not create any lineage
|
||||||
|
"""
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
self.session.close()
|
||||||
|
|
||||||
|
def test_connection(self) -> None:
|
||||||
|
test_connection(self.engine)
|
||||||
|
|
||||||
|
def get_pipelines_list(self) -> Dict:
|
||||||
|
|
||||||
|
results = self.engine.execute(text("SELECT * from runs"))
|
||||||
|
for result in results:
|
||||||
|
yield result
|
||||||
|
|
||||||
|
def get_pipeline_name(self, pipeline_details) -> str:
|
||||||
|
"""
|
||||||
|
Get Pipeline Name
|
||||||
|
"""
|
||||||
|
return pipeline_details["pipeline_name"]
|
@ -113,6 +113,9 @@ from metadata.generated.schema.entity.services.connections.pipeline.airflowConne
|
|||||||
from metadata.generated.schema.entity.services.connections.pipeline.backendConnection import (
|
from metadata.generated.schema.entity.services.connections.pipeline.backendConnection import (
|
||||||
BackendConnection,
|
BackendConnection,
|
||||||
)
|
)
|
||||||
|
from metadata.generated.schema.entity.services.connections.pipeline.dagsterConnection import (
|
||||||
|
DagsterConnection,
|
||||||
|
)
|
||||||
from metadata.generated.schema.entity.services.connections.pipeline.fivetranConnection import (
|
from metadata.generated.schema.entity.services.connections.pipeline.fivetranConnection import (
|
||||||
FivetranConnection,
|
FivetranConnection,
|
||||||
)
|
)
|
||||||
@ -849,3 +852,23 @@ def _(_: BackendConnection, verbose: bool = False):
|
|||||||
|
|
||||||
with settings.Session() as session:
|
with settings.Session() as session:
|
||||||
return session.get_bind()
|
return session.get_bind()
|
||||||
|
|
||||||
|
|
||||||
|
@test_connection.register
|
||||||
|
def _(connection: DagsterConnection) -> None:
|
||||||
|
try:
|
||||||
|
test_connection(connection.dbConnection)
|
||||||
|
except Exception as err:
|
||||||
|
raise SourceConnectionException(
|
||||||
|
f"Unknown error connecting with {connection} - {err}."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@get_connection.register
|
||||||
|
def _(connection: DagsterConnection) -> None:
|
||||||
|
try:
|
||||||
|
return get_connection(connection.dbConnection)
|
||||||
|
except Exception as err:
|
||||||
|
raise SourceConnectionException(
|
||||||
|
f"Unknown error connecting with {connection} - {err}."
|
||||||
|
)
|
||||||
|
85
ingestion/tests/unit/resources/datasets/dagster_dataset.json
Normal file
85
ingestion/tests/unit/resources/datasets/dagster_dataset.json
Normal file
@ -0,0 +1,85 @@
|
|||||||
|
[
|
||||||
|
1,
|
||||||
|
"a6ebb16c-505f-446d-8642-171c3320ccef",
|
||||||
|
"db1439a4208a2059caa193e7cbd39d9de7292f2a",
|
||||||
|
"do_it_all_with_default_config",
|
||||||
|
null,
|
||||||
|
"SUCCESS",
|
||||||
|
{
|
||||||
|
"__class__": "PipelineRun",
|
||||||
|
"asset_selection": null,
|
||||||
|
"execution_plan_snapshot_id": "a9609467aa63cf0306dd462b6e760ee51aa5cc52",
|
||||||
|
"external_pipeline_origin": {
|
||||||
|
"__class__": "ExternalPipelineOrigin",
|
||||||
|
"external_repository_origin": {
|
||||||
|
"__class__": "ExternalRepositoryOrigin",
|
||||||
|
"repository_location_origin": {
|
||||||
|
"__class__": "ManagedGrpcPythonEnvRepositoryLocationOrigin",
|
||||||
|
"loadable_target_origin": {
|
||||||
|
"__class__": "LoadableTargetOrigin",
|
||||||
|
"attribute": null,
|
||||||
|
"executable_path": null,
|
||||||
|
"module_name": null,
|
||||||
|
"package_name": null,
|
||||||
|
"python_file": "/opt/dagster/app/cereal.py",
|
||||||
|
"working_directory": null
|
||||||
|
},
|
||||||
|
"location_name": "cereal.py"
|
||||||
|
},
|
||||||
|
"repository_name": "__repository__do_it_all_with_default_config"
|
||||||
|
},
|
||||||
|
"pipeline_name": "do_it_all_with_default_config"
|
||||||
|
},
|
||||||
|
"mode": "default",
|
||||||
|
"parent_run_id": null,
|
||||||
|
"pipeline_code_origin": {
|
||||||
|
"__class__": "PipelinePythonOrigin",
|
||||||
|
"pipeline_name": "do_it_all_with_default_config",
|
||||||
|
"repository_origin": {
|
||||||
|
"__class__": "RepositoryPythonOrigin",
|
||||||
|
"code_pointer": {
|
||||||
|
"__class__": "FileCodePointer",
|
||||||
|
"fn_name": "do_it_all_with_default_config",
|
||||||
|
"python_file": "/opt/dagster/app/cereal.py",
|
||||||
|
"working_directory": "/opt/dagster/app"
|
||||||
|
},
|
||||||
|
"container_context": {},
|
||||||
|
"container_image": null,
|
||||||
|
"entry_point": [
|
||||||
|
"dagster"
|
||||||
|
],
|
||||||
|
"executable_path": "/usr/local/bin/python"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"pipeline_name": "do_it_all_with_default_config",
|
||||||
|
"pipeline_snapshot_id": "db1439a4208a2059caa193e7cbd39d9de7292f2a",
|
||||||
|
"root_run_id": null,
|
||||||
|
"run_config": {
|
||||||
|
"ops": {
|
||||||
|
"do_something": {
|
||||||
|
"config": {
|
||||||
|
"config_param": "stuff"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"run_id": "a6ebb16c-505f-446d-8642-171c3320ccef",
|
||||||
|
"solid_selection": null,
|
||||||
|
"solids_to_execute": null,
|
||||||
|
"status": {
|
||||||
|
"__enum__": "PipelineRunStatus.SUCCESS"
|
||||||
|
},
|
||||||
|
"step_keys_to_execute": null,
|
||||||
|
"tags": {
|
||||||
|
".dagster/grpc_info": {"host": "localhost", "socket": "/tmp/tmp09vd0qsu"},
|
||||||
|
"dagster/preset_name": "default",
|
||||||
|
"dagster/solid_selection": "*"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
"2022-07-25T05:55:02.292119Z",
|
||||||
|
"2022-07-25T08:34:31.425131Z",
|
||||||
|
1659616628.404192,
|
||||||
|
1659616630.256226
|
||||||
|
]
|
186
ingestion/tests/unit/topology/pipeline/test_dagster.py
Normal file
186
ingestion/tests/unit/topology/pipeline/test_dagster.py
Normal file
@ -0,0 +1,186 @@
|
|||||||
|
# Copyright 2021 Collate
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
"""
|
||||||
|
Test Dagster using the topology
|
||||||
|
"""
|
||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest import TestCase
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
|
||||||
|
from metadata.generated.schema.entity.data.pipeline import (
|
||||||
|
Pipeline,
|
||||||
|
PipelineStatus,
|
||||||
|
StatusType,
|
||||||
|
Task,
|
||||||
|
TaskStatus,
|
||||||
|
)
|
||||||
|
from metadata.generated.schema.entity.services.pipelineService import (
|
||||||
|
PipelineConnection,
|
||||||
|
PipelineService,
|
||||||
|
PipelineServiceType,
|
||||||
|
)
|
||||||
|
from metadata.generated.schema.metadataIngestion.workflow import (
|
||||||
|
OpenMetadataWorkflowConfig,
|
||||||
|
)
|
||||||
|
from metadata.generated.schema.type.entityReference import EntityReference
|
||||||
|
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
|
||||||
|
from metadata.ingestion.source.pipeline.dagster import DagsterSource
|
||||||
|
|
||||||
|
mock_file_path = (
|
||||||
|
Path(__file__).parent.parent.parent / "resources/datasets/dagster_dataset.json"
|
||||||
|
)
|
||||||
|
with open(mock_file_path) as file:
|
||||||
|
mock_data: dict = json.load(file)
|
||||||
|
|
||||||
|
mock_dagster_config = {
|
||||||
|
"source": {
|
||||||
|
"type": "dagster",
|
||||||
|
"serviceName": "dagster_source",
|
||||||
|
"serviceConnection": {
|
||||||
|
"config": {
|
||||||
|
"type": "Dagster",
|
||||||
|
"hostPort": "http://localhost:3000",
|
||||||
|
"dbConnection": {
|
||||||
|
"username": "dagster_user",
|
||||||
|
"password": "dagter_pass",
|
||||||
|
"databaseSchema": "dagster_db",
|
||||||
|
"hostPort": "localhost:3306",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"sourceConfig": {"config": {"type": "PipelineMetadata"}},
|
||||||
|
},
|
||||||
|
"sink": {"type": "metadata-rest", "config": {}},
|
||||||
|
"workflowConfig": {
|
||||||
|
"openMetadataServerConfig": {
|
||||||
|
"hostPort": "http://localhost:8585/api",
|
||||||
|
"authProvider": "no-auth",
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
EXPECTED_DAGSTER_DETAILS = mock_data[6]["pipeline_code_origin"]
|
||||||
|
|
||||||
|
MOCK_CONNECTION_URI_PATH = "/workspace/__repository__do_it_all_with_default_config@cereal.py/jobs/do_it_all_with_default_config/"
|
||||||
|
MOCK_LOG_URL = (
|
||||||
|
"http://localhost:8080/instance/runs/a6ebb16c-505f-446d-8642-171c3320ccef"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
EXPECTED_PIPELINE_STATUS = [
|
||||||
|
OMetaPipelineStatus(
|
||||||
|
pipeline_fqn="dagster_source.a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f",
|
||||||
|
pipeline_status=PipelineStatus(
|
||||||
|
executionStatus=StatusType.Pending.value,
|
||||||
|
taskStatus=[
|
||||||
|
TaskStatus(
|
||||||
|
name="a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f",
|
||||||
|
executionStatus=StatusType.Pending.value,
|
||||||
|
startTime=1659616627124,
|
||||||
|
endTime=1659616635858,
|
||||||
|
logLink=f"{MOCK_LOG_URL}/status",
|
||||||
|
)
|
||||||
|
],
|
||||||
|
timestamp=1659616635858,
|
||||||
|
),
|
||||||
|
),
|
||||||
|
OMetaPipelineStatus(
|
||||||
|
pipeline_fqn="dagster_source.a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f",
|
||||||
|
pipeline_status=PipelineStatus(
|
||||||
|
executionStatus=StatusType.Successful.value,
|
||||||
|
taskStatus=[
|
||||||
|
TaskStatus(
|
||||||
|
name="a10f6d82-4fc6-4c90-ba04-bb773c8fbb0f",
|
||||||
|
executionStatus=StatusType.Successful.value,
|
||||||
|
startTime=1655393914,
|
||||||
|
endTime=1655394054,
|
||||||
|
logLink=f"{MOCK_LOG_URL}/status",
|
||||||
|
)
|
||||||
|
],
|
||||||
|
timestamp=1655393914,
|
||||||
|
),
|
||||||
|
),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
EXPECTED_CREATED_PIPELINES = CreatePipelineRequest(
|
||||||
|
name="a6ebb16c-505f-446d-8642-171c3320ccef",
|
||||||
|
displayName="do_it_all_with_default_configs",
|
||||||
|
description="do_it_all_with_default_config",
|
||||||
|
pipelineUrl=MOCK_CONNECTION_URI_PATH,
|
||||||
|
tasks=[
|
||||||
|
Task(
|
||||||
|
name="do_it_all_with_default_config",
|
||||||
|
displayName="",
|
||||||
|
description="",
|
||||||
|
taskUrl="",
|
||||||
|
)
|
||||||
|
],
|
||||||
|
service=EntityReference(
|
||||||
|
id="85811038-099a-11ed-861d-0242ac120002", type="pipelineService"
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
MOCK_PIPELINE_SERVICE = PipelineService(
|
||||||
|
id="86ff3c40-7c51-4ff5-9727-738cead28d9a",
|
||||||
|
name="dagster_source_test",
|
||||||
|
connection=PipelineConnection(),
|
||||||
|
serviceType=PipelineServiceType.Dagster,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
MOCK_PIPELINE = Pipeline(
|
||||||
|
id="a58b1856-729c-493b-bc87-6d2269b43ec0",
|
||||||
|
name="do_it_all_with_default_config",
|
||||||
|
fullyQualifiedName="dagster_source.do_it_all_with_default_config",
|
||||||
|
displayName="do_it_all_with_default_config",
|
||||||
|
description="",
|
||||||
|
pipelineUrl=MOCK_CONNECTION_URI_PATH,
|
||||||
|
tasks=[
|
||||||
|
Task(
|
||||||
|
name="a58b1856-729c-493b-bc87-6d2269b43ec0",
|
||||||
|
displayName="do_it_all_with_default_config",
|
||||||
|
description="",
|
||||||
|
taskUrl="",
|
||||||
|
)
|
||||||
|
],
|
||||||
|
service=EntityReference(
|
||||||
|
id="85811038-099a-11ed-861d-0242ac120002", type="pipelineService"
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class DagsterUnitTest(TestCase):
|
||||||
|
@patch("metadata.ingestion.source.pipeline.pipeline_service.test_connection")
|
||||||
|
@patch("sqlalchemy.engine.base.Engine.connect")
|
||||||
|
def __init__(self, methodName, mock_connect, test_connection) -> None:
|
||||||
|
super().__init__(methodName)
|
||||||
|
test_connection.return_value = False
|
||||||
|
|
||||||
|
config = OpenMetadataWorkflowConfig.parse_obj(mock_dagster_config)
|
||||||
|
|
||||||
|
self.dagster = DagsterSource.create(
|
||||||
|
mock_dagster_config["source"],
|
||||||
|
config.workflowConfig.openMetadataServerConfig,
|
||||||
|
)
|
||||||
|
self.engine = mock_connect.return_value
|
||||||
|
self.dagster.context.__dict__["pipeline"] = MOCK_PIPELINE
|
||||||
|
self.dagster.context.__dict__["pipeline_service"] = MOCK_PIPELINE_SERVICE
|
||||||
|
|
||||||
|
def test_pipeline_name(self):
|
||||||
|
|
||||||
|
assert (
|
||||||
|
self.dagster.get_pipeline_name(EXPECTED_DAGSTER_DETAILS)
|
||||||
|
== mock_data[6]["pipeline_code_origin"]["pipeline_name"]
|
||||||
|
)
|
@ -0,0 +1,57 @@
|
|||||||
|
{
|
||||||
|
"$id": "https://open-metadata.org/schema/entity/services/connections/pipeline/dagsterConnection.json",
|
||||||
|
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||||
|
"title": "DagsterConnection",
|
||||||
|
"description": "Dagster Metadata Database Connection Config",
|
||||||
|
"type": "object",
|
||||||
|
"javaType": "org.openmetadata.catalog.services.connections.pipeline.DagsterConnection",
|
||||||
|
"definitions": {
|
||||||
|
"DagsterType": {
|
||||||
|
"description": "Service type.",
|
||||||
|
"type": "string",
|
||||||
|
"enum": ["Dagster"],
|
||||||
|
"default": "Dagster"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"properties": {
|
||||||
|
"type": {
|
||||||
|
"title": "Service Type",
|
||||||
|
"description": "Service Type",
|
||||||
|
"$ref": "#/definitions/DagsterType",
|
||||||
|
"default": "Dagster"
|
||||||
|
},
|
||||||
|
"hostPort": {
|
||||||
|
"title": "Host And Port",
|
||||||
|
"description": "Pipeline Service Management/UI URI.",
|
||||||
|
"type": "string",
|
||||||
|
"format": "uri"
|
||||||
|
},
|
||||||
|
"numberOfStatus": {
|
||||||
|
"description": "Pipeline Service Number Of Status",
|
||||||
|
"type": "integer",
|
||||||
|
"default": "10"
|
||||||
|
},
|
||||||
|
"connection": {
|
||||||
|
"title": "Metadata Database Connection",
|
||||||
|
"description": "Underlying database connection. See https://docs.dagster.io/getting-started for supported backends.",
|
||||||
|
"oneOf": [
|
||||||
|
{
|
||||||
|
"$ref": "../database/mysqlConnection.json"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"$ref": "../database/postgresConnection.json"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"$ref": "../database/sqliteConnection.json"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"supportsMetadataExtraction": {
|
||||||
|
"title": "Supports Metadata Extraction",
|
||||||
|
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"additionalProperties": false,
|
||||||
|
"required": ["hostPort", "connection"]
|
||||||
|
}
|
||||||
|
|
@ -13,7 +13,7 @@
|
|||||||
"javaInterfaces": [
|
"javaInterfaces": [
|
||||||
"org.openmetadata.core.entity.interfaces.EnumInterface"
|
"org.openmetadata.core.entity.interfaces.EnumInterface"
|
||||||
],
|
],
|
||||||
"enum": ["Airflow", "Prefect", "Glue", "Generic", "Airbyte"],
|
"enum": ["Airflow", "Prefect", "Glue", "Generic", "Airbyte", "Dagster"],
|
||||||
"javaEnums": [
|
"javaEnums": [
|
||||||
{
|
{
|
||||||
"name": "Airflow"
|
"name": "Airflow"
|
||||||
@ -29,6 +29,9 @@
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
"name": "Airbyte"
|
"name": "Airbyte"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Dagster"
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
@ -49,6 +52,9 @@
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
"$ref": "./connections/pipeline/airbyteConnection.json"
|
"$ref": "./connections/pipeline/airbyteConnection.json"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"$ref": "./connections/pipeline/dagsterConnection.json"
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
@ -16,6 +16,7 @@ import { COMMON_UI_SCHEMA } from '../constants/services.const';
|
|||||||
import { PipelineServiceType } from '../generated/entity/services/pipelineService';
|
import { PipelineServiceType } from '../generated/entity/services/pipelineService';
|
||||||
import airbyteConnection from '../jsons/connectionSchemas/connections/pipeline/airbyteConnection.json';
|
import airbyteConnection from '../jsons/connectionSchemas/connections/pipeline/airbyteConnection.json';
|
||||||
import airflowConnection from '../jsons/connectionSchemas/connections/pipeline/airflowConnection.json';
|
import airflowConnection from '../jsons/connectionSchemas/connections/pipeline/airflowConnection.json';
|
||||||
|
import dagsterConnection from '../jsons/connectionSchemas/connections/pipeline/dagsterConnection.json';
|
||||||
import fivetranConnection from '../jsons/connectionSchemas/connections/pipeline/fivetranConnection.json';
|
import fivetranConnection from '../jsons/connectionSchemas/connections/pipeline/fivetranConnection.json';
|
||||||
import glueConnection from '../jsons/connectionSchemas/connections/pipeline/glueConnection.json';
|
import glueConnection from '../jsons/connectionSchemas/connections/pipeline/glueConnection.json';
|
||||||
|
|
||||||
@ -42,6 +43,11 @@ export const getPipelineConfig = (type: PipelineServiceType) => {
|
|||||||
case PipelineServiceType.Fivetran: {
|
case PipelineServiceType.Fivetran: {
|
||||||
schema = fivetranConnection;
|
schema = fivetranConnection;
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case PipelineServiceType.Dagster: {
|
||||||
|
schema = dagsterConnection;
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user