1.概览

分布式kv存储,一致性,高可用,采用raft。预写日志可用于恢复。日志多了转化成快照。提供grpc api可以被调用。watch机制可监听key变化。lease(租约)用于实现心跳。

经典的案例是k8s用它做配置中心。

和redis的比较

租约 对应ttl,watch机制对应订阅机制。

和zookeeper比较

多版本并发控制,语言支持更好。

raft

法定人数:写数据需要半数以上节点同意。一个宕机,集群将不可写。

奇数节点:因为偶数节点会形成平票结果,系统会不可写。

设计

https://etcd.io/docs/v3.6/learning/

kv存储:b+树。key是三元组,major(在源码里是main)表示版本,sub表示健,type为特殊后缀,如墓碑为0为当前代结束。维持二级 B 树索引加速范围查询。key具有mvcc。

learner:新加入的节点由于需要同步太多日志造成网络负担降低可用性。引入学习者概念,只负责同步日志而不算在etcd工作节点。

auth:用户角色模型。持久化存储。

持久化:bbolt。b+树文件存储,文件形式存预写日志和快照。

api

KV APIs

Watch APIs

Lease APIs

流程

比如一个put操作会发生那些事情。

server/etcdmain/main.go

startEtcdOrProxyV2(args)

server/etcdmain/etcd.go 的startEtcdOrProxyV2

which := identifyDataDirOrDie(cfg.ec.GetLogger(), cfg.ec.Dir)
if which != dirEmpty {
lg.Info(
"server has already been initialized",
zap.String("data-dir", cfg.ec.Dir),
zap.String("dir-type", string(which)),
)
switch which {
case dirMember:
stopped, errc, err = startEtcd(&cfg.ec)
case dirProxy:
lg.Panic("v2 http proxy has already been deprecated in 3.6", zap.String("dir-type", string(which)))
default:
lg.Panic(
"unknown directory type",
zap.String("dir-type", string(which)),
)
}
} else {
lg.Info(
"Initialize and start etcd server",
zap.String("data-dir", cfg.ec.Dir),
zap.String("dir-type", string(which)),
)
stopped, errc, err = startEtcd(&cfg.ec)
}
func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
e, err := embed.StartEtcd(cfg)
if err != nil {
return nil, nil, err
}
osutil.RegisterInterruptHandler(e.Close)
select {
case <-e.Server.ReadyNotify(): // wait for e.Server to join the cluster
case <-e.Server.StopNotify(): // publish aborted from 'ErrStopped'
}
return e.Server.StopNotify(), e.Err(), nil
}

此处可以看到startEtcd是阻塞的一个主程序,直到退出信号或者报错了才会往下走执行优雅的程序退出。

server/embed/etcd.go StartEtcd

先创建了一个etcd实例,然后启动后台监控和管理协程,启动对peer节点的服务,最后启动对client的服务。

e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})}
...
e.Server.Start()

e.servePeers()

e.serveClients()

展示一下Etcd的结构。一个总引擎。发现注释还是很有必要写的。

// Etcd contains a running etcd server and its listeners.
type Etcd struct {
Peers []*peerListener
Clients []net.Listener
// a map of contexts for the servers that serves client requests.
sctxs map[string]*serveCtx
metricsListeners []net.Listener

tracingExporterShutdown func()

Server *etcdserver.EtcdServer

cfg Config

// closeOnce is to ensure `stopc` is closed only once, no matter
// how many times the Close() method is called.
closeOnce sync.Once
// stopc is used to notify the sub goroutines not to send
// any errors to `errc`.
stopc chan struct{}
// errc is used to receive error from sub goroutines (including
// client handler, peer handler and metrics handler). It's closed
// after all these sub goroutines exit (checked via `wg`). Writers
// should avoid writing after `stopc` is closed by selecting on
// reading from `stopc`.
errc chan error

// wg is used to track the lifecycle of all sub goroutines which
// need to send error back to the `errc`.
wg sync.WaitGroup
}

Put操作的关键是去看serveClient暴露的接口。

这里提一嘴go的服务器。不用框架情况下,http包的mux处理路由配置。然后serve函数启动。sctx是servectx,可以理解为一个serve。目前看来如果配置里写的监听地址是0.0.0.0,基本上就是单例,不用太care。

mux := http.NewServeMux()
etcdhttp.HandleDebug(mux)
etcdhttp.HandleVersion(mux, e.Server)
etcdhttp.HandleMetrics(mux)
etcdhttp.HandleHealth(e.cfg.logger, mux, e.Server)
...
// start client servers in each goroutine
for _, sctx := range e.sctxs {
s := sctx
e.startHandler(func() error {
return s.serve(e.Server, &e.cfg.ClientTLSInfo, mux, e.errHandler, e.grpcGatewayDial(splitHTTP), splitHTTP, gopts...)
})
}

server/embed/serve.go 注册了http和grpc的服务。这边就直接快进。到server/etcdserver/api/v3rpc/grpc.go Server的

pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
pb.RegisterWatchServer(grpcServer, NewWatchServer(s))
pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s))
pb.RegisterClusterServer(grpcServer, NewClusterServer(s))
pb.RegisterAuthServer(grpcServer, NewAuthServer(s))

KVserver是我们目前关注的。注意它的New函数,创建的函数声称了配额,说明它的占用资源是有限制的。在server/etcdserver/api/v3rpc/quota.go中

func NewQuotaKVServer(s *etcdserver.EtcdServer) pb.KVServer {
return &quotaKVServer{
NewKVServer(s),
quotaAlarmer{newBackendQuota(s, "kv"), s, s.MemberID()},
}
}

额外构造了一个报警器。这个实现其实就是规定一个大小,然后提供check接口,每次发生存储变动就检查一下。先关注NewKVServer,他的api层在server/etcdserver/api/v3rpc/key.go

type kvServer struct {
hdr header
kv etcdserver.RaftKV
// maxTxnOps is the max operations per txn.
// e.g suppose maxTxnOps = 128.
// Txn.Success can have at most 128 operations,
// and Txn.Failure can have at most 128 operations.
maxTxnOps uint
}
...
func (s *kvServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
if err := checkPutRequest(r); err != nil {
return nil, err
}

resp, err := s.kv.Put(ctx, r)
if err != nil {
return nil, togRPCError(err)
}

s.hdr.fill(resp.Header)
return resp, nil
}

首先raftkv是一个动作集。我们向上溯源,发现new出来的实例是一个EtcdServer,在server/etcdserver/server.go

看Put函数先,checkPutRequest是数据校验,然后server/etcdserver/v3_server.go的

func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
ctx = context.WithValue(ctx, traceutil.StartTimeKey{}, time.Now())
resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
if err != nil {
return nil, err
}
return resp.(*pb.PutResponse), nil
}

InternalRaftRequest是个大杂烩,然后我只需要把put请求放在里面,接下来就可以用通用的请求处理函数了。往下走几步。

err = s.r.Propose(cctx, data)

提出到raft,等待共共识完成回调(raft包之后再看)。

之前我们提到启动时候有个start方法,里面有个run方法的末尾

for {
select {
case ap := <-s.r.apply():
f := schedule.NewJob("server_applyAll", func(context.Context) { s.applyAll(&ep, &ap) })
sched.Schedule(f)
case leases := <-expiredLeaseC:
s.revokeExpiredLeases(leases)
case err := <-s.errorc:
lg.Warn("server error", zap.Error(err))
lg.Warn("data-dir used by this member must be removed")
return
case <-s.stop:
return
}
}

注意到applyAll,这里使用到了调度器,也是值得研究的一个点。调用了

s.applyEntries(ep, apply)

来到server/etcdserver/server.go的apply函数

switch ent.Type {
case raftpb.EntryNormal:
// 普通条目:客户端请求(PUT、DELETE等)
s.applyEntryNormal(&ent)

case raftpb.EntryConfChange:
// 配置变更条目:添加/删除成员等
s.applyConfChange(&ent, confState)

case raftpb.EntryConfChangeV2:
// V2配置变更条目
s.applyConfChangeV2(&ent, confState)
}

到applyEntryNormal

if shouldApplyV3 {
defer func() {
// The txPostLockInsideApplyHook will not get called in some cases,
// in which we should move the consistent index forward directly.
newIndex := s.consistIndex.ConsistentIndex()
if newIndex < e.Index {
s.consistIndex.SetConsistentIndex(e.Index, e.Term)
}
}()
}

保证兜底索引一致性

if needResult || !noSideEffect(&raftReq) {
if !needResult && raftReq.Txn != nil {
removeNeedlessRangeReqs(raftReq.Txn)
}
ar = s.applyInternalRaftRequest(&raftReq, shouldApplyV3)
}

逐层寻找,来到分发函数dispatch

	switch {
...
case r.Put != nil:
op = "Put"
ar.Resp, ar.Trace, ar.Err = a.applyV3.Put(r.Put)
...
}

然后就可以认真看下这个put函数了,在server/etcdserver/apply/apply.go

func (a *applierV3backend) Put(p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
return mvcctxn.Put(context.TODO(), a.lg, a.lessor, a.kv, p)
}

来到server/etcdserver/txn/txn.go的crud函数了

func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
trace = traceutil.Get(ctx)
// create put tracing if the trace in context is empty
if trace.IsEmpty() {
trace = traceutil.New("put",
lg,
traceutil.Field{Key: "key", Value: string(p.Key)},
traceutil.Field{Key: "req_size", Value: p.Size()},
)
ctx = context.WithValue(ctx, traceutil.TraceKey{}, trace)
}
leaseID := lease.LeaseID(p.Lease)
if leaseID != lease.NoLease {
if l := lessor.Lookup(leaseID); l == nil {
return nil, nil, lease.ErrLeaseNotFound
}
}
txnWrite := kv.Write(trace)
defer txnWrite.End()
resp, err = put(ctx, txnWrite, p)
return resp, trace, err
}

初始化trace,然后租约认证,然后创建写事务,kv存储事务,在server/storage/mvcc/kvstore_txn.go

func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 {
tw.put(key, value, lease)
return tw.beginRev + 1
}

func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
rev := tw.beginRev + 1
c := rev
oldLease := lease.NoLease

// if the key exists before, use its previous created and
// get its previous leaseID
_, created, ver, err := tw.s.kvindex.Get(key, rev)
if err == nil {
c = created.Main
oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
tw.trace.Step("get key's previous created_revision and leaseID")
}
ibytes := NewRevBytes()
idxRev := Revision{Main: rev, Sub: int64(len(tw.changes))}
ibytes = RevToBytes(idxRev, ibytes)

ver = ver + 1
kv := mvccpb.KeyValue{
Key: key,
Value: value,
CreateRevision: c,
ModRevision: rev,
Version: ver,
Lease: int64(leaseID),
}

d, err := kv.Marshal()
if err != nil {
tw.storeTxnCommon.s.lg.Fatal(
"failed to marshal mvccpb.KeyValue",
zap.Error(err),
)
}

tw.trace.Step("marshal mvccpb.KeyValue")
tw.tx.UnsafeSeqPut(schema.Key, ibytes, d)
tw.s.kvindex.Put(key, idxRev)
tw.changes = append(tw.changes, kv)
tw.trace.Step("store kv pair into bolt db")

if oldLease == leaseID {
tw.trace.Step("attach lease to kv pair")
return
}

if oldLease != lease.NoLease {
if tw.s.le == nil {
panic("no lessor to detach lease")
}
err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
if err != nil {
tw.storeTxnCommon.s.lg.Error(
"failed to detach old lease from a key",
zap.Error(err),
)
}
}
if leaseID != lease.NoLease {
if tw.s.le == nil {
panic("no lessor to attach lease")
}
err = tw.s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
if err != nil {
panic("unexpected error from lease Attach")
}
}
tw.trace.Step("attach lease to kv pair")
}

先按key和版本号去get到索引。此处就是先前所说的内存维持的一颗B树,作为二级的快速索引。解释一下,store结构里维持着一个index接口,这个接口的实现是treeindex。

type treeIndex struct {
sync.RWMutex
tree *btree.BTreeG[*keyIndex]
lg *zap.Logger
}
func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created Revision, ver int64, err error) {
ti.RLock()
defer ti.RUnlock()
return ti.unsafeGet(key, atRev)
}

这一步实现很精髓,get方法里面上锁并调用线程不安全的get,思路很好。

type generation struct {
ver int64
created Revision // when the generation is created (put in first revision).
revs []Revision
}

这是代的结构体,对应着生命周期,一代就是一个key的诞生到消亡,delete之后,新的健就是新的一代。而rev是版本链,此处的表现直接就是一个slice。

考虑以下场景

generations:
{empty} // Generation 2
{4.0, 5.0(t)} // Generation 1: 版本4-5,版本5是墓碑
{1.0, 2.0, 3.0(t)} // Generation 0: 版本1-3,版本3是墓碑

GET key@revision=2 → 找到 Generation 0,返回版本 2.0 ✓
GET key@revision=3 → 找到 Generation 0,返回版本 3.0 (墓碑) → 键不存在
GET key@revision=4 → 找到 Generation 1,返回版本 4.0 ✓
GET key@revision=5 → 找到 Generation 1,返回版本 5.0 (墓碑) → 键不存在
GET key@revision=6 → 查找失败,键不存在

版本是持续迭代的,delete代也就是墓碑代,但是不可被查询。

回观get,keyindex具有生命周期属性和版本属性,先找到最新的代,再找到最新的版本。

// get gets the modified, created revision and version of the key that satisfies the given atRev.
// Rev must be smaller than or equal to the given atRev.
func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created Revision, ver int64, err error) {
if ki.isEmpty() {
lg.Panic(
"'get' got an unexpected empty keyIndex",
zap.String("key", string(ki.key)),
)
}
g := ki.findGeneration(atRev)
if g.isEmpty() {
return Revision{}, Revision{}, 0, ErrRevisionNotFound
}

n := g.walk(func(rev Revision) bool { return rev.Main > atRev })
if n != -1 {
return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil
}

return Revision{}, Revision{}, 0, ErrRevisionNotFound
}

回观put。如果没拿到对应的key,则去找更老的版本。这里懒得看。然后下一步是序列化。

存的值又是一整个kv

kv := mvccpb.KeyValue{
Key: key,
Value: value,
CreateRevision: c,
ModRevision: rev,
Version: ver,
Lease: int64(leaseID),
}

所以主要做这几个事情:存储到bolt(b+树),存储到内存二级索引b树,更新事务写更改队列。对于bolt,应该更多去了解这个包。后文再议。

tw.tx.UnsafeSeqPut(schema.Key, ibytes, d)
tw.s.kvindex.Put(key, idxRev)
tw.changes = append(tw.changes, kv)

同时需要更新租约。对于租约,管理器会轮询检查过期的租约的(一个ticker),就比较朴素。

这样就基本完成了。然后trace和log是贯穿始终的。

2.存储

很关键。有mvcc和b+树,覆盖到了mysql的考点,也是etcd唯一资源kv的核心物理设计之一。

版本

server/storage/mvcc/revision.go

版本号:长度8+1+8,中间的1是下划线,前后分别是main和sub。

你可能好奇最后的特殊后缀去哪了。

其实所谓三元组是这样的。也就是B+树的key的样子。不用听文档瞎吹,其实主要运用就在这里,加了一个字节用来表示墓碑。

type BucketKey struct {
Revision
tombstone bool
}

and这个key叫bucketkey。意味着存储时候他们定义这个存储结构是bucket为一个单元。

有一个细节,关于序列化。使用的是大端序,因为这和字典序一致,便于范围查找。此外,思考序列化的意义。此处主要是为了存储。存储在文件的话就是bytes。而在内存里就是结构体。此二者相互转化的意义就是kv的结构可以对磁盘进行读写,并且在工作过程中mvcc状态被提取到内存里的(bucketkey)。这和mysql不一样,因为mysql的场景数据量大,索引也是分了很多级的。

索引(b树)

上文有提

bolt(b+树)

参考:https://pkg.go.dev/go.etcd.io/bbolt

bbolt你可能以为是个什么三方包,其实也是etcd团队额外封装的一个包。定位是不需要很重的数据库但是需要优秀的持久化结构。理解为轻量简洁持久化。区别在于,bolt不支持sql也不是关系型数据库,专注与kv持久化。单文件存储,写入是串行的。多个goroutine同时操作一个事务是线程不安全的。

事务

Bolt 一次只允许一个读写事务,但允许同时进行任意数量的只读事务。读写事务使用Update方法。只读事务使用View方法。

每个事务都拥有与事务启动时一致的数据视图。根据所学的知识,这叫可重复读。

事务方面还有Batch批读写,但是需要内部函数幂等,因为他可能重试执行。还有Begin,Commit用于手动开启和提交事务。

buckets

桶是存储中关键的概念。在这里,它是kv集合,所有key必须是唯一。Tx.CreateBucket()可以创建桶。

此外,事务的操作是db的,那么可以理解为tx对象的操作就是对于bucket的crud。所以就不赘述了。每一个桶有很多kv,你也可以理解为桶就是单独一个容器空间,他们下面的key是不能重复的,但是不同bucket之间可以重复key。

桶调用NextSequence()可以获取自动递增的id,

这里要看看迭代健

db.View(func(tx *bolt.Tx) error {
// Assume bucket exists and has keys
b := tx.Bucket([]byte("MyBucket"))

c := b.Cursor()

for k, v := c.First(); k != nil; k, v = c.Next() {
fmt.Printf("key=%s, value=%s\n", k, v)
}

return nil
})

这里可以发现指针+链表可以对字典序的kv列表进行迭代。而这种迭代我们可以初步认为是b+树的叶子节点了。

前缀扫描也很妙,因为字典序前缀相同的一定是黏在一块。找范围的原理也类似。

db.View(func(tx *bolt.Tx) error {
// Assume bucket exists and has keys
c := tx.Bucket([]byte("MyBucket")).Cursor()

prefix := []byte("1234")
for k, v := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, v = c.Next() {
fmt.Printf("key=%s, value=%s\n", k, v)
}

return nil
})
嵌套桶

v里面可以存储桶,也就是嵌套桶。一个bucket维护一个b+树。相当于mysql的表,一个表维持一个树

b+树

这一块就不详细说了,是一个比较经典的话题,建议手搓一遍了解。

3.raft共识

核心实现在 server/etcdserver/raft.go。但是在此之前有必要先了解一下raft本身和三方包实现。

raft机制

首先raft是强一致。从提议到决策到apply是一条事务,比如etcd put请求,本地的日志对外不可见,先会转发给leader,leader发起投票,获得多数派同意后,然后再广播apply信号获得才会被应用。

第一句话可能有点难懂,因此要说一下构成。实体有三种角色,leader,follower和candidate,正常情况leader是最新版本,follower向leader对齐,而leader时刻给所有follower发送存活信号。如果存活信号超时,说明leader挂了,则follower转化为candidate。由于超时时间是随机的,因此只会出现少数follower成为candidate的情况。一旦成为candidate,其他follower会收到candidate的信号然后进行投票。投票依据则是term(任期)和日志最新度。

这里就要描述raft的资源是什么了,那就是日志。你也可以理解为数据,所以数据的强一致也就在这里被偷换成了日志的强一致。而任期可以理解为时长。这是一个类似周期的概念,创建到消亡算一个任期,那么任期长的显然不容易挂,当然更适合当leader。因为竞选过程是一种服务暂停的状态,因此其实还是挂的越少越好。刚才描述的过程也叫选举。多数派同意(>1/2)才可以成为leader,这样可以有效避免脑裂(出现多个leader)。而部署要求节点一定是奇数,避免平票情况。而且多数派的参考是所有机子(包括宕机的)。那么就会有一种情况,如果宕机数量过多导致无法选出leader。这时集群就坏了,宁可不用也不能不一致,也不会有leader,进入只读状态。解决方案就是恢复宕机的机子或者增加机子的总数量来消除宕机量。

还有一点是保证日志的顺序(也是一种一致的安全保障)。日志在raft里叫entry。而leader发布AppendEntries 时,会带一前一条日志的信息进行校验,只有匹配才会接受新日志,这样可以保证日志顺序是完全正确的。如果leader宕机,新的日志以新leader为主,也就是说老leader如果有很新的日志但是没被其他人同步,那这些就会被遗弃且被新leader的日志强制覆盖。而老leader复活,会检查term,如果有term大于自己的,说明自己已经过时了,会自动变成follower。

这种日志同步机制可以从一个场景中看出妙处。比如如果一个决议没有通过,则leader不会显示回应,但是指针在follower上却没有到未提交的那条失败日志上,因此在向leader对齐的过程中逐渐被覆盖。且只有指针前代表着已提交或可读,之后为未提交且不可读。这就是决议失败的日志的处理,没有显式处理却保证了结果的正确性。而且日志的提交和构建是分开两步的。先AppendEntries ,再Commit。这意味着现在每个节点添加了未提交的日志,再获得指针后移的信号。这其实很好理解,因为节点投票的本质是确认是否写成功了。如果写成功,就同时返回投赞成票的信息给leader。至此,整个raft机制已形成闭环。

总的来说,一是高可用机制,leader选举+term+心跳保活的体系。二是绝对一致性机制,日志同步上,以leader为参考不断地更新;提交决议上,统一交给leader发起投票和确认。

raft包的使用

然后开始阅读server/etcdserver/raft.go

先说两个关键概念,ready和advance。对于一次提议,每次状态变更会收到ready信号,比如leader处理提议和收到投票等。而整个事务完成会收到advance信号代表完成。commit发生在ready之后,也就是advance之前。

ready的触发时机:1.leader收到新的提案,将提案写入本地日志后触发ready,包括entries和发送给followers的message,其实就是下一步动作需要的东西的打包。2.收到网络请求。如follower接收leader的日志复制,leader接收follower的响应,心跳,投票等。总结:日志提交、消息、快照。

tick:超时与心跳,内部的计时器

如果是leader

一个ready可以理解为一次请求

构建toApply对象。用于更新日志和快照。

notifyc := make(chan struct{}, 1)
raftAdvancedC := make(chan struct{}, 1)
ap := toApply{
entries: rd.CommittedEntries,
snapshot: rd.Snapshot,
notifyc: notifyc,
raftAdvancedC: raftAdvancedC,
}

updateCommittedIndex(&ap, rh)

这里代表着更新指针到最新已提交。

type toApply struct {
entries []raftpb.Entry
snapshot raftpb.Snapshot
// notifyc synchronizes etcd server applies with the raft node
notifyc chan struct{}
// raftAdvancedC notifies EtcdServer.apply that
// 'raftLog.applied' has advanced by r.Advance
// it should be used only when entries contain raftpb.EntryConfChange
raftAdvancedC <-chan struct{}
}

然后如果是leader就开始发送更新信息。send是一个消息的发送,异步执行func。也可以理解为leader的通知和存盘是异步的。

if islead {
// gofail: var raftBeforeLeaderSend struct{}
r.transport.Send(r.processMessages(rd.Messages))
}

然后就可以开始存盘

r.storage.SaveSnap(rd.Snapshot)
r.storage.Save(rd.HardState, rd.Entries)

如果有快照,说明需要使用快照,因此强制刷盘,

添加新日志到内存

r.raftStorage.Append(rd.Entries)

follower返回投票并等待apply完成的通知。

r.transport.Send(msgs)

Leader 则直接通知 apply 完成。

notifyc <- struct{}{}

通知ready已完成。

r.Advance()

总结一下这个raft驱动器做了什么。select tick和ready的循环,持久化WAL和Snapshot,应用日志到内存,发送通知。

然后来讨论raft包在这里的角色。他是一个纯逻辑的状态机,不做IO。首先node.go 有个interface为Node,是核心结构。然后他提供了tick和ready。一个ready对应一个advance来结束这个ready的周期。ready还维持messages,提供Append接口用于用户放入内存或落盘。ready可以理解粗浅为一个消息事件,相当于收到了外来的消息,管道就会传来一个ready。具体的如何可能需要自己写一遍才有感受。

所以这一部分先告一段落,等我搓个实现出来,后续再更新。// todo

4.调度器

在先前react框架的解读中我们也发现调度器这个概念对于性能拔高来说十分的重要。可以理解为这是一个worker pool的manager。

回到start的上层,就是run函数,有接收apply的函数,这里是使用了schedule包。

for {
select {
case ap := <-s.r.apply():
f := schedule.NewJob("server_applyAll", func(context.Context) { s.applyAll(&ep, &ap) })
sched.Schedule(f)
case leases := <-expiredLeaseC:
s.revokeExpiredLeases(leases)
case err := <-s.errorc:
lg.Warn("server error", zap.Error(err))
lg.Warn("data-dir used by this member must be removed")
return
case <-s.stop:
return
}
}

然后找了找,破案了,就是个先进先出队列,没啥。

type fifo struct {
mu sync.Mutex

resume chan struct{}
scheduled int
finished int
pendings []Job

ctx context.Context
cancel context.CancelFunc

finishCond *sync.Cond
donec chan struct{}
lg *zap.Logger
}

5.其他功能实现

租约(lease)

抽取主要的部分

return &Lease{
ID: id,
ttl: ttl,
itemSet: make(map[LeaseItem]struct{}),
revokec: make(chan struct{}),
}

主要是一个ttl和一个key的set。然后key在操作的时候也会关联到这个lease然后检查ttl是否过期

监听(watch)

看server/proxy/grpcproxy/watch.go

每个客户端有一个独立的订阅流,由 watchProxyStream 管理。而每个节点有个watchProxy作为manager。

wps := &watchProxyStream{
ranges: wp.ranges,
watchers: make(map[int64]*watcher),
stream: stream,
watchCh: make(chan *pb.WatchResponse, 1024),
ctx: ctx,
cancel: cancel,
kv: wp.kv,
lg: wp.lg,
}

可见watchCh是真正的事件通知通道。

有个细节,watch只可以在leader存在时候投递。

lostLeaderC = wp.leader.lostNotify()

这一点值得学习,就是每个对象可以维持事件管道,这样其他关联了他的对象就可以拿到这个管道并且接收事件。比如这里就是wp关联了leader对象。

启动三个协程,分别负责监听,发送通和监听领导。

go func() {
defer func() { stopc <- struct{}{} }()
wps.recvLoop()
}()
go func() {
defer func() { stopc <- struct{}{} }()
wps.sendLoop()
}()
// tear down watch if leader goes down or entire watch proxy is terminated
go func() {
defer func() { stopc <- struct{}{} }()
select {
case <-lostLeaderC:
case <-ctx.Done():
case <-wp.ctx.Done():
}
}()

这个负责反馈给用户

// post puts a watch response on the watcher's proxy stream channel
func (w *watcher) post(wr *pb.WatchResponse) bool {
select {
case w.wps.watchCh <- wr:
case <-time.After(50 * time.Millisecond):
w.wps.cancel()
w.wps.lg.Error("failed to put a watch response on the watcher's proxy stream channel,err is timeout")
return false
}
return true
}

sendLoop直接发到订阅方

func (wps *watchProxyStream) sendLoop() {
for {
select {
case wresp, ok := <-wps.watchCh:
if !ok {
return
}
if err := wps.stream.Send(wresp); err != nil {
return
}
case <-wps.ctx.Done():
return
}
}
}

但是事件的源头呢?找了半天。

以put为例。一个quotaApplier

func (a *quotaApplierV3) Put(p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
ok := a.q.Available(p)
resp, trace, err := a.applierV3.Put(p)
if err == nil && !ok {
err = errors.ErrNoSpace
}
return resp, trace, err
}

往里面走,是一个applier。注意此处,applier代表着已经通过决议的commmit。mvcc完成这个操作。

func (a *applierV3backend) Put(p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
return mvcctxn.Put(context.TODO(), a.lg, a.lessor, a.kv, p)
}

最终这个put函数,在server/etcdserver/txn/txn.go。他执行了Write和End。

func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
trace = traceutil.Get(ctx)
// create put tracing if the trace in context is empty
if trace.IsEmpty() {
trace = traceutil.New("put",
lg,
traceutil.Field{Key: "key", Value: string(p.Key)},
traceutil.Field{Key: "req_size", Value: p.Size()},
)
ctx = context.WithValue(ctx, traceutil.TraceKey{}, trace)
}
leaseID := lease.LeaseID(p.Lease)
if leaseID != lease.NoLease {
if l := lessor.Lookup(leaseID); l == nil {
return nil, nil, lease.ErrLeaseNotFound
}
}
txnWrite := kv.Write(trace)
defer txnWrite.End()
resp, err = put(ctx, txnWrite, p)
return resp, trace, err
}

而在server/storage/mvcc/watchable_store_txn.go我们的End函数代表写结束,会发回一个notify

func (tw *watchableStoreTxnWrite) End() {
changes := tw.Changes()
if len(changes) == 0 {
tw.TxnWrite.End()
return
}

rev := tw.Rev() + 1
evs := make([]mvccpb.Event, len(changes))
for i, change := range changes {
evs[i].Kv = &changes[i]
if change.CreateRevision == 0 {
evs[i].Type = mvccpb.DELETE
evs[i].Kv.ModRevision = rev
} else {
evs[i].Type = mvccpb.PUT
}
}

// end write txn under watchable store lock so the updates are visible
// when asynchronous event posting checks the current store revision
tw.s.mu.Lock()
tw.s.notify(rev, evs)
tw.TxnWrite.End()
tw.s.mu.Unlock()
}
func (tw *watchableStoreTxnWrite) End() {
changes := tw.Changes()
if len(changes) == 0 {
tw.TxnWrite.End()
return
}

rev := tw.Rev() + 1
evs := make([]mvccpb.Event, len(changes))
for i, change := range changes {
evs[i].Kv = &changes[i]
if change.CreateRevision == 0 {
evs[i].Type = mvccpb.DELETE
evs[i].Kv.ModRevision = rev
} else {
evs[i].Type = mvccpb.PUT
}
}

// end write txn under watchable store lock so the updates are visible
// when asynchronous event posting checks the current store revision
tw.s.mu.Lock()
tw.s.notify(rev, evs)
tw.TxnWrite.End()
tw.s.mu.Unlock()
}

至此闭环。进入notidy->send函数,最终有一个这样的结构。

select {
case w.ch <- wr:
return true
default:
return false
}

他会发送给watcher的ch。至此,完美闭环。