fix(internal schema): changes to make internal schema registry more compatible with other schema registries (#9636)

This commit is contained in:
Davi Arnaut 2024-01-16 10:47:58 -08:00 committed by GitHub
parent 261fcb06bf
commit bcaa0caba2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1,6 +1,7 @@
package io.datahubproject.openapi.schema.registry;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import com.linkedin.gms.factory.kafka.schemaregistry.InternalSchemaRegistryFactory;
import com.linkedin.metadata.registry.SchemaRegistryService;
import io.datahubproject.schema_registry.openapi.generated.CompatibilityCheckResponse;
@ -23,9 +24,11 @@ import io.swagger.api.SubjectsApi;
import io.swagger.api.V1Api;
import io.swagger.v3.oas.annotations.Operation;
import jakarta.servlet.http.HttpServletRequest;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
@ -57,6 +60,8 @@ public class SchemaRegistryController
private final HttpServletRequest request;
private static final Set<String> SCHEMA_VERSIONS = ImmutableSet.of("1", "latest");
@Qualifier("schemaRegistryService")
private final SchemaRegistryService _schemaRegistryService;
@ -109,8 +114,30 @@ public class SchemaRegistryController
@Override
public ResponseEntity<Schema> getSchemaByVersion(
String subject, String version, Boolean deleted) {
log.error("[SubjectsApi] getSchemaByVersion method not implemented");
return SubjectsApi.super.getSchemaByVersion(subject, version, deleted);
final String topicName = subject.replaceFirst("-value", "");
if (!SCHEMA_VERSIONS.contains(version)) {
log.error(
"[SubjectsApi] getSchemaByVersion subject {} version {} not found.", subject, version);
return new ResponseEntity<>(HttpStatus.NOT_FOUND);
}
return _schemaRegistryService
.getSchemaForTopic(topicName)
.map(
schema -> {
Schema result = new Schema();
result.setSubject(subject);
result.setVersion(1);
result.setId(_schemaRegistryService.getSchemaIdForTopic(topicName).get());
result.setSchema(schema.toString());
return new ResponseEntity<>(result, HttpStatus.OK);
})
.orElseGet(
() -> {
log.error("[SubjectsApi] getSchemaByVersion couldn't find topic {}.", topicName);
return new ResponseEntity<>(HttpStatus.NOT_FOUND);
});
}
@Override
@ -129,8 +156,18 @@ public class SchemaRegistryController
@Override
public ResponseEntity<List<Integer>> listVersions(
String subject, Boolean deleted, Boolean deletedOnly) {
log.error("[SubjectsApi] listVersions method not implemented");
return SubjectsApi.super.listVersions(subject, deleted, deletedOnly);
final String topicName = subject.replaceFirst("-value", "");
return _schemaRegistryService
.getSchemaForTopic(topicName)
.map(
schema -> {
return new ResponseEntity<>(Arrays.asList(1), HttpStatus.OK);
})
.orElseGet(
() -> {
log.error("[SubjectsApi] listVersions couldn't find topic with name {}.", topicName);
return new ResponseEntity<>(HttpStatus.NOT_FOUND);
});
}
@Override
@ -274,8 +311,9 @@ public class SchemaRegistryController
.map(
schema -> {
SchemaString result = new SchemaString();
result.setMaxId(id);
result.setSchemaType("AVRO");
if (fetchMaxId) {
result.setMaxId(id);
}
result.setSchema(schema.toString());
return new ResponseEntity<>(result, HttpStatus.OK);
})