add kafka consumer

This commit is contained in:
Denis Savosin
2024-10-02 18:16:51 +07:00
parent 568989917f
commit f6211ea5d3
9 changed files with 61 additions and 39 deletions

View File

@@ -1,18 +1,17 @@
package com.example.demo.config package com.example.demo.config
import com.example.demo.services.kafka.dto.CityCreateDto import com.example.demo.services.database.city.CityService
import com.example.demo.services.kafka.Consumer
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.support.converter.RecordMessageConverter
import org.springframework.kafka.support.converter.StringJsonMessageConverter
import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper
import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper
@Configuration @Configuration
class KafkaConsumerConfig( class KafkaConsumerConfig(
@@ -21,18 +20,23 @@ class KafkaConsumerConfig(
@Value("\${kafka.consumer.group-id}") @Value("\${kafka.consumer.group-id}")
val consumerGroup: String, val consumerGroup: String,
) { ) {
// @Bean @Bean
// fun consumer(@Autowired cityService: CityService): Consumer = Consumer( fun consumer(
// cityService = cityService, @Autowired cityService: CityService,
// ) @Autowired objectMapper: ObjectMapper,
): Consumer = Consumer(
cityService = cityService,
objectMapper = objectMapper,
)
@Bean @Bean
fun consumerFactory(): ConsumerFactory<String, String> { fun consumerFactory(): ConsumerFactory<String, String> {
val configs = HashMap<String, Any>() val configs = mapOf(
configs[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = servers ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to servers,
configs[ConsumerConfig.GROUP_ID_CONFIG] = consumerGroup ConsumerConfig.GROUP_ID_CONFIG to consumerGroup,
configs[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
configs[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
)
return DefaultKafkaConsumerFactory(configs) return DefaultKafkaConsumerFactory(configs)
} }
@@ -41,25 +45,7 @@ class KafkaConsumerConfig(
fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> { fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
val factory = ConcurrentKafkaListenerContainerFactory<String, String>() val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
factory.consumerFactory = consumerFactory() factory.consumerFactory = consumerFactory()
factory.setRecordMessageConverter(recordMessageConverter())
return factory return factory
} }
@Bean
fun recordMessageConverter(): RecordMessageConverter {
val converter = StringJsonMessageConverter()
val typeMapper = DefaultJackson2JavaTypeMapper()
typeMapper.typePrecedence = Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID
typeMapper.addTrustedPackages("com.baeldung.spring.kafka")
val mappings: MutableMap<String, Class<*>> = HashMap()
mappings["city"] = CityCreateDto::class.java
typeMapper.idClassMapping = mappings
converter.typeMapper = typeMapper
return converter
}
} }

View File

@@ -0,0 +1,23 @@
package com.example.demo.services.kafka
import com.example.demo.services.database.city.CityService
import com.example.demo.services.kafka.dto.CityCreateDto
import com.fasterxml.jackson.databind.ObjectMapper
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.messaging.handler.annotation.Payload
import org.springframework.stereotype.Component
@Component
class Consumer(
private val cityService: CityService,
private val objectMapper: ObjectMapper
) {
@KafkaListener(
topics = ["#{'\${kafka.consumer.topics}'.split(',')}"],
autoStartup = "\${kafka.consumer.auto-startup:false}",
)
fun handleCityCreate(@Payload message: String) {
val cityCreateDto = objectMapper.readValue(message, CityCreateDto::class.java)
cityService.create(cityCreateDto)
}
}

View File

@@ -23,4 +23,4 @@ kafka:
consumer: consumer:
group-id: demo-consumer group-id: demo-consumer
topics: demo-city-sync topics: demo-city-sync
auto-startup: false auto-startup: true

View File

@@ -0,0 +1,9 @@
package com.example.demo
import com.example.demo.services.kafka.Consumer
import org.springframework.boot.test.mock.mockito.MockBean
open class BaseUnitTest {
@MockBean
lateinit var consumer: Consumer
}

View File

@@ -1,6 +1,6 @@
package com.example.demo.http.controllers package com.example.demo.http.controllers
import com.example.demo.http.controllers.GreetingController import com.example.demo.BaseUnitTest
import org.hamcrest.core.StringContains import org.hamcrest.core.StringContains
import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest
@@ -11,7 +11,7 @@ import org.springframework.test.web.servlet.result.MockMvcResultMatchers.status
import kotlin.test.Test import kotlin.test.Test
@WebMvcTest(GreetingController::class) @WebMvcTest(GreetingController::class)
class GreetingControllerTest(@Autowired val mockMvc: MockMvc) { class GreetingControllerTest(@Autowired val mockMvc: MockMvc): BaseUnitTest() {
@Test @Test
fun greetings_shouldSeeGreetingMessage() { fun greetings_shouldSeeGreetingMessage() {
mockMvc.perform(get("/greeting")) mockMvc.perform(get("/greeting"))

View File

@@ -1,5 +1,6 @@
package com.example.demo.http.controllers package com.example.demo.http.controllers
import com.example.demo.BaseUnitTest
import com.example.demo.http.responses.ResponseStatus import com.example.demo.http.responses.ResponseStatus
import com.example.demo.models.Product import com.example.demo.models.Product
import com.example.demo.services.database.product.ProductService import com.example.demo.services.database.product.ProductService
@@ -24,7 +25,7 @@ import java.time.format.DateTimeFormatter
import java.util.* import java.util.*
@WebMvcTest(ProductController::class) @WebMvcTest(ProductController::class)
class ProductControllerTest(@Autowired val mockMvc: MockMvc) { class ProductControllerTest(@Autowired val mockMvc: MockMvc): BaseUnitTest() {
@MockBean @MockBean
private lateinit var productService: ProductService private lateinit var productService: ProductService
private val mapper = jacksonObjectMapper() private val mapper = jacksonObjectMapper()

View File

@@ -1,5 +1,6 @@
package com.example.demo.http.controllers package com.example.demo.http.controllers
import com.example.demo.BaseUnitTest
import com.example.demo.models.* import com.example.demo.models.*
import com.example.demo.providers.ShopProvider import com.example.demo.providers.ShopProvider
import org.mockito.kotlin.doReturn import org.mockito.kotlin.doReturn
@@ -16,7 +17,7 @@ import java.util.*
import kotlin.test.Test import kotlin.test.Test
@WebMvcTest(ShopController::class) @WebMvcTest(ShopController::class)
class ShopControllerTest(@Autowired val mockMvc: MockMvc) { class ShopControllerTest(@Autowired val mockMvc: MockMvc): BaseUnitTest() {
@MockBean @MockBean
private lateinit var shopProvider: ShopProvider private lateinit var shopProvider: ShopProvider

View File

@@ -1,5 +1,6 @@
package com.example.demo.services.database.product package com.example.demo.services.database.product
import com.example.demo.BaseUnitTest
import com.example.demo.models.Product import com.example.demo.models.Product
import com.example.demo.providers.ProductRepository import com.example.demo.providers.ProductRepository
import com.example.demo.services.database.product.exceptions.ProductNotFoundException import com.example.demo.services.database.product.exceptions.ProductNotFoundException
@@ -19,7 +20,7 @@ import kotlin.test.Test
@RunWith(SpringRunner::class) @RunWith(SpringRunner::class)
@SpringBootTest @SpringBootTest
class ProductServiceImplTest { class ProductServiceImplTest: BaseUnitTest() {
private val defaultTopic = "some-default-topic" private val defaultTopic = "some-default-topic"
private lateinit var productService: ProductServiceImpl private lateinit var productService: ProductServiceImpl

View File

@@ -1,5 +1,6 @@
package com.example.demo.services.kafka package com.example.demo.services.kafka
import com.example.demo.BaseUnitTest
import com.example.demo.models.Product import com.example.demo.models.Product
import com.example.demo.services.kafka.dto.ProductDto import com.example.demo.services.kafka.dto.ProductDto
import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.ObjectMapper
@@ -23,7 +24,7 @@ import kotlin.test.assertEquals
@RunWith(SpringRunner::class) @RunWith(SpringRunner::class)
@SpringBootTest @SpringBootTest
class ProducerImplTest { class ProducerImplTest: BaseUnitTest() {
@Autowired @Autowired
private lateinit var producerImpl: ProducerImpl private lateinit var producerImpl: ProducerImpl