Apache Kafka Producer-Consumer Example – Spring Boot

In the previous article, we learned how to set up Apache Kafka on the Windows system. In this article, we will learn how to create a Kafka topic producer and how to create a consumer to consume the topic using the spring boot.

Let us begin!! 🙂

Table of Contents

Create Spring Boot Kafka Producer application

Create a spring boot application with required dependencies. We need to add spring-boot-starter-web, spring-kafka, and lombok(optional, just to reduce boilerplate code) dependencies.

The below image shows the required dependencies added while creating the spring boot application.

kafka dependencies

Also, add the gson dependency by adding the maven dependency into the spring boot application’s pom.xml file.

The pom.xml file with all the dependencies is shown below.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.0.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.asb.example</groupId>
	<artifactId>spring-kafka-example</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>spring-kafka-example</name>
	<description>Demo project for Spring Boot</description>
	<properties>
		<java.version>1.8</java.version>
		<maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
			<exclusions>
				<exclusion>
					<groupId>org.junit.vintage</groupId>
					<artifactId>junit-vintage-engine</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>
		<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
		<dependency>
		    <groupId>com.google.code.gson</groupId>
		    <artifactId>gson</artifactId>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

Setting up Kafka producer configuration

The next step is to set up the Kafka configurations in our spring boot application.

We can configure Kafka configuration in two ways. By adding the Kafka configuration properties inside the application.properties file of our spring boot application or by creating a configuration class to override the required default Kafka configurations.

Using application.properties

We can set up the Kafka producer configuration by adding the following configuration properties.

spring.kafka.bootstrap-servers=127.0.0.1:9092
##Producer Serialization:
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
  • spring.kafka.bootstrap-servers: List of Kafka servers along with the port.
  • spring.kafka.producer.key-serializer: Kafka producer key serializer class. We have used the StringSerializer class of the Kafka library.
  • spring.kafka.producer.value-serializer: Kafka producer value serializer class. We have used the StringSerializer class of the Kafka library. This is helpful when we have different objects as values, that can be converted into JSON formatted string before produced by Kafka producer.

Using configuration class

We can create a spring configuration class and register Kafka beans with the required configurations.

@Configuration
public class KafkaConfig {
	@Bean
	ProducerFactory<String, String> producerFactory() {
		Map<String, Object> config = new HashMap<>();
		config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
		config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		return new DefaultKafkaProducerFactory<String, String>(config);
	}
	@Bean
	KafkaTemplate<String, String> kafkaTemplate() {
		return new KafkaTemplate<String, String>(producerFactory());
	}
}

Create a DTO class

Create an Employee.java DTO class. We will use the object of this DTO class to create and produce a Kafka topic message.

@Getter
@Setter
@ToString
public class Employee {
	private Integer id;
	private String name;
}

Create a REST controller class

We need to expose a REST endpoint, which takes the Employee object as input and sends it to the Kafka topic.

Create a java class KafkaProducerController.java as shown below.

Here, we have an HTTP POST method at the “/produce” endpoint. This endpoint takes the Employee object and converts it into a JSON formatted string. Then it produces the JSON formatted string to Kafka topic called “test“.

Also, we have injected the KafkaTemplate and the Gson beans into our controller class.

We have used the Gson library for converting the Employee object to JSON. Also, we can use the KafkaTemplate instance to produce the topic into the Kafka server.

@RestController
public class KafkaProducerController {
	@Autowired
	private KafkaTemplate<String, String> kafkaTemplate;
	@Autowired
	private Gson gson;
	@PostMapping("/produce")
	public ResponseEntity<String> postModelToKafka(@RequestBody Employee emp)
			throws InterruptedException, ExecutionException {
		ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send("test", gson.toJson(emp));
		return new ResponseEntity<>(result.get().getProducerRecord().value(), HttpStatus.OK);
	}
}

Start the spring boot application and also hit the endpoint “/produce” with a proper input body as shown below.

Producer example kafka spring boot.

we can now check all the messages produced under the Kafka topic “test” using the following command.

kafka-console-consumer.bat –bootstrap-server localhost:9092 –topic test –from-beginning

The below image shows the list of messages produced under test topics in my system.

Kafka topic messages list.

Finally, we have successfully created a Kafka topic and produced an employee object as the message. 🙂

Create Spring boot Kafka consumer application

We will create a new spring boot application and also configure the Kafka consumer configuration inside the new application.

create a spring boot application with required spring boot application dependencies. We will be using the same dependencies, that we used before for the producer applications. Add spring-boot-starter-web, spring-kafka, lombok, and gson dependencies.

Setting up Kafka consumer configuration

As seen earlier for producer application configuration, we can configure consumer applications with the application.properties file or by using java configuration class.

Using application.properties

We can configure the Kafka consumer configuration by adding the following properties.

server.port=8081
#Kafka config props:
spring.kafka.bootstrap-servers=127.0.0.1:9092
##Consumer Deserialization:
spring.kafka.consumer.key-deserializer= org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=myGroupId
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.missing-topics-fatal=false
  • server.port: We will use a different application port, as the default port is already used by the Kafka producer application.
  • spring.kafka.bootstrap-servers: List of available Kafka servers.
  • spring.kafka.consumer.key-deserializer: Consumer key de-serialization class. we are using the StringDeserializer class of Kafka library.
  • spring.kafka.consumer.value-deserializer: Consumer value de-serialization class. we are using the StringDeserializer class of Kafka library as we are consuming JSON formatted string messages.
  • spring.kafka.consumer.group-id: A group id value for the Kafka consumer.
  • spring.kafka.consumer.enable-auto-commit: Setting this value to false we can commit the offset messages manually, which avoids crashing of the consumer if new messages are consumed when the currently consumed message is being processed by the consumer.
  • spring.kafka.listener.missing-topics-fatal: By setting the value to false, we can avoid unwanted errors that are displayed during application startup if any of the configured topics are not present on the broker

Using configuration class

We can also create a spring configuration class and register the required beans inside that class.

package com.asb.example;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import com.google.gson.Gson;
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
	@Bean
	public ConsumerFactory<String, String> consumerFatory() {
		Map<String, Object> config = new HashMap<>();
		config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
		config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		config.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupId");
		config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
		return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new StringDeserializer());
	}
	@Bean
	public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
		ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
		concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFatory());
		concurrentKafkaListenerContainerFactory.setMissingTopicsFatal(false);
		return concurrentKafkaListenerContainerFactory;
	}
	@Bean
	public Gson gsonJsonConverter() {
		return new Gson();
	}
}
  • @EnableKafka: This annotation enables the consumer application as Kafka listener.

Create DTO and Controller classes

Since we are consuming the Employee object produced by our Kafka application, we need to have the same DTO class inside our consumer application also.

Create a spring REST controller class with the name KafkaConsumerController as shown below.

package com.asb.example;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import com.google.gson.Gson;
@RestController
public class KafkaConsumerController {
	@Autowired
	private Gson gson;
	@KafkaListener(topics = { "test" })
	public void getTopics(@RequestBody String emp) {
		System.out.println("Kafka event consumed is: " + emp);
		Employee model = gson.fromJson(emp, Employee.class);
		System.out.println("Model converted value: " + model.toString());
	}
}

@KafkaListener: This annotation is used to annotate a method, that is used as a Kafka listener. we can pass a list of topics that should be consumed with the topics parameters as shown above.

In this controller class, we have a getTopics() method. This method keeps on listening to the Kafka server and also reads the newly produced messages of a particular topic(The topic name is “test” in our example).

Finally, the consumed value is converted into the Employee object and printed into the application’s console.

Kafka consumer output.

We have successfully created the Kafka consumer application!!

Conclusion

In this article, we learned how to create Kafka producer and consumer applications using spring boot.

We created an employee object, converted that into json formatted string and it to the Kafka message stream.

We also consumed that message using the @KafkaListener annotation on the consumer application and processed it successfully.

Example code is available on Github.