首页 文章

RXJS可观察取消订阅内部可观察量

提问于
浏览
2

this.route.params 被销毁并且 this.sub 被取消订阅后, "interval run" 继续记录 . 这违反了我的直觉,即通过一次订阅,在取消订阅后,一切都应该停止发生 . 什么's happening here & what'是最好的处理方式?我应该添加一个takeUntil,还是应该使用concatMap以外的东西? (我正在查看新照片)

constructor(private authService:AuthService,
            private route:ActivatedRoute,
            private router:Router,
            private photosApi:PhotosApi) {
}

ngOnInit() {
    this.sub = this.route.params
        .switchMap(()=>{
            console.log('switchmap Ruun')
            return Observable.interval(2000).startWith(0).concatMap(()=>{
                console.log('interval run')
                return this.photosApi.apiPhotosGet(this.authService.getAuth(), 0, 40)
            })
        })
        .subscribe((data:PagedResultPatientPhotoModel) => {
                this.processPhotos(data)
            }).add(()=>console.log('unsubscribe happened'))

编辑:我尝试重写以使其更干净,以下使用concatMap只进行一次调用,我不确定为什么它在第一次调用后停止工作:

this.intervalSub = Observable.interval(3000).startWith(0).concatMap(()=>{
        console.log('getting photos interval', this.route.params)
        return this.route.params.switchMap(()=>{
            console.log('getting photos')
            return this.photosApi.apiPhotosGet(this.authService.getAuth(), 0, 40)
        })
    }).subscribe((data:PagedResultPatientPhotoModel) => {
                this.processPhotos(data)
            }).add(()=>console.log('unsubscribe happened'))

只要api调用的运行速度快于3000,以及switchMap的工作原理如下,否则它们会相互抵消:

this.intervalSub = Observable.interval(3000).startWith(0).switchMap(()=>{
        console.log('getting photos interval', this.route.params)
        return this.route.params.switchMap(()=>{
            console.log('getting photos')
            return this.photosApi.apiPhotosGet(this.authService.getAuth(), 0, 40)
        })
    }).subscribe((data:PagedResultPatientPhotoModel) => {
                this.processPhotos(data)
            }).add(()=>console.log('unsubscribe happened'))

我很好奇为什么switchMap(和mergemap)每次都工作但是concatMap只运行一次,看起来像RXJS中的一个bug

我决定使用mergeMap,因为我觉得这些请求不太可能经常出现错误,如果是这样,它会在几秒后自行纠正 .

1 回答

  • 1

    问题不在于 switchMap ,它的行为与您期望的方式相同 . 在以下代码段中,当对组合的observable的订阅被取消订阅时,内部 interval observable被取消订阅:

    const source = new Rx.Subject();
    
    const subscription = source
      .switchMap(() => {
        console.log('> switchmap');
        return Rx.Observable.interval(500);
      })
      .subscribe((value) => console.log(value));
    
    source.next(1);
    setTimeout(() => source.next(2), 2000);
    
    setTimeout(() => {
      console.log('> unsubscribe');
      subscription.unsubscribe();
    }, 5000);
    
    .as-console-wrapper { max-height: 100% !important; top: 0; }
    
    <script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
    

    您的问题在于调用订阅的 add 方法 . 这会创建一个额外的订阅,它是分配给 sub 的订阅 .

    初始订阅 - 您已添加其他订阅 - 未分配给变量,并且永远不会取消订阅 . 这是看到 interval 继续运行的订阅 .

    以下代码段显示了问题:

    const source = new Rx.Subject();
    
    const subscription1 = source
      .switchMap(() => {
        console.log('> switchmap');
        return Rx.Observable.interval(500);
      })
      .subscribe((value) => console.log(value));
    
    const subscription2 = subscription1.add(() => console.log('> unsubscribed'));
    
    source.next(1);
    setTimeout(() => source.next(2), 2000);
    
    setTimeout(() => {
      console.log('> unsubscribe 2');
      subscription2.unsubscribe();
    }, 5000);
    setTimeout(() => {
      console.log('> unsubscribe 1');
      subscription1.unsubscribe();
    }, 6000);
    
    .as-console-wrapper { max-height: 100% !important; top: 0; }
    
    <script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
    

    请注意,在 subscription1 上调用 unsubscribe 也会看到 subscription2 取消订阅 - 但反之亦然 .

相关问题