diff --git a/src/main/kotlin/com/github/dannecron/demo/services/kafka/ProducerImpl.kt b/src/main/kotlin/com/github/dannecron/demo/services/kafka/ProducerImpl.kt index 1cfd932..a9c9561 100644 --- a/src/main/kotlin/com/github/dannecron/demo/services/kafka/ProducerImpl.kt +++ b/src/main/kotlin/com/github/dannecron/demo/services/kafka/ProducerImpl.kt @@ -8,7 +8,6 @@ import kotlinx.serialization.json.Json import kotlinx.serialization.json.encodeToJsonElement import org.springframework.kafka.core.KafkaTemplate import org.springframework.kafka.support.KafkaHeaders -import org.springframework.messaging.Message import org.springframework.messaging.support.MessageBuilder import org.springframework.stereotype.Service @@ -18,17 +17,16 @@ class ProducerImpl( private val schemaValidator: SchemaValidator, ): Producer { override fun produceProductInfo(topicName: String, product: Product) { + Json.encodeToJsonElement(ProductDto(product)).let { + schemaValidator.validate(SCHEMA_KAFKA_PRODUCT_SYNC, it) - val serializedProduct = Json.encodeToJsonElement(ProductDto(product)) - - schemaValidator.validate(SCHEMA_KAFKA_PRODUCT_SYNC, serializedProduct) - - val message: Message = MessageBuilder - .withPayload(serializedProduct.toString()) - .setHeader(KafkaHeaders.TOPIC, topicName) - .setHeader("X-Custom-Header", "some-custom-header") - .build() - - kafkaTemplate.send(message) + MessageBuilder.withPayload(it.toString()) + .setHeader(KafkaHeaders.TOPIC, topicName) + .setHeader("X-Custom-Header", "some-custom-header") + .build() + } + .let { + msg -> kafkaTemplate.send(msg) + } } } diff --git a/src/main/kotlin/com/github/dannecron/demo/services/serializables/OffsetDateTimeSerialization.kt b/src/main/kotlin/com/github/dannecron/demo/services/serializables/OffsetDateTimeSerialization.kt index 60da6d8..b685ae5 100644 --- a/src/main/kotlin/com/github/dannecron/demo/services/serializables/OffsetDateTimeSerialization.kt +++ b/src/main/kotlin/com/github/dannecron/demo/services/serializables/OffsetDateTimeSerialization.kt @@ -11,12 +11,9 @@ import java.time.format.DateTimeFormatter class OffsetDateTimeSerialization: KSerializer { override val descriptor = PrimitiveSerialDescriptor("Time", PrimitiveKind.STRING) - override fun deserialize(decoder: Decoder): OffsetDateTime { - return OffsetDateTime.parse(decoder.decodeString()) - } + override fun deserialize(decoder: Decoder): OffsetDateTime = OffsetDateTime.parse(decoder.decodeString()) override fun serialize(encoder: Encoder, value: OffsetDateTime) { encoder.encodeString(value.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME)) - } } diff --git a/src/main/kotlin/com/github/dannecron/demo/services/serializables/UuidSerialization.kt b/src/main/kotlin/com/github/dannecron/demo/services/serializables/UuidSerialization.kt index d9386b5..7348cda 100644 --- a/src/main/kotlin/com/github/dannecron/demo/services/serializables/UuidSerialization.kt +++ b/src/main/kotlin/com/github/dannecron/demo/services/serializables/UuidSerialization.kt @@ -10,12 +10,9 @@ import java.util.* class UuidSerialization: KSerializer { override val descriptor = PrimitiveSerialDescriptor("UUID", PrimitiveKind.STRING) - override fun deserialize(decoder: Decoder): UUID { - return UUID.fromString(decoder.decodeString()) - } + override fun deserialize(decoder: Decoder): UUID = UUID.fromString(decoder.decodeString()) override fun serialize(encoder: Encoder, value: UUID) { encoder.encodeString(value.toString()) - } } diff --git a/src/main/kotlin/com/github/dannecron/demo/services/validation/SchemaValidator.kt b/src/main/kotlin/com/github/dannecron/demo/services/validation/SchemaValidator.kt index 91f5b49..44f353a 100644 --- a/src/main/kotlin/com/github/dannecron/demo/services/validation/SchemaValidator.kt +++ b/src/main/kotlin/com/github/dannecron/demo/services/validation/SchemaValidator.kt @@ -5,10 +5,10 @@ import com.github.dannecron.demo.services.validation.exceptions.SchemaNotFoundEx import kotlinx.serialization.json.JsonElement interface SchemaValidator { - @Throws(ElementNotValidException::class, SchemaNotFoundException::class) - fun validate(schemaName: String, value: JsonElement) - companion object { const val SCHEMA_KAFKA_PRODUCT_SYNC = "kafka-product-sync" } + + @Throws(ElementNotValidException::class, SchemaNotFoundException::class) + fun validate(schemaName: String, value: JsonElement) } diff --git a/src/main/kotlin/com/github/dannecron/demo/services/validation/SchemaValidatorImp.kt b/src/main/kotlin/com/github/dannecron/demo/services/validation/SchemaValidatorImp.kt index 38fc4ac..3ae174d 100644 --- a/src/main/kotlin/com/github/dannecron/demo/services/validation/SchemaValidatorImp.kt +++ b/src/main/kotlin/com/github/dannecron/demo/services/validation/SchemaValidatorImp.kt @@ -13,16 +13,14 @@ class SchemaValidatorImp( private val loadedSchema: MutableMap = mutableMapOf() override fun validate(schemaName: String, value: JsonElement) { - - val schema = JsonSchema.fromDefinition( + JsonSchema.fromDefinition( getSchema(schemaName), - ) + ).also { + val errors = mutableListOf() - val errors = mutableListOf() - - val valid = schema.validate(value, errors::add) - if (!valid) { - throw ElementNotValidException(errors) + if (!it.validate(value, errors::add)) { + throw ElementNotValidException(errors) + } } }