我试图从远程Amazon RedShift服务器(Postgresql数据库)下载一组非常大的数据 . 数据是关于用户访问日志的 . 由于数据非常大 . 我提取在指定时间段内访问网站的用户的ID,然后递归提取他们的日志 .

代码如下 .

static Connection getUserLogConn() throws SQLException, ClassNotFoundException {
        System.out.println("-------- PostgreSQL "
                + "JDBC Connection Testing ------------");

        Class.forName("org.postgresql.Driver");
        Connection connection = null;
        connection = DriverManager.getConnection("<address>", "<username>", "<password>");
        return connection;
    }

    static LinkedList<String> extractAllUIDsFromRemote( Connection connection ) throws SQLException, UnknownHostException {
        LinkedList<String> allUIDs = new LinkedList<String>();
        String query = "SELECT distinct uid " + 
                       fromStr +
                       " WHERE ts >= " + startTime;
        if(!endTime.equals(""))
            query += " AND ts < " + endTime;

        System.out.println("Sent SQL to RedShift: " + query);

        // ***Below statement is where the exception occurs ***
        ResultSet rs_uid = connection.createStatement().executeQuery( query ); 

        System.out.println( "Received all UIDs successfully" );

        int n = 0;
        while( rs_uid.next() ) {
            // The cursor points to a row in the result
            n++;
            String uid = rs_uid.getString( "uid" );
            allUIDs.add(uid);
        }
        System.out.println( n + " docs are retrieved." );

        return allUIDs;
    }    


    static void queryIndividualUserLog( Connection connection, LinkedList<String> uids ) throws SQLException, UnknownHostException {
        MongoDBManager db = new MongoDBManager( database, "FreqUserLog" );
        db.createIndex("uid");
        db.createIndex("url");

        StringBuffer sb = new StringBuffer();
        int i = 0;
        for( String uid : uids ) {
            sb.append( "uid='" + uid + "'" );
            // Compose SQL query every 10000 users
            if( ( i != 0 && i % 10000 == 0 ) || i == uids.size() - 1 ){
                System.out.println("Processing up to User " + i);
                String query = "SELECT * " + 
                               fromStr +
                               " WHERE " + sb.toString() +
                               " AND ts >= " + startTime;
                if(!endTime.equals(""))
                    query += " AND ts < " + endTime;

                System.out.println("Sent SQL to RedShift for retrieving individual users' logs");
                **ResultSet rs_log = connection.createStatement().executeQuery( query );** // This step takes time to wait for the response from RedShift
                System.out.println( "Received individual users' logs succesfully" );

                while( rs_log.next() ) {
                    db.insertOneLog( rs_log ); // one log = one doc, i.e. one row
                }
                System.out.println( "Have written to DB." );
                sb = new StringBuffer();
            }
            else {
                sb.append( " OR " );
            }   
            i++;
        }
        System.out.println(uids.size() + " user's log are stored into DB");
    }

    public static void main(String[] args) throws ClassNotFoundException, SQLException, UnknownHostException {

        Connection connection = getUserLogConn();
        if(connection != null) {
            System.out.println( "Connect succesfully" );


        /** Extract all users' UIDs, and store them in FreqUserIDs collection */
            LinkedList<String> allUIDs = extractAllUIDsFromRemote( connection );

        /** Query all records of freq users from RedShift, and store them in FreqUserLog collection */
            queryIndividualUserLog( connection, allUIDs );

        connection.close();
    }

但是,问题是有时会抛出异常 . 代码中“***”注释下的语句是问题发生的地方 .

org.postgresql.util.PSQLException: An I/O error occured while sending to the backend.
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:218)
at org.postgresql.jdbc2.AbstractAbstractJdbcAbstractedly5statement.excute(AbstractJdbcabstractedly5statement.java:561)
...
Caused by java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(Unknown Source)
at java.net.SocketInputStream.read(Unknown Source)
at org.postgresql.core.VisibleBufferedInputStream.readMore(VisibleBufferedInputStream.java:143)
org.postgresql.core.VisibleBufferedInputStream.ensureBytes(VisibleBufferedInputStream.java:112)
org.postgresql.core.VisibleBufferedInputStream.read(VisibleBufferedInputStream.java:194)
org.postgresql.core.PGStream.Receive(PGStream.read)

由于我无法访问远程Postgresql服务器,因此我没有数据库日志 . 我用Google搜索了这个问题 . 许多相关问题是关于“通过对等方重置连接”,而不是“连接重置” . 有人说“连接重置”意味着连接在这一侧关闭,即我的身边 . 但我不知道为什么会发生这种情况以及如何解决这个问题 . 谢谢 .

UPDATE: 我的猜测是查询过程通常需要很长时间,因为数据太大 . 所以问题一直在等待RedShift的响应 . 在这种情况下,我的程序会因超时而关闭连接 . 我不知道这是不是真的......如果是的话,有没有更好的解决方案? (我的意思是,比每次减少查询的用户数量更好 . 现在这个数字是10000) .