Prometheus Internals (Part 4)

How should targets be managed: static configuration or dynamic discovery?

Whether targets are managed statically or discovered dynamically depends on your infrastructure. To answer this question, we first need to look at how applications are deployed today.

(Figure: virtual-container)

Today, enterprises deploy applications in one of two ways:

  1. On virtual machines
  2. In containers

Of course, the most common setup is running Docker inside virtual machines, a combination of the two that saves cost.

So what characterizes applications deployed on virtual machines? Little change: what you expect and what is actually running stay relatively static. Deploying an application onto a VM and walking through the whole process easily takes half a day. In that situation, writing the targets to be monitored into Prometheus's configuration file and then restarting the Prometheus server works perfectly well.

However, once a company starts deploying applications in containers, especially with Kubernetes as the control plane, manually editing the Prometheus configuration and restarting the server becomes a nightmare: operations is run ragged, and developers grumble about operations.

So how you configure monitoring targets depends on the underlying technology you use.

Side note: for DevOps to really take hold in an organization, you need a complete, end-to-end toolchain.

So how does Prometheus actually handle scrape targets? Let's first revisit how the relevant components are started:

{
    // Scrape discovery manager.
    g.Add(
        func() error {
            err := discoveryManagerScrape.Run()
            level.Info(logger).Log("msg", "Scrape discovery manager stopped")
            return err
        },
        func(err error) {
            level.Info(logger).Log("msg", "Stopping scrape discovery manager...")
            cancelScrape()
        },
    )
}
{
    // Notify discovery manager.
    ...
}
{
    // Scrape manager.
    g.Add(
        func() error {
            // When the scrape manager receives a new targets list
            // it needs to read a valid config for each job.
            // It depends on the config being in sync with the discovery manager so
            // we wait until the config is fully loaded.
            <-reloadReady.C

            err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
            level.Info(logger).Log("msg", "Scrape manager stopped")
            return err
        },
        func(err error) {
            // Scrape manager needs to be stopped before closing the local TSDB
            // so that it doesn't try to write samples to a closed storage.
            level.Info(logger).Log("msg", "Stopping scrape manager...")
            scrapeManager.Stop()
        },
    )
}

The key statements in the startup sequence are:

// Start dynamic service discovery
discoveryManagerScrape.Run()

// Start target scraping
scrapeManager.Run(discoveryManagerScrape.SyncCh())

discoveryManagerScrape.SyncCh() is passed as an argument to scrapeManager.Run. Let's look at its implementation:

// SyncCh returns a read only channel used by all the clients to receive target updates.
func (m *Manager) SyncCh() <-chan map[string][]*targetgroup.Group {
    return m.syncCh
}

In effect, discoveryManagerScrape and scrapeManager are connected by a channel of map[string][]*targetgroup.Group: the targets discovered dynamically by discoveryManagerScrape are pushed through this channel to scrapeManager, which is responsible for scraping them.

Let's summarize what we have so far:

(Figure: dsm)

At its core, this is simply two goroutines communicating over a channel.
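
To see the shape of this hand-off in isolation, here is a minimal, self-contained sketch (simplified types and names, not the real Prometheus code): one goroutine plays the discovery manager and pushes target updates into a channel, the other plays the scrape manager and consumes them.

package main

import (
    "fmt"
    "time"
)

// A drastically simplified stand-in for map[string][]*targetgroup.Group:
// job name -> list of target addresses.
type targetUpdate map[string][]string

func main() {
    syncCh := make(chan targetUpdate)

    // "Discovery manager": pushes an update whenever the target set changes.
    go func() {
        syncCh <- targetUpdate{"node": {"10.0.0.1:9100"}}
        time.Sleep(100 * time.Millisecond)
        syncCh <- targetUpdate{"node": {"10.0.0.1:9100", "10.0.0.2:9100"}}
        close(syncCh)
    }()

    // "Scrape manager": receives updates and would reload its scrape loops.
    for update := range syncCh {
        fmt.Println("received new target set:", update)
    }
}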

Next, let's look at the implementation of scrapeManager.Run:

// Run receives and saves target set updates and triggers the scraping loops reloading.
// Reloading happens in the background so that it doesn't block receiving targets updates.
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
    go m.reloader()
    for {
        select {
        case ts := <-tsets:
            m.updateTsets(ts)

            select {
            case m.triggerReload <- struct{}{}:
            default:
            }

        case <-m.graceShut:
            return nil
        }
    }
}

The core is a for loop: the tsets channel (map[string][]*targetgroup.Group) waits for targets pushed by service discovery, m.updateTsets(ts) updates the set of scrape targets, and a reload signal is sent with m.triggerReload <- struct{}{}, over and over again. Let's first see what m.updateTsets(ts) does:

func (m *Manager) updateTsets(tsets map[string][]*targetgroup.Group) {
    m.mtxScrape.Lock()
    m.targetSets = tsets
    m.mtxScrape.Unlock()
}

That's it; it simply updates the set of monitored targets. Next, look at the definition of the Manager struct:

type Manager struct {
    logger    log.Logger
    append    storage.Appendable
    graceShut chan struct{}

    jitterSeed    uint64     // Global jitterSeed seed is used to spread scrape workload across HA setup.
    mtxScrape     sync.Mutex // Guards the fields below.
    scrapeConfigs map[string]*config.ScrapeConfig
    scrapePools   map[string]*scrapePool
    targetSets    map[string][]*targetgroup.Group

    triggerReload chan struct{}
}

The three fields related to scraping are:

scrapeConfigs map[string]*config.ScrapeConfig
scrapePools   map[string]*scrapePool
targetSets    map[string][]*targetgroup.Group

Now that discoveryManagerScrape delivers the scrape targets over the channel, how does scrapeManager internally turn them into the final HTTP requests?

scrapeManager.Run contains one important statement, go m.reloader(), and signals triggerReload after receiving targets:

func (m *Manager) reloader() {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-m.graceShut:
            return
        case <-ticker.C:
            select {
            case <-m.triggerReload:
                m.reload()
            case <-m.graceShut:
                return
            }
        }
    }
}
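
Notice the interplay between Run and reloader: Run does a non-blocking send on triggerReload (the default case drops the signal if one is already pending), and reloader drains the channel at most once per tick, so a burst of target updates collapses into a single reload. Here is a standalone sketch of that debounce pattern (simplified timings and names, not the actual Prometheus code):

package main

import (
    "fmt"
    "time"
)

func main() {
    trigger := make(chan struct{}, 1) // capacity 1: at most one pending reload signal

    // Producer: a burst of target updates arrives in quick succession.
    go func() {
        for i := 0; i < 10; i++ {
            select {
            case trigger <- struct{}{}: // mark "reload needed"
            default: // a signal is already pending; this update is coalesced into it
            }
            time.Sleep(50 * time.Millisecond)
        }
    }()

    // Consumer: reload at most once per tick, no matter how many updates arrived.
    ticker := time.NewTicker(200 * time.Millisecond)
    defer ticker.Stop()
    for i := 0; i < 4; i++ {
        <-ticker.C
        select {
        case <-trigger:
            fmt.Println("reload() called")
        default:
            fmt.Println("nothing to reload")
        }
    }
}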

Every 5 seconds, the Prometheus server checks the triggerReload channel and, if it has been signalled, calls m.reload():

func (m *Manager) reload() {
    m.mtxScrape.Lock()
    var wg sync.WaitGroup
    for setName, groups := range m.targetSets {
        if _, ok := m.scrapePools[setName]; !ok {
            scrapeConfig, ok := m.scrapeConfigs[setName]
            if !ok {
                level.Error(m.logger).Log("msg", "error reloading target set", "err", "invalid config id:"+setName)
                continue
            }
            sp, err := newScrapePool(scrapeConfig, m.append, m.jitterSeed, log.With(m.logger, "scrape_pool", setName))
            if err != nil {
                level.Error(m.logger).Log("msg", "error creating new scrape pool", "err", err, "scrape_pool", setName)
                continue
            }
            m.scrapePools[setName] = sp
        }

        wg.Add(1)
        // Run the sync in parallel as these take a while and at high load can't catch up.
        go func(sp *scrapePool, groups []*targetgroup.Group) {
            sp.Sync(groups)
            wg.Done()
        }(m.scrapePools[setName], groups)

    }
    m.mtxScrape.Unlock()
    wg.Wait()
}

This method performs the core transformation, turning targets into executable work. Stripping away the business logic, we are left with this pattern:

var wg sync.WaitGroup
for _, groups := range tsets {
    wg.Add(1)
    go func(groups []*targetgroup.Group) {
        defer wg.Done()
        // do the per-set work (sp.Sync(groups) in the real code)
    }(groups)
}
wg.Wait()

Back to the business logic: targetSets, scrapeConfigs, and scrapePools are neatly tied together here. If scrapePools has no entry for setName and scrapeConfigs has a matching configuration, newScrapePool is called to create a scrapePool and store it in the Manager; finally the scrapePool and its []*targetgroup.Group are handed to the anonymous function, which calls sp.Sync(groups). Let's follow the Sync method:

func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
    sp.mtx.Lock()
    defer sp.mtx.Unlock()
    start := time.Now()

    sp.targetMtx.Lock()
    var all []*Target
    sp.droppedTargets = []*Target{}
    for _, tg := range tgs {
        targets, err := targetsFromGroup(tg, sp.config)
        if err != nil {
            level.Error(sp.logger).Log("msg", "creating targets failed", "err", err)
            continue
        }
        for _, t := range targets {
            if t.Labels().Len() > 0 {
                all = append(all, t)
            } else if t.DiscoveredLabels().Len() > 0 {
                sp.droppedTargets = append(sp.droppedTargets, t)
            }
        }
    }
    sp.targetMtx.Unlock()
    sp.sync(all)

    targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
        time.Since(start).Seconds(),
    )
    targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc()
}

The Sync method boils down to two calls:

// Turn each Group into a list of Targets
targets, err := targetsFromGroup(tg, sp.config)

// Synchronize the scrape loops
sp.sync(all)

Target is defined as follows:

type Target struct {
    // Labels before any processing.
    discoveredLabels labels.Labels
    // Any labels that are added to this target and its metrics.
    labels labels.Labels
    // Additional URL parameters that are part of the target URL.
    params url.Values

    mtx                sync.RWMutex
    lastError          error
    lastScrape         time.Time
    lastScrapeDuration time.Duration
    health             TargetHealth
    metadata           MetricMetadataStore
}
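
Conceptually, targetsFromGroup expands one targetgroup.Group into one Target per discovered address by merging the group-level labels with each target's own labels (the real implementation also applies relabel_configs and may drop targets). A rough sketch of that expansion, using simplified local types instead of the real ones:

package main

import "fmt"

// Simplified stand-ins for targetgroup.Group and Target.
type group struct {
    targets []map[string]string // per-target labels, e.g. {"__address__": "10.0.0.1:9100"}
    labels  map[string]string   // labels shared by every target in the group
}

type target struct {
    labels map[string]string
}

// expandGroup (a rough stand-in for targetsFromGroup): one target per entry,
// with group-level labels merged in and per-target labels winning on conflict.
func expandGroup(g group) []target {
    out := make([]target, 0, len(g.targets))
    for _, tlabels := range g.targets {
        merged := map[string]string{}
        for k, v := range g.labels {
            merged[k] = v
        }
        for k, v := range tlabels {
            merged[k] = v
        }
        out = append(out, target{labels: merged})
    }
    return out
}

func main() {
    g := group{
        targets: []map[string]string{
            {"__address__": "10.0.0.1:9100"},
            {"__address__": "10.0.0.2:9100"},
        },
        labels: map[string]string{"job": "node", "env": "prod"},
    }
    for _, t := range expandGroup(g) {
        fmt.Println(t.labels)
    }
}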

This holds everything needed to scrape one target; all the essential information is in here. Now that configuration has been turned into executable targets, let's look at the implementation of sp.sync:

// sync takes a list of potentially duplicated targets, deduplicates them, starts
// scrape loops for new targets, and stops scrape loops for disappeared targets.
// It returns after all stopped scrape loops terminated.
func (sp *scrapePool) sync(targets []*Target) {
    var (
        uniqueLoops     = make(map[uint64]loop)
        interval        = time.Duration(sp.config.ScrapeInterval)
        timeout         = time.Duration(sp.config.ScrapeTimeout)
        limit           = int(sp.config.SampleLimit)
        honorLabels     = sp.config.HonorLabels
        honorTimestamps = sp.config.HonorTimestamps
        mrc             = sp.config.MetricRelabelConfigs
    )

    sp.targetMtx.Lock()
    for _, t := range targets {
        hash := t.hash()

        if _, ok := sp.activeTargets[hash]; !ok {
            s := &targetScraper{Target: t, client: sp.client, timeout: timeout}
            l := sp.newLoop(scrapeLoopOptions{
                target:          t,
                scraper:         s,
                limit:           limit,
                honorLabels:     honorLabels,
                honorTimestamps: honorTimestamps,
                mrc:             mrc,
            })

            sp.activeTargets[hash] = t
            sp.loops[hash] = l

            uniqueLoops[hash] = l
        } else {
            // This might be a duplicated target.
            if _, ok := uniqueLoops[hash]; !ok {
                uniqueLoops[hash] = nil
            }
            // Need to keep the most updated labels information
            // for displaying it in the Service Discovery web page.
            sp.activeTargets[hash].SetDiscoveredLabels(t.DiscoveredLabels())
        }
    }

    var wg sync.WaitGroup

    // Stop and remove old targets and scraper loops.
    for hash := range sp.activeTargets {
        if _, ok := uniqueLoops[hash]; !ok {
            wg.Add(1)
            go func(l loop) {
                l.stop()
                wg.Done()
            }(sp.loops[hash])

            delete(sp.loops, hash)
            delete(sp.activeTargets, hash)
        }
    }

    sp.targetMtx.Unlock()

    targetScrapePoolTargetsAdded.WithLabelValues(sp.config.JobName).Set(float64(len(uniqueLoops)))
    forcedErr := sp.refreshTargetLimitErr()
    for _, l := range sp.loops {
        l.setForcedError(forcedErr)
    }
    for _, l := range uniqueLoops {
        if l != nil {
            go l.run(interval, timeout, nil)
        }
    }
    // Wait for all potentially stopped scrapers to terminate.
    // This covers the case of flapping targets. If the server is under high load, a new scraper
    // may be active and tries to insert. The old scraper that didn't terminate yet could still
    // be inserting a previous sample set.
    wg.Wait()
}
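
Stripped of the Prometheus-specific details, sp.sync is a classic reconciliation loop: compare the desired target set (keyed by hash) against the currently active one, start loops for new targets, and stop loops for targets that have disappeared. Roughly (with hypothetical names, not the real code):

package main

import "fmt"

// fakeLoop is a stand-in for a running scrape loop.
type fakeLoop struct{ target string }

func (l *fakeLoop) start() { fmt.Println("started loop for", l.target) }
func (l *fakeLoop) stop()  { fmt.Println("stopped loop for", l.target) }

// reconcile mirrors the shape of sp.sync: both maps are keyed the way t.hash()
// keys targets above; new keys get a loop started, vanished keys get their
// loop stopped and removed.
func reconcile(desired map[string]bool, running map[string]*fakeLoop) {
    for addr := range desired {
        if _, ok := running[addr]; !ok {
            l := &fakeLoop{target: addr}
            running[addr] = l
            l.start()
        }
    }
    for addr, l := range running {
        if !desired[addr] {
            l.stop()
            delete(running, addr)
        }
    }
}

func main() {
    running := map[string]*fakeLoop{}
    reconcile(map[string]bool{"10.0.0.1:9100": true, "10.0.0.2:9100": true}, running)
    // 10.0.0.1 disappears and 10.0.0.3 appears:
    reconcile(map[string]bool{"10.0.0.2:9100": true, "10.0.0.3:9100": true}, running)
}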

This method first turns each Target into a loop, then calls go l.run(interval, timeout, nil) to start scraping:

type loop interface {
    run(interval, timeout time.Duration, errc chan<- error)
    setForcedError(err error)
    stop()
    getCache() *scrapeCache
    disableEndOfRunStalenessMarkers()
}

loop is an interface; any type that implements these methods is a loop.
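
To make the contract concrete, here is a minimal sketch of a type that could serve as a loop, using a trimmed-down interface (scrapeCache and the other details are internal to the scrape package): run scrapes on a ticker until stop is called.

package main

import (
    "fmt"
    "time"
)

// miniLoop is a trimmed-down version of the loop contract: run until stopped.
type miniLoop interface {
    run(interval, timeout time.Duration)
    stop()
}

// printLoop "scrapes" its target by printing a line on every tick.
type printLoop struct {
    target string
    quit   chan struct{}
    done   chan struct{}
}

func newPrintLoop(target string) *printLoop {
    return &printLoop{target: target, quit: make(chan struct{}), done: make(chan struct{})}
}

// run scrapes the target every interval until stop is called.
func (l *printLoop) run(interval, timeout time.Duration) {
    defer close(l.done)
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            fmt.Println("scraping", l.target) // the real scrapeLoop calls scraper.scrape here
        case <-l.quit:
            return
        }
    }
}

// stop asks run to exit and waits until it has returned.
func (l *printLoop) stop() {
    close(l.quit)
    <-l.done
}

func main() {
    var l miniLoop = newPrintLoop("10.0.0.1:9100")
    go l.run(200*time.Millisecond, time.Second)
    time.Sleep(700 * time.Millisecond)
    l.stop()
}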

The heart of the run method is calling the scraper's scrape and report methods: scrape fetches the data, and report records the result, feeding into the next stage, storage. scraper is defined as follows:

type scraper interface {
    scrape(ctx context.Context, w io.Writer) (string, error)
    Report(start time.Time, dur time.Duration, err error)
    offset(interval time.Duration, jitterSeed uint64) time.Duration
}

The concrete implementation is targetScraper, defined as follows:

type targetScraper struct {
    *Target

    client  *http.Client
    req     *http.Request
    timeout time.Duration

    gzipr *gzip.Reader
    buf   *bufio.Reader
}

func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error) {
    if s.req == nil {
        req, err := http.NewRequest("GET", s.URL().String(), nil)
        if err != nil {
            return "", err
        }
        req.Header.Add("Accept", acceptHeader)
        req.Header.Add("Accept-Encoding", "gzip")
        req.Header.Set("User-Agent", userAgentHeader)
        req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", fmt.Sprintf("%f", s.timeout.Seconds()))

        s.req = req
    }

    resp, err := s.client.Do(s.req.WithContext(ctx))
    if err != nil {
        return "", err
    }
    defer func() {
        io.Copy(ioutil.Discard, resp.Body)
        resp.Body.Close()
    }()

    if resp.StatusCode != http.StatusOK {
        return "", errors.Errorf("server returned HTTP status %s", resp.Status)
    }

    if resp.Header.Get("Content-Encoding") != "gzip" {
        _, err = io.Copy(w, resp.Body)
        if err != nil {
            return "", err
        }
        return resp.Header.Get("Content-Type"), nil
    }

    if s.gzipr == nil {
        s.buf = bufio.NewReader(resp.Body)
        s.gzipr, err = gzip.NewReader(s.buf)
        if err != nil {
            return "", err
        }
    } else {
        s.buf.Reset(resp.Body)
        if err = s.gzipr.Reset(s.buf); err != nil {
            return "", err
        }
    }

    _, err = io.Copy(w, s.gzipr)
    s.gzipr.Close()
    if err != nil {
        return "", err
    }
    return resp.Header.Get("Content-Type"), nil
}

At long last we can see that, in the end, a scrape is just an HTTP request. To perform a single scrape, the Prometheus server goes through an elaborate chain of transformations, and it is precisely this design that gives Prometheus the ability to manage scrape targets dynamically. Let's briefly recap the chain:

Group  --------
               \
                --> scrapePool --> loop --> scraper
               /
Target --------

Finally, we have turned the Prometheus server's scrape pipeline inside out, and learned a great deal along the way.

Once the data has been scraped, what remains is storage, analysis, and visualization. In the next article we will dig into storage.
