some refactoring

This commit is contained in:
Denis Savosin
2024-10-02 13:54:38 +07:00
parent 9b4a4c58ea
commit a9aa1eb8b6
6 changed files with 35 additions and 32 deletions

View File

@@ -9,6 +9,7 @@ import com.example.demo.services.database.city.CityServiceImpl
import com.example.demo.services.database.product.ProductService
import com.example.demo.services.database.product.ProductServiceImpl
import com.example.demo.services.kafka.Producer
import com.fasterxml.jackson.databind.ObjectMapper
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
@@ -20,9 +21,10 @@ class AppConfig(
private val defaultProductSyncTopic: String
) {
@Bean
fun shopProvider(): ShopProvider{
return MockedShopProvider()
}
fun objectMapper(): ObjectMapper = ObjectMapper()
@Bean
fun shopProvider(): ShopProvider = MockedShopProvider()
@Bean
fun productService(

View File

@@ -2,7 +2,7 @@ package com.example.demo.config
import com.example.demo.services.kafka.Producer
import com.example.demo.services.kafka.ProducerImpl
import com.example.demo.services.kafka.dto.serializer.ProductSerializer
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.beans.factory.annotation.Autowired
@@ -24,7 +24,7 @@ class ProducerConfig(
configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = servers
configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ProductSerializer::class.java
configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
return DefaultKafkaProducerFactory(configProps)
}
@@ -35,7 +35,11 @@ class ProducerConfig(
)
@Bean
fun producer(@Autowired kafkaTemplate: KafkaTemplate<String, Any>): Producer = ProducerImpl(
fun producer(
@Autowired kafkaTemplate: KafkaTemplate<String, Any>,
@Autowired objectMapper: ObjectMapper
): Producer = ProducerImpl(
kafkaTemplate,
objectMapper,
)
}

View File

@@ -2,6 +2,7 @@ package com.example.demo.services.kafka
import com.example.demo.models.Product
import com.example.demo.services.kafka.dto.ProductDto
import com.fasterxml.jackson.databind.ObjectMapper
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.support.KafkaHeaders
import org.springframework.messaging.Message
@@ -10,11 +11,12 @@ import org.springframework.stereotype.Service
@Service
class ProducerImpl(
private val kafkaTemplate: KafkaTemplate<String, Any>
private val kafkaTemplate: KafkaTemplate<String, Any>,
private val objectMapper: ObjectMapper
): Producer {
override fun produceProductInfo(topicName: String, product: Product) {
val message: Message<ProductDto> = MessageBuilder
.withPayload(ProductDto(product))
val message: Message<String> = MessageBuilder
.withPayload(objectMapper.writeValueAsString(ProductDto(product)))
.setHeader(KafkaHeaders.TOPIC, topicName)
.setHeader("X-Custom-Header", "some-custom-header")
.build()

View File

@@ -2,16 +2,25 @@ package com.example.demo.services.kafka.dto
import com.example.demo.models.Product
import com.example.demo.services.kafka.exceptions.InvalidArgumentException
import com.fasterxml.jackson.annotation.JsonProperty
import java.time.format.DateTimeFormatter
data class ProductDto(
@JsonProperty("id")
val id: Long,
@JsonProperty("guid")
val guid: String,
@JsonProperty("name")
val name: String,
@JsonProperty("description")
val description: String?,
@JsonProperty("price")
val price: Long,
@JsonProperty("createdAt")
val createdAt: String,
@JsonProperty("updatedAt")
val updatedAt: String?,
@JsonProperty("deletedAt")
val deletedAt: String?,
) {
@Throws(InvalidArgumentException::class)

View File

@@ -1,20 +0,0 @@
package com.example.demo.services.kafka.dto.serializer
import com.example.demo.services.kafka.dto.ProductDto
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.common.serialization.Serializer
import javax.sql.rowset.serial.SerialException
class ProductSerializer: Serializer<ProductDto> {
private val objectMapper = ObjectMapper()
override fun serialize(topic: String?, data: ProductDto?): ByteArray {
return objectMapper.writeValueAsBytes(
data ?: throw SerialException()
)
}
override fun close() {
// no logic
}
}

View File

@@ -2,6 +2,7 @@ package com.example.demo.services.kafka
import com.example.demo.models.Product
import com.example.demo.services.kafka.dto.ProductDto
import com.fasterxml.jackson.databind.ObjectMapper
import org.junit.runner.RunWith
import org.mockito.kotlin.argumentCaptor
import org.mockito.kotlin.doReturn
@@ -26,6 +27,9 @@ class ProducerImplTest {
@Autowired
private lateinit var producerImpl: ProducerImpl
@Autowired
private lateinit var objectMapper: ObjectMapper
@MockBean
private lateinit var kafkaTemplate: KafkaTemplate<String, Any>
@@ -43,7 +47,7 @@ class ProducerImplTest {
deletedAt = OffsetDateTime.now(),
)
val captor = argumentCaptor<Message<ProductDto>>()
val captor = argumentCaptor<Message<String>>()
whenever(kafkaTemplate.send(captor.capture()))
.doReturn(CompletableFuture<SendResult<String, Any>>())
@@ -52,8 +56,10 @@ class ProducerImplTest {
assertEquals(1, captor.allValues.count())
val actualArgument = captor.firstValue
assertEquals(product.id, actualArgument.payload.id)
assertEquals(product.guid.toString(), actualArgument.payload.guid)
val actualProductDto = objectMapper.readValue(actualArgument.payload, ProductDto::class.java)
assertEquals(product.id, actualProductDto.id)
assertEquals(product.guid.toString(), actualProductDto.guid)
assertEquals(topic, actualArgument.headers[KafkaHeaders.TOPIC])
assertEquals("some-custom-header", actualArgument.headers["X-Custom-Header"])
}