Finish private-model
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
package certs
|
||||
|
||||
type Config struct {
|
||||
Name string `json:"name" label:"证书名"`
|
||||
Name string `json:"name" label:"证书名称" description:"证书名称"`
|
||||
Key string `json:"key" label:"密钥内容" format:"file" description:"密钥文件的后缀名一般为.key"`
|
||||
Pem string `json:"pem" label:"证书内容" format:"file" description:"证书文件的后缀名一般为.crt 或 .pem"`
|
||||
}
|
||||
|
||||
@@ -27,6 +27,8 @@ type Config struct {
|
||||
PartitionKey string `json:"partition_key" yaml:"partition_key" switch:"partition_type==='hash'"`
|
||||
Type string `json:"type" yaml:"type" enum:"json,line" label:"输出格式"`
|
||||
ContentResize []ContentResize `json:"content_resize" yaml:"content_resize" label:"内容截断配置" switch:"type===json"`
|
||||
EnableSASL bool `json:"enable_sasl" yaml:"enable_sasl" label:"是否启用SASL"`
|
||||
SaslConfig *SaslConfig `json:"sasl" yaml:"sasl"`
|
||||
Filters []*Filter `json:"filters" yaml:"conditions" label:"过滤条件"`
|
||||
Formatter eosc.FormatterConfig `json:"formatter" yaml:"formatter" label:"格式化配置"`
|
||||
}
|
||||
@@ -59,6 +61,12 @@ type ProducerConfig struct {
|
||||
Filters []*Filter `json:"filters" yaml:"conditions" label:"过滤条件"`
|
||||
}
|
||||
|
||||
type SaslConfig struct {
|
||||
User string `json:"user" yaml:"user"`
|
||||
Password string `json:"password" yaml:"password"`
|
||||
Mechanism string `json:"mechanism" yaml:"mechanism"`
|
||||
}
|
||||
|
||||
func (c *Config) doCheck() (*ProducerConfig, error) {
|
||||
conf := c
|
||||
if conf.Topic == "" {
|
||||
@@ -121,6 +129,21 @@ func (c *Config) doCheck() (*ProducerConfig, error) {
|
||||
if conf.Type == "" {
|
||||
conf.Type = "line"
|
||||
}
|
||||
if conf.EnableSASL {
|
||||
|
||||
s.Net.SASL.Enable = true
|
||||
s.Net.SASL.User = conf.SaslConfig.User
|
||||
s.Net.SASL.Password = conf.SaslConfig.Password
|
||||
s.Net.SASL.Mechanism = sarama.SASLMechanism(conf.SaslConfig.Mechanism)
|
||||
s.Net.SASL.Handshake = true
|
||||
if s.Net.SASL.Mechanism == sarama.SASLTypeSCRAMSHA256 {
|
||||
s.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
|
||||
} else if s.Net.SASL.Mechanism == sarama.SASLTypeSCRAMSHA512 {
|
||||
s.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} }
|
||||
s.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
|
||||
|
||||
}
|
||||
}
|
||||
p.Scopes = conf.Scopes
|
||||
p.Type = conf.Type
|
||||
p.ContentResize = conf.ContentResize
|
||||
|
||||
37
drivers/output/kafka/sarama_client.go
Normal file
37
drivers/output/kafka/sarama_client.go
Normal file
@@ -0,0 +1,37 @@
|
||||
package kafka
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"crypto/sha512"
|
||||
|
||||
"github.com/xdg-go/scram"
|
||||
)
|
||||
|
||||
var (
|
||||
SHA256 scram.HashGeneratorFcn = sha256.New
|
||||
SHA512 scram.HashGeneratorFcn = sha512.New
|
||||
)
|
||||
|
||||
type XDGSCRAMClient struct {
|
||||
*scram.Client
|
||||
*scram.ClientConversation
|
||||
scram.HashGeneratorFcn
|
||||
}
|
||||
|
||||
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
|
||||
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
x.ClientConversation = x.Client.NewConversation()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
|
||||
response, err = x.ClientConversation.Step(challenge)
|
||||
return
|
||||
}
|
||||
|
||||
func (x *XDGSCRAMClient) Done() bool {
|
||||
return x.ClientConversation.Done()
|
||||
}
|
||||
@@ -148,8 +148,10 @@ func (p *PluginManager) createChain(id string, conf map[string]*plugin.Config) *
|
||||
obj = NewPluginObj(chain, id, conf)
|
||||
p.pluginObjs.Set(id, obj)
|
||||
} else {
|
||||
obj.conf = conf
|
||||
obj.fs = chain
|
||||
}
|
||||
|
||||
log.Debug("create chain len: ", len(chain))
|
||||
return obj
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ func (e *executor) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) (err
|
||||
}
|
||||
|
||||
func (e *executor) DoHttpFilter(ctx http_service.IHttpContext, next eocontext.IChain) (err error) {
|
||||
ctx.SetLabel("disable_stream", "true")
|
||||
if next != nil {
|
||||
err = next.DoChain(ctx)
|
||||
if err != nil {
|
||||
|
||||
4
go.mod
4
go.mod
@@ -44,6 +44,8 @@ require (
|
||||
google.golang.org/protobuf v1.34.2
|
||||
)
|
||||
|
||||
require github.com/xdg-go/scram v1.1.0
|
||||
|
||||
require (
|
||||
cloud.google.com/go/compute v1.23.3 // indirect
|
||||
cloud.google.com/go/compute/metadata v0.2.3 // indirect
|
||||
@@ -102,6 +104,8 @@ require (
|
||||
github.com/spaolacci/murmur3 v1.1.0 // indirect
|
||||
github.com/tklauser/go-sysconf v0.3.10 // indirect
|
||||
github.com/tklauser/numcpus v0.4.0 // indirect
|
||||
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
|
||||
github.com/xdg-go/stringprep v1.0.2 // indirect
|
||||
github.com/yusufpapurcu/wmi v1.2.2 // indirect
|
||||
go.opencensus.io v0.24.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.20.0 // indirect
|
||||
|
||||
@@ -225,7 +225,8 @@ func (ctx *HttpContext) SendTo(scheme string, node eoscContext.INode, timeout ti
|
||||
response.Header.CopyTo(&ctx.response.Response.Header)
|
||||
ctx.response.ResponseHeader.refresh()
|
||||
if response.IsBodyStream() && response.Header.ContentLength() < 0 {
|
||||
if response.StatusCode() == 200 {
|
||||
disableStream := ctx.GetLabel("disable_stream")
|
||||
if response.StatusCode() == 200 && disableStream != "true" {
|
||||
// 流式传输,非200状态码不考虑流式传输
|
||||
ctx.response.Response.SetStatusCode(response.StatusCode())
|
||||
ctx.SetLabel("stream_running", "true")
|
||||
@@ -242,13 +243,6 @@ func (ctx *HttpContext) SendTo(scheme string, node eoscContext.INode, timeout ti
|
||||
n, err := reader.Read(buffer)
|
||||
if n > 0 {
|
||||
chunk := buffer[:n]
|
||||
//for _, streamFunc := range c().StreamFunc() {
|
||||
// chunk, err = streamFunc(ctx, chunk)
|
||||
// if err != nil {
|
||||
// log.Errorf("exec stream func error: %v", err)
|
||||
// break
|
||||
// }
|
||||
//}
|
||||
chunk, err = ctx.proxyRequest.StreamBodyHandles(ctx, chunk)
|
||||
if err != nil {
|
||||
log.Errorf("exec stream func error: %v", err)
|
||||
|
||||
@@ -6,6 +6,8 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/valyala/bytebufferpool"
|
||||
|
||||
http_service "github.com/eolinker/eosc/eocontext/http-context"
|
||||
|
||||
"github.com/valyala/fasthttp"
|
||||
@@ -120,7 +122,15 @@ func (r *Response) SetBody(bytes []byte) {
|
||||
}
|
||||
|
||||
if strings.Contains(r.GetHeader("Content-Encoding"), "gzip") {
|
||||
r.DelHeader("Content-Encoding")
|
||||
var bb bytebufferpool.ByteBuffer
|
||||
_, err := fasthttp.WriteGunzip(&bb, bytes)
|
||||
if err == nil {
|
||||
r.DelHeader("Content-Encoding")
|
||||
r.SetHeader("Content-Length", strconv.Itoa(len(bb.B)))
|
||||
r.Response.SetBody(bb.B)
|
||||
r.responseError = nil
|
||||
return
|
||||
}
|
||||
}
|
||||
r.Response.SetBody(bytes)
|
||||
r.length = len(bytes)
|
||||
|
||||
Reference in New Issue
Block a user