首页 文章

Cassandra:如何使用CQL插入具有良好性能的新宽行

提问于
浏览
7

我正在评估 Cassandra . 我正在使用datastax驱动程序和CQL .

我想存储一些具有以下内部结构的数据,其中每个更新的名称都不同 .

+-------+-------+-------+-------+-------+-------+
|       | name1 | name2 | name3 | ...   | nameN |
| time  +-------+-------+-------+-------+-------+
|       | val1  | val2  | val3  | ...   | valN  |
+-------+-------+-------+-------|-------+-------+

所以时间应该是列键,name应该是行键 . 我用来创建这个表的CQL语句是:

CREATE TABLE IF NOT EXISTS test.wide (
  time varchar,
  name varchar,
  value varchar,
  PRIMARY KEY (time,name))
  WITH COMPACT STORAGE

我希望架构以这种方式易于查询 . 我还必须偶尔存储超过65000行的更新 . 因此,使用cassandra list / set / map数据类型不是一种选择 .

我必须能够每秒处理至少1000个宽行插入,具有变化但大(~1000)个名称/值对的数量 .

问题如下:我编写了一个简单的基准测试,可以执行1000个宽行插入,每个插入10000个名称/值对 . 我使用CQL和datastax驱动程序的性能非常慢,而不使用CQL的版本(使用astyanax)在同一测试集群上具有良好的性能 .

我已经阅读了这个related question,并且在这个问题的接受答案中表明你应该能够通过使用 batch prepared statements 原型并快速创建一个新的宽行,这些行可以在cassandra 2中找到 .

所以我尝试使用它们,但我的性能仍然很慢(对于在localhost上运行的小型三节点集群,每秒两次插入) . 我错过了一些明显的东西,还是我必须使用较低级别的thrift API? I have implemented the same insert with a ColumnListMutation in astyanax, and I get about 30 inserts per second.

如果我必须使用较低级别的thrift API:

  • 它实际上是否已被弃用,或者使用起来不方便,因为它的级别较低?

  • 我能用CQL查询用thrift api创建的表吗?

下面是scala中的一个独立代码示例 . 它只是创建一个批处理语句,用于插入具有10000列的宽行并重复插入性能 .

我玩了BatchStatement的选项和一致性级别,但没有什么能让我获得更好的性能 .

我唯一的解释是,尽管批处理由预准备语句组成,但条目将逐个添加到行中 .


package cassandra

import com.datastax.driver.core._

object CassandraTestMinimized extends App {

  val keyspace = "test"
  val table = "wide"
  val tableName = s"$keyspace.$table"

  def createKeyspace = s"""
CREATE KEYSPACE IF NOT EXISTS ${keyspace}
WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
"""

  def createWideTable = s"""
CREATE TABLE IF NOT EXISTS ${tableName} (
time varchar,
name varchar,
value varchar,
PRIMARY KEY (time,name))
WITH COMPACT STORAGE
"""

  def writeTimeNameValue(time: String) = s"""
INSERT INTO ${tableName} (time, name, value)
VALUES ('$time', ?, ?)
"""

  val cluster = Cluster.builder.addContactPoints("127.0.0.1").build
  val session = cluster.connect()

  session.execute(createKeyspace)
  session.execute(createWideTable)

  for(i<-0 until 1000) {
    val entries =
      for {
        i <- 0 until 10000
        name = i.toString
        value = name
      } yield name -> value
    val batchPreparedStatement = writeMap(i, entries)
    val t0 = System.nanoTime()
    session.execute(batchPreparedStatement)
    val dt = System.nanoTime() - t0
    println(i + " " + (dt/1.0e9))
  }

  def writeMap(time: Long, update: Seq[(String, String)]) : BatchStatement = {
    val template = session
      .prepare(writeTimeNameValue(time.toString))
      .setConsistencyLevel(ConsistencyLevel.ONE)
    val batch = new BatchStatement(BatchStatement.Type.UNLOGGED)
    for ((k, v) <- update)
      batch.add(template.bind(k, v))
    batch
  }
}

这是astyanax代码(从astyanax example修改),基本上相同的东西快15倍 . 请注意,这也不使用异步调用,因此这是一个公平的比较 . 这需要列族已经存在,因为我还没有弄清楚如何使用astyanax创建它,并且该示例没有任何用于创建columnfamily的代码 .

package cassandra;

import java.util.Iterator;

import com.netflix.astyanax.ColumnListMutation;
import com.netflix.astyanax.serializers.AsciiSerializer;
import com.netflix.astyanax.serializers.LongSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
import com.netflix.astyanax.connectionpool.OperationResult;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.ColumnList;
import com.netflix.astyanax.thrift.ThriftFamilyFactory;

public class AstClient {
    private static final Logger logger = LoggerFactory.getLogger(AstClient.class);

    private AstyanaxContext<Keyspace> context;
    private Keyspace keyspace;
    private ColumnFamily<Long, String> EMP_CF;
    private static final String EMP_CF_NAME = "employees2";

    public void init() {
        logger.debug("init()");

        context = new AstyanaxContext.Builder()
                .forCluster("Test Cluster")
                .forKeyspace("test1")
                .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
                        .setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE)
                )
                .withConnectionPoolConfiguration(new ConnectionPoolConfigurationImpl("MyConnectionPool")
                        .setPort(9160)
                        .setMaxConnsPerHost(1)
                        .setSeeds("127.0.0.1:9160")
                )
                .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
                        .setCqlVersion("3.0.0")
                        .setTargetCassandraVersion("2.0.5"))
                .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
                .buildKeyspace(ThriftFamilyFactory.getInstance());

        context.start();
        keyspace = context.getClient();

        EMP_CF = ColumnFamily.newColumnFamily(
                EMP_CF_NAME,
                LongSerializer.get(),
                AsciiSerializer.get());
    }

    public void insert(long time) {
        MutationBatch m = keyspace.prepareMutationBatch();

        ColumnListMutation<String> x =
                m.withRow(EMP_CF, time);
        for(int i=0;i<10000;i++)
            x.putColumn(Integer.toString(i), Integer.toString(i));

        try {
            @SuppressWarnings("unused")
            Object result = m.execute();
        } catch (ConnectionException e) {
            logger.error("failed to write data to C*", e);
            throw new RuntimeException("failed to write data to C*", e);
        }
        logger.debug("insert ok");
    }

    public void createCF() {
    }

    public void read(long time) {
        OperationResult<ColumnList<String>> result;
        try {
            result = keyspace.prepareQuery(EMP_CF)
                    .getKey(time)
                    .execute();

            ColumnList<String> cols = result.getResult();
            // process data

            // a) iterate over columsn
            for (Iterator<Column<String>> i = cols.iterator(); i.hasNext(); ) {
                Column<String> c = i.next();
                String v = c.getStringValue();
                System.out.println(c.getName() + " " + v);
            }

        } catch (ConnectionException e) {
            logger.error("failed to read from C*", e);
            throw new RuntimeException("failed to read from C*", e);
        }
    }

    public static void main(String[] args) {
        AstClient c = new AstClient();
        c.init();
        long t00 = System.nanoTime();
        for(int i=0;i<1000;i++) {
            long t0 = System.nanoTime();
            c.insert(i);
            long dt = System.nanoTime() - t0;
            System.out.println((1.0e9/dt) + " " + i);
        }
        long dtt = System.nanoTime() - t00;

        c.read(0);
        System.out.println(dtt / 1e9);
    }

}

更新:我在cassandra-user邮件列表中找到了这个帖子 . 在进行大型宽行插入时,CQL似乎存在性能问题 . 有一个票据CASSANDRA-6737来跟踪这个问题 .

Update2:我已经尝试了附加到CASSANDRA-6737的补丁,我可以确认这个补丁完全解决了这个问题 . 感谢DataStax的Sylvain Lebresne如此快速地解决这个问题!

3 回答

  • 5

    你不是唯一经历过这种情况的人 . 我刚才写了一篇博客文章,更多地关注CQL和thrift之间的转换,但是有人看到同样事情的人们的邮件列表问题的链接(宽行插入的性能问题是我调查的最初动机):http://thelastpickle.com/blog/2013/09/13/CQL3-to-Astyanax-Compatibility.html

    总而言之 - CQL非常适合消除处理打字和解决Cassandra新手数据模型的负担 . DataStax驱动程序编写得很好,包含许多有用的功能 .

    但是,对于宽行插入,Thrift API的速度要快一些 . Netflix博客并未详细介绍此特定用例 . 此外,只要人们使用Thrift API(很多人都是),它就不是传统的 . 这是一个ASF项目,因此不是由任何一个供应商运行 .

    通常,对于任何基于Cassandra的应用程序,如果您找到一种方法来满足(或经常超过)工作负载的性能要求,请坚持使用它 .

  • 8

    你的代码中有一个错误,我认为你解释了很多你遇到的性能问题:对于每一批你再次准备语句 . 准备一个声明并不是非常昂贵,但是这样做会增加很多延迟 . 您等待准备该语句的时间是您不构建批处理的时间,以及Cassandra不会花费处理该批处理的时间 . 准备好的声明只需要准备一次,并且应该重复使用 .

    我认为很多糟糕的表现可以解释延迟问题 . 瓶颈很可能是你的应用程序代码,而不是Cassandra . 即使您只准备了一次该语句,您仍然会花大部分时间在应用程序中进行CPU绑定(构建大批量)或不做任何事情(等待网络和Cassandra) .

    您可以做两件事:首先使用CQL驱动程序的异步API并构建下一批,而网络和Cassandra正忙于您刚刚完成的那个;其次尝试运行多个线程做同样的事情 . 您需要尝试的确切线程数取决于您拥有的核心数以及您是否在同一台计算机上运行一个或三个节点 .

    在同一台计算机上运行三节点群集会使群集比运行单个节点慢,而在不同的计算机上运行会使其更快 . 在同一台机器上运行应用程序也没有多大帮助 . 如果要测试性能,请仅运行一个节点或在不同的计算机上运行实际群集 .

    批次可以为您提供额外的性能,但并非总是如此 . 它们可能导致您在测试代码中看到的那种问题:缓冲膨胀 . 批量过大会导致应用程序花费太多时间构建它们,然后将太多时间推送到网络上,等待Cassandra处理它们的时间过长 . 您需要尝试批量大小并查看哪种方法效果最好(但是使用真实群集进行操作,否则您将看不到网络的影响,这将是批量变大时的一个重要因素) .

    如果您使用批次,请使用压缩 . 压缩在大多数请求加载中没有区别(响应是另一回事),但是当您发送大批量时,它可以产生很大的不同 .

    Cassandra中的宽行写入并没有什么特别之处 . 除了一些例外,模式不会改变处理写入所花费的时间 . 我运行的应用程序每秒执行数万次非批量混合宽行和非宽行写入 . 群集不大,每个只有三个或四个m1.xlarge EC2节点 . 诀窍是永远不要等待请求在发送下一个请求之前返回(这并不意味着火灾和遗忘,只是以相同的异步方式处理响应) . 延迟是性能杀手 .

  • 2

    你可以尝试一些事情......在你的 cassandra.yaml (这是Cassandra 1.2.x,也许在2.x中调用params有所不同):

    • 禁用行缓存( row_cache_size_in_mb: 0

    • 在内存中的行溢出到磁盘之前增加内存限制( min_memory_compaction_limit_in_mb ),只有在看到某些日志输出表明溢出确实发生时才执行此操作

    • 确保正确配置 num_tokens / initial_token 值,以便行在您的节点之间分布

    您可以尝试的其他事项:

    • 将群集中的所有节点IP提供给客户端,而不仅仅是一个

    • 为每个Cassandra节点提供更多RAM

    • 尝试运行测试多线程

    • 如果您在Linux上运行Cassandra,请确保您有JNA installed并且正在使用中

    要澄清的事情:

    • 您是否通过 nodetool 确认3个节点已找到对方?

    • nodetool 对3个节点的负载分配有什么看法?

    • 虚拟集群的物理主机对CPU和I / O使用情况有何看法?也许它已经完全超出了?

相关问题