
Spark SQL query on DSE Cassandra in Scala


I want to test Spark SQL queries against a DSE Cassandra table from the Scala IDE. The query runs perfectly when the jar file is executed with dse spark-submit, but it fails when run from the Scala IDE. The error is:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Table or view not found: killr_video.videos; line 1 pos 14;

I think this is a Spark master configuration error, because I am running the master in local mode.

This is the Spark session I create:

val spark = SparkSession
          .builder()
          .appName("CassandraSpark")
          .config("spark.cassandra.connection.host", "127.0.0.1")
          .config("spark.cassandra.connection.port", "9042")
          .enableHiveSupport()
          .master("local")
          .getOrCreate();

But I don't know what address to set as the master. With Cassandra running, I tried setting the master address to "spark://127.0.0.1:7077", which is what I found in the web UI (localhost:7080). However, it still fails with the error below:

ERROR MapOutputTrackerMaster: Error communicating with MapOutputTracker
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:212)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:190)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:81)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
    at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:100)
    at org.apache.spark.MapOutputTracker.sendTracker(MapOutputTracker.scala:110)
    at org.apache.spark.MapOutputTrackerMaster.stop(MapOutputTracker.scala:580)
    at org.apache.spark.SparkEnv.stop(SparkEnv.scala:84)
    at org.apache.spark.SparkContext$$anonfun$stop$11.apply$mcV$sp(SparkContext.scala:1797)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1290)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1796)
    at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.dead(StandaloneSchedulerBackend.scala:142)
    at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint.markDead(StandaloneAppClient.scala:254)
    at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anon$2.run(StandaloneAppClient.scala:131)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
18/05/22 11:46:44 ERROR Utils: Uncaught exception in thread appclient-registration-retry-thread
org.apache.spark.SparkException: Error communicating with MapOutputTracker
    at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:104)
    at org.apache.spark.MapOutputTracker.sendTracker(MapOutputTracker.scala:110)
    at org.apache.spark.MapOutputTrackerMaster.stop(MapOutputTracker.scala:580)
    at org.apache.spark.SparkEnv.stop(SparkEnv.scala:84)
    at org.apache.spark.SparkContext$$anonfun$stop$11.apply$mcV$sp(SparkContext.scala:1797)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1290)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1796)
    at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.dead(StandaloneSchedulerBackend.scala:142)
    at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint.markDead(StandaloneAppClient.scala:254)
    at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anon$2.run(StandaloneAppClient.scala:131)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:212)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:190)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:81)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
    at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:100)
    ... 16 more
18/05/22 11:46:44 ERROR SparkContext: Error initializing SparkContext.
java.lang.NullPointerException
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:546)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2258)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$8.apply(SparkSession.scala:831)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$8.apply(SparkSession.scala:823)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:823)
18/05/22 11:46:44 INFO SparkContext: SparkContext already stopped.
Exception in thread "main" java.lang.NullPointerException
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:526)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2258)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$8.apply(SparkSession.scala:831)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$8.apply(SparkSession.scala:823)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:823)

What do I need to do to make this code work?

1 Answer


    You don't need to hardcode the Cassandra IP or the master; just create the SparkSession object and it will work. Here is working code (in Java):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    
    SparkSession spark = SparkSession
      .builder()
      .appName("CassandraSpark")
      .getOrCreate();
    
    Dataset<Row> sqlDF = spark.sql("select * from test.t1 limit 1000");
    sqlDF.printSchema();
    sqlDF.show();
    

    In DSE, if you want to submit to a distributed cluster, you can specify the master as dse://?, and DSE will automatically find the current master. All possible options are described in the documentation. A Scala version of this setup is sketched below.
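    For the Scala setup from the question, a minimal sketch of the same idea might look like the following. It assumes the job is launched with dse spark-submit (which supplies the master and Cassandra connection settings) and that the killr_video.videos table from the question exists; the object name and the limit 10 are just illustrative:
    
    import org.apache.spark.sql.SparkSession
    
    object CassandraSparkApp {
      def main(args: Array[String]): Unit = {
        // No hardcoded Cassandra host or Spark master: dse spark-submit injects them.
        // To target a distributed DSE cluster explicitly, the master could instead
        // be set to "dse://?" via .master(...) or --master.
        val spark = SparkSession
          .builder()
          .appName("CassandraSpark")
          .getOrCreate()
    
        // killr_video.videos is the table from the question.
        val df = spark.sql("select * from killr_video.videos limit 10")
        df.printSchema()
        df.show()
    
        spark.stop()
      }
    }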
