From 38ee0eca49bcfec20af2bc245f1c3a7283a1e4d9 Mon Sep 17 00:00:00 2001 From: ulixius9 Date: Mon, 11 Apr 2022 18:59:36 +0530 Subject: [PATCH] Fix #3830: Fixed Query parser --- .../database/sampleDataConnection.json | 49 +++++++ .../entity/services/databaseService.json | 3 + .../metadata/ingestion/source/sample_data.py | 127 ++++++++++++------ .../metadata/ingestion/source/sample_usage.py | 32 +++-- ingestion/tests/unit/test_query_parser.py | 57 ++++---- 5 files changed, 186 insertions(+), 82 deletions(-) create mode 100644 catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/sampleDataConnection.json diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/sampleDataConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/sampleDataConnection.json new file mode 100644 index 00000000000..3be4196ff0f --- /dev/null +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/sampleDataConnection.json @@ -0,0 +1,49 @@ +{ + "$id": "https://open-metadata.org/schema/entity/services/connections/database/sampleDataConnection.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "SampleDataConnection", + "description": "Sample Data Connection Config", + "type": "object", + "javaType": "org.openmetadata.catalog.services.connections.database.SampleDataConnection", + "definitions": { + "sampledataType": { + "description": "Service type.", + "type": "string", + "enum": ["BigQuery"], + "default": "BigQuery" + }, + "sampledataScheme": { + "description": "SQLAlchemy driver scheme options.", + "type": "string", + "enum": ["bigquery"], + "default": "bigquery" + } + }, + "properties": { + "type": { + "description": "Service Type", + "$ref": "#/definitions/sampledataType", + "default": "BigQuery" + }, + "sampleDataFolder": { + "description": "Sample Data File Path", + "type": "string" + }, + 
"connectionOptions": { + "$ref": "connectionBasicType.json#/definitions/connectionOptions" + }, + "connectionArguments": { + "$ref": "connectionBasicType.json#/definitions/connectionArguments" + }, + "supportedPipelineTypes": { + "description": "Supported Metadata Extraction Pipelines.", + "type": "string", + "items": { + "type": "string", + "enum": ["Metadata", "Usage"] + }, + "default": ["Metadata", "Usage"] + } + }, + "additionalProperties": false +} diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/databaseService.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/databaseService.json index 4191e3ba5cb..f7650014592 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/databaseService.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/databaseService.json @@ -186,6 +186,9 @@ }, { "$ref": "./connections/database/verticaConnection.json" + }, + { + "$ref": "./connections/database/sampleDataConnection.json" } ] } diff --git a/ingestion/src/metadata/ingestion/source/sample_data.py b/ingestion/src/metadata/ingestion/source/sample_data.py index ea947248d68..225f96c9822 100644 --- a/ingestion/src/metadata/ingestion/source/sample_data.py +++ b/ingestion/src/metadata/ingestion/source/sample_data.py @@ -35,8 +35,8 @@ from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.location import Location, LocationType from metadata.generated.schema.entity.data.pipeline import Pipeline from metadata.generated.schema.entity.data.table import Table -from metadata.generated.schema.entity.services.databaseService import ( - DatabaseServiceType, +from metadata.generated.schema.entity.services.connections.database.sampleDataConnection import ( + SampleDataConnection, ) from metadata.generated.schema.entity.teams.user import User from metadata.generated.schema.metadataIngestion.workflow import ( @@ -51,7 +51,7 @@ 
from metadata.generated.schema.tests.tableTest import TableTestCase from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.common import Entity -from metadata.ingestion.api.source import Source, SourceStatus +from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable from metadata.ingestion.models.table_metadata import Chart, Dashboard from metadata.ingestion.models.table_tests import OMetaTableTest @@ -110,17 +110,6 @@ def get_table_key(row: Dict[str, Any]) -> Union[TableKey, None]: return TableKey(schema=row["schema"], table_name=row["table_name"]) -class SampleDataSourceConfig(WorkflowSource): - service_type = DatabaseServiceType.BigQuery.value - sample_data_folder: str = "./examples/sample_data" - - def get_sample_data_folder(self): - return self.sample_data_folder - - def get_service_type(self): - return self.service_type - - @dataclass class SampleDataSourceStatus(SourceStatus): success: List[str] = field(default_factory=list) @@ -188,37 +177,54 @@ class SampleDataSource(Source[Entity]): """ def __init__( - self, config: SampleDataSourceConfig, metadata_config: OpenMetadataServerConfig + self, config: WorkflowSource, metadata_config: OpenMetadataServerConfig ): super().__init__() self.status = SampleDataSourceStatus() self.config = config + self.service_connection = config.serviceConnection.__root__.config self.metadata_config = metadata_config self.metadata = OpenMetadata(metadata_config) self.storage_service_json = json.load( - open(self.config.sample_data_folder + "/locations/service.json", "r") + open( + self.service_connection.sampleDataFolder + "/locations/service.json", + "r", + ) ) self.locations = json.load( - open(self.config.sample_data_folder + "/locations/locations.json", "r") + open( + self.service_connection.sampleDataFolder + 
"/locations/locations.json", + "r", + ) ) self.storage_service = get_storage_service_or_create( service_json=self.storage_service_json, metadata_config=metadata_config, ) self.glue_storage_service_json = json.load( - open(self.config.sample_data_folder + "/glue/storage_service.json", "r") + open( + self.service_connection.sampleDataFolder + "/glue/storage_service.json", + "r", + ) ) self.glue_database_service_json = json.load( - open(self.config.sample_data_folder + "/glue/database_service.json", "r") + open( + self.service_connection.sampleDataFolder + + "/glue/database_service.json", + "r", + ) ) self.glue_database = json.load( - open(self.config.sample_data_folder + "/glue/database.json", "r") + open(self.service_connection.sampleDataFolder + "/glue/database.json", "r") ) self.glue_database_schema = json.load( - open(self.config.sample_data_folder + "/glue/database_schema.json", "r") + open( + self.service_connection.sampleDataFolder + "/glue/database_schema.json", + "r", + ) ) self.glue_tables = json.load( - open(self.config.sample_data_folder + "/glue/tables.json", "r") + open(self.service_connection.sampleDataFolder + "/glue/tables.json", "r") ) self.glue_database_service = get_database_service_or_create_v2( service_json=self.glue_database_service_json, @@ -229,25 +235,36 @@ class SampleDataSource(Source[Entity]): metadata_config, ) self.database_service_json = json.load( - open(self.config.sample_data_folder + "/datasets/service.json", "r") + open( + self.service_connection.sampleDataFolder + "/datasets/service.json", "r" + ) ) self.database = json.load( - open(self.config.sample_data_folder + "/datasets/database.json", "r") + open( + self.service_connection.sampleDataFolder + "/datasets/database.json", + "r", + ) ) self.database_schema = json.load( - open(self.config.sample_data_folder + "/datasets/database_schema.json", "r") + open( + self.service_connection.sampleDataFolder + + "/datasets/database_schema.json", + "r", + ) ) self.tables = json.load( - 
open(self.config.sample_data_folder + "/datasets/tables.json", "r") + open( + self.service_connection.sampleDataFolder + "/datasets/tables.json", "r" + ) ) self.database_service = get_database_service_or_create( config=config, metadata_config=self.metadata_config ) self.kafka_service_json = json.load( - open(self.config.sample_data_folder + "/topics/service.json", "r") + open(self.service_connection.sampleDataFolder + "/topics/service.json", "r") ) self.topics = json.load( - open(self.config.sample_data_folder + "/topics/topics.json", "r") + open(self.service_connection.sampleDataFolder + "/topics/topics.json", "r") ) kafka_config = { "config": { @@ -266,13 +283,23 @@ class SampleDataSource(Source[Entity]): metadata_config=self.metadata_config, ) self.dashboard_service_json = json.load( - open(self.config.sample_data_folder + "/dashboards/service.json", "r") + open( + self.service_connection.sampleDataFolder + "/dashboards/service.json", + "r", + ) ) self.charts = json.load( - open(self.config.sample_data_folder + "/dashboards/charts.json", "r") + open( + self.service_connection.sampleDataFolder + "/dashboards/charts.json", + "r", + ) ) self.dashboards = json.load( - open(self.config.sample_data_folder + "/dashboards/dashboards.json", "r") + open( + self.service_connection.sampleDataFolder + + "/dashboards/dashboards.json", + "r", + ) ) self.dashboard_service = get_dashboard_service_or_create( service_name=self.dashboard_service_json.get("name"), @@ -281,33 +308,49 @@ class SampleDataSource(Source[Entity]): metadata_config=metadata_config, ) self.pipeline_service_json = json.load( - open(self.config.sample_data_folder + "/pipelines/service.json", "r") + open( + self.service_connection.sampleDataFolder + "/pipelines/service.json", + "r", + ) ) self.pipelines = json.load( - open(self.config.sample_data_folder + "/pipelines/pipelines.json", "r") + open( + self.service_connection.sampleDataFolder + "/pipelines/pipelines.json", + "r", + ) ) self.pipeline_service = 
get_pipeline_service_or_create( service_json=self.pipeline_service_json, metadata_config=metadata_config, ) self.lineage = json.load( - open(self.config.sample_data_folder + "/lineage/lineage.json", "r") + open( + self.service_connection.sampleDataFolder + "/lineage/lineage.json", "r" + ) ) self.users = json.load( - open(self.config.sample_data_folder + "/users/users.json", "r") + open(self.service_connection.sampleDataFolder + "/users/users.json", "r") ) self.models = json.load( - open(self.config.sample_data_folder + "/models/models.json", "r") + open(self.service_connection.sampleDataFolder + "/models/models.json", "r") ) self.user_entity = {} self.table_tests = json.load( - open(self.config.sample_data_folder + "/datasets/tableTests.json", "r") + open( + self.service_connection.sampleDataFolder + "/datasets/tableTests.json", + "r", + ) ) @classmethod - def create(cls, config_dict, metadata_config): - config = SampleDataSourceConfig.parse_obj(config_dict) - metadata_config = OpenMetadataServerConfig.parse_obj(metadata_config) + def create(cls, config_dict, metadata_config: OpenMetadataServerConfig): + """Create class instance""" + config: WorkflowSource = WorkflowSource.parse_obj(config_dict) + connection: SampleDataConnection = config.serviceConnection.__root__.config + if not isinstance(connection, SampleDataConnection): + raise InvalidSourceException( + f"Expected SampleDataConnection, but got {connection}" + ) return cls(config, metadata_config) def prepare(self): @@ -396,7 +439,7 @@ class SampleDataSource(Source[Entity]): name=self.database["name"], description=self.database["description"], service=EntityReference( - id=self.database_service.id, type=self.config.service_type + id=self.database_service.id, type=self.service_connection.type.value ), ) schema = DatabaseSchema( @@ -404,7 +447,7 @@ class SampleDataSource(Source[Entity]): name=self.database_schema["name"], description=self.database_schema["description"], service=EntityReference( -
id=self.database_service.id, type=self.config.service_type + id=self.database_service.id, type=self.service_connection.type.value ), database=EntityReference(id=db.id, type="database"), ) diff --git a/ingestion/src/metadata/ingestion/source/sample_usage.py b/ingestion/src/metadata/ingestion/source/sample_usage.py index e10ef078be8..fe8528fb5bb 100644 --- a/ingestion/src/metadata/ingestion/source/sample_usage.py +++ b/ingestion/src/metadata/ingestion/source/sample_usage.py @@ -14,18 +14,21 @@ import json from datetime import datetime from typing import Iterable +from metadata.generated.schema.entity.services.connections.database.sampleDataConnection import ( + SampleDataConnection, +) from metadata.generated.schema.entity.services.databaseService import ( DatabaseServiceType, ) from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataServerConfig, ) -from metadata.ingestion.api.source import Source -from metadata.ingestion.models.table_queries import TableQuery -from metadata.ingestion.source.sample_data import ( - SampleDataSourceConfig, - SampleDataSourceStatus, +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, ) +from metadata.ingestion.api.source import InvalidSourceException, Source +from metadata.ingestion.models.table_queries import TableQuery +from metadata.ingestion.source.sample_data import SampleDataSourceStatus from metadata.utils.helpers import get_database_service_or_create @@ -34,16 +37,21 @@ class SampleUsageSource(Source[TableQuery]): service_type = DatabaseServiceType.BigQuery.value def __init__( - self, config: SampleDataSourceConfig, metadata_config: OpenMetadataServerConfig + self, config: WorkflowSource, metadata_config: OpenMetadataServerConfig ): super().__init__() self.status = SampleDataSourceStatus() self.config = config + self.service_connection = config.serviceConnection.__root__.config self.metadata_config = metadata_config self.service_json = json.load( - 
open(config.sample_data_folder + "/datasets/service.json", "r") + open( + self.service_connection.sampleDataFolder + "/datasets/service.json", "r" + ) + ) + self.query_log_csv = ( + self.service_connection.sampleDataFolder + "/datasets/query_log" ) - self.query_log_csv = config.sample_data_folder + "/datasets/query_log" with open(self.query_log_csv, "r") as fin: self.query_logs = [dict(i) for i in csv.DictReader(fin)] self.service = get_database_service_or_create( @@ -52,7 +60,13 @@ class SampleUsageSource(Source[TableQuery]): @classmethod def create(cls, config_dict, metadata_config: OpenMetadataServerConfig): - config = SampleDataSourceConfig.parse_obj(config_dict) + """Create class instance""" + config: WorkflowSource = WorkflowSource.parse_obj(config_dict) + connection: SampleDataConnection = config.serviceConnection.__root__.config + if not isinstance(connection, SampleDataConnection): + raise InvalidSourceException( + f"Expected SampleDataConnection, but got {connection}" + ) return cls(config, metadata_config) def prepare(self): diff --git a/ingestion/tests/unit/test_query_parser.py b/ingestion/tests/unit/test_query_parser.py index f300ca3ce9b..556367f57ac 100644 --- a/ingestion/tests/unit/test_query_parser.py +++ b/ingestion/tests/unit/test_query_parser.py @@ -21,11 +21,14 @@ config = """ { "source": { "type": "sample-usage", - "config": { - "database": "warehouse", - "service_name": "bigquery_gcp", - "sample_data_folder": "ingestion/tests/unit/resources" - } + "serviceName": "bigquery_gcp", + "serviceConnection": { + "config": { + "type": "BigQuery", + "sampleDataFolder":"./examples/sample_data" + } + }, + "sourceConfig": {} }, "processor": { "type": "query-parser", @@ -39,21 +42,19 @@ "filename": "/tmp/sample_usage" } }, - "bulk_sink": { + "bulkSink": { "type": "metadata-usage", "config": { "filename": "/tmp/sample_usage" } }, - "metadata_server": { - "type": "metadata-server", - "config": { - "api_endpoint": "http://localhost:8585/api", -
"auth_provider_type": "no-auth" + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "no-auth" } } } - """ @@ -63,28 +64,22 @@ class QueryParserTest(TestCase): Check the join count """ expected_result = { - "dim_address": 100, - "dim_shop": 196, - "dim_customer": 140, - "dim_location": 75, + "shopify.dim_address": 100, + "shopify.dim_shop": 190, + "shopify.dim_customer": 125, + "dim_customer": 9, + "shopify.dim_location": 75, "dim_location.shop_id": 25, "dim_shop.shop_id": 105, - "dim_product": 130, + "shopify.dim_product": 130, "dim_product.shop_id": 80, - "dim_product_variant": 35, - "dim_staff": 75, - "fact_line_item": 100, - "fact_order": 185, - "dim_api_client": 85, - "fact_sale": 400, - "customer": 4, - "orders": 2, - "products": 2, - "orderdetails": 2, - "country": 1, - "city": 1, - "call": 1, - "countries": 1, + "shopify.dim_product_variant": 35, + "dim_shop": 5, + "shopify.dim_staff": 75, + "shopify.fact_line_item": 100, + "shopify.fact_order": 185, + "shopify.dim_api_client": 85, + "shopify.fact_sale": 420, } workflow = Workflow.create(json.loads(config)) workflow.execute()