Fix #1968: Query Runner Schema (#23077)

This commit is contained in:
Mayur Singal 2025-09-11 10:41:11 +05:30 committed by GitHub
parent c336257cf4
commit d705fffc1d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 244 additions and 9 deletions

View File

@ -20,6 +20,9 @@ from typing import Dict, List, Optional, Type, TypeVar, Union
from pydantic import BaseModel
from metadata.generated.schema.entity.automations.response.queryRunnerResponse import (
QueryRunnerResponse,
)
from metadata.generated.schema.entity.automations.workflow import (
Workflow as AutomationWorkflow,
)
@ -557,7 +560,9 @@ class OMetaPatchMixin(OMetaPatchMixinBase):
def patch_automation_workflow_response(
self,
automation_workflow: AutomationWorkflow,
result: Union[TestConnectionResult, ReverseIngestionResponse],
result: Union[
TestConnectionResult, ReverseIngestionResponse, QueryRunnerResponse
],
workflow_status: WorkflowStatus,
) -> None:
"""
@ -570,17 +575,16 @@ class OMetaPatchMixin(OMetaPatchMixinBase):
}
# for deserializing into json convert enum object to string
if isinstance(result, TestConnectionResult):
result_data[PatchField.VALUE]["status"] = result_data[PatchField.VALUE][
"status"
].value
else:
if isinstance(result, ReverseIngestionResponse):
# Convert UUID in string
data = result_data[PatchField.VALUE]
data["serviceId"] = str(data["serviceId"])
for operation_result in data["results"]:
operation_result["id"] = str(operation_result["id"])
else:
result_data[PatchField.VALUE]["status"] = result_data[PatchField.VALUE][
"status"
].value
status_data: Dict = {
PatchField.PATH: PatchPath.STATUS,
PatchField.OPERATION: PatchOperation.ADD,

View File

@ -14,6 +14,7 @@
package org.openmetadata.service.secrets.converter;
import java.util.List;
import org.openmetadata.schema.entity.automations.QueryRunnerRequest;
import org.openmetadata.schema.entity.automations.TestServiceConnectionRequest;
import org.openmetadata.schema.entity.automations.TestSparkEngineConnectionRequest;
import org.openmetadata.schema.entity.automations.Workflow;
@ -37,6 +38,7 @@ public class WorkflowClassConverter extends ClassConverter {
List.of(
TestServiceConnectionRequest.class,
ReverseIngestionPipeline.class,
QueryRunnerRequest.class,
TestSparkEngineConnectionRequest.class))
.ifPresent(workflow::setRequest);

View File

@ -0,0 +1,32 @@
{
"$id": "https://open-metadata.org/schema/entity/automations/queryRunnerRequest.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "QueryRunnerRequest",
"javaType": "org.openmetadata.schema.entity.automations.QueryRunnerRequest",
"description": "Query Runner Request",
"type": "object",
"properties": {
"connectionType": {
"description": "Type of the connection to test such as Snowflake, MySQL, Looker, etc.",
"type": "string"
},
"serviceName": {
"description": "Optional value that identifies this service name.",
"$ref": "../../type/basic.json#/definitions/entityName",
"default": null
},
"query": {
"description": "Query to be executed.",
"type": "string"
},
"transpile": {
"description": "Optional value to indicate if the query should be transpiled.",
"type": "boolean"
},
"ingestionRunner": {
"description": "Optional value of the ingestion runner name responsible for running the test",
"type": "string"
}
},
"additionalProperties": false
}

View File

@ -0,0 +1,43 @@
{
"$id": "https://open-metadata.org/schema/entity/automations/response/queryRunnerResponse.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "QueryRunnerResponse",
"javaType": "org.openmetadata.schema.entity.automations.QueryRunnerResponse",
"description": "Query Runner Response",
"type": "object",
"definitions": {
"statusType": {
"javaType": "org.openmetadata.schema.entity.automations.QueryRunnerStatus",
"description": "Enum defining possible Query Runner status",
"type": "string",
"enum": [
"Successful",
"Failed",
"Running"
]
}
},
"properties": {
"status": {
"description": "Status of the query execution",
"$ref": "#/definitions/statusType"
},
"message": {
"description": "Error message in case of failure",
"type": "string"
},
"errorLog": {
"description": "Detailed error log in case of failure",
"type": "string"
},
"duration": {
"description": "Duration of the query execution in seconds",
"type": "number"
},
"results": {
"description": "Results of the query execution",
"$ref": "../../data/table.json#/definitions/tableData"
}
},
"additionalProperties": false
}

View File

@ -16,6 +16,7 @@
"enum": [
"TEST_CONNECTION",
"REVERSE_INGESTION",
"QUERY_RUNNER",
"TEST_SPARK_ENGINE_CONNECTION"
]
},
@ -70,6 +71,9 @@
{
"$ref": "../../metadataIngestion/reverseIngestionPipeline.json"
},
{
"$ref": "queryRunnerRequest.json"
},
{
"$ref": "testSparkEngineConnection.json"
}
@ -83,6 +87,9 @@
},
{
"$ref": "../services/ingestionPipelines/reverseIngestionResponse.json"
},
{
"$ref": "response/queryRunnerResponse.json"
}
]
},

View File

@ -36,8 +36,8 @@ import React from 'react';
import ReactDOMServer from 'react-dom/server';
import CopyIcon from '../../../../assets/svg/icon-copy.svg';
import {
MARKDOWN_MATCH_ID,
markdownTextAndIdRegex,
MARKDOWN_MATCH_ID,
} from '../../../../constants/regex.constants';
import { MarkdownToHTMLConverter } from '../../../../utils/FeedUtils';
import i18n from '../../../../utils/i18next/LocalUtil';

View File

@ -4458,6 +4458,7 @@ export enum WorkflowStatus {
* This enum defines the type for which this workflow applies to.
*/
export enum WorkflowType {
QueryRunner = "QUERY_RUNNER",
ReverseIngestion = "REVERSE_INGESTION",
TestConnection = "TEST_CONNECTION",
TestSparkEngineConnection = "TEST_SPARK_ENGINE_CONNECTION",

View File

@ -0,0 +1,37 @@
/*
* Copyright 2025 Collate.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Query Runner Request
*/
export interface QueryRunnerRequest {
/**
* Type of the connection to test such as Snowflake, MySQL, Looker, etc.
*/
connectionType?: string;
/**
* Optional value of the ingestion runner name responsible for running the test
*/
ingestionRunner?: string;
/**
* Query to be executed.
*/
query?: string;
/**
* Optional value that identifies this service name.
*/
serviceName?: string;
/**
* Optional value to indicate if the query should be transpiled.
*/
transpile?: boolean;
}

View File

@ -0,0 +1,64 @@
/*
* Copyright 2025 Collate.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Query Runner Response
*/
export interface QueryRunnerResponse {
/**
* Duration of the query execution in seconds
*/
duration?: number;
/**
* Detailed error log in case of failure
*/
errorLog?: string;
/**
* Error message in case of failure
*/
message?: string;
/**
* Results of the query execution
*/
results?: TableData;
/**
* Status of the query execution
*/
status?: StatusType;
}
/**
* Results of the query execution
*
* This schema defines the type to capture rows of sample data for a table.
*/
export interface TableData {
/**
* List of local column names (not fully qualified column names) of the table.
*/
columns?: string[];
/**
* Data for multiple rows of the table.
*/
rows?: Array<any[]>;
}
/**
* Status of the query execution
*
* Enum defining possible Query Runner status
*/
export enum StatusType {
Failed = "Failed",
Running = "Running",
Successful = "Successful",
}

View File

@ -535,6 +535,8 @@ export enum VerifySSL {
*
* Apply a set of operations on a service
*
* Query Runner Request
*
* Test Spark Engine Connection to test user provided configuration is valid or not.
*/
export interface TestServiceConnectionRequest {
@ -576,6 +578,14 @@ export interface TestServiceConnectionRequest {
* Pipeline type
*/
type?: ReverseIngestionType;
/**
* Query to be executed.
*/
query?: string;
/**
* Optional value to indicate if the query should be transpiled.
*/
transpile?: boolean;
/**
* Spark Engine Configuration.
*/
@ -4657,6 +4667,8 @@ export enum ReverseIngestionType {
* connection steps.
*
* Apply a set of operations on a service
*
* Query Runner Response
*/
export interface TestConnectionResult {
/**
@ -4665,6 +4677,8 @@ export interface TestConnectionResult {
lastUpdatedAt?: number;
/**
* Test Connection Result computation status.
*
* Status of the query execution
*/
status?: StatusType;
/**
@ -4677,8 +4691,10 @@ export interface TestConnectionResult {
message?: string;
/**
* List of operations to be performed on the service
*
* Results of the query execution
*/
results?: ReverseIngestionOperationResult[];
results?: ReverseIngestionOperationResult[] | TableData;
/**
* The id of the service to be modified
*/
@ -4688,6 +4704,14 @@ export interface TestConnectionResult {
* connection issues.
*/
success?: boolean;
/**
* Duration of the query execution in seconds
*/
duration?: number;
/**
* Detailed error log in case of failure
*/
errorLog?: string;
}
export interface ReverseIngestionOperationResult {
@ -4706,10 +4730,30 @@ export interface ReverseIngestionOperationResult {
[property: string]: any;
}
/**
* Results of the query execution
*
* This schema defines the type to capture rows of sample data for a table.
*/
export interface TableData {
/**
* List of local column names (not fully qualified column names) of the table.
*/
columns?: string[];
/**
* Data for multiple rows of the table.
*/
rows?: Array<any[]>;
}
/**
* Test Connection Result computation status.
*
* Enum defining possible Test Connection Result status
*
* Status of the query execution
*
* Enum defining possible Query Runner status
*/
export enum StatusType {
Failed = "Failed",
@ -4763,6 +4807,7 @@ export enum WorkflowStatus {
* This enum defines the type for which this workflow applies to.
*/
export enum WorkflowType {
QueryRunner = "QUERY_RUNNER",
ReverseIngestion = "REVERSE_INGESTION",
TestConnection = "TEST_CONNECTION",
TestSparkEngineConnection = "TEST_SPARK_ENGINE_CONNECTION",