
Spark Streaming DStream RDD to get file names


Spark Streaming's textFileStream and fileStream can monitor a directory and process the new files in a DStream RDD.

How can I get the file names that the DStream RDD is processing for a given interval?
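
For illustration, here is a minimal sketch of that setup (the app name, master, batch interval, and directory path are placeholder assumptions). Note that the stream carries only file contents, which is why the file names have to be recovered some other way:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setAppName("Watch directory").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(30))
    // Each batch holds the lines of files that appeared in the directory
    // during the last interval; the file names themselves are not exposed.
    val lines = ssc.textFileStream("/some/directory")
    lines.print()
    ssc.start()
    ssc.awaitTermination()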

2 Answers

  • 0

    fileStream produces a UnionRDD of NewHadoopRDDs. The good thing about the NewHadoopRDDs created by sc.newAPIHadoopFile is that their name is set to their path.
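
    You can see those names by walking the dependencies of each batch RDD. A minimal sketch, assuming dstream is the stream returned by ssc.fileStream as in the code below:

    dstream.foreachRDD { rdd =>
      // Each dependency wraps one per-file NewHadoopRDD whose name is its path.
      rdd.dependencies.foreach(dep => println(dep.rdd.name))
    }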

    Here's a fuller example of what you can do with that knowledge:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.{RDD, UnionRDD}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.DStream
    import scala.reflect.ClassTag

    // Rebuild each batch's UnionRDD so that every per-file child RDD
    // keeps its file path as its RDD name.
    def namedTextFileStream(ssc: StreamingContext, directory: String): DStream[String] =
      ssc.fileStream[LongWritable, Text, TextInputFormat](directory)
        .transform( rdd =>
          new UnionRDD(rdd.context,
            rdd.dependencies.map( dep =>
              dep.rdd.asInstanceOf[RDD[(LongWritable, Text)]].map(_._2.toString).setName(dep.rdd.name)
            )
          )
        )
    
    // Apply a file-name-aware transformation to each per-file RDD in the
    // batch, skipping empty ones.
    def transformByFile[U: ClassTag](unionrdd: RDD[String],
                                     transformFunc: String => RDD[String] => RDD[U]): RDD[U] = {
      new UnionRDD(unionrdd.context,
        unionrdd.dependencies.map{ dep =>
          if (dep.rdd.isEmpty) None
          else {
            val filename = dep.rdd.name
            Some(
              transformFunc(filename)(dep.rdd.asInstanceOf[RDD[String]])
                .setName(filename)
            )
          }
        }.flatten
      )
    }
    
    def main(args: Array[String]) = {
      val conf = new SparkConf()
        .setAppName("Process by file")
        .setMaster("local[2]")
    
      val ssc = new StreamingContext(conf, Seconds(30))
    
      val dstream = namedTextFileStream(ssc, "/some/directory")
    
      // Pair every line with the name of the file it came from.
      def byFileTransformer(filename: String)(rdd: RDD[String]): RDD[(String, String)] =
        rdd.map(line => (filename, line))
    
      val transformed = dstream.
        transform(rdd => transformByFile(rdd, byFileTransformer))
    
      // Do some stuff with transformed
    
      ssc.start()
      ssc.awaitTermination()
    }
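
    As one example of the "Do some stuff" placeholder above, you could print a small sample of (file name, line) pairs per batch, inside main before ssc.start() (the output format is an arbitrary choice):

    transformed.foreachRDD { rdd =>
      // Each element is (file path, line of text); sample a few lines per batch.
      rdd.take(5).foreach { case (file, line) => println(s"$file: $line") }
    }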
    
  • 4

    For those who want some Java code instead of Scala:

    import java.util.List;
    import java.util.stream.Collectors;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.rdd.RDD;
    import org.apache.spark.rdd.UnionRDD;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.dstream.FileInputDStream;

    import scala.Tuple2;
    import scala.collection.JavaConverters;
    import scala.collection.Seq;
    import scala.reflect.ClassTag;

    // jsc is an existing JavaStreamingContext; inputPath is the directory to watch.
    JavaPairInputDStream<LongWritable, Text> textFileStream = 
            jsc.fileStream(
                inputPath, 
                LongWritable.class, 
                Text.class,
                TextInputFormat.class, 
                FileInputDStream::defaultFilter,
                false
            );
    JavaDStream<Tuple2<String, String>> namedTextFileStream = textFileStream.transform((pairRdd, time) -> {
            // The batch RDD is a UnionRDD whose children are the per-file RDDs.
            UnionRDD<Tuple2<LongWritable, Text>> rdd = (UnionRDD<Tuple2<LongWritable, Text>>) pairRdd.rdd();
            List<RDD<Tuple2<LongWritable, Text>>> deps = JavaConverters.seqAsJavaListConverter(rdd.rdds()).asJava();
            List<RDD<Tuple2<String, String>>> collectedRdds = deps.stream().map( depRdd -> {
                if (depRdd.isEmpty()) {
                    return null;
                }
                JavaRDD<Tuple2<LongWritable, Text>> depJavaRdd = depRdd.toJavaRDD();
                // Each per-file RDD's name is the path of the file it was read from.
                String filename = depRdd.name();
                JavaPairRDD<String, String> newDep = JavaPairRDD.fromJavaRDD(depJavaRdd).mapToPair(t -> new Tuple2<String, String>(filename, t._2().toString())).setName(filename);
                return newDep.rdd();
            }).filter(t -> t != null).collect(Collectors.toList());
            // Stitch the renamed per-file RDDs back into a single batch RDD.
            Seq<RDD<Tuple2<String, String>>> rddSeq = JavaConverters.asScalaBufferConverter(collectedRdds).asScala().toIndexedSeq();
            ClassTag<Tuple2<String, String>> classTag = scala.reflect.ClassTag$.MODULE$.apply(Tuple2.class);
            return new UnionRDD<Tuple2<String, String>>(rdd.sparkContext(), rddSeq, classTag).toJavaRDD();
    });
    
