首页 文章

foreach%dopar%RPostgreSQL

提问于
浏览
7

我正在使用RPostgreSQL连接到本地数据库 . 该设置在我的Linux机器上运行正常 . R 2.11.1,Postgres 8.4 .

我使用多核(doMC)并行后端玩'foreach'来包装一些重复查询(编号为几千)并将结果附加到数据结构中 . 奇怪的是,如果我使用%do%但是当我切换到%dopar%时会失败,只有一次迭代时会出现异常(如下所示)

我想知道它是否与单个连接对象有关,所以我创建了10个连接对象,并且取决于我是什么,为该查询给出了某个con对象,具体取决于i modulo 10.(仅在下面表示2个连接对象) . 被评估的表达式eval(expr.01),包含/是取决于'i'的查询 .

我无法理解这些特定的错误消息 . 我想知道是否有任何方法可以使这项工作 .

谢谢 .
Vishal Belsare

R片段如下:

> id.qed2.foreach <- foreach(i = 1588:1588, .inorder=FALSE) %dopar% { 
+ if (i %% 2 == 0) {con <- con0}; 
+ if (i %% 2 == 1) {con <- con1}; 
+ fetch(dbSendQuery(con,eval(expr.01)),n=-1)$idreuters};
> id.qed2.foreach
[[1]]
  [1]   411   414  2140  2406  4490  4507  4519  4570  4571  4572  4703  4731
[109] 48765 84312 91797

> id.qed2.foreach <- foreach(i = 1588:1589, .inorder=FALSE) %dopar% { 
+ if (i %% 2 == 0) {con <- con0}; 
+ if (i %% 2 == 1) {con <- con1}; 
+ fetch(dbSendQuery(con,eval(expr.01)),n=-1)$idreuters};
Error in stop(paste("expired", class(con))) : 
  no function to return from, jumping to top level
Error in stop(paste("expired", class(con))) : 
  no function to return from, jumping to top level
Error in { : 
  task 1 failed - "error in evaluating the argument 'res' in selecting a method for function 'fetch'"
>

编辑:我改变了一些东西,(仍然不成功),但有一些事情被曝光 . 在循环中创建的连接对象并未通过dbDisconnect“断开连接”,导致挂起连接,如Postgres的/ var / log所示 . 我这样做时会出现一些新的错误消息:

> system.time(
+ id.qed2.foreach <- foreach(i = 1588:1590, .inorder=FALSE, 
.packages=c("DBI", "RPostgreSQL")) %dopar% {drv0 <- dbDriver("PostgreSQL"); 
con0 <- dbConnect(drv0, dbname='nseindia');
list(idreuters=fetch(dbSendQuery(con0,eval(expr.01)),n=-1)$idreuters);
dbDisconnect(con0)})
Error in postgresqlExecStatement(conn, statement, ...) : 
  no function to return from, jumping to top level
Error in postgresqlExecStatement(conn, statement, ...) : 
  no function to return from, jumping to top level
Error in postgresqlExecStatement(conn, statement, ...) : 
  no function to return from, jumping to top level
Error in { : 
  task 1 failed - "error in evaluating the argument 'res' in selecting a method for function 'fetch'"

2 回答

  • 16

    每个worker创建一次数据库连接效率更高,而不是每个任务一次 . 不幸的是,mclapply在执行任务之前没有提供初始化工作程序的机制,因此使用doMC后端执行此操作并不容易,但如果使用doParallel后端,则可以使用clusterEvalQ初始化工作程序 . 以下是如何重构代码的示例:

    library(doParallel)
    cl <- makePSOCKcluster(detectCores())
    registerDoParallel(cl)
    
    clusterEvalQ(cl, {
      library(DBI)
      library(RPostgreSQL)
      drv <- dbDriver("PostgreSQL")
      con <- dbConnect(drv, dbname="nsdq")
      NULL
    })
    
    id.qed.foreach <- foreach(i=1588:3638, .inorder=FALSE,
                              .noexport="con",
                              .packages=c("DBI", "RPostgreSQL")) %dopar% {
      lst <- eval(expr.01)  #contains the SQL query which depends on 'i'
      qry <- dbSendQuery(con, lst)
      tmp <- fetch(qry, n=-1)
      dt <- dates.qed2[i]
      list(date=dt, idreuters=tmp$idreuters)
    }
    
    clusterEvalQ(cl, {
      dbDisconnect(con)
    })
    

    由于doParallel和clusterEvalQ使用相同的集群对象 cl ,因此foreach循环在执行任务时可以访问数据库连接对象 con .

  • 2

    以下工作和顺序形式加速约1.5倍 . 下一步,我想知道是否可以将连接对象附加到registerDoMC生成的每个worker . 如果是这样,则不需要创建/销毁连接对象,这可以防止PostgreSQL服务器被连接压倒 .

    pgparquery <- function(i) {
    drv <- dbDriver("PostgreSQL"); 
    con <- dbConnect(drv, dbname='nsdq'); 
    lst <- eval(expr.01); #contains the SQL query which depends on 'i'
    qry <- dbSendQuery(con,lst);
    tmp <- fetch(qry,n=-1);
    dt <- dates.qed2[i]
    dbDisconnect(con);
    result <- list(date=dt, idreuters=tmp$idreuters)
    return(result)}
    
    id.qed.foreach <- foreach(i = 1588:3638, .inorder=FALSE, .packages=c("DBI", "RPostgreSQL")) %dopar% {pgparquery(i)}
    

    Vishal Belsare

相关问题