increase request counter at the beginning of the request

This commit is contained in:
Francis Dong
2020-12-28 17:06:05 +08:00
parent f95a63e8b3
commit 332eba6351
5 changed files with 186 additions and 126 deletions

View File

@@ -1,6 +1,5 @@
package we.stats;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -34,6 +33,11 @@ public class FlowStat {
*/
public ConcurrentMap<String, ResourceStat> resourceStats = new ConcurrentHashMap<>();
/**
* Retention time of statistic data
*/
public static long RETENTION_TIME_IN_MINUTES = 10;
private ExecutorService pool = Executors.newFixedThreadPool(1);
public FlowStat() {
@@ -44,6 +48,15 @@ public class FlowStat {
pool.submit(new HousekeepJob(this));
}
/**
* Update retention time
*
* @param retentionTimeInMinutes
*/
public void updateRetentionTime(int retentionTimeInMinutes) {
RETENTION_TIME_IN_MINUTES = retentionTimeInMinutes;
}
/**
* Returns the current time slot ID
*
@@ -64,84 +77,45 @@ public class FlowStat {
}
/**
* Increment concurrent request counter of the specified resource
*
* @param resourceId Resource ID
*/
public void incrConcurrentRequest(String resourceId) {
ResourceStat resourceStat = getResourceStat(resourceId);
resourceStat.incrConcurrentRequest(currentTimeSlotId());
}
/**
* Increment concurrent request counter of the specified resource
* Increase concurrent request counter of the specified resource
*
* @param resourceId Resource ID
* @param curTimeSlotId current time slot ID, it should be generated by
* Flowstat.currentTimeSlotId()
* @param resourceId Resource ID
* @param maxCon Maximum concurrent request of the specified resource,
* null/zero/negative for no limit
* @param maxRPS Maximum RPS of the specified resource,
* null/zero/negative for no limit
* @return false if exceed the maximum concurrent request/RPS of the specified
* resource or all resources
* @return true if the request is not blocked; false if exceed the maximum
* concurrent request/RPS of the specified resource
*/
public synchronized boolean incrConcurrentRequest(long curTimeSlotId, String resourceId, Integer maxCon,
Integer maxRPS) {
public boolean incrRequest(String resourceId, long curTimeSlotId, Long maxCon, Long maxRPS) {
ResourceStat resourceStat = getResourceStat(resourceId);
boolean isExceeded = false;
if (maxCon != null && maxCon.intValue() > 0) {
int n = resourceStat.getConcurrentRequests().get();
if (n >= maxCon.intValue()) {
isExceeded = true;
resourceStat.incrBlockRequestToTimeSlot(curTimeSlotId);
}
} else if (maxRPS != null && maxRPS.intValue() > 0) {
// TimeWindowStat timeWindowStat = this.getCurrentTimeWindowStat(resourceId, curTimeSlotId);
// if (new BigDecimal(maxRPS).compareTo(timeWindowStat.getRps()) <= 0) {
// isExceeded = true;
// resourceStat.incrBlockRequestToTimeSlot(curTimeSlotId);
// }
// time slot unit is one second
long total = resourceStat.getTimeSlot(curTimeSlotId).getCounter().get();
long max = Long.valueOf(maxRPS);
if (total >= max) {
isExceeded = true;
resourceStat.incrBlockRequestToTimeSlot(curTimeSlotId);
}
boolean success = resourceStat.incrConcurrentRequest(curTimeSlotId, maxCon);
if (success) {
success = resourceStat.incrRequestToTimeSlot(curTimeSlotId, maxRPS);
}
if (!isExceeded) {
resourceStat.incrConcurrentRequest(curTimeSlotId);
}
return !isExceeded;
return success;
}
/**
* Returns the current concurrent requests of the specified resource<br/>
* <br/>
* Decrease concurrent request of the specified resource of the specified time
* slot
*
* @param resourceId Resource ID
* @param timeSlotId TimeSlot ID
* @return
*/
public int getConcurrentRequests(String resourceId) {
public void decrConcurrentRequest(String resourceId, long timeSlotId) {
if (resourceId == null) {
return;
}
ResourceStat resourceStat = getResourceStat(resourceId);
return resourceStat.getConcurrentRequests().get();
resourceStat.decrConcurrentRequest(timeSlotId);
}
/**
* Add request to current time slot and decrement concurrent connection counter
*
* @param resourceId Resource ID
* @param rt Response time of request
* @param isSuccess Whether the request is success or not
*/
public void incrRequest(String resourceId, long rt, boolean isSuccess) {
incrRequestToTimeSlot(resourceId, currentTimeSlotId(), rt, isSuccess);
}
/**
* Add request to the specified time slot and decrement concurrent connection
* counter
* Add request RT to the specified time slot counter
*
* @param resourceId Resource ID
* @param timeSlotId TimeSlot ID
@@ -149,12 +123,12 @@ public class FlowStat {
* @param isSuccess Whether the request is success or not
* @return
*/
public void incrRequestToTimeSlot(String resourceId, long timeSlotId, long rt, boolean isSuccess) {
public void addRequestRT(String resourceId, long timeSlotId, long rt, boolean isSuccess) {
if (resourceId == null) {
return;
}
ResourceStat resourceStat = getResourceStat(resourceId);
resourceStat.incrRequestToTimeSlot(timeSlotId, rt, isSuccess);
resourceStat.addRequestRT(timeSlotId, rt, isSuccess);
}
private ResourceStat getResourceStat(String resourceId) {
@@ -171,6 +145,17 @@ public class FlowStat {
return resourceStat;
}
/**
* Returns the current concurrent requests of the specified resource<br/>
* <br/>
*
* @param resourceId Resource ID
*/
public long getConcurrentRequests(String resourceId) {
ResourceStat resourceStat = getResourceStat(resourceId);
return resourceStat.getConcurrentRequests().get();
}
/**
* Returns current TimeWindowStat of the specified resource
*
@@ -189,6 +174,7 @@ public class FlowStat {
* @param curTimeSlotId
* @return
*/
@SuppressWarnings("unused")
private TimeWindowStat getCurrentTimeWindowStat(String resourceId, long curTimeSlotId) {
return getTimeWindowStat(resourceId, curTimeSlotId, curTimeSlotId + 1000);
}
@@ -312,7 +298,7 @@ public class FlowStat {
@Override
public void run() {
long n = 2 * 60 * 60 * 1000 / FlowStat.INTERVAL * FlowStat.INTERVAL;
long n = FlowStat.RETENTION_TIME_IN_MINUTES * 60 * 1000 / FlowStat.INTERVAL * FlowStat.INTERVAL;
long lastSlotId = stat.currentTimeSlotId() - n;
while (true) {
log.debug("housekeeping start");

View File

@@ -3,7 +3,9 @@ package we.stats;
import java.math.BigDecimal;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
*
@@ -25,14 +27,20 @@ public class ResourceStat {
/**
* Concurrent requests
*/
private AtomicInteger concurrentRequests = new AtomicInteger(0);
private AtomicLong concurrentRequests = new AtomicLong(0);
private ReentrantReadWriteLock rwl1 = new ReentrantReadWriteLock();
private ReentrantReadWriteLock rwl2 = new ReentrantReadWriteLock();
private Lock w1 = rwl1.writeLock();
private Lock w2 = rwl2.writeLock();
public ResourceStat(String resourceId) {
this.resourceId = resourceId;
}
/**
* Returns Time slot of the specified time slot ID
*
* @param timeSlotId
* @return
*/
@@ -51,34 +59,98 @@ public class ResourceStat {
}
/**
* Increment concurrent request counter of the resource
* Increase concurrent request counter of the resource
*
* @param timeSlotId
* @param maxCon
* @return false if exceed the maximum concurrent request of the specified
* resource
*/
public void incrConcurrentRequest(long timeSlotId) {
int conns = this.concurrentRequests.incrementAndGet();
this.getTimeSlot(timeSlotId).updatePeakConcurrentReqeusts(conns);
public boolean incrConcurrentRequest(long timeSlotId, Long maxCon) {
w1.lock();
try {
boolean isExceeded = false;
if (maxCon != null && maxCon.intValue() > 0) {
long n = this.concurrentRequests.get();
if (n >= maxCon.longValue()) {
isExceeded = true;
this.incrBlockRequestToTimeSlot(timeSlotId);
} else {
long conns = this.concurrentRequests.incrementAndGet();
this.getTimeSlot(timeSlotId).updatePeakConcurrentReqeusts(conns);
}
} else {
long conns = this.concurrentRequests.incrementAndGet();
this.getTimeSlot(timeSlotId).updatePeakConcurrentReqeusts(conns);
}
return !isExceeded;
} finally {
w1.unlock();
}
}
/**
* Increment block request to the specified time slot
* Decrease concurrent request counter of the resource
*
*/
public void incrBlockRequestToTimeSlot(long timeSlotId) {
public void decrConcurrentRequest(long timeSlotId) {
this.concurrentRequests.decrementAndGet();
}
/**
* Increase block request to the specified time slot
*
*/
private void incrBlockRequestToTimeSlot(long timeSlotId) {
this.getTimeSlot(timeSlotId).getBlockRequests().incrementAndGet();
}
/**
* Add request to the specified time slot and decrement concurrent request
* counter
* Add request to the specified time slot
*
* @param timeSlotId
* @return false if exceed the maximum RPS of the specified resource
*/
public boolean incrRequestToTimeSlot(long timeSlotId, Long maxRPS) {
w2.lock();
try {
boolean isExceeded = false;
if (maxRPS != null && maxRPS.intValue() > 0) {
// TimeWindowStat timeWindowStat = this.getCurrentTimeWindowStat(resourceId, curTimeSlotId);
// if (new BigDecimal(maxRPS).compareTo(timeWindowStat.getRps()) <= 0) {
// isExceeded = true;
// resourceStat.incrBlockRequestToTimeSlot(curTimeSlotId);
// }
// time slot unit is one second
long total = this.getTimeSlot(timeSlotId).getCounter().get();
long max = Long.valueOf(maxRPS);
if (total >= max) {
isExceeded = true;
this.incrBlockRequestToTimeSlot(timeSlotId);
this.decrConcurrentRequest(timeSlotId);
} else {
this.getTimeSlot(timeSlotId).incr();
}
} else {
this.getTimeSlot(timeSlotId).incr();
}
return !isExceeded;
} finally {
w2.unlock();
}
}
/**
* Add request RT to the specified time slot
*
* @param timeSlotId
* @param rt response time of the request
* @param isSuccess Whether the request is success or not
* @return
*/
public void incrRequestToTimeSlot(long timeSlotId, long rt, boolean isSuccess) {
int conns = this.concurrentRequests.decrementAndGet();
this.getTimeSlot(timeSlotId).incr(rt, conns, isSuccess);
public void addRequestRT(long timeSlotId, long rt, boolean isSuccess) {
this.getTimeSlot(timeSlotId).addRequestRT(rt, isSuccess);
}
/**
@@ -95,7 +167,7 @@ public class ResourceStat {
long max = Long.MIN_VALUE;
long totalReqs = 0;
long totalRt = 0;
int peakConcurrences = 0;
long peakConcurrences = 0;
long errors = 0;
long blockReqs = 0;
for (long i = startSlotId; i < endSlotId;) {
@@ -154,11 +226,11 @@ public class ResourceStat {
this.timeSlots = timeSlots;
}
public AtomicInteger getConcurrentRequests() {
public AtomicLong getConcurrentRequests() {
return concurrentRequests;
}
public void setConcurrentRequests(AtomicInteger concurrentRequests) {
public void setConcurrentRequests(AtomicLong concurrentRequests) {
this.concurrentRequests = concurrentRequests;
}

View File

@@ -42,7 +42,7 @@ public class TimeSlot {
/**
* Peak concurrent requests
*/
private int peakConcurrentReqeusts;
private long peakConcurrentReqeusts;
/**
* Block requests <br/>
@@ -60,20 +60,24 @@ public class TimeSlot {
/**
* Add request to time slot
*
* @param rt
* @param concurrentRequests Current concurrent requests
* @param isSuccess Whether the request is success or not
*/
public synchronized void incr(long rt, int concurrentRequests, boolean isSuccess) {
public void incr() {
counter.incrementAndGet();
}
/**
* Add request RT information to time slot
*
* @param rt
* @param isSuccess Whether the request is success or not
*/
public synchronized void addRequestRT(long rt, boolean isSuccess) {
totalRt.addAndGet(rt);
if (!isSuccess) {
errors.incrementAndGet();
}
min = rt < min ? rt : min;
max = rt > max ? rt : max;
peakConcurrentReqeusts = concurrentRequests > peakConcurrentReqeusts ? concurrentRequests
: peakConcurrentReqeusts;
}
/**
@@ -81,7 +85,7 @@ public class TimeSlot {
*
* @param concurrentRequests Current concurrent requests
*/
public synchronized void updatePeakConcurrentReqeusts(int concurrentRequests) {
public synchronized void updatePeakConcurrentReqeusts(long concurrentRequests) {
peakConcurrentReqeusts = concurrentRequests > peakConcurrentReqeusts ? concurrentRequests
: peakConcurrentReqeusts;
}
@@ -122,11 +126,11 @@ public class TimeSlot {
this.totalRt = totalRt;
}
public int getPeakConcurrentReqeusts() {
public long getPeakConcurrentReqeusts() {
return peakConcurrentReqeusts;
}
public void setPeakConcurrentReqeusts(int peakConcurrentReqeusts) {
public void setPeakConcurrentReqeusts(long peakConcurrentReqeusts) {
this.peakConcurrentReqeusts = peakConcurrentReqeusts;
}

View File

@@ -52,7 +52,7 @@ public class TimeWindowStat {
/**
* Peak concurrent requests of the time window
*/
private Integer peakConcurrentReqeusts;
private Long peakConcurrentReqeusts;
/**
* Block requests
@@ -67,11 +67,11 @@ public class TimeWindowStat {
this.blockRequests = blockRequests;
}
public Integer getPeakConcurrentReqeusts() {
public Long getPeakConcurrentReqeusts() {
return peakConcurrentReqeusts;
}
public void setPeakConcurrentReqeusts(Integer peakConcurrentReqeusts) {
public void setPeakConcurrentReqeusts(Long peakConcurrentReqeusts) {
this.peakConcurrentReqeusts = peakConcurrentReqeusts;
}

View File

@@ -2,7 +2,6 @@ package we.stats;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.math.BigDecimal;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -10,8 +9,6 @@ import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import we.stats.FlowStat;
import we.stats.TimeWindowStat;
import we.util.JacksonUtils;
/**
@@ -25,15 +22,18 @@ public class FlowStatTests {
@Test
public void testIncr() throws Throwable {
long t = stat.currentTimeSlotId();
long slotId = t + 1000;
long curTimeSlotId = stat.currentTimeSlotId();
long slotId = curTimeSlotId + 1000;
String resourceId = "a";
stat.incrRequestToTimeSlot(resourceId, t, 100l, true);
stat.incrRequest(resourceId, curTimeSlotId, null, null);
TimeWindowStat tws = stat.getPreviousSecondStat(resourceId, slotId);
assertEquals(1, tws.getTotal());
stat.incrRequestToTimeSlot(resourceId, t, 300l, false);
stat.incrRequest(resourceId, curTimeSlotId, null, null);
stat.addRequestRT(resourceId, curTimeSlotId, 100, false);
stat.addRequestRT(resourceId, curTimeSlotId, 300, true);
tws = stat.getPreviousSecondStat(resourceId, slotId);
assertEquals(2, tws.getTotal());
assertEquals(200, tws.getAvgRt());
@@ -41,31 +41,35 @@ public class FlowStatTests {
assertEquals(300, tws.getMax());
assertEquals(2, tws.getRps().intValue());
assertEquals(1, tws.getErrors());
stat.decrConcurrentRequest(resourceId, curTimeSlotId);
Long con = stat.getConcurrentRequests(resourceId);
assertEquals(1, con);
// System.out.println(JacksonUtils.writeValueAsString(stat.resourceStats));
}
@Test
public void testIncrConcurrentRequest() throws Throwable {
public void testIncrRequest() throws Throwable {
long curTimeSlotId = stat.currentTimeSlotId();
long nextSlotId = curTimeSlotId + 1000;
String resourceId = "b";
Integer maxCon = 10;
Integer maxRPS = 20;
Long maxCon = 10l;
Long maxRPS = 20l;
stat.incrConcurrentRequest(curTimeSlotId, resourceId, maxCon, maxRPS);
stat.incrRequest(resourceId, curTimeSlotId, maxCon, maxRPS);
TimeWindowStat tws = stat.getTimeWindowStat(resourceId, curTimeSlotId, nextSlotId);
int peakCon = tws.getPeakConcurrentReqeusts();
assertEquals(1, peakCon);
long peakCon = tws.getPeakConcurrentReqeusts();
assertEquals(1l, peakCon);
}
@Test
public void testBlockedByMaxCon() throws Throwable {
long curTimeSlotId = stat.currentTimeSlotId();
long nextSlotId = curTimeSlotId + 1000;
Integer maxCon = 10;
Integer maxRPS = 20;
Long maxCon = 10l;
Long maxRPS = 20l;
int threads = 100;
int requests = 10000;
int totalRequests = threads * requests;
@@ -74,7 +78,7 @@ public class FlowStatTests {
ExecutorService pool = Executors.newFixedThreadPool(threads);
long t1 = System.currentTimeMillis();
for (int i = 0; i < threads; i++) {
pool.submit(new ConcurrentJob(requests, curTimeSlotId, resourceId, maxCon, maxRPS, false));
pool.submit(new ConcurrentJob(requests, curTimeSlotId, resourceId, maxCon, maxRPS));
}
pool.shutdown();
if (pool.awaitTermination(20, TimeUnit.SECONDS)) {
@@ -82,7 +86,7 @@ public class FlowStatTests {
TimeWindowStat tws = stat.getTimeWindowStat(resourceId, curTimeSlotId, nextSlotId);
assertEquals(maxCon, tws.getPeakConcurrentReqeusts());
assertEquals(totalRequests - maxCon, tws.getBlockRequests());
System.out.println("total elapsed time for " + threads * requests + " requests" + (t2 - t1) + "ms");
System.out.println("testBlockedByMaxCon total elapsed time for " + threads * requests + " requests" + (t2 - t1) + "ms");
} else {
System.out.println("testIncrConcurrentRequest timeout");
}
@@ -93,21 +97,21 @@ public class FlowStatTests {
public void testBlockedByMaxRPS() throws Throwable {
long curTimeSlotId = stat.currentTimeSlotId();
long nextSlotId = curTimeSlotId + 1000;
Integer maxCon = null;
Integer maxRPS = 20;
Long maxCon = Long.MAX_VALUE;
Long maxRPS = 20l;
int threads = 100;
int requests = 10000;
int totalRequests = threads * requests;
String resourceId = "c";
for (int i = 0; i < maxRPS; i++) {
stat.incrRequestToTimeSlot(resourceId, curTimeSlotId, 100, true);
stat.incrRequest(resourceId, curTimeSlotId, maxCon, maxRPS);
}
ExecutorService pool = Executors.newFixedThreadPool(threads);
long t1 = System.currentTimeMillis();
for (int i = 0; i < threads; i++) {
pool.submit(new ConcurrentJob(requests, curTimeSlotId, resourceId, maxCon, maxRPS, false));
pool.submit(new ConcurrentJob(requests, curTimeSlotId, resourceId, maxCon, maxRPS));
}
pool.shutdown();
if (pool.awaitTermination(20, TimeUnit.SECONDS)) {
@@ -115,7 +119,7 @@ public class FlowStatTests {
TimeWindowStat tws = stat.getTimeWindowStat(resourceId, curTimeSlotId, nextSlotId);
assertEquals(maxRPS, tws.getRps().intValue());
assertEquals(totalRequests, tws.getBlockRequests());
System.out.println("total elapsed time for " + threads * requests + " requests" + (t2 - t1) + "ms");
System.out.println("testIncrConcurrentRequest total elapsed time for " + threads * requests + " requests" + (t2 - t1) + "ms");
} else {
System.out.println("testIncrConcurrentRequest timeout");
}
@@ -220,11 +224,11 @@ public class FlowStatTests {
for (int m = 0; m < slots; m++) {
for (int i = 0; i < requests; i++) {
for (int j = 0; j < resources; j++) {
stat.incrConcurrentRequest("resource-" + j);
stat.incrRequest("resource-" + j, startSlotId + (m * FlowStat.INTERVAL), null, null);
// 10% error
boolean isSuccess = i % 10 == 1 ? false : true;
// rt will be triple while even
stat.incrRequestToTimeSlot("resource-" + j, startSlotId + (m * FlowStat.INTERVAL),
stat.addRequestRT("resource-" + j, startSlotId + (m * FlowStat.INTERVAL),
rt * (j % 2 == 0 ? 3 : 1), isSuccess);
}
try {
@@ -239,30 +243,24 @@ public class FlowStatTests {
class ConcurrentJob implements Runnable {
public ConcurrentJob(int requests, long curTimeSlotId, String resourceId, Integer maxCon, Integer maxRPS, boolean incrReq) {
public ConcurrentJob(int requests, long curTimeSlotId, String resourceId, Long maxCon, Long maxRPS) {
this.requests = requests;
this.resourceId = resourceId;
this.maxRPS = maxRPS;
this.maxCon = maxCon;
this.curTimeSlotId = curTimeSlotId;
this.incrReq = incrReq;
}
private int requests = 0;
private String resourceId;
private Integer maxCon = 0;
private Integer maxRPS = 0;
private Long maxCon = 0l;
private Long maxRPS = 0l;
private long curTimeSlotId = 0;
private boolean incrReq;
@Override
public void run() {
for (int i = 0; i < requests; i++) {
if (incrReq) {
stat.incrRequestToTimeSlot(resourceId, curTimeSlotId, 100, true);
}else {
stat.incrConcurrentRequest(curTimeSlotId, resourceId, maxCon, maxRPS);
}
stat.incrRequest(resourceId, curTimeSlotId, maxCon, maxRPS);
}
}
}