首页 文章

将Apache Ignite BinaryObject与SQL表混合使用

提问于
浏览
0

我正在评估Apache Ignite并尝试了解各种模型如何相互映射 . 到目前为止,我发现,无论您使用哪个引擎/ API来访问数据,底层存储都是KV对(这是对的吗?) .

现在想到几个问题:

  • 每个SQL表是否映射到不同的缓存,即 create table acreate table b 创建两个缓存, ab

我问这个因为API,当前的API允许您创建一个缓存实例并对其运行多个create table查询 . 最初我认为这意味着缓存类似于来自RDBMS的DB构造,但是从点燃主分支获取的示例中的注释(如下所示)另有说明 .

  • 您可以创建的缓存数量有哪些实际限制,或者您可以简单地继续添加新节点以扩展网格中的缓存数量?

  • BinaryObject 如何与SQL表相关...?在我看过树的例子中,看起来你可以创建一个二进制对象然后通过SQL访问,只要你提供 QueryEntity API的映射 .

  • 仅使用 BinaryObject s vs表是否有好处?到目前为止,在我看来,创建表应该只映射到底层impl中的二进制对象 . 像索引,类型和 QueryEntity 映射一样自动完成 .

  • 二进制/表类型(表/缓存名称和列/字段名称)之间的命名限制是什么?我在一个例子中看到,可以在二进制对象上使用类似 a.b 的字段作为字段名称,但我不清楚如何通过SQL访问它,因为我相信这样的名称会与现有的语义冲突 .

  • 是否有Ignite中可用的各种结构的图表/摘要以及它们之间的关系?看到类似的东西会把我迄今为止所读到的所有东西彻底带给我 . 目前正在阅读“使用Ignite进行高性能内存计算”,但尚未完成,但是从内容页面到目前为止所读到的内容我感觉它没有涵盖其中的一些 .

最后,到目前为止,可能是一个混乱的理解,我尝试调整一个Java示例来结合我提出的大部分问题,但到目前为止还未能使它工作 .

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.cache.QueryIndexType;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.configuration.CacheConfiguration;

import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;

public class TableAndBinaryObjectCacheExperiment {
  private static final String cacheName = "some-cache-name";

  @SuppressWarnings({"unused", "ThrowFromFinallyBlock"})
  public static void main(String[] args) throws Exception {
    try (Ignite ignite = Ignition.start("ignite/ignite.xml")) {
      if (!ignite.cluster().active()) ignite.cluster().active(true);

      // Create dummy cache to act as an entry point for SQL queries (new SQL API which do not require this
      // will appear in future versions, JDBC and ODBC drivers do not require it already).
      CacheConfiguration<?, ?> cacheCfg = new CacheConfiguration<>(cacheName).setSqlSchema("PUBLIC");
      //
      LinkedHashMap<String, String> fields = new LinkedHashMap<>();
      fields.put("person_id", Long.class.getName());
      fields.put("name", String.class.getName());
      fields.put("address.postcode", String.class.getName());
      fields.put("age", Integer.class.getName());
      fields.put("about", String.class.getName());
      fields.put("misc", String.class.getName());

      QueryEntity testBinType = new QueryEntity();
      testBinType.setKeyType(String.class.getName());
      testBinType.setValueType("TestType");

      //primary key
      testBinType.setKeyType(Long.class.getName());
      testBinType.setKeyFieldName("test_id");

      testBinType.setFields(fields);
      testBinType.setTableName("test_type");
      testBinType.setIndexes(Arrays.asList(
        new QueryIndex("name"),
        new QueryIndex("address.postcode"),
        new QueryIndex("age"),
        new QueryIndex("about", QueryIndexType.FULLTEXT),
        new QueryIndex("person_id")
      ));

      CacheConfiguration<?, ?> binaryConf1 = new CacheConfiguration<>(cacheName);
      binaryConf1.setCacheMode(CacheMode.PARTITIONED);
      binaryConf1.setQueryEntities(Collections.singletonList(testBinType));
      //
      try (
        IgniteCache<?, ?> cache = ignite.getOrCreateCache(cacheCfg);
        IgniteCache<?, ?> binCacheX = ignite.getOrCreateCache(binaryConf1)
      ) {

        IgniteCache<?, ?> binCache = cache.withKeepBinary();
        // Create reference City table based on REPLICATED template.
        cache.query(new SqlFieldsQuery("CREATE TABLE IF NOT EXISTS city (id LONG PRIMARY KEY, name VARCHAR) WITH \"template=replicated\"")).getAll();
        // Create table based on PARTITIONED template with one backup.
        cache.query(new SqlFieldsQuery("CREATE TABLE IF NOT EXISTS person (id LONG, name VARCHAR, city_id LONG, PRIMARY KEY (id, city_id)) WITH \"backups=1, affinity_key=city_id\"")).getAll();
        // Create an index.
        cache.query(new SqlFieldsQuery("CREATE INDEX IF NOT EXISTS on Person (city_id)")).getAll();

        print("Created database objects.");

        SqlFieldsQuery qry = new SqlFieldsQuery("INSERT INTO city (id, name) VALUES (?, ?)");

        cache.query(qry.setArgs(1L, "Forest Hill")).getAll();
        cache.query(qry.setArgs(2L, "Denver")).getAll();
        cache.query(qry.setArgs(3L, "St. Petersburg")).getAll();

        qry = new SqlFieldsQuery("INSERT INTO person (id, name, city_id) values (?, ?, ?)");

        cache.query(qry.setArgs(1L, "John Doe", 3L)).getAll();
        cache.query(qry.setArgs(2L, "Jane Roe", 2L)).getAll();
        cache.query(qry.setArgs(3L, "Mary Major", 1L)).getAll();
        cache.query(qry.setArgs(4L, "Richard Miles", 2L)).getAll();

        qry = new SqlFieldsQuery("INSERT INTO test_type (test_id, name, age, about, \"address.postcode\") values (? ?, ?, ?, ?)");
        cache.query(qry.setArgs(1L, "Courtney", 12, "this is about me", "AB12CD", 3L));

        SqlFieldsQuery joinQuery = new SqlFieldsQuery(
          "SELECT p.name, c.name, t.about, \"t.address.postcode\" " +
            "FROM Person p " +
            "INNER JOIN City c on c.id = p.city_id " +
            "INNER JOIN test_type t on p.id = t.person_id " +
            "LIMIT 50");
        List<List<?>> res = cache.query(joinQuery).getAll();
        for (Object next : res)
          System.out.println(">>>    " + next);
      } finally {
        // Distributed cache can be removed from cluster only by #destroyCache() call.
        ignite.destroyCache(cacheName);
      }

      print("Cache query DDL example finished.");
    }
  }

  /**
   * Prints message.
   *
   * @param msg Message to print before all objects are printed.
   */
  private static void print(String msg) {
    System.out.println();
    System.out.println(">>> " + msg);
  }
}

我已经阅读了文档,但要么错过了这些信息,要么它们不会立即显现或存在 .

3 回答

  • 2
    • 创建两个表将创建两个缓存,默认情况下名为 SQL_PUBLIC_ASQL_PUBLIC_B . 一个缓存 - 一个表 . SQL查询当前需要在Native API中调用缓存,这一点很重要 . 您可以拥有一些默认缓存,在其上运行所有 CREATESELECT .

    • 所有节点都对所有缓存进行计费,因此它不会随节点数量而扩展 . 实际上,创建缓存是一项繁重的操作,缓存每个节点需要大约20M RAM . 避免创建太多缓存 . CacheGroup s让不同的具有相同结构的缓存共享大多数基础架构,节省开销 .

    • 缓存中的任何键或值(非原始值)都可以由 BinaryObject 表示 . 这意味着您始终可以访问 BinaryObject s行 . 如果在创建缓存时提供了映射,则还可以将 BinaryObject s作为行访问 .

    • 您可以将 Cache APIBinaryObject 一起使用,包括例如 IgniteDataStreamer . Ignite有大量的API,其中大部分都可以通过 BinaryObject 来访问,但只能通过表访问SQL查询 . 所以 BinaryObject 是你的编组工具 .

    • 您的里程可能会有所不同 . 我的建议是使用字母数字 . 请注意 BinaryObject 是所有功能的超集,因此只有 BinaryObject 的一个子集可能对SQL引擎有意义 .

    • 这张图片?
      https://ignite.apache.org/images/data_grid.png

    现在,修复代码示例需要单独响应 .

  • 1
    import org.apache.ignite.Ignite;
    import org.apache.ignite.IgniteCache;
    import org.apache.ignite.Ignition;
    import org.apache.ignite.cache.CacheMode;
    import org.apache.ignite.cache.QueryEntity;
    import org.apache.ignite.cache.QueryIndex;
    import org.apache.ignite.cache.QueryIndexType;
    import org.apache.ignite.cache.query.SqlFieldsQuery;
    import org.apache.ignite.configuration.CacheConfiguration;
    
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.List;
    
    public class TableAndBinaryObjectCacheExperiment {
      private static final String cacheName = "some-cache-name";
    
      @SuppressWarnings({"unused", "ThrowFromFinallyBlock"})
      public static void main(String[] args) throws Exception {
        try (Ignite ignite = Ignition.start("ignite/ignite.xml")) {
          if (!ignite.cluster().active()) ignite.cluster().active(true);
    
          // Create dummy cache to act as an entry point for SQL queries (new SQL API which do not require this
          // will appear in future versions, JDBC and ODBC drivers do not require it already).
          CacheConfiguration<?, ?> cacheCfg = new CacheConfiguration<>("default").setSqlSchema("PUBLIC");
          //
          LinkedHashMap<String, String> fields = new LinkedHashMap<>();
          fields.put("person_id", Long.class.getName());
          fields.put("name", String.class.getName());
          fields.put("address_postcode", String.class.getName());
          fields.put("age", Integer.class.getName());
          fields.put("about", String.class.getName());
          fields.put("misc", String.class.getName());
    
          QueryEntity testBinType = new QueryEntity();
          testBinType.setValueType("TestType");
    
          //primary key
          testBinType.setKeyType(Long.class.getName());
          testBinType.setKeyFieldName("person_id");
    
          testBinType.setFields(fields);
          testBinType.setTableName("test_type");
          testBinType.setIndexes(Arrays.asList(
            new QueryIndex("name"),
            new QueryIndex("address_postcode"),
            new QueryIndex("age"),
            new QueryIndex("about", QueryIndexType.FULLTEXT)
          ));
    
          CacheConfiguration<?, ?> binaryConf1 = new CacheConfiguration<>(cacheName);
          binaryConf1.setCacheMode(CacheMode.PARTITIONED);
          binaryConf1.setQueryEntities(Collections.singletonList(testBinType));
          //
          try (
            IgniteCache<?, ?> cache = ignite.getOrCreateCache(cacheCfg);
            IgniteCache<?, ?> binCacheX = ignite.getOrCreateCache(binaryConf1)
          ) {
    
            IgniteCache<?, ?> binCache = cache.withKeepBinary();
            // Create reference City table based on REPLICATED template.
            cache.query(new SqlFieldsQuery("CREATE TABLE IF NOT EXISTS city (id LONG PRIMARY KEY, name VARCHAR) WITH \"template=replicated\"")).getAll();
            // Create table based on PARTITIONED template with one backup.
            cache.query(new SqlFieldsQuery("CREATE TABLE IF NOT EXISTS person (id LONG, name VARCHAR, city_id LONG, PRIMARY KEY (id, city_id)) WITH \"backups=1, affinity_key=city_id\"")).getAll();
            // Create an index.
            cache.query(new SqlFieldsQuery("CREATE INDEX IF NOT EXISTS on Person (city_id)")).getAll();
    
            print("Created database objects.");
    
            SqlFieldsQuery qry = new SqlFieldsQuery("INSERT INTO city (id, name) VALUES (?, ?)");
    
            cache.query(qry.setArgs(1L, "Forest Hill")).getAll();
            cache.query(qry.setArgs(2L, "Denver")).getAll();
            cache.query(qry.setArgs(3L, "St. Petersburg")).getAll();
    
            qry = new SqlFieldsQuery("INSERT INTO person (id, name, city_id) values (?, ?, ?)");
    
            cache.query(qry.setArgs(1L, "John Doe", 3L)).getAll();
            cache.query(qry.setArgs(2L, "Jane Roe", 2L)).getAll();
            cache.query(qry.setArgs(3L, "Mary Major", 1L)).getAll();
            cache.query(qry.setArgs(4L, "Richard Miles", 2L)).getAll();
    
            qry = new SqlFieldsQuery("INSERT INTO \"some-cache-name\".test_type (person_id, name, age, about, address_postcode) values (?, ?, ?, ?, ?)");
            cache.query(qry.setArgs(1L, "Courtney", 12, "this is about me", "AB12CD", 3L));
    
            SqlFieldsQuery joinQuery = new SqlFieldsQuery(
              "SELECT p.name, c.name, t.about, t.address_postcode " +
                "FROM Person p " +
                "INNER JOIN City c on c.id = p.city_id " +
                "INNER JOIN \"some-cache-name\".test_type t on p.id = t.person_id " +
                "LIMIT 50");
            List<List<?>> res = cache.query(joinQuery).getAll();
            for (Object next : res)
              System.out.println(">>>    " + next);
          } finally {
            // Distributed cache can be removed from cluster only by #destroyCache() call.
            ignite.destroyCache(cacheName);
          }
    
          print("Cache query DDL example finished.");
        }
      }
    
      /**
       * Prints message.
       *
       * @param msg Message to print before all objects are printed.
       */
      private static void print(String msg) {
        System.out.println();
        System.out.println(">>> " + msg);
      }
    }
    
    • 将默认缓存重命名为"default"(您试图通过"some-cache-name"有两个缓存,只创建了一个缓存,没有SQL表)

    • 删除额外的setKeyType(),将test_id重命名为person_id,删除它上面的索引,因为它是隐式的 .

    • 用person.address中的下划线替换点 .

    • 将模式添加到INSERT和SELECT,添加缺少的逗号 .

    现在好多了:

    >>> Created database objects.
    >>>    [John Doe, St. Petersburg, this is about me, AB12CD]
    
    >>> Cache query DDL example finished.
    
  • 0

    BinaryObject是Ignite的数据序列化和数据存储格式 . 您可能会将其视为JSON,它是针对Ignite特性进行优化和设计的 .

    格式的好处是您可以在服务器节点端运行计算和其他操作,从而避免对其类表单进行数据反序列化 .

    谈到SQL,您通过INSERTS,UPDATES等添加到集群的所有数据也将以BinaryObject形式存储 .

    如果您想混合SQL,键值和计算API,那么您可以使用CREATE TABLE命令创建一个表/缓存,使用CACHE_NAME参数将缓存名称覆盖到您喜欢的名称 . 稍后,使用您的缓存名称键值,计算网格和其他操作 . 请参阅this project,其中显示了API如何混合 .

相关问题