add flow statistic

This commit is contained in:
Francis Dong
2020-12-22 11:05:52 +08:00
parent 3e75a7b3ad
commit d3ed36f6de
6 changed files with 905 additions and 0 deletions

View File

@@ -0,0 +1,294 @@
package we.stats;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Flow Statistic
*
* @author Francis Dong
*
*/
public class FlowStat {
private static final Logger log = LoggerFactory.getLogger(FlowStat.class);
/**
* Time slot interval in millisecond
*/
public static long INTERVAL = 1000;
/**
* Route ID for all routes entry
*/
public static String ALL_TOUTES = "_ALL_ROUTES";
/**
* A string Route ID as key
*/
public ConcurrentMap<String, RouteStat> routeStats = new ConcurrentHashMap<>();
private ExecutorService pool = Executors.newFixedThreadPool(1);
public FlowStat() {
runHousekeepJob();
}
private void runHousekeepJob() {
pool.submit(new HousekeepJob(this));
}
/**
* Returns the current time slot ID
*
* @return
*/
public long currentTimeSlotId() {
return (System.currentTimeMillis() / INTERVAL) * INTERVAL;
}
/**
* Returns the time slot ID of the specified time
*
* @param timeMilli
* @return
*/
public long getTimeSlotId(long timeMilli) {
return (System.currentTimeMillis() / INTERVAL) * INTERVAL;
}
/**
* Increment concurrent request counter of the specified route
*
* @param routeId Route ID
*/
public void incrConcurrentRequest(String routeId) {
RouteStat routeStat = getRouteStat(routeId);
RouteStat allRouteStat = getRouteStat(ALL_TOUTES);
routeStat.incrConcurrentRequest();
allRouteStat.incrConcurrentRequest();
}
/**
* Returns the current concurrent requests of the specified route<br/>
* <br/>
* Returns the current concurrent connections of all routes:<br/>
* getConnection(TrafficStat.ALL_TOUTES)
*
* @param routeId Route ID
*/
public int getConcurrentRequests(String routeId) {
RouteStat routeStat = getRouteStat(routeId);
return routeStat.getConcurrentRequests().get();
}
/**
* Add request to current time slot and decrement concurrent connection counter
*
* @param routeId Route ID
* @param rt Response time of request
* @param isSuccess Whether the request is success or not
*/
public void incrRequest(String routeId, long rt, boolean isSuccess) {
incrRequestToTimeSlot(routeId, currentTimeSlotId(), rt, isSuccess);
}
/**
* Add request to the specified time slot and decrement concurrent connection
* counter
*
* @param routeId Route ID
* @param timeSlotId TimeSlot ID
* @param rt Response time of request
* @param isSuccess Whether the request is success or not
* @return
*/
public void incrRequestToTimeSlot(String routeId, long timeSlotId, long rt, boolean isSuccess) {
if (routeId == null) {
return;
}
RouteStat routeStat = getRouteStat(routeId);
RouteStat allRouteStat = getRouteStat(ALL_TOUTES);
routeStat.incrRequestToTimeSlot(timeSlotId, rt, isSuccess);
allRouteStat.incrRequestToTimeSlot(timeSlotId, rt, isSuccess);
}
private RouteStat getRouteStat(String routeId) {
RouteStat routeStat = null;
if (routeStats.containsKey(routeId)) {
routeStat = routeStats.get(routeId);
} else {
routeStat = new RouteStat(routeId);
RouteStat rs = routeStats.putIfAbsent(routeId, routeStat);
if (rs != null) {
routeStat = rs;
}
}
return routeStat;
}
/**
* Returns current TimeWindowStat of the specified route
*
* @param routeId
* @return
*/
public TimeWindowStat getCurrentTimeWindowStat(String routeId) {
long startTimeMilli = currentTimeSlotId();
return getTimeWindowStat(routeId, startTimeMilli, startTimeMilli + 1000);
}
/**
* Returns the TimeWindowStat of previous second
*
* @param timeMilli
* @return
*/
public TimeWindowStat getPreviousSecondStat(String routeId, long timeMilli) {
long endTimeMilli = (timeMilli / INTERVAL) * INTERVAL;
return getTimeWindowStat(routeId, endTimeMilli - 1000, endTimeMilli);
}
/**
* Returns the timeWindowStat of the specific route in the specified time window
* [startTimeMilli, endTimeMilli)
*
* @param startTimeMilli included
* @param endTimemilli excluded
* @return
*/
public TimeWindowStat getTimeWindowStat(String routeId, long startTimeMilli, long endTimeMilli) {
long startSlotId = (startTimeMilli / INTERVAL) * INTERVAL;
long endSlotId = (endTimeMilli / INTERVAL) * INTERVAL;
if (startSlotId == endSlotId) {
endSlotId = endSlotId + INTERVAL;
}
if (routeStats.containsKey(routeId)) {
RouteStat routeStat = routeStats.get(routeId);
return routeStat.getTimeWindowStat(startSlotId, endSlotId);
}
return null;
}
/**
* Returns the RouteTimeWindowStat list in the specified time window
* [startTimeMilli, endTimeMilli), The time slot unit is one second
*
* @param routeId optional, returns RouteSlot list of all routes while
* routeId is null
* @param startTimeMilli
* @param endTimeMilli
* @return
*/
@SuppressWarnings("unused")
public List<RouteTimeWindowStat> getRouteTimeWindowStats(String routeId, long startTimeMilli, long endTimeMilli) {
return this.getRouteTimeWindowStats(routeId, startTimeMilli, endTimeMilli, INTERVAL);
}
/**
* Returns the RouteTimeWindowStat list in the specified time window
* [startTimeMilli, endTimeMilli)
*
* @param routeId optional, returns RouteTimeWindowStat list of all
* routes while routeId is null
* @param startTimeMilli
* @param endTimeMilli
* @param slotIntervalInSec interval of custom time slot in millisecond, such as
* 60 for 1 minutes
* @return
*/
@SuppressWarnings("unused")
public List<RouteTimeWindowStat> getRouteTimeWindowStats(String routeId, long startTimeMilli, long endTimeMilli,
long slotIntervalInSec) {
List<RouteTimeWindowStat> list = new ArrayList<>();
long startSlotId = (startTimeMilli / INTERVAL) * INTERVAL;
long endSlotId = (endTimeMilli / INTERVAL) * INTERVAL;
if (startSlotId == endSlotId) {
endSlotId = endSlotId + INTERVAL;
}
if (slotIntervalInSec < 1 || (endSlotId - startSlotId) / 1000 < slotIntervalInSec) {
return list;
}
long slotInterval = slotIntervalInSec * 1000;
if (routeId == null) {
Set<Map.Entry<String, RouteStat>> entrys = routeStats.entrySet();
for (Entry<String, RouteStat> entry : entrys) {
String rid = entry.getKey();
RouteTimeWindowStat routeWin = new RouteTimeWindowStat(rid);
for (long i = startSlotId; i < endSlotId;) {
TimeWindowStat tws = getTimeWindowStat(routeId, startSlotId, endSlotId);
if (tws != null) {
routeWin.getWindows().add(tws);
}
i = i + slotInterval;
}
if (routeWin.getWindows().size() > 0) {
list.add(routeWin);
}
}
} else {
RouteTimeWindowStat routeWin = new RouteTimeWindowStat(routeId);
for (long i = startSlotId; i < endSlotId;) {
TimeWindowStat tws = getTimeWindowStat(routeId, startSlotId, endSlotId);
if (tws != null) {
routeWin.getWindows().add(tws);
}
i = i + slotInterval;
}
if (routeWin.getWindows().size() > 0) {
list.add(routeWin);
}
}
return list;
}
class HousekeepJob implements Runnable {
private FlowStat stat;
public HousekeepJob(FlowStat stat) {
this.stat = stat;
}
@Override
public void run() {
long n = 2 * 60 * 60 * 1000 / FlowStat.INTERVAL * FlowStat.INTERVAL;
long lastSlotId = stat.currentTimeSlotId() - n;
while (true) {
log.debug("housekeeping start");
long slotId = stat.currentTimeSlotId() - n;
for (long i = lastSlotId; i < slotId;) {
Set<Map.Entry<String, RouteStat>> entrys = stat.routeStats.entrySet();
for (Entry<String, RouteStat> entry : entrys) {
String routeId = entry.getKey();
ConcurrentMap<Long, TimeSlot> timeSlots = entry.getValue().getTimeSlots();
log.debug("housekeeping remove slot: routeId={} slotId=={}", routeId, i);
timeSlots.remove(i);
}
i = i + FlowStat.INTERVAL;
}
lastSlotId = slotId;
log.debug("housekeeping done");
try {
Thread.sleep(60 * 1000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}

View File

@@ -0,0 +1,144 @@
package we.stats;
import java.math.BigDecimal;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
* @author Francis Dong
*
*/
public class RouteStat {
/**
* Route ID
*/
private String routeId;
/**
* Request count of time slot, the beginning timestamp(timeId) as key
*/
private ConcurrentMap<Long, TimeSlot> timeSlots = new ConcurrentHashMap<>();
/**
* Concurrent requests
*/
private AtomicInteger concurrentRequests = new AtomicInteger(0);
public RouteStat(String routeId) {
this.routeId = routeId;
}
/**
* Increment concurrent request counter of the route
*
*/
public void incrConcurrentRequest() {
this.concurrentRequests.incrementAndGet();
}
/**
* add request to the specified time slot and decrement concurrent request
* counter
*
* @param timeSlotId
* @param rt response time of the request
* @param isSuccess Whether the request is success or not
* @return
*/
public void incrRequestToTimeSlot(long timeSlotId, long rt, boolean isSuccess) {
int conns = this.concurrentRequests.decrementAndGet();
if (timeSlots.containsKey(timeSlotId)) {
timeSlots.get(timeSlotId).incr(rt, conns, isSuccess);
} else {
TimeSlot timeSlot = new TimeSlot(timeSlotId);
TimeSlot old = timeSlots.putIfAbsent(timeSlotId, timeSlot);
if (old != null) {
old.incr(rt, conns, isSuccess);
} else {
timeSlot.incr(rt, conns, isSuccess);
}
}
}
/**
* Returns statistic of the specified time window
*
* @param startSlotId
* @param endSlotId
* @return
*/
public TimeWindowStat getTimeWindowStat(long startSlotId, long endSlotId) {
TimeWindowStat tws = new TimeWindowStat();
long min = Long.MAX_VALUE;
long max = Long.MIN_VALUE;
long totalReqs = 0;
long totalRts = 0;
int peakConcurrences = 0;
long errors = 0;
for (long i = startSlotId; i < endSlotId;) {
if (timeSlots.containsKey(i)) {
TimeSlot timeSlot = timeSlots.get(i);
min = timeSlot.getMin() < min ? timeSlot.getMin() : min;
max = timeSlot.getMax() > max ? timeSlot.getMax() : max;
peakConcurrences = timeSlot.getPeakConcurrentReqeusts() > peakConcurrences
? timeSlot.getPeakConcurrentReqeusts()
: peakConcurrences;
totalReqs = totalReqs + timeSlot.getCounter().get();
totalRts = totalRts + timeSlot.getTotalRts().get();
errors = errors + timeSlot.getErrors().get();
}
i = i + FlowStat.INTERVAL;
}
tws.setMin(min == Long.MAX_VALUE ? null : min);
tws.setMax(max == Long.MIN_VALUE ? null : max);
tws.setPeakConcurrentReqeusts(peakConcurrences);
tws.setTotal(totalReqs);
tws.setErrors(errors);
if (totalReqs > 0) {
tws.setAvgRt(totalRts / totalReqs);
BigDecimal nsec = new BigDecimal(endSlotId - startSlotId).divide(new BigDecimal(1000), 5,
BigDecimal.ROUND_HALF_UP);
BigDecimal rps = new BigDecimal(totalReqs).divide(nsec, 5, BigDecimal.ROUND_HALF_UP);
if (rps.compareTo(new BigDecimal(10)) >= 0) {
rps = rps.setScale(0, BigDecimal.ROUND_HALF_UP).stripTrailingZeros();
} else {
rps = rps.setScale(2, BigDecimal.ROUND_HALF_UP).stripTrailingZeros();
}
tws.setRps(rps);
}
return tws;
}
public String getRouteId() {
return routeId;
}
public void setRouteId(String routeId) {
this.routeId = routeId;
}
public ConcurrentMap<Long, TimeSlot> getTimeSlots() {
return timeSlots;
}
public void setTimeSlots(ConcurrentMap<Long, TimeSlot> timeSlots) {
this.timeSlots = timeSlots;
}
public AtomicInteger getConcurrentRequests() {
return concurrentRequests;
}
public void setConcurrentRequests(AtomicInteger concurrentRequests) {
this.concurrentRequests = concurrentRequests;
}
}

View File

@@ -0,0 +1,40 @@
package we.stats;
import java.util.ArrayList;
import java.util.List;
/**
*
* @author Francis Dong
*
*/
public class RouteTimeWindowStat {
/**
* Route ID
*/
private String routeId;
private List<TimeWindowStat> windows = new ArrayList<>();
public RouteTimeWindowStat(String routeId) {
this.routeId = routeId;
}
public String getRouteId() {
return routeId;
}
public void setRouteId(String routeId) {
this.routeId = routeId;
}
public List<TimeWindowStat> getWindows() {
return windows;
}
public void setWindows(List<TimeWindowStat> windows) {
this.windows = windows;
}
}

View File

@@ -0,0 +1,126 @@
package we.stats;
import java.util.concurrent.atomic.AtomicLong;
/**
*
* @author Francis Dong
*
*/
public class TimeSlot {
/**
* Time slot start timestamp as ID
*/
private long id;
/**
* Request counter
*/
private AtomicLong counter = new AtomicLong();
/**
* Error request counter
*/
private AtomicLong errors = new AtomicLong();
/**
* Minimum response time
*/
private long min = Long.MAX_VALUE;
/**
* Maximum response time
*/
private long max = Long.MIN_VALUE;
/**
* Total response time
*/
private AtomicLong totalRts = new AtomicLong(0);
/**
* Peak concurrent requests
*/
private int peakConcurrentReqeusts;
public TimeSlot(long id) {
this.id = id;
}
public long getId() {
return id;
}
/**
* Add request to time slot
*
* @param rt
* @param concurrentRequests Current concurrent requests
* @param isSuccess Whether the request is success or not
*/
public synchronized void incr(long rt, int concurrentRequests, boolean isSuccess) {
counter.incrementAndGet();
totalRts.addAndGet(rt);
if (!isSuccess) {
errors.incrementAndGet();
}
min = rt < min ? rt : min;
max = rt > max ? rt : max;
peakConcurrentReqeusts = concurrentRequests > peakConcurrentReqeusts ? concurrentRequests
: peakConcurrentReqeusts;
}
public void setId(long id) {
this.id = id;
}
public AtomicLong getCounter() {
return counter;
}
public void setCounter(AtomicLong counter) {
this.counter = counter;
}
public long getMin() {
return min;
}
public void setMin(long min) {
this.min = min;
}
public long getMax() {
return max;
}
public void setMax(long max) {
this.max = max;
}
public AtomicLong getTotalRts() {
return totalRts;
}
public void setTotalRts(AtomicLong totalRts) {
this.totalRts = totalRts;
}
public int getPeakConcurrentReqeusts() {
return peakConcurrentReqeusts;
}
public void setPeakConcurrentReqeusts(int peakConcurrentReqeusts) {
this.peakConcurrentReqeusts = peakConcurrentReqeusts;
}
public AtomicLong getErrors() {
return errors;
}
public void setErrors(AtomicLong errors) {
this.errors = errors;
}
}

View File

@@ -0,0 +1,129 @@
package we.stats;
import java.math.BigDecimal;
/**
*
* @author Francis Dong
*
*/
public class TimeWindowStat {
/**
* Start time of time window[startTime,endTime)
*/
private Long startTime;
/**
* End time of time window, [startTime,endTime)
*/
private Long endTime;
/**
* Minimum response time
*/
private Long min;
/**
* Maximum response time
*/
private Long max;
/**
* Average response time
*/
private Long avgRt;
/**
* Total requests
*/
private Long total;
/**
* Total error requests
*/
private Long errors;
/**
* the average RPS(Requests Per Second) of time window
*/
private BigDecimal rps;
/**
* Peak concurrent requests of the time window
*/
private Integer peakConcurrentReqeusts;
public Integer getPeakConcurrentReqeusts() {
return peakConcurrentReqeusts;
}
public void setPeakConcurrentReqeusts(Integer peakConcurrentReqeusts) {
this.peakConcurrentReqeusts = peakConcurrentReqeusts;
}
public Long getErrors() {
return errors;
}
public void setErrors(Long errors) {
this.errors = errors;
}
public Long getMin() {
return min;
}
public void setMin(Long min) {
this.min = min;
}
public Long getMax() {
return max;
}
public void setMax(Long max) {
this.max = max;
}
public BigDecimal getRps() {
return rps;
}
public void setRps(BigDecimal rps) {
this.rps = rps;
}
public Long getAvgRt() {
return avgRt;
}
public void setAvgRt(Long avgRt) {
this.avgRt = avgRt;
}
public Long getTotal() {
return total;
}
public void setTotal(Long total) {
this.total = total;
}
public Long getStartTime() {
return startTime;
}
public void setStartTime(Long startTime) {
this.startTime = startTime;
}
public Long getEndTime() {
return endTime;
}
public void setEndTime(Long endTime) {
this.endTime = endTime;
}
}

View File

@@ -0,0 +1,172 @@
package we.stats;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.math.BigDecimal;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import we.stats.FlowStat;
import we.stats.TimeWindowStat;
import we.util.JacksonUtils;
/**
*
* @author Francis Dong
*
*/
public class FlowStatTests {
private FlowStat stat = new FlowStat();
@Test
public void testIncr() throws Throwable {
long t = stat.currentTimeSlotId();
long slotId = t + 1000;
String routeId = "a";
stat.incrRequestToTimeSlot(routeId, t, 100l, true);
TimeWindowStat tws = stat.getPreviousSecondStat(routeId, slotId);
assertEquals(1, tws.getTotal());
stat.incrRequestToTimeSlot(routeId, t, 300l, false);
tws = stat.getPreviousSecondStat(routeId, slotId);
assertEquals(2, tws.getTotal());
assertEquals(200, tws.getAvgRt());
assertEquals(100, tws.getMin());
assertEquals(300, tws.getMax());
assertEquals(2, tws.getRps().intValue());
assertEquals(1, tws.getErrors());
// System.out.println(JacksonUtils.writeValueAsString(stat.routeStats));
}
@Test
public void testStat() throws Throwable {
// requests per slot per route
int requests = 100;
int threads = 10;
int routes = 10;
int slots = 100;
long rt = 100;
long t1 = System.currentTimeMillis();
long start = (t1 / FlowStat.INTERVAL) * FlowStat.INTERVAL;
int totalRequests = requests * threads * routes * slots;
ExecutorService pool = Executors.newFixedThreadPool(10);
for (int i = 0; i < threads; i++) {
pool.submit(new Job(requests, routes, slots, start, rt));
}
pool.shutdown();
if (pool.awaitTermination(20, TimeUnit.SECONDS)) {
long t2 = System.currentTimeMillis();
long end = start + slots * FlowStat.INTERVAL;
long nsecs = (end - start) / 1000;
System.out.println("total requests" + totalRequests);
System.out.println("total elapsed time" + (t2 - t1) + "ms");
System.out.println("Testing Time Window" + (end - start) + "ms");
int route1 = 1;
int route2 = 2;
int rtBase1 = 1;
int rtBase3 = 3;
TimeWindowStat tws1 = stat.getTimeWindowStat("route-" + route1, start, end);
TimeWindowStat tws2 = stat.getTimeWindowStat("route-" + route2, start, end);
TimeWindowStat tws = stat.getTimeWindowStat(FlowStat.ALL_TOUTES, start, end);
assertEquals(totalRequests / routes, tws1.getTotal());
assertEquals(rt * rtBase1, tws1.getAvgRt());
assertEquals(rt * rtBase1, tws1.getMin());
assertEquals(rt * rtBase1, tws1.getMax());
assertEquals(totalRequests / routes / nsecs, tws1.getRps().intValue());
assertEquals(totalRequests / routes / 10, tws1.getErrors().intValue());
System.out.println("RPS of route1: " + tws1.getRps().intValue());
assertEquals(totalRequests / routes, tws2.getTotal());
assertEquals(rt * rtBase3, tws2.getAvgRt());
assertEquals(rt * rtBase3, tws2.getMin());
assertEquals(rt * rtBase3, tws2.getMax());
assertEquals(totalRequests / routes / nsecs, tws2.getRps().intValue());
assertEquals(totalRequests / routes / 10, tws2.getErrors().intValue());
System.out.println("RPS of route2: " + tws2.getRps().intValue());
assertEquals(totalRequests, tws.getTotal());
assertEquals((rt * rtBase1 + rt * rtBase3) / 2, tws.getAvgRt());
assertEquals(rt * rtBase1, tws.getMin());
assertEquals(rt * rtBase3, tws.getMax());
assertEquals(totalRequests / nsecs, tws.getRps().intValue());
assertEquals(totalRequests / 10, tws.getErrors().intValue());
System.out.println("RPS of all routes: " + tws.getRps().intValue());
// performance of getTimeWindowStat
for (int n = 0; n < 10; n++) {
long t3 = System.currentTimeMillis();
int times = 100000;
for (int i = 0; i < times; i++) {
stat.getTimeWindowStat("route-" + route1, start, end);
}
long t4 = System.currentTimeMillis();
System.out.println("performance of getTimeWindowStat: " + (t4 - t3) + "ms " + times + " times");
try {
Thread.sleep(10);
} catch (Exception e) {
e.printStackTrace();
}
}
// System.out.println(JacksonUtils.writeValueAsString(stat.routeStats));
List<RouteTimeWindowStat> list = stat.getRouteTimeWindowStats("route-" + 1, start, end, 10);
System.out.println(JacksonUtils.writeValueAsString(list));
} else {
System.out.println("timeout");
}
}
class Job implements Runnable {
public Job(int requests, int routes, int slots, long startSlotId, long rt) {
this.requests = requests;
this.routes = routes;
this.slots = slots;
this.startSlotId = startSlotId;
this.rt = rt;
}
private int requests = 0;
private int routes = 0;
private int slots = 0;
private long startSlotId = 0;
private long rt = 0;
@Override
public void run() {
for (int m = 0; m < slots; m++) {
for (int i = 0; i < requests; i++) {
for (int j = 0; j < routes; j++) {
stat.incrConcurrentRequest("route-" + j);
// 10% error
boolean isSuccess = i % 10 == 1 ? false : true;
// rt will be triple while even
stat.incrRequestToTimeSlot("route-" + j, startSlotId + (m * FlowStat.INTERVAL),
rt * (j % 2 == 0 ? 3 : 1), isSuccess);
}
try {
// Thread.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
}