Py4JJavaError: An error occurred while calling o349.defaultMinPartitions.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
我通过以下方式避免这种情况
def check_alive(spark_conn):
"""Check if connection is alive. ``True`` if alive, ``False`` if not"""
try:
get_java_obj = spark_conn._jsc.sc().getExecutorMemoryStatus()
return True
except Exception:
return False
def get_number_of_executors(spark_conn):
if not check_alive(spark_conn):
raise Exception('Unexpected Error: Spark Session has been killed')
try:
return spark_conn._jsc.sc().getExecutorMemoryStatus().size()
except:
raise Exception('Unknown error')
s = sc._jsc.sc().getExecutorMemoryStatus().keys()
l = str(s).replace("Set(","").replace(")","").split(", ")
d = set()
for i in l:
d.add(i.split(":")[0])
len(d)
5 回答
sc.defaultParallelism
只是一个提示 . 根据配置,它可能与节点数量无关 . 如果您使用带有分区计数参数但未提供分区计数参数的操作,则这是分区数 . 例如sc.parallelize
将从列表中生成新的RDD . 您可以使用第二个参数告诉它在RDD中创建多少个分区 . 但此参数的默认值为sc.defaultParallelism
.您可以在Scala API中使用
sc.getExecutorMemoryStatus
获取执行程序的数量,但这不会在Python API中公开 .一般来说,建议在RDD中的分区大约是执行程序的4倍 . 这是一个很好的提示,因为如果任务所花费的时间存在差异,这将使其均匀 . 例如,一些 Actuator 将处理5个更快的任务,而其他 Actuator 处理3个较慢的任务 .
你不需要对此非常准确 . 如果你有一个粗略的想法,你可以去估计 . 就像你知道你的CPU少于200个,你可以说500个分区就可以了 .
因此,尝试使用此数量的分区创建RDD:
或者如果不控制RDD的创建,则在计算之前重新分配RDD:
您可以使用
rdd.getNumPartitions()
检查RDD中的分区数 .在pyspark上你仍然可以使用pyspark的py4j网桥调用scala
getExecutorMemoryStatus
API:我发现有时我的会话被远程程序杀死,给出了一个奇怪的Java错误
我通过以下方式避免这种情况
应该可以使用它来获取集群中的节点数量(类似于上面的@Dan方法,但更短,效果更好!) .
其他答案提供了获取执行者数量的方法 . 这是一种获取节点数的方法 . 这包括头部和工作节点 .