support total block requests
This commit is contained in:
@@ -28,6 +28,7 @@ import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.function.BiFunction;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@@ -110,6 +111,21 @@ public class FlowStat {
|
||||
* @return IncrRequestResult
|
||||
*/
|
||||
public IncrRequestResult incrRequest(List<ResourceConfig> resourceConfigs, long curTimeSlotId) {
|
||||
return incrRequest(resourceConfigs, curTimeSlotId, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Increase concurrent request counter for given resources chain
|
||||
*
|
||||
* @param resourceConfigs Resource configurations
|
||||
* @param curTimeSlotId current time slot ID, it should be generated by
|
||||
* Flowstat.currentTimeSlotId()
|
||||
* @param totalBlockFunc [optional] callback function for statistic of total
|
||||
* block requests of the resource and its parent resources
|
||||
* @return IncrRequestResult
|
||||
*/
|
||||
public IncrRequestResult incrRequest(List<ResourceConfig> resourceConfigs, long curTimeSlotId,
|
||||
BiFunction<ResourceConfig, List<ResourceConfig>, List<ResourceConfig>> totalBlockFunc) {
|
||||
if (resourceConfigs == null || resourceConfigs.size() == 0) {
|
||||
return null;
|
||||
}
|
||||
@@ -126,6 +142,16 @@ public class FlowStat {
|
||||
long n = resourceStat.getConcurrentRequests().get();
|
||||
if (n >= maxCon) {
|
||||
resourceStat.incrBlockRequestToTimeSlot(curTimeSlotId);
|
||||
if (totalBlockFunc != null) {
|
||||
List<ResourceConfig> parentResCfgs = totalBlockFunc.apply(resourceConfig,
|
||||
resourceConfigs);
|
||||
if (parentResCfgs != null && parentResCfgs.size() > 0) {
|
||||
for (ResourceConfig pResCfg : parentResCfgs) {
|
||||
getResourceStat(pResCfg.getResourceId())
|
||||
.incrTotalBlockRequestToTimeSlot(curTimeSlotId);
|
||||
}
|
||||
}
|
||||
}
|
||||
return IncrRequestResult.block(resourceConfig.getResourceId(),
|
||||
BlockType.CONCURRENT_REQUEST);
|
||||
}
|
||||
@@ -136,6 +162,16 @@ public class FlowStat {
|
||||
long total = resourceStat.getTimeSlot(curTimeSlotId).getCounter().get();
|
||||
if (total >= maxQPS) {
|
||||
resourceStat.incrBlockRequestToTimeSlot(curTimeSlotId);
|
||||
if (totalBlockFunc != null) {
|
||||
List<ResourceConfig> parentResCfgs = totalBlockFunc.apply(resourceConfig,
|
||||
resourceConfigs);
|
||||
if (parentResCfgs != null && parentResCfgs.size() > 0) {
|
||||
for (ResourceConfig pResCfg : parentResCfgs) {
|
||||
getResourceStat(pResCfg.getResourceId())
|
||||
.incrTotalBlockRequestToTimeSlot(curTimeSlotId);
|
||||
}
|
||||
}
|
||||
}
|
||||
return IncrRequestResult.block(resourceConfig.getResourceId(), BlockType.QPS);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -128,6 +128,14 @@ public class ResourceStat {
|
||||
public void incrBlockRequestToTimeSlot(long timeSlotId) {
|
||||
this.getTimeSlot(timeSlotId).getBlockRequests().incrementAndGet();
|
||||
}
|
||||
|
||||
/**
|
||||
* Increase total block request to the specified time slot
|
||||
*
|
||||
*/
|
||||
public void incrTotalBlockRequestToTimeSlot(long timeSlotId) {
|
||||
this.getTimeSlot(timeSlotId).getTotalBlockRequests().incrementAndGet();
|
||||
}
|
||||
|
||||
/**
|
||||
* Add request to the specified time slot
|
||||
@@ -197,6 +205,7 @@ public class ResourceStat {
|
||||
long peakConcurrences = 0;
|
||||
long errors = 0;
|
||||
long blockReqs = 0;
|
||||
long totalBlockReqs = 0;
|
||||
long compReqs = 0;
|
||||
for (long i = startSlotId; i < endSlotId;) {
|
||||
if (timeSlots.containsKey(i)) {
|
||||
@@ -210,6 +219,7 @@ public class ResourceStat {
|
||||
totalRt = totalRt + timeSlot.getTotalRt().get();
|
||||
errors = errors + timeSlot.getErrors().get();
|
||||
blockReqs = blockReqs + timeSlot.getBlockRequests().get();
|
||||
totalBlockReqs = totalBlockReqs + timeSlot.getTotalBlockRequests().get();
|
||||
compReqs = compReqs + timeSlot.getCompReqs().get();
|
||||
}
|
||||
i = i + FlowStat.INTERVAL;
|
||||
@@ -220,6 +230,7 @@ public class ResourceStat {
|
||||
tws.setTotal(totalReqs);
|
||||
tws.setErrors(errors);
|
||||
tws.setBlockRequests(blockReqs);
|
||||
tws.setTotalBlockRequests(totalBlockReqs);
|
||||
tws.setCompReqs(compReqs);
|
||||
|
||||
if (compReqs > 0) {
|
||||
|
||||
@@ -70,6 +70,11 @@ public class TimeSlot {
|
||||
* Block requests <br/>
|
||||
*/
|
||||
private AtomicLong blockRequests = new AtomicLong(0);
|
||||
|
||||
/**
|
||||
* Total block requests of the resource and its underlying resources <br/>
|
||||
*/
|
||||
private AtomicLong totalBlockRequests = new AtomicLong(0);
|
||||
|
||||
public TimeSlot(long id) {
|
||||
this.id = id;
|
||||
@@ -181,4 +186,12 @@ public class TimeSlot {
|
||||
this.compReqs = compReqs;
|
||||
}
|
||||
|
||||
public AtomicLong getTotalBlockRequests() {
|
||||
return totalBlockRequests;
|
||||
}
|
||||
|
||||
public void setTotalBlockRequests(AtomicLong totalBlockRequests) {
|
||||
this.totalBlockRequests = totalBlockRequests;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -79,6 +79,11 @@ public class TimeWindowStat {
|
||||
* Block requests
|
||||
*/
|
||||
private Long blockRequests;
|
||||
|
||||
/**
|
||||
* Total block requests
|
||||
*/
|
||||
private Long totalBlockRequests;
|
||||
|
||||
public Long getBlockRequests() {
|
||||
return blockRequests;
|
||||
@@ -168,4 +173,12 @@ public class TimeWindowStat {
|
||||
this.compReqs = compReqs;
|
||||
}
|
||||
|
||||
public Long getTotalBlockRequests() {
|
||||
return totalBlockRequests;
|
||||
}
|
||||
|
||||
public void setTotalBlockRequests(Long totalBlockRequests) {
|
||||
this.totalBlockRequests = totalBlockRequests;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user