move producing to sub-project

This commit is contained in:
Savosin Denis
2025-06-03 11:34:09 +07:00
parent 0c858b59b3
commit 1bda2e1d21
15 changed files with 39 additions and 176 deletions

View File

@@ -0,0 +1,10 @@
dependencies {
implementation(project(":edge-contracts"))
implementation(rootProject.libs.jackson.datatype.jsr)
implementation(rootProject.libs.jackson.module.kotlin)
implementation(rootProject.libs.spring.boot.starter.validation)
implementation(rootProject.libs.spring.cloud.starter.streamKafka)
testImplementation(rootProject.libs.spring.cloud.streamTestBinder)
}

View File

@@ -0,0 +1,15 @@
package com.github.dannecron.demo.edgeproducing.dto
import kotlinx.serialization.Serializable
@Serializable
data class ProductDto(
val id: Long,
val guid: String,
val name: String,
val description: String?,
val price: Long,
val createdAt: String,
val updatedAt: String?,
val deletedAt: String?,
)

View File

@@ -0,0 +1,3 @@
package com.github.dannecron.demo.edgeproducing.exceptions
class InvalidArgumentException(argName: String): RuntimeException("invalid argument $argName")

View File

@@ -0,0 +1,9 @@
package com.github.dannecron.demo.edgeproducing.producer
import com.github.dannecron.demo.edgeproducing.dto.ProductDto
import com.github.dannecron.demo.edgeproducing.exceptions.InvalidArgumentException
interface ProductProducer {
@Throws(InvalidArgumentException::class)
fun produceProductSync(product: ProductDto)
}

View File

@@ -0,0 +1,36 @@
package com.github.dannecron.demo.edgeproducing.producer
import com.github.dannecron.demo.edgecontracts.validation.SchemaValidator
import com.github.dannecron.demo.edgeproducing.dto.ProductDto
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.encodeToJsonElement
import org.springframework.cloud.stream.function.StreamBridge
import org.springframework.messaging.support.MessageBuilder
import org.springframework.stereotype.Service
@Service
class ProductProducerImpl(
private val streamBridge: StreamBridge,
private val schemaValidator: SchemaValidator,
): ProductProducer {
private companion object {
private const val BINDING_NAME_PRODUCT_SYNC = "productSyncProducer"
private const val SCHEMA_KAFKA_PRODUCT_SYNC = "kafka-product-sync"
}
override fun produceProductSync(product: ProductDto) {
Json.encodeToJsonElement((product))
.also { schemaValidator.validate(SCHEMA_KAFKA_PRODUCT_SYNC, it) }
.let {
MessageBuilder.withPayload(it.toString())
.setHeader("X-Custom-Header", "some-custom-header")
.build()
}
.let {
streamBridge.send(
BINDING_NAME_PRODUCT_SYNC,
it,
)
}
}
}

View File

@@ -0,0 +1,62 @@
package com.github.dannecron.demo.edgeproducing.producer
import com.github.dannecron.demo.edgecontracts.validation.SchemaValidator
import com.github.dannecron.demo.edgeproducing.dto.ProductDto
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.encodeToJsonElement
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.mockito.kotlin.any
import org.mockito.kotlin.argumentCaptor
import org.mockito.kotlin.eq
import org.mockito.kotlin.mock
import org.mockito.kotlin.times
import org.mockito.kotlin.verify
import org.mockito.kotlin.whenever
import org.springframework.cloud.stream.function.StreamBridge
import org.springframework.messaging.Message
import java.time.OffsetDateTime
import java.time.format.DateTimeFormatter
import java.util.UUID
class ProductProducerImplTest {
private val streamBridge: StreamBridge = mock()
private val schemaValidator: SchemaValidator = mock()
private val producerImpl = ProductProducerImpl(
streamBridge = streamBridge,
schemaValidator = schemaValidator,
)
@Test
fun produceProductSync_success() {
val guid = UUID.randomUUID()
val createdAt = OffsetDateTime.now().minusDays(2)
val updatedAt = OffsetDateTime.now().minusHours(1)
val productDto = ProductDto(
id = 123,
guid = guid.toString(),
name = "name",
description = null,
price = 10050,
createdAt = createdAt.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME),
updatedAt = updatedAt.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME),
deletedAt = null,
)
val captor = argumentCaptor<Message<String>>()
whenever(streamBridge.send(any(), captor.capture())).thenReturn(true)
producerImpl.produceProductSync(productDto)
assertEquals(1, captor.allValues.count())
val actualArgument = captor.firstValue
val actualProductDto = Json.decodeFromString<ProductDto>(actualArgument.payload)
assertEquals(productDto, actualProductDto)
assertEquals("some-custom-header", actualArgument.headers["X-Custom-Header"])
verify(streamBridge, times(1)).send(eq("productSyncProducer"), any())
verify(schemaValidator, times(1)).validate("kafka-product-sync", Json.encodeToJsonElement(productDto))
}
}