首页 文章

使用大熊猫的“大数据”工作流程

提问于
浏览
779

在学习大熊猫的过程中,我试图解决这个问题的答案已有好几个月了 . 我使用SAS进行日常工作,这非常适合它的核心支持 . 然而,由于其他许多原因,SAS作为一款软件非常糟糕 .

有一天,我希望用python和pandas替换我对SAS的使用,但我目前缺乏大型数据集的核心工作流程 . 我不是在谈论需要分布式网络的“大数据”,而是文件太大而无法容纳在内存中,但又足够小以适应硬盘驱动器 .

我的第一个想法是使用 HDFStore 在磁盘上保存大型数据集,并将我需要的部分拉到数据帧中进行分析 . 其他人提到MongoDB是一种更容易使用的替代品 . 我的问题是:

有哪些最佳实践工作流程可用于完成以下任务:

  • 将平面文件加载到永久磁盘数据库结构中

  • 查询该数据库以检索要提供给pandas数据结构的数据

  • 在操作pandas中的片段后更新数据库

真实世界的例子将非常受欢迎,尤其是那些在“大数据”上使用熊猫的人 .

编辑 - 我希望如何工作的示例:

  • 迭代导入大型平面文件并将其存储在永久的磁盘数据库结构中 . 这些文件通常太大而无法放入内存中 .

  • 为了使用Pandas,我想读取这些数据的子集(通常一次只有几列),它们可以适合内存 .

  • 我会通过对所选列执行各种操作来创建新列 .

  • 然后我必须将这些新列附加到数据库结构中 .

我正在努力寻找执行这些步骤的最佳实践方法 . 阅读关于pandas和pytables的链接似乎附加一个新列可能是个问题 .

编辑 - 特别回应杰夫的问题:

  • 我正在 Build 消费者信用风险模型 . 数据种类包括电话,SSN和地址特征; property Value ;犯罪记录,破产等贬损信息......我每天使用的数据集平均有近1,000到2,000个字段的混合数据类型:数字和字符数据的连续,名义和序数变量 . 我很少追加行,但我会执行许多创建新列的操作 .

  • 典型操作涉及使用条件逻辑将多个列组合到一个新的复合列中 . 例如, if var1 > 2 then newvar = 'A' elif var2 = 4 then newvar = 'B' . 这些操作的结果是我的数据集中每条记录的新列 .

  • 最后,我想将这些新列附加到磁盘上的数据结构中 . 我将重复步骤2,使用交叉表和描述性统计数据探索数据,试图找到有趣,直观的模型关系 .

  • 典型的项目文件通常约为1GB . 文件被组织成一种行,其中一行包括消费者数据的记录 . 每行对每条记录都有相同的列数 . 情况总是如此 .

  • 对于我来说,在创建报告或生成描述性统计信息时对行进行子集是非常常见的 . 例如,我可能想为特定的业务线创建一个简单的频率,比如零售信用卡 . 要做到这一点,除了我要报告的任何列之外,我只会选择那些业务线=零售的记录 . 但是,在创建新列时,我会提取所有数据行,只提取操作所需的列 .

  • 建模过程要求我分析每一列,寻找与某些结果变量的有趣关系,并创建描述这些关系的新化合物列 . 我探索的列通常以小集合完成 . 例如,我将专注于一组20个列,只处理属性值并观察它们与贷款违约的关系 . 一旦探索了这些并创建了新的专栏,我就转到另一组专栏,比如大学教育,然后重复这个过程 . 我正在做的是创建候选变量来解释我的数据和某些结果之间的关系 . 在这个过程的最后,我应用了一些学习技术,从这些复合列中创建一个方程式 .

我很少会在数据集中添加行 . 我几乎总是会创建新的列(统计/机器学习用语中的变量或特征) .

13 回答

  • 7

    值得一提的是Ray
    它以分布式方式为pandas实现了自己的实现 .

    只需替换pandas导入,代码应该按原样运行:

    # import pandas as pd
    import ray.dataframe as pd
    
    #use pd as usual
    

    可以在这里阅读更多细节:

    https://rise.cs.berkeley.edu/blog/pandas-on-ray/

  • 4

    我通常以这种方式使用数十亿字节的数据,例如我在磁盘上有表,我通过查询读取,创建数据并追加 .

    有关如何存储数据的一些建议值得阅读the docslate in this thread .

    详细信息将影响您存储数据的方式,例如:
    尽可能多地提供细节;我可以帮你 Build 一个结构 .

    • 数据大小,行数,列数,列类型;你是追加行还是只是列?

    • 典型的操作是什么样的 . 例如 . 对列进行查询以选择一堆行和特定列,然后执行操作(内存中),创建新列,保存这些列 .
      (给出一个玩具示例可以让我们提供更具体的建议 . )

    • 在那个处理之后,那你做什么?第2步是临时的,还是可重复的?

    • 输入平面文件:Gb中有多少粗略的总大小 . 这些如何组织,例如通过记录?每个字段是否包含不同的字段,或者每个文件是否包含一些记录与每个文件中的所有字段?

    • 您是否曾根据标准选择行(记录)的子集(例如,选择字段A> 5的行)?然后做一些事情,或者你只是选择包含所有记录的字段A,B,C(然后做一些事情)?

    • 您是'work on'所有列(在组中),还是有一个很好的比例,您只能用于报告(例如,您希望保留数据,但不需要明确列出该列,直到最终结果时间)?

    解决方案

    确保已安装pandas at least 0.10.1 .

    阅读iterating files chunk-by-chunkmultiple table queries .

    由于pytables被优化为按行进行操作(这是您查询的内容),因此我们将为每组字段创建一个表 . 通过这种方式,这样做更有效......我想我将来可能能够解决这个限制......无论如何这都更加直观了:
    (以下是伪代码 . )

    import numpy as np
    import pandas as pd
    
    # create a store
    store = pd.HDFStore('mystore.h5')
    
    # this is the key to your storage:
    #    this maps your fields to a specific group, and defines 
    #    what you want to have as data_columns.
    #    you might want to create a nice class wrapping this
    #    (as you will want to have this map and its inversion)  
    group_map = dict(
        A = dict(fields = ['field_1','field_2',.....], dc = ['field_1',....,'field_5']),
        B = dict(fields = ['field_10',......        ], dc = ['field_10']),
        .....
        REPORTING_ONLY = dict(fields = ['field_1000','field_1001',...], dc = []),
    
    )
    
    group_map_inverted = dict()
    for g, v in group_map.items():
        group_map_inverted.update(dict([ (f,g) for f in v['fields'] ]))
    

    读取文件并创建存储(基本上执行 append_to_multiple 的操作):

    for f in files:
       # read in the file, additional options hmay be necessary here
       # the chunksize is not strictly necessary, you may be able to slurp each 
       # file into memory in which case just eliminate this part of the loop 
       # (you can also change chunksize if necessary)
       for chunk in pd.read_table(f, chunksize=50000):
           # we are going to append to each table by group
           # we are not going to create indexes at this time
           # but we *ARE* going to create (some) data_columns
    
           # figure out the field groupings
           for g, v in group_map.items():
                 # create the frame for this group
                 frame = chunk.reindex(columns = v['fields'], copy = False)    
    
                 # append it
                 store.append(g, frame, index=False, data_columns = v['dc'])
    

    现在你已经拥有了文件中的所有表格(实际上你可以将它们存储在单独的文件中,如果你愿意,你可能需要将文件名添加到group_map中,但可能这不是必需的) .

    这是您获取列并创建新列的方法:

    frame = store.select(group_that_I_want)
    # you can optionally specify:
    # columns = a list of the columns IN THAT GROUP (if you wanted to
    #     select only say 3 out of the 20 columns in this sub-table)
    # and a where clause if you want a subset of the rows
    
    # do calculations on this frame
    new_frame = cool_function_on_frame(frame)
    
    # to 'add columns', create a new group (you probably want to
    # limit the columns in this new_group to be only NEW ones
    # (e.g. so you don't overlap from the other tables)
    # add this info to the group_map
    store.append(new_group, new_frame.reindex(columns = new_columns_created, copy = False), data_columns = new_columns_created)
    

    当您准备进行post_processing时:

    # This may be a bit tricky; and depends what you are actually doing.
    # I may need to modify this function to be a bit more general:
    report_data = store.select_as_multiple([groups_1,groups_2,.....], where =['field_1>0', 'field_1000=foo'], selector = group_1)
    

    关于data_columns,您实际上不需要定义 ANY data_columns;它们允许您根据列子选择行 . 例如 . 就像是:

    store.select(group, where = ['field_1000=foo', 'field_1001>0'])
    

    在最终报告生成阶段,它们可能对您最感兴趣(实质上,数据列与其他列隔离,如果您定义了很多,这可能会影响效率) .

    您可能还想:

    • 创建一个函数,该函数获取字段列表,在groups_map中查找组,然后选择这些并连接结果,以便得到结果帧(这实际上是select_as_multiple所做的) . 这样结构对你来说非常透明 .
      某些数据列上的

    • 索引(使行子集更快) .

    • 启用压缩 .

    当你有疑问时,请告诉我!

  • 56

    如果你去创建一个分解成多个较小文件的数据管道的简单路径,请考虑Ruffus .

  • 44

    我认为上面的答案缺少一个我发现非常有用的简单方法 .

    当我的文件太大而无法加载到内存中时,我会将文件分解为多个较小的文件(按行或列)

    示例:如果30天大小的交易数据为30天,我将其分成每天约1GB大小的文件 . 我随后处理每个文件最后分开并汇总结果

    其中一个最大的优点是它允许并行处理文件(多个线程或进程)

    另一个优点是文件操作(如在示例中添加/删除日期)可以通过常规shell命令来完成,这在更高级/复杂的文件格式中是不可能的

    这种方法并不涵盖所有场景,但在很多场景中非常有用

  • 104

    我发现有助于 large data 用例的一个技巧是通过将浮点精度降低到32位来减少数据量 . 它并不适用于所有情况,但在许多应用中,64位精度是过度的,节省2倍的内存是值得的 . 更明显地说明一点:

    >>> df = pd.DataFrame(np.random.randn(int(1e8), 5))
    >>> df.info()
    <class 'pandas.core.frame.DataFrame'>
    RangeIndex: 100000000 entries, 0 to 99999999
    Data columns (total 5 columns):
    ...
    dtypes: float64(5)
    memory usage: 3.7 GB
    
    >>> df.astype(np.float32).info()
    <class 'pandas.core.frame.DataFrame'>
    RangeIndex: 100000000 entries, 0 to 99999999
    Data columns (total 5 columns):
    ...
    dtypes: float32(5)
    memory usage: 1.9 GB
    
  • 53

    这是pymongo的情况 . 我还在python中使用sql server,sqlite,HDF,ORM(SQLAlchemy)进行原型设计 . 首先,pymongo是一个基于文档的数据库,因此每个人都是一个文档( dict 属性) . 许多人组成一个集合,你可以有很多集合(人,股票市场,收入) .

    pd.dateframe - > pymongo注意:我使用 chunksize 中的 chunksize 来保持5到10k记录(如果更大,pymongo会丢弃套接字)

    aCollection.insert((a[1].to_dict() for a in df.iterrows()))
    

    查询:gt =大于...

    pd.DataFrame(list(mongoCollection.find({'anAttribute':{'$gt':2887000, '$lt':2889000}})))
    

    .find() 返回一个迭代器,所以我通常使用 ichunked 来切割成较小的迭代器 .

    加入怎么样,因为我通常会将10个数据源粘贴在一起:

    aJoinDF = pandas.DataFrame(list(mongoCollection.find({'anAttribute':{'$in':Att_Keys}})))
    

    然后(在我的情况下,有时候我必须首先在 aJoinDF 之前对它进行攻击 . )

    df = pandas.merge(df, aJoinDF, on=aKey, how='left')
    

    然后,您可以通过下面的更新方法将新信息写入主集合 . (逻辑集合与物理数据源) .

    collection.update({primarykey:foo},{key:change})
    

    在较小的查找中,只是非规范化 . 例如,您在文档中有代码,只需添加字段代码文本,并在创建文档时执行 dict 查找 .

    现在你有一个基于一个人的好数据集,你可以在每个案例中释放你的逻辑并创建更多属性 . 最后,你可以读入你的3到内存最大关键指标的熊猫,并进行枢轴/聚合/数据探索 . 这对我有300万条记录的数字/大文/类别/代码/花车/ ...

    您还可以使用MongoDB中内置的两种方法(MapReduce和聚合框架) . See here for more info about the aggregate framework,因为它似乎比MapReduce更容易,并且看起来很方便快速聚合工作 . 注意我不需要定义我的字段或关系,我可以添加项目到文档 . 在快速变化的numpy,pandas,python工具集的当前状态下,MongoDB帮助我开始工作:)

  • 6

    如果您的数据集介于1到20GB之间,那么您应该得到一个具有48GB RAM的工作站 . 然后Pandas可以将整个数据集保存在RAM中 . 我知道它不是你在这里寻找的答案,但在具有4GB RAM的笔记本电脑上进行科学计算是不合理的 .

  • 48

    我最近遇到了类似的问题 . 我发现只是以块的形式读取数据并附加它,因为我将它写成块,同样的csv效果很好 . 我的问题是根据另一个表中的信息添加日期列,使用某些列的值,如下所示 . 这可能会帮助那些被dask和hdf5困惑但更熟悉像我这样的熊猫的人 .

    def addDateColumn():
    """Adds time to the daily rainfall data. Reads the csv as chunks of 100k 
       rows at a time and outputs them, appending as needed, to a single csv. 
       Uses the column of the raster names to get the date.
    """
        df = pd.read_csv(pathlist[1]+"CHIRPS_tanz.csv", iterator=True, 
                         chunksize=100000) #read csv file as 100k chunks
    
        '''Do some stuff'''
    
        count = 1 #for indexing item in time list 
        for chunk in df: #for each 100k rows
            newtime = [] #empty list to append repeating times for different rows
            toiterate = chunk[chunk.columns[2]] #ID of raster nums to base time
            while count <= toiterate.max():
                for i in toiterate: 
                    if i ==count:
                        newtime.append(newyears[count])
                count+=1
            print "Finished", str(chunknum), "chunks"
            chunk["time"] = newtime #create new column in dataframe based on time
            outname = "CHIRPS_tanz_time2.csv"
            #append each output to same csv, using no header
            chunk.to_csv(pathlist[2]+outname, mode='a', header=None, index=None)
    
  • 36

    我知道这是一个旧线程,但我认为Blaze库值得一试 . 它是为这些类型的情况而构建的 .

    From the docs:

    Blaze将NumPy和Pandas的可用性扩展到分布式和核心外计算 . Blaze提供类似于NumPy ND-Array或Pandas DataFrame的界面,但将这些熟悉的界面映射到各种其他计算引擎,如Postgres或Spark .

    Edit: 顺便提一下,它由ContinuumIO和NumPy的作者Travis Oliphant提供支持 .

  • 12

    我发现这有点晚了,但我处理类似的问题(按揭预付款模式) . 我的解决方案是跳过pandas HDFStore层并使用直接的pytables . 我在最终文件中将每列保存为单独的HDF5数组 .

    我的基本工作流程是首先从数据库中获取CSV文件 . 我gzip它,所以它不是那么大 . 然后我将其转换为面向行的HDF5文件,通过在python中迭代它,将每一行转换为实际数据类型,并将其写入HDF5文件 . 这需要几十分钟,但它不使用任何内存,因为它只是逐行操作 . 然后我将面向行的HDF5文件“转置”为面向列的HDF5文件 .

    表转置看起来像:

    def transpose_table(h_in, table_path, h_out, group_name="data", group_path="/"):
        # Get a reference to the input data.
        tb = h_in.getNode(table_path)
        # Create the output group to hold the columns.
        grp = h_out.createGroup(group_path, group_name, filters=tables.Filters(complevel=1))
        for col_name in tb.colnames:
            logger.debug("Processing %s", col_name)
            # Get the data.
            col_data = tb.col(col_name)
            # Create the output array.
            arr = h_out.createCArray(grp,
                                     col_name,
                                     tables.Atom.from_dtype(col_data.dtype),
                                     col_data.shape)
            # Store the data.
            arr[:] = col_data
        h_out.flush()
    

    读回来然后看起来像:

    def read_hdf5(hdf5_path, group_path="/data", columns=None):
        """Read a transposed data set from a HDF5 file."""
        if isinstance(hdf5_path, tables.file.File):
            hf = hdf5_path
        else:
            hf = tables.openFile(hdf5_path)
    
        grp = hf.getNode(group_path)
        if columns is None:
            data = [(child.name, child[:]) for child in grp]
        else:
            data = [(child.name, child[:]) for child in grp if child.name in columns]
    
        # Convert any float32 columns to float64 for processing.
        for i in range(len(data)):
            name, vec = data[i]
            if vec.dtype == np.float32:
                data[i] = (name, vec.astype(np.float64))
    
        if not isinstance(hdf5_path, tables.file.File):
            hf.close()
        return pd.DataFrame.from_items(data)
    

    现在,我通常在拥有大量内存的机器上运行它,所以我可能对内存使用情况不够谨慎 . 例如,默认情况下,加载操作会读取整个数据集 .

    这通常适合我,但它有点笨重,我不能使用花哨的pytables魔术 .

    编辑:这种方法的真正优势,在记录数组pytables默认情况下,我可以使用h5r将数据加载到R中,而h5r无法处理表 . 或者,至少,我无法让它加载异构表 .

  • 497

    正如其他人所指出的那样,经过几年的出现,已经出现了相同的大熊猫:dask . 虽然dask不是大熊猫及其所有功能的直接替代品,但它有以下几个原因:

    Dask是一个灵活的分析计算并行计算库,针对“大数据”集合的交互式计算工作负载的动态任务调度进行了优化,如并行数组,数据帧和列表,可将常用接口(如NumPy,Pandas或Python迭代器)扩展为更大 - 超过内存或分布式环境,并从笔记本电脑扩展到集群 .

    Dask强调以下优点:熟悉:提供并行化的NumPy数组和Pandas DataFrame对象灵活:为更多自定义工作负载和与其他项目的集成提供任务调度接口 . Native:通过访问PyData堆栈,在Pure Python中启用分布式计算 . 快速:低开销,低延迟,快速数值算法所需的最小序列化操作扩展:在具有1000个内核的集群上弹性运行缩小:在单个进程中在笔记本电脑上设置和运行的简单响应:设计为交互式考虑到它提供快速反馈和诊断以帮助人类

    并添加一个简单的代码示例:

    import dask.dataframe as dd
    df = dd.read_csv('2015-*-*.csv')
    df.groupby(df.user_id).value.mean().compute()
    

    替换一些像这样的pandas代码:

    import pandas as pd
    df = pd.read_csv('2015-01-01.csv')
    df.groupby(df.user_id).value.mean()
    

    并且,特别值得注意的是,通过concurrent.futures接口提供了一个用于提交自定义任务的通用:

    from dask.distributed import Client
    client = Client('scheduler:port')
    
    futures = []
    for fn in filenames:
        future = client.submit(load, fn)
        futures.append(future)
    
    summary = client.submit(summarize, futures)
    summary.result()
    
  • 18

    还有一个变种

    在pandas中完成的许多操作也可以作为db完成查询(sql,mongo)

    使用RDBMS或mongodb可以在数据库查询中执行一些聚合(针对大数据进行优化,并有效地使用缓存和索引)

    之后,您可以使用pandas执行后期处理 .

    这种方法的优点是你可以获得数据库优化来处理大数据,同时仍然用高级声明性语法定义逻辑 - 而不必处理决定在内存中做什么和做什么的细节核心 .

    虽然查询语言和pandas不同,但将部分逻辑从一个转换为另一个通常并不复杂 .

  • 10

    现在,在问题发生两年后,相当于'out-of-core'的熊猫:dask . 太棒了!虽然它不支持所有的熊猫功能,但你可以使用它 .

相关问题