Prometheus Internals (Part 3)

Nonlinear => linear, chaos => order.

The previous post walked through the startup process of the Prometheus server without going deep into the individual components. This post focuses on the Initial configuration loading part.

There are two particularly elegant pieces in the Prometheus server's main function: the registration of reloaders, and the startup of the components. But one question remains: are all the components started at the same time, or is there an ordering?

Let's list the components first:

- Termination handler.
- Scrape discovery manager.
- Notify discovery manager.
- Scrape manager.
- Reload handler.
- Initial configuration loading.
- Rule manager.
- TSDB.
- Web handler.
- Notifier.

Now let's look at the underlying mechanism used to start these components:

run.Group is a universal mechanism to manage goroutine lifecycles.

Create a zero-value run.Group, and then add actors to it. Actors are defined as a pair of functions: an execute function, which should run synchronously; and an interrupt function, which, when invoked, should cause the execute function to return. Finally, invoke Run, which concurrently runs all of the actors, waits until the first actor exits, invokes the interrupt functions, and finally returns control to the caller only once all actors have returned. This general-purpose API allows callers to model pretty much any runnable task, and achieve well-defined lifecycle semantics for the group.

run.Group was written to manage component lifecycles in func main for OK Log. But it’s useful in any circumstance where you need to orchestrate multiple goroutines as a unit whole. Click here to see a video of a talk where run.Group is described.

source: https://pkg.go.dev/github.com/oklog/run

To summarize: run.Group from the oklog/run library implements a goroutine lifecycle management mechanism based on an actor model, where each actor registers a pair of functions, execute and interrupt. Recall the previous post — that is exactly the pattern we saw.
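To make the pattern concrete, here is a minimal, self-contained sketch of run.Group usage (not Prometheus code; the actors are made up for illustration):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/oklog/run"
)

func main() {
	var g run.Group
	ctx, cancel := context.WithCancel(context.Background())

	// Actor 1: a worker that runs until its context is canceled.
	g.Add(func() error {
		<-ctx.Done()
		fmt.Println("worker: stopped")
		return ctx.Err()
	}, func(error) {
		cancel() // interrupt: unblock the execute function
	})

	// Actor 2: exits on its own after one second; its return triggers
	// the interrupt functions of all other actors.
	g.Add(func() error {
		time.Sleep(time.Second)
		return fmt.Errorf("timer expired")
	}, func(error) {})

	// Run blocks until every actor has returned.
	fmt.Println("run.Group returned:", g.Run())
}
```

Run returns the error of the first actor to exit, which is exactly the semantics Prometheus relies on for coordinated shutdown.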

- Termination handler: its execute function blocks immediately in a select, waiting for a notification from term, webHandler, or cancel, to guarantee a clean shutdown.
- Scrape discovery manager: calls discoveryManagerScrape.Run directly.
- Notify discovery manager: calls discoveryManagerNotify.Run directly.
- Scrape manager: waits for a notification on reloadReady.C, then runs scrapeManager.Run (see the sketch after this list).
- Reload handler: waits for a notification on reloadReady.C, then waits for notifications from hup, webHandler, or cancel, and reloads the configuration accordingly.
- Initial configuration loading: waits on dbOpen and cancel; once dbOpen is signaled, it loads the configuration.
- Rule manager: waits for a notification on reloadReady.C, then runs ruleManager.Run.
- TSDB: runs openDBWithMetrics, then closes the dbOpen channel.
- Web handler: calls webHandler.Run directly.
- Notifier: waits for a notification on reloadReady.C, then calls notifierManager.Run.
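As a concrete example of how one of these is registered, the Scrape manager actor looks roughly like this (paraphrased from main.go; logging and some arguments elided):

```go
g.Add(
	func() error {
		// Block until the initial configuration has been loaded.
		<-reloadReady.C
		return scrapeManager.Run(discoveryManagerScrape.SyncCh())
	},
	func(err error) {
		scrapeManager.Stop()
	},
)
```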

As you can see, two channels are central to the whole control flow: dbOpen and reloadReady.C. Let's take a closer look at both:

```go
// Start all components while we wait for TSDB to open but only load
// initial config and mark ourselves as ready after it completed.
dbOpen := make(chan struct{})

// sync.Once is used to make sure we can close the channel at different execution stages(SIGTERM or when the config is loaded).
type closeOnce struct {
	C     chan struct{}
	once  sync.Once
	Close func()
}
// Wait until the server is ready to handle reloading.
reloadReady := &closeOnce{
	C: make(chan struct{}),
}
reloadReady.Close = func() {
	reloadReady.once.Do(func() {
		close(reloadReady.C)
	})
}
```

Putting this together: the Termination handler, Scrape discovery manager, Notify discovery manager, TSDB, and Web handler components run immediately. Once the TSDB has finished initializing, it closes the dbOpen channel, which unblocks the Initial configuration loading component; after reloadReady.C is closed (we'll see exactly where shortly), all the components waiting on that channel start running.
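The trick that makes this work is that closing a channel is a broadcast: every goroutine blocked on a receive from it unblocks at once. A minimal sketch of this gating pattern (not Prometheus code):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	ready := make(chan struct{})
	var wg sync.WaitGroup

	// Several "components" all gate on the same channel.
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-ready // blocks until the channel is closed
			fmt.Println("component", id, "started")
		}(i)
	}

	// A single close releases every waiter simultaneously.
	close(ready)
	wg.Wait()
}
```

The sync.Once wrapper in closeOnce simply makes that close safe to trigger from more than one code path.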

The execute function of the Initial configuration loading component is shown below; clearly, the heart of it is the reloadConfig function:

```go
select {
case <-dbOpen:
// In case a shutdown is initiated before the dbOpen is released
case <-cancel:
	reloadReady.Close()
	return nil
}

if err := reloadConfig(cfg.configFile, logger, noStepSubqueryInterval, reloaders...); err != nil {
	return errors.Wrapf(err, "error loading config from %q", cfg.configFile)
}

reloadReady.Close()
```

The reloadConfig function is implemented as follows:

```go
func reloadConfig(filename string, logger log.Logger, noStepSuqueryInterval *safePromQLNoStepSubqueryInterval, rls ...reloader) (err error) {
	start := time.Now()
	timings := []interface{}{}
	level.Info(logger).Log("msg", "Loading configuration file", "filename", filename)

	defer func() {
		if err == nil {
			configSuccess.Set(1)
			configSuccessTime.SetToCurrentTime()
		} else {
			configSuccess.Set(0)
		}
	}()

	conf, err := config.LoadFile(filename)
	if err != nil {
		return errors.Wrapf(err, "couldn't load configuration (--config.file=%q)", filename)
	}

	failed := false
	for _, rl := range rls {
		rstart := time.Now()
		if err := rl.reloader(conf); err != nil {
			level.Error(logger).Log("msg", "Failed to apply configuration", "err", err)
			failed = true
		}
		timings = append(timings, rl.name, time.Since(rstart))
	}
	if failed {
		return errors.Errorf("one or more errors occurred while applying the new configuration (--config.file=%q)", filename)
	}

	noStepSuqueryInterval.Set(conf.GlobalConfig.EvaluationInterval)
	l := []interface{}{"msg", "Completed loading of configuration file", "filename", filename, "totalDuration", time.Since(start)}
	level.Info(logger).Log(append(l, timings...)...)
	return nil
}
```
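For reference, the reloader type that rls ranges over is a simple name/function pair. Its shape can be inferred from the call sites above (rl.name, rl.reloader(conf)); see main.go for the authoritative definition:

```go
// Shape inferred from how rls is used in reloadConfig.
type reloader struct {
	name     string
	reloader func(cfg *config.Config) error
}
```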

This function does two important things: it parses the configuration file, and it invokes the reloaders. We'll leave the reloader calls for the next post, when we study scraping; here let's look at the parsing of the configuration file, which is quite elegant in its own right. The configuration struct is defined as follows:

```go
type Config struct {
	GlobalConfig   GlobalConfig    `yaml:"global"`
	AlertingConfig AlertingConfig  `yaml:"alerting,omitempty"`
	RuleFiles      []string        `yaml:"rule_files,omitempty"`
	ScrapeConfigs  []*ScrapeConfig `yaml:"scrape_configs,omitempty"`

	RemoteWriteConfigs []*RemoteWriteConfig `yaml:"remote_write,omitempty"`
	RemoteReadConfigs  []*RemoteReadConfig  `yaml:"remote_read,omitempty"`
}
```

Because all kinds of dynamic service discovery mechanisms must be supported, the ServiceDiscoveryConfigs field (in ScrapeConfig below) carries the YAML tag key "-", which tells the parser to skip it; config.go instead provides a custom UnmarshalYAML implementation to parse it:

```go
type ScrapeConfig struct {
	// The job name to which the job label is set by default.
	JobName string `yaml:"job_name"`
	// Indicator whether the scraped metrics should remain unmodified.
	HonorLabels bool `yaml:"honor_labels,omitempty"`
	// Indicator whether the scraped timestamps should be respected.
	HonorTimestamps bool `yaml:"honor_timestamps"`
	// A set of query parameters with which the target is scraped.
	Params url.Values `yaml:"params,omitempty"`
	// How frequently to scrape the targets of this scrape config.
	ScrapeInterval model.Duration `yaml:"scrape_interval,omitempty"`
	// The timeout for scraping targets of this config.
	ScrapeTimeout model.Duration `yaml:"scrape_timeout,omitempty"`
	// The HTTP resource path on which to fetch metrics from targets.
	MetricsPath string `yaml:"metrics_path,omitempty"`
	// The URL scheme with which to fetch metrics from targets.
	Scheme string `yaml:"scheme,omitempty"`
	// More than this many samples post metric-relabeling will cause the scrape to fail.
	SampleLimit uint `yaml:"sample_limit,omitempty"`
	// More than this many targets after the target relabeling will cause the
	// scrapes to fail.
	TargetLimit uint `yaml:"target_limit,omitempty"`

	// We cannot do proper Go type embedding below as the parser will then parse
	// values arbitrarily into the overflow maps of further-down types.

	ServiceDiscoveryConfigs discovery.Configs       `yaml:"-"`
	HTTPClientConfig        config.HTTPClientConfig `yaml:",inline"`

	// List of target relabel configurations.
	RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"`
	// List of metric relabel configurations.
	MetricRelabelConfigs []*relabel.Config `yaml:"metric_relabel_configs,omitempty"`
}
```
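The `yaml:"-"` and `yaml:",inline"` tags are doing real work here. A minimal standalone sketch of their behavior with gopkg.in/yaml.v2 (hypothetical types, not from Prometheus):

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

type Auth struct {
	Token string `yaml:"token,omitempty"`
}

type Job struct {
	Name string `yaml:"name"`
	// "-": the parser skips this field entirely; something else
	// (e.g. a custom UnmarshalYAML) must fill it.
	Discovered []string `yaml:"-"`
	// ",inline": Auth's fields appear at Job's level in the YAML.
	Auth Auth `yaml:",inline"`
}

func main() {
	var j Job
	input := []byte("name: demo\ntoken: secret\n")
	if err := yaml.Unmarshal(input, &j); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", j) // {Name:demo Discovered:[] Auth:{Token:secret}}
}
```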

The call conf, err := config.LoadFile(filename) returns a Config pointer; LoadFile reads the file and hands its contents to Load:

```go
func Load(s string) (*Config, error) {
	cfg := &Config{}
	// If the entire config body is empty the UnmarshalYAML method is
	// never called. We thus have to set the DefaultConfig at the entry
	// point as well.
	*cfg = DefaultConfig

	err := yaml.UnmarshalStrict([]byte(s), cfg)
	if err != nil {
		return nil, err
	}
	return cfg, nil
}
```

The call err := yaml.UnmarshalStrict([]byte(s), cfg) invokes Config's YAML unmarshaling — and Prometheus customizes how the entire configuration file is parsed. Config's UnmarshalYAML is implemented as follows:

```go
func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
	*c = DefaultConfig
	// We want to set c to the defaults and then overwrite it with the input.
	// To make unmarshal fill the plain data struct rather than calling UnmarshalYAML
	// again, we have to hide it using a type indirection.
	type plain Config
	if err := unmarshal((*plain)(c)); err != nil {
		return err
	}

	// If a global block was open but empty the default global config is overwritten.
	// We have to restore it here.
	if c.GlobalConfig.isZero() {
		c.GlobalConfig = DefaultGlobalConfig
	}

	for _, rf := range c.RuleFiles {
		if !patRulePath.MatchString(rf) {
			return errors.Errorf("invalid rule file path %q", rf)
		}
	}
	// Do global overrides and validate unique names.
	jobNames := map[string]struct{}{}
	for _, scfg := range c.ScrapeConfigs {
		if scfg == nil {
			return errors.New("empty or null scrape config section")
		}
		// First set the correct scrape interval, then check that the timeout
		// (inferred or explicit) is not greater than that.
		if scfg.ScrapeInterval == 0 {
			scfg.ScrapeInterval = c.GlobalConfig.ScrapeInterval
		}
		if scfg.ScrapeTimeout > scfg.ScrapeInterval {
			return errors.Errorf("scrape timeout greater than scrape interval for scrape config with job name %q", scfg.JobName)
		}
		if scfg.ScrapeTimeout == 0 {
			if c.GlobalConfig.ScrapeTimeout > scfg.ScrapeInterval {
				scfg.ScrapeTimeout = scfg.ScrapeInterval
			} else {
				scfg.ScrapeTimeout = c.GlobalConfig.ScrapeTimeout
			}
		}

		if _, ok := jobNames[scfg.JobName]; ok {
			return errors.Errorf("found multiple scrape configs with job name %q", scfg.JobName)
		}
		jobNames[scfg.JobName] = struct{}{}
	}
	rwNames := map[string]struct{}{}
	for _, rwcfg := range c.RemoteWriteConfigs {
		if rwcfg == nil {
			return errors.New("empty or null remote write config section")
		}
		// Skip empty names, we fill their name with their config hash in remote write code.
		if _, ok := rwNames[rwcfg.Name]; ok && rwcfg.Name != "" {
			return errors.Errorf("found multiple remote write configs with job name %q", rwcfg.Name)
		}
		rwNames[rwcfg.Name] = struct{}{}
	}
	rrNames := map[string]struct{}{}
	for _, rrcfg := range c.RemoteReadConfigs {
		if rrcfg == nil {
			return errors.New("empty or null remote read config section")
		}
		// Skip empty names, we fill their name with their config hash in remote read code.
		if _, ok := rrNames[rrcfg.Name]; ok && rrcfg.Name != "" {
			return errors.Errorf("found multiple remote read configs with job name %q", rrcfg.Name)
		}
		rrNames[rrcfg.Name] = struct{}{}
	}
	return nil
}
```
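The type plain Config indirection at the top is a classic Go YAML idiom: plain has the same fields as Config but none of its methods, so the unmarshal call fills the struct directly instead of recursing back into UnmarshalYAML. A minimal sketch of this defaults-then-overwrite pattern (hypothetical types, not from Prometheus):

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

type Settings struct {
	Interval int    `yaml:"interval"`
	Path     string `yaml:"path"`
}

var defaultSettings = Settings{Interval: 15, Path: "/metrics"}

func (s *Settings) UnmarshalYAML(unmarshal func(interface{}) error) error {
	// Start from the defaults, then let the input overwrite them.
	*s = defaultSettings
	// plain has the same layout but no UnmarshalYAML method, so this
	// call does not recurse back into us.
	type plain Settings
	return unmarshal((*plain)(s))
}

func main() {
	var s Settings
	if err := yaml.Unmarshal([]byte("path: /probe\n"), &s); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", s) // {Interval:15 Path:/probe}
}
```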

The parsing and validation of this configuration is genuinely complex. Rather than going through the unmarshaling of every struct field, let's focus on how the scrape configs are parsed:

```go
func (c *ScrapeConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
	*c = DefaultScrapeConfig
	if err := discovery.UnmarshalYAMLWithInlineConfigs(c, unmarshal); err != nil {
		return err
	}
	if len(c.JobName) == 0 {
		return errors.New("job_name is empty")
	}

	// The UnmarshalYAML method of HTTPClientConfig is not being called because it's not a pointer.
	// We cannot make it a pointer as the parser panics for inlined pointer structs.
	// Thus we just do its validation here.
	if err := c.HTTPClientConfig.Validate(); err != nil {
		return err
	}

	// Check for users putting URLs in target groups.
	if len(c.RelabelConfigs) == 0 {
		if err := checkStaticTargets(c.ServiceDiscoveryConfigs); err != nil {
			return err
		}
	}

	for _, rlcfg := range c.RelabelConfigs {
		if rlcfg == nil {
			return errors.New("empty or null target relabeling rule in scrape config")
		}
	}
	for _, rlcfg := range c.MetricRelabelConfigs {
		if rlcfg == nil {
			return errors.New("empty or null metric relabeling rule in scrape config")
		}
	}

	return nil
}
```

The call err := discovery.UnmarshalYAMLWithInlineConfigs(c, unmarshal) contains the actual implementation of parsing the dynamic service discovery configurations. Let's drill down:

```go
func UnmarshalYAMLWithInlineConfigs(out interface{}, unmarshal func(interface{}) error) error {
	outVal := reflect.ValueOf(out)
	if outVal.Kind() != reflect.Ptr {
		return fmt.Errorf("discovery: can only unmarshal into a struct pointer: %T", out)
	}
	outVal = outVal.Elem()
	if outVal.Kind() != reflect.Struct {
		return fmt.Errorf("discovery: can only unmarshal into a struct pointer: %T", out)
	}
	outTyp := outVal.Type()

	cfgTyp := getConfigType(outTyp)
	cfgPtr := reflect.New(cfgTyp)
	cfgVal := cfgPtr.Elem()

	// Copy shared fields (defaults) to dynamic value.
	var configs *Configs
	for i, n := 0, outVal.NumField(); i < n; i++ {
		if outTyp.Field(i).Type == configsType {
			configs = outVal.Field(i).Addr().Interface().(*Configs)
			continue
		}
		if cfgTyp.Field(i).PkgPath != "" {
			continue // Field is unexported: ignore.
		}
		cfgVal.Field(i).Set(outVal.Field(i))
	}
	if configs == nil {
		return fmt.Errorf("discovery: Configs field not found in type: %T", out)
	}

	// Unmarshal into dynamic value.
	if err := unmarshal(cfgPtr.Interface()); err != nil {
		return replaceYAMLTypeError(err, cfgTyp, outTyp)
	}

	// Copy shared fields from dynamic value.
	for i, n := 0, outVal.NumField(); i < n; i++ {
		if cfgTyp.Field(i).PkgPath != "" {
			continue // Field is unexported: ignore.
		}
		outVal.Field(i).Set(cfgVal.Field(i))
	}

	var err error
	*configs, err = readConfigs(cfgVal, outVal.NumField())
	return err
}
```

Oh, my god! This is wild: the low-level data parsing is implemented with reflect. The underlying machinery deserves its own deep dive someday, but let's set that aside for now. Notice the second-to-last statement of the function above, *configs, err = readConfigs(cfgVal, outVal.NumField()). Let's keep drilling down:

```go
func readConfigs(structVal reflect.Value, startField int) (Configs, error) {
	var (
		configs Configs
		targets []*targetgroup.Group
	)
	for i, n := startField, structVal.NumField(); i < n; i++ {
		field := structVal.Field(i)
		if field.Kind() != reflect.Slice {
			panic("discovery: internal error: field is not a slice")
		}
		for k := 0; k < field.Len(); k++ {
			val := field.Index(k)
			if val.IsZero() || (val.Kind() == reflect.Ptr && val.Elem().IsZero()) {
				key := configFieldNames[field.Type().Elem()]
				key = strings.TrimPrefix(key, configFieldPrefix)
				return nil, fmt.Errorf("empty or null section in %s", key)
			}
			switch c := val.Interface().(type) {
			case *targetgroup.Group:
				// Add index to the static config target groups for unique identification
				// within scrape pool.
				c.Source = strconv.Itoa(len(targets))
				// Coalesce multiple static configs into a single static config.
				targets = append(targets, c)
			case Config:
				configs = append(configs, c)
			default:
				panic("discovery: internal error: slice element is not a Config")
			}
		}
	}
	if len(targets) > 0 {
		configs = append(configs, StaticConfig(targets))
	}
	return configs, nil
}
```
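The core reflection trick above is to build a parallel "dynamic" struct value, let the YAML parser fill it, and then copy the shared fields back position by position. A minimal sketch of that field-copying step (hypothetical types; in Prometheus the dynamic type is assembled by getConfigType from the registered SD mechanisms):

```go
package main

import (
	"fmt"
	"reflect"
)

// Two struct types with identical leading fields; Prometheus builds
// the second one dynamically from the registered discovery configs.
type out struct {
	JobName string
	Scheme  string
}

type dynamic struct {
	JobName string
	Scheme  string
	// ...plus one slice field per SD mechanism in the real code.
}

func main() {
	src := out{JobName: "demo", Scheme: "https"}

	dstPtr := reflect.New(reflect.TypeOf(dynamic{}))
	dstVal := dstPtr.Elem()
	srcVal := reflect.ValueOf(src)

	// Copy the shared fields by position, exactly like the loops in
	// UnmarshalYAMLWithInlineConfigs.
	for i := 0; i < srcVal.NumField(); i++ {
		dstVal.Field(i).Set(srcVal.Field(i))
	}
	fmt.Printf("%+v\n", dstVal.Interface()) // {JobName:demo Scheme:https}
}
```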

Impressive stuff. We have finally finished parsing the scrape config, and we finally know which targets' metrics can be scraped. And we've picked up a new goal along the way: understanding how YAML parsing works under the hood!

In the next post we'll continue with the Prometheus server's scrape implementation.
