🎨 引入spring-java-format插件,代码格式管理

This commit is contained in:
b2baccline
2020-06-25 00:19:21 +08:00
parent 2b01fb6f5b
commit 89add5d79a
329 changed files with 6464 additions and 6260 deletions

View File

@@ -26,13 +26,17 @@ import java.util.regex.Pattern;
/**
* kafka Stream 流构建方法
*
* @author lingting 2020/6/23 19:31
* @author lingting 2020/6/23 19:31
*/
public class KafkaStreamBuilder {
private static final String BOOTSTRAP_SERVERS_DELIMITER = ",";
@Getter
private Topology topology = new Topology();
private final Properties properties = new Properties();
private final Set<String> bootstrapServers = new HashSet<>();
public KafkaStreamBuilder keySerde(Class<? extends Serde<?>> c) {
@@ -52,9 +56,9 @@ public class KafkaStreamBuilder {
}
/**
* 添加 kafka 路径 host:port
* 添加 kafka 路径 host:port
*
* @author lingting 2020-06-19 16:30:03
* @author lingting 2020-06-19 16:30:03
*/
public KafkaStreamBuilder addBootstrapServers(String uri) {
bootstrapServers.add(uri);
@@ -69,7 +73,7 @@ public class KafkaStreamBuilder {
/**
* 添加配置
*
* @author lingting 2020-06-19 16:30:50
* @author lingting 2020-06-19 16:30:50
*/
public KafkaStreamBuilder put(Object key, Object val) {
properties.put(key, val);
@@ -79,7 +83,7 @@ public class KafkaStreamBuilder {
/**
* 添加配置
*
* @author lingting 2020-06-19 16:30:50
* @author lingting 2020-06-19 16:30:50
*/
public KafkaStreamBuilder putAll(Properties properties) {
this.properties.putAll(properties);
@@ -101,62 +105,76 @@ public class KafkaStreamBuilder {
return this;
}
public synchronized KafkaStreamBuilder addSource(Topology.AutoOffsetReset offsetReset, String name, String... topics) {
public synchronized KafkaStreamBuilder addSource(Topology.AutoOffsetReset offsetReset, String name,
String... topics) {
topology.addSource(offsetReset, name, topics);
return this;
}
public synchronized KafkaStreamBuilder addSource(Topology.AutoOffsetReset offsetReset, String name, Pattern topicPattern) {
public synchronized KafkaStreamBuilder addSource(Topology.AutoOffsetReset offsetReset, String name,
Pattern topicPattern) {
topology.addSource(offsetReset, name, topicPattern);
return this;
}
public synchronized KafkaStreamBuilder addSource(TimestampExtractor timestampExtractor, String name, String... topics) {
public synchronized KafkaStreamBuilder addSource(TimestampExtractor timestampExtractor, String name,
String... topics) {
topology.addSource(timestampExtractor, name, topics);
return this;
}
public synchronized KafkaStreamBuilder addSource(TimestampExtractor timestampExtractor, String name, Pattern topicPattern) {
public synchronized KafkaStreamBuilder addSource(TimestampExtractor timestampExtractor, String name,
Pattern topicPattern) {
topology.addSource(timestampExtractor, name, topicPattern);
return this;
}
public synchronized KafkaStreamBuilder addSource(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, String... topics) {
public synchronized KafkaStreamBuilder addSource(Topology.AutoOffsetReset offsetReset,
TimestampExtractor timestampExtractor, String name, String... topics) {
topology.addSource(offsetReset, timestampExtractor, name, topics);
return this;
}
public synchronized KafkaStreamBuilder addSource(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, Pattern topicPattern) {
public synchronized KafkaStreamBuilder addSource(Topology.AutoOffsetReset offsetReset,
TimestampExtractor timestampExtractor, String name, Pattern topicPattern) {
topology.addSource(offsetReset, timestampExtractor, name, topicPattern);
return this;
}
public synchronized KafkaStreamBuilder addSource(String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String... topics) {
public synchronized KafkaStreamBuilder addSource(String name, Deserializer<?> keyDeserializer,
Deserializer<?> valueDeserializer, String... topics) {
topology.addSource(name, keyDeserializer, valueDeserializer, topics);
return this;
}
public synchronized KafkaStreamBuilder addSource(String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, Pattern topicPattern) {
public synchronized KafkaStreamBuilder addSource(String name, Deserializer<?> keyDeserializer,
Deserializer<?> valueDeserializer, Pattern topicPattern) {
topology.addSource(name, keyDeserializer, valueDeserializer, topicPattern);
return this;
}
public synchronized KafkaStreamBuilder addSource(Topology.AutoOffsetReset offsetReset, String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String... topics) {
public synchronized KafkaStreamBuilder addSource(Topology.AutoOffsetReset offsetReset, String name,
Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String... topics) {
topology.addSource(offsetReset, name, keyDeserializer, valueDeserializer, topics);
return this;
}
public synchronized KafkaStreamBuilder addSource(Topology.AutoOffsetReset offsetReset, String name, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, Pattern topicPattern) {
public synchronized KafkaStreamBuilder addSource(Topology.AutoOffsetReset offsetReset, String name,
Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, Pattern topicPattern) {
topology.addSource(offsetReset, name, keyDeserializer, valueDeserializer, topicPattern);
return this;
}
public synchronized KafkaStreamBuilder addSource(Topology.AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String... topics) {
public synchronized KafkaStreamBuilder addSource(Topology.AutoOffsetReset offsetReset, String name,
TimestampExtractor timestampExtractor, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer,
String... topics) {
topology.addSource(offsetReset, name, timestampExtractor, keyDeserializer, valueDeserializer, topics);
return this;
}
public synchronized KafkaStreamBuilder addSource(Topology.AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, Pattern topicPattern) {
public synchronized KafkaStreamBuilder addSource(Topology.AutoOffsetReset offsetReset, String name,
TimestampExtractor timestampExtractor, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer,
Pattern topicPattern) {
topology.addSource(offsetReset, name, timestampExtractor, keyDeserializer, valueDeserializer, topicPattern);
return this;
}
@@ -166,42 +184,51 @@ public class KafkaStreamBuilder {
return this;
}
public synchronized <K, V> KafkaStreamBuilder addSink(String name, String topic, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames) {
public synchronized <K, V> KafkaStreamBuilder addSink(String name, String topic,
StreamPartitioner<? super K, ? super V> partitioner, String... parentNames) {
topology.addSink(name, topic, partitioner, parentNames);
return this;
}
public synchronized <K, V> KafkaStreamBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valueSerializer, String... parentNames) {
public synchronized <K, V> KafkaStreamBuilder addSink(String name, String topic, Serializer<K> keySerializer,
Serializer<V> valueSerializer, String... parentNames) {
topology.addSink(name, topic, keySerializer, valueSerializer, parentNames);
return this;
}
public synchronized <K, V> KafkaStreamBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames) {
public synchronized <K, V> KafkaStreamBuilder addSink(String name, String topic, Serializer<K> keySerializer,
Serializer<V> valueSerializer, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames) {
topology.addSink(name, topic, keySerializer, valueSerializer, partitioner, parentNames);
return this;
}
public synchronized <K, V> KafkaStreamBuilder addSink(String name, TopicNameExtractor<K, V> topicExtractor, String... parentNames) {
public synchronized <K, V> KafkaStreamBuilder addSink(String name, TopicNameExtractor<K, V> topicExtractor,
String... parentNames) {
topology.addSink(name, topicExtractor, parentNames);
return this;
}
public synchronized <K, V> KafkaStreamBuilder addSink(String name, TopicNameExtractor<K, V> topicExtractor, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames) {
public synchronized <K, V> KafkaStreamBuilder addSink(String name, TopicNameExtractor<K, V> topicExtractor,
StreamPartitioner<? super K, ? super V> partitioner, String... parentNames) {
topology.addSink(name, topicExtractor, partitioner, parentNames);
return this;
}
public synchronized <K, V> KafkaStreamBuilder addSink(String name, TopicNameExtractor<K, V> topicExtractor, Serializer<K> keySerializer, Serializer<V> valueSerializer, String... parentNames) {
public synchronized <K, V> KafkaStreamBuilder addSink(String name, TopicNameExtractor<K, V> topicExtractor,
Serializer<K> keySerializer, Serializer<V> valueSerializer, String... parentNames) {
topology.addSink(name, topicExtractor, keySerializer, valueSerializer, parentNames);
return this;
}
public synchronized <K, V> KafkaStreamBuilder addSink(String name, TopicNameExtractor<K, V> topicExtractor, Serializer<K> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames) {
public synchronized <K, V> KafkaStreamBuilder addSink(String name, TopicNameExtractor<K, V> topicExtractor,
Serializer<K> keySerializer, Serializer<V> valueSerializer,
StreamPartitioner<? super K, ? super V> partitioner, String... parentNames) {
topology.addSink(name, topicExtractor, keySerializer, valueSerializer, partitioner, parentNames);
return this;
}
public synchronized KafkaStreamBuilder addProcessor(String name, ProcessorSupplier<?, ?> supplier, String... parentNames) {
public synchronized KafkaStreamBuilder addProcessor(String name, ProcessorSupplier<?, ?> supplier,
String... parentNames) {
topology.addProcessor(name, supplier, parentNames);
return this;
}
@@ -211,17 +238,24 @@ public class KafkaStreamBuilder {
return this;
}
public synchronized KafkaStreamBuilder addGlobalStore(StoreBuilder<?> storeBuilder, String sourceName, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String topic, String processorName, ProcessorSupplier<?, ?> stateUpdateSupplier) {
topology.addGlobalStore(storeBuilder, sourceName, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
public synchronized KafkaStreamBuilder addGlobalStore(StoreBuilder<?> storeBuilder, String sourceName,
Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String topic, String processorName,
ProcessorSupplier<?, ?> stateUpdateSupplier) {
topology.addGlobalStore(storeBuilder, sourceName, keyDeserializer, valueDeserializer, topic, processorName,
stateUpdateSupplier);
return this;
}
public synchronized KafkaStreamBuilder addGlobalStore(StoreBuilder<?> storeBuilder, String sourceName, TimestampExtractor timestampExtractor, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer, String topic, String processorName, ProcessorSupplier<?, ?> stateUpdateSupplier) {
topology.addGlobalStore(storeBuilder, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
public synchronized KafkaStreamBuilder addGlobalStore(StoreBuilder<?> storeBuilder, String sourceName,
TimestampExtractor timestampExtractor, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer,
String topic, String processorName, ProcessorSupplier<?, ?> stateUpdateSupplier) {
topology.addGlobalStore(storeBuilder, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic,
processorName, stateUpdateSupplier);
return this;
}
public synchronized KafkaStreamBuilder connectProcessorAndStateStores(String processorName, String... stateStoreNames) {
public synchronized KafkaStreamBuilder connectProcessorAndStateStores(String processorName,
String... stateStoreNames) {
topology.connectProcessorAndStateStores(processorName, stateStoreNames);
return this;
}
@@ -229,7 +263,7 @@ public class KafkaStreamBuilder {
/**
* 自定义的构筑方法, 传入 topology 和属性
*
* @author lingting 2020-06-23 20:24:52
* @author lingting 2020-06-23 20:24:52
*/
public KafkaStreams build(BiFunction<Topology, Properties, KafkaStreams> biFunction) {
return biFunction.apply(topology, getProperties());
@@ -254,8 +288,12 @@ public class KafkaStreamBuilder {
}
public Properties getProperties() {
bootstrapServers.addAll(ListUtil.toList(properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, StrUtil.EMPTY).split(BOOTSTRAP_SERVERS_DELIMITER)));
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(BOOTSTRAP_SERVERS_DELIMITER, bootstrapServers));
bootstrapServers
.addAll(ListUtil.toList(properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, StrUtil.EMPTY)
.split(BOOTSTRAP_SERVERS_DELIMITER)));
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
String.join(BOOTSTRAP_SERVERS_DELIMITER, bootstrapServers));
return properties;
}
}

View File

@@ -7,20 +7,19 @@ import org.apache.kafka.streams.state.KeyValueStore;
/**
* 所有 kafka 流处理执行相关类的顶级父类
*
* @author lingting 2020/6/22 11:02
* @author lingting 2020/6/22 11:02
*/
public abstract class AbstractKafka {
/**
* 获取上下文
*
* @return content
* @author lingting 2020-06-22 11:03:23
* @author lingting 2020-06-22 11:03:23
*/
public abstract ProcessorContext getContext();
/**
* 获取 KeyValueStore
*
* @return java.lang.String
* @author lingting 2020-06-22 09:57:37
*/

View File

@@ -15,10 +15,11 @@ import java.time.Duration;
/**
* kafka 顶级 processor 类
*
* @author lingting 2020/6/16 22:27
* @author lingting 2020/6/16 22:27
*/
@Slf4j
public abstract class AbstractProcessor<K, V> extends AbstractKafka implements Processor<K, V> {
@Getter
private ProcessorContext context;
@@ -29,10 +30,9 @@ public abstract class AbstractProcessor<K, V> extends AbstractKafka implements P
}
/**
* 用于初始化窗口的方法
* 子类如果需要 自己重写
* 用于初始化窗口的方法 子类如果需要 自己重写
*
* @author lingting 2020-06-17 10:44:39
* @author lingting 2020-06-17 10:44:39
*/
public void initSchedule(ProcessorContext context) {
@@ -41,7 +41,7 @@ public abstract class AbstractProcessor<K, V> extends AbstractKafka implements P
/**
* 用于构筑 Punctuator
*
* @author lingting 2020-06-21 13:58:34
* @author lingting 2020-06-21 13:58:34
*/
public void schedule(Duration interval, PunctuationType type, AbstractPunctuator callback) {
context.schedule(interval, type, callback);
@@ -50,7 +50,7 @@ public abstract class AbstractProcessor<K, V> extends AbstractKafka implements P
/**
* 用于构筑 {@link PunctuationType#WALL_CLOCK_TIME} 类型的 Punctuator
*
* @author lingting 2020-06-21 13:58:53
* @author lingting 2020-06-21 13:58:53
*/
public void schedule(Duration interval, AbstractPunctuator callback) {
schedule(interval, PunctuationType.WALL_CLOCK_TIME, callback);
@@ -58,11 +58,10 @@ public abstract class AbstractProcessor<K, V> extends AbstractKafka implements P
/**
* 下发数据
*
* @param key key
* @param value value
* @param key key
* @param value value
* @param childName 目标名称
* @author lingting 2020-06-17 19:44:45
* @author lingting 2020-06-17 19:44:45
*/
public <KEY, VALUE> void forward(KEY key, VALUE value, String childName) {
context.forward(key, value, To.child(childName));
@@ -70,10 +69,9 @@ public abstract class AbstractProcessor<K, V> extends AbstractKafka implements P
/**
* 下发数据
*
* @param key key
* @param key key
* @param value value
* @param to 目标
* @param to 目标
* @author lingting 2020-06-17 19:47:55
*/
public <KEY, VALUE> void forward(KEY key, VALUE value, To to) {
@@ -94,12 +92,14 @@ public abstract class AbstractProcessor<K, V> extends AbstractKafka implements P
try {
startLog(key, value);
process(context, key, value);
} catch (Exception e) {
}
catch (Exception e) {
errLog(e);
String errStr;
try {
errStr = new ObjectMapper().writeValueAsString(value);
} catch (Exception ex) {
}
catch (Exception ex) {
log.error("数据转json出错!msg: {}", ex.getMessage());
errStr = Convert.toStr(value);
}
@@ -109,10 +109,9 @@ public abstract class AbstractProcessor<K, V> extends AbstractKafka implements P
/**
* Process the record with the given key and value.
*
* @param context 上下文
* @param key the key for the record
* @param value the value for the record
* @param key the key for the record
* @param value the value for the record
*/
public abstract void process(ProcessorContext context, K key, V value);
@@ -122,4 +121,5 @@ public abstract class AbstractProcessor<K, V> extends AbstractKafka implements P
@Override
public void close() {
}
}

View File

@@ -10,10 +10,11 @@ import org.apache.kafka.streams.processor.Punctuator;
/**
* kafka 顶级 punctuator 类
*
* @author lingting 2020/6/17 14:02
* @author lingting 2020/6/17 14:02
*/
@Slf4j
public abstract class AbstractPunctuator extends AbstractKafka implements Punctuator {
@Getter
private ProcessorContext context;
@@ -24,7 +25,6 @@ public abstract class AbstractPunctuator extends AbstractKafka implements Punctu
/**
* 是否处理数据, 如果想依据条件执行,需要自己重写当前方法
*
* @return boolean true 表示有参数,可以执行处理
* @author lingting 2020-06-17 14:05:01
*/
@@ -42,7 +42,7 @@ public abstract class AbstractPunctuator extends AbstractKafka implements Punctu
/**
* handle 开始日志
*
* @author lingting 2020-06-21 18:34:15
* @author lingting 2020-06-21 18:34:15
*/
public void startLog() {
log.debug("任务开始执行, 类名 {} ,{}", this.getClass().getSimpleName(), ProcessorContextUtil.toLogString(context));
@@ -50,21 +50,22 @@ public abstract class AbstractPunctuator extends AbstractKafka implements Punctu
/**
* handle 结束日志
*
* @param time 执行时长 单位 毫秒
* @author lingting 2020-06-21 16:39:16
* @param time 执行时长 单位 毫秒
* @author lingting 2020-06-21 16:39:16
*/
public void endLog(long time) {
log.debug("任务执行时长: {}, 类名 {}, {} ", time, this.getClass().getSimpleName(), ProcessorContextUtil.toLogString(context));
log.debug("任务执行时长: {}, 类名 {}, {} ", time, this.getClass().getSimpleName(),
ProcessorContextUtil.toLogString(context));
}
/**
* 异常日志
*
* @author lingting 2020-06-22 19:50:16
* @author lingting 2020-06-22 19:50:16
*/
public void errLog(Throwable e) {
log.error("punctuator 操作数据出错 类名 " + this.getClass().getSimpleName() + ", " + ProcessorContextUtil.toLogString(context), e);
log.error("punctuator 操作数据出错 类名 " + this.getClass().getSimpleName() + ", "
+ ProcessorContextUtil.toLogString(context), e);
}
@Override
@@ -79,16 +80,17 @@ public abstract class AbstractPunctuator extends AbstractKafka implements Punctu
clean();
context.commit();
}
} catch (Exception e) {
}
catch (Exception e) {
errLog(e);
}
}
/**
* 处理聚合的数据
*
* @param timestamp 时间戳
* @author lingting 2020-06-17 14:06:25
*/
public abstract void handle(long timestamp);
}

View File

@@ -14,17 +14,16 @@ import java.util.List;
import java.util.function.BiFunction;
/**
* kafka 扩展类
* 自动注入 指定类型 指定名称的 store
* Value 数据的类型
* Values 存放数据的对象类型
* kafka 扩展类 自动注入 指定类型 指定名称的 store Value 数据的类型 Values 存放数据的对象类型
*
* @author lingting 2020/6/19 10:21
* @author lingting 2020/6/19 10:21
*/
@Slf4j
public abstract class AbstractKeyValueStorePunctuator<K, V, R> extends AbstractPunctuator {
@Getter
private KafkaKeyValueStore<K, V> store;
/**
* 用来处理单个值得函数
*/
@@ -33,10 +32,12 @@ public abstract class AbstractKeyValueStorePunctuator<K, V, R> extends AbstractP
@Override
@Deprecated
public AbstractKeyValueStorePunctuator<K, V, R> init(ProcessorContext context) {
throw new RuntimeException("继承自 com.hccake.extend.kafka.stream.extend.AbstractKeyValueStorePunctuator 的类禁止使用当前方法进行初始化!");
throw new RuntimeException(
"继承自 com.hccake.extend.kafka.stream.extend.AbstractKeyValueStorePunctuator 的类禁止使用当前方法进行初始化!");
}
public AbstractKeyValueStorePunctuator<K, V, R> init(ProcessorContext context, String storeName, BiFunction<K, V, R> signHandle) {
public AbstractKeyValueStorePunctuator<K, V, R> init(ProcessorContext context, String storeName,
BiFunction<K, V, R> signHandle) {
super.init(context);
store = getStore(storeName);
this.signHandle = signHandle;
@@ -46,7 +47,7 @@ public abstract class AbstractKeyValueStorePunctuator<K, V, R> extends AbstractP
/**
* 获取单批量处理数量
*
* @author lingting 2020-06-22 15:37:10
* @author lingting 2020-06-22 15:37:10
*/
public long getHandleSize() {
return 1000;
@@ -71,10 +72,9 @@ public abstract class AbstractKeyValueStorePunctuator<K, V, R> extends AbstractP
/**
* 执行数据处理方法
*
* @param list 数据
* @param list 数据
* @param timestamp 时间戳
* @author lingting 2020-06-22 19:51:56
* @author lingting 2020-06-22 19:51:56
*/
public void runHandle(long timestamp, List<R> list) {
try {
@@ -82,11 +82,14 @@ public abstract class AbstractKeyValueStorePunctuator<K, V, R> extends AbstractP
log.debug("任务执行中,类名 {}, 操作数据量: {}", this.getClass().getSimpleName(), list.size());
handle(timestamp, list);
}
} catch (Exception e) {
}
catch (Exception e) {
errLog(e);
try {
log.error("时间戳: {}, 类名: {}, 异常数据: {}", timestamp, this.getClass().getName(), new ObjectMapper().writeValueAsString(list));
} catch (Exception exception) {
log.error("时间戳: {}, 类名: {}, 异常数据: {}", timestamp, this.getClass().getName(),
new ObjectMapper().writeValueAsString(list));
}
catch (Exception exception) {
log.error("记录异常数据出错! 时间戳: {}, 类名: {}", timestamp, this.getClass().getName());
log.error("数据转换异常! ", exception);
}
@@ -95,9 +98,8 @@ public abstract class AbstractKeyValueStorePunctuator<K, V, R> extends AbstractP
/**
* 批量处理数据
*
* @param timestamp 时间戳
* @param list 当前批数据
* @param list 当前批数据
* @author lingting 2020-06-22 15:35:38
*/
public abstract void handle(long timestamp, List<R> list);
@@ -106,4 +108,5 @@ public abstract class AbstractKeyValueStorePunctuator<K, V, R> extends AbstractP
public void clean() {
}
}

View File

@@ -5,9 +5,10 @@ import lombok.extern.slf4j.Slf4j;
/**
* String String 类型的 store
*
* @author lingting 2020/6/19 10:21
* @author lingting 2020/6/19 10:21
*/
@Slf4j
public abstract class AbstractStringStringStorePunctuator<R> extends AbstractKeyValueStorePunctuator<String, String, R> {
public abstract class AbstractStringStringStorePunctuator<R>
extends AbstractKeyValueStorePunctuator<String, String, R> {
}

View File

@@ -14,10 +14,12 @@ import java.util.function.Supplier;
/**
* 使用 {@link KeyValueStore} 为具体实现的kafka数据缓存方法
*
* @author lingting 2020/6/22 10:24
* @author lingting 2020/6/22 10:24
*/
public class KafkaKeyValueStore<K, V> implements KafkaWindow<V, KeyValueStore<K, V>> {
private KeyValueStore<K, V> store;
private Supplier<K> supplier;
private KafkaKeyValueStore() {
@@ -30,7 +32,7 @@ public class KafkaKeyValueStore<K, V> implements KafkaWindow<V, KeyValueStore<K,
/**
* @param supplier 生成key的方式
* @author lingting 2020-06-22 10:43:34
* @author lingting 2020-06-22 10:43:34
*/
public static <K, V> KafkaKeyValueStore<K, V> init(KeyValueStore<K, V> store, Supplier<K> supplier) {
KafkaKeyValueStore<K, V> keyValueStore = new KafkaKeyValueStore<>();
@@ -63,12 +65,10 @@ public class KafkaKeyValueStore<K, V> implements KafkaWindow<V, KeyValueStore<K,
return list;
}
/**
* 获取插入数据的key
*
* @return 生成的key
* @author lingting 2020-06-22 10:14:15
* @author lingting 2020-06-22 10:14:15
*/
public K getKey() {
return supplier.get();
@@ -98,8 +98,7 @@ public class KafkaKeyValueStore<K, V> implements KafkaWindow<V, KeyValueStore<K,
/**
* 直接插入数据
*
* @param v 值
* @param v 值
* @param kvKeyValueStore 目标
* @author lingting 2020-06-22 10:36:53
*/
@@ -115,4 +114,5 @@ public class KafkaKeyValueStore<K, V> implements KafkaWindow<V, KeyValueStore<K,
public V delete(K key) {
return store.delete(key);
}
}

View File

@@ -4,71 +4,68 @@ import cn.hutool.core.util.StrUtil;
/**
* kafka 数据缓存类的接口
* @author lingting 2020/6/22 10:32
*
* @author lingting 2020/6/22 10:32
*/
public interface KafkaWindow<Value, Values> {
/**
* 数据通过校验才插入
*
* @param values 目标
* @param value 值
* @author lingting 2020-06-19 10:27:30
*/
default void pushValue(Value value, Values values) {
if (check(value)) {
forkPush(value, values);
}
}
/**
* 数据通过校验才插入
* @param values 目标
* @param value 值
* @author lingting 2020-06-19 10:27:30
*/
default void pushValue(Value value, Values values) {
if (check(value)) {
forkPush(value, values);
}
}
/**
* 插入多个数据
*
* @param values 插入目标
* @param iterable 需要插入的多个值
* @author lingting 2020-06-19 11:05:00
*/
default void pushAll(Iterable<Value> iterable, Values values) {
for (Value v : iterable) {
pushValue(v, values);
}
}
/**
* 插入多个数据
* @param values 插入目标
* @param iterable 需要插入的多个值
* @author lingting 2020-06-19 11:05:00
*/
default void pushAll(Iterable<Value> iterable, Values values) {
for (Value v : iterable) {
pushValue(v, values);
}
}
/**
* 直接插入数据
*
* @param value 数据
* @param values 存放所有数据的对象
* @author lingting 2020-06-19 10:25:24
*/
void forkPush(Value value, Values values);
/**
* 直接插入数据
* @param value 数据
* @param values 存放所有数据的对象
* @author lingting 2020-06-19 10:25:24
*/
void forkPush(Value value, Values values);
/**
* 校验 value 是否可以插入
*
* @param value 数据
* @return boolean true 可以插入
* @author lingting 2020-06-19 10:27:17
*/
default boolean check(Value value) {
if (!isInsertNull()) {
// 不能插入空值,进行校验
if (value instanceof String && StrUtil.isEmpty((String) value)) {
// 空值, 结束方法
return false;
}
return value != null;
}
return true;
}
/**
* 校验 value 是否可以插入
* @param value 数据
* @return boolean true 可以插入
* @author lingting 2020-06-19 10:27:17
*/
default boolean check(Value value) {
if (!isInsertNull()) {
// 不能插入空值,进行校验
if (value instanceof String && StrUtil.isEmpty((String) value)) {
// 空值, 结束方法
return false;
}
return value != null;
}
return true;
}
/**
* 是否可以插入空值
* @return true 可以插入空值
* @author lingting 2020-06-19 10:28:52
*/
default boolean isInsertNull() {
return false;
}
/**
* 是否可以插入空值
*
* @return true 可以插入空值
* @author lingting 2020-06-19 10:28:52
*/
default boolean isInsertNull() {
return false;
}
}

View File

@@ -6,7 +6,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
/**
* kafka 上下文工具类
*
* @author lingting 2020/6/23 18:00
* @author lingting 2020/6/23 18:00
*/
public class ProcessorContextUtil {
@@ -15,9 +15,8 @@ public class ProcessorContextUtil {
if (context instanceof ProcessorContextImpl) {
res += " currentNode.name: " + ((ProcessorContextImpl) context).currentNode().name();
}
res += " topic: " + context.topic() +
" offset: " + context.offset() +
" taskId: " + context.taskId();
res += " topic: " + context.topic() + " offset: " + context.offset() + " taskId: " + context.taskId();
return res;
}
}