FIX 5419: Glue pipeline connector not working (#5467)

FIX 5419: Glue pipeline connector not working (#5467)
This commit is contained in:
Milan Bariya 2022-06-27 15:08:41 +05:30 committed by GitHub
parent 049e4a5a91
commit 338d1efe40
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 224 additions and 71 deletions

View File

@ -29,11 +29,6 @@
"description": "AWS storageServiceName Name.",
"type": "string"
},
"pipelineServiceName": {
"title": "Pipeline Service Name",
"description": "AWS pipelineServiceName Name.",
"type": "string"
},
"connectionOptions": {
"title": "Connection Options",
"$ref": "../connectionBasicType.json#/definitions/connectionOptions"
@ -48,7 +43,5 @@
}
},
"additionalProperties": false,
"required": [
"awsConfig", "storageServiceName", "pipelineServiceName"
]
"required": ["awsConfig", "storageServiceName"]
}

View File

@ -10,8 +10,7 @@
"awsRegion": "aws region",
"endPointURL": "https://glue.<region_name>.amazonaws.com/"
},
"storageServiceName": "glue_s3",
"pipelineServiceName": "glue_etl"
"storageServiceName": "glue_s3"
}
},
"sourceConfig": {

View File

@ -9,7 +9,6 @@ source:
awsSecretAccessKey: aws secret access key
awsRegion: aws region
endPointURL: https://glue.<region_name>.amazonaws.com/
database: local_glue_db
storageServiceName: storage_name
pipelineServiceName: pipeline_name
sourceConfig:

View File

@ -0,0 +1,21 @@
source:
type: glue
serviceName: local_glue
serviceConnection:
config:
type: Glue
awsConfig:
awsAccessKeyId: aws accessKey id
awsSecretAccessKey: aws secret access key
awsRegion: aws region
endPointURL: https://glue.<region_name>.amazonaws.com/
sourceConfig:
config:
type: PipelineMetadata
sink:
type: metadata-rest
config: {}
workflowConfig:
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: no-auth

View File

@ -16,7 +16,7 @@ from typing import Iterable, List, Optional
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.location import Location, LocationType
from metadata.generated.schema.entity.data.pipeline import Pipeline, Task
from metadata.generated.schema.entity.data.pipeline import Task
from metadata.generated.schema.entity.data.table import Column, Table, TableType
from metadata.generated.schema.entity.services.connections.database.glueConnection import (
GlueConnection,
@ -24,14 +24,7 @@ from metadata.generated.schema.entity.services.connections.database.glueConnecti
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.connections.pipeline.glueConnection import (
GlueConnection as GluePipelineConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.pipelineService import (
PipelineConnection,
PipelineService,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
@ -71,22 +64,7 @@ class GlueSource(Source[Entity]):
},
metadata_config,
)
self.task_id_mapping = {}
# Create a Glue Pipeline Connection based on the Database Connection details
self.pipeline_service = self.metadata.get_service_or_create(
entity=PipelineService,
config=WorkflowSource(
type="glue",
serviceName=self.service_connection.pipelineServiceName,
serviceConnection=PipelineConnection(
config=GluePipelineConnection(
awsConfig=self.service_connection.awsConfig
),
),
sourceConfig={},
),
)
self.connection = get_connection(self.service_connection)
self.glue = self.connection.client
@ -109,7 +87,6 @@ class GlueSource(Source[Entity]):
def next_record(self) -> Iterable[Entity]:
yield from self.ingest_catalog()
yield from self.ingest_pipelines()
def ingest_catalog(self) -> Iterable[Entity]:
"""
@ -301,28 +278,6 @@ class GlueSource(Source[Entity]):
)
return task_list
def ingest_pipelines(self) -> Iterable[OMetaDatabaseAndTable]:
try:
for workflow in self.glue.list_workflows()["Workflows"]:
jobs = self.glue.get_workflow(Name=workflow, IncludeGraph=True)[
"Workflow"
]
tasks = self.get_tasks(jobs)
pipeline_ev = Pipeline(
id=uuid.uuid4(),
name=jobs["Name"],
displayName=jobs["Name"],
description="",
tasks=tasks,
service=EntityReference(
id=self.pipeline_service.id, type="pipelineService"
),
)
yield pipeline_ev
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(err)
def close(self):
pass

View File

@ -0,0 +1,135 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import traceback
import uuid
from typing import Iterable
from metadata.generated.schema.entity.data.pipeline import Pipeline, Task
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.connections.pipeline.glueConnection import (
GlueConnection,
)
from metadata.generated.schema.entity.services.connections.pipeline.glueConnection import (
GlueConnection as GluePipelineConnection,
)
from metadata.generated.schema.entity.services.pipelineService import (
PipelineConnection,
PipelineService,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.common_db_source import SQLSourceStatus
from metadata.utils.connections import get_connection, test_connection
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
GRAPH = "Graph"
NODES = "Nodes"
NAME = "Name"
class GlueSource(Source[Entity]):
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
super().__init__()
self.status = SQLSourceStatus()
self.config = config
self.metadata_config = metadata_config
self.metadata = OpenMetadata(metadata_config)
self.service_connection = self.config.serviceConnection.__root__.config
self.pipeline_service = self.metadata.get_service_or_create(
entity=PipelineService,
config=WorkflowSource(
type="glue",
serviceName=self.config.serviceName,
serviceConnection=PipelineConnection(
config=GluePipelineConnection(
awsConfig=self.service_connection.awsConfig
),
),
sourceConfig={},
),
)
self.connection = get_connection(self.service_connection)
self.glue = self.connection.client
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: GlueConnection = config.serviceConnection.__root__.config
if not isinstance(connection, GlueConnection):
raise InvalidSourceException(
f"Expected GlueConnection, but got {connection}"
)
return cls(config, metadata_config)
def prepare(self):
pass
def next_record(self) -> Iterable[Entity]:
yield from self.ingest_pipelines()
def get_tasks(self, tasks):
task_list = []
for task in tasks[GRAPH][NODES]:
task_list.append(
Task(
name=task[NAME],
displayName=task[NAME],
taskType=task["Type"],
downstreamTasks=self.get_downstream_tasks(
task["UniqueId"], tasks[GRAPH]
),
)
)
return task_list
def ingest_pipelines(self) -> Iterable[OMetaDatabaseAndTable]:
try:
for workflow in self.glue.list_workflows()["Workflows"]:
jobs = self.glue.get_workflow(Name=workflow, IncludeGraph=True)[
"Workflow"
]
tasks = self.get_tasks(jobs)
pipeline_ev = Pipeline(
id=uuid.uuid4(),
name=jobs[NAME],
displayName=jobs[NAME],
description="",
tasks=tasks,
service=EntityReference(
id=self.pipeline_service.id, type="pipelineService"
),
)
yield pipeline_ev
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(err)
def close(self):
pass
def get_status(self) -> SourceStatus:
return self.status
def test_connection(self) -> None:
test_connection(self.connection)

View File

@ -14,7 +14,11 @@ from typing import Any
from boto3 import Session
from metadata.generated.schema.security.credentials.awsCredentials import AWSCredentials
from metadata.utils.connection_clients import DynamoClient, GlueClient
from metadata.utils.connection_clients import (
DynamoClient,
GlueDBClient,
GluePipelineClient,
)
class AWSClient:
@ -69,5 +73,8 @@ class AWSClient:
def get_dynomo_client(self) -> DynamoClient:
return DynamoClient(self.get_resource("dynamodb"))
def get_glue_client(self) -> GlueClient:
return GlueClient(self.get_client("glue"))
def get_glue_db_client(self) -> GlueDBClient:
return GlueDBClient(self.get_client("glue"))
def get_glue_pipeline_client(self) -> GluePipelineClient:
return GluePipelineClient(self.get_client("glue"))

View File

@ -19,7 +19,13 @@ if non-sqlalchemy package is not installed
@dataclass
class GlueClient:
class GlueDBClient:
def __init__(self, client) -> None:
self.client = client
@dataclass
class GluePipelineClient:
def __init__(self, client) -> None:
self.client = client

View File

@ -69,7 +69,7 @@ from metadata.generated.schema.entity.services.connections.database.dynamoDBConn
DynamoDBConnection,
)
from metadata.generated.schema.entity.services.connections.database.glueConnection import (
GlueConnection,
GlueConnection as GlueDBConnection,
)
from metadata.generated.schema.entity.services.connections.database.salesforceConnection import (
SalesforceConnection,
@ -89,13 +89,17 @@ from metadata.generated.schema.entity.services.connections.pipeline.airflowConne
from metadata.generated.schema.entity.services.connections.pipeline.backendConnection import (
BackendConnection,
)
from metadata.generated.schema.entity.services.connections.pipeline.glueConnection import (
GlueConnection as GluePipelineConnection,
)
from metadata.orm_profiler.orm.functions.conn_test import ConnTestFn
from metadata.utils.connection_clients import (
AirByteClient,
DatalakeClient,
DeltaLakeClient,
DynamoClient,
GlueClient,
GlueDBClient,
GluePipelineClient,
KafkaClient,
LookerClient,
MetabaseClient,
@ -140,7 +144,14 @@ def create_generic_connection(connection, verbose: bool = False) -> Engine:
@singledispatch
def get_connection(
connection, verbose: bool = False
) -> Union[Engine, DynamoClient, GlueClient, SalesforceClient, KafkaClient]:
) -> Union[
Engine,
DynamoClient,
GlueDBClient,
GluePipelineClient,
SalesforceClient,
KafkaClient,
]:
"""
Given an SQL configuration, build the SQLAlchemy Engine
"""
@ -212,10 +223,20 @@ def _(connection: DynamoDBConnection, verbose: bool = False) -> DynamoClient:
@get_connection.register
def _(connection: GlueConnection, verbose: bool = False) -> GlueClient:
def _(connection: GlueDBConnection, verbose: bool = False) -> GlueDBClient:
from metadata.utils.aws_client import AWSClient
glue_connection = AWSClient(connection.awsConfig).get_glue_client()
glue_connection = AWSClient(connection.awsConfig).get_glue_db_client()
return glue_connection
@get_connection.register
def _(
connection: GluePipelineConnection, verbose: bool = False
) -> GluePipelineConnection:
from metadata.utils.aws_client import AWSClient
glue_connection = AWSClient(connection.awsConfig).get_glue_pipeline_client()
return glue_connection
@ -360,7 +381,7 @@ def _(connection: DynamoClient) -> None:
@test_connection.register
def _(connection: GlueClient) -> None:
def _(connection: GlueDBClient) -> None:
"""
Test that we can connect to the source using the given aws resource
:param engine: boto cliet to test
@ -380,6 +401,27 @@ def _(connection: GlueClient) -> None:
)
@test_connection.register
def _(connection: GluePipelineClient) -> None:
"""
Test that we can connect to the source using the given aws resource
:param engine: boto cliet to test
:return: None or raise an exception if we cannot connect
"""
from botocore.client import ClientError
try:
connection.client.get_paginator("get_databases")
except ClientError as err:
raise SourceConnectionException(
f"Connection error for {connection} - {err}. Check the connection details."
)
except Exception as err:
raise SourceConnectionException(
f"Unknown error connecting with {connection} - {err}."
)
@test_connection.register
def _(connection: SalesforceClient) -> None:
from simple_salesforce.exceptions import SalesforceAuthenticationFailed

View File

@ -176,7 +176,6 @@ class TestWorkflowParse(TestCase):
"endPointURL": "https://glue.<region_name>.amazonaws.com/",
},
"storageServiceName": "storage_name",
"pipelineServiceName": "pipeline_name",
"random": "extra",
}
},

View File

@ -32,9 +32,6 @@ describe('Glue Ingestion', () => {
cy.get('#root_storageServiceName')
.scrollIntoView()
.type(Cypress.env('glueStorageServiceName'));
cy.get('#root_pipelineServiceName')
.scrollIntoView()
.type(Cypress.env('gluePipelineServiceName'));
};
const addIngestionInput = () => {