Fix for connectors based on refactoring of schemas V2 (#3870)

Co-authored-by: Ayush Shah <ayush@getcollate.io>
Pere Miquel Brull 2022-04-06 03:33:25 +02:00 committed by GitHub
parent 2db3b9dd94
commit 63533eb388
14 changed files with 223 additions and 121 deletions

View File

@@ -1,25 +1,25 @@
 {
   "source": {
     "type": "mysql",
-    "config": {
-      "username": "openmetadata_user",
-      "password": "openmetadata_password",
-      "database": "openmetadata_db",
-      "service_name": "local_mysql",
-      "schema_filter_pattern": {
-        "includes": ["test_delete.*"]
-      }
-    }
+    "serviceName": "local_mysql",
+    "serviceConnection": {
+      "config": {
+        "type": "MySQL",
+        "username": "openmetadata_user",
+        "password": "openmetadata_password",
+        "hostPort": "localhost:3306"
+      }
+    },
+    "sourceConfig": {"config": {"enableDataProfiler": false}}
   },
   "sink": {
     "type": "metadata-rest",
     "config": {}
   },
-  "metadata_server": {
-    "type": "metadata-server",
-    "config": {
-      "api_endpoint": "http://localhost:8585/api",
-      "auth_provider_type": "no-auth"
-    }
-  }
+  "workflowConfig": {
+    "openMetadataServerConfig": {
+      "hostPort": "http://localhost:8585/api",
+      "authProvider": "no-auth"
+    }
+  }
 }
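
For reference, a minimal sketch of shape-checking this new-style config with the generated pydantic model before handing it to the ingestion workflow. It assumes an openmetadata-ingestion install carrying these regenerated V2 schemas; the file path is hypothetical.

import json

from metadata.generated.schema.metadataIngestion.workflow import (
    OpenMetadataWorkflowConfig,
)

# Hypothetical path to the JSON shown above
with open("mysql_test.json", "r", encoding="utf-8") as f:
    workflow_config = OpenMetadataWorkflowConfig.parse_obj(json.load(f))

print(workflow_config.source.serviceName)  # local_mysql
print(workflow_config.workflowConfig.openMetadataServerConfig.hostPort)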

View File

@@ -11,10 +11,8 @@
   },
   "workflowConfig": {
     "openMetadataServerConfig": {
-      "api_endpoint": "http://localhost:8585/api",
-      "auth_provider_type": "no-auth"
-    },
-    "config": {
+      "hostPort": "http://localhost:8585/api",
+      "authProvider": "no-auth"
     }
   }
 }

View File

@@ -19,7 +19,9 @@ from typing import Type, TypeVar
 from metadata.generated.schema.metadataIngestion.workflow import (
     OpenMetadataServerConfig,
 )
-from metadata.generated.schema.metadataIngestion.workflow import Source as SourceConfig
+from metadata.generated.schema.metadataIngestion.workflow import (
+    Source as WorkflowSource,
+)
 from metadata.ingestion.api.common import DynamicTypedConfig
 from metadata.ingestion.api.processor import Processor
 from metadata.ingestion.api.sink import Sink
@@ -55,7 +57,7 @@ def get_class(key: str) -> Type[T]:
 def get_ingestion_source(
     source_type: str,
-    source_config: SourceConfig,
+    source_config: WorkflowSource,
     metadata_config: OpenMetadataServerConfig,
 ) -> Source:
     """

View File

@@ -17,6 +17,7 @@ import click
 from metadata.config.common import WorkflowExecutionError
 from metadata.generated.schema.metadataIngestion.workflow import (
+    OpenMetadataServerConfig,
     OpenMetadataWorkflowConfig,
 )
 from metadata.ingestion.api.bulk_sink import BulkSink
@@ -48,9 +49,10 @@ class Workflow:
                 self.typeClassFetch(source_type, False),
             )
         )
-        metadata_config = self.config.workflowConfig.dict().get(
-            "openMetadataServerConfig", {}
-        )
+        metadata_config: OpenMetadataServerConfig = (
+            self.config.workflowConfig.openMetadataServerConfig
+        )
         self.source: Source = source_class.create(
             self.config.source.dict(), metadata_config
         )

View File

@@ -219,7 +219,7 @@ class MetadataRestSink(Sink[Entity]):
             if db_schema_and_table.table.tableQueries is not None:
                 self.metadata.ingest_table_queries_data(
                     table=created_table,
-                    table_queries=db_schema_and_table.table.tableQueries,
+                    table_queries=db_schema_and_table.table.dict().get("tableQueries"),
                 )
                 logger.info(

View File

@@ -19,14 +19,12 @@ from metadata.generated.schema.entity.services.connections.database.mysqlConnect
 from metadata.generated.schema.metadataIngestion.workflow import (
     OpenMetadataServerConfig,
 )
+from metadata.generated.schema.metadataIngestion.workflow import (
+    Source as WorkflowSource,
+)
 from metadata.ingestion.api.common import Entity
+from metadata.ingestion.api.source import InvalidSourceException
 from metadata.ingestion.source.sql_source import SQLSource
-from metadata.ingestion.source.sql_source_common import SQLConnectionConfig
-
-
-class MySQLConfig(MysqlConnection, SQLConnectionConfig):
-    def get_connection_url(self):
-        return super().get_connection_url()
 
 
 class MysqlSource(SQLSource):
@@ -35,28 +33,35 @@ class MysqlSource(SQLSource):
     @classmethod
     def create(cls, config_dict, metadata_config: OpenMetadataServerConfig):
-        config = MySQLConfig.parse_obj(config_dict)
+        config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
+        connection: MysqlConnection = config.serviceConnection.__root__.config
+        if not isinstance(connection, MysqlConnection):
+            raise InvalidSourceException(
+                f"Expected MysqlConnection, but got {connection}"
+            )
         return cls(config, metadata_config)
 
     def prepare(self):
         self.inspector = inspect(self.engine)
-        self.schema_names = (
-            self.inspector.get_schema_names()
-            if not self.config.database
-            else [self.config.database]
-        )
+        self.service_connection.database = "default"
         return super().prepare()
 
     def next_record(self) -> Iterable[Entity]:
-        for schema in self.schema_names:
+        for schema in self.inspector.get_schema_names():
             self.database_source_state.clear()
-            if not self.sql_config.schema_filter_pattern.included(schema):
+            if (
+                self.source_config.schemaFilterPattern
+                and schema not in self.source_config.schemaFilterPattern.includes
+            ):
                 self.status.filter(schema, "Schema pattern not allowed")
                 continue
-            if self.config.include_tables:
-                yield from self.fetch_tables(self.inspector, schema)
-            if self.config.include_views:
+
+            # Fetch tables by default
+            yield from self.fetch_tables(self.inspector, schema)
+            if self.source_config.includeViews:
                 yield from self.fetch_views(self.inspector, schema)
-            if self.config.mark_deleted_tables_as_deleted:
-                schema_fqdn = f"{self.config.service_name}.{schema}"
+            if self.source_config.markDeletedTables:
+                schema_fqdn = f"{self.config.serviceName}.{schema}"
                 yield from self.delete_tables(schema_fqdn)
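
For context, the new create flow simply parses the whole source block into the generated WorkflowSource model and type-checks the connection union. A hedged sketch of what the Workflow passes in when instantiating the MySQL source, reusing the source block from the test config above (parse_obj is the pydantic v1 entry point):

from metadata.generated.schema.metadataIngestion.workflow import (
    Source as WorkflowSource,
)

source_dict = {
    "type": "mysql",
    "serviceName": "local_mysql",
    "serviceConnection": {
        "config": {
            "type": "MySQL",
            "username": "openmetadata_user",
            "password": "openmetadata_password",
            "hostPort": "localhost:3306",
        }
    },
    "sourceConfig": {"config": {"enableDataProfiler": False}},
}

config = WorkflowSource.parse_obj(source_dict)
# The union root resolves to a MysqlConnection instance
connection = config.serviceConnection.__root__.config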

View File

@@ -307,6 +307,7 @@ class SampleDataSource(Source[Entity]):
     @classmethod
     def create(cls, config_dict, metadata_config):
         config = SampleDataSourceConfig.parse_obj(config_dict)
+        metadata_config = OpenMetadataServerConfig.parse_obj(metadata_config)
         return cls(config, metadata_config)
 
     def prepare(self):

View File

@@ -23,12 +23,10 @@ from sqlalchemy.inspection import inspect
 from sqlalchemy.sql import text
 
 from metadata.config.common import FQDN_SEPARATOR
-from metadata.generated.schema.entity.data.database import Database
 from metadata.generated.schema.entity.data.table import TableData
 from metadata.generated.schema.metadataIngestion.workflow import (
     OpenMetadataServerConfig,
 )
-from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.ingestion.source.sql_source import SQLSource
 from metadata.ingestion.source.sql_source_common import SQLConnectionConfig
 from metadata.utils.column_type_parser import create_sqlalchemy_type

View File

@@ -11,6 +11,7 @@
 """
 Generic source to build SQL connectors.
 """
+import copy
 import json
 import logging
 import re
@@ -39,10 +40,15 @@ from metadata.generated.schema.entity.data.table import (
     TableData,
     TableProfile,
 )
+from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
+    DatabaseServiceMetadataPipeline,
+)
 from metadata.generated.schema.metadataIngestion.workflow import (
     OpenMetadataServerConfig,
 )
-from metadata.generated.schema.metadataIngestion.workflow import Source as SourceConfig
+from metadata.generated.schema.metadataIngestion.workflow import (
+    Source as WorkflowSource,
+)
 from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.generated.schema.type.tagLabel import TagLabel
 from metadata.ingestion.api.common import Entity
@@ -82,28 +88,40 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
     def __init__(
         self,
-        config: SourceConfig,
+        config: WorkflowSource,
         metadata_config: OpenMetadataServerConfig,
     ):
         super().__init__()
         self.config = config
+        # It will be one of the Unions. We don't know the specific type here.
+        self.service_connection = self.config.serviceConnection.__root__.config
+        self.source_config: DatabaseServiceMetadataPipeline = (
+            self.config.sourceConfig.config
+        )
         self.metadata_config = metadata_config
         self.service = get_database_service_or_create(config, metadata_config)
         self.metadata = OpenMetadata(metadata_config)
         self.status = SQLSourceStatus()
-        self.sql_config = self.config
-        self.engine = get_engine(config=self.sql_config)
+        self.engine = get_engine(config=self.config)
         self._session = None  # We will instantiate this just if needed
         self.connection = self.engine.connect()
         self.data_profiler = None
         self.data_models = {}
         self.table_constraints = None
         self.database_source_state = set()
-        if self.config.dbt_catalog_file is not None:
-            with open(self.config.dbt_catalog_file, "r", encoding="utf-8") as catalog:
+        if self.source_config.dbtCatalogFilePath:
+            with open(
+                self.source_config.dbtCatalogFilePath, "r", encoding="utf-8"
+            ) as catalog:
                 self.dbt_catalog = json.load(catalog)
-        if self.config.dbt_manifest_file is not None:
-            with open(self.config.dbt_manifest_file, "r", encoding="utf-8") as manifest:
+        if self.source_config.dbtManifestFilePath:
+            with open(
+                self.source_config.dbtManifestFilePath, "r", encoding="utf-8"
+            ) as manifest:
                 self.dbt_manifest = json.load(manifest)
         self.profile_date = datetime.now()
@@ -129,7 +147,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
                 logger.error(
                     f"Profiling not available for this databaseService: {str(err)}"
                 )
-                self.config.data_profiler_enabled = False
+                self.source_config.enableDataProfiler = False
 
         except Exception as exc:  # pylint: disable=broad-except
             logger.debug(traceback.print_exc())
@@ -164,7 +182,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
         to the Table Entities
         """
         try:
-            query = self.config.query.format(schema, table)
+            query = self.source_config.sampleDataQuery.format(schema, table)
             logger.info(query)
             results = self.connection.execute(query)
             cols = []
@@ -191,18 +209,20 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
         inspectors = self.get_databases()
         for inspector in inspectors:
             schema_names = inspector.get_schema_names()
+            print(schema_names)
             for schema in schema_names:
                 # clear any previous source database state
                 self.database_source_state.clear()
-                if not self.sql_config.schema_filter_pattern.included(schema):
+                if (
+                    self.source_config.schemaFilterPattern
+                    and schema not in self.source_config.schemaFilterPattern.includes
+                ):
                     self.status.filter(schema, "Schema pattern not allowed")
                     continue
-                if self.config.include_tables:
-                    yield from self.fetch_tables(inspector, schema)
-                if self.config.include_views:
+                if self.source_config.includeViews:
                     yield from self.fetch_views(inspector, schema)
-                if self.config.mark_deleted_tables_as_deleted:
-                    schema_fqdn = f"{self.config.service_name}.{schema}"
+                if self.source_config.markDeletedTables:
+                    schema_fqdn = f"{self.config.serviceName}.{schema}"
                     yield from self.delete_tables(schema_fqdn)
 
     def fetch_tables(
@@ -218,20 +238,23 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
                 schema, table_name = self.standardize_schema_table_names(
                     schema, table_name
                 )
-                if not self.sql_config.table_filter_pattern.included(table_name):
+                if (
+                    self.source_config.tableFilterPattern
+                    and table_name not in self.source_config.tableFilterPattern.includes
+                ):
                     self.status.filter(
-                        f"{self.config.get_service_name()}.{table_name}",
+                        f"{self.config.serviceName}.{table_name}",
                         "Table pattern not allowed",
                     )
                     continue
                 if self._is_partition(table_name, schema, inspector):
                     self.status.filter(
-                        f"{self.config.get_service_name()}.{table_name}",
+                        f"{self.config.serviceName}.{table_name}",
                         "Table is partition",
                     )
                     continue
                 description = _get_table_description(schema, table_name, inspector)
-                fqn = self.get_table_fqn(self.config.service_name, schema, table_name)
+                fqn = self.get_table_fqn(self.config.serviceName, schema, table_name)
                 self.database_source_state.add(fqn)
                 self.table_constraints = None
                 table_columns = self._get_columns(schema, table_name, inspector)
@@ -240,13 +263,12 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
                     name=table_name,
                     tableType="Regular",
                     description=description if description is not None else " ",
-                    fullyQualifiedName=fqn,
                     columns=table_columns,
                 )
                 if self.table_constraints:
                     table_entity.tableConstraints = self.table_constraints
                 try:
-                    if self.sql_config.generate_sample_data:
+                    if self.source_config.generateSampleData:
                         table_data = self.fetch_sample_data(schema, table_name)
                         if table_data:
                             table_entity.sampleData = table_data
@@ -256,7 +278,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
                     logger.error(err)
 
                 try:
-                    if self.config.data_profiler_enabled:
+                    if self.source_config.enableDataProfiler:
                         profile = self.run_profiler(table=table_entity, schema=schema)
                         table_entity.tableProfile = [profile] if profile else None
                 # Catch any errors during the profile runner and continue
@@ -265,21 +287,19 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
                 # check if we have any model to associate with
                 table_entity.dataModel = self._get_data_model(schema, table_name)
-                database = self._get_database(self.config.database)
+                database = self._get_database(self.service_connection.database)
                 table_schema_and_db = OMetaDatabaseAndTable(
                     table=table_entity,
                     database=database,
                     database_schema=self._get_schema(schema, database),
                 )
                 yield table_schema_and_db
-                self.status.scanned(
-                    "{}.{}".format(self.config.get_service_name(), table_name)
-                )
+                self.status.scanned("{}.{}".format(self.config.serviceName, table_name))
             except Exception as err:
                 logger.debug(traceback.print_exc())
                 logger.error(err)
                 self.status.failures.append(
-                    "{}.{}".format(self.config.service_name, table_name)
+                    "{}.{}".format(self.config.serviceName, table_name)
                 )
                 continue
@@ -292,20 +312,23 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
         """
         for view_name in inspector.get_view_names(schema):
             try:
-                if self.config.scheme == "bigquery":
+                if self.service_connection.scheme == "bigquery":
                     schema, view_name = self.standardize_schema_table_names(
                         schema, view_name
                     )
-                if not self.sql_config.table_filter_pattern.included(view_name):
+                if (
+                    self.source_config.tableFilterPattern
+                    and view_name not in self.source_config.tableFilterPattern.includes
+                ):
                     self.status.filter(
-                        f"{self.config.get_service_name()}.{view_name}",
+                        f"{self.config.serviceName}.{view_name}",
                         "View pattern not allowed",
                     )
                     continue
                 try:
-                    if self.config.scheme == "bigquery":
+                    if self.service_connection.scheme == "bigquery":
                         view_definition = inspector.get_view_definition(
-                            f"{self.config.project_id}.{schema}.{view_name}"
+                            f"{self.service_connection.projectId}.{schema}.{view_name}"
                         )
                     else:
                         view_definition = inspector.get_view_definition(
@@ -316,7 +339,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
                     )
                 except NotImplementedError:
                     view_definition = ""
-                fqn = self.get_table_fqn(self.config.service_name, schema, view_name)
+                fqn = self.get_table_fqn(self.config.serviceName, schema, view_name)
                 self.database_source_state.add(fqn)
                 table = Table(
                     id=uuid.uuid4(),
@@ -325,7 +348,6 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
                     description=_get_table_description(schema, view_name, inspector)
                     or "",
                     # This will be generated in the backend!! #1673
-                    fullyQualifiedName=view_name,
                     columns=self._get_columns(schema, view_name, inspector),
                     viewDefinition=view_definition,
                 )
@@ -334,16 +356,16 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
                         "sql": table.viewDefinition.__root__,
                         "from_type": "table",
                         "to_type": "table",
-                        "service_name": self.config.service_name,
+                        "service_name": self.config.serviceName,
                     }
                     ingest_lineage(
                         query_info=query_info, metadata_config=self.metadata_config
                     )
-                if self.sql_config.generate_sample_data:
+                if self.source_config.generateSampleData:
                     table_data = self.fetch_sample_data(schema, view_name)
                     table.sampleData = table_data
-                table.dataModel = self._get_data_model(schema, view_name)
-                database = self._get_database(self.config.database)
+                # table.dataModel = self._get_data_model(schema, view_name)
+                database = self._get_database(self.service_connection.database)
                 table_schema_and_db = OMetaDatabaseAndTable(
                     table=table,
                     database=database,
@@ -353,7 +375,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
             # Catch any errors and continue the ingestion
             except Exception as err:  # pylint: disable=broad-except
                 logger.error(err)
-                self.status.warnings.append(f"{self.config.service_name}.{view_name}")
+                self.status.warnings.append(f"{self.config.serviceName}.{view_name}")
                 continue
 
     def delete_tables(self, schema_fqdn: str) -> DeleteTable:
@@ -370,7 +392,10 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
         """
         Get all the DBT information and feed it to the Table Entity
         """
-        if self.config.dbt_manifest_file and self.config.dbt_catalog_file:
+        if (
+            self.source_config.dbtManifestFilePath
+            and self.source_config.dbtCatalogFilePath
+        ):
             logger.info("Parsing Data Models")
             manifest_entities = {
                 **self.dbt_manifest["nodes"],
@@ -421,7 +446,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
                 try:
                     _, database, table = node.split(".", 2)
                     table_fqn = self.get_table_fqn(
-                        self.config.service_name, database, table
+                        self.config.serviceName, database, table
                     ).lower()
                     upstream_nodes.append(table_fqn)
                 except Exception as err:  # pylint: disable=broad-except
@@ -471,14 +496,18 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
     def _get_database(self, database: str) -> Database:
         return Database(
             name=database,
-            service=EntityReference(id=self.service.id, type=self.config.service_type),
+            service=EntityReference(
+                id=self.service.id, type=self.service_connection.type.value
+            ),
         )
 
     def _get_schema(self, schema: str, database: Database) -> DatabaseSchema:
         return DatabaseSchema(
             name=schema,
             database=database.service,
-            service=EntityReference(id=self.service.id, type=self.config.service_type),
+            service=EntityReference(
+                id=self.service.id, type=self.service_connection.type.value
+            ),
         )
 
     @staticmethod
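
One behavioural note on the filter changes throughout this file: the old sql_config.schema_filter_pattern.included(schema) treated the includes entries as patterns (the old test config above used "test_delete.*"), while the new code is a plain membership test against schemaFilterPattern.includes. A small illustrative sketch, assuming the old check was regex-based:

import re

schema = "test_delete_db"

# Old-style: entries interpreted as regular expressions
old_includes = ["test_delete.*"]
matches_old = any(re.match(p, schema) for p in old_includes)  # True

# New-style after this refactor: exact membership only
new_includes = ["test_delete.*"]
matches_new = schema in new_includes  # False

So regex-style entries carried over from old configs no longer match and would need to be replaced with literal schema names.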

View File

@@ -20,7 +20,9 @@ from metadata.generated.schema.entity.services.connections.database.sqliteConnec
 from metadata.generated.schema.metadataIngestion.workflow import (
     OpenMetadataServerConfig,
 )
-from metadata.generated.schema.metadataIngestion.workflow import Source as SourceConfig
+from metadata.generated.schema.metadataIngestion.workflow import (
+    Source as WorkflowSource,
+)
 from metadata.ingestion.api.source import InvalidSourceException
 from metadata.ingestion.source.sql_source import SQLSource
@@ -31,7 +33,7 @@ class SqliteSource(SQLSource):
     @classmethod
     def create(cls, config_dict, metadata_config: OpenMetadataServerConfig):
-        config: SourceConfig = SourceConfig.parse_obj(config_dict)
+        config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
         connection = config.serviceConnection.__root__.config
         if not isinstance(connection, SQLiteConnection):
             raise InvalidSourceException(

View File

@@ -19,22 +19,30 @@ from sqlalchemy.engine.base import Engine
 from sqlalchemy.orm import sessionmaker
 from sqlalchemy.orm.session import Session
 
-from metadata.generated.schema.metadataIngestion.workflow import Source as SourceConfig
+from metadata.generated.schema.metadataIngestion.workflow import (
+    Source as WorkflowSource,
+)
+from metadata.utils.source_connections import get_connection_url
 
 logger = logging.getLogger("Utils")
 
 
-# TODO: fix this and use the singledispatch to build the URL instead of get_connection_url
-def get_engine(config: SourceConfig, verbose: bool = False) -> Engine:
+def get_engine(config: WorkflowSource, verbose: bool = False) -> Engine:
     """
     Given an SQL configuration, build the SQLAlchemy Engine
     """
     logger.info(f"Building Engine for {config.serviceName}...")
+    service_connection_config = config.serviceConnection.__root__.config
+    options = service_connection_config.connectionOptions
+    if not options:
+        options = {}
+    connect_args = service_connection_config.connectionArguments
+    if not connect_args:
+        connect_args = {}
     engine = create_engine(
-        config.get_connection_url(),
-        **config.options,
-        connect_args=config.connect_args,
+        get_connection_url(config.serviceConnection.__root__.config),
+        **options,
+        connect_args=connect_args,
         echo=verbose,
     )
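
A usage sketch for the reworked helper, given a parsed WorkflowSource like the MySQL one shown earlier (the module path for get_engine is assumed; this diff does not name it):

from metadata.utils.engines import get_engine  # assumed module path

engine = get_engine(config=config, verbose=True)
with engine.connect() as conn:
    print(conn.execute("SELECT 1").scalar())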

View File

@@ -13,6 +13,7 @@ import logging
 import traceback
 from datetime import datetime, timedelta
 from typing import Any, Dict, Iterable
+from urllib.parse import quote_plus
 
 from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
 from metadata.generated.schema.api.services.createDashboardService import (
@@ -36,7 +37,9 @@ from metadata.generated.schema.entity.services.databaseService import DatabaseSe
 from metadata.generated.schema.entity.services.messagingService import MessagingService
 from metadata.generated.schema.entity.services.pipelineService import PipelineService
 from metadata.generated.schema.entity.services.storageService import StorageService
-from metadata.generated.schema.metadataIngestion.workflow import Source as SourceConfig
+from metadata.generated.schema.metadataIngestion.workflow import (
+    Source as WorkflowSource,
+)
 from metadata.generated.schema.type.entityLineage import EntitiesEdge
 from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
@@ -62,7 +65,7 @@ def snake_to_camel(s):
 
 def get_database_service_or_create(
-    config: SourceConfig, metadata_config, service_name=None
+    config: WorkflowSource, metadata_config, service_name=None
 ) -> DatabaseService:
     metadata = OpenMetadata(metadata_config)
     if not service_name:
@@ -70,47 +73,44 @@ def get_database_service_or_create(
     service: DatabaseService = metadata.get_by_name(
         entity=DatabaseService, fqdn=service_name
     )
-    if service:
-        return service
-    else:
+    if not service:
+        config_dict = config.dict()
+        service_connection_config = config_dict.get("serviceConnection").get("config")
         password = (
-            config.password.get_secret_value()
-            if hasattr(config, "password") and config.password
+            service_connection_config.get("password").get_secret_value()
+            if service_connection_config and service_connection_config.get("password")
             else None
         )
         # Use a JSON to dynamically parse the pydantic model
         # based on the serviceType
+        # TODO revisit me
         service_json = {
             "connection": {
                 "config": {
-                    "hostPort": config.host_port
-                    if hasattr(config, "host_port")
-                    else None,
-                    "username": config.username
-                    if hasattr(config, "username")
-                    else None,
+                    "hostPort": service_connection_config.get("hostPort"),
+                    "username": service_connection_config.get("username"),
                     "password": password,
-                    "database": config.database
-                    if hasattr(config, "database")
-                    else None,
-                    "connectionOptions": config.options
-                    if hasattr(config, "options")
-                    else None,
-                    "connectionArguments": config.connect_args
-                    if hasattr(config, "connect_args")
-                    else None,
+                    "database": service_connection_config.get("database"),
+                    "connectionOptions": service_connection_config.get(
+                        "connectionOptions"
+                    ),
+                    "connectionArguments": service_connection_config.get(
+                        "connectionArguments"
+                    ),
                 }
             },
             "name": service_name,
             "description": "",
-            "serviceType": config.service_type,
+            "serviceType": service_connection_config.get("type").value,
         }
         created_service: DatabaseService = metadata.create_or_update(
             CreateDatabaseServiceRequest(**service_json)
         )
         logger.info(f"Creating DatabaseService instance for {service_name}")
         return created_service
+    return service
 
 
 def get_messaging_service_or_create(
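
For the MySQL test config above, the dynamically built service_json would come out roughly as below (a sketch: database, connectionOptions and connectionArguments are absent from that config, so the .get() calls fall back to None):

service_json = {
    "connection": {
        "config": {
            "hostPort": "localhost:3306",
            "username": "openmetadata_user",
            "password": "openmetadata_password",
            "database": None,
            "connectionOptions": None,
            "connectionArguments": None,
        }
    },
    "name": "local_mysql",
    "description": "",
    "serviceType": "MySQL",
}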

View File

@@ -0,0 +1,55 @@
+#  Copyright 2021 Collate
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#  http://www.apache.org/licenses/LICENSE-2.0
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+"""
+Hosts the singledispatch to build source URLs
+"""
+from functools import singledispatch
+from urllib.parse import quote_plus
+
+from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
+    MysqlConnection,
+)
+
+
+@singledispatch
+def get_connection_url(connection):
+    raise NotImplementedError(
+        f"Connection URL build not implemented for type {type(connection)}: {connection}"
+    )
+
+
+@get_connection_url.register
+def _(connection: MysqlConnection):
+    url = f"{connection.scheme.value}://"
+    if connection.username:
+        url += f"{connection.username}"
+    url += (
+        f":{quote_plus(connection.password.get_secret_value())}"
+        if connection.password
+        else ""
+    )
+    url += "@"
+    url += connection.hostPort
+    url += f"/{connection.database}" if connection.database else ""
+    options = connection.connectionOptions
+    if options:
+        if not connection.database:
+            url += "/"
+        params = "&".join(
+            f"{key}={quote_plus(value)}" for (key, value) in options.items() if value
+        )
+        url = f"{url}?{params}"
+    return url
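
A quick sketch of the dispatcher in use, assuming the generated MysqlConnection defaults its scheme to mysql+pymysql:

from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
    MysqlConnection,
)
from metadata.utils.source_connections import get_connection_url

connection = MysqlConnection(
    username="openmetadata_user",
    password="openmetadata_password",
    hostPort="localhost:3306",
)
print(get_connection_url(connection))
# mysql+pymysql://openmetadata_user:openmetadata_password@localhost:3306

Passing a connection type without a registered overload falls through to the base implementation and raises NotImplementedError.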

View File

@@ -24,7 +24,9 @@ from metadata.generated.schema.entity.data.table import Column, DataType, Table
 from metadata.generated.schema.metadataIngestion.workflow import (
     OpenMetadataServerConfig,
 )
-from metadata.generated.schema.metadataIngestion.workflow import Source as SourceConfig
+from metadata.generated.schema.metadataIngestion.workflow import (
+    Source as WorkflowSource,
+)
 from metadata.generated.schema.tests.column.columnValuesToBeBetween import (
     ColumnValuesToBeBetween,
 )
@@ -66,7 +68,7 @@ def test_init_workflow():
     """
     We can initialise the workflow from a config
     """
-    assert isinstance(workflow.source_config, SourceConfig)
+    assert isinstance(workflow.source_config, WorkflowSource)
     assert isinstance(workflow.metadata_config, OpenMetadataServerConfig)
     assert isinstance(workflow.processor, OrmProfilerProcessor)