From 6f89242d72ed17e14173fac4e4e7c438cbe2b4cc Mon Sep 17 00:00:00 2001 From: b2baccline <23131013+b2baccline@users.noreply.github.com> Date: Tue, 23 Jun 2020 19:35:53 +0800 Subject: [PATCH] =?UTF-8?q?:sparkles:=20=E6=B7=BB=E5=8A=A0kafka=E6=89=A9?= =?UTF-8?q?=E5=B1=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ballcat-dependencies/pom.xml | 5 + .../ballcat-extend-dingtalk/pom.xml | 2 +- ballcat-extends/ballcat-extend-kafka/pom.xml | 26 ++++ .../hccake/extend/kafka/KafkaConstants.java | 8 ++ .../extend/kafka/KafkaConsumerBuilder.java | 111 ++++++++++++++++++ .../extend/kafka/KafkaExtendProducer.java | 81 +++++++++++++ .../extend/kafka/KafkaProducerBuilder.java | 88 ++++++++++++++ .../java/com/hccake/extend/KafkaTest.java | 33 ++++++ ballcat-extends/pom.xml | 3 +- pom.xml | 2 + 10 files changed, 357 insertions(+), 2 deletions(-) create mode 100644 ballcat-extends/ballcat-extend-kafka/pom.xml create mode 100644 ballcat-extends/ballcat-extend-kafka/src/main/java/com/hccake/extend/kafka/KafkaConstants.java create mode 100644 ballcat-extends/ballcat-extend-kafka/src/main/java/com/hccake/extend/kafka/KafkaConsumerBuilder.java create mode 100644 ballcat-extends/ballcat-extend-kafka/src/main/java/com/hccake/extend/kafka/KafkaExtendProducer.java create mode 100644 ballcat-extends/ballcat-extend-kafka/src/main/java/com/hccake/extend/kafka/KafkaProducerBuilder.java create mode 100644 ballcat-extends/ballcat-extend-kafka/src/test/java/com/hccake/extend/KafkaTest.java diff --git a/ballcat-dependencies/pom.xml b/ballcat-dependencies/pom.xml index ae24e39d..f5a6950e 100644 --- a/ballcat-dependencies/pom.xml +++ b/ballcat-dependencies/pom.xml @@ -124,6 +124,11 @@ ballcat-extend-mybatis-plus-mysql ${revision} + + com.hccake + ballcat-extend-kafka + ${revision} + diff --git a/ballcat-extends/ballcat-extend-dingtalk/pom.xml b/ballcat-extends/ballcat-extend-dingtalk/pom.xml index 3f86deae..acdf39f9 100644 --- a/ballcat-extends/ballcat-extend-dingtalk/pom.xml +++ b/ballcat-extends/ballcat-extend-dingtalk/pom.xml @@ -5,7 +5,7 @@ ballcat-extends com.hccake - 0.0.2 + ${revision} 4.0.0 diff --git a/ballcat-extends/ballcat-extend-kafka/pom.xml b/ballcat-extends/ballcat-extend-kafka/pom.xml new file mode 100644 index 00000000..b389a9dd --- /dev/null +++ b/ballcat-extends/ballcat-extend-kafka/pom.xml @@ -0,0 +1,26 @@ + + + + ballcat-extends + com.hccake + ${revision} + + 4.0.0 + + ballcat-extend-kafka + pom + + + + cn.hutool + hutool-all + + + org.apache.kafka + kafka_2.12 + ${kafka.version} + + + \ No newline at end of file diff --git a/ballcat-extends/ballcat-extend-kafka/src/main/java/com/hccake/extend/kafka/KafkaConstants.java b/ballcat-extends/ballcat-extend-kafka/src/main/java/com/hccake/extend/kafka/KafkaConstants.java new file mode 100644 index 00000000..350ca989 --- /dev/null +++ b/ballcat-extends/ballcat-extend-kafka/src/main/java/com/hccake/extend/kafka/KafkaConstants.java @@ -0,0 +1,8 @@ +package com.hccake.extend.kafka; + +/** + * @author lingting 2020/6/19 16:48 + */ +public class KafkaConstants { + public static final String BOOTSTRAP_SERVERS_DELIMITER = ","; +} diff --git a/ballcat-extends/ballcat-extend-kafka/src/main/java/com/hccake/extend/kafka/KafkaConsumerBuilder.java b/ballcat-extends/ballcat-extend-kafka/src/main/java/com/hccake/extend/kafka/KafkaConsumerBuilder.java new file mode 100644 index 00000000..8d9eaedc --- /dev/null +++ b/ballcat-extends/ballcat-extend-kafka/src/main/java/com/hccake/extend/kafka/KafkaConsumerBuilder.java @@ -0,0 +1,111 @@ +package com.hccake.extend.kafka; + +import cn.hutool.core.collection.ListUtil; +import cn.hutool.core.util.StrUtil; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.Deserializer; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Properties; +import java.util.Set; + +import static com.hccake.extend.kafka.KafkaConstants.BOOTSTRAP_SERVERS_DELIMITER; + +/** + * 消费者 + * 具体的配置请参考 {@link ConsumerConfig} + * 这里只提供一些常用配置 + * + * @author lingting 2020/5/19 20:56 + */ +@Slf4j +public class KafkaConsumerBuilder { + private final Properties properties = new Properties(); + private final Set bootstrapServers = new HashSet<>(); + private final Set topics = new HashSet<>(); + + public KafkaConsumerBuilder keyDeserializer(Class> c) { + return keyDeserializer(c.getName()); + } + + public KafkaConsumerBuilder keyDeserializer(String className) { + return put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, className); + } + + public KafkaConsumerBuilder valueDeserializer(Class> c) { + return valueDeserializer(c.getName()); + } + + public KafkaConsumerBuilder valueDeserializer(String className) { + return put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, className); + } + + /** + * 添加 kafka 路径 host:port + * + * @author lingting 2020-06-19 16:30:03 + */ + public KafkaConsumerBuilder addBootstrapServers(String uri) { + bootstrapServers.add(uri); + return this; + } + + public KafkaConsumerBuilder addAllBootstrapServers(Collection uris) { + bootstrapServers.addAll(uris); + return this; + } + + /** + * 添加配置 + * + * @author lingting 2020-06-19 16:30:50 + */ + public KafkaConsumerBuilder put(Object key, Object val) { + properties.put(key, val); + return this; + } + + /** + * 添加配置 + * + * @author lingting 2020-06-19 16:30:50 + */ + public KafkaConsumerBuilder putAll(Properties properties) { + this.properties.putAll(properties); + return this; + } + + /** + * 组id + * + * @author lingting 2020-06-19 16:46:32 + */ + public KafkaConsumerBuilder groupId(String groupId) { + return put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + } + + public KafkaConsumerBuilder addTopic(String topic) { + topics.add(topic); + return this; + } + + public KafkaConsumerBuilder addAllTopic(Collection topics) { + this.topics.addAll(topics); + return this; + } + + public KafkaConsumer build(Properties properties) { + return putAll(properties).build(); + } + + public KafkaConsumer build() { + bootstrapServers.addAll(ListUtil.toList(properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, StrUtil.EMPTY).split(BOOTSTRAP_SERVERS_DELIMITER))); + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(BOOTSTRAP_SERVERS_DELIMITER, bootstrapServers)); + KafkaConsumer kafkaConsumer = new KafkaConsumer<>(properties); + kafkaConsumer.subscribe(topics); + return kafkaConsumer; + } +} diff --git a/ballcat-extends/ballcat-extend-kafka/src/main/java/com/hccake/extend/kafka/KafkaExtendProducer.java b/ballcat-extends/ballcat-extend-kafka/src/main/java/com/hccake/extend/kafka/KafkaExtendProducer.java new file mode 100644 index 00000000..cbaeb60b --- /dev/null +++ b/ballcat-extends/ballcat-extend-kafka/src/main/java/com/hccake/extend/kafka/KafkaExtendProducer.java @@ -0,0 +1,81 @@ +package com.hccake.extend.kafka; + +import lombok.Getter; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.header.Header; + +import java.util.concurrent.Future; + +/** + * 生产者扩展 + * 提供了一些更加方便使用 生产者的方法 + * + * @author lingting 2020/6/23 17:11 + */ +public class KafkaExtendProducer { + @Getter + private final KafkaProducer producer; + + public KafkaExtendProducer(KafkaProducer producer) { + this.producer = producer; + } + + public ProducerRecord record(String topic, Integer partition, Long timestamp, K key, V value, Iterable
headers) { + return new ProducerRecord<>(topic, partition, timestamp, key, value, headers); + } + + public ProducerRecord record(String topic, Integer partition, Long timestamp, K key, V value) { + return record(topic, partition, timestamp, key, value, null); + } + + public ProducerRecord record(String topic, Integer partition, K key, V value, Iterable
headers) { + return record(topic, partition, null, key, value, headers); + } + + public ProducerRecord record(String topic, Integer partition, K key, V value) { + return record(topic, partition, null, key, value, null); + } + + public ProducerRecord record(String topic, K key, V value) { + return record(topic, null, null, key, value, null); + } + + public ProducerRecord record(String topic, V value) { + return record(topic, null, null, null, value, null); + } + + public Future send(ProducerRecord record, Callback callback) { + return producer.send(record, callback); + } + + public Future send(String topic, Integer partition, Long timestamp, K key, V value, Iterable
headers) { + return send(record(topic, partition, timestamp, key, value, headers), null); + } + + public Future send(String topic, Integer partition, Long timestamp, K key, V value) { + return send(topic, partition, timestamp, key, value, null); + } + + public Future send(String topic, Integer partition, K key, V value, Iterable
headers) { + return send(topic, partition, null, key, value, headers); + } + + public Future send(String topic, Integer partition, K key, V value) { + return send(topic, partition, null, key, value, null); + } + + public Future send(String topic, K key, V value) { + return send(topic, null, null, key, value, null); + } + + public Future send(String topic, V value) { + return send(topic, null, null, null, value, null); + } + + public Future send(String topic, V value, Callback callback) { + return send(record(topic, value), callback); + } +} diff --git a/ballcat-extends/ballcat-extend-kafka/src/main/java/com/hccake/extend/kafka/KafkaProducerBuilder.java b/ballcat-extends/ballcat-extend-kafka/src/main/java/com/hccake/extend/kafka/KafkaProducerBuilder.java new file mode 100644 index 00000000..ad36e044 --- /dev/null +++ b/ballcat-extends/ballcat-extend-kafka/src/main/java/com/hccake/extend/kafka/KafkaProducerBuilder.java @@ -0,0 +1,88 @@ +package com.hccake.extend.kafka; + +import cn.hutool.core.collection.ListUtil; +import cn.hutool.core.util.StrUtil; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.Serializer; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Properties; +import java.util.Set; + +import static com.hccake.extend.kafka.KafkaConstants.BOOTSTRAP_SERVERS_DELIMITER; + +/** + * 生产者 + * 具体的配置请参考 {@link ProducerConfig} + * 这里只提供一些常用配置 + * + * @author lingting 2020/5/19 20:56 + */ +@Slf4j +public class KafkaProducerBuilder { + private final Properties properties = new Properties(); + private final Set bootstrapServers = new HashSet<>(); + + public KafkaProducerBuilder keySerializer(Class> c) { + return keySerializer(c.getName()); + } + + public KafkaProducerBuilder keySerializer(String className) { + return put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, className); + } + + public KafkaProducerBuilder valueSerializer(Class> c) { + return valueSerializer(c.getName()); + } + + public KafkaProducerBuilder valueSerializer(String className) { + return put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, className); + } + + /** + * 添加 kafka 路径 host:port + * + * @author lingting 2020-06-19 16:30:03 + */ + public KafkaProducerBuilder addBootstrapServers(String uri) { + bootstrapServers.add(uri); + return this; + } + + public KafkaProducerBuilder addAllBootstrapServers(Collection uris) { + bootstrapServers.addAll(uris); + return this; + } + + /** + * 添加配置 + * + * @author lingting 2020-06-19 16:30:50 + */ + public KafkaProducerBuilder put(Object key, Object val) { + properties.put(key, val); + return this; + } + + /** + * 添加配置 + * + * @author lingting 2020-06-19 16:30:50 + */ + public KafkaProducerBuilder putAll(Properties properties) { + this.properties.putAll(properties); + return this; + } + + public KafkaExtendProducer build(Properties properties) { + return putAll(properties).build(); + } + + public KafkaExtendProducer build() { + bootstrapServers.addAll(ListUtil.toList(properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, StrUtil.EMPTY).split(BOOTSTRAP_SERVERS_DELIMITER))); + properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(BOOTSTRAP_SERVERS_DELIMITER, bootstrapServers)); + return new KafkaExtendProducer<>(new org.apache.kafka.clients.producer.KafkaProducer<>(properties)); + } +} diff --git a/ballcat-extends/ballcat-extend-kafka/src/test/java/com/hccake/extend/KafkaTest.java b/ballcat-extends/ballcat-extend-kafka/src/test/java/com/hccake/extend/KafkaTest.java new file mode 100644 index 00000000..04d646ae --- /dev/null +++ b/ballcat-extends/ballcat-extend-kafka/src/test/java/com/hccake/extend/KafkaTest.java @@ -0,0 +1,33 @@ +package com.hccake.extend; + +import com.hccake.extend.kafka.KafkaConsumerBuilder; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; + +/** + * @author lingting 2020/6/23 17:07 + */ +public class KafkaTest { + public static void main(String[] args) { +// KafkaProducer producer = new KafkaProducerBuilder() + KafkaConsumer consumer = new KafkaConsumerBuilder() + .keyDeserializer(StringDeserializer.class) + .valueDeserializer(StringDeserializer.class) + .groupId("group-id") + .addTopic("first") +// .keySerializer(StringSerializer.class) +// .valueSerializer(StringSerializer.class) + .addBootstrapServers("192.168.1.3:50211") + .addBootstrapServers("192.168.1.3:50212") + .build(); + +// producer.send("first", "测试消息"); +// while (true) { +// ConsumerRecords records = consumer.poll(Duration.ofSeconds(5)); +// records.forEach(record -> { +// System.out.println(record.key()); +// System.out.println(record.value()); +// }); +// } + } +} diff --git a/ballcat-extends/pom.xml b/ballcat-extends/pom.xml index df222263..89cd4990 100644 --- a/ballcat-extends/pom.xml +++ b/ballcat-extends/pom.xml @@ -15,5 +15,6 @@ ballcat-extend-mybatis-plus ballcat-extend-mybatis-plus-mysql ballcat-extend-dingtalk - + ballcat-extend-kafka + \ No newline at end of file diff --git a/pom.xml b/pom.xml index 2b850679..8f55705d 100644 --- a/pom.xml +++ b/pom.xml @@ -51,6 +51,8 @@ 1.8 3.8.0 + 2.5.0 + 1.1.0 1.6.8 3.0.0-M1