refactor the hard code with Eureka to support optional Eureka and Nacos.

This commit is contained in:
zhongjie
2020-11-09 15:05:05 +08:00
parent d595c73b17
commit 89757b342f
6 changed files with 157 additions and 39 deletions

View File

@@ -0,0 +1,17 @@
package we.proxy;
import we.util.Constants;
import we.util.ThreadContext;
/**
* Abstract implementation of {@code DiscoveryClientUriSelector}
*
* @author zhongjie
*/
abstract public class AbstractDiscoveryClientUriSelector implements DiscoveryClientUriSelector {
protected String buildUri(String ipAddr, int port, String path) {
StringBuilder b = ThreadContext.getStringBuilder();
return b.append(Constants.Symbol.HTTP_PROTOCOL_PREFIX).append(ipAddr).append(Constants.Symbol.COLON).append(port).append(path).toString();
}
}

View File

@@ -0,0 +1,19 @@
package we.proxy;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;
/**
* The disable implementation of {@code DiscoveryClientUriSelector}, used when Nacos and Eureka discovery are not enabled.
*
* @author zhongjie
*/
@ConditionalOnExpression("${nacos.discovery.enabled} == false and ${eureka.client.enabled} == false")
@Service
public class DisableDiscoveryUriSelector implements DiscoveryClientUriSelector {
@Override
public String getNextUri(String service, String relativeUri) {
throw new RuntimeException("No " + service + " because discovery disabled", null, false, false) {};
}
}

View File

@@ -0,0 +1,16 @@
package we.proxy;
/**
* A {@code DiscoveryClientUriSelector} is used to select the uri for the next request
*
* @author zhongjie
*/
public interface DiscoveryClientUriSelector {
/**
* find a instance of service by discovery and return the uri that http://{instance-ip-addr}:{instance-port}{relativeUri}
* @param service service name
* @param relativeUri relative uri
* @return the uri for the next request
*/
String getNextUri(String service, String relativeUri);
}

View File

@@ -0,0 +1,55 @@
package we.proxy;
import com.netflix.appinfo.InstanceInfo;
import com.netflix.discovery.EurekaClient;
import com.netflix.discovery.shared.Applications;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
/**
* The Eureka implementation of {@code DiscoveryClientUriSelector}
*
* @author zhongjie
*/
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@Service
public class EurekaUriSelector extends AbstractDiscoveryClientUriSelector {
@Resource
private EurekaClient eurekaClient;
@Override
public String getNextUri(String service, String relativeUri) {
InstanceInfo inst = roundRobinChoose1instFrom(service);
return buildUri(inst.getIPAddr(), inst.getPort(), relativeUri);
}
// private static List<InstanceInfo> aggrMemberInsts = new ArrayList<>();
// static {
// InstanceInfo i0 = InstanceInfo.Builder.newBuilder().setAppName("TRIP-MINI").setIPAddr("xxx.25.63.192").setPort(7094).build();
// aggrMemberInsts.add(i0);
// }
// private static AtomicLong counter = new AtomicLong(0);
// private static final String aggrMember = "trip-mini";
private InstanceInfo roundRobinChoose1instFrom(String service) {
// if (aggrMember.equals(service)) {
// int idx = (int) (counter.incrementAndGet() % aggrMemberInsts.size());
// return aggrMemberInsts.get(idx);
// }
List<InstanceInfo> insts = eurekaClient.getInstancesByVipAddress(service, false);
if (insts == null || insts.isEmpty()) {
throw new RuntimeException("eureka no " + service, null, false, false) {};
}
Applications apps = eurekaClient.getApplications();
int index = (int) (apps.getNextIndex(service.toUpperCase(), false).incrementAndGet() % insts.size());
return insts.get(index);
}
}

View File

@@ -17,10 +17,7 @@
package we.proxy;
import com.netflix.appinfo.InstanceInfo;
import com.netflix.discovery.EurekaClient;
import com.netflix.discovery.shared.Applications;
import com.alibaba.nacos.api.config.annotation.NacosValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
@@ -43,10 +40,7 @@ import we.util.WebUtils;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author hongqiaowei
@@ -62,7 +56,7 @@ public class FizzWebClient {
private static final String localhost = "localhost";
@Resource
private EurekaClient eurekaClient;
private DiscoveryClientUriSelector discoveryClientUriSelector;
@Resource(name = ProxyWebClientConfig.proxyWebClient)
private WebClient proxyWebClient;
@@ -70,6 +64,7 @@ public class FizzWebClient {
@Resource(name = AggrWebClientConfig.aggrWebClient)
private WebClient aggrWebClient;
@NacosValue(value = "${fizz-web-client.timeout:-1}")
@Value("${fizz-web-client.timeout:-1}")
private long timeout = -1;
@@ -140,8 +135,7 @@ public class FizzWebClient {
// what about multiple nginx instance
// current
InstanceInfo inst = roundRobinChoose1instFrom(service);
String uri = buildUri(inst, relativeUri);
String uri = discoveryClientUriSelector.getNextUri(service, relativeUri);
return send2uri(originReqIdOrBizId, method, uri, headers, body, cbc);
}
@@ -225,37 +219,8 @@ public class FizzWebClient {
// TODO 请求完成后做metric, 以反哺后续的请求转发
}
private String buildUri(InstanceInfo inst, String path) {
StringBuilder b = ThreadContext.getStringBuilder();
return b.append(Constants.Symbol.HTTP_PROTOCOL_PREFIX).append(inst.getIPAddr()).append(Constants.Symbol.COLON).append(inst.getPort()).append(path).toString();
}
// private static List<InstanceInfo> aggrMemberInsts = new ArrayList<>();
// static {
// InstanceInfo i0 = InstanceInfo.Builder.newBuilder().setAppName("TRIP-MINI").setIPAddr("xxx.25.63.192").setPort(7094).build();
// aggrMemberInsts.add(i0);
// }
// private static AtomicLong counter = new AtomicLong(0);
// private static final String aggrMember = "trip-mini";
private InstanceInfo roundRobinChoose1instFrom(String service) {
// if (aggrMember.equals(service)) {
// int idx = (int) (counter.incrementAndGet() % aggrMemberInsts.size());
// return aggrMemberInsts.get(idx);
// }
List<InstanceInfo> insts = eurekaClient.getInstancesByVipAddress(service, false);
if (insts == null || insts.isEmpty()) {
throw new RuntimeException("eureka no " + service, null, false, false) {};
}
Applications apps = eurekaClient.getApplications();
int index = (int) (apps.getNextIndex(service.toUpperCase(), false).incrementAndGet() % insts.size());
return insts.get(index);
}
private String extractServiceOrAddress(String uriOrSvc) {
return uriOrSvc.substring(7, uriOrSvc.indexOf(Constants.Symbol.FORWARD_SLASH, 10));
}

View File

@@ -0,0 +1,46 @@
package we.proxy;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
* The Nacos implementation of {@code DiscoveryClientUriSelector}
*
* @author zhongjie
*/
@ConditionalOnProperty(value = "nacos.discovery.enabled")
@Service
public class NacosUriSelector extends AbstractDiscoveryClientUriSelector {
private static final Logger log = LoggerFactory.getLogger(NacosUriSelector.class);
@Resource
private NamingService naming;
@Override
public String getNextUri(String service, String relativeUri) {
Instance instance = this.selectOneHealthyInstance(service);
return super.buildUri(instance.getIp(), instance.getPort(), relativeUri);
}
private Instance selectOneHealthyInstance(String service) {
Instance instance = null;
try {
instance = naming.selectOneHealthyInstance(service);
} catch (NacosException e) {
log.warn("Nacos selectOneHealthyInstance({}) exception", service, e);
}
if (instance == null) {
throw new RuntimeException("Nacos no " + service, null, false, false) {};
}
return instance;
}
}