mirror of
https://github.com/Dannecron/spring-boot-demo.git
synced 2025-12-25 16:22:35 +03:00
refactor services
This commit is contained in:
@@ -8,7 +8,6 @@ import kotlinx.serialization.json.Json
|
||||
import kotlinx.serialization.json.encodeToJsonElement
|
||||
import org.springframework.kafka.core.KafkaTemplate
|
||||
import org.springframework.kafka.support.KafkaHeaders
|
||||
import org.springframework.messaging.Message
|
||||
import org.springframework.messaging.support.MessageBuilder
|
||||
import org.springframework.stereotype.Service
|
||||
|
||||
@@ -18,17 +17,16 @@ class ProducerImpl(
|
||||
private val schemaValidator: SchemaValidator,
|
||||
): Producer {
|
||||
override fun produceProductInfo(topicName: String, product: Product) {
|
||||
Json.encodeToJsonElement(ProductDto(product)).let {
|
||||
schemaValidator.validate(SCHEMA_KAFKA_PRODUCT_SYNC, it)
|
||||
|
||||
val serializedProduct = Json.encodeToJsonElement(ProductDto(product))
|
||||
|
||||
schemaValidator.validate(SCHEMA_KAFKA_PRODUCT_SYNC, serializedProduct)
|
||||
|
||||
val message: Message<String> = MessageBuilder
|
||||
.withPayload(serializedProduct.toString())
|
||||
MessageBuilder.withPayload(it.toString())
|
||||
.setHeader(KafkaHeaders.TOPIC, topicName)
|
||||
.setHeader("X-Custom-Header", "some-custom-header")
|
||||
.build()
|
||||
|
||||
kafkaTemplate.send(message)
|
||||
}
|
||||
.let {
|
||||
msg -> kafkaTemplate.send(msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,12 +11,9 @@ import java.time.format.DateTimeFormatter
|
||||
class OffsetDateTimeSerialization: KSerializer<OffsetDateTime> {
|
||||
override val descriptor = PrimitiveSerialDescriptor("Time", PrimitiveKind.STRING)
|
||||
|
||||
override fun deserialize(decoder: Decoder): OffsetDateTime {
|
||||
return OffsetDateTime.parse(decoder.decodeString())
|
||||
}
|
||||
override fun deserialize(decoder: Decoder): OffsetDateTime = OffsetDateTime.parse(decoder.decodeString())
|
||||
|
||||
override fun serialize(encoder: Encoder, value: OffsetDateTime) {
|
||||
encoder.encodeString(value.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME))
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,12 +10,9 @@ import java.util.*
|
||||
class UuidSerialization: KSerializer<UUID> {
|
||||
override val descriptor = PrimitiveSerialDescriptor("UUID", PrimitiveKind.STRING)
|
||||
|
||||
override fun deserialize(decoder: Decoder): UUID {
|
||||
return UUID.fromString(decoder.decodeString())
|
||||
}
|
||||
override fun deserialize(decoder: Decoder): UUID = UUID.fromString(decoder.decodeString())
|
||||
|
||||
override fun serialize(encoder: Encoder, value: UUID) {
|
||||
encoder.encodeString(value.toString())
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,10 +5,10 @@ import com.github.dannecron.demo.services.validation.exceptions.SchemaNotFoundEx
|
||||
import kotlinx.serialization.json.JsonElement
|
||||
|
||||
interface SchemaValidator {
|
||||
@Throws(ElementNotValidException::class, SchemaNotFoundException::class)
|
||||
fun validate(schemaName: String, value: JsonElement)
|
||||
|
||||
companion object {
|
||||
const val SCHEMA_KAFKA_PRODUCT_SYNC = "kafka-product-sync"
|
||||
}
|
||||
|
||||
@Throws(ElementNotValidException::class, SchemaNotFoundException::class)
|
||||
fun validate(schemaName: String, value: JsonElement)
|
||||
}
|
||||
|
||||
@@ -13,18 +13,16 @@ class SchemaValidatorImp(
|
||||
private val loadedSchema: MutableMap<String, String> = mutableMapOf()
|
||||
|
||||
override fun validate(schemaName: String, value: JsonElement) {
|
||||
|
||||
val schema = JsonSchema.fromDefinition(
|
||||
JsonSchema.fromDefinition(
|
||||
getSchema(schemaName),
|
||||
)
|
||||
|
||||
).also {
|
||||
val errors = mutableListOf<ValidationError>()
|
||||
|
||||
val valid = schema.validate(value, errors::add)
|
||||
if (!valid) {
|
||||
if (!it.validate(value, errors::add)) {
|
||||
throw ElementNotValidException(errors)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun getSchema(schemaName: String): String {
|
||||
val loaded = loadedSchema[schemaName]
|
||||
|
||||
Reference in New Issue
Block a user