特别是在目前单枪匹马的情况下。然后发现 Beanstalkd 看起来正是我所需要的。
Beanstalkd 支持任务优先级 (priority)、延时 (delay)、超时重发 (time-to-run) 和预留 (buried),
同时支持 binlog。最后,速度还可以。
看了下源码,C 语言代码量小而清晰。作者从 07 年维护到 14 年,Star 也很高,质量应当是有保障的。
队列作者提供了Go客户端,从作者项目列表中可以看到已经写了不少Go相关的东西,看来Go很受后台开发的欢迎。
Beanstalkd 主页在这: http://kr.github.io/beanstalkd
写了个调用例子如下.
/* xcl (2015-8-15) 多Tubename 多消费者*/package mainimport ( "fmt" "github.com/kr/beanstalk" "runtime" "strings" "time")var ( Tubename1 string = "channel1" Tubename2 string = "channel2")func Producer(fname,tubename string) { if fname == "" || tubename == "" { return } c,err := beanstalk.Dial("tcp","127.0.0.1:11300") if err != nil { panic(err) } defer c.Close() c.Tube.name = tubename c.TubeSet.name[tubename] = true fmt.Println(fname," [Producer] tubename:",tubename," c.Tube.name:",c.Tube.name) for i := 0; i < 5; i++ { msg := fmt.Sprintf("for %s %d",i) c.Put([]byte(msg),30,120*time.Second) fmt.Println(fname," [Producer] beanstalk put body:",msg) //time.Sleep(1 * time.Second) } c.Close() fmt.Println("Producer() end.")}func Consumer(fname,"127.0.0.1:11300") if err != nil { panic(err) } defer c.Close() c.Tube.name = tubename c.TubeSet.name[tubename] = true fmt.Println(fname," [Consumer] tubename:",c.Tube.name) substr := "timeout" for { fmt.Println(fname," [Consumer]///////////////////////// ") //从队列中取出 ID,body,err := c.Reserve(1 * time.Second) if err != nil { if !strings.Contains(err.Error(),substr) { fmt.Println(fname," [Consumer] [",c.Tube.name,"] err:",err," ID:",ID) } continue } fmt.Println(fname,"] job:",ID," body:",string(body)) //从队列中清掉 err = c.Delete(ID) if err != nil { fmt.Println(fname,"] Delete err:",ID) } else { fmt.Println(fname,"] Successfully deleted. ID:",ID) } fmt.Println(fname," [Consumer]/////////////////////////") //time.Sleep(1 * time.Second) } fmt.Println("Consumer() end. 
")}func main() { runtime.GOMAXPROCS(runtime.Numcpu()) go Producer("PA",Tubename1) go Producer("PB",Tubename2) go Consumer("CA",Tubename1) go Consumer("CB",Tubename2) time.Sleep(10 * time.Second)}/*运行结果:XCLdeiMac:src xcl$ clearXCLdeiMac:src xcl$ go run testmq.goCB [Consumer] tubename: channel2 c.Tube.name: channel2CA [Consumer] tubename: channel1 c.Tube.name: channel1CB [Consumer]/////////////////////////CA [Consumer]/////////////////////////PB [Producer] tubename: channel2 c.Tube.name: channel2PA [Producer] tubename: channel1 c.Tube.name: channel1PB [Producer] beanstalk put body: for channel2 0PA [Producer] beanstalk put body: for channel1 0CA [Consumer] [ channel1 ] job: 47027 body: for channel1 0CB [Consumer] [ channel2 ] job: 47026 body: for channel2 0PB [Producer] beanstalk put body: for channel2 1PA [Producer] beanstalk put body: for channel1 1CB [Consumer] [ channel2 ] Successfully deleted. ID: 47026CB [Consumer]/////////////////////////CB [Consumer]/////////////////////////CA [Consumer] [ channel1 ] Successfully deleted. ID: 47027CA [Consumer]/////////////////////////CA [Consumer]/////////////////////////CA [Consumer] [ channel1 ] job: 47028 body: for channel1 1PA [Producer] beanstalk put body: for channel1 2CB [Consumer] [ channel2 ] job: 47029 body: for channel2 1PB [Producer] beanstalk put body: for channel2 2PA [Producer] beanstalk put body: for channel1 3CA [Consumer] [ channel1 ] Successfully deleted. ID: 47028CA [Consumer]/////////////////////////CA [Consumer]/////////////////////////CB [Consumer] [ channel2 ] Successfully deleted. ID: 47029PB [Producer] beanstalk put body: for channel2 3CB [Consumer]/////////////////////////CB [Consumer]/////////////////////////PB [Producer] beanstalk put body: for channel2 4CB [Consumer] [ channel2 ] job: 47030 body: for channel2 2CA [Consumer] [ channel1 ] job: 47031 body: for channel1 2PA [Producer] beanstalk put body: for channel1 4Producer() end.Producer() end.CA [Consumer] [ channel1 ] Successfully deleted. 
ID: 47031CA [Consumer]/////////////////////////CA [Consumer]/////////////////////////CB [Consumer] [ channel2 ] Successfully deleted. ID: 47030CB [Consumer]/////////////////////////CB [Consumer]/////////////////////////CB [Consumer] [ channel2 ] job: 47033 body: for channel2 3CA [Consumer] [ channel1 ] job: 47032 body: for channel1 3CB [Consumer] [ channel2 ] Successfully deleted. ID: 47033CA [Consumer] [ channel1 ] Successfully deleted. ID: 47032CB [Consumer]/////////////////////////CB [Consumer]/////////////////////////CA [Consumer]/////////////////////////CA [Consumer]/////////////////////////CA [Consumer] [ channel1 ] job: 47034 body: for channel1 4CB [Consumer] [ channel2 ] job: 47035 body: for channel2 4CB [Consumer] [ channel2 ] Successfully deleted. ID: 47035CB [Consumer]/////////////////////////CA [Consumer] [ channel1 ] Successfully deleted. ID: 47034CB [Consumer]/////////////////////////XCLdeiMac:src xcl$*/可用beanstool来查看队列状态
也可以参考我写的下面两段代码来查看。
ar,er := c.ListTubes()if er != nil { fmt.Println("[Example] er:",er)} else { for i,v := range ar { fmt.Println("[Example] ListTubes i:",i," v:",v) c.Tube.name = v ID,err := c.Reserve(5 * time.Second) if err != nil { fmt.Println("[Example] err:"," name:",c.Tube.name) continue } else { fmt.Println("[Example] job:",ID) fmt.Println("[Example] body:",string(body)) } }}func tubeStatus(c *beanstalk.Conn) { fmt.Println("[tubeStatus]/////////////////////////") fmt.Println("Tube(",") Stats:") m,er := c.Tube.Stats() if er != nil { fmt.Println("[tubeStatus] err:",er) } else { for k,v := range m { fmt.Println(k," : ",v) } } fmt.Println("[tubeStatus]/////////////////////////")}从测试看,Beanstalkd 足以满足我现在的需求了.
BLOG: http://blog.csdn.net/xcl168
总结以上是内存溢出为你收集整理的Beanstalkd的使用(Golang)全部内容,希望文章能够帮你解决Beanstalkd的使用(Golang)所遇到的程序开发问题。
如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)