From 41b6d1005943d458768e9f95551a14e0dcbf7658 Mon Sep 17 00:00:00 2001 From: Denis Savosin Date: Tue, 8 Oct 2024 11:41:26 +0700 Subject: [PATCH] add custom prometheus metric add tests for kafka consumer --- .../demo/config/KafkaConsumerConfig.kt | 10 ++- .../example/demo/config/KafkaProperties.kt | 1 + .../example/demo/services/kafka/Consumer.kt | 10 +++ src/main/resources/application.yml | 1 + .../{BaseFeatureTest.kt => BaseDbTest.kt} | 4 +- .../kotlin/com/example/demo/BaseUnitTest.kt | 1 + ...eatureTest.kt => CityServiceImplDbTest.kt} | 4 +- ...ureTest.kt => ProductServiceImplDbTest.kt} | 4 +- .../demo/services/kafka/ConsumerKfkTest.kt | 79 +++++++++++++++++++ ...ication_feature.yml => application_db.yml} | 1 + src/test/resources/application_kfk.yml | 21 +++++ 11 files changed, 128 insertions(+), 8 deletions(-) rename src/test/kotlin/com/example/demo/{BaseFeatureTest.kt => BaseDbTest.kt} (93%) rename src/test/kotlin/com/example/demo/services/database/city/{CityServiceImplFeatureTest.kt => CityServiceImplDbTest.kt} (95%) rename src/test/kotlin/com/example/demo/services/database/product/{ProductServiceImplFeatureTest.kt => ProductServiceImplDbTest.kt} (96%) create mode 100644 src/test/kotlin/com/example/demo/services/kafka/ConsumerKfkTest.kt rename src/test/resources/{application_feature.yml => application_db.yml} (93%) create mode 100644 src/test/resources/application_kfk.yml diff --git a/src/main/kotlin/com/example/demo/config/KafkaConsumerConfig.kt b/src/main/kotlin/com/example/demo/config/KafkaConsumerConfig.kt index 7fd4bfb..10adee4 100644 --- a/src/main/kotlin/com/example/demo/config/KafkaConsumerConfig.kt +++ b/src/main/kotlin/com/example/demo/config/KafkaConsumerConfig.kt @@ -2,6 +2,7 @@ package com.example.demo.config import com.example.demo.services.database.city.CityService import com.example.demo.services.kafka.Consumer +import io.micrometer.core.instrument.MeterRegistry import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.StringDeserializer import org.springframework.beans.factory.annotation.Autowired @@ -18,15 +19,20 @@ class KafkaConsumerConfig( @Bean fun consumer( @Autowired cityService: CityService, - ): Consumer = Consumer(cityService) + @Autowired metricRegistry: MeterRegistry + ): Consumer = Consumer( + cityService = cityService, + metricRegistry = metricRegistry, + ) @Bean fun consumerFactory(): ConsumerFactory { val configs = mapOf( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProperties.bootstrapServers, ConsumerConfig.GROUP_ID_CONFIG to kafkaProperties.consumer.groupId, + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to kafkaProperties.consumer.autoOffsetReset, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java ) return DefaultKafkaConsumerFactory(configs) diff --git a/src/main/kotlin/com/example/demo/config/KafkaProperties.kt b/src/main/kotlin/com/example/demo/config/KafkaProperties.kt index f51d0be..a92abde 100644 --- a/src/main/kotlin/com/example/demo/config/KafkaProperties.kt +++ b/src/main/kotlin/com/example/demo/config/KafkaProperties.kt @@ -22,6 +22,7 @@ data class KafkaProperties @ConstructorBinding constructor( val groupId: String, val topics: String, val autoStartup: Boolean, + val autoOffsetReset: String, ) data class Validation( diff --git a/src/main/kotlin/com/example/demo/services/kafka/Consumer.kt b/src/main/kotlin/com/example/demo/services/kafka/Consumer.kt index 598f14f..83745e5 100644 --- a/src/main/kotlin/com/example/demo/services/kafka/Consumer.kt +++ b/src/main/kotlin/com/example/demo/services/kafka/Consumer.kt @@ -2,6 +2,8 @@ package com.example.demo.services.kafka import com.example.demo.services.database.city.CityService import com.example.demo.services.kafka.dto.CityCreateDto +import io.micrometer.core.instrument.Counter +import io.micrometer.core.instrument.MeterRegistry import kotlinx.serialization.json.Json import org.springframework.kafka.annotation.KafkaListener import org.springframework.messaging.handler.annotation.Payload @@ -10,6 +12,7 @@ import org.springframework.stereotype.Component @Component class Consumer( private val cityService: CityService, + private val metricRegistry: MeterRegistry, ) { @KafkaListener( topics = ["#{'\${kafka.consumer.topics}'.split(',')}"], @@ -17,6 +20,13 @@ class Consumer( ) fun handleCityCreate(@Payload message: String) { val cityCreateDto = Json.decodeFromString(message) + .also { + val counter = Counter.builder("kafka_consumer_city_create") + .description("consumed created city event") + .register(metricRegistry) + counter.increment() + } + cityService.create(cityCreateDto) } } \ No newline at end of file diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 32bce49..0065edf 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -29,6 +29,7 @@ kafka: consumer: group-id: demo-consumer topics: demo-city-sync + auto-offset-reset: none auto-startup: true validation: schema: diff --git a/src/test/kotlin/com/example/demo/BaseFeatureTest.kt b/src/test/kotlin/com/example/demo/BaseDbTest.kt similarity index 93% rename from src/test/kotlin/com/example/demo/BaseFeatureTest.kt rename to src/test/kotlin/com/example/demo/BaseDbTest.kt index 714fa90..0673a09 100644 --- a/src/test/kotlin/com/example/demo/BaseFeatureTest.kt +++ b/src/test/kotlin/com/example/demo/BaseDbTest.kt @@ -8,12 +8,12 @@ import org.springframework.data.jdbc.repository.config.EnableJdbcRepositories import org.springframework.test.context.ActiveProfiles import org.testcontainers.junit.jupiter.Testcontainers -@ActiveProfiles("feature") +@ActiveProfiles("db") @DataJdbcTest @Testcontainers(disabledWithoutDocker = false) @AutoConfigureTestDatabase(replace = AutoConfigureTestDatabase.Replace.NONE) @EnableJdbcRepositories -class BaseFeatureTest { +class BaseDbTest { @MockBean lateinit var producer: Producer } \ No newline at end of file diff --git a/src/test/kotlin/com/example/demo/BaseUnitTest.kt b/src/test/kotlin/com/example/demo/BaseUnitTest.kt index 55426b4..c06d83d 100644 --- a/src/test/kotlin/com/example/demo/BaseUnitTest.kt +++ b/src/test/kotlin/com/example/demo/BaseUnitTest.kt @@ -24,6 +24,7 @@ open class BaseUnitTest { groupId = "group", topics = "topic", autoStartup = false, + autoOffsetReset = "none", ), validation = KafkaProperties.Validation( schema = mapOf("product-sync" to "foo"), diff --git a/src/test/kotlin/com/example/demo/services/database/city/CityServiceImplFeatureTest.kt b/src/test/kotlin/com/example/demo/services/database/city/CityServiceImplDbTest.kt similarity index 95% rename from src/test/kotlin/com/example/demo/services/database/city/CityServiceImplFeatureTest.kt rename to src/test/kotlin/com/example/demo/services/database/city/CityServiceImplDbTest.kt index 1cea4cb..3bf5307 100644 --- a/src/test/kotlin/com/example/demo/services/database/city/CityServiceImplFeatureTest.kt +++ b/src/test/kotlin/com/example/demo/services/database/city/CityServiceImplDbTest.kt @@ -1,6 +1,6 @@ package com.example.demo.services.database.city -import com.example.demo.BaseFeatureTest +import com.example.demo.BaseDbTest import com.example.demo.models.City import com.example.demo.providers.CityRepository import com.example.demo.services.database.city.exceptions.CityNotFoundException @@ -12,7 +12,7 @@ import java.util.* import kotlin.test.* @ContextConfiguration(classes = [CityRepository::class, CityServiceImpl::class]) -class CityServiceImplFeatureTest: BaseFeatureTest() { +class CityServiceImplDbTest: BaseDbTest() { @Autowired private lateinit var cityRepository: CityRepository @Autowired diff --git a/src/test/kotlin/com/example/demo/services/database/product/ProductServiceImplFeatureTest.kt b/src/test/kotlin/com/example/demo/services/database/product/ProductServiceImplDbTest.kt similarity index 96% rename from src/test/kotlin/com/example/demo/services/database/product/ProductServiceImplFeatureTest.kt rename to src/test/kotlin/com/example/demo/services/database/product/ProductServiceImplDbTest.kt index 123f85c..eec14af 100644 --- a/src/test/kotlin/com/example/demo/services/database/product/ProductServiceImplFeatureTest.kt +++ b/src/test/kotlin/com/example/demo/services/database/product/ProductServiceImplDbTest.kt @@ -1,6 +1,6 @@ package com.example.demo.services.database.product -import com.example.demo.BaseFeatureTest +import com.example.demo.BaseDbTest import com.example.demo.models.Product import com.example.demo.providers.ProductRepository import com.example.demo.services.database.exceptions.AlreadyDeletedException @@ -12,7 +12,7 @@ import java.util.* import kotlin.test.* @ContextConfiguration(classes = [ProductRepository::class]) -class ProductServiceImplFeatureTest: BaseFeatureTest() { +class ProductServiceImplDbTest: BaseDbTest() { private lateinit var productService: ProductServiceImpl @Autowired private lateinit var productRepository: ProductRepository diff --git a/src/test/kotlin/com/example/demo/services/kafka/ConsumerKfkTest.kt b/src/test/kotlin/com/example/demo/services/kafka/ConsumerKfkTest.kt new file mode 100644 index 0000000..6b8e9d8 --- /dev/null +++ b/src/test/kotlin/com/example/demo/services/kafka/ConsumerKfkTest.kt @@ -0,0 +1,79 @@ +package com.example.demo.services.kafka + +import com.example.demo.models.City +import com.example.demo.services.database.city.CityService +import com.example.demo.services.kafka.dto.CityCreateDto +import io.micrometer.core.instrument.MeterRegistry +import kotlinx.serialization.encodeToString +import kotlinx.serialization.json.Json +import org.mockito.kotlin.after +import org.mockito.kotlin.doReturn +import org.mockito.kotlin.verify +import org.mockito.kotlin.whenever +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.boot.test.mock.mockito.MockBean +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.support.KafkaHeaders +import org.springframework.kafka.test.context.EmbeddedKafka +import org.springframework.messaging.Message +import org.springframework.messaging.support.MessageBuilder +import org.springframework.test.annotation.DirtiesContext +import org.springframework.test.context.ActiveProfiles +import java.time.OffsetDateTime +import java.time.format.DateTimeFormatter +import java.util.* +import kotlin.test.Test +import kotlin.test.assertEquals + +@ActiveProfiles("kafka") +@SpringBootTest +@EmbeddedKafka( + brokerProperties = ["listeners=PLAINTEXT://localhost:3392", "port=3392"], + topics = ["demo-city-sync"], + partitions = 1, +) +@DirtiesContext +class ConsumerKfkTest( + @Autowired private val kafkaTemplate: KafkaTemplate, + @Autowired private val metricRegistry: MeterRegistry, +) { + @MockBean + private lateinit var cityService: CityService + + @Test + fun consumer_handleCityCreate() { + val cityGuid = UUID.randomUUID() + val cityName = "new-city" + val createdAt = OffsetDateTime.now().minusDays(1) + val cityCreateDto = CityCreateDto( + guid = cityGuid.toString(), + name = cityName, + createdAt = createdAt.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME), + updatedAt = null, + deletedAt = null, + ) + + whenever(cityService.create(cityCreateDto)) doReturn City( + id = 123, + guid = cityGuid, + name = cityName, + createdAt = createdAt, + updatedAt = null, + deletedAt = null, + ) + + val message: Message = MessageBuilder + .withPayload( + Json.encodeToString(cityCreateDto) + ) + .setHeader(KafkaHeaders.TOPIC, "demo-city-sync") + .build() + + kafkaTemplate.send(message) + + verify(cityService, after(1000).times(1)).create(cityCreateDto) + + assertEquals(1.0, metricRegistry.get("kafka_consumer_city_create").counter().count()) + } +} \ No newline at end of file diff --git a/src/test/resources/application_feature.yml b/src/test/resources/application_db.yml similarity index 93% rename from src/test/resources/application_feature.yml rename to src/test/resources/application_db.yml index 4fe5761..cd83601 100644 --- a/src/test/resources/application_feature.yml +++ b/src/test/resources/application_db.yml @@ -17,4 +17,5 @@ kafka: consumer: group-id: demo-consumer topics: demo-city-sync + auto-offset-reset: earliest auto-startup: false \ No newline at end of file diff --git a/src/test/resources/application_kfk.yml b/src/test/resources/application_kfk.yml new file mode 100644 index 0000000..81ae94d --- /dev/null +++ b/src/test/resources/application_kfk.yml @@ -0,0 +1,21 @@ +--- +spring: + datasource: + url: jdbc:tc:postgresql:14-alpine:///test + hikari: + maximum-pool-size: 2 + driver-class-name: org.testcontainers.jdbc.ContainerDatabaseDriver + jpa: + hibernate: + ddl-auto: create + +kafka: + bootstrap-servers: localhost:3392 + producer: + product: + default-sync-topic: demo-product-sync + consumer: + group-id: demo-consumer + topics: demo-city-sync + auto-offset-reset: earliest + auto-startup: true \ No newline at end of file