feat(spark-plugin): user should be able to pass custom mcp kafka topic (#11767)

Co-authored-by: Neelab Chaudhuri <nchaudhuri@fanatics.com>
ronybony1990 2024-11-05 03:18:15 -07:00 committed by GitHub
parent 83ec73b7d1
commit edb87ff2ea
6 changed files with 35 additions and 8 deletions
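
In practice, this lets a pipeline owner route MetadataChangeProposal (MCP) events to a non-default Kafka topic straight from the Spark configuration. A minimal sketch of the intended usage, assuming the plugin's usual "spark.datahub." property prefix and "emitter"/"bootstrap" keys (not shown in this commit); the broker address and topic name are placeholders:

import org.apache.spark.SparkConf;

public class CustomMcpTopicConfSketch {
  public static void main(String[] args) {
    // Hypothetical Spark properties; "kafka.mcp_topic" is the key added in SparkConfigParser below.
    SparkConf conf =
        new SparkConf()
            .set("spark.datahub.emitter", "kafka")
            .set("spark.datahub.kafka.bootstrap", "broker:9092")
            .set("spark.datahub.kafka.mcp_topic", "custom_mcp_topic");
    System.out.println(conf.get("spark.datahub.kafka.mcp_topic"));
  }
}

If the topic key is omitted, the emitter keeps writing to its default MCP topic, as the changes below show.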

DatahubEventEmitter.java

@@ -82,7 +82,10 @@ public class DatahubEventEmitter extends EventEmitter {
           (KafkaDatahubEmitterConfig) datahubConf.getDatahubEmitterConfig();
       try {
         emitter =
-            Optional.of(new KafkaEmitter(datahubKafkaEmitterConfig.getKafkaEmitterConfig()));
+            Optional.of(
+                new KafkaEmitter(
+                    datahubKafkaEmitterConfig.getKafkaEmitterConfig(),
+                    datahubKafkaEmitterConfig.getMcpTopic()));
       } catch (IOException e) {
         throw new RuntimeException(e);
       }

DatahubSparkListener.java

@@ -188,8 +188,12 @@ public class DatahubSparkListener extends SparkListener {
               });
          kafkaEmitterConfig.producerConfig(kafkaConfig);
        }
-        return Optional.of(new KafkaDatahubEmitterConfig(kafkaEmitterConfig.build()));
+        if (sparkConf.hasPath(SparkConfigParser.KAFKA_MCP_TOPIC)) {
+          String mcpTopic = sparkConf.getString(SparkConfigParser.KAFKA_MCP_TOPIC);
+          return Optional.of(new KafkaDatahubEmitterConfig(kafkaEmitterConfig.build(), mcpTopic));
+        } else {
+          return Optional.of(new KafkaDatahubEmitterConfig(kafkaEmitterConfig.build()));
+        }
       case "file":
         log.info("File Emitter Configuration: File emitter will be used");
         FileEmitterConfig.FileEmitterConfigBuilder fileEmitterConfig = FileEmitterConfig.builder();
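
The kafka branch above uses the usual optional-key lookup; a standalone sketch of the same fallback, assuming sparkConf is a com.typesafe.config.Config (as the hasPath/getString calls suggest):

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import datahub.client.kafka.KafkaEmitter;

public class McpTopicLookupSketch {
  public static void main(String[] args) {
    // Hypothetical stand-in for the parsed DataHub section of the Spark config.
    Config conf = ConfigFactory.parseString("kafka.mcp_topic = custom_mcp_topic");
    // Use the key when present, otherwise fall back to the emitter's default topic.
    String topic =
        conf.hasPath("kafka.mcp_topic")
            ? conf.getString("kafka.mcp_topic")
            : KafkaEmitter.DEFAULT_MCP_KAFKA_TOPIC;
    System.out.println(topic); // prints custom_mcp_topic
  }
}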

KafkaDatahubEmitterConfig.java

@@ -1,5 +1,6 @@
 package datahub.spark.conf;
 
+import datahub.client.kafka.KafkaEmitter;
 import datahub.client.kafka.KafkaEmitterConfig;
 import lombok.Getter;
 import lombok.Setter;
@@ -11,8 +12,15 @@ import lombok.ToString;
 public class KafkaDatahubEmitterConfig implements DatahubEmitterConfig {
   final String type = "kafka";
   KafkaEmitterConfig kafkaEmitterConfig;
+  String mcpTopic;
 
   public KafkaDatahubEmitterConfig(KafkaEmitterConfig kafkaEmitterConfig) {
     this.kafkaEmitterConfig = kafkaEmitterConfig;
+    this.mcpTopic = KafkaEmitter.DEFAULT_MCP_KAFKA_TOPIC;
   }
+
+  public KafkaDatahubEmitterConfig(KafkaEmitterConfig kafkaEmitterConfig, String mcpTopic) {
+    this.kafkaEmitterConfig = kafkaEmitterConfig;
+    this.mcpTopic = mcpTopic;
+  }
 }
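
A minimal usage sketch of the two constructors; the bootstrap(...) builder setter is an assumption inferred from the getBootstrap() call in KafkaEmitter, getMcpTopic() comes from Lombok's @Getter, and the custom topic name is hypothetical:

import datahub.client.kafka.KafkaEmitter;
import datahub.client.kafka.KafkaEmitterConfig;
import datahub.spark.conf.KafkaDatahubEmitterConfig;

public class KafkaDatahubEmitterConfigSketch {
  public static void main(String[] args) {
    // Builder usage mirrors DatahubSparkListener above; bootstrap(...) is assumed.
    KafkaEmitterConfig kafkaConfig =
        KafkaEmitterConfig.builder().bootstrap("localhost:9092").build();

    // One-arg constructor keeps the default topic.
    KafkaDatahubEmitterConfig defaults = new KafkaDatahubEmitterConfig(kafkaConfig);
    System.out.println(
        defaults.getMcpTopic().equals(KafkaEmitter.DEFAULT_MCP_KAFKA_TOPIC)); // true

    // Two-arg constructor routes MCPs to a custom topic.
    KafkaDatahubEmitterConfig custom =
        new KafkaDatahubEmitterConfig(kafkaConfig, "custom_mcp_topic");
    System.out.println(custom.getMcpTopic()); // custom_mcp_topic
  }
}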

SparkConfigParser.java

@@ -32,6 +32,7 @@ public class SparkConfigParser {
   public static final String DISABLE_SSL_VERIFICATION_KEY = "rest.disable_ssl_verification";
   public static final String MAX_RETRIES = "rest.max_retries";
   public static final String RETRY_INTERVAL_IN_SEC = "rest.retry_interval_in_sec";
+  public static final String KAFKA_MCP_TOPIC = "kafka.mcp_topic";
   public static final String KAFKA_EMITTER_BOOTSTRAP = "kafka.bootstrap";
   public static final String KAFKA_EMITTER_SCHEMA_REGISTRY_URL = "kafka.schema_registry_url";
   public static final String KAFKA_EMITTER_SCHEMA_REGISTRY_CONFIG = "kafka.schema_registry_config";

AvroSerializer.java

@@ -9,7 +9,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 
-class AvroSerializer {
+public class AvroSerializer {
 
   private final Schema _recordSchema;
   private final Schema _genericAspectSchema;

KafkaEmitter.java

@@ -31,14 +31,25 @@ public class KafkaEmitter implements Emitter {
   private final Properties kafkaConfigProperties;
   private AvroSerializer _avroSerializer;
   private static final int ADMIN_CLIENT_TIMEOUT_MS = 5000;
+  private final String mcpKafkaTopic;
 
   /**
    * The default constructor
    *
-   * @param config
-   * @throws IOException
+   * @param config KafkaEmitterConfig
+   * @throws IOException when Avro Serialization fails
    */
   public KafkaEmitter(KafkaEmitterConfig config) throws IOException {
+    this(config, DEFAULT_MCP_KAFKA_TOPIC);
+  }
+
+  /**
+   * Constructor that takes in KafkaEmitterConfig and mcp Kafka Topic Name
+   *
+   * @param config KafkaEmitterConfig
+   * @throws IOException when Avro Serialization fails
+   */
+  public KafkaEmitter(KafkaEmitterConfig config, String mcpKafkaTopic) throws IOException {
     this.config = config;
     kafkaConfigProperties = new Properties();
     kafkaConfigProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.config.getBootstrap());
@@ -54,6 +65,7 @@ public class KafkaEmitter implements Emitter {
     producer = new KafkaProducer<>(kafkaConfigProperties);
     _avroSerializer = new AvroSerializer();
+    this.mcpKafkaTopic = mcpKafkaTopic;
   }
 
   @Override
@@ -73,8 +85,7 @@
       throws IOException {
     GenericRecord genricRecord = _avroSerializer.serialize(mcp);
     ProducerRecord<Object, Object> record =
-        new ProducerRecord<>(
-            KafkaEmitter.DEFAULT_MCP_KAFKA_TOPIC, mcp.getEntityUrn().toString(), genricRecord);
+        new ProducerRecord<>(this.mcpKafkaTopic, mcp.getEntityUrn().toString(), genricRecord);
     org.apache.kafka.clients.producer.Callback callback =
         new org.apache.kafka.clients.producer.Callback() {