目录
摘要:
raft代码地址:
客户端向leader写数据:
raft中leader处理:
ApplyLog:
leader的处理循环:
leader的dispatch处理:
主从复制:
startStopReplication
replicate
replicateTo
摘要:
raft写数据流程追踪记录, 以github.com/hashicorp/raft为例
raft代码地址:GitHub - hashicorp/raft: Golang implementation of the Raft consensus protocol
https://github.com/Jille/raft-grpc-example
客户端向leader写数据:package main
import (
"context"
"fmt"
"io"
"io/ioutil"
"strings"
"sync"
"time"
pb "github.com/Jille/raft-grpc-example/proto"
"github.com/Jille/raft-grpc-leader-rpc/rafterrors"
"github.com/hashicorp/raft"
)
// wordTracker keeps track of the three longest words it ever saw.
type wordTracker struct {
mtx sync.RWMutex
words [3]string
}
var _ raft.FSM = &wordTracker{}
// compareWords returns true if a is longer (lexicography breaking ties).
func compareWords(a, b string) bool {
if len(a) == len(b) {
return a < b
}
return len(a) > len(b)
}
func cloneWords(words [3]string) []string {
var ret [3]string
copy(ret[:], words[:])
return ret[:]
}
func (f *wordTracker) Apply(l *raft.Log) interface{} {
f.mtx.Lock()
defer f.mtx.Unlock()
w := string(l.Data)
for i := 0; i < len(f.words); i++ {
if compareWords(w, f.words[i]) {
copy(f.words[i+1:], f.words[i:])
f.words[i] = w
break
}
}
return nil
}
func (f *wordTracker) Snapshot() (raft.FSMSnapshot, error) {
// Make sure that any future calls to f.Apply() don't change the snapshot.
return &snapshot{cloneWords(f.words)}, nil
}
func (f *wordTracker) Restore(r io.ReadCloser) error {
b, err := ioutil.ReadAll(r)
if err != nil {
return err
}
words := strings.Split(string(b), "\n")
copy(f.words[:], words)
return nil
}
type snapshot struct {
words []string
}
func (s *snapshot) Persist(sink raft.SnapshotSink) error {
_, err := sink.Write([]byte(strings.Join(s.words, "\n")))
if err != nil {
sink.Cancel()
return fmt.Errorf("sink.Write(): %v", err)
}
return sink.Close()
}
func (s *snapshot) Release() {
}
type rpcInterface struct {
wordTracker *wordTracker
raft *raft.Raft
}
func (r rpcInterface) AddWord(ctx context.Context, req *pb.AddWordRequest) (*pb.AddWordResponse, error) {
f := r.raft.Apply([]byte(req.GetWord()), time.Second)
if err := f.Error(); err != nil {
return nil, rafterrors.MarkRetriable(err)
}
return &pb.AddWordResponse{
CommitIndex: f.Index(),
}, nil
}
func (r rpcInterface) GetWords(ctx context.Context, req *pb.GetWordsRequest) (*pb.GetWordsResponse, error) {
r.wordTracker.mtx.RLock()
defer r.wordTracker.mtx.RUnlock()
return &pb.GetWordsResponse{
BestWords: cloneWords(r.wordTracker.words),
ReadAtIndex: r.raft.AppliedIndex(),
}, nil
}
核心为:
func (r rpcInterface) AddWord(ctx context.Context, req *pb.AddWordRequest) (*pb.AddWordResponse, error) {
f := r.raft.Apply([]byte(req.GetWord()), time.Second)
if err := f.Error(); err != nil {
return nil, rafterrors.MarkRetriable(err)
}
return &pb.AddWordResponse{
CommitIndex: f.Index(),
}, nil
}
raft中leader处理:
ApplyLog:
发送写日志信号:
// Apply is used to apply a command to the FSM in a highly consistent
// manner. This returns a future that can be used to wait on the application.
// An optional timeout can be provided to limit the amount of time we wait
// for the command to be started. This must be run on the leader or it
// will fail.
func (r *Raft) Apply(cmd []byte, timeout time.Duration) ApplyFuture {
return r.ApplyLog(Log{Data: cmd}, timeout)
}
// ApplyLog performs Apply but takes in a Log directly. The only values
// currently taken from the submitted Log are Data and Extensions.
func (r *Raft) ApplyLog(log Log, timeout time.Duration) ApplyFuture {
metrics.IncrCounter([]string{"raft", "apply"}, 1)
var timer <-chan time.Time
if timeout > 0 {
timer = time.After(timeout)
}
// Create a log future, no index or term yet
logFuture := &logFuture{
log: Log{
Type: LogCommand,
Data: log.Data,
Extensions: log.Extensions,
},
}
logFuture.init()
select {
case <-timer:
return errorFuture{ErrEnqueueTimeout}
case <-r.shutdownCh:
return errorFuture{ErrRaftShutdown}
case r.applyCh <- logFuture:
return logFuture
}
}
leader的处理循环:
接受管道信号
// leaderLoop is the hot loop for a leader. It is invoked
// after all the various leader setup is done.
func (r *Raft) leaderLoop() {
// stepDown is used to track if there is an inflight log that
// would cause us to lose leadership (specifically a RemovePeer of
// ourselves). If this is the case, we must not allow any logs to
// be processed in parallel, otherwise we are basing commit on
// only a single peer (ourself) and replicating to an undefined set
// of peers.
stepDown := false
lease := time.After(r.conf.LeaderLeaseTimeout)
for r.getState() == Leader {
select {
case rpc := <-r.rpcCh:
r.processRPC(rpc)
case <-r.leaderState.stepDown:
r.setState(Follower)
case future := <-r.leadershipTransferCh:
if r.getLeadershipTransferInProgress() {
r.logger.Debug(ErrLeadershipTransferInProgress.Error())
future.respond(ErrLeadershipTransferInProgress)
continue
}
r.logger.Debug("starting leadership transfer", "id", future.ID, "address", future.Address)
// When we are leaving leaderLoop, we are no longer
// leader, so we should stop transferring.
leftLeaderLoop := make(chan struct{})
defer func() { close(leftLeaderLoop) }()
stopCh := make(chan struct{})
doneCh := make(chan error, 1)
// This is intentionally being setup outside of the
// leadershipTransfer function. Because the TimeoutNow
// call is blocking and there is no way to abort that
// in case eg the timer expires.
// The leadershipTransfer function is controlled with
// the stopCh and doneCh.
go func() {
select {
case <-time.After(r.conf.ElectionTimeout):
close(stopCh)
err := fmt.Errorf("leadership transfer timeout")
r.logger.Debug(err.Error())
future.respond(err)
<-doneCh
case <-leftLeaderLoop:
close(stopCh)
err := fmt.Errorf("lost leadership during transfer (expected)")
r.logger.Debug(err.Error())
future.respond(nil)
<-doneCh
case err := <-doneCh:
if err != nil {
r.logger.Debug(err.Error())
}
future.respond(err)
}
}()
// leaderState.replState is accessed here before
// starting leadership transfer asynchronously because
// leaderState is only supposed to be accessed in the
// leaderloop.
id := future.ID
address := future.Address
if id == nil {
s := r.pickServer()
if s != nil {
id = &s.ID
address = &s.Address
} else {
doneCh <- fmt.Errorf("cannot find peer")
continue
}
}
state, ok := r.leaderState.replState[*id]
if !ok {
doneCh <- fmt.Errorf("cannot find replication state for %v", id)
continue
}
go r.leadershipTransfer(*id, *address, state, stopCh, doneCh)
case <-r.leaderState.commitCh:
// Process the newly committed entries
oldCommitIndex := r.getCommitIndex()
commitIndex := r.leaderState.commitment.getCommitIndex()
r.setCommitIndex(commitIndex)
// New configration has been committed, set it as the committed
// value.
if r.configurations.latestIndex > oldCommitIndex &&
r.configurations.latestIndex <= commitIndex {
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
if !hasVote(r.configurations.committed, r.localID) {
stepDown = true
}
}
start := time.Now()
var groupReady []*list.Element
var groupFutures = make(map[uint64]*logFuture)
var lastIdxInGroup uint64
// Pull all inflight logs that are committed off the queue.
for e := r.leaderState.inflight.Front(); e != nil; e = e.Next() {
commitLog := e.Value.(*logFuture)
idx := commitLog.log.Index
if idx > commitIndex {
// Don't go past the committed index
break
}
// Measure the commit time
metrics.MeasureSince([]string{"raft", "commitTime"}, commitLog.dispatch)
groupReady = append(groupReady, e)
groupFutures[idx] = commitLog
lastIdxInGroup = idx
}
// Process the group
if len(groupReady) != 0 {
r.processLogs(lastIdxInGroup, groupFutures)
for _, e := range groupReady {
r.leaderState.inflight.Remove(e)
}
}
// Measure the time to enqueue batch of logs for FSM to apply
metrics.MeasureSince([]string{"raft", "fsm", "enqueue"}, start)
// Count the number of logs enqueued
metrics.SetGauge([]string{"raft", "commitNumLogs"}, float32(len(groupReady)))
if stepDown {
if r.conf.ShutdownOnRemove {
r.logger.Info("removed ourself, shutting down")
r.Shutdown()
} else {
r.logger.Info("removed ourself, transitioning to follower")
r.setState(Follower)
}
}
case v := <-r.verifyCh:
if v.quorumSize == 0 {
// Just dispatched, start the verification
r.verifyLeader(v)
} else if v.votes < v.quorumSize {
// Early return, means there must be a new leader
r.logger.Warn("new leader elected, stepping down")
r.setState(Follower)
delete(r.leaderState.notify, v)
for _, repl := range r.leaderState.replState {
repl.cleanNotify(v)
}
v.respond(ErrNotLeader)
} else {
// Quorum of members agree, we are still leader
delete(r.leaderState.notify, v)
for _, repl := range r.leaderState.replState {
repl.cleanNotify(v)
}
v.respond(nil)
}
case future := <-r.userRestoreCh:
if r.getLeadershipTransferInProgress() {
r.logger.Debug(ErrLeadershipTransferInProgress.Error())
future.respond(ErrLeadershipTransferInProgress)
continue
}
err := r.restoreUserSnapshot(future.meta, future.reader)
future.respond(err)
case future := <-r.configurationsCh:
if r.getLeadershipTransferInProgress() {
r.logger.Debug(ErrLeadershipTransferInProgress.Error())
future.respond(ErrLeadershipTransferInProgress)
continue
}
future.configurations = r.configurations.Clone()
future.respond(nil)
case future := <-r.configurationChangeChIfStable():
if r.getLeadershipTransferInProgress() {
r.logger.Debug(ErrLeadershipTransferInProgress.Error())
future.respond(ErrLeadershipTransferInProgress)
continue
}
r.appendConfigurationEntry(future)
case b := <-r.bootstrapCh:
b.respond(ErrCantBootstrap)
case newLog := <-r.applyCh:
if r.getLeadershipTransferInProgress() {
r.logger.Debug(ErrLeadershipTransferInProgress.Error())
newLog.respond(ErrLeadershipTransferInProgress)
continue
}
// Group commit, gather all the ready commits
ready := []*logFuture{newLog}
GROUP_COMMIT_LOOP:
for i := 0; i < r.conf.MaxAppendEntries; i++ {
select {
case newLog := <-r.applyCh:
ready = append(ready, newLog)
default:
break GROUP_COMMIT_LOOP
}
}
// Dispatch the logs
if stepDown {
// we're in the process of stepping down as leader, don't process anything new
for i := range ready {
ready[i].respond(ErrNotLeader)
}
} else {
r.dispatchLogs(ready)
}
case <-lease:
// Check if we've exceeded the lease, potentially stepping down
maxDiff := r.checkLeaderLease()
// Next check interval should adjust for the last node we've
// contacted, without going negative
checkInterval := r.conf.LeaderLeaseTimeout - maxDiff
if checkInterval < minCheckInterval {
checkInterval = minCheckInterval
}
// Renew the lease timer
lease = time.After(checkInterval)
case <-r.shutdownCh:
return
}
}
}
case newLog := <-r.applyCh:
if r.getLeadershipTransferInProgress() {
r.logger.Debug(ErrLeadershipTransferInProgress.Error())
newLog.respond(ErrLeadershipTransferInProgress)
continue
}
// Group commit, gather all the ready commits
ready := []*logFuture{newLog}
GROUP_COMMIT_LOOP:
for i := 0; i < r.conf.MaxAppendEntries; i++ {
select {
case newLog := <-r.applyCh:
ready = append(ready, newLog)
default:
break GROUP_COMMIT_LOOP
}
}
// Dispatch the logs
if stepDown {
// we're in the process of stepping down as leader, don't process anything new
for i := range ready {
ready[i].respond(ErrNotLeader)
}
} else {
r.dispatchLogs(ready)
}
leader的dispatch处理:
// dispatchLog is called on the leader to push a log to disk, mark it
// as inflight and begin replication of it.
func (r *Raft) dispatchLogs(applyLogs []*logFuture) {
now := time.Now()
defer metrics.MeasureSince([]string{"raft", "leader", "dispatchLog"}, now)
term := r.getCurrentTerm()
lastIndex := r.getLastIndex()
n := len(applyLogs)
logs := make([]*Log, n)
metrics.SetGauge([]string{"raft", "leader", "dispatchNumLogs"}, float32(n))
for idx, applyLog := range applyLogs {
applyLog.dispatch = now
lastIndex++
applyLog.log.Index = lastIndex
applyLog.log.Term = term
logs[idx] = &applyLog.log
r.leaderState.inflight.PushBack(applyLog)
}
// Write the log entry locally
if err := r.logs.StoreLogs(logs); err != nil {
r.logger.Error("failed to commit logs", "error", err)
for _, applyLog := range applyLogs {
applyLog.respond(err)
}
r.setState(Follower)
return
}
r.leaderState.commitment.match(r.localID, lastIndex)
// Update the last log since it's on disk now
r.setLastLog(lastIndex, term)
// Notify the replicators of the new log
for _, f := range r.leaderState.replState {
asyncNotifyCh(f.triggerCh)
}
}
// Notify the replicators of the new log
for _, f := range r.leaderState.replState {
asyncNotifyCh(f.triggerCh)
}
case <-s.triggerCh:
lastLogIdx, _ := r.getLastLog()
shouldStop = r.replicateTo(s, lastLogIdx)
// This is _not_ our heartbeat mechanism but is to ensure
// followers quickly learn the leader's commit index when
// raft commits stop flowing naturally. The actual heartbeats
// can't do this to keep them unblocked by disk IO on the
// follower. See https://github.com/hashicorp/raft/issues/282.
主从复制:
startStopReplication
// startStopReplication will set up state and start asynchronous replication to
// new peers, and stop replication to removed peers. Before removing a peer,
// it'll instruct the replication routines to try to replicate to the current
// index. This must only be called from the main thread.
func (r *Raft) startStopReplication() {
inConfig := make(map[ServerID]bool, len(r.configurations.latest.Servers))
lastIdx := r.getLastIndex()
// Start replication goroutines that need starting
for _, server := range r.configurations.latest.Servers {
if server.ID == r.localID {
continue
}
inConfig[server.ID] = true
if _, ok := r.leaderState.replState[server.ID]; !ok {
r.logger.Info("added peer, starting replication", "peer", server.ID)
s := &followerReplication{
peer: server,
commitment: r.leaderState.commitment,
stopCh: make(chan uint64, 1),
triggerCh: make(chan struct{}, 1),
triggerDeferErrorCh: make(chan *deferError, 1),
currentTerm: r.getCurrentTerm(),
nextIndex: lastIdx + 1,
lastContact: time.Now(),
notify: make(map[*verifyFuture]struct{}),
notifyCh: make(chan struct{}, 1),
stepDown: r.leaderState.stepDown,
}
r.leaderState.replState[server.ID] = s
r.goFunc(func() { r.replicate(s) })
asyncNotifyCh(s.triggerCh)
r.observe(PeerObservation{Peer: server, Removed: false})
}
}
// Stop replication goroutines that need stopping
for serverID, repl := range r.leaderState.replState {
if inConfig[serverID] {
continue
}
// Replicate up to lastIdx and stop
r.logger.Info("removed peer, stopping replication", "peer", serverID, "last-index", lastIdx)
repl.stopCh <- lastIdx
close(repl.stopCh)
delete(r.leaderState.replState, serverID)
r.observe(PeerObservation{Peer: repl.peer, Removed: true})
}
}
replicate
// replicate is a long running routine that replicates log entries to a single
// follower.
func (r *Raft) replicate(s *followerReplication) {
// Start an async heartbeating routing
stopHeartbeat := make(chan struct{})
defer close(stopHeartbeat)
r.goFunc(func() { r.heartbeat(s, stopHeartbeat) })
RPC:
shouldStop := false
for !shouldStop {
select {
case maxIndex := <-s.stopCh:
// Make a best effort to replicate up to this index
if maxIndex > 0 {
r.replicateTo(s, maxIndex)
}
return
case deferErr := <-s.triggerDeferErrorCh:
lastLogIdx, _ := r.getLastLog()
shouldStop = r.replicateTo(s, lastLogIdx)
if !shouldStop {
deferErr.respond(nil)
} else {
deferErr.respond(fmt.Errorf("replication failed"))
}
case <-s.triggerCh:
lastLogIdx, _ := r.getLastLog()
shouldStop = r.replicateTo(s, lastLogIdx)
// This is _not_ our heartbeat mechanism but is to ensure
// followers quickly learn the leader's commit index when
// raft commits stop flowing naturally. The actual heartbeats
// can't do this to keep them unblocked by disk IO on the
// follower. See https://github.com/hashicorp/raft/issues/282.
case <-randomTimeout(r.conf.CommitTimeout):
lastLogIdx, _ := r.getLastLog()
shouldStop = r.replicateTo(s, lastLogIdx)
}
// If things looks healthy, switch to pipeline mode
if !shouldStop && s.allowPipeline {
goto PIPELINE
}
}
return
PIPELINE:
// Disable until re-enabled
s.allowPipeline = false
// Replicates using a pipeline for high performance. This method
// is not able to gracefully recover from errors, and so we fall back
// to standard mode on failure.
if err := r.pipelineReplicate(s); err != nil {
if err != ErrPipelineReplicationNotSupported {
r.logger.Error("failed to start pipeline replication to", "peer", s.peer, "error", err)
}
}
goto RPC
}
// replicateTo is a helper to replicate(), used to replicate the logs up to a
// given last index.
// If the follower log is behind, we take care to bring them up to date.
func (r *Raft) replicateTo(s *followerReplication, lastIndex uint64) (shouldStop bool) {
// Create the base request
var req AppendEntriesRequest
var resp AppendEntriesResponse
var start time.Time
START:
// Prevent an excessive retry rate on errors
if s.failures > 0 {
select {
case <-time.After(backoff(failureWait, s.failures, maxFailureScale)):
case <-r.shutdownCh:
}
}
// Setup the request
if err := r.setupAppendEntries(s, &req, atomic.LoadUint64(&s.nextIndex), lastIndex); err == ErrLogNotFound {
goto SEND_SNAP
} else if err != nil {
return
}
// Make the RPC call
start = time.Now()
if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil {
r.logger.Error("failed to appendEntries to", "peer", s.peer, "error", err)
s.failures++
return
}
appendStats(string(s.peer.ID), start, float32(len(req.Entries)))
// Check for a newer term, stop running
if resp.Term > req.Term {
r.handleStaleTerm(s)
return true
}
// Update the last contact
s.setLastContact()
// Update s based on success
if resp.Success {
// Update our replication state
updateLastAppended(s, &req)
// Clear any failures, allow pipelining
s.failures = 0
s.allowPipeline = true
} else {
atomic.StoreUint64(&s.nextIndex, max(min(s.nextIndex-1, resp.LastLog+1), 1))
if resp.NoRetryBackoff {
s.failures = 0
} else {
s.failures++
}
r.logger.Warn("appendEntries rejected, sending older logs", "peer", s.peer, "next", atomic.LoadUint64(&s.nextIndex))
}
CHECK_MORE:
// Poll the stop channel here in case we are looping and have been asked
// to stop, or have stepped down as leader. Even for the best effort case
// where we are asked to replicate to a given index and then shutdown,
// it's better to not loop in here to send lots of entries to a straggler
// that's leaving the cluster anyways.
select {
case <-s.stopCh:
return true
default:
}
// Check if there are more logs to replicate
if atomic.LoadUint64(&s.nextIndex) <= lastIndex {
goto START
}
return
// SEND_SNAP is used when we fail to get a log, usually because the follower
// is too far behind, and we must ship a snapshot down instead
SEND_SNAP:
if stop, err := r.sendLatestSnapshot(s); stop {
return true
} else if err != nil {
r.logger.Error("failed to send snapshot to", "peer", s.peer, "error", err)
return
}
// Check if there is more to replicate
goto CHECK_MORE
}
replicateTo
// replicateTo is a helper to replicate(), used to replicate the logs up to a
// given last index.
// If the follower log is behind, we take care to bring them up to date.
func (r *Raft) replicateTo(s *followerReplication, lastIndex uint64) (shouldStop bool) {
// Create the base request
var req AppendEntriesRequest
var resp AppendEntriesResponse
var start time.Time
START:
// Prevent an excessive retry rate on errors
if s.failures > 0 {
select {
case <-time.After(backoff(failureWait, s.failures, maxFailureScale)):
case <-r.shutdownCh:
}
}
// Setup the request
if err := r.setupAppendEntries(s, &req, atomic.LoadUint64(&s.nextIndex), lastIndex); err == ErrLogNotFound {
goto SEND_SNAP
} else if err != nil {
return
}
// Make the RPC call
start = time.Now()
if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil {
r.logger.Error("failed to appendEntries to", "peer", s.peer, "error", err)
s.failures++
return
}
appendStats(string(s.peer.ID), start, float32(len(req.Entries)))
// Check for a newer term, stop running
if resp.Term > req.Term {
r.handleStaleTerm(s)
return true
}
// Update the last contact
s.setLastContact()
// Update s based on success
if resp.Success {
// Update our replication state
updateLastAppended(s, &req)
// Clear any failures, allow pipelining
s.failures = 0
s.allowPipeline = true
} else {
atomic.StoreUint64(&s.nextIndex, max(min(s.nextIndex-1, resp.LastLog+1), 1))
if resp.NoRetryBackoff {
s.failures = 0
} else {
s.failures++
}
r.logger.Warn("appendEntries rejected, sending older logs", "peer", s.peer, "next", atomic.LoadUint64(&s.nextIndex))
}
CHECK_MORE:
// Poll the stop channel here in case we are looping and have been asked
// to stop, or have stepped down as leader. Even for the best effort case
// where we are asked to replicate to a given index and then shutdown,
// it's better to not loop in here to send lots of entries to a straggler
// that's leaving the cluster anyways.
select {
case <-s.stopCh:
return true
default:
}
// Check if there are more logs to replicate
if atomic.LoadUint64(&s.nextIndex) <= lastIndex {
goto START
}
return
// SEND_SNAP is used when we fail to get a log, usually because the follower
// is too far behind, and we must ship a snapshot down instead
SEND_SNAP:
if stop, err := r.sendLatestSnapshot(s); stop {
return true
} else if err != nil {
r.logger.Error("failed to send snapshot to", "peer", s.peer, "error", err)
return
}
// Check if there is more to replicate
goto CHECK_MORE
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)