-
0 votesanswersviews
从Lagom / Akka Kafka主题订阅者为Websocket创建源代码
我希望我的仅限Lagom订阅者服务订阅Kafka主题并将消息流式传输到websocket . 我使用此文档(https://www.lagomframework.com/documentation/1.4.x/scala/MessageBrokerApi.html#Subscribe-to-a-topic)作为指导,定义如下服务: // service call def stream():... -
3 votesanswersviews
使用Akka Streams快速压缩一个List [Source [ByteString,NotUsed]]
我有一个 Source[ByteString, NotUsed] 列表与来自S3存储桶的文件名配对 . 这些需要压缩在恒定的内存中,并在Play 2.6中提供 . 这里有一个类似的问题:stream a zip created on the fly with play 2.5 and akka stream with backpressure 使用Akka Streams的相关代码片段(Play ... -
1 votesanswersviews
AVRO架构更新的麻烦
我有一个简单的案例类: case class User(id: String, login: String, key: String) 我添加字段“名称” case class User(id: String, login: String, name: String, key: String) 然后在avro架构中添加此字段(user.avsc) { "namespace&quo... -
29 votesanswersviews
Akka Stream Kafka vs Kafka Streams
我目前正在与Akka Stream Kafka合作与kafka互动,我很惊讶与Kafka Streams有什么不同 . 我知道基于Akka的方法实现了反应性规范并处理了kafka流似乎缺乏的背压和功能 . 使用kafka流比akka溪流kafka有什么好处? -
9 votesanswersviews
在akka-stream中如何从期货集合中创建无序的来源
我需要从 Future[T] 的集合中创建 akka.stream.scaladsl.Source[T, Unit] . 例如,有一组期货返回整数, val f1: Future[Int] = ??? val f2: Future[Int] = ??? val fN: Future[Int] = ??? val futures = List(f1, f2, fN) 如何创建一个 val sou... -
2 votesanswersviews
如何在Akka Streams中获取导致失败的对象?
根据akka streams docs,可以通过定义将 Throwable 映射到 Strategy 的决策程序来处理流故障: val decider: Supervision.Decider = { case _: ArithmeticException => Supervision.Resume case _ => Supervis... -
0 votesanswersviews
如果实体大小> 1K,则Akka-Stream,日志记录,物化流失败
使用Akka 2.4.7 . 我想记录整个Http响应 . 使用类似于How does one log Akka HTTP client requests的实现关注的代码是从HttpEntity中提取数据的代码 def entityAsString(entity: HttpEntity) (implicit m: Materializer, ex: ExecutionContext): Futur... -
6 votesanswersviews
是否使用源队列实现线程安全的akka-http中的连接池?
参考以下提到的实现: http://doc.akka.io/docs/akka-http/10.0.5/scala/http/client-side/host-level.html val poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]]("akka.io") val queue = ... -
2 votesanswersviews
Kafka Akka Streams Consumer在超时后因WakeupException而中断 . 消息:null
我正在做一个简单的代码: 使用docker创建kafka代理 使用scala akka生成器发布消息 使用scala akka使用者消费消息 . 可以在我的github帐户中找到如何运行的代码和说明: https://github.com/dvirgiln/akka-streams-ts 消费者失败了: val consumerSettings = ConsumerSettings... -
0 votesanswersviews
Play 2.5 ActorFlow:无法从Flow <String,String,capture#1-of?>转换为Flow <String,String,?>
在迁移到Play 2.5时,我正在尝试实现新的WebSocket功能,如here所述(使用actor处理WebSockets) . 在Play 2.5.4中,没有play.libs.streams.ActorFlow(如示例所示),但只有play.api.libs.streams.ActorFlow . 但是当我写这段代码时: public class MyController extends C... -
1 votesanswersviews
在Akka中PersistentView的典型用例是什么?
这里http://doc.akka.io/docs/akka/current/scala/persistence.html写道: PersistentView:视图是一个持久的有状态actor,它接收由另一个持久化actor编写的记录消息 . 视图本身不会记录新消息,而是仅从持久性actor的复制消息流更新内部状态 . PersistentView的典型用例是什么? 它与Akka Stream... -
2 votesanswersviews
持续演员的Akka Sharding背压
是否有Akka流背压模型或使用持久演员的akka分片? 因为我有问题,我有一个持久演员的akka分片集群 . (使用cassandra作为日志插件) .有时候应该同时创建很多演员 . (例如,当我们想要向所有用户发送广播消息时 . )成千上万的持久性演员试图在短时间内恢复(例如3.秒)并且在重负荷下cassandra无法在一段时间内做出响应并且许多演员多次无法恢复并且永远不会再次恢复 . -
0 votesanswersviews
将案例类(事件)传递给Akka Stream的Source()
我正在尝试实施CQRS解决方案 . 在我执行一些操作(执行一个命令)后,我将生成的事件(例如案例类(例如:AccountCreated))传递给另一个actor,然后使用akka流将事件通过线路推送到某个外部Sink . 假设事件类型是 Events ,我的发布actor的receive函数看起来像这样 override def receive = { case event: Events ... -
1 votesanswersviews
Akka Persistence Query和actor分片
我正在做CQRS Akka演员应用程序的查询端 . 查询actor被设置为集群分片,并使用来自一个持久性查询流的事件填充 . 我的问题是: 如果群集分片中的一个actor重新启动如何恢复它? 关闭整个群集分片并回复所有事件? 使集群中的actor成为持久化actor并为查询端保存新的事件集? 如果使用持久性查询填充的actor重新启动,如何取消当前PQ并再次启动它? -
0 votesanswersviews
将WebSocket从Play移动到Akka HTTP
我在Play应用程序中有一个WebSocket服务器,我想将它移动到akka-http服务 . 我目前正在使用 ActorFlow.actorRef ,这是Play中不存在的一部分 . 当WebSocket被接受时,我订阅了RabbitMQ队列,并将每条消息转发给WebSocket . 当我收到来自WebSocket的消息时,我会在本地处理一些消息并将其他人转发到RabbitMQ交换机 . 我如何... -
1 votesanswersviews
akka stream kafka Web套接字客户端在非活动期30秒后停止接收消息
我想从Kafka主题中读取消息并发送到Web套接字客户端 . 我使用akka-stream-kafka创建了一个示例Web套接字服务器应用程序 . 我正在使用kafka-console-producer脚本向kafka主题发送消息 . 工作正常,浏览器客户端通过Web套接字从kafka主题接收数据 . 如果我在30秒内没有向主题发送消息,服务器会给出以下消息: [KafkaServiceActor... -
0 votesanswersviews
Akka流:将新的发布者/订阅者附加到Flow
我正在构建一个Akka应用程序,并希望将某些actor的FSM状态转换暴露给外部消费者 . (最终目标是能够将状态转换消息推送到websocket,以便可以实时查看它们 . ) 根据文档Combining dynamic stages to build a simple Publish-Subscribe service,看起来我需要公开代表pub-sub通道的Flow,以便消费者和 生产环境 ... -
0 votesanswersviews
如何使用Akka Streams和Akka HTTP订阅websockets到actor的消息?
我想通过websockets向客户发送通知 . 这些通知是由actor生成的,因此我试图在服务器启动时创建一个actor的消息流,并订阅websockects与此流的连接(仅发送自订阅以来发出的那些通知) 使用Source.actorRef,我们可以创建一个actor消息源 . val ref = Source.actorRef[Weather](Int.MaxValue, fail) ... -
2 votesanswersviews
Kafka主题到websocket
我正在尝试实现一个设置,其中我有多个Web浏览器打开与我的akka-http服务器的websocket连接,以便读取发布到kafka主题的所有消息 . 所以消息流应该这样 kafka topic -> akka-http -> websocket connection 1 -> websocket connection 2 ... -
0 votesanswersviews
如何将akka streams kafka(reactive-kafka)集成到akka http应用程序中?
我有一个基本的scala akka http CRUD应用程序 . 请参阅下面的相关课程 . 我只想将实体id和一些数据(作为json)写入Kafka主题,例如,创建/更新实体 . 我正在看http://doc.akka.io/docs/akka-stream-kafka/current/producer.html,但我是scala和akka的新手,并且不确定如何将它集成到我的应用程序中? 例如,... -
1 votesanswersviews
如何在Akka Streams Kafka中构建ProducerMessage时配置偏移参数?
我正在使用Scala 2.11和Akka Streams Kafka 0.17 . 我有 stream 其中: A Source 使用Source.actorRef创建 . 这里,actor被安排以一定的间隔运行并连续生成消息,这些消息被发送到流 . 我已将 Producer 附加为 Flow . 制作人将 ProducerMessage.Message 推送到Kafka主题 . ... -
2 votesanswersviews
当使用websocket连接时,使用akka-stream-kafka从kafka主题获取最后一条消息
是否可以使用Akka Streams Kafka在Kafka主题上获取最后一条消息?我正在创建一个侦听Kafka主题的websocket,但是当我连接时它会检索所有先前的unred消息 . 这可以添加相当多的消息,所以我只对最后消息中的任何未来消息感兴趣 . (或仅限未来的消息) 来源: def source(): Flow[Any, String, NotUsed] = { val sou... -
1 votesanswersviews
如何在Akka Stream Kafka ConsumerSettings中指定Kafka Zookeeper endpoints
我有一个Kafka Broker,它运行在 Cloud 端的某个地方,当我试图通过命令行消费者工具从中消耗时,我可以使用消息 . 但是,当我在我的akka-stream kafka ConsumerSettings中放置相同的 endpoints 时,它无法正常工作 .例如: - bin / kafka-console-consumer.sh --zookeeper hostname:2181 ... -
0 votesanswersviews
使用OpenCV BackgroundSubtractorMOG2和Java中的Akka流实现流实现期间崩溃
对于我的项目,我想创建一个应用程序,在使用Akka流时使用Java中的OpenCV库进行一些视频分析 . 我尝试在一个不使用Akka流的单独项目中使用BGsubtractorMOG2,一切正常,但现在当我使用包含MOG2功能的舞台实现我的流时,我的程序崩溃了 . 我确信问题出在MOG2中,因为如果我尝试删除它并只是捕获并在视频上显示帧,一切正常 . 这里是akka actor中的一些代码 priv... -
0 votesanswersviews
Lagom Framework主题Websocket超时连接因错误而关闭
我在我的服务中创建了一个lagom主题 . def addingsTopic(): Topic[ProcessDefinitionAdded] 描述符: override def descriptor: Descriptor = { import Service._ named(ProductionService.TOPIC_NAME).withCalls( pat... -
17 votesanswersviews
akka http:阿卡流与演员 Build 休息服务
在akka http上使用60 API创建REST Web服务时 . 如何选择是否应该选择akka流或akka演员?在他的post中,乔斯展示了两种在akka http上创建API的方法,但他没有显示何时我应该选择一个而不是另一个 .