fix(ci): cassandra test stabilization (#13997)

This commit is contained in:
david-leifker 2025-07-08 15:40:21 -05:00 committed by GitHub
parent d2d9d36987
commit 5c1244d922
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 73 additions and 16 deletions

View File

@ -2,6 +2,7 @@ package com.linkedin.metadata;
import static org.testng.Assert.assertEquals;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
@ -17,9 +18,11 @@ import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.net.ssl.SSLContext;
import org.testcontainers.containers.CassandraContainer;
import lombok.extern.slf4j.Slf4j;
import org.testcontainers.cassandra.CassandraContainer;
import org.testcontainers.utility.DockerImageName;
@Slf4j
public class CassandraTestUtils {
private CassandraTestUtils() {}
@ -28,21 +31,59 @@ public class CassandraTestUtils {
private static final String IMAGE_NAME = "cassandra:3.11";
public static CassandraContainer setupContainer() {
final DockerImageName imageName =
DockerImageName.parse(IMAGE_NAME).asCompatibleSubstituteFor("cassandra");
final DockerImageName imageName = DockerImageName.parse(IMAGE_NAME);
CassandraContainer container = new CassandraContainer(imageName);
container
.withEnv("JVM_OPTS", "-Xms64M -Xmx96M")
.withStartupTimeout(Duration.ofMinutes(5)) // usually < 1min
.start();
CassandraContainer container =
new CassandraContainer(imageName)
.withEnv("JVM_OPTS", "-Xms128M -Xmx128M -Djava.net.preferIPv4Stack=true")
.withStartupTimeout(Duration.ofMinutes(2))
.withReuse(true);
try (Session session = container.getCluster().connect()) {
container.start();
Cluster cluster = null;
Session session = null;
Exception lastException = null;
for (int i = 0; i < 30; i++) { // Try for up to 60 seconds
try {
cluster =
Cluster.builder()
.addContactPointsWithPorts(container.getContactPoint())
.withCredentials(container.getUsername(), container.getPassword())
.withoutJMXReporting()
.build();
session = cluster.connect();
break;
} catch (Exception e) {
lastException = e;
if (cluster != null) {
try {
cluster.close();
} catch (Exception ignored) {
}
}
try {
Thread.sleep(2000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while waiting for Cassandra", ie);
}
}
}
if (session == null) {
throw new RuntimeException("Could not connect to Cassandra container", lastException);
}
try {
session.execute(
String.format(
"CREATE KEYSPACE IF NOT EXISTS %s WITH replication = \n"
+ "{'class':'SimpleStrategy','replication_factor':'1'};",
KEYSPACE_NAME));
session.execute(
String.format(
"create table %s.%s (urn varchar, \n"
@ -58,14 +99,22 @@ public class CassandraTestUtils {
+ "with clustering order by (aspect asc, version asc);",
KEYSPACE_NAME, CassandraAspect.TABLE_NAME));
List<KeyspaceMetadata> keyspaces = session.getCluster().getMetadata().getKeyspaces();
List<KeyspaceMetadata> keyspaces = cluster.getMetadata().getKeyspaces();
List<KeyspaceMetadata> filteredKeyspaces =
keyspaces.stream()
.filter(km -> km.getName().equals(KEYSPACE_NAME))
.collect(Collectors.toList());
assertEquals(filteredKeyspaces.size(), 1);
} finally {
if (session != null) {
session.close();
}
if (cluster != null) {
cluster.close();
}
}
return container;
}
@ -94,7 +143,7 @@ public class CassandraTestUtils {
try {
csb = csb.withSslContext(SSLContext.getDefault());
} catch (Exception e) {
e.printStackTrace();
log.error("Failed to connect using SSL", e);
}
}
@ -111,14 +160,22 @@ public class CassandraTestUtils {
put("password", container.getPassword());
put("hosts", container.getHost());
put("port", container.getMappedPort(9042).toString());
put("datacenter", "datacenter1");
put("datacenter", container.getLocalDatacenter()); // Use the new method
put("useSsl", "false");
}
};
}
public static void purgeData(CassandraContainer container) {
try (Session session = container.getCluster().connect()) {
// Create a new cluster connection for purging
try (Cluster cluster =
Cluster.builder()
.addContactPointsWithPorts(container.getContactPoint())
.withCredentials(container.getUsername(), container.getPassword())
.withoutJMXReporting()
.build();
Session session = cluster.connect()) {
session.execute(String.format("TRUNCATE %s.%s;", KEYSPACE_NAME, CassandraAspect.TABLE_NAME));
List<Row> rs =
session

View File

@ -10,7 +10,7 @@ import com.linkedin.metadata.entity.EntityServiceImpl;
import com.linkedin.metadata.event.EventProducer;
import com.linkedin.metadata.models.registry.EntityRegistryException;
import com.linkedin.metadata.service.UpdateIndicesService;
import org.testcontainers.containers.CassandraContainer;
import org.testcontainers.cassandra.CassandraContainer;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;

View File

@ -30,7 +30,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.testcontainers.containers.CassandraContainer;
import org.testcontainers.cassandra.CassandraContainer;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;

View File

@ -11,7 +11,7 @@ import com.linkedin.metadata.event.EventProducer;
import com.linkedin.metadata.models.registry.EntityRegistryException;
import com.linkedin.metadata.timeline.TimelineServiceImpl;
import com.linkedin.metadata.timeline.TimelineServiceTest;
import org.testcontainers.containers.CassandraContainer;
import org.testcontainers.cassandra.CassandraContainer;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;