fix(kafka): datahub-upgrade job (#6864)

david-leifker 2022-12-27 12:35:48 -06:00, committed by GitHub
parent bef87abd6c
commit a5b5abf270
5 changed files with 93 additions and 11 deletions

View File

@@ -46,6 +46,10 @@ dependencies {
   annotationProcessor externalDependency.lombok
   annotationProcessor externalDependency.picocli
   testImplementation externalDependency.springBootTest
+  testCompile externalDependency.mockito
+  testCompile externalDependency.testng
 }
 bootJar {
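
These two dependencies wire Mockito and TestNG in for the test classes added below. One caveat worth noting: the testCompile configuration was deprecated in Gradle 5 and removed in Gradle 7, so on a current Gradle version the equivalent declaration (a sketch, not part of this commit) would use the replacement configuration:

    dependencies {
      // testCompile no longer exists on Gradle 7+; testImplementation replaces it
      testImplementation externalDependency.mockito
      testImplementation externalDependency.testng
    }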

View File

@@ -11,9 +11,13 @@ import org.springframework.context.annotation.FilterType;
 @SuppressWarnings("checkstyle:HideUtilityClassConstructor")
 @SpringBootApplication(exclude = {ElasticsearchRestClientAutoConfiguration.class})
-@ComponentScan(basePackages = {"com.linkedin.gms.factory", "com.linkedin.datahub.upgrade.config"},
-    excludeFilters = {
-        @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = ScheduledAnalyticsFactory.class)})
+@ComponentScan(basePackages = {
+    "com.linkedin.gms.factory",
+    "com.linkedin.datahub.upgrade.config",
+    "com.linkedin.metadata.dao.producer"
+}, excludeFilters = {
+    @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = ScheduledAnalyticsFactory.class)
+})
 public class UpgradeCliApplication {
   public static void main(String[] args) {
     new SpringApplicationBuilder(UpgradeCliApplication.class, UpgradeCli.class).web(WebApplicationType.NONE).run(args);
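
The functional change in this hunk is the extra base package, com.linkedin.metadata.dao.producer, which is where the Kafka health check bean referenced by the new test lives; broadening the scan is what pulls it into the upgrade job's context. As an illustration only, a bean of roughly this shape in that package would now be instantiated when the job boots; the class name and the AdminClient-based probe are hypothetical, not DataHub's actual implementation:

    package com.linkedin.metadata.dao.producer;

    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.springframework.stereotype.Component;

    // Hypothetical sketch: any @Component in this package is created when
    // UpgradeCliApplication starts. The method below shows the kind of probe
    // such a health check performs against the brokers.
    @Component
    public class KafkaHealthCheckSketch {

      public boolean kafkaReachable(String bootstrapServers) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        try (AdminClient client = AdminClient.create(props)) {
          // describeCluster() round-trips to a broker; a timeout means Kafka is unreachable
          client.describeCluster().nodes().get(10, TimeUnit.SECONDS);
          return true;
        } catch (Exception e) {
          return false;
        }
      }
    }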

View File

@@ -0,0 +1,29 @@
+package com.linkedin.datahub.upgrade;
+
+import com.linkedin.datahub.upgrade.restoreindices.RestoreIndices;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
+import org.testng.annotations.Test;
+import javax.inject.Named;
+
+import static org.testng.AssertJUnit.assertEquals;
+
+
+@ActiveProfiles("test")
+@SpringBootTest(classes = {UpgradeCliApplication.class, UpgradeCliApplicationTestConfiguration.class})
+public class UpgradeCliApplicationTest extends AbstractTestNGSpringContextTests {
+
+  @Autowired
+  @Named("restoreIndices")
+  private RestoreIndices restoreIndices;
+
+  @Test
+  public void testKafkaHealthCheck() {
+    /*
+      This might seem like a simple test, however it does exercise the Spring autowiring of the Kafka health check bean.
+    */
+    assertEquals(3, restoreIndices.steps().size());
+  }
+}
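
The assertion itself is incidental; extending AbstractTestNGSpringContextTests is what forces the full application context, including the newly scanned Kafka health check bean, to start, so a wiring regression fails this test at context load. To run just this class from the command line (the :datahub-upgrade module path is an assumption about the repo layout):

    ./gradlew :datahub-upgrade:test --tests 'com.linkedin.datahub.upgrade.UpgradeCliApplicationTest'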

View File

@@ -0,0 +1,38 @@
+package com.linkedin.datahub.upgrade;
+
+import com.linkedin.gms.factory.auth.SystemAuthenticationFactory;
+import com.linkedin.metadata.entity.EntityService;
+import com.linkedin.metadata.graph.GraphService;
+import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
+import com.linkedin.metadata.models.registry.EntityRegistry;
+import com.linkedin.metadata.search.SearchService;
+import io.ebean.EbeanServer;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.context.annotation.Import;
+
+@TestConfiguration
+@Import(value = {SystemAuthenticationFactory.class})
+public class UpgradeCliApplicationTestConfiguration {
+
+  @MockBean
+  private UpgradeCli upgradeCli;
+
+  @MockBean
+  private EbeanServer ebeanServer;
+
+  @MockBean
+  private EntityService entityService;
+
+  @MockBean
+  private SearchService searchService;
+
+  @MockBean
+  private GraphService graphService;
+
+  @MockBean
+  private EntityRegistry entityRegistry;
+
+  @MockBean
+  ConfigEntityRegistry configEntityRegistry;
+}
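
Each @MockBean definition replaces the corresponding production bean (Ebean, entity, search, graph, registries) with a Mockito mock, so the context can start without a database, Elasticsearch, or a graph store behind it. A small sketch (hypothetical test class name, same context as the test above) showing that what gets injected really is the mock:

    package com.linkedin.datahub.upgrade;

    import static org.mockito.Mockito.mockingDetails;
    import static org.testng.AssertJUnit.assertTrue;

    import com.linkedin.metadata.graph.GraphService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.ActiveProfiles;
    import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
    import org.testng.annotations.Test;

    // Hypothetical sketch: GraphService resolves to the Mockito mock registered
    // by UpgradeCliApplicationTestConfiguration, not a real graph backend.
    @ActiveProfiles("test")
    @SpringBootTest(classes = {UpgradeCliApplication.class, UpgradeCliApplicationTestConfiguration.class})
    public class MockBeanWiringTest extends AbstractTestNGSpringContextTests {

      @Autowired
      private GraphService graphService;

      @Test
      public void testGraphServiceIsMock() {
        assertTrue(mockingDetails(graphService).isMock());
      }
    }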

View File

@@ -6,6 +6,8 @@
 : ${DATAHUB_ANALYTICS_ENABLED:=true}
 : ${KAFKA_HEAP_OPTS:=-Xmx64M}
 CONNECTION_PROPERTIES_PATH=/tmp/connection.properties
 function wait_ex {
@@ -60,26 +62,31 @@ if [[ -n "$KAFKA_PROPERTIES_SASL_CLIENT_CALLBACK_HANDLER_CLASS" ]]; then
 fi
 cub kafka-ready -c $CONNECTION_PROPERTIES_PATH -b $KAFKA_BOOTSTRAP_SERVER 1 180
 kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR --topic $METADATA_AUDIT_EVENT_NAME &
 kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR --topic $METADATA_CHANGE_EVENT_NAME &
 kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR --topic $FAILED_METADATA_CHANGE_EVENT_NAME &
 kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR --topic $METADATA_CHANGE_LOG_VERSIONED_TOPIC &
+echo "Waiting for topic creation group 1."
+result=$(wait_ex)
+rc=$?
+if [ $rc -ne 0 ]; then exit $rc; fi
+echo "Finished topic creation group 1."
 # Set retention to 90 days
 kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR --config retention.ms=7776000000 --topic $METADATA_CHANGE_LOG_TIMESERIES_TOPIC &
 kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR --topic $METADATA_CHANGE_PROPOSAL_TOPIC &
 kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR --topic $FAILED_METADATA_CHANGE_PROPOSAL_TOPIC &
 kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR --topic $PLATFORM_EVENT_TOPIC_NAME &
+# Create topic for datahub usage event
+if [[ $DATAHUB_ANALYTICS_ENABLED == true ]]; then
+  kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR --topic $DATAHUB_USAGE_EVENT_NAME &
+fi
-echo "Waiting for topic creation."
+echo "Waiting for topic creation group 2."
 result=$(wait_ex)
 rc=$?
 if [ $rc -ne 0 ]; then exit $rc; fi
-echo "Finished topic creation."
+echo "Finished topic creation group 2."
-# Create topic for datahub usage event
-if [[ $DATAHUB_ANALYTICS_ENABLED == true ]]; then
-  kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR --topic $DATAHUB_USAGE_EVENT_NAME
-fi
 kafka-configs.sh --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --entity-type topics --entity-name _schemas --alter --add-config cleanup.policy=compact
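
Each group backgrounds its kafka-topics.sh calls with & and then gates on wait_ex, so topics within a group are created in parallel while a failure in either group still aborts the job with the failing exit code. The body of wait_ex is elided in the hunk above; below is a minimal sketch of the pattern its usage implies, assuming it reaps every outstanding background job and keeps a failing exit code (wait -n needs bash 4.3+):

    #!/bin/bash
    # Sketch only: the real wait_ex body is not shown in this diff.
    function wait_ex {
        local err ecode=0
        while [ -n "$(jobs -p)" ]; do   # while background jobs remain
            wait -n                     # reap whichever job finishes next
            err=$?
            [ "$err" -ne 0 ] && ecode=$err  # keep a failing exit code
        done
        return $ecode
    }

    # Usage mirroring the script: run one group in parallel, then gate on it.
    sleep 1 &
    sleep 2 &
    wait_ex
    rc=$?
    if [ $rc -ne 0 ]; then exit $rc; fi
    echo "group finished"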