add schema validator

use ConfigurationProperties instead of Value for configuration
This commit is contained in:
Denis Savosin
2024-10-04 17:59:08 +07:00
parent f58a99a68d
commit ea77bf8b61
11 changed files with 178 additions and 21 deletions

View File

@@ -26,6 +26,7 @@ dependencies {
implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.15.4") implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.15.4")
implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.15.4") implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.15.4")
implementation("io.github.optimumcode:json-schema-validator:0.2.3")
implementation("org.flywaydb:flyway-core:9.22.3") implementation("org.flywaydb:flyway-core:9.22.3")
implementation("org.jetbrains.kotlin:kotlin-reflect:2.0.20") implementation("org.jetbrains.kotlin:kotlin-reflect:2.0.20")
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.6.3") implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.6.3")

View File

@@ -9,17 +9,19 @@ import com.example.demo.services.database.city.CityServiceImpl
import com.example.demo.services.database.product.ProductService import com.example.demo.services.database.product.ProductService
import com.example.demo.services.database.product.ProductServiceImpl import com.example.demo.services.database.product.ProductServiceImpl
import com.example.demo.services.kafka.Producer import com.example.demo.services.kafka.Producer
import com.example.demo.services.kafka.SchemaValidator
import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration import org.springframework.context.annotation.Configuration
@Configuration @Configuration
@EnableConfigurationProperties(KafkaProperties::class)
class AppConfig( class AppConfig(
@Value("\${kafka.producer.product.default-sync-topic}") @Autowired private val kafkaProperties: KafkaProperties,
private val defaultProductSyncTopic: String
) { ) {
@Bean @Bean
fun objectMapper(): ObjectMapper { fun objectMapper(): ObjectMapper {
@@ -38,12 +40,15 @@ class AppConfig(
@Autowired productRepository: ProductRepository, @Autowired productRepository: ProductRepository,
@Autowired producer: Producer, @Autowired producer: Producer,
): ProductService = ProductServiceImpl( ): ProductService = ProductServiceImpl(
defaultProductSyncTopic, kafkaProperties.producer.product.defaultSyncTopic,
productRepository, productRepository,
producer, producer,
) )
@Bean @Bean
fun cityService(@Autowired cityRepository: CityRepository): CityService = CityServiceImpl(cityRepository) fun cityService(@Autowired cityRepository: CityRepository): CityService = CityServiceImpl(cityRepository)
@Bean
fun schemaValidator(): SchemaValidator = SchemaValidator(kafkaProperties.validation.schema)
} }

View File

@@ -5,7 +5,6 @@ import com.example.demo.services.kafka.Consumer
import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
@@ -14,23 +13,18 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory
@Configuration @Configuration
class KafkaConsumerConfig( class KafkaConsumerConfig(
@Value("\${kafka.bootstrap-servers}") @Autowired val kafkaProperties: KafkaProperties
val servers: String,
@Value("\${kafka.consumer.group-id}")
val consumerGroup: String,
) { ) {
@Bean @Bean
fun consumer( fun consumer(
@Autowired cityService: CityService, @Autowired cityService: CityService,
): Consumer = Consumer( ): Consumer = Consumer(cityService)
cityService = cityService,
)
@Bean @Bean
fun consumerFactory(): ConsumerFactory<String, String> { fun consumerFactory(): ConsumerFactory<String, String> {
val configs = mapOf( val configs = mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to servers, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProperties.bootstrapServers,
ConsumerConfig.GROUP_ID_CONFIG to consumerGroup, ConsumerConfig.GROUP_ID_CONFIG to kafkaProperties.consumer.groupId,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
) )

View File

@@ -2,10 +2,10 @@ package com.example.demo.config
import com.example.demo.services.kafka.Producer import com.example.demo.services.kafka.Producer
import com.example.demo.services.kafka.ProducerImpl import com.example.demo.services.kafka.ProducerImpl
import com.example.demo.services.kafka.SchemaValidator
import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.DefaultKafkaProducerFactory import org.springframework.kafka.core.DefaultKafkaProducerFactory
@@ -14,14 +14,13 @@ import org.springframework.kafka.core.ProducerFactory
@Configuration @Configuration
class KafkaProducerConfig( class KafkaProducerConfig(
@Value("\${kafka.bootstrap-servers}") @Autowired val kafkaProperties: KafkaProperties
val servers: String
) { ) {
@Bean @Bean
fun producerFactory(): ProducerFactory<String, Any> { fun producerFactory(): ProducerFactory<String, Any> {
val configProps: MutableMap<String, Any> = HashMap() val configProps: MutableMap<String, Any> = HashMap()
configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = servers configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaProperties.bootstrapServers
configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
@@ -36,7 +35,9 @@ class KafkaProducerConfig(
@Bean @Bean
fun producer( fun producer(
@Autowired kafkaTemplate: KafkaTemplate<String, Any>, @Autowired kafkaTemplate: KafkaTemplate<String, Any>,
@Autowired schemaValidator: SchemaValidator,
): Producer = ProducerImpl( ): Producer = ProducerImpl(
kafkaTemplate, kafkaTemplate,
schemaValidator,
) )
} }

View File

@@ -0,0 +1,30 @@
package com.example.demo.config
import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.boot.context.properties.bind.ConstructorBinding
@ConfigurationProperties("kafka")
data class KafkaProperties @ConstructorBinding constructor(
val bootstrapServers: String,
val producer: Producer,
val consumer: Consumer,
val validation: Validation,
) {
data class Producer(
val product: Product,
) {
data class Product(
val defaultSyncTopic: String
)
}
data class Consumer(
val groupId: String,
val topics: String,
val autoStartup: Boolean,
)
data class Validation(
val schema: Map<String, String>
)
}

View File

@@ -13,10 +13,14 @@ import org.springframework.stereotype.Service
@Service @Service
class ProducerImpl( class ProducerImpl(
private val kafkaTemplate: KafkaTemplate<String, Any>, private val kafkaTemplate: KafkaTemplate<String, Any>,
private val schemaValidator: SchemaValidator,
): Producer { ): Producer {
override fun produceProductInfo(topicName: String, product: Product) { override fun produceProductInfo(topicName: String, product: Product) {
val serializedProduct = Json.encodeToJsonElement(ProductDto(product)) val serializedProduct = Json.encodeToJsonElement(ProductDto(product))
schemaValidator.validate("product-sync", serializedProduct)
val message: Message<String> = MessageBuilder val message: Message<String> = MessageBuilder
.withPayload(serializedProduct.toString()) .withPayload(serializedProduct.toString())
.setHeader(KafkaHeaders.TOPIC, topicName) .setHeader(KafkaHeaders.TOPIC, topicName)

View File

@@ -0,0 +1,46 @@
package com.example.demo.services.kafka
import io.github.optimumcode.json.schema.JsonSchema
import io.github.optimumcode.json.schema.ValidationError
import kotlinx.serialization.json.JsonElement
import org.springframework.util.ResourceUtils
class SchemaValidator(
private val schemaMap: Map<String, String>,
) {
private val loadedSchema: MutableMap<String, String> = mutableMapOf()
fun validate(schemaName: String, value: JsonElement) {
val schema = JsonSchema.fromDefinition(
getSchema(schemaName),
)
val errors = mutableListOf<ValidationError>()
val valid = schema.validate(value, errors::add)
if (!valid) {
// todo throw another exception
println(errors.toString())
throw RuntimeException("invalid schema")
}
}
private fun getSchema(schemaName: String): String {
val loaded = loadedSchema[schemaName]
if (loaded != null) {
return loaded
}
val schemaFile = schemaMap[schemaName]
?: // todo throw another exception
throw RuntimeException("unknown json-schema")
val schema = ResourceUtils.getFile("classpath:json-schemas/$schemaFile")
.readText(Charsets.UTF_8)
loadedSchema[schemaName] = schema
return schema
}
}

View File

@@ -24,6 +24,9 @@ kafka:
group-id: demo-consumer group-id: demo-consumer
topics: demo-city-sync topics: demo-city-sync
auto-startup: true auto-startup: true
validation:
schema:
product-sync: product/sync.json
springdoc: springdoc:
api-docs: api-docs:

View File

@@ -0,0 +1,41 @@
{
"schema": "http://json-schema.org/draft-07/schema#",
"title": "event sync product",
"type": "object",
"required": ["id", "guid"],
"properties": {
"id": {
"type": "number"
},
"guid": {
"type": "string"
},
"name": {
"type": "string"
},
"description": {
"oneOf": [
{ "type": "string" },
{ "type": "null" }
]
},
"number": {
"type": "string"
},
"createdAt": {
"type": "string"
},
"updatedAt": {
"oneOf": [
{ "type": "string" },
{ "type": "null" }
]
},
"deletedAt": {
"oneOf": [
{ "type": "string" },
{ "type": "null" }
]
}
}
}

View File

@@ -1,9 +1,33 @@
package com.example.demo package com.example.demo
import com.example.demo.config.KafkaProperties
import com.example.demo.services.kafka.Consumer import com.example.demo.services.kafka.Consumer
import org.springframework.boot.test.context.TestConfiguration
import org.springframework.boot.test.mock.mockito.MockBean import org.springframework.boot.test.mock.mockito.MockBean
import org.springframework.context.annotation.Bean
open class BaseUnitTest { open class BaseUnitTest {
@MockBean @MockBean
lateinit var consumer: Consumer lateinit var consumer: Consumer
@TestConfiguration
class TestConfig {
@Bean
fun kafkaProperties(): KafkaProperties = KafkaProperties(
bootstrapServers = "localhost:1111",
producer = KafkaProperties.Producer(
product = KafkaProperties.Producer.Product(
defaultSyncTopic = "some-default",
),
),
consumer = KafkaProperties.Consumer(
groupId = "group",
topics = "topic",
autoStartup = false,
),
validation = KafkaProperties.Validation(
schema = mapOf("product-sync" to "foo"),
),
)
}
} }

View File

@@ -4,10 +4,9 @@ import com.example.demo.BaseUnitTest
import com.example.demo.models.Product import com.example.demo.models.Product
import com.example.demo.services.kafka.dto.ProductDto import com.example.demo.services.kafka.dto.ProductDto
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
import kotlinx.serialization.json.encodeToJsonElement
import org.junit.runner.RunWith import org.junit.runner.RunWith
import org.mockito.kotlin.argumentCaptor import org.mockito.kotlin.*
import org.mockito.kotlin.doReturn
import org.mockito.kotlin.whenever
import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest import org.springframework.boot.test.context.SpringBootTest
import org.springframework.boot.test.mock.mockito.MockBean import org.springframework.boot.test.mock.mockito.MockBean
@@ -31,6 +30,9 @@ class ProducerImplTest: BaseUnitTest() {
@MockBean @MockBean
private lateinit var kafkaTemplate: KafkaTemplate<String, Any> private lateinit var kafkaTemplate: KafkaTemplate<String, Any>
@MockBean
private lateinit var schemaValidator: SchemaValidator
@Test @Test
fun produceProductInfo_success() { fun produceProductInfo_success() {
val topic = "some-topic" val topic = "some-topic"
@@ -50,6 +52,12 @@ class ProducerImplTest: BaseUnitTest() {
whenever(kafkaTemplate.send(captor.capture())) whenever(kafkaTemplate.send(captor.capture()))
.doReturn(CompletableFuture<SendResult<String, Any>>()) .doReturn(CompletableFuture<SendResult<String, Any>>())
whenever(schemaValidator.validate(
eq("product-sync"),
eq(Json.encodeToJsonElement(product))
))
.doAnswer { }
producerImpl.produceProductInfo(topic, product) producerImpl.produceProductInfo(topic, product)
assertEquals(1, captor.allValues.count()) assertEquals(1, captor.allValues.count())