首页 文章

自定义TypeConverters使用spark cassandra连接器

提问于
浏览
1

我使用spark cassandra连接器编写了一个应用程序 . 现在,当spark-submit工作我得到错误 java.lang.IllegalArgumentException: requirement failed: No mappable properties found in class: MailBox ,即使我定义了https://github.com/datastax/spark-cassandra-connector/blob/master/doc/6_advanced_mapper.md中指定的类型转换器,我的想法是我需要一个用于MailBox的伴随对象,我在其中定义一个映射器,但我找不到一个例子在文档中 . 有谁知道如何解决这个问题?谢谢

The code

object Test {
  case class Size(size: Long) {
    if (size < 0) throw new IllegalArgumentException
    def +(s: Size): Size = Size(size + s.size)
  }

  object LongToSizeConverter extends TypeConverter[Size] {
    def targetTypeTag = typeTag[Size]
    def convertPF = { case long: Long => Size(long)  }
  }
  object SizeToLongConverter extends TypeConverter[Long] {
    def targetTypeTag = typeTag[Long]
    def convertPF = { case Size(long) => long.toLong }
  }
  case class MailBox(id: String,totalsize: Size)
  case class Id(mailboxid:String) 
  object StringToIdConverter extends TypeConverter[Id] {
    def targetTypeTag = typeTag[Id]
    def convertPF = { case str: String => Id(str) 
                      case str: UUID => Id(str.toString)  }
  }
  object IdToStringConverter extends TypeConverter[String] {
    def targetTypeTag = typeTag[String]
    def convertPF = { case Id(str) => str.toString }
  }

  def main(args: Array[String]) {
    val sc = new SparkContext();
    TypeConverter.registerConverter(StringToIdConverter)
    TypeConverter.registerConverter(IdToStringConverter)
    TypeConverter.registerConverter(LongToSizeConverter)
    TypeConverter.registerConverter(SizeToLongConverter)
    val test= sc.parallelize(Array(MailBox(Id("1"),Size(10))))
    test.saveAsCassandraTable("test","Mailbox")
  }
}

1 回答

  • 3

    首先让我发布一个快速工作的例子,然后我将解决出错的问题

    package com.datastax.spark.example
    
    import com.datastax.spark.connector._
    import org.apache.spark.{SparkConf, SparkContext}
    import com.datastax.spark.connector.types._
    import scala.reflect.runtime.universe._
    import java.util.UUID
    
    import org.apache.spark.sql.catalyst.ReflectionLock.SparkReflectionLock
    
    case class Size(size: Long) {
        if (size < 0) throw new IllegalArgumentException
        def +(s: Size): Size = Size(size + s.size)
    }
    case class MailBox(id: Id,totalsize: Size)
    case class Id(mailboxid:String)
    
    
    
    object Test {
    
        val LongTypeTag = SparkReflectionLock.synchronized {
                implicitly[TypeTag[java.lang.Long]]
        }
        val SizeTypeTag = SparkReflectionLock.synchronized {
                typeTag[Size]
        }
        val IdTypeTag = SparkReflectionLock.synchronized {
                typeTag[Id]
        }
        val StringTypeTag = SparkReflectionLock.synchronized {
                implicitly[TypeTag[String]]
        }
    
        object LongToSizeConverter extends TypeConverter[Size] {
            def targetTypeTag = SizeTypeTag
            def convertPF = { case long: Long => Size(long)  }
        }
        object LongToSizeConverter extends TypeConverter[Size] {
            def targetTypeTag = SizeTypeTag
            def convertPF = { case long: Long => Size(long)  }
        }
    
        object SizeToLongConverter extends TypeConverter[java.lang.Long] {
            def targetTypeTag = LongTypeTag
            def convertPF = { case Size(long) => long.toLong }
        }
    
        object StringToIdConverter extends TypeConverter[Id] {
            def targetTypeTag = IdTypeTag
            def convertPF = { 
                case str: String => Id(str)
                case str: UUID => Id(str.toString)
            }
        }
    
        object IdToStringConverter extends TypeConverter[String] {
            def targetTypeTag = StringTypeTag
            def convertPF = { case Id(str) => str.toString }
        }
    
        TypeConverter.registerConverter(StringToIdConverter)
        TypeConverter.registerConverter(IdToStringConverter)
        TypeConverter.registerConverter(LongToSizeConverter)
        TypeConverter.registerConverter(SizeToLongConverter)
    
    
        def main(args: Array[String]) {
            val sc = new SparkContext();
            val test = sc.parallelize(Array(MailBox(Id("1"),Size(10))))
            test.saveToCassandra("ks","mailbox")
        }
    }
    

    saveAsCassandraTable不适用于自定义类型

    saveAsCassandraTable 使用fromType方法,该方法需要已知类型(不是自定义类型) . 这是因为saveAsCassandraTable基于已知的字段类型创建Cassandra列 . 使用自定义类型转换器,您不需要查找 . 由于saveAsCassandraTable在插入之前创建了Cassandra表,因此它不知道如何制作表 .

    为了解决这个问题,我们更改了一行

    test.saveAsCassandraTable("test","Mailbox")
    

    test.saveToCassandraTable("test","Mailbox")
    

    我们在CQLSH中预先制作了表格,但您也可以使用应用程序中的Java驱动程序来完成此操作 .

    我们需要转换为Java类型

    TypeConverter链接不适用于自定义类型转换器 . 这意味着我们需要提供从Custom类型到Java类型的转换器 . 为此,我改变了SizeToLong转换器

    object SizeToLongConverter extends TypeConverter[java.lang.Long] {
    

    我们应该反对Scala反思缺乏线程安全

    我添加了synchronized块(使用SparkReflectionLock)以确保我们不会遇到任何问题 .

    看到

    SparkReflectionLock.synchronized
    

    我们需要在对象级别进行注册

    为了确保我们的注册发生在执行程序JVM上,我将它们移出“主”范围 . 我不确定这有多重要,但最好反映这应该发生在代码发送到的地方,而不仅仅是在main方法中 .

相关问题