首页 文章

Spring WebFlux Webclient接收一个应用程序/八位字节流文件作为Mono

提问于
浏览
0

我正在为Kotlin中的一个小型Spring WebFlux应用程序进行原型设计 . 此应用程序需要从远程REST endpoints 获取tar存档并将其本地存储在磁盘上 . 听起来很简单 .

我首先创建了一个集成测试,它启动了spring服务器和另一个带有模拟REST endpoints 的WebFlux服务器,该 endpoints 为tar存档提供服务 .

测试应该像:

1)app:GET mock-server/archive

2)模拟服务器:状态为200的响应和主体中的tar存档作为类型附件

3)app:阻止直到收到所有字节,然后解压并使用文件

我遇到的问题是,当我尝试将字节收集到应用程序的 ByteArray 时,它会永久阻止 .

我的 mock-server/archive 路由到以下功能:

fun serveArchive(request: ServerRequest): Mono<ServerResponse> {
    val tarFile = FileSystemResource(ARCHIVE_PATH)
    assert(tarFile.exists() && tarFile.isFile && tarFile.contentLength() != 0L)
    return ServerResponse
            .ok()
            .contentType(MediaType.APPLICATION_OCTET_STREAM)
            .contentLength(tarFile.contentLength())
            .header("Content-Disposition", "attachment; filename=\"$ARCHIVE_FNAME\"")
            .body(fromResource(tarFile))
}

然后我的应用程序调用以下内容:

private fun retrieveArchive {
    client.get().uri(ARCHIVE_URL).accept(MediaType.APPLICATION_OCTET_STREAM)
            .exchange()
            .flatMap { response ->
                storeArchive(response.bodyToMono())
            }.subscribe()
}

private fun storeArchive(archive: Mono<ByteArrayResource>): Mono<Void> {
    val archiveContentBytes = archive.block() // <- this blocks forever
    val archiveContents = TarArchiveInputStream(archiveContentBytes.inputStream)
    // read archive
}

我看到了How to best get a byte array from a ClientResponse from Spring WebClient?和's why I' m试图使用 ByteArrayResource .

当我逐步完成所有操作时,我看到 serveArchive 似乎正在工作(断言语句说我传递的文件存在并且其中有一些字节) . 在 retrieveArchive 我得到200并且可以在 .headers 中看到所有适当的信息(内容类型,内容长度看起来都很好) . 当我下到 storeArchive 并尝试使用 block 从Mono中检索字节时,它只会永远阻塞 .

我完全失去了如何调试这样的东西 .

1 回答

  • 1

    您只需从 flatMap 返回转换后的正文,以便从 Mono<T> 转换为 T

    client.get().uri(ARCHIVE_URL).accept(MediaType.APPLICATION_OCTET_STREAM)
                .exchange()
                .flatMap { response ->
                    response.bodyToMono(ByteArrayResource::class.java)
                }
                .map { archiveContentBytes ->
                    archiveContentBytes.inputStream
                }
                .doOnSuccess { inputStream ->
                    //here is you code to do anything with the inputStream
                    val archiveContents = TarArchiveInputStream(inputStream)
                }
                .subscribe()
    

相关问题