1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
cat > app.py <<EOF
#!/usr/bin/python
# -*- coding: utf8 -*-
from flask import Flask
from flask import request
import time
import hmac
import hashlib
import base64
import urllib
import requests
import json
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
app = Flask(__name__)
url = 'https://oapi.dingtalk.com/robot/send?access_token=859fe7562a332ec1d0a1b7385da590baa726a0b59c9311a68d7xxxxxxxxxx'
def get_timestamp_sign():
timestamp = long(round(time.time() * 1000))
secret = 'SEC5d4464c3b48f46352d7d0ec92a1f7b674c11910axxxxxxx'
secret_enc = bytes(secret).encode('utf-8')
string_to_sign = '{}\n{}'.format(timestamp, secret)
string_to_sign_enc = bytes(string_to_sign).encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.quote_plus(base64.b64encode(hmac_code))
return {"timestamp": timestamp, "sign": sign}
def send_dingtalk(msg,url):
data = {
'msgtype': 'text',
'text': {
'content': '{}'.format(msg)
},
'at': {
'atMobiles': []
}
}
headers = {
'Content-Type': 'application/json',
'Charset': 'utf-8'
}
response = requests.post(url, headers=headers, data=json.dumps(data))
def get_dingtalk_content(data):
parameter = get_timestamp_sign()
dingtalk_url = url + '×tamp=' + str(parameter['timestamp']) + '&sign=' + str(parameter['sign'])
for item in data:
if item['status'] == 'firing':
warning_level = '告警级别:' + item['labels']['severity'] + '\n'
warning_name = '告警标题:' + item['labels']['alertname'] + '\n'
warning_start = '告警时间:' + item['startsAt'].split('.')[0].replace('T',' ') + '\n'
warning_message = '告警详情:' +item['annotations']['message'] + '\n'
msg = warning_name + warning_level + warning_start + warning_message
send_dingtalk(msg,dingtalk_url)
@app.route('/',methods=['POST'])
def send():
if request.method == 'POST':
post_data = json.loads(request.get_data())
print(post_data['alerts'])
get_dingtalk_content(post_data['alerts'])
return 'Hello'
if __name__ == '__main__':
app.run(host='0.0.0.0',port=80)
EOF
|