
How to wait for 2 actions in @ngrx/effects


Can an effect wait for two actions, the way Promise.all does? Example:

@Effect()
public addUser() {
   return this.actions$.ofType(user.ADD)
      .switchMap(() => {
         return this.userService.add();
      })
      .map(() => {
         return new user.AddSuccessAction();
      });
}

@Effect()
public addUserOptions() {
   return this.actions$.ofType(userOptions.ADD)
      .switchMap(() => {
         return this.userOptionsService.add();
      })
      .map(() => {
         return new userOptions.AddSuccessAction();
      });
}

@Effect()
public complete() {
   return this.actions$.ofType(user.ADD_SUCCESS, userOptions.ADD_SUCCESS)
      // how to make it work like Promise.all?
      .switchMap(() => {
         return this.statisticService.add();
      })
      .map(() => {
         return new account.CompleteAction();
      });
}

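UPDATED What I want to achieve is behavior similar to Promise.all: dispatch two effects in parallel, wait until both are resolved, and then dispatch a third action. Something like https://redux-saga.js.org/docs/advanced/RunningTasksInParallel.html. With promises it is quite obvious: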

Promise.all([fetch1, fetch2]).then(fetch3);

Is this possible in ngrx/effects? Or is it the wrong approach in ngrx/effects?

ANSWER

There are a few options you can use:

1) Do not use generic actions.

Follow the rules from Mike Ryan's talk: https://youtu.be/JmnsEvoy-gY

Pros: easier to debug

Cons: a lot of boilerplate and a lot of actions

2) Use complex stream with nested actions.

Check this article: https://bertrandg.github.io/ngrx-effects-complex-stream-with-nested-actions/

Here is a simple example for two actions:

@Effect()
public someAction(): Observable<Action> {
    return this.actions$.pipe(
        ofType(actions.SOME_ACTION),
        map((action: actions.SomeAction) => action.payload),
        mergeMap((payload) => {
            const firstActionSuccess$ = this.actions$.pipe(
                ofType(actions.FIRST_ACTION_SUCCESS),
                takeUntil(this.actions$.pipe(ofType(actions.FIRST_ACTION_FAIL))),
                first(),
            );

            const secondActionsSuccess$ = this.actions$.pipe(
                ofType(actions.SECOND_ACTION_SUCCESS),
                takeUntil(this.actions$.pipe(ofType(actions.SECOND_ACTION_FAIL))),
                first(),
            );

            const result$ = forkJoin(firstActionSuccess$, secondActionsSuccess$).pipe(
                first(),
            )
                .subscribe(() => {
                    // do something
                });

            return [
                new actions.FirstAction(),
                new actions.SecondAction(),
            ];
        }),
    );
}

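Pros: you can achieve what you want

Cons: the complex stream is too complex to maintain :) It looks ugly and can quickly turn into hell. The inner observables stay subscribed until a success or fail action arrives, which means that in theory any third-party code dispatching those actions can trigger them.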

3) Use aggregator pattern.

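Check Victor Savkin's presentation about state management patterns and best practices with NgRx: https://www.youtube.com/watch?v=vX2vG0o-rpM

Here is a simple example:

First you need to create actions with a correlationId parameter. The correlationId should be unique; for example, it could be a GUID. You will use this ID throughout your chain of actions to identify which actions belong together.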

export class SomeAction implements Action {
    public readonly type = SOME_ACTION;

    constructor(public readonly correlationId?: string | number) { }
    // if you need payload, then make correlationId as a second argument
    // constructor(public readonly payload: any, public readonly correlationId?: string | number) { }
}

export class SomeActionSuccess implements Action {
    public readonly type = SOME_ACTION_SUCCESS;

    constructor(public readonly correlationId?: string | number) { }
}

export class FirstAction implements Action {
    public readonly type = FIRST_ACTION;

    constructor(public readonly correlationId?: string | number) { }
}

export class FirstActionSuccess implements Action {
    public readonly type = FIRST_ACTION_SUCCESS;

    constructor(public readonly correlationId?: string | number) { }
}

// the same actions for SecondAction and ResultAction

Then our effects:

@Effect()
public someAction(): Observable<Action> {
    return this.actions$.pipe(
        ofType(actions.SOME_ACTION),
        mergeMap((action: actions.SomeAction) => {
            return [
                new actions.FirstAction(action.correlationId),
                new actions.SecondAction(action.correlationId),
            ];
        }),
    );
}

@Effect()
public firstAction(): Observable<Action> {
    return this.actions$.pipe(
        ofType(actions.FIRST_ACTION),
        switchMap((action: actions.FirstAction) => {
            // something
            ...map(() => new actions.FirstActionSuccess(action.correlationId));
        }),
    );
}
// the same for secondAction

@Effect()
public resultAction(): Observable<Action> {
    return this.actions$.pipe(
        ofType(actions.SOME_ACTION),
        switchMap((action: actions.SomeAction) => {
            const firstActionSuccess$ = this.actions$.pipe(
                ofType(actions.FIRST_ACTION_SUCCESS),
                filter((t: actions.FirstActionSuccess) => t.correlationId === action.correlationId),
                first(),
            );

            const secondActionsSuccess$ = this.actions$.pipe(
                ofType(actions.SECOND_ACTION_SUCCESS),
                filter((t: actions.SecondActionSuccess) => t.correlationId === action.correlationId),
                first(),
            );

            return zip(firstActionSuccess$, secondActionsSuccess$).pipe(
                map(() => new actions.ResultActionSuccess()),
            )
        }),
    );
}

Pros: the same as point 2, but without interference from third-party actions.

Cons: the same as points 1 and 2
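To make the correlation idea in option 3 more concrete, here is a minimal sketch of just the matching logic in plain TypeScript, with no NgRx or RxJS involved. The names `CorrelatedAction` and `CorrelationAggregator` are hypothetical and not part of any library; the sketch also assumes the list of required action types contains no duplicates.

```typescript
// Plain-TypeScript illustration of the aggregator logic only.
// CorrelatedAction and CorrelationAggregator are hypothetical names.
interface CorrelatedAction {
    type: string;
    correlationId?: string | number;
}

class CorrelationAggregator {
    // correlationId -> required action types seen so far for that chain
    private readonly seen = new Map<string | number, Set<string>>();
    private readonly requiredTypes: readonly string[];

    constructor(requiredTypes: readonly string[]) {
        this.requiredTypes = requiredTypes;
    }

    // Returns true exactly when this action completes the set of required
    // types for its correlationId; the bookkeeping for that id is then cleared.
    public receive(action: CorrelatedAction): boolean {
        const id = action.correlationId;
        if (id === undefined || this.requiredTypes.indexOf(action.type) === -1) {
            return false; // uncorrelated or unrelated action: ignore it
        }

        const types = this.seen.get(id) || new Set<string>();
        types.add(action.type);
        this.seen.set(id, types);

        if (types.size < this.requiredTypes.length) {
            return false; // still waiting for the other action(s) of this chain
        }

        this.seen.delete(id);
        return true;
    }
}

const aggregator = new CorrelationAggregator(['FIRST_ACTION_SUCCESS', 'SECOND_ACTION_SUCCESS']);
const results: boolean[] = [
    aggregator.receive({ type: 'FIRST_ACTION_SUCCESS', correlationId: 'a' }),
    aggregator.receive({ type: 'SECOND_ACTION_SUCCESS', correlationId: 'b' }), // belongs to another chain
    aggregator.receive({ type: 'SECOND_ACTION_SUCCESS', correlationId: 'a' }), // completes chain 'a'
];
```

Only the third call returns true: the success action tagged with `'b'` does not count towards chain `'a'`, which is the kind of interference option 2 cannot rule out.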

4) Do not use effects for API. Use good old services which emulate effects but return Observable.

In your service:

public dispatchFirstAction(): Observable<void> {
    this.store.dispatch(new actions.FirstAction(filter));

    return this.service.someCoolMethod().pipe(
        map((data) => this.store.dispatch(new actions.FirstActionSuccess(data))),
        catchError((error) => {
            this.store.dispatch(new actions.FirstActionFail());

            return throwError(error);
        }),
    );
}

So you can combine them anywhere later, for example:

const result1$ = this.service.dispatchFirstAction();
const result2$ = this.service.dispatchSecondAction();

forkJoin(result1$, result2$).subscribe();

5) Use ngxs: https://github.com/ngxs/store

Pros: less boilerplate, it feels like an Angular thing, and it is growing fast

Cons: has fewer features than ngrx

3 Answers

  • 3

    Another combineLatest version, using pipes and switchMap

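    import { Action } from '@ngrx/store'
    import { Effect, ofType } from '@ngrx/effects'
    import { Observable, of } from 'rxjs'
    import { combineLatest, switchMap } from 'rxjs/operators'
    
    @Effect()
    someEffect$: Observable<Action> = this.actions$.pipe(
      ofType(Action1),
      // emits once both Action1 and Action2 have been seen at least once
      combineLatest(this.actions$.pipe(ofType(Action2))),
      switchMap(() => of({ type: Action3 }))
    )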
    
  • 0

    Using Observable.combineLatest worked for me.

    @Effect()
      complete$ = this.actions$.ofType<Action1>(ACTION1).combineLatest(this.actions$.ofType<Action2>(ACTION2),
        (action1, action2) => {
    
          return new Action3();
        }
      ).take(1);
    

    take(1) results in Action3() being dispatched only once.

  • 3

    I'm new to RxJS, but how about this.

    If you change tap to switchMap, you can remove {dispatch: false}.

    @Effect({dispatch: false})
    public waitForActions(): Observable<any> {
        const waitFor: string[] = [
            SomeAction.EVENT_1,
            SomeAction.EVENT_2,
            SomeAction.EVENT_3,
        ];
    
        return this._actions$
            .pipe(
                ofType(...waitFor),
                distinct((action: IAction<any>) => action.type),
                bufferCount(waitFor.length),
                tap(console.log),
            );
    }
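To see what the distinct and bufferCount combination above actually does, here is a rough plain-TypeScript sketch of the same bookkeeping, without RxJS. The names `TypedAction` and `createWaiter` are hypothetical. One thing it makes visible: distinct without a flush notifier remembers every key for the lifetime of the subscription, so this approach appears to fire only once.

```typescript
// Plain-TypeScript illustration; TypedAction and createWaiter are hypothetical names.
interface TypedAction {
    type: string;
}

function createWaiter(waitFor: readonly string[]) {
    const seenTypes = new Set<string>(); // plays the role of distinct(action => action.type)
    let buffer: TypedAction[] = [];      // plays the role of bufferCount(waitFor.length)

    // Returns the buffered actions once every awaited type has arrived, otherwise null.
    return function push(action: TypedAction): TypedAction[] | null {
        if (waitFor.indexOf(action.type) === -1) {
            return null; // filtered out, like ofType(...waitFor)
        }
        if (seenTypes.has(action.type)) {
            return null; // a type that was already seen is dropped
        }
        seenTypes.add(action.type);
        buffer.push(action);

        if (buffer.length < waitFor.length) {
            return null; // buffer not full yet
        }
        const full = buffer;
        buffer = [];
        return full;
    };
}

const push = createWaiter(['EVENT_1', 'EVENT_2']);
const first = push({ type: 'EVENT_1' });     // null: still waiting for EVENT_2
const duplicate = push({ type: 'EVENT_1' }); // null: duplicate type is ignored
const second = push({ type: 'EVENT_2' });    // both actions, in arrival order
const again = push({ type: 'EVENT_1' });     // null: seen types are never forgotten
```

If the effect has to trigger more than once, the set of seen types would need to be reset after each full buffer; in RxJS terms that probably means passing a flush notifier to distinct or using a different operator.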
    
