From 889dc359aab4ee3bc31873a022c4cfd1ada8d9fd Mon Sep 17 00:00:00 2001 From: Liujian <824010343@qq.com> Date: Thu, 27 Mar 2025 14:30:49 +0800 Subject: [PATCH] Finish private-model --- drivers/certs/config.go | 2 +- drivers/output/kafka/config.go | 23 +++++++++++++ drivers/output/kafka/sarama_client.go | 37 +++++++++++++++++++++ drivers/plugin-manager/manager.go | 2 ++ drivers/plugins/response-filter/executor.go | 1 + go.mod | 4 +++ node/http-context/context.go | 10 ++---- node/http-context/response.go | 12 ++++++- 8 files changed, 81 insertions(+), 10 deletions(-) create mode 100644 drivers/output/kafka/sarama_client.go diff --git a/drivers/certs/config.go b/drivers/certs/config.go index cef2927c..3b16f4c8 100644 --- a/drivers/certs/config.go +++ b/drivers/certs/config.go @@ -1,7 +1,7 @@ package certs type Config struct { - Name string `json:"name" label:"证书名"` + Name string `json:"name" label:"证书名称" description:"证书名称"` Key string `json:"key" label:"密钥内容" format:"file" description:"密钥文件的后缀名一般为.key"` Pem string `json:"pem" label:"证书内容" format:"file" description:"证书文件的后缀名一般为.crt 或 .pem"` } diff --git a/drivers/output/kafka/config.go b/drivers/output/kafka/config.go index dcb38d6d..925dad9e 100644 --- a/drivers/output/kafka/config.go +++ b/drivers/output/kafka/config.go @@ -27,6 +27,8 @@ type Config struct { PartitionKey string `json:"partition_key" yaml:"partition_key" switch:"partition_type==='hash'"` Type string `json:"type" yaml:"type" enum:"json,line" label:"输出格式"` ContentResize []ContentResize `json:"content_resize" yaml:"content_resize" label:"内容截断配置" switch:"type===json"` + EnableSASL bool `json:"enable_sasl" yaml:"enable_sasl" label:"是否启用SASL"` + SaslConfig *SaslConfig `json:"sasl" yaml:"sasl"` Filters []*Filter `json:"filters" yaml:"conditions" label:"过滤条件"` Formatter eosc.FormatterConfig `json:"formatter" yaml:"formatter" label:"格式化配置"` } @@ -59,6 +61,12 @@ type ProducerConfig struct { Filters []*Filter `json:"filters" yaml:"conditions" label:"过滤条件"` } +type SaslConfig struct { + User string `json:"user" yaml:"user"` + Password string `json:"password" yaml:"password"` + Mechanism string `json:"mechanism" yaml:"mechanism"` +} + func (c *Config) doCheck() (*ProducerConfig, error) { conf := c if conf.Topic == "" { @@ -121,6 +129,21 @@ func (c *Config) doCheck() (*ProducerConfig, error) { if conf.Type == "" { conf.Type = "line" } + if conf.EnableSASL { + + s.Net.SASL.Enable = true + s.Net.SASL.User = conf.SaslConfig.User + s.Net.SASL.Password = conf.SaslConfig.Password + s.Net.SASL.Mechanism = sarama.SASLMechanism(conf.SaslConfig.Mechanism) + s.Net.SASL.Handshake = true + if s.Net.SASL.Mechanism == sarama.SASLTypeSCRAMSHA256 { + s.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} } + } else if s.Net.SASL.Mechanism == sarama.SASLTypeSCRAMSHA512 { + s.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} } + s.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256 + + } + } p.Scopes = conf.Scopes p.Type = conf.Type p.ContentResize = conf.ContentResize diff --git a/drivers/output/kafka/sarama_client.go b/drivers/output/kafka/sarama_client.go new file mode 100644 index 00000000..1657458b --- /dev/null +++ b/drivers/output/kafka/sarama_client.go @@ -0,0 +1,37 @@ +package kafka + +import ( + "crypto/sha256" + "crypto/sha512" + + "github.com/xdg-go/scram" +) + +var ( + SHA256 scram.HashGeneratorFcn = sha256.New + SHA512 scram.HashGeneratorFcn = sha512.New +) + +type XDGSCRAMClient struct { + *scram.Client + *scram.ClientConversation + scram.HashGeneratorFcn +} + +func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) { + x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID) + if err != nil { + return err + } + x.ClientConversation = x.Client.NewConversation() + return nil +} + +func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) { + response, err = x.ClientConversation.Step(challenge) + return +} + +func (x *XDGSCRAMClient) Done() bool { + return x.ClientConversation.Done() +} diff --git a/drivers/plugin-manager/manager.go b/drivers/plugin-manager/manager.go index 5d32fd45..c4b47347 100644 --- a/drivers/plugin-manager/manager.go +++ b/drivers/plugin-manager/manager.go @@ -148,8 +148,10 @@ func (p *PluginManager) createChain(id string, conf map[string]*plugin.Config) * obj = NewPluginObj(chain, id, conf) p.pluginObjs.Set(id, obj) } else { + obj.conf = conf obj.fs = chain } + log.Debug("create chain len: ", len(chain)) return obj } diff --git a/drivers/plugins/response-filter/executor.go b/drivers/plugins/response-filter/executor.go index f120d39b..efb9cb29 100644 --- a/drivers/plugins/response-filter/executor.go +++ b/drivers/plugins/response-filter/executor.go @@ -25,6 +25,7 @@ func (e *executor) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) (err } func (e *executor) DoHttpFilter(ctx http_service.IHttpContext, next eocontext.IChain) (err error) { + ctx.SetLabel("disable_stream", "true") if next != nil { err = next.DoChain(ctx) if err != nil { diff --git a/go.mod b/go.mod index f200ab8b..e6f7beef 100644 --- a/go.mod +++ b/go.mod @@ -44,6 +44,8 @@ require ( google.golang.org/protobuf v1.34.2 ) +require github.com/xdg-go/scram v1.1.0 + require ( cloud.google.com/go/compute v1.23.3 // indirect cloud.google.com/go/compute/metadata v0.2.3 // indirect @@ -102,6 +104,8 @@ require ( github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/tklauser/go-sysconf v0.3.10 // indirect github.com/tklauser/numcpus v0.4.0 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/stringprep v1.0.2 // indirect github.com/yusufpapurcu/wmi v1.2.2 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/otel/metric v1.20.0 // indirect diff --git a/node/http-context/context.go b/node/http-context/context.go index f464b788..da24e461 100644 --- a/node/http-context/context.go +++ b/node/http-context/context.go @@ -225,7 +225,8 @@ func (ctx *HttpContext) SendTo(scheme string, node eoscContext.INode, timeout ti response.Header.CopyTo(&ctx.response.Response.Header) ctx.response.ResponseHeader.refresh() if response.IsBodyStream() && response.Header.ContentLength() < 0 { - if response.StatusCode() == 200 { + disableStream := ctx.GetLabel("disable_stream") + if response.StatusCode() == 200 && disableStream != "true" { // 流式传输,非200状态码不考虑流式传输 ctx.response.Response.SetStatusCode(response.StatusCode()) ctx.SetLabel("stream_running", "true") @@ -242,13 +243,6 @@ func (ctx *HttpContext) SendTo(scheme string, node eoscContext.INode, timeout ti n, err := reader.Read(buffer) if n > 0 { chunk := buffer[:n] - //for _, streamFunc := range c().StreamFunc() { - // chunk, err = streamFunc(ctx, chunk) - // if err != nil { - // log.Errorf("exec stream func error: %v", err) - // break - // } - //} chunk, err = ctx.proxyRequest.StreamBodyHandles(ctx, chunk) if err != nil { log.Errorf("exec stream func error: %v", err) diff --git a/node/http-context/response.go b/node/http-context/response.go index 499e3f6a..f9f844cf 100644 --- a/node/http-context/response.go +++ b/node/http-context/response.go @@ -6,6 +6,8 @@ import ( "strings" "time" + "github.com/valyala/bytebufferpool" + http_service "github.com/eolinker/eosc/eocontext/http-context" "github.com/valyala/fasthttp" @@ -120,7 +122,15 @@ func (r *Response) SetBody(bytes []byte) { } if strings.Contains(r.GetHeader("Content-Encoding"), "gzip") { - r.DelHeader("Content-Encoding") + var bb bytebufferpool.ByteBuffer + _, err := fasthttp.WriteGunzip(&bb, bytes) + if err == nil { + r.DelHeader("Content-Encoding") + r.SetHeader("Content-Length", strconv.Itoa(len(bb.B))) + r.Response.SetBody(bb.B) + r.responseError = nil + return + } } r.Response.SetBody(bytes) r.length = len(bytes)