diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/glueConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/glueConnection.json
index f0aa0a16072..8f6679f6da2 100644
--- a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/glueConnection.json
+++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/glueConnection.json
@@ -29,11 +29,6 @@
       "description": "AWS storageServiceName Name.",
       "type": "string"
     },
-    "pipelineServiceName": {
-      "title": "Pipeline Service Name",
-      "description": "AWS pipelineServiceName Name.",
-      "type": "string"
-    },
     "connectionOptions": {
       "title": "Connection Options",
       "$ref": "../connectionBasicType.json#/definitions/connectionOptions"
@@ -48,7 +43,5 @@
     }
   },
   "additionalProperties": false,
-  "required": [
-    "awsConfig", "storageServiceName", "pipelineServiceName"
-  ]
+  "required": ["awsConfig", "storageServiceName"]
 }
diff --git a/ingestion/examples/sample_data/glue/database_service.json b/ingestion/examples/sample_data/glue/database_service.json
index 39bfc5e1d53..385ea94d639 100644
--- a/ingestion/examples/sample_data/glue/database_service.json
+++ b/ingestion/examples/sample_data/glue/database_service.json
@@ -10,8 +10,7 @@
         "awsRegion": "aws region",
         "endPointURL": "https://glue..amazonaws.com/"
       },
-      "storageServiceName": "glue_s3",
-      "pipelineServiceName": "glue_etl"
+      "storageServiceName": "glue_s3"
     }
   },
   "sourceConfig": {
diff --git a/ingestion/examples/workflows/glue.yaml b/ingestion/examples/workflows/glue.yaml
index 1c04ff5da3f..b78f7f43243 100644
--- a/ingestion/examples/workflows/glue.yaml
+++ b/ingestion/examples/workflows/glue.yaml
@@ -9,7 +9,5 @@ source:
         awsSecretAccessKey: aws secret access key
         awsRegion: aws region
         endPointURL: https://glue..amazonaws.com/
-      database: local_glue_db
       storageServiceName: storage_name
-      pipelineServiceName: pipeline_name
sourceConfig: diff --git a/ingestion/examples/workflows/glue_pipeline.yaml b/ingestion/examples/workflows/glue_pipeline.yaml new file mode 100644 index 00000000000..f0d3bfcfb0c --- /dev/null +++ b/ingestion/examples/workflows/glue_pipeline.yaml @@ -0,0 +1,21 @@ +source: + type: glue + serviceName: local_glue + serviceConnection: + config: + type: Glue + awsConfig: + awsAccessKeyId: aws accessKey id + awsSecretAccessKey: aws secret access key + awsRegion: aws region + endPointURL: https://glue..amazonaws.com/ + sourceConfig: + config: + type: PipelineMetadata +sink: + type: metadata-rest + config: {} +workflowConfig: + openMetadataServerConfig: + hostPort: http://localhost:8585/api + authProvider: no-auth diff --git a/ingestion/src/metadata/ingestion/source/database/glue.py b/ingestion/src/metadata/ingestion/source/database/glue.py index c5de1b16ed1..c0c58459716 100755 --- a/ingestion/src/metadata/ingestion/source/database/glue.py +++ b/ingestion/src/metadata/ingestion/source/database/glue.py @@ -16,7 +16,7 @@ from typing import Iterable, List, Optional from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.location import Location, LocationType -from metadata.generated.schema.entity.data.pipeline import Pipeline, Task +from metadata.generated.schema.entity.data.pipeline import Task from metadata.generated.schema.entity.data.table import Column, Table, TableType from metadata.generated.schema.entity.services.connections.database.glueConnection import ( GlueConnection, @@ -24,14 +24,7 @@ from metadata.generated.schema.entity.services.connections.database.glueConnecti from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) -from metadata.generated.schema.entity.services.connections.pipeline.glueConnection import ( - GlueConnection as GluePipelineConnection, -) from 
metadata.generated.schema.entity.services.databaseService import DatabaseService -from metadata.generated.schema.entity.services.pipelineService import ( - PipelineConnection, - PipelineService, -) from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) @@ -71,22 +64,7 @@ class GlueSource(Source[Entity]): }, metadata_config, ) - self.task_id_mapping = {} - # Create a Glue Pipeline Connection based on the Database Connection details - self.pipeline_service = self.metadata.get_service_or_create( - entity=PipelineService, - config=WorkflowSource( - type="glue", - serviceName=self.service_connection.pipelineServiceName, - serviceConnection=PipelineConnection( - config=GluePipelineConnection( - awsConfig=self.service_connection.awsConfig - ), - ), - sourceConfig={}, - ), - ) self.connection = get_connection(self.service_connection) self.glue = self.connection.client @@ -109,7 +87,6 @@ class GlueSource(Source[Entity]): def next_record(self) -> Iterable[Entity]: yield from self.ingest_catalog() - yield from self.ingest_pipelines() def ingest_catalog(self) -> Iterable[Entity]: """ @@ -301,28 +278,6 @@ class GlueSource(Source[Entity]): ) return task_list - def ingest_pipelines(self) -> Iterable[OMetaDatabaseAndTable]: - try: - for workflow in self.glue.list_workflows()["Workflows"]: - jobs = self.glue.get_workflow(Name=workflow, IncludeGraph=True)[ - "Workflow" - ] - tasks = self.get_tasks(jobs) - pipeline_ev = Pipeline( - id=uuid.uuid4(), - name=jobs["Name"], - displayName=jobs["Name"], - description="", - tasks=tasks, - service=EntityReference( - id=self.pipeline_service.id, type="pipelineService" - ), - ) - yield pipeline_ev - except Exception as err: - logger.debug(traceback.format_exc()) - logger.error(err) - def close(self): pass diff --git a/ingestion/src/metadata/ingestion/source/pipeline/glue.py b/ingestion/src/metadata/ingestion/source/pipeline/glue.py new file mode 100644 index 00000000000..db258609680 --- /dev/null +++ 
b/ingestion/src/metadata/ingestion/source/pipeline/glue.py
@@ -0,0 +1,172 @@
+#  Copyright 2021 Collate
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#  http://www.apache.org/licenses/LICENSE-2.0
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import traceback
+import uuid
+from typing import Iterable
+
+from metadata.generated.schema.entity.data.pipeline import Pipeline, Task
+from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
+    OpenMetadataConnection,
+)
+from metadata.generated.schema.entity.services.connections.pipeline.glueConnection import (
+    GlueConnection,
+)
+from metadata.generated.schema.entity.services.connections.pipeline.glueConnection import (
+    GlueConnection as GluePipelineConnection,
+)
+from metadata.generated.schema.entity.services.pipelineService import (
+    PipelineConnection,
+    PipelineService,
+)
+from metadata.generated.schema.metadataIngestion.workflow import (
+    Source as WorkflowSource,
+)
+from metadata.generated.schema.type.entityReference import EntityReference
+from metadata.ingestion.api.common import Entity
+from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
+from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
+from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.ingestion.source.database.common_db_source import SQLSourceStatus
+from metadata.utils.connections import get_connection, test_connection
+from metadata.utils.logger import ingestion_logger
+
+logger = ingestion_logger()
+
+# Keys of the boto3 Glue get_workflow response read by this source
+GRAPH = "Graph"
+NODES = "Nodes"
+EDGES = "Edges"
+NAME = "Name"
+
+
+class GlueSource(Source[Entity]):
+    """
+    Ingest AWS Glue workflows as OpenMetadata Pipeline entities.
+
+    Each Glue workflow becomes one Pipeline whose tasks are the nodes of the
+    workflow graph; task dependencies are derived from the graph edges.
+    """
+
+    def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
+        super().__init__()
+        self.status = SQLSourceStatus()
+        self.config = config
+        self.metadata_config = metadata_config
+        self.metadata = OpenMetadata(metadata_config)
+
+        self.service_connection = self.config.serviceConnection.__root__.config
+        # Fetch or create the pipeline service that will own the pipelines
+        self.pipeline_service = self.metadata.get_service_or_create(
+            entity=PipelineService,
+            config=WorkflowSource(
+                type="glue",
+                serviceName=self.config.serviceName,
+                serviceConnection=PipelineConnection(
+                    config=GluePipelineConnection(
+                        awsConfig=self.service_connection.awsConfig
+                    ),
+                ),
+                sourceConfig={},
+            ),
+        )
+        self.connection = get_connection(self.service_connection)
+        self.glue = self.connection.client
+
+    @classmethod
+    def create(cls, config_dict, metadata_config: OpenMetadataConnection):
+        """Create the source after validating the connection type."""
+        config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
+        connection: GlueConnection = config.serviceConnection.__root__.config
+        if not isinstance(connection, GlueConnection):
+            raise InvalidSourceException(
+                f"Expected GlueConnection, but got {connection}"
+            )
+        return cls(config, metadata_config)
+
+    def prepare(self):
+        pass
+
+    def next_record(self) -> Iterable[Entity]:
+        """Yield every pipeline built from the Glue workflows."""
+        yield from self.ingest_pipelines()
+
+    def get_downstream_tasks(self, task_unique_id, graph):
+        """
+        Return the names of the tasks that directly follow the given one.
+
+        :param task_unique_id: `UniqueId` of the upstream workflow node
+        :param graph: `Graph` section of a Glue `get_workflow` response
+        :return: names of the nodes reached by an edge starting at the task
+        """
+        names_by_id = {node["UniqueId"]: node[NAME] for node in graph[NODES]}
+        return [
+            names_by_id[edge["DestinationId"]]
+            for edge in graph.get(EDGES, [])
+            if edge["SourceId"] == task_unique_id
+            and edge["DestinationId"] in names_by_id
+        ]
+
+    def get_tasks(self, tasks):
+        """
+        Convert the nodes of a Glue workflow graph into pipeline tasks.
+
+        :param tasks: `Workflow` section of a Glue `get_workflow` response,
+            fetched with `IncludeGraph=True`
+        :return: list of Task entities, one per graph node
+        """
+        graph = tasks[GRAPH]
+        return [
+            Task(
+                name=task[NAME],
+                displayName=task[NAME],
+                taskType=task["Type"],
+                downstreamTasks=self.get_downstream_tasks(task["UniqueId"], graph),
+            )
+            for task in graph[NODES]
+        ]
+
+    def ingest_pipelines(self) -> Iterable[Pipeline]:
+        """
+        Yield one Pipeline per Glue workflow.
+
+        Errors are logged instead of raised, so a failure stops the
+        iteration without crashing the ingestion workflow.
+        """
+        try:
+            for workflow in self.glue.list_workflows()["Workflows"]:
+                jobs = self.glue.get_workflow(Name=workflow, IncludeGraph=True)[
+                    "Workflow"
+                ]
+                tasks = self.get_tasks(jobs)
+                pipeline_ev = Pipeline(
+                    id=uuid.uuid4(),
+                    name=jobs[NAME],
+                    displayName=jobs[NAME],
+                    description="",
+                    tasks=tasks,
+                    service=EntityReference(
+                        id=self.pipeline_service.id, type="pipelineService"
+                    ),
+                )
+                yield pipeline_ev
+        except Exception as err:
+            logger.debug(traceback.format_exc())
+            logger.error(err)
+
+    def close(self):
+        pass
+
+    def get_status(self) -> SourceStatus:
+        return self.status
+
+    def test_connection(self) -> None:
+        test_connection(self.connection)
diff --git a/ingestion/src/metadata/utils/aws_client.py b/ingestion/src/metadata/utils/aws_client.py
index 0280a8bb1d2..0ec3d0f589a 100644
--- a/ingestion/src/metadata/utils/aws_client.py
+++ b/ingestion/src/metadata/utils/aws_client.py
@@ -14,7 +14,11 @@ from typing import Any
 from boto3 import Session
 
 from metadata.generated.schema.security.credentials.awsCredentials import AWSCredentials
-from metadata.utils.connection_clients import DynamoClient, GlueClient
+from metadata.utils.connection_clients import (
+    DynamoClient,
+    GlueDBClient,
+    GluePipelineClient,
+)
 
 
 class AWSClient:
@@ -69,5 +73,8 @@ class AWSClient:
     def get_dynomo_client(self) -> DynamoClient:
         return DynamoClient(self.get_resource("dynamodb"))
 
-    def get_glue_client(self) -> GlueClient:
-        return GlueClient(self.get_client("glue"))
+    def get_glue_db_client(self) -> GlueDBClient:
+        return GlueDBClient(self.get_client("glue"))
+
+    def get_glue_pipeline_client(self) -> GluePipelineClient:
+        return GluePipelineClient(self.get_client("glue"))
diff --git a/ingestion/src/metadata/utils/connection_clients.py b/ingestion/src/metadata/utils/connection_clients.py
index 115a6193e44..bc902890563 100644
--- a/ingestion/src/metadata/utils/connection_clients.py
+++ b/ingestion/src/metadata/utils/connection_clients.py
@@ -19,7 +19,13 @@ if non-sqlalchemy package is not installed
 
 
 @dataclass
-class GlueClient:
+class GlueDBClient:
+    def __init__(self, client) -> None:
+        self.client = client
+
+
+@dataclass
+class GluePipelineClient:
     def __init__(self, client) -> None:
         self.client = client
 
diff --git a/ingestion/src/metadata/utils/connections.py
b/ingestion/src/metadata/utils/connections.py index c17d37cdb85..6346d3a81b3 100644 --- a/ingestion/src/metadata/utils/connections.py +++ b/ingestion/src/metadata/utils/connections.py @@ -69,7 +69,7 @@ from metadata.generated.schema.entity.services.connections.database.dynamoDBConn DynamoDBConnection, ) from metadata.generated.schema.entity.services.connections.database.glueConnection import ( - GlueConnection, + GlueConnection as GlueDBConnection, ) from metadata.generated.schema.entity.services.connections.database.salesforceConnection import ( SalesforceConnection, @@ -89,13 +89,17 @@ from metadata.generated.schema.entity.services.connections.pipeline.airflowConne from metadata.generated.schema.entity.services.connections.pipeline.backendConnection import ( BackendConnection, ) +from metadata.generated.schema.entity.services.connections.pipeline.glueConnection import ( + GlueConnection as GluePipelineConnection, +) from metadata.orm_profiler.orm.functions.conn_test import ConnTestFn from metadata.utils.connection_clients import ( AirByteClient, DatalakeClient, DeltaLakeClient, DynamoClient, - GlueClient, + GlueDBClient, + GluePipelineClient, KafkaClient, LookerClient, MetabaseClient, @@ -140,7 +144,14 @@ def create_generic_connection(connection, verbose: bool = False) -> Engine: @singledispatch def get_connection( connection, verbose: bool = False -) -> Union[Engine, DynamoClient, GlueClient, SalesforceClient, KafkaClient]: +) -> Union[ + Engine, + DynamoClient, + GlueDBClient, + GluePipelineClient, + SalesforceClient, + KafkaClient, +]: """ Given an SQL configuration, build the SQLAlchemy Engine """ @@ -212,10 +223,20 @@ def _(connection: DynamoDBConnection, verbose: bool = False) -> DynamoClient: @get_connection.register -def _(connection: GlueConnection, verbose: bool = False) -> GlueClient: +def _(connection: GlueDBConnection, verbose: bool = False) -> GlueDBClient: from metadata.utils.aws_client import AWSClient - glue_connection = 
AWSClient(connection.awsConfig).get_glue_client()
+    glue_connection = AWSClient(connection.awsConfig).get_glue_db_client()
+    return glue_connection
+
+
+@get_connection.register
+def _(
+    connection: GluePipelineConnection, verbose: bool = False
+) -> GluePipelineClient:
+    from metadata.utils.aws_client import AWSClient
+
+    glue_connection = AWSClient(connection.awsConfig).get_glue_pipeline_client()
     return glue_connection
 
 
@@ -360,7 +381,7 @@ def _(connection: DynamoClient) -> None:
 
 
 @test_connection.register
-def _(connection: GlueClient) -> None:
+def _(connection: GlueDBClient) -> None:
     """
     Test that we can connect to the source using the given aws resource
     :param engine: boto cliet to test
@@ -380,6 +401,27 @@ def _(connection: GlueClient) -> None:
         )
 
 
+@test_connection.register
+def _(connection: GluePipelineClient) -> None:
+    """
+    Test that we can reach Glue by listing workflows with the given client
+    :param connection: wrapper holding the boto client to test
+    :return: None or raise an exception if we cannot connect
+    """
+    from botocore.client import ClientError
+
+    try:
+        connection.client.list_workflows()
+    except ClientError as err:
+        raise SourceConnectionException(
+            f"Connection error for {connection} - {err}. Check the connection details."
+        )
+    except Exception as err:
+        raise SourceConnectionException(
+            f"Unknown error connecting with {connection} - {err}."
+        )
+
+
 @test_connection.register
 def _(connection: SalesforceClient) -> None:
     from simple_salesforce.exceptions import SalesforceAuthenticationFailed
diff --git a/ingestion/tests/unit/test_workflow_parse.py b/ingestion/tests/unit/test_workflow_parse.py
index 6897e9e30ad..316c84c635a 100644
--- a/ingestion/tests/unit/test_workflow_parse.py
+++ b/ingestion/tests/unit/test_workflow_parse.py
@@ -176,7 +176,6 @@ class TestWorkflowParse(TestCase):
                             "endPointURL": "https://glue..amazonaws.com/",
                         },
                         "storageServiceName": "storage_name",
-                        "pipelineServiceName": "pipeline_name",
                         "random": "extra",
                     }
                 },
diff --git a/openmetadata-ui/src/main/resources/ui/cypress/integration/AddNewService/glue.spec.js b/openmetadata-ui/src/main/resources/ui/cypress/integration/AddNewService/glue.spec.js
index 7d58264358a..a3cb1d354e3 100644
--- a/openmetadata-ui/src/main/resources/ui/cypress/integration/AddNewService/glue.spec.js
+++ b/openmetadata-ui/src/main/resources/ui/cypress/integration/AddNewService/glue.spec.js
@@ -32,9 +32,6 @@ describe('Glue Ingestion', () => {
     cy.get('#root_storageServiceName')
       .scrollIntoView()
       .type(Cypress.env('glueStorageServiceName'));
-    cy.get('#root_pipelineServiceName')
-      .scrollIntoView()
-      .type(Cypress.env('gluePipelineServiceName'));
   };
 
   const addIngestionInput = () => {