Fix: SQLSourceStatus import error (#4272)

This commit is contained in:
Milan Bariya 2022-04-20 17:43:53 +05:30 committed by GitHub
parent e876d01841
commit 51110c2a86
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 3 additions and 341 deletions

View File

@@ -18,10 +18,9 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
OpenMetadataConnection,
)
from metadata.ingestion.source.sql_source import SQLSource
from metadata.ingestion.source.sql_source_common import SQLConnectionConfig
class AthenaConfig(AthenaConnection, SQLConnectionConfig):
class AthenaConfig(AthenaConnection):
def get_connection_url(self):
url = f"{self.scheme}://"
if self.username:

View File

@@ -19,10 +19,9 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
OpenMetadataConnection,
)
from metadata.ingestion.source.sql_source import SQLSource
from metadata.ingestion.source.sql_source_common import SQLConnectionConfig
class DruidConfig(DruidConnection, SQLConnectionConfig):
class DruidConfig(DruidConnection):
def get_connection_url(self):
url = super().get_connection_url()
return f"{url}/druid/v2/sql"

View File

@@ -35,7 +35,7 @@ from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.sql_source_common import SQLSourceStatus
from metadata.ingestion.source.sql_source import SQLSourceStatus
from metadata.utils.aws_client import AWSClient
from metadata.utils.column_type_parser import ColumnTypeParser
from metadata.utils.filters import filter_by_schema, filter_by_table

View File

@@ -1,336 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import random
import uuid
from dataclasses import dataclass, field
from typing import Iterable, List
from faker import Faker
from metadata.generated.schema.api.data.createTopic import CreateTopicRequest
from metadata.generated.schema.api.services.createDashboardService import (
CreateDashboardServiceRequest,
)
from metadata.generated.schema.api.services.createDatabaseService import (
CreateDatabaseServiceRequest,
)
from metadata.generated.schema.api.services.createMessagingService import (
CreateMessagingServiceRequest,
)
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import Column, Constraint, Table
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.tagLabel import TagLabel
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.models.table_metadata import Chart, Dashboard
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.sql_source import SQLConnectionConfig
logger: logging.Logger = logging.getLogger(__name__)
class SampleEntitySourceConfig(SQLConnectionConfig):
no_of_services: int
no_of_databases: int
no_of_tables: int
no_of_columns: int
no_of_dashboards: int
no_of_charts: int
no_of_topics: int
generate_tables: bool = True
generate_dashboards: bool = True
generate_topics: bool = True
def get_connection_url(self):
pass
@dataclass
class SampleEntitySourceStatus(SourceStatus):
success: List[str] = field(default_factory=list)
failures: List[str] = field(default_factory=list)
warnings: List[str] = field(default_factory=list)
def scanned(self, entity_type: str, entity_name: str) -> None:
self.success.append(entity_name)
logger.info("{} Scanned: {}".format(entity_type, entity_name))
def filtered(self, entity_type: str, entity_name: str, err: str) -> None:
self.warnings.append(entity_name)
logger.warning("Dropped {} {} due to {}".format(entity_type, entity_name, err))
class SampleEntitySource(Source[Entity]):
def __init__(
self,
config: SampleEntitySourceConfig,
metadata_config: OpenMetadataConnection,
):
super().__init__()
self.faker = Faker()
self.status = SampleEntitySourceStatus()
self.config = config
self.metadata_config = metadata_config
self.metadata = OpenMetadata(metadata_config)
self.service_name = lambda: self.faker.word()
self.service_type = lambda: random.choice(
["BigQuery", "Hive", "MSSQL", "MySQL", "Postgres", "Redshift", "Snowflake"]
)
self.database_name = lambda: self.faker.word()
self.table_name = lambda: self.faker.word()
self.column_name = lambda: self.faker.word()
self.description = lambda: self.faker.text()
self.chart_ids = lambda: self.faker.random_int()
self.tags = self.__get_tags()
self.tagFQN = lambda: self.faker.first_name()
self.labelType = lambda: random.choice(
["Automated", "Derived", "Manual", "Propagated"]
)
self.state = lambda: random.choice(["Confirmed", "Suggested"])
self.href = lambda: self.faker.url()
self.col_type = lambda: random.choice(["INT", "STRING", "VARCHAR", "DATE"])
self.chart_type = lambda: random.choice(
["Area", "Line", "Table", "Bar", "Pie", "Histogram", "Scatter", "Text"]
)
self.col_constraint = lambda: random.choice(
[Constraint.UNIQUE, Constraint.NOT_NULL, Constraint.NULL]
)
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
config = SampleEntitySourceConfig.parse_obj(config_dict)
return cls(config, metadata_config)
def prepare(self):
pass
def __get_tags(self) -> {}:
return self.metadata.list_tags_by_category("user")
def scan(self, text):
types = set()
for pii_type in self.regex:
if self.regex[pii_type].match(text) is not None:
types.add(pii_type.name)
logging.debug("PiiTypes are %s", ",".join(str(x) for x in list(types)))
return list(types)
def next_record(self) -> Iterable[Entity]:
if self.config.generate_tables:
yield from self.ingest_tables()
if self.config.generate_dashboards:
yield from self.ingest_dashboards()
if self.config.generate_topics:
yield from self.ingest_topics()
def ingest_tables(self) -> Iterable[OMetaDatabaseAndTable]:
for h in range(self.config.no_of_services):
service = {
"databaseConnection": {
"hostPort": f"localhost",
"username": "sample_user",
"password": "sample_password",
},
"name": self.service_name(),
"description": self.description(),
"serviceType": self.service_type(),
}
create_service = None
while True:
try:
create_service = self.metadata.create_or_update(
CreateDatabaseServiceRequest(**service)
)
break
except APIError as err:
continue
logger.info(
"Ingesting service {}/{}".format(h + 1, self.config.no_of_services)
)
for i in range(self.config.no_of_databases):
db = Database(
id=uuid.uuid4(),
name=self.database_name().replace(".", "_"),
description=self.description(),
service=EntityReference(
id=create_service.id, type=self.config.service_type
),
)
logger.info(
"Ingesting database {}/{} in service: {}/{}".format(
i + 1,
self.config.no_of_databases,
h + 1,
self.config.no_of_services,
)
)
for j in range(self.config.no_of_tables):
table_columns = []
table_entity = Table(
id=uuid.uuid4(),
name=self.table_name().replace(".", "_"),
tableType="Regular",
description=self.description(),
columns=table_columns,
)
row_order = 0
for t in range(self.config.no_of_columns):
tag_labels = []
tag_entity = random.choice(self.tags)
tag_labels.append(
TagLabel(
tagFQN=tag_entity.fullyQualifiedName,
labelType="Automated",
state="Suggested",
href=tag_entity.href,
)
)
table_columns.append(
Column(
name=self.column_name(),
description=self.description(),
dataType=self.col_type(),
constraint=self.col_constraint(),
dataLength=100,
ordinalPosition=row_order,
tags=tag_labels,
)
)
table_entity.columns = table_columns
row_order = row_order + 1
table_and_db = OMetaDatabaseAndTable(
table=table_entity, database=db
)
yield table_and_db
def ingest_dashboards(self) -> Iterable[Dashboard]:
for h in range(self.config.no_of_services):
create_service = None
while True:
try:
service = {
"name": self.service_name(),
"description": self.description(),
"dashboardUrl": "http://localhost:8088",
"userName": "admin",
"password": "admin",
"serviceType": "Superset",
}
create_service = self.metadata.create_or_update(
CreateDashboardServiceRequest(**service)
)
break
except APIError:
continue
logger.info(
"Ingesting service {}/{}".format(h + 1, self.config.no_of_services)
)
for i in range(self.config.no_of_dashboards):
logger.info(
"Ingesting dashboard {}/{} in service: {}/{}".format(
i + 1,
self.config.no_of_databases,
h + 1,
self.config.no_of_services,
)
)
chart_ids = []
for j in range(self.config.no_of_charts):
chart_id = self.chart_ids()
chart_entity = Chart(
id=uuid.uuid4(),
name=str(chart_id),
displayName=self.table_name(),
description=self.description(),
chart_type=self.chart_type(),
chartId=str(chart_id),
url="http://superset:8080/chartUrl",
service=EntityReference(
id=create_service.id, type="dashboardService"
),
)
chart_ids.append(str(chart_id))
yield chart_entity
dashboard = Dashboard(
id=uuid.uuid4(),
name=str(self.chart_ids()),
displayName=self.table_name(),
description=self.description(),
url="http://superset:8080/dashboardUrl",
charts=chart_ids,
service=EntityReference(
id=create_service.id, type="dashboardService"
),
)
yield dashboard
def ingest_topics(self) -> Iterable[CreateTopicRequest]:
for h in range(self.config.no_of_services):
create_service = None
while True:
try:
service = {
"name": self.service_name(),
"description": self.description(),
"brokers": ["localhost:9092"],
"schemaRegistry": "http://localhost:8081",
"serviceType": "Kafka",
}
create_service = self.metadata.create_or_update(
CreateMessagingServiceRequest(**service)
)
break
except APIError:
continue
logger.info(
"Ingesting service {}/{}".format(h + 1, self.config.no_of_services)
)
for j in range(self.config.no_of_topics):
topic_entity = CreateTopicRequest(
name=self.table_name(),
description=self.description(),
partitions=self.chart_ids(),
retentionSize=322122382273,
replicationFactor=2,
maximumMessageSize=167,
cleanupPolicies=["delete"],
schemaType="Avro",
schemaText='{"namespace":"org.open-metadata.kafka","name":"Customer","type":"record","fields":[{"name":"id","type":"string"},{"name":"first_name","type":"string"},{"name":"last_name","type":"string"},{"name":"email","type":"string"},{"name":"address_line_1","type":"string"},{"name":"address_line_2","type":"string"},{"name":"post_code","type":"string"},{"name":"country","type":"string"}]}',
service=EntityReference(
id=create_service.id, type="messagingService"
),
)
yield topic_entity
def close(self):
pass
def get_status(self):
return self.status
def test_connection(self) -> None:
pass