首页 文章

使用数据存储Cassandra客户端执行并发表创建

提问于
浏览
1

通过以下方式清除密钥空间后:

drop keyspace simplex

我正在通过datastax java Cassandra客户端(来自scala代码)执行以下命令:

val songsTable = (
    "CREATE TABLE IF NOT EXISTS simplex.songs ("
    + "id uuid PRIMARY KEY,"
    + "title text,"
    + "album text,"
    + "artist text,"
    + "tags set<text>,"
    + "data blob"
    + ");")

  val listsTable = (
    "CREATE TABLE IF NOT EXISTS simplex.playlists ("
    + "id bigint,"
    + "title text,"
    + "album text, "
    + "artist text,"
    + "song_id uuid,"
    + "PRIMARY KEY (id, title, album, artist)"
    + ");")

  val songs = (
    "INSERT INTO simplex.songs "
    + "(id, title, album, artist, tags) "
    + "VALUES ("
    + "756716f7-2e54-4715-9f00-91dcbea6cf50,"
    + "'La Petite Tonkinoise',"
    + "'Bye Bye Blackbird',"
    + "'Joséphine Baker',"
    + "{'jazz', '2013'}"
    + ");")
  ...
  ...
  ...
  val rsf = session.executeAsync(command) // in parallel
  rsf.addListener(() => p.success(rsf.get), exec)

这导致仅创建播放列表表,并且永远不会执行“创建歌曲表”和“插入歌曲”命令的回调 .

据我所知,datastax java Cassandra客户端可以安全地同时使用 . 这不是这种情况吗?我的假设有什么问题?

1 回答

  • 0

    当您创建表或键空间(在集成测试中很常见)时,您需要关闭 SessionCluster 实例,因为 Cluster 元数据已过期 . 虽然我看到一条日志消息说驱动程序正在尝试异步更新元数据但它似乎永远不会及时返回 - 关闭并重新打开 ClusterSession 实例,足以进行集成测试 .

    因为 Cluster.connect(keyspaceName) 存在并且我似乎无法让 USE keyspace; 作为执行的命令正常工作(永远不要在整个应用程序中共享会话的意义,任何客户端可以随意使用另一个键空间)我最后编写了以下内容:

    (此处 _clusterOption[Cluster] ,在 preStart 期间为我的 CassandraSessionActor 初始化, keyspace 是实现者定义的 Option[String] . CassandraSessionActor 在请求时返回 Session ,但在 preStart 期间调用 getSession 一次,因此它可以跨应用程序共享会话,按照Datastax 4 Simple Rules . )

    trait AutoCreateKeyspace extends CassandraSessionActor {
      override def getSession = {
        keyspace match {
          case None => _cluster.get.connect()
          case Some(ks) =>
            _cluster.map { cluster =>
              if (cluster.getMetadata.getKeyspace(ks) == null &&
                cluster.getMetadata.getKeyspace(ks.toLowerCase) == null) {
                val temporarySession = cluster.connect()
                log.debug(s"creating keyspace $ks")
                temporarySession.execute(s"""CREATE KEYSPACE $ks
                                           |with replication = {
                                           |'class':                'SimpleStrategy',
                                           |'replication_factor':    2 }""".stripMargin)
                temporarySession.close()
                cluster.close()
              }
            }
            _cluster = Some(Cassandra.createNewCluster())
            _cluster.get.connect(ks)
        }
    
      }
    }
    
    private object Cassandra extends LazyLogging {
    
      def conf = ConfigFactory.load()
    
      val hosts = conf.getStringList("cassandra.hosts")
    
      logger.info(s"cassandra.hosts specified: $hosts")
    
    
      def createNewCluster(): Cluster = {
        Cluster.builder()
          .addContactPoints(hosts: _*)
          .build()
      }
    
    }
    

相关问题