From 332eba6351962b3998e7bb83d661b6672c873a9f Mon Sep 17 00:00:00 2001 From: Francis Dong Date: Mon, 28 Dec 2020 17:06:05 +0800 Subject: [PATCH] increase request counter at the beginning of the request --- src/main/java/we/stats/FlowStat.java | 110 +++++++++------------ src/main/java/we/stats/ResourceStat.java | 106 ++++++++++++++++---- src/main/java/we/stats/TimeSlot.java | 24 +++-- src/main/java/we/stats/TimeWindowStat.java | 6 +- src/test/java/we/stats/FlowStatTests.java | 66 ++++++------- 5 files changed, 186 insertions(+), 126 deletions(-) diff --git a/src/main/java/we/stats/FlowStat.java b/src/main/java/we/stats/FlowStat.java index cef38e9..f19a465 100644 --- a/src/main/java/we/stats/FlowStat.java +++ b/src/main/java/we/stats/FlowStat.java @@ -1,6 +1,5 @@ package we.stats; -import java.math.BigDecimal; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -34,6 +33,11 @@ public class FlowStat { */ public ConcurrentMap resourceStats = new ConcurrentHashMap<>(); + /** + * Retention time of statistic data + */ + public static long RETENTION_TIME_IN_MINUTES = 10; + private ExecutorService pool = Executors.newFixedThreadPool(1); public FlowStat() { @@ -44,6 +48,15 @@ public class FlowStat { pool.submit(new HousekeepJob(this)); } + /** + * Update retention time + * + * @param retentionTimeInMinutes + */ + public void updateRetentionTime(int retentionTimeInMinutes) { + RETENTION_TIME_IN_MINUTES = retentionTimeInMinutes; + } + /** * Returns the current time slot ID * @@ -64,84 +77,45 @@ public class FlowStat { } /** - * Increment concurrent request counter of the specified resource - * - * @param resourceId Resource ID - */ - public void incrConcurrentRequest(String resourceId) { - ResourceStat resourceStat = getResourceStat(resourceId); - resourceStat.incrConcurrentRequest(currentTimeSlotId()); - } - - /** - * Increment concurrent request counter of the specified resource + * Increase concurrent request counter of the specified resource * + * @param resourceId Resource ID * @param curTimeSlotId current time slot ID, it should be generated by * Flowstat.currentTimeSlotId() - * @param resourceId Resource ID * @param maxCon Maximum concurrent request of the specified resource, * null/zero/negative for no limit * @param maxRPS Maximum RPS of the specified resource, * null/zero/negative for no limit - * @return false if exceed the maximum concurrent request/RPS of the specified - * resource or all resources + * @return true if the request is not blocked; false if exceed the maximum + * concurrent request/RPS of the specified resource */ - public synchronized boolean incrConcurrentRequest(long curTimeSlotId, String resourceId, Integer maxCon, - Integer maxRPS) { + public boolean incrRequest(String resourceId, long curTimeSlotId, Long maxCon, Long maxRPS) { ResourceStat resourceStat = getResourceStat(resourceId); - boolean isExceeded = false; - if (maxCon != null && maxCon.intValue() > 0) { - int n = resourceStat.getConcurrentRequests().get(); - if (n >= maxCon.intValue()) { - isExceeded = true; - resourceStat.incrBlockRequestToTimeSlot(curTimeSlotId); - } - } else if (maxRPS != null && maxRPS.intValue() > 0) { -// TimeWindowStat timeWindowStat = this.getCurrentTimeWindowStat(resourceId, curTimeSlotId); -// if (new BigDecimal(maxRPS).compareTo(timeWindowStat.getRps()) <= 0) { -// isExceeded = true; -// resourceStat.incrBlockRequestToTimeSlot(curTimeSlotId); -// } - - // time slot unit is one second - long total = resourceStat.getTimeSlot(curTimeSlotId).getCounter().get(); - long max = Long.valueOf(maxRPS); - if (total >= max) { - isExceeded = true; - resourceStat.incrBlockRequestToTimeSlot(curTimeSlotId); - } + boolean success = resourceStat.incrConcurrentRequest(curTimeSlotId, maxCon); + if (success) { + success = resourceStat.incrRequestToTimeSlot(curTimeSlotId, maxRPS); } - if (!isExceeded) { - resourceStat.incrConcurrentRequest(curTimeSlotId); - } - return !isExceeded; + return success; } /** - * Returns the current concurrent requests of the specified resource
- *
+ * Decrease concurrent request of the specified resource of the specified time + * slot * * @param resourceId Resource ID + * @param timeSlotId TimeSlot ID + * @return */ - public int getConcurrentRequests(String resourceId) { + public void decrConcurrentRequest(String resourceId, long timeSlotId) { + if (resourceId == null) { + return; + } ResourceStat resourceStat = getResourceStat(resourceId); - return resourceStat.getConcurrentRequests().get(); + resourceStat.decrConcurrentRequest(timeSlotId); } /** - * Add request to current time slot and decrement concurrent connection counter - * - * @param resourceId Resource ID - * @param rt Response time of request - * @param isSuccess Whether the request is success or not - */ - public void incrRequest(String resourceId, long rt, boolean isSuccess) { - incrRequestToTimeSlot(resourceId, currentTimeSlotId(), rt, isSuccess); - } - - /** - * Add request to the specified time slot and decrement concurrent connection - * counter + * Add request RT to the specified time slot counter * * @param resourceId Resource ID * @param timeSlotId TimeSlot ID @@ -149,12 +123,12 @@ public class FlowStat { * @param isSuccess Whether the request is success or not * @return */ - public void incrRequestToTimeSlot(String resourceId, long timeSlotId, long rt, boolean isSuccess) { + public void addRequestRT(String resourceId, long timeSlotId, long rt, boolean isSuccess) { if (resourceId == null) { return; } ResourceStat resourceStat = getResourceStat(resourceId); - resourceStat.incrRequestToTimeSlot(timeSlotId, rt, isSuccess); + resourceStat.addRequestRT(timeSlotId, rt, isSuccess); } private ResourceStat getResourceStat(String resourceId) { @@ -171,6 +145,17 @@ public class FlowStat { return resourceStat; } + /** + * Returns the current concurrent requests of the specified resource
+ *
+ * + * @param resourceId Resource ID + */ + public long getConcurrentRequests(String resourceId) { + ResourceStat resourceStat = getResourceStat(resourceId); + return resourceStat.getConcurrentRequests().get(); + } + /** * Returns current TimeWindowStat of the specified resource * @@ -189,6 +174,7 @@ public class FlowStat { * @param curTimeSlotId * @return */ + @SuppressWarnings("unused") private TimeWindowStat getCurrentTimeWindowStat(String resourceId, long curTimeSlotId) { return getTimeWindowStat(resourceId, curTimeSlotId, curTimeSlotId + 1000); } @@ -312,7 +298,7 @@ public class FlowStat { @Override public void run() { - long n = 2 * 60 * 60 * 1000 / FlowStat.INTERVAL * FlowStat.INTERVAL; + long n = FlowStat.RETENTION_TIME_IN_MINUTES * 60 * 1000 / FlowStat.INTERVAL * FlowStat.INTERVAL; long lastSlotId = stat.currentTimeSlotId() - n; while (true) { log.debug("housekeeping start"); diff --git a/src/main/java/we/stats/ResourceStat.java b/src/main/java/we/stats/ResourceStat.java index 7b06b56..c904a5d 100644 --- a/src/main/java/we/stats/ResourceStat.java +++ b/src/main/java/we/stats/ResourceStat.java @@ -3,7 +3,9 @@ package we.stats; import java.math.BigDecimal; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * @@ -25,14 +27,20 @@ public class ResourceStat { /** * Concurrent requests */ - private AtomicInteger concurrentRequests = new AtomicInteger(0); + private AtomicLong concurrentRequests = new AtomicLong(0); + + private ReentrantReadWriteLock rwl1 = new ReentrantReadWriteLock(); + private ReentrantReadWriteLock rwl2 = new ReentrantReadWriteLock(); + private Lock w1 = rwl1.writeLock(); + private Lock w2 = rwl2.writeLock(); public ResourceStat(String resourceId) { this.resourceId = resourceId; } - + /** * Returns Time slot of the specified time slot ID + * * @param timeSlotId * @return */ @@ -51,34 +59,98 @@ public class ResourceStat { } /** - * Increment concurrent request counter of the resource + * Increase concurrent request counter of the resource * + * @param timeSlotId + * @param maxCon + * @return false if exceed the maximum concurrent request of the specified + * resource */ - public void incrConcurrentRequest(long timeSlotId) { - int conns = this.concurrentRequests.incrementAndGet(); - this.getTimeSlot(timeSlotId).updatePeakConcurrentReqeusts(conns); + public boolean incrConcurrentRequest(long timeSlotId, Long maxCon) { + w1.lock(); + try { + boolean isExceeded = false; + if (maxCon != null && maxCon.intValue() > 0) { + long n = this.concurrentRequests.get(); + if (n >= maxCon.longValue()) { + isExceeded = true; + this.incrBlockRequestToTimeSlot(timeSlotId); + } else { + long conns = this.concurrentRequests.incrementAndGet(); + this.getTimeSlot(timeSlotId).updatePeakConcurrentReqeusts(conns); + } + } else { + long conns = this.concurrentRequests.incrementAndGet(); + this.getTimeSlot(timeSlotId).updatePeakConcurrentReqeusts(conns); + } + return !isExceeded; + } finally { + w1.unlock(); + } } /** - * Increment block request to the specified time slot + * Decrease concurrent request counter of the resource * */ - public void incrBlockRequestToTimeSlot(long timeSlotId) { + public void decrConcurrentRequest(long timeSlotId) { + this.concurrentRequests.decrementAndGet(); + } + + /** + * Increase block request to the specified time slot + * + */ + private void incrBlockRequestToTimeSlot(long timeSlotId) { this.getTimeSlot(timeSlotId).getBlockRequests().incrementAndGet(); } /** - * Add request to the specified time slot and decrement concurrent request - * counter + * Add request to the specified time slot + * + * @param timeSlotId + * @return false if exceed the maximum RPS of the specified resource + */ + public boolean incrRequestToTimeSlot(long timeSlotId, Long maxRPS) { + w2.lock(); + try { + boolean isExceeded = false; + if (maxRPS != null && maxRPS.intValue() > 0) { +// TimeWindowStat timeWindowStat = this.getCurrentTimeWindowStat(resourceId, curTimeSlotId); +// if (new BigDecimal(maxRPS).compareTo(timeWindowStat.getRps()) <= 0) { +// isExceeded = true; +// resourceStat.incrBlockRequestToTimeSlot(curTimeSlotId); +// } + + // time slot unit is one second + long total = this.getTimeSlot(timeSlotId).getCounter().get(); + long max = Long.valueOf(maxRPS); + if (total >= max) { + isExceeded = true; + this.incrBlockRequestToTimeSlot(timeSlotId); + this.decrConcurrentRequest(timeSlotId); + } else { + this.getTimeSlot(timeSlotId).incr(); + } + } else { + this.getTimeSlot(timeSlotId).incr(); + } + return !isExceeded; + } finally { + w2.unlock(); + } + } + + /** + * Add request RT to the specified time slot * * @param timeSlotId * @param rt response time of the request * @param isSuccess Whether the request is success or not * @return */ - public void incrRequestToTimeSlot(long timeSlotId, long rt, boolean isSuccess) { - int conns = this.concurrentRequests.decrementAndGet(); - this.getTimeSlot(timeSlotId).incr(rt, conns, isSuccess); + public void addRequestRT(long timeSlotId, long rt, boolean isSuccess) { + this.getTimeSlot(timeSlotId).addRequestRT(rt, isSuccess); } /** @@ -95,7 +167,7 @@ public class ResourceStat { long max = Long.MIN_VALUE; long totalReqs = 0; long totalRt = 0; - int peakConcurrences = 0; + long peakConcurrences = 0; long errors = 0; long blockReqs = 0; for (long i = startSlotId; i < endSlotId;) { @@ -154,11 +226,11 @@ public class ResourceStat { this.timeSlots = timeSlots; } - public AtomicInteger getConcurrentRequests() { + public AtomicLong getConcurrentRequests() { return concurrentRequests; } - public void setConcurrentRequests(AtomicInteger concurrentRequests) { + public void setConcurrentRequests(AtomicLong concurrentRequests) { this.concurrentRequests = concurrentRequests; } diff --git a/src/main/java/we/stats/TimeSlot.java b/src/main/java/we/stats/TimeSlot.java index 8a2e03a..bd8574f 100644 --- a/src/main/java/we/stats/TimeSlot.java +++ b/src/main/java/we/stats/TimeSlot.java @@ -42,7 +42,7 @@ public class TimeSlot { /** * Peak concurrent requests */ - private int peakConcurrentReqeusts; + private long peakConcurrentReqeusts; /** * Block requests
@@ -60,20 +60,24 @@ public class TimeSlot { /** * Add request to time slot * - * @param rt - * @param concurrentRequests Current concurrent requests - * @param isSuccess Whether the request is success or not */ - public synchronized void incr(long rt, int concurrentRequests, boolean isSuccess) { + public void incr() { counter.incrementAndGet(); + } + + /** + * Add request RT information to time slot + * + * @param rt + * @param isSuccess Whether the request is success or not + */ + public synchronized void addRequestRT(long rt, boolean isSuccess) { totalRt.addAndGet(rt); if (!isSuccess) { errors.incrementAndGet(); } min = rt < min ? rt : min; max = rt > max ? rt : max; - peakConcurrentReqeusts = concurrentRequests > peakConcurrentReqeusts ? concurrentRequests - : peakConcurrentReqeusts; } /** @@ -81,7 +85,7 @@ public class TimeSlot { * * @param concurrentRequests Current concurrent requests */ - public synchronized void updatePeakConcurrentReqeusts(int concurrentRequests) { + public synchronized void updatePeakConcurrentReqeusts(long concurrentRequests) { peakConcurrentReqeusts = concurrentRequests > peakConcurrentReqeusts ? concurrentRequests : peakConcurrentReqeusts; } @@ -122,11 +126,11 @@ public class TimeSlot { this.totalRt = totalRt; } - public int getPeakConcurrentReqeusts() { + public long getPeakConcurrentReqeusts() { return peakConcurrentReqeusts; } - public void setPeakConcurrentReqeusts(int peakConcurrentReqeusts) { + public void setPeakConcurrentReqeusts(long peakConcurrentReqeusts) { this.peakConcurrentReqeusts = peakConcurrentReqeusts; } diff --git a/src/main/java/we/stats/TimeWindowStat.java b/src/main/java/we/stats/TimeWindowStat.java index 33a131c..d654fa9 100644 --- a/src/main/java/we/stats/TimeWindowStat.java +++ b/src/main/java/we/stats/TimeWindowStat.java @@ -52,7 +52,7 @@ public class TimeWindowStat { /** * Peak concurrent requests of the time window */ - private Integer peakConcurrentReqeusts; + private Long peakConcurrentReqeusts; /** * Block requests @@ -67,11 +67,11 @@ public class TimeWindowStat { this.blockRequests = blockRequests; } - public Integer getPeakConcurrentReqeusts() { + public Long getPeakConcurrentReqeusts() { return peakConcurrentReqeusts; } - public void setPeakConcurrentReqeusts(Integer peakConcurrentReqeusts) { + public void setPeakConcurrentReqeusts(Long peakConcurrentReqeusts) { this.peakConcurrentReqeusts = peakConcurrentReqeusts; } diff --git a/src/test/java/we/stats/FlowStatTests.java b/src/test/java/we/stats/FlowStatTests.java index 248cd4c..183f954 100644 --- a/src/test/java/we/stats/FlowStatTests.java +++ b/src/test/java/we/stats/FlowStatTests.java @@ -2,7 +2,6 @@ package we.stats; import static org.junit.jupiter.api.Assertions.assertEquals; -import java.math.BigDecimal; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -10,8 +9,6 @@ import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; -import we.stats.FlowStat; -import we.stats.TimeWindowStat; import we.util.JacksonUtils; /** @@ -25,15 +22,18 @@ public class FlowStatTests { @Test public void testIncr() throws Throwable { - long t = stat.currentTimeSlotId(); - long slotId = t + 1000; + long curTimeSlotId = stat.currentTimeSlotId(); + long slotId = curTimeSlotId + 1000; String resourceId = "a"; - stat.incrRequestToTimeSlot(resourceId, t, 100l, true); + stat.incrRequest(resourceId, curTimeSlotId, null, null); TimeWindowStat tws = stat.getPreviousSecondStat(resourceId, slotId); assertEquals(1, tws.getTotal()); - stat.incrRequestToTimeSlot(resourceId, t, 300l, false); + stat.incrRequest(resourceId, curTimeSlotId, null, null); + stat.addRequestRT(resourceId, curTimeSlotId, 100, false); + stat.addRequestRT(resourceId, curTimeSlotId, 300, true); + tws = stat.getPreviousSecondStat(resourceId, slotId); assertEquals(2, tws.getTotal()); assertEquals(200, tws.getAvgRt()); @@ -41,31 +41,35 @@ public class FlowStatTests { assertEquals(300, tws.getMax()); assertEquals(2, tws.getRps().intValue()); assertEquals(1, tws.getErrors()); + + stat.decrConcurrentRequest(resourceId, curTimeSlotId); + Long con = stat.getConcurrentRequests(resourceId); + assertEquals(1, con); // System.out.println(JacksonUtils.writeValueAsString(stat.resourceStats)); } @Test - public void testIncrConcurrentRequest() throws Throwable { + public void testIncrRequest() throws Throwable { long curTimeSlotId = stat.currentTimeSlotId(); long nextSlotId = curTimeSlotId + 1000; String resourceId = "b"; - Integer maxCon = 10; - Integer maxRPS = 20; + Long maxCon = 10l; + Long maxRPS = 20l; - stat.incrConcurrentRequest(curTimeSlotId, resourceId, maxCon, maxRPS); + stat.incrRequest(resourceId, curTimeSlotId, maxCon, maxRPS); TimeWindowStat tws = stat.getTimeWindowStat(resourceId, curTimeSlotId, nextSlotId); - int peakCon = tws.getPeakConcurrentReqeusts(); - assertEquals(1, peakCon); + long peakCon = tws.getPeakConcurrentReqeusts(); + assertEquals(1l, peakCon); } @Test public void testBlockedByMaxCon() throws Throwable { long curTimeSlotId = stat.currentTimeSlotId(); long nextSlotId = curTimeSlotId + 1000; - Integer maxCon = 10; - Integer maxRPS = 20; + Long maxCon = 10l; + Long maxRPS = 20l; int threads = 100; int requests = 10000; int totalRequests = threads * requests; @@ -74,7 +78,7 @@ public class FlowStatTests { ExecutorService pool = Executors.newFixedThreadPool(threads); long t1 = System.currentTimeMillis(); for (int i = 0; i < threads; i++) { - pool.submit(new ConcurrentJob(requests, curTimeSlotId, resourceId, maxCon, maxRPS, false)); + pool.submit(new ConcurrentJob(requests, curTimeSlotId, resourceId, maxCon, maxRPS)); } pool.shutdown(); if (pool.awaitTermination(20, TimeUnit.SECONDS)) { @@ -82,7 +86,7 @@ public class FlowStatTests { TimeWindowStat tws = stat.getTimeWindowStat(resourceId, curTimeSlotId, nextSlotId); assertEquals(maxCon, tws.getPeakConcurrentReqeusts()); assertEquals(totalRequests - maxCon, tws.getBlockRequests()); - System.out.println("total elapsed time for " + threads * requests + " requests:" + (t2 - t1) + "ms"); + System.out.println("testBlockedByMaxCon total elapsed time for " + threads * requests + " requests:" + (t2 - t1) + "ms"); } else { System.out.println("testIncrConcurrentRequest timeout"); } @@ -93,21 +97,21 @@ public class FlowStatTests { public void testBlockedByMaxRPS() throws Throwable { long curTimeSlotId = stat.currentTimeSlotId(); long nextSlotId = curTimeSlotId + 1000; - Integer maxCon = null; - Integer maxRPS = 20; + Long maxCon = Long.MAX_VALUE; + Long maxRPS = 20l; int threads = 100; int requests = 10000; int totalRequests = threads * requests; String resourceId = "c"; for (int i = 0; i < maxRPS; i++) { - stat.incrRequestToTimeSlot(resourceId, curTimeSlotId, 100, true); + stat.incrRequest(resourceId, curTimeSlotId, maxCon, maxRPS); } ExecutorService pool = Executors.newFixedThreadPool(threads); long t1 = System.currentTimeMillis(); for (int i = 0; i < threads; i++) { - pool.submit(new ConcurrentJob(requests, curTimeSlotId, resourceId, maxCon, maxRPS, false)); + pool.submit(new ConcurrentJob(requests, curTimeSlotId, resourceId, maxCon, maxRPS)); } pool.shutdown(); if (pool.awaitTermination(20, TimeUnit.SECONDS)) { @@ -115,7 +119,7 @@ public class FlowStatTests { TimeWindowStat tws = stat.getTimeWindowStat(resourceId, curTimeSlotId, nextSlotId); assertEquals(maxRPS, tws.getRps().intValue()); assertEquals(totalRequests, tws.getBlockRequests()); - System.out.println("total elapsed time for " + threads * requests + " requests:" + (t2 - t1) + "ms"); + System.out.println("testIncrConcurrentRequest total elapsed time for " + threads * requests + " requests:" + (t2 - t1) + "ms"); } else { System.out.println("testIncrConcurrentRequest timeout"); } @@ -220,11 +224,11 @@ public class FlowStatTests { for (int m = 0; m < slots; m++) { for (int i = 0; i < requests; i++) { for (int j = 0; j < resources; j++) { - stat.incrConcurrentRequest("resource-" + j); + stat.incrRequest("resource-" + j, startSlotId + (m * FlowStat.INTERVAL), null, null); // 10% error boolean isSuccess = i % 10 == 1 ? false : true; // rt will be triple while even - stat.incrRequestToTimeSlot("resource-" + j, startSlotId + (m * FlowStat.INTERVAL), + stat.addRequestRT("resource-" + j, startSlotId + (m * FlowStat.INTERVAL), rt * (j % 2 == 0 ? 3 : 1), isSuccess); } try { @@ -239,30 +243,24 @@ public class FlowStatTests { class ConcurrentJob implements Runnable { - public ConcurrentJob(int requests, long curTimeSlotId, String resourceId, Integer maxCon, Integer maxRPS, boolean incrReq) { + public ConcurrentJob(int requests, long curTimeSlotId, String resourceId, Long maxCon, Long maxRPS) { this.requests = requests; this.resourceId = resourceId; this.maxRPS = maxRPS; this.maxCon = maxCon; this.curTimeSlotId = curTimeSlotId; - this.incrReq = incrReq; } private int requests = 0; private String resourceId; - private Integer maxCon = 0; - private Integer maxRPS = 0; + private Long maxCon = 0l; + private Long maxRPS = 0l; private long curTimeSlotId = 0; - private boolean incrReq; @Override public void run() { for (int i = 0; i < requests; i++) { - if (incrReq) { - stat.incrRequestToTimeSlot(resourceId, curTimeSlotId, 100, true); - }else { - stat.incrConcurrentRequest(curTimeSlotId, resourceId, maxCon, maxRPS); - } + stat.incrRequest(resourceId, curTimeSlotId, maxCon, maxRPS); } } }