Fix #401: Merge sample data generation into single connector (#402)

* Fix #401: Merge sample data generation into single connector

* Path for datasets modified

Co-authored-by: Ayush Shah <ayush@getcollate.io>
Sriharsha Chintalapani 2021-09-05 10:05:02 -07:00 committed by GitHub
parent 13c6342c90
commit d0dbcc19b7
18 changed files with 783 additions and 10 deletions

View File

@@ -0,0 +1,98 @@
{
"charts": [
{
"id": "2841fdb1-e378-4a2c-94f8-27c9f5d6ef8e",
"name": "101",
"displayName": "# of Games That Hit 100k in Sales By Release Year",
"fullyQualifiedName": "local_superset.# of Games That Hit 100k in Sales By Release Year",
"description": "",
"chartId": "114",
"chartType": "Area",
"chartUrl": "http://localhost:8088/superset/explore/?form_data=%7B%22slice_id%22%3A%20114%7D",
"href": "http://localhost:8585/api/v1/charts/2841fdb1-e378-4a2c-94f8-27c9f5d6ef8e"
}, {
"id": "3bcba490-9e5c-4946-a0e3-41e8ff8f4aa4",
"name":"110",
"displayName": "% Rural",
"fullyQualifiedName": "local_superset.% Rural",
"description": "",
"chartId": "166",
"chartType": "Other",
"chartUrl": "http://localhost:8088/superset/explore/?form_data=%7B%22slice_id%22%3A%20166%7D",
"href": "http://localhost:8585/api/v1/charts/3bcba490-9e5c-4946-a0e3-41e8ff8f4aa4"
}, {
"id": "22b95748-4a7b-48ad-859e-cf7c66a7f343",
"name": "92",
"displayName": "✈️ Relocation ability",
"description": "",
"chartId": "92",
"chartType": "Other",
"chartUrl": "http://localhost:8088/superset/explore/?form_data=%7B%22slice_id%22%3A%2092%7D",
"href": "http://localhost:8585/api/v1/charts/22b95748-4a7b-48ad-859e-cf7c66a7f343"
}, {
"id": "62b31dcc-4619-46a0-99b1-0fa7cd6f93da",
"name": "11y",
"displayName": "Age distribution of respondents",
"fullyQualifiedName": "local_superset.Age distribution of respondents",
"description": "",
"chartId": "117",
"chartType": "Histogram",
"chartUrl": "http://localhost:8088/superset/explore/?form_data=%7B%22slice_id%22%3A%20117%7D",
"href": "http://localhost:8585/api/v1/charts/62b31dcc-4619-46a0-99b1-0fa7cd6f93da"
}, {
"id": "57944482-e187-439a-aaae-0e8aabd2f455",
"displayName": "Arcs",
"fullyQualifiedName": "local_superset.Arcs",
"description": "",
"name": "197",
"chartType": "Other",
"chartUrl": "http://localhost:8088/superset/explore/?form_data=%7B%22slice_id%22%3A%20197%7D",
"href": "http://localhost:8585/api/v1/charts/57944482-e187-439a-aaae-0e8aabd2f455"
}, {
"id": "d88e2056-c74a-410d-829e-eb31b040c132",
"displayName": "Are you an ethnic minority in your city?",
"fullyQualifiedName": "local_superset.Are you an ethnic minority in your city?",
"description": "",
"name": "127",
"chartType": "Other",
"chartUrl": "http://localhost:8088/superset/explore/?form_data=%7B%22slice_id%22%3A%20127%7D",
"href": "http://localhost:8585/api/v1/charts/d88e2056-c74a-410d-829e-eb31b040c132"
}, {
"id": "c1d3e156-4628-414e-8d6e-a6bdd486128f",
"displayName": "Average and Sum Trends",
"fullyQualifiedName": "local_superset.Average and Sum Trends",
"description": "",
"name": "183",
"chartType": "Line",
"chartUrl": "http://localhost:8088/superset/explore/?form_data=%7B%22slice_id%22%3A%20183%7D",
"href": "http://localhost:8585/api/v1/charts/c1d3e156-4628-414e-8d6e-a6bdd486128f"
}, {
"id": "bfc57519-8cef-47e6-a423-375d5b89a6a4",
"displayName": "Birth in France by department in 2016",
"fullyQualifiedName": "local_superset.Birth in France by department in 2016",
"description": "",
"name": "161",
"chartType": "Other",
"chartUrl": "http://localhost:8088/superset/explore/?form_data=%7B%22slice_id%22%3A%20161%7D",
"href": "http://localhost:8585/api/v1/charts/bfc57519-8cef-47e6-a423-375d5b89a6a4"
}, {
"id": "bf2eeac4-7226-46c6-bbef-918569c137a0",
"displayName": "Box plot",
"fullyQualifiedName": "local_superset.Box plot",
"description": "",
"name": "170",
"chartType": "Bar",
"chartUrl": "http://localhost:8088/superset/explore/?form_data=%7B%22slice_id%22%3A%20170%7D",
"href": "http://localhost:8585/api/v1/charts/bf2eeac4-7226-46c6-bbef-918569c137a0"
}, {
"id": "167fd63b-42f1-4d7e-a37d-893fd8173b44",
"displayName": "Boy Name Cloud",
"fullyQualifiedName": "local_superset.Boy Name Cloud",
"description": "",
"name": "180",
"chartType": "Other",
"chartUrl": "http://localhost:8088/superset/explore/?form_data=%7B%22slice_id%22%3A%20180%7D",
"href": "http://localhost:8585/api/v1/charts/167fd63b-42f1-4d7e-a37d-893fd8173b44"
}
]
}

View File

@@ -0,0 +1,104 @@
{
"dashboards": [
{
"id": "d4dc7baf-1b17-45f8-acd5-a15b78cc7c5f",
"name": "8",
"displayName": "[ untitled dashboard ]",
"fullyQualifiedName": "local_superset.[ untitled dashboard ]",
"description": "",
"dashboardUrl": "http://localhost:808/superset/dashboard/1/",
"charts": [183, 170, 197],
"href": "http://localhost:8585/api/v1/dashboards/d4dc7baf-1b17-45f8-acd5-a15b78cc7c5f"
},
{
"id": "063cd787-8630-4809-9702-34d3992c7248",
"name": "9",
"displayName": "COVID Vaccine Dashboard",
"fullyQualifiedName": "local_superset.COVID Vaccine Dashboard",
"description": "",
"dashboardUrl": "http://localhost:808/superset/dashboard/8/",
"charts": [117, 197],
"href": "http://localhost:8585/api/v1/dashboards/063cd787-8630-4809-9702-34d3992c7248"
},
{
"id": "df6c698e-066a-4440-be0a-121025573b73",
"name": "10",
"displayName": "deck.gl Demo",
"fullyQualifiedName": "local_superset.deck.gl Demo",
"description": "",
"dashboardUrl": "http://localhost:808/superset/dashboard/deck/",
"charts": [127, 166, 114],
"href": "http://localhost:8585/api/v1/dashboards/df6c698e-066a-4440-be0a-121025573b73"
},
{
"id": "98b38a49-b5c6-431b-b61f-690e39f8ead2",
"name": "11",
"displayName": "FCC New Coder Survey 2018",
"fullyQualifiedName": "local_superset.FCC New Coder Survey 2018",
"description": "",
"dashboardUrl": "http://localhost:808/superset/dashboard/7/",
"charts": [183, 197, 170, 180],
"href": "http://localhost:8585/api/v1/dashboards/98b38a49-b5c6-431b-b61f-690e39f8ead2"
},
{
"id": "dffcf9b2-4f43-4881-a5f5-10109655bf50",
"name": "12",
"displayName": "Misc Charts",
"fullyQualifiedName": "local_superset.Misc Charts",
"description": "",
"dashboardUrl": "http://localhost:808/superset/dashboard/misc_charts/",
"charts": [127, 197],
"href": "http://localhost:8585/api/v1/dashboards/dffcf9b2-4f43-4881-a5f5-10109655bf50"
},
{
"id": "2583737d-6236-421e-ba0f-cd0b79adb216",
"name": "31",
"displayName": "Sales Dashboard",
"fullyQualifiedName": "local_superset.Sales Dashboard",
"description": "",
"dashboardUrl": "http://localhost:808/superset/dashboard/6/",
"charts": [92,117,166],
"href": "http://localhost:8585/api/v1/dashboards/2583737d-6236-421e-ba0f-cd0b79adb216"
},
{
"id": "6bf9bfcb-4e80-4af0-9f0c-13e47bbc27a2",
"name": "33",
"displayName": "Slack Dashboard",
"fullyQualifiedName": "local_superset.Slack Dashboard",
"description": "",
"dashboardUrl": "http://localhost:808/superset/dashboard/10/",
"charts": [114, 92, 127],
"href": "http://localhost:8585/api/v1/dashboards/6bf9bfcb-4e80-4af0-9f0c-13e47bbc27a2"
},
{
"id": "1f02caf2-c5e5-442d-bda3-b8ce3e757b45",
"name": "34",
"displayName": "Unicode Test",
"fullyQualifiedName": "local_superset.Unicode Test",
"description": "",
"dashboardUrl": "http://localhost:808/superset/dashboard/unicode-test/",
"charts": [161, 170, 180],
"href": "http://localhost:8585/api/v1/dashboards/1f02caf2-c5e5-442d-bda3-b8ce3e757b45"
},
{
"id": "a3ace318-ee37-4da1-974a-62eddbd77d20",
"name": "45",
"displayName": "USA Births Names",
"fullyQualifiedName": "local_superset.USA Births Names",
"description": "",
"dashboardUrl": "http://localhost:808/superset/dashboard/births/",
"charts": [180],
"href": "http://localhost:8585/api/v1/dashboards/a3ace318-ee37-4da1-974a-62eddbd77d20"
},
{
"id": "e6e21717-1164-403f-8807-d12be277aec6",
"name": "51",
"displayName": "Video Game Sales",
"fullyQualifiedName": "local_superset.Video Game Sales",
"description": "",
"dashboardUrl": "http://localhost:808/superset/dashboard/11/",
"charts": [127, 183],
"href": "http://localhost:8585/api/v1/dashboards/e6e21717-1164-403f-8807-d12be277aec6"
}
]
}

View File

@@ -0,0 +1,8 @@
{
"name": "sample_superset",
"serviceType": "Superset",
"description": "Supset Service",
"dashboardUrl": "http://localhost:8088",
"username": "admin",
"password": "admin"
}

View File

@@ -1,9 +1,8 @@
{
"id": "a6fb4f54-ba3d-4a16-97f0-766713199141",
"name": "sample_superset",
"serviceType": "Superset",
"description": "Supset Service",
"dashboardUrl": "http://localhost:8088",
"username": "admin",
"password": "admin"
"dashboardUrl": "http://localhost:8088",
"username": "admin",
"password": "admin"
}

View File

@@ -7,7 +7,7 @@
}
},
"sink": {
"type": "metadata-rest-dashboards",
"type": "metadata-rest",
"config": {}
},
"metadata_server": {

View File

@@ -0,0 +1,26 @@
{
"source": {
"type": "sample-data",
"config": {
"sample_data_folder": "./examples/sample_data"
}
},
"sink": {
"type": "metadata-rest",
"config": {}
},
"metadata_server": {
"type": "metadata-server",
"config": {
"api_endpoint": "http://localhost:8585/api",
"auth_provider_type": "no-auth"
}
},
"cron": {
"minute": "*/5",
"hour": null,
"day": null,
"month": null,
"day_of_week": null
}
}
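
The new workflow config above wires the sample-data source to the generic metadata-rest sink. As a minimal sketch (assuming the ingestion framework's Workflow API, i.e. Workflow.create / execute / raise_from_status / stop, and a hypothetical local path for this config file), it could be run programmatically like this:

import json

from metadata.ingestion.api.workflow import Workflow

# Hypothetical path to the sample-data workflow config shown above.
with open("pipelines/sample_data.json") as f:
    config = json.load(f)

# Build the pipeline (source -> sink), run it, and surface any failures.
workflow = Workflow.create(config)
workflow.execute()
workflow.raise_from_status()
workflow.stop()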

View File

@@ -4,7 +4,7 @@
"config": {
"database": "warehouse",
"service_name": "gcp_bigquery",
"sample_schema_folder": "./examples/shopify_schemas/"
"sample_schema_folder": "./examples/sample_data/datasets"
}
},
"processor": {
@@ -12,7 +12,7 @@
"config": {}
},
"sink": {
"type": "metadata-rest-tables",
"type": "metadata-rest",
"config": {}
},
"metadata_server": {

View File

@@ -3,11 +3,11 @@
"type": "sample-topics",
"config": {
"service_name": "sample_kafka",
"sample_schema_folder": "./examples/kafka_schemas/"
"sample_schema_folder": "./examples/sample_data/topics"
}
},
"sink": {
"type": "metadata-rest-topics",
"type": "metadata-rest",
"config": {}
},
"metadata_server": {

View File

@@ -4,7 +4,7 @@
"config": {
"database": "warehouse",
"service_name": "gcp_bigquery",
"sample_schema_folder": "./examples/shopify_schemas/"
"sample_schema_folder": "./examples/sample_data/datasets"
}
},
"processor": {

View File

@@ -95,6 +95,7 @@ plugins: Dict[str, Set[str]] = {
"snowflake-usage": {"snowflake-sqlalchemy<=1.2.4"},
"sample-tables": {"faker~=8.1.1", },
"sample-topics": {},
"sample-data": {"faker~=8.1.1",},
"superset": {}
}

View File

@@ -0,0 +1,188 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import List, Optional
from pydantic import ValidationError
from metadata.config.common import ConfigModel
from metadata.generated.schema.api.data.createChart import CreateChartEntityRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardEntityRequest
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseEntityRequest
from metadata.generated.schema.api.data.createTable import CreateTableEntityRequest
from metadata.generated.schema.api.data.createTopic import CreateTopic
from metadata.generated.schema.entity.data.chart import ChartType
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import WorkflowContext, Record
from metadata.ingestion.api.sink import Sink, SinkStatus
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.models.table_metadata import Chart, Dashboard
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient, MetadataServerConfig
logger = logging.getLogger(__name__)
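# Map Superset visualization type names to OpenMetadata ChartType values;
# anything not listed here falls back to ChartType.Other in write_charts().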
om_chart_type_dict = {
"line": ChartType.Line,
"table": ChartType.Table,
"dist_bar": ChartType.Bar,
"bar": ChartType.Bar,
"big_number": ChartType.Line,
"histogram": ChartType.Histogram,
"big_number_total": ChartType.Line,
"dual_line": ChartType.Line,
"line_multi": ChartType.Line,
"treemap": ChartType.Area,
"box_plot": ChartType.Bar
}
class MetadataRestSinkConfig(ConfigModel):
    api_endpoint: Optional[str] = None
class MetadataRestSink(Sink):
config: MetadataRestSinkConfig
status: SinkStatus
def __init__(self, ctx: WorkflowContext, config: MetadataRestSinkConfig, metadata_config: MetadataServerConfig):
super().__init__(ctx)
self.config = config
self.metadata_config = metadata_config
self.status = SinkStatus()
self.wrote_something = False
self.charts_dict = {}
self.client = OpenMetadataAPIClient(self.metadata_config)
@classmethod
def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext):
config = MetadataRestSinkConfig.parse_obj(config_dict)
metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
return cls(ctx, config, metadata_config)
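    # Dispatch each record to the writer that matches its concrete type.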
def write_record(self, record: Record) -> None:
if isinstance(record, OMetaDatabaseAndTable):
self.write_tables(record)
elif isinstance(record, CreateTopic):
self.write_topics(record)
elif isinstance(record, Chart):
self.write_charts(record)
elif isinstance(record, Dashboard):
self.write_dashboards(record)
else:
logging.info("Ignoring the record due to unknown Record type {}".format(type(record)))
def write_tables(self, table_and_db: OMetaDatabaseAndTable):
try:
db_request = CreateDatabaseEntityRequest(name=table_and_db.database.name,
description=table_and_db.database.description,
service=EntityReference(id=table_and_db.database.service.id,
type="databaseService"))
db = self.client.create_database(db_request)
table_request = CreateTableEntityRequest(name=table_and_db.table.name,
tableType=table_and_db.table.tableType,
columns=table_and_db.table.columns,
description=table_and_db.table.description,
database=db.id)
if table_and_db.table.viewDefinition is not None and table_and_db.table.viewDefinition != "":
table_request.viewDefinition = table_and_db.table.viewDefinition.__root__
created_table = self.client.create_or_update_table(table_request)
if table_and_db.table.sampleData is not None:
self.client.ingest_sample_data(id=created_table.id, sample_data=table_and_db.table.sampleData)
logger.info(
'Successfully ingested table {}.{}'.
format(table_and_db.database.name.__root__, created_table.name.__root__))
self.status.records_written(
'{}.{}'.format(table_and_db.database.name.__root__, created_table.name.__root__))
except (APIError, ValidationError) as err:
logger.error(
"Failed to ingest table {} in database {} ".format(table_and_db.table.name.__root__,
table_and_db.database.name.__root__))
logger.error(err)
self.status.failure(table_and_db.table.name.__root__)
def write_topics(self, topic: CreateTopic) -> None:
try:
created_topic = self.client.create_or_update_topic(topic)
logger.info(
'Successfully ingested topic {}'.format(created_topic.name.__root__))
            self.status.records_written(created_topic.name.__root__)
except (APIError, ValidationError) as err:
logger.error(
"Failed to ingest topic {} ".format(topic.name.__root__))
logger.error(err)
            self.status.failure(topic.name.__root__)
def write_charts(self, chart: Chart):
try:
om_chart_type = ChartType.Other
            if chart.chart_type in om_chart_type_dict:
om_chart_type = om_chart_type_dict[chart.chart_type]
chart_request = CreateChartEntityRequest(
name=chart.name,
displayName=chart.displayName,
description=chart.description,
chartType=om_chart_type,
chartUrl=chart.url,
service=chart.service
)
created_chart = self.client.create_or_update_chart(chart_request)
self.charts_dict[chart.name] = EntityReference(id=created_chart.id, type='chart')
logger.info(
'Successfully ingested chart {}'.format(created_chart.displayName))
self.status.records_written(
'{}'.format(created_chart.displayName))
except (APIError, ValidationError) as err:
logger.error(
"Failed to ingest chart {}".format(chart.displayName))
logger.error(err)
self.status.failure(chart.displayName)
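    # Dashboards are written after their charts so that _get_chart_references
    # can resolve each dashboard's chart ids against the charts_dict cache.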
def write_dashboards(self, dashboard: Dashboard):
try:
charts = self._get_chart_references(dashboard)
dashboard_request = CreateDashboardEntityRequest(
name=dashboard.name,
displayName=dashboard.displayName,
description=dashboard.description,
dashboardUrl=dashboard.url,
charts=charts,
service=dashboard.service
)
created_dashboard = self.client.create_or_update_dashboard(dashboard_request)
logger.info('Successfully ingested dashboard {}'.format(created_dashboard.displayName))
self.status.records_written('{}'.format(created_dashboard.displayName))
except (APIError, ValidationError) as err:
logger.error("Failed to ingest dashboard {}".format(dashboard.name))
logger.error(err)
self.status.failure(dashboard.name)
    def _get_chart_references(self, dashboard: Dashboard) -> List[EntityReference]:
        chart_references = []
        for chart_id in dashboard.charts:
            if chart_id in self.charts_dict:
                chart_references.append(self.charts_dict[chart_id])
        return chart_references
def get_status(self):
return self.status
def close(self):
pass

View File

@@ -0,0 +1,349 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import csv
import pandas as pd
import uuid
import os
import json
from faker import Faker
from collections import namedtuple
from dataclasses import dataclass, field
from typing import Iterable, List, Dict, Any, Union
from pydantic import ValidationError
from metadata.config.common import ConfigModel
from metadata.generated.schema.api.data.createTopic import CreateTopic
from metadata.generated.schema.api.services.createDashboardService import CreateDashboardServiceEntityRequest
from metadata.generated.schema.api.services.createMessagingService import CreateMessagingServiceEntityRequest
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.services.dashboardService import DashboardService
from metadata.generated.schema.entity.services.messagingService import MessagingService
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import Record
from metadata.ingestion.api.source import SourceStatus, Source
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.models.table_metadata import Dashboard, Chart
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.generated.schema.api.services.createDatabaseService import CreateDatabaseServiceEntityRequest
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient
import logging
logger: logging.Logger = logging.getLogger(__name__)
COLUMN_NAME = 'Column'
KEY_TYPE = 'Key type'
DATA_TYPE = 'Data type'
COL_DESCRIPTION = 'Description'
TableKey = namedtuple('TableKey', ['schema', 'table_name'])
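# Get-or-create helpers: fetch each service from the metadata server by name,
# registering it on first use.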
def get_database_service_or_create(service_json, metadata_config) -> DatabaseService:
client = OpenMetadataAPIClient(metadata_config)
service = client.get_database_service(service_json['name'])
if service is not None:
return service
else:
created_service = client.create_database_service(CreateDatabaseServiceEntityRequest(**service_json))
return created_service
def get_messaging_service_or_create(service_json, metadata_config) -> MessagingService:
client = OpenMetadataAPIClient(metadata_config)
service = client.get_messaging_service(service_json['name'])
if service is not None:
return service
else:
created_service = client.create_messaging_service(CreateMessagingServiceEntityRequest(**service_json))
return created_service
def get_dashboard_service_or_create(service_json, metadata_config) -> DashboardService:
client = OpenMetadataAPIClient(metadata_config)
service = client.get_dashboard_service(service_json['name'])
if service is not None:
return service
else:
created_service = client.create_dashboard_service(CreateDashboardServiceEntityRequest(**service_json))
return created_service
def get_table_key(row: Dict[str, Any]) -> Union[TableKey, None]:
"""
Table key consists of schema and table name
:param row:
:return:
"""
return TableKey(schema=row['schema'], table_name=row['table_name'])
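# Configuration for the merged sample-data source; only sample_data_folder is
# required, and the service fields default to a BigQuery-style service.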
class SampleDataSourceConfig(ConfigModel):
sample_data_folder: str
service_name: str = "bigquery_gcp"
database: str = "warehouse"
service_type: str = "BigQuery"
scheme = "bigquery+pymysql"
def get_sample_data_folder(self):
return self.sample_data_folder
@dataclass
class SampleDataSourceStatus(SourceStatus):
success: List[str] = field(default_factory=list)
failures: List[str] = field(default_factory=list)
warnings: List[str] = field(default_factory=list)
def scanned(self, entity_type: str, entity_name: str) -> None:
self.success.append(entity_name)
logger.info('{} Scanned: {}'.format(entity_type, entity_name))
def filtered(self, entity_type: str, entity_name: str, err: str) -> None:
self.warnings.append(entity_name)
logger.warning("Dropped {} {} due to {}".format(entity_type, entity_name, err))
class TableSchema:
def __init__(self, filename):
        # raise if the input file is not a CSV file
if not filename.endswith('.csv'):
raise Exception('Input file should be a csv file')
# file name is assumed to be the table name
basename = os.path.basename(filename)
self.table_name = os.path.splitext(basename)[0]
with open(filename, 'r') as fin:
self.columns = [dict(i) for i in csv.DictReader(fin)]
def primary_keys(self):
return [c[COLUMN_NAME] for c in self.columns if c[KEY_TYPE] == 'PK']
def foreign_keys(self):
return [c[COLUMN_NAME] for c in self.columns if c[KEY_TYPE] == 'FK']
def get_name(self):
return self.table_name
def get_schema(self):
return self.columns
def get_column_names(self):
return [c[COLUMN_NAME] for c in self.columns]
class SampleTableMetadataGenerator:
def __init__(self, table_to_df_dict, table_to_schema_map):
self.table_to_df_dict = table_to_df_dict
self.table_to_schema_map = table_to_schema_map
self.sample_user = None
self.sample_table = None
self.sample_table_owner = None
self.sample_table_last_updated = None
def get_empty_dict_with_cols(self, columns):
data = {}
for c in columns:
data[c] = []
return data
def generate_sample_table(self):
keys = ['database', 'cluster', 'schema', 'name', 'description', 'tags', 'is_view', 'description_source']
data = self.get_empty_dict_with_cols(keys)
for tname in self.table_to_df_dict.keys():
data['database'].append('hive')
data['cluster'].append('gold')
data['schema'].append('gdw')
data['name'].append(tname)
data['description'].append('this is the table to hold data on ' + tname)
data['tags'].append('pii')
data['is_view'].append('false')
data['description_source'].append('')
sample_table = pd.DataFrame(data)
table_dict = {}
for index, row in sample_table.iterrows():
table_dict[row['name']] = row
return table_dict
def generate_sample_col(self):
# name, description, col_type, sort_order, database, cluster, schema, table_name
# col1, "col1 description", "string", 1, hive, gold, test_schema, test_table1
keys = ['name', 'description', 'col_type', 'sort_order', 'database', 'cluster', 'schema', 'table_name']
data = self.get_empty_dict_with_cols(keys)
for (tname, df) in self.table_to_df_dict.items():
tschema = self.table_to_schema_map[tname].get_schema()
for col in df.columns:
data['name'].append(col)
for c in tschema:
                    if c[COLUMN_NAME] == col:
data['description'].append(c[COL_DESCRIPTION])
data['col_type'].append(c[DATA_TYPE])
break
data['sort_order'].append(1)
data['cluster'].append('gold')
data['database'].append('hive')
data['schema'].append('dwh')
data['table_name'].append(tname)
pd_rows = pd.DataFrame(data)
row_dict = []
for index, row in pd_rows.iterrows():
row_dict.append(row)
sorted_row_dict = sorted(row_dict, key=get_table_key)
return sorted_row_dict
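# Builds fake sample rows with Faker, picking a provider from each column's
# name (id, price, phone, ...) and falling back to one based on the data type.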
class GenerateFakeSampleData:
def __init__(self) -> None:
pass
@classmethod
    def check_columns(cls, columns):
fake = Faker()
colData = []
colList = [column['name'] for column in columns]
for i in range(25):
row = []
for column in columns:
col_name = column['name']
value = None
if "id" in col_name:
value = uuid.uuid4()
elif "price" in col_name:
value = fake.pricetag()
elif "barcode" in col_name:
value = fake.ean(length=13)
elif "phone" in col_name:
value = fake.phone_number()
elif "zip" in col_name:
value = fake.postcode()
elif "address" in col_name:
value = fake.street_address()
elif "company" in col_name:
value = fake.company()
elif "region" in col_name:
value = fake.street_address()
elif "name" in col_name:
value = fake.first_name()
elif "city" in col_name:
value = fake.city()
elif "country" in col_name:
value = fake.country()
if value is None:
if "TIMESTAMP" in column['columnDataType'] or "date" in col_name:
value = fake.unix_time()
elif "BOOLEAN" in column['columnDataType']:
value = fake.pybool()
elif "NUMERIC" in column['columnDataType']:
value = fake.pyint()
elif "VARCHAR" in column['columnDataType']:
value = fake.text(max_nb_chars=20)
else:
value = None
row.append(value)
colData.append(row)
return {"columns": colList, "rows": colData}
class SampleDataSource(Source):
def __init__(self, config: SampleDataSourceConfig, metadata_config: MetadataServerConfig, ctx):
super().__init__(ctx)
self.status = SampleDataSourceStatus()
self.config = config
self.metadata_config = metadata_config
self.client = OpenMetadataAPIClient(metadata_config)
self.database_service_json = json.load(open(self.config.sample_data_folder + "/datasets/service.json", 'r'))
self.database = json.load(open(self.config.sample_data_folder + "/datasets/database.json", 'r'))
self.tables = json.load(open(self.config.sample_data_folder + "/datasets/tables.json", 'r'))
self.database_service = get_database_service_or_create(self.database_service_json, self.metadata_config)
self.kafka_service_json = json.load(open(self.config.sample_data_folder + "/topics/service.json", 'r'))
self.topics = json.load(open(self.config.sample_data_folder + "/topics/topics.json", 'r'))
self.kafka_service = get_messaging_service_or_create(self.kafka_service_json, self.metadata_config)
self.dashboard_service_json = json.load(open(self.config.sample_data_folder + "/dashboards/service.json", 'r'))
self.charts = json.load(open(self.config.sample_data_folder + "/dashboards/charts.json", 'r'))
self.dashboards = json.load(open(self.config.sample_data_folder + "/dashboards/dashboards.json", 'r'))
self.dashboard_service = get_dashboard_service_or_create(self.dashboard_service_json, metadata_config)
@classmethod
def create(cls, config_dict, metadata_config_dict, ctx):
config = SampleDataSourceConfig.parse_obj(config_dict)
metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
return cls(config, metadata_config, ctx)
def prepare(self):
pass
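    # Emit sample entities in dependency order: tables, topics, charts, then
    # dashboards (dashboards reference charts that must already be ingested).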
def next_record(self) -> Iterable[Record]:
yield from self.ingest_tables()
yield from self.ingest_topics()
yield from self.ingest_charts()
yield from self.ingest_dashboards()
def ingest_tables(self) -> Iterable[OMetaDatabaseAndTable]:
db = Database(id=uuid.uuid4(),
name=self.database['name'],
description=self.database['description'],
service=EntityReference(id=self.database_service.id, type=self.config.service_type))
for table in self.tables['tables']:
if not table.get('sampleData'):
table['sampleData'] = GenerateFakeSampleData.check_columns(table['columns'])
table_metadata = Table(**table)
table_and_db = OMetaDatabaseAndTable(table=table_metadata, database=db)
self.status.scanned("table", table_metadata.name.__root__)
yield table_and_db
def ingest_topics(self) -> Iterable[CreateTopic]:
for topic in self.topics['topics']:
topic['service'] = EntityReference(id=self.kafka_service.id, type="messagingService")
create_topic = CreateTopic(**topic)
self.status.scanned("topic", create_topic.name.__root__)
yield create_topic
def ingest_charts(self) -> Iterable[Chart]:
for chart in self.charts['charts']:
try:
chart_ev = Chart(name=chart['name'],
displayName=chart['displayName'],
description=chart['description'],
chart_type=chart['chartType'],
url=chart['chartUrl'],
service=EntityReference(id=self.dashboard_service.id, type="dashboardService"))
self.status.scanned("chart", chart_ev.name)
yield chart_ev
except ValidationError as err:
logger.error(err)
def ingest_dashboards(self) -> Iterable[Dashboard]:
for dashboard in self.dashboards['dashboards']:
dashboard_ev = Dashboard(name=dashboard['name'],
displayName=dashboard['displayName'],
description=dashboard['description'],
url=dashboard['dashboardUrl'],
charts=dashboard['charts'],
service=EntityReference(id=self.dashboard_service.id, type="dashboardService"))
self.status.scanned("dashboard", dashboard_ev.name)
yield dashboard_ev
def close(self):
pass
def get_status(self):
return self.status
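
For reference, a minimal sketch of exercising the fake-data generator on its own (assuming GenerateFakeSampleData is importable from the connector module above). The column definitions here are hypothetical; check_columns only reads the "name" and "columnDataType" keys.

from pprint import pprint

# Hypothetical column definitions, mimicking the shape of the entries in
# examples/sample_data/datasets/tables.json.
columns = [
    {"name": "customer_id", "columnDataType": "VARCHAR"},
    {"name": "address", "columnDataType": "VARCHAR"},
    {"name": "order_date", "columnDataType": "TIMESTAMP"},
]

sample = GenerateFakeSampleData.check_columns(columns)
print(sample["columns"])    # ['customer_id', 'address', 'order_date']
pprint(sample["rows"][:2])  # 2 of the 25 generated rows, e.g. [UUID('...'), '123 Main St', 1612345678]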