MINOR - Hybrid Ingestion Agent - Link ingestion agents to ingestion pipelines (#19751)

* Hybrid Ingestion Agent - Link ingestion agents to ingestion pipelines

* Store relationship as HAS types

* Java checkstyle

* Trigger CI
This commit is contained in:
Gyowanny P. Queiroz 2025-02-12 10:31:51 -03:00 committed by GitHub
parent ede8108a5a
commit 2559c79edd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 112 additions and 42 deletions

View File

@ -39,6 +39,7 @@ import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.FieldChange;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.Relationship;
import org.openmetadata.sdk.PipelineServiceClientInterface;
import org.openmetadata.service.Entity;
import org.openmetadata.service.OpenMetadataApplicationConfig;
@ -53,6 +54,7 @@ import org.openmetadata.service.util.RestUtil;
import org.openmetadata.service.util.ResultList;
public class IngestionPipelineRepository extends EntityRepository<IngestionPipeline> {
private static final String UPDATE_FIELDS =
"sourceConfig,airflowConfig,loggerLevel,enabled,deployed";
private static final String PATCH_FIELDS =
@ -151,6 +153,12 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
/**
 * Stores the relationships of an ingestion pipeline.
 *
 * <p>Delegates the service link to {@code addServiceRelationship}. When the pipeline carries an
 * ingestion agent reference, it also stores a {@code Relationship.HAS} relationship from that
 * agent to the pipeline, so the pipeline is linked to the agent that executes it.
 *
 * @param ingestionPipeline the pipeline whose relationships are being stored
 */
@Override
public void storeRelationships(IngestionPipeline ingestionPipeline) {
  addServiceRelationship(ingestionPipeline, ingestionPipeline.getService());

  // The ingestion agent is an optional field in the schema, so only link it when present.
  // Using the service reference here would merely duplicate the service link stored above
  // and leave the agent unlinked.
  EntityReference ingestionAgent = ingestionPipeline.getIngestionAgent();
  if (ingestionAgent != null) {
    addRelationship(
        ingestionAgent.getId(),
        ingestionPipeline.getId(),
        ingestionAgent.getType(),
        entityType,
        Relationship.HAS);
  }
}
@Override
@ -291,8 +299,11 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
PipelineStatus.class);
}
/** Handles entity updated from PUT and POST operation. */
/**
* Handles entity updates from PUT and POST operations.
*/
public class IngestionPipelineUpdater extends EntityUpdater {
public IngestionPipelineUpdater(
IngestionPipeline original, IngestionPipeline updated, Operation operation) {
super(buildIngestionPipelineDecrypted(original), updated, operation);

View File

@ -26,6 +26,7 @@ public class IngestionPipelineMapper
.withOpenMetadataServerConnection(openMetadataServerConnection)
.withSourceConfig(create.getSourceConfig())
.withLoggerLevel(create.getLoggerLevel())
.withService(create.getService());
.withService(create.getService())
.withIngestionAgent(create.getIngestionAgent());
}
}

View File

@ -44,6 +44,10 @@
"domain" : {
"description": "Fully qualified name of the domain the Table belongs to.",
"type": "string"
},
"ingestionAgent" : {
"description": "The ingestion agent responsible for executing the ingestion pipeline.",
"$ref": "../../../type/entityReference.json"
}
},
"required": [

View File

@ -221,6 +221,10 @@
"applicationType": {
"description": "Type of the application when pipelineType is 'application'.",
"type": "string"
},
"ingestionAgent" : {
"description": "The ingestion agent responsible for executing the ingestion pipeline.",
"$ref": "../../../type/entityReference.json"
}
},
"required": [

View File

@ -1,5 +1,5 @@
/*
* Copyright 2024 Collate.
* Copyright 2025 Collate.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@ -10,9 +10,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
/**
* Ingestion Pipeline Config is used to set up an Airflow DAG.
*/
export interface CreateIngestionPipeline {
@ -29,6 +27,10 @@ export interface CreateIngestionPipeline {
* Fully qualified name of the domain the Table belongs to.
*/
domain?: string;
/**
* The ingestion agent responsible for executing the ingestion pipeline.
*/
ingestionAgent?: EntityReference;
/**
* Set the logging level for the workflow.
*/
@ -112,26 +114,16 @@ export interface AirflowConfig {
}
/**
* Set the logging level for the workflow.
* The ingestion agent responsible for executing the ingestion pipeline.
*
* Supported logging levels
*/
export enum LogLevels {
Debug = "DEBUG",
Error = "ERROR",
Info = "INFO",
Warn = "WARN",
}
/**
* Owner of this Ingestion Pipeline.
*
* This schema defines the EntityReferenceList type used for referencing an entity.
* This schema defines the EntityReference type used for referencing an entity.
* EntityReference is used for capturing relationships from one entity to another. For
* example, a table has an attribute called database of type EntityReference that captures
* the relationship of a table `belongs to a` database.
*
* This schema defines the EntityReference type used for referencing an entity.
* Owner of this Ingestion Pipeline.
*
* This schema defines the EntityReferenceList type used for referencing an entity.
* EntityReference is used for capturing relationships from one entity to another. For
* example, a table has an attribute called database of type EntityReference that captures
* the relationship of a table `belongs to a` database.
@ -183,6 +175,18 @@ export interface EntityReference {
type: string;
}
/**
* Set the logging level for the workflow.
*
* Supported logging levels
*/
export enum LogLevels {
Debug = "DEBUG",
Error = "ERROR",
Info = "INFO",
Warn = "WARN",
}
/**
* Type of Pipeline - metadata, usage
*/
@ -360,6 +364,10 @@ export interface Pipeline {
* Set 'Cross Database Service Names' to process lineage with the database.
*/
crossDatabaseServiceNames?: string[];
/**
* Handle Lineage for Snowflake Temporary and Transient Tables.
*/
enableTempTableLineage?: boolean;
/**
* Set the 'Override View Lineage' toggle to control whether to override the existing view
* lineage.
@ -2346,10 +2354,11 @@ export interface ConfigClass {
*
* Http/Https connection scheme
*/
scheme?: string;
supportsDatabase?: boolean;
supportsDataDiff?: boolean;
supportsDBTExtraction?: boolean;
scheme?: string;
supportsDatabase?: boolean;
supportsDataDiff?: boolean;
supportsDBTExtraction?: boolean;
supportsIncrementalMetadataExtraction?: boolean;
/**
* Supports Lineage Extraction.
*/
@ -2716,6 +2725,11 @@ export interface ConfigClass {
* Confluent Redpanda Consumer Config
*/
consumerConfig?: { [key: string]: any };
/**
* Consumer Config SSL Config. Configuration for enabling SSL for the Consumer Config
* connection.
*/
consumerConfigSSL?: ConsumerConfigSSLClass;
/**
* sasl.mechanism Consumer Config property
*/
@ -2739,7 +2753,7 @@ export interface ConfigClass {
* Schema Registry SSL Config. Configuration for enabling SSL for the Schema Registry
* connection.
*/
schemaRegistrySSL?: SchemaRegistrySSLClass;
schemaRegistrySSL?: ConsumerConfigSSLClass;
/**
* Schema Registry Topic Suffix Name. The suffix to be appended to the topic name to get
* topic schema from registry.
@ -3555,7 +3569,7 @@ export interface SSLCertificatesByPath {
* Qlik Authentication Certificate File Path
*/
export interface QlikCertificatesBy {
sslConfig?: SchemaRegistrySSLClass;
sslConfig?: ConsumerConfigSSLClass;
/**
* Client Certificate
*/
@ -3576,6 +3590,9 @@ export interface QlikCertificatesBy {
*
* SSL Configuration details.
*
* Consumer Config SSL Config. Configuration for enabling SSL for the Consumer Config
* connection.
*
* Schema Registry SSL Config. Configuration for enabling SSL for the Schema Registry
* connection.
*
@ -3583,7 +3600,7 @@ export interface QlikCertificatesBy {
*
* OpenMetadata Client configured to validate SSL certificates.
*/
export interface SchemaRegistrySSLClass {
export interface ConsumerConfigSSLClass {
/**
* The CA certificate used for SSL validation.
*/
@ -3983,6 +4000,9 @@ export enum ConnectionScheme {
*
* SSL Configuration details.
*
* Consumer Config SSL Config. Configuration for enabling SSL for the Consumer Config
* connection.
*
* Schema Registry SSL Config. Configuration for enabling SSL for the Schema Registry
* connection.
*
@ -4212,7 +4232,7 @@ export interface HiveMetastoreConnectionDetails {
/**
* SSL Configuration details.
*/
sslConfig?: SchemaRegistrySSLClass;
sslConfig?: ConsumerConfigSSLClass;
sslMode?: SSLMode;
supportsDatabase?: boolean;
supportsDataDiff?: boolean;
@ -4478,6 +4498,9 @@ export enum KafkaSecurityProtocol {
*
* SSL Configuration details.
*
* Consumer Config SSL Config. Configuration for enabling SSL for the Consumer Config
* connection.
*
* Schema Registry SSL Config. Configuration for enabling SSL for the Schema Registry
* connection.
*

View File

@ -1,5 +1,5 @@
/*
* Copyright 2024 Collate.
* Copyright 2025 Collate.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@ -10,9 +10,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
/**
* Ingestion Pipeline Config is used to set up a DAG and deploy. This entity is used to
* setup metadata/quality pipelines on Apache Airflow.
*/
@ -63,6 +61,10 @@ export interface IngestionPipeline {
* Unique identifier that identifies this pipeline.
*/
id?: string;
/**
* The ingestion agent responsible for executing the ingestion pipeline.
*/
ingestionAgent?: EntityReference;
/**
* Set the logging level for the workflow.
*/
@ -215,6 +217,8 @@ export interface FieldChange {
* example, a table has an attribute called database of type EntityReference that captures
* the relationship of a table `belongs to a` database.
*
* The ingestion agent responsible for executing the ingestion pipeline.
*
* Owners of this Pipeline.
*
* This schema defines the EntityReferenceList type used for referencing an entity.
@ -386,7 +390,7 @@ export interface OpenMetadataConnection {
/**
* SSL Configuration for OpenMetadata Server
*/
sslConfig?: SchemaRegistrySSLClass;
sslConfig?: ConsumerConfigSSLClass;
/**
* If set to true, when creating a service during the ingestion we will store its Service
* Connection. Otherwise, the ingestion will create a bare service without connection
@ -490,12 +494,15 @@ export interface OpenMetadataJWTClientConfig {
*
* SSL Configuration details.
*
* Consumer Config SSL Config. Configuration for enabling SSL for the Consumer Config
* connection.
*
* Schema Registry SSL Config. Configuration for enabling SSL for the Schema Registry
* connection.
*
* OpenMetadata Client configured to validate SSL certificates.
*/
export interface SchemaRegistrySSLClass {
export interface ConsumerConfigSSLClass {
/**
* The CA certificate used for SSL validation.
*/
@ -536,6 +543,10 @@ export enum VerifySSL {
* This defines runtime status of Pipeline.
*/
export interface PipelineStatus {
/**
* Pipeline configuration for this particular execution.
*/
config?: { [key: string]: any };
/**
* endDate of the pipeline run for this particular execution.
*/
@ -816,6 +827,10 @@ export interface Pipeline {
* Set 'Cross Database Service Names' to process lineage with the database.
*/
crossDatabaseServiceNames?: string[];
/**
* Handle Lineage for Snowflake Temporary and Transient Tables.
*/
enableTempTableLineage?: boolean;
/**
* Set the 'Override View Lineage' toggle to control whether to override the existing view
* lineage.
@ -2802,10 +2817,11 @@ export interface ConfigClass {
*
* Http/Https connection scheme
*/
scheme?: string;
supportsDatabase?: boolean;
supportsDataDiff?: boolean;
supportsDBTExtraction?: boolean;
scheme?: string;
supportsDatabase?: boolean;
supportsDataDiff?: boolean;
supportsDBTExtraction?: boolean;
supportsIncrementalMetadataExtraction?: boolean;
/**
* Supports Lineage Extraction.
*/
@ -3172,6 +3188,11 @@ export interface ConfigClass {
* Confluent Redpanda Consumer Config
*/
consumerConfig?: { [key: string]: any };
/**
* Consumer Config SSL Config. Configuration for enabling SSL for the Consumer Config
* connection.
*/
consumerConfigSSL?: ConsumerConfigSSLClass;
/**
* sasl.mechanism Consumer Config property
*/
@ -3195,7 +3216,7 @@ export interface ConfigClass {
* Schema Registry SSL Config. Configuration for enabling SSL for the Schema Registry
* connection.
*/
schemaRegistrySSL?: SchemaRegistrySSLClass;
schemaRegistrySSL?: ConsumerConfigSSLClass;
/**
* Schema Registry Topic Suffix Name. The suffix to be appended to the topic name to get
* topic schema from registry.
@ -3992,7 +4013,7 @@ export interface SSLCertificatesByPath {
* Qlik Authentication Certificate File Path
*/
export interface QlikCertificatesBy {
sslConfig?: SchemaRegistrySSLClass;
sslConfig?: ConsumerConfigSSLClass;
/**
* Client Certificate
*/
@ -4395,6 +4416,9 @@ export enum ConnectionScheme {
*
* SSL Configuration details.
*
* Consumer Config SSL Config. Configuration for enabling SSL for the Consumer Config
* connection.
*
* Schema Registry SSL Config. Configuration for enabling SSL for the Schema Registry
* connection.
*/
@ -4611,7 +4635,7 @@ export interface HiveMetastoreConnectionDetails {
/**
* SSL Configuration details.
*/
sslConfig?: SchemaRegistrySSLClass;
sslConfig?: ConsumerConfigSSLClass;
sslMode?: SSLMode;
supportsDatabase?: boolean;
supportsDataDiff?: boolean;
@ -4837,6 +4861,9 @@ export enum KafkaSecurityProtocol {
*
* SSL Configuration details.
*
* Consumer Config SSL Config. Configuration for enabling SSL for the Consumer Config
* connection.
*
* Schema Registry SSL Config. Configuration for enabling SSL for the Schema Registry
* connection.
*