首页 文章

rxjs - observable什么都不发出(没有完成?)

提问于
浏览
0

我有一个角度6应用程序,我最初设置我的后端与休息api,但我开始转换部件使用socket.io .

当我从我的rest api返回数据时,以下工作:

this.http.get(api_url + '/versions/entity/' + entityId).pipe(
  mergeMap((versions:IVersion[]) => versions),
  groupBy((version:IVersion) => version.type),
  mergeMap(group => group.pipe(
    toArray(),
    map(versions=> {
      return {
        type: group.key,
        versions: versions
      }
    }),
    toArray()
  )),
  reduce((acc, v) => acc.concat(v), [])
);

快递路线:

router.get('/entity/:entityId', (req, res) => {
  const entityId = req.params.entityId;

  Version.getVersionsByEntity(entityId, (err, versions) => {
    if (err) {
      res.json({success: false, msg: err});
    } else {
      res.json(versions);
    }
  });
});

哪个用mongoose从我的mongo数据库中获取数据:

export function getVersionsByEntity(entityId, callback) {
  console.log('models/version - get versions by entity');

  Version.find({'entity.entityId': entityId})
          .exec(callback);
}

但是,当我使用socket.io进行完全相同的调用时,observable不会返回任何内容 . 我猜它是因为它永远不会完成?成功传输数据时,http呼叫是否发送“完整”消息?

套接字从此服务发送:

getVersionsByEntity(entityId): Observable<IVersion[]> {
    // create observable to list to refreshJobs message
    let observable = new Observable(observer => {
      this._socketService.socket.on('versionsByEntity', (data) => {
        observer.next(data);
      });
    });

    this._socketService.event('versionsByEntity', entityId);

    return <Observable<IVersion[]>> observable;
  }

从服务器调用相同的mongoose函数 .

从(套接字)服务返回的observable确实包含数据,只有在我添加 toArray() 函数时它才会在我订阅时从不打印任何内容 .

有人可以帮我解决这个问题并解释其背后的理论吗?它没有完成吗? http是否使用Angular发送“已完成”消息?

编辑:我已经创建了一个简单的stackblitz,它正在做我想要的但我想从_dataService中删除take(1)因为我可能想要更新数据所以我想保持observable打开 - https://stackblitz.com/edit/rxjs-toarray-problem

编辑2:我接近用扫描操作符替换为Array,但它似乎是为阵列发射两次 . reduce()发出正确的数据,但似乎只发出完整(如toArray),所以它没有更好 - https://stackblitz.com/edit/rxjs-toarray-problem-tpeguu

2 回答

  • 1

    处理事件流有点不同 . 我试图提出一些代码:

    getVersionsByEntity(entityId): Observable<IVersion[]> {
        return defer(() => {
            this._socketService.event('versionsByEntity', entityId);
    
            return fromEvent(this._socketService.socket, 'versionsByEntity')
                .pipe(take(1))
        }) as Observable<IVersion[]>;
    }
    

    主要思想是将所有内容包装在 defer 中,这将使Observable变得懒惰,并且只在订阅时调用 socketService.event (您的原始实现通过立即调用 socketService.event 而急切) . 实施急切可能会产生意想不到的后果 - 如果Observable订阅得太晚,很容易错过事件 .

    我还建议使用 fromEvent Observable工厂 - 它处理事件监听器设置和拆除 .

    最后在第一次发射后完成Observable我已添加 take(1) - 这会将排放数量限制为1并取消订阅Observable .

  • 1

    除了@ m1ch4ls所说的,你必须考虑 toArray 的工作原理 . toArray 将Observable通知的所有数据转换为此类数据的数组 . 为了使其工作,Observable必须完成 .

    Angular http客户端返回的Observable始终在第一次通知后完成,因此 toArray 可以正常工作 . 一个socket.io流在关闭时完成,因此在这样的流上使用 toArray 可能会导致在流未关闭的情况下永远不会从Observable中获取任何值 .

    另一方面,如果你想在一次通知后关闭流,如果你使用 take(1) 就会发生这种情况,那么你最好考虑使用http请求 . 套接字流被认为是一种长生活 Channels ,因此如果你必须在传输一条消息后不得不关闭它们,它就不适合它们的性质 .

    UPDATED VERSION after comment

    这是没有 take 的代码

    getData() {
        return this._dataService.getVersions().pipe(
          map(
            (versions: Array<any>) => versions.reduce(
              (acc, val) => {
                const versionGroup = (acc.find(el => el.type === val.type));
                if (versionGroup) {
                  versionGroup.versions.push(val)
                } else {
                  acc.push({type: val.type, versions: [val]})
                }
                return acc;
              }, []
            )
          ),
        )
    

    要理解的关键点是,您的服务正在为您返回一些东西 . 为了实现您的结果,您可以使用 Array 方法直接使用 Array ,并且您不需要使用 Observable 运算符 . 请不要将我上面使用的分组逻辑的代码视为正确的实现 - 真正的实现可能会有利于使用 lodash 之类的东西进行分组,但我不想让事情复杂化 .

    这是你的原始代码

    getData() {
        return this._dataService.getVersions().pipe(
          mergeMap((versions: Array<any>) => versions),
          groupBy((version:IVersion) => version.type),
          mergeMap(group => group.pipe(
            toArray(),
            map(versions=> {
              return {
                type: group.key,
                versions: versions
              }
            }),
            toArray()
          )),
          reduce((acc, v) => acc.concat(v), [])
        )
      }
    

    这是做什么的

    • 您创建了一个对象流,将第一个 mergeMap 应用于服务返回的数组

    • 通过 groupBy 运算符创建一个新的Observable,它会发出

    • 根据您的逻辑分组的对象然后您输入第二个更复杂的 mergeMap ,它接受 groupBy 发出的数组,将它们转换为对象流,只是立即通过第一个 toArray 将它们转换为数组,然后转换为到类型为{type:string,versions:[]}的Object,然后最终再次调用 toArray 来创建一个Array数组

    • 你要做的最后一件事是运行 reduce 来创建你的最终数组

    为什么它只适用于 take ?因为 groupBy 仅在其源Observable完成时才执行,这对您有意义想要分组一组有限的东西 . 同样_1183212_ Observable运算符 .

    take 是一种完成源Observable的方法 .

相关问题