From 0b87a18ba678d8a654f85469a57cbe75e43e7958 Mon Sep 17 00:00:00 2001 From: b2baccline <23131013+b2baccline@users.noreply.github.com> Date: Tue, 23 Jun 2020 20:33:59 +0800 Subject: [PATCH] =?UTF-8?q?:sparkles:=20=E7=94=9F=E4=BA=A7=E8=80=85?= =?UTF-8?q?=E4=B8=8E=E6=B6=88=E8=B4=B9=E8=80=85=E7=9A=84=E6=9E=84=E7=AD=91?= =?UTF-8?q?=E6=96=B9=E6=B3=95=20=E6=B7=BB=E5=8A=A0=E9=83=A8=E5=88=86?= =?UTF-8?q?=E5=B1=9E=E6=80=A7=E7=9A=84get=E6=96=B9=E6=B3=95=E3=80=81?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E8=87=AA=E5=AE=9A=E4=B9=89build=20=E6=96=B9?= =?UTF-8?q?=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../extend/kafka/KafkaConsumerBuilder.java | 22 ++++++++++++++++--- .../extend/kafka/KafkaProducerBuilder.java | 21 +++++++++++++++--- 2 files changed, 37 insertions(+), 6 deletions(-) diff --git a/ballcat-extends/ballcat-extend-kafka/src/main/java/com/hccake/extend/kafka/KafkaConsumerBuilder.java b/ballcat-extends/ballcat-extend-kafka/src/main/java/com/hccake/extend/kafka/KafkaConsumerBuilder.java index 8d9eaedc..f29aab2a 100644 --- a/ballcat-extends/ballcat-extend-kafka/src/main/java/com/hccake/extend/kafka/KafkaConsumerBuilder.java +++ b/ballcat-extends/ballcat-extend-kafka/src/main/java/com/hccake/extend/kafka/KafkaConsumerBuilder.java @@ -2,6 +2,7 @@ package com.hccake.extend.kafka; import cn.hutool.core.collection.ListUtil; import cn.hutool.core.util.StrUtil; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -11,6 +12,7 @@ import java.util.Collection; import java.util.HashSet; import java.util.Properties; import java.util.Set; +import java.util.function.Function; import static com.hccake.extend.kafka.KafkaConstants.BOOTSTRAP_SERVERS_DELIMITER; @@ -25,6 +27,7 @@ import static com.hccake.extend.kafka.KafkaConstants.BOOTSTRAP_SERVERS_DELIMITER public class KafkaConsumerBuilder { private final Properties properties = new Properties(); private final Set bootstrapServers = new HashSet<>(); + @Getter private final Set topics = new HashSet<>(); public KafkaConsumerBuilder keyDeserializer(Class> c) { @@ -97,15 +100,28 @@ public class KafkaConsumerBuilder { return this; } + public KafkaConsumer build(Function> function) { + KafkaConsumer consumer = function.apply(getProperties()); + consumer.subscribe(topics); + return consumer; + } + public KafkaConsumer build(Properties properties) { return putAll(properties).build(); } public KafkaConsumer build() { + return build(KafkaConsumer::new); + } + + public Set getBootstrapServers() { + getProperties(); + return bootstrapServers; + } + + public Properties getProperties() { bootstrapServers.addAll(ListUtil.toList(properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, StrUtil.EMPTY).split(BOOTSTRAP_SERVERS_DELIMITER))); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(BOOTSTRAP_SERVERS_DELIMITER, bootstrapServers)); - KafkaConsumer kafkaConsumer = new KafkaConsumer<>(properties); - kafkaConsumer.subscribe(topics); - return kafkaConsumer; + return properties; } } diff --git a/ballcat-extends/ballcat-extend-kafka/src/main/java/com/hccake/extend/kafka/KafkaProducerBuilder.java b/ballcat-extends/ballcat-extend-kafka/src/main/java/com/hccake/extend/kafka/KafkaProducerBuilder.java index ad36e044..2dcee14f 100644 --- a/ballcat-extends/ballcat-extend-kafka/src/main/java/com/hccake/extend/kafka/KafkaProducerBuilder.java +++ b/ballcat-extends/ballcat-extend-kafka/src/main/java/com/hccake/extend/kafka/KafkaProducerBuilder.java @@ -3,6 +3,7 @@ package com.hccake.extend.kafka; import cn.hutool.core.collection.ListUtil; import cn.hutool.core.util.StrUtil; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.Serializer; @@ -10,6 +11,7 @@ import java.util.Collection; import java.util.HashSet; import java.util.Properties; import java.util.Set; +import java.util.function.Function; import static com.hccake.extend.kafka.KafkaConstants.BOOTSTRAP_SERVERS_DELIMITER; @@ -76,13 +78,26 @@ public class KafkaProducerBuilder { return this; } + public KafkaExtendProducer build(Function> function) { + return function.apply(getProperties()); + } + public KafkaExtendProducer build(Properties properties) { return putAll(properties).build(); } public KafkaExtendProducer build() { - bootstrapServers.addAll(ListUtil.toList(properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, StrUtil.EMPTY).split(BOOTSTRAP_SERVERS_DELIMITER))); - properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(BOOTSTRAP_SERVERS_DELIMITER, bootstrapServers)); - return new KafkaExtendProducer<>(new org.apache.kafka.clients.producer.KafkaProducer<>(properties)); + return build(p -> new KafkaExtendProducer<>(new org.apache.kafka.clients.producer.KafkaProducer<>(p))); + } + + public Set getBootstrapServers() { + getProperties(); + return bootstrapServers; + } + + public Properties getProperties() { + bootstrapServers.addAll(ListUtil.toList(properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, StrUtil.EMPTY).split(BOOTSTRAP_SERVERS_DELIMITER))); + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(BOOTSTRAP_SERVERS_DELIMITER, bootstrapServers)); + return properties; } }