fix(kafka-topic-convention): Fix DAOs that do not refer to TopicConvention (#2387)

Dexter Lee 2021-04-13 07:58:31 -07:00 committed by GitHub
parent 1fc532d831
commit fa015c5aaa
9 changed files with 81 additions and 78 deletions

View File

@@ -1,8 +1,12 @@
 # This "container" is a workaround to pre-create topics
 FROM confluentinc/cp-kafka:5.4.0

+ENV METADATA_AUDIT_EVENT_NAME="MetadataAuditEvent_v4"
+ENV METADATA_CHANGE_EVENT_NAME="MetadataChangeEvent_v4"
+ENV FAILED_METADATA_CHANGE_EVENT_NAME="FailedMetadataChangeEvent_v4"
+
 CMD echo Waiting for Kafka to be ready... && \
     cub kafka-ready -b $KAFKA_BOOTSTRAP_SERVER 1 60 && \
-    kafka-topics --create --if-not-exists --zookeeper $KAFKA_ZOOKEEPER_CONNECT --partitions 1 --replication-factor 1 --topic MetadataAuditEvent_v4 && \
-    kafka-topics --create --if-not-exists --zookeeper $KAFKA_ZOOKEEPER_CONNECT --partitions 1 --replication-factor 1 --topic MetadataChangeEvent_v4 && \
-    kafka-topics --create --if-not-exists --zookeeper $KAFKA_ZOOKEEPER_CONNECT --partitions 1 --replication-factor 1 --topic FailedMetadataChangeEvent_v4
+    kafka-topics --create --if-not-exists --zookeeper $KAFKA_ZOOKEEPER_CONNECT --partitions 1 --replication-factor 1 --topic $METADATA_AUDIT_EVENT_NAME && \
+    kafka-topics --create --if-not-exists --zookeeper $KAFKA_ZOOKEEPER_CONNECT --partitions 1 --replication-factor 1 --topic $METADATA_CHANGE_EVENT_NAME && \
+    kafka-topics --create --if-not-exists --zookeeper $KAFKA_ZOOKEEPER_CONNECT --partitions 1 --replication-factor 1 --topic $FAILED_METADATA_CHANGE_EVENT_NAME

View File

@@ -1,10 +1,12 @@
 package com.linkedin.gms.factory.dashboard;

 import com.linkedin.common.urn.ChartUrn;
+import com.linkedin.gms.factory.common.TopicConventionFactory;
 import com.linkedin.metadata.aspect.ChartAspect;
 import com.linkedin.metadata.dao.EbeanLocalDAO;
 import com.linkedin.metadata.dao.producer.KafkaMetadataEventProducer;
 import com.linkedin.metadata.snapshot.ChartSnapshot;
+import com.linkedin.mxe.TopicConvention;
 import io.ebean.config.ServerConfig;
 import javax.annotation.Nonnull;
 import org.apache.kafka.clients.producer.Producer;

@@ -21,12 +23,12 @@ public class ChartDaoFactory {
   ApplicationContext applicationContext;

   @Bean(name = "chartDAO")
-  @DependsOn({"gmsEbeanServiceConfig", "kafkaEventProducer"})
+  @DependsOn({"gmsEbeanServiceConfig", "kafkaEventProducer", TopicConventionFactory.TOPIC_CONVENTION_BEAN})
   @Nonnull
   protected EbeanLocalDAO createInstance() {
     KafkaMetadataEventProducer<ChartSnapshot, ChartAspect, ChartUrn> producer =
         new KafkaMetadataEventProducer(ChartSnapshot.class, ChartAspect.class,
-            applicationContext.getBean(Producer.class));
+            applicationContext.getBean(Producer.class), applicationContext.getBean(TopicConvention.class));
     return new EbeanLocalDAO<>(ChartAspect.class, producer, applicationContext.getBean(ServerConfig.class),
         ChartUrn.class);
   }
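
Note: the TopicConventionFactory that the new @DependsOn entries reference is not part of this diff. A minimal sketch of what such a factory could look like, assuming the bean name's literal value and assuming TopicConventionImpl (the default that the removed producer constructors below fall back to) lives alongside TopicConvention in com.linkedin.mxe:

package com.linkedin.gms.factory.common;

import com.linkedin.mxe.TopicConvention;
import com.linkedin.mxe.TopicConventionImpl;
import javax.annotation.Nonnull;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicConventionFactory {
  // Name referenced by the @DependsOn annotations in the DAO factories;
  // the literal value here is an assumption.
  public static final String TOPIC_CONVENTION_BEAN = "topicConvention";

  @Bean(name = TOPIC_CONVENTION_BEAN)
  @Nonnull
  protected TopicConvention topicConvention() {
    // TopicConventionImpl() is the no-arg default the removed constructors used;
    // a real factory would presumably build it from configured topic names instead.
    return new TopicConventionImpl();
  }
}

Registering the convention as a single named bean lets every DAO factory share one topic-naming policy instead of each producer silently constructing its own default.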

View File

@@ -1,10 +1,12 @@
 package com.linkedin.gms.factory.dashboard;

 import com.linkedin.common.urn.DashboardUrn;
+import com.linkedin.gms.factory.common.TopicConventionFactory;
 import com.linkedin.metadata.aspect.DashboardAspect;
 import com.linkedin.metadata.dao.EbeanLocalDAO;
 import com.linkedin.metadata.dao.producer.KafkaMetadataEventProducer;
 import com.linkedin.metadata.snapshot.DashboardSnapshot;
+import com.linkedin.mxe.TopicConvention;
 import io.ebean.config.ServerConfig;
 import javax.annotation.Nonnull;
 import org.apache.kafka.clients.producer.Producer;

@@ -21,12 +23,12 @@ public class DashboardDaoFactory {
   ApplicationContext applicationContext;

   @Bean(name = "dashboardDAO")
-  @DependsOn({"gmsEbeanServiceConfig", "kafkaEventProducer"})
+  @DependsOn({"gmsEbeanServiceConfig", "kafkaEventProducer", TopicConventionFactory.TOPIC_CONVENTION_BEAN})
   @Nonnull
   protected EbeanLocalDAO createInstance() {
     KafkaMetadataEventProducer<DashboardSnapshot, DashboardAspect, DashboardUrn> producer =
         new KafkaMetadataEventProducer(DashboardSnapshot.class, DashboardAspect.class,
-            applicationContext.getBean(Producer.class));
+            applicationContext.getBean(Producer.class), applicationContext.getBean(TopicConvention.class));
     return new EbeanLocalDAO<>(DashboardAspect.class, producer, applicationContext.getBean(ServerConfig.class),
         DashboardUrn.class);
   }

View File

@@ -1,5 +1,7 @@
 package com.linkedin.gms.factory.datajob;

+import com.linkedin.gms.factory.common.TopicConventionFactory;
+import com.linkedin.mxe.TopicConvention;
 import javax.annotation.Nonnull;
 import org.apache.kafka.clients.producer.Producer;

@@ -17,18 +19,20 @@ import com.linkedin.metadata.snapshot.DataFlowSnapshot;
 import io.ebean.config.ServerConfig;

 @Configuration
 public class DataFlowDAOFactory {
   @Autowired
   private ApplicationContext applicationContext;

-  @Bean(name = "dataFlowDAO")
-  @DependsOn({"gmsEbeanServiceConfig", "kafkaEventProducer"})
-  @Nonnull
-  protected EbeanLocalDAO<DataFlowAspect, DataFlowUrn> createInstance() {
-    KafkaMetadataEventProducer<DataFlowSnapshot, DataFlowAspect, DataFlowUrn> producer =
-        new KafkaMetadataEventProducer<>(DataFlowSnapshot.class, DataFlowAspect.class,
-            applicationContext.getBean(Producer.class));
-    return new EbeanLocalDAO<>(DataFlowAspect.class, producer, applicationContext.getBean(ServerConfig.class), DataFlowUrn.class);
-  }
+  @Bean(name = "dataFlowDAO")
+  @DependsOn({"gmsEbeanServiceConfig", "kafkaEventProducer", TopicConventionFactory.TOPIC_CONVENTION_BEAN})
+  @Nonnull
+  protected EbeanLocalDAO<DataFlowAspect, DataFlowUrn> createInstance() {
+    KafkaMetadataEventProducer<DataFlowSnapshot, DataFlowAspect, DataFlowUrn> producer =
+        new KafkaMetadataEventProducer<>(DataFlowSnapshot.class, DataFlowAspect.class,
+            applicationContext.getBean(Producer.class), applicationContext.getBean(TopicConvention.class));
+    return new EbeanLocalDAO<>(DataFlowAspect.class, producer, applicationContext.getBean(ServerConfig.class),
+        DataFlowUrn.class);
+  }
 }

View File

@@ -1,5 +1,7 @@
 package com.linkedin.gms.factory.datajob;

+import com.linkedin.gms.factory.common.TopicConventionFactory;
+import com.linkedin.mxe.TopicConvention;
 import javax.annotation.Nonnull;
 import org.apache.kafka.clients.producer.Producer;

@@ -17,18 +19,20 @@ import com.linkedin.metadata.snapshot.DataJobSnapshot;
 import io.ebean.config.ServerConfig;

 @Configuration
 public class DataJobDAOFactory {
   @Autowired
   private ApplicationContext applicationContext;

-  @Bean(name = "dataJobDAO")
-  @DependsOn({"gmsEbeanServiceConfig", "kafkaEventProducer"})
-  @Nonnull
-  protected EbeanLocalDAO<DataJobAspect, DataJobUrn> createInstance() {
-    KafkaMetadataEventProducer<DataJobSnapshot, DataJobAspect, DataJobUrn> producer =
-        new KafkaMetadataEventProducer<>(DataJobSnapshot.class, DataJobAspect.class,
-            applicationContext.getBean(Producer.class));
-    return new EbeanLocalDAO<>(DataJobAspect.class, producer, applicationContext.getBean(ServerConfig.class), DataJobUrn.class);
-  }
+  @Bean(name = "dataJobDAO")
+  @DependsOn({"gmsEbeanServiceConfig", "kafkaEventProducer", TopicConventionFactory.TOPIC_CONVENTION_BEAN})
+  @Nonnull
+  protected EbeanLocalDAO<DataJobAspect, DataJobUrn> createInstance() {
+    KafkaMetadataEventProducer<DataJobSnapshot, DataJobAspect, DataJobUrn> producer =
+        new KafkaMetadataEventProducer<>(DataJobSnapshot.class, DataJobAspect.class,
+            applicationContext.getBean(Producer.class), applicationContext.getBean(TopicConvention.class));
+    return new EbeanLocalDAO<>(DataJobAspect.class, producer, applicationContext.getBean(ServerConfig.class),
+        DataJobUrn.class);
+  }
 }

View File

@@ -1,5 +1,7 @@
 package com.linkedin.gms.factory.ml;

+import com.linkedin.gms.factory.common.TopicConventionFactory;
+import com.linkedin.mxe.TopicConvention;
 import javax.annotation.Nonnull;
 import org.apache.kafka.clients.producer.Producer;

@@ -17,18 +19,20 @@ import com.linkedin.metadata.snapshot.MLModelSnapshot;
 import io.ebean.config.ServerConfig;

 @Configuration
 public class MLModelDAOFactory {
   @Autowired
   private ApplicationContext applicationContext;

-  @Bean(name = "mlModelDAO")
-  @DependsOn({"gmsEbeanServiceConfig", "kafkaEventProducer"})
-  @Nonnull
-  protected EbeanLocalDAO<MLModelAspect, MLModelUrn> createInstance() {
-    KafkaMetadataEventProducer<MLModelSnapshot, MLModelAspect, MLModelUrn> producer =
-        new KafkaMetadataEventProducer<>(MLModelSnapshot.class, MLModelAspect.class,
-            applicationContext.getBean(Producer.class));
-    return new EbeanLocalDAO<>(MLModelAspect.class, producer, applicationContext.getBean(ServerConfig.class), MLModelUrn.class);
-  }
+  @Bean(name = "mlModelDAO")
+  @DependsOn({"gmsEbeanServiceConfig", "kafkaEventProducer", TopicConventionFactory.TOPIC_CONVENTION_BEAN})
+  @Nonnull
+  protected EbeanLocalDAO<MLModelAspect, MLModelUrn> createInstance() {
+    KafkaMetadataEventProducer<MLModelSnapshot, MLModelAspect, MLModelUrn> producer =
+        new KafkaMetadataEventProducer<>(MLModelSnapshot.class, MLModelAspect.class,
+            applicationContext.getBean(Producer.class), applicationContext.getBean(TopicConvention.class));
+    return new EbeanLocalDAO<>(MLModelAspect.class, producer, applicationContext.getBean(ServerConfig.class),
+        MLModelUrn.class);
+  }
 }

View File

@@ -1,11 +1,13 @@
 package com.linkedin.gms.factory.tag;

 import com.linkedin.common.urn.TagUrn;
+import com.linkedin.gms.factory.common.TopicConventionFactory;
 import com.linkedin.metadata.aspect.TagAspect;
 import com.linkedin.metadata.dao.EbeanLocalDAO;
 import com.linkedin.metadata.dao.producer.KafkaMetadataEventProducer;
 import com.linkedin.metadata.snapshot.TagSnapshot;
+import com.linkedin.mxe.TopicConvention;
 import io.ebean.config.ServerConfig;
 import org.apache.kafka.clients.producer.Producer;
 import org.springframework.beans.factory.annotation.Autowired;

@@ -16,19 +18,19 @@ import org.springframework.context.annotation.DependsOn;
 import javax.annotation.Nonnull;

 @Configuration
 public class TagDaoFactory {
   @Autowired
   ApplicationContext applicationContext;

-  @Bean(name = "tagDAO")
-  @DependsOn({"gmsEbeanServiceConfig", "kafkaEventProducer"})
-  @Nonnull
-  protected EbeanLocalDAO createInstance() {
-    KafkaMetadataEventProducer<TagSnapshot, TagAspect, TagUrn> producer =
-        new KafkaMetadataEventProducer(TagSnapshot.class, TagAspect.class,
-            applicationContext.getBean(Producer.class));
-    return new EbeanLocalDAO<>(TagAspect.class, producer, applicationContext.getBean(ServerConfig.class),
-        TagUrn.class);
-  }
+  @Bean(name = "tagDAO")
+  @DependsOn({"gmsEbeanServiceConfig", "kafkaEventProducer", TopicConventionFactory.TOPIC_CONVENTION_BEAN})
+  @Nonnull
+  protected EbeanLocalDAO createInstance() {
+    KafkaMetadataEventProducer<TagSnapshot, TagAspect, TagUrn> producer =
+        new KafkaMetadataEventProducer(TagSnapshot.class, TagAspect.class, applicationContext.getBean(Producer.class),
+            applicationContext.getBean(TopicConvention.class));
+    return new EbeanLocalDAO<>(TagAspect.class, producer, applicationContext.getBean(ServerConfig.class), TagUrn.class);
+  }
 }

View File

@@ -46,18 +46,6 @@ public class KafkaMetadataEventProducer<SNAPSHOT extends RecordTemplate, ASPECT_
   private final Optional<Callback> _callback;
   private final TopicConvention _topicConvention;

-  /**
-   * Constructor.
-   *
-   * @param snapshotClass The snapshot class for the produced events
-   * @param aspectUnionClass The aspect union in the snapshot
-   * @param producer The Kafka {@link Producer} to use
-   */
-  public KafkaMetadataEventProducer(@Nonnull Class<SNAPSHOT> snapshotClass,
-      @Nonnull Class<ASPECT_UNION> aspectUnionClass, @Nonnull Producer<String, ? extends IndexedRecord> producer) {
-    this(snapshotClass, aspectUnionClass, producer, new TopicConventionImpl(), null);
-  }
-
   /**
    * Constructor.
    *

@@ -72,19 +60,6 @@ public class KafkaMetadataEventProducer<SNAPSHOT extends RecordTemplate, ASPECT_
     this(snapshotClass, aspectUnionClass, producer, topicConvention, null);
   }

-  /**
-   * Constructor.
-   *
-   * @param snapshotClass The snapshot class for the produced events
-   * @param aspectUnionClass The aspect union in the snapshot
-   * @param producer The Kafka {@link Producer} to use
-   */
-  public KafkaMetadataEventProducer(@Nonnull Class<SNAPSHOT> snapshotClass,
-      @Nonnull Class<ASPECT_UNION> aspectUnionClass, @Nonnull Producer<String, ? extends IndexedRecord> producer,
-      @Nullable Callback callback) {
-    this(snapshotClass, aspectUnionClass, producer, new TopicConventionImpl(), callback);
-  }
-
   /**
    * Constructor.
    *
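
With the two convenience constructors above removed, every call site must now supply a TopicConvention explicitly instead of getting a silent new TopicConventionImpl(). A sketch of the migration, using only types that appear in this diff (the helper method itself is hypothetical):

// Hypothetical helper; kafkaProducer and convention stand in for the
// Spring-managed beans the DAO factories resolve from the ApplicationContext.
static KafkaMetadataEventProducer<DatasetSnapshot, DatasetAspect, DatasetUrn> buildProducer(
    Producer<String, ? extends IndexedRecord> kafkaProducer, TopicConvention convention) {
  // Before: new KafkaMetadataEventProducer<>(DatasetSnapshot.class, DatasetAspect.class, kafkaProducer)
  // pinned topic names to the TopicConventionImpl() defaults, ignoring any configured convention.
  // After: the convention is passed through, so overridden topic names take effect.
  return new KafkaMetadataEventProducer<>(DatasetSnapshot.class, DatasetAspect.class,
      kafkaProducer, convention);
}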

View File

@@ -7,8 +7,10 @@ import com.linkedin.common.urn.DataPlatformUrn;
 import com.linkedin.common.urn.DatasetUrn;
 import com.linkedin.metadata.aspect.DatasetAspect;
 import com.linkedin.metadata.dao.producer.KafkaMetadataEventProducer;
+import com.linkedin.metadata.examples.configs.TopicConventionFactory;
 import com.linkedin.metadata.snapshot.DatasetSnapshot;
 import com.linkedin.mxe.MetadataChangeEvent;
+import com.linkedin.mxe.TopicConvention;
 import com.linkedin.schema.KafkaSchema;
 import com.linkedin.schema.SchemaField;
 import com.linkedin.schema.SchemaFieldArray;

@@ -44,6 +46,10 @@ public final class KafkaEtl implements CommandLineRunner {
   @Named("kafkaEventProducer")
   private Producer<String, GenericRecord> _producer;

+  @Inject
+  @Named(TopicConventionFactory.TOPIC_CONVENTION_BEAN)
+  private TopicConvention _topicConvention;
+
   @Inject
   @Named("zooKeeper")
   private ZooKeeper _zooKeeper;

@@ -76,7 +82,7 @@ public final class KafkaEtl implements CommandLineRunner {
     // Kafka topics are considered datasets in the current DataHub metadata ecosystem.
     final KafkaMetadataEventProducer<DatasetSnapshot, DatasetAspect, DatasetUrn> eventProducer =
-        new KafkaMetadataEventProducer<>(DatasetSnapshot.class, DatasetAspect.class, _producer);
+        new KafkaMetadataEventProducer<>(DatasetSnapshot.class, DatasetAspect.class, _producer, _topicConvention);
     eventProducer.produceSnapshotBasedMetadataChangeEvent(
         new DatasetUrn(KAFKA_URN, schemaMetadata.getSchemaName(), FabricType.PROD), schemaMetadata);
     _producer.flush();
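
The net effect across all nine files: topic names now flow from a single injected TopicConvention rather than from a TopicConventionImpl constructed inside the producer. Assuming the accessor names on TopicConvention (they do not appear in this diff), resolution would look roughly like:

// No-arg default, as in the constructors this commit removes.
TopicConvention convention = new TopicConventionImpl();
// Accessor names below are assumptions; under the default convention they would
// yield the hardcoded *_v4 names the Dockerfile change moves into environment variables.
String maeTopic = convention.getMetadataAuditEventTopicName();          // "MetadataAuditEvent_v4"
String mceTopic = convention.getMetadataChangeEventTopicName();         // "MetadataChangeEvent_v4"
String fmceTopic = convention.getFailedMetadataChangeEventTopicName();  // "FailedMetadataChangeEvent_v4"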