Consumer group主要处理协调消费的问题。
为了应对变化,消费时按照时间段被切分成不同的generation,在同一时刻所有的group中的clients对应同一个generation,同一时刻只会有一个generation,新的generation需要所有旧generation产生的goroutine结束后才会创建(后面代码分析会做解释)。使用generation的另一个好处是防止旧generation拉取或者提交的数据,保证数据的一致性。
理论上讲,当group中的组员发生变化、监听的topics发生变化、同一topic中的partition数量发生变化时,会生成新的generation,重新分配组员处理的partitions。从代码上看,触发更新generation的 *** 作可以是由客户端发起,也可以由服务端发起(个人觉得只由服务端发起应该更合理一些)。
当新一轮generation开始时,分为两个步骤,join *** 作和sync *** 作。Join是所有的group client均向coordinator发出请求,表示新generation的选举开始。Coordinator会指定一个client作为这一轮generation的leader,同时将消费topic的基础信息发送给她。Coordinator在接受到join请求后,并不会立即返回,它会等待一段时间,以便接收到该group下所有client发出的join请求,这么做的目的是需要返回给leader这次所有client的信息和分配策略,以便其做分配。Client发出join请求时,还需要带上支持的balancer协议,coordinator会选择一个所有clients均支持的协议返回给leader。做sync *** 作时,leader会把分配方案一起发送给coordinator,coordinator会将每个client需要处理的partition返回给client。与join一致,coordinator接收到sync请求时也不会立即返回,因为它要等待leader的分配方案到达。
代码这部分的代码主要有两个核心类型,Consumer Group和Generation,我们逐一分析。
Consumer GroupConsumerGroup的主要成员是Generation,这个类型的主要作用是提供在其类型上的方法,包括创建和离开generation。在创建过程中,又分为join *** 作和sync *** 作。因为在这些步骤中,均不需要提供和generation相关的信息,可以好好体会一下它与Generation的分工。
类型和配置参数
type ConsumerGroup struct {
config ConsumerGroupConfig
next chan *Generation
errs chan error
closeOnce sync.Once
wg sync.WaitGroup
done chan struct{} // 表示结束的channel
}
type ConsumerGroupConfig struct {
// consumer group ID.根据它定位到coordinator所在的broker,不能为空
ID string
Brokers []string
Dialer *Dialer // 前面章节介绍过
Topics []string
// rebalance时的分配策略
GroupBalancers []GroupBalancer
// 向coordinator发送心跳的间隔时间,失败时表示当前generation关闭,需要生成新的generation
// Default: 3s
HeartbeatInterval time.Duration
// 确认topic对应的partition数目发生变化的间隔时间,partition数目变化时表示当前generation关闭,需要生成新的generation
// Default: 5s
PartitionWatchInterval time.Duration
// 开关,控制是否监控partition数目发生变化
WatchPartitionChanges bool
// 如果coordinator未在该时间内接收到client发出的心跳,则可认为该client已经处于失效的状态,同时触发rebalance
// Default: 30s
SessionTimeout time.Duration
// 等待所有client join的时间长度
// Default: 30s
RebalanceTimeout time.Duration
// 当join请求失败后,下一次re-join请求的时间间隔
// Default: 5s
JoinGroupBackoff time.Duration
// Consumer Group在brokers中的存留时间
RetentionTime time.Duration
// 当partition未有过commit记录时采用的消费策略
// FirstOffset or LastOffset.
// Default: FirstOffset
StartOffset int64
}
核心方法
1 获取新的generation
func (cg *ConsumerGroup) Next(ctx context.Context) (*Generation, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-cg.done:
return nil, ErrGroupClosed
case err := <-cg.errs:
return nil, err
case next := <-cg.next:
return next, nil
}
}
2 产生generation的后台goroutine
func (cg *ConsumerGroup) run() {
var memberID string
var err error
for { // 循环,这样当上一个generation close后可以生成新的
memberID, err = cg.nextGeneration(memberID) // 生成新的generation
var backoff <-chan time.Time
switch err {
case nil: // no error,之前的generation正常退出
continue
case ErrGroupClosed: // cg 关闭
_ = cg.leaveGroup(memberID) // 通知coordinator
return
case RebalanceInProgress: // 服务端逻辑
default: // 其它错误
_ = cg.leaveGroup(memberID)
memberID = ""
backoff = time.After(cg.config.JoinGroupBackoff)
}
select {
case <-cg.done:
return
case cg.errs <- err:
}
if backoff != nil {
select {
case <-cg.done:
return
case <-backoff:
}
}
}
}
2.1 生成新的generation
func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) {
// 连接coordinator
conn, err := cg.coordinator()
if err != nil {
。。。// 日志
}
defer conn.Close()
var generationID int32
var groupAssignments GroupMemberAssignments
var assignments map[string][]int32
memberID, generationID, groupAssignments, err = cg.joinGroup(conn, memberID) // join *** 作,如果是leader则分配任务
if err != nil {
。。。 // 日志
}
// sync group
assignments, err = cg.syncGroup(conn, memberID, generationID, groupAssignments) // sync *** 作
if err != nil {
。。。//日志
}
// 获取本client需要处理partitions的offsets.
var offsets map[string]map[int]int64
offsets, err = cg.fetchOffsets(conn, assignments)
if err != nil {
。。。//日志
return memberID, err
}
// 生成新的generation
gen := Generation{
ID: generationID,
GroupID: cg.config.ID,
MemberID: memberID,
Assignments: cg.makeAssignments(assignments, offsets),
conn: conn,
done: make(chan struct{}),
joined: make(chan struct{}),
retentionMillis: int64(cg.config.RetentionTime / time.Millisecond),
log: cg.withLogger,
logError: cg.withErrorLogger,
}
// 开启本generation的多个goroutine,一旦其中一个goroutine返回,表示本次generation结束
gen.heartbeatLoop(cg.config.HeartbeatInterval) // 心跳
if cg.config.WatchPartitionChanges {
for _, topic := range cg.config.Topics {
gen.partitionWatcher(cg.config.PartitionWatchInterval, topic) // 监控topic partitions的变化
}
}
select {
case <-cg.done:
gen.close()
return memberID, ErrGroupClosed // ErrGroupClosed will trigger leave logic.
case cg.next <- &gen: // 将新生成的generation放入channel中
}
select {
case <-cg.done:
gen.close()
return memberID, ErrGroupClosed // ErrGroupClosed will trigger leave logic.
case <-gen.done: // 等待本次generation结束
gen.close()
return memberID, nil
}
}
2.2 join *** 作
func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) {
request, err := cg.makeJoinGroupRequestV1(memberID) // 拼request
response, err := conn.joinGroup(request) // 向coordinator发出请求
memberID = response.MemberID // 获取member id
generationID := response.GenerationID // 获取generation id
var assignments GroupMemberAssignments
if iAmLeader := response.MemberID == response.LeaderID; iAmLeader { // 被coordinator指定为本次generation的leader
v, err := cg.assignTopicPartitions(conn, response) // 分配工作
assignments = v
}
return memberID, generationID, assignments, nil
}
2.3 sync *** 作
func (cg *ConsumerGroup) syncGroup(conn coordinator, memberID string, generationID int32, memberAssignments GroupMemberAssignments) (map[string][]int32, error) {
request := cg.makeSyncGroupRequestV0(memberID, generationID, memberAssignments) // 拼请求
response, err := conn.syncGroup(request) // 向coordinator发出sync请求
assignments := groupAssignment{}
reader := bufio.NewReader(bytes.NewReader(response.MemberAssignments))
if _, err := (&assignments).readFrom(reader, len(response.MemberAssignments)); err != nil { // 读取本client需要处理的partitions
return nil, err
}
return assignments.Topics, nil
}
2.4 离开本generation
func (cg *ConsumerGroup) leaveGroup(memberID string) error {
if memberID == "" {
return nil
}
// 重新连接,因为存在因连接问题造成的generation失效的情况,重新连接可以防止这种情况
coordinator, err := cg.coordinator()
_, err = coordinator.leaveGroup(leaveGroupRequestV0{ // 向coordinator发出离开的请求
GroupID: cg.config.ID,
MemberID: memberID,
})
_ = coordinator.Close() // 清理工作
return err
}
Generation
commit offset,heartbeat以及观察partitions数量的变化均同generation相关。试想一旦generation发生变化,则commit offset一定是不会被coordinator所接受的。
类型和配置type Generation struct {
ID int32
GroupID string // customer group id
MemberID string // 由coordinator分配
Assignments map[string][]PartitionAssignment // 被分配的任务
conn coordinator
lock sync.Mutex
done chan struct{} // 本次generation是否需要关闭
closed bool
routines int // 开启goroutine的个数
joined chan struct{} // 是否所有开启的goroutine都退出
}
核心方法
1 结束一轮generation
func (g *Generation) close() {
g.lock.Lock()
if !g.closed {
close(g.done) // 关闭channel
g.closed = true
}
r := g.routines
g.lock.Unlock()
// 如果还有这轮活动开启的goroutine,等待结束
if r > 0 {
<-g.joined
}
}
2 generation开启goroutine的方法
func (g *Generation) Start(fn func(ctx context.Context)) {
g.lock.Lock()
defer g.lock.Unlock()
if g.closed { // 没看懂。。。
go fn(genCtx{g})
return
}
// 计数加1
g.routines++
go func() {
fn(genCtx{g}) // 运行函数
g.lock.Lock()
if !g.closed {
close(g.done) // 关闭channel,这样别的goroutine也能感知到
g.closed = true
}
g.routines-- // 计数减1
if g.routines == 0 {
close(g.joined) // 通知close函数退出
}
g.lock.Unlock()
}()
}
3 本轮generation的commit offset
func (g *Generation) CommitOffsets(offsets map[string]map[int]int64) error {
if len(offsets) == 0 {
return nil
}
// 按照请求格式的需要构造参数
topics := make([]offsetCommitRequestV2Topic, 0, len(offsets))
for topic, partitions := range offsets {
t := offsetCommitRequestV2Topic{Topic: topic}
for partition, offset := range partitions {
t.Partitions = append(t.Partitions, offsetCommitRequestV2Partition{
Partition: int32(partition),
Offset: offset,
})
}
topics = append(topics, t)
}
request := offsetCommitRequestV2{ // 带上generation的参数,coordinator会做验证
GroupID: g.GroupID,
GenerationID: g.ID,
MemberID: g.MemberID,
RetentionTime: g.retentionMillis,
Topics: topics,
}
_, err := g.conn.offsetCommit(request) // 向coordinator发送commit请求
if err == nil {
。。。 // 日志
}
return err
}
4 发送心跳
func (g *Generation) heartbeatLoop(interval time.Duration) {
g.Start(func(ctx context.Context) {
ticker := time.NewTicker(interval) // 设置ticker
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
_, err := g.conn.heartbeat(heartbeatRequestV0{ // 发送心跳请求,带上本轮generation的信息
GroupID: g.GroupID,
GenerationID: g.ID,
MemberID: g.MemberID,
})
if err != nil { // 退出,进入下一轮generation
return
}
}
}
})
}
5 观察partition的变化
func (g *Generation) partitionWatcher(interval time.Duration, topic string) {
g.Start(func(ctx context.Context) {
ticker := time.NewTicker(interval) // 设置ticker
defer ticker.Stop()
ops, err := g.conn.readPartitions(topic) // topic下的partitions信息
oParts := len(ops)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
ops, err := g.conn.readPartitions(topic)
switch err {
case nil, UnknownTopicOrPartition:
if len(ops) != oParts { // partition发生变化,退出进入下一轮generation
return
}
default:
return
}
}
}
})
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)