完成health_check_http

This commit is contained in:
chenjiekun
2021-07-21 12:45:10 +08:00
parent 1b6fc491fc
commit c2a4535f94
10 changed files with 213 additions and 359 deletions

View File

@@ -10,7 +10,7 @@ import (
"time"
"unicode"
health_check_http "github.com/eolinker/goku-eosc/health-check-http"
health_check_http "github.com/eolinker/goku-eosc/healthcheckhttp"
"github.com/eolinker/eosc"
"github.com/eolinker/goku-eosc/discovery"
@@ -25,7 +25,7 @@ type static struct {
apps map[string]discovery.IApp
locker sync.RWMutex
healthOn bool
checker *health_check_http.HttpCheck
checker *health_check_http.HTTPCheck
context context.Context
cancelFunc context.CancelFunc
}
@@ -58,11 +58,11 @@ func (s *static) Reset(conf interface{}, workers map[eosc.RequireId]interface{})
}
if s.healthOn {
if s.checker == nil {
s.checker = health_check_http.NewHttpCheck(
s.checker = health_check_http.NewHTTPCheck(
health_check_http.Config{
Protocol: cfg.Health.Protocol,
Method: cfg.Health.Method,
Url: cfg.Health.URL,
URL: cfg.Health.URL,
SuccessCode: cfg.Health.SuccessCode,
Period: time.Duration(cfg.Health.Period) * time.Second,
Timeout: time.Duration(cfg.Health.Timeout) * time.Millisecond,
@@ -72,7 +72,7 @@ func (s *static) Reset(conf interface{}, workers map[eosc.RequireId]interface{})
health_check_http.Config{
Protocol: cfg.Health.Protocol,
Method: cfg.Health.Method,
Url: cfg.Health.URL,
URL: cfg.Health.URL,
SuccessCode: cfg.Health.SuccessCode,
Period: time.Duration(cfg.Health.Period) * time.Second,
Timeout: time.Duration(cfg.Health.Timeout) * time.Millisecond,

View File

@@ -1,25 +0,0 @@
package health_check_http
import "github.com/eolinker/goku-eosc/discovery"
type Agent struct {
agentId string
*HttpCheck
}
func NewAgent(agentId string) *Agent {
return &Agent{agentId: agentId}
}
func (a *Agent) AddToCheck(node discovery.INode) error {
a.addToCheck(&checkNode{
node: node,
agentId: a.agentId,
})
return nil
}
func (a *Agent) Stop() error {
a.stop(a.agentId)
return nil
}

View File

@@ -1,77 +0,0 @@
package health_check_http
//
//type checker struct {
// ctx context.Context
// cancelFunc context.CancelFunc
// ch chan *checkNode
// config *Config
// id string
//}
//
//func (c *checker) doCheckLoop() {
// nodes := make(map[string]map[string]discovery.INode)
// for {
// select {
// case <-c.ctx.Done():
// {
// return
// }
// case n, ok := <-c.ch:
// {
//
// }
// }
// }
//}
//
//func (c *checker) AddToCheck(node discovery.INode) error {
// n := &checkNode{
// node: node,
// from: c.id,
// }
// c.ch <- n
// return nil
//}
//
//func (c *checker) Stop() error {
// c.cancelFunc()
// return nil
//}
//
//type checkNode struct {
// node discovery.INode
// from string
//}
//
//func NewHealthCheck(conf interface{}) (discovery.IHealthChecker, error) {
// ctx, cancel := context.WithCancel(context.Background())
// ch := make(chan *checkNode, 10)
// c := &checker{
// ctx: ctx,
// cancelFunc: cancel,
// ch: ch,
// }
// return c, nil
//}
//
//func (c *checker) check() error {
// for _, checkNode := range c.nodes {
// uri := fmt.Sprintf("%s://%s/%s", c.config.Protocol, strings.TrimSuffix(checkNode.checkNode.Addr(), "/"), strings.TrimPrefix(c.config.URL, "/"))
// c.client.Timeout = c.config.Timeout
// request, err := http.NewRequest(c.config.Method, uri, nil)
// if err != nil {
// return err
// }
// resp, err := c.client.Do(request)
// if err != nil {
// return err
// }
// defer resp.Body.Close()
// if c.config.SuccessCode != resp.StatusCode {
// return errors.New("error status code")
// }
//
// }
// return nil
//}

View File

@@ -1,108 +0,0 @@
package health_check_http
//var supportMethods = []string{
// "POST", "GET", "PUT", "DELETE", "OPTIONS", "HEAD", "OPTIONS",
//}
//
//type checkerFactory struct {
// c chan *checkNode
// ctx context.Context
// cancelFunc context.CancelFunc
//}
//
//type checkNode struct {
// checkNode discovery.INode
// from string
// IsRemove bool
//}
//
//func NewCheckerFactory() *checkerFactory {
//
// ctx, cancelFunc := context.WithCancel(context.Background())
// ch := make(chan *checkNode, 10)
//
// c := checkerFactory{
// c: ch,
// ctx: ctx,
// cancelFunc: cancelFunc,
// }
// go c.doCheckLoop()
// return &c
//}
//func (c *checkerFactory) doCheckLoop() {
// for {
// select {
// case <-c.ctx.Done():
// {
// return
// }
// case checkNode, ok := <-c.c:
// {
//
// }
//
// }
// }
//}
//func (c *checkerFactory) Create(config interface{}) (discovery.IHealthChecker, error) {
// cfg, ok := config.(*Config)
// if !ok {
// return nil, errors.New("fail to create health checker")
// }
// validMethod := false
// for _, method := range supportMethods {
// if method == cfg.Method {
// validMethod = true
// break
// }
// }
// if !validMethod {
// return nil, errors.New("error request method")
// }
// return &checker{config: cfg, client: &http.Client{}}, nil
//}
//
//type checker struct {
// id string
// config *Config
// client *http.Client
// nodes map[string]*checkNode
//
// ch chan *checkNode
//}
//
//func (c *checker) AddToCheck(checkNode discovery.INode) error {
// n := &checkNode{
// checkNode: checkNode,
// from: c.id,
// }
// c.ch <- n
// c.nodes[checkNode.ID()] = n
// return nil
//}
//
//func (c *checker) Stop() error {
//
// return nil
//}
//
//func (c *checker) check() error {
// for _, checkNode := range c.nodes {
// uri := fmt.Sprintf("%s://%s/%s", c.config.Protocol, strings.TrimSuffix(checkNode.checkNode.Addr(), "/"), strings.TrimPrefix(c.config.URL, "/"))
// c.client.Timeout = c.config.Timeout
// request, err := http.NewRequest(c.config.Method, uri, nil)
// if err != nil {
// return err
// }
// resp, err := c.client.Do(request)
// if err != nil {
// return err
// }
// defer resp.Body.Close()
// if c.config.SuccessCode != resp.StatusCode {
// return errors.New("error status code")
// }
//
// }
// return nil
//}

View File

@@ -1,142 +0,0 @@
package health_check_http
import (
"context"
"fmt"
"net/http"
"strings"
"sync"
"time"
"github.com/eolinker/eosc/log"
"github.com/eolinker/goku-eosc/discovery"
"github.com/go-basic/uuid"
)
func NewHttpCheck(config Config) *HttpCheck {
ctx, cancel := context.WithCancel(context.Background())
checker := &HttpCheck{
config: &config,
ctx: ctx,
cancel: cancel,
ch: make(chan *checkNode, 10),
client: &http.Client{},
locker: sync.RWMutex{},
}
go checker.doCheckLoop()
return checker
}
type HttpCheck struct {
config *Config
ctx context.Context
cancel context.CancelFunc
ch chan *checkNode
delCh chan string
client *http.Client
locker sync.RWMutex
}
func (h *HttpCheck) doCheckLoop() {
ticker := time.NewTicker(h.config.Period)
nodes := map[string]map[string]*checkNode{}
defer ticker.Stop()
for {
select {
case <-h.ctx.Done():
return
case <-ticker.C:
{
nodes = h.check(nodes)
}
case node, ok := <-h.ch:
{
if ok {
if _, ok := nodes[node.agentId]; !ok {
nodes[node.agentId] = make(map[string]*checkNode)
}
nodes[node.agentId][node.node.ID()] = node
}
}
case id, ok := <-h.delCh:
{
if ok {
delete(nodes, id)
}
}
}
}
}
func (h *HttpCheck) Agent() (discovery.IHealthChecker, error) {
return NewAgent(uuid.New()), nil
}
func (h *HttpCheck) Reset(conf Config) error {
h.config = &conf
return nil
}
func (h *HttpCheck) AddToCheck(node discovery.INode) error {
h.addToCheck(&checkNode{
node: node,
agentId: "",
})
return nil
}
func (h *HttpCheck) addToCheck(node *checkNode) error {
h.ch <- node
return nil
}
func (h *HttpCheck) Stop() error {
h.cancel()
return nil
}
func (h *HttpCheck) stop(id string) {
h.delCh <- id
}
func (h *HttpCheck) check(nodes map[string]map[string]*checkNode) map[string]map[string]*checkNode {
newNodes := make(map[string][]*checkNode)
for _, ns := range nodes {
for _, n := range ns {
if n.node.Status() == discovery.Down {
newNodes[n.node.Addr()] = append(newNodes[n.node.Addr()], n)
}
}
}
for addr, ns := range newNodes {
uri := fmt.Sprintf("%s://%s/%s", h.config.Protocol, strings.TrimSuffix(addr, "/"), strings.TrimPrefix(h.config.Url, "/"))
h.client.Timeout = h.config.Timeout
request, err := http.NewRequest(h.config.Method, uri, nil)
if err != nil {
log.Error(err)
continue
}
resp, err := h.client.Do(request)
if err != nil {
log.Error(err)
continue
}
resp.Body.Close()
if h.config.SuccessCode != resp.StatusCode {
log.Error(err)
continue
}
for _, n := range ns {
n.node.Up()
delete(nodes[n.agentId], n.node.ID())
}
}
return nodes
}
type checkNode struct {
node discovery.INode
agentId string
}

29
healthcheckhttp/agent.go Normal file
View File

@@ -0,0 +1,29 @@
package healthcheckhttp
import "github.com/eolinker/goku-eosc/discovery"
//agent 从属于HTTPCheck,实现了IHealthChecker接口
type agent struct {
agentID string
*HTTPCheck
}
//NewAgent 创建agent
func NewAgent(agentID string) discovery.IHealthChecker {
return &agent{agentID: agentID}
}
//AddToCheck 将节点添加进HTTPCheck的检查列表
func (a *agent) AddToCheck(node discovery.INode) error {
a.addToCheck(&checkNode{
node: node,
agentID: a.agentID,
})
return nil
}
//Stop 停止agent并且将HTTPCheck中属于该agent的正在检查的所有节点都移除
func (a *agent) Stop() error {
a.stop(a.agentID)
return nil
}

View File

@@ -1,11 +1,12 @@
package health_check_http
package healthcheckhttp
import "time"
//Config healthCheck所需配置
type Config struct {
Protocol string
Method string
Url string
URL string
SuccessCode int
Period time.Duration
Timeout time.Duration

Binary file not shown.

After

Width:  |  Height:  |  Size: 26 KiB

View File

@@ -0,0 +1,15 @@
# HealthCheckHTTP用法
* 用法一自身实现了IHealthChecker接口可在服务发现创建app时作为healthChecker传入。
* 用法二HTTPCheck作为健康检查中心可生成agent每个agent作为app的healthChecker
#### 用法一:
相当于为每个app启动一个协程定时检查该app中所有的不健康节点
#### 用法二:
HTTPCheck作为检测中心生成的agent作为每个app的healthChecker。当app的节点出问题时agent充当搬运工把节点通过通道传给HTTPCheck交由HTTPCheck进行定时检测。相当于只为服务发现启动一个协程。
![avatar](healthcheckhttp/health_check_http.png)

View File

@@ -0,0 +1,161 @@
package healthcheckhttp
import (
"context"
"fmt"
"net/http"
"strings"
"sync"
"time"
"github.com/eolinker/eosc/log"
"github.com/eolinker/goku-eosc/discovery"
"github.com/go-basic/uuid"
)
//NewHTTPCheck 创建HTTPCheck
func NewHTTPCheck(config Config) *HTTPCheck {
ctx, cancel := context.WithCancel(context.Background())
checker := &HTTPCheck{
config: &config,
ctx: ctx,
cancel: cancel,
ch: make(chan *checkNode, 10),
client: &http.Client{},
locker: sync.RWMutex{},
}
go checker.doCheckLoop()
return checker
}
//HTTPCheck HTTP健康检查结构,实现了IHealthChecker接口
type HTTPCheck struct {
config *Config
ctx context.Context
cancel context.CancelFunc
ch chan *checkNode
delCh chan string
client *http.Client
locker sync.RWMutex
}
//doCheckLoop 定时检查,维护了一个待检测节点集合
func (h *HTTPCheck) doCheckLoop() {
ticker := time.NewTicker(h.config.Period)
nodes := map[string]map[string]*checkNode{}
defer ticker.Stop()
for {
select {
case <-h.ctx.Done():
return
case <-ticker.C:
{
nodes = h.check(nodes)
}
//接收待检测节点并存入待检测节点集合
case node, ok := <-h.ch:
{
if ok {
if _, ok := nodes[node.agentID]; !ok {
nodes[node.agentID] = make(map[string]*checkNode)
}
nodes[node.agentID][node.node.ID()] = node
}
}
//接收agentID,并将待检测集合中属于该agent的所有节点移除
case agentID, ok := <-h.delCh:
{
if ok {
delete(nodes, agentID)
}
}
}
}
}
//Agent 生成一个agent
func (h *HTTPCheck) Agent() (discovery.IHealthChecker, error) {
return NewAgent(uuid.New()), nil
}
//Reset 重置HTTPCheck的配置
func (h *HTTPCheck) Reset(conf Config) error {
h.config = &conf
return nil
}
//AddToCheck 将节点添加进HTTPCheck的检查列表
func (h *HTTPCheck) AddToCheck(node discovery.INode) error {
h.addToCheck(&checkNode{
node: node,
agentID: "",
})
return nil
}
//addToCheck 将节点传入HTTPCheck的检测Channel
func (h *HTTPCheck) addToCheck(node *checkNode) error {
h.ch <- node
return nil
}
//Stop 停止HTTPCheck中止定时检查
func (h *HTTPCheck) Stop() error {
h.cancel()
return nil
}
//stop 停止从属该agentID的所有节点的健康检查
func (h *HTTPCheck) stop(agentID string) {
h.delCh <- agentID
}
//check 对待检查的节点集合进行检测入参nodes map[agentID][nodeID]*checkNode
func (h *HTTPCheck) check(nodes map[string]map[string]*checkNode) map[string]map[string]*checkNode {
//将待检测节点集合中地址相同的节点整合在一起结构为map[node.Addr][]*checkNode
newNodes := make(map[string][]*checkNode)
for _, ns := range nodes {
for _, n := range ns {
if n.node.Status() == discovery.Down {
newNodes[n.node.Addr()] = append(newNodes[n.node.Addr()], n)
}
}
}
/*对每个节点地址进行检测
成功则将属于该地址的所有节点的状态都置于可运行并从HTTPCheck维护的待检测节点列表中移除
失败则下次定时检查再进行检测
*/
for addr, ns := range newNodes {
uri := fmt.Sprintf("%s://%s/%s", h.config.Protocol, strings.TrimSuffix(addr, "/"), strings.TrimPrefix(h.config.URL, "/"))
h.client.Timeout = h.config.Timeout
request, err := http.NewRequest(h.config.Method, uri, nil)
if err != nil {
log.Error(err)
continue
}
resp, err := h.client.Do(request)
if err != nil {
log.Error(err)
continue
}
resp.Body.Close()
if h.config.SuccessCode != resp.StatusCode {
log.Error(err)
continue
}
for _, n := range ns {
n.node.Up()
delete(nodes[n.agentID], n.node.ID())
}
}
return nodes
}
//checkNode 进入检查channel的节点结构
type checkNode struct {
node discovery.INode
agentID string
}