In the previous article, we learned how to set up Apache Kafka on a Windows system. In this article, we will learn how to create a Kafka producer that publishes messages to a topic, and a consumer that reads those messages, using Spring Boot.
Let us begin!! 🙂
Table of Contents
- Create Spring Boot Kafka Producer application
- Create Spring boot Kafka consumer application
- Conclusion
Create Spring Boot Kafka Producer application
Create a Spring Boot application with the required dependencies. We need to add the spring-boot-starter-web, spring-kafka, and lombok (optional, just to reduce boilerplate code) dependencies.
The below image shows the required dependencies added while creating the spring boot application.

Also, add the gson Maven dependency to the Spring Boot application’s pom.xml file.
The pom.xml file with all the dependencies is shown below.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.asb.example</groupId>
    <artifactId>spring-kafka-example</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-kafka-example</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
        <maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Setting up Kafka producer configuration
The next step is to set up the Kafka configurations in our spring boot application.
We can configure Kafka in two ways: by adding the Kafka configuration properties to the application.properties file of our Spring Boot application, or by creating a configuration class that overrides the required default Kafka configurations.
Using application.properties
We can set up the Kafka producer configuration by adding the following configuration properties.
spring.kafka.bootstrap-servers=127.0.0.1:9092
##Producer Serialization:
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
- spring.kafka.bootstrap-servers: List of Kafka servers along with the port.
- spring.kafka.producer.key-serializer: Kafka producer key serializer class. We have used the StringSerializer class of the Kafka library.
- spring.kafka.producer.value-serializer: Kafka producer value serializer class. We have used the StringSerializer class of the Kafka library. This works for values of any object type, as long as we convert the object into a JSON formatted string before handing it to the Kafka producer.
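To make the link between these Spring Boot properties and the plain Kafka client settings more concrete, here is a minimal JDK-only sketch. The class and method names (ProducerPropsSketch, load, toClientConfig) are hypothetical and exist only for illustration. Spring Boot performs this binding for us, so the sketch merely shows which Kafka client keys (bootstrap.servers, key.serializer, value.serializer) the three properties correspond to.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper for illustration only; Spring Boot does this binding itself.
class ProducerPropsSketch {

    // Parses application.properties-style text into a Properties object.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException("Could not parse properties", e);
        }
        return props;
    }

    // Copies the three Spring Boot properties to the equivalent Kafka client keys.
    static Map<String, Object> toClientConfig(Properties props) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", props.getProperty("spring.kafka.bootstrap-servers"));
        config.put("key.serializer", props.getProperty("spring.kafka.producer.key-serializer"));
        config.put("value.serializer", props.getProperty("spring.kafka.producer.value-serializer"));
        return config;
    }

    public static void main(String[] args) {
        String text = String.join("\n",
                "spring.kafka.bootstrap-servers=127.0.0.1:9092",
                "##Producer Serialization:",
                "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer",
                "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer");
        System.out.println(toClientConfig(load(text)));
    }
}
```

The same three client keys are what the ProducerConfig constants refer to when the configuration class approach is used.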
Using configuration class
We can create a spring configuration class and register Kafka beans with the required configurations.
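import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaConfig {

    // Producer factory with the broker address and String serializers for key and value.
    @Bean
    ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<String, String>(config);
    }

    // Template used by the application to send messages.
    @Bean
    KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }
}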
Create a DTO class
Create an Employee.java DTO class. We will use the object of this DTO class to create and produce a Kafka topic message.
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

@Getter
@Setter
@ToString
public class Employee {

    private Integer id;
    private String name;
}
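Since Lombok is optional, it may help to see roughly what the three annotations stand for. The following is a hand-written sketch of an equivalent class, not Lombok's actual generated code. It is declared without the public modifier only so that it can be compiled in any file; in the project it would live in its own Employee.java file as a public class.

```java
// Hand-written equivalent of the Lombok-annotated Employee class (illustrative sketch).
class Employee {

    private Integer id;
    private String name;

    // What @Getter and @Setter provide for each field.
    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    // @ToString prints the class name followed by the fields in declaration order.
    @Override
    public String toString() {
        return "Employee(id=" + id + ", name=" + name + ")";
    }
}
```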
Create a REST controller class
We need to expose a REST endpoint, which takes the Employee object as input and sends it to the Kafka topic.
Create a java class KafkaProducerController.java as shown below.
Here, we have an HTTP POST method at the “/produce” endpoint. This endpoint takes the Employee object and converts it into a JSON formatted string. Then it sends the JSON formatted string to the Kafka topic called “test”.
Also, we have injected the KafkaTemplate and the Gson beans into our controller class.
We use the Gson library to convert the Employee object to JSON, and the KafkaTemplate instance to send the message to the topic on the Kafka server.
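import java.util.concurrent.ExecutionException;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import com.google.gson.Gson;

@RestController
public class KafkaProducerController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private Gson gson;

    @PostMapping("/produce")
    public ResponseEntity<String> postModelToKafka(@RequestBody Employee emp)
            throws InterruptedException, ExecutionException {
        // Convert the Employee to JSON and send it to the "test" topic.
        ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send("test", gson.toJson(emp));
        // Block until the send completes, then return the value that was sent.
        return new ResponseEntity<>(result.get().getProducerRecord().value(), HttpStatus.OK);
    }
}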
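The controller above waits for the send to finish by calling result.get(). As a rough illustration of the non-blocking alternative, here is a JDK-only sketch in which a CompletableFuture stands in for the future returned by kafkaTemplate.send. The names SendCallbackSketch, fakeSend, and describe are hypothetical, and no broker is contacted. With the spring-kafka version used in this article, the returned ListenableFuture accepts callbacks through addCallback instead; newer spring-kafka releases (3.0 and later) return a CompletableFuture, which is closer to what is shown here.

```java
import java.util.concurrent.CompletableFuture;

// JDK-only stand-in: fakeSend plays the role of kafkaTemplate.send, no broker is contacted.
class SendCallbackSketch {

    // Hypothetical substitute for a send call; completes asynchronously with the sent value.
    static CompletableFuture<String> fakeSend(String value) {
        return CompletableFuture.supplyAsync(() -> value);
    }

    // Attaches handlers instead of blocking: one message for success, another for failure.
    static CompletableFuture<String> describe(String topic, CompletableFuture<String> sendFuture) {
        return sendFuture
                .thenApply(value -> "sent to " + topic + ": " + value)
                .exceptionally(error -> "send to " + topic + " failed: " + error.getMessage());
    }

    public static void main(String[] args) {
        // join() is used here only so that the demo prints before the JVM exits.
        System.out.println(describe("test", fakeSend("{\"id\":1}")).join());
    }
}
```

Blocking, as the controller does, keeps the example simple and lets the HTTP response confirm the send; callbacks are worth considering when the request thread should not wait for the broker.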
Start the spring boot application and also hit the endpoint “/produce” with a proper input body as shown below.
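For readers who prefer code over a REST client tool, the request can also be sent from a small standalone Java program. This is only a sketch under a few assumptions: it needs Java 11 or later for java.net.http (the project itself targets Java 1.8), the producer application is assumed to run on Spring Boot's default port 8080, and the class name ProduceRequestSketch and the sample employee values are made up for illustration.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Standalone client sketch; requires Java 11+ for java.net.http.
class ProduceRequestSketch {

    // Builds a POST request to the /produce endpoint with the given JSON body.
    static HttpRequest buildRequest(String baseUrl, String json) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/produce"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    // Sends the request and returns the response body; needs the producer application to be running.
    static String send(HttpRequest request) throws IOException, InterruptedException {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}
```

With the producer application running, calling send(buildRequest("http://localhost:8080", "{\"id\":1,\"name\":\"Alice\"}")) should return the JSON string that the controller sent to Kafka.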

We can now check all the messages produced under the Kafka topic “test” using the following command.
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
The below image shows the list of messages produced under the test topic on my system.

Finally, we have successfully produced an employee object as a message on the Kafka topic “test”. 🙂
Create Spring boot Kafka consumer application
We will create a new Spring Boot application and set up the Kafka consumer configuration inside it.
Create a Spring Boot application with the same dependencies that we used for the producer application: spring-boot-starter-web, spring-kafka, lombok, and gson.
Setting up Kafka consumer configuration
As with the producer application, we can configure the consumer application either with the application.properties file or with a Java configuration class.
Using application.properties
We can configure the Kafka consumer configuration by adding the following properties.
server.port=8081
#Kafka config props:
spring.kafka.bootstrap-servers=127.0.0.1:9092
##Consumer Deserialization:
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=myGroupId
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.missing-topics-fatal=false
- server.port: We will use a different application port, as the default port is already used by the Kafka producer application.
- spring.kafka.bootstrap-servers: List of available Kafka servers.
- spring.kafka.consumer.key-deserializer: Consumer key deserialization class. We are using the StringDeserializer class of the Kafka library.
- spring.kafka.consumer.value-deserializer: Consumer value deserialization class. We are using the StringDeserializer class of the Kafka library, as we are consuming JSON formatted string messages.
- spring.kafka.consumer.group-id: The id of the consumer group that this Kafka consumer belongs to.
- spring.kafka.consumer.enable-auto-commit: Setting this value to false turns off the Kafka client's periodic, time-based offset commits. The Spring listener container then commits the offsets itself after the listener has processed the records, so an offset is not committed for a message that is still being processed when the consumer fails.
- spring.kafka.listener.missing-topics-fatal: By setting the value to false, we let the application start without an error even if any of the configured topics are not present on the broker.
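The reason for committing offsets only after processing can be seen in a small in-memory simulation. This sketch uses no Kafka classes at all: the list stands in for the records of one partition, and the class name OffsetCommitSketch and its methods are hypothetical. It commits after every record for simplicity, which is not necessarily how the Spring listener container batches its commits.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// In-memory simulation only; no Kafka classes are involved.
class OffsetCommitSketch {

    private final List<String> log;   // stand-in for the records of one partition
    private int committedOffset = 0;  // position from which a restarted consumer resumes

    OffsetCommitSketch(List<String> log) {
        this.log = log;
    }

    // Processes records from the committed offset; failAt simulates a crash (-1 = no crash).
    List<String> run(int failAt) {
        List<String> processed = new ArrayList<>();
        for (int offset = committedOffset; offset < log.size(); offset++) {
            if (offset == failAt) {
                break; // crash before this record is processed, so its offset stays uncommitted
            }
            processed.add(log.get(offset));
            committedOffset = offset + 1; // commit only after processing succeeded
        }
        return processed;
    }

    int getCommittedOffset() {
        return committedOffset;
    }

    public static void main(String[] args) {
        OffsetCommitSketch consumer = new OffsetCommitSketch(Arrays.asList("m0", "m1", "m2", "m3"));
        System.out.println(consumer.run(2));  // first run stops at offset 2
        System.out.println(consumer.run(-1)); // restart resumes from the committed offset
    }
}
```

Because the offset of the failed record was never committed, the second run picks it up again instead of skipping it.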
Using configuration class
We can also create a spring configuration class and register the required beans inside that class.
package com.asb.example;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import com.google.gson.Gson;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    // Consumer factory with the broker address, String deserializers, and the group id.
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupId");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new StringDeserializer());
    }

    // Container factory used by @KafkaListener methods.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
        concurrentKafkaListenerContainerFactory.setMissingTopicsFatal(false);
        return concurrentKafkaListenerContainerFactory;
    }

    @Bean
    public Gson gsonJsonConverter() {
        return new Gson();
    }
}
- @EnableKafka: This annotation enables the detection of @KafkaListener annotations on Spring-managed beans, so that the annotated methods are registered as Kafka listeners.
Create DTO and Controller classes
Since we are consuming the Employee object produced by our Kafka producer application, we need the same DTO class inside our consumer application as well.
Create a spring REST controller class with the name KafkaConsumerController as shown below.
package com.asb.example;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.web.bind.annotation.RestController;

import com.google.gson.Gson;

@RestController
public class KafkaConsumerController {

    @Autowired
    private Gson gson;

    // The String parameter receives the record value; a web annotation such as @RequestBody is not needed here.
    @KafkaListener(topics = { "test" })
    public void getTopics(String emp) {
        System.out.println("Kafka event consumed is: " + emp);
        Employee model = gson.fromJson(emp, Employee.class);
        System.out.println("Model converted value: " + model.toString());
    }
}
@KafkaListener: This annotation marks a method as a Kafka listener. We can pass the list of topics to consume through the topics parameter, as shown above.
In this controller class, we have a getTopics() method. This method is invoked for each newly produced message on the subscribed topic (the topic name is “test” in our example).
Finally, the consumed value is converted into the Employee object and printed into the application’s console.

We have successfully created the Kafka consumer application!!
Conclusion
In this article, we learned how to create Kafka producer and consumer applications using spring boot.
We created an employee object, converted it into a JSON formatted string, and sent it to the Kafka message stream.
We also consumed that message using the @KafkaListener annotation on the consumer application and processed it successfully.
Example code is available on GitHub.