首页 文章

何时创建JavaRx observable,它向所有订阅者发出同步(或阻塞)列表?

提问于
浏览
0

我对RxJava,android和Java都很新...我正在尝试创建一个发出同步List的observable . 哪里错了?

public class CurrentLocationHolder {

    private List<LocationPoint> locationBuffer = Collections.synchronizedList(new ArrayList<>());

    public final PublishSubject<List<LocationPoint>> locationBufferChanged = PublishSubject.create();

    public Observable<List<LocationPoint>> observeLocationBufferChanged(boolean emitCurrentValue) {
        return emitCurrentValue ? locationBufferChanged.startWith(locationPointsBuffer) : locationBufferChanged;
    }

    public void setLocation(LocationPoint point) {

        locationBuffer.add(point);

        if (locationBuffer.size() >= 10) {
            locationBufferChanged.onNext(this.locationBuffer);
        }

        locationBufferChanged.onCompleted();

        locationBuffer.clear();
    }
}

这是我的订户#1对象:

public class DatabaseManager {
    private Subscription locationBufferSubscription;
    private static DatabaseManager instance;

    public static void InitInstance() {
        if (instance == null) {
            instance = new DatabaseManager();

        instance.changeLocationBufferSubscription = 
            CurrentLocationHolder.getInstance().observeLocationBufferChanged()
                .subscribe(locArray -> {
                    ActiveAndroid.beginTransaction();
                    try {
                        for (int i = 0; i < locArray.size(); i++) {
                            locArray.get(i).save();
                        }
                        ActiveAndroid.setTransactionSuccessful();
                    } finally {
                        ActiveAndroid.endTransaction();
                    }
            });
        }
    }
}

那么,如果我要创建另一个订阅者,例如HttpManager,它也将监听缓冲区更改,那么我的缓冲区是否会对所有侦听器同步?在所有听众处理所有10个LocationPoints后,我的列表是否会被清除?并不是我的代码有点矫枉过正吗?

1 回答

  • 0

    所以,答案是在主题上调用onNext时将locationBuffer复制到新的ArrayList:

    if (locationBuffer.size() >= 10) {
        locationBufferChanged.onNext(new ArrayList<>(this.locationBuffer));
    }
    locationBuffer.clear();
    

相关问题