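FELK Notes (elastalert source code analysis)

While evaluating elastalert I read through almost the entire source code. These notes record a few of the important parts.

Core query

The filters configured in a rule file are all eventually converted into the format that Elasticsearch supports natively. The core code is as follows: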

def get_query(filters, starttime=None, endtime=None, sort=True, timestamp_field='@timestamp', to_ts_func=dt_to_ts, desc=False,
              five=False):
    """ Returns a query dict that will apply a list of filters, filter by
    start and end time, and sort results by timestamp.

    :param filters: A list of Elasticsearch filters to use.
    :param starttime: A timestamp to use as the start time of the query.
    :param endtime: A timestamp to use as the end time of the query.
    :param sort: If true, sort results by timestamp. (Default True)
    :return: A query dictionary to pass to Elasticsearch.
    """
    starttime = to_ts_func(starttime)
    endtime = to_ts_func(endtime)
    filters = copy.copy(filters)
    es_filters = {'filter': {'bool': {'must': filters}}}
    if starttime and endtime:
        # Put the time range first so it always applies alongside the rule filters
        es_filters['filter']['bool']['must'].insert(0, {'range': {timestamp_field: {'gt': starttime,
                                                                                   'lte': endtime}}})
    if five:
        # Elasticsearch 5+ uses a bool query
        query = {'query': {'bool': es_filters}}
    else:
        # Older versions use the filtered query
        query = {'query': {'filtered': es_filters}}
    if sort:
        query['sort'] = [{timestamp_field: {'order': 'desc' if desc else 'asc'}}]
    return query

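The resulting query looks like the following. If the query results differ from what you expect, you can run this generated statement directly to troubleshoot.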

curl -H 'Content-Type: application/json' -XGET 'http://localhost:9200/demo.demo-*/_search?pretty&_source_include=%40timestamp%2C%2A&ignore_unavailable=true&scroll=30s&size=10000' -d '{
  "query": {
    "bool": {
      "filter": {
        "bool": {
          "must": [
            {
              "range": {
                "@timestamp": {
                  "gt": "2020-05-29T11:04:00.198218Z",
                  "lte": "2020-05-29T11:09:00.198218Z"
                }
              }
            },
            {
              "wildcard": {
                "backendMod": "*senseface*"
              }
            }
          ]
        }
      }
    }
  },
  "sort": [
    {
      "@timestamp": {
        "order": "asc"
      }
    }
  ]
}'

The wildcard clause comes from the filter conditions in the rule file.
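To reproduce such a statement from a query dict while debugging, a small helper can serialize the dict into a curl command. This is a minimal sketch: `build_debug_curl`, the default host, and the index pattern are illustrative names, not part of elastalert.

```python
import json
import shlex


def build_debug_curl(query, index, host="http://localhost:9200"):
    """Render a query dict as a curl command for manual troubleshooting.

    Hypothetical helper for illustration; elastalert does not ship it.
    """
    body = json.dumps(query, indent=2, sort_keys=True)
    url = "%s/%s/_search?pretty" % (host, index)
    # shlex.quote keeps the URL and the JSON body safe as single shell arguments
    return "curl -H 'Content-Type: application/json' -XGET %s -d %s" % (
        shlex.quote(url), shlex.quote(body))


query = {
    "query": {"bool": {"filter": {"bool": {"must": [
        {"range": {"@timestamp": {"gt": "2020-05-29T11:04:00.198218Z",
                                  "lte": "2020-05-29T11:09:00.198218Z"}}},
        {"wildcard": {"backendMod": "*senseface*"}},
    ]}}}},
    "sort": [{"@timestamp": {"order": "asc"}}],
}
command = build_debug_curl(query, "demo.demo-*")
print(command)
```

Logging the dict that get_query returns and passing it through a helper like this gives a command that can be pasted straight into a shell.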

match_body

num_hits vs num_matches

num_hits is the number of documents returned by elasticsearch for a given query. num_matches is how many times that data matched your rule, each one potentially generating an alert.

If it makes a query over a 10 minute range and gets 10 hits, and you have

type: frequency
num_events: 10
timeframe:
  minutes: 10

then you'll get 1 match.

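Summary:
num_hits is the number of records returned from ES for the filter conditions and the query time range, while num_matches is the number of alerts expected to be generated.
So for a frequency rule, roughly num_matches = num_hits / num_events, rounded to a whole number, which is why the two values show this relationship in the alert content.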
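The relationship between the two counters can be illustrated with a simplified model of a frequency rule. The sketch below is an assumption for illustration, not elastalert's implementation: events are kept in a sliding window of length `timeframe`, and once the window holds `num_events` events, one match is produced and the window is cleared. The function name `count_frequency_matches` is hypothetical.

```python
from datetime import datetime, timedelta


def count_frequency_matches(timestamps, num_events, timeframe):
    """Count matches for a simplified frequency rule (illustrative model only)."""
    window = []
    matches = 0
    for ts in sorted(timestamps):
        window.append(ts)
        # Drop events that have fallen out of the timeframe
        while ts - window[0] > timeframe:
            window.pop(0)
        if len(window) >= num_events:
            # One match per num_events events, then start counting again
            matches += 1
            window = []
    return matches


start = datetime(2020, 5, 29, 11, 0, 0)
hits = [start + timedelta(seconds=30 * i) for i in range(24)]  # 24 hits, 30 seconds apart
num_matches = count_frequency_matches(hits, num_events=10, timeframe=timedelta(minutes=10))
print(len(hits), num_matches)
```

Under this model, 10 hits inside the timeframe give 1 match and 24 hits give 2, in line with the num_hits / num_events relationship described above.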

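Writing back to the ES index: elastalert_status

Likewise, after an alert has been sent to the alert channel, the log of the alert context is written to an ES index.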

def send_alert(self, matches, rule, alert_time=None, retried=False):
    # ...
    for alert in rule['alert']:
        alert.pipeline = alert_pipeline
        try:
            alert.alert(matches)
        except EAException as e:
            self.handle_error('Error while running alert %s: %s' % (alert.get_info()['type'], e), {'rule': rule['name']})
            alert_exception = str(e)
        else:
            self.thread_data.alerts_sent += 1
            alert_sent = True
    # ...
    for match in matches:
        alert_body = self.get_alert_body(match, rule, alert_sent, alert_time, alert_exception)
        # Set all matches to aggregate together
        if agg_id:
            alert_body['aggregate_id'] = agg_id
        res = self.writeback('elastalert', alert_body, rule)
        if res and not agg_id:
            agg_id = res['_id']

starttime/endtime/original_time

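endtime is normally the current time at which the query runs, so elastalert does not save it by default. starttime is normally computed as endtime minus timeframe, but if elastalert restarts, starttime may differ from what you expect. The source shows that there is also an original_time, which is the endtime of the previous run as stored in ES. Each query therefore starts from original_time and ends at the current time.

For that reason it is better to record starttime as original_time, since the name starttime alone is ambiguous.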

body = {'rule_name': rule['name'],
        'endtime': endtime,
        'starttime': rule['original_starttime'],
        'matches': num_matches,
        'hits': max(self.thread_data.num_hits, self.thread_data.cumulative_hits),
        '@timestamp': ts_now(),
        'time_taken': time_taken}
self.writeback('elastalert_status', body)

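Of course, to cope with a long gap between the last run and a restart, elastalert by default reads starttime from ES on startup, takes the span from that point to the current time as the time window, and then queries it in steps of run_every. As a result, a large burst of queries can occur right after a restart, and monitoring logs that are too old is pointless. You can therefore set old_query_limit: minutes: 1 in elastalert.yaml to limit starttime to one minute before the current time.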
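The effect of old_query_limit on the catch-up queries can be sketched as follows. This is a simplified model of the behaviour described above, not elastalert's actual code, and the real logic may differ in detail. `plan_catchup_windows` is a hypothetical helper that splits the gap since the last run into run_every-sized windows and optionally clamps the start.

```python
from datetime import datetime, timedelta


def plan_catchup_windows(saved_endtime, now, run_every, old_query_limit=None):
    """Split the gap since the last run into run_every-sized query windows.

    Illustrative model only. With old_query_limit set, the start is clamped
    to now - old_query_limit.
    """
    start = saved_endtime
    if old_query_limit is not None:
        start = max(start, now - old_query_limit)
    windows = []
    while start < now:
        end = min(start + run_every, now)
        windows.append((start, end))
        start = end
    return windows


now = datetime(2020, 6, 1, 4, 0, 0)
last_run = now - timedelta(hours=2)  # elastalert was down for two hours
unlimited = plan_catchup_windows(last_run, now, run_every=timedelta(minutes=1))
limited = plan_catchup_windows(last_run, now, run_every=timedelta(minutes=1),
                               old_query_limit=timedelta(minutes=1))
print(len(unlimited), len(limited))
```

In this model a two-hour outage with run_every of one minute produces 120 catch-up windows, while the one-minute limit reduces that to a single window.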

enhancement

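elastalert offers two options: run the enhancements immediately after a match is found (run_enhancements_first), or run them just before the alert is sent, for example to modify the match (match_enhancements).

run_enhancements_first

If run_enhancements_first is specified in the rule configuration, the relevant source is: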

def run_rule(self, rule, endtime, starttime=None):
    # ...
    self.run_query()
    # ...
    for match in rule['type'].matches:
        # ...
        if rule.get('run_enhancements_first'):
            try:
                for enhancement in rule['match_enhancements']:
                    try:
                        enhancement.process(match)  # point
                    except EAException as e:
                        self.handle_error("Error running match enhancement: %s" % (e), {'rule': rule['name']})
            except DropMatchException:
                # An enhancement dropped this match, so move on to the next one
                continue

match_enhancements

def send_alert(self, matches, rule, alert_time=None, retried=False):
    # ...
    if not rule.get('run_enhancements_first') and not retried:
        for enhancement in rule['match_enhancements']:
            valid_matches = []
            for match in matches:
                try:
                    enhancement.process(match)  # point
                    valid_matches.append(match)
                except DropMatchException:
                    pass
                except EAException as e:
                    self.handle_error("Error running match enhancement: %s" % (e), {'rule': rule['name']})
            matches = valid_matches
            if not matches:
                return None
    # ...
    alert.alert(matches)

The endtime field that many people care about, and that elastalert does not save, can in fact be added as a field in match_enhancements.
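A minimal sketch of such an enhancement is shown below. To keep it runnable on its own, `BaseEnhancement` here is a stand-in for the class in elastalert.enhancements, and `AddEndtimeEnhancement` is a hypothetical name. It assumes that the current UTC time is an acceptable value for endtime.

```python
from datetime import datetime, timezone


class BaseEnhancement(object):
    """Stand-in for elastalert.enhancements.BaseEnhancement so the sketch runs alone."""

    def __init__(self, rule):
        self.rule = rule

    def process(self, match):
        raise NotImplementedError()


class AddEndtimeEnhancement(BaseEnhancement):
    """Hypothetical enhancement that records an end time on each match."""

    def process(self, match):
        # Assumption: the current UTC time stands in for the end of the query window
        match['endtime'] = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ')


match = {'@timestamp': '2020-05-29T11:08:30.000000Z', 'backendMod': 'senseface'}
AddEndtimeEnhancement(rule={'name': 'demo'}).process(match)
print(match['endtime'])
```

In a real deployment the class would subclass elastalert's own BaseEnhancement and be listed under match_enhancements in the rule file by its module path.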

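Custom alerts

Custom email format

The default email alert simply sends the content as a plain string, which looks poor, so I customized the email format with jinja2.

First, define email_format: html in the rule configuration.

Because the jinja2 HTML template is only used when sending email, the alert type needs to be checked.

The modified source is as follows: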

import os

from jinja2 import Environment, FileSystemLoader


class BasicMatchString(object):
    """ Creates a string containing fields in match for the given rule. """

    def __init__(self, rule, match, alert_type):  # new alert_type parameter: the alert type
        self.rule = rule
        self.match = match
        self.alert_type = alert_type

    @staticmethod
    def render_html(summary_str, data):
        # Load mail.html.tpl from the templates directory next to this module
        env = Environment(loader=FileSystemLoader(
            os.path.join(os.path.dirname(__file__), "templates")))
        template = env.get_template('mail.html.tpl')
        out = template.render(summary_str=summary_str, data=data)
        # with open('mail.html', 'w', encoding='utf-8') as f:
        #     f.write(out)
        return out

    def _add_custom_alert_text(self):
        missing = self.rule.get('alert_missing_value', '<MISSING VALUE>')
        if 'alert_text_args' in self.rule:
            alert_text_args = self.rule.get('alert_text_args')
        # Check the alert type
        if 'email' == self.alert_type:
            summary_str = self.rule['type'].get_match_str(self.match)
            alert_text = str(self.render_html(summary_str, alert_text_args))
        else:
            # Not email: keep the behaviour of the original source
            alert_text = str(self.rule.get('alert_text', ''))
        # ...

Since the base class of all alert types is Alerter, the alert type is passed in through it.

class Alerter(object):
    # ...
    def create_alert_body(self, matches):
        alert_type = self.get_info()['type']  # get the alerter instance's type via get_info
        body = self.get_aggregation_summary_text(matches)
        if self.rule.get('alert_text_type') != 'aggregation_summary_only':
            for match in matches:
                body += str(BasicMatchString(self.rule, match, alert_type))  # pass it to BasicMatchString
                # Separate text of aggregated alerts with dashes
                if len(matches) > 1:
                    body += '\n----------------------------------------\n'
        return body

That completes the custom email format. The template is as follows:

The fields are passed in directly through alert_text_args in the rule file.
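A minimal sketch of what such a template could look like follows. The template body is hypothetical and is not the one used in the original setup. It only assumes the two variables that render_html passes in, `summary_str` and `data`, and it loads the template from memory with jinja2's DictLoader so that the example is self-contained.

```python
from jinja2 import DictLoader, Environment

# Hypothetical template body; the file name mirrors the one loaded in render_html
MAIL_TEMPLATE = """<html>
  <body>
    <h3>{{ summary_str }}</h3>
    <ul>
    {% for item in data %}
      <li>{{ item }}</li>
    {% endfor %}
    </ul>
  </body>
</html>"""

env = Environment(loader=DictLoader({'mail.html.tpl': MAIL_TEMPLATE}), autoescape=True)
template = env.get_template('mail.html.tpl')
html = template.render(summary_str='At least 10 events occurred',
                       data=['backendMod', '@timestamp'])
print(html)
```

Here `data` carries the alert_text_args list, matching how render_html is called in the modified source. A fuller template would typically look up each field's value in the match before rendering.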

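WeChat alerts

elastalert itself supports plugging in custom alerters; the alerter essentially only needs to build the message with body = self.create_alert_body(matches) and send it. That part is not hard. The hard part is working out how to use the WeChat Work (企业微信) API. The code is based on elastalert-wechat-plugin, which I have tested and found to work.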
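The overall shape of a custom alerter can be sketched as follows. To keep the example self-contained, `Alerter` is a greatly simplified stand-in for elastalert's base class. `WeChatAlerter` and `send_func` are hypothetical, with `send_func` standing in for the actual WeChat Work API call, which is not reproduced here.

```python
class Alerter(object):
    """Stand-in for elastalert.alerts.Alerter so the sketch runs on its own."""

    def __init__(self, rule):
        self.rule = rule

    def create_alert_body(self, matches):
        # Greatly simplified: the real method formats each match using the rule settings
        return '\n'.join(str(match) for match in matches)


class WeChatAlerter(Alerter):
    """Hypothetical alerter; send_func stands in for the WeChat Work API call."""

    def __init__(self, rule, send_func):
        super(WeChatAlerter, self).__init__(rule)
        self.send_func = send_func

    def alert(self, matches):
        # Build the message text, then hand it to the delivery function
        body = self.create_alert_body(matches)
        self.send_func(body)

    def get_info(self):
        # The type reported here is what create_alert_body reads as alert_type
        return {'type': 'wechat'}


sent = []
alerter = WeChatAlerter({'name': 'demo'}, send_func=sent.append)
alerter.alert([{'backendMod': 'senseface'}])
print(sent[0])
```

Injecting the delivery function keeps the alerter testable without network access. A real plugin would perform the HTTP call to the WeChat Work API inside alert instead.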

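Issues

  1. If a rule fails to run after you modify the source, an exception is automatically generated and written to the index, and the rule is disabled. It is not run again until the error is resolved.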

03:18:59.552Z ERROR elastalert-server:
ProcessController: INFO:elastalert:Disabled rules are: ['index realty find invalid keyword']
  2. The until field in ES records the next execution time (supported by the apscheduler framework).
{
  "_index": "demo.elastalert_status",
  "_type": "silence",
  "_id": "AXJt-tKUPz_v1Z2rFGRX",
  "_score": null,
  "_source": {
    "exponent": 0,
    "rule_name": "index realty find invalid keyword.senseface",
    "@timestamp": "2020-06-01T03:44:13.967765Z",
    "until": "2020-06-01T03:49:13.967756Z" # here
  },
  "fields": {
    "@timestamp": [
      1590983053967
    ],
    "until": [
      1590983353967
    ]
  },
  "sort": [
    1590983053967
  ]
}

References: