fix(schema-registry): fix internal schema reg with custom duhe topic … (#8371)

david-leifker 2023-07-06 14:15:58 -05:00 committed by GitHub
parent c57037eb86
commit 217151ea55
5 changed files with 44 additions and 8 deletions

View File: DatahubUpgradeNoSchemaRegistryTest.java

@@ -19,7 +19,8 @@ import static org.testng.AssertJUnit.assertNotNull;
 @ActiveProfiles("test")
 @SpringBootTest(classes = {UpgradeCliApplication.class, UpgradeCliApplicationTestConfiguration.class},
         properties = {
-                "kafka.schemaRegistry.type=INTERNAL"
+                "kafka.schemaRegistry.type=INTERNAL",
+                "DATAHUB_UPGRADE_HISTORY_TOPIC_NAME=test_due_topic"
         })
 public class DatahubUpgradeNoSchemaRegistryTest extends AbstractTestNGSpringContextTests {

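For context on what the new test property exercises: TOPIC_NAME (made public in DataHubUpgradeKafkaListener further down) is a standard Spring "${VAR:default}" placeholder, so setting DATAHUB_UPGRADE_HISTORY_TOPIC_NAME overrides the default upgrade-history topic name. Below is a minimal sketch of that resolution using Spring's MockEnvironment; the default value "DataHubUpgradeHistory_v1" is assumed for illustration and is not taken from this diff.

import org.springframework.mock.env.MockEnvironment;

public class DuheTopicPlaceholderSketch {
  public static void main(String[] args) {
    // Same placeholder shape as DataHubUpgradeKafkaListener.TOPIC_NAME; the default
    // after the colon is assumed here for illustration only.
    String placeholder = "${DATAHUB_UPGRADE_HISTORY_TOPIC_NAME:DataHubUpgradeHistory_v1}";

    MockEnvironment env = new MockEnvironment();
    // No override present: the fallback after the colon wins.
    System.out.println(env.resolvePlaceholders(placeholder)); // DataHubUpgradeHistory_v1

    // Mirrors the property added to the test above.
    env.setProperty("DATAHUB_UPGRADE_HISTORY_TOPIC_NAME", "test_due_topic");
    System.out.println(env.resolvePlaceholders(placeholder)); // test_due_topic
  }
}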
View File: DUHESchemaRegistryFactory.java

@@ -6,15 +6,24 @@ import com.linkedin.metadata.boot.kafka.MockDUHESerializer;
 import com.linkedin.metadata.config.kafka.KafkaConfiguration;
 import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import java.util.HashMap;
 import java.util.Map;
+import static com.linkedin.metadata.boot.kafka.DataHubUpgradeKafkaListener.TOPIC_NAME;
 @Slf4j
 @Configuration
 public class DUHESchemaRegistryFactory {
+  public static final String DUHE_SCHEMA_REGISTRY_TOPIC_KEY = "duheTopicName";
+  @Value(TOPIC_NAME)
+  private String duheTopicName;
   /**
    * Configure Kafka Producer/Consumer processes with a custom schema registry.
    */
@@ -25,6 +34,7 @@ public class DUHESchemaRegistryFactory {
     props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaConfiguration
         .getSchemaRegistry().getUrl());
+    props.put(DUHE_SCHEMA_REGISTRY_TOPIC_KEY, duheTopicName);
     log.info("DataHub System Update Registry");
     return new SchemaRegistryConfig(MockDUHESerializer.class, MockDUHEDeserializer.class, props);

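To make the handoff in DUHESchemaRegistryFactory concrete: the @Value-injected duheTopicName is stored in the producer/consumer properties under DUHE_SCHEMA_REGISTRY_TOPIC_KEY before they are handed to SchemaRegistryConfig, and the mock (de)serializers read the same key back in the configure() overrides added further down. Below is a rough, hypothetical sketch of that flow; the URL and topic values are placeholders, not taken from the commit.

import java.util.HashMap;
import java.util.Map;

import com.linkedin.metadata.boot.kafka.MockDUHESerializer;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;

import static com.linkedin.gms.factory.kafka.schemaregistry.DUHESchemaRegistryFactory.DUHE_SCHEMA_REGISTRY_TOPIC_KEY;

public class DuheRegistryPropsSketch {
  public static void main(String[] args) {
    // Roughly the property map the factory assembles.
    Map<String, Object> props = new HashMap<>();
    // Mirrors the factory's schema registry URL entry; the mock client never contacts it.
    props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8080/placeholder");
    props.put(DUHE_SCHEMA_REGISTRY_TOPIC_KEY, "test_due_topic");

    // Kafka eventually calls configure() with these properties; the override added in
    // this commit reads the custom topic name out of them.
    MockDUHESerializer serializer = new MockDUHESerializer();
    serializer.configure(props, false); // false = configured for record values, not keys
  }
}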
View File: DataHubUpgradeKafkaListener.java

@@ -38,7 +38,7 @@ public class DataHubUpgradeKafkaListener implements ConsumerSeekAware, Bootstrap
   private static final String CONSUMER_GROUP = "${DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID:generic-duhe-consumer-job-client}";
   private static final String SUFFIX = "temp";
-  private static final String TOPIC_NAME = "${DATAHUB_UPGRADE_HISTORY_TOPIC_NAME:" + Topics.DATAHUB_UPGRADE_HISTORY_TOPIC_NAME + "}";
+  public static final String TOPIC_NAME = "${DATAHUB_UPGRADE_HISTORY_TOPIC_NAME:" + Topics.DATAHUB_UPGRADE_HISTORY_TOPIC_NAME + "}";
   private final DefaultKafkaConsumerFactory<String, GenericRecord> _defaultKafkaConsumerFactory;

View File: MockDUHEDeserializer.java

@@ -12,7 +12,8 @@ import lombok.extern.slf4j.Slf4j;
 import java.io.IOException;
 import java.util.Map;
-import static com.linkedin.metadata.boot.kafka.MockDUHESerializer.DATAHUB_UPGRADE_HISTORY_EVENT_SUBJECT;
+import static com.linkedin.gms.factory.kafka.schemaregistry.DUHESchemaRegistryFactory.DUHE_SCHEMA_REGISTRY_TOPIC_KEY;
+import static com.linkedin.metadata.boot.kafka.MockDUHESerializer.topicToSubjectName;
 /**
  * Used for early bootstrap to avoid contact with not yet existing schema registry
@@ -20,11 +21,14 @@ import static com.linkedin.metadata.boot.kafka.MockDUHESerializer.DATAHUB_UPGRADE_HISTORY_EVENT_SUBJECT;
 @Slf4j
 public class MockDUHEDeserializer extends KafkaAvroDeserializer {
+  private String topicName;
   public MockDUHEDeserializer() {
     this.schemaRegistry = buildMockSchemaRegistryClient();
   }
   public MockDUHEDeserializer(SchemaRegistryClient client) {
     super(client);
     this.schemaRegistry = buildMockSchemaRegistryClient();
   }
@@ -33,10 +37,16 @@ public class MockDUHEDeserializer extends KafkaAvroDeserializer {
     this.schemaRegistry = buildMockSchemaRegistryClient();
   }
-  private static MockSchemaRegistryClient buildMockSchemaRegistryClient() {
+  @Override
+  public void configure(Map<String, ?> configs, boolean isKey) {
+    super.configure(configs, isKey);
+    topicName = configs.get(DUHE_SCHEMA_REGISTRY_TOPIC_KEY).toString();
+  }
+  private MockSchemaRegistryClient buildMockSchemaRegistryClient() {
     MockSchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient2();
     try {
-      schemaRegistry.register(DATAHUB_UPGRADE_HISTORY_EVENT_SUBJECT,
+      schemaRegistry.register(topicToSubjectName(topicName),
           new AvroSchema(EventUtils.ORIGINAL_DUHE_AVRO_SCHEMA));
       return schemaRegistry;
     } catch (IOException | RestClientException e) {

View File: MockDUHESerializer.java

@@ -11,19 +11,24 @@ import lombok.extern.slf4j.Slf4j;
 import java.io.IOException;
 import java.util.Map;
+import static com.linkedin.gms.factory.kafka.schemaregistry.DUHESchemaRegistryFactory.DUHE_SCHEMA_REGISTRY_TOPIC_KEY;
 /**
  * Used for early bootstrap to avoid contact with not yet existing schema registry
  */
 @Slf4j
 public class MockDUHESerializer extends KafkaAvroSerializer {
-  static final String DATAHUB_UPGRADE_HISTORY_EVENT_SUBJECT = "DataHubUpgradeHistory_v1-value";
+  private static final String DATAHUB_UPGRADE_HISTORY_EVENT_SUBJECT_SUFFIX = "-value";
+  private String topicName;
   public MockDUHESerializer() {
     this.schemaRegistry = buildMockSchemaRegistryClient();
   }
   public MockDUHESerializer(SchemaRegistryClient client) {
     super(client);
     this.schemaRegistry = buildMockSchemaRegistryClient();
   }
@@ -32,14 +37,24 @@ public class MockDUHESerializer extends KafkaAvroSerializer {
     this.schemaRegistry = buildMockSchemaRegistryClient();
   }
-  private static MockSchemaRegistryClient buildMockSchemaRegistryClient() {
+  @Override
+  public void configure(Map<String, ?> configs, boolean isKey) {
+    super.configure(configs, isKey);
+    topicName = configs.get(DUHE_SCHEMA_REGISTRY_TOPIC_KEY).toString();
+  }
+  private MockSchemaRegistryClient buildMockSchemaRegistryClient() {
     MockSchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient();
     try {
-      schemaRegistry.register(DATAHUB_UPGRADE_HISTORY_EVENT_SUBJECT,
+      schemaRegistry.register(topicToSubjectName(topicName),
           new AvroSchema(EventUtils.ORIGINAL_DUHE_AVRO_SCHEMA));
       return schemaRegistry;
     } catch (IOException | RestClientException e) {
       throw new RuntimeException(e);
     }
   }
+  public static String topicToSubjectName(String topicName) {
+    return topicName + DATAHUB_UPGRADE_HISTORY_EVENT_SUBJECT_SUFFIX;
+  }
 }
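The new topicToSubjectName helper encodes the same convention as Confluent's default TopicNameStrategy for value schemas (subject = topic name + "-value"), which is why the previously hard-coded "DataHubUpgradeHistory_v1-value" subject can now follow whatever topic is configured. A small sanity-check sketch of that mapping; the default topic name shown is an assumption for illustration.

import static com.linkedin.metadata.boot.kafka.MockDUHESerializer.topicToSubjectName;

public class DuheSubjectNameSketch {
  public static void main(String[] args) {
    // The custom topic from the test maps to its value-schema subject.
    System.out.println(topicToSubjectName("test_due_topic"));           // test_due_topic-value
    // An assumed default topic name reproduces the old hard-coded subject.
    System.out.println(topicToSubjectName("DataHubUpgradeHistory_v1")); // DataHubUpgradeHistory_v1-value
  }
}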