首页 文章

在Wildfly 9 AS中实现Infinispan缓存(考虑集群)

提问于
浏览
1

The situation:

我有一个包含数千条记录的清算表 . 它们被分成例如包装盒 . 500条记录 . 然后,每个数据包通过Message Driven Beans发送到AS . AS根据每个记录的内容(例如currency,validStart,validEnd)计算密钥,并且需要将该密钥存储在数据库中(与内容的组合一起) .

The request:

为了避免重复,我想要一个集中的“工具”,它计算密钥并存储它们,从而减少与数据库的通信,方法是用记录缓存这些密钥 .

现在,我尝试在每个包处理线程的Utility-class-implementation中使用本地Infinispan缓存 . 这导致了这样的事实:多个包计算了相同的密钥,因此在数据库中插入了重复的密钥 . 或者有时我遇到了僵局 .

我尝试通过静态变量实现“锁定”以在数据库插入期间阻止对缓存的访问,但没有成功 . 下一次尝试是使用复制的分布式Infinispan缓存 . 这并没有改变AS行为的结果 .

我的最后一个想法是实现作为bean管理的单例会话bean,以在插入数据库期间获取事务锁 .

AS当前以独立模式运行,但在不久的将来将被移动到集群,因此首选高可用性解决方案 .

Resuming:

在创建(Key,Value)对期间锁定Infinispan缓存访问的正确方法是什么,以避免重复?

Update:

@cruftex:我的请求是:我有一组(Key,Value)对,应该被缓存 . 如果要发生新记录的插入,则对其应用算法并计算密钥 . 然后,如果密钥已经存在,则应检查缓存,并将Value附加到新记录 . 但如果Value不存在,则应创建并存储在数据库中 .

需要使用Infinispan实现缓存,因为AS应在集群中运行 . 存在用于创建密钥的算法 . 也可以在数据库中插入Value(通过JDBC或实体) . 但我有问题,使用消息驱动Bean(以及AS中的多线程)相同(密钥,值)对在不同的线程中计算,因此每个线程尝试在数据库中插入值(我想避免!) .

@戴夫:

public class Cache {
     private static final Logger log = Logger.getLogger(Cache.class);
     private final Cache<Key, FullValueViewer> fullCache;
     private HomeCache homes;        // wraps EntityManager
     private final Session session;

     public Cache(Session session, EmbeddedCacheManager cacheContainer, HomeCache homes) {
         this.session = session;
         this.homes = homes;
         fullCache = cacheContainer.getCache(Const.CACHE_CONDCOMBI);
     } 

     public Long getId(FullValueViewer viewerWithoutId) {
         Long result = null;

         final Key key = new Key(viewerWithoutId);
         FullValueViewer view = fullCache.get(key);

         if(view == null) {
             view = checkDatabase(viewerWithoutId);
             if(view != null) {
                 fullCache.put(key, view);
             }
         }

         if(view == null) {
             view = createValue(viewerWithoutId);

             // 1. Try
             fullCache.put(key, view);

             // 2. Try
             //      if(!fullCache.containsKey(key)) {
             //           fullCache.put(key, view);
             //       } else {
             //           try {
             //               homes.condCombi().remove(view.idnr);
             //           } catch (Exception e) {
             //               log.error("remove", e);
             //           }
             //       }

             // 3. Try
             //       synchronized(fullCache) {
             //           view = createValue(viewerWithoutId);
             //           fullCache.put(key, view);
             //       }
         }
         result = view.idnr;
         return result;
     }

     private FullValueViewer checkDatabase(FullValueViewer newView) {
         FullValueViewer result = null;
         try {
             CondCombiBean bean = homes.condCombi().findByTypeAndKeys(_parameters_);
             result = bean.getAsView();
         } catch (FinderException e) {
         }
         return result;
     }

     private FullValueViewer createValue(FullValueViewer newView) {
         FullValueViewer result = null;
         try {
             CondCombiBean bean = homes.condCombi().create(session.subpk);
             bean.setFromView(newView);
             result = bean.getAsView();
         } catch (Exception e) {
             log.error("createValue", e);
         }
         return result;
     }

     private class Key {

         private final FullValueViewer view;

         public Key(FullValueViewer v) {
            this.view = v;
         }

         @Override
         public int hashCode() {
             _omitted_
         }

         @Override
         public boolean equals(Object obj) {
             _omitted_
         }
     }
 }

我尝试使用Wildfly的缓存配置:

<cache-container name="server" default-cache="default" module="org.wildfly.clustering.server">
   <local-cache name="default">
      <transaction mode="BATCH"/>
   </local-cache>
</cache-container>

<cache-container name="server" default-cache="default" module="org.wildfly.clustering.server">
   <transport lock-timeout="60000"/>
   <distributed-cache name="default" mode="ASYNC"/>
</cache-container>

3 回答

  • 0

    我只会对简历问题作出反应:

    你可以't lock whole cache; that wouldn'吨 . 最好的方法是使用 cache.putIfAbsent(key, value) 操作,如果条目已经存在则生成不同的键(或使用list作为值并使用条件 cache.replace(key, oldValue, newValue) 替换它) .

    如果要真正禁止对某些键进行写入,可以使用具有悲观锁定策略的事务高速缓存,并发出 cache.getAdvancedCache().lock(key) . 请注意,没有解锁:当事务通过事务管理器提交/回滚时,将释放所有锁 .

  • 0

    您无法生成自己的密钥并使用它来同时检测重复项 .

    每个数据行都保证只到达一次,或者它需要体现来自生成它的外部系统的唯一标识符 .

    如果数据中有唯一的标识符,如果全部出错,并且没有id,那么只有所有属性连接在一起,那么您需要使用它来检查重复项 .

    现在,您可以直接使用该唯一标识符,或生成自己的内部标识符 . 如果您执行后者,则需要从外部ID到内部ID的转换 .

    如果重复到达,则需要在生成内部标识时根据外部标识锁定,然后记录您分配的内部标识 .

    要在群集中生成唯一的长值序列,可以使用缓存的CAS操作 . 例如这样的事情:

    @NotThreadSafe
    class KeyGeneratorForOneThread {
    
      final String KEY = "keySequenceForXyRecords";
      final int INTERVAL = 100;
      Cache<String,Long> cache = ...;
      long nextKey = 0;
      long upperBound = -1;
    
      void requestNewInterval() {
        do {
          nextKey = cache.get(KEY);
          upperBound = nextKey + INTERVAL;
        } while (!cache.replace(KEY, nextKey, upperBound));
      } 
    
      long generateKey() {
        if (nextKey >= upperBound) {
         requestNewInterval();
        }
        return nextKey++;
      }
    }
    

    每个线程都有自己的密钥生成器,无需协调即可生成100个密钥 .

    您可能需要单独的缓存:

    • 由外部ID锁定

    • 从外部查找到内部ID

    • 序列号,注意实际上不是缓存,因为它必须知道重启后的最后一个数字

    • 内部id到数据

  • 0

    我们找到了一个适用于我们案例的解决方案,可能对其他人有帮助:

    我们有两个主要组件,一个缓存类和一个单例bean .
    缓存包含所有记录的副本目前存在于数据库中并且有很多逻辑 .
    单例bean可以访问infinispan-cache并用于创建新记录 .

    最初,缓存从singleton bean中获取infinispan-cache的副本 . 然后,如果我们在缓存中搜索记录,我们首先应用一种哈希方法,它计算记录的unqiue密钥 . 如果需要将记录添加到数据库中,我们可以使用此密钥进行识别 . 如果是,则缓存使用带有@Lock(WRITE)Annotation的create-method调用singleton bean . 如果值包含在infinispan-cache中,create方法首先检查,如果不包含,则创建新记录 .

    使用这种方法,我们可以保证,即使缓存在多个线程中使用,并且每个线程发送一个请求以在数据库中创建相同的记录,创建过程也会被锁定,并且所有后续请求都不会继续,因为值是已在先前的请求中创建 .

相关问题