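How do I save and read a Spark DataFrame / Dataset in Apache Ignite? I have tried various solutions from other similar questions, but none of them work with the latest Ignite and Spark versions. (I am using Scala 2.11.) Thanks.
Update (code added):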
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="cacheConfiguration">
<!-- SharedRDD cache example configuration (Atomic mode). -->
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<!-- Set a cache name. -->
<property name="name" value="sharedRDD"/>
<!-- Set a cache mode. -->
<property name="cacheMode" value="PARTITIONED"/>
<!-- Set atomicity mode. -->
<property name="atomicityMode" value="ATOMIC"/>
<!-- Configure a number of backups. -->
<property name="backups" value="1"/>
</bean>
</property>
<!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<!--
Ignite provides several options for automatic discovery that can be used
instead of static IP based discovery. For information on all options refer
to our documentation: http://apacheignite.readme.io/docs/cluster-config
-->
<!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
<!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
<property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<value>127.0.0.1:47500..47509</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
</bean>
IgniteCache code (this puts the DataFrame into the cache and tries to read it back by converting it to an RDD):
object SparkIgniteCache {
private val CONFIG = "config/cache.xml"
import org.apache.ignite.IgniteCache
import org.apache.ignite.binary.BinaryObject
import org.apache.ignite.cache.CacheAtomicityMode
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction
import org.apache.ignite.configuration.CacheConfiguration
import org.apache.ignite.spark.IgniteContext
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Row}
private[sample] def set(sc: SparkContext, df: DataFrame, KEY: String): Unit = {
val ic = new IgniteContext(sc, CONFIG, false)
// FAILED ATTEMPT OF SETTING CONFIG : 1
// val cacheConfiguration: CacheConfiguration[String, Row] = new CacheConfiguration[String, Row](KEY)
// .setAtomicityMode(CacheAtomicityMode.ATOMIC).setBackups(0)
// .setAffinity(new RendezvousAffinityFunction(false, 2))
// .setIndexedTypes(classOf[String], classOf[Row])
//
// val rddCache = ic.ignite.getOrCreateCache(cacheConfiguration)
// FAILED ATTEMPT OF SETTING CONFIG : 2
// val cacheConfiguration: CacheConfiguration[String, BinaryObject] = new CacheConfiguration[String, BinaryObject](KEY)
// .setAtomicityMode(CacheAtomicityMode.ATOMIC).setBackups(0)
// .setAffinity(new RendezvousAffinityFunction(false, 2))
// .setIndexedTypes(classOf[String], classOf[BinaryObject])
//
// val rddCache = ic.ignite.getOrCreateCache(cacheConfiguration)
val sharedRDD = ic.fromCache[String, Row](KEY)
sharedRDD.saveValues(df.rdd)
}
private[sample] def get(sc: SparkContext, KEY: String) = {
val ic = new IgniteContext(sc, CONFIG, false)
// val cacheConfiguration: CacheConfiguration[String, Row] = new CacheConfiguration[String, Row](KEY)
// .setAtomicityMode(CacheAtomicityMode.ATOMIC).setBackups(0)
// .setAffinity(new RendezvousAffinityFunction(false, 2))
// .setIndexedTypes(classOf[String], classOf[Row])
//
// val rddCache = ic.ignite.getOrCreateCache(cacheConfiguration)
ic.fromCache[String, Row](KEY)
}
}
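1 Answer
I was able to solve the problem above as follows:
I added the following XML snippet under the CacheConfiguration node in the XML file:
I wanted to store a DataFrame (a collection of Row objects), which Ignite cannot do directly. You can, however, save an RDD[Row], provided you save it in pair form, so I needed to convert the RDD[Row] into an RDD[(String, Row)]. To reflect this in the CacheConfiguration, I added the indexedTypes as shown above.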
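The pairing step described above can be sketched roughly as follows. This is a minimal sketch, not the original code: the object name `RowPairs`, the helpers `keyFor`, `toPairs` and `rowCacheConfig`, and the index-based key scheme are all assumptions chosen for illustration. `setIndexedTypes` is the setter behind the `indexedTypes` property of `CacheConfiguration`, so it is the programmatic counterpart of declaring the types in XML.

```scala
import org.apache.ignite.configuration.CacheConfiguration
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}

object RowPairs {
  // Hypothetical key scheme: the row's position in the RDD, rendered as a string.
  def keyFor(index: Long): String = index.toString

  // Turn the DataFrame's RDD[Row] into RDD[(String, Row)],
  // the key/value pair form that an Ignite cache stores.
  def toPairs(df: DataFrame): RDD[(String, Row)] =
    df.rdd.zipWithIndex().map { case (row, index) => (keyFor(index), row) }

  // Programmatic counterpart of listing String and Row as indexedTypes in the XML.
  def rowCacheConfig(cacheName: String): CacheConfiguration[String, Row] =
    new CacheConfiguration[String, Row](cacheName)
      .setIndexedTypes(classOf[String], classOf[Row])
}
```

Any key scheme works as long as the keys are unique per row; a natural key from the data would avoid the extra Spark job that `zipWithIndex` can trigger on multi-partition RDDs.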
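You also need to save the DataFrame's schema along with the data, so that you can convert it back into a DataFrame later.
Here is the code to save/read the DataFrame: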
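A minimal sketch of what such save/read helpers might look like, under stated assumptions: the object name `DataFrameCache`, the schema cache name `dfSchemas`, the index-based row keys, and the use of a `SparkSession` parameter are all hypothetical, and the schema is assumed to be a Spark `StructType`. As in the question, `key` doubles as the name of the Ignite cache that holds the rows.

```scala
import org.apache.ignite.configuration.CacheConfiguration
import org.apache.ignite.spark.IgniteContext
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object DataFrameCache {
  // Spring config path taken from the question.
  private val Config = "config/cache.xml"
  // Hypothetical name of the cache that maps a key to its DataFrame schema.
  val SchemaCacheName = "dfSchemas"

  def set(spark: SparkSession, df: DataFrame, key: String): Unit = {
    val ic = new IgniteContext(spark.sparkContext, Config, false)
    // Ignite stores key/value pairs, so give every Row a String key (assumed scheme).
    val pairs = df.rdd.zipWithIndex().map { case (row, i) => (i.toString, row) }
    ic.fromCache[String, Row](key).savePairs(pairs)
    // Keep the schema in a separate, dynamically configured cache.
    val schemas = ic.ignite().getOrCreateCache[String, StructType](
      new CacheConfiguration[String, StructType](SchemaCacheName))
    schemas.put(key, df.schema)
  }

  def get(spark: SparkSession, key: String): DataFrame = {
    val ic = new IgniteContext(spark.sparkContext, Config, false)
    val schemas = ic.ignite().getOrCreateCache[String, StructType](
      new CacheConfiguration[String, StructType](SchemaCacheName))
    val schema = Option(schemas.get(key)).getOrElse(
      throw new NoSuchElementException(s"No schema stored for key '$key'"))
    // Drop the cache keys and rebuild the DataFrame from the rows and the saved schema.
    val rows = ic.fromCache[String, Row](key).map(_._2)
    spark.createDataFrame(rows, schema)
  }
}
```

A partitioned cache does not preserve row order, so the DataFrame read back may list its rows in a different order than the one that was saved.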
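In the code above, I also create a dynamic CacheConfiguration to store the DataFrame's schema, which is of type StructType.
Now you only need to call the set and get methods, as follows: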
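A hedged sketch of such a round trip. To keep the block self-contained, the two helpers are passed in as function parameters; `RoundTripDemo`, `roundTrip`, `save`, `load` and the sample data are hypothetical names introduced for illustration, and the helper signatures (taking a `SparkSession`) are an assumption rather than the original ones.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object RoundTripDemo {
  // `save` and `load` stand in for the set/get helpers (assumed signatures).
  def roundTrip(spark: SparkSession, key: String)(
      save: (SparkSession, DataFrame, String) => Unit,
      load: (SparkSession, String) => DataFrame): DataFrame = {
    import spark.implicits._
    // Small sample DataFrame, made up for this example.
    val df = Seq(("alice", 1), ("bob", 2)).toDF("name", "id")
    save(spark, df, key) // write rows and schema under `key`
    load(spark, key)     // read them back as a DataFrame
  }
}
```

With real helpers in place, the call is just `roundTrip(spark, "sharedRDD")(yourSet, yourGet)`, where `sharedRDD` is the cache name from the XML configuration.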
Thanks.