In the previous chapter, we did a quick start with Kafka in Spring Boot. In this chapter, we'll take a deep dive into each of its components and options to see whether we can use it to implement our own use case. To connect to Kafka, we need to focus mainly on the following beans provided by Spring for Apache Kafka:
- KafkaAdmin: to configure topics in Kafka programmatically
- ProducerFactory: to define or override configurations to produce messages to Kafka
- ConsumerFactory: to define or override configurations to consume messages from Kafka
Before we look into each of the components, let's once again consider Steve and Jane's e-commerce app that we discussed in our previous chapters and pick a basic microservice to implement.
Since we'll be focusing on a simple pub-sub module, let's pick the Inventory Service, where we'll publish the new items added to the inventory list to a Kafka topic and then consume them to update or forward that information to other services or topics.
Configuration of Kafka Topics
Let's start with the first component of Kafka in Spring Boot. Until now, as you may have noticed, we have used command-line tools to create topics in Kafka. For example, in the previous chapter we ran something like this:
bin/kafka-topics.sh \
  --create \
  --zookeeper localhost:2181 \
  --replication-factor 3 \
  --partitions 3 \
  --topic test \
  --config min.insync.replicas=2
However, Spring Boot integrates Kafka's AdminClient so that topics can be created programmatically. We'll define a configuration that uses KafkaAdmin beans to define new topics. We'll fetch the bootstrap servers and the topic details from the properties defined in application.yml, which we have seen in our previous chapter as well:
server:
  port: 8080
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      topic: product-inventory
      forward-to: fulfillment-station
      group-id: group-id
      auto-start: true
      auto-offset-reset: earliest
    producer:
      bootstrap-servers: localhost:9092
      topic: product-inventory
Let's define our KafkaTopicConfig class:
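import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;

@Configuration
public class KafkaTopicConfig {
    @Value(value = "${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServers;
    @Value(value = "${spring.kafka.producer.topic}")
    private String topic;

    // Admin client that Spring uses to create the declared topics
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return new KafkaAdmin(configs);
    }

    // Compacted topic with 10 partitions and a replication factor of 3
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name(topic)
                .partitions(10)
                .replicas(3)
                .compact()
                .build();
    }
}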
This will create the product-inventory topic in Kafka with the given configuration. KafkaAdmin also provides the capability to define multiple topics at once by passing several TopicBuilder configurations:
@Bean
public KafkaAdmin.NewTopics multipleTopics() {
    return new KafkaAdmin.NewTopics(
            TopicBuilder.name("stackabuse-product-inventory")
                    .build(),
            TopicBuilder.name("product-inventory-bytes")
                    .replicas(1)
                    .build(),
            TopicBuilder.name("fulfillment-station")
                    .partitions(5)
                    .build()
    );
}
As you can see, we're also creating a few more topics: stackabuse-product-inventory, product-inventory-bytes, and fulfillment-station. We'll look into each of their use cases as we progress through this chapter.
Publishing Messages to Topic
Once we're done creating topics, let's look at how to publish messages. Since Producer instances are thread-safe, using a single instance throughout an application context gives higher performance. So let's define a KafkaTemplate instance and use it to send messages.
KafkaTemplate
We first need to create a ProducerFactory instance to set the strategy for configuring Kafka Producer instances. Then we'll define a KafkaTemplate bean, which wraps a Producer instance and provides various convenience methods to send messages to a Kafka topic. So we'll first define a HashMap holding the properties, pass it to the ProducerFactory, and in turn pass that factory to the KafkaTemplate:
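import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {
    @Value(value = "${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServer;

    // Producer properties: broker address plus String key/value serializers
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}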
Since KafkaTemplate instances are also thread-safe, it's advisable to use a single instance. Now we'll use this KafkaTemplate instance to send our messages. So let's define our controller class, which receives messages through an API and produces them to the topic.
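import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping(value = "/inventory")
public class PublishInventoryController {
    @Value(value = "${spring.kafka.producer.topic}")
    private String topic;
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @PostMapping(value = "/publish")
    public void sendMessage(@RequestParam("message") String message) {
        log.info("Message sent to inventory -> {}", message);
        kafkaTemplate.send(topic, message);
    }
}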
Now if we start our application, we can publish messages to the topic by calling the following API:
curl -i -X POST \
  -H "Content-Type:application/json" \
  -d '' \
  'http://localhost:8080/inventory/publish?message=Hello%20Inventory'
The send method in KafkaTemplate is asynchronous. Kafka is a fast stream-processing platform, so it's generally better to handle results asynchronously so that subsequent messages don't wait for the result of the previous one. If we do want to block the sending thread until the result is available, send returns a ListenableFuture object (a CompletableFuture in Spring for Apache Kafka 3.x), and we can call its get method so that the thread waits for the result. However, this will slow down the producer. We can also register a callback on the future to handle the result without blocking:
@PostMapping(value = "/publish-with-callback")
public void sendMessageWithCallback(@RequestParam("message") String message) {
    log.info("Message with callback sent to inventory -> {}", message);
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
    // The callback runs once the broker acknowledges or rejects the record
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onSuccess(SendResult<String, String> result) {
            log.info("Sent message=[{}] with offset=[{}]",
                    message, result.getRecordMetadata().offset());
        }

        @Override
        public void onFailure(Throwable ex) {
            log.error("Unable to send message=[{}] due to error: {}", message, ex.getMessage());
        }
    });
}
Now we can call the API again, but this time the result returned to the callback is printed in the logs.
curl -i -X POST \
  -H "Content-Type:application/json" \
  -d '' \
  'http://localhost:8080/inventory/publish-with-callback?message=Hello%20Inventory'
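The trade-off between registering a callback and blocking on the future can be seen in a plain-Java sketch that uses java.util.concurrent.CompletableFuture as a stand-in for the future returned by send. The class SendSketch and its fake send method are hypothetical and do not talk to Kafka; only the callback-versus-blocking pattern is the point.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an asynchronous producer; not Spring or Kafka code.
class SendSketch {

    // Fake asynchronous send that completes with a pretend offset (the message length).
    static CompletableFuture<Long> send(String message) {
        return CompletableFuture.supplyAsync(() -> (long) message.length());
    }

    public static void main(String[] args) throws Exception {
        // Non-blocking: the callback runs whenever the result arrives.
        CompletableFuture<Void> logged = send("Hello Inventory")
                .thenAccept(offset -> System.out.println("Callback got offset=" + offset));

        // Blocking: the calling thread waits for the result, here for at most five seconds.
        long offset = send("Hello Inventory").get(5, TimeUnit.SECONDS);
        System.out.println("Blocked until offset=" + offset);

        // Wait for the callback before exiting so its output is not lost.
        logged.join();
    }
}
```

In the blocking variant every request pays the full round trip before the next one can start, which is why the callback style is usually preferred for throughput.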
If we don't want to work with the Future, we can instead register a ProducerListener instance and pass it to the KafkaTemplate when creating it.
@Slf4j
@Configuration
public class KafkaProducerConfig {
    ...
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory());
        // Invoked for every record that the broker acknowledges
        template.setProducerListener(new ProducerListener<String, String>() {
            @Override
            public void onSuccess(ProducerRecord<String, String> producerRecord,
                                  RecordMetadata recordMetadata) {
                log.info("ACK received from ProducerListener message: {} offset: {}",
                        producerRecord.value(), recordMetadata.offset());
            }
        });
        return template;
    }
}
RoutingKafkaTemplate
If we have multiple producers with different configurations or topic names, we can use RoutingKafkaTemplate to select a producer at runtime based on a regex matched against the topic name.
@Configuration
public class KafkaProducerConfig {
    ...
    @Bean
    public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context) {
        // ProducerFactory with Bytes serializer
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(props);
        context.registerBean(DefaultKafkaProducerFactory.class, "bytesPF", bytesPF);

        // ProducerFactory with String serializer
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        DefaultKafkaProducerFactory<Object, Object> stringPF = new DefaultKafkaProducerFactory<>(props);

        // Insertion order matters, so a LinkedHashMap is used
        Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
        map.put(Pattern.compile(".*-bytes"), bytesPF);
        map.put(Pattern.compile("stackabuse-.*"), stringPF);
        return new RoutingKafkaTemplate(map);
    }
}
As we can see, RoutingKafkaTemplate takes a map of java.util.regex.Pattern to ProducerFactory instances and routes each message to the first ProducerFactory whose pattern matches the topic name. We have created two patterns: .*-bytes, which uses ByteArraySerializer, and stackabuse-.*, which uses StringSerializer. As you may have noticed, we created these topics earlier.
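To make the routing rule concrete, here is a minimal plain-Java sketch of the selection step. It is not Spring's actual implementation: it assumes that patterns are tried in insertion order and that the first pattern matching the whole topic name wins. The class RoutingSketch, the method select, and the string labels standing in for producer factories are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

// Hypothetical stand-in for the selection step of a routing template.
class RoutingSketch {

    // LinkedHashMap keeps insertion order, so more specific patterns can be listed first.
    static final Map<Pattern, String> ROUTES = new LinkedHashMap<>();

    static {
        ROUTES.put(Pattern.compile(".*-bytes"), "bytesPF");
        ROUTES.put(Pattern.compile("stackabuse-.*"), "stringPF");
    }

    // Returns the label of the first route whose pattern matches the whole topic name.
    static Optional<String> select(String topic) {
        for (Map.Entry<Pattern, String> route : ROUTES.entrySet()) {
            if (route.getKey().matcher(topic).matches()) {
                return Optional.of(route.getValue());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(select("product-inventory-bytes"));
        System.out.println(select("stackabuse-product-inventory"));
        System.out.println(select("fulfillment-station"));
    }
}
```

Under these assumptions, a topic such as stackabuse-bytes matches both patterns and is routed by the one registered first, which is why the order of the map matters.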
Consuming Messages from Topic
Once we're done publishing messages to a Kafka topic, let's quickly look at the various ways to consume or receive messages from a given topic.
Message Listener Containers
We can receive messages by configuring a MessageListenerContainer as a bean. Spring for Apache Kafka provides two MessageListenerContainer implementations:
- KafkaMessageListenerContainer
- ConcurrentMessageListenerContainer
The KafkaMessageListenerContainer receives all messages from all topics or partitions on a single thread, whereas the ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainer instances to provide multi-threaded consumption.
@KafkaListener at Method Level
To receive messages asynchronously from Kafka, Spring Boot also provides the @KafkaListener annotation, which requires a ConsumerFactory and a KafkaListenerContainerFactory bean to be configured. Note also that the @EnableKafka annotation is required on a configuration class to enable detection of @KafkaListener annotations on Spring-managed beans.
Let's first define KafkaConsumerConfig:
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Value(value = "${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServer;
    @Value(value = "${spring.kafka.consumer.group-id}")
    private String groupId;
    @Value(value = "${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetResetConfig;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    // Container factory used by @KafkaListener: three concurrent consumers, 3 s poll timeout
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
}
Then we'll define a service layer to consume the messages and print them to the console:
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class ConsumeInventoryService {
    @KafkaListener(id = "${spring.kafka.consumer.group-id}",
            topics = "${spring.kafka.consumer.topic}",
            groupId = "${spring.kafka.consumer.group-id}",
            autoStartup = "${spring.kafka.consumer.auto-start:true}")
    public void listenMessage(String message,
                              @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) Integer key,
                              @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                              @Header(KafkaHeaders.GROUP_ID) String groupId,
                              @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                              @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
        log.info("Received Message {} with key {} in topic {} as part of group {} at partition {} at timestamp {}.",
                message, key, topic, groupId, partition, ts);
    }
}
As you can see, we're also printing various Kafka message headers using the KafkaHeaders constants.
@KafkaListener at Class Level
In the previous section, we used @KafkaListener at the method level. Spring Boot also allows us to define @KafkaListener at the class level. To do that, we must annotate the handler methods with @KafkaHandler. When a message is received, the method to call is determined by the converted message payload type. We can also designate a @KafkaHandler method as the default, which is called if no other method matches.
@Slf4j
@Service
@KafkaListener(id = "multi-" + "${spring.kafka.consumer.group-id}",
        topics = "${spring.kafka.consumer.topic}",
        groupId = "multi-" + "${spring.kafka.consumer.group-id}",
        autoStartup = "${spring.kafka.consumer.auto-start:true}")
public class ConsumeInventoryMultiListenerService {
    // Called when the converted payload is a String
    @KafkaHandler
    void listen(String message) {
        log.info("KafkaHandler-String: {}", message);
    }

    // Fallback for any payload type without a dedicated handler
    @KafkaHandler(isDefault = true)
    Object listenDefault(Object object) {
        log.info("KafkaHandler-Default: {}", object);
        return object;
    }
}
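The dispatch rule described above can be pictured with a small plain-Java sketch. It is only an illustration, under the assumption that a handler is chosen by the runtime type of the converted payload with a fallback when nothing matches. HandlerDispatchSketch and its methods are hypothetical names, and none of this is Spring code.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration of type-based handler selection with a default handler.
class HandlerDispatchSketch {

    // Handlers keyed by the payload type they accept.
    private final Map<Class<?>, Function<Object, String>> handlers = new LinkedHashMap<>();

    // Fallback used when no typed handler accepts the payload.
    private final Function<Object, String> defaultHandler =
            payload -> "KafkaHandler-Default: " + payload;

    HandlerDispatchSketch() {
        handlers.put(String.class, payload -> "KafkaHandler-String: " + payload);
    }

    // Picks the first handler whose type matches the payload, otherwise the default.
    String dispatch(Object payload) {
        for (Map.Entry<Class<?>, Function<Object, String>> entry : handlers.entrySet()) {
            if (entry.getKey().isInstance(payload)) {
                return entry.getValue().apply(payload);
            }
        }
        return defaultHandler.apply(payload);
    }

    public static void main(String[] args) {
        HandlerDispatchSketch sketch = new HandlerDispatchSketch();
        System.out.println(sketch.dispatch("Hello Inventory"));
        System.out.println(sketch.dispatch(42));
    }
}
```

A String payload reaches the String handler, while any other type, such as the Integer above, falls through to the default.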
Message Forwarder using @SendTo Annotation
Spring Boot also allows us to annotate a @KafkaListener with a @SendTo annotation to forward the value returned by the listener method to another topic. To make this work, we need to modify the configuration so that the Kafka listener container factory has a reply template.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        KafkaTemplate<String, String> kafkaTemplate) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // Template used to publish the forwarded messages
    factory.setReplyTemplate(kafkaTemplate);
    factory.setBatchListener(true);
    factory.setConcurrency(3);
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;
}
Finally, we can add the @SendTo annotation alongside @KafkaListener:
@KafkaListener(id = "${spring.kafka.consumer.group-id}",
        topics = "${spring.kafka.consumer.topic}",
        groupId = "${spring.kafka.consumer.group-id}",
        autoStartup = "${spring.kafka.consumer.auto-start:true}")
@SendTo("${spring.kafka.consumer.forward-to}")
public void listenMessage(String message,
                          @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) Integer key,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.GROUP_ID) String groupId,
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                          @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
    log.info("Received Message {} with key {} in topic {} as part of group {} at partition {} at timestamp {}.",
            message, key, topic, groupId, partition, ts);
}

@KafkaListener(id = "multi-" + "${spring.kafka.consumer.group-id}",
        topics = "${spring.kafka.consumer.topic}",
        groupId = "multi-" + "${spring.kafka.consumer.group-id}",
        autoStartup = "${spring.kafka.consumer.auto-start:true}")
public class ConsumeInventoryMultiListenerService {
    @KafkaHandler
    @SendTo("${spring.kafka.consumer.forward-to}")
    void listen(String message) {
        log.info("KafkaHandler-String: {}", message);
    }

    @KafkaHandler(isDefault = true)
    @SendTo("${spring.kafka.consumer.forward-to}")
    Object listenDefault(Object object) {
        log.info("KafkaHandler-Default: {}", object);
        return object;
    }
}
Read from a Specific Partition
If we have topics with multiple partitions, a @KafkaListener can subscribe explicitly to particular partitions of a topic with an initial offset:
@Slf4j
@Service
// topics is omitted here because it is mutually exclusive with topicPartitions
@KafkaListener(id = "multi-" + "${spring.kafka.consumer.group-id}",
        groupId = "multi-" + "${spring.kafka.consumer.group-id}",
        autoStartup = "${spring.kafka.consumer.auto-start:true}",
        topicPartitions = @TopicPartition(topic = "${spring.kafka.consumer.topic}", partitionOffsets = {
                @PartitionOffset(partition = "0", initialOffset = "0"),
                @PartitionOffset(partition = "4", initialOffset = "0")}))
public class ConsumeInventoryMultiListenerService {
    ...
}
Whenever this listener is initialized, all previously consumed messages from partitions 0 and 4 are re-consumed, because initialOffset has been set to 0 for each of those partitions.
If a use case doesn't require setting the offset, we can simply use the partitions property to specify the partitions without an offset.
// topics is omitted here because it is mutually exclusive with topicPartitions
@KafkaListener(id = "${spring.kafka.consumer.group-id}",
        groupId = "${spring.kafka.consumer.group-id}",
        autoStartup = "${spring.kafka.consumer.auto-start:true}",
        topicPartitions = @TopicPartition(topic = "${spring.kafka.consumer.topic}",
                partitions = {"0", "1", "3", "6", "8"}))
@SendTo("${spring.kafka.consumer.forward-to}")
public void listenMessage(String message,
                          @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) Integer key,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.GROUP_ID) String groupId,
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                          @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
    log.info("Received Message {} with key {} in topic {} as part of group {} at partition {} at timestamp {}.",
            message, key, topic, groupId, partition, ts);
}
Message Filters in Listeners
If we need to filter incoming messages based on a regex or some content, we can add a custom filter to the listener by setting a RecordFilterStrategy on the container factory.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        KafkaTemplate<String, String> kafkaTemplate) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setReplyTemplate(kafkaTemplate);
    factory.setBatchListener(true);
    factory.setConcurrency(3);
    factory.getContainerProperties().setPollTimeout(3000);
    // Records for which the strategy returns true are discarded.
    // The value is a plain String here, so we test its content directly.
    factory.setRecordFilterStrategy(
            record -> record.value().contains("Inventory"));
    return factory;
}
Then we can pass this container factory to the @KafkaListener annotation:
// topics is omitted here because it is mutually exclusive with topicPartitions
@KafkaListener(id = "${spring.kafka.consumer.group-id}",
        groupId = "${spring.kafka.consumer.group-id}",
        autoStartup = "${spring.kafka.consumer.auto-start:true}",
        topicPartitions = @TopicPartition(topic = "${spring.kafka.consumer.topic}",
                partitions = {"0", "1", "3", "6", "8"}),
        containerFactory = "kafkaListenerContainerFactory")
@SendTo("${spring.kafka.consumer.forward-to}")
public void listenMessage(String message,
                          @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) Integer key,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.GROUP_ID) String groupId,
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                          @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
    log.info("Received Message {} with key {} in topic {} as part of group {} at partition {} at timestamp {}.",
            message, key, topic, groupId, partition, ts);
}
Thus, all the messages that match the filter are discarded before they reach this listener. In this way, we can subscribe to messages from the topic in various ways and further process the data as needed.
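Because the filter's return value is easy to read the wrong way round, the following plain-Java sketch mimics the semantics: a record is dropped when the predicate returns true and delivered otherwise. FilterSketch and deliverable are hypothetical names, and a Predicate<String> stands in for RecordFilterStrategy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical illustration of "true means discard" filtering; not Spring code.
class FilterSketch {

    // Returns the values that would reach the listener: everything the filter does NOT match.
    static List<String> deliverable(List<String> values, Predicate<String> discardIf) {
        List<String> kept = new ArrayList<>();
        for (String value : values) {
            if (!discardIf.test(value)) {
                kept.add(value);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> incoming = Arrays.asList("Hello Inventory", "Hello World");
        // Same condition as the filter configured on the container factory.
        System.out.println(deliverable(incoming, value -> value.contains("Inventory")));
    }
}
```

If the intent is to keep only the messages that mention "Inventory", the condition has to be negated.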
Custom Serializers/Deserializers
Until now, we have been sending and receiving String messages. However, Spring Boot also supports sending and receiving custom objects. That requires additional configuration: a serializer in the ProducerFactory and a deserializer in the ConsumerFactory.
At the start of this chapter, we decided to work on the Inventory Service for the e-commerce app. So let's now define a simple bean class for an item to be stored in the inventory:
@ToString
@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class InventoryItem {
    private String id;
    private String name;
    private int count;
    private Date listingDate;
}
Here we're using Lombok annotations to generate a builder and the constructors. We'll use this builder in our upcoming implementation.
Send Custom Messages
Let's configure a JsonSerializer to serialize the object and send it as JSON content to the topic. To do that, we need to modify our existing ProducerFactory and update our KafkaTemplate:
@Configuration
public class KafkaProducerConfig {
    ...
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Values are now serialized as JSON instead of plain strings
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, InventoryItem> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, InventoryItem> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
Now we can update our controller implementation to send an InventoryItem object through the KafkaTemplate:
@Autowired
private KafkaTemplate<String, InventoryItem> kafkaTemplate;

@PostMapping(value = "/publish")
@Transactional("kafkaTransactionManager")
public void sendMessage(@RequestParam("name") String name, @RequestParam("count") int count) {
    InventoryItem item = InventoryItem.builder()
            .id(UUID.randomUUID().toString())
            .name(name)
            .count(count)
            .listingDate(new Date())
            .build();
    log.info("Message sent to inventory -> {}", item);
    kafkaTemplate.send(topic, item);
}
Receive Custom Messages
To consume the same objects, we need to update our ConsumerFactory and ConcurrentKafkaListenerContainerFactory to accept the JSON objects and convert them into Java POJOs:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    ...
    @Bean
    public ConsumerFactory<String, InventoryItem> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // The JsonDeserializer instance maps the JSON payload onto InventoryItem
        return new DefaultKafkaConsumerFactory<>(props,
                new StringDeserializer(),
                new JsonDeserializer<>(InventoryItem.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, InventoryItem> kafkaListenerContainerFactory(
            KafkaTemplate<String, InventoryItem> kafkaTemplate) {
        ConcurrentKafkaListenerContainerFactory<String, InventoryItem> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setReplyTemplate(kafkaTemplate);
        factory.setBatchListener(true);
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        // Discard items whose name contains "Inventory"
        factory.setRecordFilterStrategy(
                record -> record.value().getName().contains("Inventory"));
        return factory;
    }
}
Now we need to update the @KafkaListener in our service layer to consume the inventory item.
// topics is omitted here because it is mutually exclusive with topicPartitions
@KafkaListener(id = "${spring.kafka.consumer.group-id}",
        groupId = "${spring.kafka.consumer.group-id}",
        autoStartup = "${spring.kafka.consumer.auto-start:true}",
        topicPartitions = @TopicPartition(topic = "${spring.kafka.consumer.topic}",
                partitions = {"0", "1", "3", "6", "8"}))
@SendTo("${spring.kafka.consumer.forward-to}")
public void listenMessage(@Payload @Valid InventoryItem item,
                          @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) Integer key,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.GROUP_ID) String groupId,
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                          @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
    log.info("Received Message {} with key {} in topic {} as part of group {} at partition {} at timestamp {}.",
            item, key, topic, groupId, partition, ts);
}
We can also annotate the payload parameter with @Valid so that incoming messages are validated and can be accepted or rejected.
Transaction Support
Kafka supports a transactional feature that guarantees exactly-once message delivery between producer and consumer applications through the Transactional API. Applications that follow the "consume-process-produce" pattern need transactions to support atomic operations. For example, if we have the following coding pattern, we need to make sure of exactly-once message delivery:
1. consume(message) {
2.   process(message);
3.   template.send("topic", message1);
4.   saveInDb(message);
5.   template.send("topic", message2);
6. }
Let's take a quick look at the various places where exceptions can occur:
- If an exception occurs at line 2, the message is consumed again when the consumer restarts. This is harmless if processing the message is idempotent.
- If an exception is thrown at line 3, message1 is published again when the consumer restarts. Thus message1 may be published twice.
- If an exception is thrown at line 4, the message is saved in the database again when the consumer restarts. Thus the message may be saved twice, and message1 is also published twice.
- If an exception occurs after line 4, message2 is published again when the consumer restarts.
Now consider what happens if we have transactions in place: whenever an exception occurs, all the messages that have been published and the database operations performed within the transaction are rolled back. When the consumer restarts, the operations within the transaction are committed only if all of them succeed.
Initially, Kafka supported only at-most-once or at-least-once message delivery. With the introduction of transactions between Kafka brokers and clients, applications can ensure exactly-once message delivery.
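The first failure case above mentions idempotence. As a rough plain-Java sketch (not Kafka code), the consumer below remembers the ids it has already handled, so a message redelivered after a restart does not cause a second database write. IdempotentConsumerSketch, consume, and the in-memory collections are hypothetical stand-ins; a real service would need the id store to be durable.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of idempotent processing under at-least-once delivery.
class IdempotentConsumerSketch {

    // Ids of messages that have already been processed.
    private final Set<String> processedIds = new HashSet<>();
    // Stand-in for the database table written by saveInDb.
    private final List<String> database = new ArrayList<>();

    // Returns true if the message was processed, false if it was a duplicate delivery.
    boolean consume(String messageId, String payload) {
        if (!processedIds.add(messageId)) {
            return false; // already seen: skip the side effect
        }
        database.add(payload);
        return true;
    }

    int savedCount() {
        return database.size();
    }

    public static void main(String[] args) {
        IdempotentConsumerSketch consumer = new IdempotentConsumerSketch();
        consumer.consume("m-1", "item A");
        consumer.consume("m-1", "item A"); // redelivery after a simulated restart
        System.out.println("Rows saved: " + consumer.savedCount());
    }
}
```

This only protects the consumer's own side effects; avoiding duplicate publishes of message1 and message2 is what the transactional producer described next is for.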
A Transaction-aware Producer
We can configure a regular Kafka producer to enable transactions. First we need to enable idempotence and specify a prefix for the transaction id:
spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      topic: product-inventory
      enable-idempotence: true
      transaction-id-prefix: tx-
Then we'll update our producer configuration to pass these values as properties to the Kafka Producer:
@Configuration
public class KafkaProducerConfig {
    ...
    @Value(value = "${spring.kafka.producer.enable-idempotence}")
    private String enableIdempotence;
    @Value(value = "${spring.kafka.producer.transaction-id-prefix}")
    private String transactionIdPrefix;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    // Setting a transaction id prefix makes the factory produce transactional producers
    @Bean
    public ProducerFactory<String, InventoryItem> producerFactory() {
        DefaultKafkaProducerFactory<String, InventoryItem> defaultKafkaProducerFactory =
                new DefaultKafkaProducerFactory<>(producerConfigs());
        defaultKafkaProducerFactory.setTransactionIdPrefix(transactionIdPrefix);
        return defaultKafkaProducerFactory;
    }

    @Bean
    public KafkaTransactionManager<String, InventoryItem> kafkaTransactionManager() {
        return new KafkaTransactionManager<>(producerFactory());
    }
    ...
}
Since we have enabled idempotence, Kafka uses the transaction id as part of its algorithm to deduplicate any message sent by the producer. These settings enable Kafka to notice if the producer sends the same message to the topic more than once. We just have to make sure that the transaction id is distinct for each producer, yet consistent across restarts.
You may also notice that we have defined a transaction manager. We need to annotate the existing method that sends the message with @Transactional and pass this transaction manager bean. We also use the executeInTransaction() method instead of a plain send() call to process the transaction.
@PostMapping(value = "/publish")
@Transactional("kafkaTransactionManager")
public void sendMessage(@RequestParam("name") String name, @RequestParam("count") int count) {
    InventoryItem item = InventoryItem.builder()
            .id(UUID.randomUUID().toString())
            .name(name)
            .count(count)
            .listingDate(new Date())
            .build();
    log.info("Message sent to inventory -> {}", item);
    kafkaTemplate.executeInTransaction(t -> t.send(topic, item));
}
This registers the producer with the broker as one that can use transactions. The broker in turn uses this information to perform write-ahead logging to a transaction log. The broker then removes from that log any actions belonging to a producer with the same transaction id but an earlier epoch.
A Transaction-aware Consumer
Similarly, we can make sure that consumers respect transactions while consuming messages from the topic. Although we read all the messages from a topic partition in order, we have two options for reading transactional messages:
- read_committed: allows us to read messages that are not part of a transaction as well as transactional messages, but the latter only after their transaction is committed.
- read_uncommitted: allows us to read all messages in offset order without waiting for transactions to be committed.
We therefore need to define isolation.level in the consumer config. The default value of isolation.level is read_uncommitted.
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      topic: product-inventory
      forward-to: fulfillment-station
      group-id: group-id
      auto-start: true
      auto-offset-reset: earliest
      enable-auto-commit: false
      isolation-level: read_committed
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    ...
    @Value(value = "${spring.kafka.consumer.isolation-level}")
    private String isolationLevel;
    @Value(value = "${spring.kafka.consumer.enable-auto-commit}")
    private String enableAutoCommit;

    @Bean
    public ConsumerFactory<String, InventoryItem> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        // read_committed hides records from transactions that have not been committed
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, isolationLevel);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props,
                new StringDeserializer(),
                new JsonDeserializer<>(InventoryItem.class));
    }
    ...
}
Then we can also annotate our @KafkaListener with the @Transactional annotation to make sure it participates in transactions.
// topics is omitted here because it is mutually exclusive with topicPartitions
@KafkaListener(id = "${spring.kafka.consumer.group-id}",
        groupId = "${spring.kafka.consumer.group-id}",
        autoStartup = "${spring.kafka.consumer.auto-start:true}",
        topicPartitions = @TopicPartition(topic = "${spring.kafka.consumer.topic}",
                partitions = {"0", "1", "3", "6", "8"}))
@SendTo("${spring.kafka.consumer.forward-to}")
@Transactional("kafkaTransactionManager")
public void listenMessage(@Payload InventoryItem item,
                          @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) Integer key,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.GROUP_ID) String groupId,
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                          @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
    log.info("Received Message {} with key {} in topic {} as part of group {} at partition {} at timestamp {}.",
            item, key, topic, groupId, partition, ts);
}
Conclusion
In this chapter, we took a fairly deep dive into the various features provided by Spring's Kafka support. We have also built the Inventory Service for the e-commerce app that we started with to help out Steve and Jane.
The overall implementation for this and the previous chapter can be found on GitHub.
In the next chapter, we'll take a look at Spring Cloud Stream and its various binders that support Kafka.