Registry Cleanup

Ayush Shah 2021-08-13 20:35:17 +05:30
parent 5670cb0518
commit 042ef45ca4
16 changed files with 208 additions and 321 deletions

View File

@@ -17,7 +17,9 @@
#
set -euo pipefail
pip install --upgrade pip setuptools openmetadata-ingestion==0.2.1 apns
pip install openmetadata-ingestion[mysql,sample-tables,elasticsearch]
pip install --upgrade setuptools openmetadata-ingestion==0.2.1 apns
# wget https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.0.0/en_core_web_sm-3.0.0-py3-none-any.whl
# pip install en_core_web_sm-3.0.0-py3-none-any.whl
python -m spacy download en_core_web_sm
rm -rf en_core_web_sm-3.0.0-py3-none-any.whl
pip install "simplescheduler@git+https://github.com/StreamlineData/sdscheduler.git#egg=simplescheduler"

View File

@@ -12,7 +12,7 @@
}
},
"processor": {
"type": "pii-tags",
"type": "pii",
"config": {
"api_endpoint": "http://localhost:8585/api"
}

View File

@@ -1,22 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from metadata.ingestion.api.bulk_sink import BulkSink
from metadata.ingestion.api.registry import Registry
bulk_sink_registry = Registry[BulkSink]()
bulk_sink_registry.load("metadata.ingestion.bulksink.plugins")
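
The bulk sink registry above is deleted outright, as are the processor, sink, source, and stage registries below. The replacement is not shown in this diff, but the class renames elsewhere in the commit (PIIProcessor to PiiProcessor, MySQLSource to MysqlSource, MetadataTablesRestSink to MetadataRestTablesSink) suggest lookup moves to a naming convention derived from the config "type" key. A hypothetical sketch of that convention:

import importlib

def get_class(type_key: str, kind: str, package: str):
    # Inferred, not confirmed by the diff: 'metadata-rest-tables' + 'Sink'
    # resolves to MetadataRestTablesSink; the module names are assumptions.
    module = importlib.import_module(package + "." + type_key.replace("-", "_"))
    class_name = "".join(part.capitalize() for part in type_key.split("-")) + kind
    return getattr(module, class_name)

# e.g. get_class("pii", "Processor", "metadata.ingestion.processor")
#      get_class("mysql", "Source", "metadata.ingestion.source")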

View File

@@ -32,7 +32,7 @@ class MetadataUsageSinkConfig(ConfigModel):
filename: str
class MetadataUsageBulkSink(BulkSink):
class MetadataUsageRestBulkSink(BulkSink):
config: MetadataUsageSinkConfig
def __init__(self, ctx: WorkflowContext, config: MetadataUsageSinkConfig, metadata_config: MetadataServerConfig):

View File

@@ -160,19 +160,19 @@ class ColumnNameScanner(Scanner):
return list(types)
class PIIProcessorConfig(ConfigModel):
class PiiProcessorConfig(ConfigModel):
filter: Optional[str] = None
api_endpoint: Optional[str] = None
auth_provider_type: Optional[str] = None
class PIIProcessor(Processor):
config: PIIProcessorConfig
class PiiProcessor(Processor):
config: PiiProcessorConfig
metadata_config: MetadataServerConfig
status: ProcessorStatus
client: REST
def __init__(self, ctx: WorkflowContext, config: PIIProcessorConfig, metadata_config: MetadataServerConfig):
def __init__(self, ctx: WorkflowContext, config: PiiProcessorConfig, metadata_config: MetadataServerConfig):
super().__init__(ctx)
self.config = config
self.metadata_config = metadata_config
@@ -184,7 +184,7 @@ class PIIProcessor(Processor):
@classmethod
def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext):
config = PIIProcessorConfig.parse_obj(config_dict)
config = PiiProcessorConfig.parse_obj(config_dict)
metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
return cls(ctx, config, metadata_config)
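
Since ConfigModel is a pydantic model, the renamed PiiProcessorConfig can be exercised on its own; a minimal sketch (the module path is an assumption, the diff does not show the file name):

from metadata.ingestion.processor.pii import PiiProcessorConfig

config = PiiProcessorConfig.parse_obj({"api_endpoint": "http://localhost:8585/api"})
print(config.api_endpoint)  # filter and auth_provider_type default to None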

View File

@@ -1,23 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from metadata.ingestion.api.processor import Processor
from metadata.ingestion.api.registry import Registry
processor_registry = Registry[Processor]()
processor_registry.load("metadata.ingestion.processor.plugins")

View File

@@ -33,7 +33,7 @@ class MetadataTablesSinkConfig(ConfigModel):
api_endpoint: str = None
class MetadataTablesRestSink(Sink):
class MetadataRestTablesSink(Sink):
config: MetadataTablesSinkConfig
status: SinkStatus

View File

@@ -30,7 +30,7 @@ class MetadataUsersSinkConfig(ConfigModel):
api_end_point: str = None
class MetadataUsersRestSink(Sink):
class MetadataRestUsersSink(Sink):
config: MetadataUsersSinkConfig
metadata_config: MetadataServerConfig
status: SinkStatus

View File

@@ -1,22 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from metadata.ingestion.api.registry import Registry
from metadata.ingestion.api.sink import Sink
sink_registry = Registry[Sink]()
sink_registry.load("metadata.ingestion.sink.plugins")
# These sinks are always enabled
assert sink_registry.get("file")
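
For readers unfamiliar with the pattern being removed: a Registry maps string keys to component classes, typically populated from package entry points. A rough sketch of what Registry[Sink]() plus load() likely did (the real metadata.ingestion.api.registry is not part of this diff and may differ):

import pkg_resources
from typing import Dict, Generic, Type, TypeVar

T = TypeVar("T")

class Registry(Generic[T]):
    def __init__(self) -> None:
        self._mapping: Dict[str, Type[T]] = {}

    def load(self, entry_point_group: str) -> None:
        # An entry point named e.g. "file" resolves to a Sink subclass.
        for entry_point in pkg_resources.iter_entry_points(entry_point_group):
            self._mapping[entry_point.name] = entry_point.load()

    def get(self, key: str) -> Type[T]:
        return self._mapping[key]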

View File

@@ -24,7 +24,7 @@ class MySQLConfig(SQLConnectionConfig):
def get_connection_url(self):
return super().get_connection_url()
class MySQLSource(SQLSource):
class MysqlSource(SQLSource):
def __init__(self, config, metadata_config, ctx):
super().__init__(config, metadata_config, ctx)

View File

@@ -14,45 +14,50 @@
# limitations under the License.
import csv
import json
import pandas as pd
import uuid
import os
from datetime import datetime
import pandas as pd
import random
import string
import logging
from faker import Faker
import json
from collections import namedtuple
from typing import Iterable, Dict, Any, List, Union
from metadata.generated.schema.api.services.createDatabaseService import CreateDatabaseServiceEntityRequest
from metadata.generated.schema.entity.services.databaseService import DatabaseServiceEntity
from dataclasses import dataclass, field
from typing import Iterable, List, Dict, Any, Union
from metadata.config.common import ConfigModel
from metadata.generated.schema.entity.data.table import TableEntity
from metadata.generated.schema.entity.data.database import DatabaseEntity
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import Source, SourceStatus
from dataclasses import dataclass, field
from metadata.ingestion.api.source import SourceStatus, Source
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.models.table_metadata import DatabaseMetadata
from metadata.ingestion.models.table_queries import TableQuery
from metadata.ingestion.models.user import User
from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
from metadata.ingestion.ometa.client import REST
from metadata.generated.schema.api.services.createDatabaseService import CreateDatabaseServiceEntityRequest
from metadata.generated.schema.entity.services.databaseService import DatabaseServiceEntity
COLUMN_NAME = 'Column'
KEY_TYPE = 'Key type'
DATA_TYPE = 'Data type'
FAKER_METHOD = 'Faker method'
COL_DESCRIPTION = 'Description'
logger = logging.getLogger(__name__)
TableKey = namedtuple('TableKey', ['schema', 'table_name'])
def get_service_or_create(service_json, metadata_config) -> DatabaseServiceEntity:
client = REST(metadata_config)
service = client.get_database_service(service_json['name'])
if service is not None:
return service
else:
created_service = client.create_database_service(CreateDatabaseServiceEntityRequest(**service_json))
return created_service
def get_table_key(row: Dict[str, Any]) -> Union[TableKey, None]:
"""
Table key consists of schema and table name
:param row:
:return:
"""
return TableKey(schema=row['schema'], table_name=row['table_name'])
class SampleTableSourceConfig(ConfigModel):
sample_schema_folder: str
service_name: str
@@ -64,19 +69,6 @@ class SampleTableSourceConfig(ConfigModel):
return self.sample_schema_folder
class SampleUserSourceConfig(ConfigModel):
no_of_users: int
def get_table_key(row: Dict[str, Any]) -> Union[TableKey, None]:
"""
Table key consists of schema and table name
:param row:
:return:
"""
return TableKey(schema=row['schema'], table_name=row['table_name'])
@dataclass
class SampleTableSourceStatus(SourceStatus):
tables_scanned: List[str] = field(default_factory=list)
@@ -85,14 +77,6 @@ class SampleTableSourceStatus(SourceStatus):
self.tables_scanned.append(table_name)
@dataclass
class SampleUserSourceStatus(SourceStatus):
users_scanned: List[str] = field(default_factory=list)
def report_table_scanned(self, user_name: str) -> None:
self.users_scanned.append(user_name)
class TableSchema:
def __init__(self, filename):
# raise an error if the file is not a CSV file
@@ -122,31 +106,6 @@ class TableSchema:
return [c[COLUMN_NAME] for c in self.columns]
class DataGenerator:
def __init__(self, schemas):
if not schemas:
raise Exception('Input schemas should be an array of one or more TableSchemas')
self.schemas = schemas
# validate that each FK is a PK in one of the input schemas
# TODO
self.table_to_schema = dict((s.get_name(), s) for s in schemas)
def generate_data(self, table_name, number_of_rows):
fake = Faker()
schema = self.table_to_schema[table_name]
data = {}
for c in schema.get_schema():
if not c[FAKER_METHOD]:
logging.debug('{} has no faker method input'.format(c))
continue
fn = getattr(fake, c[FAKER_METHOD])
data[c[COLUMN_NAME]] = [fn() for _ in range(number_of_rows)]
return pd.DataFrame(data)
class SampleTableMetadataGenerator:
def __init__(self, table_to_df_dict, table_to_schema_map):
self.table_to_df_dict = table_to_df_dict
@@ -210,60 +169,6 @@ class SampleTableMetadataGenerator:
return sorted_row_dict
class SampleUserMetadataGenerator:
def __init__(self, number_of_users):
self.number_of_users = number_of_users
def generate_sample_user(self):
schema = dict()
fake = Faker()
# columns that use faker
schema['email'] = lambda: None
schema['first_name'] = lambda: fake.first_name()
schema['last_name'] = lambda: fake.last_name()
schema['full_name'] = lambda: None
schema['github_username'] = lambda: None
schema['team_name'] = lambda: random.choice(
['Data_Infra', 'Infra', 'Payments', 'Legal', 'Dev_Platform', 'Trust', 'Marketplace'])
schema['employee_type'] = lambda: None
schema['manager_email'] = lambda: fake.email()
schema['slack_id'] = lambda: None
schema['role_name'] = lambda: random.choices(
['ROLE_ENGINEER', 'ROLE_DATA_SCIENTIST', 'ROLE_ADMIN'], weights=[40, 40, 10])[0]
data = {}
for k in schema.keys():
data[k] = [schema[k]() for _ in range(self.number_of_users)]
# fill in the columns that can be derived from the random data above
for i in range(self.number_of_users):
data['full_name'][i] = data['first_name'][i] + ' ' + data['last_name'][i]
username = data['first_name'][i].lower() + '_' + data['last_name'][i].lower() + random.choice(
string.digits)
data['slack_id'][i] = username
data['github_username'][i] = username
data['email'][i] = username + '@gmail.com'
data['employee_type'] = data['role_name']
pd_rows = pd.DataFrame(data)
row_dict = []
for index, row in pd_rows.iterrows():
row_dict.append(row)
return row_dict
def get_service_or_create(service_json, metadata_config) -> DatabaseServiceEntity:
client = REST(metadata_config)
service = client.get_database_service(service_json['name'])
if service is not None:
return service
else:
created_service = client.create_database_service(CreateDatabaseServiceEntityRequest(**service_json))
return created_service
class SampleTableSource(Source):
def __init__(self, config: SampleTableSourceConfig, metadata_config: MetadataServerConfig, ctx):
@@ -302,80 +207,3 @@ class SampleTableSource(Source):
def get_status(self):
return self.status
class SampleUsageSource(Source):
def __init__(self, config: SampleTableSourceConfig, metadata_config: MetadataServerConfig, ctx):
super().__init__(ctx)
self.status = SampleTableSourceStatus()
self.config = config
self.metadata_config = metadata_config
self.client = REST(metadata_config)
self.service_json = json.load(open(config.sample_schema_folder + "/service.json", 'r'))
self.query_log_csv = config.sample_schema_folder + "/query_log"
with open(self.query_log_csv, 'r') as fin:
self.query_logs = [dict(i) for i in csv.DictReader(fin)]
self.service = get_service_or_create(self.service_json, metadata_config)
@classmethod
def create(cls, config_dict, metadata_config_dict, ctx):
config = SampleTableSourceConfig.parse_obj(config_dict)
metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
return cls(config, metadata_config, ctx)
def prepare(self):
pass
def next_record(self) -> Iterable[TableQuery]:
for row in self.query_logs:
tq = TableQuery(row['query'], '', 100, 0, 0, '',
'', datetime.today().strftime('%Y-%m-%d %H:%M:%S'), 100, 'shopify',
False, row['query'])
yield tq
def close(self):
pass
def get_status(self):
return self.status
class SampleUserSource(Source):
def __init__(self, config: SampleUserSourceConfig, metadata_config: MetadataServerConfig, ctx):
super().__init__(ctx)
self.status = SampleUserSourceStatus()
metadata_gen = SampleUserMetadataGenerator(config.no_of_users)
self.sample_columns = metadata_gen.generate_sample_user()
@classmethod
def create(cls, config_dict, metadata_config_dict, ctx):
config = SampleUserSourceConfig.parse_obj(config_dict)
metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
return cls(config, metadata_config, ctx)
def prepare(self):
pass
def next_record(self) -> Iterable[DatabaseMetadata]:
for user in self.sample_columns:
user_metadata = User(user['email'],
user['first_name'],
user['last_name'],
user['full_name'],
user['github_username'],
user['team_name'],
user['employee_type'],
user['manager_email'],
user['slack_id'],
True,
0)
self.status.report_table_scanned(user['github_username'])
yield user_metadata
def close(self):
pass
def get_status(self):
return self.status

View File

@@ -0,0 +1,46 @@
import json
import csv
from metadata.ingestion.api.source import Source
from metadata.ingestion.source.sample_tables import SampleTableSourceConfig, SampleTableSourceStatus, get_service_or_create
from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
from metadata.ingestion.models.table_queries import TableQuery
from typing import Iterable
from datetime import datetime
from metadata.ingestion.ometa.client import REST
class SampleUsageSource(Source):
def __init__(self, config: SampleTableSourceConfig, metadata_config: MetadataServerConfig, ctx):
super().__init__(ctx)
self.status = SampleTableSourceStatus()
self.config = config
self.metadata_config = metadata_config
self.client = REST(metadata_config)
self.service_json = json.load(open(config.sample_schema_folder + "/service.json", 'r'))
self.query_log_csv = config.sample_schema_folder + "/query_log"
with open(self.query_log_csv, 'r') as fin:
self.query_logs = [dict(i) for i in csv.DictReader(fin)]
self.service = get_service_or_create(self.service_json, metadata_config)
@classmethod
def create(cls, config_dict, metadata_config_dict, ctx):
config = SampleTableSourceConfig.parse_obj(config_dict)
metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
return cls(config, metadata_config, ctx)
def prepare(self):
pass
def next_record(self) -> Iterable[TableQuery]:
for row in self.query_logs:
tq = TableQuery(row['query'], '', 100, 0, 0, '',
'', datetime.today().strftime('%Y-%m-%d %H:%M:%S'), 100, 'shopify',
False, row['query'])
yield tq
def close(self):
pass
def get_status(self):
return self.status
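
The extracted sample_usage.py expects query_log to be a headed CSV; "query" is the only column the code reads. An illustrative row, parsed the same way the source does (the file layout beyond the query column is an assumption):

import csv
import io

sample_log = "query\nSELECT * FROM shopify.dim_address\n"
query_logs = [dict(row) for row in csv.DictReader(io.StringIO(sample_log))]
print(query_logs[0]["query"])  # SELECT * FROM shopify.dim_address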

View File

@@ -0,0 +1,122 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import string
import pandas as pd
from faker import Faker
from typing import Iterable, List
from dataclasses import dataclass, field
from metadata.config.common import ConfigModel
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
from metadata.ingestion.models.table_metadata import DatabaseMetadata
from metadata.ingestion.models.user import User
class SampleUserSourceConfig(ConfigModel):
no_of_users: int
@dataclass
class SampleUserSourceStatus(SourceStatus):
users_scanned: List[str] = field(default_factory=list)
def report_table_scanned(self, user_name: str) -> None:
self.users_scanned.append(user_name)
class SampleUserMetadataGenerator:
def __init__(self, number_of_users):
self.number_of_users = number_of_users
def generate_sample_user(self):
schema = dict()
fake = Faker()
# columns that use faker
schema['email'] = lambda: None
schema['first_name'] = lambda: fake.first_name()
schema['last_name'] = lambda: fake.last_name()
schema['full_name'] = lambda: None
schema['github_username'] = lambda: None
schema['team_name'] = lambda: random.choice(
['Data_Infra', 'Infra', 'Payments', 'Legal', 'Dev_Platform', 'Trust', 'Marketplace'])
schema['employee_type'] = lambda: None
schema['manager_email'] = lambda: fake.email()
schema['slack_id'] = lambda: None
schema['role_name'] = lambda: random.choices(
['ROLE_ENGINEER', 'ROLE_DATA_SCIENTIST', 'ROLE_ADMIN'], weights=[40, 40, 10])[0]
data = {}
for k in schema.keys():
data[k] = [schema[k]() for _ in range(self.number_of_users)]
# fill in the columns that can be derived from the random data above
for i in range(self.number_of_users):
data['full_name'][i] = data['first_name'][i] + ' ' + data['last_name'][i]
username = data['first_name'][i].lower() + '_' + data['last_name'][i].lower() + random.choice(
string.digits)
data['slack_id'][i] = username
data['github_username'][i] = username
data['email'][i] = username + '@gmail.com'
data['employee_type'] = data['role_name']
pd_rows = pd.DataFrame(data)
row_dict = []
for index, row in pd_rows.iterrows():
row_dict.append(row)
return row_dict
class SampleUsersSource(Source):
def __init__(self, config: SampleUserSourceConfig, metadata_config: MetadataServerConfig, ctx):
super().__init__(ctx)
self.status = SampleUserSourceStatus()
metadata_gen = SampleUserMetadataGenerator(config.no_of_users)
self.sample_columns = metadata_gen.generate_sample_user()
@classmethod
def create(cls, config_dict, metadata_config_dict, ctx):
config = SampleUserSourceConfig.parse_obj(config_dict)
metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
return cls(config, metadata_config, ctx)
def prepare(self):
pass
def next_record(self) -> Iterable[DatabaseMetadata]:
for user in self.sample_columns:
user_metadata = User(user['email'],
user['first_name'],
user['last_name'],
user['full_name'],
user['github_username'],
user['team_name'],
user['employee_type'],
user['manager_email'],
user['slack_id'],
True,
0)
self.status.report_table_scanned(user['github_username'])
yield user_metadata
def close(self):
pass
def get_status(self):
return self.status
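
The extracted generator depends only on Faker, pandas, and the standard library, so it can be smoke-tested in isolation; a minimal run (the seeding is added here for reproducibility, the committed code does not seed):

import random
from faker import Faker

Faker.seed(42)
random.seed(42)

gen = SampleUserMetadataGenerator(number_of_users=2)
for row in gen.generate_sample_user():
    print(row["email"], row["team_name"], row["role_name"])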

View File

@@ -1,22 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from metadata.ingestion.api.registry import Registry
from metadata.ingestion.api.source import Source
source_registry = Registry[Source]()
source_registry.load("metadata.ingestion.source.plugins")
# This source is always enabled

View File

@@ -1,22 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from metadata.ingestion.api.registry import Registry
from metadata.ingestion.api.stage import Stage
stage_registry = Registry[Stage]()
stage_registry.load("metadata.ingestion.stage.plugins")