前言
因为使用了阿里云的SLS服务,所以监控数据是直接推送到了阿里云时序数据库中。
流程
Telegraf -> SLS
这样着实方便了监控数据, 但是监控告警还是想继续使用Prometheus, 阿里云并没有提供相关的Prometheus的 remote_read的接口,所以需要自己写adapter去适配。
remote adapter 参考文档
官方文档
Prometheus走的是 protobuf 协议进行传输的。
1 2 3 4 5 6 7 8 9 10
| message ReadRequest { repeated Query queries = 1; }
message Query { int64 start_timestamp_ms = 1; int64 end_timestamp_ms = 2; repeated prometheus.LabelMatcher matchers = 3; prometheus.ReadHints hints = 4; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| message ReadResponse { repeated QueryResult results = 1; }
message Sample { double value = 1; int64 timestamp = 2; }
message TimeSeries { repeated Label labels = 1; repeated Sample samples = 2; }
message QueryResult { repeated prometheus.TimeSeries timeseries = 1; }
|
我们只需要拿到 ReadRequest 内容, 并且通过 start_timestamp_ms, end_timestamp_ms 还有 matchers 进行匹配数据,并返回即可。
关于hints是不需要考虑,因为Prometheus函数计算和聚合计算都是在本地完成的, 我们只需要关注返回匹配好的原始数据即可。
Python代码
编译 Protobuf
编译方法在网络上找一下。
声明的结构文件在Prometheus仓库中。 https://github.com/prometheus/prometheus/tree/main/prompb
通过Prometheus api查询数据
SLS 提供了Prometheus的接口, 所以我们直接通过接口去查询数据即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| import logging from flask import Flask, request, Response from querys import parse_data, SlsQuery
from proto import remote_pb2 import snappy
app = Flask(__name__) slsQ = SlsQuery(url='') logging.basicConfig(level=logging.INFO)
@app.route("/") def index(): return Response("<h1>403 Forbidden<h1/>", status=403)
@app.route("/read", methods=["POST"]) def read(): msg = remote_pb2.ReadRequest() msg.ParseFromString(snappy.uncompress(request.data))
slsQ.set_auth(request.authorization['username'], request.authorization['password']) if request.headers['X-Prometheus-Remote-Read-Version'] != '0.1.0': return Response("<h1> Protocol error <h1/>", status=404) query = msg.queries[0] metric_name = '' for matcher in query.matchers: if matcher.name == '__name__': metric_name = f'{matcher.value}'
matchers = [] for matcher in query.matchers: if matcher.name != '__name__': if matcher.type == 0: matchers.append(f'{matcher.name}="{matcher.value}"') elif matcher.type == 1: matchers.append(f'{matcher.name}!="{matcher.value}"') elif matcher.type == 2: matchers.append(f'{matcher.name}=~"{matcher.value}"') elif matcher.type == 3: matchers.append(f'{matcher.name}!~"{matcher.value}"')
query_str = f'{metric_name}' + '{' + f'{",".join(matchers)}' + '}' if not matchers: query_str = metric_name app.logger.debug(f'query_str: {query_str}') start_ms = query.hints.start_ms end_ms = query.hints.end_ms
resp = Response() resp.headers['Content-Type'] = 'application/x-protobuf' resp.headers['Content-Encoding'] = 'snappy'
read_response = remote_pb2.ReadResponse()
if query.hints.range_ms: content = slsQ.sls_query_range(query_str=query_str, start_ms=start_ms, end_ms=end_ms, step_ms=query.hints.step_ms) else: content = slsQ.sls_query(query_str=query_str, start_ms=start_ms, end_ms=end_ms)
if content['status'] == 'error': return Response(content['error'], status=500)
parse_data(read_response=read_response, data=content['data']) resp.set_data(snappy.compress(read_response.SerializeToString()))
return resp
|
完整代码
see https://github.com/momommm/Prometheus-sls-adapter