首页 文章

发现flatMap编译错误:需要TraversableOnce [String]:TraversableOnce [String]

提问于
浏览
3

编辑#2:这可能与内存有关 . 日志显示在堆外 .

Yes, definitely memory related. 基本上,docker日志报告了java中所有堆外的消息,但jupyter web笔记本没有将其传递给用户 . 相反,用户会获得内核故障和偶尔的奇怪行为,例如代码无法正确编译 .


Spark 1.6,特别是 docker run -d .... jupyter/all-spark-notebook

想在一个约100万笔交易的文件中计算帐户 .

这很简单,它可以在没有火花的情况下完成,但是我尝试使用spark scala时遇到了一个奇怪的错误 .

输入数据类型为 RDD[etherTrans] ,其中 etherTrans 是包含单个事务的自定义类型:时间戳,来自和来自帐户,以及在以太中交易的值 .

class etherTrans(ts_in:Long, afrom_in:String, ato_in:String, ether_in: Float)
extends Serializable {
    var ts: Long = ts_in
    var afrom: String = afrom_in
    var ato: String = ato_in
    var ether: Float = ether_in
    override def toString():String =  ts.toString+","+afrom+","+ato+","+ether.toString    
}

data:RDD[etherTrans] 看起来不错:

data.take(10).foreach(println)

etherTrans(1438918233,0xa1e4380a3b1f749673e270229993ee55f35663b4,0x5df9b87991262f6ba471f09758cde1c0fc1de734,3.1337E-14)
etherTrans(1438918613,0xbd08e0cddec097db7901ea819a3d1fd9de8951a2,0x5c12a8e43faf884521c2454f39560e6c265a68c8,19.9)
etherTrans(1438918630,0x63ac545c991243fa18aec41d4f6f598e555015dc,0xc93f2250589a6563f5359051c1ea25746549f0d8,599.9895)
etherTrans(1438918983,0x037dd056e7fdbd641db5b6bea2a8780a83fae180,0x7e7ec15a5944e978257ddae0008c2f2ece0a6090,100.0)
etherTrans(1438919175,0x3f2f381491797cc5c0d48296c14fd0cd00cdfa2d,0x4bd5f0ee173c81d42765154865ee69361b6ad189,803.9895)
etherTrans(1438919394,0xa1e4380a3b1f749673e270229993ee55f35663b4,0xc9d4035f4a9226d50f79b73aafb5d874a1b6537e,3.1337E-14)
etherTrans(1438919451,0xc8ebccc5f5689fa8659d83713341e5ad19349448,0xc8ebccc5f5689fa8659d83713341e5ad19349448,0.0)
etherTrans(1438919461,0xa1e4380a3b1f749673e270229993ee55f35663b4,0x5df9b87991262f6ba471f09758cde1c0fc1de734,3.1337E-14)
etherTrans(1438919491,0xf0cf0af5bd7d8a3a1cad12a30b097265d49f255d,0xb608771949021d2f2f1c9c5afb980ad8bcda3985,100.0)
etherTrans(1438919571,0x1c68a66138783a63c98cc675a9ec77af4598d35e,0xc8ebccc5f5689fa8659d83713341e5ad19349448,50.0)

下一个函数解析ok并以这种方式编写,因为之前的尝试都抱怨 Array[String]List[String]TraversableOnce[?] 之间的类型不匹配:

def arrow(e:etherTrans):TraversableOnce[String] = Array(e.afrom,e.ato)

但是然后在flatMap中使用此函数来获取所有帐户的RDD [String]失败 .

val accts:RDD[String] = data.flatMap(arrow)

Name: Compile Error
Message: :38: error: type mismatch;
 found   : etherTrans(in class $iwC)(in class $iwC)(in class $iwC)(in class $iwC) => TraversableOnce[String]
 required: etherTrans(in class $iwC)(in class $iwC)(in class $iwC)(in class $iwC) => TraversableOnce[String]
         val accts:RDD[String] = data.flatMap(arrow)
                                              ^
StackTrace:

确保向右滚动以查看它抱怨 TraversableOnce[String]TraversableOnce[String] 不匹配

这必须是一个相当普遍的问题,因为Generate List of Pairs中出现了更明显的类型不匹配,并且在I have a Scala List, how can I get a TraversableOnce?中建议没有足够的上下文 .

这里发生了什么?


EDIT :上面报告的问题没有出现,代码在docker容器中独立运行的旧版spark-shell,Spark 1.3.1中工作正常 . 使用jupyter / all-spark-notebook docker容器在spark 1.6 scala jupyter环境中生成错误 .

另外@ zero323说这个玩具的例子:

val rdd = sc.parallelize(Seq((1L, "foo", "bar", 1))).map{ case (ts, fr, to, et) => new etherTrans(ts, fr, to, et)} 
 rdd.flatMap(arrow).collect

他在终端spark-shell 1.6.0 / spark 2.10.5以及Scala 2.11.7和Spark 1.5.2工作 .

1 回答

  • 0

    我认为你应该切换到用例类,它应该工作正常 . 使用“常规”类,在序列化它们时可能会遇到奇怪的问题,看起来你需要的只是值对象,所以case类看起来更适合你的用例 .

    一个例子:

    case class EtherTrans(ts: Long, afrom: String, ato: String, ether: Float)
    
    val source = sc.parallelize(Array(
        (1L, "from1", "to1", 1.234F),
        (2L, "from2", "to2", 3.456F)
    ))
    
    val data = source.as[EtherTrans]
    
    val data = source.map { l => EtherTrans(l._1, l._2, l._3, l._4) }
    
    def arrow(e: EtherTrans) = Array(e.afrom, e.ato)
    
    data.map(arrow).take(5)
    /*
    res3: Array[Array[String]] = Array(Array(from1, to1), Array(from2, to2))
    */
    
    data.map(arrow).take(5)
    // res3: Array[Array[String]] = Array(Array(from1, to1), Array(from2, to2))
    

    如果需要,您可以创建一些方法/对象来生成案例类 . 如果你真的不需要逻辑的“toString”方法,只是为了“演示”,请将它保持在case类之外:在存储if或者显示之前,你总是可以使用map操作添加它 .

    此外,如果您使用的是Spark 1.6.0或更高版本,则可以尝试使用DataSet API,它看起来或多或少如下:

    val data = sqlContext.read.text("your_file").as[EtherTrans]
    

    https://databricks.com/blog/2016/01/04/introducing-spark-datasets.html

相关问题