Feat: spark UI supporting PR (#22886)

* Feat: spark UI supporting PR

- Added `rootProcessingEngine` to the ingestion workflow data structure.
- Updated `AddIngestion` component to handle the new processing engine field.
- Modified `IngestionWorkflowForm` to include custom fields for the profiler workflow.
- Introduced `INGESTION_RUNNER` entity type in the enum for better categorization.
- Updated multiple language files to include translations for "ingestion-runner".
- Refactored ingestion workflow utility functions to accommodate the new processing engine structure.

* fix: correct class name from 'matric-collapse-footer' to 'matrix-collapse-footer' and adjust styles
This commit is contained in:
Shailesh Parmar 2025-08-12 18:18:53 +05:30
parent f4127757c3
commit 3a950d9924
28 changed files with 95 additions and 112 deletions

View File

@ -113,6 +113,7 @@ const AddIngestion = ({
data?.displayName ?? getIngestionName(serviceData.name, pipelineType),
enableDebugLog: data?.loggerLevel === LogLevels.Debug,
raiseOnError: data?.raiseOnError ?? true,
rootProcessingEngine: data?.processingEngine,
})
);
@ -173,6 +174,7 @@ const AddIngestion = ({
enableDebugLog,
displayName,
raiseOnError,
rootProcessingEngine,
...rest
} = workflowData ?? {};
const ingestionName = trim(name);
@ -208,6 +210,7 @@ const AddIngestion = ({
// clean the data to remove empty fields
config: { ...cleanWorkFlowData(rest) },
},
processingEngine: rootProcessingEngine,
};
if (onAddIngestionSave) {
@ -246,6 +249,7 @@ const AddIngestion = ({
loggerLevel: workflowData?.enableDebugLog
? LogLevels.Debug
: LogLevels.Info,
processingEngine: workflowData?.rootProcessingEngine,
sourceConfig: {
config: {
// clean the data to remove empty fields
@ -255,6 +259,7 @@ const AddIngestion = ({
'enableDebugLog',
'displayName',
'raiseOnError',
'rootProcessingEngine',
]) ?? {}
),
},

View File

@ -31,11 +31,9 @@ import {
IngestionWorkflowData,
IngestionWorkflowFormProps,
} from '../../../../../interface/service.interface';
import ProfilerConfigurationClassBase from '../../../../../pages/ProfilerConfigurationPage/ProfilerConfigurationClassBase';
import { transformErrors } from '../../../../../utils/formUtils';
import {
getSchemaByWorkflowType,
transformProfilerProcessingEngine,
} from '../../../../../utils/IngestionWorkflowUtils';
import { getSchemaByWorkflowType } from '../../../../../utils/IngestionWorkflowUtils';
import BooleanFieldTemplate from '../../../../common/Form/JSONSchema/JSONSchemaTemplate/BooleanFieldTemplate';
import DescriptionFieldTemplate from '../../../../common/Form/JSONSchema/JSONSchemaTemplate/DescriptionFieldTemplate';
import { FieldErrorTemplate } from '../../../../common/Form/JSONSchema/JSONSchemaTemplate/FieldErrorTemplate/FieldErrorTemplate';
@ -125,17 +123,27 @@ const IngestionWorkflowForm: FC<IngestionWorkflowFormProps> = ({
},
};
}
if (pipeLineType === PipelineType.Profiler) {
formData = transformProfilerProcessingEngine(formData);
}
onChange?.(formData);
}
};
const customFields: RegistryFieldsType = {
BooleanField: BooleanFieldTemplate,
ArrayField: WorkflowArrayFieldTemplate,
};
const customFields = useMemo(() => {
const fields: RegistryFieldsType = {
BooleanField: BooleanFieldTemplate,
ArrayField: WorkflowArrayFieldTemplate,
};
const SparkAgentField = ProfilerConfigurationClassBase.getSparkAgentField();
if (
!isUndefined(SparkAgentField) &&
pipeLineType === PipelineType.Profiler
) {
fields['/schemas/rootProcessingEngine'] = SparkAgentField;
}
return fields;
}, [pipeLineType]);
const handleSubmit = (e: IChangeEvent<IngestionWorkflowData>) => {
if (e.formData) {
@ -162,9 +170,6 @@ const IngestionWorkflowForm: FC<IngestionWorkflowFormProps> = ({
},
};
}
if (pipeLineType === PipelineType.Profiler) {
formData = transformProfilerProcessingEngine(formData);
}
onSubmit(formData);
}

View File

@ -315,6 +315,7 @@ export const INGESTION_WORKFLOW_UI_SCHEMA = {
name: { 'ui:widget': 'hidden', 'ui:hideError': true },
processingEngine: { 'ui:widget': 'hidden', 'ui:hideError': true },
'ui:order': [
'rootProcessingEngine',
'name',
'displayName',
...SERVICE_FILTER_PATTERN_FIELDS,

View File

@ -79,6 +79,7 @@ export enum EntityType {
WORKFLOW_DEFINITION = 'workflowDefinition',
SERVICE = 'service',
DATA_CONTRACT = 'dataContract',
INGESTION_RUNNER = 'ingestionRunner',
}
export enum EntityLineageDirection {

View File

@ -52,6 +52,7 @@ import {
StorageConnection,
StorageService,
} from '../generated/entity/services/storageService';
import { EntityReference } from '../generated/entity/type';
import { Paging } from '../generated/type/paging';
export interface IngestionSchedule {
@ -130,6 +131,7 @@ export type IngestionWorkflowData = Pipeline & {
enableDebugLog?: boolean;
displayName?: string;
raiseOnError?: boolean;
rootProcessingEngine?: EntityReference;
};
export interface IngestionWorkflowFormProps {

View File

@ -794,6 +794,7 @@
"ingestion-pipeline": "Ingestion Pipeline",
"ingestion-pipeline-name": "Name der Erfassungspipeline",
"ingestion-plural": "Erfassungen",
"ingestion-runner": "Erfassungsläufer",
"ingestion-workflow-lowercase": "erfassungsworkflow",
"inherited": "inherited",
"inherited-entity": "Inherited {{entity}}",

View File

@ -794,6 +794,7 @@
"ingestion-pipeline": "Ingestion Pipeline",
"ingestion-pipeline-name": "Ingestion Pipeline Name",
"ingestion-plural": "Ingestions",
"ingestion-runner": "Ingestion Runner",
"ingestion-workflow-lowercase": "ingestion workflow",
"inherited": "inherited",
"inherited-entity": "Inherited {{entity}}",

View File

@ -794,6 +794,7 @@
"ingestion-pipeline": "Ingestion Pipeline",
"ingestion-pipeline-name": "Nombre del proceso de ingesta",
"ingestion-plural": "Ingestas",
"ingestion-runner": "Ejecutor de Ingesta",
"ingestion-workflow-lowercase": "flujo de trabajo de ingesta",
"inherited": "inherited",
"inherited-entity": "Inherited {{entity}}",

View File

@ -794,6 +794,7 @@
"ingestion-pipeline": "Pipeline d'Ingestion",
"ingestion-pipeline-name": "Nom de la Pipeline d'Ingestion",
"ingestion-plural": "Ingestions",
"ingestion-runner": "Exécuteur d'Ingestion",
"ingestion-workflow-lowercase": "workflow d'ingestion",
"inherited": "inherited",
"inherited-entity": "{{entity}} hérité",

View File

@ -794,6 +794,7 @@
"ingestion-pipeline": "Pipeline de inxestión",
"ingestion-pipeline-name": "Nome do pipeline de inxestión",
"ingestion-plural": "Inxestións",
"ingestion-runner": "Executador de Inxestión",
"ingestion-workflow-lowercase": "fluxo de traballo de inxestión",
"inherited": "inherited",
"inherited-entity": "Herdado {{entity}}",

View File

@ -794,6 +794,7 @@
"ingestion-pipeline": "Ingestion Pipeline",
"ingestion-pipeline-name": "שם תהליך הטעינה",
"ingestion-plural": "תהליכי טעינה",
"ingestion-runner": "מריץ טעינה",
"ingestion-workflow-lowercase": "זרימת תהליך הטעינה",
"inherited": "inherited",
"inherited-entity": "Inherited {{entity}}",

View File

@ -794,6 +794,7 @@
"ingestion-pipeline": "インジェストパイプライン",
"ingestion-pipeline-name": "インジェストパイプライン名",
"ingestion-plural": "インジェスト",
"ingestion-runner": "インジェスト実行者",
"ingestion-workflow-lowercase": "インジェストワークフロー",
"inherited": "継承済み",
"inherited-entity": "継承された{{entity}}",

View File

@ -794,6 +794,7 @@
"ingestion-pipeline": "수집 파이프라인",
"ingestion-pipeline-name": "수집 파이프라인 이름",
"ingestion-plural": "수집들",
"ingestion-runner": "수집 실행기",
"ingestion-workflow-lowercase": "수집 워크플로우",
"inherited": "inherited",
"inherited-entity": "상속된 {{entity}}",

View File

@ -794,6 +794,7 @@
"ingestion-pipeline": "अंतर्ग्रहण पाइपलाइन",
"ingestion-pipeline-name": "अंतर्ग्रहण पाइपलाइन नाव",
"ingestion-plural": "अंतर्ग्रहण",
"ingestion-runner": "अंतर्ग्रहण चालक",
"ingestion-workflow-lowercase": "अंतर्ग्रहण कार्यप्रवाह",
"inherited": "inherited",
"inherited-entity": "वारसाहक्काने मिळालेले {{entity}}",

View File

@ -794,6 +794,7 @@
"ingestion-pipeline": "Ingestion Pipeline",
"ingestion-pipeline-name": "Naam van het ingestieproces",
"ingestion-plural": "Ingesties",
"ingestion-runner": "Ingestie Uitvoerder",
"ingestion-workflow-lowercase": "ingestie werkstroom",
"inherited": "inherited",
"inherited-entity": "Inherited {{entity}}",

View File

@ -794,6 +794,7 @@
"ingestion-pipeline": "خط لوله ورود",
"ingestion-pipeline-name": "نام خط لوله ورود",
"ingestion-plural": "ورودها",
"ingestion-runner": "اجراکننده ورود",
"ingestion-workflow-lowercase": "جریان کاری ورود",
"inherited": "inherited",
"inherited-entity": "{{entity}} ارث برده",

View File

@ -794,6 +794,7 @@
"ingestion-pipeline": "Pipeline de ingestão",
"ingestion-pipeline-name": "Nome do Pipeline de Ingestão",
"ingestion-plural": "Ingestões",
"ingestion-runner": "Executor de Ingestão",
"ingestion-workflow-lowercase": "fluxo de trabalho de ingestão",
"inherited": "inherited",
"inherited-entity": "Herdado {{entity}}",

View File

@ -794,6 +794,7 @@
"ingestion-pipeline": "Ingestion Pipeline",
"ingestion-pipeline-name": "Nome do Pipeline de Ingestão",
"ingestion-plural": "Ingestões",
"ingestion-runner": "Executor de Ingestão",
"ingestion-workflow-lowercase": "fluxo de trabalho de ingestão",
"inherited": "inherited",
"inherited-entity": "Inherited {{entity}}",

View File

@ -794,6 +794,7 @@
"ingestion-pipeline": "Процесс извлечения",
"ingestion-pipeline-name": "Имя процесса извлечения",
"ingestion-plural": "Извлечения",
"ingestion-runner": "Исполнитель извлечения",
"ingestion-workflow-lowercase": "процесс извлечения",
"inherited": "Унаследованный",
"inherited-entity": "Унаследованный {{entity}}",

View File

@ -794,6 +794,7 @@
"ingestion-pipeline": "ท่อการนำเข้า",
"ingestion-pipeline-name": "ชื่อท่อการนำเข้า",
"ingestion-plural": "การนำเข้าหลายรายการ",
"ingestion-runner": "ตัวรันการนำเข้า",
"ingestion-workflow-lowercase": "กระบวนการทำงานการนำเข้า",
"inherited": "inherited",
"inherited-entity": "เอนทิตีที่สืบทอด {{entity}}",

View File

@ -794,6 +794,7 @@
"ingestion-pipeline": "Alım İş Akışı",
"ingestion-pipeline-name": "Alım İş Akışı Adı",
"ingestion-plural": "Alımlar",
"ingestion-runner": "Alım Çalıştırıcısı",
"ingestion-workflow-lowercase": "alım iş akışı",
"inherited": "inherited",
"inherited-entity": "Miras Alınan {{entity}}",

View File

@ -794,6 +794,7 @@
"ingestion-pipeline": "流水线",
"ingestion-pipeline-name": "提取流水线名称",
"ingestion-plural": "提取",
"ingestion-runner": "提取执行器",
"ingestion-workflow-lowercase": "提取工作流",
"inherited": "inherited",
"inherited-entity": "继承自{{entity}}",

View File

@ -10,10 +10,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { FieldProps } from '@rjsf/utils';
class ProfilerConfigurationClassBase {
public getSparkAgentConfigComponent(): (() => JSX.Element) | undefined {
return undefined;
}
public getSparkAgentField():
| ((props: FieldProps) => JSX.Element)
| undefined {
return undefined;
}
}
const profilerConfigurationClassBase = new ProfilerConfigurationClassBase();

View File

@ -284,7 +284,7 @@ const ProfilerConfigurationPage = () => {
))}
<Col span={24}>
<div className="matric-collapse-footer">
<div className="matrix-collapse-footer">
<Button
className="text-primary p-0"
data-testid="add-fields"

View File

@ -46,6 +46,10 @@
border-bottom-right-radius: @border-rad-sm;
border-bottom-left-radius: @border-rad-sm;
}
.ant-collapse-content-box {
padding: @padding-lg;
}
}
.ant-form-item {
@ -56,13 +60,13 @@
border-radius: 10px;
}
.matric-collapse-footer {
.matrix-collapse-footer {
display: flex;
justify-content: space-between;
align-items: center;
padding: @padding-md @padding-md 0 @padding-md;
margin-right: -@padding-md;
margin-left: -@padding-md;
padding: @padding-mlg @padding-lg 0;
margin-right: -@padding-lg;
margin-left: -@padding-lg;
border-top: 1px solid @grey-200;
}
}

View File

@ -2655,4 +2655,5 @@ export const EntityTypeName: Record<EntityType, string> = {
[EntityType.SERVICE]: t('label.service'),
[EntityType.DATA_CONTRACT]: t('label.data-contract'),
[EntityType.SECURITY_SERVICE]: t('label.security-service'),
[EntityType.INGESTION_RUNNER]: t('label.ingestion-runner'),
};

View File

@ -11,18 +11,17 @@
* limitations under the License.
*/
import { RJSFSchema } from '@rjsf/utils';
import { cloneDeep, isEmpty, isString } from 'lodash';
import { cloneDeep, isEmpty, isUndefined } from 'lodash';
import { ServiceCategory } from '../enums/service.enum';
import {
Pipeline,
PipelineType as WorkflowType,
ProcessingEngineType,
} from '../generated/api/services/ingestionPipelines/createIngestionPipeline';
import { IngestionWorkflowData } from '../interface/service.interface';
import apiServiceMetadataPipeline from '../jsons/ingestionSchemas/apiServiceMetadataPipeline.json';
import dashboardMetadataPipeline from '../jsons/ingestionSchemas/dashboardServiceMetadataPipeline.json';
import databaseAutoClassificationPipeline from '../jsons/ingestionSchemas/databaseServiceAutoClassificationPipeline.json';
import databaseMetadataPipeline from '../jsons/ingestionSchemas/databaseServiceMetadataPipeline.json';
import databaseProfilerPipeline from '../jsons/ingestionSchemas/databaseServiceProfilerPipeline.json';
import databaseLineagePipeline from '../jsons/ingestionSchemas/databaseServiceQueryLineagePipeline.json';
import databaseUsagePipeline from '../jsons/ingestionSchemas/databaseServiceQueryUsagePipeline.json';
import dataInsightPipeline from '../jsons/ingestionSchemas/dataInsightPipeline.json';
@ -34,7 +33,7 @@ import pipelineMetadataPipeline from '../jsons/ingestionSchemas/pipelineServiceM
import searchMetadataPipeline from '../jsons/ingestionSchemas/searchServiceMetadataPipeline.json';
import storageMetadataPipeline from '../jsons/ingestionSchemas/storageServiceMetadataPipeline.json';
import testSuitePipeline from '../jsons/ingestionSchemas/testSuitePipeline.json';
import serviceUtilClassBase from './ServiceUtilClassBase';
import ProfilerConfigurationClassBase from '../pages/ProfilerConfigurationPage/ProfilerConfigurationClassBase';
export const getMetadataSchemaByServiceCategory = (
serviceCategory: ServiceCategory
@ -71,7 +70,7 @@ export const getSchemaByWorkflowType = (
workflowType: WorkflowType,
serviceCategory: ServiceCategory
) => {
const customProperties = {
const customProperties: RJSFSchema = {
displayName: {
description: 'Display Name of the workflow',
type: 'string',
@ -87,6 +86,28 @@ export const getSchemaByWorkflowType = (
default: false,
},
};
if (
workflowType === WorkflowType.Profiler &&
!isUndefined(ProfilerConfigurationClassBase.getSparkAgentField())
) {
customProperties.rootProcessingEngine = {
type: 'object',
$id: '/schemas/rootProcessingEngine',
title: 'Processing Engine',
description: 'Select the processing engine for the profiler workflow',
properties: {
type: {
type: 'string',
default: 'ingestionRunner',
},
id: {
type: 'string',
},
},
required: ['type', 'id'],
};
}
let schema = {};
switch (workflowType) {
@ -97,11 +118,9 @@ export const getSchemaByWorkflowType = (
break;
case WorkflowType.Profiler:
{
const profilerConfig = serviceUtilClassBase.getProfilerConfig();
schema = profilerConfig.schema;
}
schema = {
...databaseProfilerPipeline,
};
break;
case WorkflowType.AutoClassification:
@ -194,67 +213,3 @@ export const cleanWorkFlowData = (workFlowData: Pipeline): Pipeline => {
return cleanedWorkFlowData;
};
/**
* Transforms profiler processing engine data to handle different formats
* @param formData - The form data containing processingEngine
* @returns Transformed form data with properly formatted processingEngine
*/
export const transformProfilerProcessingEngine = (
formData: IngestionWorkflowData
): IngestionWorkflowData => {
if (!formData.processingEngine) {
return formData;
}
// Check if processingEngine is a JSON string (from our custom component)
if (isString(formData.processingEngine)) {
try {
// Parse the JSON string back to object
const engineConfig = JSON.parse(formData.processingEngine);
formData.processingEngine = engineConfig;
} catch (error) {
// If parsing fails, it might be the old format
if (formData.processingEngine?.type === 'Spark') {
formData.processingEngine = {
type: ProcessingEngineType.Spark,
remote: '', // This will be required for Spark
config: {
tempPath: '/tmp/openmetadata',
extraConfig: {},
},
};
} else {
// For Native engine, set the type
formData.processingEngine = {
type: ProcessingEngineType.Native,
};
}
}
} else if (formData.processingEngine.type === ProcessingEngineType.Spark) {
// Ensure Spark engine has required fields with defaults
formData.processingEngine = {
type: ProcessingEngineType.Spark,
remote: formData.processingEngine.remote || '', // This will be required for Spark
config: formData.processingEngine.config || {
tempPath: '/tmp/openmetadata',
extraConfig: {},
},
};
}
// Force override processingEngine based on our hidden input
const hiddenInput = document.querySelector(
'input[name="processingEngine"]'
) as HTMLInputElement;
if (hiddenInput && hiddenInput.value) {
try {
const engineConfig = JSON.parse(hiddenInput.value);
formData.processingEngine = engineConfig;
} catch (_error) {
// Do nothing
}
}
return formData;
};

View File

@ -11,12 +11,8 @@
* limitations under the License.
*/
import {
ObjectFieldTemplatePropertyType,
RJSFSchema,
UiSchema,
} from '@rjsf/utils';
import { cloneDeep, get, toLower } from 'lodash';
import { ObjectFieldTemplatePropertyType } from '@rjsf/utils';
import { get, toLower } from 'lodash';
import { ServiceTypes } from 'Models';
import { ReactComponent as MetricIcon } from '../assets/svg/metric.svg';
import AgentsStatusWidget from '../components/ServiceInsights/AgentsStatusWidget/AgentsStatusWidget';
@ -37,7 +33,6 @@ import {
CASSANDRA,
CLICKHOUSE,
COCKROACH,
COMMON_UI_SCHEMA,
COUCHBASE,
CUSTOM_SEARCH_DEFAULT,
CUSTOM_STORAGE_DEFAULT,
@ -145,7 +140,6 @@ import { Type as SecurityServiceType } from '../generated/entity/services/securi
import { ServiceType } from '../generated/entity/services/serviceType';
import { SearchSourceAlias } from '../interface/search.interface';
import { ConfigData, ServicesType } from '../interface/service.interface';
import profilerPipeline from '../jsons/ingestionSchemas/databaseServiceProfilerPipeline.json';
import { getAPIConfig } from './APIServiceUtils';
import { getDashboardConfig } from './DashboardServiceUtils';
import { getDatabaseConfig } from './DatabaseServiceUtils';
@ -818,17 +812,6 @@ class ServiceUtilClassBase {
])
) as unknown as U;
}
public getProfilerConfig() {
const uiSchema = { ...COMMON_UI_SCHEMA };
const config = cloneDeep({ schema: profilerPipeline, uiSchema }) as {
schema: RJSFSchema;
uiSchema: UiSchema;
};
return config;
}
}
const serviceUtilClassBase = new ServiceUtilClassBase();