What is NewTopic, what is its purpose, and what is it used by?
NewTopic is an Apache class and its purpose is to represent a new topic in the Kafka cluster. NewTopic is used by AdminClient to create a new topic if it does not already exist in the Kafka cluster. In addition, if NewTopic is declared as a Spring bean, KafkaAdmin (wrapper for AdminClient) will use it to create a new topic in the Kafka cluster if it does not already exist.
For example:
@Bean
public NewTopic topic() {
return TopicBuilder.name("topic1")
.partitions(10)
.replicas(1)
.build();
}What is KafkaAdmin and what is its primary purpose?
KafkaAdmin is a bean automatically created by Spring Boot and its primary purpose is to provide a way to manage topics by delegating to Apache’s AdminClient
What is KafkaTemplate<K, V> and what is its primary purpose?
KafkaTemplate<K, V> is a bean automatically created by Spring Boot and its primary purpose is to provide a class for sending messages to a topic. The template delegates to an Apache KafkaProducer<K, V>
How do the following beans: producerFactory, producerConfigs, and kafkaTemplate relate to one another?
producerFactory uses producerConfigs to configure the factory. kafkaTemplate uses producerFactory to create a producer. For example:
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}What is a ProducerRecord<Key, Value>?
A ProducerRecord<Key, Value> is a Kafka class that represents the message to be sent to the Kafka cluster
When configuring a topic, what is the difference between CREATE_TIME and LOG_APPEND_TIME?
CREATE_TIME will use the timestamp provided by the Producer. LOG_APPEND_TIME will use the timestamp provided by the Broker when it appends the message to its log
What is the default ProducerListener that a KafkaTemplate<K, V> uses and what is its behavior?
LoggingProducerListener, it logs errors when a send is unsuccessful and does nothing when a send is successful. You can configure your own ProducerListener.
For example:
template.setProducerListener(...);
What is the purpose of a ProducerListener?
The purpose of a ProducerListener is to execute an asynchronous callback with the results of a send (success or failure) instead of waiting for a CompletableFutuer<T> to complete. A KafkaTemplate<K, V> is automatically configured with a LoggingProducerListener
True or False
By default, the DefaultKafkaProducerFactory creates a singleton producer used by all clients, as recommended in the KafkaProducer JavaDocs
True. However, should you wish to use a separate producer per thread, set producerPerThread on the DefaultKafkaProducerFactory to true. Care must be taken when setting this property to true.
See: https://docs.spring.io/spring-kafka/reference/kafka/sending-messages.html#producer-factory
What is a message listener container and what does it use to listen to messages?
A message listener container is a Spring concept (interface). Essentially, a message listener container is responsible for setting up all of the infrastructure needed to receive messages from a message provider (e.g. Kafka). A container uses message listeners to receive messages.
What are the 2 implementations of MessageListenerContainer and what are their differences?
KafkaMessageListenerContainerConcurrentMessageListenerContainerKafkaMessageListenerContainer is a single-threaded message listener container that uses a KafkaConsumer<K, V> to consume messages. ConcurrentMessageListenerContainer creates 1 or more KafkaMessageListenerContainers based on concurrency
What is the default offset commit AckMode for a KafkaMessageListenerContainer?
Batch. This commits the offset when all the records returned by poll() have been processed. Remember, a consumer’s poll() returns one or more ConsumerRecords. The consumer will block until and acknowledgement is returned by the broker (e.g. syncCommits=true)
Review the following code:
@KafkaListener(id = "one", topics = "one")
public void listen1(String in) {
System.out.println("1: " + in);
}
@KafkaListener(id = "two", topics = "two",
properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void listen2(byte[] in) {
System.out.println("2: " + new String(in));
}What bean is responsible for creating a message listener container when using @KafkaListener?
KafkaListenerEndpointRegistry is automatically created by Spring and is responsible for creating and managing message listener containers when @KafkaListener is used
Review the following code:
@Autowired
private KafkaListenerEndpointRegistry registry;
...
this.registry.getListenerContainer("myContainer").start();
...Which implementation of MessageListenerContainer is used when using @KafkaListener?
ConcurrentMessageListenerContainer