生产者与消费者的构筑方法 添加部分属性的get方法、添加自定义build 方法

This commit is contained in:
b2baccline
2020-06-23 20:33:59 +08:00
parent 6f89242d72
commit 0b87a18ba6
2 changed files with 37 additions and 6 deletions

View File

@@ -2,6 +2,7 @@ package com.hccake.extend.kafka;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.StrUtil;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -11,6 +12,7 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.function.Function;
import static com.hccake.extend.kafka.KafkaConstants.BOOTSTRAP_SERVERS_DELIMITER;
@@ -25,6 +27,7 @@ import static com.hccake.extend.kafka.KafkaConstants.BOOTSTRAP_SERVERS_DELIMITER
public class KafkaConsumerBuilder {
private final Properties properties = new Properties();
private final Set<String> bootstrapServers = new HashSet<>();
@Getter
private final Set<String> topics = new HashSet<>();
public KafkaConsumerBuilder keyDeserializer(Class<? extends Deserializer<?>> c) {
@@ -97,15 +100,28 @@ public class KafkaConsumerBuilder {
return this;
}
public <K, V> KafkaConsumer<K, V> build(Function<Properties, KafkaConsumer<K, V>> function) {
KafkaConsumer<K, V> consumer = function.apply(getProperties());
consumer.subscribe(topics);
return consumer;
}
public <K, V> KafkaConsumer<K, V> build(Properties properties) {
return putAll(properties).build();
}
public <K, V> KafkaConsumer<K, V> build() {
return build(KafkaConsumer::new);
}
public Set<String> getBootstrapServers() {
getProperties();
return bootstrapServers;
}
public Properties getProperties() {
bootstrapServers.addAll(ListUtil.toList(properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, StrUtil.EMPTY).split(BOOTSTRAP_SERVERS_DELIMITER)));
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(BOOTSTRAP_SERVERS_DELIMITER, bootstrapServers));
KafkaConsumer<K, V> kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(topics);
return kafkaConsumer;
return properties;
}
}

View File

@@ -3,6 +3,7 @@ package com.hccake.extend.kafka;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serializer;
@@ -10,6 +11,7 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.function.Function;
import static com.hccake.extend.kafka.KafkaConstants.BOOTSTRAP_SERVERS_DELIMITER;
@@ -76,13 +78,26 @@ public class KafkaProducerBuilder {
return this;
}
public <K, V> KafkaExtendProducer<K, V> build(Function<Properties, KafkaExtendProducer<K, V>> function) {
return function.apply(getProperties());
}
public <K, V> KafkaExtendProducer<K, V> build(Properties properties) {
return putAll(properties).build();
}
public <K, V> KafkaExtendProducer<K, V> build() {
bootstrapServers.addAll(ListUtil.toList(properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, StrUtil.EMPTY).split(BOOTSTRAP_SERVERS_DELIMITER)));
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(BOOTSTRAP_SERVERS_DELIMITER, bootstrapServers));
return new KafkaExtendProducer<>(new org.apache.kafka.clients.producer.KafkaProducer<>(properties));
return build(p -> new KafkaExtendProducer<>(new org.apache.kafka.clients.producer.KafkaProducer<>(p)));
}
public Set<String> getBootstrapServers() {
getProperties();
return bootstrapServers;
}
public Properties getProperties() {
bootstrapServers.addAll(ListUtil.toList(properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, StrUtil.EMPTY).split(BOOTSTRAP_SERVERS_DELIMITER)));
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(BOOTSTRAP_SERVERS_DELIMITER, bootstrapServers));
return properties;
}
}