完成consul新增注释

This commit is contained in:
chenjiekun
2021-07-20 15:52:04 +08:00
parent 5ea1cfb755
commit 9e0aff29a3
17 changed files with 83 additions and 77 deletions

View File

@@ -20,10 +20,10 @@ func (s *app) Reset(nodes []INode) {
for _, node := range nodes { for _, node := range nodes {
if n, has := s.nodes[node.Id()]; has { if n, has := s.nodes[node.ID()]; has {
n.Leave() n.Leave()
} }
tmp[node.Id()] = node tmp[node.ID()] = node
} }
s.locker.Lock() s.locker.Lock()
@@ -40,6 +40,7 @@ func (s *app) GetAttrByName(name string) (string, bool) {
return attr, ok return attr, ok
} }
//NewApp 创建服务发现应用
func NewApp(checker IHealthChecker, container IAppContainer, attrs Attrs, nodes map[string]INode) IApp { func NewApp(checker IHealthChecker, container IAppContainer, attrs Attrs, nodes map[string]INode) IApp {
return &app{ return &app{
attrs: attrs, attrs: attrs,
@@ -51,7 +52,8 @@ func NewApp(checker IHealthChecker, container IAppContainer, attrs Attrs, nodes
} }
} }
func (s *app) Id() string { //ID 返回服务发现应用的id
func (s *app) ID() string {
return s.id return s.id
} }
@@ -83,7 +85,9 @@ func (s *app) NodeError(id string) error {
return nil return nil
} }
//Close 关闭服务发现的应用
func (s *app) Close() error { func (s *app) Close() error {
//
s.container.Remove(s.id) s.container.Remove(s.id)
return s.healthChecker.Stop() return s.healthChecker.Stop()
} }

View File

@@ -1,5 +1,6 @@
package discovery_consul package consul
//Config consul驱动配置
type Config struct { type Config struct {
Name string `json:"name"` Name string `json:"name"`
Driver string `json:"driver"` Driver string `json:"driver"`
@@ -7,6 +8,7 @@ type Config struct {
Config AccessConfig `json:"config"` Config AccessConfig `json:"config"`
} }
//AccessConfig 接入地址配置
type AccessConfig struct { type AccessConfig struct {
Address []string `json:"address"` Address []string `json:"address"`
Params map[string]string `json:"params"` Params map[string]string `json:"params"`

View File

@@ -1,4 +1,4 @@
package discovery_consul package consul
import ( import (
"context" "context"
@@ -61,7 +61,7 @@ func (c *consul) Start() error {
return nil return nil
} }
// Reset 重置服务发现配置 // Reset 重置consul实例配置
func (c *consul) Reset(config interface{}, workers map[eosc.RequireId]interface{}) error { func (c *consul) Reset(config interface{}, workers map[eosc.RequireId]interface{}) error {
workerConfig, ok := config.(*Config) workerConfig, ok := config.(*Config)
if !ok { if !ok {
@@ -81,11 +81,12 @@ func (c *consul) Stop() error {
return nil return nil
} }
//Remove 从所有服务应用中移除目标应用
func (c *consul) Remove(id string) error { func (c *consul) Remove(id string) error {
return c.services.Remove(id) return c.services.Remove(id)
} }
// GetApp 获取服务发现应用 // GetApp 获取服务发现中对应服务的应用
func (c *consul) GetApp(serviceName string) (discovery.IApp, error) { func (c *consul) GetApp(serviceName string) (discovery.IApp, error) {
nodes, err := c.getNodes(serviceName) nodes, err := c.getNodes(serviceName)
if err != nil { if err != nil {
@@ -96,28 +97,27 @@ func (c *consul) GetApp(serviceName string) (discovery.IApp, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
c.services.Set(serviceName, app.Id(), app) c.services.Set(serviceName, app.ID(), app)
return app, nil return app, nil
} }
// Create 创建服务发现应用 // Create 创建对应服务的应用
func (c *consul) Create(serviceName string, attrs map[string]string, nodes map[string]discovery.INode) (discovery.IApp, error) { func (c *consul) Create(serviceName string, attrs map[string]string, nodes map[string]discovery.INode) (discovery.IApp, error) {
return discovery.NewApp(nil, c, attrs, nodes), nil return discovery.NewApp(nil, c, attrs, nodes), nil
} }
// Id 返回 worker id // Id 返回 worker id
func (n *consul) Id() string { func (c *consul) Id() string {
return n.id return c.id
} }
func (n *consul) CheckSkill(skill string) bool { //CheckSkill 检查目标能力是否存在
func (c *consul) CheckSkill(skill string) bool {
return discovery.CheckSkill(skill) return discovery.CheckSkill(skill)
} }
//getNodes 通过接入地址获取节点信息 //getNodes 通过接入地址获取节点信息
func (c *consul) getNodes(service string) (map[string]discovery.INode, error) { func (c *consul) getNodes(service string) (map[string]discovery.INode, error) {
//TODO Labels怎么处理
nodeSet := make(map[string]discovery.INode) nodeSet := make(map[string]discovery.INode)
for _, addr := range c.address { for _, addr := range c.address {
@@ -133,8 +133,8 @@ func (c *consul) getNodes(service string) (map[string]discovery.INode, error) {
clientNodes := getNodesFromClient(client, service) clientNodes := getNodesFromClient(client, service)
for _, node := range clientNodes { for _, node := range clientNodes {
if _, has := nodeSet[node.Id()]; !has { if _, has := nodeSet[node.ID()]; !has {
nodeSet[node.Id()] = node nodeSet[node.ID()] = node
} }
} }
} }

View File

@@ -1,4 +1,4 @@
package discovery_consul package consul
import ( import (
"github.com/eolinker/eosc/log" "github.com/eolinker/eosc/log"

View File

@@ -1,4 +1,4 @@
package discovery_consul package consul
import ( import (
"fmt" "fmt"
@@ -22,14 +22,12 @@ type driver struct {
params map[string]string params map[string]string
} }
func NewDriver() *driver { //ConfigType 返回consul驱动配置的反射类型
return &driver{configType: reflect.TypeOf(new(Config))}
}
func (d *driver) ConfigType() reflect.Type { func (d *driver) ConfigType() reflect.Type {
return d.configType return d.configType
} }
//Create 创建consul驱动实例
func (d *driver) Create(id, name string, v interface{}, workers map[eosc.RequireId]interface{}) (eosc.IWorker, error) { func (d *driver) Create(id, name string, v interface{}, workers map[eosc.RequireId]interface{}) (eosc.IWorker, error) {
workerConfig, ok := v.(*Config) workerConfig, ok := v.(*Config)
if !ok { if !ok {

View File

@@ -1,4 +1,4 @@
package discovery_consul package consul
import ( import (
"reflect" "reflect"
@@ -6,6 +6,7 @@ import (
"github.com/eolinker/eosc" "github.com/eolinker/eosc"
) )
//Register 注册consul驱动工厂
func Register() { func Register() {
eosc.DefaultProfessionDriverRegister.RegisterProfessionDriver("eolinker:goku:discovery_consul", NewFactory()) eosc.DefaultProfessionDriverRegister.RegisterProfessionDriver("eolinker:goku:discovery_consul", NewFactory())
} }
@@ -18,10 +19,12 @@ type factory struct {
params map[string]string params map[string]string
} }
func NewFactory() *factory { //NewFactory 创建consul驱动工厂
func NewFactory() eosc.IProfessionDriverFactory {
return &factory{} return &factory{}
} }
//ExtendInfo 返回consul驱动工厂的信息
func (f *factory) ExtendInfo() eosc.ExtendInfo { func (f *factory) ExtendInfo() eosc.ExtendInfo {
return eosc.ExtendInfo{ return eosc.ExtendInfo{
ID: "eolinker:goku:discovery_consul", ID: "eolinker:goku:discovery_consul",
@@ -31,6 +34,7 @@ func (f *factory) ExtendInfo() eosc.ExtendInfo {
} }
} }
//Create 创建consul驱动
func (f *factory) Create(profession string, name string, label string, desc string, params map[string]string) (eosc.IProfessionDriver, error) { func (f *factory) Create(profession string, name string, label string, desc string, params map[string]string) (eosc.IProfessionDriver, error) {
return &driver{ return &driver{
profession: profession, profession: profession,

View File

@@ -1,4 +1,4 @@
package discovery_consul package consul
import ( import (
"github.com/eolinker/eosc/log" "github.com/eolinker/eosc/log"
@@ -9,7 +9,7 @@ import (
"strings" "strings"
) )
// getConsulClient 创建并返回consul客户端 //getConsulClient 创建并返回consul客户端
func getConsulClient(addr string, param map[string]string) (*api.Client, error) { func getConsulClient(addr string, param map[string]string) (*api.Client, error) {
defaultConfig := api.DefaultConfig() defaultConfig := api.DefaultConfig()
//配置信息写入进defaultConfig里 //配置信息写入进defaultConfig里
@@ -26,12 +26,10 @@ func getConsulClient(addr string, param map[string]string) (*api.Client, error)
return client, nil return client, nil
} }
// getNodesFromClient 从连接的客户端返回健康的节点信息 //getNodesFromClient 从连接的客户端返回健康的节点信息
func getNodesFromClient(client *api.Client, service string) []discovery.INode { func getNodesFromClient(client *api.Client, service string) []discovery.INode {
queryOptions := &api.QueryOptions{} queryOptions := &api.QueryOptions{}
serviceEntryArr, _, err := client.Health().Service(service, "", true, queryOptions) serviceEntryArr, _, err := client.Health().Service(service, "", true, queryOptions)
//log.Info(serviceEntryArr)
//catalogService, _, err := client.Catalog().Service(service, "", queryOptions)
if err != nil { if err != nil {
return nil return nil
} }
@@ -57,7 +55,7 @@ func getNodesFromClient(client *api.Client, service string) []discovery.INode {
return nodes return nodes
} }
// validAddr 判断地址是否合法 //validAddr 判断地址是否合法
func validAddr(addr string) bool { func validAddr(addr string) bool {
c := strings.Split(addr, ":") c := strings.Split(addr, ":")
if len(c) < 2 { if len(c) < 2 {

View File

@@ -13,14 +13,14 @@ import (
) )
type eureka struct { type eureka struct {
id string id string
name string name string
address []string address []string
params map[string]string params map[string]string
labels map[string]string labels map[string]string
services discovery.IServices services discovery.IServices
context context.Context context context.Context
cancelFunc context.CancelFunc cancelFunc context.CancelFunc
} }
func (e *eureka) GetApp(serviceName string) (discovery.IApp, error) { func (e *eureka) GetApp(serviceName string) (discovery.IApp, error) {
@@ -28,7 +28,7 @@ func (e *eureka) GetApp(serviceName string) (discovery.IApp, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = e.services.Set(serviceName, app.Id(), app) err = e.services.Set(serviceName, app.ID(), app)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -132,10 +132,10 @@ func (e *eureka) GetNodeList(serviceName string) (map[string]discovery.INode, er
// label[k] = v // label[k] = v
//} //}
node := discovery.NewNode(label, ins.InstanceID, ins.IPAddr, port) node := discovery.NewNode(label, ins.InstanceID, ins.IPAddr, port)
if _, ok := nodes[node.Id()]; ok { if _, ok := nodes[node.ID()]; ok {
continue continue
} }
nodes[node.Id()] = node nodes[node.ID()] = node
} }
} }
return nodes, nil return nodes, nil

View File

@@ -18,17 +18,17 @@ func TestGetApp(t *testing.T) {
"username": "test", "username": "test",
"password": "test", "password": "test",
}, },
labels: nil, labels: nil,
services: discovery.NewServices(), services: discovery.NewServices(),
context: nil, context: nil,
cancelFunc: nil, cancelFunc: nil,
} }
app, err := e.GetApp(serviceName) app, err := e.GetApp(serviceName)
if err!=nil { if err != nil {
fmt.Println("error:", err) fmt.Println("error:", err)
} }
for _, node := range app.Nodes(){ for _, node := range app.Nodes() {
fmt.Println(node.Id()) fmt.Println(node.ID())
} }
} }

View File

@@ -18,14 +18,14 @@ const (
) )
type nacos struct { type nacos struct {
id string id string
name string name string
address []string address []string
params map[string]string params map[string]string
labels map[string]string labels map[string]string
services discovery.IServices services discovery.IServices
context context.Context context context.Context
cancelFunc context.CancelFunc cancelFunc context.CancelFunc
} }
// return worker id // return worker id
@@ -103,7 +103,7 @@ func (n *nacos) GetApp(serviceName string) (discovery.IApp, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
n.services.Set(serviceName, app.Id(), app) n.services.Set(serviceName, app.ID(), app)
return app, nil return app, nil
} }
@@ -130,15 +130,15 @@ func (n *nacos) GetNodeList(query map[string]string) (map[string]discovery.INode
for _, host := range ins.Hosts { for _, host := range ins.Hosts {
label := map[string]string{ label := map[string]string{
"valid": strconv.FormatBool(host.Valid), "valid": strconv.FormatBool(host.Valid),
"marked": strconv.FormatBool(host.Marked), "marked": strconv.FormatBool(host.Marked),
"weight": strconv.FormatFloat(host.Weight, 'f', -1, 64), "weight": strconv.FormatFloat(host.Weight, 'f', -1, 64),
} }
node := discovery.NewNode(label, host.InstanceId, host.Ip, host.Port) node := discovery.NewNode(label, host.InstanceId, host.Ip, host.Port)
if _, ok := nodes[node.Id()]; ok { if _, ok := nodes[node.ID()]; ok {
continue continue
} }
nodes[node.Id()] = node nodes[node.ID()] = node
} }
} }
return nodes, nil return nodes, nil
@@ -149,7 +149,7 @@ func (n *nacos) GetInstanceList(addr string, query map[string]string) (*Instance
addr = addr + instancePath addr = addr + instancePath
if !strings.HasPrefix(addr, "http://") && !strings.HasPrefix(addr, "https://") { if !strings.HasPrefix(addr, "http://") && !strings.HasPrefix(addr, "https://") {
addr = fmt.Sprintf("http://%s", addr) addr = fmt.Sprintf("http://%s", addr)
if v,ok := n.labels["schema"]; ok { if v, ok := n.labels["schema"]; ok {
if v == "https" { if v == "https" {
addr = fmt.Sprintf("https://%s", addr) addr = fmt.Sprintf("https://%s", addr)
} }

View File

@@ -16,12 +16,12 @@ func TestNacos_GetApp(t *testing.T) {
"username": "test", "username": "test",
"password": "test", "password": "test",
}, },
services: discovery.NewServices(), services: discovery.NewServices(),
context: nil, context: nil,
cancelFunc: nil, cancelFunc: nil,
} }
app, _ := n.GetApp(serviceName) app, _ := n.GetApp(serviceName)
for _, node := range app.Nodes(){ for _, node := range app.Nodes() {
fmt.Println(node.Id()) fmt.Println(node.ID())
} }
} }

View File

@@ -103,7 +103,7 @@ func (s *static) GetApp(config string) (discovery.IApp, error) {
return nil, err return nil, err
} }
s.locker.Lock() s.locker.Lock()
s.apps[app.Id()] = app s.apps[app.ID()] = app
s.locker.Unlock() s.locker.Unlock()
return app, nil return app, nil
} }
@@ -178,7 +178,7 @@ func (s *static) decode(config string) (discovery.IApp, error) {
if word[l-1] == ';' { if word[l-1] == ';' {
n := discovery.NewNode(node.labels, fmt.Sprintf("%s:%d", node.ip, node.port), node.ip, node.port) n := discovery.NewNode(node.labels, fmt.Sprintf("%s:%d", node.ip, node.port), node.ip, node.port)
nodes[n.Id()] = n nodes[n.ID()] = n
index = 0 index = 0
node = nil node = nil
} else { } else {

View File

@@ -12,7 +12,7 @@ type IDiscovery interface {
//IApp app接口 //IApp app接口
type IApp interface { type IApp interface {
IAttributes IAttributes
Id() string ID() string
Nodes() []INode Nodes() []INode
Reset([]INode) Reset([]INode)
NodeError(id string) error NodeError(id string) error
@@ -27,7 +27,7 @@ type IAppContainer interface {
//INode 节点接口 //INode 节点接口
type INode interface { type INode interface {
IAttributes IAttributes
Id() string ID() string
Ip() string Ip() string
Port() int Port() int
Addr() string Addr() string

View File

@@ -33,7 +33,7 @@ func (n *Node) Port() int {
return n.port return n.port
} }
func (n *Node) Id() string { func (n *Node) ID() string {
return n.id return n.id
} }

View File

@@ -77,7 +77,7 @@ package health_check_http
// from: c.id, // from: c.id,
// } // }
// c.ch <- n // c.ch <- n
// c.nodes[checkNode.Id()] = n // c.nodes[checkNode.ID()] = n
// return nil // return nil
//} //}
// //

View File

@@ -57,7 +57,7 @@ func (h *HttpCheck) doCheckLoop() {
if _, ok := nodes[node.agentId]; !ok { if _, ok := nodes[node.agentId]; !ok {
nodes[node.agentId] = make(map[string]*checkNode) nodes[node.agentId] = make(map[string]*checkNode)
} }
nodes[node.agentId][node.node.Id()] = node nodes[node.agentId][node.node.ID()] = node
} }
} }
case id, ok := <-h.delCh: case id, ok := <-h.delCh:
@@ -130,7 +130,7 @@ func (h *HttpCheck) check(nodes map[string]map[string]*checkNode) map[string]map
} }
for _, n := range ns { for _, n := range ns {
n.node.Up() n.node.Up()
delete(nodes[n.agentId], n.node.Id()) delete(nodes[n.agentId], n.node.ID())
} }
} }
return nodes return nodes

View File

@@ -101,7 +101,7 @@ func (h *httpUpstream) Send(ctx *http_context.Context, serviceDetail service.ISe
if response == nil { if response == nil {
node.Down() node.Down()
} }
h.app.NodeError(node.Id()) h.app.NodeError(node.ID())
node, err = h.balanceHandler.Next() node, err = h.balanceHandler.Next()
if err != nil { if err != nil {
return nil, err return nil, err