我对RxJava,android和Java都很新...我正在尝试创建一个发出同步List的observable . 哪里错了?
public class CurrentLocationHolder {
private List<LocationPoint> locationBuffer = Collections.synchronizedList(new ArrayList<>());
public final PublishSubject<List<LocationPoint>> locationBufferChanged = PublishSubject.create();
public Observable<List<LocationPoint>> observeLocationBufferChanged(boolean emitCurrentValue) {
return emitCurrentValue ? locationBufferChanged.startWith(locationPointsBuffer) : locationBufferChanged;
}
public void setLocation(LocationPoint point) {
locationBuffer.add(point);
if (locationBuffer.size() >= 10) {
locationBufferChanged.onNext(this.locationBuffer);
}
locationBufferChanged.onCompleted();
locationBuffer.clear();
}
}
这是我的订户#1对象:
public class DatabaseManager {
private Subscription locationBufferSubscription;
private static DatabaseManager instance;
public static void InitInstance() {
if (instance == null) {
instance = new DatabaseManager();
instance.changeLocationBufferSubscription =
CurrentLocationHolder.getInstance().observeLocationBufferChanged()
.subscribe(locArray -> {
ActiveAndroid.beginTransaction();
try {
for (int i = 0; i < locArray.size(); i++) {
locArray.get(i).save();
}
ActiveAndroid.setTransactionSuccessful();
} finally {
ActiveAndroid.endTransaction();
}
});
}
}
}
那么,如果我要创建另一个订阅者,例如HttpManager,它也将监听缓冲区更改,那么我的缓冲区是否会对所有侦听器同步?在所有听众处理所有10个LocationPoints后,我的列表是否会被清除?并不是我的代码有点矫枉过正吗?
1 回答
所以,答案是在主题上调用onNext时将locationBuffer复制到新的ArrayList: