mirror of
https://github.com/Dannecron/spring-boot-demo.git
synced 2025-12-25 16:22:35 +03:00
add custom prometheus metric
add tests for kafka consumer
This commit is contained in:
@@ -2,6 +2,7 @@ package com.example.demo.config
|
|||||||
|
|
||||||
import com.example.demo.services.database.city.CityService
|
import com.example.demo.services.database.city.CityService
|
||||||
import com.example.demo.services.kafka.Consumer
|
import com.example.demo.services.kafka.Consumer
|
||||||
|
import io.micrometer.core.instrument.MeterRegistry
|
||||||
import org.apache.kafka.clients.consumer.ConsumerConfig
|
import org.apache.kafka.clients.consumer.ConsumerConfig
|
||||||
import org.apache.kafka.common.serialization.StringDeserializer
|
import org.apache.kafka.common.serialization.StringDeserializer
|
||||||
import org.springframework.beans.factory.annotation.Autowired
|
import org.springframework.beans.factory.annotation.Autowired
|
||||||
@@ -18,15 +19,20 @@ class KafkaConsumerConfig(
|
|||||||
@Bean
|
@Bean
|
||||||
fun consumer(
|
fun consumer(
|
||||||
@Autowired cityService: CityService,
|
@Autowired cityService: CityService,
|
||||||
): Consumer = Consumer(cityService)
|
@Autowired metricRegistry: MeterRegistry
|
||||||
|
): Consumer = Consumer(
|
||||||
|
cityService = cityService,
|
||||||
|
metricRegistry = metricRegistry,
|
||||||
|
)
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
fun consumerFactory(): ConsumerFactory<String, String> {
|
fun consumerFactory(): ConsumerFactory<String, String> {
|
||||||
val configs = mapOf(
|
val configs = mapOf(
|
||||||
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProperties.bootstrapServers,
|
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProperties.bootstrapServers,
|
||||||
ConsumerConfig.GROUP_ID_CONFIG to kafkaProperties.consumer.groupId,
|
ConsumerConfig.GROUP_ID_CONFIG to kafkaProperties.consumer.groupId,
|
||||||
|
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to kafkaProperties.consumer.autoOffsetReset,
|
||||||
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
|
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
|
||||||
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
|
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java
|
||||||
)
|
)
|
||||||
|
|
||||||
return DefaultKafkaConsumerFactory(configs)
|
return DefaultKafkaConsumerFactory(configs)
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ data class KafkaProperties @ConstructorBinding constructor(
|
|||||||
val groupId: String,
|
val groupId: String,
|
||||||
val topics: String,
|
val topics: String,
|
||||||
val autoStartup: Boolean,
|
val autoStartup: Boolean,
|
||||||
|
val autoOffsetReset: String,
|
||||||
)
|
)
|
||||||
|
|
||||||
data class Validation(
|
data class Validation(
|
||||||
|
|||||||
@@ -2,6 +2,8 @@ package com.example.demo.services.kafka
|
|||||||
|
|
||||||
import com.example.demo.services.database.city.CityService
|
import com.example.demo.services.database.city.CityService
|
||||||
import com.example.demo.services.kafka.dto.CityCreateDto
|
import com.example.demo.services.kafka.dto.CityCreateDto
|
||||||
|
import io.micrometer.core.instrument.Counter
|
||||||
|
import io.micrometer.core.instrument.MeterRegistry
|
||||||
import kotlinx.serialization.json.Json
|
import kotlinx.serialization.json.Json
|
||||||
import org.springframework.kafka.annotation.KafkaListener
|
import org.springframework.kafka.annotation.KafkaListener
|
||||||
import org.springframework.messaging.handler.annotation.Payload
|
import org.springframework.messaging.handler.annotation.Payload
|
||||||
@@ -10,6 +12,7 @@ import org.springframework.stereotype.Component
|
|||||||
@Component
|
@Component
|
||||||
class Consumer(
|
class Consumer(
|
||||||
private val cityService: CityService,
|
private val cityService: CityService,
|
||||||
|
private val metricRegistry: MeterRegistry,
|
||||||
) {
|
) {
|
||||||
@KafkaListener(
|
@KafkaListener(
|
||||||
topics = ["#{'\${kafka.consumer.topics}'.split(',')}"],
|
topics = ["#{'\${kafka.consumer.topics}'.split(',')}"],
|
||||||
@@ -17,6 +20,13 @@ class Consumer(
|
|||||||
)
|
)
|
||||||
fun handleCityCreate(@Payload message: String) {
|
fun handleCityCreate(@Payload message: String) {
|
||||||
val cityCreateDto = Json.decodeFromString<CityCreateDto>(message)
|
val cityCreateDto = Json.decodeFromString<CityCreateDto>(message)
|
||||||
|
.also {
|
||||||
|
val counter = Counter.builder("kafka_consumer_city_create")
|
||||||
|
.description("consumed created city event")
|
||||||
|
.register(metricRegistry)
|
||||||
|
counter.increment()
|
||||||
|
}
|
||||||
|
|
||||||
cityService.create(cityCreateDto)
|
cityService.create(cityCreateDto)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -29,6 +29,7 @@ kafka:
|
|||||||
consumer:
|
consumer:
|
||||||
group-id: demo-consumer
|
group-id: demo-consumer
|
||||||
topics: demo-city-sync
|
topics: demo-city-sync
|
||||||
|
auto-offset-reset: none
|
||||||
auto-startup: true
|
auto-startup: true
|
||||||
validation:
|
validation:
|
||||||
schema:
|
schema:
|
||||||
|
|||||||
@@ -8,12 +8,12 @@ import org.springframework.data.jdbc.repository.config.EnableJdbcRepositories
|
|||||||
import org.springframework.test.context.ActiveProfiles
|
import org.springframework.test.context.ActiveProfiles
|
||||||
import org.testcontainers.junit.jupiter.Testcontainers
|
import org.testcontainers.junit.jupiter.Testcontainers
|
||||||
|
|
||||||
@ActiveProfiles("feature")
|
@ActiveProfiles("db")
|
||||||
@DataJdbcTest
|
@DataJdbcTest
|
||||||
@Testcontainers(disabledWithoutDocker = false)
|
@Testcontainers(disabledWithoutDocker = false)
|
||||||
@AutoConfigureTestDatabase(replace = AutoConfigureTestDatabase.Replace.NONE)
|
@AutoConfigureTestDatabase(replace = AutoConfigureTestDatabase.Replace.NONE)
|
||||||
@EnableJdbcRepositories
|
@EnableJdbcRepositories
|
||||||
class BaseFeatureTest {
|
class BaseDbTest {
|
||||||
@MockBean
|
@MockBean
|
||||||
lateinit var producer: Producer
|
lateinit var producer: Producer
|
||||||
}
|
}
|
||||||
@@ -24,6 +24,7 @@ open class BaseUnitTest {
|
|||||||
groupId = "group",
|
groupId = "group",
|
||||||
topics = "topic",
|
topics = "topic",
|
||||||
autoStartup = false,
|
autoStartup = false,
|
||||||
|
autoOffsetReset = "none",
|
||||||
),
|
),
|
||||||
validation = KafkaProperties.Validation(
|
validation = KafkaProperties.Validation(
|
||||||
schema = mapOf("product-sync" to "foo"),
|
schema = mapOf("product-sync" to "foo"),
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
package com.example.demo.services.database.city
|
package com.example.demo.services.database.city
|
||||||
|
|
||||||
import com.example.demo.BaseFeatureTest
|
import com.example.demo.BaseDbTest
|
||||||
import com.example.demo.models.City
|
import com.example.demo.models.City
|
||||||
import com.example.demo.providers.CityRepository
|
import com.example.demo.providers.CityRepository
|
||||||
import com.example.demo.services.database.city.exceptions.CityNotFoundException
|
import com.example.demo.services.database.city.exceptions.CityNotFoundException
|
||||||
@@ -12,7 +12,7 @@ import java.util.*
|
|||||||
import kotlin.test.*
|
import kotlin.test.*
|
||||||
|
|
||||||
@ContextConfiguration(classes = [CityRepository::class, CityServiceImpl::class])
|
@ContextConfiguration(classes = [CityRepository::class, CityServiceImpl::class])
|
||||||
class CityServiceImplFeatureTest: BaseFeatureTest() {
|
class CityServiceImplDbTest: BaseDbTest() {
|
||||||
@Autowired
|
@Autowired
|
||||||
private lateinit var cityRepository: CityRepository
|
private lateinit var cityRepository: CityRepository
|
||||||
@Autowired
|
@Autowired
|
||||||
@@ -1,6 +1,6 @@
|
|||||||
package com.example.demo.services.database.product
|
package com.example.demo.services.database.product
|
||||||
|
|
||||||
import com.example.demo.BaseFeatureTest
|
import com.example.demo.BaseDbTest
|
||||||
import com.example.demo.models.Product
|
import com.example.demo.models.Product
|
||||||
import com.example.demo.providers.ProductRepository
|
import com.example.demo.providers.ProductRepository
|
||||||
import com.example.demo.services.database.exceptions.AlreadyDeletedException
|
import com.example.demo.services.database.exceptions.AlreadyDeletedException
|
||||||
@@ -12,7 +12,7 @@ import java.util.*
|
|||||||
import kotlin.test.*
|
import kotlin.test.*
|
||||||
|
|
||||||
@ContextConfiguration(classes = [ProductRepository::class])
|
@ContextConfiguration(classes = [ProductRepository::class])
|
||||||
class ProductServiceImplFeatureTest: BaseFeatureTest() {
|
class ProductServiceImplDbTest: BaseDbTest() {
|
||||||
private lateinit var productService: ProductServiceImpl
|
private lateinit var productService: ProductServiceImpl
|
||||||
@Autowired
|
@Autowired
|
||||||
private lateinit var productRepository: ProductRepository
|
private lateinit var productRepository: ProductRepository
|
||||||
@@ -0,0 +1,79 @@
|
|||||||
|
package com.example.demo.services.kafka
|
||||||
|
|
||||||
|
import com.example.demo.models.City
|
||||||
|
import com.example.demo.services.database.city.CityService
|
||||||
|
import com.example.demo.services.kafka.dto.CityCreateDto
|
||||||
|
import io.micrometer.core.instrument.MeterRegistry
|
||||||
|
import kotlinx.serialization.encodeToString
|
||||||
|
import kotlinx.serialization.json.Json
|
||||||
|
import org.mockito.kotlin.after
|
||||||
|
import org.mockito.kotlin.doReturn
|
||||||
|
import org.mockito.kotlin.verify
|
||||||
|
import org.mockito.kotlin.whenever
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired
|
||||||
|
import org.springframework.boot.test.context.SpringBootTest
|
||||||
|
import org.springframework.boot.test.mock.mockito.MockBean
|
||||||
|
import org.springframework.kafka.core.KafkaTemplate
|
||||||
|
import org.springframework.kafka.support.KafkaHeaders
|
||||||
|
import org.springframework.kafka.test.context.EmbeddedKafka
|
||||||
|
import org.springframework.messaging.Message
|
||||||
|
import org.springframework.messaging.support.MessageBuilder
|
||||||
|
import org.springframework.test.annotation.DirtiesContext
|
||||||
|
import org.springframework.test.context.ActiveProfiles
|
||||||
|
import java.time.OffsetDateTime
|
||||||
|
import java.time.format.DateTimeFormatter
|
||||||
|
import java.util.*
|
||||||
|
import kotlin.test.Test
|
||||||
|
import kotlin.test.assertEquals
|
||||||
|
|
||||||
|
@ActiveProfiles("kafka")
|
||||||
|
@SpringBootTest
|
||||||
|
@EmbeddedKafka(
|
||||||
|
brokerProperties = ["listeners=PLAINTEXT://localhost:3392", "port=3392"],
|
||||||
|
topics = ["demo-city-sync"],
|
||||||
|
partitions = 1,
|
||||||
|
)
|
||||||
|
@DirtiesContext
|
||||||
|
class ConsumerKfkTest(
|
||||||
|
@Autowired private val kafkaTemplate: KafkaTemplate<String, Any>,
|
||||||
|
@Autowired private val metricRegistry: MeterRegistry,
|
||||||
|
) {
|
||||||
|
@MockBean
|
||||||
|
private lateinit var cityService: CityService
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun consumer_handleCityCreate() {
|
||||||
|
val cityGuid = UUID.randomUUID()
|
||||||
|
val cityName = "new-city"
|
||||||
|
val createdAt = OffsetDateTime.now().minusDays(1)
|
||||||
|
val cityCreateDto = CityCreateDto(
|
||||||
|
guid = cityGuid.toString(),
|
||||||
|
name = cityName,
|
||||||
|
createdAt = createdAt.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME),
|
||||||
|
updatedAt = null,
|
||||||
|
deletedAt = null,
|
||||||
|
)
|
||||||
|
|
||||||
|
whenever(cityService.create(cityCreateDto)) doReturn City(
|
||||||
|
id = 123,
|
||||||
|
guid = cityGuid,
|
||||||
|
name = cityName,
|
||||||
|
createdAt = createdAt,
|
||||||
|
updatedAt = null,
|
||||||
|
deletedAt = null,
|
||||||
|
)
|
||||||
|
|
||||||
|
val message: Message<String> = MessageBuilder
|
||||||
|
.withPayload(
|
||||||
|
Json.encodeToString(cityCreateDto)
|
||||||
|
)
|
||||||
|
.setHeader(KafkaHeaders.TOPIC, "demo-city-sync")
|
||||||
|
.build()
|
||||||
|
|
||||||
|
kafkaTemplate.send(message)
|
||||||
|
|
||||||
|
verify(cityService, after(1000).times(1)).create(cityCreateDto)
|
||||||
|
|
||||||
|
assertEquals(1.0, metricRegistry.get("kafka_consumer_city_create").counter().count())
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -17,4 +17,5 @@ kafka:
|
|||||||
consumer:
|
consumer:
|
||||||
group-id: demo-consumer
|
group-id: demo-consumer
|
||||||
topics: demo-city-sync
|
topics: demo-city-sync
|
||||||
|
auto-offset-reset: earliest
|
||||||
auto-startup: false
|
auto-startup: false
|
||||||
21
src/test/resources/application_kfk.yml
Normal file
21
src/test/resources/application_kfk.yml
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
---
|
||||||
|
spring:
|
||||||
|
datasource:
|
||||||
|
url: jdbc:tc:postgresql:14-alpine:///test
|
||||||
|
hikari:
|
||||||
|
maximum-pool-size: 2
|
||||||
|
driver-class-name: org.testcontainers.jdbc.ContainerDatabaseDriver
|
||||||
|
jpa:
|
||||||
|
hibernate:
|
||||||
|
ddl-auto: create
|
||||||
|
|
||||||
|
kafka:
|
||||||
|
bootstrap-servers: localhost:3392
|
||||||
|
producer:
|
||||||
|
product:
|
||||||
|
default-sync-topic: demo-product-sync
|
||||||
|
consumer:
|
||||||
|
group-id: demo-consumer
|
||||||
|
topics: demo-city-sync
|
||||||
|
auto-offset-reset: earliest
|
||||||
|
auto-startup: true
|
||||||
Reference in New Issue
Block a user