Apache Kafka request-reply semantics implementation: ReplyingKafkaTemplate

The diagram above shows the implementation as part of the main application, with an asynchronous microservice implemented using Kafka.

This topic discusses how to implement request-response semantics, which are not natural to Kafka, using the ReplyingKafkaTemplate provided by Spring.

The required Gradle dependencies are below.

plugins {
    id 'org.springframework.boot' version '2.4.0-SNAPSHOT'
    id 'io.spring.dependency-management' version '1.0.10.RELEASE'
}
dependencies {
    implementation 'org.springframework.kafka:spring-kafka:2.5.2.RELEASE'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}

The flow is to consume Apache Kafka's request-reply the same way we make a REST call with RestTemplate.

What we will create for the request-reply semantics to work:

  1. A Kafka configuration class holding the producer and consumer configuration.
  2. A ReplyingKafkaTemplate bean, which is required for the request-reply semantics.

Kafka Configuration


@Configuration
public class KafkaConfig {
    // producer/consumer factories and the ReplyingKafkaTemplate bean live here (see the sketch below)
}

In the Spring property file:

kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
kafka.topic.request-topic=request-topic
kafka.topic.requestreply-topic=requestreply-topic
kafka.consumergroup=requestreplygroup

Details

@Configuration: apply this at the class level so that Spring Boot picks the class up as a source of configuration (bean definitions).
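
A minimal sketch of what KafkaConfig could contain is shown below. It wires the properties above into a producer factory, a consumer factory for the reply topic, a listener container on the reply topic, and the ReplyingKafkaTemplate bean itself. The JSON (de)serialization of Model and the bean names are assumptions for illustration, not the exact configuration from the original project.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.topic.requestreply-topic}")
    private String requestReplyTopic;

    @Value("${kafka.consumergroup}")
    private String consumerGroup;

    // Producer factory that serializes the Model request as JSON
    @Bean
    public ProducerFactory<String, Model> requestProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    // Consumer factory that deserializes the Model reply from JSON
    @Bean
    public ConsumerFactory<String, Model> replyConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
                new JsonDeserializer<>(Model.class));
    }

    // Listener container that the template uses to consume from the reply topic
    @Bean
    public KafkaMessageListenerContainer<String, Model> replyContainer(
            ConsumerFactory<String, Model> replyConsumerFactory) {
        return new KafkaMessageListenerContainer<>(replyConsumerFactory,
                new ContainerProperties(requestReplyTopic));
    }

    // The ReplyingKafkaTemplate injected into the controller below
    @Bean
    public ReplyingKafkaTemplate<String, Model, Model> replyingKafkaTemplate(
            ProducerFactory<String, Model> requestProducerFactory,
            KafkaMessageListenerContainer<String, Model> replyContainer) {
        return new ReplyingKafkaTemplate<>(requestProducerFactory, replyContainer);
    }
}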

Sample Controller

@RestController
@RequestMapping(value = "/kafka")
public class KafkaTestController {

    private final ProducerKafka producer;

    @Autowired
    ReplyingKafkaTemplate<String, Model, Model> kafkaTemplate;

    @Value("${kafka.topic.request-topic}")
    String requestTopic;

    @Value("${kafka.topic.requestreply-topic}")
    String requestReplyTopic;

    @Autowired
    public KafkaTestController(ProducerKafka producer) {
        this.producer = producer;
    }

    @PostMapping(value = "/sum", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
    public Model sum(@RequestBody Model request) throws InterruptedException, ExecutionException {
        // create the producer record for the request topic
        ProducerRecord<String, Model> record = new ProducerRecord<>(requestTopic, request);
        // set the reply topic in the record headers
        record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes()));
        // post to the Kafka topic and get a future for the reply
        RequestReplyFuture<String, Model, Model> sendAndReceive = kafkaTemplate.sendAndReceive(record);

        // confirm the producer produced successfully
        SendResult<String, Model> sendResult = sendAndReceive.getSendFuture().get();
        // print all headers
        sendResult.getProducerRecord().headers()
                .forEach(header -> System.out.println(header.key() + ":" + new String(header.value())));
        // block until the reply record arrives on the reply topic
        ConsumerRecord<String, Model> consumerRecord = sendAndReceive.get();
        // return the reply value
        return consumerRecord.value();
    }

}
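
The Model class itself is not shown in the article. A minimal sketch of what it could look like for the /sum endpoint is below; the field names firstNumber, secondNumber, and sum are assumptions for illustration only.

// Hypothetical request/response payload for the /sum endpoint.
public class Model {

    private int firstNumber;
    private int secondNumber;
    private int sum;

    public int getFirstNumber() { return firstNumber; }
    public void setFirstNumber(int firstNumber) { this.firstNumber = firstNumber; }

    public int getSecondNumber() { return secondNumber; }
    public void setSecondNumber(int secondNumber) { this.secondNumber = secondNumber; }

    public int getSum() { return sum; }
    public void setSum(int sum) { this.sum = sum; }
}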

The template is very easy to integrate and understand.

RequestReplyFuture<String, Model, Model> carries both the send result and the eventual reply.

Used this way, you will hardly notice a difference between calling RestTemplate and calling ReplyingKafkaTemplate.

How it works

When the ReplyingKafkaTemplate places a message on the request topic, it adds a correlation ID to the record headers. The consuming microservice must read that correlation ID and return the same value in the header of its reply.
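
On the replying microservice, the simplest way to honour this contract is a @KafkaListener method annotated with @SendTo: Spring Kafka then sends the return value to the topic named in the REPLY_TOPIC header and copies the correlation ID from the request onto the reply. The sketch below assumes the listener container factory has a reply KafkaTemplate configured (setReplyTemplate) and reuses the hypothetical Model fields from earlier.

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

@Component
public class SumRequestListener {

    // Consumes the request; the returned Model is published to the topic carried in
    // the incoming KafkaHeaders.REPLY_TOPIC header, with KafkaHeaders.CORRELATION_ID
    // copied over automatically by the framework.
    @KafkaListener(topics = "${kafka.topic.request-topic}", groupId = "${spring.kafka.consumer.group-id}")
    @SendTo
    public Model onRequest(Model request) {
        Model response = new Model();
        response.setFirstNumber(request.getFirstNumber());   // assumed fields
        response.setSecondNumber(request.getSecondNumber());
        response.setSum(request.getFirstNumber() + request.getSecondNumber());
        return response;
    }
}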

Inside the Spring Kafka library, ReplyingKafkaTemplate is declared as:

public class ReplyingKafkaTemplate<K, V, R> extends KafkaTemplate<K, V>
        implements BatchMessageListener<K, R>, InitializingBean, SmartLifecycle,
        DisposableBean, ReplyingKafkaOperations<K, V, R>

It manages the correlation ID through the BatchMessageListener it implements: replies consumed from the reply topic are matched by correlation ID back to the pending request, completing the future that the requesting thread is waiting on.

If we want to override the correlation ID strategy, we can pass a function; every time the template produces a record to the request topic, the strategy is invoked and the resulting correlation ID is appended to the header.

/**
 * Set a function to be called to establish a unique correlation key for each request
 * record.
 * @param correlationStrategy the function.
 * @since 2.3
 */
public void setCorrelationIdStrategy(Function<ProducerRecord<K, V>, CorrelationKey> correlationStrategy) {
    Assert.notNull(correlationStrategy, "'correlationStrategy' cannot be null");
    this.correlationStrategy = correlationStrategy;
}
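
A sketch of how such a strategy could be supplied when defining the template bean (a variation of the replyingKafkaTemplate bean from the configuration sketch above), assuming we want a plain UUID string as the correlation ID so it stays readable if it is also carried in the payload:

import java.nio.charset.StandardCharsets;
import java.util.UUID;

import org.springframework.kafka.requestreply.CorrelationKey;

@Bean
public ReplyingKafkaTemplate<String, Model, Model> replyingKafkaTemplate(
        ProducerFactory<String, Model> requestProducerFactory,
        KafkaMessageListenerContainer<String, Model> replyContainer) {
    ReplyingKafkaTemplate<String, Model, Model> template =
            new ReplyingKafkaTemplate<>(requestProducerFactory, replyContainer);
    // generate a readable UUID string per request instead of the default binary key
    template.setCorrelationIdStrategy(producerRecord ->
            new CorrelationKey(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8)));
    return template;
}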

Kafka UI

A very good UI for inspecting a local Kafka cluster is KaDeck: https://www.xeotek.com/kadeck/.

Out-of-the-box solution if you need correlation ID support in the payload rather than in the header.

On the producer side, you can override doSend and copy the Spring Kafka correlation header into the payload.

@Override
protected ListenableFuture<SendResult<K, V>> doSend(ProducerRecord<K, V> producerRecord) {
    if (producerRecord.value() != null) {
        // read the correlation id that ReplyingKafkaTemplate already put in the header
        String correlationKey = correlationId(producerRecord.headers().lastHeader(correlationHeaderName));
        // copy it into the payload so services that only look at the body can still see it
        CommonUtils.setProperty(producerRecord.value(), correlationPropertyName, correlationKey);
    }
    return super.doSend(producerRecord);
}

On the consumer side, first check whether the Kafka headers already contain the correlation ID; if not, set the header from the value carried in the payload.

@Override
public void onMessage(List<ConsumerRecord<K, R>> data) {
    // restore the correlation header from the payload for any record that is missing it
    data.forEach(this::onConsumeRecord);
    logger.info(String.valueOf(data));
    super.onMessage(data);
}
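
The onConsumeRecord helper is not shown in the original; a sketch of what it might do is below. It assumes the correlation ID is a UTF-8 string (for example, produced by the strategy shown earlier) and that a CommonUtils.getProperty counterpart to the setProperty helper above exists.

// Hypothetical helper: if the reply record carries no correlation header,
// restore it from the correlationId property in the payload.
private void onConsumeRecord(ConsumerRecord<K, R> record) {
    if (record.value() != null && record.headers().lastHeader(correlationHeaderName) == null) {
        String correlationId = CommonUtils.getProperty(record.value(), correlationPropertyName);
        if (correlationId != null) {
            record.headers().add(correlationHeaderName,
                    correlationId.getBytes(StandardCharsets.UTF_8));
        }
    }
}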

In this way, the correlation ID works whether it arrives in the header or in the payload.

Sample payload

{
  "correlationId": "2380eae1-a66f-4608-80ea-e1a66fa608ff"
}
