In this post, we'll take a look at how to work with Spring Boot and Apache Kafka: adding the dependencies, configuring the Kafka client, and writing a simple producer and consumer.
Apache Kafka is a distributed streaming platform used for building real-time data pipelines and streaming applications. It is horizontally scalable, fault-tolerant, and fast. Kafka is written in Scala and Java and follows a publish-subscribe messaging model.
Spring Boot is a Java-based framework for creating microservices. It simplifies the bootstrapping and development of a new Spring application: it is opinionated, provides a set of sensible default configurations, and makes it easy to create stand-alone, production-grade Spring-based applications.
In order to use Spring Boot with Apache Kafka, we need to add the following dependency to our project:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
We also need the following dependency if we want to use Apache Kafka with Avro. Note that kafka-avro-serializer is published under the io.confluent group, so the Confluent Maven repository must be added to the project as well:
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
</dependency>
Now that we have the dependencies in place, let's take a look at how to use Spring Boot with Apache Kafka.
First, we need to configure the connection to the Apache Kafka broker. We can do this by setting the following properties in our application.properties file:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.consumer.group-id=test
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
In the above configuration, spring.kafka.bootstrap-servers points the client at the broker, the producer.* properties control retry, batching, and buffering behavior, and the consumer.* properties set the consumer group, tell the consumer to start from the earliest offset when no committed offset exists, and disable automatic offset commits.
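Under the hood, Spring Boot translates these spring.kafka.* keys into the Kafka client's native configuration names. For reference, here is a minimal sketch of the equivalent producer settings in plain Java (the KafkaProps class is ours, for illustration only):

```java
import java.util.Properties;

public class KafkaProps {

    // The native Kafka client keys that spring.kafka.producer.* maps onto
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // spring.kafka.bootstrap-servers
        props.put("retries", "0");                        // spring.kafka.producer.retries
        props.put("batch.size", "16384");                 // spring.kafka.producer.batch-size
        props.put("buffer.memory", "33554432");           // spring.kafka.producer.buffer-memory
        return props;
    }
}
```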
Now let's take a look at how to create a producer. We can do this by creating a Producer class:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class Producer {

    private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, String message) {
        LOGGER.info("sending message='{}' to topic='{}'", message, topic);
        kafkaTemplate.send(topic, message);
    }
}
In the above producer, we are using the KafkaTemplate to send messages to the Kafka broker.
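Because send() is asynchronous, we may want to confirm delivery by attaching a callback to the returned future. A minimal sketch, assuming Spring for Apache Kafka 3.x, where KafkaTemplate.send() returns a CompletableFuture (in 2.x it returns a ListenableFuture instead); the CallbackProducer class name is ours:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class CallbackProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(CallbackProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, String message) {
        // The callback fires once the broker acknowledges the record (or the send fails)
        kafkaTemplate.send(topic, message).whenComplete((result, ex) -> {
            if (ex == null) {
                LOGGER.info("delivered to partition={} at offset={}",
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
            } else {
                LOGGER.error("delivery of message='{}' failed", message, ex);
            }
        });
    }
}
```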
Now let's take a look at how to create a consumer. We can do this by creating a Consumer class:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);

    @KafkaListener(topics = "test", groupId = "test")
    public void listen(String message) {
        LOGGER.info("received message='{}'", message);
    }
}
In the above consumer, we are using the @KafkaListener annotation to listen for messages on the "test" topic.
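Since we set enable-auto-commit=false earlier, the listener container handles offset commits for us. If we want to acknowledge each record ourselves, one option is manual ack mode; a sketch assuming spring.kafka.listener.ack-mode=manual is set in application.properties (the ManualAckConsumer class name is ours):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class ManualAckConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(ManualAckConsumer.class);

    // Requires spring.kafka.listener.ack-mode=manual in application.properties
    @KafkaListener(topics = "test", groupId = "test")
    public void listen(String message, Acknowledgment ack) {
        LOGGER.info("received message='{}'", message);
        ack.acknowledge(); // commit the offset only after the record is processed
    }
}
```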