001package com.alexkasko.unsafe.offheapstruct;
002
003import com.alexkasko.unsafe.offheap.OffHeapDisposableIterator;
004import com.alexkasko.unsafe.offheap.OffHeapUtils;
005
006import java.util.ArrayList;
007import java.util.List;
008import java.util.NoSuchElementException;
009import java.util.concurrent.Callable;
010import java.util.concurrent.ExecutionException;
011import java.util.concurrent.ExecutorService;
012import java.util.concurrent.Future;
013
014import static com.alexkasko.unsafe.offheapstruct.OffHeapStructSorter.INSERTION_SORT_THRESHOLD;
015
016/**
017 * <p>alexkasko: borrowed from {@code https://android.googlesource.com/platform/libcore/+/android-4.2.2_r1/luni/src/main/java/java/util/DualPivotQuicksort.java}
018 * and adapted to {@link OffHeapStructCollection} with signed int keys.
019 *
020 * <p>This class implements the Dual-Pivot Quicksort algorithm by
021 * Vladimir Yaroslavskiy, Jon Bentley, and Joshua Bloch. The algorithm
022 * offers O(n log(n)) performance on many data sets that cause other
023 * quicksorts to degrade to quadratic performance, and is typically
024 * faster than traditional (one-pivot) Quicksort implementations.
025 *
026 * @author Vladimir Yaroslavskiy
027 * @author Jon Bentley
028 * @author Josh Bloch
029 *
030 * @version 2009.11.29 m765.827.12i
031 */
032class OffHeapStructSorterInt {
033
034    /**
035     * Partially sorts collection and returns fully sorted iterator over it
036     *
037     * @param executor   executor service for parallel sorting
038     * @param threads    number of worker threads to use
039     * @param a          the off-heap struct collection to be sorted
040     * @param keyOffset key offset
041     * @return sorted iterator over the collection
042     * @throws IllegalArgumentException {@code if (fromIndex < 0 || fromIndex > toIndex || toIndex > a.size())}
043     * @throws RuntimeException         on worker thread error
044     */
045    static OffHeapDisposableIterator<byte[]> sortedIterator(ExecutorService executor, int threads, OffHeapStructCollection a, int keyOffset) {
046        return sortedIterator(executor, threads, a, 0, a.size(), keyOffset);
047    }
048
049    /**
050     * Partially sorts part of the collection and returns fully sorted iterator over it
051     *
052     * @param executor   executor service for parallel sorting
053     * @param threads    number of worker threads to use
054     * @param a          the off-heap struct collection to be sorted
055     * @param fromIndex  the index of the first element, inclusive, to be sorted
056     * @param toIndex    the index of the last element, exclusive, to be sorted
057     * @param keyOffset key offset
058     * @return sorted iterator over the collection part
059     * @throws IllegalArgumentException {@code if (fromIndex < 0 || fromIndex > toIndex || toIndex > a.size())}
060     * @throws RuntimeException         on worker thread error
061     */
062    static OffHeapDisposableIterator<byte[]> sortedIterator(ExecutorService executor, int threads, OffHeapStructCollection a, long fromIndex,
063                                           long toIndex, int keyOffset) {
064        if (fromIndex < 0 || fromIndex > toIndex || toIndex > a.size()) {
065            throw new IllegalArgumentException("Illegal input, collection size: [" + a.size() + "], " +
066                    "fromIndex: [" + fromIndex + "], toIndex: [" + toIndex + "]");
067        }
068        int len = a.structLength();
069        long step = (toIndex - 1 - fromIndex) / threads;
070        if (0 == step) {
071            doSort(a, fromIndex, toIndex - 1, keyOffset, new byte[len], new byte[len],
072                    new byte[len], new byte[len], new byte[len], new byte[len], new byte[len]);
073            return a.iterator();
074        } else {
075            List<Worker> workers = new ArrayList<Worker>();
076            for (int start = 0; start < toIndex; start += step) {
077                Worker worker = new Worker(a, start, Math.min(start + step - 1, toIndex - 1), keyOffset,
078                        new byte[len], new byte[len], new byte[len], new byte[len], new byte[len], new byte[len], new byte[len]);
079                workers.add(worker);
080            }
081            Worker.invokeAndWait(executor, workers);
082            return new MergeIter(a, keyOffset, workers);
083        }
084    }
085
086    /**
087     * Sorts the specified off-heap struct collection into ascending order using signed int struct key.
088     *
089     * @param a the off-heap struct collection to be sorted
090     * @param keyOffset int key field offset within stuct bounds
091     */
092    static void sort(OffHeapStructCollection a, int keyOffset) {
093        sort(a, 0, a.size(), keyOffset);
094    }
095
096    /**
097     * Sorts the specified range of the off-heap struct collection into ascending order using signed int struct key. The range
098     * to be sorted extends from the index {@code fromIndex}, inclusive, to
099     * the index {@code toIndex}, exclusive. If {@code fromIndex == toIndex},
100     * the range to be sorted is empty (and the call is a no-op).
101     *
102     * @param a the off-heap struct collection to be sorted
103     * @param fromIndex the index of the first element, inclusive, to be sorted
104     * @param toIndex the index of the last element, exclusive, to be sorted
105     * @param keyOffset int sort key field offset within stuct bounds
106     * @throws IllegalArgumentException {@code if (fromIndex < 0 || fromIndex > toIndex || toIndex > a.size())}
107     */
108    static void sort(OffHeapStructCollection a, long fromIndex, long toIndex, int keyOffset) {
109        if (fromIndex < 0 || fromIndex > toIndex || toIndex > a.size()) {
110            throw new IllegalArgumentException("Illegal input, collection size: [" + a.size() + "], " +
111                    "fromIndex: [" + fromIndex + "], toIndex: [" + toIndex + "]");
112        }
113        int len = a.structLength();
114        doSort(a, fromIndex, toIndex - 1, keyOffset, new byte[len], new byte[len], new byte[len], new byte[len], new byte[len],
115                new byte[len], new byte[len]);
116    }
117
118
119    /**
120     * Sorts the specified range of the off-heap struct collection into ascending order using signed int struct key.
121     * This method differs from the public {@code sort} method in that the
122     * {@code right} index is inclusive, and it does no range checking on
123     * {@code left} or {@code right}.
124     *
125     * @param a the off-heap header-payload collection to be sorted
126     * @param left the index of the first element, inclusive, to be sorted
127     * @param right the index of the last element, inclusive, to be sorted* @param keyOffset
128     * @param keyOffset int sort key field offset within stuct bounds
129     * @param pi temporary buffer for structs
130     * @param pj temporary buffer for structs
131     * @param pe1 temporary buffer for structs
132     * @param pe2 temporary buffer for structs
133     * @param pe3 temporary buffer for structs
134     * @param pe4 temporary buffer for structs
135     * @param pe5 temporary buffer for structs
136     */
137    private static void doSort(OffHeapStructCollection a, long left, long right, int keyOffset, byte[] pi, byte[] pj,
138                               byte[] pe1, byte[] pe2, byte[] pe3, byte[] pe4, byte[] pe5) {
139        // Use insertion sort on tiny arrays
140        if (right - left + 1 < INSERTION_SORT_THRESHOLD) {
141            for (long i = left + 1; i <= right; i++) {
142                long ai = a.getInt(i, keyOffset);
143                a.get(i, pi);
144                long j;
145                for (j = i - 1; j >= left && ai < a.getInt(j, keyOffset); j--) {
146                    a.get(j, pj);
147                    a.set(j + 1, pj);
148                }
149                a.set(j + 1, pi);
150            }
151        } else { // Use Dual-Pivot Quicksort on large arrays
152            dualPivotQuicksort(a, left, right, keyOffset, pi, pj, pe1, pe2, pe3, pe4, pe5);
153        }
154    }
155
156    /**
157     * Sorts the specified range of the off-heap struct collection into ascending order by the
158     * Dual-Pivot Quicksort algorithm using signed int struct key.
159     *
160     * @param a the off-heap header-payload collection to be sorted
161     * @param left the index of the first element, inclusive, to be sorted
162     * @param right the index of the last element, inclusive, to be sorted
163     * @param keyOffset int sort key field offset within stuct bounds
164     * @param pi temporary buffer for structs
165     * @param pj temporary buffer for structs
166     * @param pe1 temporary buffer for structs
167     * @param pe2 temporary buffer for structs
168     * @param pe3 temporary buffer for structs
169     * @param pe4 temporary buffer for structs
170     * @param pe5 temporary buffer for structs
171     */
172    private static void dualPivotQuicksort(OffHeapStructCollection a, long left, long right, int keyOffset, byte[] pi, byte[] pj,
173                                           byte[] pe1, byte[] pe2, byte[] pe3, byte[] pe4, byte[] pe5) {
174        // Compute indices of five evenly spaced elements
175        long sixth = (right - left + 1) / 6;
176        long e1 = left  + sixth;
177        long e5 = right - sixth;
178        long e3 = (left + right) >>> 1; // The midpoint
179        long e4 = e3 + sixth;
180        long e2 = e3 - sixth;
181
182        // Sort these elements using a 5-element sorting network
183        long ae1 = a.getInt(e1, keyOffset), ae2 = a.getInt(e2, keyOffset), ae3 = a.getInt(e3, keyOffset),
184                ae4 = a.getInt(e4, keyOffset), ae5 = a.getInt(e5, keyOffset);
185        a.get(e1, pe1); a.get(e2, pe2); a.get(e3, pe3); a.get(e4, pe4); a.get(e5, pe5);
186
187        if (ae1 > ae2) { long t = ae1; byte[] pt = pe1; ae1 = ae2; pe1 = pe2; ae2 = t; pe2 = pt; }
188        if (ae4 > ae5) { long t = ae4; byte[] pt = pe4; ae4 = ae5; pe4 = pe5; ae5 = t; pe5 = pt; }
189        if (ae1 > ae3) { long t = ae1; byte[] pt = pe1; ae1 = ae3; pe1 = pe3; ae3 = t; pe3 = pt; }
190        if (ae2 > ae3) { long t = ae2; byte[] pt = pe2; ae2 = ae3; pe2 = pe3; ae3 = t; pe3 = pt; }
191        if (ae1 > ae4) { long t = ae1; byte[] pt = pe1; ae1 = ae4; pe1 = pe4; ae4 = t; pe4 = pt; }
192        if (ae3 > ae4) { long t = ae3; byte[] pt = pe3; ae3 = ae4; pe3 = pe4; ae4 = t; pe4 = pt; }
193        if (ae2 > ae5) { long t = ae2; byte[] pt = pe2; ae2 = ae5; pe2 = pe5; ae5 = t; pe5 = pt; }
194        if (ae2 > ae3) { long t = ae2; byte[] pt = pe2; ae2 = ae3; pe2 = pe3; ae3 = t; pe3 = pt; }
195        if (ae4 > ae5) { long t = ae4; byte[] pt = pe4; ae4 = ae5; pe4 = pe5; ae5 = t; pe5 = pt; }
196
197        a.set(e1, pe1); a.set(e3, pe3); a.set(e5, pe5);
198
199        /*
200         * Use the second and fourth of the five sorted elements as pivots.
201         * These values are inexpensive approximations of the first and
202         * second terciles of the array. Note that pivot1 <= pivot2.
203         *
204         * The pivots are stored in local variables, and the first and
205         * the last of the elements to be sorted are moved to the locations
206         * formerly occupied by the pivots. When partitioning is complete,
207         * the pivots are swapped back into their final positions, and
208         * excluded from subsequent sorting.
209         */
210        a.get(left, pe1);
211        long pivot1 = ae2; a.set(e2, pe1);
212        a.get(right, pe1);
213        long pivot2 = ae4; a.set(e4, pe1);
214
215        // Pointers
216        long less  = left  + 1; // The index of first element of center part
217        long great = right - 1; // The index before first element of right part
218
219        boolean pivotsDiffer = (pivot1 != pivot2);
220
221        if (pivotsDiffer) {
222            /*
223             * Partitioning:
224             *
225             *   left part         center part                    right part
226             * +------------------------------------------------------------+
227             * | < pivot1  |  pivot1 <= && <= pivot2  |    ?    |  > pivot2 |
228             * +------------------------------------------------------------+
229             *              ^                          ^       ^
230             *              |                          |       |
231             *             less                        k     great
232             *
233             * Invariants:
234             *
235             *              all in (left, less)   < pivot1
236             *    pivot1 <= all in [less, k)     <= pivot2
237             *              all in (great, right) > pivot2
238             *
239             * Pointer k is the first index of ?-part
240             */
241            outer:
242            for (long k = less; k <= great; k++) {
243                long ak = a.getInt(k, keyOffset);
244                a.get(k, pe1);
245                if (ak < pivot1) { // Move a[k] to left part
246                    if (k != less) {
247                        a.get(less, pe3);
248                        a.set(k, pe3);
249                        a.set(less, pe1);
250                    }
251                    less++;
252                } else if (ak > pivot2) { // Move a[k] to right part
253                    while (a.getInt(great, keyOffset) > pivot2) {
254                        if (great-- == k) {
255                            break outer;
256                        }
257                    }
258                    if (a.getInt(great, keyOffset) < pivot1) {
259                        a.get(less, pe3);
260                        a.set(k, pe3);
261                        a.get(great, pe3);
262                        a.set(less++, pe3);
263                        a.set(great--, pe1);
264                    } else { // pivot1 <= a[great] <= pivot2
265                        a.get(great, pe3);
266                        a.set(k, pe3);
267                        a.set(great--, pe1);
268                    }
269                }
270            }
271        } else { // Pivots are equal
272            /*
273             * Partition degenerates to the traditional 3-way,
274             * or "Dutch National Flag", partition:
275             *
276             *   left part   center part            right part
277             * +----------------------------------------------+
278             * |  < pivot  |  == pivot  |    ?    |  > pivot  |
279             * +----------------------------------------------+
280             *              ^            ^       ^
281             *              |            |       |
282             *             less          k     great
283             *
284             * Invariants:
285             *
286             *   all in (left, less)   < pivot
287             *   all in [less, k)     == pivot
288             *   all in (great, right) > pivot
289             *
290             * Pointer k is the first index of ?-part
291             */
292            for (long k = less; k <= great; k++) {
293                long ak = a.getInt(k, keyOffset);
294                a.get(k, pe1);
295                if (ak == pivot1) {
296                    continue;
297                }
298                if (ak < pivot1) { // Move a[k] to left part
299                    if (k != less) {
300                        a.get(less, pe3);
301                        a.set(k, pe3);
302                        a.set(less, pe1);
303                    }
304                    less++;
305                } else { // (a[k] > pivot1) -  Move a[k] to right part
306                    /*
307                     * We know that pivot1 == a[e3] == pivot2. Thus, we know
308                     * that great will still be >= k when the following loop
309                     * terminates, even though we don't test for it explicitly.
310                     * In other words, a[e3] acts as a sentinel for great.
311                     */
312                    while (a.getInt(great, keyOffset) > pivot1) {
313                        great--;
314                    }
315                    if (a.getInt(great, keyOffset) < pivot1) {
316                        a.get(less, pe3);
317                        a.set(k, pe3);
318                        a.get(great, pe3);
319                        a.set(less++, pe3);
320                        a.set(great--, pe1);
321                    } else { // a[great] == pivot1
322                        a.get(great, pe3);
323                        a.set(k, pe3);
324                        a.set(great--, pe1);
325                    }
326                }
327            }
328        }
329
330        // Swap pivots into their final positions
331        a.get(less - 1, pe1);
332        a.set(left, pe1); a.set(less - 1, pe2);
333        a.get(great + 1, pe1);
334        a.set(right, pe1); a.set(great + 1, pe4);
335
336        // Sort left and right parts recursively, excluding known pivot values
337        doSort(a, left,   less - 2, keyOffset, pi, pj, pe1, pe2, pe3, pe4, pe5);
338        doSort(a, great + 2, right, keyOffset, pi, pj, pe1, pe2, pe3, pe4, pe5);
339
340        /*
341         * If pivot1 == pivot2, all elements from center
342         * part are equal and, therefore, already sorted
343         */
344        if (!pivotsDiffer) {
345            return;
346        }
347
348        /*
349         * If center part is too large (comprises > 2/3 of the array),
350         * swap internal pivot values to ends
351         */
352        if (less < e1 && great > e5) {
353            while (a.getInt(less, keyOffset) == pivot1) {
354                less++;
355            }
356            while (a.getInt(great, keyOffset) == pivot2) {
357                great--;
358            }
359
360            /*
361             * Partitioning:
362             *
363             *   left part       center part                   right part
364             * +----------------------------------------------------------+
365             * | == pivot1 |  pivot1 < && < pivot2  |    ?    | == pivot2 |
366             * +----------------------------------------------------------+
367             *              ^                        ^       ^
368             *              |                        |       |
369             *             less                      k     great
370             *
371             * Invariants:
372             *
373             *              all in (*, less)  == pivot1
374             *     pivot1 < all in [less, k)   < pivot2
375             *              all in (great, *) == pivot2
376             *
377             * Pointer k is the first index of ?-part
378             */
379            outer:
380            for (long k = less; k <= great; k++) {
381                long ak = a.getInt(k, keyOffset);
382                a.get(k, pe1);
383                if (ak == pivot2) { // Move a[k] to right part
384                    while (a.getInt(great, keyOffset) == pivot2) {
385                        if (great-- == k) {
386                            break outer;
387                        }
388                    }
389                    if (a.getInt(great, keyOffset) == pivot1) {
390                        a.get(less, pe3);
391                        a.set(k, pe3);
392                        a.get(great, pe3);
393                        a.set(less++, pe3);
394                    } else { // pivot1 < a[great] < pivot2
395                        a.get(great, pe3);
396                        a.set(k, pe3);
397                    }
398                    a.set(great--, pe1);
399                } else if (ak == pivot1) { // Move a[k] to left part
400                    a.get(less, pe3);
401                    a.set(k, pe3);
402                    a.set(less++, pe1);
403                }
404            }
405        }
406
407        // Sort center part recursively, excluding known pivot values
408        doSort(a, less, great, keyOffset, pi, pj, pe1, pe2, pe3, pe4, pe5);
409    }
410
411    /**
412     * Worker, sorts part of the collection
413     */
414    private static class Worker implements Callable<Void> {
415        private final OffHeapStructCollection a;
416        final long left;
417        final long right;
418        private final int keyOffset;
419        private final byte[] pi;
420        private final byte[] pj;
421        private final byte[] pe1;
422        private final byte[] pe2;
423        private final byte[] pe3;
424        private final byte[] pe4;
425        private final byte[] pe5;
426
427        /**
428         * Private constructor
429         *
430         * @param a collection
431         * @param left start sort index
432         * @param right end sort index
433         * @param keyOffset sort key offset
434         * @param pi sort buffer
435         * @param pj sort buffer
436         * @param pe1 sort buffer
437         * @param pe2 sort buffer
438         * @param pe3 sort buffer
439         * @param pe4 sort buffer
440         * @param pe5 sort buffer
441         */
442        private Worker(OffHeapStructCollection a, long left, long right, int keyOffset, byte[] pi,
443                       byte[] pj, byte[] pe1, byte[] pe2, byte[] pe3, byte[] pe4, byte[] pe5) {
444            this.a = a;
445            this.left = left;
446            this.right = right;
447            this.keyOffset = keyOffset;
448            this.pi = pi;
449            this.pj = pj;
450            this.pe1 = pe1;
451            this.pe2 = pe2;
452            this.pe3 = pe3;
453            this.pe4 = pe4;
454            this.pe5 = pe5;
455        }
456
457        /**
458         * {@inheritDoc}
459         */
460        @Override
461        public Void call() throws Exception {
462            doSort(a, left, right, keyOffset, pi, pj, pe1, pe2, pe3, pe4, pe5);
463            return null;
464        }
465
466        /**
467         * Helper method to invoke and wait
468         *
469         * @param executor executor instance
470         * @param workers workers list
471         */
472        private static void invokeAndWait(ExecutorService executor, List<Worker> workers) {
473            try {
474                List<Future<Void>> futures = executor.invokeAll(workers);
475                for (Future<Void> fu : futures) {
476                    fu.get();
477                }
478            } catch (InterruptedException e) {
479                throw new RuntimeException(e);
480            } catch (ExecutionException e) {
481                throw new RuntimeException(e);
482            }
483        }
484    }
485
486    /**
487     * Merge-sort iterator over partly sorted collection
488     */
489    private static class MergeIter implements OffHeapDisposableIterator<byte[]> {
490        private final OffHeapStructCollection col;
491        private final int keyOffset;
492        private final List<Worker> bounds;
493        private final long[] indices;
494        private final byte[] buf;
495
496        /**
497         * Private constructor
498         *
499         * @param col partly sorted collection
500         * @param keyOffset key offset used to partially sort collection
501         * @param bounds collection sorted parts bounds
502         */
503        private MergeIter(OffHeapStructCollection col, int keyOffset, List<Worker> bounds) {
504            this.col = col;
505            this.keyOffset = keyOffset;
506            this.bounds = bounds;
507            this.buf = new byte[col.structLength()];
508            this.indices = new long[bounds.size()];
509            for (int i = 0; i < bounds.size(); i++) {
510                indices[i] = bounds.get(i).left;
511            }
512        }
513
514        /**
515         * {@inheritDoc}
516         */
517        @Override
518        public boolean hasNext() {
519            for (int i = 0; i < indices.length; i++) {
520                if (indices[i] <= bounds.get(i).right) return true;
521            }
522            return false;
523        }
524
525        /**
526         * {@inheritDoc}
527         */
528        @Override
529        public byte[] next() {
530            int minind = -1;
531            for (int i = 0; i < indices.length; i++) {
532                if (indices[i] > bounds.get(i).right) continue;
533                if (-1 == minind) minind = i;
534                if (col.getInt(indices[minind], keyOffset) > col.getInt(indices[i], keyOffset)) minind = i;
535            }
536            if (-1 == minind) throw new NoSuchElementException();
537            col.get(indices[minind], buf);
538            indices[minind] += 1;
539            return buf;
540        }
541
542        /**
543         * {@inheritDoc}
544         */
545        @Override
546        public void remove() {
547            throw new UnsupportedOperationException("remove");
548        }
549
550        /**
551         * {@inheritDoc}
552         */
553        @Override
554        public void free() {
555            OffHeapUtils.free(col);
556        }
557
558        /**
559         * {@inheritDoc}
560         */
561        @Override
562        public long size() {
563            return col.size();
564        }
565    }
566}