添加kafka流处理扩展

This commit is contained in:
b2baccline
2020-06-23 20:34:25 +08:00
parent 0b87a18ba6
commit 57b3c253b7
11 changed files with 882 additions and 2 deletions

View File

@@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>ballcat-extends</artifactId>
<groupId>com.hccake</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>ballcat-extend-kafka-stream</artifactId>
<dependencies>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,261 @@
package com.hccake.extend.kafka.stream;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.StrUtil;
import lombok.Getter;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.state.StoreBuilder;
import java.util.Collection;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.regex.Pattern;
/**
* kafka Stream 流构建方法
*
* @author lingting 2020/6/23 19:31
*/
public class KafkaStreamBuilder {
private static final String BOOTSTRAP_SERVERS_DELIMITER = ",";
@Getter
private Topology topology = new Topology();
private final Properties properties = new Properties();
private final Set<String> bootstrapServers = new HashSet<>();
public KafkaStreamBuilder keySerde(Class<? extends Serde<?>> c) {
return keySerde(c.getName());
}
public KafkaStreamBuilder keySerde(String className) {
return put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, className);
}
public KafkaStreamBuilder valueSerde(Class<? extends Serde<?>> c) {
return valueSerde(c.getName());
}
public KafkaStreamBuilder valueSerde(String className) {
return put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, className);
}
/**
* 添加 kafka 路径 host:port
*
* @author lingting 2020-06-19 16:30:03
*/
public KafkaStreamBuilder addBootstrapServers(String uri) {
bootstrapServers.add(uri);
return this;
}
public KafkaStreamBuilder addAllBootstrapServers(Collection<String> uris) {
bootstrapServers.addAll(uris);
return this;
}
/**
* 添加配置
*
* @author lingting 2020-06-19 16:30:50
*/
public KafkaStreamBuilder put(Object key, Object val) {
properties.put(key, val);
return this;
}
/**
* 添加配置
*
* @author lingting 2020-06-19 16:30:50
*/
public KafkaStreamBuilder putAll(Properties properties) {
this.properties.putAll(properties);
return this;
}
public KafkaStreamBuilder applicationId(String aId) {
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, aId);
return this;
}
public synchronized KafkaStreamBuilder addSource(String name, String... topics) {
topology.addSource(name, topics);
return this;
}
public synchronized KafkaStreamBuilder addSource(String name, Pattern topicPattern) {
topology.addSource(name, topicPattern);
return this;
}
public synchronized KafkaStreamBuilder addSource(Topology.AutoOffsetReset offsetReset, String name, String... topics) {
topology.addSource(offsetReset, name, topics);
return this;
}
public synchronized KafkaStreamBuilder addSource(Topology.AutoOffsetReset offsetReset, String name, Pattern topicPattern) {
topology.addSource(offsetReset, name, topicPattern);
return this;
}
public synchronized KafkaStreamBuilder addSource(TimestampExtractor timestampExtractor, String name, String... topics) {
topology.addSource(timestampExtractor, name, topics);
return this;
}
public synchronized KafkaStreamBuilder addSource(TimestampExtractor timestampExtractor, String name, Pattern topicPattern) {
topology.addSource(timestampExtractor, name, topicPattern);
return this;
}
public synchronized KafkaStreamBuilder addSource(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, String... topics) {
topology.addSource(offsetReset, timestampExtractor, name, topics);
return this;
}
public synchronized KafkaStreamBuilder addSource(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, Pattern topicPattern) {
topology.addSource(offsetReset, timestampExtractor, name, topicPattern);
return this;
}
public synchronized KafkaStreamBuilder addSource(String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String... topics) {
topology.addSource(name, keyDeserializer, valueDeserializer, topics);
return this;
}
public synchronized KafkaStreamBuilder addSource(String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, Pattern topicPattern) {
topology.addSource(name, keyDeserializer, valueDeserializer, topicPattern);
return this;
}
public synchronized KafkaStreamBuilder addSource(Topology.AutoOffsetReset offsetReset, String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String... topics) {
topology.addSource(offsetReset, name, keyDeserializer, valueDeserializer, topics);
return this;
}
public synchronized KafkaStreamBuilder addSource(Topology.AutoOffsetReset offsetReset, String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, Pattern topicPattern) {
topology.addSource(offsetReset, name, keyDeserializer, valueDeserializer, topicPattern);
return this;
}
public synchronized KafkaStreamBuilder addSource(Topology.AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String... topics) {
topology.addSource(offsetReset, name, timestampExtractor, keyDeserializer, valueDeserializer, topics);
return this;
}
public synchronized KafkaStreamBuilder addSource(Topology.AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, Pattern topicPattern) {
topology.addSource(offsetReset, name, timestampExtractor, keyDeserializer, valueDeserializer, topicPattern);
return this;
}
public synchronized KafkaStreamBuilder addSink(String name, String topic, String... parentNames) {
topology.addSink(name, topic, parentNames);
return this;
}
public synchronized <K, V> KafkaStreamBuilder addSink(String name, String topic, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames) {
topology.addSink(name, topic, partitioner, parentNames);
return this;
}
public synchronized <K, V> KafkaStreamBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valueSerializer, String... parentNames) {
topology.addSink(name, topic, keySerializer, valueSerializer, parentNames);
return this;
}
public synchronized <K, V> KafkaStreamBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames) {
topology.addSink(name, topic, keySerializer, valueSerializer, partitioner, parentNames);
return this;
}
public synchronized <K, V> KafkaStreamBuilder addSink(String name, TopicNameExtractor<K, V> topicExtractor, String... parentNames) {
topology.addSink(name, topicExtractor, parentNames);
return this;
}
public synchronized <K, V> KafkaStreamBuilder addSink(String name, TopicNameExtractor<K, V> topicExtractor, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames) {
topology.addSink(name, topicExtractor, partitioner, parentNames);
return this;
}
public synchronized <K, V> KafkaStreamBuilder addSink(String name, TopicNameExtractor<K, V> topicExtractor, Serializer<K> keySerializer, Serializer<V> valueSerializer, String... parentNames) {
topology.addSink(name, topicExtractor, keySerializer, valueSerializer, parentNames);
return this;
}
public synchronized <K, V> KafkaStreamBuilder addSink(String name, TopicNameExtractor<K, V> topicExtractor, Serializer<K> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames) {
topology.addSink(name, topicExtractor, keySerializer, valueSerializer, partitioner, parentNames);
return this;
}
public synchronized KafkaStreamBuilder addProcessor(String name, ProcessorSupplier<?, ?> supplier, String... parentNames) {
topology.addProcessor(name, supplier, parentNames);
return this;
}
public synchronized KafkaStreamBuilder addStateStore(StoreBuilder<?> storeBuilder, String... processorNames) {
topology.addStateStore(storeBuilder, processorNames);
return this;
}
public synchronized KafkaStreamBuilder addGlobalStore(StoreBuilder<?> storeBuilder, String sourceName, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String topic, String processorName, ProcessorSupplier<?, ?> stateUpdateSupplier) {
topology.addGlobalStore(storeBuilder, sourceName, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
return this;
}
public synchronized KafkaStreamBuilder addGlobalStore(StoreBuilder<?> storeBuilder, String sourceName, TimestampExtractor timestampExtractor, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String topic, String processorName, ProcessorSupplier<?, ?> stateUpdateSupplier) {
topology.addGlobalStore(storeBuilder, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
return this;
}
public synchronized KafkaStreamBuilder connectProcessorAndStateStores(String processorName, String... stateStoreNames) {
topology.connectProcessorAndStateStores(processorName, stateStoreNames);
return this;
}
/**
* 自定义的构筑方法, 传入 topology 和属性
*
* @author lingting 2020-06-23 20:24:52
*/
public KafkaStreams build(BiFunction<Topology, Properties, KafkaStreams> biFunction) {
return biFunction.apply(topology, getProperties());
}
public KafkaStreams build(Properties properties) {
return putAll(properties).build();
}
public KafkaStreams build(Topology topology) {
this.topology = topology;
return build();
}
public KafkaStreams build() {
return build(KafkaStreams::new);
}
public Set<String> getBootstrapServers() {
getProperties();
return bootstrapServers;
}
public Properties getProperties() {
bootstrapServers.addAll(ListUtil.toList(properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, StrUtil.EMPTY).split(BOOTSTRAP_SERVERS_DELIMITER)));
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(BOOTSTRAP_SERVERS_DELIMITER, bootstrapServers));
return properties;
}
}

View File

@@ -0,0 +1,32 @@
package com.hccake.extend.kafka.stream.core;
import com.hccake.extend.kafka.stream.store.KafkaKeyValueStore;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
/**
* 所有 kafka 流处理执行相关类的顶级父类
*
* @author lingting 2020/6/22 11:02
*/
public abstract class AbstractKafka {
/**
* 获取上下文
*
* @return content
* @author lingting 2020-06-22 11:03:23
*/
public abstract ProcessorContext getContext();
/**
* 获取 KeyValueStore
*
* @return java.lang.String
* @author lingting 2020-06-22 09:57:37
*/
@SuppressWarnings("unchecked")
public <VK, VV> KafkaKeyValueStore<VK, VV> getStore(String name) {
return KafkaKeyValueStore.init((KeyValueStore<VK, VV>) getContext().getStateStore(name));
}
}

View File

@@ -0,0 +1,125 @@
package com.hccake.extend.kafka.stream.core;
import cn.hutool.core.convert.Convert;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hccake.extend.kafka.stream.util.ProcessorContextUtil;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.To;
import java.time.Duration;
/**
* kafka 顶级 processor 类
*
* @author lingting 2020/6/16 22:27
*/
@Slf4j
public abstract class AbstractProcessor<K, V> extends AbstractKafka implements Processor<K, V> {
@Getter
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
initSchedule(this.context);
}
/**
* 用于初始化窗口的方法
* 子类如果需要 自己重写
*
* @author lingting 2020-06-17 10:44:39
*/
public void initSchedule(ProcessorContext context) {
}
/**
* 用于构筑 Punctuator
*
* @author lingting 2020-06-21 13:58:34
*/
public void schedule(Duration interval, PunctuationType type, AbstractPunctuator callback) {
context.schedule(interval, type, callback);
}
/**
* 用于构筑 {@link PunctuationType#WALL_CLOCK_TIME} 类型的 Punctuator
*
* @author lingting 2020-06-21 13:58:53
*/
public void schedule(Duration interval, AbstractPunctuator callback) {
schedule(interval, PunctuationType.WALL_CLOCK_TIME, callback);
}
/**
* 下发数据
*
* @param key key
* @param value value
* @param childName 目标名称
* @author lingting 2020-06-17 19:44:45
*/
public <KEY, VALUE> void forward(KEY key, VALUE value, String childName) {
context.forward(key, value, To.child(childName));
}
/**
* 下发数据
*
* @param key key
* @param value value
* @param to 目标
* @author lingting 2020-06-17 19:47:55
*/
public <KEY, VALUE> void forward(KEY key, VALUE value, To to) {
context.forward(key, value, to);
}
public void startLog(K key, V value) {
log.debug("收到消息 {} key: {} value: {}", ProcessorContextUtil.toLogString(context), key, value);
}
public void errLog(Throwable e) {
log.error("processor 操作数据出错 " + ProcessorContextUtil.toLogString(context), e);
}
@Override
public void process(K key, V value) {
// 由于测试中存在 处理过程报错,整个 topology 停止运行,所以直接捕获异常
try {
startLog(key, value);
process(context, key, value);
} catch (Exception e) {
errLog(e);
String errStr;
try {
errStr = new ObjectMapper().writeValueAsString(value);
} catch (Exception ex) {
log.error("数据转json出错!msg: {}", ex.getMessage());
errStr = Convert.toStr(value);
}
log.error("异常数据 {}", errStr);
}
}
/**
* Process the record with the given key and value.
*
* @param context 上下文
* @param key the key for the record
* @param value the value for the record
*/
public abstract void process(ProcessorContext context, K key, V value);
/**
* 子类需要时, 自己重写
*/
@Override
public void close() {
}
}

View File

@@ -0,0 +1,94 @@
package com.hccake.extend.kafka.stream.core;
import cn.hutool.core.date.TimeInterval;
import com.hccake.extend.kafka.stream.util.ProcessorContextUtil;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.Punctuator;
/**
* kafka 顶级 punctuator 类
*
* @author lingting 2020/6/17 14:02
*/
@Slf4j
public abstract class AbstractPunctuator extends AbstractKafka implements Punctuator {
@Getter
private ProcessorContext context;
public AbstractPunctuator init(ProcessorContext context) {
this.context = context;
return this;
}
/**
* 是否处理数据, 如果想依据条件执行,需要自己重写当前方法
*
* @return boolean true 表示有参数,可以执行处理
* @author lingting 2020-06-17 14:05:01
*/
public boolean isHandle() {
return true;
}
/**
* 用于处理完数据后,清空当前存储的数据
*
* @author lingting 2020-06-17 15:55:44
*/
public abstract void clean();
/**
* handle 开始日志
*
* @author lingting 2020-06-21 18:34:15
*/
public void startLog() {
log.debug("任务开始执行, 类名 {} ,{}", this.getClass().getSimpleName(), ProcessorContextUtil.toLogString(context));
}
/**
* handle 结束日志
*
* @param time 执行时长 单位 毫秒
* @author lingting 2020-06-21 16:39:16
*/
public void endLog(long time) {
log.debug("任务执行时长: {}, 类名 {}, {} ", time, this.getClass().getSimpleName(), ProcessorContextUtil.toLogString(context));
}
/**
* 异常日志
*
* @author lingting 2020-06-22 19:50:16
*/
public void errLog(Throwable e) {
log.error("punctuator 操作数据出错 类名 " + this.getClass().getSimpleName() + ", " + ProcessorContextUtil.toLogString(context), e);
}
@Override
public void punctuate(long timestamp) {
try {
if (isHandle()) {
TimeInterval interval = new TimeInterval();
startLog();
handle(timestamp);
endLog(interval.intervalMs());
// 清除数据
clean();
context.commit();
}
} catch (Exception e) {
errLog(e);
}
}
/**
* 处理聚合的数据
*
* @param timestamp 时间戳
* @author lingting 2020-06-17 14:06:25
*/
public abstract void handle(long timestamp);
}

View File

@@ -0,0 +1,109 @@
package com.hccake.extend.kafka.stream.extend;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hccake.extend.kafka.stream.core.AbstractPunctuator;
import com.hccake.extend.kafka.stream.store.KafkaKeyValueStore;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
/**
* kafka 扩展类
* 自动注入 指定类型 指定名称的 store
* Value 数据的类型
* Values 存放数据的对象类型
*
* @author lingting 2020/6/19 10:21
*/
@Slf4j
public abstract class AbstractKeyValueStorePunctuator<K, V, R> extends AbstractPunctuator {
@Getter
private KafkaKeyValueStore<K, V> store;
/**
* 用来处理单个值得函数
*/
private BiFunction<K, V, R> signHandle;
@Override
@Deprecated
public AbstractKeyValueStorePunctuator<K, V, R> init(ProcessorContext context) {
throw new RuntimeException("继承自 com.hccake.extend.kafka.stream.extend.AbstractKeyValueStorePunctuator 的类禁止使用当前方法进行初始化!");
}
public AbstractKeyValueStorePunctuator<K, V, R> init(ProcessorContext context, String storeName, BiFunction<K, V, R> signHandle) {
super.init(context);
store = getStore(storeName);
this.signHandle = signHandle;
return this;
}
/**
* 获取单批量处理数量
*
* @author lingting 2020-06-22 15:37:10
*/
public long getHandleSize() {
return 1000;
}
@Override
public void handle(long timestamp) {
KeyValueIterator<K, V> iterator = store.all();
List<R> list = new ArrayList<>();
while (iterator.hasNext()) {
if (list.size() == getHandleSize()) {
runHandle(timestamp, list);
list.clear();
}
KeyValue<K, V> kv = iterator.next();
list.add(signHandle.apply(kv.key, kv.value));
store.delete(kv.key);
}
runHandle(timestamp, list);
}
/**
* 执行数据处理方法
*
* @param list 数据
* @param timestamp 时间戳
* @author lingting 2020-06-22 19:51:56
*/
public void runHandle(long timestamp, List<R> list) {
try {
if (list.size() > 0) {
log.debug("任务执行中,类名 {}, 操作数据量: {}", this.getClass().getSimpleName(), list.size());
handle(timestamp, list);
}
} catch (Exception e) {
errLog(e);
try {
log.error("时间戳: {}, 类名: {}, 异常数据: {}", timestamp, this.getClass().getName(), new ObjectMapper().writeValueAsString(list));
} catch (Exception exception) {
log.error("记录异常数据出错! 时间戳: {}, 类名: {}", timestamp, this.getClass().getName());
log.error("数据转换异常! ", exception);
}
}
}
/**
* 批量处理数据
*
* @param timestamp 时间戳
* @param list 当前批数据
* @author lingting 2020-06-22 15:35:38
*/
public abstract void handle(long timestamp, List<R> list);
@Override
public void clean() {
}
}

View File

@@ -0,0 +1,13 @@
package com.hccake.extend.kafka.stream.extend;
import lombok.extern.slf4j.Slf4j;
/**
* String String 类型的 store
*
* @author lingting 2020/6/19 10:21
*/
@Slf4j
public abstract class AbstractStringStringStorePunctuator<R> extends AbstractKeyValueStorePunctuator<String, String, R> {
}

View File

@@ -0,0 +1,118 @@
package com.hccake.extend.kafka.stream.store;
import cn.hutool.core.util.IdUtil;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
/**
* 使用 {@link KeyValueStore} 为具体实现的kafka数据缓存方法
*
* @author lingting 2020/6/22 10:24
*/
public class KafkaKeyValueStore<K, V> implements KafkaWindow<V, KeyValueStore<K, V>> {
private KeyValueStore<K, V> store;
private Supplier<K> supplier;
private KafkaKeyValueStore() {
}
@SuppressWarnings("unchecked")
public static <K, V> KafkaKeyValueStore<K, V> init(KeyValueStore<K, V> store) {
return init(store, () -> (K) IdUtil.simpleUUID());
}
/**
* @param supplier 生成key的方式
* @author lingting 2020-06-22 10:43:34
*/
public static <K, V> KafkaKeyValueStore<K, V> init(KeyValueStore<K, V> store, Supplier<K> supplier) {
KafkaKeyValueStore<K, V> keyValueStore = new KafkaKeyValueStore<>();
keyValueStore.store = store;
keyValueStore.supplier = supplier;
return keyValueStore;
}
public KeyValueIterator<K, V> all() {
return store.all();
}
public void forEachRemaining(Consumer<? super KeyValue<K, V>> action) {
all().forEachRemaining(action);
}
public List<K> keys() {
List<K> list = new ArrayList<>();
all().forEachRemaining(kv -> {
list.add(kv.key);
});
return list;
}
public List<V> values() {
List<V> list = new ArrayList<>();
all().forEachRemaining(kv -> {
list.add(kv.value);
});
return list;
}
/**
* 获取插入数据的key
*
* @return 生成的key
* @author lingting 2020-06-22 10:14:15
*/
public K getKey() {
return supplier.get();
}
public void put(V v) {
pushValue(v, store);
}
public void put(K k, V v) {
if (check(v)) {
forkPush(k, v, store);
}
}
public void put(KeyValue<K, V> kv) {
put(kv.key, kv.value);
}
public void putAll(List<KeyValue<K, V>> kvList) {
kvList.forEach(kv -> put(kv.key, kv.value));
}
public void putAll(Collection<V> vs) {
vs.forEach(this::put);
}
/**
* 直接插入数据
*
* @param v 值
* @param kvKeyValueStore 目标
* @author lingting 2020-06-22 10:36:53
*/
@Override
public void forkPush(V v, KeyValueStore<K, V> kvKeyValueStore) {
forkPush(getKey(), v, kvKeyValueStore);
}
public void forkPush(K k, V v, KeyValueStore<K, V> kvKeyValueStore) {
kvKeyValueStore.put(k, v);
}
public V delete(K key) {
return store.delete(key);
}
}

View File

@@ -0,0 +1,74 @@
package com.hccake.extend.kafka.stream.store;
import cn.hutool.core.util.StrUtil;
/**
* kafka 数据缓存类的接口
* @author lingting 2020/6/22 10:32
*/
public interface KafkaWindow<Value, Values> {
/**
* 数据通过校验才插入
*
* @param values 目标
* @param value 值
* @author lingting 2020-06-19 10:27:30
*/
default void pushValue(Value value, Values values) {
if (check(value)) {
forkPush(value, values);
}
}
/**
* 插入多个数据
*
* @param values 插入目标
* @param iterable 需要插入的多个值
* @author lingting 2020-06-19 11:05:00
*/
default void pushAll(Iterable<Value> iterable, Values values) {
for (Value v : iterable) {
pushValue(v, values);
}
}
/**
* 直接插入数据
*
* @param value 数据
* @param values 存放所有数据的对象
* @author lingting 2020-06-19 10:25:24
*/
void forkPush(Value value, Values values);
/**
* 校验 value 是否可以插入
*
* @param value 数据
* @return boolean true 可以插入
* @author lingting 2020-06-19 10:27:17
*/
default boolean check(Value value) {
if (!isInsertNull()) {
// 不能插入空值,进行校验
if (value instanceof String && StrUtil.isEmpty((String) value)) {
// 空值, 结束方法
return false;
}
return value != null;
}
return true;
}
/**
* 是否可以插入空值
*
* @return true 可以插入空值
* @author lingting 2020-06-19 10:28:52
*/
default boolean isInsertNull() {
return false;
}
}

View File

@@ -0,0 +1,23 @@
package com.hccake.extend.kafka.stream.util;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
/**
* kafka 上下文工具类
*
* @author lingting 2020/6/23 18:00
*/
public class ProcessorContextUtil {
public static String toLogString(ProcessorContext context) {
String res = " 节点属性: application-id: " + context.applicationId();
if (context instanceof ProcessorContextImpl) {
res += " currentNode.name: " + ((ProcessorContextImpl) context).currentNode().name();
}
res += " topic: " + context.topic() +
" offset: " + context.offset() +
" taskId: " + context.taskId();
return res;
}
}

View File

@@ -15,6 +15,7 @@
<module>ballcat-extend-mybatis-plus</module>
<module>ballcat-extend-mybatis-plus-mysql</module>
<module>ballcat-extend-dingtalk</module>
<module>ballcat-extend-kafka</module>
</modules>
<module>ballcat-extend-kafka</module>
<module>ballcat-extend-kafka-stream</module>
</modules>
</project>