In the previous article, we learned how to set up Apache Kafka on a Windows system.

In this article, we will learn how to create a Kafka producer that publishes messages to a topic and a consumer that reads messages from that topic.

Let us begin! 🙂

Create Spring Boot Kafka Producer application

Create a Spring Boot application with the required dependencies. We need to add the spring-boot-starter-web, spring-kafka, and lombok dependencies (lombok is optional and only reduces boilerplate code).

The image below shows the required dependencies selected while creating the Spring Boot application.

kafka dependencies

Also, add the Gson dependency to the Spring Boot application's pom.xml file.

The pom.xml file with all the dependencies is shown below.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.0.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.asb.example</groupId>
	<artifactId>spring-kafka-example</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>spring-kafka-example</name>
	<description>Demo project for Spring Boot</description>

	<properties>
		<java.version>1.8</java.version>
		<maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
			<exclusions>
				<exclusion>
					<groupId>org.junit.vintage</groupId>
					<artifactId>junit-vintage-engine</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>
		<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
		<dependency>
		    <groupId>com.google.code.gson</groupId>
		    <artifactId>gson</artifactId>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

Setting up Kafka producer configuration

The next step is to set up the Kafka-related configuration in our Spring Boot application.

We can configure Kafka in two ways: by adding Kafka properties to the application.properties file of our Spring Boot application, or by creating a configuration class that overrides the required Kafka defaults.

Using application.properties

We can set up the Kafka producer configuration by adding the following configuration properties.

spring.kafka.bootstrap-servers=127.0.0.1:9092
##Producer Serialization:
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
  • spring.kafka.bootstrap-servers: Comma-separated list of Kafka brokers in host:port form.
  • spring.kafka.producer.key-serializer: Kafka producer key serializer class. We have used the StringSerializer class of the Kafka library.
  • spring.kafka.producer.value-serializer: Kafka producer value serializer class. We have also used StringSerializer here, because each object is converted into a JSON-formatted string before it is sent. This way, values of different types can all be published as strings.
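
Spring Boot passes these spring.kafka.* values on to the Kafka client under the client's own configuration keys. The following is a minimal sketch of that mapping in plain Java. The class ProducerSettingsSketch and its method are hypothetical and not part of the article's project; the keys bootstrap.servers, key.serializer, and value.serializer are the standard Kafka producer configuration names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, for illustration only: it is not part of the article's project.
class ProducerSettingsSketch {

	// Builds the native Kafka producer settings that correspond to the
	// spring.kafka.* properties shown above.
	static Map<String, String> nativeProducerSettings(String bootstrapServers) {
		Map<String, String> settings = new LinkedHashMap<>();
		settings.put("bootstrap.servers", bootstrapServers);
		settings.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		settings.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		return settings;
	}

	public static void main(String[] args) {
		// Prints one key=value line per setting, in insertion order.
		nativeProducerSettings("127.0.0.1:9092")
				.forEach((key, value) -> System.out.println(key + "=" + value));
	}
}
```

These are the same keys that the ProducerConfig constants refer to when the configuration is written as a Java class.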

Using configuration class

We can create a spring configuration class and register Kafka beans with required configurations.

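import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaConfig {

	// Creates producers that send String keys and String (JSON) values.
	@Bean
	ProducerFactory<String, String> producerFactory() {
		Map<String, Object> config = new HashMap<>();
		config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
		config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		return new DefaultKafkaProducerFactory<>(config);
	}

	@Bean
	KafkaTemplate<String, String> kafkaTemplate() {
		return new KafkaTemplate<>(producerFactory());
	}
}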

Create a DTO class

Create an Employee.java DTO class. We will convert objects of this class to JSON and publish them as messages to a Kafka topic.

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

@Getter
@Setter
@ToString
public class Employee {

	private Integer id;
	private String name;
}
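
Since Lombok is optional, the same DTO can also be written by hand. The following is a sketch of roughly what the three annotations generate. The class name PlainEmployee is hypothetical and chosen only to avoid a clash with Employee.

```java
// Hand-written equivalent of the Lombok-annotated Employee class.
// The class name PlainEmployee is hypothetical, used for illustration only.
class PlainEmployee {

	private Integer id;
	private String name;

	// What @Getter and @Setter generate for each field.
	public Integer getId() { return id; }
	public void setId(Integer id) { this.id = id; }
	public String getName() { return name; }
	public void setName(String name) { this.name = name; }

	// Mirrors the format Lombok's @ToString produces: ClassName(field=value, ...).
	@Override
	public String toString() {
		return "PlainEmployee(id=" + id + ", name=" + name + ")";
	}
}
```

With Lombok on the classpath, the annotated version stays shorter and picks up new fields automatically.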

Create a REST controller class

We need to expose a REST endpoint that takes an Employee object as input and sends it as a message to a Kafka topic.

Create a Java class KafkaProducerController.java as shown below.

Here, we have an HTTP POST method at the "/produce" endpoint. This endpoint takes an Employee object and converts it into a JSON-formatted string. Then it publishes the JSON string to the Kafka topic called "test".

We have injected the KafkaTemplate and Gson beans into our controller class.

We have used the Gson library to convert the Employee object to JSON. KafkaTemplate is used to publish the message to the topic on the Kafka server.

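import java.util.concurrent.ExecutionException;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import com.google.gson.Gson;

@RestController
public class KafkaProducerController {

	@Autowired
	private KafkaTemplate<String, String> kafkaTemplate;

	@Autowired
	private Gson gson;

	@PostMapping("/produce")
	public ResponseEntity<String> postModelToKafka(@RequestBody Employee emp)
			throws InterruptedException, ExecutionException {
		ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send("test", gson.toJson(emp));
		// get() blocks until the send completes; the response echoes the JSON that was sent.
		return new ResponseEntity<>(result.get().getProducerRecord().value(), HttpStatus.OK);
	}
}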

Start the Spring Boot application and hit the "/produce" endpoint with a proper request body as shown below.

Producer example kafka spring boot.
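
The endpoint can also be called from code. The following is a minimal client sketch, assuming Java 11 or later on the client side (for java.net.http) and the producer application listening on Spring Boot's default port 8080. The class ProduceRequestSketch and its methods are hypothetical and not part of the article's project.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client-side helper, for illustration only. Requires Java 11+.
class ProduceRequestSketch {

	// Builds the JSON body expected by the /produce endpoint.
	// Assumes the name contains no characters that need JSON escaping.
	static String employeeJson(int id, String name) {
		return "{\"id\":" + id + ",\"name\":\"" + name + "\"}";
	}

	// Builds (but does not send) a POST request for the producer application,
	// assumed to be running on localhost:8080.
	static HttpRequest produceRequest(int id, String name) {
		return HttpRequest.newBuilder()
				.uri(URI.create("http://localhost:8080/produce"))
				.header("Content-Type", "application/json")
				.POST(HttpRequest.BodyPublishers.ofString(employeeJson(id, name)))
				.build();
	}

	// Sends the request and returns the response body.
	// Needs the producer application and the Kafka broker to be running.
	static String send(HttpRequest request) throws IOException, InterruptedException {
		HttpResponse<String> response = HttpClient.newHttpClient()
				.send(request, HttpResponse.BodyHandlers.ofString());
		return response.body();
	}
}
```

Calling ProduceRequestSketch.send(ProduceRequestSketch.produceRequest(1, "John")) while both the application and the broker are running should return the JSON string that the controller published.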

We can now check all the messages produced under the Kafka topic "test" using the following command.

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

The image below shows the list of messages produced under the test topic on my system.

Kafka topic messages list.

Congratulations! We have successfully created a Kafka topic and produced an Employee object as a message. 🙂

Create Spring Boot Kafka Consumer application

We will create a new Spring Boot application and set up the Kafka consumer configuration inside it.

Create a Spring Boot application with the same dependencies that we used for the producer application: spring-boot-starter-web, spring-kafka, lombok, and gson.

Setting up Kafka consumer configuration

As with the producer application, we can configure the consumer application either with application.properties or with a Java configuration class.

Using application.properties

We can set up the Kafka consumer configuration by adding the following properties.

server.port=8081
#Kafka config props:
spring.kafka.bootstrap-servers=127.0.0.1:9092
##Consumer Deserialization:
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=myGroupId
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.missing-topics-fatal=false
  • server.port: The consumer application runs on a different port, because the default port (8080) is already used by the Kafka producer application.
  • spring.kafka.bootstrap-servers: Comma-separated list of available Kafka brokers in host:port form.
  • spring.kafka.consumer.key-deserializer: Consumer key deserializer class. We are using the StringDeserializer class of the Kafka library.
  • spring.kafka.consumer.value-deserializer: Consumer value deserializer class. We are using the StringDeserializer class of the Kafka library, as we are consuming JSON-formatted string messages.
  • spring.kafka.consumer.group-id: The ID of the consumer group that this consumer belongs to.
  • spring.kafka.consumer.enable-auto-commit: Setting this value to false stops the Kafka client from committing offsets automatically in the background. Spring's listener container then commits the offsets after the listener has processed the records, so a message that is still being processed when the consumer fails is not marked as consumed.
  • spring.kafka.listener.missing-topics-fatal: Setting this value to false lets the application start even when a configured topic is not yet present on the broker.
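
The effect of committing an offset only after a record has been processed can be illustrated with a small in-memory simulation. This is a sketch under simplifying assumptions: it uses no Kafka classes, the "log" is a plain list, and the class CommitAfterProcessingSketch and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory simulation, for illustration only: no Kafka classes are involved.
class CommitAfterProcessingSketch {

	// Offset of the next record to read after a restart.
	private int committedOffset = 0;
	private final List<String> processed = new ArrayList<>();

	// Processes records starting at the committed offset and commits each offset
	// only after its record was handled. Stops at the first failing record.
	void poll(List<String> log, String failingRecord) {
		for (int offset = committedOffset; offset < log.size(); offset++) {
			String record = log.get(offset);
			if (record.equals(failingRecord)) {
				return; // simulated crash: this offset is never committed
			}
			processed.add(record);
			committedOffset = offset + 1; // commit after successful processing
		}
	}

	int committedOffset() { return committedOffset; }

	List<String> processed() { return processed; }
}
```

If the first call to poll fails on the second of three records, the committed offset stays at 1, and the next call starts again from that record instead of skipping it.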

Using configuration class

We can also create a spring configuration class and register the required beans inside that class.

package com.asb.example;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import com.google.gson.Gson;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

	@Bean
	public ConsumerFactory<String, String> consumerFactory() {
		Map<String, Object> config = new HashMap<>();
		config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
		config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		config.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupId");
		config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
		return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new StringDeserializer());
	}

	@Bean
	public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
		ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
		concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
		concurrentKafkaListenerContainerFactory.setMissingTopicsFatal(false);
		return concurrentKafkaListenerContainerFactory;
	}

	@Bean
	public Gson gsonJsonConverter() {
		return new Gson();
	}
}
  • @EnableKafka: This annotation enables the detection of @KafkaListener annotations on Spring-managed beans, so that the annotated methods are registered as Kafka listeners.

Create DTO and Controller classes

Since we are consuming the Employee object produced by our Kafka producer application, we need the same DTO class inside our consumer application as well.

Create a KafkaConsumerController.java class as shown below.

package com.asb.example;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.web.bind.annotation.RestController;

import com.google.gson.Gson;

@RestController
public class KafkaConsumerController {

	@Autowired
	private Gson gson;

	// Called for each new message on the "test" topic; the payload is the JSON string.
	@KafkaListener(topics = { "test" })
	public void getTopics(String emp) {
		System.out.println("Kafka event consumed is: " + emp);
		Employee model = gson.fromJson(emp, Employee.class);
		System.out.println("Model converted value: " + model.toString());
	}
}

@KafkaListener: This annotation marks a method as a Kafka listener. We can pass the list of topics to consume with the topics parameter, as shown above.

In this controller class, we have a getTopics() method. It is called for each newly produced message on the subscribed topic (the topic name is "test" in our example).

The consumed value is converted into an Employee object and printed to the application's console.

Kafka consumer output.

Congratulations! 🙂 We have successfully created a Kafka consumer application.

Conclusion

In this article, we learned how to create Kafka producer and consumer applications using Spring Boot.

We created an Employee object, converted it into a JSON-formatted string, and sent it to a Kafka topic. We consumed that message using @KafkaListener in the consumer application and processed it successfully.

Example code is available on GitHub. Happy coding! 🙂
