I'm having trouble connecting to a locally running Elasticsearch node from my Java code, which runs as a job submitted to Spark (also running locally). The connection works fine when I don't use Spark, and running a Python job and submitting it to Spark also works fine.
I know that Java needs to connect through port 9300 instead of 9200 (the HTTP port). Still, I always get the same exception, with no difference between reading and writing:
16/08/04 16:51:55 ERROR NetworkClient: Node [The server localhost failed to respond with a valid HTTP response] failed (localhost:9300); no other nodes left - aborting...
Exception in thread "main" org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[localhost:9300]]
at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:102)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:282)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:266)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:270)
at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:108)
at org.elasticsearch.hadoop.rest.RestClient.discoverNodes(RestClient.java:90)
at org.elasticsearch.hadoop.rest.InitializationUtils.discoverNodesIfNeeded(InitializationUtils.java:61)
at org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:434)
at org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:415)
at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:120)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1307)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.take(RDD.scala:1302)
at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1342)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.first(RDD.scala:1341)
at org.apache.spark.api.java.JavaPairRDD.first(JavaPairRDD.scala:211)
at com.dd.mediaforce.spark.most_popular.ExecutorMostPopular.main(ExecutorMostPopular.java:564)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
We run Spark and Elasticsearch on a number of nodes. The Python code runs fine against that setup, but pointing the Java code at it did not help solve the problem either.
The code I'm using to connect from Java:
SparkConf _sparkConf = new SparkConf()
        .setMaster("local[*]")
        .setAppName("Test");
JavaSparkContext jsc = new JavaSparkContext(_sparkConf);

Configuration conf = new Configuration();
conf.set("cluster.name", "our_clustername");
conf.set("es.nodes", "localhost");
conf.setInt("es.port", 9300);
conf.set("es.resource", index_and_type);

JavaPairRDD readRdd = jsc.newAPIHadoopRDD(conf,
        org.elasticsearch.hadoop.mr.EsInputFormat.class,
        org.apache.hadoop.io.NullWritable.class,
        org.elasticsearch.hadoop.mr.LinkedMapWritable.class);
System.out.println(readRdd.first());
jsc.stop();
The following Java code, which uses a TransportClient (and no Spark), connects to the same ES node without any problem; both writing and reading work fine:
Client client = TransportClient.builder().settings(settings).build()
        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));

ImmutableOpenMap<String, IndexMetaData> indices = client.admin().cluster()
        .prepareState().get().getState().getMetaData().getIndices();
for (ObjectCursor<IndexMetaData> value : indices.values()) {
    log.info("Index: " + value.index + " : " + value.toString());
}

GetResponse response = client.prepareGet("index_name", "type_name", "1").get();
log.info(response.getIndex() + " : " + response.getId() + " : " + response.isExists());

String field_id = "6";
IndexRequest indexRequest = new IndexRequest("index_name", "type", "2")
        .source(jsonBuilder()
                .startObject()
                .prettyPrint()
                .field("field_id", field_id)
                .field("another_field", "value")
                .field("integer_field", 100)
                .endObject());
UpdateRequest updateRequest = new UpdateRequest("index_name", "type_name", article_id)
        .doc(jsonBuilder()
                .startObject()
                .prettyPrint()
                .field("field_id", field_id)
                .field("another_field", "value")
                .field("integer_field", 100)
                .endObject())
        .upsert(indexRequest);
UpdateResponse responseUpdate = client.update(updateRequest).get();
log.info(responseUpdate.getIndex() + " : " + responseUpdate.getGetResult() + " : " + responseUpdate.getType());
client.close();
Any suggestions are welcome, as I have been stuck on this for days without making any progress. I have of course googled the problem and searched StackOverflow, but so far I haven't found an answer.
For completeness, here is some Python code that reads from and writes to ES just fine via Spark:
conf = SparkConf()
conf = conf.setAppName('Test')
sc = SparkContext(conf=conf)

# Omitting some of the code that creates some_rdd on Spark:
index_and_type = index_name + '/type_name'
groovy_script = "if (ctx._source.%s) { ctx._source.%s+=value } else { ctx._source.%s=value }" % (field, field, field)
es_db_connection_dictionary = {
    "es.nodes": db_hosts,
    "es.port": db_port,
    "es.resource": index_and_type,
    "es.write.operation": "upsert",
    "es.mapping.id": "field_id",
    "es.update.script": groovy_script,
    "es.update.script.params": "value:%s" % integer_field,
    "es.http.timeout": "10s"
}
es_input = views_tuple_rdd.map(lambda item: (item[0],
    {
        'field_id': item[0],
        "integer_field": item[1],
        "another_field": client_name,
    }))
es_input.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_db_connection_dictionary)
1 Answer
Generally, if you use the elasticsearch-spark connector you don't need to use port 9300; the default, 9200 (the HTTP port), is the one to use. The connector behaves differently from the regular Elasticsearch API.
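As a minimal sketch of what that means for the settings in the question, here are the connector properties laid out as a plain map so the values are easy to check (the index/type name is hypothetical, and `es.nodes.discovery` is an optional extra that can help with a single local node):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class EsHadoopSettingsSketch {
    public static void main(String[] args) {
        // es-hadoop talks to Elasticsearch over HTTP, so es.port must be the
        // REST port 9200, not the transport port 9300 used by TransportClient.
        Map<String, String> esConf = new LinkedHashMap<>();
        esConf.put("es.nodes", "localhost");
        esConf.put("es.port", "9200");                     // HTTP port, not 9300
        esConf.put("es.resource", "index_name/type_name"); // hypothetical index/type
        esConf.put("es.nodes.discovery", "false");         // optional for a single local node

        // In the real job these pairs would be copied onto the Hadoop
        // Configuration via conf.set(key, value) before newAPIHadoopRDD.
        esConf.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Note that `cluster.name` is a transport-client setting; the es-hadoop connector ignores it.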
It also looks like you are using a connector version that isn't compatible with your Elasticsearch version. This is a common mistake, since both product lines are mainly on 2.x but numbered differently. I believe this is no longer the case with Elasticsearch 5.x, with which all the other Elastic product versions have been aligned.
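As a hedged illustration only: for an Elasticsearch 2.x cluster, the matching Maven dependency would look something like the fragment below. The exact 2.x version shown is an assumption; check the es-hadoop compatibility matrix against your cluster version.

```xml
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <!-- assumption: pick the 2.x release line matching an Elasticsearch 2.x cluster -->
    <version>2.3.4</version>
</dependency>
```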