Apache Ignite Spark DataFrames: client vs. server doubts

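I've been trying to integrate Ignite and Spark. The goal of my application is to write Spark DataFrames to Ignite and read them back. However, I'm running into several problems with larger datasets (> 200,000,000 rows).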

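I have a 6-node Ignite cluster running on YARN, with 160 GB of memory and 12 cores in total. I'm trying to save DataFrames from Spark (about 20 GB of raw text data) into Ignite caches (partitioned, 1 backup):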

def main(args: Array[String]): Unit = {
  val ignite = setupIgnite

  closeAfter(ignite) { _ ⇒

    implicit val spark: SparkSession = SparkSession.builder
      .appName("Ignite Benchmark")
      .getOrCreate()

    val customer = readDF("csv", "|", Schemas.customerSchema, "hdfs://master.local:8020/apps/hive/warehouse/ssbplus100/customer")
    val part = readDF("csv", "|", Schemas.partSchema, "hdfs://master.local:8020/apps/hive/warehouse/ssbplus100/part")
    val supplier = readDF("csv", "|", Schemas.supplierSchema, "hdfs://master.local:8020/apps/hive/warehouse/ssbplus100/supplier")
    val dateDim = readDF("csv", "|", Schemas.dateDimSchema, "hdfs://master.local:8020/apps/hive/warehouse/ssbplus100/date_dim")
    val lineorder = readDF("csv", "|", Schemas.lineorderSchema, "hdfs://master.local:8020/apps/hive/warehouse/ssbplus100/lineorder")

    writeDF(customer, "customer", List("custkey"), TEMPLATES.REPLICATED)
    writeDF(part, "part", List("partkey"), TEMPLATES.REPLICATED)
    writeDF(supplier, "supplier", List("suppkey"), TEMPLATES.REPLICATED)
    writeDF(dateDim, "date_dim", List("datekey"), TEMPLATES.REPLICATED)
    writeDF(lineorder.limit(200000000), "lineorder", List("orderkey, linenumber"), TEMPLATES.NO_BACKUP)

  }
}

At some point, the Spark application fails with this error:

class org.apache.ignite.internal.mem.IgniteOutOfMemoryException: Out of memory in data region [name=default, initSize=256.0 MiB, maxSize=12.6 GiB, persistenceEnabled=false] Try the following:
  ^-- Increase maximum off-heap memory size (DataRegionConfiguration.maxSize)
  ^-- Enable Ignite persistence (DataRegionConfiguration.persistenceEnabled)
  ^-- Enable eviction or expiration policies
        at org.apache.ignite.internal.pagemem.impl.PageMemoryNoStoreImpl.allocatePage(PageMemoryNoStoreImpl.java:304)
        at org.apache.ignite.internal.processors.cache.persistence.freelist.AbstractFreeList.allocateDataPage(AbstractFreeList.java:463)
        at org.apache.ignite.internal.processors.cache.persistence.freelist.AbstractFreeList.insertDataRow(AbstractFreeList.java:501)
        at org.apache.ignite.internal.processors.cache.persistence.RowStore.addRow(RowStore.java:97)
        at org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl$CacheDataStoreImpl.createRow(IgniteCacheOffheapManagerImpl.java:1302)
        at org.apache.ignite.internal.processors.cache.GridCacheMapEntry$UpdateClosure.call(GridCacheMapEntry.java:4426)
        at org.apache.ignite.internal.processors.cache.GridCacheMapEntry$UpdateClosure.call(GridCacheMapEntry.java:4371)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree$Invoke.invokeClosure(BPlusTree.java:3083)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree$Invoke.access$6200(BPlusTree.java:2977)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.invokeDown(BPlusTree.java:1726)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.invokeDown(BPlusTree.java:1703)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.invokeDown(BPlusTree.java:1703)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.invoke(BPlusTree.java:1610)
        at org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl$CacheDataStoreImpl.invoke(IgniteCacheOffheapManagerImpl.java:1249)
        at org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl.invoke(IgniteCacheOffheapManagerImpl.java:352)
        at org.apache.ignite.internal.processors.cache.GridCacheMapEntry.storeValue(GridCacheMapEntry.java:3602)
        at org.apache.ignite.internal.processors.cache.GridCacheMapEntry.initialValue(GridCacheMapEntry.java:2774)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$IsolatedUpdater.receive(DataStreamerImpl.java:2125)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerUpdateJob.call(DataStreamerUpdateJob.java:140)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.localUpdate(DataStreamProcessor.java:400)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.processRequest(DataStreamProcessor.java:305)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.access$000(DataStreamProcessor.java:60)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$1.onMessage(DataStreamProcessor.java:90)
        at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1556)
        at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:1184)
        at org.apache.ignite.internal.managers.communication.GridIoManager.access$4200(GridIoManager.java:125)
        at org.apache.ignite.internal.managers.communication.GridIoManager$9.run(GridIoManager.java:1091)
        at org.apache.ignite.internal.util.StripedExecutor$Stripe.run(StripedExecutor.java:511)
        at java.lang.Thread.run(Thread.java:748)

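I think the problem is that an Ignite server is started before the Spark session, as in the official Ignite examples. This server starts caching the data I write to the Ignite cache and exceeds its default region max size (12 GB, which differs from the 20 GB I defined for my YARN cluster). However, I don't understand why the examples and documentation tell us to create an Ignite server before the Spark context (and, I assume, the session). I know that without it the application hangs once all Spark jobs have finished, but I don't understand the logic of starting a server that caches data inside the Spark application. I'm quite confused by this concept, and for now I've set up this Ignite instance inside Spark as a client.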
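To see why an Ignite node started in server mode inside the Spark application ends up holding data, here is a simplified sketch of partition ownership. It uses rendezvous (highest-random-weight) hashing over 1024 partitions, which is the idea behind Ignite's default `RendezvousAffinityFunction`. The hash functions, the node names, and the class itself are hypothetical, so this does not reproduce Ignite's actual assignment. It only shows that any node joining the topology as a server takes a share of the partitions.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of partition ownership (hypothetical; NOT Ignite's real hashing).
class PartitionOwnership {
    // Assumed to match Ignite's default partition count for PARTITIONED caches.
    static final int PARTITIONS = 1024;

    // Murmur3 64-bit finalizer, used here only to spread bits evenly.
    static long mix(long h) {
        h ^= h >>> 33;
        h *= 0xff51afd7ed558ccdL;
        h ^= h >>> 33;
        h *= 0xc4ceb9fe1a85ec53L;
        h ^= h >>> 33;
        return h;
    }

    // Rendezvous hashing: the server node with the highest score for a
    // partition becomes that partition's primary owner.
    static String primaryOwner(int partition, List<String> serverNodes) {
        String best = null;
        long bestScore = Long.MIN_VALUE;
        for (String node : serverNodes) {
            long score = mix(((long) node.hashCode() << 32) ^ partition);
            if (best == null || score > bestScore) {
                best = node;
                bestScore = score;
            }
        }
        return best;
    }

    // Count how many partitions each server node owns.
    static Map<String, Integer> partitionsPerNode(List<String> serverNodes) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String node : serverNodes) counts.put(node, 0);
        for (int p = 0; p < PARTITIONS; p++) {
            counts.merge(primaryOwner(p, serverNodes), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> servers = new ArrayList<>();
        for (int i = 1; i <= 6; i++) servers.add("yarn-node-" + i);
        System.out.println("YARN servers only:    " + partitionsPerNode(servers));

        // A node started in server mode inside Spark joins the same topology
        // and takes its own share of the partitions (and therefore of the data).
        servers.add("spark-driver");
        System.out.println("With embedded server: " + partitionsPerNode(servers));
    }
}
```

A client node never appears in the server list, so it owns no partitions. With rendezvous hashing, adding a node only moves the partitions that the new node wins, which is why a Spark-side server quietly starts receiving a slice of every write.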

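This is strange behavior, because all my Ignite nodes (running on YARN) define 20 GB for the default region (I changed it and verified it). This suggests the error must come from the Ignite servers started by Spark (I think one on the driver and one per worker), because I didn't change the default region size in the Spark application's ignite-config.xml (the error shows the default, about 12 GB). But does this make sense? Should Spark throw this error if its only purpose is to read and write data? Does Spark take part in caching any data, and does that mean I should enable client mode in my application's ignite-config.xml, even though the official examples don't use client mode?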
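The 12.6 GiB in the error matches what Ignite 2.x uses when a node leaves `maxSize` unset: the default data region may grow to 20% of the host's physical RAM, and never below its 256 MiB initial size (the `initSize` shown in the error). The sketch below applies that rule. It assumes the 2.x defaults, and the class and method names are hypothetical.

```java
// Sketch, assuming Ignite 2.x defaults for the "default" data region.
class DefaultRegionSize {
    static final long MIB = 1024L * 1024;
    static final long GIB = 1024L * MIB;
    static final double DEFAULT_FRACTION = 0.2;         // share of physical RAM (assumed default)
    static final long DEFAULT_INITIAL_SIZE = 256 * MIB; // matches initSize in the error

    // Default maxSize for a node that does not configure one.
    static long defaultMaxSize(long physicalRamBytes) {
        return Math.max((long) (DEFAULT_FRACTION * physicalRamBytes), DEFAULT_INITIAL_SIZE);
    }

    // Physical RAM implied by an observed default maxSize (inverse of the rule above).
    static double impliedRamGib(double observedMaxSizeGib) {
        return observedMaxSizeGib / DEFAULT_FRACTION;
    }

    public static void main(String[] args) {
        // A node with no explicit maxSize on a 63 GiB host caps at about 12.6 GiB,
        // whatever the YARN-launched servers were configured with.
        long max = defaultMaxSize(63 * GIB);
        System.out.printf("default maxSize = %.1f GiB%n", (double) max / GIB);
        System.out.printf("12.6 GiB implies about %.0f GiB of RAM%n", impliedRamGib(12.6));
    }
}
```

So the failing region belongs to a node whose configuration does not set `maxSize`, on a host with roughly 63 GiB of RAM. That fits an Ignite server started from the Spark application's ignite-config.xml rather than one of the YARN nodes configured with 20 GB.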

Best regards, Carlos

1 Answer

First of all, the Spark-Ignite connector already connects in client mode.

I'll assume you have enough memory, but you can check by following the examples in the Capacity Planning guide.
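The capacity check comes down to multiplication: entries × average entry size × number of copies, plus some overhead for SQL indexes, divided across the server nodes. In the sketch below, the 120-byte entry size is a hypothetical placeholder, not a measurement. The 0.3 index overhead is an assumption; check the Capacity Planning guide for your Ignite version. For a REPLICATED cache, `copies` equals the number of server nodes, because every server holds a full copy.

```java
// Back-of-the-envelope capacity estimate; every input is an assumption to replace.
class CapacityEstimate {
    static final double GIB = 1024.0 * 1024 * 1024;

    // Total off-heap bytes across the cluster.
    // copies = 1 + backups for PARTITIONED caches, = number of servers for REPLICATED ones.
    static double clusterBytes(long entries, long bytesPerEntry, int copies, double indexOverhead) {
        return (double) entries * bytesPerEntry * copies * (1.0 + indexOverhead);
    }

    // Bytes each server node must hold, assuming an even spread of partitions.
    static double perNodeBytes(double clusterBytes, int serverNodes) {
        return clusterBytes / serverNodes;
    }

    public static void main(String[] args) {
        long rows = 200_000_000L;   // lineorder rows written in the question
        long bytesPerEntry = 120;   // HYPOTHETICAL average entry size
        double indexOverhead = 0.3; // ASSUMED index overhead
        int servers = 6;            // YARN-launched server nodes

        double total = clusterBytes(rows, bytesPerEntry, 1, indexOverhead); // NO_BACKUP
        System.out.printf("cluster: %.1f GiB, per node: %.1f GiB%n",
                total / GIB, perNodeBytes(total, servers) / GIB);
    }
}
```

The per-node figure is what must fit in each node's data region. If Spark-side nodes join as servers, they also count in `serverNodes`, and their share has to fit in whatever (possibly default-sized) region they were started with.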

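However, I think the problem is that you're following the example application a little too closely(!). To be self-contained, the example includes both an Ignite server and the Spark client. If you already have an Ignite cluster, you don't need to start a server in your Spark client.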

Here's an example, lightly hacked from a real application (in Java, sorry):

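import org.apache.ignite.spark.IgniteDataFrameSettings;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

try (SparkSession spark = SparkSession
        .builder()
        .appName("AppName")
        .master(sparkMaster)
        // Make the Ignite jars visible to the executors
        .config("spark.executor.extraClassPath", igniteClassPath())
        .getOrCreate()) {

    // Get source DataFrame
    Dataset<Row> results = ....

    // The connector starts an Ignite client node from igniteCfgFile and
    // streams the rows into the "Results" table on the existing servers.
    results.write()
        .format(IgniteDataFrameSettings.FORMAT_IGNITE())
        .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), igniteCfgFile)
        .option(IgniteDataFrameSettings.OPTION_TABLE(), "Results")
        .option(IgniteDataFrameSettings.OPTION_STREAMER_ALLOW_OVERWRITE(), true)
        .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS(), "name")
        .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS(), "backups=1")
        .mode(SaveMode.Append) // batch writers use mode(), not outputMode()
        .save();               // save() triggers the write
}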

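I haven't tested this, but you should get the idea: you need to provide the URL of an Ignite configuration file, and the connector creates a client in the background to connect to the servers.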