首页 文章

Akka Persistence Query和actor分片

提问于
浏览
1

我正在做CQRS Akka演员应用程序的查询端 .

查询actor被设置为集群分片,并使用来自一个持久性查询流的事件填充 .

我的问题是:

  • 如果群集分片中的一个actor重新启动如何恢复它?

  • 关闭整个群集分片并回复所有事件?

  • 使集群中的actor成为持久化actor并为查询端保存新的事件集?

  • 如果使用持久性查询填充的actor重新启动,如何取消当前PQ并再次启动它?

1 回答

  • 2

    如上所述,我将评估在数据库中持久保存查询端 .

    如果这不是一个选项,并且您希望坚持每个分片的单个持久性查询,请在查询actor中执行以下操作:

    var inRecovery: Boolean = true;
    
    override def preStart( ) = {
        //Subscribe to your event live stream now, so you don't miss anything during recovery
        // e.g. send Subscription message to your persistence query actor
    
        //Re-Read everything up to now for recovery
        readJournal.currentEventsByPersistenceId("persistenceId")
            .watchTermination()((_, f) => f pipeTo self) // Send Done to self after recovery is finished
            .map(Replay.apply) // Mark your replay messages
            .runWith( Sink.actorRef( self, tag ) ) // Send all replay events to self
    }
    
    override def receive = {
        case Done => // Recovery is finished
            inRecovery = false
            unstashAll() // unstash all normal messages received during recovery
    
        case Replay( payload ) =>
            //handle replayed messages
    
        case events: Event =>
            //handle normal events from your persistence query
            inRecovery match {
                case true => stash() // stash normal messages until recovery is done
                case false => 
                    // recovery is done, start handling normal events
            }
    }
    
    
    case class Replay( payload: AnyRef )
    

    所以基本上在actor开始订阅持久性查询actor之前,用所有过去事件的有限流恢复状态,这些事件在所有事件通过后终止 . 在恢复期间存储所有传入的事件,这些事件不是重播的事件 . 然后在完成恢复后,取消暂停所有内容并开始处理正常消息 .

相关问题