I have a Spark DataFrame named flightData2015 in the following format:

+--------------------------+---------------------+-------+
| Destination_country_name | Origin_country_name | count |
+--------------------------+---------------------+-------+
| United States            | Romania             |    15 |
| United States            | Croatia             |     1 |
| United States            | Ireland             |    15 |
| Egypt                    | United States       |    10 |
+--------------------------+---------------------+-------+

I want to get all rows that have the maximum count. So for the example above, the result would be:

+--------------------------+---------------------+-------+
| Destination_country_name | Origin_country_name | count |
+--------------------------+---------------------+-------+
| United States            | Romania             |    15 |
| United States            | Ireland             |    15 |
+--------------------------+---------------------+-------+

I can do this with Spark SQL as follows:

spark.sql("select * from flight_data_2015 where count = (select max(count) from flight_data_2015)")

However, when I inspect the execution plan I find, as expected, that there are multiple passes over the dataset:

== Physical Plan ==
*(1) Project [DEST_COUNTRY_NAME#10, ORIGIN_COUNTRY_NAME#11, count#12]
+- *(1) Filter (isnotnull(count#12) && (count#12 = Subquery subquery209))
   :  +- Subquery subquery209
   :     +- *(2) HashAggregate(keys=[], functions=[max(count#12)])
   :        +- Exchange SinglePartition
   :           +- *(1) HashAggregate(keys=[], functions=[partial_max(count#12)])
   :              +- *(1) FileScan csv [count#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/utk/Documents/Spark-The-Definitive-Guide/data/flight-data/csv/2..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<count:int>
   +- *(1) FileScan csv [DEST_COUNTRY_NAME#10,ORIGIN_COUNTRY_NAME#11,count#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/utk/Documents/Spark-The-Definitive-Guide/data/flight-data/csv/2..., PartitionFilters: [], PushedFilters: [IsNotNull(count)], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>
         +- Subquery subquery209
            +- *(2) HashAggregate(keys=[], functions=[max(count#12)])
               +- Exchange SinglePartition
                  +- *(1) HashAggregate(keys=[], functions=[partial_max(count#12)])
                     +- *(1) FileScan csv [count#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/utk/Documents/Spark-The-Definitive-Guide/data/flight-data/csv/2..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<count:int>

I would like to know whether there is a way to do this in a single pass. If not, what is the best approach, both with and without Spark SQL?

Also note that the DataFrame actually has more than 2 billion rows, so moving everything into a single partition is not an option. The sketch below illustrates why the obvious window-function alternative runs into exactly that problem.
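A hedged sketch (Scala, same assumed flightData2015 DataFrame): computing the max with a window function over an unpartitioned window avoids the subquery, but it shuffles all rows into a single partition, which is not viable at this scale.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, max}

// No partition columns: Spark warns "No Partition Defined for Window operation!"
// and moves all data to a single partition, which 2+ billion rows cannot fit.
val w = Window.partitionBy()

val viaWindow = flightData2015
  .withColumn("max_count", max(col("count")).over(w))
  .filter(col("count") === col("max_count"))
  .drop("max_count")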