首页 文章

在Spark SQL中使用带有任意行的数据集上的映射

提问于
浏览
1

我正在尝试在任意数据集上使用Dataframe映射函数 . 但是我不明白你将如何从Row-> Row映射 . spark sql文档中没有给出任意数据的示例:

Dataset<Row> original_data = ...
Dataset<Row> changed_data = original_data.map(new MapFunction<Row,Row>{
            @Override
            public Row call(Row row) throws Exception {
                Row newRow = RowFactory.create(obj1,obj2);
                return newRow;
            }
}, Encoders.bean(Row.class));

但是这不起作用,因为需要某种编码器?如何映射到通用行?

1 回答

  • 1

    如果obj1和obj2不是基本类型,则将其模式表示为StructType以创建行编码器 . 我建议不要使用Row类型,创建存储obj1和obj2的自定义bean,然后在 map 转换中使用该自定义bean编码器 .

    Row type:

    StructType customStructType = new StructType();
            customStructType = customStructType.add("obj1", DataTypes.< type>, false);
            customStructType = customStructType.add("obj2", DataTypes.< type >, false);
            ExpressionEncoder<Row> customTypeEncoder = null;
    
            Dataset<Row> changed_data = original_data.map(row->{
                return RowFactory.create(obj1,obj2);;
        }, RowEncoder.apply(customStructType));
    

    Custom Bean type:

    class CustomBean implements ....{
        Object obj1;
        Object obj2;
    ....
    }
    
    Dataset<CustomBean> changed_data = original_data.map(row->{
                    return new CustomBean(obj1,obj2);
            }, Encoders.bean(CustomBean));
    

相关问题