
Speed up sampling of kernel density estimate

Here's a MWE of a larger code I'm working with. Basically, it performs a Monte Carlo integration over a KDE (kernel density estimate) for all values located below a certain threshold (the integration method was suggested in this question, BTW: Integrate 2D kernel density estimate).

import numpy as np
from scipy import stats
import time

# Generate some random two-dimensional data:
def measure(n):
    "Measurement model, return two coupled measurements."
    m1 = np.random.normal(size=n)
    m2 = np.random.normal(scale=0.5, size=n)
    return m1+m2, m1-m2

# Get data.
m1, m2 = measure(20000)
# Define limits.
xmin = m1.min()
xmax = m1.max()
ymin = m2.min()
ymax = m2.max()

# Perform a kernel density estimate on the data.
x, y = np.mgrid[xmin:xmax:100j, ymin:ymax:100j]
values = np.vstack([m1, m2])
kernel = stats.gaussian_kde(values)

# Define point below which to integrate the kernel.
x1, y1 = 0.5, 0.5

# Get kernel value for this point.
tik = time.time()
iso = kernel((x1,y1))
print 'iso: ', time.time()-tik

# Sample from KDE distribution (Monte Carlo process).
tik = time.time()
sample = kernel.resample(size=1000)
print 'resample: ', time.time()-tik

# Filter the sample leaving only values for which
# the kernel evaluates to less than what it does for
# the (x1, y1) point defined above.
tik = time.time()
insample = kernel(sample) < iso
print 'filter/sample: ', time.time()-tik

# Integrate for all values below iso.
tik = time.time()
integral = insample.sum() / float(insample.shape[0])
print 'integral: ', time.time()-tik

The output looks something like this:

iso:  0.00259208679199
resample:  0.000817060470581
filter/sample:  2.10829401016
integral:  4.2200088501e-05

This clearly means that the filter/sample call is consuming almost all of the time the code takes to run. I have to run this block of code iteratively several thousand times, so it gets quite time-consuming.

Is there any way to speed up the filter/sample process?
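One direction worth trying, sketched here as an assumption rather than a tested fix: `scipy.stats.gaussian_kde` evaluates points with a Python-level loop over whichever of (points, data) is smaller, so for mid-sized inputs a single broadcasted NumPy pass over the whole Gaussian mixture can be faster. The `kde_eval_vectorized` helper below is hypothetical (not part of scipy); it reproduces `gaussian_kde`'s density formula from the documented `dataset`, `covariance` and `inv_cov` attributes, assuming default uniform weights:

```python
import numpy as np
from scipy import stats

def kde_eval_vectorized(kde, points):
    """Evaluate a fitted gaussian_kde at `points` ((d, m) array) in one
    broadcasted pass instead of scipy's Python-level loop."""
    d, n = kde.dataset.shape
    # (d, n, m): difference between every data point and every query point.
    diff = kde.dataset[:, :, None] - points[:, None, :]
    # Whiten with the inverse kernel covariance and form the quadratic energy.
    tdiff = np.einsum('ij,jnm->inm', kde.inv_cov, diff)
    energy = 0.5 * np.einsum('inm,inm->nm', diff, tdiff)
    # Gaussian normalization: n * sqrt((2*pi)^d * det(cov)).
    norm = n * np.sqrt((2.0 * np.pi) ** d * np.linalg.det(kde.covariance))
    return np.exp(-energy).sum(axis=0) / norm

# Small self-check against scipy's own evaluation.
rng = np.random.RandomState(0)
m1 = rng.normal(size=500)
m2 = rng.normal(scale=0.5, size=500)
kernel = stats.gaussian_kde(np.vstack([m1 + m2, m1 - m2]))
sample = kernel.resample(size=200)
dens = kde_eval_vectorized(kernel, sample)
```

The trade-off is memory: the intermediate `diff` array has shape `(d, n, m)`, so for 20000 data points and 1000 samples it holds tens of millions of floats, and chunking the sample may be needed.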


Edit

Here's a slightly more realistic MWE of my actual code, with Ophion's multiprocessing solution written into it:

import numpy as np
from scipy import stats
from multiprocessing import Pool

def kde_integration(m_list):

    m1, m2 = [], []
    for item in m_list:
        # Color data.
        m1.append(item[0])
        # Magnitude data.
        m2.append(item[1])

    # Define limits.
    xmin, xmax = min(m1), max(m1)
    ymin, ymax = min(m2), max(m2)

    # Perform a kernel density estimate on the data:
    x, y = np.mgrid[xmin:xmax:100j, ymin:ymax:100j]
    values = np.vstack([m1, m2])
    kernel = stats.gaussian_kde(values)

    out_list = []

    for point in m_list:

        # Compute the point below which to integrate.
        iso = kernel((point[0], point[1]))

        # Sample KDE distribution
        sample = kernel.resample(size=1000)

        #Create definition.
        def calc_kernel(samp):
            return kernel(samp)

        #Choose number of cores and split input array.
        cores = 4
        torun = np.array_split(sample, cores, axis=1)

        #Calculate
        pool = Pool(processes=cores)
        results = pool.map(calc_kernel, torun)

        #Reintegrate and calculate results
        insample_mp = np.concatenate(results) < iso

        # Integrate for all values below iso.
        integral = insample_mp.sum() / float(insample_mp.shape[0])

        out_list.append(integral)

    return out_list


# Generate some random two-dimensional data:
def measure(n):
    "Measurement model, return two coupled measurements."
    m1 = np.random.normal(size=n)
    m2 = np.random.normal(scale=0.5, size=n)
    return m1+m2, m1-m2

# Create list to pass.
m_list = []
for i in range(60):
    m1, m2 = measure(5)
    m_list.append(m1.tolist())
    m_list.append(m2.tolist())

# Call KDE integration function.
print 'Integral result: ', kde_integration(m_list)

The solution presented by Ophion works great on the original code I posted, but fails in this version with the following error:

Integral result: Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

I tried moving the calc_kernel function around, since one of the answers in this question Multiprocessing: How to use Pool.map on a function defined in a class? states that "the function that you give to map() must be accessible through an import of your module"; but I still can't get this code to work.
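For reference, the pickling rule quoted above can be demonstrated with a minimal sketch (the `calc_square` name is made up for illustration): `Pool.map` pickles the function *by name*, so it must be a module-level function; a function defined inside another function, like `calc_kernel` above, cannot be looked up on import and fails with exactly this `PicklingError`:

```python
from multiprocessing import Pool

def calc_square(x):
    """Module-level, so Pool.map can pickle it by name."""
    return x * x

def run_pool():
    # A nested "def calc_square(...)" here would raise PicklingError.
    pool = Pool(processes=2)
    try:
        return pool.map(calc_square, [1, 2, 3])
    finally:
        pool.close()
        pool.join()

if __name__ == "__main__":
    print(run_pool())
```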

Any help will be very much appreciated.


Edit 2

Implementing Ophion's suggestion to remove the calc_kernel function and simply use:

results = pool.map(kernel, torun)

got rid of the PicklingError, but now I see that if I create an initial m_list of anything more than around 62-63 items, I get this error:

Traceback (most recent call last):
  File "~/gauss_kde_temp.py", line 67, in <module>
    print 'Integral result: ', kde_integration(m_list)
  File "~/gauss_kde_temp.py", line 38, in kde_integration
    pool = Pool(processes=cores)
  File "/usr/lib/python2.7/multiprocessing/__init__.py", line 232, in Pool
    return Pool(processes, initializer, initargs, maxtasksperchild)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 161, in __init__
    self._result_handler.start()
  File "/usr/lib/python2.7/threading.py", line 494, in start
    _start_new_thread(self.__bootstrap, ())
thread.error: can't start new thread

Since my actual lists in the real execution of this code can have up to 2000 items, this issue renders the code unusable. Line 38 is this one:

pool = Pool(processes=cores)

so apparently it has something to do with the number of cores I'm using?

This question "Can't start a new thread error" in Python suggests using:

threading.active_count()

to check the number of threads going when that error occurs. I checked, and it always crashes when it reaches 374 threads. How can I code around this issue?


Here's the new question dealing with this last issue: Thread error: can't start new thread

2 Answers

  • 2

    Probably the easiest way to speed this up is to parallelize kernel(sample).

    Taking this code fragment:

    tik = time.time()
    insample = kernel(sample) < iso
    print 'filter/sample: ', time.time()-tik
    #filter/sample:  1.94065904617
    

    Change it to use multiprocessing:

    from multiprocessing import Pool
    tik = time.time()
    
    #Create definition.
    def calc_kernel(samp):
        return kernel(samp)
    
    #Choose number of cores and split input array.
    cores = 4
    torun = np.array_split(sample, cores, axis=1)
    
    #Calculate
    pool = Pool(processes=cores)
    results = pool.map(calc_kernel, torun)
    
    #Reintegrate and calculate results
    insample_mp = np.concatenate(results) < iso
    
    print 'multiprocessing filter/sample: ', time.time()-tik
    #multiprocessing filter/sample:  0.496874094009
    

    Double check they both return the same answer:

    print np.all(insample==insample_mp)
    #True
    

    A 3.9x increase with 4 cores. Not sure what you are running, but after about 6 processors your input array size isn't large enough to get considerable gains. For example, using 20 processors it's only about 5.8x faster.

  • 4

    The claim in the comment section of the article linked below is:

    "SciPy's gaussian_kde doesn't use FFT, while there is a statsmodels implementation that does"

    ...which could be the cause of the poor performance observed. It goes on to report orders-of-magnitude improvements using FFT. See @jseabold's reply.

    http://slendrmeans.wordpress.com/2012/05/01/will-it-python-machine-learning-for-hackers-chapter-2-part-1-summary-stats-and-density-estimators/

    Disclaimer: I have no experience with statsmodels or scipy.
