
How to save and read a Spark DataFrame / DataSet in Apache Ignite?


How can I save and read a Spark DataFrame / DataSet in Apache Ignite? I have tried the various solutions from other similar questions, but none of them work with the latest Ignite and Spark versions. (I am using Scala 2.11.) Thanks.

Update (code added):

<?xml version="1.0" encoding="UTF-8"?>        
<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd">

<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
    <property name="cacheConfiguration">
        <!-- SharedRDD cache example configuration (Atomic mode). -->
        <bean class="org.apache.ignite.configuration.CacheConfiguration">
            <!-- Set a cache name. -->
            <property name="name" value="sharedRDD"/>
            <!-- Set a cache mode. -->
            <property name="cacheMode" value="PARTITIONED"/>

            <!-- Set atomicity mode. -->
            <property name="atomicityMode" value="ATOMIC"/>
            <!-- Configure a number of backups. -->
            <property name="backups" value="1"/>
        </bean>
    </property>

    <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
    <property name="discoverySpi">
        <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
            <property name="ipFinder">
                <!--
                    Ignite provides several options for automatic discovery that can be used
                    instead of static IP based discovery. For information on all options refer
                    to our documentation: http://apacheignite.readme.io/docs/cluster-config
                -->
                <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
                <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
                <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                    <property name="addresses">
                        <list>
                            <!-- In distributed environment, replace with actual host IP address. -->
                            <value>127.0.0.1:47500..47509</value>
                        </list>
                    </property>
                </bean>
            </property>
        </bean>
    </property>
</bean>
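
The IgniteContext below loads this file by path. For completeness, here is a minimal sketch of bringing up a standalone Ignite server node from the same file, so that its cache and discovery settings match the Spark side (the object name is illustrative, and the path is assumed to resolve on each machine):

import org.apache.ignite.Ignition

// Start a standalone Ignite server node from the same Spring XML so its
// cache and discovery settings match the Spark-side IgniteContext.
object StartIgniteServer extends App {
  val ignite = Ignition.start("config/cache.xml")
}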

IgniteCache code (this puts the df into the cache and tries to read it back as an RDD):

object SparkIgniteCache {
  private val CONFIG = "config/cache.xml"

  import org.apache.ignite.binary.BinaryObject
  import org.apache.ignite.cache.CacheAtomicityMode
  import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction
  import org.apache.ignite.configuration.CacheConfiguration
  import org.apache.ignite.spark.IgniteContext
  import org.apache.spark.SparkContext
  import org.apache.spark.sql.{DataFrame, Row}

  private[sample] def set(sc: SparkContext, df: DataFrame, KEY: String) {
    val ic = new IgniteContext(sc, CONFIG, false)

    // FAILED ATTEMPT OF SETTING CONFIG : 1
    //    val cacheConfiguration: CacheConfiguration[String, Row] = new CacheConfiguration[String, Row](KEY)
    //      .setAtomicityMode(CacheAtomicityMode.ATOMIC).setBackups(0)
    //      .setAffinity(new RendezvousAffinityFunction(false, 2))
    //      .setIndexedTypes(classOf[String], classOf[Row])
    //
    //    val rddCache = ic.ignite.getOrCreateCache(cacheConfiguration)

    // FAILED ATTEMPT OF SETTING CONFIG : 2
    //    val cacheConfiguration: CacheConfiguration[String, BinaryObject] = new CacheConfiguration[String, BinaryObject](KEY)
    //      .setAtomicityMode(CacheAtomicityMode.ATOMIC).setBackups(0)
    //      .setAffinity(new RendezvousAffinityFunction(false, 2))
    //      .setIndexedTypes(classOf[String], classOf[BinaryObject])
    //
    //    val rddCache = ic.ignite.getOrCreateCache(cacheConfiguration)

    val sharedRDD = ic.fromCache[String, Row](KEY)
    sharedRDD.saveValues(df.rdd)
  }

  private[sample] def get(sc: SparkContext, KEY: String) = {
    val ic = new IgniteContext(sc, CONFIG, false)
    ic.fromCache[String, Row](KEY)
  }
}

1 Answer


    I was able to solve the above issue as follows:

    I added the following XML snippet to the configuration file, under the CacheConfiguration node:

    <property name="indexedTypes">
         <list>
             <value>java.lang.String</value>
             <value>org.apache.spark.sql.Row</value>
         </list>
    </property>
    

    I wanted to store a dataframe of type DataFrame[Row], which Ignite cannot do directly. It is possible, however, to save an RDD[Row], and since an Ignite cache stores key-value pairs, the data has to be saved in pair format, i.e. the RDD[Row] has to become an RDD[(String, Row)]. To declare these types in the CacheConfiguration, I added the indexedTypes as shown above.
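
    A minimal sketch of that pair conversion (the rowKey helper is hypothetical; any scheme that yields a unique String per Row would do). IgniteRDD.savePairs writes explicit keys, whereas the set method below uses saveValues, which lets Ignite generate the keys itself:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.Row

    // Hypothetical key function: derive a unique String key per Row,
    // here simply from the row's first column.
    def rowKey(row: Row): String = row.get(0).toString

    // Turn RDD[Row] into the RDD[(String, Row)] pair format that a
    // [String, Row] Ignite cache stores.
    def toPairRdd(rows: RDD[Row]): RDD[(String, Row)] =
      rows.map(row => (rowKey(row), row))

    // With explicit keys the write would be:
    //   sharedRDD.savePairs(toPairRdd(df.rdd))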

    You also need to save the dataframe's schema along with the data, so that the pairs can be converted back into a dataframe later.

    Here is the code to save/read the DF:

    object SparkIgniteCache {
        import org.apache.ignite.cache.CacheAtomicityMode
        import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction
        import org.apache.ignite.configuration.CacheConfiguration
        import org.apache.ignite.spark.{IgniteContext, IgniteRDD}
        import org.apache.spark.SparkContext
        import org.apache.spark.sql.{DataFrame, Row}
        import org.apache.spark.sql.types.StructType

        private val CONFIG = "config/cache.xml"
        private val schemaCacheConfig = makeSchemaCacheConfig("schemas")

        private[sample] def set(sc: SparkContext, df: DataFrame, KEY: String) {
            val ic = new IgniteContext(sc, CONFIG, false)
            val sharedRDD = ic.fromCache[String, Row](KEY)

            // Separate cache holding each dataframe's schema under "<KEY>_schema".
            val rddSchemaCache = ic.ignite.getOrCreateCache(schemaCacheConfig)
            rddSchemaCache.put(KEY + "_schema", df.schema)

            sharedRDD.saveValues(df.rdd)
        }

        private[sample] def get(sc: SparkContext, KEY: String)
                            : (StructType, IgniteRDD[String, Row]) = {
            val ic = new IgniteContext(sc, CONFIG, false)
            val rddSchemaCache = ic.ignite.getOrCreateCache(schemaCacheConfig)
            (rddSchemaCache.get(KEY + "_schema"), ic.fromCache[String, Row](KEY))
        }

        private def makeSchemaCacheConfig(name: String) =
            new CacheConfiguration[String, StructType](name)
                .setAtomicityMode(CacheAtomicityMode.ATOMIC)
                .setBackups(1)
                .setAffinity(new RendezvousAffinityFunction(false, 1))
    }

    In the code above, I also create a CacheConfiguration dynamically (makeSchemaCacheConfig) to hold the dataframe's schema, which has type StructType.

    Now you just need to call the set and get methods, as shown below:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.Row

    // Set data/dataframe for KEY=input_data
    SparkIgniteCache.set(spark.sparkContext, df, "input_data")

    // Get dataframe back
    val (schema, igniteRDD) = SparkIgniteCache.get(spark.sparkContext, "input_data")
    val rdd1: RDD[Row] = igniteRDD.map(_._2) // extract the Row from each (String, Row) pair
    val restoredDf = spark.sqlContext.createDataFrame(rdd1, schema)
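
    To sanity-check the round trip, the restored frame can be compared against the original (purely illustrative):

    assert(restoredDf.schema == df.schema)
    assert(restoredDf.count() == df.count())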
    

    Thanks.
