I am working on a POC of single-node out-of-core computation with dask. My complete script is at the end of this post.

I am running dask 0.18.1 under (Anaconda) Python 3.6.5 on an AWS machine with 2 vCPUs, Ubuntu 16 (64-bit), 4 GB of RAM and a 16 GB SSD. The small amount of RAM is intentional: I want to see intermediate results spill to disk rather than everything staying entirely in memory.

My input data is not that big (about 300 MB) and can be obtained here: Gowalla - checkins. It contains a log of users checking in at a bunch of locations (user_id, location_id, checkin_timestamp). The computation I am trying to run produces an intermediate result that is much larger than the input. This happens because, for each location, we generate every distinct pair of users (a, b) who visited it such that a's check-in time is earlier than b's check-in time.
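To make the blow-up concrete, here is a rough back-of-the-envelope illustration (my own sketch, not part of the script below): a location with k check-ins contributes up to k*(k-1)/2 rows after the self-merge and the checkin_ts_x < checkin_ts_y filter, so a handful of popular locations dominate the size of the intermediate result.

def pair_rows_per_location(k_checkins):
    # ordered pairs of check-ins at one location with strictly increasing timestamps
    return k_checkins * (k_checkins - 1) // 2

print(pair_rows_per_location(1000))   # 499500 rows from a single busy location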

My script takes a single argument that determines what fraction of the input data is used for the experiment.

So when it runs as:

python dask_experiment.py 10

it subsets roughly 10% of the records at the start (rows where location_id % 100 < 10, see the subset function below) and proceeds from there. Its output is as follows:

Running on 10% of data
Time elapsed connecting to file and defining chins5 : 0.071 s
Time elapsed computing size of initial dataset : 20.576 s
len(chins1) = 642383

Time elapsed computing q999 : 34.751 s
Time elapsed computing chins5 : 34.918 s

          user_id_x  user_id_y  location_count
1436385      10971      10971              44
293388        1404       1080              40
...

So in this case everything works fine.

But when it runs as:

python dask_experiment.py 50

I get the following:

Running on 50% of data 
Time elapsed connecting to file and defining chins5 : 0.069 s 
Time elapsed computing size of initial dataset : 20.635 s 
len(chins1) = 3238246

Killed   # :.( !

The experiments above were run with the default configuration:

{'array': {'chunk-size': '128MiB', 'rechunk-threshold': 4},
 'distributed': {'admin': {'log-format': '%(name)s - %(levelname)s - '
                                         '%(message)s',
                           'log-length': 10000,
                           'pdb-on-err': False,
                           'tick': {'interval': '20ms', 'limit': '3s'}},
                 'client': {'heartbeat': '5s'},
                 'comm': {'compression': 'auto',
                          'default-scheme': 'tcp',
                          'recent-messages-log-length': 0,
                          'socket-backlog': 2048,
                          'timeouts': {'connect': '10s', 'tcp': '30s'}},
                 'dashboard': {'export-tool': False,
                               'link': 'http://{host}:{port}/status'},
                 'scheduler': {'allowed-failures': 3,
                               'bandwidth': 100000000,
                               'default-data-size': 1000,
                               'transition-log-length': 100000,
                               'work-stealing': True,
                               'worker-ttl': None},
                 'version': 2,
                 'worker': {'memory': {'pause': 0.8,
                                       'spill': 0.7,
                                       'target': 0.6,
                                       'terminate': 0.95},
                            'multiprocessing-method': 'forkserver',
                            'profile': {'cycle': '1000ms', 'interval': '10ms'},
                            'use-file-locking': True}}}

I tried fiddling with the worker.memory.* parameters, trying to force spilling to disk as early as possible.
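The change was roughly along these lines (illustrative values set via dask.config.set before calling compute; I don't remember the exact numbers I tried):

import dask

dask.config.set({
    'distributed.worker.memory.target': 0.3,     # start moving data to disk earlier
    'distributed.worker.memory.spill': 0.4,
    'distributed.worker.memory.pause': 0.5,
    'distributed.worker.memory.terminate': 0.8,
})

But with that in place I got an even worse error: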

Running on 50 % of data
Time elapsed connecting to file and defining chins5 : 0.071 s
Time elapsed computing size of initial dataset : 20.361 s
len(chins1) = 3238246

Traceback (most recent call last):
  File "dask_experiment.py", line 79, in <module>
    main()
  File "dask_experiment.py", line 61, in main
    q999   = easy_time_it( lambda : q999.compute(), "computing q999" )
  File "dask_experiment.py", line 72, in easy_time_it
    result = fun()
  File "dask_experiment.py", line 61, in <lambda>
    q999   = easy_time_it( lambda : q999.compute(), "computing q999" )
  File "/home/ubuntu/miniconda/lib/python3.6/site-packages/dask/base.py", line 156, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/ubuntu/miniconda/lib/python3.6/site-packages/dask/base.py", line 402, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/ubuntu/miniconda/lib/python3.6/site-packages/dask/threaded.py", line 75, in get
    pack_exception=pack_exception, **kwargs)
  File "/home/ubuntu/miniconda/lib/python3.6/site-packages/dask/local.py", line 521, in get_async
    raise_exception(exc, tb)
  File "/home/ubuntu/miniconda/lib/python3.6/site-packages/dask/compatibility.py", line 69, in reraise
    raise exc
  File "/home/ubuntu/miniconda/lib/python3.6/site-packages/dask/local.py", line 290, in execute_task
    result = _execute_task(task, data)
  File "/home/ubuntu/miniconda/lib/python3.6/site-packages/dask/local.py", line 271, in _execute_task
    return func(*args2)
  File "/home/ubuntu/miniconda/lib/python3.6/site-packages/dask/dataframe/methods.py", line 355, in merge
    suffixes=suffixes, indicator=indicator)
  File "/home/ubuntu/miniconda/lib/python3.6/site-packages/pandas/core/reshape/merge.py", line 62, in merge
    return op.get_result()
  File "/home/ubuntu/miniconda/lib/python3.6/site-packages/pandas/core/reshape/merge.py", line 582, in get_result
    concat_axis=0, copy=self.copy)
  File "/home/ubuntu/miniconda/lib/python3.6/site-packages/pandas/core/internals.py", line 5421, in concatenate_block_managers
    concatenate_join_units(join_units, concat_axis, copy=copy),
  File "/home/ubuntu/miniconda/lib/python3.6/site-packages/pandas/core/internals.py", line 5575, in concatenate_join_units
    concat_values = concat_values.copy()
MemoryError

I wonder whether this situation can somehow be fixed...

Can anybody give me some suggestions on what to try?

My code is as follows:

import os, sys, time, pprint, dask
import dask.dataframe as dd
import dask.distributed  # even though not explicitly used, it sets up some config options as a side effect...


def load_define( data_dir, subset_fun ) :
    #checkins_df = dd.read_hdf( data_dir + 'gowalla_checkins.hdf5', key='data' )
    checkins_df_0 = dd.read_csv( data_dir +  'loc-gowalla_totalCheckins.txt',
                                 delimiter="\t", names=['user_id', 'checkin_ts', 'lat', 'lon', 'location_id'],
                                 parse_dates = ["checkin_ts"] )

    chins1 = subset_fun( checkins_df_0[["user_id", "location_id", "checkin_ts"]] )

    # self-merge on location_id: one row per ordered pair of check-ins at the same location
    chins2 = chins1.merge( chins1, on="location_id" )
    # keep only pairs where x checked in strictly before y
    chins3 = chins2[ chins2.checkin_ts_x < chins2.checkin_ts_y ]

    # for each user pair, count the number of distinct shared locations
    chins4 = ( chins3[["user_id_x", "user_id_y", "location_id"]]
                     .drop_duplicates()
                     .groupby(["user_id_x", "user_id_y"])
                     .agg( {"location_id" : "count"})
                     .rename( columns = { "location_id" : "location_count" } )
                     .reset_index() )

    # 99.9th percentile of the shared-location counts
    q999 = chins4['location_count'].quantile( 0.999 )

    chins5 = chins4[ chins4.location_count > q999 ]

    return chins1, chins4, q999, chins5


def main( ) :
    #%%
    if os.name == 'nt' :
        data_dir = "C:/_DATA/experimentation/"
    else :
        data_dir = "./"
        #%%
    pprint.pprint( dask.config.config )

    pct_of_input_data = float(sys.argv[1])
    print( "Running on %.0f%% of data" % pct_of_input_data )

    subset_fun = lambda dd0 : subset( dd0, 100, pct_of_input_data  )

    ( chins1,
      chins4,
      q999,
      chins5 ) = easy_time_it( lambda : load_define(data_dir, subset_fun),
                               "connecting to file and defining chins5" )
    size1 = easy_time_it( lambda : len(chins1), "computing size of initial dataset" )
    print( "len(chins1) = %d\n" % size1 )
    q999   = easy_time_it( lambda : q999.compute(), "computing q999" )
    chins5 = easy_time_it( lambda : chins5.compute(), "computing chins5" )

    print( "\n", chins5.sort_values('location_count', ascending=False).head() )


def subset( dd0, mod, h ) :
    # keep roughly h% of the data (with mod=100): rows whose location_id % mod is below h
    return dd0[ dd0.location_id % mod < h ]

def easy_time_it( fun, msg = "" ) :
    t0 = time.clock()
    result = fun()
    t1 = time.clock( )
    print( "Time elapsed " + msg  +  " : %.3f s" % (t1 - t0) )
    return result

if __name__ == "__main__" :
    main()