なんていうことを特に調べるつもりはなくて、なんとなくRedisのsrc/を見てたらpqsort.cというのを見つけたのでファイルを開いて見てみたらNetBSD由来のqsortっぽくて、へ~なんて思っていたけどpqsort(3)なんて有ったっけ?なんて思いつつファイルをよく見たら「 The following is the NetBSD libc qsort implementation modified in order to support partial sorting of ranges for Redis.」とありlimitを付けた場合の処理かと気づいた訳です。
そんな訳でNetBSDのqsort.cをCVSのwebインターフェースからダウンロードして先頭のほうにあるコメントを除いてdiffを取ったのがこれ。
--- qsort.c 2013-12-12 20:34:07.126669638 +0900 +++ pqsort.c 2013-12-12 20:33:52.800003220 +0900 @@ -1,12 +1,11 @@ #include <sys/types.h> -#include <assert.h> #include <errno.h> #include <stdlib.h> -static inline char *med3(char *, char *, char *, +static inline char *med3 (char *, char *, char *, int (*)(const void *, const void *)); -static inline void swapfunc(char *, char *, size_t, int); +static inline void swapfunc (char *, char *, size_t, int); #define min(a, b) (a) < (b) ? a : b @@ -49,7 +48,7 @@ static inline char * med3(char *a, char *b, char *c, - int (*cmp)(const void *, const void *)) + int (*cmp) (const void *, const void *)) { return cmp(a, b) < 0 ? @@ -57,17 +56,14 @@ :(cmp(b, c) > 0 ? b : (cmp(a, c) < 0 ? a : c )); } -void -qsort(void *a, size_t n, size_t es, - int (*cmp)(const void *, const void *)) +static void +_pqsort(void *a, size_t n, size_t es, + int (*cmp) (const void *, const void *), void *lrange, void *rrange) { char *pa, *pb, *pc, *pd, *pl, *pm, *pn; size_t d, r; int swaptype, cmp_result; - _DIAGASSERT(a != NULL || n == 0 || es == 0); - _DIAGASSERT(cmp != NULL); - loop: SWAPINIT(a, es); if (n < 7) { for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es) @@ -119,13 +115,32 @@ vecswap(a, pb - r, r); r = min((size_t)(pd - pc), pn - pd - es); vecswap(pb, pn - r, r); - if ((r = pb - pa) > es) - qsort(a, r / es, es, cmp); + if ((r = pb - pa) > es) { + void *_l = a, *_r = ((unsigned char*)a)+r-1; + if (!((lrange < _l && rrange < _l) || + (lrange > _r && rrange > _r))) + _pqsort(a, r / es, es, cmp, lrange, rrange); + } if ((r = pd - pc) > es) { + void *_l, *_r; + /* Iterate rather than recurse to save stack space */ a = pn - r; n = r / es; - goto loop; + + _l = a; + _r = ((unsigned char*)a)+r-1; + if (!((lrange < _l && rrange < _l) || + (lrange > _r && rrange > _r))) + goto loop; } /* qsort(pn - r, r / es, es, cmp);*/ } + +void +pqsort(void *a, size_t n, size_t es, + int (*cmp) (const void *, const void *), size_t lrange, size_t rrange) +{ + _pqsort(a,n,es,cmp,((unsigned char*)a)+(lrange*es), + ((unsigned char*)a)+((rrange+1)*es)-1); +}
qsort()がpqsort()に名前変更されて、関数のインターフェースは引数としてlrange、rrangeを受け取るようになってます。diffの上の方はインデントとか空白の付け方の違い程度しかないですね。
ソート処理の実体部分での違いは大きく2ヶ所。
1ヶ所目はここ。
- if ((r = pb - pa) > es) - qsort(a, r / es, es, cmp); + if ((r = pb - pa) > es) { + void *_l = a, *_r = ((unsigned char*)a)+r-1; + if (!((lrange < _l && rrange < _l) || + (lrange > _r && rrange > _r))) + _pqsort(a, r / es, es, cmp, lrange, rrange); + }
qsort()の場合は(pb - pa) > es)が真なら_qsort()を再帰呼び出ししているんだけど、pqsort()の場合はlrange、rrangeとの比較も追加して、条件を満たしたら再帰する。
もう一ヶ所はここで、qsortは変数a(ソート対象の配列の先頭アドレス)と変数n(ソート対象の配列の要素数)を再計算してloopラベルに飛んでいたところにlrangeとrrangeとの比較を追加。こちらは条件を満たした場合には再帰ではなくてloopラベルに飛ぶ。。
if ((r = pd - pc) > es) { + void *_l, *_r; + /* Iterate rather than recurse to save stack space */ a = pn - r; n = r / es; - goto loop; + + _l = a; + _r = ((unsigned char*)a)+r-1; + if (!((lrange < _l && rrange < _l) || + (lrange > _r && rrange > _r))) + goto loop; }
pqsortを使うのはsort.cにあるsortCommand()の以下の場所で、limitで範囲が指定された場合はpqsort()、そうじゃなければ標準cライブラリにあるqsort()が使われる。
if (sortby && (start != 0 || end != vectorlen-1)) pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end); else qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
- 作者: Josiah L. Carlson,長尾高弘
- 出版社/メーカー: アスキー・メディアワークス
- 発売日: 2013/12/27
- メディア: 大型本
- この商品を含むブログを見る