Nowadays, event-driven architecture is widely used in software development, for example in microservices with patterns such as CQRS and the Saga pattern.

We use publish-subscribe messaging systems such as Apache Kafka for asynchronous communication between systems. However, some requirements call for synchronous (request/reply) communication.

In this article, we will learn how to implement the synchronous request/reply communication pattern using Apache Kafka with Spring Boot.

We will create an HTTP POST RESTful endpoint that accepts student details and returns a randomly calculated result and percentage. 🙂

Version details

The following are the version details we are going to use in our example.

  • Java version 1.8
  • Spring Boot 2.2.2.RELEASE

Create a Spring Boot application with required dependencies

Create a Spring Boot application with the required dependencies. We should add the spring-boot-starter-web, spring-kafka, and lombok (to reduce boilerplate code) dependencies.

The spring-kafka library provides excellent support for Spring-based messaging solutions.

The below pom.xml file shows the required dependencies for our project.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.2.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.asb.example</groupId>
	<artifactId>spring-kafka-synchronous-example</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>spring-kafka-synchronous-example</name>
	<description>Demo project for Spring Boot</description>

	<properties>
		<java.version>1.8</java.version>
		<maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
			<exclusions>
				<exclusion>
					<groupId>org.junit.vintage</groupId>
					<artifactId>junit-vintage-engine</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

Set up the Kafka configuration

The next step is to set up the required configuration for Kafka.

application.properties

Add the below-mentioned Spring Boot Kafka properties to the application.properties configuration file (under the src/main/resources directory).

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.spring.json.trusted.packages=com.asb.example
## User-defined properties:
kafka.request.topic=student
kafka.reply.topic=result
kafka.group.id=student-result-group

We have defined Spring Kafka-related configuration properties to set the producer serializer, the consumer deserializer, the trusted packages for the consumer, etc.

We have also defined custom properties to keep the request topic name, the reply topic name, and the Kafka group id in one place.

The request will be published to the topic called student, and the reply will be sent back on the reply topic called result.

Kafka configuration class

In the next step, we will create a Spring configuration class. This configuration class defines two beans: a ReplyingKafkaTemplate and a KafkaTemplate.

KafkaConfig.java

package com.asb.example;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;

@Configuration
public class KafkaConfig {

	@Value("${kafka.group.id}")
	private String groupId;

	@Value("${kafka.reply.topic}")
	private String replyTopic;

	@Bean
	public ReplyingKafkaTemplate<String, Student, Result> replyingKafkaTemplate(ProducerFactory<String, Student> pf,
			ConcurrentKafkaListenerContainerFactory<String, Result> factory) {
		ConcurrentMessageListenerContainer<String, Result> replyContainer = factory.createContainer(replyTopic);
		replyContainer.getContainerProperties().setMissingTopicsFatal(false);
		replyContainer.getContainerProperties().setGroupId(groupId);
		return new ReplyingKafkaTemplate<>(pf, replyContainer);
	}

	@Bean
	public KafkaTemplate<String, Result> replyTemplate(ProducerFactory<String, Result> pf,
			ConcurrentKafkaListenerContainerFactory<String, Result> factory) {
		KafkaTemplate<String, Result> kafkaTemplate = new KafkaTemplate<>(pf);
		factory.getContainerProperties().setMissingTopicsFatal(false);
		factory.setReplyTemplate(kafkaTemplate);
		return kafkaTemplate;
	}
}

Spring provides a ReplyingKafkaTemplate<K, V, R>, which returns a reply object once the message has been consumed by the Kafka listener on the other side. The type parameters represent: K – the key type, V – the outbound (request) data type, and R – the reply data type.

In our example, we have defined the bean as ReplyingKafkaTemplate<String, Student, Result>: we send the Student details to Kafka and receive a Result as the reply. We are going to create the Student and Result DTO classes in the next section.

We also need to define a KafkaTemplate<String, Result> bean, built from a ProducerFactory<String, Result>. This template is set as the reply template on the ConcurrentKafkaListenerContainerFactory, which expects a Result as the return type from the listener; the framework uses it to publish the listener's return value to the reply topic.

Create request and reply DTO classes

We will create the Student and Result DTO classes. These classes are used as the request and reply data types.

Student.java

We will post the Student details, like name, registration number, etc., to Kafka and get Result information in return.

package com.asb.example;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Student {
	private String registrationNumber;
	private String name;
	private String grade;
}

Result.java

Reply data will contain the calculated result, student name, and percentage details.

package com.asb.example;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Result {
	
	private String name;
	private String percentage;
	private String result;
}

Create a Rest Controller class

To accept student details, let us create a controller class. This RESTful controller will contain a POST endpoint. The endpoint will accept the Student details and pass that information to Kafka to get the calculated result.

KafkaController.java

We have exposed an HTTP POST endpoint called /get-result. This endpoint expects a Student object and returns the calculated Result details.

With the help of the ReplyingKafkaTemplate<String, Student, Result> instance, we send the student details using the sendAndReceive() method and wait on the returned RequestReplyFuture for the reply record.

package com.asb.example;

import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

	@Value("${kafka.reuest.topic}")
	private String requestTopic;

	@Autowired
	private ReplyingKafkaTemplate<String, Student, Result> replyingKafkaTemplate;

	@PostMapping("/get-result")
	public ResponseEntity<Result> getObject(@RequestBody Student student)
			throws InterruptedException, ExecutionException {
		ProducerRecord<String, Student> record = new ProducerRecord<>(requestTopic, null, student.getRegistrationNumber(), student);
		RequestReplyFuture<String, Student, Result> future = replyingKafkaTemplate.sendAndReceive(record);
		ConsumerRecord<String, Result> response = future.get();
		return new ResponseEntity<>(response.value(), HttpStatus.OK);
	}
}
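
Note that future.get() blocks the request thread until the reply record arrives; the ReplyingKafkaTemplate also applies its own reply timeout (5 seconds by default), after which the future completes exceptionally. The following is an optional sketch, not part of the example above, showing how the endpoint could bound the wait explicitly and report a timeout to the caller (it assumes the additional imports java.util.concurrent.TimeUnit and java.util.concurrent.TimeoutException):

	@PostMapping("/get-result")
	public ResponseEntity<Result> getObject(@RequestBody Student student)
			throws InterruptedException, ExecutionException {
		ProducerRecord<String, Student> record = new ProducerRecord<>(requestTopic, null, student.getRegistrationNumber(), student);
		RequestReplyFuture<String, Student, Result> future = replyingKafkaTemplate.sendAndReceive(record);
		try {
			// Wait at most 10 seconds for the reply record before giving up.
			ConsumerRecord<String, Result> response = future.get(10, TimeUnit.SECONDS);
			return new ResponseEntity<>(response.value(), HttpStatus.OK);
		} catch (TimeoutException e) {
			// No reply arrived in time; surface it to the caller instead of hanging.
			return new ResponseEntity<>(HttpStatus.GATEWAY_TIMEOUT);
		}
	}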

Create a Listener class

Create a class called StudentResultCalculator as shown below. This class has a Kafka listener method, which receives the request messages and responds on the reply topic with the calculated student result details.

StudentResultCalculator.java

We have annotated the class with @Component to register it as a Spring bean. The @KafkaListener annotation is used to subscribe to the request Kafka topic.

The @SendTo annotation enables the listener method to send its return value back as the reply. Since no topic is specified on the annotation, the reply is sent to the topic named in the incoming record's reply-topic header, which the ReplyingKafkaTemplate set when it published the request.

We also have a crazy little logic which randomly calculates student result and percentage. 🙂 🙂

package com.asb.example;

import java.util.concurrent.ThreadLocalRandom;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

@Component
public class StudentResultCalculator {

	@KafkaListener(topics = "${kafka.request.topic}", groupId = "${kafka.group.id}")
	@SendTo
	public Result handle(Student student) {
		System.out.println("Calculating Result...");
		double total = ThreadLocalRandom.current().nextDouble(2.5, 9.9);
		Result result = new Result();
		result.setName(student.getName());
		result.setResult((total > 3.5) ? "Pass" : "Fail");
		result.setPercentage(String.valueOf(total * 10).substring(0, 4) + "%");
		return result;
	}
}

Testing the application with Postman

Let’s test our crazy result calculator and try our luck! 🙂

Start the Spring Boot application and post the student details to the /get-result endpoint as shown below. (Make sure the Kafka server is running locally.)

[Screenshot: Kafka request/reply example in Postman]
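
For reference, a request/reply exchange might look like the following (the values are illustrative; the result and percentage are random, so your response will differ). The request is posted to http://localhost:8080/get-result, 8080 being the default Spring Boot port.

Request body:

{
	"registrationNumber": "1234",
	"name": "John",
	"grade": "A"
}

Response body:

{
	"name": "John",
	"percentage": "78.5%",
	"result": "Pass"
}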

Conclusion

In this article, we learned how to perform synchronous communication with Spring Kafka. We also learned how to return a different data type as the reply from the consumer side.

Hope you have enjoyed the article.

The complete code is available on GitHub. Happy coding! 🙂
