Zip a List[Source[ByteString, NotUsed]] on the fly with Akka Streams

I have a list of Source[ByteString, NotUsed] paired with filenames from an S3 bucket. These need to be zipped in constant memory and served from Play 2.6.
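For context, the zipped stream will eventually be served from a controller, roughly like this (a sketch only; DownloadController and the stand-in empty source are placeholders, not my real code):

import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.util.ByteString
import javax.inject.Inject
import play.api.mvc._

class DownloadController @Inject()(cc: ControllerComponents) extends AbstractController(cc) {
  def downloadAll: Action[AnyContent] = Action {
    // stand-in for the zipping stage this question is about
    val zipped: Source[ByteString, NotUsed] = Source.empty
    // Play 2.6 streams the body with backpressure, so memory stays constant
    Ok.chunked(zipped)
      .as("application/zip")
      .withHeaders("Content-Disposition" -> "attachment; filename=files.zip")
  }
}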

There is a similar question here: stream a zip created on the fly with play 2.5 and akka stream with backpressure

And a relevant code snippet using Akka Streams (which Play 2.6 requires):

https://gist.github.com/kirked/03c7f111de0e9a1f74377bf95d3f0f60

My experiments so far are based on the gist above; however, the gist solves a different problem: it streams files from disk by handing the graph stage an InputStream. There is no safe way to convert a Source[ByteString, NotUsed] into an InputStream, so I cannot use the snippet as-is.

My experiment so far has been to change the input type from () => InputStream to () => Source[ByteString, NotUsed] and then consume it with source.runForeach(...).

Most of my changes are here:

override def onPush(): Unit = {
  val (filepath, source: StreamGenerator) = grab(in)
  buffer.startEntry(filepath)
  val src: Source[ByteString, NotUsed] = source()
  // materialize the file's source and feed each chunk through the zip buffer
  val operation = src.runForeach(bytestring => {
    val byteInputStream = new ByteArrayInputStream(bytestring.toArray)
    emitMultiple(out, fileChunks(byteInputStream, buffer))
  })
  operation.onComplete {
    case _ => buffer.endEntry()
  }
  // block the stage until the whole file has been consumed
  Await.ready(operation, 5.minute)
}

I know this is blocking, but I am not sure whether it is permissible in this situation.
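
A non-blocking variant would presumably hand the Future's completion back to the stage through an async callback instead of Await. A rough, untested sketch of what I mean, reusing the names from the snippet above:

// untested sketch: re-enter the stage safely once the per-file source
// completes, instead of parking the stage thread in Await
val entryDone = getAsyncCallback[Unit] { _ =>
  buffer.endEntry()
  if (!isClosed(in) && !hasBeenPulled(in)) tryPull(in) // move on to the next file
}
operation.onComplete(_ => entryDone.invoke(()))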

How can I accomplish this task in a safe manner?

EDIT

I have also tried this version, which stays closer to the gist:

override def onPush(): Unit = {
  val (filepath, source: StreamGenerator) = grab(in)
  buffer.startEntry(filepath)
  val stream = source().runWith(StreamConverters.asInputStream(1.minute))
  currentStream = Some(stream)
  emitMultiple(out, fileChunks(stream, buffer), () => buffer.endEntry())
}

However, it produces an error with this stack trace:

[ERROR] [11/27/2017 09:26:38.428] [alpakka-akka.actor.default-dispatcher-3] [akka://alpakka/user/StreamSupervisor-0/flow-0-0-headSink] Error in stage [com.company.productregistration.services.s3.StreamedZip@7f573427]: Reactive stream is terminated, no reads are possible
java.io.IOException: Reactive stream is terminated, no reads are possible
    at akka.stream.impl.io.InputStreamAdapter.subscriberClosedException(InputStreamSinkStage.scala:117)
    at akka.stream.impl.io.InputStreamAdapter.executeIfNotClosed(InputStreamSinkStage.scala:125)
    at akka.stream.impl.io.InputStreamAdapter.read(InputStreamSinkStage.scala:144)
    at com.company.productregistration.services.s3.StreamedZip$$anon$2.result$1(StreamedZip.scala:99)
    at com.company.productregistration.services.s3.StreamedZip$$anon$2.$anonfun$fileChunks$1(StreamedZip.scala:105)
    at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1169)
    at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1159)
    at scala.collection.immutable.StreamIterator.$anonfun$next$1(Stream.scala:1058)
    at scala.collection.immutable.StreamIterator$LazyCell.v$lzycompute(Stream.scala:1047)
    at scala.collection.immutable.StreamIterator$LazyCell.v(Stream.scala:1047)
    at scala.collection.immutable.StreamIterator.hasNext(Stream.scala:1052)
    at akka.stream.stage.GraphStageLogic$EmittingIterator.onPull(GraphStage.scala:911)
    at akka.stream.impl.fusing.GraphInterpreter.processPull(GraphInterpreter.scala:506)
    at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:412)
    at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:571)
    at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:541)
    at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:659)
    at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:707)
    at akka.actor.Actor.aroundPreStart(Actor.scala:522)
    at akka.actor.Actor.aroundPreStart$(Actor.scala:522)
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:650)
    at akka.actor.ActorCell.create(ActorCell.scala:591)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:484)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
    at akka.dispatch.Mailbox.run(Mailbox.scala:223)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

EDIT2 If I do not set currentStream = Some(stream), I do not get the above error. It also does work for some combinations of files: I have a larger file of about 20 megabytes that corrupts my zip file if I put it as the last source, yet if I place it anywhere else in the list of sources everything works fine.

Below is the complete listing of my current graph stage implementation:

import java.io.{ByteArrayInputStream, InputStream, OutputStream}

import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
import akka.util.{ByteString, ByteStringBuilder}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext}
import scala.util.control.NonFatal

//scalastyle:off
class StreamedZip(bufferSize: Int = 64 * 1024)(implicit ec: ExecutionContext,
                                               mat: ActorMaterializer)
    extends GraphStage[FlowShape[StreamedZip.ZipSource, ByteString]] {

  import StreamedZip._

  val in: Inlet[ZipSource]    = Inlet("StreamedZip.in")
  val out: Outlet[ByteString] = Outlet("StreamedZip.out")
  override val shape          = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with StageLogging {
      private val buffer                             = new ZipBuffer(bufferSize)
      private var currentStream: Option[InputStream] = None

      setHandler(
        out,
        new OutHandler {
          override def onPull(): Unit =
            if (isClosed(in)) {
              if (buffer.isEmpty) completeStage()
              else {
                buffer.close
                push(out, buffer.toByteString)
              }
            } else pull(in)

          override def onDownstreamFinish(): Unit = {
            closeInput()
            buffer.close
            super.onDownstreamFinish()
          }
        }
      )

      setHandler(
        in,
        new InHandler {
          override def onPush(): Unit = {
            val (filepath, source: StreamGenerator) = grab(in)
            buffer.startEntry(filepath)
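            // NOTE: every read from this InputStream blocks the calling thread
            // (the stage's own thread, via the iterator from fileChunks) until
            // bytes arrive or the 1.minute timeout fires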
            val stream = source().runWith(StreamConverters.asInputStream(1.minute))
            emitMultiple(out, fileChunks(stream, buffer), () => { buffer.endEntry() })
          }

          override def onUpstreamFinish(): Unit = {
            println("Updstream finish")
            closeInput()
            if (buffer.isEmpty) completeStage()
            else {
              buffer.close()
              if (isAvailable(out)) {
                push(out, buffer.toByteString)
              }
            }
          }
        }
      )

      private def closeInput(): Unit = {
        currentStream.foreach(_.close)
        currentStream = None
      }

      private def fileChunks(stream: InputStream, buffer: ZipBuffer): Iterator[ByteString] = {
        // This seems like a good trade-off between single-byte
        // read I/O performance and doubling the ZipBuffer size.
        //
        // And it's still a decent defense against DDOS resource
        // limit attacks.
        val readBuffer = new Array[Byte](1024)
        var done       = false

        def result: Stream[ByteString] =
          if (done) Stream.empty
          else {
            try {
              while (!done && buffer.remaining > 0) {
                val bytesToRead = Math.min(readBuffer.length, buffer.remaining)
                val count       = stream.read(readBuffer, 0, bytesToRead)
                if (count == -1) {
                  stream.close
                  done = true
                } else buffer.write(readBuffer, count)
              }
              buffer.toByteString #:: result
            } catch {
              case NonFatal(e) =>
                closeInput()
                throw e
            }
          }

        result.iterator
      }
    }
}

object StreamedZip {
  type ZipFilePath     = String
  type StreamGenerator = () => Source[ByteString, NotUsed]
  type ZipSource       = (ZipFilePath, StreamGenerator)

  def apply()(implicit ec: ExecutionContext, mat: ActorMaterializer) = new StreamedZip()

}

class ZipBuffer(val bufferSize: Int = 64 * 1024) {
  import java.util.zip.{ZipEntry, ZipOutputStream}

  private var builder = new ByteStringBuilder()
  private val zip = new ZipOutputStream(builder.asOutputStream) {
    // this MUST ONLY be used after flush()!
    def setOut(newStream: OutputStream): Unit = out = newStream
  }
  private var inEntry = false
  private var closed  = false

  def close(): Unit = {
    endEntry()
    closed = true
    zip.close()
  }

  def remaining(): Int = bufferSize - builder.length

  def isEmpty(): Boolean = builder.isEmpty

  def startEntry(path: String): Unit =
    if (!closed) {
      endEntry()
      zip.putNextEntry(new ZipEntry(path))
      inEntry = true
    }

  def endEntry(): Unit =
    if (!closed && inEntry) {
      inEntry = false
      zip.closeEntry()
    }

  def write(byte: Int): Unit =
    if (!closed && inEntry) zip.write(byte)

  def write(bytes: Array[Byte], length: Int): Unit =
    if (!closed && inEntry) zip.write(bytes, 0, length)

  def toByteString(): ByteString = {
    zip.flush()
    val result = builder.result
    builder = new ByteStringBuilder()
    // set the underlying output for the zip stream to be the buffer
    // directly, so we don't have to copy the zip'd byte array.
    zip.setOut(builder.asOutputStream)
    result
  }
}
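
For reference, ZipBuffer on its own is used like this (an illustrative snippet, not part of the stage):

import akka.util.ByteString

val zb      = new ZipBuffer()
val payload = "hello".getBytes("UTF-8")
zb.startEntry("hello.txt")
zb.write(payload, payload.length)
zb.endEntry()
zb.close()                                  // writes the zip central directory
val archive: ByteString = zb.toByteString() // a complete one-entry archive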

1 Answer


    I ended up using the ZipBuffer from above and solved the overall problem using the akka stream DSL.

    My solution is below:

    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.stream.scaladsl.Source
    import akka.stream.{ActorMaterializer, Graph, SourceShape}
    import akka.util.ByteString
    import com.company.config.AWS
    import org.log4s.getLogger
    
    case class S3StreamingServiceLike(awsConf: AWS, s3Client: S3ClientAlpakka)(
        implicit sys: ActorSystem,
        mat: ActorMaterializer)
        extends S3StreamingService {
    
      private implicit class ConcatSyntax[T, U](source: Source[T, U]) {
        def ++[TT >: T, M](that: Graph[SourceShape[TT], M]): Source[TT, U] = //scalastyle:ignore
          source.concat(that)
      }
    
      private val logger = getLogger
    
      private sealed trait ZipElement
      private case class FileStart(name: String, index: Int, outOf: Int) extends ZipElement
      private case class FileEnd(name: String, index: Int, outOf: Int)   extends ZipElement
      private case class FilePayload(byteString: ByteString)             extends ZipElement
      private case object EndZip                                         extends ZipElement
    
      private def payloadSource(filename: String) =
        s3Client.download(awsConf.s3BucketName, filename).map(FilePayload.apply)
    
      private def fileNameToZipElements(filename: String,
                                        index: Int,
                                        outOf: Int): Source[ZipElement, NotUsed] =
        Source.single(FileStart(filename, index, outOf)) ++
          payloadSource(filename) ++
          Source.single(FileEnd(filename, index, outOf))
    
      def streamFilesAsZip(filenames: List[String])(forUser: String): Source[ByteString, NotUsed] = {
    
        val zipBuffer = new ZipBuffer()
    
        val zipElementSource: Source[ZipElement, NotUsed] =
          Source(filenames.zipWithIndex).flatMapConcat {
            case (filename, index) => fileNameToZipElements(filename, index + 1, filenames.length)
          } ++ Source.single(EndZip)
    
        zipElementSource
          .map {
            case FileStart(name, index, outOf) =>
              logger.info(s"Zipping file #$index of $outOf with name $name for user $forUser")
              zipBuffer.startEntry(name)
              None
            case FilePayload(byteString) =>
              if (byteString.length > zipBuffer.remaining()) {
                throw new Exception(
                  s"Bytestring size exceeded buffer size ${byteString.length} > ${zipBuffer.remaining}")
              }
              zipBuffer.write(byteString.toArray, byteString.length)
              Some(zipBuffer.toByteString())
            case FileEnd(name, index, outOf) =>
              logger.info(s"Finished zipping file #$index of $outOf with $name for user $forUser")
              zipBuffer.endEntry()
              Some(zipBuffer.toByteString())
            case EndZip =>
              zipBuffer.close()
              Some(zipBuffer.toByteString())
          }
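          // FileStart maps to None and a flush may be empty, so keep only the
          // chunks that actually carry zip bytes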
          .collect {
            case Some(bytes) if bytes.length > 0 => bytes
          }
      }
    
    }
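
    One caveat: the returned Source closes over the mutable ZipBuffer created inside streamFilesAsZip, so each returned Source should only be materialized once; call streamFilesAsZip again for every new download.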
    
