diff --git a/build.gradle.kts b/build.gradle.kts index 77cef8f..1d5ea89 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -26,6 +26,7 @@ dependencies { implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.15.4") implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.15.4") + implementation("io.github.optimumcode:json-schema-validator:0.2.3") implementation("org.flywaydb:flyway-core:9.22.3") implementation("org.jetbrains.kotlin:kotlin-reflect:2.0.20") implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.6.3") diff --git a/src/main/kotlin/com/example/demo/config/AppConfig.kt b/src/main/kotlin/com/example/demo/config/AppConfig.kt index 1a546f1..0f28ae6 100644 --- a/src/main/kotlin/com/example/demo/config/AppConfig.kt +++ b/src/main/kotlin/com/example/demo/config/AppConfig.kt @@ -9,17 +9,19 @@ import com.example.demo.services.database.city.CityServiceImpl import com.example.demo.services.database.product.ProductService import com.example.demo.services.database.product.ProductServiceImpl import com.example.demo.services.kafka.Producer +import com.example.demo.services.kafka.SchemaValidator import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.SerializationFeature import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule import org.springframework.beans.factory.annotation.Autowired -import org.springframework.beans.factory.annotation.Value +import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration + @Configuration +@EnableConfigurationProperties(KafkaProperties::class) class AppConfig( - @Value("\${kafka.producer.product.default-sync-topic}") - private val defaultProductSyncTopic: String + @Autowired private val kafkaProperties: KafkaProperties, ) { @Bean fun objectMapper(): ObjectMapper { @@ -38,12 +40,15 @@ class AppConfig( @Autowired productRepository: ProductRepository, @Autowired producer: Producer, ): ProductService = ProductServiceImpl( - defaultProductSyncTopic, + kafkaProperties.producer.product.defaultSyncTopic, productRepository, producer, ) @Bean fun cityService(@Autowired cityRepository: CityRepository): CityService = CityServiceImpl(cityRepository) + + @Bean + fun schemaValidator(): SchemaValidator = SchemaValidator(kafkaProperties.validation.schema) } diff --git a/src/main/kotlin/com/example/demo/config/KafkaConsumerConfig.kt b/src/main/kotlin/com/example/demo/config/KafkaConsumerConfig.kt index 4841730..7fd4bfb 100644 --- a/src/main/kotlin/com/example/demo/config/KafkaConsumerConfig.kt +++ b/src/main/kotlin/com/example/demo/config/KafkaConsumerConfig.kt @@ -5,7 +5,6 @@ import com.example.demo.services.kafka.Consumer import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.StringDeserializer import org.springframework.beans.factory.annotation.Autowired -import org.springframework.beans.factory.annotation.Value import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory @@ -14,23 +13,18 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory @Configuration class KafkaConsumerConfig( - @Value("\${kafka.bootstrap-servers}") - val servers: String, - @Value("\${kafka.consumer.group-id}") - val consumerGroup: String, + @Autowired val kafkaProperties: KafkaProperties ) { @Bean fun consumer( @Autowired cityService: CityService, - ): Consumer = Consumer( - cityService = cityService, - ) + ): Consumer = Consumer(cityService) @Bean fun consumerFactory(): ConsumerFactory { val configs = mapOf( - ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to servers, - ConsumerConfig.GROUP_ID_CONFIG to consumerGroup, + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProperties.bootstrapServers, + ConsumerConfig.GROUP_ID_CONFIG to kafkaProperties.consumer.groupId, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, ) diff --git a/src/main/kotlin/com/example/demo/config/KafkaProducerConfig.kt b/src/main/kotlin/com/example/demo/config/KafkaProducerConfig.kt index 43470ca..91a6eba 100644 --- a/src/main/kotlin/com/example/demo/config/KafkaProducerConfig.kt +++ b/src/main/kotlin/com/example/demo/config/KafkaProducerConfig.kt @@ -2,10 +2,10 @@ package com.example.demo.config import com.example.demo.services.kafka.Producer import com.example.demo.services.kafka.ProducerImpl +import com.example.demo.services.kafka.SchemaValidator import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.serialization.StringSerializer import org.springframework.beans.factory.annotation.Autowired -import org.springframework.beans.factory.annotation.Value import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.kafka.core.DefaultKafkaProducerFactory @@ -14,14 +14,13 @@ import org.springframework.kafka.core.ProducerFactory @Configuration class KafkaProducerConfig( - @Value("\${kafka.bootstrap-servers}") - val servers: String + @Autowired val kafkaProperties: KafkaProperties ) { @Bean fun producerFactory(): ProducerFactory { val configProps: MutableMap = HashMap() - configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = servers + configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaProperties.bootstrapServers configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java @@ -36,7 +35,9 @@ class KafkaProducerConfig( @Bean fun producer( @Autowired kafkaTemplate: KafkaTemplate, + @Autowired schemaValidator: SchemaValidator, ): Producer = ProducerImpl( kafkaTemplate, + schemaValidator, ) } \ No newline at end of file diff --git a/src/main/kotlin/com/example/demo/config/KafkaProperties.kt b/src/main/kotlin/com/example/demo/config/KafkaProperties.kt new file mode 100644 index 0000000..f51d0be --- /dev/null +++ b/src/main/kotlin/com/example/demo/config/KafkaProperties.kt @@ -0,0 +1,30 @@ +package com.example.demo.config + +import org.springframework.boot.context.properties.ConfigurationProperties +import org.springframework.boot.context.properties.bind.ConstructorBinding + +@ConfigurationProperties("kafka") +data class KafkaProperties @ConstructorBinding constructor( + val bootstrapServers: String, + val producer: Producer, + val consumer: Consumer, + val validation: Validation, +) { + data class Producer( + val product: Product, + ) { + data class Product( + val defaultSyncTopic: String + ) + } + + data class Consumer( + val groupId: String, + val topics: String, + val autoStartup: Boolean, + ) + + data class Validation( + val schema: Map + ) +} \ No newline at end of file diff --git a/src/main/kotlin/com/example/demo/services/kafka/ProducerImpl.kt b/src/main/kotlin/com/example/demo/services/kafka/ProducerImpl.kt index 3544640..0da67f0 100644 --- a/src/main/kotlin/com/example/demo/services/kafka/ProducerImpl.kt +++ b/src/main/kotlin/com/example/demo/services/kafka/ProducerImpl.kt @@ -13,10 +13,14 @@ import org.springframework.stereotype.Service @Service class ProducerImpl( private val kafkaTemplate: KafkaTemplate, + private val schemaValidator: SchemaValidator, ): Producer { override fun produceProductInfo(topicName: String, product: Product) { val serializedProduct = Json.encodeToJsonElement(ProductDto(product)) + + schemaValidator.validate("product-sync", serializedProduct) + val message: Message = MessageBuilder .withPayload(serializedProduct.toString()) .setHeader(KafkaHeaders.TOPIC, topicName) diff --git a/src/main/kotlin/com/example/demo/services/kafka/SchemaValidator.kt b/src/main/kotlin/com/example/demo/services/kafka/SchemaValidator.kt new file mode 100644 index 0000000..968e52c --- /dev/null +++ b/src/main/kotlin/com/example/demo/services/kafka/SchemaValidator.kt @@ -0,0 +1,46 @@ +package com.example.demo.services.kafka + +import io.github.optimumcode.json.schema.JsonSchema +import io.github.optimumcode.json.schema.ValidationError +import kotlinx.serialization.json.JsonElement +import org.springframework.util.ResourceUtils + +class SchemaValidator( + private val schemaMap: Map, +) { + private val loadedSchema: MutableMap = mutableMapOf() + + fun validate(schemaName: String, value: JsonElement) { + + val schema = JsonSchema.fromDefinition( + getSchema(schemaName), + ) + + val errors = mutableListOf() + + val valid = schema.validate(value, errors::add) + if (!valid) { + // todo throw another exception + println(errors.toString()) + + throw RuntimeException("invalid schema") + } + } + + private fun getSchema(schemaName: String): String { + val loaded = loadedSchema[schemaName] + if (loaded != null) { + return loaded + } + + val schemaFile = schemaMap[schemaName] + ?: // todo throw another exception + throw RuntimeException("unknown json-schema") + + val schema = ResourceUtils.getFile("classpath:json-schemas/$schemaFile") + .readText(Charsets.UTF_8) + loadedSchema[schemaName] = schema + + return schema + } +} \ No newline at end of file diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index fec240b..0d008c6 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -24,6 +24,9 @@ kafka: group-id: demo-consumer topics: demo-city-sync auto-startup: true + validation: + schema: + product-sync: product/sync.json springdoc: api-docs: diff --git a/src/main/resources/json-schemas/product/sync.json b/src/main/resources/json-schemas/product/sync.json new file mode 100644 index 0000000..226d730 --- /dev/null +++ b/src/main/resources/json-schemas/product/sync.json @@ -0,0 +1,41 @@ +{ + "schema": "http://json-schema.org/draft-07/schema#", + "title": "event sync product", + "type": "object", + "required": ["id", "guid"], + "properties": { + "id": { + "type": "number" + }, + "guid": { + "type": "string" + }, + "name": { + "type": "string" + }, + "description": { + "oneOf": [ + { "type": "string" }, + { "type": "null" } + ] + }, + "number": { + "type": "string" + }, + "createdAt": { + "type": "string" + }, + "updatedAt": { + "oneOf": [ + { "type": "string" }, + { "type": "null" } + ] + }, + "deletedAt": { + "oneOf": [ + { "type": "string" }, + { "type": "null" } + ] + } + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/example/demo/BaseUnitTest.kt b/src/test/kotlin/com/example/demo/BaseUnitTest.kt index 4124e58..55426b4 100644 --- a/src/test/kotlin/com/example/demo/BaseUnitTest.kt +++ b/src/test/kotlin/com/example/demo/BaseUnitTest.kt @@ -1,9 +1,33 @@ package com.example.demo +import com.example.demo.config.KafkaProperties import com.example.demo.services.kafka.Consumer +import org.springframework.boot.test.context.TestConfiguration import org.springframework.boot.test.mock.mockito.MockBean +import org.springframework.context.annotation.Bean open class BaseUnitTest { @MockBean lateinit var consumer: Consumer + + @TestConfiguration + class TestConfig { + @Bean + fun kafkaProperties(): KafkaProperties = KafkaProperties( + bootstrapServers = "localhost:1111", + producer = KafkaProperties.Producer( + product = KafkaProperties.Producer.Product( + defaultSyncTopic = "some-default", + ), + ), + consumer = KafkaProperties.Consumer( + groupId = "group", + topics = "topic", + autoStartup = false, + ), + validation = KafkaProperties.Validation( + schema = mapOf("product-sync" to "foo"), + ), + ) + } } \ No newline at end of file diff --git a/src/test/kotlin/com/example/demo/services/kafka/ProducerImplTest.kt b/src/test/kotlin/com/example/demo/services/kafka/ProducerImplTest.kt index 1220737..36c8d99 100644 --- a/src/test/kotlin/com/example/demo/services/kafka/ProducerImplTest.kt +++ b/src/test/kotlin/com/example/demo/services/kafka/ProducerImplTest.kt @@ -4,10 +4,9 @@ import com.example.demo.BaseUnitTest import com.example.demo.models.Product import com.example.demo.services.kafka.dto.ProductDto import kotlinx.serialization.json.Json +import kotlinx.serialization.json.encodeToJsonElement import org.junit.runner.RunWith -import org.mockito.kotlin.argumentCaptor -import org.mockito.kotlin.doReturn -import org.mockito.kotlin.whenever +import org.mockito.kotlin.* import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest import org.springframework.boot.test.mock.mockito.MockBean @@ -31,6 +30,9 @@ class ProducerImplTest: BaseUnitTest() { @MockBean private lateinit var kafkaTemplate: KafkaTemplate + @MockBean + private lateinit var schemaValidator: SchemaValidator + @Test fun produceProductInfo_success() { val topic = "some-topic" @@ -50,6 +52,12 @@ class ProducerImplTest: BaseUnitTest() { whenever(kafkaTemplate.send(captor.capture())) .doReturn(CompletableFuture>()) + whenever(schemaValidator.validate( + eq("product-sync"), + eq(Json.encodeToJsonElement(product)) + )) + .doAnswer { } + producerImpl.produceProductInfo(topic, product) assertEquals(1, captor.allValues.count())