我正在研究的项目需要从SQS读取消息,我决定使用Akka来分发这些消息的处理 .
由于SQS是Camel支持的,并且内置了在Consumer类中使用Akka的功能,我想最好以这种方式实现 endpoints 和读取消息,尽管我没有看到很多人这样做的例子 .
我的问题是我不能足够快地轮询我的队列以保持我的队列空,或接近空 . 我最初的想法是,我可以让消费者从SQS以X / s的速率接收来自Camel的消息 . 从那里,我可以简单地创建更多的消费者,以达到我需要处理消息的速度 .
我的消费者:
import akka.camel.{CamelMessage, Consumer}
import akka.actor.{ActorRef, ActorPath}
class MyConsumer() extends Consumer {
def endpointUri = "aws-sqs://my_queue?delay=1&maxMessagesPerPoll=10&accessKey=myKey&secretKey=RAW(mySecret)"
var count = 0
def receive = {
case msg: CamelMessage => {
count += 1
}
case _ => {
println("Got something else")
}
}
override def postStop(){
println("Count for actor: " + count)
}
}
如图所示,我设置 delay=1
以及 &maxMessagesPerPoll=10
来提高消息速率,但我无法使用相同的 endpoints 生成多个消费者 .
我在文档中读到 By default endpoints are assumed not to support multiple consumers.
并且我相信这对于SQS endpoints 也是如此,因为产生多个消费者将只给我一个消费者,在运行系统一分钟之后,输出消息是 Count for actor: x
而不是其他输出 Count for actor: 0
.
如果这是有用的;我能够在单个消费者的当前实现中读取大约33条消息/秒 .
这是从Akka的SQS队列中读取消息的正确方法吗?如果是这样,我有没有办法让它向外扩展,以便我可以将消息消耗率提高到接近900消息/秒的速度?
2 回答
可悲的是,Camel目前不支持并行消费SQS上的消息 .
http://camel.465427.n5.nabble.com/Amazon-SQS-listener-as-multi-threaded-td5741541.html
为了解决这个问题,我编写了自己的Actor,使用aws-java-sdk轮询批量消息SQS .
虽然我不确定这是否是最好的实现,并且它当然可以改进,以便请求不会经常命中空队列,但它确实适合我当前需要能够轮询来自同一队列的消息 .
从SQS获得大约133(消息/秒)/演员 .
Camel 2.15支持并发消费者,虽然不确定这是多么有用,因为我不知道akka camel是否支持2.15并且我不知道是否有一个消费者角色即使有多个消费者也有所作为 .