我有一个角度6应用程序,我最初设置我的后端与休息api,但我开始转换部件使用socket.io .
当我从我的rest api返回数据时,以下工作:
this.http.get(api_url + '/versions/entity/' + entityId).pipe(
mergeMap((versions:IVersion[]) => versions),
groupBy((version:IVersion) => version.type),
mergeMap(group => group.pipe(
toArray(),
map(versions=> {
return {
type: group.key,
versions: versions
}
}),
toArray()
)),
reduce((acc, v) => acc.concat(v), [])
);
快递路线:
router.get('/entity/:entityId', (req, res) => {
const entityId = req.params.entityId;
Version.getVersionsByEntity(entityId, (err, versions) => {
if (err) {
res.json({success: false, msg: err});
} else {
res.json(versions);
}
});
});
哪个用mongoose从我的mongo数据库中获取数据:
export function getVersionsByEntity(entityId, callback) {
console.log('models/version - get versions by entity');
Version.find({'entity.entityId': entityId})
.exec(callback);
}
但是,当我使用socket.io进行完全相同的调用时,observable不会返回任何内容 . 我猜它是因为它永远不会完成?成功传输数据时,http呼叫是否发送“完整”消息?
套接字从此服务发送:
getVersionsByEntity(entityId): Observable<IVersion[]> {
// create observable to list to refreshJobs message
let observable = new Observable(observer => {
this._socketService.socket.on('versionsByEntity', (data) => {
observer.next(data);
});
});
this._socketService.event('versionsByEntity', entityId);
return <Observable<IVersion[]>> observable;
}
从服务器调用相同的mongoose函数 .
从(套接字)服务返回的observable确实包含数据,只有在我添加 toArray()
函数时它才会在我订阅时从不打印任何内容 .
有人可以帮我解决这个问题并解释其背后的理论吗?它没有完成吗? http是否使用Angular发送“已完成”消息?
编辑:我已经创建了一个简单的stackblitz,它正在做我想要的但我想从_dataService中删除take(1)因为我可能想要更新数据所以我想保持observable打开 - https://stackblitz.com/edit/rxjs-toarray-problem
编辑2:我接近用扫描操作符替换为Array,但它似乎是为阵列发射两次 . reduce()发出正确的数据,但似乎只发出完整(如toArray),所以它没有更好 - https://stackblitz.com/edit/rxjs-toarray-problem-tpeguu
2 回答
处理事件流有点不同 . 我试图提出一些代码:
主要思想是将所有内容包装在
defer
中,这将使Observable变得懒惰,并且只在订阅时调用socketService.event
(您的原始实现通过立即调用socketService.event
而急切) . 实施急切可能会产生意想不到的后果 - 如果Observable订阅得太晚,很容易错过事件 .我还建议使用
fromEvent
Observable工厂 - 它处理事件监听器设置和拆除 .最后在第一次发射后完成Observable我已添加
take(1)
- 这会将排放数量限制为1并取消订阅Observable .除了@ m1ch4ls所说的,你必须考虑
toArray
的工作原理 .toArray
将Observable通知的所有数据转换为此类数据的数组 . 为了使其工作,Observable必须完成 .Angular http客户端返回的Observable始终在第一次通知后完成,因此
toArray
可以正常工作 . 一个socket.io流在关闭时完成,因此在这样的流上使用toArray
可能会导致在流未关闭的情况下永远不会从Observable中获取任何值 .另一方面,如果你想在一次通知后关闭流,如果你使用
take(1)
就会发生这种情况,那么你最好考虑使用http请求 . 套接字流被认为是一种长生活 Channels ,因此如果你必须在传输一条消息后不得不关闭它们,它就不适合它们的性质 .UPDATED VERSION after comment
这是没有
take
的代码要理解的关键点是,您的服务正在为您返回一些东西 . 为了实现您的结果,您可以使用
Array
方法直接使用Array
,并且您不需要使用Observable
运算符 . 请不要将我上面使用的分组逻辑的代码视为正确的实现 - 真正的实现可能会有利于使用lodash
之类的东西进行分组,但我不想让事情复杂化 .这是你的原始代码
这是做什么的
您创建了一个对象流,将第一个
mergeMap
应用于服务返回的数组通过
groupBy
运算符创建一个新的Observable,它会发出根据您的逻辑分组的对象然后您输入第二个更复杂的
mergeMap
,它接受groupBy
发出的数组,将它们转换为对象流,只是立即通过第一个toArray
将它们转换为数组,然后转换为到类型为{type:string,versions:[]}的Object,然后最终再次调用toArray
来创建一个Array数组你要做的最后一件事是运行
reduce
来创建你的最终数组为什么它只适用于
take
?因为groupBy
仅在其源Observable完成时才执行,这对您有意义想要分组一组有限的东西 . 同样_1183212_ Observable运算符 .take
是一种完成源Observable的方法 .