From 042ef45ca42649a0126ed16f21d8424523043d87 Mon Sep 17 00:00:00 2001 From: Ayush Shah Date: Fri, 13 Aug 2021 20:35:17 +0530 Subject: [PATCH] Registry Cleanup --- ingestion/ingestion_dependency.sh | 6 +- ingestion/pipelines/mysql.json | 2 +- .../ingestion/bulksink/bulk_sink_registry.py | 22 -- ...tadata_usage_rest.py => metadata_usage.py} | 2 +- .../processor/{pii_processor.py => pii.py} | 10 +- .../ingestion/processor/processor_registry.py | 23 -- .../{ldap_add_user.py => ldap_users_rest.py} | 0 ...tables_rest.py => metadata_rest_tables.py} | 2 +- ...a_users_rest.py => metadata_rest_users.py} | 2 +- .../metadata/ingestion/sink/sink_registry.py | 22 -- .../src/metadata/ingestion/source/mysql.py | 2 +- ...ple_data_generator.py => sample_tables.py} | 224 ++---------------- .../metadata/ingestion/source/sample_usage.py | 46 ++++ .../metadata/ingestion/source/sample_users.py | 122 ++++++++++ .../ingestion/source/source_registry.py | 22 -- .../ingestion/stage/stage_registry.py | 22 -- 16 files changed, 208 insertions(+), 321 deletions(-) delete mode 100644 ingestion/src/metadata/ingestion/bulksink/bulk_sink_registry.py rename ingestion/src/metadata/ingestion/bulksink/{metadata_usage_rest.py => metadata_usage.py} (99%) rename ingestion/src/metadata/ingestion/processor/{pii_processor.py => pii.py} (97%) delete mode 100644 ingestion/src/metadata/ingestion/processor/processor_registry.py rename ingestion/src/metadata/ingestion/sink/{ldap_add_user.py => ldap_users_rest.py} (100%) rename ingestion/src/metadata/ingestion/sink/{metadata_tables_rest.py => metadata_rest_tables.py} (99%) rename ingestion/src/metadata/ingestion/sink/{metadata_users_rest.py => metadata_rest_users.py} (99%) delete mode 100644 ingestion/src/metadata/ingestion/sink/sink_registry.py rename ingestion/src/metadata/ingestion/source/{sample_data_generator.py => sample_tables.py} (56%) create mode 100644 ingestion/src/metadata/ingestion/source/sample_usage.py create mode 100644 ingestion/src/metadata/ingestion/source/sample_users.py delete mode 100644 ingestion/src/metadata/ingestion/source/source_registry.py delete mode 100644 ingestion/src/metadata/ingestion/stage/stage_registry.py diff --git a/ingestion/ingestion_dependency.sh b/ingestion/ingestion_dependency.sh index da0c9490bee..08f091458bb 100755 --- a/ingestion/ingestion_dependency.sh +++ b/ingestion/ingestion_dependency.sh @@ -17,7 +17,9 @@ # set -euo pipefail -pip install --upgrade pip setuptools openmetadata-ingestion==0.2.1 apns -pip install openmetadata-ingestion[mysql,sample-tables,elasticsearch] +pip install --upgrade setuptools openmetadata-ingestion==0.2.1 apns +# wget https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.0.0/en_core_web_sm-3.0.0-py3-none-any.whl +# pip install en_core_web_sm-3.0.0-py3-none-any.whl python -m spacy download en_core_web_sm +rm -rf en_core_web_sm-3.0.0-py3-none-any.whl pip install "simplescheduler@git+https://github.com/StreamlineData/sdscheduler.git#egg=simplescheduler" diff --git a/ingestion/pipelines/mysql.json b/ingestion/pipelines/mysql.json index 1b1826ae772..09e47380441 100644 --- a/ingestion/pipelines/mysql.json +++ b/ingestion/pipelines/mysql.json @@ -12,7 +12,7 @@ } }, "processor": { - "type": "pii-tags", + "type": "pii", "config": { "api_endpoint": "http://localhost:8585/api" } diff --git a/ingestion/src/metadata/ingestion/bulksink/bulk_sink_registry.py b/ingestion/src/metadata/ingestion/bulksink/bulk_sink_registry.py deleted file mode 100644 index e9722065dfa..00000000000 --- a/ingestion/src/metadata/ingestion/bulksink/bulk_sink_registry.py +++ /dev/null @@ -1,22 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from metadata.ingestion.api.bulk_sink import BulkSink -from metadata.ingestion.api.registry import Registry - -bulk_sink_registry = Registry[BulkSink]() -bulk_sink_registry.load("metadata.ingestion.bulksink.plugins") - - diff --git a/ingestion/src/metadata/ingestion/bulksink/metadata_usage_rest.py b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py similarity index 99% rename from ingestion/src/metadata/ingestion/bulksink/metadata_usage_rest.py rename to ingestion/src/metadata/ingestion/bulksink/metadata_usage.py index 6f4dc5cfa8d..1592458c061 100644 --- a/ingestion/src/metadata/ingestion/bulksink/metadata_usage_rest.py +++ b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py @@ -32,7 +32,7 @@ class MetadataUsageSinkConfig(ConfigModel): filename: str -class MetadataUsageBulkSink(BulkSink): +class MetadataUsageRestBulkSink(BulkSink): config: MetadataUsageSinkConfig def __init__(self, ctx: WorkflowContext, config: MetadataUsageSinkConfig, metadata_config: MetadataServerConfig): diff --git a/ingestion/src/metadata/ingestion/processor/pii_processor.py b/ingestion/src/metadata/ingestion/processor/pii.py similarity index 97% rename from ingestion/src/metadata/ingestion/processor/pii_processor.py rename to ingestion/src/metadata/ingestion/processor/pii.py index 27e6d68608d..58126730885 100644 --- a/ingestion/src/metadata/ingestion/processor/pii_processor.py +++ b/ingestion/src/metadata/ingestion/processor/pii.py @@ -160,19 +160,19 @@ class ColumnNameScanner(Scanner): return list(types) -class PIIProcessorConfig(ConfigModel): +class PiiProcessorConfig(ConfigModel): filter: Optional[str] = None api_endpoint: Optional[str] = None auth_provider_type: Optional[str] = None -class PIIProcessor(Processor): - config: PIIProcessorConfig +class PiiProcessor(Processor): + config: PiiProcessorConfig metadata_config: MetadataServerConfig status: ProcessorStatus client: REST - def __init__(self, ctx: WorkflowContext, config: PIIProcessorConfig, metadata_config: MetadataServerConfig): + def __init__(self, ctx: WorkflowContext, config: PiiProcessorConfig, metadata_config: MetadataServerConfig): super().__init__(ctx) self.config = config self.metadata_config = metadata_config @@ -184,7 +184,7 @@ class PIIProcessor(Processor): @classmethod def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext): - config = PIIProcessorConfig.parse_obj(config_dict) + config = PiiProcessorConfig.parse_obj(config_dict) metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict) return cls(ctx, config, metadata_config) diff --git a/ingestion/src/metadata/ingestion/processor/processor_registry.py b/ingestion/src/metadata/ingestion/processor/processor_registry.py deleted file mode 100644 index 94049afea80..00000000000 --- a/ingestion/src/metadata/ingestion/processor/processor_registry.py +++ /dev/null @@ -1,23 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from metadata.ingestion.api.processor import Processor -from metadata.ingestion.api.registry import Registry - -processor_registry = Registry[Processor]() -processor_registry.load("metadata.ingestion.processor.plugins") - - - diff --git a/ingestion/src/metadata/ingestion/sink/ldap_add_user.py b/ingestion/src/metadata/ingestion/sink/ldap_users_rest.py similarity index 100% rename from ingestion/src/metadata/ingestion/sink/ldap_add_user.py rename to ingestion/src/metadata/ingestion/sink/ldap_users_rest.py diff --git a/ingestion/src/metadata/ingestion/sink/metadata_tables_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest_tables.py similarity index 99% rename from ingestion/src/metadata/ingestion/sink/metadata_tables_rest.py rename to ingestion/src/metadata/ingestion/sink/metadata_rest_tables.py index 4620a9240fe..3da28e093fb 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_tables_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest_tables.py @@ -33,7 +33,7 @@ class MetadataTablesSinkConfig(ConfigModel): api_endpoint: str = None -class MetadataTablesRestSink(Sink): +class MetadataRestTablesSink(Sink): config: MetadataTablesSinkConfig status: SinkStatus diff --git a/ingestion/src/metadata/ingestion/sink/metadata_users_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest_users.py similarity index 99% rename from ingestion/src/metadata/ingestion/sink/metadata_users_rest.py rename to ingestion/src/metadata/ingestion/sink/metadata_rest_users.py index 3fc8cd1c391..43ba7e3111c 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_users_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest_users.py @@ -30,7 +30,7 @@ class MetadataUsersSinkConfig(ConfigModel): api_end_point: str = None -class MetadataUsersRestSink(Sink): +class MetadataRestUsersSink(Sink): config: MetadataUsersSinkConfig metadata_config: MetadataServerConfig status: SinkStatus diff --git a/ingestion/src/metadata/ingestion/sink/sink_registry.py b/ingestion/src/metadata/ingestion/sink/sink_registry.py deleted file mode 100644 index 65e5a5bb942..00000000000 --- a/ingestion/src/metadata/ingestion/sink/sink_registry.py +++ /dev/null @@ -1,22 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from metadata.ingestion.api.registry import Registry -from metadata.ingestion.api.sink import Sink - -sink_registry = Registry[Sink]() -sink_registry.load("metadata.ingestion.sink.plugins") -# These sinks are always enabled -assert sink_registry.get("file") diff --git a/ingestion/src/metadata/ingestion/source/mysql.py b/ingestion/src/metadata/ingestion/source/mysql.py index a082d5ed1a1..097a08fd732 100644 --- a/ingestion/src/metadata/ingestion/source/mysql.py +++ b/ingestion/src/metadata/ingestion/source/mysql.py @@ -24,7 +24,7 @@ class MySQLConfig(SQLConnectionConfig): def get_connection_url(self): return super().get_connection_url() -class MySQLSource(SQLSource): +class MysqlSource(SQLSource): def __init__(self, config, metadata_config, ctx): super().__init__(config, metadata_config, ctx) diff --git a/ingestion/src/metadata/ingestion/source/sample_data_generator.py b/ingestion/src/metadata/ingestion/source/sample_tables.py similarity index 56% rename from ingestion/src/metadata/ingestion/source/sample_data_generator.py rename to ingestion/src/metadata/ingestion/source/sample_tables.py index 3beb59a921f..2840604f2fc 100644 --- a/ingestion/src/metadata/ingestion/source/sample_data_generator.py +++ b/ingestion/src/metadata/ingestion/source/sample_tables.py @@ -14,45 +14,50 @@ # limitations under the License. import csv -import json +import pandas as pd import uuid import os -from datetime import datetime - -import pandas as pd -import random -import string -import logging - -from faker import Faker +import json from collections import namedtuple -from typing import Iterable, Dict, Any, List, Union -from metadata.generated.schema.api.services.createDatabaseService import CreateDatabaseServiceEntityRequest -from metadata.generated.schema.entity.services.databaseService import DatabaseServiceEntity +from dataclasses import dataclass, field +from typing import Iterable, List, Dict, Any, Union from metadata.config.common import ConfigModel from metadata.generated.schema.entity.data.table import TableEntity from metadata.generated.schema.entity.data.database import DatabaseEntity from metadata.generated.schema.type.entityReference import EntityReference -from metadata.ingestion.api.source import Source, SourceStatus -from dataclasses import dataclass, field +from metadata.ingestion.api.source import SourceStatus, Source from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable -from metadata.ingestion.models.table_metadata import DatabaseMetadata -from metadata.ingestion.models.table_queries import TableQuery -from metadata.ingestion.models.user import User from metadata.ingestion.ometa.auth_provider import MetadataServerConfig from metadata.ingestion.ometa.client import REST +from metadata.generated.schema.api.services.createDatabaseService import CreateDatabaseServiceEntityRequest +from metadata.generated.schema.entity.services.databaseService import DatabaseServiceEntity COLUMN_NAME = 'Column' KEY_TYPE = 'Key type' DATA_TYPE = 'Data type' -FAKER_METHOD = 'Faker method' COL_DESCRIPTION = 'Description' - -logger = logging.getLogger(__name__) - TableKey = namedtuple('TableKey', ['schema', 'table_name']) +def get_service_or_create(service_json, metadata_config) -> DatabaseServiceEntity: + client = REST(metadata_config) + service = client.get_database_service(service_json['name']) + if service is not None: + return service + else: + created_service = client.create_database_service(CreateDatabaseServiceEntityRequest(**service_json)) + return created_service + + +def get_table_key(row: Dict[str, Any]) -> Union[TableKey, None]: + """ + Table key consists of schema and table name + :param row: + :return: + """ + return TableKey(schema=row['schema'], table_name=row['table_name']) + + class SampleTableSourceConfig(ConfigModel): sample_schema_folder: str service_name: str @@ -64,19 +69,6 @@ class SampleTableSourceConfig(ConfigModel): return self.sample_schema_folder -class SampleUserSourceConfig(ConfigModel): - no_of_users: int - - -def get_table_key(row: Dict[str, Any]) -> Union[TableKey, None]: - """ - Table key consists of schema and table name - :param row: - :return: - """ - return TableKey(schema=row['schema'], table_name=row['table_name']) - - @dataclass class SampleTableSourceStatus(SourceStatus): tables_scanned: List[str] = field(default_factory=list) @@ -85,14 +77,6 @@ class SampleTableSourceStatus(SourceStatus): self.tables_scanned.append(table_name) -@dataclass -class SampleUserSourceStatus(SourceStatus): - users_scanned: List[str] = field(default_factory=list) - - def report_table_scanned(self, user_name: str) -> None: - self.users_scanned.append(user_name) - - class TableSchema: def __init__(self, filename): # error if the file is not csv file @@ -122,31 +106,6 @@ class TableSchema: return [c[COLUMN_NAME] for c in self.columns] -class DataGenerator: - def __init__(self, schemas): - if not schemas: - raise Exception('Input schemas should be an array of one or more TableSchemas') - - self.schemas = schemas - - # validate that each FK is a PK in one of the input schemas - # TODO - - self.table_to_schema = dict((s.get_name(), s) for s in schemas) - - def generate_data(self, table_name, number_of_rows): - fake = Faker() - schema = self.table_to_schema[table_name] - data = {} - for c in schema.get_schema(): - if not c[FAKER_METHOD]: - logging.debug('{} has no faker method input'.format(c)) - continue - fn = getattr(fake, c[FAKER_METHOD]) - data[c[COLUMN_NAME]] = [fn() for _ in range(number_of_rows)] - return pd.DataFrame(data) - - class SampleTableMetadataGenerator: def __init__(self, table_to_df_dict, table_to_schema_map): self.table_to_df_dict = table_to_df_dict @@ -210,60 +169,6 @@ class SampleTableMetadataGenerator: return sorted_row_dict -class SampleUserMetadataGenerator: - - def __init__(self, number_of_users): - self.number_of_users = number_of_users - - def generate_sample_user(self): - schema = dict() - fake = Faker() - # columns that use faker - schema['email'] = lambda: None - schema['first_name'] = lambda: fake.first_name() - schema['last_name'] = lambda: fake.last_name() - schema['full_name'] = lambda: None - schema['github_username'] = lambda: None - schema['team_name'] = lambda: random.choice( - ['Data_Infra', 'Infra', 'Payments', 'Legal', 'Dev_Platform', 'Trust', 'Marketplace']) - schema['employee_type'] = lambda: None - schema['manager_email'] = lambda: fake.email() - schema['slack_id'] = lambda: None - schema['role_name'] = lambda: random.choices( - ['ROLE_ENGINEER', 'ROLE_DATA_SCIENTIST', 'ROLE_ADMIN'], weights=[40, 40, 10])[0] - data = {} - - for k in schema.keys(): - data[k] = [schema[k]() for _ in range(self.number_of_users)] - - # fill in the columns that can be derived from the random data above - for i in range(self.number_of_users): - data['full_name'][i] = data['first_name'][i] + ' ' + data['last_name'][i] - username = data['first_name'][i].lower() + '_' + data['last_name'][i].lower() + random.choice( - string.digits) - data['slack_id'][i] = username - data['github_username'][i] = username - data['email'][i] = username + '@gmail.com' - data['employee_type'] = data['role_name'] - - pd_rows = pd.DataFrame(data) - row_dict = [] - for index, row in pd_rows.iterrows(): - row_dict.append(row) - - return row_dict - - -def get_service_or_create(service_json, metadata_config) -> DatabaseServiceEntity: - client = REST(metadata_config) - service = client.get_database_service(service_json['name']) - if service is not None: - return service - else: - created_service = client.create_database_service(CreateDatabaseServiceEntityRequest(**service_json)) - return created_service - - class SampleTableSource(Source): def __init__(self, config: SampleTableSourceConfig, metadata_config: MetadataServerConfig, ctx): @@ -302,80 +207,3 @@ class SampleTableSource(Source): def get_status(self): return self.status - - -class SampleUsageSource(Source): - - def __init__(self, config: SampleTableSourceConfig, metadata_config: MetadataServerConfig, ctx): - super().__init__(ctx) - self.status = SampleTableSourceStatus() - self.config = config - self.metadata_config = metadata_config - self.client = REST(metadata_config) - self.service_json = json.load(open(config.sample_schema_folder + "/service.json", 'r')) - self.query_log_csv = config.sample_schema_folder + "/query_log" - with open(self.query_log_csv, 'r') as fin: - self.query_logs = [dict(i) for i in csv.DictReader(fin)] - self.service = get_service_or_create(self.service_json, metadata_config) - - @classmethod - def create(cls, config_dict, metadata_config_dict, ctx): - config = SampleTableSourceConfig.parse_obj(config_dict) - metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict) - return cls(config, metadata_config, ctx) - - def prepare(self): - pass - - def next_record(self) -> Iterable[TableQuery]: - for row in self.query_logs: - tq = TableQuery(row['query'], '', 100, 0, 0, '', - '', datetime.today().strftime('%Y-%m-%d %H:%M:%S'), 100, 'shopify', - False, row['query']) - yield tq - - def close(self): - pass - - def get_status(self): - return self.status - - -class SampleUserSource(Source): - - def __init__(self, config: SampleUserSourceConfig, metadata_config: MetadataServerConfig, ctx): - super().__init__(ctx) - self.status = SampleUserSourceStatus() - metadata_gen = SampleUserMetadataGenerator(config.no_of_users) - self.sample_columns = metadata_gen.generate_sample_user() - - @classmethod - def create(cls, config_dict, metadata_config_dict, ctx): - config = SampleUserSourceConfig.parse_obj(config_dict) - metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict) - return cls(config, metadata_config, ctx) - - def prepare(self): - pass - - def next_record(self) -> Iterable[DatabaseMetadata]: - for user in self.sample_columns: - user_metadata = User(user['email'], - user['first_name'], - user['last_name'], - user['full_name'], - user['github_username'], - user['team_name'], - user['employee_type'], - user['manager_email'], - user['slack_id'], - True, - 0) - self.status.report_table_scanned(user['github_username']) - yield user_metadata - - def close(self): - pass - - def get_status(self): - return self.status diff --git a/ingestion/src/metadata/ingestion/source/sample_usage.py b/ingestion/src/metadata/ingestion/source/sample_usage.py new file mode 100644 index 00000000000..30c7526bf22 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/sample_usage.py @@ -0,0 +1,46 @@ +import json +import csv +from metadata.ingestion.api.source import Source +from sample_tables import SampleTableSourceConfig, SampleTableSourceStatus, get_service_or_create +from metadata.ingestion.ometa.auth_provider import MetadataServerConfig +from metadata.ingestion.models.table_queries import TableQuery +from typing import Iterable +from datetime import datetime +from metadata.ingestion.ometa.client import REST + + +class SampleUsageSource(Source): + + def __init__(self, config: SampleTableSourceConfig, metadata_config: MetadataServerConfig, ctx): + super().__init__(ctx) + self.status = SampleTableSourceStatus() + self.config = config + self.metadata_config = metadata_config + self.client = REST(metadata_config) + self.service_json = json.load(open(config.sample_schema_folder + "/service.json", 'r')) + self.query_log_csv = config.sample_schema_folder + "/query_log" + with open(self.query_log_csv, 'r') as fin: + self.query_logs = [dict(i) for i in csv.DictReader(fin)] + self.service = get_service_or_create(self.service_json, metadata_config) + + @classmethod + def create(cls, config_dict, metadata_config_dict, ctx): + config = SampleTableSourceConfig.parse_obj(config_dict) + metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict) + return cls(config, metadata_config, ctx) + + def prepare(self): + pass + + def next_record(self) -> Iterable[TableQuery]: + for row in self.query_logs: + tq = TableQuery(row['query'], '', 100, 0, 0, '', + '', datetime.today().strftime('%Y-%m-%d %H:%M:%S'), 100, 'shopify', + False, row['query']) + yield tq + + def close(self): + pass + + def get_status(self): + return self.status diff --git a/ingestion/src/metadata/ingestion/source/sample_users.py b/ingestion/src/metadata/ingestion/source/sample_users.py new file mode 100644 index 00000000000..5ccbf6c68fd --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/sample_users.py @@ -0,0 +1,122 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import random +import string +import pandas as pd +from faker import Faker +from typing import Iterable, List +from dataclasses import dataclass, field +from metadata.config.common import ConfigModel +from metadata.ingestion.api.source import Source, SourceStatus +from metadata.ingestion.ometa.auth_provider import MetadataServerConfig +from metadata.ingestion.models.table_metadata import DatabaseMetadata +from metadata.ingestion.models.user import User + + +class SampleUserSourceConfig(ConfigModel): + no_of_users: int + + +@dataclass +class SampleUserSourceStatus(SourceStatus): + users_scanned: List[str] = field(default_factory=list) + + def report_table_scanned(self, user_name: str) -> None: + self.users_scanned.append(user_name) + + +class SampleUserMetadataGenerator: + + def __init__(self, number_of_users): + self.number_of_users = number_of_users + + def generate_sample_user(self): + schema = dict() + fake = Faker() + # columns that use faker + schema['email'] = lambda: None + schema['first_name'] = lambda: fake.first_name() + schema['last_name'] = lambda: fake.last_name() + schema['full_name'] = lambda: None + schema['github_username'] = lambda: None + schema['team_name'] = lambda: random.choice( + ['Data_Infra', 'Infra', 'Payments', 'Legal', 'Dev_Platform', 'Trust', 'Marketplace']) + schema['employee_type'] = lambda: None + schema['manager_email'] = lambda: fake.email() + schema['slack_id'] = lambda: None + schema['role_name'] = lambda: random.choices( + ['ROLE_ENGINEER', 'ROLE_DATA_SCIENTIST', 'ROLE_ADMIN'], weights=[40, 40, 10])[0] + data = {} + + for k in schema.keys(): + data[k] = [schema[k]() for _ in range(self.number_of_users)] + + # fill in the columns that can be derived from the random data above + for i in range(self.number_of_users): + data['full_name'][i] = data['first_name'][i] + ' ' + data['last_name'][i] + username = data['first_name'][i].lower() + '_' + data['last_name'][i].lower() + random.choice( + string.digits) + data['slack_id'][i] = username + data['github_username'][i] = username + data['email'][i] = username + '@gmail.com' + data['employee_type'] = data['role_name'] + + pd_rows = pd.DataFrame(data) + row_dict = [] + for index, row in pd_rows.iterrows(): + row_dict.append(row) + + return row_dict + + +class SampleUsersSource(Source): + + def __init__(self, config: SampleUserSourceConfig, metadata_config: MetadataServerConfig, ctx): + super().__init__(ctx) + self.status = SampleUserSourceStatus() + metadata_gen = SampleUserMetadataGenerator(config.no_of_users) + self.sample_columns = metadata_gen.generate_sample_user() + + @classmethod + def create(cls, config_dict, metadata_config_dict, ctx): + config = SampleUserSourceConfig.parse_obj(config_dict) + metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict) + return cls(config, metadata_config, ctx) + + def prepare(self): + pass + + def next_record(self) -> Iterable[DatabaseMetadata]: + for user in self.sample_columns: + user_metadata = User(user['email'], + user['first_name'], + user['last_name'], + user['full_name'], + user['github_username'], + user['team_name'], + user['employee_type'], + user['manager_email'], + user['slack_id'], + True, + 0) + self.status.report_table_scanned(user['github_username']) + yield user_metadata + + def close(self): + pass + + def get_status(self): + return self.status diff --git a/ingestion/src/metadata/ingestion/source/source_registry.py b/ingestion/src/metadata/ingestion/source/source_registry.py deleted file mode 100644 index 8e4ae2dbcd5..00000000000 --- a/ingestion/src/metadata/ingestion/source/source_registry.py +++ /dev/null @@ -1,22 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from metadata.ingestion.api.registry import Registry -from metadata.ingestion.api.source import Source - -source_registry = Registry[Source]() -source_registry.load("metadata.ingestion.source.plugins") - -# This source is always enabled diff --git a/ingestion/src/metadata/ingestion/stage/stage_registry.py b/ingestion/src/metadata/ingestion/stage/stage_registry.py deleted file mode 100644 index 538aa0e419e..00000000000 --- a/ingestion/src/metadata/ingestion/stage/stage_registry.py +++ /dev/null @@ -1,22 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from metadata.ingestion.api.registry import Registry -from metadata.ingestion.api.stage import Stage - -stage_registry = Registry[Stage]() -stage_registry.load("metadata.ingestion.stage.plugins") - -