*Note: I'd appreciate more of a guide to how to approach and come up with these kinds of solutions rather than the solution itself.*

I have a very performance-critical function in my system showing up as a number one profiling hotspot in specific contexts. It's in the middle of a k-means iteration (already multi-threaded using a parallel for processing sub-ranges of points in each worker thread).

```
ClusterPoint& pt = points[j];
pt.min_index = -1;
pt.min_dist = numeric_limits<float>::max();
for (int i=0; i < num_centroids; ++i)
{
const ClusterCentroid& cent = centroids[i];
const float dist = ...;
if (dist < pt.min_dist) // <-- #1 hotspot
{
pt.min_dist = dist;
pt.min_index = i;
}
}
```

Any savings in the time required to process this section of code counts substantially, so I've often been fiddling with it a lot. It might be worth putting the centroid loop outside, for example, and iterate through the points in parallel for a given centroid. The number of cluster points here spans in the millions, while the number of centroids spans in the thousands. The algorithm is applied for a handful of iterations (often under 10). It doesn't seek perfect convergence/stability, just some 'reasonable' approximation.

Any ideas are appreciated, but what I'm really eager to discover is if this code can be made branchless as it would allow for a SIMD version. I haven't really developed the kind of mental ability to easily grasp how to come up with branchless solutions: my brain fails there much like it did when I was first exposed to recursion in the early days, so a guide on how to write branchless code and how to develop the appropriate mindset for it would also be helpful.

In short, I'm looking for any guides and hints and suggestions (not necessarily solutions) on how to micro-optimize this code. It most likely has room for algorithmic improvements, but my blindspot has always been in micro-optimization solutions (and I'm curious to learn how to apply them more effectively without going overboard with it). It's already tightly multithreaded with chunky parallel for logic, so I'm pretty much pushed into the micro-optimization corner as one of the quicker things to try without a smarter algorithm outright. We're completely free to change the memory layout.

## In Response to Algorithmic Suggestions

About looking at this all wrong in seeking to micro-optimize an O(knm) algorithm which could clearly be improved at the algorithmic level, I wholeheartedly agree. This pushes this specific question into a somewhat academic and impractical realm. However, if I could be allowed an anecdote, I come from an original background of high-level programming -- big emphasis on broad, large-scale viewpoint, safety, and very little on the low-level implementation details. I've recently switched projects to a very different kind of modern-flavored one and I'm learning all kinds of new tricks from my peers of cache efficiency, GPGPU, branchless techniques, SIMD, special-purpose mem allocators that actually outperform malloc (but for specific scenarios), etc.

It's where I'm trying to catch up with the latest performance trends, and surprisingly I've found that those old data structures I often favored during the 90s which were often linked/tree-type structures are actually being vastly outperformed by much more naive, brutish, micro-optimized, parallelized code applying tuned instructions over contiguous memory blocks. It's somewhat disappointing at the same time since I feel like we're fitting the algorithms more to the machine now and narrowing the possibilities this way (especially with GPGPU).

The funniest thing is that I find this type of micro-optimized, fast array-processing code much easier to maintain than the sophisticated algorithms and data structures I was using before. For a start, they're easier to generalize. Furthermore, my peers can often take a customer complaint about a specific slowdown in an area, just slap a parallel for and possibly some SIMD and call it done with a decent speed up. Algorithmic improvements can often offer substantially more, but the speed and non-intrusiveness at which these micro-optimizations can be applied has me wanting to learn more in that area, as reading papers on better algorithms can take some time (as well as require more extensive changes). So I've been jumping on that micro-optimization bandwagon a bit more lately, and perhaps a little too much in this specific case, but my curiosity is more about expanding my range of possible solutions for any scenario.

## Disassembly

*Note: I am really, really bad at assembly so I have often tuned things more in a trial and error kind of way, coming up with somewhat educated guesses about why a hotspot shown in vtune might be the bottleneck and then trying things out to see if the times improve, assuming the guesses have some hint of truth if the times do improve, or completely missed the mark if they don't.*

```
000007FEEE3FB8A1 jl thread_partition+70h (7FEEE3FB780h)
{
ClusterPoint& pt = points[j];
pt.min_index = -1;
pt.min_dist = numeric_limits<float>::max();
for (int i = 0; i < num_centroids; ++i)
000007FEEE3FB8A7 cmp ecx,r10d
000007FEEE3FB8AA jge thread_partition+1F4h (7FEEE3FB904h)
000007FEEE3FB8AC lea rax,[rbx+rbx*2]
000007FEEE3FB8B0 add rax,rax
000007FEEE3FB8B3 lea r8,[rbp+rax*8+8]
{
const ClusterCentroid& cent = centroids[i];
const float x = pt.pos[0] - cent.pos[0];
const float y = pt.pos[1] - cent.pos[1];
000007FEEE3FB8B8 movss xmm0,dword ptr [rdx]
const float z = pt.pos[2] - cent.pos[2];
000007FEEE3FB8BC movss xmm2,dword ptr [rdx+4]
000007FEEE3FB8C1 movss xmm1,dword ptr [rdx-4]
000007FEEE3FB8C6 subss xmm2,dword ptr [r8]
000007FEEE3FB8CB subss xmm0,dword ptr [r8-4]
000007FEEE3FB8D1 subss xmm1,dword ptr [r8-8]
const float dist = x*x + y*y + z*z;
000007FEEE3FB8D7 mulss xmm2,xmm2
000007FEEE3FB8DB mulss xmm0,xmm0
000007FEEE3FB8DF mulss xmm1,xmm1
000007FEEE3FB8E3 addss xmm2,xmm0
000007FEEE3FB8E7 addss xmm2,xmm1
if (dist < pt.min_dist)
// VTUNE HOTSPOT
000007FEEE3FB8EB comiss xmm2,dword ptr [rdx-8]
000007FEEE3FB8EF jae thread_partition+1E9h (7FEEE3FB8F9h)
{
pt.min_dist = dist;
000007FEEE3FB8F1 movss dword ptr [rdx-8],xmm2
pt.min_index = i;
000007FEEE3FB8F6 mov dword ptr [rdx-10h],ecx
000007FEEE3FB8F9 inc ecx
000007FEEE3FB8FB add r8,30h
000007FEEE3FB8FF cmp ecx,r10d
000007FEEE3FB902 jl thread_partition+1A8h (7FEEE3FB8B8h)
for (int j = *irange.first; j < *irange.last; ++j)
000007FEEE3FB904 inc edi
000007FEEE3FB906 add rdx,20h
000007FEEE3FB90A cmp edi,dword ptr [rsi+4]
000007FEEE3FB90D jl thread_partition+31h (7FEEE3FB741h)
000007FEEE3FB913 mov rbx,qword ptr [irange]
}
}
}
}
```

We're forced into targeting SSE 2 -- a bit behind on our times, but the user base actually tripped up once when we assumed that even SSE 4 was okay as a min requirement (the user had some prototype Intel machine).

## Update with Standalone Test: ~5.6 secs

I'm very appreciative of all the help being offered! Because the codebase is quite extensive and the conditions for triggering that code are complex (system events triggered across multiple threads), it's a bit unwieldy to make experimental changes and profile them each time. So I've set up a superficial test on the side as a standalone application that others can also run and try out so that I can experiment with all these graciously offered solutions.

```
#define _SECURE_SCL 0
#include <iostream>
#include <fstream>
#include <vector>
#include <limits>
#include <ctime>
#if defined(_MSC_VER)
#define ALIGN16 __declspec(align(16))
#else
#include <malloc.h>
#define ALIGN16 __attribute__((aligned(16)))
#endif
using namespace std;
// Aligned memory allocation (for SIMD).
static void* malloc16(size_t amount)
{
#ifdef _MSC_VER
return _aligned_malloc(amount, 16);
#else
void* mem = 0;
posix_memalign(&mem, 16, amount);
return mem;
#endif
}
template <class T>
static T* malloc16_t(size_t num_elements)
{
return static_cast<T*>(malloc16(num_elements * sizeof(T)));
}
// Aligned free.
static void free16(void* mem)
{
#ifdef _MSC_VER
return _aligned_free(mem);
#else
free(mem);
#endif
}
// Test parameters.
enum {num_centroids = 512};
enum {num_points = num_centroids * 2000};
enum {num_iterations = 5};
static const float range = 10.0f;
class Points
{
public:
Points(): data(malloc16_t<Point>(num_points))
{
for (int p=0; p < num_points; ++p)
{
const float xyz[3] =
{
range * static_cast<float>(rand()) / RAND_MAX,
range * static_cast<float>(rand()) / RAND_MAX,
range * static_cast<float>(rand()) / RAND_MAX
};
init(p, xyz);
}
}
~Points()
{
free16(data);
}
void init(int n, const float* xyz)
{
data[n].centroid = -1;
data[n].xyz[0] = xyz[0];
data[n].xyz[1] = xyz[1];
data[n].xyz[2] = xyz[2];
}
void associate(int n, int new_centroid)
{
data[n].centroid = new_centroid;
}
int centroid(int n) const
{
return data[n].centroid;
}
float* operator[](int n)
{
return data[n].xyz;
}
private:
Points(const Points&);
Points& operator=(const Points&);
struct Point
{
int centroid;
float xyz[3];
};
Point* data;
};
class Centroids
{
public:
Centroids(Points& points): data(malloc16_t<Centroid>(num_centroids))
{
// Naive initial selection algorithm, but outside the
// current area of interest.
for (int c=0; c < num_centroids; ++c)
init(c, points[c]);
}
~Centroids()
{
free16(data);
}
void init(int n, const float* xyz)
{
data[n].count = 0;
data[n].xyz[0] = xyz[0];
data[n].xyz[1] = xyz[1];
data[n].xyz[2] = xyz[2];
}
void reset(int n)
{
data[n].count = 0;
data[n].xyz[0] = 0.0f;
data[n].xyz[1] = 0.0f;
data[n].xyz[2] = 0.0f;
}
void sum(int n, const float* pt_xyz)
{
data[n].xyz[0] += pt_xyz[0];
data[n].xyz[1] += pt_xyz[1];
data[n].xyz[2] += pt_xyz[2];
++data[n].count;
}
void average(int n)
{
if (data[n].count > 0)
{
const float inv_count = 1.0f / data[n].count;
data[n].xyz[0] *= inv_count;
data[n].xyz[1] *= inv_count;
data[n].xyz[2] *= inv_count;
}
}
float* operator[](int n)
{
return data[n].xyz;
}
int find_nearest(const float* pt_xyz) const
{
float min_dist_squared = numeric_limits<float>::max();
int min_centroid = -1;
for (int c=0; c < num_centroids; ++c)
{
const float* cen_xyz = data[c].xyz;
const float x = pt_xyz[0] - cen_xyz[0];
const float y = pt_xyz[1] - cen_xyz[1];
const float z = pt_xyz[2] - cen_xyz[2];
const float dist_squared = x*x + y*y * z*z;
if (min_dist_squared > dist_squared)
{
min_dist_squared = dist_squared;
min_centroid = c;
}
}
return min_centroid;
}
private:
Centroids(const Centroids&);
Centroids& operator=(const Centroids&);
struct Centroid
{
int count;
float xyz[3];
};
Centroid* data;
};
// A high-precision real timer would be nice, but we lack C++11 and
// the coarseness of the testing here should allow this to suffice.
static double sys_time()
{
return static_cast<double>(clock()) / CLOCKS_PER_SEC;
}
static void k_means(Points& points, Centroids& centroids)
{
// Find the closest centroid for each point.
for (int p=0; p < num_points; ++p)
{
const float* pt_xyz = points[p];
points.associate(p, centroids.find_nearest(pt_xyz));
}
// Reset the data of each centroid.
for (int c=0; c < num_centroids; ++c)
centroids.reset(c);
// Compute new position sum of each centroid.
for (int p=0; p < num_points; ++p)
centroids.sum(points.centroid(p), points[p]);
// Compute average position of each centroid.
for (int c=0; c < num_centroids; ++c)
centroids.average(c);
}
int main()
{
Points points;
Centroids centroids(points);
cout << "Starting simulation..." << endl;
double start_time = sys_time();
for (int i=0; i < num_iterations; ++i)
k_means(points, centroids);
cout << "Time passed: " << (sys_time() - start_time) << " secs" << endl;
cout << "# Points: " << num_points << endl;
cout << "# Centroids: " << num_centroids << endl;
// Write the centroids to a file to give us some crude verification
// of consistency as we make changes.
ofstream out("centroids.txt");
for (int c=0; c < num_centroids; ++c)
out << "Centroid " << c << ": " << centroids[c][0] << "," << centroids[c][1] << "," << centroids[c][2] << endl;
}
```

I'm aware of the dangers of superficial testing, but since it's already deemed to be a hotspot from previous real-world sessions, I hope it's excusable. I'm also just interested in the general techniques associated with micro-optimizing such code.

I did get slightly different results in profiling this one. The times are a bit more evenly dispersed within the loop here, and I'm not sure why. Perhaps it's because the data is smaller (I omitted members and hoisted out the `min_dist`

member and made it a local variable). The exact ratio between centroids to points is also a bit different, but hopefully close enough to translate improvements here to the original code. It's also single-threaded in this superficial test, and the disassembly looks quite different so I may be risking optimizing this superficial test without the original (a risk I'm willing to take for now, as I'm more interested in expanding my knowledge of techniques that could optimize these cases rather than a solution for this exact case).

## Update with Yochai Timmer's Suggestion -- ~12.5 secs

Oh, I face the woes of micro-optimization without understanding assembly very well. I replaced this:

```
-if (min_dist_squared > dist_squared)
-{
- min_dist_squared = dist_squared;
- pt.centroid = c;
-}
```

With this:

```
+const bool found_closer = min_dist_squared > dist_squared;
+pt.centroid = bitselect(found_closer, c, pt.centroid);
+min_dist_squared = bitselect(found_closer, dist_squared, min_dist_squared);
```

.. only to find the times escalated from ~5.6 secs to ~12.5 secs. Nevertheless, that is not his fault nor does it take away from the value of his solution -- that's mine for failing to understand what's really going on at the machine level and taking stabs in the dark. That one apparently missed, and apparently I was not the victim of branch misprediction as I initially thought. Nevertheless, his proposed solution is a wonderful and generalized function to try in such cases, and I'm grateful to add it to my toolbox of tips and tricks. Now for round 2.

## Harold's SIMD Solution - 2.496 secs (see caveat)

This solution might be amazing. After converting the cluster rep to SoA, I'm getting times of ~2.5 seconds with this one! Unfortunately, there appears to be a glitch of some sort. I'm getting very different results for the final output that suggests more than slight precision differences, including some centroids towards the end with values of 0 (implying that they were not found in the search). I've been trying to go through the SIMD logic with the debugger to see what might be up -- it could merely be a transcription error on my part, but here's the code in case someone could spot the error.

If the error could be corrected without slowing down the results, this speed improvement is more than I ever imagined from a pure micro-optimization!

```
// New version of Centroids::find_nearest (from harold's solution):
int find_nearest(const float* pt_xyz) const
{
__m128i min_index = _mm_set_epi32(3, 2, 1, 0);
__m128 xdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[0]), _mm_load_ps(cen_x));
__m128 ydif = _mm_sub_ps(_mm_set1_ps(pt_xyz[1]), _mm_load_ps(cen_y));
__m128 zdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[2]), _mm_load_ps(cen_z));
__m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif),
_mm_mul_ps(ydif, ydif)),
_mm_mul_ps(zdif, zdif));
__m128i index = min_index;
for (int i=4; i < num_centroids; i += 4)
{
xdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[0]), _mm_load_ps(cen_x + i));
ydif = _mm_sub_ps(_mm_set1_ps(pt_xyz[1]), _mm_load_ps(cen_y + i));
zdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[2]), _mm_load_ps(cen_z + i));
__m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif),
_mm_mul_ps(ydif, ydif)),
_mm_mul_ps(zdif, zdif));
__m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
min_dist = _mm_min_ps(min_dist, dist);
min_index = _mm_or_si128(_mm_and_si128(index, mask),
_mm_andnot_si128(mask, min_index));
index = _mm_add_epi32(index, _mm_set1_epi32(4));
}
ALIGN16 float mdist[4];
ALIGN16 uint32_t mindex[4];
_mm_store_ps(mdist, min_dist);
_mm_store_si128((__m128i*)mindex, min_index);
float closest = mdist[0];
int closest_i = mindex[0];
for (int i=1; i < 4; i++)
{
if (mdist[i] < closest)
{
closest = mdist[i];
closest_i = mindex[i];
}
}
return closest_i;
}
```

## Harold's SIMD Solution (Corrected) - ~2.5 secs

After applying the corrections and testing them out, the results are intact and function correctly with similar improvements to the original codebase!

Since this hits the holy grail of knowledge I was seeking to understand better (branchless SIMD), I'm going to award the solution with some extra props for more than doubling the speed of the operation. I have my homework cut out in trying to understand it, since my goal was not merely to mitigate this hotspot, but to expand on my personal understanding of possible solutions to deal with them.

Nevertheless, I'm grateful for all the contributions here from the algorithmic suggestions to the really cool `bitselect`

trick! I wish I could accept all the answers. I may end up trying all of them at some point, but for now I have my homework cut out in understanding some of these non-arithmetical SIMD ops.

```
int find_nearest_simd(const float* pt_xyz) const
{
__m128i min_index = _mm_set_epi32(3, 2, 1, 0);
__m128 pt_xxxx = _mm_set1_ps(pt_xyz[0]);
__m128 pt_yyyy = _mm_set1_ps(pt_xyz[1]);
__m128 pt_zzzz = _mm_set1_ps(pt_xyz[2]);
__m128 xdif = _mm_sub_ps(pt_xxxx, _mm_load_ps(cen_x));
__m128 ydif = _mm_sub_ps(pt_yyyy, _mm_load_ps(cen_y));
__m128 zdif = _mm_sub_ps(pt_zzzz, _mm_load_ps(cen_z));
__m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif),
_mm_mul_ps(ydif, ydif)),
_mm_mul_ps(zdif, zdif));
__m128i index = min_index;
for (int i=4; i < num_centroids; i += 4)
{
xdif = _mm_sub_ps(pt_xxxx, _mm_load_ps(cen_x + i));
ydif = _mm_sub_ps(pt_yyyy, _mm_load_ps(cen_y + i));
zdif = _mm_sub_ps(pt_zzzz, _mm_load_ps(cen_z + i));
__m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif),
_mm_mul_ps(ydif, ydif)),
_mm_mul_ps(zdif, zdif));
index = _mm_add_epi32(index, _mm_set1_epi32(4));
__m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
min_dist = _mm_min_ps(min_dist, dist);
min_index = _mm_or_si128(_mm_and_si128(index, mask),
_mm_andnot_si128(mask, min_index));
}
ALIGN16 float mdist[4];
ALIGN16 uint32_t mindex[4];
_mm_store_ps(mdist, min_dist);
_mm_store_si128((__m128i*)mindex, min_index);
float closest = mdist[0];
int closest_i = mindex[0];
for (int i=1; i < 4; i++)
{
if (mdist[i] < closest)
{
closest = mdist[i];
closest_i = mindex[i];
}
}
return closest_i;
}
```