首页 文章

按日期从Spark中读取S3中的多个文件

提问于
浏览
16

说明

我有一个应用程序,它将数据发送到AWS Kinesis Firehose,并将数据写入我的S3存储桶 . Firehose使用“yyyy / MM / dd / HH”格式来编写文件 .

就像在这个示例S3路径中一样:

s3://mybucket/2016/07/29/12

现在我有一个用Scala编写的Spark应用程序,我需要从特定时间段读取数据 . 我有开始和结束日期 . 数据采用JSON格式,这就是我使用 sqlContext.read.json() 而不是 sc.textFile() 的原因 .

如何快速有效地读取数据?

我尝试了什么?

  • Wildcards - 我可以选择特定日期或特定月份所有日期的所有小时数据,例如:
val df = sqlContext.read.json("s3://mybucket/2016/07/29/*")
val df = sqlContext.read.json("s3://mybucket/2016/07/*/*")

但是,如果我必须从几天的日期读取数据,例如2016-07-29 - 2016-07-30我不能以相同的方式使用通配符方法 .

这让我想到了下一点......

  • Using multiple paths 或由 samthebestthis解决方案中显示的目录CSV . 看起来用逗号分隔目录只适用于 sc.textFile() 而不是 sqlContext.read.json() .

  • Union - cloud 的上一个链接的第二个解决方案建议单独读取每个目录,然后将它们合并在一起 . 虽然他建议联合RDD-s,但也可以选择联合DataFrames . 如果我手动生成给定日期时间段的日期字符串,那么我可能会创建一个不存在的路径,而不是忽略它,整个读取失败 . 相反,我可以使用AWS SDK并使用AmazonS3Client中的函数 listObjects 来获取上一个链接中 iMKanchwala 解决方案中的所有密钥 .

唯一的问题是我的数据不断变化 . 如果 read.json() 函数将所有数据作为单个参数获取,它将读取所有必需的数据,并且足够智能从数据中推断出json模式 . 如果我分别读取2个目录并且它们的模式不匹配,那么我认为联合这两个数据帧会成为一个问题 .

nhahtdh This解决方案 nhahtdh 比选项 12 好一点,因为它们提供了更详细地指定日期和目录的选项,并且作为单个"path",因此它也适用于 read.json() .

但同样,关于丢失的目录会出现一个熟悉的问题 . 假设我想要从20.07到30.07的所有数据,我可以这样声明:

val df = sqlContext.read.json("s3://mybucket/2016/07/[20-30]/*")

但是如果我从7月25日开始丢失数据,那么路径 ..16/07/25/ 就不存在了,整个功能都失败了 .

显然,当请求的时间段是25.11.2015-12.02.2016时,它会变得更加困难,那么我需要以编程方式(在我的Scala脚本中)创建一个类似这样的字符串路径:

"s3://mybucket/{2015/11/[25-30],2015/12/*,2016/01/*,2016/02/[01-12]}/*"

通过创建它,我会以某种方式确定这些25-30和01-12间隔都有相应的路径,如果缺少一个,它会再次失败 . (幸运的是,Asterisk会处理丢失的目录,因为它会读取存在的所有内容)

How can I read all the necessary data from a single directory path all at once without the possibility of failing because of a missing directory between some date interval?

1 回答

  • 11

    有一个更简单的解决方案 . 如果你看DataFrameReader API,你会注意到有一个 .json(paths: String*) 方法 . 只需构建一个你想要的路径集合,然后根据你的喜好使用不是,然后调用方法,例如,

    val paths: Seq[String] = ...
    val df = sqlContext.read.json(paths: _*)
    

相关问题