Request Reply pattern with Apache Kafka – Spring Boot

Event-driven architecture is now common in many kinds of software, especially in microservices that use patterns such as CQRS and Saga. Publish-subscribe messaging systems such as Apache Kafka usually handle the asynchronous communication between these systems. Some requirements, however, call for a request/reply exchange in which the sender waits for a response. We can implement this request-reply pattern on top of Kafka using the Spring Boot framework.

In this article, we will learn how to implement this synchronous communication pattern using Apache Kafka with Spring Boot.

We will also create an HTTP POST REST endpoint that accepts student details and returns a randomly calculated result and percentage.

Version details

We use the following versions in this example:

  • Java 1.8
  • Spring Boot 2.2.2.RELEASE

Create a Spring Boot application with required dependencies

Create a Spring Boot application and add the spring-boot-starter-web, spring-kafka, and lombok (to reduce boilerplate code) dependencies.

The spring-kafka library provides Spring's integration with Kafka, including the ReplyingKafkaTemplate that this article uses for request-reply messaging.

The pom.xml file below shows the required dependencies for our project.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.2.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.asb.example</groupId>
	<artifactId>spring-kafka-synchronous-example</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>spring-kafka-synchronous-example</name>
	<description>Demo project for Spring Boot</description>
	<properties>
		<java.version>1.8</java.version>
		<maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
			<exclusions>
				<exclusion>
					<groupId>org.junit.vintage</groupId>
					<artifactId>junit-vintage-engine</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

Set up the Kafka configuration

The next step is to set up the required configuration for Kafka.

application.properties

Add the following Spring Boot Kafka properties to the application.properties configuration file (under the src/main/resources folder).

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.spring.json.trusted.packages=com.asb.example
##User defined Properties:
kafka.reuest.topic=student
kafka.reply.topic=result
kafka.group.id=student-result-group

These Spring Kafka properties set the producer value serializer, the consumer value deserializer, the trusted packages for JSON deserialization, and so on.

The custom properties keep the request topic, reply topic, and Kafka group ID in one place. Note that the request topic key is spelled kafka.reuest.topic here; the @Value and @KafkaListener placeholders later in the article use the same spelling, so all three must stay in sync.

Requests are published to the topic named student, and responses are sent back to the reply topic named result.

Kafka configuration class

Next, we create a Spring configuration class that defines two beans: a ReplyingKafkaTemplate and a KafkaTemplate.

KafkaConfig.java

package com.asb.example;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
@Configuration
public class KafkaConfig {
	@Value("${kafka.group.id}")
	private String groupId;
	@Value("${kafka.reply.topic}")
	private String replyTopic;
	// Sends Student requests and listens on the reply topic for the matching Result.
	@Bean
	public ReplyingKafkaTemplate<String, Student, Result> replyingKafkaTemplate(ProducerFactory<String, Student> pf,
			ConcurrentKafkaListenerContainerFactory<String, Result> factory) {
		// Listener container that consumes replies from the reply topic.
		ConcurrentMessageListenerContainer<String, Result> replyContainer = factory.createContainer(replyTopic);
		// Do not fail at startup if the topic does not exist yet.
		replyContainer.getContainerProperties().setMissingTopicsFatal(false);
		replyContainer.getContainerProperties().setGroupId(groupId);
		return new ReplyingKafkaTemplate<>(pf, replyContainer);
	}
	// Template that listener containers use to publish the value returned by a listener method.
	@Bean
	public KafkaTemplate<String, Result> replyTemplate(ProducerFactory<String, Result> pf,
			ConcurrentKafkaListenerContainerFactory<String, Result> factory) {
		KafkaTemplate<String, Result> kafkaTemplate = new KafkaTemplate<>(pf);
		factory.getContainerProperties().setMissingTopicsFatal(false);
		// Register this template as the factory's reply template.
		factory.setReplyTemplate(kafkaTemplate);
		return kafkaTemplate;
	}
}

Spring Kafka provides ReplyingKafkaTemplate<K, V, R>, which sends a request and returns a future that completes with the reply once the Kafka listener on the other side has consumed the message and responded. K is the key type, V is the outbound data type, and R is the reply data type.

In our example, we define the bean as ReplyingKafkaTemplate<String, Student, Result>: we send the Student details to Kafka and get a Result object back as the reply. We will create the Student and Result DTO classes in the next section.

We also define a KafkaTemplate bean to use as the reply template; its producer factory has the type ProducerFactory<String, Result>.

This template is set as the reply template of the ConcurrentKafkaListenerContainerFactory, so that a listener method returning a Result can publish it as the reply.
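
To make the request-reply mechanics less abstract, here is a minimal plain-Java sketch of the correlation idea behind this kind of messaging: each request carries a unique correlation ID, and the requesting side completes the matching future when a reply with the same ID arrives. The class CorrelationSketch, its Message type, and the in-memory queues standing in for topics are all hypothetical. This is not Spring Kafka's implementation, only an illustration under those assumptions.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory model of request-reply correlation; not Spring Kafka code.
final class CorrelationSketch {

    // A message carries a correlation ID alongside its payload.
    static final class Message {
        final String correlationId;
        final String payload;

        Message(String correlationId, String payload) {
            this.correlationId = correlationId;
            this.payload = payload;
        }
    }

    // Two queues stand in for the request and reply topics.
    private final BlockingQueue<Message> requestTopic = new LinkedBlockingQueue<>();
    private final BlockingQueue<Message> replyTopic = new LinkedBlockingQueue<>();
    // Requests still waiting for their reply, keyed by correlation ID.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Requesting side: register a future, then publish the request.
    CompletableFuture<String> sendAndReceive(String payload) {
        String id = UUID.randomUUID().toString();
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(id, future);
        requestTopic.add(new Message(id, payload));
        return future;
    }

    // Replying side: consume one request and publish a reply with the same ID.
    void serveOneRequest() throws InterruptedException {
        Message request = requestTopic.take();
        replyTopic.add(new Message(request.correlationId, "result for " + request.payload));
    }

    // Requesting side: consume one reply and complete the matching future.
    void dispatchOneReply() throws InterruptedException {
        Message reply = replyTopic.take();
        CompletableFuture<String> future = pending.remove(reply.correlationId);
        if (future != null) {
            future.complete(reply.payload);
        }
    }

    public static void main(String[] args) throws Exception {
        CorrelationSketch sketch = new CorrelationSketch();
        CompletableFuture<String> future = sketch.sendAndReceive("student-1");
        sketch.serveOneRequest();
        sketch.dispatchOneReply();
        System.out.println(future.get(1, TimeUnit.SECONDS)); // prints "result for student-1"
    }
}
```

In the real application the two sides run in different threads (or different services), and the correlation ID and reply topic travel as Kafka record headers rather than as fields of a message class.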

Create request and reply DTO classes

We will create the Student and Result DTO classes, which serve as the request and reply data types.

Student.java

We will post Student details such as the name and registration number to the Kafka server and get Result information in return.

package com.asb.example;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Student {
	private String registrationNumber;
	private String name;
	private String grade;
}

Result.java

The reply data contains the calculated result, the student name, and the percentage.

package com.asb.example;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Result {
	
	private String name;
	private String percentage;
	private String result;
}

Create a Rest Controller class

To accept student details, let us create a RESTful controller class with a POST endpoint. The endpoint accepts Student details and passes them to Kafka to get the calculated result.

KafkaController.java

We expose an HTTP POST endpoint at /get-result. This endpoint expects a Student object and returns the calculated Result details.

With the ReplyingKafkaTemplate<String, Student, Result> instance, we send the student details using the sendAndReceive() method and then wait for the reply.

package com.asb.example;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class KafkaController {
	@Value("${kafka.reuest.topic}")
	private String requestTopic;
	@Autowired
	private ReplyingKafkaTemplate<String, Student, Result> replyingKafkaTemplate;
	@PostMapping("/get-result")
	public ResponseEntity<Result> getObject(@RequestBody Student student)
			throws InterruptedException, ExecutionException {
		// Key the record by registration number; the null partition lets the producer choose one.
		ProducerRecord<String, Student> record = new ProducerRecord<>(requestTopic, null, student.getRegistrationNumber(), student);
		// Publish the request and obtain a future for the correlated reply.
		RequestReplyFuture<String, Student, Result> future = replyingKafkaTemplate.sendAndReceive(record);
		// Block until the reply arrives or the template's reply timeout expires.
		ConsumerRecord<String, Result> response = future.get();
		return new ResponseEntity<>(response.value(), HttpStatus.OK);
	}
}
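
The controller blocks on future.get(), so it is worth deciding what the caller should see when no reply arrives. The following is a minimal plain-Java sketch of a bounded wait with a fallback value. It uses a CompletableFuture as a stand-in for the reply future, and the class name ReplyTimeoutSketch, the awaitReply helper, and the fallback text are hypothetical names chosen for illustration, not part of the example project.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper that shows a bounded wait on a reply future.
final class ReplyTimeoutSketch {

    // Waits for the reply for at most timeoutMillis; returns the fallback if none arrives in time.
    static String awaitReply(Future<String> reply, long timeoutMillis, String fallback)
            throws InterruptedException, ExecutionException {
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Nobody is waiting any more, so cancel the pending future.
            reply.cancel(true);
            return fallback;
        }
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> answered = CompletableFuture.completedFuture("Pass");
        CompletableFuture<String> unanswered = new CompletableFuture<>();
        System.out.println(awaitReply(answered, 100, "no reply"));   // prints "Pass"
        System.out.println(awaitReply(unanswered, 100, "no reply")); // prints "no reply"
    }
}
```

In a controller, the fallback branch could instead map to an error response such as HTTP 504, so that a slow or missing consumer does not surface as an unhandled exception.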

Create a Listener class

Create a class called StudentResultCalculator as shown below. This class has a Kafka listener method that receives the request messages and responds to the reply topic with the calculated result details for the student.

StudentResultCalculator.java

We annotate the class with @Component to register it as a Spring bean. The @KafkaListener annotation then subscribes the method to the request Kafka topic.

The @SendTo annotation lets the listener method send its return value as a reply. Because no topic is specified on the annotation, the reply goes to the reply topic that ReplyingKafkaTemplate sets in the request's headers.

We also have a small piece of logic that randomly calculates the student's result and percentage.

package com.asb.example;
import java.util.concurrent.ThreadLocalRandom;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;
@Component
public class StudentResultCalculator {
	@KafkaListener(topics = "${kafka.reuest.topic}", groupId = "${kafka.group.id}")
	@SendTo
	public Result handle(Student student) {
		System.out.println("Calculating Result...");
		// Random score between 2.5 (inclusive) and 9.9 (exclusive).
		double total = ThreadLocalRandom.current().nextDouble(2.5, 9.9);
		Result result = new Result();
		result.setName(student.getName());
		result.setResult((total > 3.5) ? "Pass" : "Fail");
		// Scale to a percentage and keep the first four characters, for example "47.1%".
		result.setPercentage(String.valueOf(total * 10).substring(0, 4) + "%");
		return result;
	}
}
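
The listener builds the percentage by cutting the string form of a double to four characters, which works for the value range used here but depends on how the number happens to print. As a possible alternative, this sketch formats the value explicitly with one decimal place. The class name PercentageFormatSketch and its two helper methods are hypothetical. Note that String.format rounds the value, whereas the substring approach truncates it.

```java
import java.util.Locale;

// Hypothetical helpers that mirror the listener's calculation with explicit formatting.
final class PercentageFormatSketch {

    // Formats a score on the 0-10 scale as a percentage with one decimal place.
    static String toPercentage(double total) {
        return String.format(Locale.ROOT, "%.1f%%", total * 10);
    }

    // Mirrors the listener's rule: totals above 3.5 pass.
    static String toResult(double total) {
        return total > 3.5 ? "Pass" : "Fail";
    }

    public static void main(String[] args) {
        System.out.println(toPercentage(4.7123) + " " + toResult(4.7123)); // prints "47.1% Pass"
        System.out.println(toPercentage(3.0) + " " + toResult(3.0));       // prints "30.0% Fail"
    }
}
```

Locale.ROOT keeps the decimal separator a dot regardless of the machine's default locale.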

Testing the application with Postman

Finally, it is time to test our result calculator.

Start the Spring Boot application and then post the student details as shown below. Make sure a Kafka server is running locally first.

[Image: Kafka request-reply example]
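
If Postman is not at hand, the same request can be sent from a small Java program. The sketch below uses only the JDK's HttpURLConnection, which is available in Java 8. It assumes the application listens on Spring Boot's default port 8080, and the class name GetResultClientSketch and the sample student values are hypothetical. The JSON field names match the Student DTO.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

// Hypothetical client for the /get-result endpoint; assumes the app runs on localhost:8080.
final class GetResultClientSketch {

    // Builds the JSON request body; field names match the Student DTO.
    static String studentJson(String registrationNumber, String name, String grade) {
        return "{\"registrationNumber\":\"" + escape(registrationNumber)
                + "\",\"name\":\"" + escape(name)
                + "\",\"grade\":\"" + escape(grade) + "\"}";
    }

    // Escapes backslashes and double quotes, which is enough for this simple sketch.
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Posts the JSON body to the given URL and returns the response body as text.
    static String post(String url, String json) throws IOException {
        HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
        connection.setRequestMethod("POST");
        connection.setRequestProperty("Content-Type", "application/json");
        connection.setDoOutput(true);
        try (OutputStream out = connection.getOutputStream()) {
            out.write(json.getBytes(StandardCharsets.UTF_8));
        }
        try (InputStream in = connection.getInputStream();
                Scanner scanner = new Scanner(in, "UTF-8").useDelimiter("\\A")) {
            return scanner.hasNext() ? scanner.next() : "";
        }
    }

    public static void main(String[] args) {
        // Sample values are made up for illustration.
        String body = studentJson("REG-001", "John", "10");
        try {
            System.out.println(post("http://localhost:8080/get-result", body));
        } catch (IOException e) {
            // Typically means the application or Kafka is not running.
            System.err.println("Request failed: " + e.getMessage());
        }
    }
}
```

The response body should be a JSON object with the name, percentage, and result fields of the Result DTO; the actual values vary because the listener generates them randomly.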

Conclusion

In this article, we learned how to perform synchronous request-reply communication with Spring Kafka.

We also learned how to return a different data type as the reply from the consumer side.

Hope you have enjoyed the article.

Finally, the complete example code is available on GitHub.