Fix #3830: Fixed Query parser

This commit is contained in:
ulixius9 2022-04-11 18:59:36 +05:30
parent 9c92424f74
commit 38ee0eca49
5 changed files with 186 additions and 82 deletions

View File

@ -0,0 +1,49 @@
{
"$id": "https://open-metadata.org/schema/entity/services/connections/database/sampleDataConnection.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "SampleDataConnection",
"description": "Sample Data Connection Config",
"type": "object",
"javaType": "org.openmetadata.catalog.services.connections.database.SampleDataConnection",
"definitions": {
"sampledataType": {
"description": "Service type.",
"type": "string",
"enum": ["BigQuery"],
"default": "BigQuery"
},
"sampledataScheme": {
"description": "SQLAlchemy driver scheme options.",
"type": "string",
"enum": ["bigquery"],
"default": "bigquery"
}
},
"properties": {
"type": {
"description": "Service Type",
"$ref": "#/definitions/sampledataType",
"default": "BigQuery"
},
"sampleDataFolder": {
"description": "Sample Data File Path",
"type": "string"
},
"connectionOptions": {
"$ref": "connectionBasicType.json#/definitions/connectionOptions"
},
"connectionArguments": {
"$ref": "connectionBasicType.json#/definitions/connectionArguments"
},
"supportedPipelineTypes": {
"description": "Supported Metadata Extraction Pipelines.",
"type": "array",
"items": {
"type": "string",
"enum": ["Metadata", "Usage"]
},
"default": ["Metadata", "Usage"]
}
},
"additionalProperties": false
}

View File

@ -186,6 +186,9 @@
}, },
{ {
"$ref": "./connections/database/verticaConnection.json" "$ref": "./connections/database/verticaConnection.json"
},
{
"$ref": "./connections/database/sampleDataConnection.json"
} }
] ]
} }

View File

@ -35,8 +35,8 @@ from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.location import Location, LocationType from metadata.generated.schema.entity.data.location import Location, LocationType
from metadata.generated.schema.entity.data.pipeline import Pipeline from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.databaseService import ( from metadata.generated.schema.entity.services.connections.database.sampleDataConnection import (
DatabaseServiceType, SampleDataConnection,
) )
from metadata.generated.schema.entity.teams.user import User from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import (
@ -51,7 +51,7 @@ from metadata.generated.schema.tests.tableTest import TableTestCase
from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import Entity from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import Source, SourceStatus from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.models.table_metadata import Chart, Dashboard from metadata.ingestion.models.table_metadata import Chart, Dashboard
from metadata.ingestion.models.table_tests import OMetaTableTest from metadata.ingestion.models.table_tests import OMetaTableTest
@ -110,17 +110,6 @@ def get_table_key(row: Dict[str, Any]) -> Union[TableKey, None]:
return TableKey(schema=row["schema"], table_name=row["table_name"]) return TableKey(schema=row["schema"], table_name=row["table_name"])
class SampleDataSourceConfig(WorkflowSource):
service_type = DatabaseServiceType.BigQuery.value
sample_data_folder: str = "./examples/sample_data"
def get_sample_data_folder(self):
return self.sample_data_folder
def get_service_type(self):
return self.service_type
@dataclass @dataclass
class SampleDataSourceStatus(SourceStatus): class SampleDataSourceStatus(SourceStatus):
success: List[str] = field(default_factory=list) success: List[str] = field(default_factory=list)
@ -188,37 +177,54 @@ class SampleDataSource(Source[Entity]):
""" """
def __init__( def __init__(
self, config: SampleDataSourceConfig, metadata_config: OpenMetadataServerConfig self, config: WorkflowSource, metadata_config: OpenMetadataServerConfig
): ):
super().__init__() super().__init__()
self.status = SampleDataSourceStatus() self.status = SampleDataSourceStatus()
self.config = config self.config = config
self.service_connection = config.serviceConnection.__root__.config
self.metadata_config = metadata_config self.metadata_config = metadata_config
self.metadata = OpenMetadata(metadata_config) self.metadata = OpenMetadata(metadata_config)
self.storage_service_json = json.load( self.storage_service_json = json.load(
open(self.config.sample_data_folder + "/locations/service.json", "r") open(
self.service_connection.sampleDataFolder + "/locations/service.json",
"r",
)
) )
self.locations = json.load( self.locations = json.load(
open(self.config.sample_data_folder + "/locations/locations.json", "r") open(
self.service_connection.sampleDataFolder + "/locations/locations.json",
"r",
)
) )
self.storage_service = get_storage_service_or_create( self.storage_service = get_storage_service_or_create(
service_json=self.storage_service_json, service_json=self.storage_service_json,
metadata_config=metadata_config, metadata_config=metadata_config,
) )
self.glue_storage_service_json = json.load( self.glue_storage_service_json = json.load(
open(self.config.sample_data_folder + "/glue/storage_service.json", "r") open(
self.service_connection.sampleDataFolder + "/glue/storage_service.json",
"r",
)
) )
self.glue_database_service_json = json.load( self.glue_database_service_json = json.load(
open(self.config.sample_data_folder + "/glue/database_service.json", "r") open(
self.service_connection.sampleDataFolder
+ "/glue/database_service.json",
"r",
)
) )
self.glue_database = json.load( self.glue_database = json.load(
open(self.config.sample_data_folder + "/glue/database.json", "r") open(self.service_connection.sampleDataFolder + "/glue/database.json", "r")
) )
self.glue_database_schema = json.load( self.glue_database_schema = json.load(
open(self.config.sample_data_folder + "/glue/database_schema.json", "r") open(
self.service_connection.sampleDataFolder + "/glue/database_schema.json",
"r",
)
) )
self.glue_tables = json.load( self.glue_tables = json.load(
open(self.config.sample_data_folder + "/glue/tables.json", "r") open(self.service_connection.sampleDataFolder + "/glue/tables.json", "r")
) )
self.glue_database_service = get_database_service_or_create_v2( self.glue_database_service = get_database_service_or_create_v2(
service_json=self.glue_database_service_json, service_json=self.glue_database_service_json,
@ -229,25 +235,36 @@ class SampleDataSource(Source[Entity]):
metadata_config, metadata_config,
) )
self.database_service_json = json.load( self.database_service_json = json.load(
open(self.config.sample_data_folder + "/datasets/service.json", "r") open(
self.service_connection.sampleDataFolder + "/datasets/service.json", "r"
)
) )
self.database = json.load( self.database = json.load(
open(self.config.sample_data_folder + "/datasets/database.json", "r") open(
self.service_connection.sampleDataFolder + "/datasets/database.json",
"r",
)
) )
self.database_schema = json.load( self.database_schema = json.load(
open(self.config.sample_data_folder + "/datasets/database_schema.json", "r") open(
self.service_connection.sampleDataFolder
+ "/datasets/database_schema.json",
"r",
)
) )
self.tables = json.load( self.tables = json.load(
open(self.config.sample_data_folder + "/datasets/tables.json", "r") open(
self.service_connection.sampleDataFolder + "/datasets/tables.json", "r"
)
) )
self.database_service = get_database_service_or_create( self.database_service = get_database_service_or_create(
config=config, metadata_config=self.metadata_config config=config, metadata_config=self.metadata_config
) )
self.kafka_service_json = json.load( self.kafka_service_json = json.load(
open(self.config.sample_data_folder + "/topics/service.json", "r") open(self.service_connection.sampleDataFolder + "/topics/service.json", "r")
) )
self.topics = json.load( self.topics = json.load(
open(self.config.sample_data_folder + "/topics/topics.json", "r") open(self.service_connection.sampleDataFolder + "/topics/topics.json", "r")
) )
kafka_config = { kafka_config = {
"config": { "config": {
@ -266,13 +283,23 @@ class SampleDataSource(Source[Entity]):
metadata_config=self.metadata_config, metadata_config=self.metadata_config,
) )
self.dashboard_service_json = json.load( self.dashboard_service_json = json.load(
open(self.config.sample_data_folder + "/dashboards/service.json", "r") open(
self.service_connection.sampleDataFolder + "/dashboards/service.json",
"r",
)
) )
self.charts = json.load( self.charts = json.load(
open(self.config.sample_data_folder + "/dashboards/charts.json", "r") open(
self.service_connection.sampleDataFolder + "/dashboards/charts.json",
"r",
)
) )
self.dashboards = json.load( self.dashboards = json.load(
open(self.config.sample_data_folder + "/dashboards/dashboards.json", "r") open(
self.service_connection.sampleDataFolder
+ "/dashboards/dashboards.json",
"r",
)
) )
self.dashboard_service = get_dashboard_service_or_create( self.dashboard_service = get_dashboard_service_or_create(
service_name=self.dashboard_service_json.get("name"), service_name=self.dashboard_service_json.get("name"),
@ -281,33 +308,49 @@ class SampleDataSource(Source[Entity]):
metadata_config=metadata_config, metadata_config=metadata_config,
) )
self.pipeline_service_json = json.load( self.pipeline_service_json = json.load(
open(self.config.sample_data_folder + "/pipelines/service.json", "r") open(
self.service_connection.sampleDataFolder + "/pipelines/service.json",
"r",
)
) )
self.pipelines = json.load( self.pipelines = json.load(
open(self.config.sample_data_folder + "/pipelines/pipelines.json", "r") open(
self.service_connection.sampleDataFolder + "/pipelines/pipelines.json",
"r",
)
) )
self.pipeline_service = get_pipeline_service_or_create( self.pipeline_service = get_pipeline_service_or_create(
service_json=self.pipeline_service_json, service_json=self.pipeline_service_json,
metadata_config=metadata_config, metadata_config=metadata_config,
) )
self.lineage = json.load( self.lineage = json.load(
open(self.config.sample_data_folder + "/lineage/lineage.json", "r") open(
self.service_connection.sampleDataFolder + "/lineage/lineage.json", "r"
)
) )
self.users = json.load( self.users = json.load(
open(self.config.sample_data_folder + "/users/users.json", "r") open(self.service_connection.sampleDataFolder + "/users/users.json", "r")
) )
self.models = json.load( self.models = json.load(
open(self.config.sample_data_folder + "/models/models.json", "r") open(self.service_connection.sampleDataFolder + "/models/models.json", "r")
) )
self.user_entity = {} self.user_entity = {}
self.table_tests = json.load( self.table_tests = json.load(
open(self.config.sample_data_folder + "/datasets/tableTests.json", "r") open(
self.service_connection.sampleDataFolder + "/datasets/tableTests.json",
"r",
)
) )
@classmethod @classmethod
def create(cls, config_dict, metadata_config): def create(cls, config_dict, metadata_config: OpenMetadataServerConfig):
config = SampleDataSourceConfig.parse_obj(config_dict) """Create class instance"""
metadata_config = OpenMetadataServerConfig.parse_obj(metadata_config) config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: SampleDataConnection = config.serviceConnection.__root__.config
if not isinstance(connection, SampleDataConnection):
raise InvalidSourceException(
f"Expected SampleDataConnection, but got {connection}"
)
return cls(config, metadata_config) return cls(config, metadata_config)
def prepare(self): def prepare(self):
@ -396,7 +439,7 @@ class SampleDataSource(Source[Entity]):
name=self.database["name"], name=self.database["name"],
description=self.database["description"], description=self.database["description"],
service=EntityReference( service=EntityReference(
id=self.database_service.id, type=self.config.service_type id=self.database_service.id, type=self.service_connection.type.value
), ),
) )
schema = DatabaseSchema( schema = DatabaseSchema(
@ -404,7 +447,7 @@ class SampleDataSource(Source[Entity]):
name=self.database_schema["name"], name=self.database_schema["name"],
description=self.database_schema["description"], description=self.database_schema["description"],
service=EntityReference( service=EntityReference(
id=self.database_service.id, type=self.config.service_type id=self.database_service.id, type=self.service_connection.type.value
), ),
database=EntityReference(id=db.id, type="database"), database=EntityReference(id=db.id, type="database"),
) )

View File

@ -14,18 +14,21 @@ import json
from datetime import datetime from datetime import datetime
from typing import Iterable from typing import Iterable
from metadata.generated.schema.entity.services.connections.database.sampleDataConnection import (
SampleDataConnection,
)
from metadata.generated.schema.entity.services.databaseService import ( from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType, DatabaseServiceType,
) )
from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataServerConfig, OpenMetadataServerConfig,
) )
from metadata.ingestion.api.source import Source from metadata.generated.schema.metadataIngestion.workflow import (
from metadata.ingestion.models.table_queries import TableQuery Source as WorkflowSource,
from metadata.ingestion.source.sample_data import (
SampleDataSourceConfig,
SampleDataSourceStatus,
) )
from metadata.ingestion.api.source import InvalidSourceException, Source
from metadata.ingestion.models.table_queries import TableQuery
from metadata.ingestion.source.sample_data import SampleDataSourceStatus
from metadata.utils.helpers import get_database_service_or_create from metadata.utils.helpers import get_database_service_or_create
@ -34,16 +37,21 @@ class SampleUsageSource(Source[TableQuery]):
service_type = DatabaseServiceType.BigQuery.value service_type = DatabaseServiceType.BigQuery.value
def __init__( def __init__(
self, config: SampleDataSourceConfig, metadata_config: OpenMetadataServerConfig self, config: WorkflowSource, metadata_config: OpenMetadataServerConfig
): ):
super().__init__() super().__init__()
self.status = SampleDataSourceStatus() self.status = SampleDataSourceStatus()
self.config = config self.config = config
self.service_connection = config.serviceConnection.__root__.config
self.metadata_config = metadata_config self.metadata_config = metadata_config
self.service_json = json.load( self.service_json = json.load(
open(config.sample_data_folder + "/datasets/service.json", "r") open(
self.service_connection.sampleDataFolder + "/datasets/service.json", "r"
)
)
self.query_log_csv = (
self.service_connection.sampleDataFolder + "/datasets/query_log"
) )
self.query_log_csv = config.sample_data_folder + "/datasets/query_log"
with open(self.query_log_csv, "r") as fin: with open(self.query_log_csv, "r") as fin:
self.query_logs = [dict(i) for i in csv.DictReader(fin)] self.query_logs = [dict(i) for i in csv.DictReader(fin)]
self.service = get_database_service_or_create( self.service = get_database_service_or_create(
@ -52,7 +60,13 @@ class SampleUsageSource(Source[TableQuery]):
@classmethod @classmethod
def create(cls, config_dict, metadata_config: OpenMetadataServerConfig): def create(cls, config_dict, metadata_config: OpenMetadataServerConfig):
config = SampleDataSourceConfig.parse_obj(config_dict) """Create class instance"""
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: SampleDataConnection = config.serviceConnection.__root__.config
if not isinstance(connection, SampleDataConnection):
raise InvalidSourceException(
f"Expected SampleDataConnection, but got {connection}"
)
return cls(config, metadata_config) return cls(config, metadata_config)
def prepare(self): def prepare(self):

View File

@ -21,11 +21,14 @@ config = """
{ {
"source": { "source": {
"type": "sample-usage", "type": "sample-usage",
"config": { "serviceName": "bigquery_gcp",
"database": "warehouse", "serviceConnection": {
"service_name": "bigquery_gcp", "config": {
"sample_data_folder": "ingestion/tests/unit/resources" "type": "BigQuery",
} "sampleDataFolder":"./examples/sample_data"
}
},
"sourceConfig": {}
}, },
"processor": { "processor": {
"type": "query-parser", "type": "query-parser",
@ -39,21 +42,19 @@ config = """
"filename": "/tmp/sample_usage" "filename": "/tmp/sample_usage"
} }
}, },
"bulk_sink": { "bulkSink": {
"type": "metadata-usage", "type": "metadata-usage",
"config": { "config": {
"filename": "/tmp/sample_usage" "filename": "/tmp/sample_usage"
} }
}, },
"metadata_server": { "workflowConfig": {
"type": "metadata-server", "openMetadataServerConfig": {
"config": { "hostPort": "http://localhost:8585/api",
"api_endpoint": "http://localhost:8585/api", "authProvider": "no-auth"
"auth_provider_type": "no-auth"
} }
} }
} }
""" """
@ -63,28 +64,22 @@ class QueryParserTest(TestCase):
Check the join count Check the join count
""" """
expected_result = { expected_result = {
"dim_address": 100, "shopify.dim_address": 100,
"dim_shop": 196, "shopify.dim_shop": 190,
"dim_customer": 140, "shopify.dim_customer": 125,
"dim_location": 75, "dim_customer": 9,
"shopify.dim_location": 75,
"dim_location.shop_id": 25, "dim_location.shop_id": 25,
"dim_shop.shop_id": 105, "dim_shop.shop_id": 105,
"dim_product": 130, "shopify.dim_product": 130,
"dim_product.shop_id": 80, "dim_product.shop_id": 80,
"dim_product_variant": 35, "shopify.dim_product_variant": 35,
"dim_staff": 75, "dim_shop": 5,
"fact_line_item": 100, "shopify.dim_staff": 75,
"fact_order": 185, "shopify.fact_line_item": 100,
"dim_api_client": 85, "shopify.fact_order": 185,
"fact_sale": 400, "shopify.dim_api_client": 85,
"customer": 4, "shopify.fact_sale": 420,
"orders": 2,
"products": 2,
"orderdetails": 2,
"country": 1,
"city": 1,
"call": 1,
"countries": 1,
} }
workflow = Workflow.create(json.loads(config)) workflow = Workflow.create(json.loads(config))
workflow.execute() workflow.execute()