upstream流程测通

This commit is contained in:
Liujian
2021-07-21 15:06:24 +08:00
parent 5ea1cfb755
commit 62802c5a92
11 changed files with 243 additions and 140 deletions

View File

@@ -27,7 +27,7 @@ func (f *factory) ExtendInfo() eosc.ExtendInfo {
ID: "eolinker:goku:discovery_static",
Group: "eolinker",
Project: "goku",
Name: "consul",
Name: "static",
}
}

View File

@@ -2,6 +2,7 @@ package discovery_static
import (
"context"
"errors"
"fmt"
"regexp"
"strconv"
@@ -18,6 +19,10 @@ import (
const name = "static"
var (
ErrorStructType = errors.New("error struct type")
)
type static struct {
id string
name string
@@ -43,7 +48,7 @@ func (s *static) Start() error {
func (s *static) Reset(conf interface{}, workers map[eosc.RequireId]interface{}) error {
cfg, ok := conf.(*Config)
if !ok {
return fmt.Errorf("need %s,now %s:%w", eosc.TypeNameOf((*Config)(nil)), eosc.TypeNameOf(conf), eosc.ErrorStructType)
return fmt.Errorf("need %s,now %s:%w", eosc.TypeNameOf((*Config)(nil)), eosc.TypeNameOf(conf), ErrorStructType)
}
s.locker.Lock()
s.labels = cfg.Labels
@@ -132,21 +137,23 @@ func (s *static) decode(config string) (discovery.IApp, error) {
for _, word := range words {
if word == ";" {
n := discovery.NewNode(node.labels, fmt.Sprintf("%s:%d", node.ip, node.port), node.ip, node.port)
nodes[n.Id()] = n
index = 0
node = nil
continue
}
l := len(word)
value := word
if word[l-1] == ';' {
value = word[:l-1]
}
switch index {
case 0:
{
// 域名+端口
node = new(Node)
vs := strings.Split(value, ":")
node = &Node{
labels: map[string]string{},
ip: "",
port: 0,
}
vs := strings.Split(word, ":")
// 先判断是否是IP端口模式
if !validIP(vs[0]) {
// 若不是IP端口模式则计入全局的属性
@@ -157,7 +164,7 @@ func (s *static) decode(config string) (discovery.IApp, error) {
break
}
if len(vs) > 2 {
return nil, fmt.Errorf("decode ip:port failt for[%s]", value)
return nil, fmt.Errorf("decode ip:port failt for[%s]", word)
}
node.ip = vs[0]
if len(vs) == 2 {
@@ -169,22 +176,19 @@ func (s *static) decode(config string) (discovery.IApp, error) {
default:
{
// label集合
args := strings.Split(value, "=")
args := strings.Split(word, "=")
if len(args) > 1 {
node.labels[args[0]] = args[1]
}
}
}
if word[l-1] == ';' {
n := discovery.NewNode(node.labels, fmt.Sprintf("%s:%d", node.ip, node.port), node.ip, node.port)
nodes[n.Id()] = n
index = 0
node = nil
} else {
index++
}
index++
}
n := discovery.NewNode(node.labels, fmt.Sprintf("%s:%d", node.ip, node.port), node.ip, node.port)
nodes[n.Id()] = n
index = 0
node = nil
agent := (discovery.IHealthChecker)(nil)
if s.checker != nil {
agent, _ = s.checker.Agent()

View File

@@ -7,8 +7,8 @@ type Agent struct {
*HttpCheck
}
func NewAgent(agentId string) *Agent {
return &Agent{agentId: agentId}
func NewAgent(agentId string, h *HttpCheck) *Agent {
return &Agent{agentId: agentId, HttpCheck: h}
}
func (a *Agent) AddToCheck(node discovery.INode) error {

View File

@@ -22,6 +22,7 @@ func NewHttpCheck(config Config) *HttpCheck {
ctx: ctx,
cancel: cancel,
ch: make(chan *checkNode, 10),
delCh: make(chan string, 10),
client: &http.Client{},
locker: sync.RWMutex{},
}
@@ -71,7 +72,7 @@ func (h *HttpCheck) doCheckLoop() {
}
func (h *HttpCheck) Agent() (discovery.IHealthChecker, error) {
return NewAgent(uuid.New()), nil
return NewAgent(uuid.New(), h), nil
}
func (h *HttpCheck) Reset(conf Config) error {

View File

@@ -42,7 +42,7 @@ func DoRequest(ctx *http_context.Context, uri string, timeout time.Duration) (*h
body, _ := ctx.ProxyRequest.RawBody()
req.SetRawBody(body)
if timeout != 0 {
req.SetTimeout(timeout * time.Millisecond)
req.SetTimeout(timeout)
}
err = req.ParseBody()
if err != nil {

View File

@@ -67,7 +67,7 @@ func Keys() []string {
return defaultDriverRegister.Keys()
}
func GetDriver(name string, app discovery.IApp) (IBalanceHandler, error) {
func GetFactory(name string) (IBalanceFactory, error) {
factory, ok := Get(name)
if !ok {
for _, key := range Keys() {
@@ -80,5 +80,5 @@ func GetDriver(name string, app discovery.IApp) (IBalanceHandler, error) {
return nil, errors.New("no valid balance handler")
}
}
return factory.Create(app)
return factory, nil
}

View File

@@ -45,22 +45,25 @@ type roundRobin struct {
gcdWeight int
// maxWeight 权重最大值
maxWeight int
cw int
}
func (r *roundRobin) Next() (discovery.INode, error) {
cw := 0
for {
r.index = (r.index + 1) % r.size
if r.index == 0 {
cw = cw - r.gcdWeight
if cw <= 0 {
cw = r.maxWeight
if cw == 0 {
r.cw = r.cw - r.gcdWeight
if r.cw <= 0 {
r.cw = r.maxWeight
if r.cw == 0 {
return nil, errors.New("")
}
}
}
if r.nodes[r.index].weight >= cw {
if r.nodes[r.index].weight >= r.cw {
if r.nodes[r.index].Status() == discovery.Down {
continue
}
return r.nodes[r.index], nil
}
}
@@ -72,8 +75,9 @@ func newRoundRobin(nodes []discovery.INode) *roundRobin {
nodes: make([]node, 0, size),
size: size,
}
for i, n := range r.nodes {
weight, _ := r.nodes[r.index].GetAttrByName("weight")
for i, n := range nodes {
weight, _ := n.GetAttrByName("weight")
w, _ := strconv.Atoi(weight)
if w == 0 {
w = 1
@@ -87,6 +91,7 @@ func newRoundRobin(nodes []discovery.INode) *roundRobin {
}
r.gcdWeight = gcd(w, r.gcdWeight)
r.maxWeight = max(w, r.maxWeight)
}
return r
}

View File

@@ -1,12 +1,8 @@
package upstream_http
import (
"errors"
"fmt"
"reflect"
"github.com/eolinker/goku-eosc/discovery"
"github.com/eolinker/eosc"
)
@@ -34,29 +30,15 @@ func (d *driver) ConfigType() reflect.Type {
}
func (d *driver) Create(id, name string, v interface{}, workers map[eosc.RequireId]interface{}) (eosc.IWorker, error) {
cfg, ok := v.(*Config)
if !ok {
return nil, errors.New(fmt.Sprintf(ErrorStructType, eosc.TypeNameOf(v), d.configType))
w := &httpUpstream{
id: id,
name: name,
driver: driverName,
}
err := w.Reset(v, workers)
if err != nil {
return nil, err
}
if factory, has := workers[cfg.Discovery]; has {
f, ok := factory.(discovery.IDiscovery)
if ok {
app, err := f.GetApp(cfg.Config)
if err != nil {
return nil, err
}
w := &httpUpstream{
id: id,
name: name,
driver: cfg.Driver,
desc: cfg.Desc,
scheme: cfg.Scheme,
balanceType: cfg.Type,
app: app,
}
return w, nil
}
}
return nil, errors.New("fail to create upstream worker")
return w, nil
}

View File

@@ -1,62 +0,0 @@
package upstream_http
import (
"fmt"
"net/http"
"reflect"
"github.com/eolinker/goku-eosc/service"
"github.com/eolinker/goku-eosc/upstream/balance"
http_proxy "github.com/eolinker/goku-eosc/upstream/upstream-http/http-proxy"
http_context "github.com/eolinker/goku-eosc/node/http-context"
"github.com/eolinker/goku-eosc/utils"
)
//Http org
type httpUpstream struct {
Scheme string `json:"scheme"`
Nodes []*node `json:"nodes" yaml:"nodes"`
Type string `json:"type"`
}
type node struct {
IP string `json:"ip" yaml:"ip"`
Port int `json:"port" yaml:"port"`
Labels map[string]string `json:"labels" yaml:"labels"`
}
//send 请求发送,忽略重试
func (h *httpUpstream) Send(ctx *http_context.Context, serviceDetail service.IServiceDetail, handler balance.IBalanceHandler) (*http.Response, error) {
var response *http.Response
var err error
path := utils.TrimPrefixAll(ctx.ProxyRequest.TargetURL(), "/")
node, err := handler.Next()
if err != nil {
return nil, err
}
for doTrice := serviceDetail.GetRetry() + 1; doTrice > 0; doTrice-- {
u := fmt.Sprintf("%s://%s/%s", h.Scheme, node.Addr(), path)
response, err = http_proxy.DoRequest(ctx, u, serviceDetail.GetTimeout())
if err != nil {
node, err = handler.Next()
if err != nil {
return nil, err
}
continue
} else {
return response, err
}
}
return response, err
}
func GetType() reflect.Type {
return reflect.TypeOf((*httpUpstream)(nil))
}

View File

@@ -4,7 +4,6 @@ import (
"errors"
"fmt"
"net/http"
"reflect"
"github.com/eolinker/goku-eosc/upstream"
@@ -31,7 +30,7 @@ type httpUpstream struct {
scheme string
balanceType string
app discovery.IApp
balanceHandler balance.IBalanceHandler
balanceFactory balance.IBalanceFactory
}
func (h *httpUpstream) Id() string {
@@ -39,11 +38,6 @@ func (h *httpUpstream) Id() string {
}
func (h *httpUpstream) Start() error {
handler, err := balance.GetDriver(h.balanceType, h.app)
if err != nil {
return err
}
h.balanceHandler = handler
return nil
}
@@ -63,11 +57,14 @@ func (h *httpUpstream) Reset(conf interface{}, workers map[eosc.RequireId]interf
h.scheme = cfg.Scheme
h.balanceType = cfg.Type
h.app = app
handler, err := balance.GetDriver(h.balanceType, h.app)
if err != nil {
return err
}
h.balanceFactory, err = balance.GetFactory(h.balanceType)
if err != nil {
return err
}
h.balanceHandler = handler
return nil
}
}
@@ -85,15 +82,19 @@ func (h *httpUpstream) CheckSkill(skill string) bool {
//send 请求发送,忽略重试
func (h *httpUpstream) Send(ctx *http_context.Context, serviceDetail service.IServiceDetail) (*http.Response, error) {
handler, err := h.balanceFactory.Create(h.app)
if err != nil {
return nil, err
}
var response *http.Response
var err error
path := utils.TrimPrefixAll(ctx.ProxyRequest.TargetURL(), "/")
node, err := h.balanceHandler.Next()
node, err := handler.Next()
if err != nil {
return nil, err
}
for doTrice := serviceDetail.Retry() + 1; doTrice > 0; doTrice-- {
fmt.Println("addr is:", node.Addr())
u := fmt.Sprintf("%s://%s/%s", h.scheme, node.Addr(), path)
response, err = http_proxy.DoRequest(ctx, u, serviceDetail.Timeout())
@@ -102,7 +103,7 @@ func (h *httpUpstream) Send(ctx *http_context.Context, serviceDetail service.ISe
node.Down()
}
h.app.NodeError(node.Id())
node, err = h.balanceHandler.Next()
node, err = handler.Next()
if err != nil {
return nil, err
}
@@ -114,7 +115,3 @@ func (h *httpUpstream) Send(ctx *http_context.Context, serviceDetail service.ISe
return response, err
}
func GetType() reflect.Type {
return reflect.TypeOf((*httpUpstream)(nil))
}

View File

@@ -0,0 +1,176 @@
package upstream_http
import (
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strings"
"testing"
"time"
"github.com/eolinker/goku-eosc/service"
round_robin "github.com/eolinker/goku-eosc/upstream/round-robin"
http_context "github.com/eolinker/goku-eosc/node/http-context"
"github.com/eolinker/goku-eosc/upstream"
discovery_static "github.com/eolinker/goku-eosc/discovery/discovery-static"
"github.com/eolinker/eosc"
)
type Service struct {
name string
desc string
retry int
timeout time.Duration
scheme string
addr string
}
func (s *Service) Name() string {
return s.name
}
func (s *Service) Desc() string {
return s.desc
}
func (s *Service) Retry() int {
return s.retry
}
func (s *Service) Timeout() time.Duration {
return s.timeout
}
func (s *Service) Scheme() string {
return s.scheme
}
func (s *Service) ProxyAddr() string {
return s.ProxyAddr()
}
func TestSend(t *testing.T) {
round_robin.Register()
s := &Service{
name: "参数打印",
desc: "打印所有参数",
retry: 3,
timeout: time.Second * 10,
scheme: "http",
}
factory := NewFactory()
t.Log("upstream extend info:", factory.ExtendInfo())
driver, err := factory.Create("upstream", "http_proxy", "", "http转发驱动", nil)
if err != nil {
t.Error(err)
return
}
cfg := &Config{
Name: "product-user",
Driver: "http_proxy",
Desc: "生产环境-用户模块",
Scheme: "http",
Type: "round-robin",
Config: "127.0.0.1:8580 weight=10;47.95.203.198:8080 weight=15",
Discovery: "static_1@discovery",
}
staticDiscovery := discovery_static.NewFactory()
t.Log("static discovery extend info:", staticDiscovery.ExtendInfo())
staticDriver, err := staticDiscovery.Create("discovery", "static", "", "静态服务发现驱动", nil)
if err != nil {
t.Error(err)
return
}
staticCfg := &discovery_static.Config{
Name: "static_1",
Driver: "static",
Labels: nil,
Health: &discovery_static.HealthConfig{
Protocol: "http",
Method: "GET",
Url: "/",
SuccessCode: 404,
Period: 30,
Timeout: 3000,
},
HealthOn: true,
}
staticWorker, err := staticDriver.Create("", "static_1", staticCfg, nil)
if err != nil {
t.Error(err)
return
}
worker, err := driver.Create(
"",
"product-user",
cfg,
map[eosc.RequireId]interface{}{
"static_1@discovery": staticWorker,
})
if err != nil {
t.Error(err)
return
}
worker.Start()
hUpstream, ok := worker.(upstream.IUpstream)
if !ok {
t.Error(ErrorStructType)
return
}
data := url.Values{}
data.Set("name", "eolinker")
r, err := http.NewRequest("POST", "http://localhost:8080/Web/Test/params/print", strings.NewReader(data.Encode()))
if err != nil {
t.Error(ErrorStructType)
return
}
ctx := http_context.NewContext(r, &response{})
// 设置目标URL
ctx.ProxyRequest.SetTargetURL(r.URL.Path)
for i := 0; i < 10; i++ {
now := time.Now()
err = send(ctx, s, hUpstream)
if err != nil {
t.Error(err)
}
fmt.Println("spend time is", time.Now().Sub(now))
}
}
func send(ctx *http_context.Context, s service.IServiceDetail, hUpstream upstream.IUpstream) error {
resp, err := hUpstream.Send(ctx, s)
if err != nil {
return err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
fmt.Println(string(body))
return nil
}
type response struct {
}
func (r *response) Header() http.Header {
panic("implement me")
}
func (r *response) Write(bytes []byte) (int, error) {
panic("implement me")
}
func (r *response) WriteHeader(statusCode int) {
panic("implement me")
}