
kube-proxy Source Code Analysis

2024/11/18

The Service mechanism as seen from kube-proxy

The code is based on the latest release at the time of writing, release-1.30.

kube-proxy startup flow

//cmd/kube-proxy/proxy.go
func main() {
    command := app.NewProxyCommand()
    code := cli.Run(command)
    os.Exit(code)
}

This is kube-proxy's entry point: app.NewProxyCommand() returns a cobra.Command object, and cli.Run(command) executes that command.

//cmd/kube-proxy/app/server.go
func NewProxyCommand() *cobra.Command {
    // Create a new Options object; it contains all the default parameters needed to run the proxy server.
    opts := NewOptions()
    ...
    // cmd/kube-proxy/app/server.go:542
    if err := opts.Complete(cmd.Flags()); err != nil {
        return fmt.Errorf("failed complete: %w", err)
    }

NewOptions() returns an Options object; the Complete() method finishes parsing the command-line flags and initializes kube-proxy's default configuration.

// cmd/kube-proxy/app/server.go:374
func (o *Options) Run() error {
    defer close(o.errCh)
    if len(o.WriteConfigTo) > 0 {
        return o.writeConfigFile()
    }

    err := platformCleanup(o.config.Mode, o.CleanupAndExit)
    if o.CleanupAndExit {
        return err
    }
    // We ignore err otherwise; the cleanup is best-effort, and the backends will have
    // logged messages if they failed in interesting ways.

    proxyServer, err := newProxyServer(o.logger, o.config, o.master, o.InitAndExit)
    if err != nil {
        return err
    }
    if o.InitAndExit {
        return nil
    }

    o.proxyServer = proxyServer
    return o.runLoop()
}
//cmd/kube-proxy/app/server.go:606
func newProxyServer(logger klog.Logger, config *kubeproxyconfig.KubeProxyConfiguration, master string, initOnly bool) (*ProxyServer, error) {
    s := &ProxyServer{
        Config: config,
        logger: logger,
    }

    ...
    // Create the Kubernetes client
    s.Client, err = createClient(logger, config.ClientConnection, master)
    if err != nil {
        return nil, err
    }

    rawNodeIPs := getNodeIPs(logger, s.Client, s.Hostname)
    s.PrimaryIPFamily, s.NodeIPs = detectNodeIPs(logger, rawNodeIPs, config.BindAddress)

    // Event broadcaster
    s.Broadcaster = events.NewBroadcaster(&events.EventSinkImpl{Interface: s.Client.EventsV1()})
    s.Recorder = s.Broadcaster.NewRecorder(proxyconfigscheme.Scheme, "kube-proxy")

    // Create the corresponding proxier
    s.Proxier, err = s.createProxier(config, dualStackSupported, initOnly)

newProxyServer() is mainly responsible for initialization: it creates the Kubernetes client, determines the node IPs, sets up the event broadcaster, and builds the ProxyServer.

The ProxyServer struct is defined as follows:

// cmd/kube-proxy/app/server.go:586
type ProxyServer struct {
    Config *kubeproxyconfig.KubeProxyConfiguration

    Client          clientset.Interface
    Broadcaster     events.EventBroadcaster
    Recorder        events.EventRecorder
    NodeRef         *v1.ObjectReference
    HealthzServer   *healthcheck.ProxierHealthServer
    Hostname        string
    PrimaryIPFamily v1.IPFamily
    NodeIPs         map[v1.IPFamily]net.IP

    podCIDRs []string // only used for LocalModeNodeCIDR

    Proxier proxy.Provider

    logger klog.Logger
}

Here, Proxier proxy.Provider is the core of kube-proxy. It currently has two implementations: iptables and ipvs.

The createProxier method mainly chooses, based on the configuration, which kind of proxier to create (for example iptables or ipvs) and whether to run in dual-stack mode.

// cmd/kube-proxy/app/server_linux.go:158
proxier, err = iptables.NewProxier(
    s.PrimaryIPFamily,
    iptInterface,
    utilsysctl.New(),
    exec.New(),
    config.IPTables.SyncPeriod.Duration,
    config.IPTables.MinSyncPeriod.Duration,
    config.IPTables.MasqueradeAll,
    *config.IPTables.LocalhostNodePorts,
    int(*config.IPTables.MasqueradeBit),
    localDetector,
    s.Hostname,
    s.NodeIPs[s.PrimaryIPFamily],
    s.Recorder,
    s.HealthzServer,
    config.NodePortAddresses,
    initOnly,
)

A Proxier must implement the Provider interface, which defines the proxier's basic lifecycle, mainly the Sync() and SyncLoop() methods.

type Provider interface {
    config.EndpointSliceHandler
    config.ServiceHandler
    config.NodeHandler
    config.ServiceCIDRHandler

    // Sync immediately synchronizes the Provider's current state to proxy rules.
    Sync()
    // SyncLoop runs periodic work.
    // This is expected to run as a goroutine or as the main loop of the app.
    // It does not return.
    SyncLoop()
}

In Run(), the main work is creating the proxier and then calling runLoop().

func (o *Options) runLoop() error {
    if o.watcher != nil {
        o.watcher.Run()
    }

    // run the proxy in goroutine
    go func() {
        err := o.proxyServer.Run()
        o.errCh <- err
    }()

    for {
        err := <-o.errCh
        if err != nil {
            return err
        }
    }
}

Execution eventually lands in the ProxyServer.Run() method.

// cmd/kube-proxy/app/server.go:873
func (s *ProxyServer) Run() error

kube-proxy event watching

In ProxyServer.Run(), kube-proxy uses the informer mechanism to watch for changes to Services and endpoints.

Informers themselves are not covered in depth here; there is plenty of material about them online. A minimal sketch of the pattern follows, before we return to the actual kube-proxy code.
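
As a point of reference, here is a minimal, self-contained sketch of that pattern (my own illustration, not kube-proxy code; it assumes it runs in-cluster): a shared informer factory watches Services and calls the registered add/update/delete handlers, which is exactly what the kube-proxy code below builds on.

package main

import (
    "fmt"
    "time"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/cache"
)

func main() {
    cfg, err := rest.InClusterConfig() // assumption: running inside a cluster
    if err != nil {
        panic(err)
    }
    client := kubernetes.NewForConfigOrDie(cfg)

    // Shared informer factory with a 15-minute resync, watching Services.
    factory := informers.NewSharedInformerFactory(client, 15*time.Minute)
    factory.Core().V1().Services().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            if svc, ok := obj.(*v1.Service); ok {
                fmt.Println("service added:", svc.Namespace+"/"+svc.Name)
            }
        },
        UpdateFunc: func(oldObj, newObj interface{}) { /* diff old vs. new state */ },
        DeleteFunc: func(obj interface{}) { /* clean up derived state */ },
    })

    factory.Start(wait.NeverStop)            // start the underlying list/watch loops
    factory.WaitForCacheSync(wait.NeverStop) // wait for the initial List to finish
    select {}                                // block forever, like a long-running controller
}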

serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.Config.ConfigSyncPeriod.Duration)
serviceConfig.RegisterEventHandler(s.Proxier)
go serviceConfig.Run(wait.NeverStop)

endpointSliceConfig := config.NewEndpointSliceConfig(informerFactory.Discovery().V1().EndpointSlices(), s.Config.ConfigSyncPeriod.Duration)
endpointSliceConfig.RegisterEventHandler(s.Proxier)
go endpointSliceConfig.Run(wait.NeverStop)

if utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
    serviceCIDRConfig := config.NewServiceCIDRConfig(informerFactory.Networking().V1alpha1().ServiceCIDRs(), s.Config.ConfigSyncPeriod.Duration)
    serviceCIDRConfig.RegisterEventHandler(s.Proxier)
    go serviceCIDRConfig.Run(wait.NeverStop)
}

serviceConfig

First, let's look at the NewServiceConfig() method.

// pkg/proxy/config/config.go:168
func NewServiceConfig(serviceInformer v1informers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {
    result := &ServiceConfig{}

    handlerRegistration, _ := serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    result.handleAddService,
            UpdateFunc: result.handleUpdateService,
            DeleteFunc: result.handleDeleteService,
        },
        resyncPeriod,
    )

    result.listerSynced = handlerRegistration.HasSynced

    return result
}

func (c *ServiceConfig) RegisterEventHandler(handler ServiceHandler) {
    c.eventHandlers = append(c.eventHandlers, handler)
}

func (c *ServiceConfig) handleAddService(obj interface{}) {
    service, ok := obj.(*v1.Service)
    if !ok {
        utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
        return
    }
    for i := range c.eventHandlers {
        klog.V(4).InfoS("Calling handler.OnServiceAdd")
        c.eventHandlers[i].OnServiceAdd(service)
    }
}

During initialization, three event handler functions are registered via serviceInformer.Informer().AddEventHandlerWithResyncPeriod(), and the Proxier is registered as an event handler via RegisterEventHandler(). The Proxier implementations (iptables, ipvs) implement the ServiceHandler interface.

type ServiceHandler interface {
    // OnServiceAdd is called whenever creation of new service object
    // is observed.
    OnServiceAdd(service *v1.Service)
    // OnServiceUpdate is called whenever modification of an existing
    // service object is observed.
    OnServiceUpdate(oldService, service *v1.Service)
    // OnServiceDelete is called whenever deletion of an existing service
    // object is observed.
    OnServiceDelete(service *v1.Service)
    // OnServiceSynced is called once all the initial event handlers were
    // called and the state is fully propagated to local cache.
    OnServiceSynced()
}

The methods implemented by the iptables proxier

// pkg/proxy/iptables/proxier.go:539
// OnServiceAdd is called whenever creation of new service object
// is observed.
func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
    proxier.OnServiceUpdate(nil, service)
}

// OnServiceUpdate is called whenever modification of an existing
// service object is observed.
func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
    if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
        proxier.Sync()
    }
}

// OnServiceDelete is called whenever deletion of an existing service
// object is observed.
func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
    proxier.OnServiceUpdate(service, nil)
}

Through this event watching, whenever a Service object changes, the Proxier's OnServiceAdd, OnServiceUpdate, and OnServiceDelete methods are called.

Iptables Proxier

The iptables Proxier struct

// pkg/proxy/iptables/proxier.go:138
// Proxier is an iptables based proxy for connections between a localhost:lport
// and services that provide the actual backends.
type Proxier struct {
// ipFamily defines the IP family which this proxier is tracking.
ipFamily v1.IPFamily

// endpointsChanges and serviceChanges contains all changes to endpoints and
// services that happened since iptables was synced. For a single object,
// changes are accumulated, i.e. previous is state from before all of them,
// current is state after applying all of those.
endpointsChanges *proxy.EndpointsChangeTracker
serviceChanges *proxy.ServiceChangeTracker

mu sync.Mutex // protects the following fields
svcPortMap proxy.ServicePortMap
endpointsMap proxy.EndpointsMap
nodeLabels map[string]string
// endpointSlicesSynced, and servicesSynced are set to true
// when corresponding objects are synced after startup. This is used to avoid
// updating iptables with some partial data after kube-proxy restart.
endpointSlicesSynced bool
servicesSynced bool
needFullSync bool
initialized int32
syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
syncPeriod time.Duration
lastIPTablesCleanup time.Time

// These are effectively const and do not need the mutex to be held.
iptables utiliptables.Interface
masqueradeAll bool
masqueradeMark string
conntrack conntrack.Interface
localDetector proxyutiliptables.LocalTrafficDetector
hostname string
nodeIP net.IP
recorder events.EventRecorder

serviceHealthServer healthcheck.ServiceHealthServer
healthzServer *healthcheck.ProxierHealthServer

// Since converting probabilities (floats) to strings is expensive
// and we are using only probabilities in the format of 1/n, we are
// precomputing some number of those and cache for future reuse.
precomputedProbabilities []string

// The following buffers are used to reuse memory and avoid allocations
// that are significantly impacting performance.
iptablesData *bytes.Buffer
existingFilterChainsData *bytes.Buffer
filterChains proxyutil.LineBuffer
filterRules proxyutil.LineBuffer
natChains proxyutil.LineBuffer
natRules proxyutil.LineBuffer

// largeClusterMode is set at the beginning of syncProxyRules if we are
// going to end up outputting "lots" of iptables rules and so we need to
// optimize for performance over debuggability.
largeClusterMode bool

// localhostNodePorts indicates whether we allow NodePort services to be accessed
// via localhost.
localhostNodePorts bool

// conntrackTCPLiberal indicates whether the system sets the kernel nf_conntrack_tcp_be_liberal
conntrackTCPLiberal bool

// nodePortAddresses selects the interfaces where nodePort works.
nodePortAddresses *proxyutil.NodePortAddresses
// networkInterfacer defines an interface for several net library functions.
// Inject for test purpose.
networkInterfacer proxyutil.NetworkInterfacer
}

serviceChangeTracker

The iptables proxier's serviceChanges field is a ServiceChangeTracker, which records changes to Services; those accumulated changes are then used to maintain the ServicePortMap holding the Service information.

// pkg/proxy/servicechangetracker.go:35
type ServiceChangeTracker struct {
    // lock protects items.
    lock sync.Mutex
    // items maps a service to its serviceChange.
    items map[types.NamespacedName]*serviceChange

    // makeServiceInfo allows the proxier to inject customized information when
    // processing services.
    makeServiceInfo makeServicePortFunc
    // processServiceMapChange is invoked by the apply function on every change. This
    // function should not modify the ServicePortMaps, but just use the changes for
    // any Proxier-specific cleanup.
    processServiceMapChange processServiceMapChangeFunc

    ipFamily v1.IPFamily
    recorder events.EventRecorder
}

As the code below shows, when a Service change is observed, serviceChanges.Update(oldService, service) is called, and the change is later folded into the ServicePortMap.

// pkg/proxy/iptables/proxier.go:545~549
func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
    if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
        proxier.Sync()
    }
}

endpointsChangeTracker

Likewise, the EndpointsChangeTracker records changes to endpoints.

syncRunner

The syncRunner struct is shown below. This component provides asynchronous execution of a user-supplied function, rate limiting, and a retry mechanism.

// pkg/util/async/bounded_frequency_runner.go:31
type BoundedFrequencyRunner struct {
    name        string        // the name of this instance
    minInterval time.Duration // the min time between runs, modulo bursts
    maxInterval time.Duration // the max time between runs

    run chan struct{} // try an async run

    mu      sync.Mutex  // guards runs of fn and all mutations
    fn      func()      // function to run
    lastRun time.Time   // time of last run
    timer   timer       // timer for deferred runs
    limiter rateLimiter // rate limiter for on-demand runs

    retry     chan struct{} // schedule a retry
    retryMu   sync.Mutex    // guards retryTime
    retryTime time.Time     // when to retry
}

In the proxier's main loop, SyncLoop(), the syncRunner controls how often the sync function runs.

The syncRunner is initialized in newProxier().

// pkg/proxy/iptables/proxier.go:306
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, time.Hour, burstSyncs)
// Both the iptables and ipvs proxiers implement SyncLoop().
// pkg/proxy/iptables/proxier.go:514
func (proxier *Proxier) SyncLoop() {
    // Update healthz timestamp at beginning in case Sync() never succeeds.
    if proxier.healthzServer != nil {
        proxier.healthzServer.Updated(proxier.ipFamily)
    }

    // synthesize "last change queued" time as the informers are syncing.
    metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
    proxier.syncRunner.Loop(wait.NeverStop)
}
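
To make the syncRunner's behavior concrete, here is a hedged usage sketch (my own example, assuming the internal k8s.io/kubernetes/pkg/util/async package is vendored and importable): Loop() drives runs bounded by maxInterval, while each Run() call, which is what Proxier.Sync() ultimately triggers, requests an immediate run that is throttled to minInterval.

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/kubernetes/pkg/util/async"
)

func main() {
    syncRules := func() {
        fmt.Println("doing syncProxyRules-like work at", time.Now().Format(time.RFC3339))
    }

    // At most one run per second (minInterval), at least one run every 10s
    // (maxInterval), with a burst budget of 2 on-demand runs.
    runner := async.NewBoundedFrequencyRunner("demo-sync", syncRules, time.Second, 10*time.Second, 2)
    go runner.Loop(wait.NeverStop)

    // On-demand triggers are coalesced and rate-limited, so syncRules never
    // runs more often than minInterval even though we ask five times.
    for i := 0; i < 5; i++ {
        runner.Run()
        time.Sleep(200 * time.Millisecond)
    }
    time.Sleep(3 * time.Second)
}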

iptables proxier rule synchronization

As shown above, rule synchronization periodically invokes the syncProxyRules() method. Let's look at a few key code fragments. By default, Kubernetes creates the chains listed below and hooks them into the standard iptables chains, so that Service traffic is handled by the Kubernetes-specific chains. For example:

// -A PREROUTING -m comment --comment "kubernetes service portals" -j KUBE-SERVICES
// -A OUTPUT -m comment --comment "kubernetes service portals" -j KUBE-SERVICES
{utiliptables.TableNAT, kubeServicesChain, utiliptables.ChainOutput, "kubernetes service portals", nil},
{utiliptables.TableNAT, kubeServicesChain, utiliptables.ChainPrerouting, "kubernetes service portals", nil},

These rules redirect traffic into the Kubernetes-managed chains, which then forward it to the corresponding Service endpoints according to the Service type.

// pkg/proxy/iptables/proxier.go:780
const (
// the services chain
kubeServicesChain utiliptables.Chain = "KUBE-SERVICES"

// the external services chain
kubeExternalServicesChain utiliptables.Chain = "KUBE-EXTERNAL-SERVICES"

// the nodeports chain
kubeNodePortsChain utiliptables.Chain = "KUBE-NODEPORTS"

// the kubernetes postrouting chain
kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING"

// kubeMarkMasqChain is the mark-for-masquerade chain
kubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ"

// the kubernetes forward chain
kubeForwardChain utiliptables.Chain = "KUBE-FORWARD"

// kubeProxyFirewallChain is the kube-proxy firewall chain
kubeProxyFirewallChain utiliptables.Chain = "KUBE-PROXY-FIREWALL"

// kube proxy canary chain is used for monitoring rule reload
kubeProxyCanaryChain utiliptables.Chain = "KUBE-PROXY-CANARY"

// kubeletFirewallChain is a duplicate of kubelet's firewall containing
// the anti-martian-packet rule. It should not be used for any other
// rules.
kubeletFirewallChain utiliptables.Chain = "KUBE-FIREWALL"

// largeClusterEndpointsThreshold is the number of endpoints at which
// we switch into "large cluster mode" and optimize for iptables
// performance over iptables debuggability
largeClusterEndpointsThreshold = 1000
)

func (proxier *Proxier) syncProxyRules() {
    ...
    // Create the default iptables chains, in iptables-restore syntax
    for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain, kubeNodePortsChain, kubeProxyFirewallChain} {
        proxier.filterChains.Write(utiliptables.MakeChainLine(chainName))
    }
    for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, kubeMarkMasqChain} {
        proxier.natChains.Write(utiliptables.MakeChainLine(chainName))
    }
}

How iptables chain names are computed

func portProtoHash(servicePortName string, protocol string) string {
hash := sha256.Sum256([]byte(servicePortName + protocol))
encoded := base32.StdEncoding.EncodeToString(hash[:])
return encoded[:16]
}

const (
servicePortPolicyClusterChainNamePrefix = "KUBE-SVC-"
servicePortPolicyLocalChainNamePrefix = "KUBE-SVL-"
serviceFirewallChainNamePrefix = "KUBE-FW-"
serviceExternalChainNamePrefix = "KUBE-EXT-"
servicePortEndpointChainNamePrefix = "KUBE-SEP-"
)

func servicePortPolicyClusterChain(servicePortName string, protocol string) utiliptables.Chain {
return utiliptables.Chain(servicePortPolicyClusterChainNamePrefix + portProtoHash(servicePortName, protocol))
}

// servicePortPolicyLocalChainName returns the name of the KUBE-SVL-XXXX chain for a service, which
// handles dispatching to local endpoints when using `Local` traffic policy. This chain only
// exists if the service has `Local` internal or external traffic policy.
func servicePortPolicyLocalChainName(servicePortName string, protocol string) utiliptables.Chain {
return utiliptables.Chain(servicePortPolicyLocalChainNamePrefix + portProtoHash(servicePortName, protocol))
}

// serviceFirewallChainName returns the name of the KUBE-FW-XXXX chain for a service, which
// is used to implement the filtering for the LoadBalancerSourceRanges feature.
func serviceFirewallChainName(servicePortName string, protocol string) utiliptables.Chain {
return utiliptables.Chain(serviceFirewallChainNamePrefix + portProtoHash(servicePortName, protocol))
}

// serviceExternalChainName returns the name of the KUBE-EXT-XXXX chain for a service, which
// implements "short-circuiting" for internally-originated external-destination traffic when using
// `Local` external traffic policy. It forwards traffic from local sources to the KUBE-SVC-XXXX
// chain and traffic from external sources to the KUBE-SVL-XXXX chain.
func serviceExternalChainName(servicePortName string, protocol string) utiliptables.Chain {
return utiliptables.Chain(serviceExternalChainNamePrefix + portProtoHash(servicePortName, protocol))
}

Let's look at a Service in the cluster, for example:

apiVersion: v1
kind: Service
metadata:
  annotations:
    prometheus.io/port: "9153"
    prometheus.io/scrape: "true"
  creationTimestamp: "2024-11-18T08:51:30Z"
  labels:
    addonmanager.kubernetes.io/mode: Reconcile
    k8s-app: kube-dns
    kubernetes.io/cluster-service: "true"
    kubernetes.io/name: CoreDNS
  name: kube-dns
  namespace: kube-system
  resourceVersion: "214"
  uid: 4ccce55c-657e-4bc1-8961-284afc6f3be2
spec:
  clusterIP: 10.0.0.10
  clusterIPs:
  - 10.0.0.10
  internalTrafficPolicy: Cluster
  ipFamilies:
  - IPv4
  ipFamilyPolicy: SingleStack
  ports:
  - name: dns
    port: 53
    protocol: UDP
    targetPort: 53
  - name: dns-tcp
    port: 53
    protocol: TCP
    targetPort: 53
  - name: metrics
    port: 9153
    protocol: TCP
    targetPort: 9153
  selector:
    k8s-app: kube-dns
  sessionAffinity: None
  type: ClusterIP
status:
  loadBalancer: {}

The string that gets hashed is {ns}/{svc}:{portname} + protocol, so for the port named dns the chain suffix is:

echo -n "kube-system/kube-dns:dnsudp" | openssl dgst -sha256 -binary | base32 | head -c 16
TCOU7JCQXEZGVUNU
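
As a quick cross-check, here is a small standalone Go program (my own sketch that simply mirrors the portProtoHash logic shown above) which reproduces the same chain suffix:

package main

import (
    "crypto/sha256"
    "encoding/base32"
    "fmt"
)

// portProtoHash mirrors the helper above: sha256 over "{ns}/{svc}:{port}" + protocol,
// base32-encoded, truncated to the first 16 characters.
func portProtoHash(servicePortName, protocol string) string {
    hash := sha256.Sum256([]byte(servicePortName + protocol))
    encoded := base32.StdEncoding.EncodeToString(hash[:])
    return encoded[:16]
}

func main() {
    // Expected to print KUBE-SVC-TCOU7JCQXEZGVUNU for the dns/UDP port above.
    fmt.Println("KUBE-SVC-" + portProtoHash("kube-system/kube-dns:dns", "udp"))
}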

In iptables, we can look at the NAT table:

iptables -t nat -L KUBE-SVC-TCOU7JCQXEZGVUNU  -n -v
Chain KUBE-SVC-TCOU7JCQXEZGVUNU (1 references)
pkts bytes target prot opt in out source destination
0 0 KUBE-SEP-4KGNQC7RDYAIS7HY 0 -- * * 0.0.0.0/0 0.0.0.0/0 /* kube-system/kube-dns:dns -> 10.88.0.26:53 */

Below is the code fragment that loops over every Service port and creates the iptables rules for it.

// Key code: generate the iptables rules for each service port
for svcName, svc := range proxier.svcPortMap {
svcInfo, ok := svc.(*servicePortInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast serviceInfo", "serviceName", svcName)
continue
}
protocol := strings.ToLower(string(svcInfo.Protocol()))
svcPortNameString := svcInfo.nameString

// Figure out the endpoints for Cluster and Local traffic policy.
// allLocallyReachableEndpoints is the set of all endpoints that can be routed to
// from this node, given the service's traffic policies. hasEndpoints is true
// if the service has any usable endpoints on any node, not just this one.
allEndpoints := proxier.endpointsMap[svcName]
clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeLabels)

// clusterPolicyChain contains the endpoints used with "Cluster" traffic policy
clusterPolicyChain := svcInfo.clusterPolicyChainName
klog.V(2).InfoS("Syncing service", "service", svcName, "clusterPolicyChain", clusterPolicyChain)
usesClusterPolicyChain := len(clusterEndpoints) > 0 && svcInfo.UsesClusterEndpoints()

// localPolicyChain contains the endpoints used with "Local" traffic policy
localPolicyChain := svcInfo.localPolicyChainName
usesLocalPolicyChain := len(localEndpoints) > 0 && svcInfo.UsesLocalEndpoints()

// internalPolicyChain is the chain containing the endpoints for
// "internal" (ClusterIP) traffic. internalTrafficChain is the chain that
// internal traffic is routed to (which is always the same as
// internalPolicyChain). hasInternalEndpoints is true if we should
// generate rules pointing to internalTrafficChain, or false if there are
// no available internal endpoints.
internalPolicyChain := clusterPolicyChain
hasInternalEndpoints := hasEndpoints
if svcInfo.InternalPolicyLocal() {
internalPolicyChain = localPolicyChain
if len(localEndpoints) == 0 {
hasInternalEndpoints = false
}
}
internalTrafficChain := internalPolicyChain

// Similarly, externalPolicyChain is the chain containing the endpoints
// for "external" (NodePort, LoadBalancer, and ExternalIP) traffic.
// externalTrafficChain is the chain that external traffic is routed to
// (which is always the service's "EXT" chain). hasExternalEndpoints is
// true if there are endpoints that will be reached by external traffic.
// (But we may still have to generate externalTrafficChain even if there
// are no external endpoints, to ensure that the short-circuit rules for
// local traffic are set up.)
externalPolicyChain := clusterPolicyChain
hasExternalEndpoints := hasEndpoints
if svcInfo.ExternalPolicyLocal() {
externalPolicyChain = localPolicyChain
if len(localEndpoints) == 0 {
hasExternalEndpoints = false
}
}
externalTrafficChain := svcInfo.externalChainName // eventually jumps to externalPolicyChain

// usesExternalTrafficChain is based on hasEndpoints, not hasExternalEndpoints,
// because we need the local-traffic-short-circuiting rules even when there
// are no externally-usable endpoints.
usesExternalTrafficChain := hasEndpoints && svcInfo.ExternallyAccessible()

// Traffic to LoadBalancer IPs can go directly to externalTrafficChain
// unless LoadBalancerSourceRanges is in use in which case we will
// create a firewall chain.
loadBalancerTrafficChain := externalTrafficChain
fwChain := svcInfo.firewallChainName
usesFWChain := hasEndpoints && len(svcInfo.LoadBalancerVIPs()) > 0 && len(svcInfo.LoadBalancerSourceRanges()) > 0
if usesFWChain {
loadBalancerTrafficChain = fwChain
}

var internalTrafficFilterTarget, internalTrafficFilterComment string
var externalTrafficFilterTarget, externalTrafficFilterComment string
if !hasEndpoints {
// The service has no endpoints at all; hasInternalEndpoints and
// hasExternalEndpoints will also be false, and we will not
// generate any chains in the "nat" table for the service; only
// rules in the "filter" table rejecting incoming packets for
// the service's IPs.
internalTrafficFilterTarget = "REJECT"
internalTrafficFilterComment = fmt.Sprintf(`"%s has no endpoints"`, svcPortNameString)
externalTrafficFilterTarget = "REJECT"
externalTrafficFilterComment = internalTrafficFilterComment
} else {
if !hasInternalEndpoints {
// The internalTrafficPolicy is "Local" but there are no local
// endpoints. Traffic to the clusterIP will be dropped, but
// external traffic may still be accepted.
internalTrafficFilterTarget = "DROP"
internalTrafficFilterComment = fmt.Sprintf(`"%s has no local endpoints"`, svcPortNameString)
serviceNoLocalEndpointsTotalInternal++
}
if !hasExternalEndpoints {
// The externalTrafficPolicy is "Local" but there are no
// local endpoints. Traffic to "external" IPs from outside
// the cluster will be dropped, but traffic from inside
// the cluster may still be accepted.
externalTrafficFilterTarget = "DROP"
externalTrafficFilterComment = fmt.Sprintf(`"%s has no local endpoints"`, svcPortNameString)
serviceNoLocalEndpointsTotalExternal++
}
}

filterRules := proxier.filterRules
natChains := proxier.natChains
natRules := proxier.natRules

// Capture the clusterIP.
if hasInternalEndpoints {
natRules.Write(
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcPortNameString),
"-m", protocol, "-p", protocol,
"-d", svcInfo.ClusterIP().String(),
"--dport", strconv.Itoa(svcInfo.Port()),
"-j", string(internalTrafficChain))
} else {
// No endpoints.
filterRules.Write(
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", internalTrafficFilterComment,
"-m", protocol, "-p", protocol,
"-d", svcInfo.ClusterIP().String(),
"--dport", strconv.Itoa(svcInfo.Port()),
"-j", internalTrafficFilterTarget,
)
}

// Capture externalIPs.
for _, externalIP := range svcInfo.ExternalIPs() {
if hasEndpoints {
// Send traffic bound for external IPs to the "external
// destinations" chain.
natRules.Write(
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcPortNameString),
"-m", protocol, "-p", protocol,
"-d", externalIP.String(),
"--dport", strconv.Itoa(svcInfo.Port()),
"-j", string(externalTrafficChain))
}
if !hasExternalEndpoints {
// Either no endpoints at all (REJECT) or no endpoints for
// external traffic (DROP anything that didn't get
// short-circuited by the EXT chain.)
filterRules.Write(
"-A", string(kubeExternalServicesChain),
"-m", "comment", "--comment", externalTrafficFilterComment,
"-m", protocol, "-p", protocol,
"-d", externalIP.String(),
"--dport", strconv.Itoa(svcInfo.Port()),
"-j", externalTrafficFilterTarget,
)
}
}

// Capture load-balancer ingress.
for _, lbip := range svcInfo.LoadBalancerVIPs() {
if hasEndpoints {
natRules.Write(
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcPortNameString),
"-m", protocol, "-p", protocol,
"-d", lbip.String(),
"--dport", strconv.Itoa(svcInfo.Port()),
"-j", string(loadBalancerTrafficChain))

}
if usesFWChain {
filterRules.Write(
"-A", string(kubeProxyFirewallChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s traffic not accepted by %s"`, svcPortNameString, svcInfo.firewallChainName),
"-m", protocol, "-p", protocol,
"-d", lbip.String(),
"--dport", strconv.Itoa(svcInfo.Port()),
"-j", "DROP")
}
}
if !hasExternalEndpoints {
// Either no endpoints at all (REJECT) or no endpoints for
// external traffic (DROP anything that didn't get short-circuited
// by the EXT chain.)
for _, lbip := range svcInfo.LoadBalancerVIPs() {
filterRules.Write(
"-A", string(kubeExternalServicesChain),
"-m", "comment", "--comment", externalTrafficFilterComment,
"-m", protocol, "-p", protocol,
"-d", lbip.String(),
"--dport", strconv.Itoa(svcInfo.Port()),
"-j", externalTrafficFilterTarget,
)
}
}

// Capture nodeports.
if svcInfo.NodePort() != 0 {
if hasEndpoints {
// Jump to the external destination chain. For better or for
// worse, nodeports are not subect to loadBalancerSourceRanges,
// and we can't change that.
natRules.Write(
"-A", string(kubeNodePortsChain),
"-m", "comment", "--comment", svcPortNameString,
"-m", protocol, "-p", protocol,
"--dport", strconv.Itoa(svcInfo.NodePort()),
"-j", string(externalTrafficChain))
}
if !hasExternalEndpoints {
// Either no endpoints at all (REJECT) or no endpoints for
// external traffic (DROP anything that didn't get
// short-circuited by the EXT chain.)
filterRules.Write(
"-A", string(kubeExternalServicesChain),
"-m", "comment", "--comment", externalTrafficFilterComment,
"-m", "addrtype", "--dst-type", "LOCAL",
"-m", protocol, "-p", protocol,
"--dport", strconv.Itoa(svcInfo.NodePort()),
"-j", externalTrafficFilterTarget,
)
}
}

// Capture healthCheckNodePorts.
if svcInfo.HealthCheckNodePort() != 0 {
// no matter if node has local endpoints, healthCheckNodePorts
// need to add a rule to accept the incoming connection
filterRules.Write(
"-A", string(kubeNodePortsChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s health check node port"`, svcPortNameString),
"-m", "tcp", "-p", "tcp",
"--dport", strconv.Itoa(svcInfo.HealthCheckNodePort()),
"-j", "ACCEPT",
)
}

// If the SVC/SVL/EXT/FW/SEP chains have not changed since the last sync
// then we can omit them from the restore input. However, we have to still
// figure out how many chains we _would_ have written, to make the metrics
// come out right, so we just compute them and throw them away.
if tryPartialSync && !serviceUpdateResult.UpdatedServices.Has(svcName.NamespacedName) && !endpointUpdateResult.UpdatedServices.Has(svcName.NamespacedName) {
natChains = skippedNatChains
natRules = skippedNatRules
}

// Set up internal traffic handling.
if hasInternalEndpoints {
args = append(args[:0],
"-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcPortNameString),
"-m", protocol, "-p", protocol,
"-d", svcInfo.ClusterIP().String(),
"--dport", strconv.Itoa(svcInfo.Port()),
)
if proxier.masqueradeAll {
natRules.Write(
"-A", string(internalTrafficChain),
args,
"-j", string(kubeMarkMasqChain))
} else if proxier.localDetector.IsImplemented() {
// This masquerades off-cluster traffic to a service VIP. The
// idea is that you can establish a static route for your
// Service range, routing to any node, and that node will
// bridge into the Service for you. Since that might bounce
// off-node, we masquerade here.
natRules.Write(
"-A", string(internalTrafficChain),
args,
proxier.localDetector.IfNotLocal(),
"-j", string(kubeMarkMasqChain))
}
}

// Set up external traffic handling (if any "external" destinations are
// enabled). All captured traffic for all external destinations should
// jump to externalTrafficChain, which will handle some special cases and
// then jump to externalPolicyChain.
if usesExternalTrafficChain {
natChains.Write(utiliptables.MakeChainLine(externalTrafficChain))
activeNATChains.Insert(externalTrafficChain)

if !svcInfo.ExternalPolicyLocal() {
// If we are using non-local endpoints we need to masquerade,
// in case we cross nodes.
natRules.Write(
"-A", string(externalTrafficChain),
"-m", "comment", "--comment", fmt.Sprintf(`"masquerade traffic for %s external destinations"`, svcPortNameString),
"-j", string(kubeMarkMasqChain))
} else {
// If we are only using same-node endpoints, we can retain the
// source IP in most cases.

if proxier.localDetector.IsImplemented() {
// Treat all locally-originated pod -> external destination
// traffic as a special-case. It is subject to neither
// form of traffic policy, which simulates going up-and-out
// to an external load-balancer and coming back in.
natRules.Write(
"-A", string(externalTrafficChain),
"-m", "comment", "--comment", fmt.Sprintf(`"pod traffic for %s external destinations"`, svcPortNameString),
proxier.localDetector.IfLocal(),
"-j", string(clusterPolicyChain))
}

// Locally originated traffic (not a pod, but the host node)
// still needs masquerade because the LBIP itself is a local
// address, so that will be the chosen source IP.
natRules.Write(
"-A", string(externalTrafficChain),
"-m", "comment", "--comment", fmt.Sprintf(`"masquerade LOCAL traffic for %s external destinations"`, svcPortNameString),
"-m", "addrtype", "--src-type", "LOCAL",
"-j", string(kubeMarkMasqChain))

// Redirect all src-type=LOCAL -> external destination to the
// policy=cluster chain. This allows traffic originating
// from the host to be redirected to the service correctly.
natRules.Write(
"-A", string(externalTrafficChain),
"-m", "comment", "--comment", fmt.Sprintf(`"route LOCAL traffic for %s external destinations"`, svcPortNameString),
"-m", "addrtype", "--src-type", "LOCAL",
"-j", string(clusterPolicyChain))
}

// Anything else falls thru to the appropriate policy chain.
if hasExternalEndpoints {
natRules.Write(
"-A", string(externalTrafficChain),
"-j", string(externalPolicyChain))
}
}

// Set up firewall chain, if needed
if usesFWChain {
natChains.Write(utiliptables.MakeChainLine(fwChain))
activeNATChains.Insert(fwChain)

// The service firewall rules are created based on the
// loadBalancerSourceRanges field. This only works for VIP-like
// loadbalancers that preserve source IPs. For loadbalancers which
// direct traffic to service NodePort, the firewall rules will not
// apply.
args = append(args[:0],
"-A", string(fwChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcPortNameString),
)

// firewall filter based on each source range
allowFromNode := false
for _, cidr := range svcInfo.LoadBalancerSourceRanges() {
natRules.Write(args, "-s", cidr.String(), "-j", string(externalTrafficChain))
if cidr.Contains(proxier.nodeIP) {
allowFromNode = true
}
}
// For VIP-like LBs, the VIP is often added as a local
// address (via an IP route rule). In that case, a request
// from a node to the VIP will not hit the loadbalancer but
// will loop back with the source IP set to the VIP. We
// need the following rules to allow requests from this node.
if allowFromNode {
for _, lbip := range svcInfo.LoadBalancerVIPs() {
natRules.Write(
args,
"-s", lbip.String(),
"-j", string(externalTrafficChain))
}
}
// If the packet was able to reach the end of firewall chain,
// then it did not get DNATed, so it will match the
// corresponding KUBE-PROXY-FIREWALL rule.
natRules.Write(
"-A", string(fwChain),
"-m", "comment", "--comment", fmt.Sprintf(`"other traffic to %s will be dropped by KUBE-PROXY-FIREWALL"`, svcPortNameString),
)
}

// If Cluster policy is in use, create the chain and create rules jumping
// from clusterPolicyChain to the clusterEndpoints
if usesClusterPolicyChain {
natChains.Write(utiliptables.MakeChainLine(clusterPolicyChain))
activeNATChains.Insert(clusterPolicyChain)
proxier.writeServiceToEndpointRules(natRules, svcPortNameString, svcInfo, clusterPolicyChain, clusterEndpoints, args)
}

// If Local policy is in use, create the chain and create rules jumping
// from localPolicyChain to the localEndpoints
if usesLocalPolicyChain {
natChains.Write(utiliptables.MakeChainLine(localPolicyChain))
activeNATChains.Insert(localPolicyChain)
proxier.writeServiceToEndpointRules(natRules, svcPortNameString, svcInfo, localPolicyChain, localEndpoints, args)
}

// Generate the per-endpoint chains.
for _, ep := range allLocallyReachableEndpoints {
epInfo, ok := ep.(*endpointInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast endpointInfo", "endpointInfo", ep)
continue
}

endpointChain := epInfo.ChainName

// Create the endpoint chain
natChains.Write(utiliptables.MakeChainLine(endpointChain))
activeNATChains.Insert(endpointChain)

args = append(args[:0], "-A", string(endpointChain))
args = proxier.appendServiceCommentLocked(args, svcPortNameString)
// Handle traffic that loops back to the originator with SNAT.
natRules.Write(
args,
"-s", epInfo.IP(),
"-j", string(kubeMarkMasqChain))
// Update client-affinity lists.
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
}
// DNAT to final destination.
args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", epInfo.String())
natRules.Write(args)
}
}

ClusterIP rule creation flow

Suppose we create an nginx Deployment listening on port 80.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx-deployment
  labels:
    app: nginx
spec:
  replicas: 2
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
      - name: nginx
        image: nginx:latest
        ports:
        - containerPort: 80
apiVersion: v1
kind: Service
metadata:
  name: nginx
spec:
  ports:
  - port: 80
    targetPort: 80
    name: http
  selector:
    app: nginx

Given:
Pod IPs: 10.88.0.31 and 10.88.0.32
Service IP: 10.0.0.246

kubectl get pod -o wide
nginx-deployment-576c6b7b6-jjccv 1/1 Running 0 2m33s 10.88.0.31 127.0.0.1 <none> <none>
nginx-deployment-576c6b7b6-p2t85 1/1 Running 0 2m33s 10.88.0.32 127.0.0.1 <none> <none>

nginx ClusterIP 10.0.0.246 <none> 80/TCP 2m49s

Applying the naming rule above, we can compute:

echo -n "default/nginx:httptcp" | openssl dgst -sha256 -binary | base32 | head -c 16

P4Q3KNUAWJVP4ILH

if hasInternalEndpoints {
    natRules.Write(
        "-A", string(kubeServicesChain),
        "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcPortNameString),
        "-m", protocol, "-p", protocol,
        "-d", svcInfo.ClusterIP().String(),
        "--dport", strconv.Itoa(svcInfo.Port()),
        "-j", string(internalTrafficChain))
}

If the Service has matching endpoints, the code above produces the following rule:

TCP traffic whose destination is 10.0.0.246 (the Service IP) on port 80 is redirected to the KUBE-SVC-P4Q3KNUAWJVP4ILH chain.

-A KUBE-SERVICES -d 10.0.0.246/32 -p tcp -m comment --comment "default/nginx:http cluster IP" -m tcp --dport 80 -j KUBE-SVC-P4Q3KNUAWJVP4ILH

Forwarding & load balancing

if usesClusterPolicyChain {
natChains.Write(utiliptables.MakeChainLine(clusterPolicyChain))
activeNATChains.Insert(clusterPolicyChain)
proxier.writeServiceToEndpointRules(natRules, svcPortNameString, svcInfo, clusterPolicyChain, clusterEndpoints, args)
}

func (proxier *Proxier) writeServiceToEndpointRules(natRules proxyutil.LineBuffer, svcPortNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint, args []string) {
// First write session affinity rules, if applicable.
// These rules are added when SessionAffinity is ServiceAffinityClientIP
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
for _, ep := range endpoints {
epInfo, ok := ep.(*endpointInfo)
if !ok {
continue
}
comment := fmt.Sprintf(`"%s -> %s"`, svcPortNameString, epInfo.String())

args = append(args[:0],
"-A", string(svcChain),
)
args = proxier.appendServiceCommentLocked(args, comment)
args = append(args,
"-m", "recent", "--name", string(epInfo.ChainName),
"--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap",
"-j", string(epInfo.ChainName),
)
natRules.Write(args)
}
}

// Now write loadbalancing rules.
numEndpoints := len(endpoints)
for i, ep := range endpoints {
epInfo, ok := ep.(*endpointInfo)
if !ok {
continue
}
comment := fmt.Sprintf(`"%s -> %s"`, svcPortNameString, epInfo.String())

// Note args[:0]: args is rebuilt from scratch here
args = append(args[:0], "-A", string(svcChain))
args = proxier.appendServiceCommentLocked(args, comment)
// Rules with index < numEndpoints-1 are probabilistic (random) matches
if i < (numEndpoints - 1) {
// Each rule is a probabilistic match.
args = append(args,
"-m", "statistic",
"--mode", "random",
"--probability", proxier.probability(numEndpoints-i))
}
// The final (or only if n == 1) rule is a guaranteed match.
natRules.Write(args, "-j", string(epInfo.ChainName))
}
}

The code above ends up inserting one rule per endpoint into the KUBE-SVC-P4Q3KNUAWJVP4ILH chain and sets up probabilistic (random) matching, which implements the load balancing; see the sketch after the generated rules below.

-A KUBE-SVC-P4Q3KNUAWJVP4ILH -m comment --comment "default/nginx:http -> 10.88.0.31:80" -m statistic --mode random --probability 0.50000000000 -j KUBE-SEP-DSYSM2V7JTVFKHRG
-A KUBE-SVC-P4Q3KNUAWJVP4ILH -m comment --comment "default/nginx:http -> 10.88.0.32:80" -j KUBE-SEP-YCFYOPZYEILRUQYE
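
Why does this chaining produce an even split? A small standalone sketch (my own illustration, not kube-proxy code, assuming n = 3 endpoints) shows that giving rule i probability 1/(n-i), with the last rule as a guaranteed match, yields an effective 1/n share per endpoint:

package main

import "fmt"

func main() {
    n := 3           // assumed number of endpoints
    remaining := 1.0 // probability that a packet falls through to rule i
    for i := 0; i < n; i++ {
        p := 1.0 // the final rule is a guaranteed match
        if i < n-1 {
            p = 1.0 / float64(n-i) // corresponds to "--probability 1/(n-i)" in rule i
        }
        fmt.Printf("rule %d: probability %.11f, effective share %.3f\n", i, p, remaining*p)
        remaining *= 1 - p
    }
}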

Generating the SNAT and DNAT rules used for forwarding

With the code above, kube-proxy intercepts traffic destined for the Service and hands it to the KUBE-SVC-XXXXXX chain; within that chain, the probabilistic rules jump to the corresponding backend chains. Now let's look at how those backend chains are generated.

for _, ep := range allLocallyReachableEndpoints {
epInfo, ok := ep.(*endpointInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast endpointInfo", "endpointInfo", ep)
continue
}

endpointChain := epInfo.ChainName

// Create the endpoint chain
// Chains are named in the form KUBE-SEP-XXXXXXX
natChains.Write(utiliptables.MakeChainLine(endpointChain))
activeNATChains.Insert(endpointChain)

// Rebuild args from scratch
args = append(args[:0], "-A", string(endpointChain))
args = proxier.appendServiceCommentLocked(args, svcPortNameString)
// Handle traffic that loops back to the originator with SNAT.
natRules.Write(
args,
"-s", epInfo.IP(),
"-j", string(kubeMarkMasqChain))
// Update client-affinity lists.
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
}
// DNAT to final destination.
args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", epInfo.String())
natRules.Write(args)
}

The code above generates rules like the following for each endpoint:

-A KUBE-SEP-DSYSM2V7JTVFKHRG -s 10.88.0.31/32 -m comment --comment "default/nginx:http" -j KUBE-MARK-MASQ
# forward traffic to 10.88.0.31 port 80
-A KUBE-SEP-DSYSM2V7JTVFKHRG -p tcp -m comment --comment "default/nginx:http" -m tcp -j DNAT --to-destination 10.88.0.31:80

-A KUBE-SEP-YCFYOPZYEILRUQYE -s 10.88.0.32/32 -m comment --comment "default/nginx:http" -j KUBE-MARK-MASQ
-A KUBE-SEP-YCFYOPZYEILRUQYE -p tcp -m comment --comment "default/nginx:http" -m tcp -j DNAT --to-destination 10.88.0.32:80

With that, the ClusterIP rule generation is complete.

ClusterIP summary

  1. kube-proxy creates a KUBE-SVC-XXXXXX chain for each Service, which intercepts traffic destined for that Service.
  2. kube-proxy inserts the corresponding endpoint jump rules for each Service, which provide the load balancing.
  3. kube-proxy creates a KUBE-SEP-XXXXXX chain for each endpoint, which forwards traffic to that endpoint.