举个简单的例子
// filter.go
package pipefilter
// Request is the input handed to a Filter. It is the empty interface, so a
// value of any type can be passed.
type Request interface{}
// Response is the output produced by a Filter. It is the empty interface, so a
// value of any type can be returned.
type Response interface{}
// Filter is one processing step: it receives a Request and produces a
// Response or an error.
type Filter interface{
Process(data Request) (Response, error) // method: name, parameters, return values
}
// split_filter.go
package pipefilter
import "strings"
import "errors"
import "fmt"
// SplitFilter is a Filter that splits a string input into parts.
type SplitFilter struct {
delimiter string // separator handed to strings.Split by Process
}
// SplitFilterWrongFormatError is returned by SplitFilter.Process when the
// input is not a string.
var SplitFilterWrongFormatError = errors.New("input data is not string")
// Process splits the incoming string on the filter's delimiter.
//
// data must hold a string; any other dynamic type (including nil) yields
// SplitFilterWrongFormatError. On success the pieces are printed to standard
// output and returned as a []string.
func (sf *SplitFilter) Process(data Request) (Response, error) {
	switch input := data.(type) {
	case string:
		pieces := strings.Split(input, sf.delimiter)
		fmt.Println("after split: ", pieces)
		return pieces, nil
	default:
		return nil, SplitFilterWrongFormatError
	}
}
// NewSplitFilter returns a SplitFilter that splits its input on delimiter.
func NewSplitFilter(delimiter string) *SplitFilter {
	sf := new(SplitFilter)
	sf.delimiter = delimiter
	return sf
}
// to_int_filter.go
package pipefilter
import "errors"
import "strconv"
import "fmt"
// ToIntFilterWrongFormatError is returned by ToIntFilter.Process when the
// input is not a []string.
var ToIntFilterWrongFormatError = errors.New("input data should be []string")
// ToIntFilter is a Filter that converts a []string into a []int. It has no
// fields.
type ToIntFilter struct {
}
// Process converts every element of a []string to an int with strconv.Atoi.
//
// data must hold a []string, otherwise ToIntFilterWrongFormatError is
// returned. The first element that fails to parse aborts the conversion and
// its strconv error is returned unchanged. On success the converted slice is
// printed to standard output and returned as a []int (empty, not nil, for an
// empty input).
func (tf *ToIntFilter) Process(data Request) (Response, error) {
	fields, isStrings := data.([]string)
	if !isStrings {
		return nil, ToIntFilterWrongFormatError
	}

	numbers := make([]int, 0, len(fields))
	for idx := range fields {
		n, convErr := strconv.Atoi(fields[idx])
		if convErr != nil {
			return nil, convErr
		}
		numbers = append(numbers, n)
	}

	fmt.Println("after to int ", numbers)
	return numbers, nil
}
// NewToIntFilter returns a ready-to-use ToIntFilter.
func NewToIntFilter() *ToIntFilter {
	return new(ToIntFilter)
}
// sum_filter.go
package pipefilter
import "errors"
import "fmt"
// SumFilter is a Filter that adds up a []int. It has no fields.
type SumFilter struct {
}
// SumFilterWrongFormatError is returned by SumFilter.Process when the input is
// not a []int. The message names the expected input type, in the same wording
// as ToIntFilterWrongFormatError, because the error reports a type mismatch
// and not a wrong sum.
var SumFilterWrongFormatError = errors.New("input data should be []int")
// Process adds up the elements of a []int.
//
// data must hold a []int, otherwise SumFilterWrongFormatError is returned. On
// success the total is printed to standard output and returned as an int; an
// empty slice sums to 0.
func (s *SumFilter) Process(data Request) (Response, error) {
	nums, isInts := data.([]int)
	if !isInts {
		return nil, SumFilterWrongFormatError
	}

	total := 0
	for idx := 0; idx < len(nums); idx++ {
		total += nums[idx]
	}

	fmt.Println("after sum: ", total)
	return total, nil
}
// NewSumFilter returns a ready-to-use SumFilter.
func NewSumFilter() *SumFilter {
	return new(SumFilter)
}
// stright_pipeline.go
package pipefilter
import "fmt"
// StrightPipeline runs a list of Filters one after another; its Process method
// has the same signature as Filter.Process.
type StrightPipeline struct {
Name string // pipeline name
Filters *[]Filter // pointer to the filter slice; Process dereferences it and iterates in slice order
}
// NewStrightPipeline returns a pipeline called name that runs filters in the
// order given. The pipeline keeps a pointer to the variadic slice itself.
func NewStrightPipeline(name string, filters ...Filter) *StrightPipeline {
	sp := new(StrightPipeline)
	sp.Name = name
	sp.Filters = &filters
	return sp
}
// Process feeds data through every filter in order, passing each filter's
// output to the next one as input.
//
// It returns the output of the last filter. If any filter fails, processing
// stops and that filter's error is returned with a nil Response. A pipeline
// with no filters (including a nil Filters pointer, e.g. a zero-value
// StrightPipeline) returns data unchanged.
func (sp *StrightPipeline) Process(data Request) (Response, error) {
	// A pipeline built without NewStrightPipeline may have no slice at all;
	// treat it as empty instead of dereferencing a nil pointer.
	if sp.Filters == nil {
		return data, nil
	}
	// Run the filters one by one.
	for _, filter := range *sp.Filters {
		ret, err := filter.Process(data)
		fmt.Println(ret)
		if err != nil {
			return nil, err
		}
		data = ret // becomes the input of the next filter
	}
	return data, nil
}
// filter_test.go
package pipefilter
import "testing"
func TestPipelineFilter(t *testing.T) {
spliter := NewSplitFilter(",") // delimiter
convert := NewToIntFilter()
sum := NewSumFilter()
sp := NewStrightPipeline("p1", spliter, convert, sum)
ret, err := sp.Process("1,2,3,4,5")
t.Logf("%T", ret)
if err != nil {
t.Fatal(err) // 结束
}
if ret != 15 {
t.Fatalf("Sum is not right! expected is 15, actual is %d", ret)
}
}
从上面的例子可以看出:各个 filter 之间只通过数据格式耦合,彼此互不依赖
micro kernel
特点:易于扩展,错误隔离,保持架构一致性。(下面这段代码给哥整懵了)
// agent.go
package micro
// Event is the value passed to EventReceiver.OnEvent and carried by the
// Agent's event buffer.
type Event struct {
name string // event name
content string // event payload
}
// EventReceiver is handed to a Collector in Init so the collector has
// somewhere to deliver events.
type EventReceiver interface { // event receiver
OnEvent(e Event)
}
// Collector is the plugin contract: the Agent calls Init when a collector is
// registered, and Start, Stop and Destory from its methods of the same name.
type Collector interface { // plugin to be integrated
Init(evt EventReceiver) error
Start(agtCtx context.Context) error // the context makes it easy to stop the goroutine
Stop() error
Destory() error
}
// Agent is the micro kernel: it holds the registered Collectors and the event
// buffer they write to.
type Agent struct { // micro kernel that integrates a set of Collectors
collectors map[string]Collector // keyed by the name given to RegisterCollector
evtBuff chan Event // written by OnEvent, drained by EventProcessGroutine
cancel context.CancelFunc // set in Start, called in Stop
ctx context.Context // set in Start and passed to every collector's Start
state int // compared against Waiting and Running
}
// EventProcessGroutine drains the agent's event buffer until the agent's
// context is cancelled.
//
// Events are read into a fixed batch of ten slots; once the batch is full it
// is simply overwritten by the next round, so nothing is done with the events
// beyond receiving them.
func (agt *Agent) EventProcessGroutine() {
	var batch [10]Event
	for {
		for slot := range batch {
			select {
			case <-agt.ctx.Done():
				return
			case batch[slot] = <-agt.evtBuff: // receive from the channel
			}
		}
	}
}
// NewAgent returns an Agent in the Waiting state with no collectors and an
// event buffer that can hold sizeEvtBuf events.
func NewAgent(sizeEvtBuf int) *Agent {
	agt := new(Agent)
	agt.collectors = make(map[string]Collector)
	agt.evtBuff = make(chan Event, sizeEvtBuf)
	agt.state = Waiting
	return agt // a pointer to the new agent
}
// WrongStateError is returned by RegisterCollector, Start, Stop and Destory
// when the agent is not in the state the operation requires. The message is
// deliberately generic because all four operations share this one error.
var WrongStateError = errors.New("operation not allowed in the current agent state")
// RegisterCollector initializes collector and adds it to the agent under name.
//
// It returns WrongStateError unless the agent is in the Waiting state. The
// collector's Init is called with the agent as its EventReceiver; if Init
// fails its error is returned and the collector is NOT registered, so a
// collector that never initialized is not started later. Registering a second
// collector under an existing name replaces the first.
func (agt *Agent) RegisterCollector(name string, collector Collector) error {
	if agt.state != Waiting {
		return WrongStateError
	}
	// Initialize the plugin first; only a successfully initialized collector
	// is stored.
	if err := collector.Init(agt); err != nil {
		return err
	}
	agt.collectors[name] = collector
	return nil
}
// startCollectors launches every registered collector's Start in its own
// goroutine, passing the agent's context.
//
// It returns nil when no start error has been recorded by the time it
// returns, and a CollectorsError otherwise. It does not wait for the
// goroutines, so a failure reported after this function has returned is
// recorded but never seen by the caller.
// NOTE(review): CollectorsError is declared outside this view; this assumes
// its slice field is named CollectorErrors and that the value type implements
// error - confirm against the declaration.
func (agt *Agent) startCollectors() error {
	var errs CollectorsError
	var mutex sync.Mutex // guards errs

	for name, collector := range agt.collectors {
		go func(name string, collector Collector, ctx context.Context) {
			// err is local to this goroutine; sharing one variable between
			// all goroutines would be a data race.
			err := collector.Start(ctx)
			if err == nil {
				return
			}
			mutex.Lock()
			defer mutex.Unlock()
			errs.CollectorErrors = append(errs.CollectorErrors, errors.New(name+":"+err.Error()))
		}(name, collector, agt.ctx)
	}

	mutex.Lock()
	defer mutex.Unlock()
	// Returning the struct unconditionally would hand the caller a non-nil
	// error even when nothing failed.
	if len(errs.CollectorErrors) == 0 {
		return nil
	}
	return errs
}
// stopCollectors calls Stop on every registered collector, one after another.
//
// It keeps going when a collector fails. It returns nil if every Stop
// succeeded, otherwise a CollectorsError holding one "name:message" error per
// failed collector.
// NOTE(review): CollectorsError is declared outside this view; this assumes
// its slice field is named CollectorErrors - confirm against the declaration.
func (agt *Agent) stopCollectors() error {
	var errs CollectorsError
	for name, collector := range agt.collectors {
		if err := collector.Stop(); err != nil {
			errs.CollectorErrors = append(errs.CollectorErrors, errors.New(name+":"+err.Error()))
		}
	}
	// Returning the struct unconditionally would hand the caller a non-nil
	// error even when nothing failed.
	if len(errs.CollectorErrors) == 0 {
		return nil
	}
	return errs
}
// destoryCollectors calls Destory on every registered collector, one after
// another.
//
// It keeps going when a collector fails. It returns nil if every Destory
// succeeded, otherwise a CollectorsError holding one "name:message" error per
// failed collector.
// NOTE(review): CollectorsError is declared outside this view; this assumes
// its slice field is named CollectorErrors - confirm against the declaration.
func (agt *Agent) destoryCollectors() error {
	var errs CollectorsError
	for name, collector := range agt.collectors {
		if err := collector.Destory(); err != nil {
			errs.CollectorErrors = append(errs.CollectorErrors, errors.New(name+":"+err.Error()))
		}
	}
	// Returning the struct unconditionally would hand the caller a non-nil
	// error even when nothing failed.
	if len(errs.CollectorErrors) == 0 {
		return nil
	}
	return errs
}
// Start moves the agent from Waiting to Running.
//
// It creates a fresh cancellable context, launches EventProcessGroutine in a
// goroutine and then starts the collectors, returning whatever
// startCollectors returns. The agent only orchestrates; the real work is done
// by each collector's own Start. In any state other than Waiting it returns
// WrongStateError and changes nothing.
func (agt *Agent) Start() error {
	if agt.state != Waiting {
		return WrongStateError
	}

	ctx, cancel := context.WithCancel(context.Background())
	agt.state = Running
	agt.ctx = ctx
	agt.cancel = cancel

	go agt.EventProcessGroutine()
	return agt.startCollectors()
}
// Stop moves the agent from Running back to Waiting.
//
// It cancels the context created by Start and then stops the collectors,
// returning whatever stopCollectors returns. In any state other than Running
// it returns WrongStateError and changes nothing.
func (agt *Agent) Stop() error {
	switch agt.state {
	case Running:
		agt.state = Waiting
		agt.cancel()
		return agt.stopCollectors()
	default:
		return WrongStateError
	}
}
// Destory releases the collectors' resources by calling destoryCollectors and
// returns its result.
//
// The agent must be in the Waiting state, i.e. never started or already
// stopped: Stop sets the state back to Waiting, so the stop-then-destroy
// sequence works, while destroying collectors that are still running is
// rejected with WrongStateError.
func (agt *Agent) Destory() error {
	if agt.state != Waiting {
		return WrongStateError
	}
	return agt.destoryCollectors()
}
// OnEvent hands evt to the agent by putting it on the event buffer, where
// EventProcessGroutine picks it up. It blocks while the buffer is full.
//
// The method has no result so that *Agent satisfies EventReceiver, which is
// what RegisterCollector passes to Collector.Init.
func (agt *Agent) OnEvent(evt Event) {
	agt.evtBuff <- evt
}
// agent_test.go
package micro
import "errors"
import "context"
import "fmt"
import "testing"
import "time"
// DemoCollector is a sample Collector used by the agent test: while running
// it periodically reports an Event built from its name and content.
type DemoCollector struct {
	evtReceiver EventReceiver   // where Start delivers events; set by Init
	agtCtx      context.Context // not read by any method visible in this file
	stopChan    chan struct{}   // Start signals on it when it stops; Stop waits on it
	name        string
	content     string
}

// Init records the receiver that Start reports events to. It is part of the
// Collector interface and always returns nil.
func (c *DemoCollector) Init(evtReceiver EventReceiver) error {
	c.evtReceiver = evtReceiver
	return nil
}
// Newcollector returns a DemoCollector with the given name and content and a
// fresh unbuffered stop channel. The remaining fields keep their zero values.
func Newcollector(name string, content string) *DemoCollector {
	c := new(DemoCollector)
	c.stopChan = make(chan struct{})
	c.name = name
	c.content = content
	return c
}
// Start runs the collection loop until agtCtx is cancelled.
//
// While running it sleeps 50ms and then reports one Event built from the
// collector's name and content to the receiver set by Init. Once agtCtx is
// done it sends a single signal on stopChan (blocking until Stop, or another
// reader, receives it) and returns nil.
func (c *DemoCollector) Start(agtCtx context.Context) error {
	fmt.Println("start collect", c.name)
	for {
		select {
		case <-agtCtx.Done():
			c.stopChan <- struct{}{}
			// A bare break would only leave the select, not the loop, and
			// the next iteration would block forever on a second send.
			return nil
		default:
			time.Sleep(time.Millisecond * 50)
			c.evtReceiver.OnEvent(Event{c.name, c.content})
		}
	}
}
// Stop waits for the running Start loop to signal that it has stopped.
//
// It returns nil as soon as a signal arrives on stopChan, or an error if none
// arrives within one second. It takes no arguments so that DemoCollector
// matches Collector.Stop; cancelling the context is the caller's job.
func (c *DemoCollector) Stop() error {
	fmt.Println("stop collect", c.name)
	select {
	case <-c.stopChan:
		return nil
	case <-time.After(time.Millisecond * 1000): // timeout guard
		return errors.New("timeout when stop")
	}
}
// Destory prints a release message with the collector's name to standard
// output and always returns nil; it holds no resources of its own.
func (c *DemoCollector) Destory() error {
	const action = "release resource"
	fmt.Println(action, c.name)
	return nil
}
// TestAgent drives an agent with two demo collectors through its whole life
// cycle: register, start, a rejected second start, stop and destroy.
func TestAgent(t *testing.T) {
	agt := NewAgent(100)
	c1 := Newcollector("c1", "1")
	c2 := Newcollector("c2", "2")
	if err := agt.RegisterCollector("c1", c1); err != nil {
		t.Fatalf("register c1: %v", err)
	}
	if err := agt.RegisterCollector("c2", c2); err != nil {
		t.Fatalf("register c2: %v", err)
	}
	if err := agt.Start(); err != nil {
		t.Fatalf("start: %v", err)
	}
	// The agent is already running, so a second Start must be refused.
	err := agt.Start()
	fmt.Println(err)
	if !errors.Is(err, WrongStateError) {
		t.Errorf("second start: expected WrongStateError, got %v", err)
	}
	// Give the collectors time to report some events.
	time.Sleep(time.Millisecond * 1000)
	if err := agt.Stop(); err != nil {
		t.Errorf("stop: %v", err)
	}
	if err := agt.Destory(); err != nil {
		t.Errorf("destroy: %v", err)
	}
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)