add schedule job to sync concurrent request counter to next timeslot

This commit is contained in:
Francis Dong
2020-12-29 18:13:50 +08:00
parent 6496dc8c1f
commit 8be4401f47
3 changed files with 49 additions and 3 deletions

View File

@@ -38,14 +38,15 @@ public class FlowStat {
*/
public static long RETENTION_TIME_IN_MINUTES = 10;
private ExecutorService pool = Executors.newFixedThreadPool(1);
private ExecutorService pool = Executors.newFixedThreadPool(2);
public FlowStat() {
runHousekeepJob();
runScheduleJob();
}
private void runHousekeepJob() {
private void runScheduleJob() {
pool.submit(new HousekeepJob(this));
pool.submit(new PeakConcurrentJob(this));
}
/**
@@ -324,4 +325,37 @@ public class FlowStat {
}
}
class PeakConcurrentJob implements Runnable {
private FlowStat stat;
public PeakConcurrentJob(FlowStat stat) {
this.stat = stat;
}
@Override
public void run() {
Long lastTimeSlotId = null;
while (true) {
long curTimeSlotId = stat.currentTimeSlotId();
if (lastTimeSlotId == null || lastTimeSlotId.longValue() != curTimeSlotId) {
log.debug("PeakConcurrentJob start");
Set<Map.Entry<String, ResourceStat>> entrys = stat.resourceStats.entrySet();
for (Entry<String, ResourceStat> entry : entrys) {
String resourceId = entry.getKey();
log.debug("PeakConcurrentJob: resourceId={} slotId=={}", resourceId, curTimeSlotId);
entry.getValue().getTimeSlot(curTimeSlotId);
}
lastTimeSlotId = curTimeSlotId;
log.debug("PeakConcurrentJob done");
}
try {
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}

View File

@@ -49,6 +49,7 @@ public class ResourceStat {
return timeSlots.get(timeSlotId);
} else {
TimeSlot timeSlot = new TimeSlot(timeSlotId);
timeSlot.setPeakConcurrentReqeusts(this.concurrentRequests.get());
TimeSlot old = timeSlots.putIfAbsent(timeSlotId, timeSlot);
if (old != null) {
return old;

View File

@@ -20,6 +20,17 @@ public class FlowStatTests {
private FlowStat stat = new FlowStat();
@Test
public void testPeakConcurrentJob() throws Throwable {
long curTimeSlotId = stat.currentTimeSlotId();
long nextSlotId = curTimeSlotId + 1000;
String resourceId = "PeakConcurrentJob";
stat.incrRequest(resourceId, curTimeSlotId, null, null);
Thread.sleep(1200);
TimeWindowStat tws = stat.getPreviousSecondStat(resourceId, nextSlotId + 1000);
assertEquals(1, tws.getPeakConcurrentReqeusts());
}
@Test
public void testIncr() throws Throwable {
long curTimeSlotId = stat.currentTimeSlotId();