ElastAlert Aggregation Alerts

Fetching the Code and Preparation

Fetching the code

Clone the repository to your local machine:

git clone https://github.com/Yelp/elastalert.git

The project directory looks like this:

zhonghongpeng@bogon elastalert % tree ./ -L 1
./
├── Dockerfile-test
├── LICENSE
├── Makefile
├── README.md
├── build
├── changelog.md
├── config.yaml.example
├── dist
├── docker-compose.yml
├── docs
├── elastalert
├── elastalert.egg-info
├── example_rules
├── mytest
├── pytest.ini
├── requirements-dev.txt
├── requirements.txt
├── setup.cfg
├── setup.py
├── supervisord.conf.example
├── tests
├── tox.ini
└── venv

Preparing the PyCharm development environment

Open the project root directory in PyCharm:

[Screenshot: image-20201223153727704]

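Configuring the test environment and venv

venv

venv creates an isolated Python runtime environment in the current directory. When a project is imported, PyCharm usually asks whether to create a venv in the project root; choose yes. Then open the following Preferences screen and confirm that the settings are correct (the venv's base Python version and the venv path):

[Screenshot: image-20201223154238353]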
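Besides the Preferences screen, the interpreter itself can report whether it is running inside a venv. The following is a minimal sketch, assuming Python 3.3 or later (where sys.base_prefix exists); in_virtualenv is a hypothetical helper name and not part of ElastAlert or PyCharm:

```python
import sys


def in_virtualenv() -> bool:
    """Return True when the running interpreter belongs to a venv."""
    # Inside a venv, sys.prefix points at the venv directory while
    # sys.base_prefix still points at the base Python installation.
    return sys.prefix != sys.base_prefix


if __name__ == "__main__":
    print("interpreter:", sys.executable)
    print("inside venv:", in_virtualenv())
```

Running this with the interpreter that PyCharm selected should print the venv's python path if the project is configured as described.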

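Choosing the test framework

Still in the Preferences window, configure requirements.txt and set the test framework to pytest (requirements.txt is not required; the dependencies are installed through setup.py below):

[Screenshot: image-20201223154047862]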

Installing dependencies

Append pytest to the end of the dependency array in the setup.py file in the project root:

[Screenshot: image-20201223155006145]
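The change might look roughly like the fragment below. This is only a sketch: it assumes the dependency array is the install_requires list passed to setup(), and the existing entries are deliberately elided rather than reproduced:

```python
# Fragment only: in the real setup.py this list is assumed to be the
# install_requires argument of setup(); existing entries are elided on purpose.
install_requires = [
    # ... existing ElastAlert dependencies stay exactly as they are ...
    "pytest",  # appended last so the install step also pulls in the test runner
]
```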

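Then right-click the file:

[Screenshot: image-20201223190839038]

Add the install argument and run it to install the dependencies:

[Screenshot: image-20201223190908119]

After the installation finishes, the following scripts have been added to the venv: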

zhonghongpeng@bogon elastalert % tree venv -L 4
venv
├── bin
│   ├── __pycache__
│   │   └── jp.cpython-37.pyc
│   ├── activate
│   ├── activate.csh
│   ├── activate.fish
│   ├── chardetect
│   ├── easy_install
│   ├── easy_install-3.7
│   ├── easy_install-3.9
│   ├── elastalert
│   ├── elastalert-create-index
│   ├── elastalert-rule-from-kibana
│   ├── elastalert-test-rule
│   ├── jirashell
│   ├── jp.py
│   ├── jsonschema
│   ├── natsort
│   ├── pbr
│   ├── pip
│   ├── pip3
│   ├── pip3.7
│   ├── py.test
│   ├── pytest
│   ├── python -> python3.7
│   ├── python3 -> python3.7
│   ├── python3.7 -> /usr/local/bin/python3.7
│   └── stomp
├── include
├── lib
│   └── python3.7
│       └── site-packages
│           ... ...
│           ├── elastalert-0.2.4-py3.7.egg
│           ... ...
└── pyvenv.cfg

119 directories, 44 files

Examining the metric_aggregation code structure

Some problems

Python import problem

Traceback (most recent call last):
  File "/Applications/PyCharm.app/Contents/plugins/python/helpers/pydev/pydevd.py", line 1477, in _exec
    pydev_imports.execfile(file, globals, locals)  # execute the script
  File "/Applications/PyCharm.app/Contents/plugins/python/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "/Users/zhonghongpeng/PycharmProjects/elastalert/elastalert/elastalert.py", line 29, in <module>
    from . import kibana
ImportError: cannot import name 'kibana' from '__main__' (/Users/zhonghongpeng/PycharmProjects/elastalert/elastalert/elastalert.py)

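This happens because Python 3 is used and elastalert.py is executed directly as a script, so the relative import "from . import kibana" has no parent package to resolve against.

[Screenshot: image-20201223191219999]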
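The behaviour can be reproduced outside ElastAlert. The sketch below builds a throwaway package (demo_pkg, helper.py and main.py are hypothetical stand-ins, not ElastAlert files) and runs the same file once as a script and once as a module:

```python
import subprocess
import sys
import tempfile
from pathlib import Path


def run_script_and_module():
    """Run the same file as a script and as a module; return both results."""
    with tempfile.TemporaryDirectory() as tmp:
        pkg = Path(tmp) / "demo_pkg"  # hypothetical stand-in for the elastalert package
        pkg.mkdir()
        (pkg / "__init__.py").write_text("")
        (pkg / "helper.py").write_text("VALUE = 42\n")
        # Same pattern as "from . import kibana" in elastalert.py.
        (pkg / "main.py").write_text("from . import helper\nprint(helper.VALUE)\n")

        # 1) Executed as a plain script: there is no parent package, so the
        #    relative import raises ImportError.
        as_script = subprocess.run(
            [sys.executable, str(pkg / "main.py")],
            cwd=tmp, capture_output=True, text=True,
        )
        # 2) Executed as a module from the parent directory: the package is
        #    known, so the relative import resolves.
        as_module = subprocess.run(
            [sys.executable, "-m", "demo_pkg.main"],
            cwd=tmp, capture_output=True, text=True,
        )
        return as_script, as_module


if __name__ == "__main__":
    script_result, module_result = run_script_and_module()
    print("script exit code:", script_result.returncode)
    print("module exit code:", module_result.returncode)
```

For ElastAlert, the equivalent is to run the module from the project root (python -m elastalert.elastalert) instead of executing elastalert/elastalert.py directly.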

Elasticsearch Python library problem

INFO:elastalert:Background alerts thread 0 pending alerts sent at 2020-12-23 18:32 CST
ERROR:root:Traceback (most recent call last):
  File "/Users/zhonghongpeng/PycharmProjects/elastalert/elastalert/elastalert.py", line 1271, in handle_rule_execution
    num_matches = self.run_rule(rule, endtime, rule.get('initial_starttime'))
  File "/Users/zhonghongpeng/PycharmProjects/elastalert/elastalert/elastalert.py", line 898, in run_rule
    if not self.run_query(rule, tmp_endtime, endtime):
  File "/Users/zhonghongpeng/PycharmProjects/elastalert/elastalert/elastalert.py", line 632, in run_query
    data = self.get_hits_aggregation(rule, datetime.datetime.fromtimestamp(1236472051807 / 1000.0, tz=datetime.timezone.utc), end, index, rule.get('query_key', None))
  File "/Users/zhonghongpeng/PycharmProjects/elastalert/elastalert/elastalert.py", line 566, in get_hits_aggregation
    body=query, size=0, ignore_unavailable=True)
  File "/Users/zhonghongpeng/PycharmProjects/elastalert/venv/lib/python3.7/site-packages/elasticsearch/client/utils.py", line 152, in _wrapped
    return func(*args, params=params, headers=headers, **kwargs)
TypeError: deprecated_search() got an unexpected keyword argument 'headers'

The code that actually raises the error is at line 566 of elastalert/elastalert.py:

                res = self.thread_data.current_es.deprecated_search(index=index, doc_type=rule.get('doc_type'),
                                                                    body=query, size=0, ignore_unavailable=True)

Look at deprecated_search, the method whose call fails; it is wrapped by the @query_params decorator:

    @query_params(
        "_source",
        "_source_exclude",
        "_source_excludes",
        "_source_include",
        "_source_includes",
        "allow_no_indices",
        "allow_partial_search_results",
        "analyze_wildcard",
        "analyzer",
        "batched_reduce_size",
        "default_operator",
        "df",
        "docvalue_fields",
        "expand_wildcards",
        "explain",
        "from_",
        "ignore_unavailable",
        "lenient",
        "max_concurrent_shard_requests",
        "pre_filter_shard_size",
        "preference",
        "q",
        "rest_total_hits_as_int",
        "request_cache",
        "routing",
        "scroll",
        "search_type",
        "seq_no_primary_term",
        "size",
        "sort",
        "stats",
        "stored_fields",
        "suggest_field",
        "suggest_mode",
        "suggest_size",
        "suggest_text",
        "terminate_after",
        "timeout",
        "track_scores",
        "track_total_hits",
        "typed_keys",
        "version",
    )
    def deprecated_search(self, index=None, doc_type=None, body=None, params=None):
        if "from_" in params:
            params["from"] = params.pop("from_")

        if not index:
            index = "_all"
        res = self.transport.perform_request(
            "GET", _make_path(index, doc_type, "_search"), params=params, body=body
        )
        if type(res) == list or type(res) == tuple:
            return res[1]
        return res

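@query_params comes from the Python client library provided by Elasticsearch. Its main job is to turn the keyword arguments passed in into URL query parameters, so that the search is issued as a GET request rather than a POST. The code that currently raises the error is shown below: the return statement of _wrapped passes a keyword argument named headers, but the wrapped deprecated_search shown above has no headers parameter.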

def query_params(*es_query_params):
    """
    Decorator that pops all accepted parameters from method's kwargs and puts
    them in the params argument.
    """

    def _wrapper(func):
        @wraps(func)
        def _wrapped(*args, **kwargs):
            params = (kwargs.pop("params", None) or {}).copy()
            headers = {
                k.lower(): v
                for k, v in (kwargs.pop("headers", None) or {}).copy().items()
            }

            if "opaque_id" in kwargs:
                headers["x-opaque-id"] = kwargs.pop("opaque_id")

            for p in es_query_params + GLOBAL_PARAMS:
                if p in kwargs:
                    v = kwargs.pop(p)
                    if v is not None:
                        params[p] = _escape(v)

            # don't treat ignore, request_timeout, and opaque_id as other params to avoid escaping
            for p in ("ignore", "request_timeout"):
                if p in kwargs:
                    params[p] = kwargs.pop(p)
            return func(*args, params=params, headers=headers, **kwargs)

        return _wrapped

    return _wrapper
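The mismatch can be reproduced in isolation. The sketch below is a simplified stand-in, not the library's code: query_params_with_headers, old_style_search and new_style_search are hypothetical names, and escaping and GLOBAL_PARAMS are left out:

```python
from functools import wraps


def query_params_with_headers(*accepted):
    """Simplified stand-in for the newer decorator: always forwards headers."""
    def _wrapper(func):
        @wraps(func)
        def _wrapped(*args, **kwargs):
            params = (kwargs.pop("params", None) or {}).copy()
            headers = dict(kwargs.pop("headers", None) or {})
            # Move every accepted keyword argument into the params dict.
            for name in accepted:
                if name in kwargs:
                    params[name] = kwargs.pop(name)
            return func(*args, params=params, headers=headers, **kwargs)
        return _wrapped
    return _wrapper


@query_params_with_headers("size", "ignore_unavailable")
def old_style_search(index=None, body=None, params=None):
    """Written for the older decorator: has no headers parameter."""
    return index, body, params


@query_params_with_headers("size", "ignore_unavailable")
def new_style_search(index=None, body=None, params=None, headers=None):
    """Written for the newer decorator: accepts headers."""
    return index, body, params, headers


def call_old_style():
    """Return the TypeError message raised by the mismatched pair."""
    try:
        old_style_search(index="logs", body={}, size=0)
    except TypeError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    print(call_old_style())
    print(new_style_search(index="logs", body={}, size=0))
```

The first call fails in the same way as the traceback above, because the decorator and the decorated function were written for different signatures; the second call succeeds because both sides agree on the headers parameter.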

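So the likely cause is a version mismatch in the Elasticsearch Python library. Checking the venv confirms that several versions of the library are installed:

[Screenshot: image-20201223184233434]

The 7.0 version of the code indeed does not pass a headers keyword argument: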

def query_params(*es_query_params):
    """
    Decorator that pops all accepted parameters from method's kwargs and puts
    them in the params argument.
    """

    def _wrapper(func):
        @wraps(func)
        def _wrapped(*args, **kwargs):
            params = {}
            if "params" in kwargs:
                params = kwargs.pop("params").copy()
            for p in es_query_params + GLOBAL_PARAMS:
                if p in kwargs:
                    v = kwargs.pop(p)
                    if v is not None:
                        params[p] = _escape(v)

            # don't treat ignore and request_timeout as other params to avoid escaping
            for p in ("ignore", "request_timeout"):
                if p in kwargs:
                    params[p] = kwargs.pop(p)
            return func(*args, params=params, **kwargs)

        return _wrapped

    return _wrapper

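Deleting the copy of the library shown above that carries no version number from the environment is enough to make everything run normally.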
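Duplicate copies like this can also be spotted without the IDE. The helper below is a minimal sketch; find_package_copies is a hypothetical name, and the site-packages path in the usage note is assumed from the venv tree shown earlier:

```python
from pathlib import Path


def find_package_copies(site_packages, name="elasticsearch"):
    """List entries in site_packages whose name starts with `name`."""
    root = Path(site_packages)
    # A plain prefix match: related packages that share the prefix are
    # listed too and have to be told apart by eye.
    return sorted(p.name for p in root.iterdir() if p.name.lower().startswith(name))


if __name__ == "__main__":
    for entry in find_package_copies("venv/lib/python3.7/site-packages"):
        print(entry)
```

Run from the project root, this prints every directory or egg whose name begins with elasticsearch, which makes an unversioned copy sitting next to a versioned one easy to see.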


   Reprint rules


"ElastAlert Aggregation Alerts" by 阿钟 is licensed under the Creative Commons Attribution 4.0 International License.