拉取代码及准备工作
¶代码拉取
克隆代码到本地
git clone https://github.com/Yelp/elastalert.git
以下是代码目录:
zhonghongpeng@bogon elastalert % tree ./ -L 1
./
├── Dockerfile-test
├── LICENSE
├── Makefile
├── README.md
├── build
├── changelog.md
├── config.yaml.example
├── dist
├── docker-compose.yml
├── docs
├── elastalert
├── elastalert.egg-info
├── example_rules
├── mytest
├── pytest.ini
├── requirements-dev.txt
├── requirements.txt
├── setup.cfg
├── setup.py
├── supervisord.conf.example
├── tests
├── tox.ini
└── venv
¶pycharm开发环境准备
使用 pycharm 打开项目根目录:
¶配置测试环境和venv
¶venv
venv 可以用来在当前目录创建一个隔离的 python 运行环境,在使用 pycharm 导入项目的时候一般会弹出是否在根目录创建 venv,设置是即可. 来到以下 Preferences
界面确认路径是否正确(venv的基础Python版本以及venv环境路径):
¶测试框架选择
还是 Preferences
窗口,配置 requirements.txt
和测试框架为 pytest
(requirements.txt
不是必要,下面将使用 setup.py
安装依赖):
¶安装依赖
在根目录下 setup.py
文件的依赖数组最后增加 pytest
然后右击该文件:
添加 install
参数后运行安装即可:
安装完成后可以看到在 venv 中导入了以下脚本:
zhonghongpeng@bogon elastalert % tree venv -L 4
venv
├── bin
│ ├── __pycache__
│ │ └── jp.cpython-37.pyc
│ ├── activate
│ ├── activate.csh
│ ├── activate.fish
│ ├── chardetect
│ ├── easy_install
│ ├── easy_install-3.7
│ ├── easy_install-3.9
│ ├── elastalert
│ ├── elastalert-create-index
│ ├── elastalert-rule-from-kibana
│ ├── elastalert-test-rule
│ ├── jirashell
│ ├── jp.py
│ ├── jsonschema
│ ├── natsort
│ ├── pbr
│ ├── pip
│ ├── pip3
│ ├── pip3.7
│ ├── py.test
│ ├── pytest
│ ├── python -> python3.7
│ ├── python3 -> python3.7
│ ├── python3.7 -> /usr/local/bin/python3.7
│ └── stomp
├── include
├── lib
│ └── python3.7
│ └── site-packages
│ ... ...
│ ├── elastalert-0.2.4-py3.7.egg
│ ... ...
└── pyvenv.cfg
119 directories, 44 files
查看 metric_aggregation 代码结构
一些问题
¶Python Import 问题
Traceback (most recent call last):
File "/Applications/PyCharm.app/Contents/plugins/python/helpers/pydev/pydevd.py", line 1477, in _exec
pydev_imports.execfile(file, globals, locals) # execute the script
File "/Applications/PyCharm.app/Contents/plugins/python/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
exec(compile(contents+"\n", file, 'exec'), glob, loc)
File "/Users/zhonghongpeng/PycharmProjects/elastalert/elastalert/elastalert.py", line 29, in
from . import kibana
ImportError: cannot import name 'kibana' from '__main__' (/Users/zhonghongpeng/PycharmProjects/elastalert/elastalert/elastalert.py)
这是因为使用 python3 以及
¶ES Python 库问题
INFO:elastalert:Background alerts thread 0 pending alerts sent at 2020-12-23 18:32 CST
ERROR:root:Traceback (most recent call last):
File "/Users/zhonghongpeng/PycharmProjects/elastalert/elastalert/elastalert.py", line 1271, in handle_rule_execution
num_matches = self.run_rule(rule, endtime, rule.get('initial_starttime'))
File "/Users/zhonghongpeng/PycharmProjects/elastalert/elastalert/elastalert.py", line 898, in run_rule
if not self.run_query(rule, tmp_endtime, endtime):
File "/Users/zhonghongpeng/PycharmProjects/elastalert/elastalert/elastalert.py", line 632, in run_query
data = self.get_hits_aggregation(rule, datetime.datetime.fromtimestamp(1236472051807 / 1000.0, tz=datetime.timezone.utc), end, index, rule.get('query_key', None))
File "/Users/zhonghongpeng/PycharmProjects/elastalert/elastalert/elastalert.py", line 566, in get_hits_aggregation
body=query, size=0, ignore_unavailable=True)
File "/Users/zhonghongpeng/PycharmProjects/elastalert/venv/lib/python3.7/site-packages/elasticsearch/client/utils.py", line 152, in _wrapped
return func(*args, params=params, headers=headers, **kwargs)
TypeError: deprecated_search() got an unexpected keyword argument 'headers'
具体抛出错误的代码是在 elastalert/__init__.py
的 566 行:
res = self.thread_data.current_es.deprecated_search(index=index, doc_type=rule.get('doc_type'),
body=query, size=0, ignore_unavailable=True)
查看调用报错的方法 deprecated_search
,上面用了一个装饰器 @query_params
:
@query_params(
"_source",
"_source_exclude",
"_source_excludes",
"_source_include",
"_source_includes",
"allow_no_indices",
"allow_partial_search_results",
"analyze_wildcard",
"analyzer",
"batched_reduce_size",
"default_operator",
"df",
"docvalue_fields",
"expand_wildcards",
"explain",
"from_",
"ignore_unavailable",
"lenient",
"max_concurrent_shard_requests",
"pre_filter_shard_size",
"preference",
"q",
"rest_total_hits_as_int",
"request_cache",
"routing",
"scroll",
"search_type",
"seq_no_primary_term",
"size",
"sort",
"stats",
"stored_fields",
"suggest_field",
"suggest_mode",
"suggest_size",
"suggest_text",
"terminate_after",
"timeout",
"track_scores",
"track_total_hits",
"typed_keys",
"version",
)
def deprecated_search(self, index=None, doc_type=None, body=None, params=None):
if "from_" in params:
params["from"] = params.pop("from_")
if not index:
index = "_all"
res = self.transport.perform_request(
"GET", _make_path(index, doc_type, "_search"), params=params, body=body
)
if type(res) == list or type(res) == tuple:
return res[1]
return res
@query_params
是 ES 提供的 Python 客户端库中提供的,主要功能是将传入参数构建成一个 URL query 参数,实现 GET
查询请求,而不使用 POST
,当前报错的代码如下,可以看到 _wrapped
的 return
处的入参中传入了一个命名参数 headers
,但是回顾上面被包装的 deprecated_search
中不存在 headers
命名参数
def query_params(*es_query_params):
"""
Decorator that pops all accepted parameters from method's kwargs and puts
them in the params argument.
"""
def _wrapper(func):
@wraps(func)
def _wrapped(*args, **kwargs):
params = (kwargs.pop("params", None) or {}).copy()
headers = {
k.lower(): v
for k, v in (kwargs.pop("headers", None) or {}).copy().items()
}
if "opaque_id" in kwargs:
headers["x-opaque-id"] = kwargs.pop("opaque_id")
for p in es_query_params + GLOBAL_PARAMS:
if p in kwargs:
v = kwargs.pop(p)
if v is not None:
params[p] = _escape(v)
# don't treat ignore, request_timeout, and opaque_id as other params to avoid escaping
for p in ("ignore", "request_timeout"):
if p in kwargs:
params[p] = kwargs.pop(p)
return func(*args, params=params, headers=headers, **kwargs)
return _wrapped
return _wrapper
所以猜测应该是 ES Python 库的版本问题,查看 venv 中确实存在多个版本的 ES Python 库:
查看 7.0 版本的代码发现确实没有传入 headers
命名参数:
def query_params(*es_query_params):
"""
Decorator that pops all accepted parameters from method's kwargs and puts
them in the params argument.
"""
def _wrapper(func):
@wraps(func)
def _wrapped(*args, **kwargs):
params = {}
if "params" in kwargs:
params = kwargs.pop("params").copy()
for p in es_query_params + GLOBAL_PARAMS:
if p in kwargs:
v = kwargs.pop(p)
if v is not None:
params[p] = _escape(v)
# don't treat ignore and request_timeout as other params to avoid escaping
for p in ("ignore", "request_timeout"):
if p in kwargs:
params[p] = kwargs.pop(p)
return func(*args, params=params, **kwargs)
return _wrapped
return _wrapper
直接将上面没有标明版本号的库从环境删除,即可正常运行。