我正在尝试使用Elasticsearch parallel_bulk导入大量数据 . 这是我的索引结构:
{
"_index" : "myindex",
"_type" : domain,
"_id" : md5(email),
"_score" : 1.0,
"_source" : {
"purchase_date" : purchase_date,
"amount" : amount,
}
}
这是我的python代码:
def insert(input_file):
paramL = []
with open(input_file) as f:
for line in f:
line = line.rstrip()
fields = line.split(',')
purchase_date = fields[0]
amount = fields[1]
email = fields[2]
id_email = getMD5(email)
doc = {
"email": email,
"purchase_date": purchase_date,
"amount": amount _date
}
ogg = {
'_op_type': 'index',
'_index': index_param,
'_type': doctype_param,
'_id': id_email,
'_source': doc
}
paramL.append(ogg)
if len(paramL) > 500000:
for success, info in helpers.parallel_bulk(client=es, actions=paramL, thread_count=4):
if not success:
print "Insert failed: ", info
# empty paramL if size > 5.000.000
del paramL[:]
该文件包含42.644.394行,我想每次列表“paramL”约为5.000.000个元素时插入数据 . 因此,当我运行脚本时,它会插入大约436.226个值,直到它崩溃并出现以下错误:
回溯(最近一次调用最后一次):文件“test-2-0.py”,第133行,在main()文件“test-2-0.py”,第131行,在主插入(args.file)文件中“test-2-0.py”,第82行,插入成功,信息在helpers.parallel_bulk(client = es,actions = paramL,thread_count = 4):文件“/usr/local/lib/python2.7/ dist-packages / elasticsearch / helpers / init.py“,第306行,在parallel_bulk _chunk_actions(actions,chunk_size,max_chunk_bytes,client.transport.serializer)文件”/usr/lib/python2.7/multiprocessing/pool.py“,第668行,在下一个提升值elasticsearch.exceptions.ConnectionTimeout:ConnectionTimeout由 - ReadTimeoutError引起(HTTPConnectionPool(host = u'127.0.0.1',port = 9200):读取超时 . (读取超时= 10))
我还尝试在Elasticsearch构造函数中增加超时传递
es = Elasticsearch(['127.0.0.1'], request_timeout=30)
但结果是一样的 .
1 回答
真诚地,我从来没有用如此多的文档批量导入来表明 . 我不知道为什么会出现这个错误 . 在你的情况下,我建议不要创建一个列表-paramL - 但是用生成器函数来管理你的数据 - 正如弹性开发者在弹性论坛中对大量大量摄取的最佳实践所描述的那样:https://discuss.elastic.co/t/helpers-parallel-bulk-in-python-not-working/39498/3 . 像这样的东西:
您可以在java虚拟机中增加专用于弹性的空间来编辑此文件
/etc/elasticsearch/jvm.options
要分配2 GB的RAM,您应该更改 - 如果您的计算机有4 GB,您应该保留近1 GB的系统,因此您可以分配最多3 GB:然后你必须重新启动服务
然后再试一次 . 祝好运