Event-driven architecture is now common in many kinds of software, for example in microservices that use patterns such as CQRS and Saga. Publish-subscribe messaging systems such as Apache Kafka typically provide the asynchronous communication between systems. Some requirements, however, call for a request/reply exchange. We can implement this request/reply pattern on top of Kafka using the Spring Boot framework.
In this article, we will learn how to implement the synchronous communication pattern using Apache Kafka with Spring Boot.
We will also create an HTTP POST REST endpoint, which accepts student details and returns a randomly calculated result and percentage.
Table of Contents
- Version details
- Create a Spring Boot application with required dependencies
- Set up the Kafka configuration
- Create request and reply DTO classes
- Create a Rest Controller class
- Create a Listener class
- Testing the application with Postman
- Conclusion
Version details
The following are the version details we are going to use in our example.
- Java version 1.8
- Spring Boot 2.2.2.RELEASE
Create a Spring Boot application with required dependencies
Create a Spring Boot application and add the spring-boot-starter-web, spring-kafka, and lombok (to reduce boilerplate code) dependencies.
The spring-kafka library brings Spring's template and listener abstractions to Kafka, including the ReplyingKafkaTemplate used in this article.
The pom.xml file below shows the required dependencies for our project.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.asb.example</groupId>
    <artifactId>spring-kafka-synchronous-example</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-kafka-synchronous-example</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
        <maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Set up the Kafka configuration
The next step is to set up the required configuration for Kafka.
application.properties
Add the following Spring Boot Kafka properties to the application.properties configuration file (under the src/main/resources folder).
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.spring.json.trusted.packages=com.asb.example

##User defined Properties:
kafka.reuest.topic=student
kafka.reply.topic=result
kafka.group.id=student-result-group
These Spring Kafka properties set the producer's value serializer, the consumer's value deserializer, the packages that the JSON deserializer trusts, and so on.
The custom properties keep the request topic name, the reply topic name, and the Kafka group id in one place.
Requests are published to the topic called student, and responses are sent back to the reply topic called result.
Kafka configuration class
Next, we will create a Spring configuration class that defines two beans: a ReplyingKafkaTemplate and a KafkaTemplate.
KafkaConfig.java
package com.asb.example;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;

@Configuration
public class KafkaConfig {

    @Value("${kafka.group.id}")
    private String groupId;

    @Value("${kafka.reply.topic}")
    private String replyTopic;

    // Sends Student requests and listens on the reply topic for Result replies.
    @Bean
    public ReplyingKafkaTemplate<String, Student, Result> replyingKafkaTemplate(
            ProducerFactory<String, Student> pf,
            ConcurrentKafkaListenerContainerFactory<String, Result> factory) {
        // Container that consumes the replies published to the reply topic.
        ConcurrentMessageListenerContainer<String, Result> replyContainer =
                factory.createContainer(replyTopic);
        replyContainer.getContainerProperties().setMissingTopicsFatal(false);
        replyContainer.getContainerProperties().setGroupId(groupId);
        return new ReplyingKafkaTemplate<>(pf, replyContainer);
    }

    // Template that the listener side uses to publish the Result reply.
    @Bean
    public KafkaTemplate<String, Result> replyTemplate(
            ProducerFactory<String, Result> pf,
            ConcurrentKafkaListenerContainerFactory<String, Result> factory) {
        KafkaTemplate<String, Result> kafkaTemplate = new KafkaTemplate<>(pf);
        factory.getContainerProperties().setMissingTopicsFatal(false);
        // Register the template so that @SendTo listener methods can reply.
        factory.setReplyTemplate(kafkaTemplate);
        return kafkaTemplate;
    }
}
Spring provides ReplyingKafkaTemplate<K, V, R>, which sends a message and returns a future that completes with the reply once the Kafka listener on the other side has consumed the message and responded. The type parameters represent: K – Key type, V – Outbound data type, and R – Reply data type.
In our example, we have defined the bean as ReplyingKafkaTemplate<String, Student, Result>: we send the Student details to Kafka and get a Result object back as the reply. We are going to create the Student and Result DTO classes in the next section.
We then need to define a KafkaTemplate bean to act as the reply template. It is built from a producer factory of type ProducerFactory<String, Result>.
This template is also set as the reply template of the ConcurrentKafkaListenerContainerFactory, so that the Result returned by the listener method can be published as the reply.
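To make the request/reply mechanics concrete, here is a minimal plain-Java sketch of the idea behind a replying template: each request carries a correlation ID, and the reply that echoes the same ID completes the caller's pending future. This is not Spring's implementation. The names CorrelationSketch, sendAndReceive (borrowed only for familiarity), and onReply are hypothetical, and the broker is simulated with a direct method call.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

// Hypothetical illustration only: correlates replies to requests by ID.
class CorrelationSketch<V, R> {

    // Requests that are still waiting for a reply, keyed by correlation ID.
    private final Map<String, CompletableFuture<R>> pending = new ConcurrentHashMap<>();

    // Stand-in for "publish to the request topic": receives (correlationId, payload).
    private final BiConsumer<String, V> transport;

    CorrelationSketch(BiConsumer<String, V> transport) {
        this.transport = transport;
    }

    // Registers a pending future first, then hands the request to the transport.
    CompletableFuture<R> sendAndReceive(V request) {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<R> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        transport.accept(correlationId, request);
        return future;
    }

    // Called when a reply arrives; completes the matching future, if there is one.
    boolean onReply(String correlationId, R reply) {
        CompletableFuture<R> future = pending.remove(correlationId);
        if (future == null) {
            return false; // unknown or already answered request
        }
        future.complete(reply);
        return true;
    }

    public static void main(String[] args) {
        // The simulated responder upper-cases the payload and replies immediately.
        AtomicReference<CorrelationSketch<String, String>> ref = new AtomicReference<>();
        CorrelationSketch<String, String> template =
                new CorrelationSketch<>((id, payload) -> ref.get().onReply(id, payload.toUpperCase()));
        ref.set(template);

        System.out.println(template.sendAndReceive("hello").join()); // prints HELLO
    }
}
```

In the real setup, Kafka record headers carry the correlation ID, and the reply travels through the reply topic instead of a direct call.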
Create request and reply DTO classes
We will create the Student and Result DTO classes, which serve as the request and reply data types.
Student.java
We will post Student details such as the name and registration number to the Kafka server and get Result information in return.
package com.asb.example;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Student {
    private String registrationNumber;
    private String name;
    private String grade;
}
Result.java
Reply data will contain the calculated result, student name, and percentage details.
package com.asb.example;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Result {
    private String name;
    private String percentage;
    private String result;
}
Create a Rest Controller class
To input student details, let us create a controller class. This RESTful controller will contain a POST endpoint. The endpoint will accept Student details and pass that information to the Kafka server to get the calculated result.
KafkaController.java
We have exposed an HTTP POST endpoint called get-result. This endpoint expects a Student object in the request body and returns the calculated Result.
Using the ReplyingKafkaTemplate<String, Student, Result> instance, we send the student details with the sendAndReceive() method and then wait for the reply.
package com.asb.example;

import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    @Value("${kafka.reuest.topic}")
    private String requestTopic;

    @Autowired
    private ReplyingKafkaTemplate<String, Student, Result> replyingKafkaTemplate;

    @PostMapping("/get-result")
    public ResponseEntity<Result> getObject(@RequestBody Student student)
            throws InterruptedException, ExecutionException {
        // Key the record by registration number; the partition is left unset (null).
        ProducerRecord<String, Student> record =
                new ProducerRecord<>(requestTopic, null, student.getRegistrationNumber(), student);
        // Publish the request and obtain a future for the correlated reply.
        RequestReplyFuture<String, Student, Result> future =
                replyingKafkaTemplate.sendAndReceive(record);
        // Block until the reply record arrives.
        ConsumerRecord<String, Result> response = future.get();
        return new ResponseEntity<>(response.value(), HttpStatus.OK);
    }
}
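The controller blocks on future.get() until the future completes. If the HTTP layer needs its own upper bound on that wait, one option is the timed overload get(timeout, unit) that RequestReplyFuture inherits from java.util.concurrent.Future. The sketch below shows the idea with a plain CompletableFuture standing in for the Kafka reply. The helper name awaitReply and the time limits are hypothetical choices, not part of the original example.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: waits for a reply, but never longer than the given limit.
class BoundedWaitSketch {

    // Returns the reply, or Optional.empty() if none arrived within the limit.
    static <T> Optional<T> awaitReply(Future<T> future, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException {
        try {
            return Optional.ofNullable(future.get(timeout, unit));
        } catch (TimeoutException e) {
            future.cancel(true); // give up on this reply
            return Optional.empty();
        }
    }

    public static void main(String[] args) throws Exception {
        // An already completed future stands in for a reply that arrived.
        Future<String> answered = CompletableFuture.completedFuture("Pass");
        System.out.println(awaitReply(answered, 2, TimeUnit.SECONDS).isPresent()); // true

        // A future that is never completed stands in for a lost reply.
        Future<String> lost = new CompletableFuture<>();
        System.out.println(awaitReply(lost, 100, TimeUnit.MILLISECONDS).isPresent()); // false
    }
}
```

In the controller, an empty Optional could then be mapped to an error status such as HttpStatus.GATEWAY_TIMEOUT instead of leaving the caller waiting.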
Create a Listener class
Create a class called StudentResultCalculator as shown below. This class has a Kafka listener method, which receives the request messages and then publishes the student's calculated result to the reply topic.
StudentResultCalculator.java
We have annotated the class with the @Component annotation to register it as a Spring bean. The @KafkaListener annotation then subscribes the method to the request Kafka topic.
The @SendTo annotation lets the listener method send its return value back as a response. Because the annotation is given no value here, the reply goes to the reply topic carried in the request's headers, which ReplyingKafkaTemplate sets when it sends the request.
We also have a small piece of logic that randomly calculates the student's result and percentage.
package com.asb.example;

import java.util.concurrent.ThreadLocalRandom;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

@Component
public class StudentResultCalculator {

    @KafkaListener(topics = "${kafka.reuest.topic}", groupId = "${kafka.group.id}")
    @SendTo
    public Result handle(Student student) {
        System.out.println("Calculating Result...");
        // Random score in the range [2.5, 9.9).
        double total = ThreadLocalRandom.current().nextDouble(2.5, 9.9);
        Result result = new Result();
        result.setName(student.getName());
        // A score above 3.5 counts as a pass.
        result.setResult((total > 3.5) ? "Pass" : "Fail");
        // Scale to a percentage and keep the first four characters, e.g. "37.1%".
        result.setPercentage(String.valueOf(total * 10).substring(0, 4) + "%");
        return result;
    }
}
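Because the listener mixes the random draw with the pass/fail rule and the percentage formatting, the arithmetic is hard to check on its own. One possible refactoring, sketched below under the assumption that the rules stay as in the listener, moves them into pure functions of the score. The class and method names are hypothetical. Note one deliberate difference: String.format rounds to one decimal place, whereas substring(0, 4) in the listener truncates.

```java
import java.util.Locale;

// Hypothetical helper that isolates the listener's arithmetic from Kafka.
class ResultRulesSketch {

    // Same threshold as the listener: a total above 3.5 is a pass.
    static String toResult(double total) {
        return total > 3.5 ? "Pass" : "Fail";
    }

    // Scales the total to a percentage with one decimal place.
    // This rounds, whereas substring(0, 4) in the listener truncates.
    static String toPercentage(double total) {
        return String.format(Locale.ROOT, "%.1f%%", total * 10);
    }

    public static void main(String[] args) {
        System.out.println(toResult(3.5));      // Fail (3.5 is not above the threshold)
        System.out.println(toResult(7.25));     // Pass
        System.out.println(toPercentage(7.25)); // 72.5%
    }
}
```

The listener could then draw the random total once and pass it to both functions.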
Testing the application with Postman
Finally, it’s time to test our result calculator.
Make sure a Kafka server is running locally, then start the Spring Boot application and POST the student details to the get-result endpoint.
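For readers who prefer code to Postman, the request can also be sent from a small Java program. The sketch below is only an illustration: the URL assumes the application runs on Spring Boot's default port 8080, the sample student values are made up, and the class and method names are hypothetical. It uses HttpURLConnection so that it also compiles on Java 8.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

// Hypothetical test client; the sample values and the URL are assumptions.
class GetResultClientSketch {

    // Builds the JSON body from the three Student fields.
    // Values are inserted as-is, so they must not contain quotes or backslashes.
    static String studentJson(String registrationNumber, String name, String grade) {
        return String.format(
                "{\"registrationNumber\":\"%s\",\"name\":\"%s\",\"grade\":\"%s\"}",
                registrationNumber, name, grade);
    }

    // POSTs the JSON body and returns the response body as text.
    static String post(String url, String json) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(json.getBytes(StandardCharsets.UTF_8));
        }
        try (InputStream in = conn.getInputStream();
             Scanner scanner = new Scanner(in, "UTF-8").useDelimiter("\\A")) {
            return scanner.hasNext() ? scanner.next() : "";
        }
    }

    public static void main(String[] args) {
        String body = studentJson("REG-001", "John", "10");
        try {
            System.out.println(post("http://localhost:8080/get-result", body));
        } catch (IOException e) {
            System.out.println("Request failed; is the application running? " + e.getMessage());
        }
    }
}
```

If everything is wired up, the response body should be a JSON object with the name, percentage, and result fields of the Result class. The percentage and result values vary from call to call because they are generated randomly.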

Conclusion
In this article, we learned how to implement the synchronous request/reply pattern with the Spring Kafka library.
We also learned how to return a different data type as the reply from the consumer side.
Hope you have enjoyed the article.
Finally, the complete example code is available on GitHub.