重写健康检查

This commit is contained in:
huangmengzhu
2023-03-24 18:47:17 +08:00
parent 42adb01a19
commit f7803009c7
20 changed files with 150 additions and 560 deletions

View File

@@ -1,107 +0,0 @@
package discovery_old
import (
"github.com/eolinker/eosc/eocontext"
"github.com/google/uuid"
"sync"
)
type app struct {
id string
nodes map[string]eocontext.INode
healthChecker IHealthChecker
attrs Attrs
locker sync.RWMutex
container IAppContainer
}
// Reset 重置app的节点列表
func (s *app) Reset(nodes Nodes) {
tmp := make(map[string]INode)
for _, node := range nodes {
if n, has := s.nodes[node.ID()]; has {
n.Leave()
}
tmp[node.ID()] = node
}
s.locker.Lock()
s.nodes = tmp
s.locker.Unlock()
}
// GetAttrs 获取app的属性集合
func (s *app) GetAttrs() Attrs {
s.locker.RLock()
defer s.locker.RUnlock()
return s.attrs
}
// GetAttrByName 通过属性名获取app对应属性
func (s *app) GetAttrByName(name string) (string, bool) {
s.locker.RLock()
defer s.locker.RUnlock()
attr, ok := s.attrs[name]
return attr, ok
}
// NewApp 创建服务发现app
func NewApp(checker IHealthChecker, container IAppContainer, attrs Attrs, nodes Nodes) IApp {
if attrs == nil {
attrs = make(Attrs)
}
return &app{
attrs: attrs,
nodes: nodes,
locker: sync.RWMutex{},
healthChecker: checker,
id: uuid.NewString(),
container: container,
}
}
// ID 返回app的id
func (s *app) ID() string {
return s.id
}
// Nodes 将运行中的节点列表返回
func (s *app) Nodes() []BaseNode {
s.locker.RLock()
defer s.locker.RUnlock()
nodes := make([]BaseNode, 0, len(s.nodes))
for _, node := range s.nodes {
if node.Status() != Running {
continue
}
nodes = append(nodes, node)
}
return nodes
}
// NodeError 定时检查节点,当节点失败时,则返回错误
func (s *app) NodeError(id string) error {
s.locker.Lock()
defer s.locker.Unlock()
if n, ok := s.nodes[id]; ok {
n.Down()
if s.healthChecker != nil {
err := s.healthChecker.AddToCheck(n)
return err
}
}
return nil
}
// Close 关闭服务发现的app
func (s *app) Close() error {
//
s.container.Remove(s.id)
if s.healthChecker != nil {
return s.healthChecker.Stop()
}
return nil
}

View File

@@ -1,14 +0,0 @@
package discovery_old
// IHealthCheckerFactory 健康检查工厂类接口
type IHealthCheckerFactory interface {
IHealthChecker
Agent() (IHealthChecker, error)
Reset(conf interface{}) error
}
// IHealthChecker 健康检查接口
type IHealthChecker interface {
AddToCheck(node BaseNode) error
Stop() error
}

View File

@@ -1,62 +0,0 @@
package discovery_old
import (
"errors"
"github.com/eolinker/eosc/eocontext"
)
var (
ErrDiscoveryDown = errors.New("discovery down")
)
// CheckSkill 检查目标技能是否符合
func CheckSkill(skill string) bool {
return skill == "github.com/eolinker/apinto/discovery.discovery.IDiscovery"
}
// IDiscovery 服务发现接口
type IDiscovery interface {
GetApp(config string) (IApp, error)
}
// IApp app接口
type IApp interface {
IAttributes
ID() string
Nodes() []eocontext.INode
Reset(nodes Nodes)
NodeError(id string) error
Close() error
}
type INode = eocontext.INode
// BaseNode 节点接口
type BaseNode interface {
ID() string
IP() string
Port() int
Addr() string
Status() NodeStatus
Up()
Down()
Leave()
}
// Attrs 属性集合
type Attrs = eocontext.Attrs
// IAttributes 属性接口
type IAttributes = eocontext.IAttributes
// NodeStatus 节点状态类型
type NodeStatus = eocontext.NodeStatus
const (
//Running 节点运行中状态
Running = eocontext.Running
//Down 节点不可用状态
Down = eocontext.Down
//Leave 节点离开状态
Leave = eocontext.Leave
)

View File

@@ -1,103 +0,0 @@
package discovery_old
import (
"fmt"
"github.com/eolinker/eosc"
)
// NewNode 创建新节点
func NewNode(labels map[string]string, id string, ip string, port int) BaseNode {
if labels == nil {
labels = make(map[string]string)
}
return &node{labels: labels, id: id, ip: ip, port: port, status: Running}
}
type node struct {
labels Attrs
id string
ip string
port int
schema string
status NodeStatus
}
func (n *node) Schema() string {
return n.schema
}
// GetAttrs 获取节点属性集合
func (n *node) GetAttrs() Attrs {
return n.labels
}
// GetAttrByName 通过属性名获取节点属性
func (n *node) GetAttrByName(name string) (string, bool) {
v, ok := n.labels[name]
return v, ok
}
// IP 返回节点IP
func (n *node) IP() string {
return n.ip
}
// Port 返回节点端口
func (n *node) Port() int {
return n.port
}
// ID 返回节点ID
func (n *node) ID() string {
return n.id
}
// Status 返回节点状态
func (n *node) Status() NodeStatus {
return n.status
}
// Labels 返回节点标签集合
func (n *node) Labels() map[string]string {
return n.labels
}
// Addr 返回节点地址
func (n *node) Addr() string {
if n.port == 0 {
return n.ip
}
return fmt.Sprintf("%s:%d", n.ip, n.port)
}
// Up 将节点状态置为运行中
func (n *node) Up() {
n.status = Running
}
// Down 将节点状态置为不可用
func (n *node) Down() {
n.status = Down
}
// Leave 将节点状态置为离开
func (n *node) Leave() {
n.status = Leave
}
type INodesData interface {
Get(name string) (map[string]BaseNode, bool)
Set(name string, nodes map[string]BaseNode)
Del(name string) (map[string]BaseNode, bool)
}
type NodesData struct {
eosc.Untyped[string, map[string]BaseNode]
}
func NewNodesData() INodesData {
return &NodesData{Untyped: eosc.BuildUntyped[string, map[string]BaseNode]()}
}
type Nodes map[string]INode

View File

@@ -1,74 +0,0 @@
package discovery_old
import (
"github.com/eolinker/eosc"
)
type serviceHandler eosc.Untyped[string, IApp]
type services struct {
apps eosc.Untyped[string, serviceHandler]
appNameOfID eosc.Untyped[string, string]
}
// NewServices 创建服务发现的服务app集合
func NewServices() IServices {
return &services{apps: eosc.BuildUntyped[string, serviceHandler](), appNameOfID: eosc.BuildUntyped[string, string]()}
}
// get 获取对应服务名的节点列表
func (s *services) get(serviceName string) (serviceHandler, bool) {
v, ok := s.apps.Get(serviceName)
return v, ok
}
// Set 将app存入其对应服务的节点列表
func (s *services) Set(serviceName string, id string, app IApp) error {
s.appNameOfID.Set(id, serviceName)
if apps, ok := s.get(serviceName); ok {
apps.Set(id, app)
return nil
}
apps := eosc.BuildUntyped[string, IApp]()
apps.Set(id, app)
s.apps.Set(serviceName, apps)
return nil
}
// Remove 将目标app从其对应服务的app列表中删除传入值为目标app的id
func (s *services) Remove(appID string) (string, int) {
name, has := s.appNameOfID.Del(appID)
if has {
apps, ok := s.get(name)
if ok {
apps.Del(appID)
return name, apps.Count()
}
return name, 0
}
return "", 0
}
// Update 更新目标服务所有app的节点列表
func (s *services) Update(serviceName string, nodes Nodes) error {
if apps, ok := s.get(serviceName); ok {
for _, v := range apps.List() {
v.Reset(nodes)
}
}
return nil
}
// AppKeys 获取现有服务app的服务名列表
func (s *services) AppKeys() []string {
return s.apps.Keys()
}
// IServices 服务app集合接口
type IServices interface {
Set(serviceName string, id string, app IApp) error
Remove(id string) (string, int)
Update(serviceName string, nodes Nodes) error
AppKeys() []string
//GetStatus(serviceName string) (IApp, bool)
}

View File

@@ -4,7 +4,6 @@ import (
"github.com/eolinker/eosc/eocontext" "github.com/eolinker/eosc/eocontext"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
) )
var ( var (
@@ -14,31 +13,28 @@ var (
type IAppAgent interface { type IAppAgent interface {
reset(nodes []eocontext.INode) reset(nodes []eocontext.INode)
Agent(scheme string, timeout time.Duration) IApp Agent() IApp
} }
type IApp interface { type IApp interface {
eocontext.EoApp Nodes() []eocontext.INode
Close() Close()
} }
type _AppAgent struct { type _AppAgent struct {
id string locker sync.RWMutex
locker sync.RWMutex nodes []eocontext.INode
nodes []eocontext.INode use int64
timeout time.Duration
use int64
} }
func (a *_AppAgent) Agent(scheme string, timeout time.Duration) IApp { func (a *_AppAgent) Agent() IApp {
atomic.AddInt64(&a.use, 1) atomic.AddInt64(&a.use, 1)
return &_App{_AppAgent: a, scheme: scheme, timeout: timeout, isClose: 0} return &_App{_AppAgent: a, isClose: 0}
} }
type _App struct { type _App struct {
*_AppAgent *_AppAgent
scheme string
timeout time.Duration
isClose int32 isClose int32
} }
@@ -68,11 +64,3 @@ func (a *_AppAgent) Nodes() []eocontext.INode {
copy(l, a.nodes) copy(l, a.nodes)
return l return l
} }
func (a *_App) Scheme() string {
return a.scheme
}
func (a *_App) TimeOut() time.Duration {
return a.timeout
}

View File

@@ -2,5 +2,7 @@ package discovery
// IHealthChecker 健康检查接口 // IHealthChecker 健康检查接口
type IHealthChecker interface { type IHealthChecker interface {
check(nodes []INode) Check(nodes INodes)
Reset(conf interface{}) error
Stop()
} }

View File

@@ -2,8 +2,11 @@ package discovery
import ( import (
"errors" "errors"
"github.com/eolinker/eosc"
"github.com/eolinker/eosc/eocontext" "github.com/eolinker/eosc/eocontext"
"sync" "sync"
"sync/atomic"
"time"
) )
var ( var (
@@ -12,7 +15,7 @@ var (
// IDiscovery 服务发现接口 // IDiscovery 服务发现接口
type IDiscovery interface { type IDiscovery interface {
GetApp(config string) (IAppAgent, error) GetApp(config string) (IApp, error)
} }
// CheckSkill 检查目标技能是否符合 // CheckSkill 检查目标技能是否符合
@@ -27,28 +30,34 @@ type NodeInfo struct {
} }
type IAppContainer interface { type IAppContainer interface {
INodes
Set(id string, info []NodeInfo) (app IAppAgent) Set(id string, info []NodeInfo) (app IAppAgent)
Remove(id string)
Reset(info map[string][]NodeInfo) Reset(info map[string][]NodeInfo)
GetApp(id string) (IAppAgent, bool) GetApp(id string) (IAppAgent, bool)
Keys() []string Keys() []string
} }
type appContainer struct { type appContainer struct {
nodes nodes
lock sync.RWMutex lock sync.RWMutex
apps map[string]IAppAgent nodes eosc.Untyped[string, INode]
apps map[string]*_AppAgent
} }
func NewAppContainer() IAppContainer { func NewAppContainer() IAppContainer {
return &appContainer{} return &appContainer{
apps: make(map[string]*_AppAgent),
nodes: eosc.BuildUntyped[string, INode](),
}
} }
func (ac *appContainer) Keys() []string { func (ac *appContainer) Keys() []string {
ac.lock.RLock() ac.lock.RLock()
defer ac.lock.RUnlock() defer ac.lock.RUnlock()
if ac.apps == nil {
return nil
}
keys := make([]string, 0, len(ac.apps)) keys := make([]string, 0, len(ac.apps))
for k := range ac.apps { for k := range ac.apps {
keys = append(keys, k) keys = append(keys, k)
@@ -60,7 +69,7 @@ func (ac *appContainer) create(infos []NodeInfo) []eocontext.INode {
ns := make([]eocontext.INode, 0, len(infos)) ns := make([]eocontext.INode, 0, len(infos))
for _, i := range infos { for _, i := range infos {
n := ac.nodes.Get(i.Ip, i.Port) n := ac.Get(i.Ip, i.Port)
ns = append(ns, NewNode(n, i.Labels)) ns = append(ns, NewNode(n, i.Labels))
} }
return ns return ns
@@ -69,33 +78,69 @@ func (ac *appContainer) Set(name string, infos []NodeInfo) IAppAgent {
ns := ac.create(infos) ns := ac.create(infos)
ac.lock.RLock() ac.lock.RLock()
app, has := ac.apps[name] app, has := ac.apps[name]
ac.lock.Unlock() ac.lock.RUnlock()
if has { if has {
app.reset(ns) app.reset(ns)
return app return app
} }
ac.lock.Lock() ac.lock.Lock()
app, has = ac.apps[name]
app, has = ac.apps[name]
needCheck := false
if !has { if !has {
if len(ac.apps) == 0 {
needCheck = true
}
app = newApp(ns) app = newApp(ns)
ac.apps[name] = app ac.apps[name] = app
} }
ac.lock.Unlock() ac.lock.Unlock()
if needCheck {
go ac.doCheck()
}
return app return app
} }
func (ac *appContainer) doCheck() {
t := time.NewTicker(time.Second * 10)
defer t.Stop()
for range t.C {
ac.lock.Lock()
if len(ac.apps) == 0 {
ac.lock.Unlock()
return
}
for key, app := range ac.apps {
if atomic.LoadInt64(&app.use) <= 0 {
delete(ac.apps, key)
}
}
nodeUse := make(map[string]int)
for _, app := range ac.apps {
for _, n := range app.nodes {
nodeUse[n.ID()] += 1
}
}
nodeList := ac.nodes.List()
dels := make([]string, 0, len(nodeList))
for _, n := range nodeList {
if nodeUse[n.ID()] == 0 {
dels = append(dels, n.ID())
}
}
ac.nodes.Dels(dels...)
ac.lock.Unlock()
}
func (ac *appContainer) Remove(name string) {
ac.lock.Lock()
defer ac.lock.RUnlock()
delete(ac.apps, name)
} }
func (ac *appContainer) Reset(infos map[string][]NodeInfo) { func (ac *appContainer) Reset(infos map[string][]NodeInfo) {
nm := make(map[string]IAppAgent) nm := make(map[string]*_AppAgent)
for name, info := range infos { for name, info := range infos {
nm[name] = newApp(ac.create(info)) nm[name] = newApp(ac.create(info))
} }

View File

@@ -2,55 +2,38 @@ package discovery
import ( import (
"fmt" "fmt"
"sync" )
var (
_ INodes = (*appContainer)(nil)
) )
type INodes interface { type INodes interface {
Get(scheme, ip string, port int) INode Get(ip string, port int) INode
All() []INode All() []INode
remove(id ...string)
} }
type nodes struct { func (ac *appContainer) Get(ip string, port int) INode {
locker sync.RWMutex
m map[string]INode
}
func (n *nodes) remove(ids ...string) {
n.locker.Lock()
defer n.locker.Unlock()
for _, id := range ids {
delete(n.m, id)
}
}
func (n *nodes) Get(ip string, port int) INode {
id := fmt.Sprintf("%s:%d", ip, port) id := fmt.Sprintf("%s:%d", ip, port)
n.locker.RLock()
node, has := n.m[id] node, has := ac.nodes.Get(id)
n.locker.RUnlock()
if has { if has {
return node return node
} }
n.locker.Lock() ac.lock.Lock()
defer n.locker.Unlock() defer ac.lock.Unlock()
node, has = n.m[id] node, has = ac.nodes.Get(id)
if has { if has {
return node return node
} }
n.m[id] = newBaseNode(ip, port) ac.nodes.Set(id, newBaseNode(ip, port))
return n.m[id] node, _ = ac.nodes.Get(id)
return node
} }
func (n *nodes) All() []INode { func (ac *appContainer) All() []INode {
n.locker.RLock() return ac.nodes.List()
ls := make([]INode, 0, len(n.m))
for _, node := range n.m {
ls = append(ls, node)
}
n.locker.RUnlock()
return ls
} }

View File

@@ -51,7 +51,6 @@ func (c *client) GetNodeList(serviceName string) ([]discovery.NodeInfo, error) {
"app": ins.App, "app": ins.App,
"hostName": ins.HostName, "hostName": ins.HostName,
}, },
Scheme: ins.Status,
} }
nodes = append(nodes, node) nodes = append(nodes, node)
} }

View File

@@ -18,8 +18,7 @@ func Create(id, name string, conf *Config, workers map[eosc.RequireId]eosc.IWork
return &eureka{ return &eureka{
WorkerBase: drivers.Worker(id, name), WorkerBase: drivers.Worker(id, name),
client: newClient(conf.getAddress(), conf.getParams()), client: newClient(conf.getAddress(), conf.getParams()),
nodes: discovery.NewNodesData(), services: discovery.NewAppContainer(),
services: discovery.NewServices(),
locker: sync.RWMutex{}, locker: sync.RWMutex{},
}, nil }, nil
} }

View File

@@ -25,18 +25,18 @@ type eureka struct {
} }
// GetApp 获取服务发现中目标服务的app // GetApp 获取服务发现中目标服务的app
func (e *eureka) GetApp(serviceName string) (discovery.IAppAgent, error) { func (e *eureka) GetApp(serviceName string) (discovery.IApp, error) {
e.locker.RLock() e.locker.RLock()
app, ok := e.services.GetApp(serviceName) app, ok := e.services.GetApp(serviceName)
e.locker.RUnlock() e.locker.RUnlock()
if ok { if ok {
return app, nil return app.Agent(), nil
} }
e.locker.Lock() e.locker.Lock()
app, ok = e.services.GetApp(serviceName) app, ok = e.services.GetApp(serviceName)
if ok { if ok {
return app, nil return app.Agent(), nil
} }
// 开始重新获取 // 开始重新获取
@@ -47,7 +47,7 @@ func (e *eureka) GetApp(serviceName string) (discovery.IAppAgent, error) {
app = e.services.Set(serviceName, ns) app = e.services.Set(serviceName, ns)
e.locker.Unlock() e.locker.Unlock()
return app, nil return app.Agent(), nil
} }
// Start 开始服务发现 // Start 开始服务发现

View File

@@ -18,9 +18,9 @@ func Create(id, name string, cfg *Config, workers map[eosc.RequireId]eosc.IWorke
return &nacos{ return &nacos{
WorkerBase: drivers.Worker(id, name), WorkerBase: drivers.Worker(id, name),
client: newClient(cfg.Config.Address, cfg.getParams()), client: newClient(cfg.Config.Address, cfg.getParams()),
nodes: discovery.NewNodesData(),
services: discovery.NewServices(), services: discovery.NewAppContainer(),
locker: sync.RWMutex{}, locker: sync.RWMutex{},
}, nil }, nil
} }

View File

@@ -99,18 +99,18 @@ func (n *nacos) Stop() error {
} }
// GetApp 获取服务发现中目标服务的app // GetApp 获取服务发现中目标服务的app
func (n *nacos) GetApp(serviceName string) (discovery.IAppAgent, error) { func (n *nacos) GetApp(serviceName string) (discovery.IApp, error) {
n.locker.RLock() n.locker.RLock()
app, ok := n.services.GetApp(serviceName) app, ok := n.services.GetApp(serviceName)
n.locker.RUnlock() n.locker.RUnlock()
if ok { if ok {
return app, nil return app.Agent(), nil
} }
n.locker.Lock() n.locker.Lock()
app, ok = n.services.GetApp(serviceName) app, ok = n.services.GetApp(serviceName)
if ok { if ok {
return app, nil return app.Agent(), nil
} }
ns, err := n.client.GetNodeList(serviceName) ns, err := n.client.GetNodeList(serviceName)
@@ -123,5 +123,5 @@ func (n *nacos) GetApp(serviceName string) (discovery.IAppAgent, error) {
n.locker.Unlock() n.locker.Unlock()
return app, nil return app.Agent(), nil
} }

View File

@@ -7,10 +7,6 @@ import (
"github.com/eolinker/eosc" "github.com/eolinker/eosc"
) )
const (
driverName = "static"
)
// Create 创建静态服务发现驱动的实例 // Create 创建静态服务发现驱动的实例
func Create(id, name string, cfg *Config, workers map[eosc.RequireId]eosc.IWorker) (eosc.IWorker, error) { func Create(id, name string, cfg *Config, workers map[eosc.RequireId]eosc.IWorker) (eosc.IWorker, error) {
@@ -23,6 +19,7 @@ func Create(id, name string, cfg *Config, workers map[eosc.RequireId]eosc.IWorke
func CreateAnonymous(conf *Config) discovery.IDiscovery { func CreateAnonymous(conf *Config) discovery.IDiscovery {
s := &static{} s := &static{}
s.cfg = conf
s.Start()
return s return s
} }

View File

@@ -1,6 +1,7 @@
package static package static
import ( import (
"github.com/eolinker/apinto/discovery"
health_check_http "github.com/eolinker/apinto/health-check-http" health_check_http "github.com/eolinker/apinto/health-check-http"
"regexp" "regexp"
"strings" "strings"
@@ -11,10 +12,13 @@ import (
type HeathCheckHandler struct { type HeathCheckHandler struct {
healthOn bool healthOn bool
checker *health_check_http.HTTPCheck checker *health_check_http.HTTPCheck
nodes discovery.INodes
} }
func NewHeathCheckHandler(cfg *Config) *HeathCheckHandler { func NewHeathCheckHandler(nodes discovery.INodes, cfg *Config) *HeathCheckHandler {
h := &HeathCheckHandler{} h := &HeathCheckHandler{
nodes: nodes,
}
h.reset(cfg) h.reset(cfg)
return h return h
} }
@@ -42,6 +46,7 @@ func (s *HeathCheckHandler) reset(cfg *Config) error {
Period: time.Duration(cfg.Health.Period) * time.Second, Period: time.Duration(cfg.Health.Period) * time.Second,
Timeout: time.Duration(cfg.Health.Timeout) * time.Millisecond, Timeout: time.Duration(cfg.Health.Timeout) * time.Millisecond,
}) })
checker.Check(s.nodes)
} else { } else {
checker.Reset( checker.Reset(
health_check_http.Config{ health_check_http.Config{
@@ -74,7 +79,7 @@ func fields(str string) []string {
return words return words
} }
//validIP 判断ip是否合法 // validIP 判断ip是否合法
func validIP(ip string) bool { func validIP(ip string) bool {
match, err := regexp.MatchString(`^(?:(?:1[0-9][0-9]\.)|(?:2[0-4][0-9]\.)|(?:25[0-5]\.)|(?:[1-9][0-9]\.)|(?:[0-9]\.)){3}(?:(?:1[0-9][0-9])|(?:2[0-4][0-9])|(?:25[0-5])|(?:[1-9][0-9])|(?:[0-9]))$`, ip) match, err := regexp.MatchString(`^(?:(?:1[0-9][0-9]\.)|(?:2[0-4][0-9]\.)|(?:25[0-5]\.)|(?:[1-9][0-9]\.)|(?:[0-9]\.)){3}(?:(?:1[0-9][0-9])|(?:2[0-4][0-9])|(?:25[0-5])|(?:[1-9][0-9])|(?:[0-9]))$`, ip)
if err != nil { if err != nil {

View File

@@ -33,7 +33,7 @@ func (s *static) Start() error {
if handler != nil { if handler != nil {
return nil return nil
} }
handler = NewHeathCheckHandler(s.cfg) handler = NewHeathCheckHandler(s.services, s.cfg)
return nil return nil
} }
@@ -78,14 +78,14 @@ func (s *static) CheckSkill(skill string) bool {
} }
// GetApp 获取服务发现中目标服务的app // GetApp 获取服务发现中目标服务的app
func (s *static) GetApp(config string) (discovery.IAppAgent, error) { func (s *static) GetApp(config string) (discovery.IApp, error) {
app, err := s.decode(config) app, err := s.decode(config)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return app, nil return app.Agent(), nil
} }
// Remove 从所有服务app中移除目标app // Remove 从所有服务app中移除目标app

View File

@@ -22,7 +22,7 @@ var (
type Service struct { type Service struct {
eocontext.BalanceHandler eocontext.BalanceHandler
app discovery.IAppAgent app discovery.IApp
scheme string scheme string
timeout time.Duration timeout time.Duration
@@ -73,7 +73,7 @@ func (s *Service) Reset(conf interface{}, workers map[eosc.RequireId]eosc.IWorke
if err != nil { if err != nil {
return err return err
} }
var apps discovery.IAppAgent var apps discovery.IApp
if data.Discovery != "" { if data.Discovery != "" {
discoveryWorker, has := workers[data.Discovery] discoveryWorker, has := workers[data.Discovery]
if !has { if !has {
@@ -98,13 +98,17 @@ func (s *Service) Reset(conf interface{}, workers map[eosc.RequireId]eosc.IWorke
apps.Close() apps.Close()
return err return err
} }
old := s.app
s.app = apps s.app = apps
if old != nil {
old.Close()
}
s.scheme = data.Scheme s.scheme = data.Scheme
s.timeout = time.Duration(data.Timeout) * time.Millisecond s.timeout = time.Duration(data.Timeout) * time.Millisecond
s.BalanceHandler = balanceHandler s.BalanceHandler = balanceHandler
s.passHost = parsePassHost(data.PassHost) s.passHost = parsePassHost(data.PassHost)
s.scheme = data.Scheme s.scheme = data.Scheme
s.app = apps
s.upstreamHost = data.UpstreamHost s.upstreamHost = data.UpstreamHost
return nil return nil

View File

@@ -1,29 +1 @@
package health_check_http package health_check_http
import "github.com/eolinker/apinto/discovery"
// agent 从属于HTTPCheck,实现了IHealthChecker接口
type agent struct {
agentID string
checker *HTTPCheck
}
// NewAgent 创建agent
func NewAgent(agentID string, checker *HTTPCheck) discovery.IHealthChecker {
return &agent{agentID: agentID, checker: checker}
}
// AddToCheck 将节点添加进HTTPCheck的检查列表
func (a *agent) AddToCheck(node discovery.BaseNode) error {
a.checker.addToCheck(&checkNode{
node: node,
agentID: a.agentID,
})
return nil
}
// Stop 停止agent并且将HTTPCheck中属于该agent的正在检查的所有节点都移除
func (a *agent) Stop() error {
a.checker.stop(a.agentID)
return nil
}

View File

@@ -11,7 +11,10 @@ import (
"github.com/eolinker/eosc/log" "github.com/eolinker/eosc/log"
"github.com/eolinker/apinto/discovery" "github.com/eolinker/apinto/discovery"
"github.com/google/uuid" )
var (
_ discovery.IHealthChecker = (*HTTPCheck)(nil)
) )
// NewHTTPCheck 创建HTTPCheck // NewHTTPCheck 创建HTTPCheck
@@ -22,32 +25,36 @@ func NewHTTPCheck(config Config) *HTTPCheck {
config: &config, config: &config,
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
ch: make(chan *checkNode, 10),
client: &http.Client{}, client: &http.Client{},
locker: sync.RWMutex{}, locker: sync.RWMutex{},
} }
go checker.doCheckLoop()
return checker return checker
} }
// HTTPCheck HTTP健康检查结构,实现了IHealthChecker接口 // HTTPCheck HTTP健康检查结构,实现了IHealthChecker接口
type HTTPCheck struct { type HTTPCheck struct {
config *Config config *Config
nodes discovery.INodes
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
ch chan *checkNode
delCh chan string
client *http.Client client *http.Client
locker sync.RWMutex locker sync.RWMutex
} }
func (h *HTTPCheck) Check(nodes discovery.INodes) {
go h.doCheckLoop(nodes)
}
// doCheckLoop 定时检查,维护了一个待检测节点集合 // doCheckLoop 定时检查,维护了一个待检测节点集合
func (h *HTTPCheck) doCheckLoop() { func (h *HTTPCheck) doCheckLoop(nodes discovery.INodes) {
if h.config.Period < 1 { if h.config.Period < 1 {
return return
} }
ticker := time.NewTicker(h.config.Period) ticker := time.NewTicker(h.config.Period)
nodes := map[string]map[string]*checkNode{}
defer ticker.Stop() defer ticker.Stop()
for { for {
select { select {
@@ -55,84 +62,43 @@ func (h *HTTPCheck) doCheckLoop() {
return return
case <-ticker.C: case <-ticker.C:
{ {
nodes = h.check(nodes) h.check(nodes.All())
}
//接收待检测节点并存入待检测节点集合
case node, ok := <-h.ch:
{
if ok {
if _, ok := nodes[node.agentID]; !ok {
nodes[node.agentID] = make(map[string]*checkNode)
}
nodes[node.agentID][node.node.ID()] = node
}
}
//接收agentID,并将待检测集合中属于该agent的所有节点移除
case agentID, ok := <-h.delCh:
{
if ok {
delete(nodes, agentID)
}
} }
} }
} }
} }
// Agent 生成一个agent
func (h *HTTPCheck) Agent() (discovery.IHealthChecker, error) {
return NewAgent(uuid.NewString(), h), nil
}
// Reset 重置HTTPCheck的配置 // Reset 重置HTTPCheck的配置
func (h *HTTPCheck) Reset(conf Config) error { func (h *HTTPCheck) Reset(conf interface{}) error {
h.config = &conf cf, ok := conf.(Config)
if !ok {
return nil
}
h.reset(&cf)
return nil return nil
} }
func (h *HTTPCheck) reset(conf *Config) {
// AddToCheck 将节点添加进HTTPCheck的检查列表 h.config = conf
func (h *HTTPCheck) AddToCheck(node discovery.BaseNode) error {
h.addToCheck(&checkNode{
node: node,
agentID: "",
})
return nil
}
// addToCheck 将节点传入HTTPCheck的检测Channel
func (h *HTTPCheck) addToCheck(node *checkNode) error {
h.ch <- node
return nil
} }
// Stop 停止HTTPCheck中止定时检查 // Stop 停止HTTPCheck中止定时检查
func (h *HTTPCheck) Stop() error { func (h *HTTPCheck) Stop() {
h.cancel() h.cancel()
return nil
}
// stop 停止从属该agentID的所有节点的健康检查
func (h *HTTPCheck) stop(agentID string) {
h.delCh <- agentID
} }
// check 对待检查的节点集合进行检测入参nodes map[agentID][nodeID]*checkNode // check 对待检查的节点集合进行检测入参nodes map[agentID][nodeID]*checkNode
func (h *HTTPCheck) check(nodes map[string]map[string]*checkNode) map[string]map[string]*checkNode { func (h *HTTPCheck) check(nodes []discovery.INode) {
//将待检测节点集合中地址相同的节点整合在一起结构为map[node.Addr][]*checkNode
newNodes := make(map[string][]*checkNode)
for _, ns := range nodes {
for _, n := range ns {
if n.node.Status() == discovery.Down {
newNodes[n.node.Addr()] = append(newNodes[n.node.Addr()], n)
}
}
}
/*对每个节点地址进行检测 /*对每个节点地址进行检测
成功则将属于该地址的所有节点的状态都置于可运行并从HTTPCheck维护的待检测节点列表中移除 成功则将属于该地址的所有节点的状态都置于可运行并从HTTPCheck维护的待检测节点列表中移除
失败则下次定时检查再进行检测 失败则下次定时检查再进行检测
*/ */
for addr, ns := range newNodes { for _, ns := range nodes {
uri := fmt.Sprintf("%s://%s/%s", h.config.Protocol, strings.TrimSuffix(addr, "/"), strings.TrimPrefix(h.config.URL, "/")) if ns.Status() != discovery.Down {
continue
}
uri := fmt.Sprintf("%s://%s/%s", h.config.Protocol, strings.TrimSuffix(ns.Addr(), "/"), strings.TrimPrefix(h.config.URL, "/"))
h.client.Timeout = h.config.Timeout h.client.Timeout = h.config.Timeout
request, err := http.NewRequest(h.config.Method, uri, nil) request, err := http.NewRequest(h.config.Method, uri, nil)
if err != nil { if err != nil {
@@ -149,16 +115,6 @@ func (h *HTTPCheck) check(nodes map[string]map[string]*checkNode) map[string]map
log.Error(err) log.Error(err)
continue continue
} }
for _, n := range ns { ns.Up()
n.node.Up()
delete(nodes[n.agentID], n.node.ID())
}
} }
return nodes
}
// checkNode 进入检查channel的节点结构
type checkNode struct {
node discovery.BaseNode
agentID string
} }