添加kafka扩展

This commit is contained in:
b2baccline
2020-06-23 19:35:53 +08:00
parent d68cd369b3
commit 6f89242d72
10 changed files with 357 additions and 2 deletions

View File

@@ -124,6 +124,11 @@
<artifactId>ballcat-extend-mybatis-plus-mysql</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.hccake</groupId>
<artifactId>ballcat-extend-kafka</artifactId>
<version>${revision}</version>
</dependency>
<!--swagger注解-->
<dependency>

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>ballcat-extends</artifactId>
<groupId>com.hccake</groupId>
<version>0.0.2</version>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -0,0 +1,26 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>ballcat-extends</artifactId>
<groupId>com.hccake</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>ballcat-extend-kafka</artifactId>
<packaging>pom</packaging>
<dependencies>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>${kafka.version}</version>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,8 @@
package com.hccake.extend.kafka;
/**
* @author lingting 2020/6/19 16:48
*/
public class KafkaConstants {
public static final String BOOTSTRAP_SERVERS_DELIMITER = ",";
}

View File

@@ -0,0 +1,111 @@
package com.hccake.extend.kafka;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Collection;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import static com.hccake.extend.kafka.KafkaConstants.BOOTSTRAP_SERVERS_DELIMITER;
/**
* 消费者
* 具体的配置请参考 {@link ConsumerConfig}
* 这里只提供一些常用配置
*
* @author lingting 2020/5/19 20:56
*/
@Slf4j
public class KafkaConsumerBuilder {
private final Properties properties = new Properties();
private final Set<String> bootstrapServers = new HashSet<>();
private final Set<String> topics = new HashSet<>();
public KafkaConsumerBuilder keyDeserializer(Class<? extends Deserializer<?>> c) {
return keyDeserializer(c.getName());
}
public KafkaConsumerBuilder keyDeserializer(String className) {
return put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, className);
}
public KafkaConsumerBuilder valueDeserializer(Class<? extends Deserializer<?>> c) {
return valueDeserializer(c.getName());
}
public KafkaConsumerBuilder valueDeserializer(String className) {
return put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, className);
}
/**
* 添加 kafka 路径 host:port
*
* @author lingting 2020-06-19 16:30:03
*/
public KafkaConsumerBuilder addBootstrapServers(String uri) {
bootstrapServers.add(uri);
return this;
}
public KafkaConsumerBuilder addAllBootstrapServers(Collection<String> uris) {
bootstrapServers.addAll(uris);
return this;
}
/**
* 添加配置
*
* @author lingting 2020-06-19 16:30:50
*/
public KafkaConsumerBuilder put(Object key, Object val) {
properties.put(key, val);
return this;
}
/**
* 添加配置
*
* @author lingting 2020-06-19 16:30:50
*/
public KafkaConsumerBuilder putAll(Properties properties) {
this.properties.putAll(properties);
return this;
}
/**
* 组id
*
* @author lingting 2020-06-19 16:46:32
*/
public KafkaConsumerBuilder groupId(String groupId) {
return put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
}
public KafkaConsumerBuilder addTopic(String topic) {
topics.add(topic);
return this;
}
public KafkaConsumerBuilder addAllTopic(Collection<String> topics) {
this.topics.addAll(topics);
return this;
}
public <K, V> KafkaConsumer<K, V> build(Properties properties) {
return putAll(properties).build();
}
public <K, V> KafkaConsumer<K, V> build() {
bootstrapServers.addAll(ListUtil.toList(properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, StrUtil.EMPTY).split(BOOTSTRAP_SERVERS_DELIMITER)));
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(BOOTSTRAP_SERVERS_DELIMITER, bootstrapServers));
KafkaConsumer<K, V> kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(topics);
return kafkaConsumer;
}
}

View File

@@ -0,0 +1,81 @@
package com.hccake.extend.kafka;
import lombok.Getter;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import java.util.concurrent.Future;
/**
* 生产者扩展
* 提供了一些更加方便使用 生产者的方法
*
* @author lingting 2020/6/23 17:11
*/
public class KafkaExtendProducer<K, V> {
@Getter
private final KafkaProducer<K, V> producer;
public KafkaExtendProducer(KafkaProducer<K, V> producer) {
this.producer = producer;
}
public ProducerRecord<K, V> record(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
return new ProducerRecord<>(topic, partition, timestamp, key, value, headers);
}
public ProducerRecord<K, V> record(String topic, Integer partition, Long timestamp, K key, V value) {
return record(topic, partition, timestamp, key, value, null);
}
public ProducerRecord<K, V> record(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
return record(topic, partition, null, key, value, headers);
}
public ProducerRecord<K, V> record(String topic, Integer partition, K key, V value) {
return record(topic, partition, null, key, value, null);
}
public ProducerRecord<K, V> record(String topic, K key, V value) {
return record(topic, null, null, key, value, null);
}
public ProducerRecord<K, V> record(String topic, V value) {
return record(topic, null, null, null, value, null);
}
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
return producer.send(record, callback);
}
public Future<RecordMetadata> send(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
return send(record(topic, partition, timestamp, key, value, headers), null);
}
public Future<RecordMetadata> send(String topic, Integer partition, Long timestamp, K key, V value) {
return send(topic, partition, timestamp, key, value, null);
}
public Future<RecordMetadata> send(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
return send(topic, partition, null, key, value, headers);
}
public Future<RecordMetadata> send(String topic, Integer partition, K key, V value) {
return send(topic, partition, null, key, value, null);
}
public Future<RecordMetadata> send(String topic, K key, V value) {
return send(topic, null, null, key, value, null);
}
public Future<RecordMetadata> send(String topic, V value) {
return send(topic, null, null, null, value, null);
}
public Future<RecordMetadata> send(String topic, V value, Callback callback) {
return send(record(topic, value), callback);
}
}

View File

@@ -0,0 +1,88 @@
package com.hccake.extend.kafka;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Collection;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import static com.hccake.extend.kafka.KafkaConstants.BOOTSTRAP_SERVERS_DELIMITER;
/**
* 生产者
* 具体的配置请参考 {@link ProducerConfig}
* 这里只提供一些常用配置
*
* @author lingting 2020/5/19 20:56
*/
@Slf4j
public class KafkaProducerBuilder {
private final Properties properties = new Properties();
private final Set<String> bootstrapServers = new HashSet<>();
public KafkaProducerBuilder keySerializer(Class<? extends Serializer<?>> c) {
return keySerializer(c.getName());
}
public KafkaProducerBuilder keySerializer(String className) {
return put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, className);
}
public KafkaProducerBuilder valueSerializer(Class<? extends Serializer<?>> c) {
return valueSerializer(c.getName());
}
public KafkaProducerBuilder valueSerializer(String className) {
return put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, className);
}
/**
* 添加 kafka 路径 host:port
*
* @author lingting 2020-06-19 16:30:03
*/
public KafkaProducerBuilder addBootstrapServers(String uri) {
bootstrapServers.add(uri);
return this;
}
public KafkaProducerBuilder addAllBootstrapServers(Collection<String> uris) {
bootstrapServers.addAll(uris);
return this;
}
/**
* 添加配置
*
* @author lingting 2020-06-19 16:30:50
*/
public KafkaProducerBuilder put(Object key, Object val) {
properties.put(key, val);
return this;
}
/**
* 添加配置
*
* @author lingting 2020-06-19 16:30:50
*/
public KafkaProducerBuilder putAll(Properties properties) {
this.properties.putAll(properties);
return this;
}
public <K, V> KafkaExtendProducer<K, V> build(Properties properties) {
return putAll(properties).build();
}
public <K, V> KafkaExtendProducer<K, V> build() {
bootstrapServers.addAll(ListUtil.toList(properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, StrUtil.EMPTY).split(BOOTSTRAP_SERVERS_DELIMITER)));
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(BOOTSTRAP_SERVERS_DELIMITER, bootstrapServers));
return new KafkaExtendProducer<>(new org.apache.kafka.clients.producer.KafkaProducer<>(properties));
}
}

View File

@@ -0,0 +1,33 @@
package com.hccake.extend;
import com.hccake.extend.kafka.KafkaConsumerBuilder;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
/**
* @author lingting 2020/6/23 17:07
*/
public class KafkaTest {
public static void main(String[] args) {
// KafkaProducer<String, String> producer = new KafkaProducerBuilder()
KafkaConsumer<Object, Object> consumer = new KafkaConsumerBuilder()
.keyDeserializer(StringDeserializer.class)
.valueDeserializer(StringDeserializer.class)
.groupId("group-id")
.addTopic("first")
// .keySerializer(StringSerializer.class)
// .valueSerializer(StringSerializer.class)
.addBootstrapServers("192.168.1.3:50211")
.addBootstrapServers("192.168.1.3:50212")
.build();
// producer.send("first", "测试消息");
// while (true) {
// ConsumerRecords<Object, Object> records = consumer.poll(Duration.ofSeconds(5));
// records.forEach(record -> {
// System.out.println(record.key());
// System.out.println(record.value());
// });
// }
}
}

View File

@@ -15,5 +15,6 @@
<module>ballcat-extend-mybatis-plus</module>
<module>ballcat-extend-mybatis-plus-mysql</module>
<module>ballcat-extend-dingtalk</module>
<module>ballcat-extend-kafka</module>
</modules>
</project>

View File

@@ -51,6 +51,8 @@
<maven.compiler.target>1.8</maven.compiler.target>
<maven-compiler-plugin.verison>3.8.0</maven-compiler-plugin.verison>
<kafka.version>2.5.0</kafka.version>
<flatten-maven-plugin.version>1.1.0</flatten-maven-plugin.version>
<nexus-staging-maven-plugin.version>1.6.8</nexus-staging-maven-plugin.version>
<maven-release-plugin.version>3.0.0-M1</maven-release-plugin.version>