我在 streaming 环境中使用Flink 's Table API and/or Flink'的SQL支持(Flink 1.3.1,Scala 2.11) . 我从 DataStream[Person]
开始, Person
是一个案例类,看起来像:
Person(name: String, age: Int, attributes: Map[String, String])
一切都按预期工作,直到我开始将 attributes
带入图片 .
例如:
val result = streamTableEnvironment.sql(
"""
|SELECT
|name,
|attributes['foo'],
|TUMBLE_START(rowtime, INTERVAL '1' MINUTE)
|FROM myTable
|GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE), name, attributes['foo']
|""".stripMargin)
... 导致:
线程“main”中的异常org.apache.flink.table.api.TableException:不支持类型:org.apache.flink.table.api.TableException $ .apply(exceptions.scala:53)at org . 位于org.apache的org.apache.flink.table.plan.logical.LogicalRelNode $$ anonfun $ 12.apply(operators.scala:531)的apache.flink.table.calcite.FlinkTypeFactory $ .toTypeInfo(FlinkTypeFactory.scala:341) .flink.table.plan.logical.LogicalRelNode $$ anonfun $ 12.apply(operators.scala:530)at scala.collection.TraversableLike $$ anonfun $ map $ 1.apply(TraversableLike.scala:234)at scala.collection.TraversableLike $$ anonfun $ map $ 1.apply(TraversableLike.scala:234)at scala.collection.Iterator $ class.foreach(Iterator.scala:893)at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)at scala . collection.IterableLike $ class.foreach(IterableLike.scala:72)at scala.collection.AbstractIterable.foreach(Iterable.scala:54)at scala.collection.TraversableLike $ class.map(TraversableLike.scala:234)at scala.collection .AbstractTraversable.map(TRA versable.scala:104)org.apache.flink.table.plan.logical.LogicalRelNode . (operators.scala:530)at org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:503)at at COM.nordstrom.mdt.Job $ .main(Job.scala:112)at com.nordstrom.mdt.Job.main(Job.scala)
注意:无论是否存在特定的映射键,都会发生此错误 . 另请注意,如果我根本没有指定 Map 密钥,我会得到一个有意义的错误;这种情况在这里没有发挥作用 .
这个PR似乎说有一条前进的道路:https://github.com/apache/flink/pull/3767 . 特别关注test case,它表明DataSets可以提供类型信息 . 所有相关方法 fromDataStream
和_282045都没有提供提供类型信息的方法 .
这可能吗?换句话说,Streams上的Flink SQL可以支持 Map 吗?
澄清编辑...当省略 Map 键( GROUP BY ... attributes
而不是 attributes['foo']
)时,我得到以下错误 . 这表明运行时确实知道这些是字符串 .
此类型(接口scala.collection.immutable.Map [scala.Tuple2(_1:String,_2:String)])不能用作键 .
1 回答
目前,Flink SQL仅支持Java
java.util.Map
. Scala映射被视为具有FlinkGenericTypeInfo
/ SQLANY
数据类型的黑盒子 . 因此,您可以转发这些黑盒并在标量函数中使用它们,但不支持使用['key']
运算符进行访问 .因此,您要么使用Java映射,要么自己在UDF中实现访问操作 .
我为您的问题创建了一个问题:https://issues.apache.org/jira/browse/FLINK-7360