Problem reading an external Hive partitioned table with Spark HiveContext

I have an external Hive partitioned table that I am trying to read from Spark using HiveContext, but I get null values.

val maxClose = hiveContext.sql("select max(Close) from stock_partitioned_data where symbol = 'AAPL'")
maxClose.collect().foreach(println)

=====


scala> import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.HiveContext

scala>  val hiveContext = new HiveContext(sc);
16/09/22 00:12:47 INFO HiveContext: Initializing execution hive, version 1.1.0
16/09/22 00:12:47 INFO ClientWrapper: Inspected Hadoop version: 2.6.0-cdh5.5.0
16/09/22 00:12:47 INFO ClientWrapper: Loaded org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version 2.6.0-cdh5.5.0
hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@455aef06

scala> val maxClose = hiveContext.sql("select max(Close) from stock_data2")
16/09/22 00:12:53 INFO ParseDriver: Parsing command: select max(Close) from stock_data2
16/09/22 00:12:54 INFO ParseDriver: Parse Completed
16/09/22 00:12:54 INFO ClientWrapper: Inspected Hadoop version: 2.6.0-cdh5.5.0
16/09/22 00:12:54 INFO ClientWrapper: Loaded org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version 2.6.0-cdh5.5.0
maxClose: org.apache.spark.sql.DataFrame = [_c0: double]

scala>  maxClose.collect().foreach (println )
16/09/22 00:13:04 INFO deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
16/09/22 00:13:04 INFO MemoryStore: ensureFreeSpace(425824) called with curMem=0, maxMem=556038881
16/09/22 00:13:04 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 415.8 KB, free 529.9 MB)
16/09/22 00:13:05 INFO MemoryStore: ensureFreeSpace(44793) called with curMem=425824, maxMem=556038881
16/09/22 00:13:05 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 43.7 KB, free 529.8 MB)
16/09/22 00:13:05 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.0.2.15:47553 (size: 43.7 KB, free: 530.2 MB)
16/09/22 00:13:05 INFO SparkContext: Created broadcast 0 from collect at <console>:27
16/09/22 00:13:05 INFO SparkContext: Starting job: collect at <console>:27
16/09/22 00:13:06 INFO FileInputFormat: Total input paths to process : 1
16/09/22 00:13:06 INFO DAGScheduler: Registering RDD 5 (collect at <console>:27)
16/09/22 00:13:06 INFO DAGScheduler: Got job 0 (collect at <console>:27) with 1 output partitions
16/09/22 00:13:06 INFO DAGScheduler: Final stage: ResultStage 1(collect at <console>:27)
16/09/22 00:13:06 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
16/09/22 00:13:06 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
16/09/22 00:13:06 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[5] at collect at <console>:27), which has no missing parents
16/09/22 00:13:06 INFO MemoryStore: ensureFreeSpace(18880) called with curMem=470617, maxMem=556038881
16/09/22 00:13:06 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 18.4 KB, free 529.8 MB)
16/09/22 00:13:06 INFO MemoryStore: ensureFreeSpace(8367) called with curMem=489497, maxMem=556038881
16/09/22 00:13:06 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 8.2 KB, free 529.8 MB)
16/09/22 00:13:06 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.0.2.15:47553 (size: 8.2 KB, free: 530.2 MB)
16/09/22 00:13:06 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:861
16/09/22 00:13:06 INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[5] at collect at <console>:27)
16/09/22 00:13:06 INFO YarnScheduler: Adding task set 0.0 with 2 tasks
16/09/22 00:13:07 INFO ExecutorAllocationManager: Requesting 1 new executor because tasks are backlogged (new desired total will be 1)
16/09/22 00:13:08 INFO ExecutorAllocationManager: Requesting 1 new executor because tasks are backlogged (new desired total will be 2)
16/09/22 00:13:11 ERROR ErrorMonitor: AssociationError [akka.tcp://sparkDriver@10.0.2.15:45637] <- [akka.tcp://driverPropsFetcher@quickstart.cloudera:33635]: Error [Shut down address: akka.tcp://driverPropsFetcher@quickstart.cloudera:33635] [
akka.remote.ShutDownAssociation: Shut down address: akka.tcp://driverPropsFetcher@quickstart.cloudera:33635
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down.
]
akka.event.Logging$Error$NoCause$
16/09/22 00:13:12 INFO YarnClientSchedulerBackend: Registered executor: AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@quickstart.cloudera:49490/user/Executor#-842589632]) with ID 1
16/09/22 00:13:12 INFO ExecutorAllocationManager: New executor 1 has registered (new total is 1)
16/09/22 00:13:13 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, quickstart.cloudera, partition 0,NODE_LOCAL, 2291 bytes)
16/09/22 00:13:13 INFO BlockManagerMasterEndpoint: Registering block manager quickstart.cloudera:56958 with 530.3 MB RAM, BlockManagerId(1, quickstart.cloudera, 56958)
16/09/22 00:13:13 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on quickstart.cloudera:56958 (size: 8.2 KB, free: 530.3 MB)
16/09/22 00:13:15 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on quickstart.cloudera:56958 (size: 43.7 KB, free: 530.2 MB)
16/09/22 00:13:31 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, quickstart.cloudera, partition 1,NODE_LOCAL, 2291 bytes)
16/09/22 00:13:31 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 18583 ms on quickstart.cloudera (1/2)
16/09/22 00:13:31 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 157 ms on quickstart.cloudera (2/2)
16/09/22 00:13:31 INFO DAGScheduler: ShuffleMapStage 0 (collect at <console>:27) finished in 25.082 s
16/09/22 00:13:31 INFO DAGScheduler: looking for newly runnable stages
16/09/22 00:13:31 INFO DAGScheduler: running: Set()
16/09/22 00:13:31 INFO DAGScheduler: waiting: Set(ResultStage 1)
16/09/22 00:13:31 INFO YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool 
16/09/22 00:13:31 INFO DAGScheduler: failed: Set()
16/09/22 00:13:31 INFO DAGScheduler: Missing parents for ResultStage 1: List()
16/09/22 00:13:31 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[8] at collect at <console>:27), which is now runnable
16/09/22 00:13:31 INFO MemoryStore: ensureFreeSpace(16544) called with curMem=497864, maxMem=556038881
16/09/22 00:13:31 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 16.2 KB, free 529.8 MB)
16/09/22 00:13:31 INFO MemoryStore: ensureFreeSpace(7375) called with curMem=514408, maxMem=556038881
16/09/22 00:13:31 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 7.2 KB, free 529.8 MB)
16/09/22 00:13:31 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.0.2.15:47553 (size: 7.2 KB, free: 530.2 MB)
16/09/22 00:13:31 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:861
16/09/22 00:13:31 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[8] at collect at <console>:27)
16/09/22 00:13:31 INFO YarnScheduler: Adding task set 1.0 with 1 tasks
16/09/22 00:13:31 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, quickstart.cloudera, partition 0,PROCESS_LOCAL, 1914 bytes)
16/09/22 00:13:31 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on quickstart.cloudera:56958 (size: 7.2 KB, free: 530.2 MB)
16/09/22 00:13:31 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to quickstart.cloudera:49490
16/09/22 00:13:31 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 157 bytes
16/09/22 00:13:31 INFO DAGScheduler: ResultStage 1 (collect at <console>:27) finished in 0.245 s
16/09/22 00:13:31 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 245 ms on quickstart.cloudera (1/1)
16/09/22 00:13:31 INFO YarnScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool 
16/09/22 00:13:31 INFO DAGScheduler: Job 0 finished: collect at <console>:27, took 26.194947 s
[null]

===

However, if I run the query directly from the Hive console, I get a result.

hive> select max(Close) from stock_data2
    > ;
Query ID = cloudera_20160922001414_4b684522-3e42-4957-8260-ff6b4da67c8f
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1474445009419_0005, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1474445009419_0005/
Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1474445009419_0005
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2016-09-22 00:14:45,000 Stage-1 map = 0%,  reduce = 0%
2016-09-22 00:14:55,165 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 1.28 sec
2016-09-22 00:15:03,707 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 2.68 sec
MapReduce Total cumulative CPU time: 2 seconds 680 msec
Ended Job = job_1474445009419_0005
MapReduce Jobs Launched: 
Stage-Stage-1: Map: 1  Reduce: 1   Cumulative CPU: 2.68 sec   HDFS Read: 43379 HDFS Write: 10 SUCCESS
Total MapReduce CPU Time Spent: 2 seconds 680 msec
OK
52.369999
Time taken: 42.57 seconds, Fetched: 1 row(s)

count(*) works fine, but selecting column values or max returns null.

1 Answer

  • 1

    This issue was resolved in Spark 1.6.
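
To check whether a given shell is already on 1.6 or later, something like the following may help. This is only a sketch: `isAtLeast16` is a hypothetical helper name (not part of Spark), and it assumes the version string starts with numeric `major.minor` components, as in `1.5.0` or `1.6.0-cdh5.7.0`.

```scala
// Hypothetical helper (not a Spark API): true when a Spark version string
// such as "1.5.0" or "1.6.0-cdh5.7.0" is at least 1.6.
def isAtLeast16(version: String): Boolean = {
  // Keep only the leading digits of the first two dot-separated components.
  val parts = version.split("\\.").take(2).map(_.takeWhile(_.isDigit).toInt)
  val major = parts(0)
  val minor = if (parts.length > 1) parts(1) else 0
  major > 1 || (major == 1 && minor >= 6)
}

// In spark-shell, `sc` is the predefined SparkContext, so the check would be:
// println(isAtLeast16(sc.version))
```

If this reports false, upgrading Spark is the route the answer points to; the sketch itself does not diagnose why the nulls appear.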
