首页 文章

为什么只有一个 Actuator 只运行一个火花作业?

提问于
浏览
1

我的Spark群集有1个主人和2个 Worker . 应用程序从s3读取csv文件到DataFrames,将它们注册为临时表,并使用sqlContext运行sql查询来创建新的DataFrames . 然后将这些DF存储到MySql DB . 这些作业都在多个节点上运行 .

但是当我将这些表从DB读回到DataFrames时,将它们注册为临时表并运行sqlContext查询,所有处理都只由一个节点完成 . 可能是什么导致了这个?

这是我的代码示例:

DataFrame a = sqlContext.read().format("com.databricks.spark.csv").options(options)
                .load("s3://s3bucket/a/part*");
 DataFrame b = sqlContext.read().format("com.databricks.spark.csv").options(options)
                .load("s3://s3bucket/b/part*");

a.registerTempTable("a");
b.registerTempTable("b");

DataFrame c = sqlContext.sql("SELECT  a.name, b.name from   a join b on  a.id = b.a_id");

c.write().mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL, "c", prop);

// other jobs are similar 

Map<String, String> dOptions = new HashMap<String, String>();
dOptions.put("driver", MYSQL_DRIVER);
dOptions.put("url", MYSQL_CONNECTION_URL);

dOptions.put("dbtable", "(select * from c) AS c");
rC= sqlContext.read().format("jdbc").options(dOptions).load();
rC.cache();

 dOptions.put("dbtable", "(select * from d) AS d");
 rD= sqlContext.read().format("jdbc").options(dOptions).load();
 rD.cache();

 dOptions.put("dbtable", "(select * from f) AS f");
 rF= sqlContext.read().format("jdbc").options(dOptions).load();
 rF.cache();

 rC.registerTempTable("rC");
 rD.registerTempTable("rD");
 rF.registerTempTable("rF");

DataFrame result = sqlContext.sql("SELECT  rC.name, rD.name, rF.date  from rC join rD on rC.name = rD.name join rF on rC.date = rF.date");

result.write().mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL, "result_table", prop);

1 回答

  • 0

    你能与我们分享你的SparkConf()对象吗?

    SparkConf()对象包含Spark应用程序的配置 . 它用于将各种Spark参数设置为键值对,例如:

    -大师

    • 执行人数

    • 执行者核心数量

    • 分配的内存

    -其他..

相关问题