首页 文章

无分支K-means(或其他优化)

提问于
浏览
57

注意:我更感谢如何处理和提出这些解决方案而不是解决方案本身的指南 .

我的系统中有一个性能非常关键的功能,在特定的上下文中显示为一个特征分析热点 . 它位于k-means迭代的中间(已经是多线程,使用并行来处理每个工作线程中的点的子范围) .

ClusterPoint& pt = points[j];
pt.min_index = -1;
pt.min_dist = numeric_limits<float>::max();
for (int i=0; i < num_centroids; ++i)
{
    const ClusterCentroid& cent = centroids[i];
    const float dist = ...;
    if (dist < pt.min_dist) // <-- #1 hotspot
    {
        pt.min_dist = dist;
        pt.min_index = i;
    }
}

处理这部分代码所需的时间节省很多,所以我经常一直在摆弄它 . 例如,将质心环放在外面可能是值得的,并且对于给定的质心,并行地迭代这些点 . 这里的聚类点数量超过数百万,而质心的数量跨越数千 . 该算法适用于少数迭代(通常低于10) . 它不寻求完美的收敛/稳定性,只是一些“合理的”近似 .

任何想法都值得赞赏,但我真正渴望发现的是,这个代码是否可以无分支,因为它允许SIMD版本 . 我还没有真正发展出那种能够轻松掌握如何提出无分支解决方案的心智能力:我的大脑在那里失败的方式就像我早期第一次接触到递归一样,所以关于如何写无分支的指南代码以及如何为它开发适当的思维方式也会有所帮助 .

简而言之,我正在寻找有关如何微量优化此代码的任何指南,提示和建议(不一定是解决方案) . 它很可能有改进算法的空间,但我的盲点一直都是微优化解决方案(而且我很想学会如何更有效地应用它们而不会过度使用它) . 它已经与逻辑上的粗块并行紧密多线程,所以我几乎被推入微优化角落,作为没有更智能算法的更快速尝试之一 . 我们可以完全自由地改变内存布局 .

响应算法建议

关于在寻求微优化O(knm)算法的过程中看到这一切都是错误的,这种算法在算法层面上可以明显改进,我完全同意 . 这将这个具体问题推到一个有点学术和不切实际的领域 . 但是,如果我可以获得一个轶事,我来自高级编程的原始背景 - 强调广泛,大规模的观点,安全性,并且很少涉及低级实现细节 . 我最近将项目转换为一种非常不同的现代风格的项目,我正在学习各种新的技巧,包括缓存效率,GPGPU,无分支技术,SIMD,实际上比malloc更优秀的特殊用途内存分配器(但对于特定场景)等

这是我试图赶上最新的性能趋势的地方,令人惊讶的是我发现我在90年代常常偏爱的旧数据结构通常是链接/树型结构实际上远远超过了更天真的表现,粗略的,微优化的,并行化的代码,在连续的存储块上应用调谐指令 . 这有点令人失望,因为我觉得我们现在更多地将算法应用于机器并以这种方式缩小可能性(特别是使用GPGPU) .

最有趣的是,我发现这种微优化,快速的阵列处理代码比我之前使用的复杂算法和数据结构更容易维护 . 首先,它们更容易概括 . 此外,我的同事经常可以就某个区域的特定减速情况向客户提出投诉,只需要对某个SIMD进行并行操作,并且可能需要一些SIMD,并以适当的速度调用它 . 算法改进通常可以提供更多,但是这些微优化可以应用的速度和非侵入性使我想要在该领域学到更多,因为阅读更好算法的论文可能需要一些时间(以及需要更多广泛的变化) . 所以我最近一直在推动这种微优化的潮流,在这种特殊情况下可能有点过分,但我的好奇心更多的是为任何场景扩展我的可能解决方案范围 .

反汇编

注意:我在组装时非常非常糟糕,所以我经常在试错中调整更多的东西,在某种程度上提出来有教育意义的猜测为什么在vtune中显示的热点可能是瓶颈,然后尝试一下,看看时间是否有所改善,假设如果时间有所改善,猜测会有一些真实的暗示,或者如果他们不改善则完全错过了标记 .

000007FEEE3FB8A1  jl          thread_partition+70h (7FEEE3FB780h) 
    {
        ClusterPoint& pt = points[j];
        pt.min_index = -1;
        pt.min_dist = numeric_limits<float>::max();
        for (int i = 0; i < num_centroids; ++i)
000007FEEE3FB8A7  cmp         ecx,r10d 
000007FEEE3FB8AA  jge         thread_partition+1F4h (7FEEE3FB904h) 
000007FEEE3FB8AC  lea         rax,[rbx+rbx*2] 
000007FEEE3FB8B0  add         rax,rax 
000007FEEE3FB8B3  lea         r8,[rbp+rax*8+8] 
        {
            const ClusterCentroid& cent = centroids[i];
            const float x = pt.pos[0] - cent.pos[0];
            const float y = pt.pos[1] - cent.pos[1];
000007FEEE3FB8B8  movss       xmm0,dword ptr [rdx] 
            const float z = pt.pos[2] - cent.pos[2];
000007FEEE3FB8BC  movss       xmm2,dword ptr [rdx+4] 
000007FEEE3FB8C1  movss       xmm1,dword ptr [rdx-4] 
000007FEEE3FB8C6  subss       xmm2,dword ptr [r8] 
000007FEEE3FB8CB  subss       xmm0,dword ptr [r8-4] 
000007FEEE3FB8D1  subss       xmm1,dword ptr [r8-8] 
            const float dist = x*x + y*y + z*z;
000007FEEE3FB8D7  mulss       xmm2,xmm2 
000007FEEE3FB8DB  mulss       xmm0,xmm0 
000007FEEE3FB8DF  mulss       xmm1,xmm1 
000007FEEE3FB8E3  addss       xmm2,xmm0 
000007FEEE3FB8E7  addss       xmm2,xmm1 

            if (dist < pt.min_dist)
// VTUNE HOTSPOT
000007FEEE3FB8EB  comiss      xmm2,dword ptr [rdx-8] 
000007FEEE3FB8EF  jae         thread_partition+1E9h (7FEEE3FB8F9h) 
            {
                pt.min_dist = dist;
000007FEEE3FB8F1  movss       dword ptr [rdx-8],xmm2 
                pt.min_index = i;
000007FEEE3FB8F6  mov         dword ptr [rdx-10h],ecx 
000007FEEE3FB8F9  inc         ecx  
000007FEEE3FB8FB  add         r8,30h 
000007FEEE3FB8FF  cmp         ecx,r10d 
000007FEEE3FB902  jl          thread_partition+1A8h (7FEEE3FB8B8h) 
    for (int j = *irange.first; j < *irange.last; ++j)
000007FEEE3FB904  inc         edi  
000007FEEE3FB906  add         rdx,20h 
000007FEEE3FB90A  cmp         edi,dword ptr [rsi+4] 
000007FEEE3FB90D  jl          thread_partition+31h (7FEEE3FB741h) 
000007FEEE3FB913  mov         rbx,qword ptr [irange] 
            }
        }
    }
}

我们被迫瞄准SSE 2 - 落后于我们的时代,但是当我们假设即使SSE 4作为最低要求(用户有一些原型Intel机器)时,用户群实际上也绊倒了一次 .

使用独立测试更新:约5.6秒

我非常感谢所提供的所有帮助!因为代码库非常广泛并且触发该代码的条件很复杂(跨多个线程触发系统事件),所以每次进行实验性更改并对其进行分析时都有点笨拙 . 所以我作为一个独立的应用程序设置了一个表面测试,其他人也可以运行和尝试,以便我可以尝试所有这些优雅的解决方案 .

#define _SECURE_SCL 0
#include <iostream>
#include <fstream>
#include <vector>
#include <limits>
#include <ctime>
#if defined(_MSC_VER)
    #define ALIGN16 __declspec(align(16))
#else
    #include <malloc.h>
    #define ALIGN16 __attribute__((aligned(16)))
#endif

using namespace std;

// Aligned memory allocation (for SIMD).
static void* malloc16(size_t amount)
{
    #ifdef _MSC_VER
        return _aligned_malloc(amount, 16);
    #else
        void* mem = 0;
        posix_memalign(&mem, 16, amount);
        return mem;
    #endif
}
template <class T>
static T* malloc16_t(size_t num_elements)
{
    return static_cast<T*>(malloc16(num_elements * sizeof(T)));
}

// Aligned free.
static void free16(void* mem)
{
    #ifdef _MSC_VER
        return _aligned_free(mem);
    #else
        free(mem);
    #endif
}

// Test parameters.
enum {num_centroids = 512};
enum {num_points = num_centroids * 2000};
enum {num_iterations = 5};
static const float range = 10.0f;

class Points
{
public:
    Points(): data(malloc16_t<Point>(num_points))
    {
        for (int p=0; p < num_points; ++p)
        {
            const float xyz[3] =
            {
                range * static_cast<float>(rand()) / RAND_MAX,
                range * static_cast<float>(rand()) / RAND_MAX,
                range * static_cast<float>(rand()) / RAND_MAX
            };
            init(p, xyz);
        }
    }
    ~Points()
    {
        free16(data);
    }
    void init(int n, const float* xyz)
    {
        data[n].centroid = -1;
        data[n].xyz[0] = xyz[0];
        data[n].xyz[1] = xyz[1];
        data[n].xyz[2] = xyz[2];
    }
    void associate(int n, int new_centroid)
    {
        data[n].centroid = new_centroid;
    }
    int centroid(int n) const
    {
        return data[n].centroid;
    }
    float* operator[](int n)
    {
        return data[n].xyz;
    }

private:
    Points(const Points&);
    Points& operator=(const Points&);
    struct Point
    {
        int centroid;
        float xyz[3];
    };
    Point* data;
};

class Centroids
{
public:
    Centroids(Points& points): data(malloc16_t<Centroid>(num_centroids))
    {
        // Naive initial selection algorithm, but outside the 
        // current area of interest.
        for (int c=0; c < num_centroids; ++c)
            init(c, points[c]);
    }
    ~Centroids()
    {
        free16(data);
    }
    void init(int n, const float* xyz)
    {
        data[n].count = 0;
        data[n].xyz[0] = xyz[0];
        data[n].xyz[1] = xyz[1];
        data[n].xyz[2] = xyz[2];
    }
    void reset(int n)
    {
        data[n].count = 0;
        data[n].xyz[0] = 0.0f;
        data[n].xyz[1] = 0.0f;
        data[n].xyz[2] = 0.0f;
    }
    void sum(int n, const float* pt_xyz)
    {
        data[n].xyz[0] += pt_xyz[0];
        data[n].xyz[1] += pt_xyz[1];
        data[n].xyz[2] += pt_xyz[2];
        ++data[n].count;
    }
    void average(int n)
    {
        if (data[n].count > 0)
        {
            const float inv_count = 1.0f / data[n].count;
            data[n].xyz[0] *= inv_count;
            data[n].xyz[1] *= inv_count;
            data[n].xyz[2] *= inv_count;
        }
    }
    float* operator[](int n)
    {
        return data[n].xyz;
    }
    int find_nearest(const float* pt_xyz) const
    {
        float min_dist_squared = numeric_limits<float>::max();
        int min_centroid = -1;
        for (int c=0; c < num_centroids; ++c)
        {
            const float* cen_xyz = data[c].xyz;
            const float x = pt_xyz[0] - cen_xyz[0];
            const float y = pt_xyz[1] - cen_xyz[1];
            const float z = pt_xyz[2] - cen_xyz[2];
            const float dist_squared = x*x + y*y * z*z;

            if (min_dist_squared > dist_squared)
            {
                min_dist_squared = dist_squared;
                min_centroid = c;
            }
        }
        return min_centroid;
    }

private:
    Centroids(const Centroids&);
    Centroids& operator=(const Centroids&);
    struct Centroid
    {
        int count;
        float xyz[3];
    };
    Centroid* data;
};

// A high-precision real timer would be nice, but we lack C++11 and
// the coarseness of the testing here should allow this to suffice.
static double sys_time()
{
    return static_cast<double>(clock()) / CLOCKS_PER_SEC;
}

static void k_means(Points& points, Centroids& centroids)
{
    // Find the closest centroid for each point.
    for (int p=0; p < num_points; ++p)
    {
        const float* pt_xyz = points[p];
        points.associate(p, centroids.find_nearest(pt_xyz));
    }

    // Reset the data of each centroid.
    for (int c=0; c < num_centroids; ++c)
        centroids.reset(c);

    // Compute new position sum of each centroid.
    for (int p=0; p < num_points; ++p)
        centroids.sum(points.centroid(p), points[p]);

    // Compute average position of each centroid.
    for (int c=0; c < num_centroids; ++c)
        centroids.average(c);
}

int main()
{
    Points points;
    Centroids centroids(points);

    cout << "Starting simulation..." << endl;
    double start_time = sys_time();
    for (int i=0; i < num_iterations; ++i)
        k_means(points, centroids);
    cout << "Time passed: " << (sys_time() - start_time) << " secs" << endl;
    cout << "# Points: " << num_points << endl;
    cout << "# Centroids: " << num_centroids << endl;

    // Write the centroids to a file to give us some crude verification
    // of consistency as we make changes.
    ofstream out("centroids.txt");
    for (int c=0; c < num_centroids; ++c)
        out << "Centroid " << c << ": " << centroids[c][0] << "," << centroids[c][1] << "," << centroids[c][2] << endl;
}

我知道表面测试的危险,但由于它已经被认为是以前真实世界 Session 的热点,我希望它是可以原谅的 . 我也对与微优化此类代码相关的一般技术感兴趣 .

我在分析这个结果时得到的结果略有不同 . 这里的时间更均匀地分散在循环中,因为数据较小(我省略了成员并将 min_dist 成员提升并使其成为局部变量) . 质心与点之间的确切比例也有所不同,但希望足够接近以将此处的改进转化为原始代码 . 它现在愿意接受,因为我更感兴趣的是扩展我对可以优化这些案例的技术的知识,而不是解决这个问题的解决方案) .

VTune

更新Yochai Timmer的建议 - 约12.5秒

哦,我面对微优化的困境而不太了解组装 . 我替换了这个:

-if (min_dist_squared > dist_squared)
        -{
        -    min_dist_squared = dist_squared;
        -    pt.centroid = c;
        -}

有了这个:

+const bool found_closer = min_dist_squared > dist_squared;
        +pt.centroid = bitselect(found_closer, c, pt.centroid);
        +min_dist_squared = bitselect(found_closer, dist_squared, min_dist_squared);

..只是发现时间从~5.6秒升级到~12.5秒 . 尽管如此,这不是他的错,也不是从他的解决方案的 Value 中剔除 - 这是因为我无法理解机器级别的实际情况并且在黑暗中进行了攻击 . 那个人显然错过了,显然我并不像我最初想的那样是分支错误预测的受害者 . 尽管如此,他提出的解决方案是一个很好的和通用的功能,在这种情况下尝试,我很感激将它添加到我的工具箱的提示和技巧 . 现在是第2轮 .

Harold的SIMD解决方案 - 2.496秒(见警告)

这个解决方案可能很棒 . 将群集代表转换为SoA后,我得到的时间约为2.5秒!不幸的是,似乎有某种故障 . 对于最终输出,我得到的结果非常不同,这表明不仅仅是微小的精度差异,包括一些质心朝向末尾的质心(意味着在搜索中找不到它们) . 我一直试图通过调试器来查看SIMD逻辑,看看可能是什么 - 它本身可能只是一个转录错误,但这是代码,以防有人发现错误 .

如果错误可以在不减慢结果的情况下得到纠正,那么这种速度提升的速度比我从纯粹的微优化中想象的要快!

// New version of Centroids::find_nearest (from harold's solution):
    int find_nearest(const float* pt_xyz) const
    {
        __m128i min_index = _mm_set_epi32(3, 2, 1, 0);
        __m128 xdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[0]), _mm_load_ps(cen_x));
        __m128 ydif = _mm_sub_ps(_mm_set1_ps(pt_xyz[1]), _mm_load_ps(cen_y));
        __m128 zdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[2]), _mm_load_ps(cen_z));
        __m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), 
                                                _mm_mul_ps(ydif, ydif)), 
                                                _mm_mul_ps(zdif, zdif));
        __m128i index = min_index;
        for (int i=4; i < num_centroids; i += 4) 
        {
            xdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[0]), _mm_load_ps(cen_x + i));
            ydif = _mm_sub_ps(_mm_set1_ps(pt_xyz[1]), _mm_load_ps(cen_y + i));
            zdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[2]), _mm_load_ps(cen_z + i));
            __m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), 
                                                _mm_mul_ps(ydif, ydif)), 
                                                _mm_mul_ps(zdif, zdif));
            __m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
            min_dist = _mm_min_ps(min_dist, dist);
            min_index = _mm_or_si128(_mm_and_si128(index, mask), 
                                     _mm_andnot_si128(mask, min_index));
            index = _mm_add_epi32(index, _mm_set1_epi32(4));
        }

        ALIGN16 float mdist[4];
        ALIGN16 uint32_t mindex[4];
        _mm_store_ps(mdist, min_dist);
        _mm_store_si128((__m128i*)mindex, min_index);

        float closest = mdist[0];
        int closest_i = mindex[0];
        for (int i=1; i < 4; i++)
        {
            if (mdist[i] < closest) 
            {
                closest = mdist[i];
                closest_i = mindex[i];
            }
        }
        return closest_i;
    }

Harold的SIMD解决方案(更正) - 约2.5秒

应用校正并测试后,结果完好无损且功能正常,并对原始代码库进行了类似的改进!

由于这触及了我想要更好地理解的知识的圣杯(无分支SIMD),我将给予解决方案一些额外的道具,以使操作的速度加倍 . 我试图理解它的功课,因为我的目标不仅仅是缓解这个热点,而是扩展我个人对可能解决方案的理解 .

不过,我很感激这里的所有贡献,从算法建议到非常酷的 bitselect 技巧!我希望我能接受所有答案 . 我可能会在某些时候尝试所有这些,但是现在我已经完成了我的功课,理解了一些非算术的SIMD操作 .

int find_nearest_simd(const float* pt_xyz) const
{
    __m128i min_index = _mm_set_epi32(3, 2, 1, 0);
    __m128 pt_xxxx = _mm_set1_ps(pt_xyz[0]);
    __m128 pt_yyyy = _mm_set1_ps(pt_xyz[1]);
    __m128 pt_zzzz = _mm_set1_ps(pt_xyz[2]);

    __m128 xdif = _mm_sub_ps(pt_xxxx, _mm_load_ps(cen_x));
    __m128 ydif = _mm_sub_ps(pt_yyyy, _mm_load_ps(cen_y));
    __m128 zdif = _mm_sub_ps(pt_zzzz, _mm_load_ps(cen_z));
    __m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), 
                                            _mm_mul_ps(ydif, ydif)), 
                                            _mm_mul_ps(zdif, zdif));
    __m128i index = min_index;
    for (int i=4; i < num_centroids; i += 4) 
    {
        xdif = _mm_sub_ps(pt_xxxx, _mm_load_ps(cen_x + i));
        ydif = _mm_sub_ps(pt_yyyy, _mm_load_ps(cen_y + i));
        zdif = _mm_sub_ps(pt_zzzz, _mm_load_ps(cen_z + i));
        __m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), 
                                            _mm_mul_ps(ydif, ydif)), 
                                            _mm_mul_ps(zdif, zdif));
        index = _mm_add_epi32(index, _mm_set1_epi32(4));
        __m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
        min_dist = _mm_min_ps(min_dist, dist);
        min_index = _mm_or_si128(_mm_and_si128(index, mask), 
                                 _mm_andnot_si128(mask, min_index));
    }

    ALIGN16 float mdist[4];
    ALIGN16 uint32_t mindex[4];
    _mm_store_ps(mdist, min_dist);
    _mm_store_si128((__m128i*)mindex, min_index);

    float closest = mdist[0];
    int closest_i = mindex[0];
    for (int i=1; i < 4; i++)
    {
        if (mdist[i] < closest) 
        {
            closest = mdist[i];
            closest_i = mindex[i];
        }
    }
    return closest_i;
}

6 回答

  • 3

    太糟糕了,我们可以对此进行测试,只是编译它以查看是否存在语法错误并查看程序集是否有意义(它是大多数情况下,虽然GCC溢出 min_index 甚至还有一些未使用的 xmm 寄存器,但不确定为什么会发生这种情况)

    int find_closest(float *x, float *y, float *z,
                     float pt_x, float pt_y, float pt_z, int n) {
        __m128i min_index = _mm_set_epi32(3, 2, 1, 0);
        __m128 xdif = _mm_sub_ps(_mm_set1_ps(pt_x), _mm_load_ps(x));
        __m128 ydif = _mm_sub_ps(_mm_set1_ps(pt_y), _mm_load_ps(y));
        __m128 zdif = _mm_sub_ps(_mm_set1_ps(pt_z), _mm_load_ps(z));
        __m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), 
                                                _mm_mul_ps(ydif, ydif)), 
                                                _mm_mul_ps(zdif, zdif));
        __m128i index = min_index;
        for (int i = 4; i < n; i += 4) {
            xdif = _mm_sub_ps(_mm_set1_ps(pt_x), _mm_load_ps(x + i));
            ydif = _mm_sub_ps(_mm_set1_ps(pt_y), _mm_load_ps(y + i));
            zdif = _mm_sub_ps(_mm_set1_ps(pt_z), _mm_load_ps(z + i));
            __m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), 
                                                _mm_mul_ps(ydif, ydif)), 
                                                _mm_mul_ps(zdif, zdif));
            index = _mm_add_epi32(index, _mm_set1_epi32(4));
            __m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
            min_dist = _mm_min_ps(min_dist, dist);
            min_index = _mm_or_si128(_mm_and_si128(index, mask), 
                                     _mm_andnot_si128(mask, min_index));
        }
        float mdist[4];
        _mm_store_ps(mdist, min_dist);
        uint32_t mindex[4];
        _mm_store_si128((__m128i*)mindex, min_index);
        float closest = mdist[0];
        int closest_i = mindex[0];
        for (int i = 1; i < 4; i++) {
            if (mdist[i] < closest) {
                closest = mdist[i];
                closest_i = mindex[i];
            }
        }
        return closest_i;
    }
    

    像往常一样,它希望指针是16对齐的 . 此外,填充应该是无限远的点(因此它们永远不会最接近目标) .

    SSE 4.1会让你替换它

    min_index = _mm_or_si128(_mm_and_si128(index, mask), 
                             _mm_andnot_si128(mask, min_index));
    

    这样

    min_index = _mm_blendv_epi8(min_index, index, mask);
    

    这是一个asm版本,为vsyasm制作,测试了一下(似乎工作)

    bits 64
    
    section .data
    
    align 16
    centroid_four:
        dd 4, 4, 4, 4
    centroid_index:
        dd 0, 1, 2, 3
    
    section .text
    
    global find_closest
    
    proc_frame find_closest
        ;
        ;   arguments:
        ;       ecx: number of points (multiple of 4 and at least 4)
        ;       rdx -> array of 3 pointers to floats (x, y, z) (the points)
        ;       r8 -> array of 3 floats (the reference point)
        ;
        alloc_stack 0x58
        save_xmm128 xmm6, 0
        save_xmm128 xmm7, 16
        save_xmm128 xmm8, 32
        save_xmm128 xmm9, 48
    [endprolog]
        movss xmm0, [r8]
        shufps xmm0, xmm0, 0
        movss xmm1, [r8 + 4]
        shufps xmm1, xmm1, 0
        movss xmm2, [r8 + 8]
        shufps xmm2, xmm2, 0
        ; pointers to x, y, z in r8, r9, r10
        mov r8, [rdx]
        mov r9, [rdx + 8]
        mov r10, [rdx + 16]
        ; reference point is in xmm0, xmm1, xmm2 (x, y, z)
        movdqa xmm3, [rel centroid_index]   ; min_index
        movdqa xmm4, xmm3                   ; current index
        movdqa xmm9, [rel centroid_four]     ; index increment
        paddd xmm4, xmm9
        ; calculate initial min_dist, xmm5
        movaps xmm5, [r8]
        subps xmm5, xmm0
        movaps xmm7, [r9]
        subps xmm7, xmm1
        movaps xmm8, [r10]
        subps xmm8, xmm2
        mulps xmm5, xmm5
        mulps xmm7, xmm7
        mulps xmm8, xmm8
        addps xmm5, xmm7
        addps xmm5, xmm8
        add r8, 16
        add r9, 16
        add r10, 16
        sub ecx, 4
        jna _tail
    _loop:
        movaps xmm6, [r8]
        subps xmm6, xmm0
        movaps xmm7, [r9]
        subps xmm7, xmm1
        movaps xmm8, [r10]
        subps xmm8, xmm2
        mulps xmm6, xmm6
        mulps xmm7, xmm7
        mulps xmm8, xmm8
        addps xmm6, xmm7
        addps xmm6, xmm8
        add r8, 16
        add r9, 16
        add r10, 16
        movaps xmm7, xmm6
        cmpps xmm6, xmm5, 1
        minps xmm5, xmm7
        movdqa xmm7, xmm6
        pand xmm6, xmm4
        pandn xmm7, xmm3
        por xmm6, xmm7
        movdqa xmm3, xmm6
        paddd xmm4, xmm9
        sub ecx, 4
        ja _loop
    _tail:
        ; calculate horizontal minumum
        pshufd xmm0, xmm5, 0xB1
        minps xmm0, xmm5
        pshufd xmm1, xmm0, 0x4E
        minps xmm0, xmm1
        ; find index of the minimum
        cmpps xmm0, xmm5, 0
        movmskps eax, xmm0
        bsf eax, eax
        ; index into xmm3, sort of
        movaps [rsp + 64], xmm3
        mov eax, [rsp + 64 + rax * 4]
        movaps xmm9, [rsp + 48]
        movaps xmm8, [rsp + 32]
        movaps xmm7, [rsp + 16]
        movaps xmm6, [rsp]
        add rsp, 0x58
        ret
    endproc_frame
    

    在C:

    extern "C" int find_closest(int n, float** points, float* reference_point);
    
  • 6

    您可以使用无分支三元运算符,有时也称为bitselect(condition?true:false) .
    只需将它用于2名成员,默认为什么都不做 .
    不要担心额外的操作,它们与if语句分支相比没什么 .

    bitselect实现:

    inline static int bitselect(int condition, int truereturnvalue, int falsereturnvalue)
    {
        return (truereturnvalue & -condition) | (falsereturnvalue & ~(-condition)); //a when TRUE and b when FALSE
    }
    
    inline static float bitselect(int condition, float truereturnvalue, float falsereturnvalue)
    {
        //Reinterpret floats. Would work because it's just a bit select, no matter the actual value
        int& at = reinterpret_cast<int&>(truereturnvalue);
        int& af = reinterpret_cast<int&>(falsereturnvalue);
        int res = (at & -condition) | (af & ~(-condition)); //a when TRUE and b when FALSE
        return  reinterpret_cast<float&>(res);
    }
    

    你的循环应该是这样的:

    for (int i=0; i < num_centroids; ++i)
    {
      const ClusterCentroid& cent = centroids[i];
      const float dist = ...;
      bool isSmaeller = dist < pt.min_dist;
    
      //use same value if not smaller
      pt.min_index = bitselect(isSmaeller, i, pt.min_index);
      pt.min_dist = bitselect(isSmaeller, dist, pt.min_dist);
    }
    
  • 10

    C是一种高级语言 . 您假设C源代码中的控制流转换为分支指令是有缺陷的 . 我没有从您的示例中定义某些类型,因此我创建了一个具有类似条件赋值的简单测试程序:

    int g(int, int);
    
    int f(const int *arr)
    {
        int min = 10000, minIndex = -1;
        for ( int i = 0; i < 1000; ++i )
        {
            if ( arr[i] < min )
            {
                min = arr[i];
                minIndex = i;
            }
        }
        return g(min, minIndex);
    }
    

    请注意,使用undefined "g"仅仅是为了防止优化器删除所有内容 . 我将带有-O3和-S的G 4.9.2翻译成x86_64程序集(甚至不需要更改-march的默认值),并且(不过于令人惊讶)结果是循环体不包含分支

    movl    (%rdi,%rax,4), %ecx
    movl    %edx, %r8d
    cmpl    %edx, %ecx
    cmovle  %ecx, %r8d
    cmovl   %eax, %esi
    addq    $1, %rax
    

    除此之外,无分支必然更快的假设也可能存在缺陷,因为新距离的概率正在减少你所观察到的元素越多 . 这不是硬币投掷 . 当编译器生成"as-if"组件时,它们比现在更不具有攻击性,因此发明了"bitselect"技巧 . 我更愿意在尝试重新编写代码之前先看看编译器实际生成的汇编类型,以便编译器能够更好地优化它,或者将结果作为手写汇编的基础 . 如果您想查看SIMD,我建议尝试使用减少数据依赖性的"minimum of minimums"方法(在我的示例中,"min"上的依赖可能是瓶颈) .

  • 21

    首先,我建议在尝试任何代码更改之前,先查看优化构建中的反汇编 . 理想情况下,您希望在汇编级别查看分析器数据 . 这可以显示各种事物,例如:

    • 编译器可能没有生成实际的分支指令 .

    • 具有瓶颈的代码行可能会有比您想象的更多的与之关联的指令 - 例如,dist计算 .

    除此之外还有标准技巧,当你谈论距离计算时,它们通常需要一个平方根 . 你应该在最小平方值的过程结束时做那个平方根 .

    SSE可以使用_mm_min_ps一次处理四个值,而不使用任何分支 . 如果你真的需要速度,那么你想要使用SSE(或AVX)内在函数 . 这是一个基本的例子:

    float MinimumDistance(const float *values, int count)
      {
        __m128 min = _mm_set_ps(FLT_MAX, FLT_MAX, FLT_MAX, FLT_MAX);
        int i=0;
        for (; i < count - 3; i+=4)
        {
            __m128 distances = _mm_loadu_ps(&values[i]);
            min = _mm_min_ps(min, distances);
        }
        // Combine the four separate minimums to a single value
        min = _mm_min_ps(min, _mm_shuffle_ps(min, min, _MM_SHUFFLE(2, 3, 0, 1)));
        min = _mm_min_ps(min, _mm_shuffle_ps(min, min, _MM_SHUFFLE(1, 0, 3, 2)));
    
        // Deal with the last 0-3 elements the slow way
        float result = FLT_MAX;
        if (count > 3) _mm_store_ss(&result, min);
        for (; i < count; i++)
        {
            result = min(values[i], result);
        }
    
        return result;
      }
    

    为获得最佳SSE性能,您应确保在对齐的地址处发生负载 . 如有必要,您可以像上面代码中的最后几个一样处理前几个未对齐的元素 .

    另外需要注意的是内存带宽 . 如果在该循环期间没有使用ClusterCentroid结构的几个成员,那么您将从内存中读取比实际需要更多的数据,因为在高速缓存行大小的块中读取内存,每个块为64字节 .

  • 21

    这可能是双向的,但我会尝试以下结构:

    std::vector<float> centDists(num_centroids); //<-- one for each thread. 
    for (size_t p=0; p<num_points; ++p) {
        Point& pt = points[p];
        for (size_t c=0; c<num_centroids; ++c) {
            const float dist = ...;
            centDists[c]=dist;
        }
        pt.min_idx it= min_element(centDists.begin(),centDists.end())-centDists.begin();    
    }
    

    显然,你现在必须在内存上迭代两次,这可能会损害缓存命中率(你也可以将它分成子范围)但另一方面,每个内部循环都应该易于矢量化和展开 - 所以你只需要衡量它是否值得 .

    即使你坚持你的版本,我也会尝试使用 local variables 来跟踪最小索引和距离,并将结果应用到最后 .
    理性的是,对 pt.min_dist 的每次读取或写入都是通过指针实现的,这取决于编译器的优化,可能会也可能不会降低您的性能 .

    对矢量化很重要的另一件事是将 array of Structs (在这种情况下为蜈蚣)变成 struct of arrays (例如,对于点的每个坐标,例如一个数组),因为这样您就不需要额外的收集指令来加载数据以供SIMD指令使用 . 有关该主题的更多信息,请参见Eric Brumer's talk .

    编辑:我的系统的一些数字(haswell,clang 3.5):
    我用你的基准测试和我的系统做了一个简短的测试,上面的代码使算法减慢了大约10% - 基本上,没有任何东西可以被矢量化 .

    然而,当将AoS应用于质心的SoA变换时,距离计算被矢量化,与使用AoS到SoA转换的原始结构相比,导致整体运行时间减少约40% .

  • 6

    一种可能的微优化:将min_dist和min_index存储在局部变量中 . 编译器可能必须以编写方式更频繁地写入内存;在某些体系结构上,这会对性能产生很大影响 . 有关其他示例,请参阅my answer here .

    亚当斯建议同时做4比较也是一个很好的建议 .

    但是,您最好的加速来自减少必须检查的质心数量 . 理想情况下,在质心周围构建一个kd树(或类似物),然后查询它以找到最近的点 .

    如果您没有任何树构建代码,这里是我最喜欢的“穷人”最近点搜索:

    Sort the points by one coordinate, e.g. cent.pos[0]
    Pick a starting index for the query point (pt)
    Iterate forwards through the candidate points until you reach the end, OR when abs(pt.pos[0] - cent.pos[0]) > min_dist
    Repeat the previous step going the opposite direction.
    

    搜索的额外停止条件意味着您应该跳过相当多的积分;你也保证不会跳过任何比你已经找到的最好的点 .

    所以对于你的代码,这看起来像

    // sort centroid by x coordinate.
    min_index = -1;
    min_dist = numeric_limits<float>::max();
    
    // pick the start index. This works well if the points are evenly distributed.
    float min_x = centroids[0].pos[0];
    float max_x = centroids[num_centroids-1].pos[0];
    float cur_x = pt.pos[0];
    float t = (max_x - cur_x) / (max_x - min_x);
    // TODO clamp t between 0 and 1
    int start_index = int(t * float(num_centroids))
    
    // Forward search
    for (int i=start_index ; i < num_centroids; ++i)
    {
        const ClusterCentroid& cent = centroids[i];
        if (fabs(cent.pos[0] - pt.pos[0]) > min_i)
            // Everything to the right of this must be further min_dist, so break.
            // This is where the savings comes from!
            break; 
        const float dist = ...;
        if (dist < min_dist)
        {
            min_dist = dist;
            min_index = i;
        }
    }
    
    // Backwards search
    for (int i=start_index ; i >= 0; --i)
    {
        // same as above
    }
    pt.min_dist = min_dist
    pt.min_index = min_index
    

    (请注意,这假设您计算点之间的距离,但是您的程序集表明它是距离的平方 . 相应地调整中断条件) .

    构建树或对质心进行排序会产生轻微的开销,但这应该通过在更大的循环中(在点数上)更快地进行计算来抵消 .

相关问题