首页 文章

并行识别最新记录

提问于
浏览
2

我们使用U-SQL从一组.csv文件中提取传感器数据 . 每条记录包含传感器ID,测量时间和值,以及收到记录的时间:

+----------+---------------------+------------------+---------------------+
| SensorID |   MeasurementTime   | MeasurementValue |    ReceivedTime     |
+----------+---------------------+------------------+---------------------+
| xxx      | 2017-09-10 11:00:00 |           12.342 | 2017-09-19 14:25:17 |
| xxx      | 2017-09-10 12:00:00 |           14.654 | 2017-09-19 14:25:17 |
| yyy      | 2017-09-10 11:00:00 |            1.054 | 2017-09-19 14:25:17 |
| yyy      | 2017-09-10 12:00:00 |            1.354 | 2017-09-19 14:25:17 |
  ...
| xxx      | 2017-09-10 11:00:00 |           10.261 | 2017-09-19 15:25:17 |
+----------+---------------------+------------------+---------------------+

这些文件存储在ADLS中基于测量时间的日期部分的路径中,因此上面看到的数据可以在 /Data/2017/09/10/measurements.csv 中找到,其中前四行是在9月19日14:25:17写的,最后一行是在一小时后,在15:25:17附加的 .

如上例所示,可以在以后接收相同SensorID和MeasurementTime的新值 . 每个分区包含几百万行,每天有几千行附加到少量分区 . 我们希望每24小时运行一次批处理作业,对于任何给定的SensorID和MeasurementTime,它将仅输出最新值 . 为此,我们使用看起来类似于此的U-SQL脚本:

@newestMeasurements_addRN =
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY PDate, 
                                           SensorId, 
                                           MeasurementTime
                              ORDER BY ReceivedTime DESC) AS MeasurementRN;

@newestMeasurements =
    SELECT SensorId,
           MeasurementTime,
           MeasurementValue
    FROM @newestMeasurements_addRN
    WHERE MeasurementRN == 1;

这里, PDate 是从CSV文件路径中的yyyy / MM / dd推断出的虚拟列(等于MeasurementTime的日期部分) .

现在,因为我们在窗口函数的 PARTITION BY 部分中使用 PDate ,我预计此操作可以并行化,因为我们在尝试查找任何给定SensorID和MeasurementTime的最新记录时不必考虑不同的日期(分区) . 不幸的是,在工作图中看起来似乎并非如此:

enter image description here

在这里,我们从4个不同的日期提取数据 . 每个Extract顶点输出完整数量的记录,只将最新记录的任务保留到底部的Combine顶点,表明 ROW_NUMBER 和后续过滤不会并行发生 .

  • 这是 ROW_NUMBER 执行中的错误吗?

  • 我们可以使用不同的U-SQL技术来确保并行性吗?

1 回答

  • 1

    我设法找到了一个可用的解决方案,其中我封装了U-SQL,它检测U-SQL存储过程中的最新测量值,该过程采用与 pdate 对应的值作为输入参数 .

    然后,我简单地执行这个存储过程多次,带有我想要并行处理的日期列表:

    DetectLatestMeasurements(20170910);
    DetectLatestMeasurements(20170911);
    DetectLatestMeasurements(20170912);
    DetectLatestMeasurements(20170913);
    

    存储的proc处理一天数据的EXTRACT,转换和OUTPUT,因此这可以完成工作,并且按照我期望的方式进行并行处理 .

相关问题