diff --git a/ingestion/examples/sample_data/dashboards/charts.json b/ingestion/examples/sample_data/dashboards/charts.json
new file mode 100644
index 00000000000..4c441141984
--- /dev/null
+++ b/ingestion/examples/sample_data/dashboards/charts.json
@@ -0,0 +1,98 @@
+{
+  "charts": [
+    {
+      "id": "2841fdb1-e378-4a2c-94f8-27c9f5d6ef8e",
+      "name": "114",
+      "displayName": "# of Games That Hit 100k in Sales By Release Year",
+      "fullyQualifiedName": "local_superset.# of Games That Hit 100k in Sales By Release Year",
+      "description": "",
+      "chartId": "114",
+      "chartType": "Area",
+      "chartUrl": "http://localhost:8088/superset/explore/?form_data=%7B%22slice_id%22%3A%20114%7D",
+      "href": "http://localhost:8585/api/v1/charts/2841fdb1-e378-4a2c-94f8-27c9f5d6ef8e"
+    }, {
+      "id": "3bcba490-9e5c-4946-a0e3-41e8ff8f4aa4",
+      "name": "166",
+      "displayName": "% Rural",
+      "fullyQualifiedName": "local_superset.% Rural",
+      "description": "",
+      "chartId": "166",
+      "chartType": "Other",
+      "chartUrl": "http://localhost:8088/superset/explore/?form_data=%7B%22slice_id%22%3A%20166%7D",
+      "href": "http://localhost:8585/api/v1/charts/3bcba490-9e5c-4946-a0e3-41e8ff8f4aa4"
+    }, {
+      "id": "22b95748-4a7b-48ad-859e-cf7c66a7f343",
+      "name": "92",
+      "displayName": "✈️ Relocation ability",
+      "fullyQualifiedName": "local_superset.✈️ Relocation ability",
+      "description": "",
+      "chartId": "92",
+      "chartType": "Other",
+      "chartUrl": "http://localhost:8088/superset/explore/?form_data=%7B%22slice_id%22%3A%2092%7D",
+      "href": "http://localhost:8585/api/v1/charts/22b95748-4a7b-48ad-859e-cf7c66a7f343"
+    }, {
+      "id": "62b31dcc-4619-46a0-99b1-0fa7cd6f93da",
+      "name": "117",
+      "displayName": "Age distribution of respondents",
+      "fullyQualifiedName": "local_superset.Age distribution of respondents",
+      "description": "",
+      "chartId": "117",
+      "chartType": "Histogram",
+      "chartUrl": "http://localhost:8088/superset/explore/?form_data=%7B%22slice_id%22%3A%20117%7D",
+      "href": "http://localhost:8585/api/v1/charts/62b31dcc-4619-46a0-99b1-0fa7cd6f93da"
+    }, {
+      "id": "57944482-e187-439a-aaae-0e8aabd2f455",
+      "displayName": "Arcs",
+      "fullyQualifiedName": "local_superset.Arcs",
+      "description": "",
+      "name": "197",
+      "chartType": "Other",
+      "chartUrl": "http://localhost:8088/superset/explore/?form_data=%7B%22slice_id%22%3A%20197%7D",
+      "href": "http://localhost:8585/api/v1/charts/57944482-e187-439a-aaae-0e8aabd2f455"
+    }, {
+      "id": "d88e2056-c74a-410d-829e-eb31b040c132",
+      "displayName": "Are you an ethnic minority in your city?",
+      "fullyQualifiedName": "local_superset.Are you an ethnic minority in your city?",
+      "description": "",
+      "name": "127",
+      "chartType": "Other",
+      "chartUrl": "http://localhost:8088/superset/explore/?form_data=%7B%22slice_id%22%3A%20127%7D",
+      "href": "http://localhost:8585/api/v1/charts/d88e2056-c74a-410d-829e-eb31b040c132"
+    }, {
+      "id": "c1d3e156-4628-414e-8d6e-a6bdd486128f",
+      "displayName": "Average and Sum Trends",
+      "fullyQualifiedName": "local_superset.Average and Sum Trends",
+      "description": "",
+      "name": "183",
+      "chartType": "Line",
+      "chartUrl": "http://localhost:8088/superset/explore/?form_data=%7B%22slice_id%22%3A%20183%7D",
+      "href": "http://localhost:8585/api/v1/charts/c1d3e156-4628-414e-8d6e-a6bdd486128f"
+    }, {
+      "id": "bfc57519-8cef-47e6-a423-375d5b89a6a4",
+      "displayName": "Birth in France by department in 2016",
+      "fullyQualifiedName": "local_superset.Birth in France by department in 2016",
+      "description": "",
+      "name": "161",
+      "chartType": "Other",
+      "chartUrl": "http://localhost:8088/superset/explore/?form_data=%7B%22slice_id%22%3A%20161%7D",
+      "href": "http://localhost:8585/api/v1/charts/bfc57519-8cef-47e6-a423-375d5b89a6a4"
"http://localhost:8585/api/v1/charts/bfc57519-8cef-47e6-a423-375d5b89a6a4" + }, { + "id": "bf2eeac4-7226-46c6-bbef-918569c137a0", + "displayName": "Box plot", + "fullyQualifiedName": "local_superset.Box plot", + "description": "", + "name": "170", + "chartType": "Bar", + "chartUrl": "http://localhost:8088/superset/explore/?form_data=%7B%22slice_id%22%3A%20170%7D", + "href": "http://localhost:8585/api/v1/charts/bf2eeac4-7226-46c6-bbef-918569c137a0" + }, { + "id": "167fd63b-42f1-4d7e-a37d-893fd8173b44", + "displayName": "Boy Name Cloud", + "fullyQualifiedName": "local_superset.Boy Name Cloud", + "description": "", + "name": "180", + "chartType": "Other", + "chartUrl": "http://localhost:8088/superset/explore/?form_data=%7B%22slice_id%22%3A%20180%7D", + "href": "http://localhost:8585/api/v1/charts/167fd63b-42f1-4d7e-a37d-893fd8173b44" + } +] +} diff --git a/ingestion/examples/sample_data/dashboards/dashboards.json b/ingestion/examples/sample_data/dashboards/dashboards.json new file mode 100644 index 00000000000..933d946d95b --- /dev/null +++ b/ingestion/examples/sample_data/dashboards/dashboards.json @@ -0,0 +1,104 @@ +{ + "dashboards": [ + { + "id": "d4dc7baf-1b17-45f8-acd5-a15b78cc7c5f", + "name": "8", + "displayName": "[ untitled dashboard ]", + "fullyQualifiedName": "local_superset.[ untitled dashboard ]", + "description": "", + "dashboardUrl": "http://localhost:808/superset/dashboard/1/", + "charts": [183, 170, 197], + "href": "http://localhost:8585/api/v1/dashboards/d4dc7baf-1b17-45f8-acd5-a15b78cc7c5f" + }, + { + "id": "063cd787-8630-4809-9702-34d3992c7248", + "name": "9", + "displayName": "COVID Vaccine Dashboard", + "fullyQualifiedName": "local_superset.COVID Vaccine Dashboard", + "description": "", + "dashboardUrl": "http://localhost:808/superset/dashboard/8/", + "charts": [117, 197], + "href": "http://localhost:8585/api/v1/dashboards/063cd787-8630-4809-9702-34d3992c7248" + }, + { + "id": "df6c698e-066a-4440-be0a-121025573b73", + "name": "10", + "displayName": "deck.gl Demo", + "fullyQualifiedName": "local_superset.deck.gl Demo", + "description": "", + "dashboardUrl": "http://localhost:808/superset/dashboard/deck/", + "charts": [127, 166, 114], + "href": "http://localhost:8585/api/v1/dashboards/df6c698e-066a-4440-be0a-121025573b73" + }, + { + "id": "98b38a49-b5c6-431b-b61f-690e39f8ead2", + "name": "11", + "displayName": "FCC New Coder Survey 2018", + "fullyQualifiedName": "local_superset.FCC New Coder Survey 2018", + "description": "", + "dashboardUrl": "http://localhost:808/superset/dashboard/7/", + "charts": [183, 197, 170, 180], + "href": "http://localhost:8585/api/v1/dashboards/98b38a49-b5c6-431b-b61f-690e39f8ead2" + }, + { + "id": "dffcf9b2-4f43-4881-a5f5-10109655bf50", + "name": "12", + "displayName": "Misc Charts", + "fullyQualifiedName": "local_superset.Misc Charts", + "description": "", + "dashboardUrl": "http://localhost:808/superset/dashboard/misc_charts/", + "charts": [127, 197], + "href": "http://localhost:8585/api/v1/dashboards/dffcf9b2-4f43-4881-a5f5-10109655bf50" + }, + { + "id": "2583737d-6236-421e-ba0f-cd0b79adb216", + "name": "31", + "displayName": "Sales Dashboard", + "fullyQualifiedName": "local_superset.Sales Dashboard", + "description": "", + "dashboardUrl": "http://localhost:808/superset/dashboard/6/", + "charts": [92,117,166], + "href": "http://localhost:8585/api/v1/dashboards/2583737d-6236-421e-ba0f-cd0b79adb216" + }, + { + "id": "6bf9bfcb-4e80-4af0-9f0c-13e47bbc27a2", + "name": "33", + "displayName": "Slack Dashboard", + "fullyQualifiedName": 
"local_superset.Slack Dashboard", + "description": "", + "dashboardUrl": "http://localhost:808/superset/dashboard/10/", + "charts": [114, 92, 127], + "href": "http://localhost:8585/api/v1/dashboards/6bf9bfcb-4e80-4af0-9f0c-13e47bbc27a2" + }, + { + "id": "1f02caf2-c5e5-442d-bda3-b8ce3e757b45", + "name": "34", + "displayName": "Unicode Test", + "fullyQualifiedName": "local_superset.Unicode Test", + "description": "", + "dashboardUrl": "http://localhost:808/superset/dashboard/unicode-test/", + "charts": [161, 170, 180], + "href": "http://localhost:8585/api/v1/dashboards/1f02caf2-c5e5-442d-bda3-b8ce3e757b45" + }, + { + "id": "a3ace318-ee37-4da1-974a-62eddbd77d20", + "name": "45", + "displayName": "USA Births Names", + "fullyQualifiedName": "local_superset.USA Births Names", + "description": "", + "dashboardUrl": "http://localhost:808/superset/dashboard/births/", + "charts": [180], + "href": "http://localhost:8585/api/v1/dashboards/a3ace318-ee37-4da1-974a-62eddbd77d20" + }, + { + "id": "e6e21717-1164-403f-8807-d12be277aec6", + "name": "51", + "displayName": "Video Game Sales", + "fullyQualifiedName": "local_superset.Video Game Sales", + "description": "", + "dashboardUrl": "http://localhost:808/superset/dashboard/11/", + "charts": [127, 183], + "href": "http://localhost:8585/api/v1/dashboards/e6e21717-1164-403f-8807-d12be277aec6" + } + ] +} diff --git a/ingestion/examples/sample_data/dashboards/service.json b/ingestion/examples/sample_data/dashboards/service.json new file mode 100644 index 00000000000..81a97872e60 --- /dev/null +++ b/ingestion/examples/sample_data/dashboards/service.json @@ -0,0 +1,8 @@ +{ + "name": "sample_superset", + "serviceType": "Superset", + "description": "Supset Service", + "dashboardUrl": "http://localhost:8088", + "username": "admin", + "password": "admin" +} diff --git a/ingestion/examples/shopify_schemas/database.json b/ingestion/examples/sample_data/datasets/database.json similarity index 100% rename from ingestion/examples/shopify_schemas/database.json rename to ingestion/examples/sample_data/datasets/database.json diff --git a/ingestion/examples/shopify_schemas/query_log b/ingestion/examples/sample_data/datasets/query_log similarity index 100% rename from ingestion/examples/shopify_schemas/query_log rename to ingestion/examples/sample_data/datasets/query_log diff --git a/ingestion/examples/shopify_schemas/service.json b/ingestion/examples/sample_data/datasets/service.json similarity index 100% rename from ingestion/examples/shopify_schemas/service.json rename to ingestion/examples/sample_data/datasets/service.json diff --git a/ingestion/examples/shopify_schemas/tables.json b/ingestion/examples/sample_data/datasets/tables.json similarity index 100% rename from ingestion/examples/shopify_schemas/tables.json rename to ingestion/examples/sample_data/datasets/tables.json diff --git a/ingestion/examples/kafka_schemas/service.json b/ingestion/examples/sample_data/topics/service.json similarity index 100% rename from ingestion/examples/kafka_schemas/service.json rename to ingestion/examples/sample_data/topics/service.json diff --git a/ingestion/examples/kafka_schemas/topics.json b/ingestion/examples/sample_data/topics/topics.json similarity index 100% rename from ingestion/examples/kafka_schemas/topics.json rename to ingestion/examples/sample_data/topics/topics.json diff --git a/ingestion/examples/superset_data/service.json b/ingestion/examples/superset_data/service.json index 2e896705025..81a97872e60 100644 --- a/ingestion/examples/superset_data/service.json +++ 
@@ -1,9 +1,8 @@
 {
-  "id": "a6fb4f54-ba3d-4a16-97f0-766713199141",
   "name": "sample_superset",
   "serviceType": "Superset",
   "description": "Superset Service",
-  "dashboardUrl": "http://localhost:8088",
-  "username": "admin",
-  "password": "admin"
+  "dashboardUrl": "http://localhost:8088",
+  "username": "admin",
+  "password": "admin"
 }
diff --git a/ingestion/pipelines/sample_dashboards.json b/ingestion/pipelines/sample_dashboards.json
index 1422cc87627..1cf338c3c23 100644
--- a/ingestion/pipelines/sample_dashboards.json
+++ b/ingestion/pipelines/sample_dashboards.json
@@ -7,7 +7,7 @@
     }
   },
   "sink": {
-    "type": "metadata-rest-dashboards",
+    "type": "metadata-rest",
     "config": {}
   },
   "metadata_server": {
diff --git a/ingestion/pipelines/sample_data.json b/ingestion/pipelines/sample_data.json
new file mode 100644
index 00000000000..0243d737dbc
--- /dev/null
+++ b/ingestion/pipelines/sample_data.json
@@ -0,0 +1,26 @@
+{
+  "source": {
+    "type": "sample-data",
+    "config": {
+      "sample_data_folder": "./examples/sample_data"
+    }
+  },
+  "sink": {
+    "type": "metadata-rest",
+    "config": {}
+  },
+  "metadata_server": {
+    "type": "metadata-server",
+    "config": {
+      "api_endpoint": "http://localhost:8585/api",
+      "auth_provider_type": "no-auth"
+    }
+  },
+  "cron": {
+    "minute": "*/5",
+    "hour": null,
+    "day": null,
+    "month": null,
+    "day_of_week": null
+  }
+}
diff --git a/ingestion/pipelines/sample_tables.json b/ingestion/pipelines/sample_tables.json
index b04b72abc8b..107ab283a50 100644
--- a/ingestion/pipelines/sample_tables.json
+++ b/ingestion/pipelines/sample_tables.json
@@ -4,7 +4,7 @@
     "config": {
       "database": "warehouse",
       "service_name": "gcp_bigquery",
-      "sample_schema_folder": "./examples/shopify_schemas/"
+      "sample_schema_folder": "./examples/sample_data/datasets"
     }
   },
   "processor": {
@@ -12,7 +12,7 @@
     "config": {}
   },
   "sink": {
-    "type": "metadata-rest-tables",
+    "type": "metadata-rest",
     "config": {}
   },
   "metadata_server": {
diff --git a/ingestion/pipelines/sample_topics.json b/ingestion/pipelines/sample_topics.json
index 31c81e27172..1ca5df64615 100644
--- a/ingestion/pipelines/sample_topics.json
+++ b/ingestion/pipelines/sample_topics.json
@@ -3,11 +3,11 @@
     "type": "sample-topics",
     "config": {
       "service_name": "sample_kafka",
-      "sample_schema_folder": "./examples/kafka_schemas/"
+      "sample_schema_folder": "./examples/sample_data/topics"
     }
   },
   "sink": {
-    "type": "metadata-rest-topics",
+    "type": "metadata-rest",
     "config": {}
   },
   "metadata_server": {
diff --git a/ingestion/pipelines/sample_usage.json b/ingestion/pipelines/sample_usage.json
index feef377ea01..f18d4e8e63f 100644
--- a/ingestion/pipelines/sample_usage.json
+++ b/ingestion/pipelines/sample_usage.json
@@ -4,7 +4,7 @@
     "config": {
       "database": "warehouse",
       "service_name": "gcp_bigquery",
-      "sample_schema_folder": "./examples/shopify_schemas/"
+      "sample_schema_folder": "./examples/sample_data/datasets"
     }
   },
   "processor": {
diff --git a/ingestion/setup.py b/ingestion/setup.py
index 1715fb3ef41..341334b2be0 100644
--- a/ingestion/setup.py
+++ b/ingestion/setup.py
@@ -95,6 +95,7 @@ plugins: Dict[str, Set[str]] = {
     "snowflake-usage": {"snowflake-sqlalchemy<=1.2.4"},
     "sample-tables": {"faker~=8.1.1", },
     "sample-topics": {},
+    "sample-data": {"faker~=8.1.1", },
     "superset": {}
 }
diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py
new file mode 100644
index 00000000000..e1a25445d7f
--- /dev/null
+++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py
@@ -0,0 +1,188 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from typing import List, Optional
+
+from pydantic import ValidationError
+
+from metadata.config.common import ConfigModel
+from metadata.generated.schema.api.data.createChart import CreateChartEntityRequest
+from metadata.generated.schema.api.data.createDashboard import CreateDashboardEntityRequest
+from metadata.generated.schema.api.data.createDatabase import CreateDatabaseEntityRequest
+from metadata.generated.schema.api.data.createTable import CreateTableEntityRequest
+from metadata.generated.schema.api.data.createTopic import CreateTopic
+from metadata.generated.schema.entity.data.chart import ChartType
+from metadata.generated.schema.type.entityReference import EntityReference
+from metadata.ingestion.api.common import WorkflowContext, Record
+from metadata.ingestion.api.sink import Sink, SinkStatus
+from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
+from metadata.ingestion.models.table_metadata import Chart, Dashboard
+from metadata.ingestion.ometa.client import APIError
+from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient, MetadataServerConfig
+
+logger = logging.getLogger(__name__)
+
+# Map Superset visualization types onto the OpenMetadata ChartType enum;
+# anything not listed here falls back to ChartType.Other in write_charts().
+om_chart_type_dict = {
+    "line": ChartType.Line,
+    "table": ChartType.Table,
+    "dist_bar": ChartType.Bar,
+    "bar": ChartType.Bar,
+    "big_number": ChartType.Line,
+    "histogram": ChartType.Histogram,
+    "big_number_total": ChartType.Line,
+    "dual_line": ChartType.Line,
+    "line_multi": ChartType.Line,
+    "treemap": ChartType.Area,
+    "box_plot": ChartType.Bar
+}
+
+
+class MetadataRestSinkConfig(ConfigModel):
+    api_endpoint: Optional[str] = None
+
+
+class MetadataRestSink(Sink):
+    config: MetadataRestSinkConfig
+    status: SinkStatus
+
+    def __init__(self, ctx: WorkflowContext, config: MetadataRestSinkConfig, metadata_config: MetadataServerConfig):
+        super().__init__(ctx)
+        self.config = config
+        self.metadata_config = metadata_config
+        self.status = SinkStatus()
+        self.wrote_something = False
+        self.charts_dict = {}
+        self.client = OpenMetadataAPIClient(self.metadata_config)
+
+    @classmethod
+    def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext):
+        config = MetadataRestSinkConfig.parse_obj(config_dict)
+        metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
+        return cls(ctx, config, metadata_config)
+
+    def write_record(self, record: Record) -> None:
+        # Dispatch on the concrete record type produced by the source.
+        if isinstance(record, OMetaDatabaseAndTable):
+            self.write_tables(record)
+        elif isinstance(record, CreateTopic):
+            self.write_topics(record)
+        elif isinstance(record, Chart):
+            self.write_charts(record)
+        elif isinstance(record, Dashboard):
+            self.write_dashboards(record)
+        else:
+            logger.info("Ignoring the record due to unknown record type {}".format(type(record)))
type {}".format(type(record))) + + def write_tables(self, table_and_db: OMetaDatabaseAndTable): + try: + db_request = CreateDatabaseEntityRequest(name=table_and_db.database.name, + description=table_and_db.database.description, + service=EntityReference(id=table_and_db.database.service.id, + type="databaseService")) + db = self.client.create_database(db_request) + table_request = CreateTableEntityRequest(name=table_and_db.table.name, + tableType=table_and_db.table.tableType, + columns=table_and_db.table.columns, + description=table_and_db.table.description, + database=db.id) + + if table_and_db.table.viewDefinition is not None and table_and_db.table.viewDefinition != "": + table_request.viewDefinition = table_and_db.table.viewDefinition.__root__ + + created_table = self.client.create_or_update_table(table_request) + if table_and_db.table.sampleData is not None: + self.client.ingest_sample_data(id=created_table.id, sample_data=table_and_db.table.sampleData) + + logger.info( + 'Successfully ingested table {}.{}'. + format(table_and_db.database.name.__root__, created_table.name.__root__)) + self.status.records_written( + '{}.{}'.format(table_and_db.database.name.__root__, created_table.name.__root__)) + except (APIError, ValidationError) as err: + logger.error( + "Failed to ingest table {} in database {} ".format(table_and_db.table.name.__root__, + table_and_db.database.name.__root__)) + logger.error(err) + self.status.failure(table_and_db.table.name.__root__) + + def write_topics(self, topic: CreateTopic) -> None: + try: + created_topic = self.client.create_or_update_topic(topic) + logger.info( + 'Successfully ingested topic {}'.format(created_topic.name.__root__)) + self.status.records_written(created_topic.name) + except (APIError, ValidationError) as err: + logger.error( + "Failed to ingest topic {} ".format(topic.name.__root__)) + logger.error(err) + self.status.failure(topic.name) + + def write_charts(self, chart: Chart): + try: + om_chart_type = ChartType.Other + if chart.chart_type is not None and chart.chart_type in om_chart_type_dict.keys(): + om_chart_type = om_chart_type_dict[chart.chart_type] + + chart_request = CreateChartEntityRequest( + name=chart.name, + displayName=chart.displayName, + description=chart.description, + chartType=om_chart_type, + chartUrl=chart.url, + service=chart.service + ) + created_chart = self.client.create_or_update_chart(chart_request) + self.charts_dict[chart.name] = EntityReference(id=created_chart.id, type='chart') + logger.info( + 'Successfully ingested chart {}'.format(created_chart.displayName)) + self.status.records_written( + '{}'.format(created_chart.displayName)) + except (APIError, ValidationError) as err: + logger.error( + "Failed to ingest chart {}".format(chart.displayName)) + logger.error(err) + self.status.failure(chart.displayName) + + def write_dashboards(self, dashboard: Dashboard): + try: + charts = self._get_chart_references(dashboard) + + dashboard_request = CreateDashboardEntityRequest( + name=dashboard.name, + displayName=dashboard.displayName, + description=dashboard.description, + dashboardUrl=dashboard.url, + charts=charts, + service=dashboard.service + ) + created_dashboard = self.client.create_or_update_dashboard(dashboard_request) + logger.info('Successfully ingested dashboard {}'.format(created_dashboard.displayName)) + self.status.records_written('{}'.format(created_dashboard.displayName)) + except (APIError, ValidationError) as err: + logger.error("Failed to ingest dashboard {}".format(dashboard.name)) + 
+            logger.error(err)
+            self.status.failure(dashboard.name)
+
+    def _get_chart_references(self, dashboard: Dashboard) -> List[EntityReference]:
+        # Dashboards list chart ids as integers in the sample JSON while
+        # charts_dict is keyed by the (string) chart name, so normalize first.
+        chart_references = []
+        for chart_id in dashboard.charts:
+            if str(chart_id) in self.charts_dict:
+                chart_references.append(self.charts_dict[str(chart_id)])
+        return chart_references
+
+    def get_status(self):
+        return self.status
+
+    def close(self):
+        pass
diff --git a/ingestion/src/metadata/ingestion/source/sample_data.py b/ingestion/src/metadata/ingestion/source/sample_data.py
new file mode 100644
index 00000000000..dd75747eb05
--- /dev/null
+++ b/ingestion/src/metadata/ingestion/source/sample_data.py
@@ -0,0 +1,349 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import csv
+import json
+import logging
+import os
+import uuid
+from collections import namedtuple
+from dataclasses import dataclass, field
+from typing import Any, Dict, Iterable, List, Union
+
+import pandas as pd
+from faker import Faker
+from pydantic import ValidationError
+
+from metadata.config.common import ConfigModel
+from metadata.generated.schema.api.data.createTopic import CreateTopic
+from metadata.generated.schema.api.services.createDashboardService import CreateDashboardServiceEntityRequest
+from metadata.generated.schema.api.services.createDatabaseService import CreateDatabaseServiceEntityRequest
+from metadata.generated.schema.api.services.createMessagingService import CreateMessagingServiceEntityRequest
+from metadata.generated.schema.entity.data.database import Database
+from metadata.generated.schema.entity.data.table import Table
+from metadata.generated.schema.entity.services.dashboardService import DashboardService
+from metadata.generated.schema.entity.services.databaseService import DatabaseService
+from metadata.generated.schema.entity.services.messagingService import MessagingService
+from metadata.generated.schema.type.entityReference import EntityReference
+from metadata.ingestion.api.common import Record
+from metadata.ingestion.api.source import Source, SourceStatus
+from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
+from metadata.ingestion.models.table_metadata import Chart, Dashboard
+from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig, OpenMetadataAPIClient
+
+logger: logging.Logger = logging.getLogger(__name__)
+
+COLUMN_NAME = 'Column'
+KEY_TYPE = 'Key type'
+DATA_TYPE = 'Data type'
+COL_DESCRIPTION = 'Description'
+TableKey = namedtuple('TableKey', ['schema', 'table_name'])
+
+
+# Fetch the service if it already exists, otherwise create it. Defined
+# locally instead of importing metadata.utils.helpers, which this
+# definition would otherwise shadow.
+def get_database_service_or_create(service_json, metadata_config) -> DatabaseService:
+    client = OpenMetadataAPIClient(metadata_config)
+    service = client.get_database_service(service_json['name'])
+    if service is not None:
+        return service
+    else:
+        created_service = client.create_database_service(CreateDatabaseServiceEntityRequest(**service_json))
+        return created_service
+
+
+def get_messaging_service_or_create(service_json, metadata_config) -> MessagingService:
+    client = OpenMetadataAPIClient(metadata_config)
+    service = client.get_messaging_service(service_json['name'])
+    if service is not None:
+        return service
+    else:
+        created_service = client.create_messaging_service(CreateMessagingServiceEntityRequest(**service_json))
+        return created_service
+
+
+def get_dashboard_service_or_create(service_json, metadata_config) -> DashboardService:
+    client = OpenMetadataAPIClient(metadata_config)
+    service = client.get_dashboard_service(service_json['name'])
+    if service is not None:
+        return service
+    else:
+        created_service = client.create_dashboard_service(CreateDashboardServiceEntityRequest(**service_json))
+        return created_service
+
+
+def get_table_key(row: Dict[str, Any]) -> Union[TableKey, None]:
+    """
+    Table key consists of schema and table name
+    :param row:
+    :return:
+    """
+    return TableKey(schema=row['schema'], table_name=row['table_name'])
+
+
+class SampleDataSourceConfig(ConfigModel):
+    sample_data_folder: str
+    service_name: str = "bigquery_gcp"
+    database: str = "warehouse"
+    service_type: str = "BigQuery"
+    scheme = "bigquery+pymysql"
+
+    def get_sample_data_folder(self):
+        return self.sample_data_folder
+
+
+@dataclass
+class SampleDataSourceStatus(SourceStatus):
+    success: List[str] = field(default_factory=list)
+    failures: List[str] = field(default_factory=list)
+    warnings: List[str] = field(default_factory=list)
+
+    def scanned(self, entity_type: str, entity_name: str) -> None:
+        self.success.append(entity_name)
+        logger.info('{} Scanned: {}'.format(entity_type, entity_name))
+
+    def filtered(self, entity_type: str, entity_name: str, err: str) -> None:
+        self.warnings.append(entity_name)
+        logger.warning("Dropped {} {} due to {}".format(entity_type, entity_name, err))
+
+
+class TableSchema:
+    def __init__(self, filename):
+        # Raise an error if the input is not a CSV file.
+        if not filename.endswith('.csv'):
+            raise Exception('Input file should be a CSV file')
+
+        # The file name is assumed to be the table name.
+        basename = os.path.basename(filename)
+        self.table_name = os.path.splitext(basename)[0]
+
+        with open(filename, 'r') as fin:
+            self.columns = [dict(i) for i in csv.DictReader(fin)]
+
+    def primary_keys(self):
+        return [c[COLUMN_NAME] for c in self.columns if c[KEY_TYPE] == 'PK']
+
+    def foreign_keys(self):
+        return [c[COLUMN_NAME] for c in self.columns if c[KEY_TYPE] == 'FK']
+
+    def get_name(self):
+        return self.table_name
+
+    def get_schema(self):
+        return self.columns
+
+    def get_column_names(self):
+        return [c[COLUMN_NAME] for c in self.columns]
+
+
+class SampleTableMetadataGenerator:
+    def __init__(self, table_to_df_dict, table_to_schema_map):
+        self.table_to_df_dict = table_to_df_dict
+        self.table_to_schema_map = table_to_schema_map
+        self.sample_user = None
+        self.sample_table = None
+        self.sample_table_owner = None
+        self.sample_table_last_updated = None
+
+    def get_empty_dict_with_cols(self, columns):
+        data = {}
+        for c in columns:
+            data[c] = []
+        return data
+
+    def generate_sample_table(self):
+        keys = ['database', 'cluster', 'schema', 'name', 'description', 'tags', 'is_view', 'description_source']
+        data = self.get_empty_dict_with_cols(keys)
+
+        for tname in self.table_to_df_dict.keys():
+            data['database'].append('hive')
+            data['cluster'].append('gold')
+            data['schema'].append('dwh')
+            data['name'].append(tname)
+            data['description'].append('this is the table to hold data on ' + tname)
+            data['tags'].append('pii')
+            data['is_view'].append('false')
+            data['description_source'].append('')
+        sample_table = pd.DataFrame(data)
+        table_dict = {}
+        for index, row in sample_table.iterrows():
+            table_dict[row['name']] = row
+        return table_dict
+
+    def generate_sample_col(self):
+        # name, description, col_type, sort_order, database, cluster, schema, table_name
+        # col1, "col1 description", "string", 1, hive, gold, test_schema, test_table1
+        keys = ['name', 'description', 'col_type', 'sort_order', 'database', 'cluster', 'schema', 'table_name']
+
+        data = self.get_empty_dict_with_cols(keys)
+
+        for (tname, df) in self.table_to_df_dict.items():
+            tschema = self.table_to_schema_map[tname].get_schema()
+            for col in df.columns:
+                data['name'].append(col)
+                for c in tschema:
+                    if c[COLUMN_NAME] == col:
+                        data['description'].append(c[COL_DESCRIPTION])
+                        data['col_type'].append(c[DATA_TYPE])
+                        break
+                data['sort_order'].append(1)
+                data['cluster'].append('gold')
+                data['database'].append('hive')
+                data['schema'].append('dwh')
+                data['table_name'].append(tname)
+        pd_rows = pd.DataFrame(data)
+        row_dict = []
+        for index, row in pd_rows.iterrows():
+            row_dict.append(row)
+        sorted_row_dict = sorted(row_dict, key=get_table_key)
+        return sorted_row_dict
+
+
+class GenerateFakeSampleData:
+    def __init__(self) -> None:
+        pass
+
+    @classmethod
+    def check_columns(cls, columns):
+        # Generate 25 rows of fake sample data, choosing a Faker provider
+        # from the column name first and the declared data type second.
+        fake = Faker()
+        col_data = []
+        col_list = [column['name'] for column in columns]
+        for _ in range(25):
+            row = []
+            for column in columns:
+                col_name = column['name']
+                value = None
+                if "id" in col_name:
+                    value = str(uuid.uuid4())
+                elif "price" in col_name:
+                    value = fake.pricetag()
+                elif "barcode" in col_name:
+                    value = fake.ean(length=13)
+                elif "phone" in col_name:
+                    value = fake.phone_number()
+                elif "zip" in col_name:
+                    value = fake.postcode()
+                elif "address" in col_name:
+                    value = fake.street_address()
+                elif "company" in col_name:
+                    value = fake.company()
+                elif "region" in col_name:
+                    value = fake.street_address()
+                elif "name" in col_name:
+                    value = fake.first_name()
+                elif "city" in col_name:
+                    value = fake.city()
+                elif "country" in col_name:
+                    value = fake.country()
+                if value is None:
+                    if "TIMESTAMP" in column['columnDataType'] or "date" in col_name:
+                        value = fake.unix_time()
+                    elif "BOOLEAN" in column['columnDataType']:
+                        value = fake.pybool()
+                    elif "NUMERIC" in column['columnDataType']:
+                        value = fake.pyint()
+                    elif "VARCHAR" in column['columnDataType']:
+                        value = fake.text(max_nb_chars=20)
+                    else:
+                        value = None
+                row.append(value)
+            col_data.append(row)
+        return {"columns": col_list, "rows": col_data}
+
+
+class SampleDataSource(Source):
+
+    def __init__(self, config: SampleDataSourceConfig, metadata_config: MetadataServerConfig, ctx):
+        super().__init__(ctx)
+        self.status = SampleDataSourceStatus()
+        self.config = config
+        self.metadata_config = metadata_config
+        self.client = OpenMetadataAPIClient(metadata_config)
+        self.database_service_json = json.load(open(self.config.sample_data_folder + "/datasets/service.json", 'r'))
+        self.database = json.load(open(self.config.sample_data_folder + "/datasets/database.json", 'r'))
+        self.tables = json.load(open(self.config.sample_data_folder + "/datasets/tables.json", 'r'))
+        self.database_service = get_database_service_or_create(self.database_service_json, self.metadata_config)
+        self.kafka_service_json = json.load(open(self.config.sample_data_folder + "/topics/service.json", 'r'))
+        self.topics = json.load(open(self.config.sample_data_folder + "/topics/topics.json", 'r'))
+        self.kafka_service = get_messaging_service_or_create(self.kafka_service_json, self.metadata_config)
+        self.dashboard_service_json = json.load(open(self.config.sample_data_folder + "/dashboards/service.json", 'r'))
+        self.charts = json.load(open(self.config.sample_data_folder + "/dashboards/charts.json", 'r'))
+        self.dashboards = json.load(open(self.config.sample_data_folder + "/dashboards/dashboards.json", 'r'))
+        self.dashboard_service = get_dashboard_service_or_create(self.dashboard_service_json, metadata_config)
+
+    @classmethod
+    def create(cls, config_dict, metadata_config_dict, ctx):
+        config = SampleDataSourceConfig.parse_obj(config_dict)
+        metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
+        return cls(config, metadata_config, ctx)
+
+    def prepare(self):
+        pass
+
+    def next_record(self) -> Iterable[Record]:
+        yield from self.ingest_tables()
+        yield from self.ingest_topics()
+        yield from self.ingest_charts()
+        yield from self.ingest_dashboards()
+
+    def ingest_tables(self) -> Iterable[OMetaDatabaseAndTable]:
+        db = Database(id=uuid.uuid4(),
+                      name=self.database['name'],
+                      description=self.database['description'],
+                      service=EntityReference(id=self.database_service.id, type=self.config.service_type))
+        for table in self.tables['tables']:
+            # Tables without sample rows get fake data generated from their column definitions.
+            if not table.get('sampleData'):
+                table['sampleData'] = GenerateFakeSampleData.check_columns(table['columns'])
+            table_metadata = Table(**table)
+            table_and_db = OMetaDatabaseAndTable(table=table_metadata, database=db)
+            self.status.scanned("table", table_metadata.name.__root__)
+            yield table_and_db
+
+    def ingest_topics(self) -> Iterable[CreateTopic]:
+        for topic in self.topics['topics']:
+            topic['service'] = EntityReference(id=self.kafka_service.id, type="messagingService")
+            create_topic = CreateTopic(**topic)
+            self.status.scanned("topic", create_topic.name.__root__)
+            yield create_topic
+
+    def ingest_charts(self) -> Iterable[Chart]:
+        for chart in self.charts['charts']:
+            try:
+                chart_ev = Chart(name=chart['name'],
+                                 displayName=chart['displayName'],
+                                 description=chart['description'],
+                                 chart_type=chart['chartType'],
+                                 url=chart['chartUrl'],
+                                 service=EntityReference(id=self.dashboard_service.id, type="dashboardService"))
+                self.status.scanned("chart", chart_ev.name)
+                yield chart_ev
+            except ValidationError as err:
+                logger.error(err)
+
+    def ingest_dashboards(self) -> Iterable[Dashboard]:
+        for dashboard in self.dashboards['dashboards']:
+            dashboard_ev = Dashboard(name=dashboard['name'],
+                                     displayName=dashboard['displayName'],
+                                     description=dashboard['description'],
+                                     url=dashboard['dashboardUrl'],
+                                     charts=dashboard['charts'],
+                                     service=EntityReference(id=self.dashboard_service.id, type="dashboardService"))
+            self.status.scanned("dashboard", dashboard_ev.name)
+            yield dashboard_ev
+
+    def close(self):
+        pass
+
+    def get_status(self):
+        return self.status
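---
For reference, the new "sample-data" source and the consolidated "metadata-rest" sink are wired together by the pipeline file added above (ingestion/pipelines/sample_data.json). A minimal sketch of running that pipeline programmatically, assuming the ingestion framework's Workflow runner behaves like the rest of this repo; the Workflow import and its create/execute/print_status/stop methods are assumptions, not part of this patch:

    import json

    # Assumed entrypoint; verify against metadata/ingestion/api/workflow.py.
    from metadata.ingestion.api.workflow import Workflow

    # Load the pipeline definition added in this patch: a "sample-data"
    # source feeding the "metadata-rest" sink.
    with open("ingestion/pipelines/sample_data.json") as config_file:
        workflow_config = json.load(config_file)

    workflow = Workflow.create(workflow_config)
    workflow.execute()       # SampleDataSource.next_record() -> MetadataRestSink.write_record()
    workflow.print_status()  # assumed status helper
    workflow.stop()

Because charts are written before dashboards in next_record(), MetadataRestSink.charts_dict is already populated by the time _get_chart_references() resolves each dashboard's chart ids.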