flow stat supports resource chain

This commit is contained in:
Francis Dong
2021-01-18 10:17:59 +08:00
parent eae626d42f
commit d3fd8efa8d
5 changed files with 484 additions and 11 deletions

View File

@@ -26,9 +26,13 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import we.stats.IncrRequestResult.BlockType;
import we.util.Utils;
/**
@@ -56,6 +60,9 @@ public class FlowStat {
*/
public static long RETENTION_TIME_IN_MINUTES = 10;
private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private Lock w = rwl.writeLock();
private ExecutorService pool = Executors.newFixedThreadPool(2);
public FlowStat() {
@@ -95,6 +102,79 @@ public class FlowStat {
return (System.currentTimeMillis() / INTERVAL) * INTERVAL;
}
/**
* Increase concurrent request counter for given resources chain
*
* @param resourceConfigs Resource configurations
* @param curTimeSlotId current time slot ID, it should be generated by
* Flowstat.currentTimeSlotId()
* @return IncrRequestResult
*/
public IncrRequestResult incrRequest(List<ResourceConfig> resourceConfigs, long curTimeSlotId) {
if (resourceConfigs == null || resourceConfigs.size() == 0) {
return null;
}
w.lock();
try {
// check if exceed limit
for (ResourceConfig resourceConfig : resourceConfigs) {
long maxCon = resourceConfig.getMaxCon();
long maxQPS = resourceConfig.getMaxQPS();
if (maxCon > 0 || maxQPS > 0) {
ResourceStat resourceStat = getResourceStat(resourceConfig.getResourceId());
// check concurrent request
if (maxCon > 0) {
long n = resourceStat.getConcurrentRequests().get();
if (n >= maxCon) {
resourceStat.incrBlockRequestToTimeSlot(curTimeSlotId);
return IncrRequestResult.block(resourceConfig.getResourceId(),
BlockType.CONCURRENT_REQUEST);
}
}
// check QPS
if (maxQPS > 0) {
long total = resourceStat.getTimeSlot(curTimeSlotId).getCounter().get();
if (total >= maxQPS) {
resourceStat.incrBlockRequestToTimeSlot(curTimeSlotId);
return IncrRequestResult.block(resourceConfig.getResourceId(), BlockType.QPS);
}
}
}
}
// increase request and concurrent request
for (ResourceConfig resourceConfig : resourceConfigs) {
ResourceStat resourceStat = getResourceStat(resourceConfig.getResourceId());
long cons = resourceStat.getConcurrentRequests().incrementAndGet();
resourceStat.getTimeSlot(curTimeSlotId).updatePeakConcurrentReqeusts(cons);
resourceStat.getTimeSlot(curTimeSlotId).incr();
}
return IncrRequestResult.success();
} finally {
w.unlock();
}
}
/**
* Add request RT and Decrease concurrent request for given resources chain
*
* @param resourceConfigs
* @param timeSlotId
* @param rt
* @param isSuccess
*/
public void addRequestRT(List<ResourceConfig> resourceConfigs, long timeSlotId, long rt, boolean isSuccess) {
if (resourceConfigs == null || resourceConfigs.size() == 0) {
return;
}
for (int i = resourceConfigs.size() - 1; i >= 0; i--) {
ResourceStat resourceStat = getResourceStat(resourceConfigs.get(i).getResourceId());
resourceStat.decrConcurrentRequest(timeSlotId);
resourceStat.addRequestRT(timeSlotId, rt, isSuccess);
}
}
/**
* Increase concurrent request counter of the specified resource
*
@@ -115,7 +195,8 @@ public class FlowStat {
success = resourceStat.incrRequestToTimeSlot(curTimeSlotId, maxRPS);
}
if (log.isDebugEnabled()) {
log.debug(resourceId + " incr req for current time slot " + curTimeSlotId + " with max con " + maxCon + " and max rps " + maxRPS);
log.debug(resourceId + " incr req for current time slot " + curTimeSlotId + " with max con " + maxCon
+ " and max rps " + maxRPS);
}
return success;
}
@@ -341,20 +422,21 @@ public class FlowStat {
long n = FlowStat.RETENTION_TIME_IN_MINUTES * 60 * 1000 / FlowStat.INTERVAL * FlowStat.INTERVAL;
long lastSlotId = stat.currentTimeSlotId() - n;
while (true) {
//log.debug("housekeeping start");
// log.debug("housekeeping start");
long slotId = stat.currentTimeSlotId() - n;
for (long i = lastSlotId; i < slotId;) {
Set<Map.Entry<String, ResourceStat>> entrys = stat.resourceStats.entrySet();
for (Entry<String, ResourceStat> entry : entrys) {
String resourceId = entry.getKey();
ConcurrentMap<Long, TimeSlot> timeSlots = entry.getValue().getTimeSlots();
//log.debug("housekeeping remove slot: resourceId={} slotId=={}", resourceId, i);
// log.debug("housekeeping remove slot: resourceId={} slotId=={}", resourceId,
// i);
timeSlots.remove(i);
}
i = i + FlowStat.INTERVAL;
}
lastSlotId = slotId;
//log.debug("housekeeping done");
// log.debug("housekeeping done");
try {
Thread.sleep(60 * 1000);
} catch (Exception e) {
@@ -378,15 +460,16 @@ public class FlowStat {
while (true) {
long curTimeSlotId = stat.currentTimeSlotId();
if (lastTimeSlotId == null || lastTimeSlotId.longValue() != curTimeSlotId) {
//log.debug("PeakConcurrentJob start");
// log.debug("PeakConcurrentJob start");
Set<Map.Entry<String, ResourceStat>> entrys = stat.resourceStats.entrySet();
for (Entry<String, ResourceStat> entry : entrys) {
String resourceId = entry.getKey();
//log.debug("PeakConcurrentJob: resourceId={} slotId=={}", resourceId, curTimeSlotId);
// log.debug("PeakConcurrentJob: resourceId={} slotId=={}", resourceId,
// curTimeSlotId);
entry.getValue().getTimeSlot(curTimeSlotId);
}
lastTimeSlotId = curTimeSlotId;
//log.debug("PeakConcurrentJob done");
// log.debug("PeakConcurrentJob done");
}
try {
Thread.sleep(1);

View File

@@ -0,0 +1,92 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.stats;
/**
*
* @author Francis Dong
*
*/
public class IncrRequestResult {
enum BlockType {
/**
* Blocked by concurrent request rule
*/
CONCURRENT_REQUEST,
/**
* Blocked by QPS
*/
QPS;
}
/**
* true if success, otherwise false
*/
private boolean success;
/**
* Resource ID that causes block
*/
private String blockedResourceId;
/**
* block type 1 for concurrent request, 2 for QPS
*/
private BlockType blockType;
public static IncrRequestResult success() {
return new IncrRequestResult(true, null, null);
}
public static IncrRequestResult block(String resourceId, BlockType blockType) {
return new IncrRequestResult(false, resourceId, blockType);
}
public IncrRequestResult(boolean success, String resourceId, BlockType blockType) {
this.success = success;
this.blockedResourceId = resourceId;
this.blockType = blockType;
}
public boolean isSuccess() {
return success;
}
public void setSuccess(boolean success) {
this.success = success;
}
public String getBlockedResourceId() {
return blockedResourceId;
}
public void setBlockedResourceId(String blockedResourceId) {
this.blockedResourceId = blockedResourceId;
}
public BlockType getBlockType() {
return blockType;
}
public void setBlockType(BlockType blockType) {
this.blockType = blockType;
}
}

View File

@@ -0,0 +1,74 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.stats;
/**
*
* @author Francis Dong
*
*/
public class ResourceConfig {
/**
* Resouce ID
*/
private String resourceId;
/**
* Maximum concurrent request, zero or negative for no limit
*/
private long maxCon;
/**
* Maximum QPS, zero or negative for no limit
*/
private long maxQPS;
public ResourceConfig(String resourceId, long maxCon, long maxQPS) {
this.resourceId = resourceId;
this.maxCon = maxCon;
this.maxQPS = maxQPS;
}
public ResourceConfig() {
}
public String getResourceId() {
return resourceId;
}
public void setResourceId(String resourceId) {
this.resourceId = resourceId;
}
public long getMaxCon() {
return maxCon;
}
public void setMaxCon(long maxCon) {
this.maxCon = maxCon;
}
public long getMaxQPS() {
return maxQPS;
}
public void setMaxQPS(long maxQPS) {
this.maxQPS = maxQPS;
}
}

View File

@@ -14,10 +14,8 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.stats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
package we.stats;
import java.math.BigDecimal;
import java.util.concurrent.ConcurrentHashMap;
@@ -26,6 +24,9 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* @author Francis Dong
@@ -124,7 +125,7 @@ public class ResourceStat {
* Increase block request to the specified time slot
*
*/
private void incrBlockRequestToTimeSlot(long timeSlotId) {
public void incrBlockRequestToTimeSlot(long timeSlotId) {
this.getTimeSlot(timeSlotId).getBlockRequests().incrementAndGet();
}

View File

@@ -1,15 +1,37 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.stats;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.validation.constraints.AssertTrue;
import org.junit.jupiter.api.Test;
import we.stats.IncrRequestResult.BlockType;
import we.util.JacksonUtils;
/**
@@ -21,6 +43,207 @@ public class FlowStatTests {
private FlowStat stat = new FlowStat();
class FlowRuleCase {
public int threads = 10;
public int requests = 10000;
public int totalReqs = threads * requests;
public List<ResourceConfig> resourceConfigs = new ArrayList<>();
public List<ResourceExpect> resourceExpects = new ArrayList<>();
public IncrRequestResult expectResult;
}
class ResourceExpect {
public long concurrents;
public long QPS;
public long total;
public long blockedReqs;
public ResourceExpect(long concurrents, long QPS, long total, long blockedReqs) {
this.concurrents = concurrents;
this.QPS = QPS;
this.total = total;
this.blockedReqs = blockedReqs;
}
public ResourceExpect() {
}
}
public List<FlowRuleCase> createFlowRuleCase() {
List<FlowRuleCase> cases = new ArrayList<>();
// blocked by service concurrent request
FlowRuleCase c1 = new FlowRuleCase();
c1.resourceConfigs.add(new ResourceConfig("_global1", 100, 200));
c1.resourceConfigs.add(new ResourceConfig("service1", 10, 200));
c1.resourceExpects.add(new ResourceExpect(10, 10, 10, 0));
c1.resourceExpects.add(new ResourceExpect(10, 10, 10, c1.totalReqs - 10));
c1.expectResult = IncrRequestResult.block("service1", BlockType.CONCURRENT_REQUEST);
cases.add(c1);
// Note: use different resource ID to avoid being affected by previous test data
FlowRuleCase c2 = new FlowRuleCase();
c2.resourceConfigs.add(new ResourceConfig("_global2", 10, 200));
c2.resourceConfigs.add(new ResourceConfig("service2", 200, 200));
c2.resourceExpects.add(new ResourceExpect(10, 10, 10, c2.totalReqs - 10));
c2.resourceExpects.add(new ResourceExpect(10, 10, 10, 0));
c2.expectResult = IncrRequestResult.block("_global2", BlockType.CONCURRENT_REQUEST);
cases.add(c2);
// Note: use different resource ID to avoid being affected by previous test data
FlowRuleCase c3 = new FlowRuleCase();
c3.resourceConfigs.add(new ResourceConfig("_global3", 200, 10));
c3.resourceConfigs.add(new ResourceConfig("service3", 200, 100));
c3.resourceExpects.add(new ResourceExpect(10, 10, 10, c3.totalReqs - 10));
c3.resourceExpects.add(new ResourceExpect(10, 10, 10, 0));
c3.expectResult = IncrRequestResult.block("_global3", BlockType.QPS);
cases.add(c3);
// Note: use different resource ID to avoid being affected by previous test data
FlowRuleCase c4 = new FlowRuleCase();
c4.resourceConfigs.add(new ResourceConfig("_global4", 200, 100));
c4.resourceConfigs.add(new ResourceConfig("service4", 200, 10));
c4.resourceExpects.add(new ResourceExpect(10, 10, 10, 0));
c4.resourceExpects.add(new ResourceExpect(10, 10, 10, c4.totalReqs - 10));
c4.expectResult = IncrRequestResult.block("service4", BlockType.QPS);
cases.add(c4);
// Note: use different resource ID to avoid being affected by previous test data
FlowRuleCase c5 = new FlowRuleCase();
c5.resourceConfigs.add(new ResourceConfig("_global5", 0, 0));
c5.resourceConfigs.add(new ResourceConfig("service5", 0, 0));
c5.resourceExpects.add(new ResourceExpect(c5.totalReqs, c5.totalReqs, c5.totalReqs, 0));
c5.resourceExpects.add(new ResourceExpect(c5.totalReqs, c5.totalReqs, c5.totalReqs, 0));
c5.expectResult = IncrRequestResult.success();
cases.add(c5);
// Note: use different resource ID to avoid being affected by previous test data
FlowRuleCase c6 = new FlowRuleCase();
c6.resourceConfigs.add(new ResourceConfig("_global6", 20, 0));
c6.resourceConfigs.add(new ResourceConfig("service6", 20, 0));
c6.resourceExpects.add(new ResourceExpect(20, 20, 20, c6.totalReqs - 20));
c6.resourceExpects.add(new ResourceExpect(20, 20, 20, 0));
c6.expectResult = IncrRequestResult.block("_global6", BlockType.CONCURRENT_REQUEST);
cases.add(c6);
// Note: use different resource ID to avoid being affected by previous test data
FlowRuleCase c7 = new FlowRuleCase();
c7.resourceConfigs.add(new ResourceConfig("_global7", 0, 0));
c7.resourceConfigs.add(new ResourceConfig("service7", 0, 20));
c7.resourceExpects.add(new ResourceExpect(20, 20, 20, 0));
c7.resourceExpects.add(new ResourceExpect(20, 20, 20, c7.totalReqs - 20));
c7.expectResult = IncrRequestResult.block("service7", BlockType.QPS);
cases.add(c7);
return cases;
}
class ConcurrentJob1 implements Runnable {
public ConcurrentJob1(int requests, long curTimeSlotId, List<ResourceConfig> resourceConfigs,
IncrRequestResult expectResult) {
this.requests = requests;
this.resourceConfigs = resourceConfigs;
this.curTimeSlotId = curTimeSlotId;
this.expectResult = expectResult;
}
private int requests = 0;
private List<ResourceConfig> resourceConfigs;
private long curTimeSlotId = 0;
private IncrRequestResult expectResult;
@Override
public void run() {
for (int i = 0; i < requests; i++) {
IncrRequestResult result = stat.incrRequest(resourceConfigs, curTimeSlotId);
if (result != null && !result.isSuccess()) {
assertEquals(expectResult.getBlockedResourceId(), result.getBlockedResourceId());
assertEquals(expectResult.getBlockType(), result.getBlockType());
}
}
}
}
@Test
public void testIncrRequestResultByResourceChain() throws Throwable {
// concurrent
FlowRuleCase c1 = new FlowRuleCase();
c1.resourceConfigs.add(new ResourceConfig("testIncrRequestResultByResourceChain_global1", 100, 200));
c1.resourceConfigs.add(new ResourceConfig("testIncrRequestResultByResourceChain_service1", 10, 200));
long startTimeSlotId = stat.currentTimeSlotId();
long endTimeSlotId = startTimeSlotId + 1000;
for (int i = 0; i < 10; i++) {
stat.incrRequest(c1.resourceConfigs, startTimeSlotId);
}
IncrRequestResult result = stat.incrRequest(c1.resourceConfigs, startTimeSlotId);
assertTrue(!result.isSuccess());
assertEquals("testIncrRequestResultByResourceChain_service1", result.getBlockedResourceId());
assertEquals(BlockType.CONCURRENT_REQUEST, result.getBlockType());
stat.addRequestRT(c1.resourceConfigs, startTimeSlotId, 1, true);
result = stat.incrRequest(c1.resourceConfigs, startTimeSlotId);
assertTrue(result.isSuccess());
// QPS
FlowRuleCase c2 = new FlowRuleCase();
c2.resourceConfigs.add(new ResourceConfig("testIncrRequestResultByResourceChain_global2", 100, 200));
c2.resourceConfigs.add(new ResourceConfig("testIncrRequestResultByResourceChain_service2", 100, 10));
for (int i = 0; i < 10; i++) {
stat.incrRequest(c2.resourceConfigs, startTimeSlotId);
}
result = stat.incrRequest(c2.resourceConfigs, startTimeSlotId);
assertTrue(!result.isSuccess());
assertEquals("testIncrRequestResultByResourceChain_service2", result.getBlockedResourceId());
assertEquals(BlockType.QPS, result.getBlockType());
stat.addRequestRT(c2.resourceConfigs, startTimeSlotId, 1, true);
result = stat.incrRequest(c2.resourceConfigs, startTimeSlotId);
assertTrue(!result.isSuccess());
assertEquals("testIncrRequestResultByResourceChain_service2", result.getBlockedResourceId());
assertEquals(BlockType.QPS, result.getBlockType());
}
@Test
public void testIncrRequestByResourceChain() throws Throwable {
// create data
List<FlowRuleCase> cases = createFlowRuleCase();
long startTimeSlotId = stat.currentTimeSlotId();
long endTimeSlotId = startTimeSlotId + 1000;
for (FlowRuleCase c : cases) {
ExecutorService pool = Executors.newFixedThreadPool(c.threads);
long t1 = System.currentTimeMillis();
for (int i = 0; i < c.threads; i++) {
pool.submit(new ConcurrentJob1(c.requests, startTimeSlotId, c.resourceConfigs, c.expectResult));
}
pool.shutdown();
if (pool.awaitTermination(5, TimeUnit.SECONDS)) {
long t2 = System.currentTimeMillis();
System.out.println("testIncrRequestByResourceChain elapsed time: " + (t2 - t1) + "ms for " + c.totalReqs
+ " requests");
for (int i = 0; i < c.resourceConfigs.size(); i++) {
ResourceConfig cfg = c.resourceConfigs.get(i);
ResourceExpect expect = c.resourceExpects.get(i);
TimeWindowStat tws = stat.getTimeWindowStat(cfg.getResourceId(), startTimeSlotId, endTimeSlotId);
assertEquals(expect.concurrents, tws.getPeakConcurrentReqeusts());
assertEquals(expect.QPS, tws.getTotal());
assertEquals(expect.total, tws.getTotal());
assertEquals(expect.blockedReqs, tws.getBlockRequests());
}
} else {
System.out.println("testIncrRequestByResourceChain timeout");
}
startTimeSlotId = startTimeSlotId + 1000;
endTimeSlotId = endTimeSlotId + 1000;
}
}
@Test
public void testPeakConcurrentJob() throws Throwable {
long curTimeSlotId = stat.currentTimeSlotId();