001/*
002 * Copyright 2013 Alex Kasko (alexkasko.com)
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.alexkasko.unsafe.offheapstruct;
018
019import com.alexkasko.unsafe.offheap.OffHeapDisposableIterator;
020import com.alexkasko.unsafe.offheap.OffHeapUtils;
021
022import java.util.ArrayList;
023import java.util.Comparator;
024import java.util.List;
025import java.util.NoSuchElementException;
026import java.util.concurrent.Callable;
027import java.util.concurrent.ExecutionException;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.Future;
030
031import static com.alexkasko.unsafe.offheapstruct.OffHeapStructSorter.INSERTION_SORT_THRESHOLD;
032
033/**
034 * <p>alexkasko: borrowed from {@code https://android.googlesource.com/platform/libcore/+/android-4.2.2_r1/luni/src/main/java/java/util/DualPivotQuicksort.java}
035 * and adapted to {@link com.alexkasko.unsafe.offheapstruct.OffHeapStructCollection} with comparator.
036 *
037 * <p>This class implements the Dual-Pivot Quicksort algorithm by
038 * Vladimir Yaroslavskiy, Jon Bentley, and Joshua Bloch. The algorithm
039 * offers O(n log(n)) performance on many data sets that cause other
040 * quicksorts to degrade to quadratic performance, and is typically
041 * faster than traditional (one-pivot) Quicksort implementations.
042 *
043 * @author Vladimir Yaroslavskiy
044 * @author Jon Bentley
045 * @author Josh Bloch
046 *
047 * @version 2009.11.29 m765.827.12i
048 */
049class OffHeapStructSorterWithComparator {
050
051    /**
052     * Partially sorts collection and returns fully sorted iterator over it
053     *
054     * @param executor executor service for parallel sorting
055     * @param threads number of worker threads to use
056     * @param a the off-heap struct collection to be sorted
057     * @param comparator structs comparator
058     * @return sorted iterator over the collection
059     * @throws IllegalArgumentException {@code if (fromIndex < 0 || fromIndex > toIndex || toIndex > a.size())}
060     * @throws RuntimeException on worker thread error
061     */
062    static OffHeapDisposableIterator<byte[]> sortedIterator(ExecutorService executor, int threads, OffHeapStructCollection a, Comparator<OffHeapStructAccessor> comparator) {
063        return sortedIterator(executor, threads, a, 0, a.size(), comparator);
064    }
065
066    /**
067     * Partially sorts part of the collection and returns fully sorted iterator over it
068     *
069     * @param executor executor service for parallel sorting
070     * @param threads number of worker threads to use
071     * @param a the off-heap struct collection to be sorted
072     * @param fromIndex the index of the first element, inclusive, to be sorted
073     * @param toIndex the index of the last element, exclusive, to be sorted
074     * @param comparator structs comparator
075     * @return sorted iterator over the collection part
076     * @throws IllegalArgumentException {@code if (fromIndex < 0 || fromIndex > toIndex || toIndex > a.size())}
077     * @throws RuntimeException on worker thread error
078     */
079    static OffHeapDisposableIterator<byte[]> sortedIterator(ExecutorService executor, int threads, OffHeapStructCollection a, long fromIndex,
080                                           long toIndex, Comparator<OffHeapStructAccessor> comparator) {
081        if(null == comparator) throw new IllegalArgumentException("Provided comparator is null");
082        if (fromIndex < 0 || fromIndex > toIndex || toIndex > a.size()) {
083            throw new IllegalArgumentException("Illegal input, collection size: [" + a.size() + "], " +
084                    "fromIndex: [" + fromIndex + "], toIndex: [" + toIndex + "]");
085        }
086        int len = a.structLength();
087        long step = (toIndex - 1 - fromIndex)/threads;
088        if(0 == step) {
089            doSort(a, fromIndex, toIndex - 1, new OffHeapStructComparator(a, comparator), new byte[len], new byte[len],
090                    new byte[len], new byte[len], new byte[len], new byte[len], new byte[len]);
091            return a.iterator();
092        } else {
093            List<Worker> workers = new ArrayList<Worker>();
094            for(int start = 0; start < toIndex; start += step) {
095                Worker worker = new Worker(a, start, Math.min(start + step - 1, toIndex - 1), new OffHeapStructComparator(a, comparator),
096                        new byte[len], new byte[len], new byte[len], new byte[len], new byte[len], new byte[len], new byte[len]);
097                workers.add(worker);
098            }
099            invokeAndWait(executor, workers);
100            return new MergeIter(a, new OffHeapStructComparator(a, comparator), workers);
101        }
102    }
103
104    /**
105     * Sorts the specified off-heap struct collection into ascending order using unsigned long struct key.
106     *
107     * @param a the off-heap struct collection to be sorted
108     * @param comparator structs comparator
109     */
110    static void sort(OffHeapStructCollection a, Comparator<OffHeapStructAccessor> comparator) {
111        sort(a, 0, a.size(), comparator);
112    }
113
114    /**
115     * Sorts the specified range of the off-heap struct collection into ascending order using unsigned long struct key.
116     * The range to be sorted extends from the index {@code fromIndex}, inclusive, to
117     * the index {@code toIndex}, exclusive. If {@code fromIndex == toIndex},
118     * the range to be sorted is empty (and the call is a no-op).
119     *
120     * @param a the off-heap struct collection to be sorted
121     * @param fromIndex the index of the first element, inclusive, to be sorted
122     * @param toIndex the index of the last element, exclusive, to be sorted
123     * @param comparator structs comparator
124     * @throws IllegalArgumentException {@code if (fromIndex < 0 || fromIndex > toIndex || toIndex > a.size())}
125     */
126    static void sort(OffHeapStructCollection a, long fromIndex, long toIndex, Comparator<OffHeapStructAccessor> comparator) {
127        if(null == comparator) throw new IllegalArgumentException("Provided comparator is null");
128        if (fromIndex < 0 || fromIndex > toIndex || toIndex > a.size()) {
129            throw new IllegalArgumentException("Illegal input, collection size: [" + a.size() + "], " +
130                    "fromIndex: [" + fromIndex + "], toIndex: [" + toIndex + "]");
131        }
132        OffHeapStructComparator comp = new OffHeapStructComparator(a, comparator);
133        int len = a.structLength();
134        doSort(a, fromIndex, toIndex - 1, comp, new byte[len], new byte[len], new byte[len], new byte[len], new byte[len],
135                new byte[len], new byte[len]);
136    }
137
138    /**
139     * Sorts the specified range of the off-heap struct collection into ascending order using unsigned long struct key.
140     * This method differs from the public {@code sort} method in that the
141     * {@code right} index is inclusive, and it does no range checking on
142     * {@code left} or {@code right}.
143     *
144     * @param a the off-heap header-payload collection to be sorted
145     * @param left the index of the first element, inclusive, to be sorted
146     * @param right the index of the last element, inclusive, to be sorted* @param keyOffset
147     * @param comp structs comparator
148     * @param pi temporary buffer for structs
149     * @param pj temporary buffer for structs
150     * @param pe1 temporary buffer for structs
151     * @param pe2 temporary buffer for structs
152     * @param pe3 temporary buffer for structs
153     * @param pe4 temporary buffer for structs
154     * @param pe5 temporary buffer for structs
155     */
156    static void doSort(OffHeapStructCollection a, long left, long right, OffHeapStructComparator comp,
157                               byte[] pi, byte[] pj, byte[] pe1, byte[] pe2, byte[] pe3, byte[] pe4, byte[] pe5) {
158        // Use insertion sort on tiny arrays
159        if (right - left + 1 < INSERTION_SORT_THRESHOLD) {
160            for (long i = left + 1; i <= right; i++) {
161                a.get(i, pi);
162                long j;
163                for (j = i - 1; j >= left && comp.lt(pi, j); j--) {
164                    a.get(j, pj);
165                    a.set(j + 1, pj);
166                }
167                a.set(j + 1, pi);
168            }
169        } else { // Use Dual-Pivot Quicksort on large arrays
170            dualPivotQuicksort(a, left, right, comp, pi, pj, pe1, pe2, pe3, pe4, pe5);
171        }
172    }
173
174    /**
175     * Sorts the specified range of the off-heap struct collection into ascending order by the
176     * Dual-Pivot Quicksort algorithm using unsigned long struct key.
177     *
178     * @param a the off-heap header-payload collection to be sorted
179     * @param left the index of the first element, inclusive, to be sorted
180     * @param right the index of the last element, inclusive, to be sorted
181     * @param comp structs comparator
182     * @param pi temporary buffer for structs
183     * @param pj temporary buffer for structs
184     * @param pe1 temporary buffer for structs
185     * @param pe2 temporary buffer for structs
186     * @param pe3 temporary buffer for structs
187     * @param pe4 temporary buffer for structs
188     * @param pe5 temporary buffer for structs
189     */
190    private static void dualPivotQuicksort(OffHeapStructCollection a, long left, long right, OffHeapStructComparator comp, byte[] pi, byte[] pj,
191                                           byte[] pe1, byte[] pe2, byte[] pe3, byte[] pe4, byte[] pe5) {
192        // Compute indices of five evenly spaced elements
193        long sixth = (right - left + 1) / 6;
194        long e1 = left  + sixth;
195        long e5 = right - sixth;
196        long e3 = (left + right) >>> 1; // The midpoint
197        long e4 = e3 + sixth;
198        long e2 = e3 - sixth;
199
200        // Sort these elements using a 5-element sorting network
201        long ae1 = e1, ae2 = e2, ae3 = e3, ae4 = e4, ae5 = e5;
202        a.get(e1, pe1); a.get(e2, pe2); a.get(e3, pe3); a.get(e4, pe4); a.get(e5, pe5);
203
204        if (comp.gt(ae1, ae2)) { long t = ae1; byte[] pt = pe1; ae1 = ae2; pe1 = pe2; ae2 = t; pe2 = pt; }
205        if (comp.gt(ae4, ae5)) { long t = ae4; byte[] pt = pe4; ae4 = ae5; pe4 = pe5; ae5 = t; pe5 = pt; }
206        if (comp.gt(ae1, ae3)) { long t = ae1; byte[] pt = pe1; ae1 = ae3; pe1 = pe3; ae3 = t; pe3 = pt; }
207        if (comp.gt(ae2, ae3)) { long t = ae2; byte[] pt = pe2; ae2 = ae3; pe2 = pe3; ae3 = t; pe3 = pt; }
208        if (comp.gt(ae1, ae4)) { long t = ae1; byte[] pt = pe1; ae1 = ae4; pe1 = pe4; ae4 = t; pe4 = pt; }
209        if (comp.gt(ae3, ae4)) { long t = ae3; byte[] pt = pe3; ae3 = ae4; pe3 = pe4; ae4 = t; pe4 = pt; }
210        if (comp.gt(ae2, ae5)) { long t = ae2; byte[] pt = pe2; ae2 = ae5; pe2 = pe5; ae5 = t; pe5 = pt; }
211        if (comp.gt(ae2, ae3)) { long t = ae2; byte[] pt = pe2; ae2 = ae3; pe2 = pe3; ae3 = t; pe3 = pt; }
212        if (comp.gt(ae4, ae5)) { long t = ae4; byte[] pt = pe4; ae4 = ae5; pe4 = pe5; ae5 = t; pe5 = pt; }
213
214        a.set(e1, pe1); a.set(e3, pe3); a.set(e5, pe5);
215
216        /*
217         * Use the second and fourth of the five sorted elements as pivots.
218         * These values are inexpensive approximations of the first and
219         * second terciles of the array. Note that pivot1 <= pivot2.
220         *
221         * The pivots are stored in local variables, and the first and
222         * the last of the elements to be sorted are moved to the locations
223         * formerly occupied by the pivots. When partitioning is complete,
224         * the pivots are swapped back into their final positions, and
225         * excluded from subsequent sorting.
226         */
227        a.get(left, pe1);
228        a.set(e2, pe1);
229        a.get(right, pe1);
230        a.set(e4, pe1);
231
232        // Pointers
233        long less  = left  + 1; // The index of first element of center part
234        long great = right - 1; // The index before first element of right part
235
236        // comparable results holders
237        int kpivot1, greatpivot1;
238
239        boolean pivotsDiffer = !comp.eq(pe2, pe4);
240
241        if (pivotsDiffer) {
242            /*
243             * Partitioning:
244             *
245             *   left part         center part                    right part
246             * +------------------------------------------------------------+
247             * | < pivot1  |  pivot1 <= && <= pivot2  |    ?    |  > pivot2 |
248             * +------------------------------------------------------------+
249             *              ^                          ^       ^
250             *              |                          |       |
251             *             less                        k     great
252             *
253             * Invariants:
254             *
255             *              all in (left, less)   < pivot1
256             *    pivot1 <= all in [less, k)     <= pivot2
257             *              all in (great, right) > pivot2
258             *
259             * Pointer k is the first index of ?-part
260             */
261            outer:
262            for (long k = less; k <= great; k++) {
263                a.get(k, pe1);
264                if (comp.lt(k, pe2)) { // Move a[k] to left part
265                    if (k != less) {
266                        a.get(less, pe3);
267                        a.set(k, pe3);
268                        a.set(less, pe1);
269                    }
270                    less++;
271                } else if (comp.gt(k, pe4)) { // Move a[k] to right part
272                    while (comp.gt(great, pe4)) {
273                        if (great-- == k) {
274                            break outer;
275                        }
276                    }
277                    if (comp.lt(great, pe2)) {
278                        a.get(less, pe3);
279                        a.set(k, pe3);
280                        a.get(great, pe3);
281                        a.set(less++, pe3);
282                        a.set(great--, pe1);
283                    } else { // pivot1 <= a[great] <= pivot2
284                        a.get(great, pe3);
285                        a.set(k, pe3);
286                        a.set(great--, pe1);
287                    }
288                }
289            }
290        } else { // Pivots are equal
291            /*
292             * Partition degenerates to the traditional 3-way,
293             * or "Dutch National Flag", partition:
294             *
295             *   left part   center part            right part
296             * +----------------------------------------------+
297             * |  < pivot  |  == pivot  |    ?    |  > pivot  |
298             * +----------------------------------------------+
299             *              ^            ^       ^
300             *              |            |       |
301             *             less          k     great
302             *
303             * Invariants:
304             *
305             *   all in (left, less)   < pivot
306             *   all in [less, k)     == pivot
307             *   all in (great, right) > pivot
308             *
309             * Pointer k is the first index of ?-part
310             */
311            for (long k = less; k <= great; k++) {
312                a.get(k, pe1);
313                kpivot1 = comp.compare(k, pe2);
314                if (0 == kpivot1) {
315                    continue;
316                }
317                if (kpivot1 < 0) { // Move a[k] to left part
318                    if (k != less) {
319                        a.get(less, pe3);
320                        a.set(k, pe3);
321                        a.set(less, pe1);
322                    }
323                    less++;
324                } else { // (a[k] > pivot1) -  Move a[k] to right part
325                    /*
326                     * We know that pivot1 == a[e3] == pivot2. Thus, we know
327                     * that great will still be >= k when the following loop
328                     * terminates, even though we don't test for it explicitly.
329                     * In other words, a[e3] acts as a sentinel for great.
330                     */
331                    while ((greatpivot1 = comp.compare(great, pe2)) > 0) {
332                        great--;
333                    }
334                    if (greatpivot1 < 0) {
335                        a.get(less, pe3);
336                        a.set(k, pe3);
337                        a.get(great, pe3);
338                        a.set(less++, pe3);
339                        a.set(great--, pe1);
340                    } else { // a[great] == pivot1
341                        a.get(great, pe3);
342                        a.set(k, pe3);
343                        a.set(great--, pe1);
344                    }
345                }
346            }
347        }
348
349        // Swap pivots into their final positions
350        a.get(less - 1, pe1);
351        a.set(left, pe1); a.set(less - 1, pe2);
352        a.get(great + 1, pe1);
353        a.set(right, pe1); a.set(great + 1, pe4);
354
355        // Sort left and right parts recursively, excluding known pivot values
356        doSort(a, left,   less - 2, comp, pi, pj, pe1, pe2, pe3, pe4, pe5);
357        doSort(a, great + 2, right, comp, pi, pj, pe1, pe2, pe3, pe4, pe5);
358
359        /*
360         * If pivot1 == pivot2, all elements from center
361         * part are equal and, therefore, already sorted
362         */
363        if (!pivotsDiffer) {
364            return;
365        }
366
367        /*
368         * If center part is too large (comprises > 2/3 of the array),
369         * swap internal pivot values to ends
370         */
371        if (less < e1 && great > e5) {
372            while (comp.eq(less,pe2)) {
373                less++;
374            }
375            while (comp.eq(great, pe4)) {
376                great--;
377            }
378
379            /*
380             * Partitioning:
381             *
382             *   left part       center part                   right part
383             * +----------------------------------------------------------+
384             * | == pivot1 |  pivot1 < && < pivot2  |    ?    | == pivot2 |
385             * +----------------------------------------------------------+
386             *              ^                        ^       ^
387             *              |                        |       |
388             *             less                      k     great
389             *
390             * Invariants:
391             *
392             *              all in (*, less)  == pivot1
393             *     pivot1 < all in [less, k)   < pivot2
394             *              all in (great, *) == pivot2
395             *
396             * Pointer k is the first index of ?-part
397             */
398            outer:
399            for (long k = less; k <= great; k++) {
400                a.get(k, pe1);
401                if (comp.eq(k, pe4)) { // Move a[k] to right part
402                    while (comp.eq(great, pe4)) {
403                        if (great-- == k) {
404                            break outer;
405                        }
406                    }
407                    if (comp.eq(great, pe2)) {
408                        a.get(less, pe3);
409                        a.set(k, pe3);
410                        a.get(great, pe3);
411                        a.set(less++, pe3);
412                    } else { // pivot1 < a[great] < pivot2
413                        a.get(great, pe3);
414                        a.set(k, pe3);
415                    }
416                    a.set(great--, pe1);
417                } else if (comp.eq(k, pe2)) { // Move a[k] to left part
418                    a.get(less, pe3);
419                    a.set(k, pe3);
420                    a.set(less++, pe1);
421                }
422            }
423        }
424
425        // Sort center part recursively, excluding known pivot values
426        doSort(a, less, great, comp, pi, pj, pe1, pe2, pe3, pe4, pe5);
427    }
428
429    /**
430     * Helper method to invoke and wait
431     *
432     * @param executor executor instance
433     * @param workers  workers list
434     */
435    private static void invokeAndWait(ExecutorService executor, List<Worker> workers) {
436        try {
437            List<Future<Void>> futures = executor.invokeAll(workers);
438            for (Future<Void> fu : futures) {
439                fu.get();
440            }
441        } catch (InterruptedException e) {
442            throw new RuntimeException(e);
443        } catch (ExecutionException e) {
444            throw new RuntimeException(e);
445        }
446    }
447
448    /**
449     * Worker, sorts part of the collection
450     */
451    private static class Worker implements Callable<Void> {
452        private final OffHeapStructCollection a;
453        final long left;
454        final long right;
455        private final OffHeapStructComparator comp;
456        private final byte[] pi;
457        private final byte[] pj;
458        private final byte[] pe1;
459        private final byte[] pe2;
460        private final byte[] pe3;
461        private final byte[] pe4;
462        private final byte[] pe5;
463
464        /**
465         * Private constructor
466         *
467         * @param a     collection
468         * @param left  start sort index
469         * @param right end sort index
470         * @param comp  comparator
471         * @param pi    sort buffer
472         * @param pj    sort buffer
473         * @param pe1   sort buffer
474         * @param pe2   sort buffer
475         * @param pe3   sort buffer
476         * @param pe4   sort buffer
477         * @param pe5   sort buffer
478         */
479        private Worker(OffHeapStructCollection a, long left, long right, OffHeapStructComparator comp, byte[] pi,
480                       byte[] pj, byte[] pe1, byte[] pe2, byte[] pe3, byte[] pe4, byte[] pe5) {
481            this.a = a;
482            this.left = left;
483            this.right = right;
484            this.comp = comp;
485            this.pi = pi;
486            this.pj = pj;
487            this.pe1 = pe1;
488            this.pe2 = pe2;
489            this.pe3 = pe3;
490            this.pe4 = pe4;
491            this.pe5 = pe5;
492        }
493
494        /**
495         * {@inheritDoc}
496         */
497        @Override
498        public Void call() throws Exception {
499            doSort(a, left, right, comp, pi, pj, pe1, pe2, pe3, pe4, pe5);
500            return null;
501        }
502    }
503
504    /**
505     * Merge-sort iterator over partly sorted collection
506     */
507    private static class MergeIter implements OffHeapDisposableIterator<byte[]> {
508        private final OffHeapStructCollection col;
509        private final OffHeapStructComparator comp;
510        private final List<Worker> bounds;
511        private final long[] indices;
512        private final byte[] buf;
513
514
515        /**
516         * Private constructor
517         *
518         * @param col    partly sorted collection
519         * @param comp   comparator used to partially sort collection
520         * @param bounds collection sorted parts bounds
521         */
522        private MergeIter(OffHeapStructCollection col, OffHeapStructComparator comp, List<Worker> bounds) {
523            this.col = col;
524            this.comp = comp;
525            this.bounds = bounds;
526            this.buf = new byte[col.structLength()];
527            this.indices = new long[bounds.size()];
528            for (int i = 0; i < bounds.size(); i++) {
529                indices[i] = bounds.get(i).left;
530            }
531
532        }
533
534        /**
535         * {@inheritDoc}
536         */
537        @Override
538        public boolean hasNext() {
539            for (int i = 0; i < indices.length; i++) {
540                if (indices[i] <= bounds.get(i).right) return true;
541            }
542            return false;
543        }
544
545        /**
546         * {@inheritDoc}
547         */
548        @Override
549        public byte[] next() {
550            int minind = -1;
551            for (int i = 0; i < indices.length; i++) {
552                if (indices[i] > bounds.get(i).right) continue;
553                if (-1 == minind) minind = i;
554                if (comp.gt(indices[minind], indices[i])) minind = i;
555            }
556            if (-1 == minind) throw new NoSuchElementException();
557            col.get(indices[minind], buf);
558            indices[minind] += 1;
559            return buf;
560        }
561
562        /**
563         * {@inheritDoc}
564         */
565        @Override
566        public void remove() {
567            throw new UnsupportedOperationException("remove");
568        }
569
570        /**
571         * {@inheritDoc}
572         */
573        @Override
574        public void free() {
575            OffHeapUtils.free(col);
576        }
577
578        /**
579         * {@inheritDoc}
580         */
581        @Override
582        public long size() {
583            return col.size();
584        }
585    }
586}