首页 文章

使用hadoop中的复合主键插入cassandra表

提问于
浏览
1

我正在使用Apache Hadoop,MapReduce和Cassandra来运行从Cassandra表读入的MapReduce作业,并输出到另一个Cassandra表 .

我有几个作业输出到一个主键的表 . 例如,该表用于计算每种类型的单词的数量,只有一个单词 .

CREATE TABLE word_count(
        word text,
        count int,
        PRIMARY KEY(text)
    ) WITH COMPACT STORAGE;

关联的reduce类看起来有点像这样:

public static class ReducerToCassandra 
    extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>>
{
    public void reduce(Text word, Iterable<IntWritable> values, Context context) 
        throws IOException, InterruptedException
    {
        int sum = 0;
        for (IntWritable val : values){
            sum += val.get();
        }

        org.apache.cassandra.thrift.Column c 
                = new org.apache.cassandra.thrift.Column();
        c.setName(ByteBufferUtil.bytes("count");
        c.setValue(ByteBufferUtil.bytes(sum));
        c.setTimestamp(System.currentTimeMillis());

        Mutation mutation = new Mutation();
        mutation.setColumn_or_supercolumn(new ColumnOrSuperColumn());
        mutation.column_or_supercolumn.setColumn(c);

        ByteBuffer keyByteBuffer = ByteBufferUtil.bytes(word.toString());
        context.write(keyByteBuffer, Collections.singletonList(mutation));
    }
}

如果我想添加一个额外的列,那么我只需要在 reduce 已经输出的 List<Mutation> 中添加另一个突变,但是我无法知道如何输出到复合主键中具有新列的表 . 例如,此表与上面的表相同,但也会对单词及其发布时间进行索引 .

CREATE TABLE word_count(
        word text,
        publication_hour bigint,
        count int,
        PRIMARY KEY(word, publication_hour)
    ) WITH COMPACT STORAGE;

我尝试了几种不同的方法,比如尝试输出自定义 WritableComparable (同时包含一个单词和一小时)并相应地更新 classmethod 签名和 job 配置,但这会使 reduce 在尝试强制转换时抛出 ClassCastException 自定义 WritableComparableByteBuffer .

我尝试使用 Builder 构建适当的列名 .

public static class ReducerToCassandra 
    //              MappedKey     MappedValue  ReducedKey  ReducedValues
    extends Reducer<WordHourPair, IntWritable, ByteBuffer, List<Mutation>>
{
    //                 MappedKey                  Values with the key wordHourPair
    public void reduce(WordHourPair wordHourPair, Iterable<IntWritable> values, 
    Context context) 
        throws IOException, InterruptedException
    {
        int sum = 0;
        for (IntWritable val : values){
        sum += val.get();
        }
        long hour = wordHourPair.getHourLong();

        org.apache.cassandra.thrift.Column c 
            = new org.apache.cassandra.thrift.Column();
        c.setName(ByteBufferUtil.bytes("count");
        c.setValue(ByteBufferUtil.bytes(sum));
        c.setTimestamp(System.currentTimeMillis());

        Mutation mutation = new Mutation();
        mutation.setColumn_or_supercolumn(new ColumnOrSuperColumn());
        mutation.column_or_supercolumn.setColumn(c);

        //New Code
        List<AbstractType<?>> keyTypes = new ArrayList<AbstractType<?>>(); 
        keyTypes.add(UTF8Type.instance);
        keyTypes.add(LongType.instance);
        CompositeType compositeKey = CompositeType.getInstance(keyTypes);

        Builder builder = new Builder(compositeKey);
        builder.add(ByteBufferUtil.bytes(word.toString());
        builder.add(ByteBufferUtil.bytes(hour));

        ByteBuffer keyByteBuffer = builder.build();
        context.write(keyByteBuffer, Collections.singletonList(mutation));
    }
}

但那引发了一场 IOException

java.io.IOException: InvalidRequestException(why:String didn't validate.)
    at org.apache.cassandra.hadoop.ColumnFamilyRecordWriter$RangeClient.run(ColumnFamilyRecordWriter.java:204)
Caused by: InvalidRequestException(why:String didn't validate.)
    at org.apache.cassandra.thrift.Cassandra$batch_mutate_result$batch_mutate_resultStandardScheme.read(Cassandra.java:28232)
    at org.apache.cassandra.thrift.Cassandra$batch_mutate_result$batch_mutate_resultStandardScheme.read(Cassandra.java:28218)
    at org.apache.cassandra.thrift.Cassandra$batch_mutate_result.read(Cassandra.java:28152)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
    at org.apache.cassandra.thrift.Cassandra$Client.recv_batch_mutate(Cassandra.java:1069)
    at org.apache.cassandra.thrift.Cassandra$Client.batch_mutate(Cassandra.java:1055)
    at org.apache.cassandra.hadoop.ColumnFamilyRecordWriter$RangeClient.run(ColumnFamilyRecordWriter.java:196)

这个问题:Cassandra CQL3 composite key not written by Hadoop reducer似乎展示了我正在寻找的那种代码,但是它调用 context.write ,参数类型为 HashMap, ByteBuffer ,而我'm not sure how I' d使 context.write 接受这些参数 .

如何将我想要的数据(word-hour键,int值)放入我的表中?

1 回答

  • 1

    答案是使用Cassandra的CQL接口,而不是Thrift API .

    现在我可以通过将我的reduce类的输出键/值类声明为“Map,List”来写入具有复合键的表,然后为复合键创建一个Map,其中Key(类型为string)是一列name和Value(类型为ByteBuffer)是使用ByteBufferUtil转换为ByteBuffer的列值 .

    例如,要写入如此定义的表:

    CREATE TABLE foo (
        customer_id uuid,
        time timestamp,
        my_value int,
        PRIMARY KEY (customer_id, time)
    )
    

    我可以写:

    String customerID = "the customer's id";
    long time = DateTime.now().getMillis();
    int myValue = 1;
    
    Map<String, ByteBuffer> key = new Map<String, ByteBuffer>();
    key.put("customer_id",ByteBufferUtil.bytes(customerID));
    key.put("time",ByteBufferUtil.bytes(time));
    
    List<ByteBuffer> values = Collections.singletonList(ByteBufferUtil.bytes(myValue));
    
    context.write(key, values);
    

相关问题