diff --git a/src/main/kotlin/com/example/demo/config/AppConfig.kt b/src/main/kotlin/com/example/demo/config/AppConfig.kt index ae013fa..6fae17f 100644 --- a/src/main/kotlin/com/example/demo/config/AppConfig.kt +++ b/src/main/kotlin/com/example/demo/config/AppConfig.kt @@ -9,6 +9,7 @@ import com.example.demo.services.database.city.CityServiceImpl import com.example.demo.services.database.product.ProductService import com.example.demo.services.database.product.ProductServiceImpl import com.example.demo.services.kafka.Producer +import com.fasterxml.jackson.databind.ObjectMapper import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Value import org.springframework.context.annotation.Bean @@ -20,9 +21,10 @@ class AppConfig( private val defaultProductSyncTopic: String ) { @Bean - fun shopProvider(): ShopProvider{ - return MockedShopProvider() - } + fun objectMapper(): ObjectMapper = ObjectMapper() + + @Bean + fun shopProvider(): ShopProvider = MockedShopProvider() @Bean fun productService( diff --git a/src/main/kotlin/com/example/demo/config/ProducerConfig.kt b/src/main/kotlin/com/example/demo/config/ProducerConfig.kt index c4fea98..ee2d395 100644 --- a/src/main/kotlin/com/example/demo/config/ProducerConfig.kt +++ b/src/main/kotlin/com/example/demo/config/ProducerConfig.kt @@ -2,7 +2,7 @@ package com.example.demo.config import com.example.demo.services.kafka.Producer import com.example.demo.services.kafka.ProducerImpl -import com.example.demo.services.kafka.dto.serializer.ProductSerializer +import com.fasterxml.jackson.databind.ObjectMapper import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.serialization.StringSerializer import org.springframework.beans.factory.annotation.Autowired @@ -24,7 +24,7 @@ class ProducerConfig( configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = servers configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java - configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ProductSerializer::class.java + configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java return DefaultKafkaProducerFactory(configProps) } @@ -35,7 +35,11 @@ class ProducerConfig( ) @Bean - fun producer(@Autowired kafkaTemplate: KafkaTemplate): Producer = ProducerImpl( + fun producer( + @Autowired kafkaTemplate: KafkaTemplate, + @Autowired objectMapper: ObjectMapper + ): Producer = ProducerImpl( kafkaTemplate, + objectMapper, ) } \ No newline at end of file diff --git a/src/main/kotlin/com/example/demo/services/kafka/ProducerImpl.kt b/src/main/kotlin/com/example/demo/services/kafka/ProducerImpl.kt index b9b6654..19aa17b 100644 --- a/src/main/kotlin/com/example/demo/services/kafka/ProducerImpl.kt +++ b/src/main/kotlin/com/example/demo/services/kafka/ProducerImpl.kt @@ -2,6 +2,7 @@ package com.example.demo.services.kafka import com.example.demo.models.Product import com.example.demo.services.kafka.dto.ProductDto +import com.fasterxml.jackson.databind.ObjectMapper import org.springframework.kafka.core.KafkaTemplate import org.springframework.kafka.support.KafkaHeaders import org.springframework.messaging.Message @@ -10,11 +11,12 @@ import org.springframework.stereotype.Service @Service class ProducerImpl( - private val kafkaTemplate: KafkaTemplate + private val kafkaTemplate: KafkaTemplate, + private val objectMapper: ObjectMapper ): Producer { override fun produceProductInfo(topicName: String, product: Product) { - val message: Message = MessageBuilder - .withPayload(ProductDto(product)) + val message: Message = MessageBuilder + .withPayload(objectMapper.writeValueAsString(ProductDto(product))) .setHeader(KafkaHeaders.TOPIC, topicName) .setHeader("X-Custom-Header", "some-custom-header") .build() diff --git a/src/main/kotlin/com/example/demo/services/kafka/dto/ProductDto.kt b/src/main/kotlin/com/example/demo/services/kafka/dto/ProductDto.kt index ce52d23..d45197b 100644 --- a/src/main/kotlin/com/example/demo/services/kafka/dto/ProductDto.kt +++ b/src/main/kotlin/com/example/demo/services/kafka/dto/ProductDto.kt @@ -2,16 +2,25 @@ package com.example.demo.services.kafka.dto import com.example.demo.models.Product import com.example.demo.services.kafka.exceptions.InvalidArgumentException +import com.fasterxml.jackson.annotation.JsonProperty import java.time.format.DateTimeFormatter data class ProductDto( + @JsonProperty("id") val id: Long, + @JsonProperty("guid") val guid: String, + @JsonProperty("name") val name: String, + @JsonProperty("description") val description: String?, + @JsonProperty("price") val price: Long, + @JsonProperty("createdAt") val createdAt: String, + @JsonProperty("updatedAt") val updatedAt: String?, + @JsonProperty("deletedAt") val deletedAt: String?, ) { @Throws(InvalidArgumentException::class) diff --git a/src/main/kotlin/com/example/demo/services/kafka/dto/serializer/ProductSerializer.kt b/src/main/kotlin/com/example/demo/services/kafka/dto/serializer/ProductSerializer.kt deleted file mode 100644 index d4e5c1b..0000000 --- a/src/main/kotlin/com/example/demo/services/kafka/dto/serializer/ProductSerializer.kt +++ /dev/null @@ -1,20 +0,0 @@ -package com.example.demo.services.kafka.dto.serializer - -import com.example.demo.services.kafka.dto.ProductDto -import com.fasterxml.jackson.databind.ObjectMapper -import org.apache.kafka.common.serialization.Serializer -import javax.sql.rowset.serial.SerialException - -class ProductSerializer: Serializer { - private val objectMapper = ObjectMapper() - - override fun serialize(topic: String?, data: ProductDto?): ByteArray { - return objectMapper.writeValueAsBytes( - data ?: throw SerialException() - ) - } - - override fun close() { - // no logic - } -} \ No newline at end of file diff --git a/src/test/kotlin/com/example/demo/services/kafka/ProducerImplTest.kt b/src/test/kotlin/com/example/demo/services/kafka/ProducerImplTest.kt index 929d96e..81e9def 100644 --- a/src/test/kotlin/com/example/demo/services/kafka/ProducerImplTest.kt +++ b/src/test/kotlin/com/example/demo/services/kafka/ProducerImplTest.kt @@ -2,6 +2,7 @@ package com.example.demo.services.kafka import com.example.demo.models.Product import com.example.demo.services.kafka.dto.ProductDto +import com.fasterxml.jackson.databind.ObjectMapper import org.junit.runner.RunWith import org.mockito.kotlin.argumentCaptor import org.mockito.kotlin.doReturn @@ -26,6 +27,9 @@ class ProducerImplTest { @Autowired private lateinit var producerImpl: ProducerImpl + @Autowired + private lateinit var objectMapper: ObjectMapper + @MockBean private lateinit var kafkaTemplate: KafkaTemplate @@ -43,7 +47,7 @@ class ProducerImplTest { deletedAt = OffsetDateTime.now(), ) - val captor = argumentCaptor>() + val captor = argumentCaptor>() whenever(kafkaTemplate.send(captor.capture())) .doReturn(CompletableFuture>()) @@ -52,8 +56,10 @@ class ProducerImplTest { assertEquals(1, captor.allValues.count()) val actualArgument = captor.firstValue - assertEquals(product.id, actualArgument.payload.id) - assertEquals(product.guid.toString(), actualArgument.payload.guid) + + val actualProductDto = objectMapper.readValue(actualArgument.payload, ProductDto::class.java) + assertEquals(product.id, actualProductDto.id) + assertEquals(product.guid.toString(), actualProductDto.guid) assertEquals(topic, actualArgument.headers[KafkaHeaders.TOPIC]) assertEquals("some-custom-header", actualArgument.headers["X-Custom-Header"]) }