Kafka, Schema Registry, JUnit and Test Containers — Part II: Creating a Kafka Cluster Test Extension
Introduction
In Part Iof this series, we saw how we could leverage Test Containers to make integration tests that require a Kafka Cluster and a Schema Registry, although we used a single-node Kafka Cluster. In this part, we will refactor the code so that it can be easily used in multiple tests.
Creating a JUnit5 Extension
To reuse code, we will create a JUnit5 Extension. Usually we use these extensions, but creating one is quite simple and opens a new door for test code reuse. As a side note, what is being done here can also be done in JUnit4, but for JUnit4 you are probably better off developing a Test Rule.
Our idea is to make the code in our test as simple as this:
@ExtendWith(KafkaTestClusterExtension.class)
class KafkaWithJUnitExtensionTest {
@Test
void clusterStartsWithSchemasTopic(KafkaTestCluster cluster) throws Exception {
Properties kafkaSettings = cluster.getKafkaSettings();
// Run the test
try (Admin admin = Admin.create(kafkaSettings)) {
ListTopicsResult listTopicsResult = admin.listTopics();
Set<String> names = listTopicsResult.names().get();
assertThat(names.size(), equalTo(1));
assertThat(names.iterator().next(), equalTo("_schemas"));
}
}
}The magic is hidden behind the KafkaTestClisterExtension and the KafkkaTestCluster interface. If you are wondering, the KafkaTestCluster interface has only one method to retrieve the configuration needed to connect to the cluster.
public interface KafkaTestCluster {
Properties getKafkaSettings();
}Let’s look at the KafkaTestClusterExtension. It implements three interfaces from JUnit:
public class KafkaTestClusterExtension implements
BeforeAllCallback, AfterAllCallback, ParameterResolver {The first are callback interfaces that are called before and after all the tests in the class are executed. The last is used to resolve the parameters used in the test constructor or test method. In our case, we are using a parameter in the test method.
Next, we have the same Docker network and containers that we had in the previous article. The difference is that we do not use the *@Container* annotation.
private final Network network = Network.newNetwork();
private final KafkaContainer kafkaContainer = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:7.5.2"))
.withNetwork(network);
private final GenericContainer<?> schemaRegistry = new GenericContainer<>(
DockerImageName.parse("confluentinc/cp-schema-registry:7.5.2"))
.withNetwork(network)
.withExposedPorts(8081)
.withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
.withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081")
.withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS",
"PLAINTEXT://" + kafkaContainer.getNetworkAliases().get(0) + ":9092")
.waitingFor(Wait.forHttp("/subjects").forStatusCode(200));JUnit has no support for extensions to extend extensions, so we need to start and stop the containers ourselves.
@Override
public void beforeAll(ExtensionContext context) {
kafkaContainer.start();
schemaRegistry.start();
}
@Override
public void afterAll(ExtensionContext context) {
schemaRegistry.start();
kafkaContainer.stop();
}To inject parameters, we need to implement the methods defined in the ParameterResolver interface. In our case, we simply declare that we can inject KafkaTestCluster type parameters and then return an inline implementation of that interface.
@Override
public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
return parameterContext.getParameter().getType()
.equals(KafkaTestCluster.class);
}
@Override
public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
return (KafkaTestCluster) () -> {
Properties kafkaSettings = new Properties();
kafkaSettings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaContainer.getBootstrapServers());
kafkaSettings.put("schema.registry.url",
"http://" + schemaRegistry.getHost() +
":" + schemaRegistry.getFirstMappedPort());
return kafkaSettings;
};
}Next Steps
With these changes, we now have a JUnit Extension that hides all the details of launching and stopping the containers and obtaining their configuration. It is a good step, but it has one issue that will surface when you have multiple tests. There are multiple strategies to avoid having to launch a container for every single unit test or test class. We will look at them in the next part of this series.
Articles in the series: