我是HBase中的新手 . 我的应用程序使用Kafka使用者(多线程),它们从Kafka读取消息 . 每个消费者从Kafka读取消息,从消息中创建put,创建HBase表并在HBase中执行插入 .

但伙计们,对我来说,有一些阻碍 . 这个流程在一段时间后被绞死 . 我已经附上相同的代码,看看 .

public void doBatchInsertionForDeviceTable(List<Put> totalRows, Integer hbaseTableSuffix) {
    long start, end;
    HTable tableForDeviceTable = null;
    HBaseConfigurationCache hBaseConfigurationCache = null;
    String tableName = null;
    start = System.currentTimeMillis();
    try {
        hBaseConfigurationCache = CacheManager.getInstance().getCache(HBaseConfigurationCache.class);
        tableName = DataHandlerConstant.HBASE_DEVICE_TABLE + String.valueOf(hbaseTableSuffix);
        tableForDeviceTable = new HTable(hBaseConfigurationCache.getHBaseConfiguration(), tableName);
        LOG.info("Attempting HBase Insertion in Table (" + tableName + ") with rows (" + totalRows.size() + ")");
        try {
            putCountForDeviceStream.addAndGet(totalRows.size());
            LOG.info("Total put have been build till now is :("+putCountForDeviceStream.get()+") for device stream.");
            tableForDeviceTable.put(totalRows);
            LOG.info("Batch have been inserted successfully.");

        } catch (Exception ex) {
            LOG.error("Problem while inserting rows (" + totalRows.size() + ") ." + ex.getLocalizedMessage());
            ex.printStackTrace();
        } finally {
            tableForDeviceTable.flushCommits();
        }
    } catch (Exception ex) {
        LOG.error("Sorry,We are unable to perform insertion due to (" + ex.getMessage() + ")");
        ex.printStackTrace();
    }
    end = System.currentTimeMillis();
    LOG.info("Consume Time HBase Insertion in Table (" + tableName + ") with rows (" + totalRows.size() + ") is (" + (end - start) + ")");

}

线程转储看起来像这样:

java.lang.Thread.State:在sun.misc.Unsafe.park(原生方法)中等待(停放) - 在java停止等待<0x00000005adbb0720>(java.util.concurrent.locks.AbstractQueuedSynchronizer $ ConditionObject) . util.concurrent.locks.LockSupport.park(LockSupport.java:186)在java.util.concurrent.locks.AbstractQueuedSynchronizer中$ ConditionObject.await(AbstractQueuedSynchronizer.java:2043)在java.util.concurrent.LinkedBlockingQueue.take(的LinkedBlockingQueue . java:442)at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:67)锁定可拥有的同步器: - 无“metrics-meter-tick-thread-2”守护进程prio = 10 tid = 0x00007fed950c1800 nid = 0x3006 runnable [ 0x00007fedb4451000] java.lang.Thread.State:TIMED_WAITING(停放)在sun.misc.Unsafe.park(本机方法) - 在java停车等待<0x00000005ad4b4608>(java.util.concurrent.locks.AbstractQueuedSynchronizer $ ConditionObject) java.util.concurrent.locks.AbstractQueuedSyn中的.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226) chronizer $ ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)at java.util.concurrent.ScheduledThreadPoolExecutor $ DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1090)at java.util.concurrent.ScheduledThreadPoolExecutor $ DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)在java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)在java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)在java.util.concurrent.ThreadPoolExecutor中的$ Worker.run(ThreadPoolExecutor.java :615)at java.lang.Thread.run(Thread.java:744)锁定的可拥有同步器: - 无“metrics-meter-tick-thread-1”守护进程prio = 10 tid = 0x00007fed950c8800 nid = 0x3005等待条件[0x00007fedb4653000 ] java.lang.Thread.State:在sun.misc.Unsafe.park(原生方法)中等待(停放) - 在java停车等待<0x00000005ad4b4608>(java.util.concurrent.locks.AbstractQueuedSynchronizer $ ConditionObject) . util.concurrent.locks.LockSupport.park(LockSuppo rt.java:186)在java.util.concurrent.locks.AbstractQueuedSynchronizer中$ ConditionObject.await(AbstractQueuedSynchronizer.java:2043)在java.util.concurrent.ScheduledThreadPoolExecutor中$ DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1085)在java.util中.concurrent.ScheduledThreadPoolExecutor $ DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)在java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)在java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)在java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:615)at java.lang.Thread.run(Thread.java:744)锁定的可拥有同步器: - 无“pool-42-thread-1”prio = 10 tid = 0x00007fed94b3c800 nid = 0x2ffe in Object.wait()[0x00007fedb4955000] java.lang.Thread.State:TIMED_WAITING(在对象监视器上)java.lang.Object.wait(Native Method)at org.apache.hadoop.hbase .client.AsyncProcess.waitForNextTaskDone(AsyncProcess.java:853) - 已锁定<0x00000005af1d9880>(a java.util.concurrent.atomic.AtomicLong中)在org.apache.hadoop.hbase.client.AsyncProcess.waitForMaximumCurrentTasks(AsyncProcess.java:879)在org.apache.hadoop.hbase.client.AsyncProcess.waitUntilDone(AsyncProcess.java: 892)org.apache.hadoop.hbase.client.HTable.backgroundFlushCommits(HTable.java:968)atg.apache.hadoop.hbase.client.HTable.flushCommits(HTable.java:1252)at com.snapdeal.datahandler .utils.hbase.HBaseUtils.doBatchInsertionForClickStream(HBaseUtils.java:290)在com.snapdeal.datahandler.hbase.queue.consumer.HBaseWorkerThread.performHBaseInsertion(HBaseWorkerThread.java:111)在com.snapdeal.datahandler.hbase.queue.consumer .HBaseWorkerThread.run(HBaseWorkerThread.java:65)位于java.util.concurrent.Executors $ RunnableAdapter.call(Executors.java:471),位于java的java.util.concurrent.FutureTask.run(FutureTask.java:262) . util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)at atJava.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:615)at java.lang.Thread.run(Thread.java:744)锁定的可拥有同步器: - <0x00000005af1d8260>(java.util.concurrent.ThreadPoolExecutor $ Worker)在java.lang.Object中的Object.wait()[0x00007fedb4a56000] java.lang.Thread.State:TIMED_WAITING(在对象监视器上)“pool-41-thread-1”prio = 10 tid = 0x00007fed94b3a800 nid = 0x2ffd org.apache.hadoop.hbase.client.AsyncProcess.waitForNextTaskDone(AsyncProcess.java:853)中的.wait(Native Method) - 在org.apache.hadoop上锁定<0x00000005aedcf770>(java.util.concurrent.atomic.AtomicLong)位于org.apache.hadoop.hbase.client.HTable.backgroundFlushCommits的org.apache.hadoop.hbase.client.AsyncProcess.waitUntilDone(AsyncProcess.java:892)中的.hbase.client.AsyncProcess.waitForMaximumCurrentTasks(AsyncProcess.java:879) (HTable.java:968)atg.apache.hadoop.hbase.client.HTable.flushCommits(HTable.java:1252)at com.snapdeal.datahandler.utils.hbase.HBaseUtils.doBatchInsertionForClickStream (HBaseUtils.java:290)com.snapdeal.datahandler.hbase.queue.consumer.HBaseWorkerThread.performHBaseInsertion(HBaseWorkerThread.java:111)at com.snapdeal.datahandler.hbase.queue.consumer.HBaseWorkerThread.run(HBaseWorkerThread.java) :65)at java.util.concurrent.Executors $ RunnableAdapter.call(Executors.java:471)at java.util.concurrent.FutureTask.run(FutureTask.java:262)at java.util.concurrent.ThreadPoolExecutor.runWorker( ThreadPoolExecutor.java:1145)java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:615)at java.lang.Thread.run(Thread.java:744)锁定的可拥有同步器: - <0x00000005aedcf1d8>(a java.util.concurrent.ThreadPoolExecutor $ Worker)“pool-40-thread-1”prio = 10 tid = 0x00007fed94b38800 nid = 0x2ffc in Object.wait()[0x00007fedb4b57000] java.lang.Thread.State:TIMED_WAITING(在对象监视器上) )atg.apache.hadoop.hbase.client.AsyncProcess.waitForNextTaskDone(AsyncProcess.java:853)中的java.lang.Object.wait(Native Method) - 已锁定<0x00000005ad8c8c38> (java.util.concurrent.atomic.AtomicLong)org.apache.hadoop.hbase.client.AsyncProcess.waitForMaximumCurrentTasks(AsyncProcess.java:879)org.apache.hadoop.hbase.client.AsyncProcess.waitUntilDone(AsyncProcess . java:892)org.apache.hadoop.hbase.client.HTable.backgroundFlushCommits(HTable.java:968)atg.apache.hadoop.hbase.client.HTable.flushCommits(HTable.java:1252)at com.snapdeal .datahandler.utils.hbase.HBaseUtils.doBatchInsertionForClickStream(HBaseUtils.java:290)at com.snapdeal.datahandler.hbase.queue.consumer.HBaseWorkerThread.performHBaseInsertion(HBaseWorkerThread.java:111)at com.snapdeal.datahandler.hbase.queue java.util.concurrent.Executors中的.consumer.HBaseWorkerThread.run(HBaseWorkerThread.java:65)java.util.concurrent.FutureTask.run(FutureTask.java:262)中的$ RunnableAdapter.call(Executors.java:471)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)at java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:615)at j ava.lang.Thread.run(Thread.java:744)锁定了可拥有的同步器: - <0x00000005ad8c7618>(java.util.concurrent.ThreadPoolExecutor $ Worker)“pool-39-thread-1”prio = 10 tid = 0x00007fed94b36000 nid Object.wait()中的0x2ffb(0x00007fedb4c58000)java.lang.Thread.State:在org.apache.hadoop.hbase.client.AsyncProcess的java.lang.Object.wait(Native Method)上的TIMED_WAITING(在对象监视器上) . waitForNextTaskDone(AsyncProcess.java:853) - 在org.apache的org.apache.hadoop.hbase.client.AsyncProcess.waitForMaximumCurrentTasks(AsyncProcess.java:879)上锁定<0x00000005ad8cf3b8>(java.util.concurrent.atomic.AtomicLong)位于org.apache.hadoop.hbase.client.HTable的org.apache.hadoop.hbase.client.HTable.backgroundFlushCommits(HTable.java:968)的.hadoop.hbase.client.AsyncProcess.waitUntilDone(AsyncProcess.java:892) .flushCommits(HTable.java:1252)at com.snapdeal.datahandler.utils.hbase.HBaseUtils.doBatchInsertionForClickStream(HBaseUtils.java:290)at com.snapdeal.datahandler.hbase.queue.consumer.HBaseWorkerTh read.performHBaseInsertion(HBaseWorkerThread.java:111)位于java.util.concurrent.Executors的com.snapdeal.datahandler.hbase.queue.consumer.HBaseWorkerThread.run(HBaseWorkerThread.java:65)$ RunnableAdapter.call(Executors.java: 471)java.util.concurrent.FutureTask.run(FutureTask.java:262)java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor)的.java:615)