文章目录
-
- Elasticsearch 查询超过10000 的解决方案 - Python
-
- 法1:修改 设置 max_result_size (不推荐)
- 法2: scroll 分页
- 法3: search_after 分页
Elasticsearch 查询超过10000 的解决方案 - Python
法1:修改 设置 max_result_size (不推荐)
# 调大查询窗口大小,比如100w (不推荐,慎用) PUT test/_settings { "index.max_result_window": "1000000" } # 查看 查询最大数 GET test/_settings --- { "demo_scroll" : { "settings" : { "index" : { "number_of_shards" : "5", "provided_name" : "demo_scroll", "max_result_window" : "1000000", "creation_date" : "1680832840425", "number_of_replicas" : "1", "uuid" : "OLV5W_D9R-WBUaZ_QbGeWA", "version" : { "created" : "6082399" } } } } }
法2: scroll 分页
def getData(self): current_time = datetime.datetime.now() one_hour_ago = current_time - datetime.timedelta(hours=24) current_time_str = current_time.strftime('%Y-%m-%d %H:%M:%S') hours_ago_str = one_hour_ago.strftime('%Y-%m-%d %H:%M:%S') # 改为从elasticsearch读取数据 es = Elasticsearch(hosts='http://127.0.0.1/9200', timeout=1200) size = 10000 query_scroll = { "size": size, "query": { "range": { "create_time.keyword": { "gte": hours_ago_str.__str__(), "lte": current_time_str.__str__() } } }, "_source": ["ip_address", "OS", "host", "user", "create_time"], } scroll = "10m" # 该次连接超时时间设置 result = [] # first init_res = es.search(index="nac-users", body=query_scroll, scroll=scroll) scroll_id = init_res["_scroll_id"] for item in init_res["hits"]["hits"]: result.append({ 'id': item['_id'], 'ip_address': item['_source']['ip_address'], 'operating_system': item['_source']['OS'], 'hostname': item['_source']['host'], 'username': item['_source']['user'], 'date_t': item['_source']['create_time'], }) i = 0 while i < 16: # 剩下的数据 一天 24 小时数据估计不会超过 160000 res = es.scroll(scroll_id=scroll_id, scroll=scroll) if len(res["hits"]["hits"]) == 0: break for item in res["hits"]["hits"]: result.append({ 'id': item['_id'], 'ip_address': item['_source']['ip_address'], 'operating_system': item['_source']['OS'], 'hostname': item['_source']['host'], 'username': item['_source']['user'], 'date_t': item['_source']['create_time'], }) i = i + 1 # 原始的 # {"query": {"match_all": {}}, "size": 10000} # res = es.search(index="nac-users", body=query_scroll) # # result = [] # for item in res['hits']['hits']: # result.append({ # 'id': item['_id'], # 'ip_address': item['_source']['ip_address'], # 'operating_system': item['_source']['OS'], # 'hostname': item['_source']['host'], # 'username': item['_source']['user'], # 'date_t': item['_source']['create_time'], # }) self.data = pd.DataFrame(result)
法3: search_after 分页
def getData(self): current_time = datetime.datetime.now() one_hour_ago = current_time - datetime.timedelta(hours=24) current_time_str = current_time.strftime('%Y-%m-%d %H:%M:%S') hours_ago_str = one_hour_ago.strftime('%Y-%m-%d %H:%M:%S') # 改为从elasticsearch读取数据 es = Elasticsearch(hosts='http://127.0.0.1:9200', timeout=1200) size = 10000 query_scroll = { "size": size, "query": { "range": { "create_time.keyword": { "gte": hours_ago_str.__str__(), "lte": current_time_str.__str__() } } }, "sort": [ { "create_time.keyword": { "order": "desc" } } ], "_source": ["ip_address", "OS", "host", "user", "create_time"], } result = [] init_res = es.search(index="nac-users", body=query_scroll) if len(init_res["hits"]["hits"]) == 0: self.data = pd.DataFrame(result) return sort = init_res["hits"]["hits"][0]["sort"] # 我这里是用时间来排序的,所以取到的是时间字段 for item in init_res["hits"]["hits"]: result.append({ 'id': item['_id'], 'ip_address': item['_source']['ip_address'], 'operating_system': item['_source']['OS'], 'hostname': item['_source']['host'], 'username': item['_source']['user'], 'date_t': item['_source']['create_time'], }) i = 0 while i < 16: query_scroll["search_after"] = sort res = es.search(index="nac-users", body=query_scroll) sort = res["hits"]["hits"][0]["sort"] if len(res["hits"]["hits"]) == 0: break for item in res["hits"]["hits"]: result.append({ 'id': item['_id'], 'ip_address': item['_source']['ip_address'], 'operating_system': item['_source']['OS'], 'hostname': item['_source']['host'], 'username': item['_source']['user'], 'date_t': item['_source']['create_time'], }) i = i + 1 self.data = pd.DataFrame(result)
还有一个方法是在参考文章2里面提到的
我看参考文章里说到search_after 分页要比scroll快,但是在我的数据上是scroll要快很多,不是特别清楚,可能我这里的数据暂时只有2w多一点,感觉用到search_after 分页需要排序,可能是排序的字段的问题,时间字段我存的是字符串格式,,如有可以修改的地方,欢迎大家指正~ 有更多可以参考的方法欢迎贴在评论区供大家参考~
【参考1】https://juejin.cn/post/7224369270141993019
【参考2】https://blog.csdn.net/u011250186/article/details/125483759