diff --git a/docker/metadata/docker-compose.yml b/docker/metadata/docker-compose.yml index 9fd16f50880..94be0605039 100644 --- a/docker/metadata/docker-compose.yml +++ b/docker/metadata/docker-compose.yml @@ -41,10 +41,8 @@ services: - 9200:9200 - 9300:9300 - catalog: - build: - context: ../../. - dockerfile: docker/metadata/Dockerfile + openmetadata-server: + image: openmetadata/server:latest expose: - 8585 - 9200 @@ -62,9 +60,7 @@ services: - "localhost:172.16.239.11" ingestion: - build: - context: ../../ingestion/. - dockerfile: Dockerfile + image: openmetadata/ingestion:latest expose: - 7777 ports: @@ -75,20 +71,6 @@ services: - "localhost:172.16.239.10" - "localhost:172.16.239.11" - "localhost:172.16.239.12" - - "localhost:172.16.239.13" - - postgres: - image: postgres - restart: always - environment: - POSTGRES_DB: pagila - POSTGRES_USER: openmetadata_user - POSTGRES_PASSWORD: openmetadata_password - ports: - - 5433:5432 - networks: - app_net: - ipv4_address: 172.16.239.13 networks: app_net: diff --git a/docs/install/metadata-ingestion/connectors/mssql.md b/docs/install/metadata-ingestion/connectors/mssql.md index edd0763cace..693fa29785b 100644 --- a/docs/install/metadata-ingestion/connectors/mssql.md +++ b/docs/install/metadata-ingestion/connectors/mssql.md @@ -95,7 +95,7 @@ and ```metadata-rest-tables``` sink along with ```metadata-server``` config } }, "processor": { - "type": "pii-tags", + "type": "pii", "config": { } }, diff --git a/docs/install/metadata-ingestion/connectors/mysql.md b/docs/install/metadata-ingestion/connectors/mysql.md index d497c618137..b542b11ae87 100644 --- a/docs/install/metadata-ingestion/connectors/mysql.md +++ b/docs/install/metadata-ingestion/connectors/mysql.md @@ -93,7 +93,7 @@ and ```metadata-rest-tables``` sink along with ```metadata-server``` config } }, "processor": { - "type": "pii-tags", + "type": "pii", "config": { "api_endpoint": "http://localhost:8585/api" } diff --git a/docs/install/metadata-ingestion/connectors/postgres.md b/docs/install/metadata-ingestion/connectors/postgres.md index cbb388e6831..8a41212ada9 100644 --- a/docs/install/metadata-ingestion/connectors/postgres.md +++ b/docs/install/metadata-ingestion/connectors/postgres.md @@ -91,7 +91,7 @@ and ```metadata-rest-tables``` sink along with ```metadata-server``` config } }, "processor": { - "type": "pii-tags", + "type": "pii", "config": {} }, "sink": { diff --git a/ingestion/Dockerfile b/ingestion/Dockerfile index a83d76fbe55..073a49ea1fc 100644 --- a/ingestion/Dockerfile +++ b/ingestion/Dockerfile @@ -1,8 +1,7 @@ -FROM python:3.9.2 +FROM python:3.8.10 EXPOSE 7777 -COPY ./examples /openmetadata-ingestion/examples COPY ./pipelines /openmetadata-ingestion/pipelines COPY ./ingestion_scheduler /openmetadata-ingestion/ingestion_scheduler COPY ./ingestion_dependency.sh /openmetadata-ingestion/ingestion_dependency.sh diff --git a/ingestion/examples/workflows/bigquery.json b/ingestion/examples/workflows/bigquery.json index 5be5760a015..6ac3cd657f6 100644 --- a/ingestion/examples/workflows/bigquery.json +++ b/ingestion/examples/workflows/bigquery.json @@ -10,7 +10,7 @@ } }, "processor": { - "type": "pii-tags", + "type": "pii", "config": { "api_endpoint": "http://localhost:8585/api" } diff --git a/ingestion/examples/workflows/hive.json b/ingestion/examples/workflows/hive.json index cd5b8a93498..4e5343889e7 100644 --- a/ingestion/examples/workflows/hive.json +++ b/ingestion/examples/workflows/hive.json @@ -8,14 +8,12 @@ } }, "processor": { - "type": "pii-tags", - "config": { - } + "type": "pii", + "config": {} }, "sink": { "type": "metadata-rest-tables", - "config": { - } + "config": {} }, "metadata_server": { "type": "metadata-server", diff --git a/ingestion/examples/workflows/mssql.json b/ingestion/examples/workflows/mssql.json index 1cf10b4f215..49b4cc187bd 100644 --- a/ingestion/examples/workflows/mssql.json +++ b/ingestion/examples/workflows/mssql.json @@ -14,7 +14,7 @@ } }, "processor": { - "type": "pii-tags", + "type": "pii", "config": { } }, diff --git a/ingestion/examples/workflows/postgres.json b/ingestion/examples/workflows/postgres.json index 36cc07ec4eb..cf0fedc56d4 100644 --- a/ingestion/examples/workflows/postgres.json +++ b/ingestion/examples/workflows/postgres.json @@ -11,7 +11,7 @@ } }, "processor": { - "type": "pii-tags", + "type": "pii", "config": {} }, "sink": { diff --git a/ingestion/examples/workflows/redshift.json b/ingestion/examples/workflows/redshift.json index 022e96ba339..45eea9f86ba 100644 --- a/ingestion/examples/workflows/redshift.json +++ b/ingestion/examples/workflows/redshift.json @@ -11,16 +11,14 @@ } }, "processor": { - "type": "pii-tags", - "config": { - } + "type": "pii", + "config": {} }, "sink": { "type": "metadata-rest-tables", - "config": { - } + "config": {} }, - "metadata_server": { + "metadata_server": { "type": "metadata-server", "config": { "api_endpoint": "http://localhost:8585/api", diff --git a/ingestion/examples/workflows/redshift_usage.json b/ingestion/examples/workflows/redshift_usage.json index 56c394d3bd7..74a3f5238b5 100644 --- a/ingestion/examples/workflows/redshift_usage.json +++ b/ingestion/examples/workflows/redshift_usage.json @@ -19,7 +19,7 @@ } }, "stage": { - "type": "table-usage-stage", + "type": "table-usage", "config": { "filename": "/tmp/redshift_usage" } diff --git a/ingestion/examples/workflows/snowflake.json b/ingestion/examples/workflows/snowflake.json index 3075f111ac4..86128d93665 100644 --- a/ingestion/examples/workflows/snowflake.json +++ b/ingestion/examples/workflows/snowflake.json @@ -19,7 +19,7 @@ } }, "processor": { - "type": "pii-tags", + "type": "pii", "config": {} }, "sink": { diff --git a/ingestion/examples/workflows/snowflake_usage.json b/ingestion/examples/workflows/snowflake_usage.json index ec99974752e..fd22111f7bb 100644 --- a/ingestion/examples/workflows/snowflake_usage.json +++ b/ingestion/examples/workflows/snowflake_usage.json @@ -19,7 +19,7 @@ } }, "stage": { - "type": "table-usage-stage", + "type": "table-usage", "config": { "filename": "/tmp/snowflake_usage" } diff --git a/ingestion/pipelines/metadata_to_es.json b/ingestion/pipelines/metadata_to_es.json index 522a70ccb8b..81cae399d47 100644 --- a/ingestion/pipelines/metadata_to_es.json +++ b/ingestion/pipelines/metadata_to_es.json @@ -1,10 +1,10 @@ { "source": { - "type": "metadata-rest-tables", + "type": "metadata_es", "config": {} }, "stage": { - "type": "file-stage", + "type": "file", "config": { "filename": "/tmp/tables.txt" } diff --git a/ingestion/pipelines/sample_tables.json b/ingestion/pipelines/sample_tables.json index 2203a07af63..169851b8ad4 100644 --- a/ingestion/pipelines/sample_tables.json +++ b/ingestion/pipelines/sample_tables.json @@ -9,7 +9,7 @@ } }, "processor": { - "type": "pii-tags", + "type": "pii", "config": { } }, diff --git a/ingestion/pipelines/sample_usage.json b/ingestion/pipelines/sample_usage.json index 60e77b99b44..91a83870a40 100644 --- a/ingestion/pipelines/sample_usage.json +++ b/ingestion/pipelines/sample_usage.json @@ -15,7 +15,7 @@ } }, "stage": { - "type": "table-usage-stage", + "type": "table-usage", "config": { "filename": "/tmp/sample_usage" } diff --git a/ingestion/setup.py b/ingestion/setup.py index 23e465165c0..44378e59d88 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -44,7 +44,7 @@ base_requirements = { "typing_extensions>=3.7.4" "mypy_extensions>=0.4.3", "typing-inspect", - "pydantic~=1.7.4", + "pydantic==1.7.4", "pydantic[email]>=1.7.2", "google>=3.0.0", "google-auth>=1.33.0", @@ -54,13 +54,14 @@ base_requirements = { "python-jose==3.3.0", "okta==1.7.0", "pandas~=1.3.1", - "sqlalchemy>=1.3.24", - "sql-metadata~=2.0.0", - "spacy==3.0.5", - "requests~=2.25.1" + "sqlalchemy>=1.3.24" + "sql-metadata~=2.0.0" + "spacy==3.0.5" + "requests~=2.25.1", + "en_core_web_sm@https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.0.0/en_core_web_sm-3.0.0.tar.gz#egg=en_core_web" } base_plugins = { - "pii-tags", + "pii-processor", "query-parser", "metadata-usage", "file-stage", @@ -110,44 +111,6 @@ setup( packages=find_namespace_packages(where='./src', exclude=['tests*']), entry_points={ "console_scripts": ["metadata = metadata.cmd:metadata"], - "metadata.ingestion.source.plugins": [ - "mysql = metadata.ingestion.source.mysql:MySQLSource", - "postgres = metadata.ingestion.source.postgres:PostgresSource", - "snowflake = metadata.ingestion.source.snowflake:SnowflakeSource", - "redshift = metadata.ingestion.source.redshift:RedshiftSource", - "redshift-sql = metadata.ingestion.source.redshift_sql:RedshiftSQLSource", - "bigquery = metadata.ingestion.source.bigquery:BigQuerySource", - "athena = metadata.ingestion.source.athena:AthenaSource", - "oracle = metadata.ingestion.source.oracle:OracleSource", - "mssql = metadata.ingestion.source.mssql:SQLServerSource", - "hive = metadata.ingestion.source.hive:HiveSource", - "sample-tables = metadata.ingestion.source.sample_data_generator:SampleTableSource", - "sample-users = metadata.ingestion.source.sample_data_generator:SampleUserSource", - "sample-usage = metadata.ingestion.source.sample_data_generator:SampleUsageSource", - "metadata-rest-tables = metadata.ingestion.source.metadata_rest:MetadataTablesRestSource", - "redshift-usage = metadata.ingestion.source.redshift_usage:RedshiftUsageSource", - "snowflake-usage = metadata.ingestion.source.snowflake_usage:SnowflakeUsageSource", - "ldap-users = metadata.ingestion.source.ldap_source:LDAPUserSource" - ], - "metadata.ingestion.sink.plugins": [ - "file = metadata.ingestion.sink.file:FileSink", - "console = metadata.ingestion.sink.console:ConsoleSink", - "metadata-rest-tables = metadata.ingestion.sink.metadata_tables_rest:MetadataTablesRestSink", - "metadata-rest-users = metadata.ingestion.sink.metadata_users_rest:MetadataUsersRestSink", - "ldap-rest-users = metadata.ingestion.sink.ldap_add_user:LdapUserRestSink" - ], - "metadata.ingestion.processor.plugins": [ - "pii-tags = metadata.ingestion.processor.pii_processor:PIIProcessor", - "query-parser = metadata.ingestion.processor.query_parser:QueryParserProcessor", - ], - "metadata.ingestion.stage.plugins": [ - "file-stage = metadata.ingestion.stage.file:FileStage", - "table-usage-stage = metadata.ingestion.stage.table_usage_stage:TableUsageStage" - ], - "metadata.ingestion.bulksink.plugins": [ - "elasticsearch = metadata.ingestion.bulksink.elastic_search:ElasticSearchBulkSink", - "metadata-usage = metadata.ingestion.bulksink.metadata_usage_rest:MetadataUsageBulkSink", - ], }, install_requires=list(base_requirements), extras_require={ diff --git a/ingestion/src/metadata/cmd.py b/ingestion/src/metadata/cmd.py index b3553bd3dbe..60d8da43aaf 100644 --- a/ingestion/src/metadata/cmd.py +++ b/ingestion/src/metadata/cmd.py @@ -71,8 +71,8 @@ def ingest(config: str) -> None: sys.exit(1) workflow.execute() - ret = workflow.print_status() workflow.stop() + ret = workflow.print_status() sys.exit(ret) diff --git a/ingestion/src/metadata/ingestion/bulksink/elastic_search.py b/ingestion/src/metadata/ingestion/bulksink/elasticsearch.py similarity index 99% rename from ingestion/src/metadata/ingestion/bulksink/elastic_search.py rename to ingestion/src/metadata/ingestion/bulksink/elasticsearch.py index 85c37869c81..c07ccd020c0 100644 --- a/ingestion/src/metadata/ingestion/bulksink/elastic_search.py +++ b/ingestion/src/metadata/ingestion/bulksink/elasticsearch.py @@ -30,7 +30,7 @@ class ElasticSearchConfig(ConfigModel): batch_size: Optional[int] = 10000 -class ElasticSearchBulkSink(BulkSink): +class ElasticsearchBulkSink(BulkSink): """ Elasticsearch Publisher uses Bulk API to load data from JSON file. A new index is created and data is uploaded into it. After the upload diff --git a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py index 1592458c061..6f4dc5cfa8d 100644 --- a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py +++ b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py @@ -32,7 +32,7 @@ class MetadataUsageSinkConfig(ConfigModel): filename: str -class MetadataUsageRestBulkSink(BulkSink): +class MetadataUsageBulkSink(BulkSink): config: MetadataUsageSinkConfig def __init__(self, ctx: WorkflowContext, config: MetadataUsageSinkConfig, metadata_config: MetadataServerConfig): diff --git a/ingestion/src/metadata/ingestion/sink/ldap_users_rest.py b/ingestion/src/metadata/ingestion/sink/ldap_rest_users.py similarity index 98% rename from ingestion/src/metadata/ingestion/sink/ldap_users_rest.py rename to ingestion/src/metadata/ingestion/sink/ldap_rest_users.py index 55f6fae9dbe..5975140f026 100644 --- a/ingestion/src/metadata/ingestion/sink/ldap_users_rest.py +++ b/ingestion/src/metadata/ingestion/sink/ldap_rest_users.py @@ -29,7 +29,7 @@ class LDAPSourceConfig(ConfigModel): api_end_point: str -class LdapUserRestSink(Sink): +class LdapRestUsersSink(Sink): config: LDAPSourceConfig status: SinkStatus diff --git a/ingestion/src/metadata/ingestion/source/bigquery.py b/ingestion/src/metadata/ingestion/source/bigquery.py index c6d3b951659..dd74c1ce025 100644 --- a/ingestion/src/metadata/ingestion/source/bigquery.py +++ b/ingestion/src/metadata/ingestion/source/bigquery.py @@ -31,7 +31,7 @@ class BigQueryConfig(SQLConnectionConfig, SQLSource): return f"{self.scheme}://" -class BigQuerySource(SQLSource): +class BigquerySource(SQLSource): def __init__(self, config, metadata_config, ctx): super().__init__(config, metadata_config, ctx) diff --git a/ingestion/src/metadata/ingestion/source/ldap_source.py b/ingestion/src/metadata/ingestion/source/ldap_users.py similarity index 99% rename from ingestion/src/metadata/ingestion/source/ldap_source.py rename to ingestion/src/metadata/ingestion/source/ldap_users.py index eccb594682a..52900711478 100644 --- a/ingestion/src/metadata/ingestion/source/ldap_source.py +++ b/ingestion/src/metadata/ingestion/source/ldap_users.py @@ -32,7 +32,7 @@ class LDAPUserConfig(ConfigModel): password: str -class LDAPUserSource(Source): +class LdapUsersSource(Source): config: LDAPUserConfig status: SourceStatus diff --git a/ingestion/src/metadata/ingestion/source/metadata_rest.py b/ingestion/src/metadata/ingestion/source/metadata_es.py similarity index 98% rename from ingestion/src/metadata/ingestion/source/metadata_rest.py rename to ingestion/src/metadata/ingestion/source/metadata_es.py index 74f0953c5be..364ed368972 100644 --- a/ingestion/src/metadata/ingestion/source/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/source/metadata_es.py @@ -30,7 +30,7 @@ class MetadataTablesRestSourceConfig(ConfigModel): api_endpoint: Optional[str] = None -class MetadataTablesRestSource(Source): +class MetadataEsSource(Source): config: MetadataTablesRestSourceConfig report: SourceStatus diff --git a/ingestion/src/metadata/ingestion/source/mssql.py b/ingestion/src/metadata/ingestion/source/mssql.py index f5eae64302f..31bcb9ff5d4 100644 --- a/ingestion/src/metadata/ingestion/source/mssql.py +++ b/ingestion/src/metadata/ingestion/source/mssql.py @@ -20,7 +20,7 @@ from .sql_source import SQLConnectionConfig, SQLSource from ..ometa.auth_provider import MetadataServerConfig -class SQLServerConfig(SQLConnectionConfig): +class MssqlConfig(SQLConnectionConfig): host_port = "localhost:1433" scheme = "mssql+pytds" @@ -28,12 +28,12 @@ class SQLServerConfig(SQLConnectionConfig): return super().get_connection_url() -class SQLServerSource(SQLSource): +class MssqlSource(SQLSource): def __init__(self, config, metadata_config, ctx): super().__init__(config, metadata_config, ctx) @classmethod def create(cls, config_dict, metadata_config_dict, ctx): - config = SQLServerConfig.parse_obj(config_dict) + config = MssqlConfig.parse_obj(config_dict) metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict) return cls(config, metadata_config, ctx) diff --git a/ingestion/src/metadata/ingestion/source/sample_tables.py b/ingestion/src/metadata/ingestion/source/sample_tables.py index 2840604f2fc..49cfb7b45e4 100644 --- a/ingestion/src/metadata/ingestion/source/sample_tables.py +++ b/ingestion/src/metadata/ingestion/source/sample_tables.py @@ -169,7 +169,7 @@ class SampleTableMetadataGenerator: return sorted_row_dict -class SampleTableSource(Source): +class SampleTablesSource(Source): def __init__(self, config: SampleTableSourceConfig, metadata_config: MetadataServerConfig, ctx): super().__init__(ctx) diff --git a/ingestion/src/metadata/ingestion/source/sample_usage.py b/ingestion/src/metadata/ingestion/source/sample_usage.py index 30c7526bf22..06bfbca4cb6 100644 --- a/ingestion/src/metadata/ingestion/source/sample_usage.py +++ b/ingestion/src/metadata/ingestion/source/sample_usage.py @@ -1,7 +1,7 @@ import json import csv from metadata.ingestion.api.source import Source -from sample_tables import SampleTableSourceConfig, SampleTableSourceStatus, get_service_or_create +from .sample_tables import SampleTableSourceConfig, SampleTableSourceStatus, get_service_or_create from metadata.ingestion.ometa.auth_provider import MetadataServerConfig from metadata.ingestion.models.table_queries import TableQuery from typing import Iterable diff --git a/ingestion/src/metadata/ingestion/source/snowflake_usage.py b/ingestion/src/metadata/ingestion/source/snowflake_usage.py index f97751de436..30c0ccd884e 100644 --- a/ingestion/src/metadata/ingestion/source/snowflake_usage.py +++ b/ingestion/src/metadata/ingestion/source/snowflake_usage.py @@ -28,7 +28,7 @@ class SnowflakeUsageSource(Source): # SELECT statement from mysql information_schema to extract table and column metadata SQL_STATEMENT = """ select query_id as query,Query_text as sql,query_type as label, - database_name as database,start_time as starttime,end_time as endtime + database_name as database,start_time as starttime,end_time as endtime,schema_name from table(information_schema.query_history( end_time_range_start=>to_timestamp_ltz('{start_date}'), end_time_range_end=>to_timestamp_ltz('{end_date}'))); @@ -83,7 +83,7 @@ class SnowflakeUsageSource(Source): for row in self._get_raw_extract_iter(): tq = TableQuery(row['query'], row['label'], 0, 0, 0, str(row['starttime']), str(row['endtime']), str(row['starttime'])[0:19], 2, row['database'], 0, row['sql']) - self.report.scanned(tq) + self.report.scanned(f"{row['database']}.{row['schema_name']}") yield tq def get_report(self): diff --git a/ingestion/src/metadata/ingestion/stage/table_usage_stage.py b/ingestion/src/metadata/ingestion/stage/table_usage.py similarity index 100% rename from ingestion/src/metadata/ingestion/stage/table_usage_stage.py rename to ingestion/src/metadata/ingestion/stage/table_usage.py diff --git a/ingestion/src/metadata/ingestion/workflow/workflow.py b/ingestion/src/metadata/ingestion/workflow/workflow.py index a237ede0d81..fdb29d5dce3 100644 --- a/ingestion/src/metadata/ingestion/workflow/workflow.py +++ b/ingestion/src/metadata/ingestion/workflow/workflow.py @@ -31,11 +31,8 @@ from metadata.ingestion.api.processor import Processor from metadata.ingestion.api.sink import Sink from metadata.ingestion.api.source import Source from metadata.ingestion.api.stage import Stage -from metadata.ingestion.bulksink.bulk_sink_registry import bulk_sink_registry -from metadata.ingestion.sink.sink_registry import sink_registry -from metadata.ingestion.source.source_registry import source_registry -from metadata.ingestion.processor.processor_registry import processor_registry -from metadata.ingestion.stage.stage_registry import stage_registry +from metadata.ingestion.api.registry import Registry +from metadata.ingestion.api.source import Source logger = logging.getLogger(__name__) @@ -61,9 +58,10 @@ class Workflow: def __init__(self, config: WorkflowConfig): self.config = config self.ctx = WorkflowContext(workflow_id=self.config.run_id) - source_type = self.config.source.type - source_class = source_registry.get(source_type) + source_registry = Registry[Source]() + source_class = source_registry.get('metadata.ingestion.source.{}.{}Source'.format( + source_type.replace('-', '_'), ''.join([i.title() for i in source_type.replace('-', '_').split('_')]))) metadata_config = self.config.metadata_server.dict().get("config", {}) self.source: Source = source_class.create( self.config.source.dict().get("config", {}), metadata_config, self.ctx @@ -74,28 +72,36 @@ class Workflow: if self.config.processor: processor_type = self.config.processor.type - processor_class = processor_registry.get(processor_type) + processor_registry = Registry[Processor]() + processor_class = processor_registry.get('metadata.ingestion.processor.{}.{}Processor'.format( + processor_type.replace('-', '_'), ''.join([i.title() for i in processor_type.replace('-', '_').split('_')]))) processor_config = self.config.processor.dict().get("config", {}) self.processor: Processor = processor_class.create(processor_config, metadata_config, self.ctx) logger.debug(f"Processor Type: {processor_type}, {processor_class} configured") if self.config.stage: stage_type = self.config.stage.type - stage_class = stage_registry.get(stage_type) + stage_registry = Registry[Stage]() + stage_class = stage_registry.get('metadata.ingestion.stage.{}.{}Stage'.format( + stage_type.replace('-', '_'), ''.join([i.title() for i in stage_type.replace('-', '_').split('_')]))) stage_config = self.config.stage.dict().get("config", {}) self.stage: Stage = stage_class.create(stage_config, metadata_config, self.ctx) logger.debug(f"Stage Type: {stage_type}, {stage_class} configured") if self.config.sink: sink_type = self.config.sink.type - sink_class = sink_registry.get(sink_type) + sink_registry = Registry[Sink]() + sink_class = sink_registry.get('metadata.ingestion.sink.{}.{}Sink'.format( + sink_type.replace('-', '_'), ''.join([i.title() for i in sink_type.replace('-', '_').split('_')]))) sink_config = self.config.sink.dict().get("config", {}) self.sink: Sink = sink_class.create(sink_config, metadata_config, self.ctx) logger.debug(f"Sink type:{self.config.sink.type},{sink_class} configured") if self.config.bulk_sink: bulk_sink_type = self.config.bulk_sink.type - bulk_sink_class = bulk_sink_registry.get(bulk_sink_type) + bulk_sink_registry = Registry[BulkSink]() + bulk_sink_class = bulk_sink_registry.get('metadata.ingestion.bulksink.{}.{}BulkSink'.format( + bulk_sink_type.replace('-', '_'), ''.join([i.title() for i in bulk_sink_type.replace('-', '_').split('_')]))) bulk_sink_config = self.config.bulk_sink.dict().get("config", {}) self.bulk_sink: BulkSink = bulk_sink_class.create(bulk_sink_config, metadata_config, self.ctx) logger.info(f"BulkSink type:{self.config.bulk_sink.type},{bulk_sink_class} configured")