Deep-Dive into Spring Boot Kafka Components


In the previous chapter, we did a quick start with Kafka in Spring Boot. In this chapter, we are going to deep-dive into each of its components and options to understand whether we can use it to implement our own use-case. In order to connect with Kafka, we need to focus primarily on the following beans provided by Spring for Apache Kafka:

  • KafkaAdmin – to configure topics in Kafka programmatically
  • ProducerFactory – to define or override configurations to produce messages to Kafka
  • ConsumerFactory – to define or override configurations to consume messages from Kafka

Now, before we start looking into each of the components, let's again consider Steve and Jane's e-commerce app that we discussed in the earlier chapters and pick a basic microservice that we can implement.

Microservice E-commerce Event-Driven app

Since we will be focusing on a simple pub-sub module, let's pick the Inventory Service, where we will publish newly added inventory items to a Kafka topic and then consume them to update or forward that information to other services or topics.

Configuration of Kafka Topics

Let's start with our first component of Kafka in Spring Boot. Until now, as you may have noticed, we have used command-line tools to create topics in Kafka. For example, in the previous chapter we ran something like this:

bin/kafka-topics.sh \
    --create \
    --zookeeper localhost:2181 \
    --replication-factor 3 \
    --partitions 3 \
    --topic test \
    --config min.insync.replicas=2

But Spring Boot integrates Kafka's AdminClient so that we can create topics programmatically. We will define a configuration that uses the KafkaAdmin bean to define new topics. We will fetch the bootstrap servers and the topic details from the properties defined in application.yml, which we have also seen in the previous chapter:

server:
  port: 8080
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      topic: product-inventory
      forward-to: fulfillment-station
      group-id: group-id
      auto-start: true
      auto-offset-reset: earliest
    producer:
      bootstrap-servers: localhost:9092
      topic: product-inventory

Let's define our KafkaTopicConfig class:

@Configuration
public class KafkaTopicConfig {

    @Value(value = "${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServers;

    @Value(value = "${spring.kafka.producer.topic}")
    private String topic;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name(topic)
                .partitions(10)
                .replicas(3)
                .compact()
                .build();
    }
}

This will create the product-inventory topic in Kafka with the given configuration. KafkaAdmin also provides the capability to define multiple topics in one go by wrapping several TopicBuilder configs:

@Bean
public KafkaAdmin.NewTopics multipleTopics() {
    return new KafkaAdmin.NewTopics(
           TopicBuilder.name("stackabuse-product-inventory")
                   .build(),
           TopicBuilder.name("product-inventory-bytes")
                   .replicas(1)
                   .build(),
           TopicBuilder.name("fulfillment-station")
                   .partitions(5)
                   .build()
    );
}

As you can see, we are also creating a few more topics: stackabuse-product-inventory, product-inventory-bytes and fulfillment-station. We will look into each of their use-cases as we progress through this chapter.

Publishing Messages to a Topic

Once we are done with the creation of topics, let's look into our mechanism to publish messages. Since Producer instances are thread safe, using a single instance throughout an application context gives higher performance. So let's define a KafkaTemplate instance and use it to send messages.

KafkaTemplate

We need to first create a ProducerFactory instance to set the strategy for configuring a Kafka Producer instance. Then we will define a KafkaTemplate bean, which wraps this Producer instance and provides various convenience methods to send messages to a Kafka topic. We will first define a HashMap to hold the properties that we pass to the ProducerFactory, which in turn is passed to the KafkaTemplate:

@Configuration
public class KafkaProducerConfig {

    @Value(value = "${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServer;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Since KafkaTemplate instances are also thread safe, it is always advisable to use a single instance. Now we will use this KafkaTemplate instance to send our messages. So let's define our Controller class and receive the messages through an API to produce them to the topic.

@Slf4j
@RestController
@RequestMapping(value = "/inventory")
public class PublishInventoryController {

    @Value(value = "${spring.kafka.producer.topic}")
    private String topic;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @PostMapping(value = "/publish")
    public void sendMessage(@RequestParam("message") String message) {
        log.info(String.format("Message sent to inventory -> %s", message));
        kafkaTemplate.send(topic, message);
    }
}

Now, if we start our application, we can publish messages to the topic by calling the following API:

curl -i -X POST \
   -H "Content-Type:application/json" \
   -d '' \
 'http://localhost:8080/inventory/publish?message=Hello%20Inventory'

The send method in KafkaTemplate is asynchronous in nature. Kafka is a fast stream-processing platform, so it is usually better to handle the results asynchronously so that subsequent messages do not wait for the result of the previous message. But if we do want to block the sending thread and perform some action on the outcome, send() also returns a ListenableFuture object. We can call the get API of the ListenableFuture so that the thread waits for the result, but this will slow down the producer.
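As an illustration, a minimal sketch of the blocking variant could look like this (a hypothetical method, assuming the same kafkaTemplate and topic fields as in the controller above):

// A sketch of blocking on the send result; get() waits until the broker
// acknowledges the record or the timeout elapses
public void sendMessageBlocking(String message) throws Exception {
    SendResult<String, String> result =
            kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);
    log.info("Sent message=[{}] with offset=[{}]",
            message, result.getRecordMetadata().offset());
}

To handle the result asynchronously instead, we can register a callback: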

@PostMapping(value = "/publish-with-callback")
public void sendMessageWithCallback(@RequestParam("message") String message) {
    log.info(String.format("Message with callback sent to inventory -> %s", message));
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

        @Override
        public void onSuccess(SendResult<String, String> result) {
            log.info("Sent message=[" + message +
                     "] with offset=[" + result.getRecordMetadata().offset() + "]");
        }

        @Override
        public void onFailure(Throwable ex) {
            log.error("Unable to send message=[" + message + "] due to error : " + ex.getMessage());
        }
    });
}

Now we can call the API again, but this time it will print the messages in the logs along with the returned callback response.

curl -i -X POST \
   -H "Content-Type:application/json" \
   -d '' \
 'http://localhost:8080/inventory/publish-with-callback?message=Hello%20Inventory'

If we don't want to work with the Future, we can instead register a ProducerListener instance and set it while creating the KafkaTemplate instance.

@Slf4j
@Configuration
public class KafkaProducerConfig {

    ...

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory());
        template.setProducerListener(new ProducerListener<String, String>() {
            @Override
            public void onSuccess(ProducerRecord<String, String> producerRecord,
                                  RecordMetadata recordMetadata) {
                log.info("ACK received from ProducerListener message: {} offset: {}",
                         producerRecord.value(), recordMetadata.offset());
            }
        });
        return template;
    }
}

RoutingKafkaTemplate

Suppose we have multiple producers with different configurations or topic names; we can then use RoutingKafkaTemplate to select a producer at runtime, based on a regex over the topic names:

@Configuration
public class KafkaProducerConfig {

    ...

    @Bean
    public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context) {
        // ProducerFactory with Bytes serializer
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(props);
        context.registerBean(DefaultKafkaProducerFactory.class, "bytesPF", bytesPF);

        // ProducerFactory with String serializer
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        DefaultKafkaProducerFactory<Object, Object> stringPF = new DefaultKafkaProducerFactory<>(props);
        Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
        map.put(Pattern.compile(".*-bytes"), bytesPF);
        map.put(Pattern.compile("stackabuse-.*"), stringPF);
        return new RoutingKafkaTemplate(map);
    }
}

As we can see, RoutingKafkaTemplate takes a map of java.util.regex.Pattern to ProducerFactory instances and routes messages to the ProducerFactory whose pattern matches the given topic name. We have created two patterns: .*-bytes, which uses the ByteArraySerializer, and stackabuse-.*, which uses the StringSerializer. If you recall, we created these topics earlier.
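As a quick illustration (a hypothetical snippet, assuming the routingTemplate bean above is injected), the same template can then serve both kinds of topics, with the serializer chosen by whichever pattern matches the topic name:

@Autowired
private RoutingKafkaTemplate routingTemplate;

public void publishSamples() {
    // Matches ".*-bytes", so the ByteArraySerializer factory is picked
    routingTemplate.send("product-inventory-bytes", "raw payload".getBytes());

    // Matches "stackabuse-.*", so the StringSerializer factory is picked
    routingTemplate.send("stackabuse-product-inventory", "plain text payload");
}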

Consuming Messages from a Topic

Once we are done with publishing messages to a Kafka topic, let's quickly look into the various ways to consume or receive messages from a given topic.

Message Listener Containers

We can receive messages by configuring a MessageListenerContainer as a bean. Spring Boot provides two MessageListenerContainer implementations:

  • KafkaMessageListenerContainer
  • ConcurrentMessageListenerContainer

The KafkaMessageListenerContainer receives all messages from all topics or partitions on a single thread, while the ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainer instances to provide multi-threaded consumption.
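As a minimal sketch (assuming the consumerFactory bean that we define in the next section), wiring a KafkaMessageListenerContainer by hand might look like this:

@Bean
public KafkaMessageListenerContainer<String, String> listenerContainer(
        ConsumerFactory<String, String> consumerFactory) {
    // Subscribe to the topic and attach a MessageListener callback
    ContainerProperties containerProps = new ContainerProperties("product-inventory");
    containerProps.setMessageListener((MessageListener<String, String>) record ->
            System.out.println("Received: " + record.value()));
    return new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
}

In practice, though, the annotation-driven approach described next is far more common, since Spring builds these containers for us.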

@KafkaListener at Method Level

In order to receive messages asynchronously from Kafka, Spring Boot provides the @KafkaListener annotation, which requires a ConsumerFactory and a KafkaListenerContainerFactory bean to be configured. One more thing to note: the @EnableKafka annotation is also required on the configuration class to enable detection of @KafkaListener annotations on Spring-managed beans.

Let's first define KafkaConsumerConfig:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServer;

    @Value(value = "${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value(value = "${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetResetConfig;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
}

Then we will define a Service layer to consume the messages and print them to the console:

@Slf4j
@Service
public class ConsumeInventoryService {

    @KafkaListener(id = "${spring.kafka.consumer.group-id}",
            topics = "${spring.kafka.consumer.topic}",
            groupId = "${spring.kafka.consumer.group-id}",
            autoStartup = "${spring.kafka.consumer.auto-start:true}")
    public void listenMessage(String message,
                              @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false)
                              Integer key,
                              @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                              @Header(KafkaHeaders.GROUP_ID) String groupId,
                              @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                              @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
        log.info("Received message {} with key {} in topic {} as part of group {} at partition {} at timestamp {}.",
                message,
                key,
                topic,
                groupId,
                partition,
                ts);
    }
}

As you can see, we are also printing the various message headers of Kafka using the KafkaHeaders constants.

@KafkaListener at Class Level

In the previous section, we used @KafkaListener at the method level. But Spring Boot also allows defining @KafkaListener at the class level. To do that, we must annotate the handler methods with @KafkaHandler. When a message is received, the method to call is determined by the converted message payload type. We can also designate one @KafkaHandler method as the default, to be invoked when no other method matches.

@Slf4j
@Service
@KafkaListener(id = "multi-" + "${spring.kafka.consumer.group-id}",
        topics = "${spring.kafka.consumer.topic}",
        groupId = "multi-" + "${spring.kafka.consumer.group-id}",
        autoStartup = "${spring.kafka.consumer.auto-start:true}")
public class ConsumeInventoryMultiListenerService {

    @KafkaHandler
    void listen(String message) {
        log.info("KafkaHandler-String: {}", message);
    }

    @KafkaHandler(isDefault = true)
    Object listenDefault(Object object) {
        log.info("KafkaHandler-Default: {}", object);
        return object;
    }
}

Message Forwarder using the @SendTo Annotation

Spring Boot also allows us to annotate a @KafkaListener with @SendTo to forward received messages to another topic. To make this work, we need to modify the configuration to accept a reply template on the Kafka listener factory:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        KafkaTemplate<String, String> kafkaTemplate) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setReplyTemplate(kafkaTemplate);
    factory.setConcurrency(3);
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;
}

And finally, we can add a @SendTo annotation alongside @KafkaListener:

@KafkaListener(id = "${spring.kafka.consumer.group-id}",
        topics = "${spring.kafka.consumer.topic}",
        groupId = "${spring.kafka.consumer.group-id}",
        autoStartup = "${spring.kafka.consumer.auto-start:true}")
@SendTo("${spring.kafka.consumer.forward-to}")
public String listenMessage(String message,
                          @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false)
                          Integer key,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.GROUP_ID) String groupId,
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                          @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
     log.info("Received message {} with key {} in topic {} as part of group {} at partition {} at timestamp {}.",
             message,
             key,
             topic,
             groupId,
             partition,
             ts);
     // The returned value is forwarded to the topic configured in @SendTo
     return message;
}

The same works for the class-level listener, where @SendTo is placed on the individual @KafkaHandler methods:

@KafkaListener(id = "multi-" + "${spring.kafka.consumer.group-id}",
        topics = "${spring.kafka.consumer.topic}",
        groupId = "multi-" + "${spring.kafka.consumer.group-id}",
        autoStartup = "${spring.kafka.consumer.auto-start:true}")
public class ConsumeInventoryMultiListenerService {

    @KafkaHandler
    @SendTo("${spring.kafka.consumer.forward-to}")
    String listen(String message) {
        log.info("KafkaHandler-String: {}", message);
        return message;
    }

    @KafkaHandler(isDefault = true)
    @SendTo("${spring.kafka.consumer.forward-to}")
    Object listenDefault(Object object) {
        log.info("KafkaHandler-Default: {}", object);
        return object;
    }
}

Read from a Specific Partition

If we have topics with multiple partitions, then @KafkaListener can explicitly subscribe to particular partitions of a topic with an initial offset:

@Slf4j
@Service
@KafkaListener(id = "multi-" + "${spring.kafka.consumer.group-id}",
        groupId = "multi-" + "${spring.kafka.consumer.group-id}",
        autoStartup = "${spring.kafka.consumer.auto-start:true}",
        topicPartitions = @TopicPartition(topic = "${spring.kafka.consumer.topic}", partitionOffsets = {
                @PartitionOffset(partition = "0", initialOffset = "0"),
                @PartitionOffset(partition = "4", initialOffset = "0")}))
public class ConsumeInventoryMultiListenerService {

    ...
}

Whenever this particular listener is initialized, all the previously consumed messages from partitions 0 and 4 will be re-consumed every time. The reason is that the initialOffset has been set to 0 at each partition offset in the listener. Note that since we specify topicPartitions here, the topics attribute is omitted; the two are mutually exclusive.

If there is a use-case where we don't need to set the offset, we can simply use the partitions property to set the partitions without an offset:

@KafkaListener(id = "${spring.kafka.consumer.group-id}",
        groupId = "${spring.kafka.consumer.group-id}",
        autoStartup = "${spring.kafka.consumer.auto-start:true}",
        topicPartitions = @TopicPartition(topic = "${spring.kafka.consumer.topic}",
                partitions = { "0", "1", "3", "6", "8"}))
@SendTo("${spring.kafka.consumer.forward-to}")
public String listenMessage(String message,
                          @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false)
                          Integer key,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.GROUP_ID) String groupId,
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                          @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
    log.info("Received message {} with key {} in topic {} as part of group {} at partition {} at timestamp {}.",
            message,
            key,
            topic,
            groupId,
            partition,
            ts);
    return message;
}

Message Filters in Listeners

If we need to filter incoming messages based on a regex or some content, we can add a custom filter to the listener we have defined by setting a RecordFilterStrategy:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        KafkaTemplate<String, String> kafkaTemplate) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setReplyTemplate(kafkaTemplate);
    factory.setConcurrency(3);
    factory.getContainerProperties().setPollTimeout(3000);
    // Records for which the strategy returns true are discarded
    factory.setRecordFilterStrategy(
            record -> record.value().contains("Inventory"));
    return factory;
}

Then we can pass this container factory to the @KafkaListener annotation:

@KafkaListener(id = "${spring.kafka.consumer.group-id}",
        groupId = "${spring.kafka.consumer.group-id}",
        autoStartup = "${spring.kafka.consumer.auto-start:true}",
        topicPartitions = @TopicPartition(topic = "${spring.kafka.consumer.topic}",
                partitions = { "0", "1", "3", "6", "8"}),
        containerFactory = "kafkaListenerContainerFactory")
@SendTo("${spring.kafka.consumer.forward-to}")
public String listenMessage(String message,
                          @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false)
                          Integer key,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.GROUP_ID) String groupId,
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                          @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
    log.info("Received message {} with key {} in topic {} as part of group {} at partition {} at timestamp {}.",
            message,
            key,
            topic,
            groupId,
            partition,
            ts);
    return message;
}

Thus, all the messages that match the filter will be discarded by this listener. In this way, we can subscribe to the messages from the topic by various means and further process the data at our convenience.

Custom Serializers/Deserializers

Until now, we have been sending and receiving String messages. However, Spring Boot also supports sending and receiving custom objects, which requires additional configuration of a serializer in the ProducerFactory and a deserializer in the ConsumerFactory.

At the beginning of this chapter, we decided that we would be working on the Inventory Service for the e-commerce app. So let's now define a simple bean class for an item to be stored in the inventory:

@ToString
@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class InventoryItem {

    private String id;
    private String name;
    private int count;
    private Date listingDate;
}

Here we are using Lombok annotations to define a builder pattern and generate the constructors. We will use this builder in our upcoming implementation.

Send Custom Messages

Let's use a JsonSerializer to serialize the object and send it as JSON content to the topic. To do that, we need to modify our existing ProducerFactory and update our KafkaTemplate:

@Configuration
public class KafkaProducerConfig {

    ...

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, InventoryItem> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, InventoryItem> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Now we can update our Controller implementation to send an InventoryItem object through the KafkaTemplate:

@Autowired
private KafkaTemplate<String, InventoryItem> kafkaTemplate;

@PostMapping(value = "/publish")
public void sendMessage(@RequestParam("name") String name, @RequestParam("count") int count) {
    InventoryItem item = InventoryItem.builder()
            .id(UUID.randomUUID().toString())
            .name(name)
            .count(count)
            .listingDate(new Date())
            .build();
    log.info(String.format("Message sent to inventory -> %s", item));
    kafkaTemplate.send(topic, item);
}

Receive Custom Messages

In order to consume the same objects, we need to update our ConsumerFactory and ConcurrentKafkaListenerContainerFactory to accept JSON objects and convert them into Java POJOs:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    ...

    @Bean
    public ConsumerFactory<String, InventoryItem> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props,
                new StringDeserializer(),
                new JsonDeserializer<>(InventoryItem.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, InventoryItem> kafkaListenerContainerFactory(
            KafkaTemplate<String, InventoryItem> kafkaTemplate) {
        ConcurrentKafkaListenerContainerFactory<String, InventoryItem> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setReplyTemplate(kafkaTemplate);
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        factory.setRecordFilterStrategy(
                record -> record.value().getName().contains("Inventory"));
        return factory;
    }
}

Now we need to update our service layer with a KafkaListener to consume the inventory items:

@KafkaListener(id = "${spring.kafka.consumer.group-id}",
        groupId = "${spring.kafka.consumer.group-id}",
        autoStartup = "${spring.kafka.consumer.auto-start:true}",
        topicPartitions = @TopicPartition(topic = "${spring.kafka.consumer.topic}",
                partitions = { "0", "1", "3", "6", "8"}))
@SendTo("${spring.kafka.consumer.forward-to}")
public InventoryItem listenMessage(@Payload @Valid InventoryItem item,
                          @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false)
                          Integer key,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.GROUP_ID) String groupId,
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                          @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
    log.info("Received message {} with key {} in topic {} as part of group {} at partition {} at timestamp {}.",
            item,
            key,
            topic,
            groupId,
            partition,
            ts);
    return item;
}

We can annotate our payloads with @Valid to perform validation and accept or reject messages accordingly, as shown in the sketch below.
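For @Valid to actually be enforced on listener payloads, a validator must be registered with the listener endpoint registrar. A minimal sketch (assuming JSR-303 constraints such as @NotBlank on the InventoryItem fields and the spring-boot-starter-validation dependency on the classpath) could look like this:

@Configuration
public class KafkaValidationConfig implements KafkaListenerConfigurer {

    @Autowired
    private LocalValidatorFactoryBean validator;

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        // Register the JSR-303 validator so @Valid is honored on @KafkaListener payloads
        registrar.setValidator(this.validator);
    }
}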

Transaction Support

Kafka supports a transactional feature that ensures exactly-once message delivery between producer and consumer applications through the Transactional API. Applications that exhibit the “consume-process-produce” pattern need to use transactions to support atomic operations. For example, if we have the following code pattern, we need to ensure exactly-once message delivery:

1. consume(message) {
2.     process(message);
3.     template.send("topic", message1);
4.     saveInDb(message);
5.     template.send("topic", message2);
6. }

Let's take a quick look at the various places where exceptions can occur:

  • If any kind of exception occurs at line no. 2, the message will be consumed again when the consumer restarts. This is only safe if processing the message is idempotent.
  • If an exception is thrown at line no. 3, message1 will be published again when the consumer restarts. Thus message1 is published twice.
  • If an exception is thrown at line no. 4, the message will be saved to the database again when the consumer restarts. Thus the message is saved twice and message1 is also published twice.
  • If an exception occurs after line no. 4, message2 will be published again when the consumer restarts.

Now consider that we have transactions in place: whenever any exception occurs, all the messages published and the database operations performed within that transaction will be rolled back. When the consumer restarts, the operations within the transaction will only be committed once every step succeeds, as sketched below.
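To make this concrete, here is a hypothetical sketch of the consume-process-produce pattern in Spring (process and saveInDb are assumed helpers, and the kafkaTransactionManager bean is defined later in this section):

@Transactional("kafkaTransactionManager")
@KafkaListener(topics = "product-inventory", groupId = "group-id")
public void consume(String message) {
    // Both sends join the same Kafka transaction; if any step throws,
    // the published records are aborted and remain invisible to
    // read_committed consumers
    process(message);                                    // assumed processing step
    kafkaTemplate.send("fulfillment-station", message);  // message1 in the pseudocode
    saveInDb(message);                                   // assumed persistence step
    kafkaTemplate.send("fulfillment-station", message);  // message2 in the pseudocode
}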

Initially, Kafka supported only at-most-once or at-least-once message delivery. But with the introduction of transactions between Kafka brokers and clients, applications can now ensure exactly-once message delivery.

A Transaction-aware Producer

We can turn a standard Kafka producer into a transactional one. First, we need to enable idempotence and specify a prefix for the transaction id:

spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      topic: product-inventory
      enable-idempotence: true
      transaction-id-prefix: tx-

Then we will update our producer configuration to pass these values as properties to the Kafka Producer:

@Configuration
public class KafkaProducerConfig {

    ...

    @Value(value = "${spring.kafka.producer.enable-idempotence}")
    private String enableIdempotence;

    @Value(value = "${spring.kafka.producer.transaction-id-prefix}")
    private String transactionIdPrefix;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, InventoryItem> producerFactory() {
        DefaultKafkaProducerFactory<String, InventoryItem> defaultKafkaProducerFactory =
            new DefaultKafkaProducerFactory<>(producerConfigs());
        defaultKafkaProducerFactory.setTransactionIdPrefix(transactionIdPrefix);
        return defaultKafkaProducerFactory;
    }

    @Bean
    public KafkaTransactionManager<String, InventoryItem> kafkaTransactionManager() {
        return new KafkaTransactionManager<>(producerFactory());
    }

    ...
}

Since we have enabled idempotence, Kafka will use this transaction id prefix as part of its algorithm to deduplicate any message the producer sends. This setting enables Kafka to notice if the producer sends the same message to the topic more than once. We just need to make sure that the transaction id is distinct for each producer, while consistent across restarts.

We can also see that we have defined a transaction manager. We need to annotate the existing message-sending method with @Transactional, passing this transaction manager bean. We should also use the executeInTransaction() method instead of send() to process the transactions:

@PostMapping(value = "/publish")
@Transactional("kafkaTransactionManager")
public void sendMessage(@RequestParam("name") String name, @RequestParam("count") int count) {
    InventoryItem item = InventoryItem.builder()
             .id(UUID.randomUUID().toString())
             .name(name)
             .count(count)
             .listingDate(new Date())
             .build();
    log.info(String.format("Message sent to inventory -> %s", item));
    kafkaTemplate.executeInTransaction(t -> t.send(topic, item));
}

This will register the producer with the broker, which can now use transactions. The broker, in turn, uses these to write-ahead transactions to a transaction log. As a result, the broker will remove any actions from that log that belong to a producer with the same transaction id but from an earlier epoch.

A Transaction-aware Consumer

Similarly, we can make sure that consumers adhere to transactions while consuming messages from the topic. Although we read all the messages of a topic partition in order, we have two options for reading transactional messages:

  • read_committed: This allows us to read messages that are not part of any transaction, as well as transactional messages, but only after the transaction is committed.
  • read_uncommitted: This allows us to read all messages in offset order without waiting for transactions to be committed.

We thus need to set isolation.level in the consumer config to pass on this property. The default value of isolation.level is read_uncommitted.

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      topic: product-inventory
      forward-to: fulfillment-station
      group-id: group-id
      auto-start: true
      auto-offset-reset: earliest
      enable-auto-commit: false
      isolation-level: read_committed

Then we can wire these properties into our consumer configuration:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    ...

    @Value(value = "${spring.kafka.consumer.isolation-level}")
    private String isolationLevel;

    @Value(value = "${spring.kafka.consumer.enable-auto-commit}")
    private String enableAutoCommit;

    @Bean
    public ConsumerFactory<String, InventoryItem> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, isolationLevel);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props,
                new StringDeserializer(),
                new JsonDeserializer<>(InventoryItem.class));
    }

    ...
}

Then we can also annotate our @KafkaListener with the @Transactional annotation to make sure it participates in transactions:

@KafkaListener(id = "${spring.kafka.consumer.group-id}",
        groupId = "${spring.kafka.consumer.group-id}",
        autoStartup = "${spring.kafka.consumer.auto-start:true}",
        topicPartitions = @TopicPartition(topic = "${spring.kafka.consumer.topic}",
                partitions = { "0", "1", "3", "6", "8"}))
@SendTo("${spring.kafka.consumer.forward-to}")
@Transactional("kafkaTransactionManager")
public InventoryItem listenMessage(@Payload InventoryItem item,
                          @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false)
                          Integer key,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.GROUP_ID) String groupId,
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                          @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
    log.info("Received message {} with key {} in topic {} as part of group {} at partition {} at timestamp {}.",
             item,
             key,
             topic,
             groupId,
             partition,
             ts);
    return item;
}

Conclusion

In this chapter, we took a fairly deep dive into the various features provided by Spring's support for Kafka. We have also built the Inventory Service for the e-commerce app that we started, to help out Steve and Jane.

The overall implementation for this and the previous chapter can be found on GitHub.

In the next chapter, we will look into Spring Cloud Stream and its various binders that support Kafka.
