如果实体大小> 1K,则Akka-Stream,日志记录,物化流失败

使用Akka 2.4.7 . 我想记录整个Http响应 . 使用类似于How does one log Akka HTTP client requests的实现关注的代码是从HttpEntity中提取数据的代码

def entityAsString(entity: HttpEntity) (implicit m: Materializer, ex: ExecutionContext): Future[String] = {
    entity.dataBytes.map(_.decodeString("UTF-8")).runWith(Sink.head)
}

如果POST请求具有较小的有效负载,则此方法很有效 . 但是从1K开始有一个例外:

java.lang.IllegalStateException: Substream Source cannot be materialized more than once

QUESTION :为什么此异常取决于POST有效负载的大小 . 希望有任何可能的解决方案吗?

完整日志消息:

2016-08-11 10:15:35,100 ERROR aaActorSystemImpl [undefined]:处理请求HttpRequest时出错(HttpMethod(POST),[http://localhost:3001/api/v2/exec,List(User-Agent](http://localhost:3001/api/v2/exec,List(User-Agent):curl / 7.30.0,主机:localhost:3001,接受:/,Expect:100 -continue,Timeout-Access:),HttpEntity.Default(multipart / form-data; boundary = ------------------------- acebdf13572468; charset = UTF-8,5599,Source(SourceShape(StreamUtils $$ anon $ 2.out),CompositeModule [2db5bfef]
姓名:未命名
模块:
(未命名)CompositeModule [4aac8b90]
姓名:未命名
模块:
(SubSource%28EntitySource%29)GraphStage(EntitySource)[073d36ba]
(未命名)[155dd7c9] GraphStage(OneHundredContinueStage)[40b6c892]的副本
(未命名)[1b902132] GraphStage(Collect)[75f65c1c]的副本
(可限制的)[76375468] CompositeModule [59626a09]的副本
名称:限制
模块:
(未命名)GraphStage(未知操作)[1bee846d]
下行流:
上行流:
MatValue:忽略
下行流:
SubSource.out - > GraphStage.in
GraphStage.out - > Collect.in
Collect.out - > unknown-operation.in
上行流:
GraphStage.in - > SubSource.out
Collect.in - > GraphStage.out
unknown-operation.in - > Collect.out
MatValue:Atomic(SubSource%28EntitySource%29 [073d36ba])
(未命名)[77d6c04c] GraphStage的副本(akka.http.impl.util.StreamUtils$$anon$2@30858cb0)[7e073049]
下行流:
SubSource.out - > GraphStage.in
GraphStage.out - > Collect.in
Collect.out - > unknown-operation.in
unknown-operation.out - > StreamUtils $$ anon $ 2.in
上行流:
GraphStage.in - > SubSource.out
Collect.in - > GraphStage.out
unknown-operation.in - > Collect.out
StreamUtils $$ anon $ 2.in - > unknown-operation.out
MatValue:Atomic(akka.stream.impl.StreamLayout $ CompositeModule [4aac8b90]))),HttpProtocol(HTTP / 1.1))
java.lang.IllegalStateException:子流源不能多次实现
at akka.stream.impl.fusing.SubSource $$ anon $ 4.setCB(StreamOfStreams.scala:703)
at akka.stream.impl.fusing.SubSource $$ anon $ 4.preStart(StreamOfStreams.scala:713)
at akka.stream.impl.fusing.GraphInterpreter.init(GraphInterpreter.scala:475)
at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:380)
at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:538)
at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:586)
at akka.actor.Actor $ class.aroundPreStart(Actor.scala:489)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:529)
at akka.actor.ActorCell.create(ActorCell.scala:590)
at akka.actor.ActorCell.invokeAll $ 1(ActorCell.scala:461)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
at akka.dispatch.Mailbox.run(Mailbox.scala:223)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
在scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
在scala.concurrent.forkjoin.ForkJoinPool $ WorkQueue.runTask(ForkJoinPool.java:1339)
在scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
在scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

回答(1)

2 years ago

我假设 entity.dataBytes 在调用 entityAsString 之前已经用于某些有用的目的,或者 entityAsString 被调用两次 . 一般情况下, HttpEntity 的内容无法重复使用 . 但是, HttpEntity.Strict 的内容可以重复使用 .