From 568989917f1b41178bf4ff6fa8c34d0e9f4052c1 Mon Sep 17 00:00:00 2001 From: Denis Savosin Date: Wed, 2 Oct 2024 16:59:27 +0700 Subject: [PATCH] add KafkaConsumerConfig configuration --- .../demo/config/KafkaConsumerConfig.kt | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100644 src/main/kotlin/com/example/demo/config/KafkaConsumerConfig.kt diff --git a/src/main/kotlin/com/example/demo/config/KafkaConsumerConfig.kt b/src/main/kotlin/com/example/demo/config/KafkaConsumerConfig.kt new file mode 100644 index 0000000..40c17a4 --- /dev/null +++ b/src/main/kotlin/com/example/demo/config/KafkaConsumerConfig.kt @@ -0,0 +1,65 @@ +package com.example.demo.config + +import com.example.demo.services.kafka.dto.CityCreateDto +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.serialization.StringDeserializer +import org.springframework.beans.factory.annotation.Value +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory +import org.springframework.kafka.core.ConsumerFactory +import org.springframework.kafka.core.DefaultKafkaConsumerFactory +import org.springframework.kafka.support.converter.RecordMessageConverter +import org.springframework.kafka.support.converter.StringJsonMessageConverter +import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper +import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper + +@Configuration +class KafkaConsumerConfig( + @Value("\${kafka.bootstrap-servers}") + val servers: String, + @Value("\${kafka.consumer.group-id}") + val consumerGroup: String, +) { +// @Bean +// fun consumer(@Autowired cityService: CityService): Consumer = Consumer( +// cityService = cityService, +// ) + + @Bean + fun consumerFactory(): ConsumerFactory { + val configs = HashMap() + configs[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = servers + configs[ConsumerConfig.GROUP_ID_CONFIG] = consumerGroup + configs[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java + configs[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java + + return DefaultKafkaConsumerFactory(configs) + } + + @Bean + fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory { + val factory = ConcurrentKafkaListenerContainerFactory() + factory.consumerFactory = consumerFactory() + factory.setRecordMessageConverter(recordMessageConverter()) + + return factory + } + + @Bean + fun recordMessageConverter(): RecordMessageConverter { + val converter = StringJsonMessageConverter() + + val typeMapper = DefaultJackson2JavaTypeMapper() + typeMapper.typePrecedence = Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID + typeMapper.addTrustedPackages("com.baeldung.spring.kafka") + + val mappings: MutableMap> = HashMap() + mappings["city"] = CityCreateDto::class.java + typeMapper.idClassMapping = mappings + + converter.typeMapper = typeMapper + + return converter + } +} \ No newline at end of file