diff --git a/ballcat-starters/ballcat-spring-boot-starter-kafka/src/main/java/com/hccake/starter/kafka/KafkaAutoConfiguration.java b/ballcat-starters/ballcat-spring-boot-starter-kafka/src/main/java/com/hccake/starter/kafka/KafkaAutoConfiguration.java index 24087f0f..1800fa62 100644 --- a/ballcat-starters/ballcat-spring-boot-starter-kafka/src/main/java/com/hccake/starter/kafka/KafkaAutoConfiguration.java +++ b/ballcat-starters/ballcat-spring-boot-starter-kafka/src/main/java/com/hccake/starter/kafka/KafkaAutoConfiguration.java @@ -1,11 +1,14 @@ package com.hccake.starter.kafka; +import com.hccake.extend.kafka.KafkaConsumerBuilder; import com.hccake.extend.kafka.KafkaExtendProducer; import com.hccake.extend.kafka.KafkaProducerBuilder; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Scope; /** * @author lingting 2020/7/28 21:17 @@ -16,20 +19,23 @@ import org.springframework.context.annotation.Bean; public class KafkaAutoConfiguration { @Bean + @ConditionalOnMissingBean(KafkaExtendProducer.class) public KafkaExtendProducer stringKafkaExtendProducer(KafkaProperties properties) { KafkaProducerBuilder builder = new KafkaProducerBuilder() .addAllBootstrapServers(properties.getBootstrapServers()).putAll(properties.getExtend()); - if (properties.getKeySerializer() != null || properties.getKeySerializerClassName() != null) { - builder.keySerializer(properties.getKeySerializer() == null ? properties.getKeySerializerClassName() - : properties.getKeySerializer().getName()); - } - - if (properties.getValueSerializer() != null || properties.getValueSerializerClassName() != null) { - builder.valueSerializer(properties.getValueSerializer() == null ? properties.getValueSerializerClassName() - : properties.getValueSerializer().getName()); - } + builder.keySerializer(properties.getKeySerializerClassName()); + builder.valueSerializer(properties.getValueSerializerClassName()); return builder.build(); } + @Bean + @Scope("prototype") + @ConditionalOnMissingBean(KafkaConsumerBuilder.class) + public KafkaConsumerBuilder consumerBuilder(KafkaProperties properties) { + return new KafkaConsumerBuilder().addAllBootstrapServers(properties.getBootstrapServers()) + .keyDeserializer(properties.getKeyDeserializerClassName()) + .valueDeserializer(properties.getValueDeserializerClassName()).groupId(properties.getGroupId()); + } + } diff --git a/ballcat-starters/ballcat-spring-boot-starter-kafka/src/main/java/com/hccake/starter/kafka/KafkaProperties.java b/ballcat-starters/ballcat-spring-boot-starter-kafka/src/main/java/com/hccake/starter/kafka/KafkaProperties.java index d80ade7e..1031ea2f 100644 --- a/ballcat-starters/ballcat-spring-boot-starter-kafka/src/main/java/com/hccake/starter/kafka/KafkaProperties.java +++ b/ballcat-starters/ballcat-spring-boot-starter-kafka/src/main/java/com/hccake/starter/kafka/KafkaProperties.java @@ -1,13 +1,15 @@ package com.hccake.starter.kafka; -import lombok.Data; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.boot.context.properties.ConfigurationProperties; - +import cn.hutool.core.util.StrUtil; import java.util.HashMap; import java.util.Map; import java.util.Set; +import lombok.Data; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.boot.context.properties.ConfigurationProperties; /** * @author lingting 2020/7/28 21:15 @@ -16,6 +18,11 @@ import java.util.Set; @ConfigurationProperties(prefix = "ballcat.starter.kafka") public class KafkaProperties { + /** + * 用于指定分组 + */ + private String groupId; + /** * 所属服务 */ @@ -27,7 +34,7 @@ public class KafkaProperties { private Class> keySerializer = StringSerializer.class; /** - * key 序列化 类名 + * key 序列化 类名, 如果填写本值则 valueSerializer 无效 */ private String keySerializerClassName; @@ -37,13 +44,61 @@ public class KafkaProperties { private Class> valueSerializer = StringSerializer.class; /** - * value 序列化 类名 + * value 序列化 类名, 如果填写本值则 valueSerializer 无效 */ private String valueSerializerClassName; + /** + * key 反序列化 + */ + private Class> keyDeserializer = StringDeserializer.class; + + /** + * key 反序列化 类名, 如果填写本值则 keyDeserializer 无效 + */ + private String keyDeserializerClassName; + + /** + * value 反序列化 + */ + private Class> valueDeserializer = StringDeserializer.class; + + /** + * value 反序列化 类名, 如果填写本值则 valueDeserializer 无效 + */ + private String valueDeserializerClassName; + /** * 额外参数 */ private Map extend = new HashMap<>(); + public String getKeyDeserializerClassName() { + if (StrUtil.isNotEmpty(keyDeserializerClassName)) { + return keyDeserializerClassName; + } + return getKeyDeserializer().getName(); + } + + public String getValueDeserializerClassName() { + if (StrUtil.isNotEmpty(valueDeserializerClassName)) { + return valueDeserializerClassName; + } + return getValueDeserializer().getName(); + } + + public String getKeySerializerClassName() { + if (StrUtil.isNotEmpty(keySerializerClassName)) { + return keySerializerClassName; + } + return getKeySerializer().getName(); + } + + public String getValueSerializerClassName() { + if (StrUtil.isNotEmpty(valueSerializerClassName)) { + return valueSerializerClassName; + } + return getValueSerializer().getName(); + } + }