metallb代码梳理-layer2模式
背景
本文主要梳理metallb layer2模式下ipv4的相关代码,旨在理解其工作流程。本文的代码分析以v0.13.10版本作为目标。
初始化
metallb部署后,如果使用的是layer2模式,那么核心起作用的有两个容器,一个是controller,一个是speaker。其中controller是单pod,speaker是daemonset部署的多pod。二者通过分工合作,各自负责一部分工作:
- The
controller
is in charge of assigning IPs to the services- the
speaker
s are in charge of announcing the services via L2 or BGP
controller是基于controller-runtime库写的控制器,主要监听了service的事件,当service变更时处理其lb ip。除此之外,还可以监听配置、ip池、node的状态变化:
// internal/k8s/listener.go:15
type Listener struct {
sync.Mutex
ServiceChanged func(log.Logger, string, *v1.Service, epslices.EpsOrSlices) controllers.SyncState
ConfigChanged func(log.Logger, *config.Config) controllers.SyncState
PoolChanged func(log.Logger, *config.Pools) controllers.SyncState
NodeChanged func(log.Logger, *v1.Node) controllers.SyncState
}
controller的入口在controller/main.go
,speaker的入口在speaker/main.go
核心逻辑
免费arp
免费arp的发送由Announce
结构体相关的方法完成,其位于internal/layer2/announcer.go:18
。我们可以看到这个结构的构造函数中,会运行两个协程:
// internal/layer2/announcer.go:35
func New(l log.Logger, excludeRegexp *regexp.Regexp) (*Announce, error) {
ret := &Announce{
logger: l,
nodeInterfaces: []string{},
arps: map[int]*arpResponder{},
ndps: map[int]*ndpResponder{},
ips: map[string][]IPAdvertisement{},
ipRefcnt: map[string]int{},
spamCh: make(chan IPAdvertisement, 1024),
excludeRegexp: excludeRegexp,
}
// 负责定时扫描网卡,对每个未被排除的网卡,建立一个arpResponder实例
// 后续arp的发送,将在所有未排除的网卡上进行
go ret.interfaceScan()
// 接收service到channel,调用arp的gratuitous方法调用
go ret.spamLoop()
return ret, nil
}
spamLoop这个协程基于channel、ticker,实现了对免费arp的有限次广播,既避免了单次发送可能丢包,又通过合理的时间间隔,降低网络拥塞,提高arp广播的成功率,且没有任务时,ticker还会自动停止工作,等待有任务时再启动。这个函数对于ticker、channel、map的结合使用,非常值得学习,也可以考虑将其抽取出来,封装成一个工具,实现可以自动休眠的有限次消费者。
// internal/layer2/announcer.go:156
func (a *Announce) spamLoop() {
// Map IP to spam stop time.
type timedSpam struct {
until time.Time
IPAdvertisement
}
m := map[string]timedSpam{}
// 建一个ticker,但是不需要其立即tick,只需要他存在,后续在和channel配合时动态调整tick周期
ticker := time.NewTicker(time.Hour)
ticker.Stop()
for {
select {
case s := <-a.spamCh:
if len(m) == 0 {
// See https://github.com/metallb/metallb/issues/172 for the 1100 choice.
ticker.Reset(1100 * time.Millisecond)
}
ipStr := s.ip.String()
_, ok := m[ipStr]
// 对于每个ip,设置5s的截止期,超过后,ticker会停止
m[ipStr] = timedSpam{time.Now().Add(5 * time.Second), s}
if !ok {
// Spam right away to avoid waiting up to 1100 milliseconds even if
// it means we call gratuitous() twice in a row in a short amount of time.
a.gratuitous(s)
}
case now := <-ticker.C:
for ipStr, tSpam := range m {
if now.After(tSpam.until) {
// We have spammed enough - remove the IP from the map.
delete(m, ipStr)
} else {
a.gratuitous(tSpam.IPAdvertisement)
}
}
if len(m) == 0 {
ticker.Stop()
}
}
}
}
上面的spamLoop是消费者,那么生产者在哪里呢?我们搜索Announce.spamCh这个channel的使用者,就可以找到下面的生产者代码
// internal/layer2/announcer.go:257
func (a *Announce) SetBalancer(name string, adv IPAdvertisement) {
defer a.doSpam(adv)
...
// 先维护lb ip的状态,最后通过defer调用doSpam方法,作为生产者向channel发送数据
}
如果我们继续向上追踪SetBalancer的调用,可以发现调用链的源头依然是控制器:
-
service变更事件
-
-->speaker.controller.SetBalancer
-
-->speaker.controller.handleService
-
-->layer2Controller.SetBalancer
-
-->Announce.SetBalancer
经过层层封装和逻辑,最终实现了将arp广播的信息作为生产对象,发往了channel里
arp reply
除了service变更事件引起的免费arp相关的操作,metallb是否会像openelb一样监听arp request然后发送arp reply呢?答案是会的。
在上面提到的启动spamLoop
协程时,前面还有一个协程被启动:interfaceScan
。从名字能猜出来,这个协程是在扫描node网卡的情况。为什么需要扫描网卡呢?我们查看此处代码的逻辑就会明白。
// internal/layer2/announcer.go:60
func (a *Announce) updateInterfaces() {
ifs, err := net.Interfaces()
for _, intf := range ifs {
// 遍历所有网卡
// 各种条件判断该网卡是否需要监听arp req,如果不需要,就continue
...
if keepARP[ifi.Index] && a.arps[ifi.Index] == nil {
// 对于筛选过后的网卡,为其初始化一个arpResponder实例
resp, err := newARPResponder(a.logger, &ifi, a.shouldAnnounce)
if err != nil {
level.Error(l).Log("op", "createARPResponder", "error", err, "msg", "failed to create ARP responder")
continue
}
// 每个网卡一个arpResponder实例,放到map里
a.arps[ifi.Index] = resp
level.Info(l).Log("event", "createARPResponder", "msg", "created ARP responder for interface")
}
...
}
...
}
在newARPResponder构造函数里,在实例化结构体后,立即以协程的方式运行了其run方法
func (a *arpResponder) run() {
for a.processRequest() != dropReasonClosed {
}
}
func (a *arpResponder) processRequest() dropReason {
// 监听arp相关帧
pkt, eth, err := a.conn.Read()
// 过滤不关心的arp帧
...
// 符合条件,需要reply的,发送reply
if err := a.conn.Reply(pkt, a.hardwareAddr, pkt.TargetIP); err != nil {
level.Error(a.logger).Log("op", "arpReply", "interface", a.intf, "ip", pkt.TargetIP, "senderIP", pkt.SenderIP, "senderMAC", pkt.SenderHardwareAddr, "responseMAC", a.hardwareAddr, "error", err, "msg", "failed to send ARP reply")
} else {
stats.SentResponse(pkt.TargetIP.String())
}
return dropReasonNone
}
选node
在openelb中,其使用了单进程模型,也就是控制器和speaker都在一个进程中,即使pod多副本,也会用leader election机制选出leader,最终只有一个pod会实际收发arp包
但是在metallb中架构有些区别,controller和speaker进程进行了解耦,speaker作为daemonset,每个node都会跑一个。那么最终流量是如何确定走哪个node进入集群的呢?
首先从比较直观的角度观察,service建好,lb ip分好后,在其中一个speaker的日志可以看到该speaker被分配了某个service,而其他speaker是不会分配到同样的service的。
这个日志在代码里是这样的写的
// speaker/main.go:339
func (c *controller) handleService(l log.Logger,
name string,
lbIPs []net.IP,
svc *v1.Service, pool *config.Pool,
eps epslices.EpsOrSlices,
protocol config.Proto) controllers.SyncState {
...
level.Info(l).Log("event", "serviceAnnounced", "msg", "service has IP, announcing", "protocol", protocol)
c.client.Infof(svc, "nodeAssigned", "announcing from node %q with protocol %q", c.myNode, protocol)
return controllers.SyncStateSuccess
}
说明在打印该日志前,选择node的逻辑已经走完了。经过对该日志上面代码的细细探究,终于找到了为service选择node的逻辑隐藏在了下面的函数内部,一开始从名字看还真没想到会在这里,看完代码明白了,其实也很合理。
因为用到了哈希一致性,保证service对node的唯一映射,所以对于任何一个service,一定只有一个speaker should announce,其他speaker should not announce,所以叫这个名字就很合理了。
// speaker/layer2_controller.go:98
func (c *layer2Controller) ShouldAnnounce(l log.Logger, name string, toAnnounce []net.IP, pool *config.Pool, svc *v1.Service, eps epslices.EpsOrSlices, nodes map[string]*v1.Node) string {
...
// Using the first IP should work for both single and dual stack.
ipString := toAnnounce[0].String()
// Sort the slice by the hash of node + load balancer ips. This
// produces an ordering of ready nodes that is unique to all the services
// with the same ip.
sort.Slice(availableNodes, func(i, j int) bool {
hi := sha256.Sum256([]byte(availableNodes[i] + "#" + ipString))
hj := sha256.Sum256([]byte(availableNodes[j] + "#" + ipString))
return bytes.Compare(hi[:], hj[:]) < 0
})
// Are we first in the list? If so, we win and should announce.
if len(availableNodes) > 0 && availableNodes[0] == c.myNode {
return ""
}
...
}
网卡过滤
updateInterfaces
每十秒执行一次的协程updateInterfaces中,会排除一些网卡,这些网卡的特点是,并非真实物理网卡,通常不会和lb service有关系,比如calico为pod建立的虚拟网卡calicoXXXX等。
对于这类网卡,updateInterfaces中完全不需要为其初始化newARPResponder的实例
因此该函数中用Announce.excludeRegexp去匹配遍历到的网卡名字,如果符合,就会跳过
excludeRegexp这个字段的值最终来源是configmap中存储的网卡匹配正则
由于该协程针对的是监听arp request,所以被configmap中规则排除掉的网卡,就不会再监听arp request,自然也不会发arp reply了
免费arp
service变更触发的免费arp广播,同样也可以选择网卡。当speaker确定自己需要对某lb ip发送免费arp时,会执行下面的函数:该函数针对给定的lb ip、node、和一组L2Advertisement CR,给出一个arp广播的结论(IPAdvertisement实例),该实例里包含了哪个lb ip,哪些网卡上发送这些最关键的信息。上面提到的生产者消费者,他们生产消费的数据结构就是IPAdvertisement实例。
由下面的函数可以看到,最终在哪些网卡上发送免费arp,是由L2Advertisement CR里的字段决定的
// speaker/layer2_controller.go:171
func ipAdvertisementFor(ip net.IP, localNode string, l2Advertisements []*config.L2Advertisement) layer2.IPAdvertisement {
ifs := sets.Set[string]{}
for _, l2 := range l2Advertisements {
if matchNode := l2.Nodes[localNode]; !matchNode {
continue
}
if l2.AllInterfaces {
return layer2.NewIPAdvertisement(ip, true, sets.Set[string]{})
}
ifs = ifs.Insert(l2.Interfaces...)
}
return layer2.NewIPAdvertisement(ip, false, ifs)
}
如何知道哪个speaker在代理arp
- describe service,看event:问题是event不持久化,会消失
- arp -n或ip neigh show:问题是不能在node上,必须有其他机器
- speaker log:问题是时间久了以后log可能被滚动掉
- 向社区提issue,是否可以在service注解中持久化?persist node allacated to some service, maybe by annotaion? · Issue #2125 · metallb/metallb (github.com)
参考
本作品采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可。
还有种方式是看speaker的metrics。。
有道理,有被启发到