这是我发布的第一个问题,如果我错过了一些信息和平庸的格式,请道歉 . 我可以根据需要更新 .
我会尽量添加尽可能多的细节 . 我有一个不太优化的Spark Job,它将RDBMS数据转换为Neo4j中的图形节点和关系 .
去做这个 . 以下是我遵循的步骤:
-
使用spark sql和join创建一个非规范化的数据帧'data' .
-
'data'中的foreach行运行graphInsert函数,该函数执行以下操作:
一个 . 读取行的内容
湾制定一个neo4j密码查询(我们使用Merge命令,这样我们只有一个城市,例如芝加哥在Neo4j中创建的芝加哥将出现在RDBMS表的多行中)
C . 连接到neo4j
d . 执行查询
即从neo4j断开连接
这是我面临的问题清单 .
- 插入很慢 .
我知道合并查询比创建慢,但还有另一种方法来实现这一点而不是连接和断开每条记录吗?这是我的第一个草案代码,也许我正在努力如何使用一个连接从不同的spark worker节点上的多个线程插入 . 因此,连接和断开每条记录 .
- 作业不可扩展 . 它只能运行1核心 . 一旦我用2个火花核心运行工作,我突然得到2个同名城市,即使我正在运行合并查询 . 例如有两个芝加哥城市违反了Merge的使用 . 我假设Merge的功能类似于"Create if not exist" .
我不知道我的实现是否在neo4j部分或spark中是错误的 . 如果有人能指导我帮助我以更好的规模实现这一点的任何文档,那将是有帮助的,因为我有一个大的火花集群,我需要充分利用这项工作 .
如果您有兴趣查看代码而不是算法 . 这是scala中的graphInsert实现:
class GraphInsert extends Serializable{
var case_attributes = new Array[String](4)
var city_attributes = new Array[String](2)
var location_attributes = new Array[String](20)
var incident_attributes = new Array[String](20)
val prop = new Properties()
prop.load(getClass().getResourceAsStream("/GraphInsertConnection.properties"))
// properties Neo4j
val url_neo4j = prop.getProperty("url_neo4j")
val neo4j_user = prop.getProperty("neo4j_user")
val neo4j_password = prop.getProperty("neo4j_password")
def graphInsert(data : Row){
val query = "MERGE (d:CITY {name:city_attributes(0)})\n" +"MERGE (a:CASE { " + case_attributes(0) + ":'" +data(11) + "'," +case_attributes(1) + ":'" +data(13) + "'," +case_attributes(2) + ":'" +data(14) +"'}) \n" +"MERGE (b:INCIDENT { " + incident_attributes(0) + ":" +data(0) + "," +incident_attributes(1) + ":" +data(2) + "," +incident_attributes(2) + ":'" +data(3) + "'," +incident_attributes(3) + ":'" +data(8)+ "'," +incident_attributes(4) + ":" +data(5) + "," +incident_attributes(5) + ":'" +data(4) + "'," +incident_attributes(6) + ":'" +data(6) + "'," +incident_attributes(7) + ":'" +data(1) + "'," +incident_attributes(8) + ":" +data(7)+"}) \n" +"MERGE (c:LOCATION { " + location_attributes(0) + ":" +data(9) + "," +location_attributes(1) + ":" +data(10) + "," +location_attributes(2) + ":'" +data(19) + "'," +location_attributes(3) + ":'" +data(20)+ "'," +location_attributes(4) + ":" +data(18) + "," +location_attributes(5) + ":" +data(21) + "," +location_attributes(6) + ":'" +data(17) + "'," +location_attributes(7) + ":" +data(22) + "," +location_attributes(8) + ":" +data(23)+"}) \n" +"MERGE (a) - [r1:"+relation_case_incident+"]->(b)-[r2:"+relation_incident_location+"]->(c)-[r3:belongs_to]->(d);"
println(query)
try{
var con = DriverManager.getConnection(url_neo4j, neo4j_user, neo4j_password)
var stmt = con.createStatement()
var rs = stmt.executeQuery(query)
con.close()
}catch{
case ex: SQLException =>{
println(ex.getMessage)
}
}
}
def operations(sqlContext: SQLContext){
....
#Get 'data' before this step
city_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_city").map(x =>x.getString(5)).collect()
case_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_case_number").map(x =>x.getString(5)).collect()
location_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_location").map(x =>x.getString(5)).collect()
incident_attributes= entity_metadata.filter(entity_metadata("source_name") === "tb_incident").map(x =>x.getString(5)).collect()
data.foreach(graphInsert)
}
object GraphObject {
def main(args: Array[String]) {
val conf = new SparkConf()
.setAppName("GraphNeo4j")
.setMaster("xyz")
.set("spark.cores.max","2")
.set("spark.executor.memory","10g")
Logger.getLogger("org").setLevel(Level.ERROR)
Logger.getLogger("akka").setLevel(Level.ERROR)
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val graph = new GraphInsert()
graph.operations(sqlContext)
}
}
2 回答
无论你在闭包内写什么,它都需要在Worker上执行分配 . 你可以在这里阅读更多相关信息:http://spark.apache.org/docs/latest/programming-guide.html#understanding-closures-a-nameclosureslinka
当你增加核心数量时,我认为它不会影响应用程序,因为如果你没有指定它!然后它需要贪婪的方法!我希望这份文件有所帮助 .
我完成了改进过程,但没有什么能像Cypher中的LOAD命令一样快 . 希望这有助于某人:使用
foreachPartition
而不是foreach
在进行此类过程时会获得显着的收益 . 还使用cypher添加定期提交 .