From cbf7e0a5f6253ff02b6d680a09c8cab9144e96b3 Mon Sep 17 00:00:00 2001 From: Denis Savosin Date: Wed, 2 Oct 2024 10:32:55 +0700 Subject: [PATCH] add kafka producer, move configs to separate package --- build.gradle.kts | 2 + checklist.md | 4 +- .../example/demo/{ => config}/AppConfig.kt | 18 +++++++-- .../com/example/demo/config/ProducerConfig.kt | 40 +++++++++++++++++++ .../demo/controllers/ProductController.kt | 17 ++++++++ .../example/demo/services/ProductService.kt | 4 ++ .../demo/services/ProductServiceImpl.kt | 10 +++++ .../example/demo/services/kafka/Producer.kt | 9 +++++ .../demo/services/kafka/ProducerImpl.kt | 24 +++++++++++ .../demo/services/kafka/dto/ProductDto.kt | 28 +++++++++++++ .../kafka/dto/serializer/ProductSerializer.kt | 20 ++++++++++ .../exceptions/InvalidArgumentException.kt | 3 ++ src/main/resources/application.yml | 8 +++- .../com/example/demo/BaseFeatureTest.kt | 8 ++-- .../demo/services/ProductServiceImplTest.kt | 6 +-- 15 files changed, 186 insertions(+), 15 deletions(-) rename src/main/kotlin/com/example/demo/{ => config}/AppConfig.kt (60%) create mode 100644 src/main/kotlin/com/example/demo/config/ProducerConfig.kt create mode 100644 src/main/kotlin/com/example/demo/services/kafka/Producer.kt create mode 100644 src/main/kotlin/com/example/demo/services/kafka/ProducerImpl.kt create mode 100644 src/main/kotlin/com/example/demo/services/kafka/dto/ProductDto.kt create mode 100644 src/main/kotlin/com/example/demo/services/kafka/dto/serializer/ProductSerializer.kt create mode 100644 src/main/kotlin/com/example/demo/services/kafka/exceptions/InvalidArgumentException.kt diff --git a/build.gradle.kts b/build.gradle.kts index 5dae183..366bb12 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -32,12 +32,14 @@ dependencies { implementation("org.springframework.boot:spring-boot-starter-mustache") implementation("org.springframework.boot:spring-boot-starter-validation") implementation("org.springframework.boot:spring-boot-starter-web") + implementation("org.springframework.kafka:spring-kafka") developmentOnly("org.springframework.boot:spring-boot-devtools") testImplementation("org.jetbrains.kotlin:kotlin-test-junit5") testImplementation("org.mockito.kotlin:mockito-kotlin:5.4.0") testImplementation("org.springframework.boot:spring-boot-starter-test") + testImplementation("org.springframework.kafka:spring-kafka-test") testImplementation("org.testcontainers:junit-jupiter") testImplementation("org.testcontainers:testcontainers") testImplementation("org.testcontainers:postgresql") diff --git a/checklist.md b/checklist.md index f5207b1..d5fe223 100644 --- a/checklist.md +++ b/checklist.md @@ -1,8 +1,8 @@ Some simple checklist for this demo repo: - [x] connect to database - [x] make migrations -- [ ] make repository and models -- [ ] make crud api +- [x] make repository and models +- [x] make crud api - [ ] make openapi documentation generation - [ ] connect to kafka to produce events - [ ] make kafka consumer diff --git a/src/main/kotlin/com/example/demo/AppConfig.kt b/src/main/kotlin/com/example/demo/config/AppConfig.kt similarity index 60% rename from src/main/kotlin/com/example/demo/AppConfig.kt rename to src/main/kotlin/com/example/demo/config/AppConfig.kt index 0035e04..560b86e 100644 --- a/src/main/kotlin/com/example/demo/AppConfig.kt +++ b/src/main/kotlin/com/example/demo/config/AppConfig.kt @@ -1,4 +1,4 @@ -package com.example.demo +package com.example.demo.config import com.example.demo.provider.CityRepository import com.example.demo.provider.MockedShopProvider @@ -8,19 +8,31 @@ import com.example.demo.services.CityService import com.example.demo.services.CityServiceImpl import com.example.demo.services.ProductService import com.example.demo.services.ProductServiceImpl +import com.example.demo.services.kafka.Producer import org.springframework.beans.factory.annotation.Autowired +import org.springframework.beans.factory.annotation.Value import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration @Configuration -class AppConfig { +class AppConfig( + @Value("\${kafka.producer.product.default-sync-topic}") + private val defaultProductSyncTopic: String +) { @Bean fun shopProvider(): ShopProvider{ return MockedShopProvider() } @Bean - fun productService(@Autowired productRepository: ProductRepository): ProductService = ProductServiceImpl(productRepository) + fun productService( + @Autowired productRepository: ProductRepository, + @Autowired producer: Producer, + ): ProductService = ProductServiceImpl( + defaultProductSyncTopic, + productRepository, + producer, + ) @Bean fun cityService(@Autowired cityRepository: CityRepository): CityService = CityServiceImpl(cityRepository) diff --git a/src/main/kotlin/com/example/demo/config/ProducerConfig.kt b/src/main/kotlin/com/example/demo/config/ProducerConfig.kt new file mode 100644 index 0000000..c7e29ab --- /dev/null +++ b/src/main/kotlin/com/example/demo/config/ProducerConfig.kt @@ -0,0 +1,40 @@ +package com.example.demo.config + +import com.example.demo.services.kafka.Producer +import com.example.demo.services.kafka.ProducerImpl +import com.example.demo.services.kafka.dto.serializer.ProductSerializer +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.serialization.StringSerializer +import org.springframework.beans.factory.annotation.Value +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.core.ProducerFactory + +@Configuration +class ProducerConfig( + @Value("\${kafka.bootstrap-servers}") + val servers: String +) { + @Bean + fun producerFactory(): ProducerFactory { + val configProps: MutableMap = HashMap() + + configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = servers + configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java + configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ProductSerializer::class.java + + return DefaultKafkaProducerFactory(configProps) + } + + @Bean + fun kafkaTemplate(): KafkaTemplate = KafkaTemplate( + producerFactory(), + ) + + @Bean + fun producer(): Producer = ProducerImpl( + kafkaTemplate(), + ) +} \ No newline at end of file diff --git a/src/main/kotlin/com/example/demo/controllers/ProductController.kt b/src/main/kotlin/com/example/demo/controllers/ProductController.kt index d19e68d..6ece5fc 100644 --- a/src/main/kotlin/com/example/demo/controllers/ProductController.kt +++ b/src/main/kotlin/com/example/demo/controllers/ProductController.kt @@ -5,6 +5,7 @@ import com.example.demo.exceptions.UnprocessableException import com.example.demo.requests.CreateProductRequest import com.example.demo.responses.makeOkResponse import com.example.demo.services.ProductService +import com.example.demo.services.kafka.exceptions.InvalidArgumentException import jakarta.validation.Valid import org.springframework.http.HttpStatus import org.springframework.http.MediaType @@ -28,6 +29,22 @@ class ProductController( return ResponseEntity(product, HttpStatus.OK) } + @PostMapping("/{guid}/sync") + @ResponseBody + @Throws(NotFoundException::class) + fun syncProductToKafka( + @PathVariable guid: UUID, + @RequestParam(required = false) topic: String? + ): ResponseEntity { + try { + productService.syncToKafka(guid, topic) + } catch (exception: InvalidArgumentException) { + throw UnprocessableException("cannot sync product to kafka") + } + + return ResponseEntity(makeOkResponse(), HttpStatus.OK) + } + @PostMapping(value = [""], consumes = [MediaType.APPLICATION_JSON_VALUE]) @ResponseBody fun createProduct( diff --git a/src/main/kotlin/com/example/demo/services/ProductService.kt b/src/main/kotlin/com/example/demo/services/ProductService.kt index 6b2e931..69f491c 100644 --- a/src/main/kotlin/com/example/demo/services/ProductService.kt +++ b/src/main/kotlin/com/example/demo/services/ProductService.kt @@ -3,6 +3,7 @@ package com.example.demo.services import com.example.demo.exceptions.NotFoundException import com.example.demo.exceptions.UnprocessableException import com.example.demo.models.Product +import com.example.demo.services.kafka.exceptions.InvalidArgumentException import org.springframework.stereotype.Service import java.util.* @@ -14,4 +15,7 @@ interface ProductService { @Throws(NotFoundException::class, UnprocessableException::class) fun delete(guid: UUID): Product? + + @Throws(NotFoundException::class, InvalidArgumentException::class) + fun syncToKafka(guid: UUID, topic: String?) } \ No newline at end of file diff --git a/src/main/kotlin/com/example/demo/services/ProductServiceImpl.kt b/src/main/kotlin/com/example/demo/services/ProductServiceImpl.kt index 89ff5fc..92c9a11 100644 --- a/src/main/kotlin/com/example/demo/services/ProductServiceImpl.kt +++ b/src/main/kotlin/com/example/demo/services/ProductServiceImpl.kt @@ -4,11 +4,14 @@ import com.example.demo.exceptions.NotFoundException import com.example.demo.exceptions.UnprocessableException import com.example.demo.models.Product import com.example.demo.provider.ProductRepository +import com.example.demo.services.kafka.Producer import java.time.OffsetDateTime import java.util.* class ProductServiceImpl( + private val defaultSyncTopic: String, private val productRepository: ProductRepository, + private val producer: Producer, ): ProductService { override fun findByGuid(guid: UUID): Product? = productRepository.findByGuid(guid) @@ -47,4 +50,11 @@ class ProductServiceImpl( return productRepository.save(deletedProduct) } + + override fun syncToKafka(guid: UUID, topic: String?) { + val product = findByGuid(guid) ?: throw NotFoundException() + + producer.produceProductInfo(topic ?: defaultSyncTopic, product) + } + } \ No newline at end of file diff --git a/src/main/kotlin/com/example/demo/services/kafka/Producer.kt b/src/main/kotlin/com/example/demo/services/kafka/Producer.kt new file mode 100644 index 0000000..15ed7c2 --- /dev/null +++ b/src/main/kotlin/com/example/demo/services/kafka/Producer.kt @@ -0,0 +1,9 @@ +package com.example.demo.services.kafka + +import com.example.demo.models.Product +import com.example.demo.services.kafka.exceptions.InvalidArgumentException + +interface Producer { + @Throws(InvalidArgumentException::class) + fun produceProductInfo(topicName: String, product: Product) +} \ No newline at end of file diff --git a/src/main/kotlin/com/example/demo/services/kafka/ProducerImpl.kt b/src/main/kotlin/com/example/demo/services/kafka/ProducerImpl.kt new file mode 100644 index 0000000..b9b6654 --- /dev/null +++ b/src/main/kotlin/com/example/demo/services/kafka/ProducerImpl.kt @@ -0,0 +1,24 @@ +package com.example.demo.services.kafka + +import com.example.demo.models.Product +import com.example.demo.services.kafka.dto.ProductDto +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.support.KafkaHeaders +import org.springframework.messaging.Message +import org.springframework.messaging.support.MessageBuilder +import org.springframework.stereotype.Service + +@Service +class ProducerImpl( + private val kafkaTemplate: KafkaTemplate +): Producer { + override fun produceProductInfo(topicName: String, product: Product) { + val message: Message = MessageBuilder + .withPayload(ProductDto(product)) + .setHeader(KafkaHeaders.TOPIC, topicName) + .setHeader("X-Custom-Header", "some-custom-header") + .build() + + kafkaTemplate.send(message) + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/example/demo/services/kafka/dto/ProductDto.kt b/src/main/kotlin/com/example/demo/services/kafka/dto/ProductDto.kt new file mode 100644 index 0000000..ce52d23 --- /dev/null +++ b/src/main/kotlin/com/example/demo/services/kafka/dto/ProductDto.kt @@ -0,0 +1,28 @@ +package com.example.demo.services.kafka.dto + +import com.example.demo.models.Product +import com.example.demo.services.kafka.exceptions.InvalidArgumentException +import java.time.format.DateTimeFormatter + +data class ProductDto( + val id: Long, + val guid: String, + val name: String, + val description: String?, + val price: Long, + val createdAt: String, + val updatedAt: String?, + val deletedAt: String?, +) { + @Throws(InvalidArgumentException::class) + constructor(product: Product) : this( + id = product.id ?: throw InvalidArgumentException("product.id"), + guid = product.guid.toString(), + name = product.name, + description = product.description, + price = product.price, + createdAt = product.createdAt.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME), + updatedAt = product.updatedAt?.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME), + deletedAt = product.deletedAt?.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME), + ) +} \ No newline at end of file diff --git a/src/main/kotlin/com/example/demo/services/kafka/dto/serializer/ProductSerializer.kt b/src/main/kotlin/com/example/demo/services/kafka/dto/serializer/ProductSerializer.kt new file mode 100644 index 0000000..d4e5c1b --- /dev/null +++ b/src/main/kotlin/com/example/demo/services/kafka/dto/serializer/ProductSerializer.kt @@ -0,0 +1,20 @@ +package com.example.demo.services.kafka.dto.serializer + +import com.example.demo.services.kafka.dto.ProductDto +import com.fasterxml.jackson.databind.ObjectMapper +import org.apache.kafka.common.serialization.Serializer +import javax.sql.rowset.serial.SerialException + +class ProductSerializer: Serializer { + private val objectMapper = ObjectMapper() + + override fun serialize(topic: String?, data: ProductDto?): ByteArray { + return objectMapper.writeValueAsBytes( + data ?: throw SerialException() + ) + } + + override fun close() { + // no logic + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/example/demo/services/kafka/exceptions/InvalidArgumentException.kt b/src/main/kotlin/com/example/demo/services/kafka/exceptions/InvalidArgumentException.kt new file mode 100644 index 0000000..0bfe3d0 --- /dev/null +++ b/src/main/kotlin/com/example/demo/services/kafka/exceptions/InvalidArgumentException.kt @@ -0,0 +1,3 @@ +package com.example.demo.services.kafka.exceptions + +class InvalidArgumentException(argName: String): RuntimeException("invalid argument $argName") \ No newline at end of file diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index c4d7a1e..08143fd 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -13,4 +13,10 @@ spring: enabled: true # enables flyway database migration locations: classpath:db/migration/structure, classpath:db/migration/data # the location where flyway should look for migration scripts validate-on-migrate: true - default-schema: public \ No newline at end of file + default-schema: public + +kafka: + bootstrap-servers: localhost:9095 + producer: + product: + default-sync-topic: demo-product-sync diff --git a/src/test/kotlin/com/example/demo/BaseFeatureTest.kt b/src/test/kotlin/com/example/demo/BaseFeatureTest.kt index a8e4569..f8582fa 100644 --- a/src/test/kotlin/com/example/demo/BaseFeatureTest.kt +++ b/src/test/kotlin/com/example/demo/BaseFeatureTest.kt @@ -1,10 +1,10 @@ package com.example.demo -import org.springframework.beans.factory.annotation.Autowired +import com.example.demo.services.kafka.Producer import org.springframework.boot.test.autoconfigure.data.jdbc.DataJdbcTest import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase +import org.springframework.boot.test.mock.mockito.MockBean import org.springframework.data.jdbc.repository.config.EnableJdbcRepositories -import org.springframework.jdbc.core.JdbcTemplate import org.springframework.test.context.ActiveProfiles import org.testcontainers.junit.jupiter.Testcontainers @@ -14,6 +14,6 @@ import org.testcontainers.junit.jupiter.Testcontainers @AutoConfigureTestDatabase(replace = AutoConfigureTestDatabase.Replace.NONE) @EnableJdbcRepositories class BaseFeatureTest { - @Autowired - lateinit var jdbcTemplate: JdbcTemplate + @MockBean + private lateinit var producer: Producer } \ No newline at end of file diff --git a/src/test/kotlin/com/example/demo/services/ProductServiceImplTest.kt b/src/test/kotlin/com/example/demo/services/ProductServiceImplTest.kt index e533ce4..9b697b2 100644 --- a/src/test/kotlin/com/example/demo/services/ProductServiceImplTest.kt +++ b/src/test/kotlin/com/example/demo/services/ProductServiceImplTest.kt @@ -5,11 +5,7 @@ import com.example.demo.models.Product import com.example.demo.provider.ProductRepository import org.springframework.beans.factory.annotation.Autowired import org.springframework.test.context.ContextConfiguration -import kotlin.test.Test -import kotlin.test.assertEquals -import kotlin.test.assertNotNull -import kotlin.test.assertTrue -import kotlin.test.assertFalse +import kotlin.test.* @ContextConfiguration(classes = [ProductRepository::class, ProductServiceImpl::class]) class ProductServiceImplTest: BaseFeatureTest() {