use kotlinx.serialization instead of ObjectMapper in kafka producer

This commit is contained in:
Denis Savosin
2024-10-03 15:01:10 +07:00
parent 690e265eb4
commit f9cf8b84d4
8 changed files with 15 additions and 26 deletions

View File

@@ -2,7 +2,6 @@ package com.example.demo.config
import com.example.demo.services.kafka.Producer
import com.example.demo.services.kafka.ProducerImpl
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.beans.factory.annotation.Autowired
@@ -37,9 +36,7 @@ class KafkaProducerConfig(
@Bean
fun producer(
@Autowired kafkaTemplate: KafkaTemplate<String, Any>,
@Autowired objectMapper: ObjectMapper
): Producer = ProducerImpl(
kafkaTemplate,
objectMapper,
)
}

View File

@@ -1,7 +1,7 @@
package com.example.demo.models
import com.example.demo.models.serializables.OffsetDateTimeSerialization
import com.example.demo.models.serializables.UuidSerialization
import com.example.demo.services.serializables.OffsetDateTimeSerialization
import com.example.demo.services.serializables.UuidSerialization
import kotlinx.serialization.Serializable
import org.springframework.data.annotation.Id
import org.springframework.data.relational.core.mapping.Column

View File

@@ -1,8 +1,8 @@
package com.example.demo.models
import com.example.demo.models.serializables.OffsetDateTimeSerialization
import com.example.demo.models.serializables.UuidSerialization
import com.example.demo.services.serializables.OffsetDateTimeSerialization
import com.example.demo.services.serializables.UuidSerialization
import com.example.demo.utils.roundTo
import kotlinx.serialization.Serializable
import org.springframework.data.annotation.Id

View File

@@ -2,7 +2,8 @@ package com.example.demo.services.kafka
import com.example.demo.models.Product
import com.example.demo.services.kafka.dto.ProductDto
import com.fasterxml.jackson.databind.ObjectMapper
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.encodeToJsonElement
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.support.KafkaHeaders
import org.springframework.messaging.Message
@@ -12,11 +13,12 @@ import org.springframework.stereotype.Service
@Service
class ProducerImpl(
private val kafkaTemplate: KafkaTemplate<String, Any>,
private val objectMapper: ObjectMapper
): Producer {
override fun produceProductInfo(topicName: String, product: Product) {
val serializedProduct = Json.encodeToJsonElement(ProductDto(product))
val message: Message<String> = MessageBuilder
.withPayload(objectMapper.writeValueAsString(ProductDto(product)))
.withPayload(serializedProduct.toString())
.setHeader(KafkaHeaders.TOPIC, topicName)
.setHeader("X-Custom-Header", "some-custom-header")
.build()

View File

@@ -2,25 +2,18 @@ package com.example.demo.services.kafka.dto
import com.example.demo.models.Product
import com.example.demo.services.kafka.exceptions.InvalidArgumentException
import com.fasterxml.jackson.annotation.JsonProperty
import kotlinx.serialization.Serializable
import java.time.format.DateTimeFormatter
@Serializable
data class ProductDto(
@JsonProperty("id")
val id: Long,
@JsonProperty("guid")
val guid: String,
@JsonProperty("name")
val name: String,
@JsonProperty("description")
val description: String?,
@JsonProperty("price")
val price: Long,
@JsonProperty("createdAt")
val createdAt: String,
@JsonProperty("updatedAt")
val updatedAt: String?,
@JsonProperty("deletedAt")
val deletedAt: String?,
) {
@Throws(InvalidArgumentException::class)

View File

@@ -1,4 +1,4 @@
package com.example.demo.models.serializables
package com.example.demo.services.serializables
import kotlinx.serialization.KSerializer
import kotlinx.serialization.descriptors.PrimitiveKind

View File

@@ -1,4 +1,4 @@
package com.example.demo.models.serializables
package com.example.demo.services.serializables
import kotlinx.serialization.KSerializer
import kotlinx.serialization.descriptors.PrimitiveKind

View File

@@ -3,7 +3,7 @@ package com.example.demo.services.kafka
import com.example.demo.BaseUnitTest
import com.example.demo.models.Product
import com.example.demo.services.kafka.dto.ProductDto
import com.fasterxml.jackson.databind.ObjectMapper
import kotlinx.serialization.json.Json
import org.junit.runner.RunWith
import org.mockito.kotlin.argumentCaptor
import org.mockito.kotlin.doReturn
@@ -28,9 +28,6 @@ class ProducerImplTest: BaseUnitTest() {
@Autowired
private lateinit var producerImpl: ProducerImpl
@Autowired
private lateinit var objectMapper: ObjectMapper
@MockBean
private lateinit var kafkaTemplate: KafkaTemplate<String, Any>
@@ -58,7 +55,7 @@ class ProducerImplTest: BaseUnitTest() {
assertEquals(1, captor.allValues.count())
val actualArgument = captor.firstValue
val actualProductDto = objectMapper.readValue(actualArgument.payload, ProductDto::class.java)
val actualProductDto = Json.decodeFromString<ProductDto>(actualArgument.payload)
assertEquals(product.id, actualProductDto.id)
assertEquals(product.guid.toString(), actualProductDto.guid)
assertEquals(topic, actualArgument.headers[KafkaHeaders.TOPIC])