【Go语言实战】 (11) go-micro微服务 实现简单备忘录 (下) | 备忘录模块

【Go语言实战】 (11) go-micro微服务 实现简单备忘录 (下) | 备忘录模块,第1张

文章目录 写在前面源码地址1. RabbitMQ创建备忘录1.1 导入配置1.2 proto1.2.1 taskModels.proto1.2.2 taskService.proto 1.3 写入数据1.4 读取数据 2. 备忘录其他 *** 作2.1 获取备忘录列表2.2 获取备忘录详情2.3 更新备忘录2.4 注册到etcd中 3. 接入网关3.1 接入路由3.2 编写接口(创建备忘录为例子)3.3 测试

写在前面

这一章节我们继续前一章的内容,将备忘录模块完善,我们将使用RabbitMQ作为消息队列去创建备忘录

源码地址

https://github.com/CocaineCong/micro-todoList

1. RabbitMQ创建备忘录 1.1 导入配置

导入配置

[rabbitmq]
RabbitMQ = amqp
RabbitMQUser = guest
RabbitMQPassWord = guest
RabbitMQHost = localhost
RabbitMQPort = 5672

加载配置

func LoadRabbitMQ(file *ini.File) {
	RabbitMQ = file.Section("rabbitmq").Key("RabbitMQ").String()
	RabbitMQUser = file.Section("rabbitmq").Key("RabbitMQUser").String()
	RabbitMQPassWord = file.Section("rabbitmq").Key("RabbitMQPassWord").String()
	RabbitMQHost = file.Section("rabbitmq").Key("RabbitMQHost").String()
	RabbitMQPort = file.Section("rabbitmq").Key("RabbitMQPort").String()
}

连接RabbitMQ

// MQ rabbitMQ链接单例
var MQ *amqp.Connection

// 初始化rabbitMQ链接
func RabbitMQ(connString string) {
	conn, err := amqp.Dial(connString)
	if err != nil {
		panic(err)
	}
	MQ = conn
}
1.2 proto task/services/protos 1.2.1 taskModels.proto

定义了task的proto模型

syntax="proto3";
package services;
option go_package ="./;protos";

message TaskModel{
    //@inject_tag: json:"Id" form:"Id"
    uint64 Id = 1;
    //@inject_tag: json:"Uid" form:"Uid"
    uint64 Uid = 2;
    //@inject_tag: json:"Title" form:"Title"
    string Title = 3;
    //@inject_tag: json:"Content" form:"Content"
    string Content = 4;
    //@inject_tag: json:"StartTime" form:"StartTime"
    int64 StartTime = 5;
    //@inject_tag: json:"EndTime" form:"EndTime"
    int64 EndTime = 6;
    //@inject_tag: json:"Status" form:"Status"
    int64 Status = 7;
    //@inject_tag: json:"CreateTime" form:"CreateTime"
    int64 CreateTime = 8;
    //@inject_tag: json:"UpdateTime" form:"UpdateTime"
    int64 UpdateTime = 9;
}

执行protoc生成pb文件

protoc --proto_path=. --micro_out=. --go_out=. taskModels.proto
1.2.2 taskService.proto

定义了taskRequest,task的请求参数。
定义了TaskListResponse,task列表的响应参数。
定义了TaskDetailResponse,task列表的详细信息。
定义了TaskService,都是定义一些增删改查的服务。

syntax="proto3";
package services;
import "taskModels.proto";
option go_package = "./;protos";

message TaskRequest{
    //@inject_tag: json:"Id" form:"Id"
    uint64 Id = 1;
    //@inject_tag: json:"Uid" form:"Uid"
    uint64 Uid = 2;
    //@inject_tag: json:"Title" form:"Title"
    string Title = 3;
    //@inject_tag: json:"Content" form:"Content"
    string Content = 4;
    //@inject_tag: json:"StartTime" form:"StartTime"
    int64 StartTime = 5;
    //@inject_tag: json:"EndTime" form:"EndTime"
    int64 EndTime = 6;
    //@inject_tag: json:"Status" form:"Status"
    int64 Status = 7;
    // @inject_tag: json:"Start" form:"Start" uri:"Start"
    uint32 Start = 8;
    // @inject_tag: json:"Limit" form:"Limit" uri:"Limit"
    uint32 Limit = 9;
}

message TaskListResponse{
  repeated TaskModel TaskList=1;
  // @inject_tag: json:"Count"
  uint32 Count=2;
}

message TaskDetailResponse{
  TaskModel TaskDetail=1;
}

service TaskService{
  rpc CreateTask(TaskRequest) returns(TaskDetailResponse);
  rpc GetTasksList(TaskRequest) returns(TaskListResponse);
  rpc GetTask(TaskRequest) returns(TaskDetailResponse);
  rpc UpdateTask(TaskRequest) returns(TaskDetailResponse);
  rpc DeleteTask(TaskRequest) returns(TaskDetailResponse);
}

执行protoc生成pb文件

protoc --proto_path=. --micro_out=. --go_out=. taskService.proto

1.3 写入数据 task/core/taskService.go

我们在这个go文件中将数据写入RabbitMQ当中。

连接通道
	ch, err := model.MQ.Channel()
	if err != nil {
		err = errors.New("rabbitMQ err:" + err.Error())
		return err
	}
声明通道队列
	q, err := ch.QueueDeclare("task_queue", true, false, false, false, nil)
	if err != nil {
		err = errors.New("rabbitMQ err:" + err.Error())
		return err
	}
将请求的参数序列化,发布到队列中
	body, _ := json.Marshal(req)
	err = ch.Publish("", q.Name, false, false, amqp.Publishing{
		DeliveryMode: amqp.Persistent,
		ContentType:  "application/json",
		Body:         body,
	})
1.4 读取数据 mq-server/services/task.go

从RabbitMQ中接收数据信息再写入数据库中

打开Channel
ch, err := model.MQ.Channel()
task_queue通道中获取消息
	q, err := ch.QueueDeclare("task_queue", true, false, false, false, nil)
	if err != nil {
		panic(err)
	}

name:队列名称;
durable:是否持久化,队列存盘,true服务重启后信息不会丢失,影响性能;
autoDelete:是否自动删除;
noWait:是否非阻塞,true为是,不等待RMQ返回信息;
args:参数,传nil即可;
exclusive:是否设置排他

消息ACK保证了消息不会丢失,但是当rabbitMQ Server停止(不是consumer 挂掉)的时候,我们的所有消息都会丢失。针对这种情况,我们先确保消息队列的持久化,设置消息队列的durable选项为true

公平分派消息
	err = ch.Qos(1, 0, false)
	if err != nil {
		panic(err)
	}

设置Qos,设置预取大小prefetch,当prefetch=1时,表示在没收到consumer的ACK消息之前,只会为其consumer分派一个消息。

读出数据
	msgs, err := ch.Consume(q.Name, "", false, false, false, false, nil)
从通道中读出数据

将通道的信息,反系列化,然后在数据库中创建。

	go func() {
		for d := range msgs {
			var p model.Task
			err := json.Unmarshal(d.Body, &p)
			if err != nil {
				panic(err)
			}
			fmt.Println("d.Body",string(d.Body))
			model.DB.Create(&p)
			log.Printf("Done")
			_ = d.Ack(false) // 确认消息,必须为false
		}
	}()
2. 备忘录其他 *** 作

构造一个服务

type TaskService struct {

}
2.1 获取备忘录列表

传入的参数:上下文信息,请求参数,响应参数。

func (*TaskService) GetTasksList(ctx context.Context, req *services.TaskRequest, res *services.TaskListResponse) error {
	if req.Limit == 0 {
		req.Limit = 6
	}
	//在数据库查找值
	var productData []model.Task
	var count uint32
	err := model.DB.Offset(req.Start).Limit(req.Limit).Where("uid=?", req.Uid).Find(&productData).Error
	if err != nil {
		err = errors.New("mysql err:" + err.Error())
		return err
	}
	err = model.DB.Model(&model.Task{}).Where("uid=?", req.Uid).Count(&count).Error
	if err != nil {
		err = errors.New("mysql err:" + err.Error())
		return err
	}

	//序类化备忘录列表
	var taskRes []*services.TaskModel
	for _, item := range productData {
		taskRes = append(taskRes, BuildTask(item))
	}
	//序列化后的结果赋给response
	res.TaskList = taskRes
	res.Count = count
	return nil
}
2.2 获取备忘录详情
func (*TaskService) GetTask(ctx context.Context, req *services.TaskRequest, res *services.TaskDetailResponse) error {
	//在数据库查找值
	productData := model.Task{}
	err := model.DB.First(&productData, req.Id).Error
	if err != nil {
		err = errors.New("mysql err:" + err.Error())
		return err
	}
	//序类化商品
	productRes := BuildTask(productData)
	//序列化后的结果赋给response
	res.TaskDetail = productRes
	return nil
}
2.3 更新备忘录
func (*TaskService) UpdateTask(ctx context.Context, req *services.TaskRequest, res *services.TaskDetailResponse) error {
	//在数据库查找值
	taskData := model.Task{}
	err := model.DB.Model(model.Task{}).Where("id = ? AND uid = ?",req.Id,req.Uid).First(&taskData).Error
	if err != nil {
		err = errors.New("mysql err:" + err.Error())
		return err
	}
	//将要更新的数据赋值给结构体
	taskData.Title = req.Title
	taskData.Status = int(req.Status)
	taskData.Content = req.Content
	//update
	err = model.DB.Save(&taskData).Error
	if err != nil {
		err = errors.New("mysql err:" + err.Error())
		return err
	}
	//序列化后的结果赋给response
	res.TaskDetail = BuildTask(taskData)
	return nil
}
2.4 注册到etcd中 注册etcd
	etcdReg := etcd.NewRegistry(
		registry.Addrs("127.0.0.1:2379"),
	)
得到微服务实例
	// 1. 得到微服务实例
	microService := micro.NewService(
		micro.Name("rpcTaskService"), // 设置微服务名字,用来访问的
		micro.Address("127.0.0.1:8083"),
		micro.Registry(etcdReg),
	)
初始化
	microService.Init()
服务注册
将用户服务注册到etcd中
	_ = services.RegisterTaskServiceHandler(microService.Server(), new(core.TaskService))
启动微服务
	_ = microService.Run()

查看etcd中http://localhost:8080/etcdkeeper/是否有该模块的注册信息

3. 接入网关 3.1 接入路由 api-gateway/weblib/handlers
	//备忘录服务
	authed.GET("tasks", handlers.GetTaskList)
	authed.POST("task", handlers.CreateTask)
	authed.GET("task/:id", handlers.GetTaskDetail)
	authed.DELETE("task/:id", handlers.DeleteTask)
	authed.PUT("task/:id", handlers.UpdateTask)
3.2 编写接口(创建备忘录为例子)

注意这是一个多用户的备忘录,所以我们要确保的是创建到该用户的管理下的备忘录中。

所以我们就需要用到用户的id,所以就从Authorization中取出来。

func CreateTask(ginCtx *gin.Context) {
	var taskReq services.TaskRequest
	PanicIfTaskError(ginCtx.Bind(&taskReq))
	//从gin.keys取出服务实例
	claim,_ := util.ParseToken(ginCtx.GetHeader("Authorization"))
	taskReq.Uid = uint64(claim.Id)
	taskService := ginCtx.Keys["taskService"].(services.TaskService)
	taskRes, err := taskService.CreateTask(context.Background(), &taskReq)
	PanicIfTaskError(err)
	ginCtx.JSON(200, gin.H{"data": taskRes.TaskDetail})
}
3.3 测试 创建备忘录


展示用户备忘录

修改备忘录


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/995149.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存