
Cannot save Java Date to Cassandra


I'm trying to save some data to Cassandra using Spark, but when I try to save a Date I get:

    Exception in thread "main" org.apache.spark.sql.AnalysisException: Can't extract value from field2#5;
        at org.apache.spark.sql.catalyst.expressions.ExtractValue$.apply(complexTypeExtractors.scala:73)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10$$anonfun$applyOrElse$4.applyOrElse(Analyzer.scala:475)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$10$$anonfun$applyOrElse$4.applyOrElse(Analyzer.scala:467)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$2.apply(TreeNode.scala:339)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:338)
        ...
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:127)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:347)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:328)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolve(ExpressionEncoder.scala:294)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:79)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:90)
        at org.apache.spark.sql.DataFrame.as(DataFrame.scala:209)
        at casandra.casandra.App.readFromSqlServer(App.java:55)
        at casandra.casandra.App.main(App.java:76)

The code that triggers the error:

    Dataset<Table1> tData = dataFrame.as(Encoders.bean(Table1.class));
    List<Table1> tList = tData.collectAsList();

My class:

public class Table1 {
    private String field1;
    private Date field2;
}

My Cassandra table:

CREATE TABLE "a"."table1" (
field1 text,
field2 timestamp,
PRIMARY KEY (( field1 )));

Does anyone know how to solve this?

Edit:

SparkConf conf = new SparkConf();
conf.setAppName("Casandra Test");
conf.setMaster("local[*]");
conf.set("spark.cassandra.connection.host", adress);
App app = new App(conf);
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
Map<String, String> options = new HashMap<String, String>();
options.put("url", sqlServerAddress);
options.put("dbtable", "(SELECT field1, field2 FROM table1) t");
options.put("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver");
DataFrame dataFrame = sqlContext.read().format("jdbc").options(options).load();
Dataset<Table1> ceData = dataFrame.as(Encoders.bean(Table1.class));
List<Table1> ceList = ceData.collectAsList();
JavaRDD<Table1> ceRDD = sc.parallelize(app.readFromSqlServer(sqlContext));
javaFunctions(ceRDD).writerBuilder("a", "table1", mapToRow(Table1.class)).saveToCassandra();
sc.stop();

1 Answer


    OK, I found your problem.

    If you look at the official documentation for the Encoders.bean() method, it says:

    Creates an encoder for Java Bean of type T.
    
    T must be publicly accessible.
    
    supported types for java bean field:
    
    - primitive types: boolean, int, double, etc.
    - boxed types: Boolean, Integer, Double, etc.
    - String
    - java.math.BigDecimal
    - time related: java.sql.Date, java.sql.Timestamp
    - collection types: only array and java.util.List currently, map support is in progress
    - nested java bean.
    

    The java.util.Date type is not mentioned, so it is not supported by Encoders.

    You should update your Table1 class to use another type instead of java.util.Date.
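    As a minimal sketch (not your actual class), Table1 could be rewritten with java.sql.Timestamp, which is on the supported-types list above and is the natural match for a Cassandra `timestamp` column. Note that Encoders.bean() also relies on public getters/setters, which the class shown in the question omits. The setField2FromUtilDate helper is a hypothetical convenience for converting any java.util.Date you get from elsewhere:

    ```java
    import java.sql.Timestamp;
    import java.util.Date;

    // Table1 rewritten with java.sql.Timestamp instead of java.util.Date.
    // (Package-private here so the sketch is self-contained; in your project
    // keep it public, as Encoders.bean() requires the type to be publicly accessible.)
    class Table1 {
        private String field1;
        private Timestamp field2;

        public String getField1() { return field1; }
        public void setField1(String field1) { this.field1 = field1; }

        public Timestamp getField2() { return field2; }
        public void setField2(Timestamp field2) { this.field2 = field2; }

        // Hypothetical helper: convert a java.util.Date (e.g. from legacy code)
        // into the supported java.sql.Timestamp, preserving the epoch millis.
        public void setField2FromUtilDate(Date d) {
            this.field2 = new Timestamp(d.getTime());
        }
    }
    ```

    With a bean shaped like this, dataFrame.as(Encoders.bean(Table1.class)) should no longer fail on field2, since the encoder can map the JDBC timestamp column onto java.sql.Timestamp directly.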
