我有一个简单的工作,将数据从pub sub移动到gcs . pub子主题是一个共享主题,具有不同大小的许多不同消息类型

我希望结果在GCS中相应地进行垂直分区:

架构/版本/年/月/日/

在该父键下应该是当天的一组文件,文件应该是合理的大小,即10-200 MB

我使用scio,我能够进行groupby操作来进行[String,Iterable [Event]]的P / SCollection,其中密钥基于上面的分区方案 .

我无法使用默认文本接收器,因为它们不支持垂直分区,它只能将整个pcollection写入一个位置 . 而是遵循以下答案中的建议:

How do I write to multiple files in Apache Beam?

Writing to Google Cloud Storage from PubSub using Cloud Dataflow using DoFn

我创建了一个简单的函数,将我的组写入gcs .

object GcsWriter {

  private val gcs: storage.Storage = StorageOptions.getDefaultInstance.getService

  val EXTENSION = ".jsonl.gz"

  //todo no idea if this is ok. see org.apache.beam.sdk.io.WriteFiles is a ptransform that writes text files and seems very complex
  //maybe beam is aimed at a different use case
  //this is an output 'transform' that writes text files
  //org.apache.beam.sdk.io.TextIO.write().to("output")


  def gzip(bytes: Array[Byte]): Array[Byte] = {
    val byteOutputStream = new ByteArrayOutputStream()
    val compressedStream = new GZIPOutputStream(byteOutputStream)
    compressedStream.write(bytes)
    compressedStream.close()
    byteOutputStream.toByteArray
  }

  def writeAsTextToGcs(bucketName: String, key: String, items: Iterable[String]): Unit = {
    val bytes = items.mkString(start = "",sep ="\n" ,end = "\n").getBytes("UTF-8")
    val compressed = gzip(bytes)
    val blobInfo = BlobInfo.newBuilder(bucketName, key + System.currentTimeMillis() + EXTENSION).build()
    gcs.create(blobInfo, compressed)
  }

}

这工作并写我喜欢的文件我使用固定窗口的以下触发规则:

val WINDOW_DURATION: Duration = Duration.standardMinutes(10)
  val WINDOW_ELEMENT_MAX_COUNT = 5000
  val LATE_FIRING_DELAY: Duration = Duration.standardMinutes(10) //this is the time after receiving late data to refiring
  val ALLOWED_LATENESS: Duration = Duration.standardHours(1)


  val WINDOW_OPTIONS = WindowOptions(
    trigger = AfterFirst.of(
      ListBuffer(
        AfterPane.elementCountAtLeast(WINDOW_ELEMENT_MAX_COUNT),
        AfterWatermark.pastEndOfWindow().withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(LATE_FIRING_DELAY)))),
    allowedLateness = ALLOWED_LATENESS,
    accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES
  )

基本上是根据水印或当接收到x元素时窗口末端的复合触发器 .

问题在于源数据可以具有不同大小的消息 . 因此,如果我选择固定数量的元素来触发,我将:

1)选择一个太大的数字,对于较大的事件组,它将炸毁工作者的java堆2)选择一个较小的数字,然后我最终得到一些微小的文件,用于安静的事件,我想积累更多的事件在我的档案中 .

我没有看到自定义触发器,我可以传递一个lambda,它输出每个元素或类似的元素的度量 . 有没有办法可以实现我自己的触发器来触发窗口中的字节数 .

其他一些问题

我是否正确假设每个组中元素的迭代器在内存中而不是从存储中流式传输?如果不是,我可以以更有效的内存方式从迭代器流向gcs

对于我的GCS编写器,我只是在 Map 或ParDo中进行 . 它没有实现File输出接收器或看起来像TextIo . 这个简单的实现会出现问题吗?它在文档中说,如果转换抛出一个异常,它只是重试(对于流应用程序而言是无限的)