Kafka Stubbing (Private Beta)
The
specmatic-kafka
module described in this document is currently in private beta. Please get in touch with us through theContact Us
form at https://specmatic.in if you’d like to try it out.
Introduction to Kafka stubbing
Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
Pre-requisite Setup
Add the specification file (which will be used to stub kafka) in the src/test/resources directory. See a sample specification here:
asyncapi: 2.0.0
info:
title: Kafka Queue Example
version: '1.0.0'
servers:
activemq:
url: tcp://localhost:61616
protocol: amqp
channels:
taskQueueObject:
publish:
operationId: publishObjectMessage
message:
payload:
$ref: "#/components/messages/Task"
bindings:
amqp:
is: queue
components:
messages:
Task:
name: Task
title: A Task to be processed
summary: Inform about a new user task in the system
contentType: application/json
payload:
type: object
properties:
id:
type: integer
name:
type: string
Note the following:
- Protocol for now should be amqp (line 8)
- The block on line 10 is an example of how the queue is declared. The key name is the queue name.
- The payload (line 14) should define the structure of a message on the queue.
- The payload specification format is the same as how the structure is declared in an OpenAPI specification.
- The AMQP Bindings on lines 16-18 should be declared in all queues (we will use AMQP bindings to declare Kafka queues).
There are 2 ways to stub out Kafka:-
I) Self-managed Kafka instance
Use this when the kafka-clients
package in the project has the same major version as Specmatic’s Kafka (kafka_2.13
2.8.0). Also in this approach specmatic will automatically pick the kafka yaml from the specified file path.( src/test/resources/kafka.yaml) In this approach, Specmatic manages an in-memory instance of Kafka that the system-under-test can send messages to.
- Add the specmatic Kafka dependency to
pom.xml
:<dependency> <groupId>in.specmatic</groupId> <artifactId>specmatic-kafka</artifactId> <version>0.3.0</version> </dependency>
- Use the following code in
@BeforeAll
to start up the Kafka stub:// kafkaMock is a static variable kafkaMock = KafkaMock.fromAsyncAPIFiles( listOf("src/test/resources/kafka.yaml"), 9092, 2171, "./kafka-logs"); kafkaMock.start();
- In the
@AfterAll
method, use the following code to stop the mock.kafkaMock.close();
II) External Kafka instance
Use this when the kafka-clients
package in the project has a different major version from Specmatic’s Kafka (kafka_2.13
2.8.0). Specmatic may not be able to start its own Kafka service due to this library conflict.
Use the code below to start up a Kafka server, and let Specmatic mock subscribe to it to verify interactions with it.
-
Add the following to
pom.xml
(the version should match that ofkafka-clients
version which would already be in the projects’ pom or parent pom)<dependency> <groupId>in.specmatic</groupId> <artifactId>specmatic-kafka</artifactId> <version>0.6.0</version> </dependency>
when facing issues with kafka version in the setting up externalKafkaServer then you can explicitly provide kafka version as 2.8.0 in properties in pom.xml as
<kafka.version>2.8.0</kafka.version>
-
Define the following as global variables for the class ContractTests.java:
private static KafkaMock kafkaMock = null; private static TestingServer zkServer = null; private static KafkaServer externalKafkaServer = null; private static ConfigurableApplicationContext context = null;
-
Use the following code in
@BeforeAll
apart from the generic System.setProperty() methods to start up the Kafka stub:// Start the Kafka mock instance. List<String> fileList= new ArrayList<>(); fileList.add("src/test/resources/kafka_stub.yaml"); kafkaMock = KafkaMock.fromAsyncAPIFiles(fileList,9092,2181,"./kafka-logs"); // Without this, the Kafka server may not start kafkaMock.cleanupLogDir(); // Start Zookeeper zkServer = kafkaMock.startZooKeeper(); // Start a new Kafka server using the Kafka dependency already added to the pom externalKafkaServer = new KafkaServer( kafkaMock.getKafkaConfigInstance(), Time.SYSTEM, Option.empty(), scala.collection.JavaConverters.asScalaBuffer(new ArrayList<KafkaMetricsReporter>()).toList()); externalKafkaServer.startup(); // subscribe the KafkaMock instance to all the topics kafkaMock.subscribe();
Note
The above example is for kafka version 2.7.1; for kafka 2.8.0 use:
externalKafkaServer = new KafkaServer( kafkaMock.getKafkaConfigInstance(), Time.SYSTEM, Option.empty(), true);
-
In the
@AfterAll
method, add the following code:if (context != null) context.close(); if (externalKafkaServer != null) externalKafkaServer.shutdown(); if (zkServer != null) zkServer.close();