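I am using pyspark in a Jupyter notebook against a Spark 2.1.1 cluster running at "spark://remote:port" (the Spark master IP), and I am able to create a SparkContext successfully.
However, I want to read spark_master_ip and spark.cores.max from a .properties file (instead of hard-coding them). I parse and read my custom Spark properties from the 'myspark_config.properties' file successfully, but when I then try to create the SparkContext() I get the Java gateway exception shown below. Here is my code: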
import pyspark
from pprint import pprint
from pyspark import SparkConf

def getproperties():
    """Get Spark configuration properties in python dictionary"""
    global properties
    properties = dict()
    with open('myspark_config.properties') as f:
        for line in f:
            if not line.startswith('#') and not line.startswith('\n'):
                tokens = line.split('=')
                tokens[0] = tokens[0].strip()
                tokens[1] = "=".join(tokens[1:])
                properties[tokens[0]] = tokens[1].strip()
        f.close()
    pprint(properties)
    return(properties)

properties = getproperties()
conf = (SparkConf()
        .setMaster(properties["spark_master_url"])
        .setAppName("testApp")
        .set('spark.cores.max',properties["spark_app_cores"])
        .set('spark.executor.memory',properties["spark_app_memory"])
        .set('spark.dynamicAllocation.enabled','true')
        .set('spark.shuffle.service.enabled','true')
        )
# conf = (SparkConf()
#         .setMaster("spark://remote:port")
#         .setAppName("testApp")
#         .set('spark.cores.max',"2")
#         .set('spark.executor.memory',"2G")
#         .set('spark.dynamicAllocation.enabled','true')
#         .set('spark.shuffle.service.enabled','true')
#         )
sc = pyspark.SparkContext(conf=conf)
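I get no exception, and my code runs smoothly, if I do not read from the file and instead hard-code the Spark master in SparkConf() (that version is currently commented out). "JAVA_HOME", "SPARK_HOME" and "PYTHONPATH" are set successfully. I am not using Anaconda. I am using a Jupyter notebook with Python 2.7 on Ubuntu and spark-2.1.1.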
{'spark_app_cores': '"2"',
'spark_app_memory': '"2G"',
'spark_master_url': '"spark://remote:port"'}
Exception Traceback (most recent call last)
<ipython-input-1-c893eaf079f2> in <module>()
36 # .set('spark.shuffle.service.enabled','true')
37 # )
---> 38 sc = pyspark.SparkContext(conf=conf)
/usr/local/spark/python/pyspark/context.py in __init__(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, gateway, jsc, profiler_cls)
113 """
114 self._callsite = first_spark_call() or CallSite(None, None, None)
--> 115 SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
116 try:
117 self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
/usr/local/spark/python/pyspark/context.py in _ensure_initialized(cls, instance, gateway, conf)
281 with SparkContext._lock:
282 if not SparkContext._gateway:
--> 283 SparkContext._gateway = gateway or launch_gateway(conf)
284 SparkContext._jvm = SparkContext._gateway.jvm
285
/usr/local/spark/python/pyspark/java_gateway.py in launch_gateway(conf)
93 callback_socket.close()
94 if gateway_port is None:
---> 95 raise Exception("Java gateway process exited before sending the driver its port number")
96
97 # In Windows, ensure the Java child processes do not linger after Python has exited.
Exception: Java gateway process exited before sending the driver its port number
I have looked at various links but found no solution: pyspark java gateway error stackoverflow, Github issue on java Gateway
1 Answer
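This is a great idea, but you are overlooking the fact that this is exactly what $SPARK_HOME/conf/spark-defaults.conf is for. Just put the required properties there.
This looks wrong: why do you use = in the properties? Otherwise it should have no effect. Python also provides a properties parser: https://docs.python.org/3/library/configparser.html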
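As a rough illustration of the configparser suggestion, here is a minimal Python 3 sketch. It assumes the file holds plain key = value lines with no section header, so a dummy [spark] section is prepended before parsing. The function name load_properties and the sample contents are hypothetical. The dictionary printed in the question shows that each value still carries its literal double quotes (for example '"spark://remote:port"'), so the sketch also strips surrounding quotes; whether those quotes are what stops the Java gateway from starting is an assumption, not something the traceback confirms.

```python
import configparser


def load_properties(text):
    """Parse the text of a section-less .properties file into a dict."""
    # Disable interpolation so a literal '%' in a value is left alone,
    # and split only on '=' so ':' inside URLs is never a delimiter.
    parser = configparser.ConfigParser(interpolation=None, delimiters=("=",))
    # configparser requires a section header, so prepend a dummy one.
    parser.read_string("[spark]\n" + text)
    # Remove surrounding quotes so '"2G"' becomes '2G'.
    return {key: value.strip().strip("\"'")
            for key, value in parser["spark"].items()}


# Hypothetical contents of myspark_config.properties, matching the
# keys and quoted values printed in the question.
sample = '''
# Spark settings
spark_master_url = "spark://remote:port"
spark_app_cores = "2"
spark_app_memory = "2G"
'''

properties = load_properties(sample)
print(properties)
```

The resulting values can then be passed to setMaster() and set() as in the question. Note that configparser lower-cases keys by default, which does not matter for the key names used here.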