
Batch insert into Cassandra using Apache Spark hangs and the context is not closed when triggered from a web service


I am new to Spark. I am trying to insert CSV files into Cassandra tables using the spark-cassandra-connector, as follows: the files are in HDFS, I get the paths of all the files, and for each path I call a method that converts the CSV data to the corresponding Cassandra data types, creates a prepared statement, binds the data to the prepared statement and adds it to a batch. Finally, I execute the batch when it reaches 1000 statements.

Key points:

1. I am using Apache Cassandra 2.1.8 and Spark 1.5.
2. I read the CSV files using the Spark context.
3. I use com.datastax.spark.connector.cql.CassandraConnector to create a Session with Cassandra.
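The batching part follows roughly this pattern (a minimal sketch only; the keyspace, table and columns here are placeholders, and the full code is further below):

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;

public class BatchInsertSketch {

    // Bind each row to the prepared statement and execute the batch every 1000 rows.
    public static void insertRows(Session session, Iterable<String[]> rows) {
        // demo_ks.demo_table (id, name) is a placeholder, not one of my real tables
        PreparedStatement statement = session.prepare(
                "INSERT INTO demo_ks.demo_table (id, name) VALUES (?, ?)");
        BatchStatement batch = new BatchStatement();
        int count = 0;
        for (String[] row : rows) {
            batch.add(statement.bind(row[0], row[1]));
            count++;
            if (count == 1000) {          // execute the batch when it reaches 1000
                session.execute(batch);
                batch = new BatchStatement();
                count = 0;
            }
        }
        if (count > 0) {                  // execute whatever is left over at the end
            session.execute(batch);
        }
    }
}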

I have 9 files, and each file's data goes into its own table in Cassandra. Everything works fine: all the inserts happen as expected and the job completes when I submit the jar with spark-submit.

The problem I am facing is that when the same jar is invoked through a web service (the web service calls a script that invokes the jar), the data from one of the files is not inserted, and the Spark context does not stop, so the job keeps running forever.

When I insert only 4 or 5 files, everything works fine even through the web service. But with all of them together it hangs: one of the tables ends up about 10 records short, and the context does not stop.

It is strange, because when I submit the jar directly with spark-submit everything works fine, while through the web service I face this problem, even though the web service ends up running the same spark-submit command.

Here is my code:

package com.pz.loadtocassandra;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

import java.io.IOException;
import java.math.BigDecimal;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.FileHandler;
import java.util.logging.Level;
import java.util.logging.Logger;

// StringUtils is assumed to come from Apache Commons Lang
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.InvalidTypeException;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.datastax.spark.connector.japi.CassandraRow;
import com.pz.shared.UnicodeBOMInputStream;
import com.pz.shared.fileformat.Header;
import com.pz.shared.mr.fileformat.MRFileFormats.CSVInputFormat;
import com.pz.shared.mr.fileformat.MRFileFormats.TextArrayWritable;

public class LoadToCassandra {

public static final String STUDYID = "STUDYID";
public static final String PROJECTNAME = "PROJECTNAME";
public static final String FILEID = "FILEID";
public static int count = 0;
public static final String FILE_SERPERATOR = "/";
public static Logger log = Logger.getLogger(LoadToCassandra.class.getName());
public static void main(String[] args) {
    try {
        // set up file logging for this class (the log file name is an assumption)
        FileHandler fileHandler = new FileHandler("LoadToCassandra.log");
        log.addHandler(fileHandler);

        String propFileLoc = args[0];
        String hdfsHome = args[1];
        String hdfs_DtdXmlPath = args[2];
        String hdfs_NormalizedDataPath = args[3];

        run(propFileLoc, hdfsHome, hdfs_DtdXmlPath, hdfs_NormalizedDataPath);
    } catch (IOException exception) {
        log.log(Level.SEVERE, "Error occur in FileHandler.", exception);
    }
}

public static void run(String propFileLoc, String hdfsHome,
        String hdfs_DtdXmlPath, String hdfs_NormalizedDataPath) {
    JavaSparkContext ctx = null;
    FileSystem hadoopFs = null;
    try {

        PropInitialize.initailizeConfig(propFileLoc);
        //setting spark context
        ctx = setSparkContext(propFileLoc);
        ParseDtdXml.parseDTDXML(hdfsHome, hdfs_DtdXmlPath);
        Configuration configuration = setHadoopConf();
        hadoopFs = getHadoopFs(hdfsHome, configuration);
        FileStatus[] fstat = hadoopFs.listStatus(new Path(hdfs_NormalizedDataPath));
        //Getting the csv paths
        Path[] paths = FileUtil.stat2Paths(fstat);
        log.info("PATH.size - " + paths.length);
        for (Path path : paths) {
            log.info("path is : "+path.toString());
            loadToCassandra(propFileLoc, path, configuration,hdfsHome, ctx);
        }


    } catch (IOException | URISyntaxException e) {
        log.log(Level.SEVERE, "run method", e);
        e.printStackTrace();
    } finally {
        log.info("finally ");
        if (ctx!= null) {
            ctx.stop();
            System.out.println("SC Stopped");
        }
        if (hadoopFs != null) {
            try {
                hadoopFs.close();
            } catch (IOException e) {
                log.log(Level.SEVERE, "run method", e);
            }
        }
    }
}



// input : 1. String hdfs home ,
// 2. Configuration hadoop conf object
// returns : hadoop File System object
private static FileSystem getHadoopFs(String hdfsHome,
        Configuration configuration) throws IOException, URISyntaxException {
    return FileSystem.get(new URI(hdfsHome), configuration);

}

// input : no inputs
// process : sets hadoop config parameters
// output : returns hadoop conf object
private static Configuration setHadoopConf() throws IOException,
        URISyntaxException {
    Configuration configuration = new Configuration();
    configuration.setBoolean("csvFileFormat.encoded.flag", true);
    configuration.set("csvinputformat.token.delimiter", ",");
    return configuration;

}

// input : string Properties File Location
// process : creates and sets the configurations of spark context
// returns : JavaSparkContext object with configurations set on it.
private static JavaSparkContext setSparkContext(String propFileLoc) {
    PropInitialize.initailizeConfig(propFileLoc);
    SparkConf conf = new SparkConf();
    conf.set("spark.serializer",
            "org.apache.spark.serializer.KryoSerializer");
    conf.setAppName("Loading Data");
    conf.setMaster(PropInitialize.spark_master);
    conf.set("spark.cassandra.connection.host",
            PropInitialize.cassandra_hostname);
    conf.setJars(PropInitialize.external_jars);
    return new JavaSparkContext(conf);

}

private static void loadToCassandra(String propFileLoc, Path sourceFileHdfsPath,
        Configuration hadoopConf, String hdfsHome,JavaSparkContext ctx) {
    System.out.println("File :: " + sourceFileHdfsPath.toString());
    FileSystem hadoopFs = null;
    PropInitialize.initailizeConfig(propFileLoc);
    String cassKeyspaceName = PropInitialize.cass_keyspace_name;
    log.info("entered here for file "+sourceFileHdfsPath.toString());

    final String strInputFileName = StringUtils.split(
            sourceFileHdfsPath.getName(), "#")[0].toLowerCase();
    final String strTableNameInCass = StringUtils.split(
            sourceFileHdfsPath.getName(), "-")[0].split("#")[1]
            .toLowerCase();

    final String strSourceFilePath = sourceFileHdfsPath.toString();

    try {
        hadoopFs = getHadoopFs(hdfsHome, hadoopConf);

        //getting the cassandra connection using spark conf
        final CassandraConnector connector = getCassandraConnection(ctx);

        final JavaRDD<CassandraRow> cassTableObj = getCassTableObj(ctx, cassKeyspaceName, strTableNameInCass);

        final Map<String, List<String>> tabColMapWithColTypes1 = ParseDtdXml.tabColMapWithColTypes;

        final String headersUpdated;
        final String headers;

        UnicodeBOMInputStream ubis = new UnicodeBOMInputStream(
                hadoopFs.open(sourceFileHdfsPath));
        Header CsvHeader = Header.getCSVHeader(ubis, ",");
        if (!strTableNameInCass.equalsIgnoreCase("PCMASTER")) {

            String fString = "";
            for (int i = 0; i < CsvHeader.size() - 1; i++) {
                fString = fString + CsvHeader.get(i).ColumnName + ",";
            }
            fString = fString
                    + CsvHeader.get(CsvHeader.size() - 1).ColumnName;

            headers = fString; // StringUtils.join(stringArr.toString(),",");

            headersUpdated = strTableNameInCass.toUpperCase() + "ID,"
                    + headers;

        } else {

            String fString = "";
            for (int i = 0; i < CsvHeader.size() - 1; i++) {
                fString = fString + CsvHeader.get(i).ColumnName + ",";
            }
            fString = fString
                    + CsvHeader.get(CsvHeader.size() - 1).ColumnName;
            headers = fString;
            headersUpdated = fString;

        }

        ubis.close();


        //Reading the file using spark context
        JavaPairRDD<LongWritable, TextArrayWritable> fileRdd = ctx
                .newAPIHadoopFile(strSourceFilePath, CSVInputFormat.class,
                        LongWritable.class, TextArrayWritable.class,
                        hadoopConf);


        final long recCount = fileRdd.count();



        final String[] actCols = headersUpdated.split(",");

        final LinkedHashMap<Object, String> mapOfColNameAndType = new LinkedHashMap<Object, String>();
        final List<String> colNameAndType = tabColMapWithColTypes1
                .get(strTableNameInCass.toUpperCase());

        for (int i = 0; i < actCols.length; i++) {

            if (colNameAndType.contains(actCols[i] + " " + "text")) {
                int indexOfColName = colNameAndType.indexOf(actCols[i]
                        + " " + "text");

                mapOfColNameAndType.put(i,
                        colNameAndType.get(indexOfColName).split(" ")[1]);

            } else if (colNameAndType
                    .contains(actCols[i] + " " + "decimal")) {
                int indexOfColName = colNameAndType.indexOf(actCols[i]
                        + " " + "decimal");
                mapOfColNameAndType.put(i,
                        colNameAndType.get(indexOfColName).split(" ")[1]);
            } else {
                continue;
            }

        }

        //creates the query for prepared statement
        final String makeStatement = makeSt(cassKeyspaceName,
                strTableNameInCass, actCols);
        final long seqId1 = cassTableObj.count();


        //calling map on the fileRdd 
        JavaRDD<String> data = fileRdd.values().map(
                new Function<TextArrayWritable, String>() {
                    /**
                     * 
                     */
                    private static final long serialVersionUID = 1L;
                    Session session;
                    boolean isssession = false;
                    PreparedStatement statement;
                    BatchStatement batch;
                    int lineCount = 0;

                    long seqId = seqId1;

                    /* For each line, returned as a TextArrayWritable, convert each cell to the
                     * corresponding Cassandra type, bind the data to the prepared statement
                     * and add it to the batch.
                     */
                    @Override
                    public String call(TextArrayWritable tup)
                            throws Exception {
                        seqId++;
                        lineCount++;

                        log.info("entered here 3 for file "+strSourceFilePath.toString());
                        String[] part = tup.toStrings();


                        Object[] parts = getDataWithUniqueId(
                                strTableNameInCass, part);


                        //For each file
                        //Creates the session
                        //creates the PreparedStatement
                        if (!isssession) {
                            session = connector.openSession();
                            statement = session.prepare(makeStatement);
                            log.info("entered here 4 for file "+strSourceFilePath.toString());
                            // System.out.println("statement :" +
                            // statement);
                            isssession = true;
                            batch = new BatchStatement();
                        }

                        List<Object> typeConvData = new ArrayList<Object>();

                        for (int i = 0; i < parts.length; i++) {
                            String type = mapOfColNameAndType.get(i);
                            try {
                                if (type.equalsIgnoreCase("text")) {

                                    typeConvData.add(parts[i]);
                                } else {

                                    // parts[i] =
                                    // parts[i].toString().replace("\"",
                                    // "");
                                    // Check whether the String that has to be
                                    // converted to a BigDecimal is a positive
                                    // or negative integer; if it is not,
                                    // force it to zero to avoid a
                                    // NumberFormatException.
                                    if (!((String) parts[i])
                                            .matches("-?\\d+")) {
                                        parts[i] = "0";
                                    }
                                    long s = Long
                                            .valueOf((String) parts[i]);
                                    typeConvData.add(BigDecimal.valueOf(s));

                                }
                            } catch (NullPointerException e) {
                                log.log(Level.SEVERE, "loadToCass method", e);


                            } catch (NumberFormatException e) {
                                log.log(Level.SEVERE, "loadToCass method", e);
                            } catch (InvalidTypeException e) {
                                log.log(Level.SEVERE, "loadToCass method", e);
                            }
                        }

                        List<Object> data = typeConvData;

                        //bind data to query
                        final BoundStatement query = statement.bind(data
                                .toArray(new Object[data.size()]));

                        //add query to batch
                        batch.add(query);
                        int count = LoadToCassandra.count;

                        //when count is 1k execute batch
                        if (count == 1000) {

                            log.info("entered here 5 for file "+strSourceFilePath.toString());
                            log.info("batch done");
                            session.execute(batch);
                            LoadToCassandra.count = 0;
                            batch = new BatchStatement();
                            return StringUtils.join(tup.toStrings());
                        }

                        // if this is the last record and the final batch is smaller than 1k
                        if (lineCount == recCount) {
                            log.info("Last Batch");
                            session.executeAsync(batch);
                            log.info("entered here 6 for file "+strSourceFilePath.toString());
                            //session.execute(batch);
                            session.close();
                            log.info("Session closed");
                        }

                        LoadToCassandra.count++;
                        return StringUtils.join(tup.toStrings());
                    }

                    private Object[] getDataWithUniqueId(
                            String strTableNameInCass, String[] part) {
                        Object[] parts = null;
                        ArrayList<String> tempArraylist = new ArrayList<String>();
                        if (!strTableNameInCass
                                .equalsIgnoreCase("PCMASTER")) {
                            for (int i = 0; i < part.length; i++) {
                                if (i == 0) {
                                    tempArraylist.add(0,
                                            String.valueOf(seqId));
                                }
                                tempArraylist.add(part[i]);
                            }
                            parts = tempArraylist.toArray();
                        } else {
                            parts = part;
                        }

                        return parts;
                    }

                });

        data.count();
        hadoopFs.close();

    } catch (Exception e) {
        e.printStackTrace();
    }
}

private static JavaRDD<CassandraRow> getCassTableObj(
        JavaSparkContext ctx, String cassKeyspaceName,
        String strTableNameInCass) {
    return javaFunctions(ctx)
            .cassandraTable(cassKeyspaceName,
                    strTableNameInCass.toLowerCase());
}

private static CassandraConnector getCassandraConnection(
        JavaSparkContext ctx) {
    return CassandraConnector.apply(ctx.getConf());

}

private static String makeSt(String keyspace, String tabName,
        String[] colNames) {
    StringBuilder sb = new StringBuilder();
    sb.append("INSERT INTO " + keyspace + "." + tabName + " ( ");
    List<String> vars = new ArrayList<>();
    for (int i = 0; i < (colNames.length - 1); i++) {
        sb.append(colNames[i] + ",");
        vars.add("?");
    }
    vars.add("?");
    sb.append(colNames[colNames.length - 1] + " ) values ( "
            + StringUtils.join(vars, ",") + " ) ");

    return sb.toString();
}
}

Can anyone tell me what is causing this problem and how to solve it? Thanks.

1 Answer


After inserting the data into Cassandra, call the ctx.stop() method; it will stop the Spark context.
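For example, roughly like this (a minimal sketch; the SparkConf setup and the load call are placeholders for whatever your driver program actually does):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class StopContextExample {
    public static void main(String[] args) {
        // master/host settings come from spark-submit or the real configuration
        JavaSparkContext ctx = new JavaSparkContext(new SparkConf().setAppName("Loading Data"));
        try {
            // ... run the load-to-Cassandra logic here ...
        } finally {
            ctx.stop();   // stop the context even if the load fails, so the job can finish
        }
    }
}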
