momo's Blog.

Writing a Prometheus remote read adapter in Python

2022/06/15

Preface

Because we use Alibaba Cloud's SLS service, our monitoring data is pushed directly into Alibaba Cloud's time-series database.

Pipeline
Telegraf -> SLS

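This makes collecting monitoring data very convenient, but we still want to use Prometheus for alerting. Alibaba Cloud does not provide a Prometheus remote_read endpoint, so we have to write our own adapter to bridge the two.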

Remote adapter reference

Official documentation

Prometheus remote read exchanges protobuf messages, sent snappy-compressed over HTTP.

message ReadRequest {
  repeated Query queries = 1;
}

message Query {
  int64 start_timestamp_ms = 1;
  int64 end_timestamp_ms = 2;
  repeated prometheus.LabelMatcher matchers = 3;
  prometheus.ReadHints hints = 4;
}

message ReadResponse {
  // In same order as the request's queries.
  repeated QueryResult results = 1;
}

message Sample {
  double value = 1;
  int64 timestamp = 2;
}

message TimeSeries {
  repeated Label labels = 1;
  repeated Sample samples = 2;
}

message QueryResult {
  repeated prometheus.TimeSeries timeseries = 1;
}

All we need to do is take the ReadRequest, select the data that matches start_timestamp_ms, end_timestamp_ms and matchers, and return it.

The hints field needs no special handling: Prometheus evaluates functions and aggregations locally, so the adapter only has to return the matching raw data.
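To make the matching step concrete, here is a minimal pure-Python sketch of how start_timestamp_ms, end_timestamp_ms and matchers select data. The in-memory series layout and the names label_matches and select_series are hypothetical and not part of the adapter. The matcher type numbers follow the LabelMatcher.Type enum in prompb/types.proto (EQ=0, NEQ=1, RE=2, NRE=3). Regex matchers are fully anchored, as in Prometheus, although Python's re module is only an approximation of the RE2 syntax Prometheus uses. The time range is treated as inclusive here.

```python
import re

# LabelMatcher.Type values from prompb/types.proto.
EQ, NEQ, RE, NRE = 0, 1, 2, 3


def label_matches(labels, name, match_type, value):
    """Apply one matcher to a label set; a missing label counts as ''."""
    actual = labels.get(name, "")
    if match_type == EQ:
        return actual == value
    if match_type == NEQ:
        return actual != value
    if match_type == RE:
        return re.fullmatch(value, actual) is not None
    if match_type == NRE:
        return re.fullmatch(value, actual) is None
    raise ValueError(f"unknown matcher type: {match_type}")


def select_series(series, matchers, start_ms, end_ms):
    """Return series matching every matcher, with samples clipped to [start_ms, end_ms]."""
    selected = []
    for labels, samples in series:
        if all(label_matches(labels, n, t, v) for n, t, v in matchers):
            kept = [(ts, val) for ts, val in samples if start_ms <= ts <= end_ms]
            if kept:
                selected.append((labels, kept))
    return selected


# Hypothetical data: (labels, [(timestamp_ms, value), ...]).
SERIES = [
    ({"__name__": "cpu_usage", "host": "web-1"}, [(1000, 0.5), (2000, 0.7), (3000, 0.9)]),
    ({"__name__": "cpu_usage", "host": "db-1"}, [(1000, 0.1), (2000, 0.2)]),
]
MATCHERS = [("__name__", EQ, "cpu_usage"), ("host", RE, "web-.*")]
RESULT = select_series(SERIES, MATCHERS, 1500, 3000)
```

In this sketch RESULT keeps only the web-1 series and drops its first sample, which lies before start_ms. A real adapter would push the same selection down to the backing store instead of filtering in memory.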

Python code

Compiling the Protobuf definitions

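Instructions for compiling .proto files are easy to find online (for example with protoc and its --python_out option). The adapter below imports the generated module as remote_pb2.
The message definitions live in the Prometheus repository: https://github.com/prometheus/prometheus/tree/main/prompb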

Querying data through the Prometheus API

SLS exposes a Prometheus-compatible query API, so the adapter can simply fetch the data through that API.
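Before the adapter itself, a minimal sketch of what such a query looks like at the HTTP level. It assumes the SLS endpoint follows the standard Prometheus HTTP API layout (/api/v1/query_range with query, start, end and step parameters). The base URL and the function name build_query_range_url are hypothetical, and the actual adapter delegates this step to its own SlsQuery class.

```python
from urllib.parse import urlencode


def build_query_range_url(base_url, query, start_ms, end_ms, step_ms):
    """Build a GET URL for the Prometheus HTTP API range-query endpoint."""
    params = {
        "query": query,
        # The HTTP API takes Unix timestamps in seconds; remote read uses milliseconds.
        "start": f"{start_ms / 1000:.3f}",
        "end": f"{end_ms / 1000:.3f}",
        # step is given in seconds; fall back to 1s when no step is supplied.
        "step": f"{(step_ms or 1000) / 1000:.3f}",
    }
    return f"{base_url.rstrip('/')}/api/v1/query_range?{urlencode(params)}"


URL = build_query_range_url(
    "https://sls.example.invalid/prometheus",  # hypothetical base URL
    'cpu_usage{host="web-1"}',
    start_ms=1655251200000,
    end_ms=1655254800000,
    step_ms=15000,
)
```

The main point is the unit conversion: remote read carries milliseconds, while the query API expects seconds.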

import logging

import snappy
from flask import Flask, Response, request

from proto import remote_pb2  # generated from prompb/remote.proto
from querys import SlsQuery, parse_data

app = Flask(__name__)
slsQ = SlsQuery(url='')  # URL of the SLS Prometheus-compatible endpoint
logging.basicConfig(level=logging.INFO)

# LabelMatcher.Type enum values (prompb/types.proto) mapped to PromQL operators.
MATCH_OPERATORS = {0: '=', 1: '!=', 2: '=~', 3: '!~'}


@app.route("/")
def index():
    return Response("<h1>403 Forbidden</h1>", status=403)


@app.route("/read", methods=["POST"])
def read():
    # Reject clients that do not speak remote read 0.1.0.
    if request.headers.get('X-Prometheus-Remote-Read-Version') != '0.1.0':
        return Response("<h1>Protocol error</h1>", status=400)

    # The basic-auth credentials sent by Prometheus are forwarded to SLS.
    auth = request.authorization
    if auth is None:
        return Response("<h1>401 Unauthorized</h1>", status=401)
    slsQ.set_auth(auth.username, auth.password)

    # The request body is a snappy-compressed ReadRequest.
    msg = remote_pb2.ReadRequest()
    msg.ParseFromString(snappy.uncompress(request.data))

    # Only the first query in the request is handled.
    query = msg.queries[0]

    metric_name = ''
    matchers = []
    for matcher in query.matchers:
        if matcher.name == '__name__':
            metric_name = matcher.value
        elif matcher.type in MATCH_OPERATORS:
            matchers.append(f'{matcher.name}{MATCH_OPERATORS[matcher.type]}"{matcher.value}"')

    # Build a PromQL selector such as metric{label="value",other!="x"}.
    query_str = metric_name
    if matchers:
        query_str += '{' + ','.join(matchers) + '}'
    app.logger.debug(f'query_str: {query_str}')

    # Take the time range from the read hints, falling back to the
    # query's own range when the hints are unset.
    start_ms = query.hints.start_ms or query.start_timestamp_ms
    end_ms = query.hints.end_ms or query.end_timestamp_ms

    if query.hints.range_ms:
        content = slsQ.sls_query_range(query_str=query_str, start_ms=start_ms, end_ms=end_ms,
                                       step_ms=query.hints.step_ms)
    else:
        content = slsQ.sls_query(query_str=query_str, start_ms=start_ms, end_ms=end_ms)

    if content['status'] == 'error':
        return Response(content['error'], status=500)

    # Fill the ReadResponse from the SLS result and send it back snappy-compressed.
    read_response = remote_pb2.ReadResponse()
    parse_data(read_response=read_response, data=content['data'])

    resp = Response()
    resp.headers['Content-Type'] = 'application/x-protobuf'
    resp.headers['Content-Encoding'] = 'snappy'
    resp.set_data(snappy.compress(read_response.SerializeToString()))
    return resp
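The handler relies on parse_data to turn the query result into a ReadResponse, and that function is not shown here. As a rough illustration of the conversion involved, the sketch below assumes the SLS response has the standard Prometheus HTTP API shape (resultType "matrix" with "values", or "vector" with a single "value"). The function to_series and the sample body are hypothetical and are not the author's parse_data.

```python
import json


def to_series(data):
    """Convert the `data` object of a Prometheus HTTP API response into
    a list of (labels, [(timestamp_ms, value), ...]) tuples."""
    series = []
    for item in data["result"]:
        # Range queries return "values" (a list); instant queries return one "value".
        points = item["values"] if data["resultType"] == "matrix" else [item["value"]]
        # The API reports seconds and string values; remote read wants ms and floats.
        samples = [(int(round(float(ts) * 1000)), float(val)) for ts, val in points]
        series.append((dict(item["metric"]), samples))
    return series


# Hypothetical response body in the standard Prometheus API shape.
BODY = json.loads("""
{"status": "success",
 "data": {"resultType": "matrix",
          "result": [{"metric": {"__name__": "cpu_usage", "host": "web-1"},
                      "values": [[1655251200, "0.5"], [1655251215.5, "0.75"]]}]}}
""")
SERIES = to_series(BODY["data"])
```

Each (labels, samples) pair maps onto one TimeSeries message: the labels become Label entries and each sample becomes a Sample with value and a millisecond timestamp.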

Full code

See https://github.com/momommm/Prometheus-sls-adapter
