Mirror of https://github.com/open-metadata/OpenMetadata.git, synced 2025-10-15 10:48:31 +00:00

Merge pull request #150 from open-metadata/setup-optimize

Setup optimize

Commit 82bec0e50f
@@ -41,10 +41,8 @@ services:
       - 9200:9200
       - 9300:9300
 
-  catalog:
-    build:
-      context: ../../.
-      dockerfile: docker/metadata/Dockerfile
+  openmetadata-server:
+    image: openmetadata/server:latest
     expose:
      - 8585
      - 9200
@@ -62,9 +60,7 @@ services:
       - "localhost:172.16.239.11"
 
   ingestion:
-    build:
-      context: ../../ingestion/.
-      dockerfile: Dockerfile
+    image: openmetadata/ingestion:latest
     expose:
      - 7777
     ports:
@@ -75,20 +71,6 @@ services:
       - "localhost:172.16.239.10"
       - "localhost:172.16.239.11"
       - "localhost:172.16.239.12"
-      - "localhost:172.16.239.13"
-
-  postgres:
-    image: postgres
-    restart: always
-    environment:
-      POSTGRES_DB: pagila
-      POSTGRES_USER: openmetadata_user
-      POSTGRES_PASSWORD: openmetadata_password
-    ports:
-      - 5433:5432
-    networks:
-      app_net:
-        ipv4_address: 172.16.239.13
 
 networks:
   app_net:
@@ -95,7 +95,7 @@ Add Optional `pii-tags` processor and `metadata-rest-tables` sink along with `me
     }
   },
   "processor": {
-    "type": "pii-tags",
+    "type": "pii",
     "config": {
     }
   },

@@ -94,7 +94,7 @@ Add Optional `pii-tags` processor and `metadata-rest-tables` sink along with `me
     }
   },
   "processor": {
-    "type": "pii-tags",
+    "type": "pii",
     "config": {
      "api_endpoint": "http://localhost:8585/api"
     }

@@ -95,7 +95,7 @@ Add Optional `pii-tags` processor and `metadata-rest-tables` sink along with `me
     }
   },
   "processor": {
-    "type": "pii-tags",
+    "type": "pii",
     "config": {}
   },
   "sink": {
@@ -1,8 +1,7 @@
-FROM python:3.9.2
+FROM python:3.8.10
 
 EXPOSE 7777
 
-COPY ./examples /openmetadata-ingestion/examples
 COPY ./pipelines /openmetadata-ingestion/pipelines
 COPY ./ingestion_scheduler /openmetadata-ingestion/ingestion_scheduler
 COPY ./ingestion_dependency.sh /openmetadata-ingestion/ingestion_dependency.sh
@@ -10,7 +10,7 @@
     }
   },
   "processor": {
-    "type": "pii-tags",
+    "type": "pii",
     "config": {
      "api_endpoint": "http://localhost:8585/api"
     }

@@ -8,14 +8,12 @@
     }
   },
   "processor": {
-    "type": "pii-tags",
-    "config": {
-    }
+    "type": "pii",
+    "config": {}
   },
   "sink": {
     "type": "metadata-rest-tables",
-    "config": {
-    }
+    "config": {}
   },
   "metadata_server": {
     "type": "metadata-server",

@@ -14,7 +14,7 @@
     }
   },
   "processor": {
-    "type": "pii-tags",
+    "type": "pii",
     "config": {
     }
   },

@@ -11,7 +11,7 @@
     }
   },
   "processor": {
-    "type": "pii-tags",
+    "type": "pii",
     "config": {}
   },
   "sink": {

@@ -11,16 +11,14 @@
     }
   },
   "processor": {
-    "type": "pii-tags",
-    "config": {
-    }
+    "type": "pii",
+    "config": {}
   },
   "sink": {
     "type": "metadata-rest-tables",
-    "config": {
-    }
+    "config": {}
   },
   "metadata_server": {
     "type": "metadata-server",
     "config": {
      "api_endpoint": "http://localhost:8585/api",

@@ -19,7 +19,7 @@
     }
   },
   "stage": {
-    "type": "table-usage-stage",
+    "type": "table-usage",
     "config": {
      "filename": "/tmp/redshift_usage"
     }

@@ -19,7 +19,7 @@
     }
   },
   "processor": {
-    "type": "pii-tags",
+    "type": "pii",
     "config": {}
   },
   "sink": {

@@ -19,7 +19,7 @@
     }
   },
   "stage": {
-    "type": "table-usage-stage",
+    "type": "table-usage",
     "config": {
      "filename": "/tmp/snowflake_usage"
     }
@@ -17,7 +17,9 @@
 #
 
 set -euo pipefail
-pip install --upgrade pip setuptools openmetadata-ingestion==0.2.1 apns
-pip install openmetadata-ingestion[mysql,sample-tables,elasticsearch]
+pip install --upgrade setuptools openmetadata-ingestion==0.2.1 apns
+# wget https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.0.0/en_core_web_sm-3.0.0-py3-none-any.whl
+# pip install en_core_web_sm-3.0.0-py3-none-any.whl
 python -m spacy download en_core_web_sm
+rm -rf en_core_web_sm-3.0.0-py3-none-any.whl
 pip install "simplescheduler@git+https://github.com/StreamlineData/sdscheduler.git#egg=simplescheduler"
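Note: the script above now relies on `python -m spacy download en_core_web_sm` at image build time, with the wheel-based install left commented out. A minimal, illustrative sanity check (not part of this commit) that the model actually landed in the image:

    # spacy.load raises OSError when en_core_web_sm is missing, so this
    # fails fast if the download step was skipped during the image build.
    import spacy

    nlp = spacy.load("en_core_web_sm")
    doc = nlp("Jane Doe works at OpenMetadata in Berlin.")
    print([(ent.text, ent.label_) for ent in doc.ents])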
@@ -1,10 +1,10 @@
 {
   "source": {
-    "type": "metadata-rest-tables",
+    "type": "metadata_es",
     "config": {}
   },
   "stage": {
-    "type": "file-stage",
+    "type": "file",
     "config": {
      "filename": "/tmp/tables.txt"
     }

@@ -12,7 +12,7 @@
     }
   },
   "processor": {
-    "type": "pii-tags",
+    "type": "pii",
     "config": {
      "api_endpoint": "http://localhost:8585/api"
     }

@@ -9,7 +9,7 @@
     }
   },
   "processor": {
-    "type": "pii-tags",
+    "type": "pii",
     "config": {
     }
   },

@@ -15,7 +15,7 @@
     }
   },
   "stage": {
-    "type": "table-usage-stage",
+    "type": "table-usage",
     "config": {
      "filename": "/tmp/sample_usage"
     }
@@ -44,7 +44,7 @@ base_requirements = {
     "typing_extensions>=3.7.4"
     "mypy_extensions>=0.4.3",
     "typing-inspect",
-    "pydantic~=1.7.4",
+    "pydantic==1.7.4",
     "pydantic[email]>=1.7.2",
     "google>=3.0.0",
     "google-auth>=1.33.0",

@@ -54,13 +54,14 @@ base_requirements = {
     "python-jose==3.3.0",
     "okta==1.7.0",
     "pandas~=1.3.1",
-    "sqlalchemy>=1.3.24",
-    "sql-metadata~=2.0.0",
-    "spacy==3.0.5",
-    "requests~=2.25.1"
+    "sqlalchemy>=1.3.24"
+    "sql-metadata~=2.0.0"
+    "spacy==3.0.5"
+    "requests~=2.25.1",
+    "en_core_web_sm@https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.0.0/en_core_web_sm-3.0.0.tar.gz#egg=en_core_web"
 }
 base_plugins = {
-    "pii-tags",
+    "pii-processor",
     "query-parser",
     "metadata-usage",
     "file-stage",
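A detail worth noticing in this hunk: the new lines drop the trailing commas after `sqlalchemy`, `sql-metadata`, and `spacy`, and Python concatenates adjacent string literals, so those entries silently merge into a single set element at runtime (the untouched `typing_extensions` context line above shows the same pre-existing pattern). A minimal demonstration:

    # Adjacent string literals concatenate before the set is built:
    reqs = {
        "sqlalchemy>=1.3.24"
        "sql-metadata~=2.0.0",  # one element: "sqlalchemy>=1.3.24sql-metadata~=2.0.0"
        "requests~=2.25.1",
    }
    print(len(reqs))  # 2, not 3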
@@ -110,44 +111,6 @@ setup(
     packages=find_namespace_packages(where='./src', exclude=['tests*']),
     entry_points={
         "console_scripts": ["metadata = metadata.cmd:metadata"],
-        "metadata.ingestion.source.plugins": [
-            "mysql = metadata.ingestion.source.mysql:MySQLSource",
-            "postgres = metadata.ingestion.source.postgres:PostgresSource",
-            "snowflake = metadata.ingestion.source.snowflake:SnowflakeSource",
-            "redshift = metadata.ingestion.source.redshift:RedshiftSource",
-            "redshift-sql = metadata.ingestion.source.redshift_sql:RedshiftSQLSource",
-            "bigquery = metadata.ingestion.source.bigquery:BigQuerySource",
-            "athena = metadata.ingestion.source.athena:AthenaSource",
-            "oracle = metadata.ingestion.source.oracle:OracleSource",
-            "mssql = metadata.ingestion.source.mssql:SQLServerSource",
-            "hive = metadata.ingestion.source.hive:HiveSource",
-            "sample-tables = metadata.ingestion.source.sample_data_generator:SampleTableSource",
-            "sample-users = metadata.ingestion.source.sample_data_generator:SampleUserSource",
-            "sample-usage = metadata.ingestion.source.sample_data_generator:SampleUsageSource",
-            "metadata-rest-tables = metadata.ingestion.source.metadata_rest:MetadataTablesRestSource",
-            "redshift-usage = metadata.ingestion.source.redshift_usage:RedshiftUsageSource",
-            "snowflake-usage = metadata.ingestion.source.snowflake_usage:SnowflakeUsageSource",
-            "ldap-users = metadata.ingestion.source.ldap_source:LDAPUserSource"
-        ],
-        "metadata.ingestion.sink.plugins": [
-            "file = metadata.ingestion.sink.file:FileSink",
-            "console = metadata.ingestion.sink.console:ConsoleSink",
-            "metadata-rest-tables = metadata.ingestion.sink.metadata_tables_rest:MetadataTablesRestSink",
-            "metadata-rest-users = metadata.ingestion.sink.metadata_users_rest:MetadataUsersRestSink",
-            "ldap-rest-users = metadata.ingestion.sink.ldap_add_user:LdapUserRestSink"
-        ],
-        "metadata.ingestion.processor.plugins": [
-            "pii-tags = metadata.ingestion.processor.pii_processor:PIIProcessor",
-            "query-parser = metadata.ingestion.processor.query_parser:QueryParserProcessor",
-        ],
-        "metadata.ingestion.stage.plugins": [
-            "file-stage = metadata.ingestion.stage.file:FileStage",
-            "table-usage-stage = metadata.ingestion.stage.table_usage_stage:TableUsageStage"
-        ],
-        "metadata.ingestion.bulksink.plugins": [
-            "elasticsearch = metadata.ingestion.bulksink.elastic_search:ElasticSearchBulkSink",
-            "metadata-usage = metadata.ingestion.bulksink.metadata_usage_rest:MetadataUsageBulkSink",
-        ],
     },
     install_requires=list(base_requirements),
     extras_require={
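For context on the deletion above: these setuptools entry points were how the old registry modules resolved a short plugin name to its class. Roughly, and purely as an illustration using importlib.metadata rather than the project's own Registry:

    # Illustration of the mechanism being removed: entry points let a short
    # name such as "mysql" be resolved to a class at runtime.
    from importlib.metadata import entry_points

    groups = entry_points()  # dict of group -> entry points on Python 3.8/3.9
    for ep in groups.get("metadata.ingestion.source.plugins", []):
        if ep.name == "mysql":
            source_class = ep.load()  # was metadata.ingestion.source.mysql:MySQLSource

The replacement is the convention-based lookup in workflow.py at the end of this diff, which derives the module path and class name directly from the config type string.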
@@ -71,8 +71,8 @@ def ingest(config: str) -> None:
         sys.exit(1)
 
     workflow.execute()
-    ret = workflow.print_status()
     workflow.stop()
+    ret = workflow.print_status()
     sys.exit(ret)
 
 
@@ -1,22 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from metadata.ingestion.api.bulk_sink import BulkSink
-from metadata.ingestion.api.registry import Registry
-
-bulk_sink_registry = Registry[BulkSink]()
-bulk_sink_registry.load("metadata.ingestion.bulksink.plugins")
-
-
@@ -30,7 +30,7 @@ class ElasticSearchConfig(ConfigModel):
     batch_size: Optional[int] = 10000
 
 
-class ElasticSearchBulkSink(BulkSink):
+class ElasticsearchBulkSink(BulkSink):
     """
     Elasticsearch Publisher uses Bulk API to load data from JSON file.
     A new index is created and data is uploaded into it. After the upload
@@ -160,19 +160,19 @@ class ColumnNameScanner(Scanner):
         return list(types)
 
 
-class PIIProcessorConfig(ConfigModel):
+class PiiProcessorConfig(ConfigModel):
     filter: Optional[str] = None
     api_endpoint: Optional[str] = None
     auth_provider_type: Optional[str] = None
 
 
-class PIIProcessor(Processor):
-    config: PIIProcessorConfig
+class PiiProcessor(Processor):
+    config: PiiProcessorConfig
     metadata_config: MetadataServerConfig
     status: ProcessorStatus
     client: REST
 
-    def __init__(self, ctx: WorkflowContext, config: PIIProcessorConfig, metadata_config: MetadataServerConfig):
+    def __init__(self, ctx: WorkflowContext, config: PiiProcessorConfig, metadata_config: MetadataServerConfig):
         super().__init__(ctx)
         self.config = config
         self.metadata_config = metadata_config

@@ -184,7 +184,7 @@ class PIIProcessor(Processor):
 
     @classmethod
     def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext):
-        config = PIIProcessorConfig.parse_obj(config_dict)
+        config = PiiProcessorConfig.parse_obj(config_dict)
         metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
         return cls(ctx, config, metadata_config)
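The lowercased rename here is load-bearing, not cosmetic: the reworked Workflow (last file in this diff) title-cases each underscore-separated chunk of the config type to build the class name, so the type "pii" can only ever resolve to a class spelled PiiProcessor. A one-line check of the derivation:

    # "pii" -> "Pii" + "Processor"; the old spelling PIIProcessor would never match.
    processor_type = "pii"
    derived = ''.join(i.title() for i in processor_type.replace('-', '_').split('_')) + "Processor"
    assert derived == "PiiProcessor"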
@@ -1,23 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from metadata.ingestion.api.processor import Processor
-from metadata.ingestion.api.registry import Registry
-
-processor_registry = Registry[Processor]()
-processor_registry.load("metadata.ingestion.processor.plugins")
-
-
-
@@ -29,7 +29,7 @@ class LDAPSourceConfig(ConfigModel):
     api_end_point: str
 
 
-class LdapUserRestSink(Sink):
+class LdapRestUsersSink(Sink):
     config: LDAPSourceConfig
     status: SinkStatus
 

@@ -33,7 +33,7 @@ class MetadataTablesSinkConfig(ConfigModel):
     api_endpoint: str = None
 
 
-class MetadataTablesRestSink(Sink):
+class MetadataRestTablesSink(Sink):
     config: MetadataTablesSinkConfig
     status: SinkStatus
 

@@ -30,7 +30,7 @@ class MetadataUsersSinkConfig(ConfigModel):
     api_end_point: str = None
 
 
-class MetadataUsersRestSink(Sink):
+class MetadataRestUsersSink(Sink):
     config: MetadataUsersSinkConfig
     metadata_config: MetadataServerConfig
     status: SinkStatus
@@ -1,22 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from metadata.ingestion.api.registry import Registry
-from metadata.ingestion.api.sink import Sink
-
-sink_registry = Registry[Sink]()
-sink_registry.load("metadata.ingestion.sink.plugins")
-# These sinks are always enabled
-assert sink_registry.get("file")
@@ -31,7 +31,7 @@ class BigQueryConfig(SQLConnectionConfig, SQLSource):
         return f"{self.scheme}://"
 
 
-class BigQuerySource(SQLSource):
+class BigquerySource(SQLSource):
     def __init__(self, config, metadata_config, ctx):
         super().__init__(config, metadata_config, ctx)
 

@@ -32,7 +32,7 @@ class LDAPUserConfig(ConfigModel):
     password: str
 
 
-class LDAPUserSource(Source):
+class LdapUsersSource(Source):
     config: LDAPUserConfig
     status: SourceStatus
 

@@ -30,7 +30,7 @@ class MetadataTablesRestSourceConfig(ConfigModel):
     api_endpoint: Optional[str] = None
 
 
-class MetadataTablesRestSource(Source):
+class MetadataEsSource(Source):
     config: MetadataTablesRestSourceConfig
     report: SourceStatus
 

@@ -20,7 +20,7 @@ from .sql_source import SQLConnectionConfig, SQLSource
 from ..ometa.auth_provider import MetadataServerConfig
 
 
-class SQLServerConfig(SQLConnectionConfig):
+class MssqlConfig(SQLConnectionConfig):
     host_port = "localhost:1433"
     scheme = "mssql+pytds"
 

@@ -28,12 +28,12 @@ class SQLServerConfig(SQLConnectionConfig):
         return super().get_connection_url()
 
 
-class SQLServerSource(SQLSource):
+class MssqlSource(SQLSource):
     def __init__(self, config, metadata_config, ctx):
         super().__init__(config, metadata_config, ctx)
 
     @classmethod
     def create(cls, config_dict, metadata_config_dict, ctx):
-        config = SQLServerConfig.parse_obj(config_dict)
+        config = MssqlConfig.parse_obj(config_dict)
         metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
         return cls(config, metadata_config, ctx)
 

@@ -24,7 +24,7 @@ class MySQLConfig(SQLConnectionConfig):
     def get_connection_url(self):
         return super().get_connection_url()
 
-class MySQLSource(SQLSource):
+class MysqlSource(SQLSource):
     def __init__(self, config, metadata_config, ctx):
         super().__init__(config, metadata_config, ctx)
 
@@ -14,45 +14,50 @@
 # limitations under the License.
 
 import csv
-import json
+import pandas as pd
 import uuid
 import os
-from datetime import datetime
+import json
 
-import pandas as pd
-import random
-import string
-import logging
 
-from faker import Faker
 from collections import namedtuple
-from typing import Iterable, Dict, Any, List, Union
-from metadata.generated.schema.api.services.createDatabaseService import CreateDatabaseServiceEntityRequest
-from metadata.generated.schema.entity.services.databaseService import DatabaseServiceEntity
+from dataclasses import dataclass, field
+from typing import Iterable, List, Dict, Any, Union
 from metadata.config.common import ConfigModel
 from metadata.generated.schema.entity.data.table import TableEntity
 from metadata.generated.schema.entity.data.database import DatabaseEntity
 from metadata.generated.schema.type.entityReference import EntityReference
-from metadata.ingestion.api.source import Source, SourceStatus
-from dataclasses import dataclass, field
+from metadata.ingestion.api.source import SourceStatus, Source
 from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
-from metadata.ingestion.models.table_metadata import DatabaseMetadata
-from metadata.ingestion.models.table_queries import TableQuery
-from metadata.ingestion.models.user import User
 from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
 from metadata.ingestion.ometa.client import REST
+from metadata.generated.schema.api.services.createDatabaseService import CreateDatabaseServiceEntityRequest
+from metadata.generated.schema.entity.services.databaseService import DatabaseServiceEntity
 
 COLUMN_NAME = 'Column'
 KEY_TYPE = 'Key type'
 DATA_TYPE = 'Data type'
-FAKER_METHOD = 'Faker method'
 COL_DESCRIPTION = 'Description'
 
-logger = logging.getLogger(__name__)
 
 TableKey = namedtuple('TableKey', ['schema', 'table_name'])
 
 
+def get_service_or_create(service_json, metadata_config) -> DatabaseServiceEntity:
+    client = REST(metadata_config)
+    service = client.get_database_service(service_json['name'])
+    if service is not None:
+        return service
+    else:
+        created_service = client.create_database_service(CreateDatabaseServiceEntityRequest(**service_json))
+        return created_service
+
+
+def get_table_key(row: Dict[str, Any]) -> Union[TableKey, None]:
+    """
+    Table key consists of schema and table name
+    :param row:
+    :return:
+    """
+    return TableKey(schema=row['schema'], table_name=row['table_name'])
+
+
 class SampleTableSourceConfig(ConfigModel):
     sample_schema_folder: str
     service_name: str

@@ -64,19 +69,6 @@ class SampleTableSourceConfig(ConfigModel):
         return self.sample_schema_folder
 
 
-class SampleUserSourceConfig(ConfigModel):
-    no_of_users: int
-
-
-def get_table_key(row: Dict[str, Any]) -> Union[TableKey, None]:
-    """
-    Table key consists of schema and table name
-    :param row:
-    :return:
-    """
-    return TableKey(schema=row['schema'], table_name=row['table_name'])
-
-
 @dataclass
 class SampleTableSourceStatus(SourceStatus):
     tables_scanned: List[str] = field(default_factory=list)

@@ -85,14 +77,6 @@ class SampleTableSourceStatus(SourceStatus):
         self.tables_scanned.append(table_name)
 
 
-@dataclass
-class SampleUserSourceStatus(SourceStatus):
-    users_scanned: List[str] = field(default_factory=list)
-
-    def report_table_scanned(self, user_name: str) -> None:
-        self.users_scanned.append(user_name)
-
-
 class TableSchema:
     def __init__(self, filename):
         # error if the file is not csv file

@@ -122,31 +106,6 @@ class TableSchema:
         return [c[COLUMN_NAME] for c in self.columns]
 
 
-class DataGenerator:
-    def __init__(self, schemas):
-        if not schemas:
-            raise Exception('Input schemas should be an array of one or more TableSchemas')
-
-        self.schemas = schemas
-
-        # validate that each FK is a PK in one of the input schemas
-        # TODO
-
-        self.table_to_schema = dict((s.get_name(), s) for s in schemas)
-
-    def generate_data(self, table_name, number_of_rows):
-        fake = Faker()
-        schema = self.table_to_schema[table_name]
-        data = {}
-        for c in schema.get_schema():
-            if not c[FAKER_METHOD]:
-                logging.debug('{} has no faker method input'.format(c))
-                continue
-            fn = getattr(fake, c[FAKER_METHOD])
-            data[c[COLUMN_NAME]] = [fn() for _ in range(number_of_rows)]
-        return pd.DataFrame(data)
-
-
 class SampleTableMetadataGenerator:
     def __init__(self, table_to_df_dict, table_to_schema_map):
         self.table_to_df_dict = table_to_df_dict

@@ -210,61 +169,7 @@ class SampleTableMetadataGenerator:
         return sorted_row_dict
 
 
-class SampleUserMetadataGenerator:
-
-    def __init__(self, number_of_users):
-        self.number_of_users = number_of_users
-
-    def generate_sample_user(self):
-        schema = dict()
-        fake = Faker()
-        # columns that use faker
-        schema['email'] = lambda: None
-        schema['first_name'] = lambda: fake.first_name()
-        schema['last_name'] = lambda: fake.last_name()
-        schema['full_name'] = lambda: None
-        schema['github_username'] = lambda: None
-        schema['team_name'] = lambda: random.choice(
-            ['Data_Infra', 'Infra', 'Payments', 'Legal', 'Dev_Platform', 'Trust', 'Marketplace'])
-        schema['employee_type'] = lambda: None
-        schema['manager_email'] = lambda: fake.email()
-        schema['slack_id'] = lambda: None
-        schema['role_name'] = lambda: random.choices(
-            ['ROLE_ENGINEER', 'ROLE_DATA_SCIENTIST', 'ROLE_ADMIN'], weights=[40, 40, 10])[0]
-        data = {}
-
-        for k in schema.keys():
-            data[k] = [schema[k]() for _ in range(self.number_of_users)]
-
-        # fill in the columns that can be derived from the random data above
-        for i in range(self.number_of_users):
-            data['full_name'][i] = data['first_name'][i] + ' ' + data['last_name'][i]
-            username = data['first_name'][i].lower() + '_' + data['last_name'][i].lower() + random.choice(
-                string.digits)
-            data['slack_id'][i] = username
-            data['github_username'][i] = username
-            data['email'][i] = username + '@gmail.com'
-        data['employee_type'] = data['role_name']
-
-        pd_rows = pd.DataFrame(data)
-        row_dict = []
-        for index, row in pd_rows.iterrows():
-            row_dict.append(row)
-
-        return row_dict
-
-
-def get_service_or_create(service_json, metadata_config) -> DatabaseServiceEntity:
-    client = REST(metadata_config)
-    service = client.get_database_service(service_json['name'])
-    if service is not None:
-        return service
-    else:
-        created_service = client.create_database_service(CreateDatabaseServiceEntityRequest(**service_json))
-        return created_service
-
-
-class SampleTableSource(Source):
+class SampleTablesSource(Source):
 
     def __init__(self, config: SampleTableSourceConfig, metadata_config: MetadataServerConfig, ctx):
         super().__init__(ctx)

@@ -302,80 +207,3 @@ class SampleTableSource(Source):
 
     def get_status(self):
         return self.status
-
-
-class SampleUsageSource(Source):
-
-    def __init__(self, config: SampleTableSourceConfig, metadata_config: MetadataServerConfig, ctx):
-        super().__init__(ctx)
-        self.status = SampleTableSourceStatus()
-        self.config = config
-        self.metadata_config = metadata_config
-        self.client = REST(metadata_config)
-        self.service_json = json.load(open(config.sample_schema_folder + "/service.json", 'r'))
-        self.query_log_csv = config.sample_schema_folder + "/query_log"
-        with open(self.query_log_csv, 'r') as fin:
-            self.query_logs = [dict(i) for i in csv.DictReader(fin)]
-        self.service = get_service_or_create(self.service_json, metadata_config)
-
-    @classmethod
-    def create(cls, config_dict, metadata_config_dict, ctx):
-        config = SampleTableSourceConfig.parse_obj(config_dict)
-        metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
-        return cls(config, metadata_config, ctx)
-
-    def prepare(self):
-        pass
-
-    def next_record(self) -> Iterable[TableQuery]:
-        for row in self.query_logs:
-            tq = TableQuery(row['query'], '', 100, 0, 0, '',
-                            '', datetime.today().strftime('%Y-%m-%d %H:%M:%S'), 100, 'shopify',
-                            False, row['query'])
-            yield tq
-
-    def close(self):
-        pass
-
-    def get_status(self):
-        return self.status
-
-
-class SampleUserSource(Source):
-
-    def __init__(self, config: SampleUserSourceConfig, metadata_config: MetadataServerConfig, ctx):
-        super().__init__(ctx)
-        self.status = SampleUserSourceStatus()
-        metadata_gen = SampleUserMetadataGenerator(config.no_of_users)
-        self.sample_columns = metadata_gen.generate_sample_user()
-
-    @classmethod
-    def create(cls, config_dict, metadata_config_dict, ctx):
-        config = SampleUserSourceConfig.parse_obj(config_dict)
-        metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
-        return cls(config, metadata_config, ctx)
-
-    def prepare(self):
-        pass
-
-    def next_record(self) -> Iterable[DatabaseMetadata]:
-        for user in self.sample_columns:
-            user_metadata = User(user['email'],
-                                 user['first_name'],
-                                 user['last_name'],
-                                 user['full_name'],
-                                 user['github_username'],
-                                 user['team_name'],
-                                 user['employee_type'],
-                                 user['manager_email'],
-                                 user['slack_id'],
-                                 True,
-                                 0)
-            self.status.report_table_scanned(user['github_username'])
-            yield user_metadata
-
-    def close(self):
-        pass
-
-    def get_status(self):
-        return self.status
ingestion/src/metadata/ingestion/source/sample_usage.py (new file, 46 lines)
@@ -0,0 +1,46 @@
+import json
+import csv
+from metadata.ingestion.api.source import Source
+from .sample_tables import SampleTableSourceConfig, SampleTableSourceStatus, get_service_or_create
+from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
+from metadata.ingestion.models.table_queries import TableQuery
+from typing import Iterable
+from datetime import datetime
+from metadata.ingestion.ometa.client import REST
+
+
+class SampleUsageSource(Source):
+
+    def __init__(self, config: SampleTableSourceConfig, metadata_config: MetadataServerConfig, ctx):
+        super().__init__(ctx)
+        self.status = SampleTableSourceStatus()
+        self.config = config
+        self.metadata_config = metadata_config
+        self.client = REST(metadata_config)
+        self.service_json = json.load(open(config.sample_schema_folder + "/service.json", 'r'))
+        self.query_log_csv = config.sample_schema_folder + "/query_log"
+        with open(self.query_log_csv, 'r') as fin:
+            self.query_logs = [dict(i) for i in csv.DictReader(fin)]
+        self.service = get_service_or_create(self.service_json, metadata_config)
+
+    @classmethod
+    def create(cls, config_dict, metadata_config_dict, ctx):
+        config = SampleTableSourceConfig.parse_obj(config_dict)
+        metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
+        return cls(config, metadata_config, ctx)
+
+    def prepare(self):
+        pass
+
+    def next_record(self) -> Iterable[TableQuery]:
+        for row in self.query_logs:
+            tq = TableQuery(row['query'], '', 100, 0, 0, '',
+                            '', datetime.today().strftime('%Y-%m-%d %H:%M:%S'), 100, 'shopify',
+                            False, row['query'])
+            yield tq
+
+    def close(self):
+        pass
+
+    def get_status(self):
+        return self.status
ingestion/src/metadata/ingestion/source/sample_users.py (new file, 122 lines)
@@ -0,0 +1,122 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import random
+import string
+import pandas as pd
+from faker import Faker
+from typing import Iterable, List
+from dataclasses import dataclass, field
+from metadata.config.common import ConfigModel
+from metadata.ingestion.api.source import Source, SourceStatus
+from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
+from metadata.ingestion.models.table_metadata import DatabaseMetadata
+from metadata.ingestion.models.user import User
+
+
+class SampleUserSourceConfig(ConfigModel):
+    no_of_users: int
+
+
+@dataclass
+class SampleUserSourceStatus(SourceStatus):
+    users_scanned: List[str] = field(default_factory=list)
+
+    def report_table_scanned(self, user_name: str) -> None:
+        self.users_scanned.append(user_name)
+
+
+class SampleUserMetadataGenerator:
+
+    def __init__(self, number_of_users):
+        self.number_of_users = number_of_users
+
+    def generate_sample_user(self):
+        schema = dict()
+        fake = Faker()
+        # columns that use faker
+        schema['email'] = lambda: None
+        schema['first_name'] = lambda: fake.first_name()
+        schema['last_name'] = lambda: fake.last_name()
+        schema['full_name'] = lambda: None
+        schema['github_username'] = lambda: None
+        schema['team_name'] = lambda: random.choice(
+            ['Data_Infra', 'Infra', 'Payments', 'Legal', 'Dev_Platform', 'Trust', 'Marketplace'])
+        schema['employee_type'] = lambda: None
+        schema['manager_email'] = lambda: fake.email()
+        schema['slack_id'] = lambda: None
+        schema['role_name'] = lambda: random.choices(
+            ['ROLE_ENGINEER', 'ROLE_DATA_SCIENTIST', 'ROLE_ADMIN'], weights=[40, 40, 10])[0]
+        data = {}
+
+        for k in schema.keys():
+            data[k] = [schema[k]() for _ in range(self.number_of_users)]
+
+        # fill in the columns that can be derived from the random data above
+        for i in range(self.number_of_users):
+            data['full_name'][i] = data['first_name'][i] + ' ' + data['last_name'][i]
+            username = data['first_name'][i].lower() + '_' + data['last_name'][i].lower() + random.choice(
+                string.digits)
+            data['slack_id'][i] = username
+            data['github_username'][i] = username
+            data['email'][i] = username + '@gmail.com'
+        data['employee_type'] = data['role_name']
+
+        pd_rows = pd.DataFrame(data)
+        row_dict = []
+        for index, row in pd_rows.iterrows():
+            row_dict.append(row)
+
+        return row_dict
+
+
+class SampleUsersSource(Source):
+
+    def __init__(self, config: SampleUserSourceConfig, metadata_config: MetadataServerConfig, ctx):
+        super().__init__(ctx)
+        self.status = SampleUserSourceStatus()
+        metadata_gen = SampleUserMetadataGenerator(config.no_of_users)
+        self.sample_columns = metadata_gen.generate_sample_user()
+
+    @classmethod
+    def create(cls, config_dict, metadata_config_dict, ctx):
+        config = SampleUserSourceConfig.parse_obj(config_dict)
+        metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
+        return cls(config, metadata_config, ctx)
+
+    def prepare(self):
+        pass
+
+    def next_record(self) -> Iterable[DatabaseMetadata]:
+        for user in self.sample_columns:
+            user_metadata = User(user['email'],
+                                 user['first_name'],
+                                 user['last_name'],
+                                 user['full_name'],
+                                 user['github_username'],
+                                 user['team_name'],
+                                 user['employee_type'],
+                                 user['manager_email'],
+                                 user['slack_id'],
+                                 True,
+                                 0)
+            self.status.report_table_scanned(user['github_username'])
+            yield user_metadata
+
+    def close(self):
+        pass
+
+    def get_status(self):
+        return self.status
@@ -28,7 +28,7 @@ class SnowflakeUsageSource(Source):
     # SELECT statement from mysql information_schema to extract table and column metadata
     SQL_STATEMENT = """
         select query_id as query,Query_text as sql,query_type as label,
-        database_name as database,start_time as starttime,end_time as endtime
+        database_name as database,start_time as starttime,end_time as endtime,schema_name
         from table(information_schema.query_history(
         end_time_range_start=>to_timestamp_ltz('{start_date}'),
         end_time_range_end=>to_timestamp_ltz('{end_date}')));

@@ -83,7 +83,7 @@ class SnowflakeUsageSource(Source):
         for row in self._get_raw_extract_iter():
             tq = TableQuery(row['query'], row['label'], 0, 0, 0, str(row['starttime']),
                             str(row['endtime']), str(row['starttime'])[0:19], 2, row['database'], 0, row['sql'])
-            self.report.scanned(tq)
+            self.report.scanned(f"{row['database']}.{row['schema_name']}")
             yield tq
 
     def get_report(self):
@@ -1,22 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from metadata.ingestion.api.registry import Registry
-from metadata.ingestion.api.source import Source
-
-source_registry = Registry[Source]()
-source_registry.load("metadata.ingestion.source.plugins")
-
-# This source is always enabled

@@ -1,22 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from metadata.ingestion.api.registry import Registry
-from metadata.ingestion.api.stage import Stage
-
-stage_registry = Registry[Stage]()
-stage_registry.load("metadata.ingestion.stage.plugins")
-
-
@@ -31,11 +31,8 @@ from metadata.ingestion.api.processor import Processor
 from metadata.ingestion.api.sink import Sink
 from metadata.ingestion.api.source import Source
 from metadata.ingestion.api.stage import Stage
-from metadata.ingestion.bulksink.bulk_sink_registry import bulk_sink_registry
-from metadata.ingestion.sink.sink_registry import sink_registry
-from metadata.ingestion.source.source_registry import source_registry
-from metadata.ingestion.processor.processor_registry import processor_registry
-from metadata.ingestion.stage.stage_registry import stage_registry
+from metadata.ingestion.api.registry import Registry
+from metadata.ingestion.api.source import Source
 
 logger = logging.getLogger(__name__)
 

@@ -61,9 +58,10 @@ class Workflow:
     def __init__(self, config: WorkflowConfig):
         self.config = config
         self.ctx = WorkflowContext(workflow_id=self.config.run_id)
 
         source_type = self.config.source.type
-        source_class = source_registry.get(source_type)
+        source_registry = Registry[Source]()
+        source_class = source_registry.get('metadata.ingestion.source.{}.{}Source'.format(
+            source_type.replace('-', '_'), ''.join([i.title() for i in source_type.replace('-', '_').split('_')])))
         metadata_config = self.config.metadata_server.dict().get("config", {})
         self.source: Source = source_class.create(
             self.config.source.dict().get("config", {}), metadata_config, self.ctx

@@ -74,28 +72,36 @@ class Workflow:
 
         if self.config.processor:
             processor_type = self.config.processor.type
-            processor_class = processor_registry.get(processor_type)
+            processor_registry = Registry[Processor]()
+            processor_class = processor_registry.get('metadata.ingestion.processor.{}.{}Processor'.format(
+                processor_type.replace('-', '_'), ''.join([i.title() for i in processor_type.replace('-', '_').split('_')])))
             processor_config = self.config.processor.dict().get("config", {})
             self.processor: Processor = processor_class.create(processor_config, metadata_config, self.ctx)
             logger.debug(f"Processor Type: {processor_type}, {processor_class} configured")
 
         if self.config.stage:
             stage_type = self.config.stage.type
-            stage_class = stage_registry.get(stage_type)
+            stage_registry = Registry[Stage]()
+            stage_class = stage_registry.get('metadata.ingestion.stage.{}.{}Stage'.format(
+                stage_type.replace('-', '_'), ''.join([i.title() for i in stage_type.replace('-', '_').split('_')])))
             stage_config = self.config.stage.dict().get("config", {})
             self.stage: Stage = stage_class.create(stage_config, metadata_config, self.ctx)
             logger.debug(f"Stage Type: {stage_type}, {stage_class} configured")
 
         if self.config.sink:
             sink_type = self.config.sink.type
-            sink_class = sink_registry.get(sink_type)
+            sink_registry = Registry[Sink]()
+            sink_class = sink_registry.get('metadata.ingestion.sink.{}.{}Sink'.format(
+                sink_type.replace('-', '_'), ''.join([i.title() for i in sink_type.replace('-', '_').split('_')])))
             sink_config = self.config.sink.dict().get("config", {})
             self.sink: Sink = sink_class.create(sink_config, metadata_config, self.ctx)
             logger.debug(f"Sink type:{self.config.sink.type},{sink_class} configured")
 
         if self.config.bulk_sink:
             bulk_sink_type = self.config.bulk_sink.type
-            bulk_sink_class = bulk_sink_registry.get(bulk_sink_type)
+            bulk_sink_registry = Registry[BulkSink]()
+            bulk_sink_class = bulk_sink_registry.get('metadata.ingestion.bulksink.{}.{}BulkSink'.format(
+                bulk_sink_type.replace('-', '_'), ''.join([i.title() for i in bulk_sink_type.replace('-', '_').split('_')])))
             bulk_sink_config = self.config.bulk_sink.dict().get("config", {})
             self.bulk_sink: BulkSink = bulk_sink_class.create(bulk_sink_config, metadata_config, self.ctx)
             logger.info(f"BulkSink type:{self.config.bulk_sink.type},{bulk_sink_class} configured")
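The string handling above fixes a naming convention for every plugin kind, which is what forces the class renames elsewhere in this diff (BigquerySource, MssqlSource, MysqlSource, MetadataEsSource, SampleUsersSource, PiiProcessor, and so on). A minimal sketch of the derivation, copied from the format calls in this hunk:

    # Reproduces the lookup-key construction used by the reworked Workflow:
    # a config "type" maps deterministically to a dotted module-plus-class path.
    def source_class_path(source_type: str) -> str:
        module = source_type.replace('-', '_')
        class_name = ''.join(i.title() for i in module.split('_'))
        return 'metadata.ingestion.source.{}.{}Source'.format(module, class_name)

    # These line up with the renames in this pull request:
    assert source_class_path('bigquery') == 'metadata.ingestion.source.bigquery.BigquerySource'
    assert source_class_path('mssql') == 'metadata.ingestion.source.mssql.MssqlSource'
    assert source_class_path('sample-users') == 'metadata.ingestion.source.sample_users.SampleUsersSource'

The same pattern explains why the JSON pipeline configs in this diff shorten "pii-tags" to "pii" and "table-usage-stage" to "table-usage".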