我写了一个定制的火花水槽 . 在我的 addBatch
方法中,我使用 ForEachPartitionAsync
,如果我没有错,只会使驱动程序异步工作,返回未来 .
val work: FutureAction[Unit] = rdd.foreachPartitionAsync { rows =>
val sourceInfo: StreamSourceInfo = serializeRowsAsInputStream(schema, rows)
val ackIngestion = Future {
ingestRows(sourceInfo) } andThen {
case Success(ingestion) => ackIngestionDone(partitionId, ingestion)
}
Await.result(ackIngestion, timeOut) // I would like to remove this line..
}
work onSuccess {
case _ => // move data from temporary table, report success of all workers
}
work onFailure{
//delete tmp data
case t => throw t.getCause
}
我找不到一种方法来运行工作节点而不会在Await调用上阻塞,就像我删除它们一样成功报告给 work
future对象,尽管未来还没有真正完成 .
有没有办法向驱动程序报告所有工作人员完成了异步工作?
注意:我查看了 foreachPartitionAsync
函数,它只有一个实现需要一个返回Unit的函数(我希望它有另一个函数返回一个未来......)