From 51110c2a86da6e02cefdd59f544a36831504e8de Mon Sep 17 00:00:00 2001 From: Milan Bariya <52292922+MilanBariya@users.noreply.github.com> Date: Wed, 20 Apr 2022 17:43:53 +0530 Subject: [PATCH] Fix: SQLSourceStatus import error (#4272) --- .../src/metadata/ingestion/source/athena.py | 3 +- .../src/metadata/ingestion/source/druid.py | 3 +- .../src/metadata/ingestion/source/glue.py | 2 +- .../ingestion/source/sample_entity.py | 336 ------------------ 4 files changed, 3 insertions(+), 341 deletions(-) delete mode 100644 ingestion/src/metadata/ingestion/source/sample_entity.py diff --git a/ingestion/src/metadata/ingestion/source/athena.py b/ingestion/src/metadata/ingestion/source/athena.py index 11fd024c518..d0818a6d582 100644 --- a/ingestion/src/metadata/ingestion/source/athena.py +++ b/ingestion/src/metadata/ingestion/source/athena.py @@ -18,10 +18,9 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata OpenMetadataConnection, ) from metadata.ingestion.source.sql_source import SQLSource -from metadata.ingestion.source.sql_source_common import SQLConnectionConfig -class AthenaConfig(AthenaConnection, SQLConnectionConfig): +class AthenaConfig(AthenaConnection): def get_connection_url(self): url = f"{self.scheme}://" if self.username: diff --git a/ingestion/src/metadata/ingestion/source/druid.py b/ingestion/src/metadata/ingestion/source/druid.py index 26a392986d6..de7de22c288 100644 --- a/ingestion/src/metadata/ingestion/source/druid.py +++ b/ingestion/src/metadata/ingestion/source/druid.py @@ -19,10 +19,9 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata OpenMetadataConnection, ) from metadata.ingestion.source.sql_source import SQLSource -from metadata.ingestion.source.sql_source_common import SQLConnectionConfig -class DruidConfig(DruidConnection, SQLConnectionConfig): +class DruidConfig(DruidConnection): def get_connection_url(self): url = super().get_connection_url() return f"{url}/druid/v2/sql" diff --git a/ingestion/src/metadata/ingestion/source/glue.py b/ingestion/src/metadata/ingestion/source/glue.py index e45193b3eea..1c6d8cd749d 100644 --- a/ingestion/src/metadata/ingestion/source/glue.py +++ b/ingestion/src/metadata/ingestion/source/glue.py @@ -35,7 +35,7 @@ from metadata.ingestion.api.common import Entity from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.ingestion.source.sql_source_common import SQLSourceStatus +from metadata.ingestion.source.sql_source import SQLSourceStatus from metadata.utils.aws_client import AWSClient from metadata.utils.column_type_parser import ColumnTypeParser from metadata.utils.filters import filter_by_schema, filter_by_table diff --git a/ingestion/src/metadata/ingestion/source/sample_entity.py b/ingestion/src/metadata/ingestion/source/sample_entity.py deleted file mode 100644 index 2598e889542..00000000000 --- a/ingestion/src/metadata/ingestion/source/sample_entity.py +++ /dev/null @@ -1,336 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -import random -import uuid -from dataclasses import dataclass, field -from typing import Iterable, List - -from faker import Faker - -from metadata.generated.schema.api.data.createTopic import CreateTopicRequest -from metadata.generated.schema.api.services.createDashboardService import ( - CreateDashboardServiceRequest, -) -from metadata.generated.schema.api.services.createDatabaseService import ( - CreateDatabaseServiceRequest, -) -from metadata.generated.schema.api.services.createMessagingService import ( - CreateMessagingServiceRequest, -) -from metadata.generated.schema.entity.data.database import Database -from metadata.generated.schema.entity.data.table import Column, Constraint, Table -from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( - OpenMetadataConnection, -) -from metadata.generated.schema.type.entityReference import EntityReference -from metadata.generated.schema.type.tagLabel import TagLabel -from metadata.ingestion.api.common import Entity -from metadata.ingestion.api.source import Source, SourceStatus -from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable -from metadata.ingestion.models.table_metadata import Chart, Dashboard -from metadata.ingestion.ometa.client import APIError -from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.ingestion.source.sql_source import SQLConnectionConfig - -logger: logging.Logger = logging.getLogger(__name__) - - -class SampleEntitySourceConfig(SQLConnectionConfig): - no_of_services: int - no_of_databases: int - no_of_tables: int - no_of_columns: int - no_of_dashboards: int - no_of_charts: int - no_of_topics: int - generate_tables: bool = True - generate_dashboards: bool = True - generate_topics: bool = True - - def get_connection_url(self): - pass - - -@dataclass -class SampleEntitySourceStatus(SourceStatus): - success: List[str] = field(default_factory=list) - failures: List[str] = field(default_factory=list) - warnings: List[str] = field(default_factory=list) - - def scanned(self, entity_type: str, entity_name: str) -> None: - self.success.append(entity_name) - logger.info("{} Scanned: {}".format(entity_type, entity_name)) - - def filtered(self, entity_type: str, entity_name: str, err: str) -> None: - self.warnings.append(entity_name) - logger.warning("Dropped {} {} due to {}".format(entity_type, entity_name, err)) - - -class SampleEntitySource(Source[Entity]): - def __init__( - self, - config: SampleEntitySourceConfig, - metadata_config: OpenMetadataConnection, - ): - super().__init__() - self.faker = Faker() - self.status = SampleEntitySourceStatus() - self.config = config - self.metadata_config = metadata_config - self.metadata = OpenMetadata(metadata_config) - self.service_name = lambda: self.faker.word() - self.service_type = lambda: random.choice( - ["BigQuery", "Hive", "MSSQL", "MySQL", "Postgres", "Redshift", "Snowflake"] - ) - self.database_name = lambda: self.faker.word() - self.table_name = lambda: self.faker.word() - self.column_name = lambda: self.faker.word() - self.description = lambda: self.faker.text() - 
self.chart_ids = lambda: self.faker.random_int() - self.tags = self.__get_tags() - self.tagFQN = lambda: self.faker.first_name() - self.labelType = lambda: random.choice( - ["Automated", "Derived", "Manual", "Propagated"] - ) - self.state = lambda: random.choice(["Confirmed", "Suggested"]) - self.href = lambda: self.faker.url() - self.col_type = lambda: random.choice(["INT", "STRING", "VARCHAR", "DATE"]) - self.chart_type = lambda: random.choice( - ["Area", "Line", "Table", "Bar", "Pie", "Histogram", "Scatter", "Text"] - ) - self.col_constraint = lambda: random.choice( - [Constraint.UNIQUE, Constraint.NOT_NULL, Constraint.NULL] - ) - - @classmethod - def create(cls, config_dict, metadata_config: OpenMetadataConnection): - config = SampleEntitySourceConfig.parse_obj(config_dict) - return cls(config, metadata_config) - - def prepare(self): - pass - - def __get_tags(self) -> {}: - return self.metadata.list_tags_by_category("user") - - def scan(self, text): - types = set() - for pii_type in self.regex: - if self.regex[pii_type].match(text) is not None: - types.add(pii_type.name) - - logging.debug("PiiTypes are %s", ",".join(str(x) for x in list(types))) - return list(types) - - def next_record(self) -> Iterable[Entity]: - if self.config.generate_tables: - yield from self.ingest_tables() - if self.config.generate_dashboards: - yield from self.ingest_dashboards() - if self.config.generate_topics: - yield from self.ingest_topics() - - def ingest_tables(self) -> Iterable[OMetaDatabaseAndTable]: - for h in range(self.config.no_of_services): - service = { - "databaseConnection": { - "hostPort": f"localhost", - "username": "sample_user", - "password": "sample_password", - }, - "name": self.service_name(), - "description": self.description(), - "serviceType": self.service_type(), - } - create_service = None - while True: - try: - create_service = self.metadata.create_or_update( - CreateDatabaseServiceRequest(**service) - ) - break - except APIError as err: - continue - - logger.info( - "Ingesting service {}/{}".format(h + 1, self.config.no_of_services) - ) - for i in range(self.config.no_of_databases): - db = Database( - id=uuid.uuid4(), - name=self.database_name().replace(".", "_"), - description=self.description(), - service=EntityReference( - id=create_service.id, type=self.config.service_type - ), - ) - - logger.info( - "Ingesting database {}/{} in service: {}/{}".format( - i + 1, - self.config.no_of_databases, - h + 1, - self.config.no_of_services, - ) - ) - - for j in range(self.config.no_of_tables): - table_columns = [] - table_entity = Table( - id=uuid.uuid4(), - name=self.table_name().replace(".", "_"), - tableType="Regular", - description=self.description(), - columns=table_columns, - ) - row_order = 0 - for t in range(self.config.no_of_columns): - tag_labels = [] - tag_entity = random.choice(self.tags) - tag_labels.append( - TagLabel( - tagFQN=tag_entity.fullyQualifiedName, - labelType="Automated", - state="Suggested", - href=tag_entity.href, - ) - ) - table_columns.append( - Column( - name=self.column_name(), - description=self.description(), - dataType=self.col_type(), - constraint=self.col_constraint(), - dataLength=100, - ordinalPosition=row_order, - tags=tag_labels, - ) - ) - table_entity.columns = table_columns - row_order = row_order + 1 - - table_and_db = OMetaDatabaseAndTable( - table=table_entity, database=db - ) - yield table_and_db - - def ingest_dashboards(self) -> Iterable[Dashboard]: - for h in range(self.config.no_of_services): - create_service = None - while True: - try: - 
service = { - "name": self.service_name(), - "description": self.description(), - "dashboardUrl": "http://localhost:8088", - "userName": "admin", - "password": "admin", - "serviceType": "Superset", - } - create_service = self.metadata.create_or_update( - CreateDashboardServiceRequest(**service) - ) - break - except APIError: - continue - - logger.info( - "Ingesting service {}/{}".format(h + 1, self.config.no_of_services) - ) - for i in range(self.config.no_of_dashboards): - logger.info( - "Ingesting dashboard {}/{} in service: {}/{}".format( - i + 1, - self.config.no_of_databases, - h + 1, - self.config.no_of_services, - ) - ) - chart_ids = [] - for j in range(self.config.no_of_charts): - chart_id = self.chart_ids() - chart_entity = Chart( - id=uuid.uuid4(), - name=str(chart_id), - displayName=self.table_name(), - description=self.description(), - chart_type=self.chart_type(), - chartId=str(chart_id), - url="http://superset:8080/chartUrl", - service=EntityReference( - id=create_service.id, type="dashboardService" - ), - ) - chart_ids.append(str(chart_id)) - yield chart_entity - - dashboard = Dashboard( - id=uuid.uuid4(), - name=str(self.chart_ids()), - displayName=self.table_name(), - description=self.description(), - url="http://superset:8080/dashboardUrl", - charts=chart_ids, - service=EntityReference( - id=create_service.id, type="dashboardService" - ), - ) - yield dashboard - - def ingest_topics(self) -> Iterable[CreateTopicRequest]: - for h in range(self.config.no_of_services): - create_service = None - while True: - try: - service = { - "name": self.service_name(), - "description": self.description(), - "brokers": ["localhost:9092"], - "schemaRegistry": "http://localhost:8081", - "serviceType": "Kafka", - } - create_service = self.metadata.create_or_update( - CreateMessagingServiceRequest(**service) - ) - break - except APIError: - continue - - logger.info( - "Ingesting service {}/{}".format(h + 1, self.config.no_of_services) - ) - for j in range(self.config.no_of_topics): - topic_entity = CreateTopicRequest( - name=self.table_name(), - description=self.description(), - partitions=self.chart_ids(), - retentionSize=322122382273, - replicationFactor=2, - maximumMessageSize=167, - cleanupPolicies=["delete"], - schemaType="Avro", - schemaText='{"namespace":"org.open-metadata.kafka","name":"Customer","type":"record","fields":[{"name":"id","type":"string"},{"name":"first_name","type":"string"},{"name":"last_name","type":"string"},{"name":"email","type":"string"},{"name":"address_line_1","type":"string"},{"name":"address_line_2","type":"string"},{"name":"post_code","type":"string"},{"name":"country","type":"string"}]}', - service=EntityReference( - id=create_service.id, type="messagingService" - ), - ) - yield topic_entity - - def close(self): - pass - - def get_status(self): - return self.status - - def test_connection(self) -> None: - pass
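
For reference, a minimal sketch of the post-patch import layout, based only on the module paths visible in the hunks above: glue.py now takes SQLSourceStatus from metadata.ingestion.source.sql_source (the same module that already provides SQLSource) instead of sql_source_common, which is the import the subject line refers to. The check_imports helper and the no-argument construction of SQLSourceStatus below are illustrative assumptions, not part of this change.

# Illustrative sketch only; assumes the openmetadata-ingestion package from this
# repository is installed, and that SQLSourceStatus (a dataclass-style status,
# like the ones in the deleted sample_entity.py above) takes no required
# constructor arguments.
from metadata.ingestion.source.sql_source import SQLSource, SQLSourceStatus


def check_imports() -> None:
    """Confirm both imports resolve and show where each class is defined."""
    status = SQLSourceStatus()
    print("SQLSource defined in:", SQLSource.__module__)
    print("SQLSourceStatus defined in:", type(status).__module__)


if __name__ == "__main__":
    check_imports()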