Go——常见模式

Go——常见模式,第1张

这里写目录标题 软件架构pipe-filtermicro kernel

软件架构 常用的架构模式,针对不同应用场景 pipe-filter 模式特点
举个简单的例子
// filter.go
package pipefilter

// 过滤器的输入
type Request interface{}

// 过滤器的输出
type Response interface{}

type Filter interface{
  Process(data Request) (Response, error) // 方法:方法名,参数,返回值
}
// split_filter.go
package pipefilter

import "strings"
import "errors"
import "fmt"


type SplitFilter struct {
  delimiter string  // 分隔符
}

var SplitFilterWrongFormatError = errors.New("input data is not string")

func (sf *SplitFilter) Process(data Request) (Response, error) {
  str, ok := data.(string)    // 判断类型
  if !ok {
    return nil, SplitFilterWrongFormatError
  }
  parts := strings.Split(str, sf.delimiter)
  fmt.Println("after split: ", parts)
  return parts, nil
}

func NewSplitFilter(delimiter string) *SplitFilter {
  return &SplitFilter{delimiter}  // 初始化对象
}
// to_int_filter.go
package pipefilter

import "errors"
import "strconv"
import "fmt"

var ToIntFilterWrongFormatError = errors.New("input data should be []string")

type ToIntFilter struct {
}

func (toint *ToIntFilter) Process(data Request)  (Response, error) {
  strs, ok := data.([]string)    // 判断类型
  if !ok {
    return nil, ToIntFilterWrongFormatError
  }
  ret := []int{}
  for _, str := range strs {  // 第二个是元素值
    s, err := strconv.Atoi(str)
    if err != nil {
      return nil, err
    }
    ret = append(ret, s)
  }
  fmt.Println("after to int ",ret)
  return ret, nil // 返回int化切片
}

func NewToIntFilter() *ToIntFilter {
  return &ToIntFilter{}
}
// sum_filter.go
package pipefilter

import "errors"
import "fmt"

type SumFilter struct {
}

var SumFilterWrongFormatError = errors.New("sum is not right")

func (toint *SumFilter) Process(data Request)  (Response, error) {
  i, ok := data.([]int)    // 判断类型
  if !ok {
    return nil, SumFilterWrongFormatError
  }
  ret := 0
  for _, elem := range i {  // _ 是索引
    ret += elem
  }
  fmt.Println("after sum: ",ret)
  return ret, nil // 返回求和
}

func NewSumFilter() *SumFilter {
  return &SumFilter{}
}
// stright_pipeline.go
package pipefilter

import "fmt"

type StrightPipeline struct {
  Name string
  Filters *[]Filter
}

func NewStrightPipeline(name string, filters ...Filter) *StrightPipeline {
  return &StrightPipeline {
    Name: name, // pipeline 名称
    Filters: &filters,
  }
}

func (sp *StrightPipeline) Process(data Request)  (Response, error) {
  // var ret interface{}
  var err error
  // 逐个执行filter
  for _, filter := range *sp.Filters {	// 不定长参数,传指针
    ret, err := filter.Process(data)
    fmt.Println(ret)
    if err != nil {
      return nil, err
    }
    data = ret  // data,作为下个filter输入
  }
  return data, err // 返回求和
}
// filter_test.go
package pipefilter

import "testing"

func TestPipelineFilter(t *testing.T) {
  spliter := NewSplitFilter(",")  // delimiter
  convert := NewToIntFilter()
  sum := NewSumFilter()
  sp := NewStrightPipeline("p1", spliter, convert, sum)
  ret, err := sp.Process("1,2,3,4,5")
  t.Logf("%T", ret)
  if err != nil {
    t.Fatal(err)  // 结束
  }
  if ret != 15 {
    t.Fatalf("Sum is not right! expected is 15, actual is %d", ret)
  }
}
从上面的例子就可以理解:只跟数据格式耦合 micro kernel 特点:易于扩展,错误隔离,保持架构一致性代码给哥整懵了
// agent.go
package micro

type Event struct {
  name        string
  content     string
}

type EventReceiver interface {  // 事件收集器
  OnEvent(e Event)
}

type Collector interface {  // 要集成的插件 plugin
  Init(evt EventReceiver) error
  Start(agtCtx context.Context) error  // context 方便停止协程
  Stop()  error
  Destory() error
}

type Agent struct { // micro kernel 会集成一些Collector
  collectors map[string]Collector
  evtBuff    chan Event
  cancel     context.CancelFunc
  ctx        context.Context
  state      int
}

func (agt *Agent) EventProcessGroutine() {
  var evtSeg [10]Event
  for {
    for i:=0; i<10; i++ {
      select {
      case evtSeg[i] = <-agt.evtBuff:  // chan
      case <-agt.ctx.Done():
        return
      }
    }
  }
}

func NewAgent(sizeEvtBuf int) *Agent {
  agt := Agent {
    collectors: map[string]Collector{},
    evtBuff:    make(chan Event, sizeEvtBuf),
    state:      Waiting,
  }
  return &agt // 取址,返回的是个指针
}

var WrongStateError = errors.New("wrong state, the collector was already reigstered")

func (agt *Agent) RegisterCollector(name string, collector Collector) error {
  if agt.state != Waiting {
    return WrongStateError
  }
  agt.collectors[name] = collector
  return collector.Init(agt)  // 初始化插件
}

func (agt *Agent) startCollectors() error {
  var err error
  var errs CollectorsError
  var mutex sync.Mutex
  for name, collector := range agt.collectors {
    go func(name string, collector Collector, ctx context.Context) {
      defer func() {
        mutex.Unlock()
      }() // 别忘了调用
      err = collector.Start(ctx)
      mutex.Lock()
      if err != nil {
        errs.CollectorErros = append(errs.CollectorErrors, errors.New(name + ":" + err.Error()))
      }
    }(name, collector, agt.ctx)
  }
  return errs
}

func (agt *Agent) stopCollectors() error {
  var err error
  var errs CollectorsError
  for name, collector := range agt.collectors {
    if err = collector.Stop(); err != nil {
      errs.CollectorsErrors = append(errs.CollectorErrors, errors.New(name + ":" + err.Error()))
    }
  }
  return errs
}

func (agt *Agent) destoryCollectors() error {
  var err error
  var errs CollectorsError
  for name, collector := range agt.collectors {
    if err = collector.Destory(); err != nil {
      errs.CollectorsErrors = append(errs.CollectorErrors, errors.New(name + ":" + err.Error()))
    }
  }
  return errs
}

func (agt *Agent) Start() error { // 只是从agent层面管理的,所以调用的还是collector自身的方法
  if agt.state != Waiting {
    return WrongStateError
  }
  agt.state = Running
  agt.ctx, agt.cancel = context.WithCancel(context.Background())
  go agt.EventProcessGroutine()
  return agt.startCollectors()
}

func (agt *Agent) Stop() error {
  if agt.state != Running {
    return WrongStateError
  }
  agt.state = Waiting
  agt.cancel()
  return agt.stopCollectors()
}

func (agt *Agent) Destory() error {
  if agt.state != Running {
    return WrongStateError
  }
  return agt.destoryCollectors()
}

func (agt *Agent) OnEvent(evt Event) error {  // 事件交给调度中心,准备处理?
  agt.evtBuff <- evt
}
// agent_test.go
package micro

import "errors"
import "context"
import "fmt"
import "testing"
import "time"

type DemoCollector struct {
  evtReceiver EventReceiver
  agtCtx      context.Context
  stopChan    chan struct{}
  name        string
  content     string
}

func Newcollector(name string, content string) *DemoCollector { // &
  return &DemoCollector{  // 直接初始化
    stopChan: make(chan struct{}), // 其他属性不用赋值
    name: name,
    content: content,
  }
}

func (c *DemoCollector) Start(agtCtx context.Context) error {
  fmt.Println("start collect", c.name)
  for {
    select {
    case <- agtCtx.Done():
      c.stopChan <- struct{}{}  // 初始化一个
      break
    default:
      time.Sleep(time.Millisecond * 50)
      c.evtReceiver.OnEvent(Event{c.name, c.content})
    }
  }
}

func (c *DemoCollector) Stop(agtCtx context.Context) error {
  fmt.Println("stop collect", c.name)
  select {
  case <- c.stopChan:
    return nil
  case <-time.Sleep(time.Millisecond * 1000): // 超时控制
    return errors.New("timeout when stop")
  }
}

func (c *DemoCollector) Destory() error {
  fmt.Println("release resource", c.name)
  return nil
}

func TestAgent(t *testing.T) {
  agt := NewAgent(100)
  c1 := NewCollect("c1", "1")
  c2 := NewCollect("c2", "2")
  agt.RegisterCollector("c1",c1)
  agt.RegisterCollector("c2",c2)
  agt.Start()
  fmt.Println(agt.Start())
  time.Sleep(time.Millisecond * 1000)
  agt.Stop()
  agt.Destory()
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/995938.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存