add kafka producer, move configs to separate package

This commit is contained in:
Denis Savosin
2024-10-02 10:32:55 +07:00
parent 1fa8b7051e
commit cbf7e0a5f6
15 changed files with 186 additions and 15 deletions

View File

@@ -32,12 +32,14 @@ dependencies {
implementation("org.springframework.boot:spring-boot-starter-mustache")
implementation("org.springframework.boot:spring-boot-starter-validation")
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.kafka:spring-kafka")
developmentOnly("org.springframework.boot:spring-boot-devtools")
testImplementation("org.jetbrains.kotlin:kotlin-test-junit5")
testImplementation("org.mockito.kotlin:mockito-kotlin:5.4.0")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.springframework.kafka:spring-kafka-test")
testImplementation("org.testcontainers:junit-jupiter")
testImplementation("org.testcontainers:testcontainers")
testImplementation("org.testcontainers:postgresql")

View File

@@ -1,8 +1,8 @@
Some simple checklist for this demo repo:
- [x] connect to database
- [x] make migrations
- [ ] make repository and models
- [ ] make crud api
- [x] make repository and models
- [x] make crud api
- [ ] make openapi documentation generation
- [ ] connect to kafka to produce events
- [ ] make kafka consumer

View File

@@ -1,4 +1,4 @@
package com.example.demo
package com.example.demo.config
import com.example.demo.provider.CityRepository
import com.example.demo.provider.MockedShopProvider
@@ -8,19 +8,31 @@ import com.example.demo.services.CityService
import com.example.demo.services.CityServiceImpl
import com.example.demo.services.ProductService
import com.example.demo.services.ProductServiceImpl
import com.example.demo.services.kafka.Producer
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
@Configuration
class AppConfig {
class AppConfig(
@Value("\${kafka.producer.product.default-sync-topic}")
private val defaultProductSyncTopic: String
) {
@Bean
fun shopProvider(): ShopProvider{
return MockedShopProvider()
}
@Bean
fun productService(@Autowired productRepository: ProductRepository): ProductService = ProductServiceImpl(productRepository)
fun productService(
@Autowired productRepository: ProductRepository,
@Autowired producer: Producer,
): ProductService = ProductServiceImpl(
defaultProductSyncTopic,
productRepository,
producer,
)
@Bean
fun cityService(@Autowired cityRepository: CityRepository): CityService = CityServiceImpl(cityRepository)

View File

@@ -0,0 +1,40 @@
package com.example.demo.config
import com.example.demo.services.kafka.Producer
import com.example.demo.services.kafka.ProducerImpl
import com.example.demo.services.kafka.dto.serializer.ProductSerializer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory
@Configuration
class ProducerConfig(
@Value("\${kafka.bootstrap-servers}")
val servers: String
) {
@Bean
fun producerFactory(): ProducerFactory<String, Any> {
val configProps: MutableMap<String, Any> = HashMap()
configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = servers
configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ProductSerializer::class.java
return DefaultKafkaProducerFactory(configProps)
}
@Bean
fun kafkaTemplate(): KafkaTemplate<String, Any> = KafkaTemplate(
producerFactory(),
)
@Bean
fun producer(): Producer = ProducerImpl(
kafkaTemplate(),
)
}

View File

@@ -5,6 +5,7 @@ import com.example.demo.exceptions.UnprocessableException
import com.example.demo.requests.CreateProductRequest
import com.example.demo.responses.makeOkResponse
import com.example.demo.services.ProductService
import com.example.demo.services.kafka.exceptions.InvalidArgumentException
import jakarta.validation.Valid
import org.springframework.http.HttpStatus
import org.springframework.http.MediaType
@@ -28,6 +29,22 @@ class ProductController(
return ResponseEntity(product, HttpStatus.OK)
}
@PostMapping("/{guid}/sync")
@ResponseBody
@Throws(NotFoundException::class)
fun syncProductToKafka(
@PathVariable guid: UUID,
@RequestParam(required = false) topic: String?
): ResponseEntity<Any> {
try {
productService.syncToKafka(guid, topic)
} catch (exception: InvalidArgumentException) {
throw UnprocessableException("cannot sync product to kafka")
}
return ResponseEntity(makeOkResponse(), HttpStatus.OK)
}
@PostMapping(value = [""], consumes = [MediaType.APPLICATION_JSON_VALUE])
@ResponseBody
fun createProduct(

View File

@@ -3,6 +3,7 @@ package com.example.demo.services
import com.example.demo.exceptions.NotFoundException
import com.example.demo.exceptions.UnprocessableException
import com.example.demo.models.Product
import com.example.demo.services.kafka.exceptions.InvalidArgumentException
import org.springframework.stereotype.Service
import java.util.*
@@ -14,4 +15,7 @@ interface ProductService {
@Throws(NotFoundException::class, UnprocessableException::class)
fun delete(guid: UUID): Product?
@Throws(NotFoundException::class, InvalidArgumentException::class)
fun syncToKafka(guid: UUID, topic: String?)
}

View File

@@ -4,11 +4,14 @@ import com.example.demo.exceptions.NotFoundException
import com.example.demo.exceptions.UnprocessableException
import com.example.demo.models.Product
import com.example.demo.provider.ProductRepository
import com.example.demo.services.kafka.Producer
import java.time.OffsetDateTime
import java.util.*
class ProductServiceImpl(
private val defaultSyncTopic: String,
private val productRepository: ProductRepository,
private val producer: Producer,
): ProductService {
override fun findByGuid(guid: UUID): Product? = productRepository.findByGuid(guid)
@@ -47,4 +50,11 @@ class ProductServiceImpl(
return productRepository.save(deletedProduct)
}
override fun syncToKafka(guid: UUID, topic: String?) {
val product = findByGuid(guid) ?: throw NotFoundException()
producer.produceProductInfo(topic ?: defaultSyncTopic, product)
}
}

View File

@@ -0,0 +1,9 @@
package com.example.demo.services.kafka
import com.example.demo.models.Product
import com.example.demo.services.kafka.exceptions.InvalidArgumentException
interface Producer {
@Throws(InvalidArgumentException::class)
fun produceProductInfo(topicName: String, product: Product)
}

View File

@@ -0,0 +1,24 @@
package com.example.demo.services.kafka
import com.example.demo.models.Product
import com.example.demo.services.kafka.dto.ProductDto
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.support.KafkaHeaders
import org.springframework.messaging.Message
import org.springframework.messaging.support.MessageBuilder
import org.springframework.stereotype.Service
@Service
class ProducerImpl(
private val kafkaTemplate: KafkaTemplate<String, Any>
): Producer {
override fun produceProductInfo(topicName: String, product: Product) {
val message: Message<ProductDto> = MessageBuilder
.withPayload(ProductDto(product))
.setHeader(KafkaHeaders.TOPIC, topicName)
.setHeader("X-Custom-Header", "some-custom-header")
.build()
kafkaTemplate.send(message)
}
}

View File

@@ -0,0 +1,28 @@
package com.example.demo.services.kafka.dto
import com.example.demo.models.Product
import com.example.demo.services.kafka.exceptions.InvalidArgumentException
import java.time.format.DateTimeFormatter
data class ProductDto(
val id: Long,
val guid: String,
val name: String,
val description: String?,
val price: Long,
val createdAt: String,
val updatedAt: String?,
val deletedAt: String?,
) {
@Throws(InvalidArgumentException::class)
constructor(product: Product) : this(
id = product.id ?: throw InvalidArgumentException("product.id"),
guid = product.guid.toString(),
name = product.name,
description = product.description,
price = product.price,
createdAt = product.createdAt.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME),
updatedAt = product.updatedAt?.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME),
deletedAt = product.deletedAt?.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME),
)
}

View File

@@ -0,0 +1,20 @@
package com.example.demo.services.kafka.dto.serializer
import com.example.demo.services.kafka.dto.ProductDto
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.common.serialization.Serializer
import javax.sql.rowset.serial.SerialException
class ProductSerializer: Serializer<ProductDto> {
private val objectMapper = ObjectMapper()
override fun serialize(topic: String?, data: ProductDto?): ByteArray {
return objectMapper.writeValueAsBytes(
data ?: throw SerialException()
)
}
override fun close() {
// no logic
}
}

View File

@@ -0,0 +1,3 @@
package com.example.demo.services.kafka.exceptions
class InvalidArgumentException(argName: String): RuntimeException("invalid argument $argName")

View File

@@ -14,3 +14,9 @@ spring:
locations: classpath:db/migration/structure, classpath:db/migration/data # the location where flyway should look for migration scripts
validate-on-migrate: true
default-schema: public
kafka:
bootstrap-servers: localhost:9095
producer:
product:
default-sync-topic: demo-product-sync

View File

@@ -1,10 +1,10 @@
package com.example.demo
import org.springframework.beans.factory.annotation.Autowired
import com.example.demo.services.kafka.Producer
import org.springframework.boot.test.autoconfigure.data.jdbc.DataJdbcTest
import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase
import org.springframework.boot.test.mock.mockito.MockBean
import org.springframework.data.jdbc.repository.config.EnableJdbcRepositories
import org.springframework.jdbc.core.JdbcTemplate
import org.springframework.test.context.ActiveProfiles
import org.testcontainers.junit.jupiter.Testcontainers
@@ -14,6 +14,6 @@ import org.testcontainers.junit.jupiter.Testcontainers
@AutoConfigureTestDatabase(replace = AutoConfigureTestDatabase.Replace.NONE)
@EnableJdbcRepositories
class BaseFeatureTest {
@Autowired
lateinit var jdbcTemplate: JdbcTemplate
@MockBean
private lateinit var producer: Producer
}

View File

@@ -5,11 +5,7 @@ import com.example.demo.models.Product
import com.example.demo.provider.ProductRepository
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.test.context.ContextConfiguration
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertTrue
import kotlin.test.assertFalse
import kotlin.test.*
@ContextConfiguration(classes = [ProductRepository::class, ProductServiceImpl::class])
class ProductServiceImplTest: BaseFeatureTest() {