Z.S.K.'s Records

关于业务监控重试机制

在使用监控系统的时候,一般在定义报警条件的时候都会在当某某报警连续出现N次之后才会被定义为报警(当然核心服务可能只出现一次就报警), 当监控系统无法满足复杂业务需求的时候,可能就得自己写监控了, 那么必然经常会面临异常的逻辑发生, 最典型的比如网络不稳定、机房割接等操作,都有可能产生网络抖动,在这样的情况下并非服务不可用, 如何使监控更加健硕, 而不是告警满天飞就变得很有必要.

那么重试机制又该如何保障能够真正发现生产上的问题呢? 因为有可能频繁地重试会对服务产生影响.

接口重试

首先,要指出的是,这是说到的需要的重试的过程可以分为两类:

  • 如果是网络协议出现异常, 比如说, http协议层面相关的问题, 连接错误、连接超时, 这类的可以直接使用网络框架来做重试, 这个大部分都有参数能够指定

比如代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
class Operator_Requests(object):
def __init__(self, **kwargs):
self.conn = self.req_conn(kwargs.get("max_retries", HTTP_MAX_RETRIES)) # 重试次数

@staticmethod
def req_conn(max_retries):
conn = requests.session()
conn.mount("http://", HTTPAdapter(max_retries=max_retries))
conn.mount("https://", HTTPAdapter(max_retries=max_retries))
return conn

@staticmethod
def type_map(url, method, wrong, code, msg, rnt):
msg = {
"ConnUrl": url,
"UrlMethod": method,
"ErrorType": wrong,
"HttpCode": code,
"ResponseMsg": msg,
"rntCode": rnt
}
return msg

def http_get(self, url, data=None, headers=None, timeout=HTTP_TIMEOUT):
try:
res = self.conn.get(url=url,
params=data,
headers=headers,
timeout=timeout)
en_code = res.encoding if res.encoding else "utf-8"
except (requests.exceptions.ConnectionError,
requests.exceptions.HTTPError) as e:
msg = self.type_map(url, "GET", "ConnError", "", str(e), 3)
LOGGER.error(msg)
return 3, msg
except requests.exceptions.Timeout as e:
msg = self.type_map(url, "GET", "ConnTimeout", "", str(e), 5)
LOGGER.error(msg)
return 5, msg
else:
if 200 == res.status_code:
return 0, res
else:
msg = self.type_map(
url, "GET", "NOT200",
str(res.status_code) if res.status_code else "Null",
res.text.encode(en_code) if res.text else "", 7)
LOGGER.error(msg)
return 7, msg

def http_post(self, url, data, headers=None):
try:
res = self.conn.post(url=url, data=data, headers=headers)
en_code = res.encoding if res.encoding else "utf-8"
except (requests.exceptions.ConnectionError,
requests.exceptions.HTTPError) as e:
msg = self.type_map(url, "POST", "ConnError", "", str(e), 3)
LOGGER.error(msg)
return 3, msg
except requests.exceptions.Timeout as e:
msg = self.type_map(url, "POST", "ConnTimeout", "", str(e), 5)
LOGGER.error(msg)
return 5, msg
else:
if 200 == res.status_code:
return 0, res
else:
msg = self.type_map(
url, "POST", "NOT200",
str(res.status_code) if res.status_code else "Null",
res.text.encode(en_code) if res.text else "", 7)
LOGGER.error(msg)
return 7, msg

这里相当于又封装了一层http常用的方法, HTTP_MAX_RETRIES指定了http接口的重试次数, 这个重试遵循http协议的相关标准, 一般不需要太关注

http接口上出现的错误,在返回码上就很容易判断,从上面的代码就可以看出

功能重试

对于单个接口上的重试依托底层协议去完成. 重要的是功能重试

因为对于某个具体的功能业说,只有人才真正明白涉及的逻辑

比如想要监控页面登录这个功能是否正常, 那么需要定义好以下内容:

  1. 要完成登录操作的输入是什么?
  2. 登录成功后的输出又是什么?
  3. 如何定义监控预期?

关于第3点,监控预期指的是怎么判断脚本执行完登录这个操作后是成功还是失败?那么必定存在一个判断逻辑

现如今,微服务大行其道, 登录功能可能涉及到多个服务间的调用, 任何一个服务出了问题,都将导致登录失败.

那是不是每个服务都需要去写判断逻辑呢?

答案是否定的,所谓的功能监控,也就是黑盒监控(Blackbox),是不需要关心这个功能会调用多少个服务的,把整个服务看成是个整体,单一的输入, 单一的输出, 然后用期望值与输出进行比较来判断登录功能是否正常

在功能上重试重试,很容易想到的二个参数: 重试次数、间隔时间

有了这两个条件, 可以很大程度上减少产生误报的情况.

可以写一个实现重试功能的装饰器,这样就不用在每个业务逻辑上都把这个重试函数都实现一遍

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def do_retry(retry_times=FUNC_RETRIES, delay_seconds=FUNC_RETRY_DELAY_SECONDS):
"""
Decorator for function retry .
@param retry_times: retry times
@param delay_seconds: sleep time after per try
@return: the return value of the called function
@raise Exception: raises an exception when an error occurred after last time

"""
def deco(f):
@wraps(f)
def wrapper(*args, **kwargs):
_retry_times, _delay_seconds = retry_times, delay_seconds
_err = None
while _retry_times > 0:
try:
_err = None
return f(*args, **kwargs)
except RetryException as e:
# e is a RetryException instance
# so can call e.get_err()
_err = e
if _delay_seconds > 0:
time.sleep(_delay_seconds)
_retry_times -= 1
if _err:
event = _err.get_err()
print("_err-->", type(event), event)
call_back(event)

return wrapper

return deco

那么, 在业务逻辑上需要进行重试的时候, 只需要加入这个装饰器即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@do_retry(2, 10)
# @run_time() # @run_time(10)
def login():
rnt, res = http_post(url=url, data=data, headers=headers)
if rnt > 0:
# httpcode is NOT 200
# res:
# error msg
# type: dict
raise RetryException(res, rnt)
else:
# httpcode 200
_json = res.json()
# but response is not as expect
if 0 != _json.get("error_code"):
event = self._buildup(_json)
raise RetryException(event, rnt)

这里使用了二层结构:

  1. 如果http状态码不为200,则抛出exception重试
  2. 如果返回码为200,但是返回的数据不是期望值,抛出exception重试,这里的期望值为登录成功后的可以从response中get到error_code=0

如果任一模块出现问题, 则error_code必不会为0, 抛出exception, 这个exceptiondo_retry中被捕获后进行重试操作, 这里指定了重试次数及间隔时间. 在do_retry中,如果超过了重试次数后还是发生异常, 则会调用call_back函数, 那么就可以自定义触发的动作, 比如邮件报警等

这里,使用了exception, 也更加自由地对错误进行定制.

当然, 生产环境下对功能进行监控,最好能够隔离生产数据, 比如使用一个只用于登录监控的用户名与密码, 这个用户可能没用任何权限, 只能够登录就行.

另一个见人见智了,最好能够做到功能监控随时能禁, 随时能启用, 或者说告警随时能屏蔽.

从这里也可以看出, 如果我只是对登录这个功能进行监控, 是不需要care登录这个功能涉及多少个模块的, 只需要关心最后收到的response是否符合,如果不符合预期,说明登录这个功能是异常的.

至于登录进行之后再做其它的操作有问题, 则不属于登录这个功能监控的范畴.

上述代码在本人写的大部分的功能监控一直使用, 后来发现个更强大的Retry库tenacity, 自由性更高, github在这里,开箱即用.

参考文章:

转载请注明出处https://izsk.me

Z.S.K. wechat
Scan Me To Read on Phone
I know you won't do this,but what if you did?