From bcaa0caba2d017002b10bb48be1b8a110755e608 Mon Sep 17 00:00:00 2001 From: Davi Arnaut Date: Tue, 16 Jan 2024 10:47:58 -0800 Subject: [PATCH] fix(internal schema): changes to make internal schema registry more compatible with other schema registries (#9636) --- .../registry/SchemaRegistryController.java | 50 ++++++++++++++++--- 1 file changed, 44 insertions(+), 6 deletions(-) diff --git a/metadata-service/schema-registry-servlet/src/main/java/io/datahubproject/openapi/schema/registry/SchemaRegistryController.java b/metadata-service/schema-registry-servlet/src/main/java/io/datahubproject/openapi/schema/registry/SchemaRegistryController.java index 8b3eb40dc3..f4228e0d63 100644 --- a/metadata-service/schema-registry-servlet/src/main/java/io/datahubproject/openapi/schema/registry/SchemaRegistryController.java +++ b/metadata-service/schema-registry-servlet/src/main/java/io/datahubproject/openapi/schema/registry/SchemaRegistryController.java @@ -1,6 +1,7 @@ package io.datahubproject.openapi.schema.registry; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableSet; import com.linkedin.gms.factory.kafka.schemaregistry.InternalSchemaRegistryFactory; import com.linkedin.metadata.registry.SchemaRegistryService; import io.datahubproject.schema_registry.openapi.generated.CompatibilityCheckResponse; @@ -23,9 +24,11 @@ import io.swagger.api.SubjectsApi; import io.swagger.api.V1Api; import io.swagger.v3.oas.annotations.Operation; import jakarta.servlet.http.HttpServletRequest; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Qualifier; @@ -57,6 +60,8 @@ public class SchemaRegistryController private final HttpServletRequest request; + private static final Set SCHEMA_VERSIONS = ImmutableSet.of("1", "latest"); + @Qualifier("schemaRegistryService") private final SchemaRegistryService _schemaRegistryService; @@ -109,8 +114,30 @@ public class SchemaRegistryController @Override public ResponseEntity getSchemaByVersion( String subject, String version, Boolean deleted) { - log.error("[SubjectsApi] getSchemaByVersion method not implemented"); - return SubjectsApi.super.getSchemaByVersion(subject, version, deleted); + final String topicName = subject.replaceFirst("-value", ""); + + if (!SCHEMA_VERSIONS.contains(version)) { + log.error( + "[SubjectsApi] getSchemaByVersion subject {} version {} not found.", subject, version); + return new ResponseEntity<>(HttpStatus.NOT_FOUND); + } + + return _schemaRegistryService + .getSchemaForTopic(topicName) + .map( + schema -> { + Schema result = new Schema(); + result.setSubject(subject); + result.setVersion(1); + result.setId(_schemaRegistryService.getSchemaIdForTopic(topicName).get()); + result.setSchema(schema.toString()); + return new ResponseEntity<>(result, HttpStatus.OK); + }) + .orElseGet( + () -> { + log.error("[SubjectsApi] getSchemaByVersion couldn't find topic {}.", topicName); + return new ResponseEntity<>(HttpStatus.NOT_FOUND); + }); } @Override @@ -129,8 +156,18 @@ public class SchemaRegistryController @Override public ResponseEntity> listVersions( String subject, Boolean deleted, Boolean deletedOnly) { - log.error("[SubjectsApi] listVersions method not implemented"); - return SubjectsApi.super.listVersions(subject, deleted, deletedOnly); + final String topicName = subject.replaceFirst("-value", ""); + return _schemaRegistryService + .getSchemaForTopic(topicName) + .map( + schema -> { + return new ResponseEntity<>(Arrays.asList(1), HttpStatus.OK); + }) + .orElseGet( + () -> { + log.error("[SubjectsApi] listVersions couldn't find topic with name {}.", topicName); + return new ResponseEntity<>(HttpStatus.NOT_FOUND); + }); } @Override @@ -274,8 +311,9 @@ public class SchemaRegistryController .map( schema -> { SchemaString result = new SchemaString(); - result.setMaxId(id); - result.setSchemaType("AVRO"); + if (fetchMaxId) { + result.setMaxId(id); + } result.setSchema(schema.toString()); return new ResponseEntity<>(result, HttpStatus.OK); })