diff --git a/discovery-old/app.go b/discovery-old/app.go deleted file mode 100644 index 63cb5db8..00000000 --- a/discovery-old/app.go +++ /dev/null @@ -1,107 +0,0 @@ -package discovery_old - -import ( - "github.com/eolinker/eosc/eocontext" - "github.com/google/uuid" - "sync" -) - -type app struct { - id string - nodes map[string]eocontext.INode - healthChecker IHealthChecker - attrs Attrs - locker sync.RWMutex - container IAppContainer -} - -// Reset 重置app的节点列表 -func (s *app) Reset(nodes Nodes) { - - tmp := make(map[string]INode) - - for _, node := range nodes { - - if n, has := s.nodes[node.ID()]; has { - n.Leave() - } - tmp[node.ID()] = node - - } - s.locker.Lock() - s.nodes = tmp - s.locker.Unlock() -} - -// GetAttrs 获取app的属性集合 -func (s *app) GetAttrs() Attrs { - s.locker.RLock() - defer s.locker.RUnlock() - return s.attrs -} - -// GetAttrByName 通过属性名获取app对应属性 -func (s *app) GetAttrByName(name string) (string, bool) { - s.locker.RLock() - defer s.locker.RUnlock() - attr, ok := s.attrs[name] - return attr, ok -} - -// NewApp 创建服务发现app -func NewApp(checker IHealthChecker, container IAppContainer, attrs Attrs, nodes Nodes) IApp { - if attrs == nil { - attrs = make(Attrs) - } - return &app{ - attrs: attrs, - nodes: nodes, - locker: sync.RWMutex{}, - healthChecker: checker, - id: uuid.NewString(), - container: container, - } -} - -// ID 返回app的id -func (s *app) ID() string { - return s.id -} - -// Nodes 将运行中的节点列表返回 -func (s *app) Nodes() []BaseNode { - s.locker.RLock() - defer s.locker.RUnlock() - nodes := make([]BaseNode, 0, len(s.nodes)) - for _, node := range s.nodes { - if node.Status() != Running { - continue - } - nodes = append(nodes, node) - } - return nodes -} - -// NodeError 定时检查节点,当节点失败时,则返回错误 -func (s *app) NodeError(id string) error { - s.locker.Lock() - defer s.locker.Unlock() - if n, ok := s.nodes[id]; ok { - n.Down() - if s.healthChecker != nil { - err := s.healthChecker.AddToCheck(n) - return err - } - } - return nil -} - -// Close 关闭服务发现的app -func (s *app) Close() error { - // - s.container.Remove(s.id) - if s.healthChecker != nil { - return s.healthChecker.Stop() - } - return nil -} diff --git a/discovery-old/checker.go b/discovery-old/checker.go deleted file mode 100644 index 94050566..00000000 --- a/discovery-old/checker.go +++ /dev/null @@ -1,14 +0,0 @@ -package discovery_old - -// IHealthCheckerFactory 健康检查工厂类接口 -type IHealthCheckerFactory interface { - IHealthChecker - Agent() (IHealthChecker, error) - Reset(conf interface{}) error -} - -// IHealthChecker 健康检查接口 -type IHealthChecker interface { - AddToCheck(node BaseNode) error - Stop() error -} diff --git a/discovery-old/discovery.go b/discovery-old/discovery.go deleted file mode 100644 index ca917ee6..00000000 --- a/discovery-old/discovery.go +++ /dev/null @@ -1,62 +0,0 @@ -package discovery_old - -import ( - "errors" - "github.com/eolinker/eosc/eocontext" -) - -var ( - ErrDiscoveryDown = errors.New("discovery down") -) - -// CheckSkill 检查目标技能是否符合 -func CheckSkill(skill string) bool { - return skill == "github.com/eolinker/apinto/discovery.discovery.IDiscovery" -} - -// IDiscovery 服务发现接口 -type IDiscovery interface { - GetApp(config string) (IApp, error) -} - -// IApp app接口 -type IApp interface { - IAttributes - ID() string - Nodes() []eocontext.INode - Reset(nodes Nodes) - NodeError(id string) error - Close() error -} - -type INode = eocontext.INode - -// BaseNode 节点接口 -type BaseNode interface { - ID() string - IP() string - Port() int - Addr() string - Status() NodeStatus - Up() - Down() - Leave() -} - -// Attrs 属性集合 -type Attrs = eocontext.Attrs - -// IAttributes 属性接口 -type IAttributes = eocontext.IAttributes - -// NodeStatus 节点状态类型 -type NodeStatus = eocontext.NodeStatus - -const ( - //Running 节点运行中状态 - Running = eocontext.Running - //Down 节点不可用状态 - Down = eocontext.Down - //Leave 节点离开状态 - Leave = eocontext.Leave -) diff --git a/discovery-old/node.go b/discovery-old/node.go deleted file mode 100644 index b9c6f14c..00000000 --- a/discovery-old/node.go +++ /dev/null @@ -1,103 +0,0 @@ -package discovery_old - -import ( - "fmt" - - "github.com/eolinker/eosc" -) - -// NewNode 创建新节点 -func NewNode(labels map[string]string, id string, ip string, port int) BaseNode { - if labels == nil { - labels = make(map[string]string) - } - return &node{labels: labels, id: id, ip: ip, port: port, status: Running} -} - -type node struct { - labels Attrs - id string - ip string - port int - schema string - status NodeStatus -} - -func (n *node) Schema() string { - return n.schema -} - -// GetAttrs 获取节点属性集合 -func (n *node) GetAttrs() Attrs { - return n.labels -} - -// GetAttrByName 通过属性名获取节点属性 -func (n *node) GetAttrByName(name string) (string, bool) { - v, ok := n.labels[name] - return v, ok -} - -// IP 返回节点IP -func (n *node) IP() string { - return n.ip -} - -// Port 返回节点端口 -func (n *node) Port() int { - return n.port -} - -// ID 返回节点ID -func (n *node) ID() string { - return n.id -} - -// Status 返回节点状态 -func (n *node) Status() NodeStatus { - return n.status -} - -// Labels 返回节点标签集合 -func (n *node) Labels() map[string]string { - return n.labels -} - -// Addr 返回节点地址 -func (n *node) Addr() string { - if n.port == 0 { - return n.ip - } - return fmt.Sprintf("%s:%d", n.ip, n.port) -} - -// Up 将节点状态置为运行中 -func (n *node) Up() { - n.status = Running -} - -// Down 将节点状态置为不可用 -func (n *node) Down() { - n.status = Down -} - -// Leave 将节点状态置为离开 -func (n *node) Leave() { - n.status = Leave -} - -type INodesData interface { - Get(name string) (map[string]BaseNode, bool) - Set(name string, nodes map[string]BaseNode) - Del(name string) (map[string]BaseNode, bool) -} - -type NodesData struct { - eosc.Untyped[string, map[string]BaseNode] -} - -func NewNodesData() INodesData { - return &NodesData{Untyped: eosc.BuildUntyped[string, map[string]BaseNode]()} -} - -type Nodes map[string]INode diff --git a/discovery-old/services.go b/discovery-old/services.go deleted file mode 100644 index f53aef54..00000000 --- a/discovery-old/services.go +++ /dev/null @@ -1,74 +0,0 @@ -package discovery_old - -import ( - "github.com/eolinker/eosc" -) - -type serviceHandler eosc.Untyped[string, IApp] -type services struct { - apps eosc.Untyped[string, serviceHandler] - appNameOfID eosc.Untyped[string, string] -} - -// NewServices 创建服务发现的服务app集合 -func NewServices() IServices { - return &services{apps: eosc.BuildUntyped[string, serviceHandler](), appNameOfID: eosc.BuildUntyped[string, string]()} -} - -// get 获取对应服务名的节点列表 -func (s *services) get(serviceName string) (serviceHandler, bool) { - v, ok := s.apps.Get(serviceName) - - return v, ok -} - -// Set 将app存入其对应服务的节点列表 -func (s *services) Set(serviceName string, id string, app IApp) error { - s.appNameOfID.Set(id, serviceName) - if apps, ok := s.get(serviceName); ok { - apps.Set(id, app) - return nil - } - apps := eosc.BuildUntyped[string, IApp]() - apps.Set(id, app) - s.apps.Set(serviceName, apps) - return nil -} - -// Remove 将目标app从其对应服务的app列表中删除,传入值为目标app的id -func (s *services) Remove(appID string) (string, int) { - name, has := s.appNameOfID.Del(appID) - if has { - apps, ok := s.get(name) - if ok { - apps.Del(appID) - return name, apps.Count() - } - return name, 0 - } - return "", 0 -} - -// Update 更新目标服务所有app的节点列表 -func (s *services) Update(serviceName string, nodes Nodes) error { - if apps, ok := s.get(serviceName); ok { - for _, v := range apps.List() { - v.Reset(nodes) - } - } - return nil -} - -// AppKeys 获取现有服务app的服务名列表 -func (s *services) AppKeys() []string { - return s.apps.Keys() -} - -// IServices 服务app集合接口 -type IServices interface { - Set(serviceName string, id string, app IApp) error - Remove(id string) (string, int) - Update(serviceName string, nodes Nodes) error - AppKeys() []string - //GetStatus(serviceName string) (IApp, bool) -} diff --git a/discovery/app.go b/discovery/app.go index d0e42d73..c8137458 100644 --- a/discovery/app.go +++ b/discovery/app.go @@ -4,7 +4,6 @@ import ( "github.com/eolinker/eosc/eocontext" "sync" "sync/atomic" - "time" ) var ( @@ -14,31 +13,28 @@ var ( type IAppAgent interface { reset(nodes []eocontext.INode) - Agent(scheme string, timeout time.Duration) IApp + Agent() IApp } type IApp interface { - eocontext.EoApp + Nodes() []eocontext.INode Close() } type _AppAgent struct { - id string - locker sync.RWMutex - nodes []eocontext.INode - timeout time.Duration - use int64 + locker sync.RWMutex + nodes []eocontext.INode + use int64 } -func (a *_AppAgent) Agent(scheme string, timeout time.Duration) IApp { +func (a *_AppAgent) Agent() IApp { atomic.AddInt64(&a.use, 1) - return &_App{_AppAgent: a, scheme: scheme, timeout: timeout, isClose: 0} + return &_App{_AppAgent: a, isClose: 0} } type _App struct { *_AppAgent - scheme string - timeout time.Duration + isClose int32 } @@ -68,11 +64,3 @@ func (a *_AppAgent) Nodes() []eocontext.INode { copy(l, a.nodes) return l } - -func (a *_App) Scheme() string { - return a.scheme -} - -func (a *_App) TimeOut() time.Duration { - return a.timeout -} diff --git a/discovery/check.go b/discovery/check.go index c54aa92c..2f2d4f23 100644 --- a/discovery/check.go +++ b/discovery/check.go @@ -2,5 +2,7 @@ package discovery // IHealthChecker 健康检查接口 type IHealthChecker interface { - check(nodes []INode) + Check(nodes INodes) + Reset(conf interface{}) error + Stop() } diff --git a/discovery/discovery.go b/discovery/discovery.go index e7e4aecc..36e757a3 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -2,8 +2,11 @@ package discovery import ( "errors" + "github.com/eolinker/eosc" "github.com/eolinker/eosc/eocontext" "sync" + "sync/atomic" + "time" ) var ( @@ -12,7 +15,7 @@ var ( // IDiscovery 服务发现接口 type IDiscovery interface { - GetApp(config string) (IAppAgent, error) + GetApp(config string) (IApp, error) } // CheckSkill 检查目标技能是否符合 @@ -27,28 +30,34 @@ type NodeInfo struct { } type IAppContainer interface { + INodes Set(id string, info []NodeInfo) (app IAppAgent) - Remove(id string) Reset(info map[string][]NodeInfo) GetApp(id string) (IAppAgent, bool) Keys() []string } type appContainer struct { - nodes nodes lock sync.RWMutex - apps map[string]IAppAgent + nodes eosc.Untyped[string, INode] + apps map[string]*_AppAgent } func NewAppContainer() IAppContainer { - return &appContainer{} + return &appContainer{ + apps: make(map[string]*_AppAgent), + nodes: eosc.BuildUntyped[string, INode](), + } } func (ac *appContainer) Keys() []string { ac.lock.RLock() defer ac.lock.RUnlock() + if ac.apps == nil { + return nil + } keys := make([]string, 0, len(ac.apps)) for k := range ac.apps { keys = append(keys, k) @@ -60,7 +69,7 @@ func (ac *appContainer) create(infos []NodeInfo) []eocontext.INode { ns := make([]eocontext.INode, 0, len(infos)) for _, i := range infos { - n := ac.nodes.Get(i.Ip, i.Port) + n := ac.Get(i.Ip, i.Port) ns = append(ns, NewNode(n, i.Labels)) } return ns @@ -69,33 +78,69 @@ func (ac *appContainer) Set(name string, infos []NodeInfo) IAppAgent { ns := ac.create(infos) ac.lock.RLock() + app, has := ac.apps[name] - ac.lock.Unlock() + ac.lock.RUnlock() if has { app.reset(ns) return app } ac.lock.Lock() - app, has = ac.apps[name] + app, has = ac.apps[name] + needCheck := false if !has { + if len(ac.apps) == 0 { + needCheck = true + } app = newApp(ns) ac.apps[name] = app } ac.lock.Unlock() - + if needCheck { + go ac.doCheck() + } return app } +func (ac *appContainer) doCheck() { + t := time.NewTicker(time.Second * 10) + defer t.Stop() + for range t.C { + + ac.lock.Lock() + if len(ac.apps) == 0 { + ac.lock.Unlock() + return + } + for key, app := range ac.apps { + if atomic.LoadInt64(&app.use) <= 0 { + delete(ac.apps, key) + } + } + + nodeUse := make(map[string]int) + + for _, app := range ac.apps { + for _, n := range app.nodes { + nodeUse[n.ID()] += 1 + } + } + nodeList := ac.nodes.List() + dels := make([]string, 0, len(nodeList)) + for _, n := range nodeList { + if nodeUse[n.ID()] == 0 { + dels = append(dels, n.ID()) + } + } + ac.nodes.Dels(dels...) + ac.lock.Unlock() + } -func (ac *appContainer) Remove(name string) { - ac.lock.Lock() - defer ac.lock.RUnlock() - delete(ac.apps, name) } func (ac *appContainer) Reset(infos map[string][]NodeInfo) { - nm := make(map[string]IAppAgent) + nm := make(map[string]*_AppAgent) for name, info := range infos { nm[name] = newApp(ac.create(info)) } diff --git a/discovery/nodes.go b/discovery/nodes.go index 4cbea9c4..6598ae42 100644 --- a/discovery/nodes.go +++ b/discovery/nodes.go @@ -2,55 +2,38 @@ package discovery import ( "fmt" - "sync" +) + +var ( + _ INodes = (*appContainer)(nil) ) type INodes interface { - Get(scheme, ip string, port int) INode + Get(ip string, port int) INode All() []INode - remove(id ...string) } -type nodes struct { - locker sync.RWMutex - m map[string]INode -} - -func (n *nodes) remove(ids ...string) { - n.locker.Lock() - defer n.locker.Unlock() - - for _, id := range ids { - delete(n.m, id) - } -} - -func (n *nodes) Get(ip string, port int) INode { +func (ac *appContainer) Get(ip string, port int) INode { id := fmt.Sprintf("%s:%d", ip, port) - n.locker.RLock() - node, has := n.m[id] - n.locker.RUnlock() + + node, has := ac.nodes.Get(id) + if has { return node } - n.locker.Lock() - defer n.locker.Unlock() - node, has = n.m[id] + ac.lock.Lock() + defer ac.lock.Unlock() + node, has = ac.nodes.Get(id) if has { return node } - n.m[id] = newBaseNode(ip, port) - return n.m[id] + ac.nodes.Set(id, newBaseNode(ip, port)) + node, _ = ac.nodes.Get(id) + return node } -func (n *nodes) All() []INode { +func (ac *appContainer) All() []INode { - n.locker.RLock() - ls := make([]INode, 0, len(n.m)) - for _, node := range n.m { - ls = append(ls, node) - } - n.locker.RUnlock() - return ls + return ac.nodes.List() } diff --git a/drivers/discovery/eureka/client.go b/drivers/discovery/eureka/client.go index 1b30c76f..e6453519 100644 --- a/drivers/discovery/eureka/client.go +++ b/drivers/discovery/eureka/client.go @@ -51,7 +51,6 @@ func (c *client) GetNodeList(serviceName string) ([]discovery.NodeInfo, error) { "app": ins.App, "hostName": ins.HostName, }, - Scheme: ins.Status, } nodes = append(nodes, node) } diff --git a/drivers/discovery/eureka/driver.go b/drivers/discovery/eureka/driver.go index 165d8d17..89ba48ed 100644 --- a/drivers/discovery/eureka/driver.go +++ b/drivers/discovery/eureka/driver.go @@ -18,8 +18,7 @@ func Create(id, name string, conf *Config, workers map[eosc.RequireId]eosc.IWork return &eureka{ WorkerBase: drivers.Worker(id, name), client: newClient(conf.getAddress(), conf.getParams()), - nodes: discovery.NewNodesData(), - services: discovery.NewServices(), + services: discovery.NewAppContainer(), locker: sync.RWMutex{}, }, nil } diff --git a/drivers/discovery/eureka/eureka.go b/drivers/discovery/eureka/eureka.go index c3908ed8..e37ff92f 100644 --- a/drivers/discovery/eureka/eureka.go +++ b/drivers/discovery/eureka/eureka.go @@ -25,18 +25,18 @@ type eureka struct { } // GetApp 获取服务发现中目标服务的app -func (e *eureka) GetApp(serviceName string) (discovery.IAppAgent, error) { +func (e *eureka) GetApp(serviceName string) (discovery.IApp, error) { e.locker.RLock() app, ok := e.services.GetApp(serviceName) e.locker.RUnlock() if ok { - return app, nil + return app.Agent(), nil } e.locker.Lock() app, ok = e.services.GetApp(serviceName) if ok { - return app, nil + return app.Agent(), nil } // 开始重新获取 @@ -47,7 +47,7 @@ func (e *eureka) GetApp(serviceName string) (discovery.IAppAgent, error) { app = e.services.Set(serviceName, ns) e.locker.Unlock() - return app, nil + return app.Agent(), nil } // Start 开始服务发现 diff --git a/drivers/discovery/nacos/driver.go b/drivers/discovery/nacos/driver.go index 7165c9cb..678ee644 100644 --- a/drivers/discovery/nacos/driver.go +++ b/drivers/discovery/nacos/driver.go @@ -18,9 +18,9 @@ func Create(id, name string, cfg *Config, workers map[eosc.RequireId]eosc.IWorke return &nacos{ WorkerBase: drivers.Worker(id, name), client: newClient(cfg.Config.Address, cfg.getParams()), - nodes: discovery.NewNodesData(), - services: discovery.NewServices(), - locker: sync.RWMutex{}, + + services: discovery.NewAppContainer(), + locker: sync.RWMutex{}, }, nil } diff --git a/drivers/discovery/nacos/nacos.go b/drivers/discovery/nacos/nacos.go index 0ad382e5..be9b738a 100644 --- a/drivers/discovery/nacos/nacos.go +++ b/drivers/discovery/nacos/nacos.go @@ -99,18 +99,18 @@ func (n *nacos) Stop() error { } // GetApp 获取服务发现中目标服务的app -func (n *nacos) GetApp(serviceName string) (discovery.IAppAgent, error) { +func (n *nacos) GetApp(serviceName string) (discovery.IApp, error) { n.locker.RLock() app, ok := n.services.GetApp(serviceName) n.locker.RUnlock() if ok { - return app, nil + return app.Agent(), nil } n.locker.Lock() app, ok = n.services.GetApp(serviceName) if ok { - return app, nil + return app.Agent(), nil } ns, err := n.client.GetNodeList(serviceName) @@ -123,5 +123,5 @@ func (n *nacos) GetApp(serviceName string) (discovery.IAppAgent, error) { n.locker.Unlock() - return app, nil + return app.Agent(), nil } diff --git a/drivers/discovery/static/driver.go b/drivers/discovery/static/driver.go index 11143ae4..f9a039ef 100644 --- a/drivers/discovery/static/driver.go +++ b/drivers/discovery/static/driver.go @@ -7,10 +7,6 @@ import ( "github.com/eolinker/eosc" ) -const ( - driverName = "static" -) - // Create 创建静态服务发现驱动的实例 func Create(id, name string, cfg *Config, workers map[eosc.RequireId]eosc.IWorker) (eosc.IWorker, error) { @@ -23,6 +19,7 @@ func Create(id, name string, cfg *Config, workers map[eosc.RequireId]eosc.IWorke func CreateAnonymous(conf *Config) discovery.IDiscovery { s := &static{} - + s.cfg = conf + s.Start() return s } diff --git a/drivers/discovery/static/heath.go b/drivers/discovery/static/heath.go index 79b682be..994ed8b8 100644 --- a/drivers/discovery/static/heath.go +++ b/drivers/discovery/static/heath.go @@ -1,6 +1,7 @@ package static import ( + "github.com/eolinker/apinto/discovery" health_check_http "github.com/eolinker/apinto/health-check-http" "regexp" "strings" @@ -11,10 +12,13 @@ import ( type HeathCheckHandler struct { healthOn bool checker *health_check_http.HTTPCheck + nodes discovery.INodes } -func NewHeathCheckHandler(cfg *Config) *HeathCheckHandler { - h := &HeathCheckHandler{} +func NewHeathCheckHandler(nodes discovery.INodes, cfg *Config) *HeathCheckHandler { + h := &HeathCheckHandler{ + nodes: nodes, + } h.reset(cfg) return h } @@ -42,6 +46,7 @@ func (s *HeathCheckHandler) reset(cfg *Config) error { Period: time.Duration(cfg.Health.Period) * time.Second, Timeout: time.Duration(cfg.Health.Timeout) * time.Millisecond, }) + checker.Check(s.nodes) } else { checker.Reset( health_check_http.Config{ @@ -74,7 +79,7 @@ func fields(str string) []string { return words } -//validIP 判断ip是否合法 +// validIP 判断ip是否合法 func validIP(ip string) bool { match, err := regexp.MatchString(`^(?:(?:1[0-9][0-9]\.)|(?:2[0-4][0-9]\.)|(?:25[0-5]\.)|(?:[1-9][0-9]\.)|(?:[0-9]\.)){3}(?:(?:1[0-9][0-9])|(?:2[0-4][0-9])|(?:25[0-5])|(?:[1-9][0-9])|(?:[0-9]))$`, ip) if err != nil { diff --git a/drivers/discovery/static/static.go b/drivers/discovery/static/static.go index 53701fdf..b772cd3c 100644 --- a/drivers/discovery/static/static.go +++ b/drivers/discovery/static/static.go @@ -33,7 +33,7 @@ func (s *static) Start() error { if handler != nil { return nil } - handler = NewHeathCheckHandler(s.cfg) + handler = NewHeathCheckHandler(s.services, s.cfg) return nil } @@ -78,14 +78,14 @@ func (s *static) CheckSkill(skill string) bool { } // GetApp 获取服务发现中目标服务的app -func (s *static) GetApp(config string) (discovery.IAppAgent, error) { +func (s *static) GetApp(config string) (discovery.IApp, error) { app, err := s.decode(config) if err != nil { return nil, err } - return app, nil + return app.Agent(), nil } // Remove 从所有服务app中移除目标app diff --git a/drivers/service/service.go b/drivers/service/service.go index 2cbc0385..33197b59 100644 --- a/drivers/service/service.go +++ b/drivers/service/service.go @@ -22,7 +22,7 @@ var ( type Service struct { eocontext.BalanceHandler - app discovery.IAppAgent + app discovery.IApp scheme string timeout time.Duration @@ -73,7 +73,7 @@ func (s *Service) Reset(conf interface{}, workers map[eosc.RequireId]eosc.IWorke if err != nil { return err } - var apps discovery.IAppAgent + var apps discovery.IApp if data.Discovery != "" { discoveryWorker, has := workers[data.Discovery] if !has { @@ -98,13 +98,17 @@ func (s *Service) Reset(conf interface{}, workers map[eosc.RequireId]eosc.IWorke apps.Close() return err } + old := s.app s.app = apps + if old != nil { + old.Close() + } s.scheme = data.Scheme s.timeout = time.Duration(data.Timeout) * time.Millisecond s.BalanceHandler = balanceHandler s.passHost = parsePassHost(data.PassHost) s.scheme = data.Scheme - s.app = apps + s.upstreamHost = data.UpstreamHost return nil diff --git a/health-check-http/agent.go b/health-check-http/agent.go index 10a3ce9e..a32d031b 100644 --- a/health-check-http/agent.go +++ b/health-check-http/agent.go @@ -1,29 +1 @@ package health_check_http - -import "github.com/eolinker/apinto/discovery" - -// agent 从属于HTTPCheck,实现了IHealthChecker接口 -type agent struct { - agentID string - checker *HTTPCheck -} - -// NewAgent 创建agent -func NewAgent(agentID string, checker *HTTPCheck) discovery.IHealthChecker { - return &agent{agentID: agentID, checker: checker} -} - -// AddToCheck 将节点添加进HTTPCheck的检查列表 -func (a *agent) AddToCheck(node discovery.BaseNode) error { - a.checker.addToCheck(&checkNode{ - node: node, - agentID: a.agentID, - }) - return nil -} - -// Stop 停止agent并且将HTTPCheck中属于该agent的正在检查的所有节点都移除 -func (a *agent) Stop() error { - a.checker.stop(a.agentID) - return nil -} diff --git a/health-check-http/http-check.go b/health-check-http/http-check.go index b94ee756..bf91cfd9 100644 --- a/health-check-http/http-check.go +++ b/health-check-http/http-check.go @@ -11,7 +11,10 @@ import ( "github.com/eolinker/eosc/log" "github.com/eolinker/apinto/discovery" - "github.com/google/uuid" +) + +var ( + _ discovery.IHealthChecker = (*HTTPCheck)(nil) ) // NewHTTPCheck 创建HTTPCheck @@ -22,32 +25,36 @@ func NewHTTPCheck(config Config) *HTTPCheck { config: &config, ctx: ctx, cancel: cancel, - ch: make(chan *checkNode, 10), + client: &http.Client{}, locker: sync.RWMutex{}, } - go checker.doCheckLoop() + return checker } // HTTPCheck HTTP健康检查结构,实现了IHealthChecker接口 type HTTPCheck struct { config *Config + nodes discovery.INodes ctx context.Context cancel context.CancelFunc - ch chan *checkNode - delCh chan string + client *http.Client locker sync.RWMutex } +func (h *HTTPCheck) Check(nodes discovery.INodes) { + go h.doCheckLoop(nodes) +} + // doCheckLoop 定时检查,维护了一个待检测节点集合 -func (h *HTTPCheck) doCheckLoop() { +func (h *HTTPCheck) doCheckLoop(nodes discovery.INodes) { if h.config.Period < 1 { return } ticker := time.NewTicker(h.config.Period) - nodes := map[string]map[string]*checkNode{} + defer ticker.Stop() for { select { @@ -55,84 +62,43 @@ func (h *HTTPCheck) doCheckLoop() { return case <-ticker.C: { - nodes = h.check(nodes) - } - //接收待检测节点并存入待检测节点集合 - case node, ok := <-h.ch: - { - if ok { - if _, ok := nodes[node.agentID]; !ok { - nodes[node.agentID] = make(map[string]*checkNode) - } - nodes[node.agentID][node.node.ID()] = node - } - } - //接收agentID,并将待检测集合中属于该agent的所有节点移除 - case agentID, ok := <-h.delCh: - { - if ok { - delete(nodes, agentID) - } + h.check(nodes.All()) } } } } -// Agent 生成一个agent -func (h *HTTPCheck) Agent() (discovery.IHealthChecker, error) { - return NewAgent(uuid.NewString(), h), nil -} - // Reset 重置HTTPCheck的配置 -func (h *HTTPCheck) Reset(conf Config) error { - h.config = &conf +func (h *HTTPCheck) Reset(conf interface{}) error { + cf, ok := conf.(Config) + if !ok { + return nil + } + h.reset(&cf) return nil } - -// AddToCheck 将节点添加进HTTPCheck的检查列表 -func (h *HTTPCheck) AddToCheck(node discovery.BaseNode) error { - h.addToCheck(&checkNode{ - node: node, - agentID: "", - }) - return nil -} - -// addToCheck 将节点传入HTTPCheck的检测Channel -func (h *HTTPCheck) addToCheck(node *checkNode) error { - h.ch <- node - return nil +func (h *HTTPCheck) reset(conf *Config) { + h.config = conf } // Stop 停止HTTPCheck,中止定时检查 -func (h *HTTPCheck) Stop() error { +func (h *HTTPCheck) Stop() { h.cancel() - return nil -} -// stop 停止从属该agentID的所有节点的健康检查 -func (h *HTTPCheck) stop(agentID string) { - h.delCh <- agentID } // check 对待检查的节点集合进行检测,入参:nodes map[agentID][nodeID]*checkNode -func (h *HTTPCheck) check(nodes map[string]map[string]*checkNode) map[string]map[string]*checkNode { - //将待检测节点集合中地址相同的节点整合在一起,结构为:map[node.Addr][]*checkNode - newNodes := make(map[string][]*checkNode) - for _, ns := range nodes { - for _, n := range ns { - if n.node.Status() == discovery.Down { - newNodes[n.node.Addr()] = append(newNodes[n.node.Addr()], n) - } - } - } +func (h *HTTPCheck) check(nodes []discovery.INode) { /*对每个节点地址进行检测 成功则将属于该地址的所有节点的状态都置于可运行,并从HTTPCheck维护的待检测节点列表中移除 失败则下次定时检查再进行检测 */ - for addr, ns := range newNodes { - uri := fmt.Sprintf("%s://%s/%s", h.config.Protocol, strings.TrimSuffix(addr, "/"), strings.TrimPrefix(h.config.URL, "/")) + for _, ns := range nodes { + if ns.Status() != discovery.Down { + continue + } + uri := fmt.Sprintf("%s://%s/%s", h.config.Protocol, strings.TrimSuffix(ns.Addr(), "/"), strings.TrimPrefix(h.config.URL, "/")) h.client.Timeout = h.config.Timeout request, err := http.NewRequest(h.config.Method, uri, nil) if err != nil { @@ -149,16 +115,6 @@ func (h *HTTPCheck) check(nodes map[string]map[string]*checkNode) map[string]map log.Error(err) continue } - for _, n := range ns { - n.node.Up() - delete(nodes[n.agentID], n.node.ID()) - } + ns.Up() } - return nodes -} - -// checkNode 进入检查channel的节点结构 -type checkNode struct { - node discovery.BaseNode - agentID string }