Merge pull request #38 from lingting-gzm/master

 添加消费者配置,bean 添加 ConditionalOnMissingBean 注解,用于用户自定义
This commit is contained in:
b2baccline
2020-10-14 11:07:30 +08:00
11 changed files with 223 additions and 86 deletions

View File

@@ -1,11 +1,14 @@
package com.hccake.starter.kafka;
import com.hccake.extend.kafka.KafkaConsumerBuilder;
import com.hccake.extend.kafka.KafkaExtendProducer;
import com.hccake.extend.kafka.KafkaProducerBuilder;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Scope;
/**
* @author lingting 2020/7/28 21:17
@@ -16,20 +19,23 @@ import org.springframework.context.annotation.Bean;
public class KafkaAutoConfiguration {
@Bean
@ConditionalOnMissingBean(KafkaExtendProducer.class)
public KafkaExtendProducer<String, String> stringKafkaExtendProducer(KafkaProperties properties) {
KafkaProducerBuilder builder = new KafkaProducerBuilder()
.addAllBootstrapServers(properties.getBootstrapServers()).putAll(properties.getExtend());
if (properties.getKeySerializer() != null || properties.getKeySerializerClassName() != null) {
builder.keySerializer(properties.getKeySerializer() == null ? properties.getKeySerializerClassName()
: properties.getKeySerializer().getName());
}
if (properties.getValueSerializer() != null || properties.getValueSerializerClassName() != null) {
builder.valueSerializer(properties.getValueSerializer() == null ? properties.getValueSerializerClassName()
: properties.getValueSerializer().getName());
}
builder.keySerializer(properties.getKeySerializerClassName());
builder.valueSerializer(properties.getValueSerializerClassName());
return builder.build();
}
@Bean
@Scope("prototype")
@ConditionalOnMissingBean(KafkaConsumerBuilder.class)
public KafkaConsumerBuilder consumerBuilder(KafkaProperties properties) {
return new KafkaConsumerBuilder().addAllBootstrapServers(properties.getBootstrapServers())
.keyDeserializer(properties.getKeyDeserializerClassName())
.valueDeserializer(properties.getValueDeserializerClassName()).groupId(properties.getGroupId());
}
}

View File

@@ -1,13 +1,15 @@
package com.hccake.starter.kafka;
import lombok.Data;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.context.properties.ConfigurationProperties;
import cn.hutool.core.util.StrUtil;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import lombok.Data;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* @author lingting 2020/7/28 21:15
@@ -16,6 +18,11 @@ import java.util.Set;
@ConfigurationProperties(prefix = "ballcat.starter.kafka")
public class KafkaProperties {
/**
* 用于指定分组
*/
private String groupId;
/**
* 所属服务
*/
@@ -27,7 +34,7 @@ public class KafkaProperties {
private Class<? extends Serializer<?>> keySerializer = StringSerializer.class;
/**
* key 序列化 类名
* key 序列化 类名, 如果填写本值则 valueSerializer 无效
*/
private String keySerializerClassName;
@@ -37,13 +44,61 @@ public class KafkaProperties {
private Class<? extends Serializer<?>> valueSerializer = StringSerializer.class;
/**
* value 序列化 类名
* value 序列化 类名, 如果填写本值则 valueSerializer 无效
*/
private String valueSerializerClassName;
/**
* key 反序列化
*/
private Class<? extends Deserializer<?>> keyDeserializer = StringDeserializer.class;
/**
* key 反序列化 类名, 如果填写本值则 keyDeserializer 无效
*/
private String keyDeserializerClassName;
/**
* value 反序列化
*/
private Class<? extends Deserializer<?>> valueDeserializer = StringDeserializer.class;
/**
* value 反序列化 类名, 如果填写本值则 valueDeserializer 无效
*/
private String valueDeserializerClassName;
/**
* 额外参数
*/
private Map<String, Object> extend = new HashMap<>();
public String getKeyDeserializerClassName() {
if (StrUtil.isNotEmpty(keyDeserializerClassName)) {
return keyDeserializerClassName;
}
return getKeyDeserializer().getName();
}
public String getValueDeserializerClassName() {
if (StrUtil.isNotEmpty(valueDeserializerClassName)) {
return valueDeserializerClassName;
}
return getValueDeserializer().getName();
}
public String getKeySerializerClassName() {
if (StrUtil.isNotEmpty(keySerializerClassName)) {
return keySerializerClassName;
}
return getKeySerializer().getName();
}
public String getValueSerializerClassName() {
if (StrUtil.isNotEmpty(valueSerializerClassName)) {
return valueSerializerClassName;
}
return getValueSerializer().getName();
}
}