From 042ef45ca42649a0126ed16f21d8424523043d87 Mon Sep 17 00:00:00 2001 From: Ayush Shah Date: Fri, 13 Aug 2021 20:35:17 +0530 Subject: [PATCH 1/2] Registry Cleanup --- ingestion/ingestion_dependency.sh | 6 +- ingestion/pipelines/mysql.json | 2 +- .../ingestion/bulksink/bulk_sink_registry.py | 22 -- ...tadata_usage_rest.py => metadata_usage.py} | 2 +- .../processor/{pii_processor.py => pii.py} | 10 +- .../ingestion/processor/processor_registry.py | 23 -- .../{ldap_add_user.py => ldap_users_rest.py} | 0 ...tables_rest.py => metadata_rest_tables.py} | 2 +- ...a_users_rest.py => metadata_rest_users.py} | 2 +- .../metadata/ingestion/sink/sink_registry.py | 22 -- .../src/metadata/ingestion/source/mysql.py | 2 +- ...ple_data_generator.py => sample_tables.py} | 224 ++---------------- .../metadata/ingestion/source/sample_usage.py | 46 ++++ .../metadata/ingestion/source/sample_users.py | 122 ++++++++++ .../ingestion/source/source_registry.py | 22 -- .../ingestion/stage/stage_registry.py | 22 -- 16 files changed, 208 insertions(+), 321 deletions(-) delete mode 100644 ingestion/src/metadata/ingestion/bulksink/bulk_sink_registry.py rename ingestion/src/metadata/ingestion/bulksink/{metadata_usage_rest.py => metadata_usage.py} (99%) rename ingestion/src/metadata/ingestion/processor/{pii_processor.py => pii.py} (97%) delete mode 100644 ingestion/src/metadata/ingestion/processor/processor_registry.py rename ingestion/src/metadata/ingestion/sink/{ldap_add_user.py => ldap_users_rest.py} (100%) rename ingestion/src/metadata/ingestion/sink/{metadata_tables_rest.py => metadata_rest_tables.py} (99%) rename ingestion/src/metadata/ingestion/sink/{metadata_users_rest.py => metadata_rest_users.py} (99%) delete mode 100644 ingestion/src/metadata/ingestion/sink/sink_registry.py rename ingestion/src/metadata/ingestion/source/{sample_data_generator.py => sample_tables.py} (56%) create mode 100644 ingestion/src/metadata/ingestion/source/sample_usage.py create mode 100644 ingestion/src/metadata/ingestion/source/sample_users.py delete mode 100644 ingestion/src/metadata/ingestion/source/source_registry.py delete mode 100644 ingestion/src/metadata/ingestion/stage/stage_registry.py diff --git a/ingestion/ingestion_dependency.sh b/ingestion/ingestion_dependency.sh index da0c9490bee..08f091458bb 100755 --- a/ingestion/ingestion_dependency.sh +++ b/ingestion/ingestion_dependency.sh @@ -17,7 +17,9 @@ # set -euo pipefail -pip install --upgrade pip setuptools openmetadata-ingestion==0.2.1 apns -pip install openmetadata-ingestion[mysql,sample-tables,elasticsearch] +pip install --upgrade setuptools openmetadata-ingestion==0.2.1 apns +# wget https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.0.0/en_core_web_sm-3.0.0-py3-none-any.whl +# pip install en_core_web_sm-3.0.0-py3-none-any.whl python -m spacy download en_core_web_sm +rm -rf en_core_web_sm-3.0.0-py3-none-any.whl pip install "simplescheduler@git+https://github.com/StreamlineData/sdscheduler.git#egg=simplescheduler" diff --git a/ingestion/pipelines/mysql.json b/ingestion/pipelines/mysql.json index 1b1826ae772..09e47380441 100644 --- a/ingestion/pipelines/mysql.json +++ b/ingestion/pipelines/mysql.json @@ -12,7 +12,7 @@ } }, "processor": { - "type": "pii-tags", + "type": "pii", "config": { "api_endpoint": "http://localhost:8585/api" } diff --git a/ingestion/src/metadata/ingestion/bulksink/bulk_sink_registry.py b/ingestion/src/metadata/ingestion/bulksink/bulk_sink_registry.py deleted file mode 100644 index e9722065dfa..00000000000 --- a/ingestion/src/metadata/ingestion/bulksink/bulk_sink_registry.py +++ /dev/null @@ -1,22 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from metadata.ingestion.api.bulk_sink import BulkSink -from metadata.ingestion.api.registry import Registry - -bulk_sink_registry = Registry[BulkSink]() -bulk_sink_registry.load("metadata.ingestion.bulksink.plugins") - - diff --git a/ingestion/src/metadata/ingestion/bulksink/metadata_usage_rest.py b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py similarity index 99% rename from ingestion/src/metadata/ingestion/bulksink/metadata_usage_rest.py rename to ingestion/src/metadata/ingestion/bulksink/metadata_usage.py index 6f4dc5cfa8d..1592458c061 100644 --- a/ingestion/src/metadata/ingestion/bulksink/metadata_usage_rest.py +++ b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py @@ -32,7 +32,7 @@ class MetadataUsageSinkConfig(ConfigModel): filename: str -class MetadataUsageBulkSink(BulkSink): +class MetadataUsageRestBulkSink(BulkSink): config: MetadataUsageSinkConfig def __init__(self, ctx: WorkflowContext, config: MetadataUsageSinkConfig, metadata_config: MetadataServerConfig): diff --git a/ingestion/src/metadata/ingestion/processor/pii_processor.py b/ingestion/src/metadata/ingestion/processor/pii.py similarity index 97% rename from ingestion/src/metadata/ingestion/processor/pii_processor.py rename to ingestion/src/metadata/ingestion/processor/pii.py index 27e6d68608d..58126730885 100644 --- a/ingestion/src/metadata/ingestion/processor/pii_processor.py +++ b/ingestion/src/metadata/ingestion/processor/pii.py @@ -160,19 +160,19 @@ class ColumnNameScanner(Scanner): return list(types) -class PIIProcessorConfig(ConfigModel): +class PiiProcessorConfig(ConfigModel): filter: Optional[str] = None api_endpoint: Optional[str] = None auth_provider_type: Optional[str] = None -class PIIProcessor(Processor): - config: PIIProcessorConfig +class PiiProcessor(Processor): + config: PiiProcessorConfig metadata_config: MetadataServerConfig status: ProcessorStatus client: REST - def __init__(self, ctx: WorkflowContext, config: PIIProcessorConfig, metadata_config: MetadataServerConfig): + def __init__(self, ctx: WorkflowContext, config: PiiProcessorConfig, metadata_config: MetadataServerConfig): super().__init__(ctx) self.config = config self.metadata_config = metadata_config @@ -184,7 +184,7 @@ class PIIProcessor(Processor): @classmethod def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext): - config = PIIProcessorConfig.parse_obj(config_dict) + config = PiiProcessorConfig.parse_obj(config_dict) metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict) return cls(ctx, config, metadata_config) diff --git a/ingestion/src/metadata/ingestion/processor/processor_registry.py b/ingestion/src/metadata/ingestion/processor/processor_registry.py deleted file mode 100644 index 94049afea80..00000000000 --- a/ingestion/src/metadata/ingestion/processor/processor_registry.py +++ /dev/null @@ -1,23 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from metadata.ingestion.api.processor import Processor -from metadata.ingestion.api.registry import Registry - -processor_registry = Registry[Processor]() -processor_registry.load("metadata.ingestion.processor.plugins") - - - diff --git a/ingestion/src/metadata/ingestion/sink/ldap_add_user.py b/ingestion/src/metadata/ingestion/sink/ldap_users_rest.py similarity index 100% rename from ingestion/src/metadata/ingestion/sink/ldap_add_user.py rename to ingestion/src/metadata/ingestion/sink/ldap_users_rest.py diff --git a/ingestion/src/metadata/ingestion/sink/metadata_tables_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest_tables.py similarity index 99% rename from ingestion/src/metadata/ingestion/sink/metadata_tables_rest.py rename to ingestion/src/metadata/ingestion/sink/metadata_rest_tables.py index 4620a9240fe..3da28e093fb 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_tables_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest_tables.py @@ -33,7 +33,7 @@ class MetadataTablesSinkConfig(ConfigModel): api_endpoint: str = None -class MetadataTablesRestSink(Sink): +class MetadataRestTablesSink(Sink): config: MetadataTablesSinkConfig status: SinkStatus diff --git a/ingestion/src/metadata/ingestion/sink/metadata_users_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest_users.py similarity index 99% rename from ingestion/src/metadata/ingestion/sink/metadata_users_rest.py rename to ingestion/src/metadata/ingestion/sink/metadata_rest_users.py index 3fc8cd1c391..43ba7e3111c 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_users_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest_users.py @@ -30,7 +30,7 @@ class MetadataUsersSinkConfig(ConfigModel): api_end_point: str = None -class MetadataUsersRestSink(Sink): +class MetadataRestUsersSink(Sink): config: MetadataUsersSinkConfig metadata_config: MetadataServerConfig status: SinkStatus diff --git a/ingestion/src/metadata/ingestion/sink/sink_registry.py b/ingestion/src/metadata/ingestion/sink/sink_registry.py deleted file mode 100644 index 65e5a5bb942..00000000000 --- a/ingestion/src/metadata/ingestion/sink/sink_registry.py +++ /dev/null @@ -1,22 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from metadata.ingestion.api.registry import Registry -from metadata.ingestion.api.sink import Sink - -sink_registry = Registry[Sink]() -sink_registry.load("metadata.ingestion.sink.plugins") -# These sinks are always enabled -assert sink_registry.get("file") diff --git a/ingestion/src/metadata/ingestion/source/mysql.py b/ingestion/src/metadata/ingestion/source/mysql.py index a082d5ed1a1..097a08fd732 100644 --- a/ingestion/src/metadata/ingestion/source/mysql.py +++ b/ingestion/src/metadata/ingestion/source/mysql.py @@ -24,7 +24,7 @@ class MySQLConfig(SQLConnectionConfig): def get_connection_url(self): return super().get_connection_url() -class MySQLSource(SQLSource): +class MysqlSource(SQLSource): def __init__(self, config, metadata_config, ctx): super().__init__(config, metadata_config, ctx) diff --git a/ingestion/src/metadata/ingestion/source/sample_data_generator.py b/ingestion/src/metadata/ingestion/source/sample_tables.py similarity index 56% rename from ingestion/src/metadata/ingestion/source/sample_data_generator.py rename to ingestion/src/metadata/ingestion/source/sample_tables.py index 3beb59a921f..2840604f2fc 100644 --- a/ingestion/src/metadata/ingestion/source/sample_data_generator.py +++ b/ingestion/src/metadata/ingestion/source/sample_tables.py @@ -14,45 +14,50 @@ # limitations under the License. import csv -import json +import pandas as pd import uuid import os -from datetime import datetime - -import pandas as pd -import random -import string -import logging - -from faker import Faker +import json from collections import namedtuple -from typing import Iterable, Dict, Any, List, Union -from metadata.generated.schema.api.services.createDatabaseService import CreateDatabaseServiceEntityRequest -from metadata.generated.schema.entity.services.databaseService import DatabaseServiceEntity +from dataclasses import dataclass, field +from typing import Iterable, List, Dict, Any, Union from metadata.config.common import ConfigModel from metadata.generated.schema.entity.data.table import TableEntity from metadata.generated.schema.entity.data.database import DatabaseEntity from metadata.generated.schema.type.entityReference import EntityReference -from metadata.ingestion.api.source import Source, SourceStatus -from dataclasses import dataclass, field +from metadata.ingestion.api.source import SourceStatus, Source from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable -from metadata.ingestion.models.table_metadata import DatabaseMetadata -from metadata.ingestion.models.table_queries import TableQuery -from metadata.ingestion.models.user import User from metadata.ingestion.ometa.auth_provider import MetadataServerConfig from metadata.ingestion.ometa.client import REST +from metadata.generated.schema.api.services.createDatabaseService import CreateDatabaseServiceEntityRequest +from metadata.generated.schema.entity.services.databaseService import DatabaseServiceEntity COLUMN_NAME = 'Column' KEY_TYPE = 'Key type' DATA_TYPE = 'Data type' -FAKER_METHOD = 'Faker method' COL_DESCRIPTION = 'Description' - -logger = logging.getLogger(__name__) - TableKey = namedtuple('TableKey', ['schema', 'table_name']) +def get_service_or_create(service_json, metadata_config) -> DatabaseServiceEntity: + client = REST(metadata_config) + service = client.get_database_service(service_json['name']) + if service is not None: + return service + else: + created_service = client.create_database_service(CreateDatabaseServiceEntityRequest(**service_json)) + return created_service + + +def get_table_key(row: Dict[str, Any]) -> Union[TableKey, None]: + """ + Table key consists of schema and table name + :param row: + :return: + """ + return TableKey(schema=row['schema'], table_name=row['table_name']) + + class SampleTableSourceConfig(ConfigModel): sample_schema_folder: str service_name: str @@ -64,19 +69,6 @@ class SampleTableSourceConfig(ConfigModel): return self.sample_schema_folder -class SampleUserSourceConfig(ConfigModel): - no_of_users: int - - -def get_table_key(row: Dict[str, Any]) -> Union[TableKey, None]: - """ - Table key consists of schema and table name - :param row: - :return: - """ - return TableKey(schema=row['schema'], table_name=row['table_name']) - - @dataclass class SampleTableSourceStatus(SourceStatus): tables_scanned: List[str] = field(default_factory=list) @@ -85,14 +77,6 @@ class SampleTableSourceStatus(SourceStatus): self.tables_scanned.append(table_name) -@dataclass -class SampleUserSourceStatus(SourceStatus): - users_scanned: List[str] = field(default_factory=list) - - def report_table_scanned(self, user_name: str) -> None: - self.users_scanned.append(user_name) - - class TableSchema: def __init__(self, filename): # error if the file is not csv file @@ -122,31 +106,6 @@ class TableSchema: return [c[COLUMN_NAME] for c in self.columns] -class DataGenerator: - def __init__(self, schemas): - if not schemas: - raise Exception('Input schemas should be an array of one or more TableSchemas') - - self.schemas = schemas - - # validate that each FK is a PK in one of the input schemas - # TODO - - self.table_to_schema = dict((s.get_name(), s) for s in schemas) - - def generate_data(self, table_name, number_of_rows): - fake = Faker() - schema = self.table_to_schema[table_name] - data = {} - for c in schema.get_schema(): - if not c[FAKER_METHOD]: - logging.debug('{} has no faker method input'.format(c)) - continue - fn = getattr(fake, c[FAKER_METHOD]) - data[c[COLUMN_NAME]] = [fn() for _ in range(number_of_rows)] - return pd.DataFrame(data) - - class SampleTableMetadataGenerator: def __init__(self, table_to_df_dict, table_to_schema_map): self.table_to_df_dict = table_to_df_dict @@ -210,60 +169,6 @@ class SampleTableMetadataGenerator: return sorted_row_dict -class SampleUserMetadataGenerator: - - def __init__(self, number_of_users): - self.number_of_users = number_of_users - - def generate_sample_user(self): - schema = dict() - fake = Faker() - # columns that use faker - schema['email'] = lambda: None - schema['first_name'] = lambda: fake.first_name() - schema['last_name'] = lambda: fake.last_name() - schema['full_name'] = lambda: None - schema['github_username'] = lambda: None - schema['team_name'] = lambda: random.choice( - ['Data_Infra', 'Infra', 'Payments', 'Legal', 'Dev_Platform', 'Trust', 'Marketplace']) - schema['employee_type'] = lambda: None - schema['manager_email'] = lambda: fake.email() - schema['slack_id'] = lambda: None - schema['role_name'] = lambda: random.choices( - ['ROLE_ENGINEER', 'ROLE_DATA_SCIENTIST', 'ROLE_ADMIN'], weights=[40, 40, 10])[0] - data = {} - - for k in schema.keys(): - data[k] = [schema[k]() for _ in range(self.number_of_users)] - - # fill in the columns that can be derived from the random data above - for i in range(self.number_of_users): - data['full_name'][i] = data['first_name'][i] + ' ' + data['last_name'][i] - username = data['first_name'][i].lower() + '_' + data['last_name'][i].lower() + random.choice( - string.digits) - data['slack_id'][i] = username - data['github_username'][i] = username - data['email'][i] = username + '@gmail.com' - data['employee_type'] = data['role_name'] - - pd_rows = pd.DataFrame(data) - row_dict = [] - for index, row in pd_rows.iterrows(): - row_dict.append(row) - - return row_dict - - -def get_service_or_create(service_json, metadata_config) -> DatabaseServiceEntity: - client = REST(metadata_config) - service = client.get_database_service(service_json['name']) - if service is not None: - return service - else: - created_service = client.create_database_service(CreateDatabaseServiceEntityRequest(**service_json)) - return created_service - - class SampleTableSource(Source): def __init__(self, config: SampleTableSourceConfig, metadata_config: MetadataServerConfig, ctx): @@ -302,80 +207,3 @@ class SampleTableSource(Source): def get_status(self): return self.status - - -class SampleUsageSource(Source): - - def __init__(self, config: SampleTableSourceConfig, metadata_config: MetadataServerConfig, ctx): - super().__init__(ctx) - self.status = SampleTableSourceStatus() - self.config = config - self.metadata_config = metadata_config - self.client = REST(metadata_config) - self.service_json = json.load(open(config.sample_schema_folder + "/service.json", 'r')) - self.query_log_csv = config.sample_schema_folder + "/query_log" - with open(self.query_log_csv, 'r') as fin: - self.query_logs = [dict(i) for i in csv.DictReader(fin)] - self.service = get_service_or_create(self.service_json, metadata_config) - - @classmethod - def create(cls, config_dict, metadata_config_dict, ctx): - config = SampleTableSourceConfig.parse_obj(config_dict) - metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict) - return cls(config, metadata_config, ctx) - - def prepare(self): - pass - - def next_record(self) -> Iterable[TableQuery]: - for row in self.query_logs: - tq = TableQuery(row['query'], '', 100, 0, 0, '', - '', datetime.today().strftime('%Y-%m-%d %H:%M:%S'), 100, 'shopify', - False, row['query']) - yield tq - - def close(self): - pass - - def get_status(self): - return self.status - - -class SampleUserSource(Source): - - def __init__(self, config: SampleUserSourceConfig, metadata_config: MetadataServerConfig, ctx): - super().__init__(ctx) - self.status = SampleUserSourceStatus() - metadata_gen = SampleUserMetadataGenerator(config.no_of_users) - self.sample_columns = metadata_gen.generate_sample_user() - - @classmethod - def create(cls, config_dict, metadata_config_dict, ctx): - config = SampleUserSourceConfig.parse_obj(config_dict) - metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict) - return cls(config, metadata_config, ctx) - - def prepare(self): - pass - - def next_record(self) -> Iterable[DatabaseMetadata]: - for user in self.sample_columns: - user_metadata = User(user['email'], - user['first_name'], - user['last_name'], - user['full_name'], - user['github_username'], - user['team_name'], - user['employee_type'], - user['manager_email'], - user['slack_id'], - True, - 0) - self.status.report_table_scanned(user['github_username']) - yield user_metadata - - def close(self): - pass - - def get_status(self): - return self.status diff --git a/ingestion/src/metadata/ingestion/source/sample_usage.py b/ingestion/src/metadata/ingestion/source/sample_usage.py new file mode 100644 index 00000000000..30c7526bf22 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/sample_usage.py @@ -0,0 +1,46 @@ +import json +import csv +from metadata.ingestion.api.source import Source +from sample_tables import SampleTableSourceConfig, SampleTableSourceStatus, get_service_or_create +from metadata.ingestion.ometa.auth_provider import MetadataServerConfig +from metadata.ingestion.models.table_queries import TableQuery +from typing import Iterable +from datetime import datetime +from metadata.ingestion.ometa.client import REST + + +class SampleUsageSource(Source): + + def __init__(self, config: SampleTableSourceConfig, metadata_config: MetadataServerConfig, ctx): + super().__init__(ctx) + self.status = SampleTableSourceStatus() + self.config = config + self.metadata_config = metadata_config + self.client = REST(metadata_config) + self.service_json = json.load(open(config.sample_schema_folder + "/service.json", 'r')) + self.query_log_csv = config.sample_schema_folder + "/query_log" + with open(self.query_log_csv, 'r') as fin: + self.query_logs = [dict(i) for i in csv.DictReader(fin)] + self.service = get_service_or_create(self.service_json, metadata_config) + + @classmethod + def create(cls, config_dict, metadata_config_dict, ctx): + config = SampleTableSourceConfig.parse_obj(config_dict) + metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict) + return cls(config, metadata_config, ctx) + + def prepare(self): + pass + + def next_record(self) -> Iterable[TableQuery]: + for row in self.query_logs: + tq = TableQuery(row['query'], '', 100, 0, 0, '', + '', datetime.today().strftime('%Y-%m-%d %H:%M:%S'), 100, 'shopify', + False, row['query']) + yield tq + + def close(self): + pass + + def get_status(self): + return self.status diff --git a/ingestion/src/metadata/ingestion/source/sample_users.py b/ingestion/src/metadata/ingestion/source/sample_users.py new file mode 100644 index 00000000000..5ccbf6c68fd --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/sample_users.py @@ -0,0 +1,122 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import random +import string +import pandas as pd +from faker import Faker +from typing import Iterable, List +from dataclasses import dataclass, field +from metadata.config.common import ConfigModel +from metadata.ingestion.api.source import Source, SourceStatus +from metadata.ingestion.ometa.auth_provider import MetadataServerConfig +from metadata.ingestion.models.table_metadata import DatabaseMetadata +from metadata.ingestion.models.user import User + + +class SampleUserSourceConfig(ConfigModel): + no_of_users: int + + +@dataclass +class SampleUserSourceStatus(SourceStatus): + users_scanned: List[str] = field(default_factory=list) + + def report_table_scanned(self, user_name: str) -> None: + self.users_scanned.append(user_name) + + +class SampleUserMetadataGenerator: + + def __init__(self, number_of_users): + self.number_of_users = number_of_users + + def generate_sample_user(self): + schema = dict() + fake = Faker() + # columns that use faker + schema['email'] = lambda: None + schema['first_name'] = lambda: fake.first_name() + schema['last_name'] = lambda: fake.last_name() + schema['full_name'] = lambda: None + schema['github_username'] = lambda: None + schema['team_name'] = lambda: random.choice( + ['Data_Infra', 'Infra', 'Payments', 'Legal', 'Dev_Platform', 'Trust', 'Marketplace']) + schema['employee_type'] = lambda: None + schema['manager_email'] = lambda: fake.email() + schema['slack_id'] = lambda: None + schema['role_name'] = lambda: random.choices( + ['ROLE_ENGINEER', 'ROLE_DATA_SCIENTIST', 'ROLE_ADMIN'], weights=[40, 40, 10])[0] + data = {} + + for k in schema.keys(): + data[k] = [schema[k]() for _ in range(self.number_of_users)] + + # fill in the columns that can be derived from the random data above + for i in range(self.number_of_users): + data['full_name'][i] = data['first_name'][i] + ' ' + data['last_name'][i] + username = data['first_name'][i].lower() + '_' + data['last_name'][i].lower() + random.choice( + string.digits) + data['slack_id'][i] = username + data['github_username'][i] = username + data['email'][i] = username + '@gmail.com' + data['employee_type'] = data['role_name'] + + pd_rows = pd.DataFrame(data) + row_dict = [] + for index, row in pd_rows.iterrows(): + row_dict.append(row) + + return row_dict + + +class SampleUsersSource(Source): + + def __init__(self, config: SampleUserSourceConfig, metadata_config: MetadataServerConfig, ctx): + super().__init__(ctx) + self.status = SampleUserSourceStatus() + metadata_gen = SampleUserMetadataGenerator(config.no_of_users) + self.sample_columns = metadata_gen.generate_sample_user() + + @classmethod + def create(cls, config_dict, metadata_config_dict, ctx): + config = SampleUserSourceConfig.parse_obj(config_dict) + metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict) + return cls(config, metadata_config, ctx) + + def prepare(self): + pass + + def next_record(self) -> Iterable[DatabaseMetadata]: + for user in self.sample_columns: + user_metadata = User(user['email'], + user['first_name'], + user['last_name'], + user['full_name'], + user['github_username'], + user['team_name'], + user['employee_type'], + user['manager_email'], + user['slack_id'], + True, + 0) + self.status.report_table_scanned(user['github_username']) + yield user_metadata + + def close(self): + pass + + def get_status(self): + return self.status diff --git a/ingestion/src/metadata/ingestion/source/source_registry.py b/ingestion/src/metadata/ingestion/source/source_registry.py deleted file mode 100644 index 8e4ae2dbcd5..00000000000 --- a/ingestion/src/metadata/ingestion/source/source_registry.py +++ /dev/null @@ -1,22 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from metadata.ingestion.api.registry import Registry -from metadata.ingestion.api.source import Source - -source_registry = Registry[Source]() -source_registry.load("metadata.ingestion.source.plugins") - -# This source is always enabled diff --git a/ingestion/src/metadata/ingestion/stage/stage_registry.py b/ingestion/src/metadata/ingestion/stage/stage_registry.py deleted file mode 100644 index 538aa0e419e..00000000000 --- a/ingestion/src/metadata/ingestion/stage/stage_registry.py +++ /dev/null @@ -1,22 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from metadata.ingestion.api.registry import Registry -from metadata.ingestion.api.stage import Stage - -stage_registry = Registry[Stage]() -stage_registry.load("metadata.ingestion.stage.plugins") - - From 03ad583744844cc346aa42f91f588eb1b99e87c6 Mon Sep 17 00:00:00 2001 From: Ayush Shah Date: Sat, 14 Aug 2021 00:09:51 +0530 Subject: [PATCH 2/2] Registry dependency removed --- docker/metadata/docker-compose.yml | 24 ++------- .../metadata-ingestion/connectors/mssql.md | 2 +- .../metadata-ingestion/connectors/mysql.md | 2 +- .../metadata-ingestion/connectors/postgres.md | 2 +- ingestion/Dockerfile | 3 +- ingestion/examples/workflows/bigquery.json | 2 +- ingestion/examples/workflows/hive.json | 8 ++- ingestion/examples/workflows/mssql.json | 2 +- ingestion/examples/workflows/postgres.json | 2 +- ingestion/examples/workflows/redshift.json | 10 ++-- .../examples/workflows/redshift_usage.json | 2 +- ingestion/examples/workflows/snowflake.json | 2 +- .../examples/workflows/snowflake_usage.json | 2 +- ingestion/pipelines/metadata_to_es.json | 4 +- ingestion/pipelines/sample_tables.json | 2 +- ingestion/pipelines/sample_usage.json | 2 +- ingestion/setup.py | 51 +++---------------- ingestion/src/metadata/cmd.py | 2 +- .../{elastic_search.py => elasticsearch.py} | 2 +- .../ingestion/bulksink/metadata_usage.py | 2 +- ...{ldap_users_rest.py => ldap_rest_users.py} | 2 +- .../src/metadata/ingestion/source/bigquery.py | 2 +- .../source/{ldap_source.py => ldap_users.py} | 2 +- .../{metadata_rest.py => metadata_es.py} | 2 +- .../src/metadata/ingestion/source/mssql.py | 6 +-- .../ingestion/source/sample_tables.py | 2 +- .../metadata/ingestion/source/sample_usage.py | 2 +- .../ingestion/source/snowflake_usage.py | 4 +- .../{table_usage_stage.py => table_usage.py} | 0 .../metadata/ingestion/workflow/workflow.py | 28 ++++++---- 30 files changed, 62 insertions(+), 116 deletions(-) rename ingestion/src/metadata/ingestion/bulksink/{elastic_search.py => elasticsearch.py} (99%) rename ingestion/src/metadata/ingestion/sink/{ldap_users_rest.py => ldap_rest_users.py} (98%) rename ingestion/src/metadata/ingestion/source/{ldap_source.py => ldap_users.py} (99%) rename ingestion/src/metadata/ingestion/source/{metadata_rest.py => metadata_es.py} (98%) rename ingestion/src/metadata/ingestion/stage/{table_usage_stage.py => table_usage.py} (100%) diff --git a/docker/metadata/docker-compose.yml b/docker/metadata/docker-compose.yml index 9fd16f50880..94be0605039 100644 --- a/docker/metadata/docker-compose.yml +++ b/docker/metadata/docker-compose.yml @@ -41,10 +41,8 @@ services: - 9200:9200 - 9300:9300 - catalog: - build: - context: ../../. - dockerfile: docker/metadata/Dockerfile + openmetadata-server: + image: openmetadata/server:latest expose: - 8585 - 9200 @@ -62,9 +60,7 @@ services: - "localhost:172.16.239.11" ingestion: - build: - context: ../../ingestion/. - dockerfile: Dockerfile + image: openmetadata/ingestion:latest expose: - 7777 ports: @@ -75,20 +71,6 @@ services: - "localhost:172.16.239.10" - "localhost:172.16.239.11" - "localhost:172.16.239.12" - - "localhost:172.16.239.13" - - postgres: - image: postgres - restart: always - environment: - POSTGRES_DB: pagila - POSTGRES_USER: openmetadata_user - POSTGRES_PASSWORD: openmetadata_password - ports: - - 5433:5432 - networks: - app_net: - ipv4_address: 172.16.239.13 networks: app_net: diff --git a/docs/install/metadata-ingestion/connectors/mssql.md b/docs/install/metadata-ingestion/connectors/mssql.md index edd0763cace..693fa29785b 100644 --- a/docs/install/metadata-ingestion/connectors/mssql.md +++ b/docs/install/metadata-ingestion/connectors/mssql.md @@ -95,7 +95,7 @@ and ```metadata-rest-tables``` sink along with ```metadata-server``` config } }, "processor": { - "type": "pii-tags", + "type": "pii", "config": { } }, diff --git a/docs/install/metadata-ingestion/connectors/mysql.md b/docs/install/metadata-ingestion/connectors/mysql.md index d497c618137..b542b11ae87 100644 --- a/docs/install/metadata-ingestion/connectors/mysql.md +++ b/docs/install/metadata-ingestion/connectors/mysql.md @@ -93,7 +93,7 @@ and ```metadata-rest-tables``` sink along with ```metadata-server``` config } }, "processor": { - "type": "pii-tags", + "type": "pii", "config": { "api_endpoint": "http://localhost:8585/api" } diff --git a/docs/install/metadata-ingestion/connectors/postgres.md b/docs/install/metadata-ingestion/connectors/postgres.md index cbb388e6831..8a41212ada9 100644 --- a/docs/install/metadata-ingestion/connectors/postgres.md +++ b/docs/install/metadata-ingestion/connectors/postgres.md @@ -91,7 +91,7 @@ and ```metadata-rest-tables``` sink along with ```metadata-server``` config } }, "processor": { - "type": "pii-tags", + "type": "pii", "config": {} }, "sink": { diff --git a/ingestion/Dockerfile b/ingestion/Dockerfile index a83d76fbe55..073a49ea1fc 100644 --- a/ingestion/Dockerfile +++ b/ingestion/Dockerfile @@ -1,8 +1,7 @@ -FROM python:3.9.2 +FROM python:3.8.10 EXPOSE 7777 -COPY ./examples /openmetadata-ingestion/examples COPY ./pipelines /openmetadata-ingestion/pipelines COPY ./ingestion_scheduler /openmetadata-ingestion/ingestion_scheduler COPY ./ingestion_dependency.sh /openmetadata-ingestion/ingestion_dependency.sh diff --git a/ingestion/examples/workflows/bigquery.json b/ingestion/examples/workflows/bigquery.json index 5be5760a015..6ac3cd657f6 100644 --- a/ingestion/examples/workflows/bigquery.json +++ b/ingestion/examples/workflows/bigquery.json @@ -10,7 +10,7 @@ } }, "processor": { - "type": "pii-tags", + "type": "pii", "config": { "api_endpoint": "http://localhost:8585/api" } diff --git a/ingestion/examples/workflows/hive.json b/ingestion/examples/workflows/hive.json index cd5b8a93498..4e5343889e7 100644 --- a/ingestion/examples/workflows/hive.json +++ b/ingestion/examples/workflows/hive.json @@ -8,14 +8,12 @@ } }, "processor": { - "type": "pii-tags", - "config": { - } + "type": "pii", + "config": {} }, "sink": { "type": "metadata-rest-tables", - "config": { - } + "config": {} }, "metadata_server": { "type": "metadata-server", diff --git a/ingestion/examples/workflows/mssql.json b/ingestion/examples/workflows/mssql.json index 1cf10b4f215..49b4cc187bd 100644 --- a/ingestion/examples/workflows/mssql.json +++ b/ingestion/examples/workflows/mssql.json @@ -14,7 +14,7 @@ } }, "processor": { - "type": "pii-tags", + "type": "pii", "config": { } }, diff --git a/ingestion/examples/workflows/postgres.json b/ingestion/examples/workflows/postgres.json index 36cc07ec4eb..cf0fedc56d4 100644 --- a/ingestion/examples/workflows/postgres.json +++ b/ingestion/examples/workflows/postgres.json @@ -11,7 +11,7 @@ } }, "processor": { - "type": "pii-tags", + "type": "pii", "config": {} }, "sink": { diff --git a/ingestion/examples/workflows/redshift.json b/ingestion/examples/workflows/redshift.json index 022e96ba339..45eea9f86ba 100644 --- a/ingestion/examples/workflows/redshift.json +++ b/ingestion/examples/workflows/redshift.json @@ -11,16 +11,14 @@ } }, "processor": { - "type": "pii-tags", - "config": { - } + "type": "pii", + "config": {} }, "sink": { "type": "metadata-rest-tables", - "config": { - } + "config": {} }, - "metadata_server": { + "metadata_server": { "type": "metadata-server", "config": { "api_endpoint": "http://localhost:8585/api", diff --git a/ingestion/examples/workflows/redshift_usage.json b/ingestion/examples/workflows/redshift_usage.json index 56c394d3bd7..74a3f5238b5 100644 --- a/ingestion/examples/workflows/redshift_usage.json +++ b/ingestion/examples/workflows/redshift_usage.json @@ -19,7 +19,7 @@ } }, "stage": { - "type": "table-usage-stage", + "type": "table-usage", "config": { "filename": "/tmp/redshift_usage" } diff --git a/ingestion/examples/workflows/snowflake.json b/ingestion/examples/workflows/snowflake.json index 3075f111ac4..86128d93665 100644 --- a/ingestion/examples/workflows/snowflake.json +++ b/ingestion/examples/workflows/snowflake.json @@ -19,7 +19,7 @@ } }, "processor": { - "type": "pii-tags", + "type": "pii", "config": {} }, "sink": { diff --git a/ingestion/examples/workflows/snowflake_usage.json b/ingestion/examples/workflows/snowflake_usage.json index ec99974752e..fd22111f7bb 100644 --- a/ingestion/examples/workflows/snowflake_usage.json +++ b/ingestion/examples/workflows/snowflake_usage.json @@ -19,7 +19,7 @@ } }, "stage": { - "type": "table-usage-stage", + "type": "table-usage", "config": { "filename": "/tmp/snowflake_usage" } diff --git a/ingestion/pipelines/metadata_to_es.json b/ingestion/pipelines/metadata_to_es.json index 522a70ccb8b..81cae399d47 100644 --- a/ingestion/pipelines/metadata_to_es.json +++ b/ingestion/pipelines/metadata_to_es.json @@ -1,10 +1,10 @@ { "source": { - "type": "metadata-rest-tables", + "type": "metadata_es", "config": {} }, "stage": { - "type": "file-stage", + "type": "file", "config": { "filename": "/tmp/tables.txt" } diff --git a/ingestion/pipelines/sample_tables.json b/ingestion/pipelines/sample_tables.json index 2203a07af63..169851b8ad4 100644 --- a/ingestion/pipelines/sample_tables.json +++ b/ingestion/pipelines/sample_tables.json @@ -9,7 +9,7 @@ } }, "processor": { - "type": "pii-tags", + "type": "pii", "config": { } }, diff --git a/ingestion/pipelines/sample_usage.json b/ingestion/pipelines/sample_usage.json index 60e77b99b44..91a83870a40 100644 --- a/ingestion/pipelines/sample_usage.json +++ b/ingestion/pipelines/sample_usage.json @@ -15,7 +15,7 @@ } }, "stage": { - "type": "table-usage-stage", + "type": "table-usage", "config": { "filename": "/tmp/sample_usage" } diff --git a/ingestion/setup.py b/ingestion/setup.py index 23e465165c0..44378e59d88 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -44,7 +44,7 @@ base_requirements = { "typing_extensions>=3.7.4" "mypy_extensions>=0.4.3", "typing-inspect", - "pydantic~=1.7.4", + "pydantic==1.7.4", "pydantic[email]>=1.7.2", "google>=3.0.0", "google-auth>=1.33.0", @@ -54,13 +54,14 @@ base_requirements = { "python-jose==3.3.0", "okta==1.7.0", "pandas~=1.3.1", - "sqlalchemy>=1.3.24", - "sql-metadata~=2.0.0", - "spacy==3.0.5", - "requests~=2.25.1" + "sqlalchemy>=1.3.24" + "sql-metadata~=2.0.0" + "spacy==3.0.5" + "requests~=2.25.1", + "en_core_web_sm@https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.0.0/en_core_web_sm-3.0.0.tar.gz#egg=en_core_web" } base_plugins = { - "pii-tags", + "pii-processor", "query-parser", "metadata-usage", "file-stage", @@ -110,44 +111,6 @@ setup( packages=find_namespace_packages(where='./src', exclude=['tests*']), entry_points={ "console_scripts": ["metadata = metadata.cmd:metadata"], - "metadata.ingestion.source.plugins": [ - "mysql = metadata.ingestion.source.mysql:MySQLSource", - "postgres = metadata.ingestion.source.postgres:PostgresSource", - "snowflake = metadata.ingestion.source.snowflake:SnowflakeSource", - "redshift = metadata.ingestion.source.redshift:RedshiftSource", - "redshift-sql = metadata.ingestion.source.redshift_sql:RedshiftSQLSource", - "bigquery = metadata.ingestion.source.bigquery:BigQuerySource", - "athena = metadata.ingestion.source.athena:AthenaSource", - "oracle = metadata.ingestion.source.oracle:OracleSource", - "mssql = metadata.ingestion.source.mssql:SQLServerSource", - "hive = metadata.ingestion.source.hive:HiveSource", - "sample-tables = metadata.ingestion.source.sample_data_generator:SampleTableSource", - "sample-users = metadata.ingestion.source.sample_data_generator:SampleUserSource", - "sample-usage = metadata.ingestion.source.sample_data_generator:SampleUsageSource", - "metadata-rest-tables = metadata.ingestion.source.metadata_rest:MetadataTablesRestSource", - "redshift-usage = metadata.ingestion.source.redshift_usage:RedshiftUsageSource", - "snowflake-usage = metadata.ingestion.source.snowflake_usage:SnowflakeUsageSource", - "ldap-users = metadata.ingestion.source.ldap_source:LDAPUserSource" - ], - "metadata.ingestion.sink.plugins": [ - "file = metadata.ingestion.sink.file:FileSink", - "console = metadata.ingestion.sink.console:ConsoleSink", - "metadata-rest-tables = metadata.ingestion.sink.metadata_tables_rest:MetadataTablesRestSink", - "metadata-rest-users = metadata.ingestion.sink.metadata_users_rest:MetadataUsersRestSink", - "ldap-rest-users = metadata.ingestion.sink.ldap_add_user:LdapUserRestSink" - ], - "metadata.ingestion.processor.plugins": [ - "pii-tags = metadata.ingestion.processor.pii_processor:PIIProcessor", - "query-parser = metadata.ingestion.processor.query_parser:QueryParserProcessor", - ], - "metadata.ingestion.stage.plugins": [ - "file-stage = metadata.ingestion.stage.file:FileStage", - "table-usage-stage = metadata.ingestion.stage.table_usage_stage:TableUsageStage" - ], - "metadata.ingestion.bulksink.plugins": [ - "elasticsearch = metadata.ingestion.bulksink.elastic_search:ElasticSearchBulkSink", - "metadata-usage = metadata.ingestion.bulksink.metadata_usage_rest:MetadataUsageBulkSink", - ], }, install_requires=list(base_requirements), extras_require={ diff --git a/ingestion/src/metadata/cmd.py b/ingestion/src/metadata/cmd.py index b3553bd3dbe..60d8da43aaf 100644 --- a/ingestion/src/metadata/cmd.py +++ b/ingestion/src/metadata/cmd.py @@ -71,8 +71,8 @@ def ingest(config: str) -> None: sys.exit(1) workflow.execute() - ret = workflow.print_status() workflow.stop() + ret = workflow.print_status() sys.exit(ret) diff --git a/ingestion/src/metadata/ingestion/bulksink/elastic_search.py b/ingestion/src/metadata/ingestion/bulksink/elasticsearch.py similarity index 99% rename from ingestion/src/metadata/ingestion/bulksink/elastic_search.py rename to ingestion/src/metadata/ingestion/bulksink/elasticsearch.py index 85c37869c81..c07ccd020c0 100644 --- a/ingestion/src/metadata/ingestion/bulksink/elastic_search.py +++ b/ingestion/src/metadata/ingestion/bulksink/elasticsearch.py @@ -30,7 +30,7 @@ class ElasticSearchConfig(ConfigModel): batch_size: Optional[int] = 10000 -class ElasticSearchBulkSink(BulkSink): +class ElasticsearchBulkSink(BulkSink): """ Elasticsearch Publisher uses Bulk API to load data from JSON file. A new index is created and data is uploaded into it. After the upload diff --git a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py index 1592458c061..6f4dc5cfa8d 100644 --- a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py +++ b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py @@ -32,7 +32,7 @@ class MetadataUsageSinkConfig(ConfigModel): filename: str -class MetadataUsageRestBulkSink(BulkSink): +class MetadataUsageBulkSink(BulkSink): config: MetadataUsageSinkConfig def __init__(self, ctx: WorkflowContext, config: MetadataUsageSinkConfig, metadata_config: MetadataServerConfig): diff --git a/ingestion/src/metadata/ingestion/sink/ldap_users_rest.py b/ingestion/src/metadata/ingestion/sink/ldap_rest_users.py similarity index 98% rename from ingestion/src/metadata/ingestion/sink/ldap_users_rest.py rename to ingestion/src/metadata/ingestion/sink/ldap_rest_users.py index 55f6fae9dbe..5975140f026 100644 --- a/ingestion/src/metadata/ingestion/sink/ldap_users_rest.py +++ b/ingestion/src/metadata/ingestion/sink/ldap_rest_users.py @@ -29,7 +29,7 @@ class LDAPSourceConfig(ConfigModel): api_end_point: str -class LdapUserRestSink(Sink): +class LdapRestUsersSink(Sink): config: LDAPSourceConfig status: SinkStatus diff --git a/ingestion/src/metadata/ingestion/source/bigquery.py b/ingestion/src/metadata/ingestion/source/bigquery.py index c6d3b951659..dd74c1ce025 100644 --- a/ingestion/src/metadata/ingestion/source/bigquery.py +++ b/ingestion/src/metadata/ingestion/source/bigquery.py @@ -31,7 +31,7 @@ class BigQueryConfig(SQLConnectionConfig, SQLSource): return f"{self.scheme}://" -class BigQuerySource(SQLSource): +class BigquerySource(SQLSource): def __init__(self, config, metadata_config, ctx): super().__init__(config, metadata_config, ctx) diff --git a/ingestion/src/metadata/ingestion/source/ldap_source.py b/ingestion/src/metadata/ingestion/source/ldap_users.py similarity index 99% rename from ingestion/src/metadata/ingestion/source/ldap_source.py rename to ingestion/src/metadata/ingestion/source/ldap_users.py index eccb594682a..52900711478 100644 --- a/ingestion/src/metadata/ingestion/source/ldap_source.py +++ b/ingestion/src/metadata/ingestion/source/ldap_users.py @@ -32,7 +32,7 @@ class LDAPUserConfig(ConfigModel): password: str -class LDAPUserSource(Source): +class LdapUsersSource(Source): config: LDAPUserConfig status: SourceStatus diff --git a/ingestion/src/metadata/ingestion/source/metadata_rest.py b/ingestion/src/metadata/ingestion/source/metadata_es.py similarity index 98% rename from ingestion/src/metadata/ingestion/source/metadata_rest.py rename to ingestion/src/metadata/ingestion/source/metadata_es.py index 74f0953c5be..364ed368972 100644 --- a/ingestion/src/metadata/ingestion/source/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/source/metadata_es.py @@ -30,7 +30,7 @@ class MetadataTablesRestSourceConfig(ConfigModel): api_endpoint: Optional[str] = None -class MetadataTablesRestSource(Source): +class MetadataEsSource(Source): config: MetadataTablesRestSourceConfig report: SourceStatus diff --git a/ingestion/src/metadata/ingestion/source/mssql.py b/ingestion/src/metadata/ingestion/source/mssql.py index f5eae64302f..31bcb9ff5d4 100644 --- a/ingestion/src/metadata/ingestion/source/mssql.py +++ b/ingestion/src/metadata/ingestion/source/mssql.py @@ -20,7 +20,7 @@ from .sql_source import SQLConnectionConfig, SQLSource from ..ometa.auth_provider import MetadataServerConfig -class SQLServerConfig(SQLConnectionConfig): +class MssqlConfig(SQLConnectionConfig): host_port = "localhost:1433" scheme = "mssql+pytds" @@ -28,12 +28,12 @@ class SQLServerConfig(SQLConnectionConfig): return super().get_connection_url() -class SQLServerSource(SQLSource): +class MssqlSource(SQLSource): def __init__(self, config, metadata_config, ctx): super().__init__(config, metadata_config, ctx) @classmethod def create(cls, config_dict, metadata_config_dict, ctx): - config = SQLServerConfig.parse_obj(config_dict) + config = MssqlConfig.parse_obj(config_dict) metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict) return cls(config, metadata_config, ctx) diff --git a/ingestion/src/metadata/ingestion/source/sample_tables.py b/ingestion/src/metadata/ingestion/source/sample_tables.py index 2840604f2fc..49cfb7b45e4 100644 --- a/ingestion/src/metadata/ingestion/source/sample_tables.py +++ b/ingestion/src/metadata/ingestion/source/sample_tables.py @@ -169,7 +169,7 @@ class SampleTableMetadataGenerator: return sorted_row_dict -class SampleTableSource(Source): +class SampleTablesSource(Source): def __init__(self, config: SampleTableSourceConfig, metadata_config: MetadataServerConfig, ctx): super().__init__(ctx) diff --git a/ingestion/src/metadata/ingestion/source/sample_usage.py b/ingestion/src/metadata/ingestion/source/sample_usage.py index 30c7526bf22..06bfbca4cb6 100644 --- a/ingestion/src/metadata/ingestion/source/sample_usage.py +++ b/ingestion/src/metadata/ingestion/source/sample_usage.py @@ -1,7 +1,7 @@ import json import csv from metadata.ingestion.api.source import Source -from sample_tables import SampleTableSourceConfig, SampleTableSourceStatus, get_service_or_create +from .sample_tables import SampleTableSourceConfig, SampleTableSourceStatus, get_service_or_create from metadata.ingestion.ometa.auth_provider import MetadataServerConfig from metadata.ingestion.models.table_queries import TableQuery from typing import Iterable diff --git a/ingestion/src/metadata/ingestion/source/snowflake_usage.py b/ingestion/src/metadata/ingestion/source/snowflake_usage.py index f97751de436..30c0ccd884e 100644 --- a/ingestion/src/metadata/ingestion/source/snowflake_usage.py +++ b/ingestion/src/metadata/ingestion/source/snowflake_usage.py @@ -28,7 +28,7 @@ class SnowflakeUsageSource(Source): # SELECT statement from mysql information_schema to extract table and column metadata SQL_STATEMENT = """ select query_id as query,Query_text as sql,query_type as label, - database_name as database,start_time as starttime,end_time as endtime + database_name as database,start_time as starttime,end_time as endtime,schema_name from table(information_schema.query_history( end_time_range_start=>to_timestamp_ltz('{start_date}'), end_time_range_end=>to_timestamp_ltz('{end_date}'))); @@ -83,7 +83,7 @@ class SnowflakeUsageSource(Source): for row in self._get_raw_extract_iter(): tq = TableQuery(row['query'], row['label'], 0, 0, 0, str(row['starttime']), str(row['endtime']), str(row['starttime'])[0:19], 2, row['database'], 0, row['sql']) - self.report.scanned(tq) + self.report.scanned(f"{row['database']}.{row['schema_name']}") yield tq def get_report(self): diff --git a/ingestion/src/metadata/ingestion/stage/table_usage_stage.py b/ingestion/src/metadata/ingestion/stage/table_usage.py similarity index 100% rename from ingestion/src/metadata/ingestion/stage/table_usage_stage.py rename to ingestion/src/metadata/ingestion/stage/table_usage.py diff --git a/ingestion/src/metadata/ingestion/workflow/workflow.py b/ingestion/src/metadata/ingestion/workflow/workflow.py index a237ede0d81..fdb29d5dce3 100644 --- a/ingestion/src/metadata/ingestion/workflow/workflow.py +++ b/ingestion/src/metadata/ingestion/workflow/workflow.py @@ -31,11 +31,8 @@ from metadata.ingestion.api.processor import Processor from metadata.ingestion.api.sink import Sink from metadata.ingestion.api.source import Source from metadata.ingestion.api.stage import Stage -from metadata.ingestion.bulksink.bulk_sink_registry import bulk_sink_registry -from metadata.ingestion.sink.sink_registry import sink_registry -from metadata.ingestion.source.source_registry import source_registry -from metadata.ingestion.processor.processor_registry import processor_registry -from metadata.ingestion.stage.stage_registry import stage_registry +from metadata.ingestion.api.registry import Registry +from metadata.ingestion.api.source import Source logger = logging.getLogger(__name__) @@ -61,9 +58,10 @@ class Workflow: def __init__(self, config: WorkflowConfig): self.config = config self.ctx = WorkflowContext(workflow_id=self.config.run_id) - source_type = self.config.source.type - source_class = source_registry.get(source_type) + source_registry = Registry[Source]() + source_class = source_registry.get('metadata.ingestion.source.{}.{}Source'.format( + source_type.replace('-', '_'), ''.join([i.title() for i in source_type.replace('-', '_').split('_')]))) metadata_config = self.config.metadata_server.dict().get("config", {}) self.source: Source = source_class.create( self.config.source.dict().get("config", {}), metadata_config, self.ctx @@ -74,28 +72,36 @@ class Workflow: if self.config.processor: processor_type = self.config.processor.type - processor_class = processor_registry.get(processor_type) + processor_registry = Registry[Processor]() + processor_class = processor_registry.get('metadata.ingestion.processor.{}.{}Processor'.format( + processor_type.replace('-', '_'), ''.join([i.title() for i in processor_type.replace('-', '_').split('_')]))) processor_config = self.config.processor.dict().get("config", {}) self.processor: Processor = processor_class.create(processor_config, metadata_config, self.ctx) logger.debug(f"Processor Type: {processor_type}, {processor_class} configured") if self.config.stage: stage_type = self.config.stage.type - stage_class = stage_registry.get(stage_type) + stage_registry = Registry[Stage]() + stage_class = stage_registry.get('metadata.ingestion.stage.{}.{}Stage'.format( + stage_type.replace('-', '_'), ''.join([i.title() for i in stage_type.replace('-', '_').split('_')]))) stage_config = self.config.stage.dict().get("config", {}) self.stage: Stage = stage_class.create(stage_config, metadata_config, self.ctx) logger.debug(f"Stage Type: {stage_type}, {stage_class} configured") if self.config.sink: sink_type = self.config.sink.type - sink_class = sink_registry.get(sink_type) + sink_registry = Registry[Sink]() + sink_class = sink_registry.get('metadata.ingestion.sink.{}.{}Sink'.format( + sink_type.replace('-', '_'), ''.join([i.title() for i in sink_type.replace('-', '_').split('_')]))) sink_config = self.config.sink.dict().get("config", {}) self.sink: Sink = sink_class.create(sink_config, metadata_config, self.ctx) logger.debug(f"Sink type:{self.config.sink.type},{sink_class} configured") if self.config.bulk_sink: bulk_sink_type = self.config.bulk_sink.type - bulk_sink_class = bulk_sink_registry.get(bulk_sink_type) + bulk_sink_registry = Registry[BulkSink]() + bulk_sink_class = bulk_sink_registry.get('metadata.ingestion.bulksink.{}.{}BulkSink'.format( + bulk_sink_type.replace('-', '_'), ''.join([i.title() for i in bulk_sink_type.replace('-', '_').split('_')]))) bulk_sink_config = self.config.bulk_sink.dict().get("config", {}) self.bulk_sink: BulkSink = bulk_sink_class.create(bulk_sink_config, metadata_config, self.ctx) logger.info(f"BulkSink type:{self.config.bulk_sink.type},{bulk_sink_class} configured")