Fix #4923 - Presto ingestion & SQLAlchemy test connection refactor (#4946)

Fix #4923 - Presto ingestion & SQLAlchemy test connection refactor (#4946)
This commit is contained in:
Pere Miquel Brull 2022-05-14 06:44:43 +02:00 committed by GitHub
parent 60b8730df3
commit c6915ca510
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 118 additions and 37 deletions

View File

@ -76,5 +76,5 @@
} }
}, },
"additionalProperties": false, "additionalProperties": false,
"required": ["hostPort", "username"] "required": ["hostPort", "username", "catalog"]
} }

View File

@ -9,9 +9,9 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import json
from datetime import timedelta from datetime import timedelta
import yaml
from airflow import DAG from airflow import DAG
try: try:
@ -21,10 +21,6 @@ except ModuleNotFoundError:
from airflow.utils.dates import days_ago from airflow.utils.dates import days_ago
from airflow_provider_openmetadata.lineage.callback import (
failure_callback,
success_callback,
)
from metadata.ingestion.api.workflow import Workflow from metadata.ingestion.api.workflow import Workflow
default_args = { default_args = {
@ -34,39 +30,29 @@ default_args = {
"retries": 3, "retries": 3,
"retry_delay": timedelta(seconds=10), "retry_delay": timedelta(seconds=10),
"execution_timeout": timedelta(minutes=60), "execution_timeout": timedelta(minutes=60),
"on_failure_callback": failure_callback,
"on_success_callback": success_callback,
} }
config = """ config = """
{ source:
"source": { type: sample_data
"type": "sample_data", serviceName: sample_data
"serviceName": "sample_data", serviceConnection:
"serviceConnection": { config:
"config": { type: SampleData
"type": "SampleData", sampleDataFolder: "./examples/sample_data"
"sampleDataFolder": "./examples/sample_data" sourceConfig: {}
} sink:
}, type: metadata-rest
"sourceConfig": {} config: {}
}, workflowConfig:
"sink": { openMetadataServerConfig:
"type": "metadata-rest", hostPort: http://localhost:8585/api
"config": {} authProvider: no-auth
},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "no-auth"
}
}
}
""" """
def metadata_ingestion_workflow(): def metadata_ingestion_workflow():
workflow_config = json.loads(config) workflow_config = yaml.safe_load(config)
workflow = Workflow.create(workflow_config) workflow = Workflow.create(workflow_config)
workflow.execute() workflow.execute()
workflow.raise_from_status() workflow.raise_from_status()

View File

@ -10,20 +10,29 @@
# limitations under the License. # limitations under the License.
import re import re
from urllib.parse import quote_plus from typing import Iterable
from pyhive.sqlalchemy_presto import PrestoDialect, _type_map from pyhive.sqlalchemy_presto import PrestoDialect, _type_map
from sqlalchemy import types, util from sqlalchemy import inspect, types, util
from sqlalchemy.engine import reflection from sqlalchemy.engine import reflection
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection, OpenMetadataConnection,
) )
from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource, Source as WorkflowSource,
) )
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.source.sql_source import SQLSource from metadata.ingestion.source.sql_source import SQLSource
from metadata.utils.filters import filter_by_schema
from metadata.utils.fqdn_generator import get_fqdn
from metadata.utils.logger import ometa_logger
logger = ometa_logger()
_type_map.update( _type_map.update(
{ {
@ -81,6 +90,8 @@ from metadata.generated.schema.entity.services.connections.database.prestoConnec
class PrestoSource(SQLSource): class PrestoSource(SQLSource):
def __init__(self, config, metadata_config): def __init__(self, config, metadata_config):
super().__init__(config, metadata_config) super().__init__(config, metadata_config)
self.schema_names = None
self.inspector = None
@classmethod @classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection): def create(cls, config_dict, metadata_config: OpenMetadataConnection):
@ -91,3 +102,42 @@ class PrestoSource(SQLSource):
f"Expected PrestoConnection, but got {connection}" f"Expected PrestoConnection, but got {connection}"
) )
return cls(config, metadata_config) return cls(config, metadata_config)
def _get_database(self, _) -> Database:
return Database(
name=self.service_connection.catalog,
service=EntityReference(
id=self.service.id, type=self.service_connection.type.value
),
)
def prepare(self):
self.inspector = inspect(self.engine)
self.schema_names = (
self.inspector.get_schema_names()
if not self.service_connection.database
else [self.service_connection.database]
)
return super().prepare()
def next_record(self) -> Iterable[OMetaDatabaseAndTable]:
for schema in self.schema_names:
self.database_source_state.clear()
if filter_by_schema(
self.source_config.schemaFilterPattern, schema_name=schema
):
self.status.filter(schema, "Schema pattern not allowed")
continue
if self.source_config.includeTables:
yield from self.fetch_tables(self.inspector, schema)
if self.source_config.includeViews:
logger.info("Presto includes views when fetching tables.")
if self.source_config.markDeletedTables:
schema_fqdn = get_fqdn(
DatabaseSchema,
self.config.serviceName,
self.service_connection.catalog,
schema,
)
yield from self.delete_tables(schema_fqdn)

View File

@ -0,0 +1,44 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Define a vanilla connection testing function.
This will be executed as a way to make sure
that the Engine can reach and execute in the
source.
"""
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import ClauseElement, Executable
from metadata.orm_profiler.metrics.core import CACHE
from metadata.orm_profiler.orm.registry import Dialects
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
class ConnTestFn(Executable, ClauseElement):
inherit_cache = CACHE
@compiles(ConnTestFn)
def _(*_, **__):
return "SELECT 42"
@compiles(ConnTestFn, Dialects.Oracle)
def _(*_, **__):
return "SELECT 42 FROM DUAL"
@compiles(ConnTestFn, Dialects.BigQuery)
def _(*_, **__):
return "SELECT SESSION_USER()"

View File

@ -19,7 +19,7 @@ from functools import singledispatch
from typing import Union from typing import Union
import requests import requests
from sqlalchemy import create_engine from sqlalchemy import create_engine, func
from sqlalchemy.engine.base import Engine from sqlalchemy.engine.base import Engine
from sqlalchemy.exc import OperationalError from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
@ -65,6 +65,7 @@ from metadata.generated.schema.entity.services.connections.database.snowflakeCon
from metadata.generated.schema.entity.services.connections.messaging.kafkaConnection import ( from metadata.generated.schema.entity.services.connections.messaging.kafkaConnection import (
KafkaConnection, KafkaConnection,
) )
from metadata.orm_profiler.orm.functions.conn_test import ConnTestFn
from metadata.utils.connection_clients import ( from metadata.utils.connection_clients import (
DeltaLakeClient, DeltaLakeClient,
DynamoClient, DynamoClient,
@ -287,8 +288,8 @@ def test_connection(connection) -> None:
:return: None or raise an exception if we cannot connect :return: None or raise an exception if we cannot connect
""" """
try: try:
with connection.connect() as _: with connection.connect() as conn:
pass conn.execute(ConnTestFn())
except OperationalError as err: except OperationalError as err:
raise SourceConnectionException( raise SourceConnectionException(
f"Connection error for {connection} - {err}. Check the connection details." f"Connection error for {connection} - {err}. Check the connection details."