首页 文章

用于图形数据库(Neo4j)插入的Spark UDF优化

提问于
浏览
0

这是我发布的第一个问题,如果我错过了一些信息和平庸的格式,请道歉 . 我可以根据需要更新 .

我会尽量添加尽可能多的细节 . 我有一个不太优化的Spark Job,它将RDBMS数据转换为Neo4j中的图形节点和关系 .

去做这个 . 以下是我遵循的步骤:

  • 使用spark sql和join创建一个非规范化的数据帧'data' .

  • 'data'中的foreach行运行graphInsert函数,该函数执行以下操作:

一个 . 读取行的内容
湾制定一个neo4j密码查询(我们使用Merge命令,这样我们只有一个城市,例如芝加哥在Neo4j中创建的芝加哥将出现在RDBMS表的多行中)
C . 连接到neo4j
d . 执行查询
即从neo4j断开连接

这是我面临的问题清单 .

  • 插入很慢 .

我知道合并查询比创建慢,但还有另一种方法来实现这一点而不是连接和断开每条记录吗?这是我的第一个草案代码,也许我正在努力如何使用一个连接从不同的spark worker节点上的多个线程插入 . 因此,连接和断开每条记录 .

  • 作业不可扩展 . 它只能运行1核心 . 一旦我用2个火花核心运行工作,我突然得到2个同名城市,即使我正在运行合并查询 . 例如有两个芝加哥城市违反了Merge的使用 . 我假设Merge的功能类似于"Create if not exist" .

我不知道我的实现是否在neo4j部分或spark中是错误的 . 如果有人能指导我帮助我以更好的规模实现这一点的任何文档,那将是有帮助的,因为我有一个大的火花集群,我需要充分利用这项工作 .

如果您有兴趣查看代码而不是算法 . 这是scala中的graphInsert实现:

class GraphInsert extends Serializable{
   var case_attributes = new Array[String](4)
   var city_attributes = new Array[String](2)
   var location_attributes = new Array[String](20)
   var incident_attributes = new Array[String](20)
   val prop = new Properties()
   prop.load(getClass().getResourceAsStream("/GraphInsertConnection.properties"))
   // properties Neo4j
   val url_neo4j = prop.getProperty("url_neo4j")
   val neo4j_user = prop.getProperty("neo4j_user")
   val neo4j_password = prop.getProperty("neo4j_password")


   def graphInsert(data : Row){  
      val query = "MERGE (d:CITY {name:city_attributes(0)})\n" +"MERGE (a:CASE { " + case_attributes(0)  + ":'" +data(11) + "'," +case_attributes(1)  + ":'" +data(13)  + "'," +case_attributes(2)  + ":'" +data(14) +"'}) \n" +"MERGE (b:INCIDENT { " + incident_attributes(0)  + ":" +data(0) + "," +incident_attributes(1)  + ":" +data(2)  + "," +incident_attributes(2)  + ":'" +data(3) +  "'," +incident_attributes(3)  + ":'" +data(8)+  "'," +incident_attributes(4)  + ":" +data(5) +  "," +incident_attributes(5)  + ":'" +data(4) +  "'," +incident_attributes(6)  + ":'" +data(6) +  "'," +incident_attributes(7)  + ":'" +data(1) +  "'," +incident_attributes(8)  + ":" +data(7)+"}) \n" +"MERGE (c:LOCATION { " + location_attributes(0)  + ":" +data(9) + "," +location_attributes(1)  + ":" +data(10)  + "," +location_attributes(2)  + ":'" +data(19) +  "'," +location_attributes(3)  + ":'" +data(20)+  "'," +location_attributes(4)  + ":" +data(18) +  "," +location_attributes(5)  + ":" +data(21) +  "," +location_attributes(6)  + ":'" +data(17) +  "'," +location_attributes(7)  + ":" +data(22) +  "," +location_attributes(8)  + ":" +data(23)+"}) \n" +"MERGE (a) - [r1:"+relation_case_incident+"]->(b)-[r2:"+relation_incident_location+"]->(c)-[r3:belongs_to]->(d);"
              println(query)
              try{
                      var con = DriverManager.getConnection(url_neo4j, neo4j_user, neo4j_password)
                          var stmt = con.createStatement()
                          var rs = stmt.executeQuery(query)
                          con.close()
              }catch{
              case ex: SQLException =>{
                  println(ex.getMessage)
              }
              }
  } 

def operations(sqlContext: SQLContext){
    ....
    #Get 'data' before this step
    city_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_city").map(x =>x.getString(5)).collect()
    case_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_case_number").map(x =>x.getString(5)).collect()
    location_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_location").map(x =>x.getString(5)).collect()
    incident_attributes= entity_metadata.filter(entity_metadata("source_name") === "tb_incident").map(x =>x.getString(5)).collect()

    data.foreach(graphInsert)

}

object GraphObject {
  def main(args: Array[String]) {  
      val conf = new SparkConf()
        .setAppName("GraphNeo4j")
        .setMaster("xyz")
        .set("spark.cores.max","2")
        .set("spark.executor.memory","10g")

      Logger.getLogger("org").setLevel(Level.ERROR)
      Logger.getLogger("akka").setLevel(Level.ERROR)
      val sc = new SparkContext(conf)
      val sqlContext = new SQLContext(sc)
      val graph = new GraphInsert()
      graph.operations(sqlContext)

  }
}

2 回答

  • 0

    无论你在闭包内写什么,它都需要在Worker上执行分配 . 你可以在这里阅读更多相关信息:http://spark.apache.org/docs/latest/programming-guide.html#understanding-closures-a-nameclosureslinka

    当你增加核心数量时,我认为它不会影响应用程序,因为如果你没有指定它!然后它需要贪婪的方法!我希望这份文件有所帮助 .

  • 0

    我完成了改进过程,但没有什么能像Cypher中的LOAD命令一样快 . 希望这有助于某人:使用 foreachPartition 而不是 foreach 在进行此类过程时会获得显着的收益 . 还使用cypher添加定期提交 .

相关问题