diff --git a/src/main/kotlin/com/example/demo/config/KafkaConsumerConfig.kt b/src/main/kotlin/com/example/demo/config/KafkaConsumerConfig.kt index 40c17a4..30b50c0 100644 --- a/src/main/kotlin/com/example/demo/config/KafkaConsumerConfig.kt +++ b/src/main/kotlin/com/example/demo/config/KafkaConsumerConfig.kt @@ -1,18 +1,17 @@ package com.example.demo.config -import com.example.demo.services.kafka.dto.CityCreateDto +import com.example.demo.services.database.city.CityService +import com.example.demo.services.kafka.Consumer +import com.fasterxml.jackson.databind.ObjectMapper import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.StringDeserializer +import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Value import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory import org.springframework.kafka.core.ConsumerFactory import org.springframework.kafka.core.DefaultKafkaConsumerFactory -import org.springframework.kafka.support.converter.RecordMessageConverter -import org.springframework.kafka.support.converter.StringJsonMessageConverter -import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper -import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper @Configuration class KafkaConsumerConfig( @@ -21,18 +20,23 @@ class KafkaConsumerConfig( @Value("\${kafka.consumer.group-id}") val consumerGroup: String, ) { -// @Bean -// fun consumer(@Autowired cityService: CityService): Consumer = Consumer( -// cityService = cityService, -// ) + @Bean + fun consumer( + @Autowired cityService: CityService, + @Autowired objectMapper: ObjectMapper, + ): Consumer = Consumer( + cityService = cityService, + objectMapper = objectMapper, + ) @Bean fun consumerFactory(): ConsumerFactory { - val configs = HashMap() - configs[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = servers - configs[ConsumerConfig.GROUP_ID_CONFIG] = consumerGroup - configs[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java - configs[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java + val configs = mapOf( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to servers, + ConsumerConfig.GROUP_ID_CONFIG to consumerGroup, + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, + ) return DefaultKafkaConsumerFactory(configs) } @@ -41,25 +45,7 @@ class KafkaConsumerConfig( fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory { val factory = ConcurrentKafkaListenerContainerFactory() factory.consumerFactory = consumerFactory() - factory.setRecordMessageConverter(recordMessageConverter()) return factory } - - @Bean - fun recordMessageConverter(): RecordMessageConverter { - val converter = StringJsonMessageConverter() - - val typeMapper = DefaultJackson2JavaTypeMapper() - typeMapper.typePrecedence = Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID - typeMapper.addTrustedPackages("com.baeldung.spring.kafka") - - val mappings: MutableMap> = HashMap() - mappings["city"] = CityCreateDto::class.java - typeMapper.idClassMapping = mappings - - converter.typeMapper = typeMapper - - return converter - } } \ No newline at end of file diff --git a/src/main/kotlin/com/example/demo/services/kafka/Consumer.kt b/src/main/kotlin/com/example/demo/services/kafka/Consumer.kt new file mode 100644 index 0000000..8dac3df --- /dev/null +++ b/src/main/kotlin/com/example/demo/services/kafka/Consumer.kt @@ -0,0 +1,23 @@ +package com.example.demo.services.kafka + +import com.example.demo.services.database.city.CityService +import com.example.demo.services.kafka.dto.CityCreateDto +import com.fasterxml.jackson.databind.ObjectMapper +import org.springframework.kafka.annotation.KafkaListener +import org.springframework.messaging.handler.annotation.Payload +import org.springframework.stereotype.Component + +@Component +class Consumer( + private val cityService: CityService, + private val objectMapper: ObjectMapper +) { + @KafkaListener( + topics = ["#{'\${kafka.consumer.topics}'.split(',')}"], + autoStartup = "\${kafka.consumer.auto-startup:false}", + ) + fun handleCityCreate(@Payload message: String) { + val cityCreateDto = objectMapper.readValue(message, CityCreateDto::class.java) + cityService.create(cityCreateDto) + } +} \ No newline at end of file diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 8de8710..1a583e2 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -23,4 +23,4 @@ kafka: consumer: group-id: demo-consumer topics: demo-city-sync - auto-startup: false + auto-startup: true diff --git a/src/test/kotlin/com/example/demo/BaseUnitTest.kt b/src/test/kotlin/com/example/demo/BaseUnitTest.kt new file mode 100644 index 0000000..4124e58 --- /dev/null +++ b/src/test/kotlin/com/example/demo/BaseUnitTest.kt @@ -0,0 +1,9 @@ +package com.example.demo + +import com.example.demo.services.kafka.Consumer +import org.springframework.boot.test.mock.mockito.MockBean + +open class BaseUnitTest { + @MockBean + lateinit var consumer: Consumer +} \ No newline at end of file diff --git a/src/test/kotlin/com/example/demo/http/controllers/GreetingControllerTest.kt b/src/test/kotlin/com/example/demo/http/controllers/GreetingControllerTest.kt index 244595a..584c0f4 100644 --- a/src/test/kotlin/com/example/demo/http/controllers/GreetingControllerTest.kt +++ b/src/test/kotlin/com/example/demo/http/controllers/GreetingControllerTest.kt @@ -1,6 +1,6 @@ package com.example.demo.http.controllers -import com.example.demo.http.controllers.GreetingController +import com.example.demo.BaseUnitTest import org.hamcrest.core.StringContains import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest @@ -11,7 +11,7 @@ import org.springframework.test.web.servlet.result.MockMvcResultMatchers.status import kotlin.test.Test @WebMvcTest(GreetingController::class) -class GreetingControllerTest(@Autowired val mockMvc: MockMvc) { +class GreetingControllerTest(@Autowired val mockMvc: MockMvc): BaseUnitTest() { @Test fun greetings_shouldSeeGreetingMessage() { mockMvc.perform(get("/greeting")) diff --git a/src/test/kotlin/com/example/demo/http/controllers/ProductControllerTest.kt b/src/test/kotlin/com/example/demo/http/controllers/ProductControllerTest.kt index b546652..61d6d28 100644 --- a/src/test/kotlin/com/example/demo/http/controllers/ProductControllerTest.kt +++ b/src/test/kotlin/com/example/demo/http/controllers/ProductControllerTest.kt @@ -1,5 +1,6 @@ package com.example.demo.http.controllers +import com.example.demo.BaseUnitTest import com.example.demo.http.responses.ResponseStatus import com.example.demo.models.Product import com.example.demo.services.database.product.ProductService @@ -24,7 +25,7 @@ import java.time.format.DateTimeFormatter import java.util.* @WebMvcTest(ProductController::class) -class ProductControllerTest(@Autowired val mockMvc: MockMvc) { +class ProductControllerTest(@Autowired val mockMvc: MockMvc): BaseUnitTest() { @MockBean private lateinit var productService: ProductService private val mapper = jacksonObjectMapper() diff --git a/src/test/kotlin/com/example/demo/http/controllers/ShopControllerTest.kt b/src/test/kotlin/com/example/demo/http/controllers/ShopControllerTest.kt index 7a0efd9..4adbb76 100644 --- a/src/test/kotlin/com/example/demo/http/controllers/ShopControllerTest.kt +++ b/src/test/kotlin/com/example/demo/http/controllers/ShopControllerTest.kt @@ -1,5 +1,6 @@ package com.example.demo.http.controllers +import com.example.demo.BaseUnitTest import com.example.demo.models.* import com.example.demo.providers.ShopProvider import org.mockito.kotlin.doReturn @@ -16,7 +17,7 @@ import java.util.* import kotlin.test.Test @WebMvcTest(ShopController::class) -class ShopControllerTest(@Autowired val mockMvc: MockMvc) { +class ShopControllerTest(@Autowired val mockMvc: MockMvc): BaseUnitTest() { @MockBean private lateinit var shopProvider: ShopProvider diff --git a/src/test/kotlin/com/example/demo/services/database/product/ProductServiceImplTest.kt b/src/test/kotlin/com/example/demo/services/database/product/ProductServiceImplTest.kt index f82edbf..b4edb08 100644 --- a/src/test/kotlin/com/example/demo/services/database/product/ProductServiceImplTest.kt +++ b/src/test/kotlin/com/example/demo/services/database/product/ProductServiceImplTest.kt @@ -1,5 +1,6 @@ package com.example.demo.services.database.product +import com.example.demo.BaseUnitTest import com.example.demo.models.Product import com.example.demo.providers.ProductRepository import com.example.demo.services.database.product.exceptions.ProductNotFoundException @@ -19,7 +20,7 @@ import kotlin.test.Test @RunWith(SpringRunner::class) @SpringBootTest -class ProductServiceImplTest { +class ProductServiceImplTest: BaseUnitTest() { private val defaultTopic = "some-default-topic" private lateinit var productService: ProductServiceImpl diff --git a/src/test/kotlin/com/example/demo/services/kafka/ProducerImplTest.kt b/src/test/kotlin/com/example/demo/services/kafka/ProducerImplTest.kt index 81e9def..6d9e73d 100644 --- a/src/test/kotlin/com/example/demo/services/kafka/ProducerImplTest.kt +++ b/src/test/kotlin/com/example/demo/services/kafka/ProducerImplTest.kt @@ -1,5 +1,6 @@ package com.example.demo.services.kafka +import com.example.demo.BaseUnitTest import com.example.demo.models.Product import com.example.demo.services.kafka.dto.ProductDto import com.fasterxml.jackson.databind.ObjectMapper @@ -23,7 +24,7 @@ import kotlin.test.assertEquals @RunWith(SpringRunner::class) @SpringBootTest -class ProducerImplTest { +class ProducerImplTest: BaseUnitTest() { @Autowired private lateinit var producerImpl: ProducerImpl