e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})}
...
e.Server.Start()
e.servePeers()
e.serveClients()
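Zooming out for a moment: this constructor is what the embed package wraps. A minimal sketch of driving it from the outside, modeled on the embed package's own example (the data dir is illustrative):

package main

import (
	"log"
	"time"

	"go.etcd.io/etcd/server/v3/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "default.etcd" // illustrative data dir

	// StartEtcd runs the constructor shown above: it builds the Etcd
	// engine, starts the raft server, then serves peers and clients.
	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()

	select {
	case <-e.Server.ReadyNotify():
		log.Println("etcd is ready")
	case <-time.After(60 * time.Second):
		e.Server.Stop() // trigger a shutdown if startup hangs
		log.Fatal("etcd took too long to start")
	}
	log.Fatal(<-e.Err()) // block until the server errors out
}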
Now for the Etcd struct itself: the one engine behind everything. Reading it, I find that comments really are worth writing.
// Etcd contains a running etcd server and its listeners.
type Etcd struct {
	Peers   []*peerListener
	Clients []net.Listener
	// a map of contexts for the servers that serves client requests.
	sctxs            map[string]*serveCtx
	metricsListeners []net.Listener

	tracingExporterShutdown func()

	Server *etcdserver.EtcdServer

	cfg Config

	// closeOnce is to ensure `stopc` is closed only once, no matter
	// how many times the Close() method is called.
	closeOnce sync.Once
	// stopc is used to notify the sub goroutines not to send
	// any errors to `errc`.
	stopc chan struct{}
	// errc is used to receive error from sub goroutines (including
	// client handler, peer handler and metrics handler). It's closed
	// after all these sub goroutines exit (checked via `wg`). Writers
	// should avoid writing after `stopc` is closed by selecting on
	// reading from `stopc`.
	errc chan error

	// wg is used to track the lifecycle of all sub goroutines which
	// need to send error back to the `errc`.
	wg sync.WaitGroup
}
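The closeOnce/stopc/errc/wg quartet is a reusable shutdown pattern in its own right. A minimal standalone sketch of the same idea (field names mirror the struct above; everything else is hypothetical):

package main

import (
	"errors"
	"fmt"
	"sync"
)

type engine struct {
	closeOnce sync.Once
	stopc     chan struct{}
	errc      chan error
	wg        sync.WaitGroup
}

// run starts a sub goroutine that may report one error.
func (e *engine) run(name string) {
	e.wg.Add(1)
	go func() {
		defer e.wg.Done()
		err := errors.New(name + " failed") // stand-in for real work
		select {
		case e.errc <- err: // deliver the error...
		case <-e.stopc: // ...unless shutdown already began
		}
	}()
}

func (e *engine) close() {
	e.closeOnce.Do(func() { close(e.stopc) }) // safe to call many times
	e.wg.Wait()                               // all writers are gone now
	close(e.errc)                             // so closing errc cannot race a send
}

func main() {
	e := &engine{stopc: make(chan struct{}), errc: make(chan error)}
	e.run("peer handler")
	e.run("client handler")
	fmt.Println(<-e.errc) // first error wins
	e.close()
}

Moving down a layer, the gRPC-facing kvServer wraps the raft-backed KV: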
type kvServer struct {
	hdr header
	kv  etcdserver.RaftKV
	// maxTxnOps is the max operations per txn.
	// e.g suppose maxTxnOps = 128.
	// Txn.Success can have at most 128 operations,
	// and Txn.Failure can have at most 128 operations.
	maxTxnOps uint
}

...

func (s *kvServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
	if err := checkPutRequest(r); err != nil {
		return nil, err
	}
	...
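checkPutRequest is a cheap precondition gate before the request ever touches raft. A hedged sketch of the kind of checks involved (field names follow pb.PutRequest; the error values here are illustrative, not etcd's real rpctypes errors):

package main

import "errors"

// PutRequest mirrors the fields of pb.PutRequest used below.
type PutRequest struct {
	Key, Value  []byte
	Lease       int64
	IgnoreValue bool
	IgnoreLease bool
}

var (
	errEmptyKey      = errors.New("key is not provided")
	errValueProvided = errors.New("value is provided with ignore_value")
	errLeaseProvided = errors.New("lease is provided with ignore_lease")
)

// checkPutRequest rejects requests that are malformed on their face,
// so invalid input never consumes a raft round trip.
func checkPutRequest(r *PutRequest) error {
	if len(r.Key) == 0 {
		return errEmptyKey
	}
	if r.IgnoreValue && len(r.Value) != 0 {
		return errValueProvided
	}
	if r.IgnoreLease && r.Lease != 0 {
		return errLeaseProvided
	}
	return nil
}

func main() {
	println(checkPutRequest(&PutRequest{}).Error()) // "key is not provided"
}

Once a request passes validation, it is proposed through raft; committed batches eventually surface in the server's run loop: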
for {
	select {
	case ap := <-s.r.apply():
		f := schedule.NewJob("server_applyAll", func(context.Context) { s.applyAll(&ep, &ap) })
		sched.Schedule(f)
	case leases := <-expiredLeaseC:
		s.revokeExpiredLeases(leases)
	case err := <-s.errorc:
		lg.Warn("server error", zap.Error(err))
		lg.Warn("data-dir used by this member must be removed")
		return
	case <-s.stop:
		return
	}
}
Note applyAll: a scheduler is involved here, which is itself worth digging into. It calls
s.applyEntries(ep, apply)
and we land in the apply function in server/etcdserver/server.go:
switch ent.Type {
case raftpb.EntryNormal:
	// normal entry: a client request (PUT, DELETE, ...)
	s.applyEntryNormal(&ent)
case raftpb.EntryConfChange:
	// conf-change entry: add/remove members, etc.
	s.applyConfChange(&ent, confState)
case raftpb.EntryConfChangeV2:
	// v2 conf-change entry
	s.applyConfChangeV2(&ent, confState)
}
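The same three-way split can be reproduced against the raft proto types directly. A minimal sketch (the import path is the standalone raft module used by recent etcd; the println bodies are hypothetical stand-ins for the real handlers):

package main

import (
	"fmt"

	"go.etcd.io/raft/v3/raftpb"
)

// applyCommitted dispatches committed entries the same way
// EtcdServer.apply does above; the bodies stand in for
// applyEntryNormal / applyConfChange.
func applyCommitted(ents []raftpb.Entry) {
	for i := range ents {
		ent := &ents[i]
		switch ent.Type {
		case raftpb.EntryNormal:
			fmt.Println("client request at index", ent.Index)
		case raftpb.EntryConfChange, raftpb.EntryConfChangeV2:
			fmt.Println("membership change at index", ent.Index)
		}
	}
}

func main() {
	applyCommitted([]raftpb.Entry{
		{Index: 1, Type: raftpb.EntryNormal},
		{Index: 2, Type: raftpb.EntryConfChange},
	})
}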
On to applyEntryNormal:
if shouldApplyV3 {
	defer func() {
		// The txPostLockInsideApplyHook will not get called in some cases,
		// in which we should move the consistent index forward directly.
		newIndex := s.consistIndex.ConsistentIndex()
		if newIndex < e.Index {
			s.consistIndex.SetConsistentIndex(e.Index, e.Term)
		}
	}()
}
This deferred block is a fallback guarantee for index consistency: even when the normal hook never fires, the consistent index still moves forward.
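The invariant being defended is simply "the consistent index never moves backwards". A minimal sketch of that guard, assuming a ConsistentIndex/SetConsistentIndex pair like the one above (the rest is hypothetical):

package main

import (
	"fmt"
	"sync/atomic"
)

// consistentIndex tracks the raft index of the last entry whose
// effects have been persisted to the backend.
type consistentIndex struct {
	index atomic.Uint64
	term  atomic.Uint64
}

func (ci *consistentIndex) ConsistentIndex() uint64 { return ci.index.Load() }

func (ci *consistentIndex) SetConsistentIndex(index, term uint64) {
	ci.index.Store(index)
	ci.term.Store(term)
}

// applyEntry mimics the deferred fallback: if the apply hook never
// fired, force the index forward so the entry is not re-applied
// after a restart.
func applyEntry(ci *consistentIndex, entryIndex, entryTerm uint64) {
	defer func() {
		if ci.ConsistentIndex() < entryIndex {
			ci.SetConsistentIndex(entryIndex, entryTerm)
		}
	}()
	// ... apply the entry; some paths update ci, some do not ...
}

func main() {
	ci := &consistentIndex{}
	applyEntry(ci, 7, 2)
	fmt.Println(ci.ConsistentIndex()) // 7, even though apply skipped the update
}

Back in applyEntryNormal, the request is only evaluated when a caller is waiting on the result or the request has side effects: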
if needResult || !noSideEffect(&raftReq) {
	if !needResult && raftReq.Txn != nil {
		removeNeedlessRangeReqs(raftReq.Txn)
	}
	ar = s.applyInternalRaftRequest(&raftReq, shouldApplyV3)
}
Following the calls layer by layer, we reach the dispatch function:
switch {
...
case r.Put != nil:
	op = "Put"
	ar.Resp, ar.Trace, ar.Err = a.applyV3.Put(r.Put)
...
}
func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
	rev := tw.beginRev + 1
	c := rev
	oldLease := lease.NoLease

	// if the key exists before, use its previous created and
	// get its previous leaseID
	_, created, ver, err := tw.s.kvindex.Get(key, rev)
	if err == nil {
		c = created.Main
		oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
		tw.trace.Step("get key's previous created_revision and leaseID")
	}
	ibytes := NewRevBytes()
	idxRev := Revision{Main: rev, Sub: int64(len(tw.changes))}
	ibytes = RevToBytes(idxRev, ibytes)
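The idxRev/ibytes pair above is the bridge between the in-memory index and bbolt: a revision is serialized into a fixed-width key. A sketch of that encoding, assuming the 17-byte layout of 8-byte big-endian main + '_' separator + 8-byte big-endian sub (the tombstone suffix is omitted here):

package main

import (
	"encoding/binary"
	"fmt"
)

// Revision mirrors mvcc.Revision: Main is the store revision of the
// write txn, Sub orders the changes inside that txn.
type Revision struct {
	Main, Sub int64
}

const revBytesLen = 8 + 1 + 8 // main + '_' separator + sub

// revToBytes encodes a revision so that bbolt's lexicographic key
// order equals revision order; big-endian makes that hold.
func revToBytes(rev Revision) []byte {
	b := make([]byte, revBytesLen)
	binary.BigEndian.PutUint64(b[0:8], uint64(rev.Main))
	b[8] = '_'
	binary.BigEndian.PutUint64(b[9:], uint64(rev.Sub))
	return b
}

func main() {
	a := revToBytes(Revision{Main: 5, Sub: 0})
	b := revToBytes(Revision{Main: 5, Sub: 1})
	fmt.Println(string(a) < string(b)) // true: byte order tracks revision order
}

The kvindex.Get call above bottoms out in keyIndex.get: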
// get gets the modified, created revision and version of the key that satisfies the given atRev.
// Rev must be smaller than or equal to the given atRev.
func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created Revision, ver int64, err error) {
	if ki.isEmpty() {
		lg.Panic(
			"'get' got an unexpected empty keyIndex",
			zap.String("key", string(ki.key)),
		)
	}
	g := ki.findGeneration(atRev)
	if g.isEmpty() {
		return Revision{}, Revision{}, 0, ErrRevisionNotFound
	}

	n := g.walk(func(rev Revision) bool { return rev.Main > atRev })
	if n != -1 {
		return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil
	}

	return Revision{}, Revision{}, 0, ErrRevisionNotFound
}
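g.walk scans a generation's revisions from newest to oldest and returns the index of the first one for which the predicate turns false, or -1 if it never does. A self-contained sketch of how get uses it to pin a key's state at atRev (struct shapes mirror the keyIndex internals; the data is made up):

package main

import "fmt"

type Revision struct{ Main, Sub int64 }

// generation mirrors a keyIndex generation: revs is ordered oldest
// to newest, created is the revision that created the key.
type generation struct {
	created Revision
	ver     int64
	revs    []Revision
}

// walk visits revs newest-first and returns the index where f first
// returns false, or -1 if f holds for every revision.
func (g *generation) walk(f func(rev Revision) bool) int {
	l := len(g.revs)
	for i := range g.revs {
		if !f(g.revs[l-i-1]) {
			return l - i - 1
		}
	}
	return -1
}

func main() {
	// key written at revisions 3, 5 and 9; a read at atRev=6 must see 5.
	g := generation{created: Revision{Main: 3}, ver: 3,
		revs: []Revision{{Main: 3}, {Main: 5}, {Main: 9}}}
	atRev := int64(6)
	n := g.walk(func(rev Revision) bool { return rev.Main > atRev })
	fmt.Println(g.revs[n].Main)                 // 5: newest revision <= atRev
	fmt.Println(g.ver - int64(len(g.revs)-n-1)) // 2: the key's version at that point
}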
Stepping back to the apply pipeline: the raft node hands committed work to the server as a toApply batch.

type toApply struct {
	entries  []raftpb.Entry
	snapshot raftpb.Snapshot
	// notifyc synchronizes etcd server applies with the raft node
	notifyc chan struct{}
	// raftAdvancedC notifies EtcdServer.apply that
	// 'raftLog.applied' has advanced by r.Advance
	// it should be used only when entries contain raftpb.EntryConfChange
	raftAdvancedC <-chan struct{}
}
As for the scheduler used by applyAll, a bit of digging closes the case: it's just a first-in-first-out queue, nothing fancy.
type fifo struct {
	mu sync.Mutex

	resume    chan struct{} // wakes the run loop when new jobs arrive
	scheduled int           // jobs accepted so far
	finished  int           // jobs completed so far
	pendings  []Job         // the queue itself

	ctx    context.Context
	cancel context.CancelFunc

	finishCond *sync.Cond    // broadcast on every job completion
	donec      chan struct{} // closed when the run loop exits
	lg         *zap.Logger
}
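Using it is equally plain. A small sketch against pkg/schedule, assuming the recent API where the constructor takes a logger and jobs are built with NewJob (older etcd versions differ):

package main

import (
	"context"
	"fmt"

	"go.etcd.io/etcd/pkg/v3/schedule"
	"go.uber.org/zap"
)

func main() {
	sched := schedule.NewFIFOScheduler(zap.NewExample())
	defer sched.Stop()

	for i := 0; i < 3; i++ {
		i := i
		// Jobs run one at a time, in submission order, on a single
		// goroutine owned by the scheduler.
		job := schedule.NewJob("demo_job", func(ctx context.Context) {
			fmt.Println("job", i)
		})
		sched.Schedule(job)
	}
	sched.WaitFinish(3) // block until all three jobs have run
}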
Now over to the watch proxy, which runs three goroutines per watch stream:

go func() {
	defer func() { stopc <- struct{}{} }()
	wps.recvLoop()
}()
go func() {
	defer func() { stopc <- struct{}{} }()
	wps.sendLoop()
}()
// tear down watch if leader goes down or entire watch proxy is terminated
go func() {
	defer func() { stopc <- struct{}{} }()
	select {
	case <-lostLeaderC:
	case <-ctx.Done():
	case <-wp.ctx.Done():
	}
}()
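All three share one buffered stopc, so whichever exits first wakes the parent, which cancels the context and drains the remaining exits. A self-contained sketch of that fan-in teardown pattern (loop bodies are placeholder sleeps, not the real recv/send loops):

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	stopc := make(chan struct{}, 3) // buffered: exits never block

	worker := func(name string, d time.Duration) {
		go func() {
			defer func() { stopc <- struct{}{} }()
			select {
			case <-time.After(d): // stand-in for recvLoop/sendLoop work
			case <-ctx.Done():
			}
			fmt.Println(name, "done")
		}()
	}
	worker("recvLoop", 10*time.Millisecond)
	worker("sendLoop", time.Hour)
	worker("leaderWatch", time.Hour)

	<-stopc  // first exit (recvLoop) triggers teardown...
	cancel() // ...which unblocks the other two via ctx
	<-stopc
	<-stopc
	fmt.Println("stream fully torn down")
}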
This one is responsible for feeding responses back to the user:
// post puts a watch response on the watcher's proxy stream channel
func (w *watcher) post(wr *pb.WatchResponse) bool {
	select {
	case w.wps.watchCh <- wr:
	case <-time.After(50 * time.Millisecond):
		w.wps.cancel()
		w.wps.lg.Error("failed to put a watch response on the watcher's proxy stream channel,err is timeout")
		return false
	}
	return true
}
and sendLoop pushes it straight out to the subscriber:
func (wps *watchProxyStream) sendLoop() {
	for {
		select {
		case wresp, ok := <-wps.watchCh:
			if !ok {
				return
			}
			if err := wps.stream.Send(wresp); err != nil {
				return
			}
		case <-wps.ctx.Done():
			return
		}
	}
}
Back on the server side, events are born where a write transaction ends: watchableStoreTxnWrite.End converts the txn's changes into events and notifies watchers before releasing the store lock:

func (tw *watchableStoreTxnWrite) End() {
	changes := tw.Changes()
	if len(changes) == 0 {
		tw.TxnWrite.End()
		return
	}

	rev := tw.Rev() + 1
	evs := make([]mvccpb.Event, len(changes))
	for i, change := range changes {
		evs[i].Kv = &changes[i]
		if change.CreateRevision == 0 {
			evs[i].Type = mvccpb.DELETE
			evs[i].Kv.ModRevision = rev
		} else {
			evs[i].Type = mvccpb.PUT
		}
	}

	// end write txn under watchable store lock so the updates are visible
	// when asynchronous event posting checks the current store revision
	tw.s.mu.Lock()
	tw.s.notify(rev, evs)
	tw.TxnWrite.End()
	tw.s.mu.Unlock()
}
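To close the loop from the client side: the events minted in End above are exactly what a watcher receives. A minimal sketch with clientv3 (the endpoint is illustrative, and it assumes a server is listening there):

package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"}, // illustrative endpoint
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// The events arriving here are the mvccpb.Events built in
	// watchableStoreTxnWrite.End, relayed by notify (and by the
	// proxy's sendLoop when a grpc proxy sits in between).
	wch := cli.Watch(ctx, "foo")
	go func() {
		cli.Put(ctx, "foo", "bar")
		cli.Delete(ctx, "foo")
	}()

	for wresp := range wch {
		for _, ev := range wresp.Events {
			fmt.Printf("%s %q -> %q (mod rev %d)\n",
				ev.Type, ev.Kv.Key, ev.Kv.Value, ev.Kv.ModRevision)
			if ev.Type == clientv3.EventTypeDelete {
				return
			}
		}
	}
}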