001package com.alexkasko.unsafe.offheapstruct;
002
003import com.alexkasko.unsafe.offheap.OffHeapDisposableIterator;
004import com.alexkasko.unsafe.offheap.OffHeapUtils;
005
006import java.util.*;
007import java.util.concurrent.*;
008
009import static com.alexkasko.unsafe.offheapstruct.OffHeapStructSorter.INSERTION_SORT_THRESHOLD;
010
011/**
012 * <p>alexkasko: borrowed from {@code https://android.googlesource.com/platform/libcore/+/android-4.2.2_r1/luni/src/main/java/java/util/DualPivotQuicksort.java}
013 * and adapted to {@link OffHeapStructCollection} with signed long keys.
014 *
015 * <p>This class implements the Dual-Pivot Quicksort algorithm by
016 * Vladimir Yaroslavskiy, Jon Bentley, and Joshua Bloch. The algorithm
017 * offers O(n log(n)) performance on many data sets that cause other
018 * quicksorts to degrade to quadratic performance, and is typically
019 * faster than traditional (one-pivot) Quicksort implementations.
020 *
021 * @author Vladimir Yaroslavskiy
022 * @author Jon Bentley
023 * @author Josh Bloch
024 *
025 * @version 2009.11.29 m765.827.12i
026 */
027class OffHeapStructSorterLong {
028
029    /**
030     * Partially sorts collection and returns fully sorted iterator over it
031     *
032     * @param executor   executor service for parallel sorting
033     * @param threads    number of worker threads to use
034     * @param a          the off-heap struct collection to be sorted
035     * @param keyOffset key offset
036     * @return sorted iterator over the collection
037     * @throws IllegalArgumentException {@code if (fromIndex < 0 || fromIndex > toIndex || toIndex > a.size())}
038     * @throws RuntimeException         on worker thread error
039     */
040    static OffHeapDisposableIterator<byte[]> sortedIterator(ExecutorService executor, int threads, OffHeapStructCollection a, int keyOffset) {
041        return sortedIterator(executor, threads, a, 0, a.size(), keyOffset);
042    }
043
044    /**
045     * Partially sorts part of the collection and returns fully sorted iterator over it
046     *
047     * @param executor   executor service for parallel sorting
048     * @param threads    number of worker threads to use
049     * @param a          the off-heap struct collection to be sorted
050     * @param fromIndex  the index of the first element, inclusive, to be sorted
051     * @param toIndex    the index of the last element, exclusive, to be sorted
052     * @param keyOffset key offset
053     * @return sorted iterator over the collection part
054     * @throws IllegalArgumentException {@code if (fromIndex < 0 || fromIndex > toIndex || toIndex > a.size())}
055     * @throws RuntimeException         on worker thread error
056     */
057    static OffHeapDisposableIterator<byte[]> sortedIterator(ExecutorService executor, int threads, OffHeapStructCollection a, long fromIndex,
058                                           long toIndex, int keyOffset) {
059        if (fromIndex < 0 || fromIndex > toIndex || toIndex > a.size()) {
060            throw new IllegalArgumentException("Illegal input, collection size: [" + a.size() + "], " +
061                    "fromIndex: [" + fromIndex + "], toIndex: [" + toIndex + "]");
062        }
063        int len = a.structLength();
064        long step = (toIndex - 1 - fromIndex) / threads;
065        if (0 == step) {
066            doSort(a, fromIndex, toIndex - 1, keyOffset, new byte[len], new byte[len],
067                    new byte[len], new byte[len], new byte[len], new byte[len], new byte[len]);
068            return a.iterator();
069        } else {
070            List<Worker> workers = new ArrayList<Worker>();
071            for (int start = 0; start < toIndex; start += step) {
072                Worker worker = new Worker(a, start, Math.min(start + step - 1, toIndex - 1), keyOffset,
073                        new byte[len], new byte[len], new byte[len], new byte[len], new byte[len], new byte[len], new byte[len]);
074                workers.add(worker);
075            }
076            Worker.invokeAndWait(executor, workers);
077            return new MergeIter(a, keyOffset, workers);
078        }
079    }
080
081    /**
082     * Sorts collection by two long keys. Second key sorting is done in parallel.
083     *
084     * @param executor   executor for parallel sorting
085     * @param threads    number of worker threads to use
086     * @param a          the off-heap struct collection to be sorted
087     * @param key1Offset first key offset
088     * @param key2Offset second key offset
089     */
090    static void sortTwoKeysParallel(Executor executor, int threads, OffHeapStructCollection a, int key1Offset, int key2Offset) {
091        // parent
092        sort(a, 0, a.size(), key1Offset);
093        long childStart = 0;
094        long cur = a.getLong(childStart, key1Offset);
095        long size = a.size();
096        List<KeyWorker> workers = new ArrayList<KeyWorker>();
097        for (long i = 1; i < size; i++) {
098            long el = a.getLong(i, key1Offset);
099            if (el != cur) {
100                workers.add(new KeyWorker(a, childStart, i, key2Offset));
101                childStart = i;
102                cur = a.getLong(childStart, key1Offset);
103            }
104        }
105        // sort tail
106        if (childStart < size - 1) workers.add(new KeyWorker(a, childStart, size, key2Offset));
107        // invoke
108        new LimitedInvoker(executor, threads).invokeAll(workers);
109    }
110
111    /**
112     * Sorts the specified off-heap struct collection into ascending order using signed long struct key.
113     *
114     * @param a the off-heap struct collection to be sorted
115     * @param keyOffset long key field offset within stuct bounds
116     */
117    static void sort(OffHeapStructCollection a, int keyOffset) {
118        sort(a, 0, a.size(), keyOffset);
119    }
120
121    /**
122     * Sorts the specified range of the off-heap struct collection into ascending order using signed long struct key.
123     * The range to be sorted extends from the index {@code fromIndex}, inclusive, to
124     * the index {@code toIndex}, exclusive. If {@code fromIndex == toIndex},
125     * the range to be sorted is empty (and the call is a no-op).
126     *
127     * @param a the off-heap struct collection to be sorted
128     * @param fromIndex the index of the first element, inclusive, to be sorted
129     * @param toIndex the index of the last element, exclusive, to be sorted
130     * @param keyOffset long sort key field offset within stuct bounds
131     * @throws IllegalArgumentException {@code if (fromIndex < 0 || fromIndex > toIndex || toIndex > a.size())}
132     */
133    static void sort(OffHeapStructCollection a, long fromIndex, long toIndex, int keyOffset) {
134        if (fromIndex < 0 || fromIndex > toIndex || toIndex > a.size()) {
135            throw new IllegalArgumentException("Illegal input, collection size: [" + a.size() + "], " +
136                    "fromIndex: [" + fromIndex + "], toIndex: [" + toIndex + "]");
137        }
138        int len = a.structLength();
139        doSort(a, fromIndex, toIndex - 1, keyOffset, new byte[len], new byte[len], new byte[len], new byte[len], new byte[len],
140                new byte[len], new byte[len]);
141    }
142
143
144    /**
145     * Sorts the specified range of the off-heap struct collection into ascending order using signed long struct key.
146     * This method differs from the public {@code sort} method in that the
147     * {@code right} index is inclusive, and it does no range checking on
148     * {@code left} or {@code right}.
149     *
150     * @param a the off-heap header-payload collection to be sorted
151     * @param left the index of the first element, inclusive, to be sorted
152     * @param right the index of the last element, inclusive, to be sorted* @param keyOffset
153     * @param keyOffset long sort key field offset within stuct bounds
154     * @param pi temporary buffer for structs
155     * @param pj temporary buffer for structs
156     * @param pe1 temporary buffer for structs
157     * @param pe2 temporary buffer for structs
158     * @param pe3 temporary buffer for structs
159     * @param pe4 temporary buffer for structs
160     * @param pe5 temporary buffer for structs
161     */
162    private static void doSort(OffHeapStructCollection a, long left, long right, int keyOffset, byte[] pi, byte[] pj,
163                               byte[] pe1, byte[] pe2, byte[] pe3, byte[] pe4, byte[] pe5) {
164        // Use insertion sort on tiny arrays
165        if (right - left + 1 < INSERTION_SORT_THRESHOLD) {
166            for (long i = left + 1; i <= right; i++) {
167                long ai = a.getLong(i, keyOffset);
168                a.get(i, pi);
169                long j;
170                for (j = i - 1; j >= left && ai < a.getLong(j, keyOffset); j--) {
171                    a.get(j, pj);
172                    a.set(j + 1, pj);
173                }
174                a.set(j + 1, pi);
175            }
176        } else { // Use Dual-Pivot Quicksort on large arrays
177            dualPivotQuicksort(a, left, right, keyOffset, pi, pj, pe1, pe2, pe3, pe4, pe5);
178        }
179    }
180
181    /**
182     * Sorts the specified range of the off-heap struct collection into ascending order by the
183     * Dual-Pivot Quicksort algorithm using signed long struct key.
184     *
185     * @param a the off-heap header-payload collection to be sorted
186     * @param left the index of the first element, inclusive, to be sorted
187     * @param right the index of the last element, inclusive, to be sorted
188     * @param keyOffset long sort key field offset within stuct bounds
189     * @param pi temporary buffer for structs
190     * @param pj temporary buffer for structs
191     * @param pe1 temporary buffer for structs
192     * @param pe2 temporary buffer for structs
193     * @param pe3 temporary buffer for structs
194     * @param pe4 temporary buffer for structs
195     * @param pe5 temporary buffer for structs
196     */
197    private static void dualPivotQuicksort(OffHeapStructCollection a, long left, long right, int keyOffset, byte[] pi, byte[] pj,
198                                           byte[] pe1, byte[] pe2, byte[] pe3, byte[] pe4, byte[] pe5) {
199        // Compute indices of five evenly spaced elements
200        long sixth = (right - left + 1) / 6;
201        long e1 = left  + sixth;
202        long e5 = right - sixth;
203        long e3 = (left + right) >>> 1; // The midpoint
204        long e4 = e3 + sixth;
205        long e2 = e3 - sixth;
206
207        // Sort these elements using a 5-element sorting network
208        long ae1 = a.getLong(e1, keyOffset), ae2 = a.getLong(e2, keyOffset), ae3 = a.getLong(e3, keyOffset),
209                ae4 = a.getLong(e4, keyOffset), ae5 = a.getLong(e5, keyOffset);
210        a.get(e1, pe1); a.get(e2, pe2); a.get(e3, pe3); a.get(e4, pe4); a.get(e5, pe5);
211
212        if (ae1 > ae2) { long t = ae1; byte[] pt = pe1; ae1 = ae2; pe1 = pe2; ae2 = t; pe2 = pt; }
213        if (ae4 > ae5) { long t = ae4; byte[] pt = pe4; ae4 = ae5; pe4 = pe5; ae5 = t; pe5 = pt; }
214        if (ae1 > ae3) { long t = ae1; byte[] pt = pe1; ae1 = ae3; pe1 = pe3; ae3 = t; pe3 = pt; }
215        if (ae2 > ae3) { long t = ae2; byte[] pt = pe2; ae2 = ae3; pe2 = pe3; ae3 = t; pe3 = pt; }
216        if (ae1 > ae4) { long t = ae1; byte[] pt = pe1; ae1 = ae4; pe1 = pe4; ae4 = t; pe4 = pt; }
217        if (ae3 > ae4) { long t = ae3; byte[] pt = pe3; ae3 = ae4; pe3 = pe4; ae4 = t; pe4 = pt; }
218        if (ae2 > ae5) { long t = ae2; byte[] pt = pe2; ae2 = ae5; pe2 = pe5; ae5 = t; pe5 = pt; }
219        if (ae2 > ae3) { long t = ae2; byte[] pt = pe2; ae2 = ae3; pe2 = pe3; ae3 = t; pe3 = pt; }
220        if (ae4 > ae5) { long t = ae4; byte[] pt = pe4; ae4 = ae5; pe4 = pe5; ae5 = t; pe5 = pt; }
221
222        a.set(e1, pe1); a.set(e3, pe3); a.set(e5, pe5);
223
224        /*
225         * Use the second and fourth of the five sorted elements as pivots.
226         * These values are inexpensive approximations of the first and
227         * second terciles of the array. Note that pivot1 <= pivot2.
228         *
229         * The pivots are stored in local variables, and the first and
230         * the last of the elements to be sorted are moved to the locations
231         * formerly occupied by the pivots. When partitioning is complete,
232         * the pivots are swapped back into their final positions, and
233         * excluded from subsequent sorting.
234         */
235        a.get(left, pe1);
236        long pivot1 = ae2; a.set(e2, pe1);
237        a.get(right, pe1);
238        long pivot2 = ae4; a.set(e4, pe1);
239
240        // Pointers
241        long less  = left  + 1; // The index of first element of center part
242        long great = right - 1; // The index before first element of right part
243
244        boolean pivotsDiffer = (pivot1 != pivot2);
245
246        if (pivotsDiffer) {
247            /*
248             * Partitioning:
249             *
250             *   left part         center part                    right part
251             * +------------------------------------------------------------+
252             * | < pivot1  |  pivot1 <= && <= pivot2  |    ?    |  > pivot2 |
253             * +------------------------------------------------------------+
254             *              ^                          ^       ^
255             *              |                          |       |
256             *             less                        k     great
257             *
258             * Invariants:
259             *
260             *              all in (left, less)   < pivot1
261             *    pivot1 <= all in [less, k)     <= pivot2
262             *              all in (great, right) > pivot2
263             *
264             * Pointer k is the first index of ?-part
265             */
266            outer:
267            for (long k = less; k <= great; k++) {
268                long ak = a.getLong(k, keyOffset);
269                a.get(k, pe1);
270                if (ak < pivot1) { // Move a[k] to left part
271                    if (k != less) {
272                        a.get(less, pe3);
273                        a.set(k, pe3);
274                        a.set(less, pe1);
275                    }
276                    less++;
277                } else if (ak > pivot2) { // Move a[k] to right part
278                    while (a.getLong(great, keyOffset) > pivot2) {
279                        if (great-- == k) {
280                            break outer;
281                        }
282                    }
283                    if (a.getLong(great, keyOffset) < pivot1) {
284                        a.get(less, pe3);
285                        a.set(k, pe3);
286                        a.get(great, pe3);
287                        a.set(less++, pe3);
288                        a.set(great--, pe1);
289                    } else { // pivot1 <= a[great] <= pivot2
290                        a.get(great, pe3);
291                        a.set(k, pe3);
292                        a.set(great--, pe1);
293                    }
294                }
295            }
296        } else { // Pivots are equal
297            /*
298             * Partition degenerates to the traditional 3-way,
299             * or "Dutch National Flag", partition:
300             *
301             *   left part   center part            right part
302             * +----------------------------------------------+
303             * |  < pivot  |  == pivot  |    ?    |  > pivot  |
304             * +----------------------------------------------+
305             *              ^            ^       ^
306             *              |            |       |
307             *             less          k     great
308             *
309             * Invariants:
310             *
311             *   all in (left, less)   < pivot
312             *   all in [less, k)     == pivot
313             *   all in (great, right) > pivot
314             *
315             * Pointer k is the first index of ?-part
316             */
317            for (long k = less; k <= great; k++) {
318                long ak = a.getLong(k, keyOffset);
319                a.get(k, pe1);
320                if (ak == pivot1) {
321                    continue;
322                }
323                if (ak < pivot1) { // Move a[k] to left part
324                    if (k != less) {
325                        a.get(less, pe3);
326                        a.set(k, pe3);
327                        a.set(less, pe1);
328                    }
329                    less++;
330                } else { // (a[k] > pivot1) -  Move a[k] to right part
331                    /*
332                     * We know that pivot1 == a[e3] == pivot2. Thus, we know
333                     * that great will still be >= k when the following loop
334                     * terminates, even though we don't test for it explicitly.
335                     * In other words, a[e3] acts as a sentinel for great.
336                     */
337                    while (a.getLong(great, keyOffset) > pivot1) {
338                        great--;
339                    }
340                    if (a.getLong(great, keyOffset) < pivot1) {
341                        a.get(less, pe3);
342                        a.set(k, pe3);
343                        a.get(great, pe3);
344                        a.set(less++, pe3);
345                        a.set(great--, pe1);
346                    } else { // a[great] == pivot1
347                        a.get(great, pe3);
348                        a.set(k, pe3);
349                        a.set(great--, pe1);
350                    }
351                }
352            }
353        }
354
355        // Swap pivots into their final positions
356        a.get(less - 1, pe1);
357        a.set(left, pe1); a.set(less - 1, pe2);
358        a.get(great + 1, pe1);
359        a.set(right, pe1); a.set(great + 1, pe4);
360
361        // Sort left and right parts recursively, excluding known pivot values
362        doSort(a, left,   less - 2, keyOffset, pi, pj, pe1, pe2, pe3, pe4, pe5);
363        doSort(a, great + 2, right, keyOffset, pi, pj, pe1, pe2, pe3, pe4, pe5);
364
365        /*
366         * If pivot1 == pivot2, all elements from center
367         * part are equal and, therefore, already sorted
368         */
369        if (!pivotsDiffer) {
370            return;
371        }
372
373        /*
374         * If center part is too large (comprises > 2/3 of the array),
375         * swap internal pivot values to ends
376         */
377        if (less < e1 && great > e5) {
378            while (a.getLong(less, keyOffset) == pivot1) {
379                less++;
380            }
381            while (a.getLong(great, keyOffset) == pivot2) {
382                great--;
383            }
384
385            /*
386             * Partitioning:
387             *
388             *   left part       center part                   right part
389             * +----------------------------------------------------------+
390             * | == pivot1 |  pivot1 < && < pivot2  |    ?    | == pivot2 |
391             * +----------------------------------------------------------+
392             *              ^                        ^       ^
393             *              |                        |       |
394             *             less                      k     great
395             *
396             * Invariants:
397             *
398             *              all in (*, less)  == pivot1
399             *     pivot1 < all in [less, k)   < pivot2
400             *              all in (great, *) == pivot2
401             *
402             * Pointer k is the first index of ?-part
403             */
404            outer:
405            for (long k = less; k <= great; k++) {
406                long ak = a.getLong(k, keyOffset);
407                a.get(k, pe1);
408                if (ak == pivot2) { // Move a[k] to right part
409                    while (a.getLong(great, keyOffset) == pivot2) {
410                        if (great-- == k) {
411                            break outer;
412                        }
413                    }
414                    if (a.getLong(great, keyOffset) == pivot1) {
415                        a.get(less, pe3);
416                        a.set(k, pe3);
417                        a.get(great, pe3);
418                        a.set(less++, pe3);
419                    } else { // pivot1 < a[great] < pivot2
420                        a.get(great, pe3);
421                        a.set(k, pe3);
422                    }
423                    a.set(great--, pe1);
424                } else if (ak == pivot1) { // Move a[k] to left part
425                    a.get(less, pe3);
426                    a.set(k, pe3);
427                    a.set(less++, pe1);
428                }
429            }
430        }
431
432        // Sort center part recursively, excluding known pivot values
433        doSort(a, less, great, keyOffset, pi, pj, pe1, pe2, pe3, pe4, pe5);
434    }
435
436    /**
437     * Worker, sorts part of the collection
438     */
439    private static class Worker implements Callable<Void> {
440        private final OffHeapStructCollection a;
441        final long left;
442        final long right;
443        private final int keyOffset;
444        private final byte[] pi;
445        private final byte[] pj;
446        private final byte[] pe1;
447        private final byte[] pe2;
448        private final byte[] pe3;
449        private final byte[] pe4;
450        private final byte[] pe5;
451
452        /**
453         * Private constructor
454         *
455         * @param a collection
456         * @param left start sort index
457         * @param right end sort index
458         * @param keyOffset sort key offset
459         * @param pi sort buffer
460         * @param pj sort buffer
461         * @param pe1 sort buffer
462         * @param pe2 sort buffer
463         * @param pe3 sort buffer
464         * @param pe4 sort buffer
465         * @param pe5 sort buffer
466         */
467        private Worker(OffHeapStructCollection a, long left, long right, int keyOffset, byte[] pi,
468                       byte[] pj, byte[] pe1, byte[] pe2, byte[] pe3, byte[] pe4, byte[] pe5) {
469            this.a = a;
470            this.left = left;
471            this.right = right;
472            this.keyOffset = keyOffset;
473            this.pi = pi;
474            this.pj = pj;
475            this.pe1 = pe1;
476            this.pe2 = pe2;
477            this.pe3 = pe3;
478            this.pe4 = pe4;
479            this.pe5 = pe5;
480        }
481
482        /**
483         * {@inheritDoc}
484         */
485        @Override
486        public Void call() throws Exception {
487            doSort(a, left, right, keyOffset, pi, pj, pe1, pe2, pe3, pe4, pe5);
488            return null;
489        }
490
491        /**
492         * Helper method to invoke and wait
493         *
494         * @param executor executor instance
495         * @param workers workers list
496         */
497        private static void invokeAndWait(ExecutorService executor, List<Worker> workers) {
498            try {
499                List<Future<Void>> futures = executor.invokeAll(workers);
500                for (Future<Void> fu : futures) {
501                    fu.get();
502                }
503            } catch (InterruptedException e) {
504                throw new RuntimeException(e);
505            } catch (ExecutionException e) {
506                throw new RuntimeException(e);
507            }
508        }
509    }
510
511    /**
512     * Merge-sort iterator over partly sorted collection
513     */
514    private static class MergeIter implements OffHeapDisposableIterator<byte[]> {
515        private final OffHeapStructCollection col;
516        private final int keyOffset;
517        private final List<Worker> bounds;
518        private final long[] indices;
519        private final byte[] buf;
520
521        /**
522         * Private constructor
523         *
524         * @param col partly sorted collection
525         * @param keyOffset key offset used to partially sort collection
526         * @param bounds collection sorted parts bounds
527         */
528        private MergeIter(OffHeapStructCollection col, int keyOffset, List<Worker> bounds) {
529            this.col = col;
530            this.keyOffset = keyOffset;
531            this.bounds = bounds;
532            this.buf = new byte[col.structLength()];
533            this.indices = new long[bounds.size()];
534            for (int i = 0; i < bounds.size(); i++) {
535                indices[i] = bounds.get(i).left;
536            }
537        }
538
539        /**
540         * {@inheritDoc}
541         */
542        @Override
543        public boolean hasNext() {
544            for (int i = 0; i < indices.length; i++) {
545                if (indices[i] <= bounds.get(i).right) return true;
546            }
547            return false;
548        }
549
550        /**
551         * {@inheritDoc}
552         */
553        @Override
554        public byte[] next() {
555            int minind = -1;
556            for (int i = 0; i < indices.length; i++) {
557                if (indices[i] > bounds.get(i).right) continue;
558                if (-1 == minind) minind = i;
559                if (col.getLong(indices[minind], keyOffset) > col.getLong(indices[i], keyOffset)) minind = i;
560            }
561            if (-1 == minind) throw new NoSuchElementException();
562            col.get(indices[minind], buf);
563            indices[minind] += 1;
564            return buf;
565        }
566
567        /**
568         * {@inheritDoc}
569         */
570        @Override
571        public void remove() {
572            throw new UnsupportedOperationException("remove");
573        }
574
575        /**
576         * {@inheritDoc}
577         */
578        @Override
579        public void free() {
580            OffHeapUtils.free(col);
581        }
582
583        /**
584         * {@inheritDoc}
585         */
586        @Override
587        public long size() {
588            return col.size();
589        }
590    }
591
592    /**
593     * Worker for two long keys sort
594     */
595    private static class KeyWorker implements Runnable {
596        private final OffHeapStructCollection col;
597        private final long from;
598        private final long to;
599        private final int keyOffset;
600
601        /**
602         * Private constructor
603         *
604         * @param col collection
605         * @param from start sort index
606         * @param to end sort index
607         * @param keyOffset sort key offset
608         */
609        private KeyWorker(OffHeapStructCollection col, long from, long to, int keyOffset) {
610            this.col = col;
611            this.from = from;
612            this.to = to;
613            this.keyOffset = keyOffset;
614        }
615
616        /**
617         * {@inheritDoc}
618         */
619        @Override
620        public void run() {
621            sort(col, from, to, keyOffset);
622        }
623    }
624}