首页 文章

Elasticsearch中的嵌套查询?

提问于
浏览
1

我的团队拥有多个仪表板,正在考虑转向Elasticsearch以整合软件堆栈的可能性 . 我们公开的一种常见图表就像“每天结束时的待处理工作流程是什么?” . 以下是一些示例数据:

day workflow_id version status
20151101    1   1   In Progress
20151101    2   1   In Progress
20151102    1   2   In Progress
20151102    3   1   In Progress
20151102    4   1   In Progress
20151102    2   2   Completed
20151103    1   3   Completed
20151103    3   2   In Progress
20151104    3   3   Completed
20151105    4   2   Completed

每次在工作流程中发生更改时,都会插入新记录,这可能会也可能不会更改状态 . 具有max(版本)的记录是workflow_id的最新数据 .

目标是制作一个图表,以显示每天结束时“进行中”和“已完成”工作流程的总数 . 这应该只考虑具有最大版本号的记录 . 这可以在SQL中使用嵌套查询完成:

with 

snapshot_dates as 
(select distinct day from workflow),

snapshot as 
(select d.day, w.workflow_id, max(w.version) as max_version
from snapshot_dates d, workflow w
where d.day >= w.day
group by d.day, w.workflow_id
order by d.day, w.workflow_id)

select s.day, w.status, count(1) 
from workflow w join snapshot s on w.workflow_id=s.workflow_id and w.version = s.max_version
group by s.day, w.status
order by s.day, w.status;

以下是查询的预期输出:

day,status,count  
20151101,In Progress,2  
20151102,Completed,1  
20151102,In Progress,3  
20151103,Completed,2  
20151103,In Progress,2  
20151104,Completed,3  
20151104,In Progress,1  
20151105,Completed,4

我仍然是Elasticsearch的新手,并想知道Elasticsearch是否可以在不使用应用程序端逻辑的情况下通过正确定义映射和查询来执行类似的查询 . 更一般地说,使用Elasticsearch解决此类问题的最佳做法是什么?

1 回答

  • 0

    我试图使用bucket selector aggregation找到解决方案,但我一度陷入困境 . 我在elasticsearch forum中讨论了同样的问题 . 以下是Christian Dahlqvist的建议 .

    除此之外,您还可以将记录编入具有唯一标识符的工作流程中心索引,例如:工作流ID,作为文档ID . 如果同一工作流程中有多个更新,则每个更新都将导致更新,并保留最新状态 . 在此索引中运行聚合以查找当前或最新状态将更加高效和可扩展,因为每个工作流只有一个记录,并且不需要根据与其他文档的关系过滤掉文档 .

    因此,根据此建议,您应该在索引时使用 Workflow Id 作为文档ID . 每当有该工作流程的更新时,您都可以使用工作流程ID更新新版本和日期 . 假设索引名称为 workflow ,索引类型为 workflow_status . 因此,此 workflow_status 类型的映射将如下所示:

    {
        "workflow_status": {
            "properties": {
                "date": {
                    "type": "date",
                    "format": "strict_date_optional_time||epoch_millis"
                },
                "status": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "version": {
                    "type": "long"
                },
                "workFlowId": {
                    "type": "long"
                }
            }
        }
    }
    

    继续将文档添加/更新为此索引类型,将 workFlowId 保留为文档ID .

    现在为了明天地显示图表,您可能需要创建另一个索引类型,假设 per_day_workflow 具有以下映射:

    {
        "per_day_workflow": {
            "properties": {
                "date": {
                    "type": "date",
                    "format": "strict_date_optional_time||epoch_millis"
                },            
                "in_progress": {
                    "type": "long"
                },
                "completed": {
                    "type": "long"
                }
            }
        }
    }
    

    该索引将保存每天的数据 . 因此,您需要创建一个将在一天结束时运行的作业,并使用以下聚合搜索从 workflow_status 索引类型中获取总计"In Progress"&"Completed"工作流:

    POST http://localhost:9200/workflow/workflow_status/_search?search_type=count
    
        {
            "aggs": {
                "per_status": {
                    "terms": {
                        "field": "status"
                    }
                }
            }
        }
    

    响应将如下所示(我在2015-11-02上针对您的示例数据运行):

    {
        "took": 3,
        "timed_out": false,
        "_shards": {
            "total": 5,
            "successful": 5,
            "failed": 0
        },
        "hits": {
            "total": 4,
            "max_score": 0,
            "hits": []
        },
        "aggregations": {
            "per_status": {
                "doc_count_error_upper_bound": 0,
                "sum_other_doc_count": 0,
                "buckets": [
                    {
                        "key": "In Progress",
                        "doc_count": 3
                    },
                    {
                        "key": "Completed",
                        "doc_count": 1
                    }
                ]
            }
        }
    }
    

    从此响应中,您需要提取 In ProgressCompleted 计数,并将它们添加到 per_day_workflow 索引类型与今天's'日期 .

    现在,当您需要每天的图表数据时,您可以轻松地从此 per_day_workflow 索引类型中获取 .

相关问题