package com.example.demo.config import com.example.demo.config.properties.KafkaProperties import com.example.demo.services.database.city.CityService import com.example.demo.services.kafka.Consumer import io.micrometer.core.instrument.MeterRegistry import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.StringDeserializer import org.springframework.beans.factory.annotation.Autowired import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory import org.springframework.kafka.core.ConsumerFactory import org.springframework.kafka.core.DefaultKafkaConsumerFactory @Configuration class KafkaConsumerConfig( @Autowired val kafkaProperties: KafkaProperties ) { @Bean fun consumer( @Autowired cityService: CityService, @Autowired metricRegistry: MeterRegistry ): Consumer = Consumer( cityService = cityService, metricRegistry = metricRegistry, ) @Bean fun consumerFactory(): ConsumerFactory { val configs = mapOf( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProperties.bootstrapServers, ConsumerConfig.GROUP_ID_CONFIG to kafkaProperties.consumer.groupId, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to kafkaProperties.consumer.autoOffsetReset, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java ) return DefaultKafkaConsumerFactory(configs) } @Bean fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory { val factory = ConcurrentKafkaListenerContainerFactory() factory.consumerFactory = consumerFactory() return factory } }