背景

本文主要梳理metallb layer2模式下ipv4的相关代码,旨在理解其工作流程。本文的代码分析以v0.13.10版本作为目标。

初始化

metallb部署后,如果使用的是layer2模式,那么核心起作用的有两个容器,一个是controller,一个是speaker。其中controller是单pod,speaker是daemonset部署的多pod。二者通过分工合作,各自负责一部分工作:

  • The controller is in charge of assigning IPs to the services
  • the speakers are in charge of announcing the services via L2 or BGP

controller是基于controller-runtime库写的控制器,主要监听了service的事件,当service变更时处理其lb ip。除此之外,还可以监听配置、ip池、node的状态变化:

// internal/k8s/listener.go:15
type Listener struct {
    sync.Mutex
    ServiceChanged func(log.Logger, string, *v1.Service, epslices.EpsOrSlices) controllers.SyncState
    ConfigChanged  func(log.Logger, *config.Config) controllers.SyncState
    PoolChanged    func(log.Logger, *config.Pools) controllers.SyncState
    NodeChanged    func(log.Logger, *v1.Node) controllers.SyncState
}

controller的入口在controller/main.go,speaker的入口在speaker/main.go

核心逻辑

免费arp

免费arp的发送由Announce结构体相关的方法完成,其位于internal/layer2/announcer.go:18。我们可以看到这个结构的构造函数中,会运行两个协程:

// internal/layer2/announcer.go:35
func New(l log.Logger, excludeRegexp *regexp.Regexp) (*Announce, error) {
    ret := &Announce{
        logger:         l,
        nodeInterfaces: []string{},
        arps:           map[int]*arpResponder{},
        ndps:           map[int]*ndpResponder{},
        ips:            map[string][]IPAdvertisement{},
        ipRefcnt:       map[string]int{},
        spamCh:         make(chan IPAdvertisement, 1024),
        excludeRegexp:  excludeRegexp,
    }

  // 负责定时扫描网卡,对每个未被排除的网卡,建立一个arpResponder实例
  // 后续arp的发送,将在所有未排除的网卡上进行
    go ret.interfaceScan()
  // 接收service到channel,调用arp的gratuitous方法调用
    go ret.spamLoop()

    return ret, nil
}

spamLoop这个协程基于channel、ticker,实现了对免费arp的有限次广播,既避免了单次发送可能丢包,又通过合理的时间间隔,降低网络拥塞,提高arp广播的成功率,且没有任务时,ticker还会自动停止工作,等待有任务时再启动。这个函数对于ticker、channel、map的结合使用,非常值得学习,也可以考虑将其抽取出来,封装成一个工具,实现可以自动休眠的有限次消费者。

// internal/layer2/announcer.go:156
func (a *Announce) spamLoop() {
    // Map IP to spam stop time.
    type timedSpam struct {
        until time.Time
        IPAdvertisement
    }
    m := map[string]timedSpam{}
    // 建一个ticker,但是不需要其立即tick,只需要他存在,后续在和channel配合时动态调整tick周期
    ticker := time.NewTicker(time.Hour)
    ticker.Stop()
    for {
        select {
        case s := <-a.spamCh:
            if len(m) == 0 {
                // See https://github.com/metallb/metallb/issues/172 for the 1100 choice.
                ticker.Reset(1100 * time.Millisecond)
            }
            ipStr := s.ip.String()
            _, ok := m[ipStr]
            // 对于每个ip,设置5s的截止期,超过后,ticker会停止
            m[ipStr] = timedSpam{time.Now().Add(5 * time.Second), s}
            if !ok {
                // Spam right away to avoid waiting up to 1100 milliseconds even if
                // it means we call gratuitous() twice in a row in a short amount of time.
                a.gratuitous(s)
            }
        case now := <-ticker.C:
            for ipStr, tSpam := range m {
                if now.After(tSpam.until) {
                    // We have spammed enough - remove the IP from the map.
                    delete(m, ipStr)
                } else {
                    a.gratuitous(tSpam.IPAdvertisement)
                }
            }
            if len(m) == 0 {
                ticker.Stop()
            }
        }
    }
}

上面的spamLoop是消费者,那么生产者在哪里呢?我们搜索Announce.spamCh这个channel的使用者,就可以找到下面的生产者代码

// internal/layer2/announcer.go:257
func (a *Announce) SetBalancer(name string, adv IPAdvertisement) {
    defer a.doSpam(adv)
        ...
  // 先维护lb ip的状态,最后通过defer调用doSpam方法,作为生产者向channel发送数据
}

如果我们继续向上追踪SetBalancer的调用,可以发现调用链的源头依然是控制器:

  • service变更事件

  • -->speaker.controller.SetBalancer

  • -->speaker.controller.handleService

  • -->layer2Controller.SetBalancer

  • -->Announce.SetBalancer

经过层层封装和逻辑,最终实现了将arp广播的信息作为生产对象,发往了channel里

arp reply

除了service变更事件引起的免费arp相关的操作,metallb是否会像openelb一样监听arp request然后发送arp reply呢?答案是会的。

在上面提到的启动spamLoop协程时,前面还有一个协程被启动:interfaceScan。从名字能猜出来,这个协程是在扫描node网卡的情况。为什么需要扫描网卡呢?我们查看此处代码的逻辑就会明白。

// internal/layer2/announcer.go:60
func (a *Announce) updateInterfaces() {
  ifs, err := net.Interfaces()
  for _, intf := range ifs {
    // 遍历所有网卡
    // 各种条件判断该网卡是否需要监听arp req,如果不需要,就continue
    ...
    if keepARP[ifi.Index] && a.arps[ifi.Index] == nil {
      // 对于筛选过后的网卡,为其初始化一个arpResponder实例
            resp, err := newARPResponder(a.logger, &ifi, a.shouldAnnounce)
            if err != nil {
                level.Error(l).Log("op", "createARPResponder", "error", err, "msg", "failed to create ARP responder")
                continue
            }
      // 每个网卡一个arpResponder实例,放到map里
            a.arps[ifi.Index] = resp
            level.Info(l).Log("event", "createARPResponder", "msg", "created ARP responder for interface")
        }
    ...
  }
  ...
}

在newARPResponder构造函数里,在实例化结构体后,立即以协程的方式运行了其run方法

func (a *arpResponder) run() {
    for a.processRequest() != dropReasonClosed {
    }
}

func (a *arpResponder) processRequest() dropReason {
  // 监听arp相关帧
    pkt, eth, err := a.conn.Read()
  // 过滤不关心的arp帧
  ...
  // 符合条件,需要reply的,发送reply
  if err := a.conn.Reply(pkt, a.hardwareAddr, pkt.TargetIP); err != nil {
        level.Error(a.logger).Log("op", "arpReply", "interface", a.intf, "ip", pkt.TargetIP, "senderIP", pkt.SenderIP, "senderMAC", pkt.SenderHardwareAddr, "responseMAC", a.hardwareAddr, "error", err, "msg", "failed to send ARP reply")
    } else {
        stats.SentResponse(pkt.TargetIP.String())
    }
    return dropReasonNone
}

选node

在openelb中,其使用了单进程模型,也就是控制器和speaker都在一个进程中,即使pod多副本,也会用leader election机制选出leader,最终只有一个pod会实际收发arp包

但是在metallb中架构有些区别,controller和speaker进程进行了解耦,speaker作为daemonset,每个node都会跑一个。那么最终流量是如何确定走哪个node进入集群的呢?

首先从比较直观的角度观察,service建好,lb ip分好后,在其中一个speaker的日志可以看到该speaker被分配了某个service,而其他speaker是不会分配到同样的service的。

这个日志在代码里是这样的写的

// speaker/main.go:339
func (c *controller) handleService(l log.Logger,
    name string,
    lbIPs []net.IP,
    svc *v1.Service, pool *config.Pool,
    eps epslices.EpsOrSlices,
  protocol config.Proto) controllers.SyncState {
  ...
  level.Info(l).Log("event", "serviceAnnounced", "msg", "service has IP, announcing", "protocol", protocol)
    c.client.Infof(svc, "nodeAssigned", "announcing from node %q with protocol %q", c.myNode, protocol)
    return controllers.SyncStateSuccess
}

说明在打印该日志前,选择node的逻辑已经走完了。经过对该日志上面代码的细细探究,终于找到了为service选择node的逻辑隐藏在了下面的函数内部,一开始从名字看还真没想到会在这里,看完代码明白了,其实也很合理。

因为用到了哈希一致性,保证service对node的唯一映射,所以对于任何一个service,一定只有一个speaker should announce,其他speaker should not announce,所以叫这个名字就很合理了。

// speaker/layer2_controller.go:98
func (c *layer2Controller) ShouldAnnounce(l log.Logger, name string, toAnnounce []net.IP, pool *config.Pool, svc *v1.Service, eps epslices.EpsOrSlices, nodes map[string]*v1.Node) string {
    ...
  // Using the first IP should work for both single and dual stack.
    ipString := toAnnounce[0].String()
    // Sort the slice by the hash of node + load balancer ips. This
    // produces an ordering of ready nodes that is unique to all the services
    // with the same ip.
    sort.Slice(availableNodes, func(i, j int) bool {
        hi := sha256.Sum256([]byte(availableNodes[i] + "#" + ipString))
        hj := sha256.Sum256([]byte(availableNodes[j] + "#" + ipString))

        return bytes.Compare(hi[:], hj[:]) < 0
    })

    // Are we first in the list? If so, we win and should announce.
    if len(availableNodes) > 0 && availableNodes[0] == c.myNode {
        return ""
    }
  ...
}

网卡过滤

updateInterfaces

每十秒执行一次的协程updateInterfaces中,会排除一些网卡,这些网卡的特点是,并非真实物理网卡,通常不会和lb service有关系,比如calico为pod建立的虚拟网卡calicoXXXX等。

对于这类网卡,updateInterfaces中完全不需要为其初始化newARPResponder的实例

因此该函数中用Announce.excludeRegexp去匹配遍历到的网卡名字,如果符合,就会跳过

excludeRegexp这个字段的值最终来源是configmap中存储的网卡匹配正则

由于该协程针对的是监听arp request,所以被configmap中规则排除掉的网卡,就不会再监听arp request,自然也不会发arp reply了

免费arp

service变更触发的免费arp广播,同样也可以选择网卡。当speaker确定自己需要对某lb ip发送免费arp时,会执行下面的函数:该函数针对给定的lb ip、node、和一组L2Advertisement CR,给出一个arp广播的结论(IPAdvertisement实例),该实例里包含了哪个lb ip,哪些网卡上发送这些最关键的信息。上面提到的生产者消费者,他们生产消费的数据结构就是IPAdvertisement实例。

由下面的函数可以看到,最终在哪些网卡上发送免费arp,是由L2Advertisement CR里的字段决定的

// speaker/layer2_controller.go:171
func ipAdvertisementFor(ip net.IP, localNode string, l2Advertisements []*config.L2Advertisement) layer2.IPAdvertisement {
  ifs := sets.Set[string]{}
    for _, l2 := range l2Advertisements {
        if matchNode := l2.Nodes[localNode]; !matchNode {
            continue
        }
        if l2.AllInterfaces {
            return layer2.NewIPAdvertisement(ip, true, sets.Set[string]{})
        }
        ifs = ifs.Insert(l2.Interfaces...)
    }
    return layer2.NewIPAdvertisement(ip, false, ifs)
}

如何知道哪个speaker在代理arp

  1. describe service,看event:问题是event不持久化,会消失
  2. arp -n或ip neigh show:问题是不能在node上,必须有其他机器
  3. speaker log:问题是时间久了以后log可能被滚动掉
  4. 向社区提issue,是否可以在service注解中持久化?persist node allacated to some service, maybe by annotaion? · Issue #2125 · metallb/metallb (github.com)

参考

文章目录