diff --git a/discovery/consul/consul_test.go b/discovery/consul/consul_test.go index e95f8184..e0cd8f8a 100644 --- a/discovery/consul/consul_test.go +++ b/discovery/consul/consul_test.go @@ -1,9 +1,10 @@ package consul import ( + "testing" + "github.com/eolinker/eosc/log" "github.com/eolinker/goku-eosc/discovery" - "testing" ) func TestConsulGetNodes(t *testing.T) { @@ -11,8 +12,8 @@ func TestConsulGetNodes(t *testing.T) { newConsul := &consul{ id: "newConsul", - address: []string{"39.108.94.48:8500", "39.108.94.48:8501"}, - params: map[string]string{"token": "a92316d8-5c99-4fa0-b4cd-30b9e66718aa"}, //token在39.108.94.48下的/opt/consul/server_config/node_3/conf/acl.hcl文件里 + address: []string{"10.1.94.48:8500", "10.1.94.48:8501"}, + params: map[string]string{"token": "a92316d8-5c99-4fa0-b4cd-30b9e66718aa"}, //token在10.1.94.48下的/opt/consul/server_config/node_3/conf/acl.hcl文件里 labels: map[string]string{"scheme": "http"}, services: discovery.NewServices(), context: nil, diff --git a/discovery/eureka/eureka_test.go b/discovery/eureka/eureka_test.go index 2ed59f3e..b899fe8a 100644 --- a/discovery/eureka/eureka_test.go +++ b/discovery/eureka/eureka_test.go @@ -2,8 +2,9 @@ package eureka import ( "fmt" - "github.com/eolinker/goku-eosc/discovery" "testing" + + "github.com/eolinker/goku-eosc/discovery" ) func TestGetApp(t *testing.T) { @@ -12,7 +13,7 @@ func TestGetApp(t *testing.T) { id: "1", name: "eolinker", address: []string{ - "http://39.108.94.48:8761/eureka", + "http://10.1.94.48:8761/eureka", }, params: map[string]string{ "username": "test", diff --git a/discovery/nacos/nacos_test.go b/discovery/nacos/nacos_test.go index 5b851112..1171494b 100644 --- a/discovery/nacos/nacos_test.go +++ b/discovery/nacos/nacos_test.go @@ -2,15 +2,16 @@ package nacos import ( "fmt" - "github.com/eolinker/goku-eosc/discovery" "testing" + + "github.com/eolinker/goku-eosc/discovery" ) func TestNacos_GetApp(t *testing.T) { serviceName := "nacos.naming.serviceName" n := &nacos{ address: []string{ - "39.108.94.48:8848", + "10.1.94.48:8848", }, params: map[string]string{ "username": "test", diff --git a/go.sum b/go.sum index e17f52f7..0052de41 100644 --- a/go.sum +++ b/go.sum @@ -9,6 +9,7 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/eolinker/eosc v0.0.0-20210719112509-35868a3fa3ed h1:ohyziPArYFDUmaMkjfmy7h7v1vZKxmhERBwb1Vup39U= github.com/eolinker/eosc v0.0.1 h1:xRDaWSomXpn9E6wVZXE1fQYBpGOeQ2liCwpV4+l+QSc= github.com/eolinker/eosc v0.0.1/go.mod h1:h9RyDaBnWKeg6fxQu8faG8TQq/sdG+ipaePTTVTEqqA= +github.com/eolinker/goku-api-gateway v2.1.1+incompatible h1:D5Jhrng6Z67vklr8LOGGT0hHN1y2vvIa/NENfAQMoEk= github.com/eolinker/goku-standard-plugin v0.1.5 h1:0ydlgjaWsbeQi0le1I7cDrEWTramh5hMdTM7ifG78HM= github.com/eolinker/goku-standard-plugin v0.1.5/go.mod h1:KX5n+WS9Sfobjft13BY5E8S5AWD8H0hhMLNN0d1p7F4= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= diff --git a/health-check-http/agent.go b/health-check-http/agent.go index 27cee0dd..69dd4be6 100644 --- a/health-check-http/agent.go +++ b/health-check-http/agent.go @@ -5,17 +5,17 @@ import "github.com/eolinker/goku-eosc/discovery" //agent 从属于HTTPCheck,实现了IHealthChecker接口 type agent struct { agentID string - *HTTPCheck + checker *HTTPCheck } //NewAgent 创建agent -func NewAgent(agentID string) discovery.IHealthChecker { - return &agent{agentID: agentID} +func NewAgent(agentID string, checker *HTTPCheck) discovery.IHealthChecker { + return &agent{agentID: agentID, checker: checker} } //AddToCheck 将节点添加进HTTPCheck的检查列表 func (a *agent) AddToCheck(node discovery.INode) error { - a.addToCheck(&checkNode{ + a.checker.addToCheck(&checkNode{ node: node, agentID: a.agentID, }) @@ -24,6 +24,6 @@ func (a *agent) AddToCheck(node discovery.INode) error { //Stop 停止agent并且将HTTPCheck中属于该agent的正在检查的所有节点都移除 func (a *agent) Stop() error { - a.stop(a.agentID) + a.checker.stop(a.agentID) return nil } diff --git a/health-check-http/http-check.go b/health-check-http/http-check.go index 3c2816a0..3ec82fb7 100644 --- a/health-check-http/http-check.go +++ b/health-check-http/http-check.go @@ -77,7 +77,7 @@ func (h *HTTPCheck) doCheckLoop() { //Agent 生成一个agent func (h *HTTPCheck) Agent() (discovery.IHealthChecker, error) { - return NewAgent(uuid.New()), nil + return NewAgent(uuid.New(), h), nil } //Reset 重置HTTPCheck的配置 diff --git a/node/http-proxy/backend/response.go b/node/http-proxy/backend/response.go new file mode 100644 index 00000000..2b61af12 --- /dev/null +++ b/node/http-proxy/backend/response.go @@ -0,0 +1,11 @@ +package backend + +import "net/http" + +//IResponse 响应接口 +type IResponse interface { + Body() []byte + StatusCode() int + Header() http.Header + Proto() string +} diff --git a/node/http-proxy/http-proxy-request/request.go b/node/http-proxy/http-proxy-request/request.go index f2cd0ee4..e0fe07e9 100644 --- a/node/http-proxy/http-proxy-request/request.go +++ b/node/http-proxy/http-proxy-request/request.go @@ -32,7 +32,7 @@ func SetCert(skip int, clientCerts []tls.Certificate) { transport.TLSClientConfig = tlsConfig } -//Request request +//Request http-proxy 请求结构体 type Request struct { client *http.Client method string @@ -46,19 +46,23 @@ type Request struct { httpRequest *http.Request } +//SetQueryParams 替换Query参数 func (r *Request) SetQueryParams(queryParams url.Values) { r.queryParams = queryParams } +//SetHeaders 替换header参数 func (r *Request) SetHeaders(headers http.Header) { r.headers = headers } +//Body 返回请求的body参数 func (r *Request) Body() []byte { return r.body } -func (r *Request) HttpRequest() *http.Request { +//HTTPRequest 返回http请求结构体 +func (r *Request) HTTPRequest() *http.Request { return r.httpRequest } @@ -88,7 +92,6 @@ func newRequest(method string, URL *url.URL) (*Request, error) { urlPath = URL.Scheme + "://" + URL.Host + URL.Path r := &Request{ - client: httpClient, method: method, url: urlPath, @@ -130,9 +133,9 @@ func (r *Request) SetTimeout(timeout time.Duration) { r.timeout = timeout } -//send 发送请求 +//Send 发送请求 func (r *Request) Send(ctx *http_context.Context) (*http.Response, error) { - req := r.HttpRequest() + req := r.HTTPRequest() req.Header.Set("Accept-Encoding", "gzip") req.Header = parseHeaders(r.headers) @@ -190,7 +193,7 @@ func parseHeaders(headers map[string][]string) http.Header { return h } -// 解析请求体 +//ParseBody 解析请求体 func (r *Request) ParseBody() error { if r.httpRequest == nil { var body io.Reader = nil diff --git a/node/http-proxy/r.go b/node/http-proxy/request.go similarity index 85% rename from node/http-proxy/r.go rename to node/http-proxy/request.go index 0dac3a05..05d93170 100644 --- a/node/http-proxy/r.go +++ b/node/http-proxy/request.go @@ -2,16 +2,17 @@ package http_proxy import ( "fmt" - "net/http" "net/url" "time" + "github.com/eolinker/goku-eosc/node/http-proxy/backend" + http_context "github.com/eolinker/goku-eosc/node/http-context" http_proxy_request "github.com/eolinker/goku-eosc/node/http-proxy/http-proxy-request" ) //DoRequest 构造请求 -func DoRequest(ctx *http_context.Context, uri string, timeout time.Duration) (*http.Response, error) { +func DoRequest(ctx *http_context.Context, uri string, timeout time.Duration) (backend.IResponse, error) { if uri == "" { return nil, fmt.Errorf("invaild url") } @@ -42,7 +43,7 @@ func DoRequest(ctx *http_context.Context, uri string, timeout time.Duration) (*h body, _ := ctx.ProxyRequest.RawBody() req.SetRawBody(body) if timeout != 0 { - req.SetTimeout(timeout) + req.SetTimeout(timeout * time.Millisecond) } err = req.ParseBody() if err != nil { @@ -53,5 +54,5 @@ func DoRequest(ctx *http_context.Context, uri string, timeout time.Duration) (*h return nil, err } - return response, err + return NewResponse(response) } diff --git a/node/http-proxy/response.go b/node/http-proxy/response.go new file mode 100644 index 00000000..49053593 --- /dev/null +++ b/node/http-proxy/response.go @@ -0,0 +1,48 @@ +package http_proxy + +import ( + "io/ioutil" + "net/http" + + "github.com/eolinker/goku-eosc/node/http-proxy/backend" +) + +type response struct { + body []byte + resp *http.Response +} + +//Body 响应体 +func (r *response) Body() []byte { + // TODO: 该处可进行gzip等算法解压 + return r.body +} + +//StatusCode 状态码 +func (r *response) StatusCode() int { + return r.resp.StatusCode +} + +//Header 响应头部 +func (r *response) Header() http.Header { + return r.resp.Header +} + +//Proto 协议 +func (r *response) Proto() string { + return r.resp.Proto +} + +//NewResponse 新建响应,返回IResponse节点 +func NewResponse(resp *http.Response) (backend.IResponse, error) { + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + defer resp.Body.Close() + r := &response{ + body: body, + resp: resp, + } + return r, nil +} diff --git a/service/service-http/config.go b/service/service-http/config.go index 5eb3d1bc..9ad5662d 100644 --- a/service/service-http/config.go +++ b/service/service-http/config.go @@ -7,12 +7,13 @@ import ( //Config service_http驱动配置 type Config struct { id string - Name string `json:"name"` - Driver string `json:"driver"` - Desc string `json:"desc"` - Timeout int64 `json:"timeout"` - Retry int `json:"retry"` - Scheme string `json:"scheme"` - RewriteURL string `json:"rewrite_url"` - Upstream eosc.RequireId `json:"upstream" skill:"github.com/eolinker/goku-eosc/upstream.upstream.IUpstream"` + Name string `json:"name"` + Driver string `json:"driver"` + Desc string `json:"desc"` + Timeout int64 `json:"timeout"` + Retry int `json:"retry"` + Scheme string `json:"scheme"` + RewriteURL string `json:"rewrite_url"` + //Auth []eosc.RequireId `json:"auth" skill:"github.com/eolinker/goku-eosc/auth.auth.IAuth"` + Upstream eosc.RequireId `json:"upstream" skill:"github.com/eolinker/goku-eosc/upstream.upstream.IUpstream"` } diff --git a/service/service-http/driver.go b/service/service-http/driver.go index 5ecfc295..0ef493e0 100644 --- a/service/service-http/driver.go +++ b/service/service-http/driver.go @@ -35,7 +35,7 @@ func (d *driver) ConfigType() reflect.Type { func (d *driver) Create(id, name string, v interface{}, workers map[eosc.RequireId]interface{}) (eosc.IWorker, error) { cfg, ok := v.(*Config) if !ok { - return nil, fmt.Errorf("need %s,now %s:%w", eosc.TypeNameOf((*Config)(nil)), eosc.TypeNameOf(v), eosc.ErrorStructType) + return nil, fmt.Errorf("need %s,now %s:%w", eosc.TypeNameOf((*Config)(nil)), eosc.TypeNameOf(v), ErrorStructType) } if work, has := workers[cfg.Upstream]; has { w := &serviceWorker{ diff --git a/service/service-http/service.go b/service/service-http/service.go index 9a57a427..955a5a8e 100644 --- a/service/service-http/service.go +++ b/service/service-http/service.go @@ -15,6 +15,10 @@ import ( "github.com/eolinker/goku-eosc/service" ) +var ( + ErrorStructType = errors.New("error struct type") +) + type serviceWorker struct { id string name string @@ -41,7 +45,7 @@ func (s *serviceWorker) Start() error { func (s *serviceWorker) Reset(conf interface{}, workers map[eosc.RequireId]interface{}) error { data, ok := conf.(*Config) if !ok { - return fmt.Errorf("need %s,now %s:%w", eosc.TypeNameOf((*Config)(nil)), eosc.TypeNameOf(conf), eosc.ErrorStructType) + return fmt.Errorf("need %s,now %s", eosc.TypeNameOf((*Config)(nil)), eosc.TypeNameOf(conf)) } if worker, has := workers[data.Upstream]; has { s.desc = data.Desc @@ -119,7 +123,11 @@ func (s *serviceWorker) Handle(w http.ResponseWriter, r *http.Request, router se ctx := http_context.NewContext(r, w) // 设置目标URL ctx.ProxyRequest.SetTargetURL(recombinePath(r.URL.Path, router.Location(), s.rewriteURL)) - s.upstream.Send(ctx, s) + response, err := s.upstream.Send(ctx, s) + if err != nil { + return err + } + fmt.Println(string(response.Body())) return nil } diff --git a/service/service-http/service_test.go b/service/service-http/service_test.go new file mode 100644 index 00000000..96ebfb7e --- /dev/null +++ b/service/service-http/service_test.go @@ -0,0 +1,145 @@ +package service_http + +import ( + "net/http" + "net/url" + "strings" + "testing" + + "github.com/eolinker/goku-eosc/service" + + upstream_http "github.com/eolinker/goku-eosc/upstream/upstream-http" + + "github.com/eolinker/eosc" + "github.com/eolinker/goku-eosc/discovery/static" + "github.com/eolinker/goku-eosc/upstream" + round_robin "github.com/eolinker/goku-eosc/upstream/round-robin" +) + +type routerDemo struct { + location string + host string + header map[string]string + query url.Values +} + +func (r *routerDemo) Location() string { + return r.location +} + +func (r *routerDemo) Host() string { + return r.host +} + +func (r *routerDemo) Header() map[string]string { + return r.header +} + +func (r *routerDemo) Query() url.Values { + return r.query +} + +func TestService(t *testing.T) { + round_robin.Register() + + staticConfig := &upstream_http.Config{ + Name: "product-user", + Driver: "http_proxy", + Desc: "生产环境-用户模块", + Scheme: "http", + Type: "round-robin", + Config: "127.0.0.1:8580 weight=10;10.1.1.1:8080 weight=20", + Discovery: "static_1@discovery", + } + + staticWorker, err := getWorker(static.NewFactory(), &static.Config{ + Name: "static_1", + Driver: "static", + Labels: nil, + Health: &static.HealthConfig{ + Protocol: "http", + Method: "POST", + URL: "/Web/Test/params/print", + SuccessCode: 200, + Period: 30, + Timeout: 3000, + }, + HealthOn: true, + }, "discovery", "static", "", "静态服务发现", nil, "", "static_1", nil) + if err != nil { + t.Error(err) + return + } + allWorker := make(map[eosc.RequireId]interface{}) + allWorker["static_1@discovery"] = staticWorker + upstreamWorker, err := getWorker(upstream_http.NewFactory(), staticConfig, "upstream", "http_proxy", "", "http转发驱动", nil, "", "product-user", allWorker) + if err != nil { + t.Error(err) + return + } + upstreamWorker.Start() + hUpstream, ok := upstreamWorker.(upstream.IUpstream) + if !ok { + t.Error(ErrorStructType) + return + + } + allWorker["product-user@upstream"] = hUpstream + + serviceWorker, err := getWorker(NewFactory(), &Config{ + Name: "guest", + Driver: "http", + Desc: "游客服务", + Timeout: 30000, + Retry: 2, + Scheme: "http", + RewriteURL: "/Web/Test", + Upstream: "product-user@upstream", + }, "service", "http", "", "http服务驱动", nil, "", "guest", allWorker) + if err != nil { + t.Error(err) + return + } + serv, ok := serviceWorker.(service.IService) + if !ok { + t.Error(ErrorStructType) + return + } + data := url.Values{} + data.Set("name", "eolinker") + r, err := http.NewRequest("POST", "http://localhost:8080/product/params/print", strings.NewReader(data.Encode())) + if err != nil { + t.Error(err) + return + } + serv.Handle(&response{}, r, &routerDemo{ + location: "/product", + host: "localhost:8080", + header: nil, + query: nil, + }) +} + +func getWorker(factory eosc.IProfessionDriverFactory, cfg interface{}, profession string, name string, label string, desc string, params map[string]string, workerID, workerName string, worker map[eosc.RequireId]interface{}) (eosc.IWorker, error) { + driver, err := factory.Create(profession, name, label, desc, params) + if err != nil { + return nil, err + } + + return driver.Create(workerID, workerName, cfg, worker) +} + +type response struct { +} + +func (r *response) Header() http.Header { + panic("implement me") +} + +func (r *response) Write(bytes []byte) (int, error) { + panic("implement me") +} + +func (r *response) WriteHeader(statusCode int) { + panic("implement me") +} diff --git a/upstream/upstream-http/http-proxy/http-proxy-request/request.go b/upstream/upstream-http/http-proxy/http-proxy-request/request.go deleted file mode 100644 index e0fe07e9..00000000 --- a/upstream/upstream-http/http-proxy/http-proxy-request/request.go +++ /dev/null @@ -1,242 +0,0 @@ -package http_proxy_request - -import ( - "bytes" - "crypto/tls" - "errors" - "io" - "net/http" - "net/url" - - http_context "github.com/eolinker/goku-eosc/node/http-context" - - // "fmt" - "time" -) - -//Version 版本号 -var Version = "2.0" - -var ( - transport = &http.Transport{TLSClientConfig: &tls.Config{ - InsecureSkipVerify: false, - }} - httpClient = &http.Client{ - Transport: transport, - } -) - -//SetCert 设置证书配置 -func SetCert(skip int, clientCerts []tls.Certificate) { - tlsConfig := &tls.Config{InsecureSkipVerify: skip == 1, Certificates: clientCerts} - transport.TLSClientConfig = tlsConfig -} - -//Request http-proxy 请求结构体 -type Request struct { - client *http.Client - method string - url string - headers map[string][]string - body []byte - - queryParams map[string][]string - - timeout time.Duration - httpRequest *http.Request -} - -//SetQueryParams 替换Query参数 -func (r *Request) SetQueryParams(queryParams url.Values) { - r.queryParams = queryParams -} - -//SetHeaders 替换header参数 -func (r *Request) SetHeaders(headers http.Header) { - r.headers = headers -} - -//Body 返回请求的body参数 -func (r *Request) Body() []byte { - return r.body -} - -//HTTPRequest 返回http请求结构体 -func (r *Request) HTTPRequest() *http.Request { - return r.httpRequest -} - -//NewRequest 创建新请求 -func NewRequest(method string, URL *url.URL) (*Request, error) { - if method != "GET" && method != "POST" && method != "PUT" && method != "DELETE" && - method != "HEAD" && method != "OPTIONS" && method != "PATCH" { - return nil, errors.New("Unsupported Request method") - } - return newRequest(method, URL) -} - -//URLPath urlPath -func URLPath(url string, query url.Values) string { - if len(query) < 1 { - return url - } - return url + "?" + query.Encode() -} - -func newRequest(method string, URL *url.URL) (*Request, error) { - var urlPath string - queryParams := make(map[string][]string) - for key, values := range URL.Query() { - queryParams[key] = values - } - urlPath = URL.Scheme + "://" + URL.Host + URL.Path - - r := &Request{ - client: httpClient, - method: method, - url: urlPath, - headers: make(map[string][]string), - queryParams: queryParams, - } - return r, nil -} - -//SetHeader 设置请求头 -func (r *Request) SetHeader(key string, values ...string) { - if len(values) > 0 { - r.headers[key] = values[:] - } else { - delete(r.headers, key) - } -} - -//Headers 获取请求头 -func (r *Request) Headers() map[string][]string { - headers := make(map[string][]string) - for key, values := range r.headers { - headers[key] = values[:] - } - return headers -} - -//SetQueryParam 设置Query参数 -func (r *Request) SetQueryParam(key string, values ...string) { - if len(values) > 0 { - r.queryParams[key] = values[:] - } else { - delete(r.queryParams, key) - } -} - -//SetTimeout 设置请求超时时间 -func (r *Request) SetTimeout(timeout time.Duration) { - r.timeout = timeout -} - -//Send 发送请求 -func (r *Request) Send(ctx *http_context.Context) (*http.Response, error) { - req := r.HTTPRequest() - req.Header.Set("Accept-Encoding", "gzip") - req.Header = parseHeaders(r.headers) - - r.client.Timeout = r.timeout - - httpResponse, err := r.client.Do(req) - - return httpResponse, err -} - -//QueryParams 获取query参数 -func (r *Request) QueryParams() map[string][]string { - params := make(map[string][]string) - for key, values := range r.queryParams { - params[key] = values[:] - } - return params -} - -//URLPath 获取完整的URL路径 -func (r *Request) URLPath() string { - if len(r.queryParams) > 0 { - return r.url + "?" + parseParams(r.queryParams).Encode() - } - return r.url -} - -//SetURL 设置URL -func (r *Request) SetURL(url string) { - r.url = url -} - -//SetRawBody 设置源数据 -func (r *Request) SetRawBody(body []byte) { - r.body = body -} - -// 解析请求头 -func parseHeaders(headers map[string][]string) http.Header { - h := http.Header{} - for key, values := range headers { - for _, value := range values { - h.Add(key, value) - } - } - - _, hasAccept := h["Accept"] - if !hasAccept { - h.Add("Accept", "*/*") - } - _, hasAgent := h["User-Agent"] - if !hasAgent { - h.Add("User-Agent", "goku-requests/"+Version) - } - return h -} - -//ParseBody 解析请求体 -func (r *Request) ParseBody() error { - if r.httpRequest == nil { - var body io.Reader = nil - if len(r.body) > 0 { - body = bytes.NewBuffer(r.body) - } - request, err := http.NewRequest(r.method, r.URLPath(), body) - if err != nil { - return err - } - r.httpRequest = request - } - return nil -} - -// 解析参数 -func parseParams(params map[string][]string) url.Values { - v := url.Values{} - for key, values := range params { - for _, value := range values { - v.Add(key, value) - } - } - return v -} - -// 解析URL -func parseURL(urlPath string) (URL *url.URL, err error) { - URL, err = url.Parse(urlPath) - if err != nil { - return nil, err - } - - if URL.Scheme != "http" && URL.Scheme != "https" { - urlPath = "http://" + urlPath - URL, err = url.Parse(urlPath) - if err != nil { - return nil, err - } - - if URL.Scheme != "http" && URL.Scheme != "https" { - return nil, errors.New("[package requests] only HTTP and HTTPS are accepted") - } - } - return -} diff --git a/upstream/upstream-http/http-proxy/r.go b/upstream/upstream-http/http-proxy/r.go deleted file mode 100644 index 894d10a4..00000000 --- a/upstream/upstream-http/http-proxy/r.go +++ /dev/null @@ -1,57 +0,0 @@ -package http_proxy - -import ( - "fmt" - "net/http" - "net/url" - "time" - - http_context "github.com/eolinker/goku-eosc/node/http-context" - http_proxy_request "github.com/eolinker/goku-eosc/upstream/upstream-http/http-proxy/http-proxy-request" -) - -//DoRequest 构造请求 -func DoRequest(ctx *http_context.Context, uri string, timeout time.Duration) (*http.Response, error) { - if uri == "" { - return nil, fmt.Errorf("invaild url") - } - - u, err := url.ParseRequestURI(uri) - if err != nil { - return nil, err - } - - req, err := http_proxy_request.NewRequest(ctx.ProxyRequest.Method, u) - if err != nil { - - return nil, err - } - - queryDest := u.Query() - if ctx.ProxyRequest.Queries() != nil { - for k, vs := range ctx.ProxyRequest.Queries() { - for _, v := range vs { - queryDest.Add(k, v) - } - } - } - - req.SetHeaders(ctx.ProxyRequest.Headers()) - - req.SetQueryParams(queryDest) - body, _ := ctx.ProxyRequest.RawBody() - req.SetRawBody(body) - if timeout != 0 { - req.SetTimeout(timeout * time.Millisecond) - } - err = req.ParseBody() - if err != nil { - return nil, err - } - response, err := req.Send(ctx) - if err != nil { - return nil, err - } - - return response, err -} diff --git a/upstream/upstream-http/upstream.go b/upstream/upstream-http/upstream.go index 88c39e3b..485636c5 100644 --- a/upstream/upstream-http/upstream.go +++ b/upstream/upstream-http/upstream.go @@ -3,7 +3,8 @@ package upstream_http import ( "errors" "fmt" - "net/http" + + "github.com/eolinker/goku-eosc/node/http-proxy/backend" "github.com/eolinker/goku-eosc/upstream" @@ -89,9 +90,9 @@ func (h *httpUpstream) CheckSkill(skill string) bool { } //Send 请求发送,忽略重试 -func (h *httpUpstream) Send(ctx *http_context.Context, serviceDetail service.IServiceDetail) (*http.Response, error) { +func (h *httpUpstream) Send(ctx *http_context.Context, serviceDetail service.IServiceDetail) (backend.IResponse, error) { - var response *http.Response + var response backend.IResponse var err error path := utils.TrimPrefixAll(ctx.ProxyRequest.TargetURL(), "/") diff --git a/upstream/upstream-http/upstream_consul_test.go b/upstream/upstream-http/upstream_consul_test.go index ef749682..d9b1a88d 100644 --- a/upstream/upstream-http/upstream_consul_test.go +++ b/upstream/upstream-http/upstream_consul_test.go @@ -38,7 +38,7 @@ func TestConsul(t *testing.T) { "scheme": "http", }, Config: consul.AccessConfig{ - Address: []string{"39.108.94.48:8500", "39.108.94.48:8501"}, + Address: []string{"10.1.94.48:8500", "10.1.94.48:8501"}, Params: map[string]string{"token": "a92316d8-5c99-4fa0-b4cd-30b9e66718aa"}, }, }, "discovery", "consul", "", "consul", nil, "", "consul_1", nil) diff --git a/upstream/upstream-http/upstream_eureka_test.go b/upstream/upstream-http/upstream_eureka_test.go index 850f2578..abd2afca 100644 --- a/upstream/upstream-http/upstream_eureka_test.go +++ b/upstream/upstream-http/upstream_eureka_test.go @@ -8,14 +8,10 @@ import ( "testing" "time" - "github.com/eolinker/goku-eosc/discovery/nacos" - "github.com/eolinker/goku-eosc/discovery/eureka" round_robin "github.com/eolinker/goku-eosc/upstream/round-robin" - "github.com/eolinker/goku-eosc/discovery/static" - http_context "github.com/eolinker/goku-eosc/node/http-context" "github.com/eolinker/goku-eosc/upstream" @@ -42,7 +38,7 @@ func TestEureka(t *testing.T) { "scheme": "http", }, Config: eureka.AccessConfig{ - Address: []string{"39.108.94.48:8761/eureka"}, + Address: []string{"10.1.94.48:8761/eureka"}, Params: map[string]string{ "username": "test", "password": "test"}, @@ -85,131 +81,3 @@ func TestEureka(t *testing.T) { fmt.Println("spend time is", time.Now().Sub(now)) } } - -func TestNacos(t *testing.T) { - round_robin.Register() - nacosConfig := &Config{ - Name: "product-user", - Driver: "http_proxy", - Desc: "生产环境-用户模块", - Scheme: "http", - Type: "round-robin", - Config: "nacos.naming.serviceName", - Discovery: "nacos_1@discovery", - } - - nacosWorker, err := getWorker(nacos.NewFactory(), &nacos.Config{ - Name: "nacos_1", - Driver: "nacos", - Labels: map[string]string{ - "scheme": "http", - }, - Config: nacos.AccessConfig{ - Address: []string{"39.108.94.48:8848"}, - Params: map[string]string{ - "username": "test", - "password": "test", - "healthOnly": "false", - }, - }, - }, "discovery", "nacos", "", "nacos", nil, "", "nacos_1", nil) - if err != nil { - t.Error(err) - return - } - nacosWorker.Start() - allWorker := make(map[eosc.RequireId]interface{}) - allWorker["nacos_1@discovery"] = nacosWorker - worker, err := getWorker(NewFactory(), nacosConfig, "upstream", "http_proxy", "", "http转发驱动", nil, "", "product-user", allWorker) - if err != nil { - t.Error(err) - return - } - - hUpstream, ok := worker.(upstream.IUpstream) - if !ok { - t.Error(ErrorStructType) - - } - data := url.Values{} - data.Set("name", "eolinker") - r, err := http.NewRequest("POST", "http://localhost:8080/Web/Test/params/print", strings.NewReader(data.Encode())) - if err != nil { - t.Error(ErrorStructType) - } - - ctx := http_context.NewContext(r, &response{}) - // 设置目标URL - ctx.ProxyRequest.SetTargetURL(r.URL.Path) - for i := 0; i < 10; i++ { - now := time.Now() - err = send(ctx, s, hUpstream) - if err != nil { - t.Error(err) - } - fmt.Println("spend time is", time.Now().Sub(now)) - } -} - -func TestStatic(t *testing.T) { - round_robin.Register() - staticConfig := &Config{ - Name: "product-user", - Driver: "http_proxy", - Desc: "生产环境-用户模块", - Scheme: "http", - Type: "round-robin", - Config: "127.0.0.1:8580 weight=10;47.95.203.198:8080 weight=20", - Discovery: "static_1@discovery", - } - - staticWorker, err := getWorker(static.NewFactory(), &static.Config{ - Name: "static_1", - Driver: "static", - Labels: nil, - Health: &static.HealthConfig{ - Protocol: "http", - Method: "POST", - URL: "/Web/Test/params/print", - SuccessCode: 200, - Period: 30, - Timeout: 3000, - }, - HealthOn: true, - }, "discovery", "static", "", "静态服务发现", nil, "", "static_1", nil) - if err != nil { - t.Error(err) - return - } - allWorker := make(map[eosc.RequireId]interface{}) - allWorker["static_1@discovery"] = staticWorker - worker, err := getWorker(NewFactory(), staticConfig, "upstream", "http_proxy", "", "http转发驱动", nil, "", "product-user", allWorker) - if err != nil { - t.Error(err) - return - } - - hUpstream, ok := worker.(upstream.IUpstream) - if !ok { - t.Error(ErrorStructType) - - } - data := url.Values{} - data.Set("name", "eolinker") - r, err := http.NewRequest("POST", "http://localhost:8080/Web/Test/params/print", strings.NewReader(data.Encode())) - if err != nil { - t.Error(ErrorStructType) - } - - ctx := http_context.NewContext(r, &response{}) - // 设置目标URL - ctx.ProxyRequest.SetTargetURL(r.URL.Path) - for i := 0; i < 10; i++ { - now := time.Now() - err = send(ctx, s, hUpstream) - if err != nil { - t.Error(err) - } - fmt.Println("spend time is", time.Now().Sub(now)) - } -} diff --git a/upstream/upstream-http/upstream_nacos_test.go b/upstream/upstream-http/upstream_nacos_test.go index 569cdd8e..a55ea235 100644 --- a/upstream/upstream-http/upstream_nacos_test.go +++ b/upstream/upstream-http/upstream_nacos_test.go @@ -38,7 +38,7 @@ func TestNacos(t *testing.T) { "scheme": "http", }, Config: nacos.AccessConfig{ - Address: []string{"39.108.94.48:8848"}, + Address: []string{"10.1.94.48:8848"}, Params: map[string]string{ "username": "test", "password": "test", diff --git a/upstream/upstream-http/upstream_static_test.go b/upstream/upstream-http/upstream_static_test.go index 569cdd8e..ece6a805 100644 --- a/upstream/upstream-http/upstream_static_test.go +++ b/upstream/upstream-http/upstream_static_test.go @@ -8,10 +8,10 @@ import ( "testing" "time" - "github.com/eolinker/goku-eosc/discovery/nacos" - round_robin "github.com/eolinker/goku-eosc/upstream/round-robin" + "github.com/eolinker/goku-eosc/discovery/static" + http_context "github.com/eolinker/goku-eosc/node/http-context" "github.com/eolinker/goku-eosc/upstream" @@ -19,41 +19,39 @@ import ( "github.com/eolinker/eosc" ) -func TestNacos(t *testing.T) { +func TestStatic(t *testing.T) { round_robin.Register() - nacosConfig := &Config{ + staticConfig := &Config{ Name: "product-user", Driver: "http_proxy", Desc: "生产环境-用户模块", Scheme: "http", Type: "round-robin", - Config: "nacos.naming.serviceName", - Discovery: "nacos_1@discovery", + Config: "127.0.0.1:8580 weight=10;10.1.1.1:8080 weight=20", + Discovery: "static_1@discovery", } - nacosWorker, err := getWorker(nacos.NewFactory(), &nacos.Config{ - Name: "nacos_1", - Driver: "nacos", - Labels: map[string]string{ - "scheme": "http", + staticWorker, err := getWorker(static.NewFactory(), &static.Config{ + Name: "static_1", + Driver: "static", + Labels: nil, + Health: &static.HealthConfig{ + Protocol: "http", + Method: "POST", + URL: "/Web/Test/params/print", + SuccessCode: 200, + Period: 30, + Timeout: 3000, }, - Config: nacos.AccessConfig{ - Address: []string{"39.108.94.48:8848"}, - Params: map[string]string{ - "username": "test", - "password": "test", - "healthOnly": "false", - }, - }, - }, "discovery", "nacos", "", "nacos", nil, "", "nacos_1", nil) + HealthOn: true, + }, "discovery", "static", "", "静态服务发现", nil, "", "static_1", nil) if err != nil { t.Error(err) return } - nacosWorker.Start() allWorker := make(map[eosc.RequireId]interface{}) - allWorker["nacos_1@discovery"] = nacosWorker - worker, err := getWorker(NewFactory(), nacosConfig, "upstream", "http_proxy", "", "http转发驱动", nil, "", "product-user", allWorker) + allWorker["static_1@discovery"] = staticWorker + worker, err := getWorker(NewFactory(), staticConfig, "upstream", "http_proxy", "", "http转发驱动", nil, "", "product-user", allWorker) if err != nil { t.Error(err) return diff --git a/upstream/upstream-http/upstream_test.go b/upstream/upstream-http/upstream_test.go index 8604e832..072e0e60 100644 --- a/upstream/upstream-http/upstream_test.go +++ b/upstream/upstream-http/upstream_test.go @@ -3,7 +3,6 @@ package upstream_http import ( "errors" "fmt" - "io/ioutil" "net/http" "testing" "time" @@ -77,12 +76,8 @@ func send(ctx *http_context.Context, s service.IServiceDetail, hUpstream upstrea if err != nil { return err } - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return err - } - fmt.Println(string(body)) + + fmt.Println(string(resp.Body())) return nil } diff --git a/upstream/upstream-http_anonymous/org.go b/upstream/upstream-http_anonymous/org.go index f886ead6..5d035496 100644 --- a/upstream/upstream-http_anonymous/org.go +++ b/upstream/upstream-http_anonymous/org.go @@ -2,9 +2,10 @@ package upstream_http_anonymous import ( "fmt" - "net/http" "reflect" + "github.com/eolinker/goku-eosc/node/http-proxy/backend" + "github.com/eolinker/goku-eosc/upstream" "github.com/eolinker/eosc" @@ -47,8 +48,8 @@ func (h *httpUpstream) CheckSkill(skill string) bool { } //send 请求发送,忽略重试 -func (h *httpUpstream) Send(ctx *http_context.Context, serviceDetail service.IServiceDetail) (*http.Response, error) { - var response *http.Response +func (h *httpUpstream) Send(ctx *http_context.Context, serviceDetail service.IServiceDetail) (backend.IResponse, error) { + var response backend.IResponse var err error path := utils.TrimPrefixAll(ctx.ProxyRequest.TargetURL(), "/") for doTrice := serviceDetail.Retry() + 1; doTrice > 0; doTrice-- { diff --git a/upstream/upstream.go b/upstream/upstream.go index 7ba24e03..33ba3bb2 100644 --- a/upstream/upstream.go +++ b/upstream/upstream.go @@ -1,9 +1,8 @@ package upstream import ( - "net/http" - http_context "github.com/eolinker/goku-eosc/node/http-context" + "github.com/eolinker/goku-eosc/node/http-proxy/backend" "github.com/eolinker/goku-eosc/service" ) @@ -14,5 +13,5 @@ func CheckSkill(skill string) bool { //IUpstream 实现了负载发送请求方法 type IUpstream interface { - Send(ctx *http_context.Context, serviceDetail service.IServiceDetail) (*http.Response, error) + Send(ctx *http_context.Context, serviceDetail service.IServiceDetail) (backend.IResponse, error) }