我使用spark cassandra连接器编写了一个应用程序 . 现在,当spark-submit工作我得到错误 java.lang.IllegalArgumentException: requirement failed: No mappable properties found in class: MailBox ,即使我定义了https://github.com/datastax/spark-cassandra-connector/blob/master/doc/6_advanced_mapper.md中指定的类型转换器,我的想法是我需要一个用于MailBox的伴随对象,我在其中定义一个映射器,但我找不到一个例子在文档中 . 有谁知道如何解决这个问题?谢谢
The code :
object Test {
case class Size(size: Long) {
if (size < 0) throw new IllegalArgumentException
def +(s: Size): Size = Size(size + s.size)
}
object LongToSizeConverter extends TypeConverter[Size] {
def targetTypeTag = typeTag[Size]
def convertPF = { case long: Long => Size(long) }
}
object SizeToLongConverter extends TypeConverter[Long] {
def targetTypeTag = typeTag[Long]
def convertPF = { case Size(long) => long.toLong }
}
case class MailBox(id: String,totalsize: Size)
case class Id(mailboxid:String)
object StringToIdConverter extends TypeConverter[Id] {
def targetTypeTag = typeTag[Id]
def convertPF = { case str: String => Id(str)
case str: UUID => Id(str.toString) }
}
object IdToStringConverter extends TypeConverter[String] {
def targetTypeTag = typeTag[String]
def convertPF = { case Id(str) => str.toString }
}
def main(args: Array[String]) {
val sc = new SparkContext();
TypeConverter.registerConverter(StringToIdConverter)
TypeConverter.registerConverter(IdToStringConverter)
TypeConverter.registerConverter(LongToSizeConverter)
TypeConverter.registerConverter(SizeToLongConverter)
val test= sc.parallelize(Array(MailBox(Id("1"),Size(10))))
test.saveAsCassandraTable("test","Mailbox")
}
}
1 回答
首先让我发布一个快速工作的例子,然后我将解决出错的问题
saveAsCassandraTable不适用于自定义类型
saveAsCassandraTable
使用fromType方法,该方法需要已知类型(不是自定义类型) . 这是因为saveAsCassandraTable基于已知的字段类型创建Cassandra列 . 使用自定义类型转换器,您不需要查找 . 由于saveAsCassandraTable在插入之前创建了Cassandra表,因此它不知道如何制作表 .为了解决这个问题,我们更改了一行
至
我们在CQLSH中预先制作了表格,但您也可以使用应用程序中的Java驱动程序来完成此操作 .
我们需要转换为Java类型
TypeConverter链接不适用于自定义类型转换器 . 这意味着我们需要提供从Custom类型到Java类型的转换器 . 为此,我改变了SizeToLong转换器
我们应该反对Scala反思缺乏线程安全
我添加了synchronized块(使用SparkReflectionLock)以确保我们不会遇到任何问题 .
看到
我们需要在对象级别进行注册
为了确保我们的注册发生在执行程序JVM上,我将它们移出“主”范围 . 我不确定这有多重要,但最好反映这应该发生在代码发送到的地方,而不仅仅是在main方法中 .