首页 文章

如何使用RxJava和Kotlin进行groupBy和收集?

提问于
浏览
7

我有 Observable<Rates> 和Rate只是一个简单的对象:

Rate(val value:String){}
Rates(val rates: List<Rate>)

我想将 Observable<Rates> 改为 Observable<HashMap<String,Long> .

所以例如对于费率 Rates(arrayOf(Rate("1"),Rate("2"), Rate("3"),Rate("3"), Rate("2"),Rate("2"))) 我期待结果:

(1 -> 1)
(2 -> 3)
(3 -> 2)
(4 -> 0)
(5 -> 0)

我开始创建这样的东西:

service.getRates()
        .flatMap {it-> Observable.from(it.rates) }
        .filter { !it.value.isNullOrEmpty() }
        .groupBy {it -> it.value}
        .collect({ HashMap<String,Long>()}, { b, t -> b.put(t.key, t.count???)}

但我被困在这里,我不知道所有的 Value ?如果没有5中的4,我不知道如何添加空值(0) . 有没有办法用rx做到这一点?

3 回答

  • 5

    我认为这更多是关于函数式编程而不是RxJava相关问题

    Rates -> Map<String,Int> 实现映射功能 .

    技巧:合并两个 Pair<String,Int> 列表以形成 Map<String,Int>

    val ratesToMapWithEmptyValues: (Rates) -> Map<String, Int> = { source ->
    
      //TODO: Just for demo
      val validRatesValue = arrayOf("1","2","3","4","5")
    
      mapOf(
        *validRatesValue.map { it to 0 }.toTypedArray(),
        *source.rates.groupBy(Rate::value).mapValues { it.value.size }.toList().toTypedArray()
      )
    
    }
    

    Observable.map 中应用该功能

    service.getRates()
            .map(ratesToMapWithEmptyValues)
    
  • 4

    请查看代码中的注释以获取问题的答案 .

    import rx.Observable
    
    fun main(args: Array<String>) {
        val service = Service()
    
        // This adds all keys with each key mapped to zero
        val referenceKeyCounts = Observable
            .just("1", "2", "3", "4", "5")
            .map { it to 0 }
    
        val keyCountsFromService = service.getRates()
            .flatMap { Observable.from(it.rates) }
            .filter { !it.value.isNullOrEmpty() }
            .map { it.value to 1 } // map each occurrence of key to 1
    
        Observable.concat(referenceKeyCounts, keyCountsFromService)
            .groupBy { it.first }
            .flatMap { group ->  // this converts GroupedObservable to final values
                group.reduce(0, { acc, pair -> acc + pair.second }) // add instead of counting
                    .map { group.key to it }
            }
            .subscribe(::println)
    
    }
    
    class Service {
        fun getRates(): Observable<Rates> = Observable.just(Rates(listOf(
            Rate("1"), Rate("2"), Rate("3"), Rate("3"), Rate("2"), Rate("2")
        )))
    }
    
    class Rate(val value: String)
    
    class Rates(val rates: List<Rate>)
    
  • 0

    诀窍是在GroupedObservable上使用count,因为它只在源observable完成时发出单个值:

    enter image description here

    从那里开始:

    rates
      .flatMap { Observable.from(it.rates) }
      .filter { !it.value.isNullOrEmpty() }
      .groupBy { it.value }
      .flatMap { group -> group.count().map { group.key to it } } // list "1"->1, "2"->3, ...
      .mergeWith(Observable.from((1..5).map { it.toString() to 0 })) // defaults "4"->0
      .reduce(mutableMapOf<String, Int>()) { acc, cur ->
          acc.apply {
            val (key, count) = cur  
            this[key] = (this[key] ?: 0) + count // add counts
          }
      }.subscribe { countedRates -> 
        println(countedRates)
    }
    

相关问题