首页 文章

如果删除复合类型列中的值,则无法连接新的cassandra节点

提问于
浏览
0

我在新节点system.log上获得一个 RuntimeException 来引导一个新的DC .

Is it a bug of Cassandra 2.0.9?

Log System.out - Buffer.limit中由IllegalArgumentException引起的RuntimeException:

INFO [NonPeriodicTasks:1] 2014-08-26 15:43:01,030 SecondaryIndexManager.java (line 137) Submitting index build of [myColumnFamily.myColumnFamily_myColumn] for data in SSTableReader(path='/var/lib/cassandra/data/testbug/myColumnFamily/testbug-myColumnFamily-jb-1-Data.db')
ERROR [CompactionExecutor:2] 2014-08-26 15:43:01,035 CassandraDaemon.java (line 199) Exception in thread Thread[CompactionExecutor:2,1,main]
java.lang.IllegalArgumentException
    at java.nio.Buffer.limit(Buffer.java:267)
    at org.apache.cassandra.utils.ByteBufferUtil.readBytes(ByteBufferUtil.java:587)
    at org.apache.cassandra.utils.ByteBufferUtil.readBytesWithShortLength(ByteBufferUtil.java:596)
    at org.apache.cassandra.db.marshal.AbstractCompositeType.compare(AbstractCompositeType.java:61)
    at org.apache.cassandra.db.marshal.AbstractCompositeType.compare(AbstractCompositeType.java:36)
    at org.apache.cassandra.dht.LocalToken.compareTo(LocalToken.java:44)
    at org.apache.cassandra.db.DecoratedKey.compareTo(DecoratedKey.java:85)
    at org.apache.cassandra.db.DecoratedKey.compareTo(DecoratedKey.java:36)
    at java.util.concurrent.ConcurrentSkipListMap.findPredecessor(ConcurrentSkipListMap.java:727)
    at java.util.concurrent.ConcurrentSkipListMap.findNode(ConcurrentSkipListMap.java:789)
    at java.util.concurrent.ConcurrentSkipListMap.doGet(ConcurrentSkipListMap.java:828)
    at java.util.concurrent.ConcurrentSkipListMap.get(ConcurrentSkipListMap.java:1626)
    at org.apache.cassandra.db.Memtable.resolve(Memtable.java:215)
    at org.apache.cassandra.db.Memtable.put(Memtable.java:173)
    at org.apache.cassandra.db.ColumnFamilyStore.apply(ColumnFamilyStore.java:900)
    at org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex.insert(AbstractSimplePerColumnSecondaryIndex.java:107)
    at org.apache.cassandra.db.index.SecondaryIndexManager.indexRow(SecondaryIndexManager.java:441)
    at org.apache.cassandra.db.Keyspace.indexRow(Keyspace.java:413)
    at org.apache.cassandra.db.index.SecondaryIndexBuilder.build(SecondaryIndexBuilder.java:62)
    at org.apache.cassandra.db.compaction.CompactionManager$9.run(CompactionManager.java:834)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
ERROR [NonPeriodicTasks:1] 2014-08-26 15:43:01,035 CassandraDaemon.java (line 199) Exception in thread Thread[NonPeriodicTasks:1,5,main]
java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException
    at org.apache.cassandra.utils.FBUtilities.waitOnFuture(FBUtilities.java:413)
    at org.apache.cassandra.db.index.SecondaryIndexManager.maybeBuildSecondaryIndexes(SecondaryIndexManager.java:142)
    at org.apache.cassandra.streaming.StreamReceiveTask$OnCompletionRunnable.run(StreamReceiveTask.java:113)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:188)
    at org.apache.cassandra.utils.FBUtilities.waitOnFuture(FBUtilities.java:409)
    ... 9 more
Caused by: java.lang.IllegalArgumentException
    at java.nio.Buffer.limit(Buffer.java:267)
    at org.apache.cassandra.utils.ByteBufferUtil.readBytes(ByteBufferUtil.java:587)
    at org.apache.cassandra.utils.ByteBufferUtil.readBytesWithShortLength(ByteBufferUtil.java:596)
    at org.apache.cassandra.db.marshal.AbstractCompositeType.compare(AbstractCompositeType.java:61)
    at org.apache.cassandra.db.marshal.AbstractCompositeType.compare(AbstractCompositeType.java:36)
    at org.apache.cassandra.dht.LocalToken.compareTo(LocalToken.java:44)
    at org.apache.cassandra.db.DecoratedKey.compareTo(DecoratedKey.java:85)
    at org.apache.cassandra.db.DecoratedKey.compareTo(DecoratedKey.java:36)
    at java.util.concurrent.ConcurrentSkipListMap.findPredecessor(ConcurrentSkipListMap.java:727)
    at java.util.concurrent.ConcurrentSkipListMap.findNode(ConcurrentSkipListMap.java:789)
    at java.util.concurrent.ConcurrentSkipListMap.doGet(ConcurrentSkipListMap.java:828)
    at java.util.concurrent.ConcurrentSkipListMap.get(ConcurrentSkipListMap.java:1626)
    at org.apache.cassandra.db.Memtable.resolve(Memtable.java:215)
    at org.apache.cassandra.db.Memtable.put(Memtable.java:173)
    at org.apache.cassandra.db.ColumnFamilyStore.apply(ColumnFamilyStore.java:900)
    at org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex.insert(AbstractSimplePerColumnSecondaryIndex.java:107)
    at org.apache.cassandra.db.index.SecondaryIndexManager.indexRow(SecondaryIndexManager.java:441)
    at org.apache.cassandra.db.Keyspace.indexRow(Keyspace.java:413)
    at org.apache.cassandra.db.index.SecondaryIndexBuilder.build(SecondaryIndexBuilder.java:62)
    at org.apache.cassandra.db.compaction.CompactionManager$9.run(CompactionManager.java:834)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    ... 3 more
ERROR [CompactionExecutor:2] 2014-08-26 15:43:01,036 CassandraDaemon.java (line 199) Exception in thread Thread[CompactionExecutor:2,1,main]
java.lang.IllegalArgumentException
    at java.nio.Buffer.limit(Buffer.java:267)
    at org.apache.cassandra.utils.ByteBufferUtil.readBytes(ByteBufferUtil.java:587)
    at org.apache.cassandra.utils.ByteBufferUtil.readBytesWithShortLength(ByteBufferUtil.java:596)
    at org.apache.cassandra.db.marshal.AbstractCompositeType.compare(AbstractCompositeType.java:61)
    at org.apache.cassandra.db.marshal.AbstractCompositeType.compare(AbstractCompositeType.java:36)
    at org.apache.cassandra.dht.LocalToken.compareTo(LocalToken.java:44)
    at org.apache.cassandra.db.DecoratedKey.compareTo(DecoratedKey.java:85)
    at org.apache.cassandra.db.DecoratedKey.compareTo(DecoratedKey.java:36)
    at java.util.concurrent.ConcurrentSkipListMap.findPredecessor(ConcurrentSkipListMap.java:727)
    at java.util.concurrent.ConcurrentSkipListMap.findNode(ConcurrentSkipListMap.java:789)
    at java.util.concurrent.ConcurrentSkipListMap.doGet(ConcurrentSkipListMap.java:828)
    at java.util.concurrent.ConcurrentSkipListMap.get(ConcurrentSkipListMap.java:1626)
    at org.apache.cassandra.db.Memtable.resolve(Memtable.java:215)
    at org.apache.cassandra.db.Memtable.put(Memtable.java:173)
    at org.apache.cassandra.db.ColumnFamilyStore.apply(ColumnFamilyStore.java:900)
    at org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex.insert(AbstractSimplePerColumnSecondaryIndex.java:107)
    at org.apache.cassandra.db.index.SecondaryIndexManager.indexRow(SecondaryIndexManager.java:441)
    at org.apache.cassandra.db.Keyspace.indexRow(Keyspace.java:413)
    at org.apache.cassandra.db.index.SecondaryIndexBuilder.build(SecondaryIndexBuilder.java:62)
    at org.apache.cassandra.db.compaction.CompactionManager$9.run(CompactionManager.java:834)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

我重现的步骤(Сassandra2.0.9):

  • 在一个节点数据中心DC1中,使用NetworkTopologyStrategy创建密钥空间,选项DC1 = 1,DC2 = 1 .

  • 在CompositeType(Int32Type,UUIDType)上创建具有辅助索引的CF.

  • 将任何复合值写入一行 .

  • 删除该值 .

  • 将另一个复合值写入另一行 .

  • 删除第二个值 .

  • Bootstrap在DC2中添加新节点 .

  • DC2获得异常 .

我使用SimpleStrategy和replication_factor = 3(cassandra 1.2.18)在单个3节点DC中对索引复合列进行了类似的问题 .

我使用Astyanax 2.0.1(我在Astyanax 1.56.49上测试过),这里是生成密钥空间(步骤1-6)的代码,该密钥空间在新的数据中心节点引导程序上失败 . 来源CassandraBugTest.java:

package test;

import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.Cluster;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
import com.netflix.astyanax.connectionpool.exceptions.BadRequestException;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolType;
import com.netflix.astyanax.connectionpool.impl.Slf4jConnectionPoolMonitorImpl;
import com.netflix.astyanax.ddl.ColumnDefinition;
import com.netflix.astyanax.ddl.ColumnFamilyDefinition;
import com.netflix.astyanax.ddl.KeyspaceDefinition;
import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.Composite;
import com.netflix.astyanax.model.ConsistencyLevel;
import com.netflix.astyanax.serializers.*;
import com.netflix.astyanax.thrift.ThriftFamilyFactory;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@RunWith(JUnit4.class)
public class CassandraBugTest
{
    private static final String SEEDS = "dc1n1";
    private static final String CLUSTER_NAME = "Test Cluster";
    private static final String KEYSPACE_NAME = "testbug";
    private static final String COLUMN_FAMILY_NAME = "myColumnFamily";
    private static final String COLUMN_NAME = "myColumn";

    @Test
    public void test() throws ConnectionException, InterruptedException
    {
        AstyanaxContext.Builder builder = new AstyanaxContext.Builder()
                .forCluster(CLUSTER_NAME)
                .forKeyspace(KEYSPACE_NAME)
                .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
                                .setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE)
                                .setConnectionPoolType(ConnectionPoolType.TOKEN_AWARE)
                                .setCqlVersion("3.1.7")
                                .setTargetCassandraVersion("2.0.9")
                                .setDefaultWriteConsistencyLevel(ConsistencyLevel.CL_LOCAL_QUORUM)
                                .setDefaultReadConsistencyLevel(ConsistencyLevel.CL_LOCAL_QUORUM)
                )
                .withConnectionPoolConfiguration(new ConnectionPoolConfigurationImpl("TaxiClusterConnectionPool")
                                .setInitConnsPerHost(1)
                                .setMaxConnsPerHost(8)
                                .setSeeds(SEEDS)
                )
                .withConnectionPoolMonitor(new Slf4jConnectionPoolMonitorImpl());

        AstyanaxContext<Cluster> clusterContext = builder.buildCluster(ThriftFamilyFactory.getInstance());
        clusterContext.start();
        Cluster cluster = clusterContext.getClient();

        Map<String, String> options = new HashMap<>();
        options.put("DC1", "1");
        options.put("DC2", "1");

        KeyspaceDefinition keyspaceDef = cluster.makeKeyspaceDefinition()
                                                .setName(KEYSPACE_NAME)
                                                .setStrategyClass("NetworkTopologyStrategy")
                                                .setStrategyOptions(options);

        try
        {
            cluster.dropKeyspace(KEYSPACE_NAME);
        }
        catch (BadRequestException e)
        {
        }

        cluster.addKeyspace(keyspaceDef);

        ColumnFamily<UUID, String> profiles = new ColumnFamily<>(COLUMN_FAMILY_NAME, UUIDSerializer.get(), StringSerializer.get());

        ColumnFamilyDefinition columnFamilyDef = cluster.makeColumnFamilyDefinition()
                                                        .setName(profiles.getName())
                                                        .setKeyspace(KEYSPACE_NAME)
                                                        .setKeyValidationClass(profiles.getKeySerializer().getComparatorType().getTypeName())
                                                        .setComparatorType(profiles.getColumnSerializer().getComparatorType().getTypeName());

        ColumnDefinition columnDef = cluster.makeColumnDefinition()
                                            .setValidationClass(
                                                    ComparatorType.COMPOSITETYPE.getTypeName() +
                                                    "(" + ComparatorType.INT32TYPE.getTypeName() +
                                                    "," + ComparatorType.UUIDTYPE.getTypeName() +
                                                    ")"
                                            )
//                                             Use BYTESTYPE here for validation class for workaround, then no bug occurs.
//                                            .setValidationClass(ComparatorType.BYTESTYPE.getTypeName())
                                            .setName(COLUMN_NAME)
                                            .setIndex(COLUMN_FAMILY_NAME + "_" + COLUMN_NAME, "KEYS");

        columnFamilyDef.addColumnDefinition(columnDef);

        cluster.addColumnFamily(columnFamilyDef);

        AstyanaxContext<Keyspace> keyspaceContext = builder.buildKeyspace(ThriftFamilyFactory.getInstance());
        keyspaceContext.start();
        Keyspace keyspace = keyspaceContext.getClient();

        for (int i = 0; i < 100; ++i)
        {
            UUID id = UUID.randomUUID();
            Composite value = new Composite();
            value.addComponent(1, IntegerSerializer.get());
            value.addComponent(UUID.randomUUID(), UUIDSerializer.get());
            value.setSerializersByPosition(IntegerSerializer.get(), UUIDSerializer.get());

            keyspace.prepareColumnMutation(profiles, id, COLUMN_NAME)
                    .putValue(value, CompositeSerializer.get(), null)
                    .execute();

            keyspace.prepareColumnMutation(profiles, id, COLUMN_NAME)
                    .deleteColumn()
                    .execute();
        }
    }
}

1 回答

  • 0

    作为一种解决方法,我发现如果我使用BytesType验证类(也就是blob)它会起作用 . 但仍无法使用CompositeType .

相关问题