Apache Kafka request-reply semantics implementation: ReplyingKafkaTemplate

The diagram above shows the implementation as part of the main application, with an asynchronous microservice implemented using Kafka.

This article discusses how to implement request-response semantics, which are not native to Kafka, using the ReplyingKafkaTemplate provided by Spring Kafka.

The requirements are as follows:

id 'org.springframework.boot' version '2.4.0-SNAPSHOT'
id 'io.spring.dependency-management' version '1.0.10.RELEASE'
compile group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.5.2.RELEASE'
testImplementation 'org.springframework.kafka:spring-kafka-test'

The flow: consume Apache Kafka with request-reply semantics, just as we make REST calls with RestTemplate.

What we will create for the request-reply semantics to work:

  1. A Kafka configuration class that sets up the producer and consumer configuration.

KAFKA Configuration


import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.topic.requestreply-topic}")
    private String requestReplyTopic;

    @Value("${kafka.consumergroup}")
    private String consumerGroup;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Use the configured group id rather than a hard-coded value.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
        return props;
    }

    @Bean
    public ProducerFactory<String, Model> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Model> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ReplyingKafkaTemplate<String, Model, Model> replyKafkaTemplate(ProducerFactory<String, Model> pf,
            KafkaMessageListenerContainer<String, Model> container) {
        return new ReplyingKafkaTemplate<>(pf, container);
    }

    @Bean
    public KafkaMessageListenerContainer<String, Model> replyContainer(ConsumerFactory<String, Model> cf) {
        ContainerProperties containerProperties = new ContainerProperties(requestReplyTopic);
        return new KafkaMessageListenerContainer<>(cf, containerProperties);
    }

    @Bean
    public ConsumerFactory<String, Model> consumerFactory() {
        JsonDeserializer<Model> deserializer = new JsonDeserializer<>(Model.class);
        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), deserializer);
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Model>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Model> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setReplyTemplate(kafkaTemplate());
        return factory;
    }
}

In the Spring property file:

kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
kafka.topic.request-topic=request-topic
kafka.topic.requestreply-topic=requestreply-topic
kafka.consumergroup=requestreplygroup

Details

@Configuration: placed at the class level so Spring Boot treats the class as a configuration source.
props: the configuration needed to create the ProducerFactory; it holds the Kafka server address and the key and value serializers.
ReplyingKafkaTemplate: requires a ProducerFactory and a KafkaMessageListenerContainer for its creation.
Group id: a very important setting. If the same group id is shared by different consumers, a consumer may never receive the message on the reply topic. Always use a unique group id per consumer.
More on https://dzone.com/articles/dont-use-apache-kafka-consumer-groups-the-wrong-wa
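The examples in this article refer to a Model payload class that is never shown; below is a minimal hypothetical sketch of what it might look like. The fields are assumptions, purely for illustration; the JsonSerializer/JsonDeserializer used above need a no-argument constructor and getters/setters like these.

```java
// Hypothetical request/reply payload; the original article does not show this
// class, so the fields below are illustrative only.
public class Model {

    private int firstNumber;
    private int secondNumber;
    private int sum; // filled in by the replying service

    public int getFirstNumber() { return firstNumber; }
    public void setFirstNumber(int firstNumber) { this.firstNumber = firstNumber; }

    public int getSecondNumber() { return secondNumber; }
    public void setSecondNumber(int secondNumber) { this.secondNumber = secondNumber; }

    public int getSum() { return sum; }
    public void setSum(int sum) { this.sum = sum; }
}
```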

Sample Controller

import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/kafka")
public class KafkaTestController {

    private final ProducerKafka producer;

    @Autowired
    ReplyingKafkaTemplate<String, Model, Model> kafkaTemplate;

    @Value("${kafka.topic.request-topic}")
    String requestTopic;

    @Value("${kafka.topic.requestreply-topic}")
    String requestReplyTopic;

    @Autowired
    public KafkaTestController(ProducerKafka producer) {
        this.producer = producer;
    }

    @PostMapping(value = "/sum", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
    public Model sum(@RequestBody Model request) throws InterruptedException, ExecutionException {
        // Create the producer record; the generic types must match the template (String, Model).
        ProducerRecord<String, Model> record = new ProducerRecord<>(requestTopic, request);
        // Set the reply topic in the header.
        record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes()));
        // Post to the Kafka topic.
        RequestReplyFuture<String, Model, Model> sendAndReceive = kafkaTemplate.sendAndReceive(record);

        // Confirm the producer produced successfully.
        SendResult<String, Model> sendResult = sendAndReceive.getSendFuture().get();
        // Print all headers (header values are byte arrays, so decode them).
        sendResult.getProducerRecord().headers()
                .forEach(header -> System.out.println(header.key() + ":" + new String(header.value())));
        // Get the consumer record (blocks until the reply arrives).
        ConsumerRecord<String, Model> consumerRecord = sendAndReceive.get();
        // Return the consumer value.
        return consumerRecord.value();
    }
}
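The article only shows the requesting side; for the flow to complete, some microservice must consume the request topic and produce the reply. A minimal sketch of that side, assuming the kafkaListenerContainerFactory above (which has the reply template set) and hypothetical Model fields:

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

@Component
public class SumRequestListener {

    // Consumes the request topic. Because the container factory has a reply
    // template configured, the returned value is sent to the topic named in
    // the KafkaHeaders.REPLY_TOPIC header, and spring-kafka echoes the
    // CorrelationId header back automatically.
    @KafkaListener(topics = "${kafka.topic.request-topic}")
    @SendTo
    public Model onRequest(Model request) {
        // Hypothetical business logic for the /kafka/sum endpoint; the
        // Model fields are assumptions for illustration.
        request.setSum(request.getFirstNumber() + request.getSecondNumber());
        return request;
    }
}
```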

The template is very easy to integrate and understand.

RequestReplyFuture<String, RequestModel, ResponseModel>:
String -> the key we are sending.
RequestModel -> the request message placed on the topic.
ResponseModel -> the reply received on the response topic.

Used this way, working with ReplyingKafkaTemplate feels no different from working with RestTemplate.

How it works

When the ReplyingKafkaTemplate places a message on the producer topic, it adds a CorrelationId to the header. The consuming microservice must read this CorrelationId and return the same value in the header of its reply.

Inside the spring-kafka library, ReplyingKafkaTemplate is declared as:

public class ReplyingKafkaTemplate<K, V, R> extends KafkaTemplate<K, V> implements BatchMessageListener<K, R>,
        InitializingBean, SmartLifecycle, DisposableBean, ReplyingKafkaOperations<K, V, R>

The template manages the CorrelationId through its BatchMessageListener implementation and maps each reply back to the requesting thread, which is what makes the request-reply semantics work.
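Conceptually, this mapping works like a table of pending requests keyed by CorrelationId: the sender registers a future, and the reply listener completes it. A simplified plain-Java sketch of the idea (illustrative only, not the actual spring-kafka internals):

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Simplified illustration of request-reply matching by correlation id.
// Not the real spring-kafka implementation.
class RequestReplySketch {

    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // "Send" a request: register a future under a fresh correlation id.
    // A real template would also produce the record with this id in a header.
    String send(String request) {
        String correlationId = UUID.randomUUID().toString();
        pending.put(correlationId, new CompletableFuture<>());
        return correlationId;
    }

    // Reply listener: complete the future registered for that correlation id,
    // waking up the thread waiting on it.
    void onReply(String correlationId, String reply) {
        CompletableFuture<String> future = pending.get(correlationId);
        if (future != null) {
            future.complete(reply);
        }
    }

    // Block until the reply for this correlation id arrives.
    String awaitReply(String correlationId) throws Exception {
        String reply = pending.get(correlationId).get();
        pending.remove(correlationId);
        return reply;
    }
}
```

If the reply carries an unknown or already-handled CorrelationId, there is no pending future and the reply is simply dropped, which is why both sides must propagate the id faithfully.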

If we want to override the CorrelationId strategy, we can pass a function; every time Kafka produces to the topic, the strategy is invoked and appends the CorrelationId to the header.

/**
* Set a function to be called to establish a unique correlation key for each request
* record.
*
* @param correlationStrategy the function.
*
@since 2.3
*/
public void setCorrelationIdStrategy(Function<ProducerRecord<K, V>, CorrelationKey> correlationStrategy) {
    Assert.notNull(correlationStrategy, "'correlationStrategy' cannot be null");
    this.correlationStrategy = correlationStrategy;
}
Using a lambda we can easily pass the CorrelationId strategy:

this.setCorrelationIdStrategy(kvProducerRecord -> new CorrelationKey(IDUtils.randomUUID().toString().getBytes()));

KAFKA UI

A very good UI for local Kafka is https://www.xeotek.com/kadeck/.

An out-of-the-box solution if you need CorrelationId support in the payload rather than in the header.

On the producer side, you can override doSend and copy the Spring Kafka correlation header into the payload.

@Override
protected ListenableFuture<SendResult<K, V>> doSend(ProducerRecord<K, V> producerRecord) {
    if (producerRecord.value() != null) {
        String correlationKey = correlationId(producerRecord.headers().lastHeader(correlationHeaderName));
        CommonUtils.setProperty(producerRecord.value(), correlationPropertyName, correlationKey);
    }
    return super.doSend(producerRecord);
}

On the consumer side, first check whether the Kafka header already contains the CorrelationId; if not, set the header from the payload.

@Override
public void onMessage(List<ConsumerRecord<K, R>> data) {
    data.forEach(krConsumerRecord -> this.onConsumeRecord(krConsumerRecord));
    logger.info(String.valueOf(data));
    super.onMessage(data);
}

private void onConsumeRecord(ConsumerRecord<K, R> data) {
    if (data.headers().lastHeader(correlationHeaderName) != null) {
        return;
    }
    try {
        JsonNode parent = applicationContext.getBean(ObjectMapper.class).readTree((String) data.value());
        String correlationId = parent.path(correlationPropertyName).asText();
        if (!StringUtils.isEmpty(correlationId)) {
            data.headers().add(correlationHeaderName, correlationId.getBytes());
        } else {
            throw new InvalidPropertiesFormatException("Failed to get the '" + correlationPropertyName + "' from response");
        }
    } catch (JsonProcessingException | InvalidPropertiesFormatException e) {
        logger.error("Failed to extract '" + correlationPropertyName + "' from response", e);
    }
}

This way, the CorrelationId works whether it arrives in the header or in the payload.

Sample payload

{
  "correlationId": "2380eae1-a66f-4608-80ea-e1a66fa608ff"
}

Java 8 | Spring | React | Angular | CI/CD | Microservices | Monitoring |Apache Kafka. https://www.linkedin.com/in/shuaib-kunhabdulla-83780188