I have a list of Source[ByteString, NotUsed], each paired with a filename from an S3 bucket. These need to be zipped up in constant memory and served from Play 2.6.
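For the serving side, Play 2.6 can stream a Source[ByteString, _] directly as a chunked response body, so the hard part is producing the zipped source. A minimal sketch of the controller, where zipSource is a placeholder for whatever the zipping stage discussed below produces:

import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.util.ByteString
import javax.inject.Inject
import play.api.mvc.{AbstractController, ControllerComponents}

class ZipDownloadController @Inject()(cc: ControllerComponents) extends AbstractController(cc) {
  def download = Action {
    // Placeholder: the Source[ByteString, NotUsed] produced by the zipping stage.
    val zipSource: Source[ByteString, NotUsed] = ???
    // Chunked transfer keeps memory constant; Play backpressures the source.
    Ok.chunked(zipSource)
      .as("application/zip")
      .withHeaders("Content-Disposition" -> "attachment; filename=archive.zip")
  }
}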
There is a similar question here: stream a zip created on the fly with play 2.5 and akka stream with backpressure
The relevant code snippet using Akka Streams (needed for Play 2.6):
https://gist.github.com/kirked/03c7f111de0e9a1f74377bf95d3f0f60
My experiments so far are based on the gist above; however, the gist solves a different problem: it streams files from disk by handing the graph stage an InputStream. There is no safe way to convert a Source[ByteString, NotUsed] into an InputStream, so I cannot use that code as-is.
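(Akka Streams does ship a blocking adapter, StreamConverters.asInputStream, which the experiments below fall back on, but every read on the resulting InputStream blocks the calling thread until data arrives or the timeout expires, so it must not run on the stream's own dispatcher. A minimal sketch of that conversion:)

import java.io.InputStream
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Source, StreamConverters}
import akka.util.ByteString
import scala.concurrent.duration._

// Materializes the source into an InputStream; each read blocks until bytes
// arrive or the timeout elapses, so call this off the stream's dispatcher.
def toInputStream(source: Source[ByteString, _])(implicit mat: ActorMaterializer): InputStream =
  source.runWith(StreamConverters.asInputStream(1.minute))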
So far I have experimented with changing the input type from () => InputStream to () => Source[ByteString, NotUsed] and then consuming it with source.runForeach(...).
Most of my changes are here:
override def onPush(): Unit = {
  val (filepath, source: StreamGenerator) = grab(in)
  buffer.startEntry(filepath)
  val src: Source[ByteString, NotUsed] = source()
  val operation = src.runForeach(bytestring => {
    val byteInputStream = new ByteArrayInputStream(bytestring.toArray)
    emitMultiple(out, fileChunks(byteInputStream, buffer))
  })
  operation.onComplete {
    case _ => buffer.endEntry()
  }
  Await.ready(operation, 5.minute)
}
I know this blocks, but I am not sure whether that is allowed in this context.
How can I accomplish this safely?
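For reference, the non-blocking alternative to Await inside a GraphStage is getAsyncCallback, which hands a Future's completion back to the stage on its own thread. A rough sketch of that pattern (buffer, src and the implicit ExecutionContext are the ones from the onPush snippet above; this is not the approach the answer below ends up taking):

// Sketch only: lives inside the GraphStageLogic, alongside onPush above.
// getAsyncCallback returns a handle that may be invoked from any thread and
// is executed safely on the stage's own thread.
val entryFinished = getAsyncCallback[Unit] { _ =>
  buffer.endEntry()
  // continue emitting / pull the next (path, source) pair from here
}

val operation = src.runForeach { bytestring =>
  // feed bytes into the ZipBuffer here rather than calling emitMultiple
  // from the foreign thread running this callback
}
operation.onComplete(_ => entryFinished.invoke(()))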
EDIT
I also tried this version, which is closer to the gist:
override def onPush(): Unit = {
  val (filepath, source: StreamGenerator) = grab(in)
  buffer.startEntry(filepath)
  val stream = source().runWith(StreamConverters.asInputStream(1.minute))
  currentStream = Some(stream)
  emitMultiple(out, fileChunks(stream, buffer), () => buffer.endEntry())
}
However, it fails with this stack trace:
[ERROR] [11/27/2017 09:26:38.428] [alpakka-akka.actor.default-dispatcher-3] [akka://alpakka/user/StreamSupervisor-0/flow-0-0-headSink] Error in stage [com.company.productregistration.services.s3.StreamedZip@7f573427]: Reactive stream is terminated, no reads are possible
java.io.IOException: Reactive stream is terminated, no reads are possible
	at akka.stream.impl.io.InputStreamAdapter.subscriberClosedException(InputStreamSinkStage.scala:117)
	at akka.stream.impl.io.InputStreamAdapter.executeIfNotClosed(InputStreamSinkStage.scala:125)
	at akka.stream.impl.io.InputStreamAdapter.read(InputStreamSinkStage.scala:144)
	at com.company.productregistration.services.s3.StreamedZip$$anon$2.result$1(StreamedZip.scala:99)
	at com.company.productregistration.services.s3.StreamedZip$$anon$2.$anonfun$fileChunks$1(StreamedZip.scala:105)
	at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1169)
	at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1159)
	at scala.collection.immutable.StreamIterator.$anonfun$next$1(Stream.scala:1058)
	at scala.collection.immutable.StreamIterator$LazyCell.v$lzycompute(Stream.scala:1047)
	at scala.collection.immutable.StreamIterator$LazyCell.v(Stream.scala:1047)
	at scala.collection.immutable.StreamIterator.hasNext(Stream.scala:1052)
	at akka.stream.stage.GraphStageLogic$EmittingIterator.onPull(GraphStage.scala:911)
	at akka.stream.impl.fusing.GraphInterpreter.processPull(GraphInterpreter.scala:506)
	at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:412)
	at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:571)
	at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:541)
	at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:659)
	at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:707)
	at akka.actor.Actor.aroundPreStart(Actor.scala:522)
	at akka.actor.Actor.aroundPreStart$(Actor.scala:522)
	at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:650)
	at akka.actor.ActorCell.create(ActorCell.scala:591)
	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:484)
	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
	at akka.dispatch.Mailbox.run(Mailbox.scala:223)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
EDIT2
If I do not set currentStream = Some(stream), I do not get the error above. It also works for some combinations of files: I have a larger file of about 20 megabytes that corrupts the resulting zip if I put it as the last source, but if I put it anywhere else in the list of sources, everything works correctly.
Below is the complete listing of my current graph stage implementation:
import java.io.{ByteArrayInputStream, InputStream, OutputStream}
import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
import akka.util.{ByteString, ByteStringBuilder}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext}
import scala.util.control.NonFatal
//scalastyle:off
class StreamedZip(bufferSize: Int = 64 * 1024)(implicit ec: ExecutionContext,
                                               mat: ActorMaterializer)
    extends GraphStage[FlowShape[StreamedZip.ZipSource, ByteString]] {

  import StreamedZip._

  val in: Inlet[ZipSource] = Inlet("StreamedZip.in")
  val out: Outlet[ByteString] = Outlet("StreamedZip.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with StageLogging {
      private val buffer = new ZipBuffer(bufferSize)
      private var currentStream: Option[InputStream] = None
      setHandler(
        out,
        new OutHandler {
          override def onPull(): Unit =
            if (isClosed(in)) {
              if (buffer.isEmpty) completeStage()
              else {
                buffer.close
                push(out, buffer.toByteString)
              }
            } else pull(in)

          override def onDownstreamFinish(): Unit = {
            closeInput()
            buffer.close
            super.onDownstreamFinish()
          }
        }
      )
      setHandler(
        in,
        new InHandler {
          override def onPush(): Unit = {
            val (filepath, source: StreamGenerator) = grab(in)
            buffer.startEntry(filepath)
            val stream = source().runWith(StreamConverters.asInputStream(1.minute))
            emitMultiple(out, fileChunks(stream, buffer), () => { buffer.endEntry() })
          }

          override def onUpstreamFinish(): Unit = {
            println("Updstream finish")
            closeInput()
            if (buffer.isEmpty) completeStage()
            else {
              buffer.close()
              if (isAvailable(out)) {
                push(out, buffer.toByteString)
              }
            }
          }
        }
      )
      private def closeInput(): Unit = {
        currentStream.foreach(_.close)
        currentStream = None
      }
      private def fileChunks(stream: InputStream, buffer: ZipBuffer): Iterator[ByteString] = {
        // This seems like a good trade-off between single-byte
        // read I/O performance and doubling the ZipBuffer size.
        //
        // And it's still a decent defense against DDOS resource
        // limit attacks.
        val readBuffer = new Array[Byte](1024)
        var done = false

        def result: Stream[ByteString] =
          if (done) Stream.empty
          else {
            try {
              while (!done && buffer.remaining > 0) {
                val bytesToRead = Math.min(readBuffer.length, buffer.remaining)
                val count = stream.read(readBuffer, 0, bytesToRead)
                if (count == -1) {
                  stream.close
                  done = true
                } else buffer.write(readBuffer, count)
              }
              buffer.toByteString #:: result
            } catch {
              case NonFatal(e) =>
                closeInput()
                throw e
            }
          }

        result.iterator
      }
    }
}
object StreamedZip {
  type ZipFilePath = String
  type StreamGenerator = () => Source[ByteString, NotUsed]
  type ZipSource = (ZipFilePath, StreamGenerator)

  def apply()(implicit ec: ExecutionContext, mat: ActorMaterializer) = new StreamedZip()
}
class ZipBuffer(val bufferSize: Int = 64 * 1024) {
  import java.util.zip.{ZipEntry, ZipOutputStream}

  private var builder = new ByteStringBuilder()
  private val zip = new ZipOutputStream(builder.asOutputStream) {
    // this MUST ONLY be used after flush()!
    def setOut(newStream: OutputStream): Unit = out = newStream
  }
  private var inEntry = false
  private var closed = false

  def close(): Unit = {
    endEntry()
    closed = true
    zip.close()
  }

  def remaining(): Int = bufferSize - builder.length

  def isEmpty(): Boolean = builder.isEmpty

  def startEntry(path: String): Unit =
    if (!closed) {
      endEntry()
      zip.putNextEntry(new ZipEntry(path))
      inEntry = true
    }

  def endEntry(): Unit =
    if (!closed && inEntry) {
      inEntry = false
      zip.closeEntry()
    }

  def write(byte: Int): Unit =
    if (!closed && inEntry) zip.write(byte)

  def write(bytes: Array[Byte], length: Int): Unit =
    if (!closed && inEntry) zip.write(bytes, 0, length)

  def toByteString(): ByteString = {
    zip.flush()
    val result = builder.result
    builder = new ByteStringBuilder()
    // set the underlying output for the zip stream to be the buffer
    // directly, so we don't have to copy the zip'd byte array.
    zip.setOut(builder.asOutputStream)
    result
  }
}
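For reference, a minimal sketch of how the stage above gets wired into a stream; the file names and single-element sources are placeholders standing in for the S3-backed sources:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.util.ByteString

implicit val system: ActorSystem = ActorSystem("zip-example")
implicit val mat: ActorMaterializer = ActorMaterializer()
import system.dispatcher

// Each entry pairs a path inside the archive with a lazy generator for its bytes.
val entries: List[StreamedZip.ZipSource] = List(
  "folder/first.txt" -> (() => Source.single(ByteString("first file"))),
  "folder/second.txt" -> (() => Source.single(ByteString("second file")))
)

// The stage turns the (path, generator) pairs into a zipped byte stream
// that can be handed to Play as the response body.
val zipped: Source[ByteString, NotUsed] = Source(entries).via(StreamedZip())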
1 Answer
I ended up keeping the ZipBuffer from above and solving the overall problem with the akka streams DSL. My solution is as follows: