Experimental change to speed up ORDER BY clauses that sort based on a single expression.
FossilOrigin-Name: 2bb8c4926136a1fcad107b09c8718d36d7101a51
diff --git a/manifest b/manifest
index fa8c064..a2c361e 100644
--- a/manifest
+++ b/manifest
@@ -1,5 +1,5 @@
-C Silence\sthree\sharmless\scompiler\swarnings\sin\svdbesort.c.
-D 2012-08-14T19:04:27.492
+C Experimental\schange\sto\sspeed\sup\sORDER\sBY\sclauses\sthat\ssort\sbased\son\sa\ssingle\sexpression.
+D 2012-08-15T15:57:30.655
F Makefile.arm-wince-mingw32ce-gcc d6df77f1f48d690bd73162294bbba7f59507c72f
F Makefile.in abd5c10d21d1395f140d9e50ea999df8fa4d6376
F Makefile.linux-gcc 91d710bdc4998cb015f39edf3cb314ec4f4d7e23
@@ -237,14 +237,14 @@
F src/utf.c 890c67dcfcc7a74623c95baac7535aadfe265e84
F src/util.c 0af2e515dc0dabacec931bca39525f6c3f1c5455
F src/vacuum.c 587a52bb8833d7ac15af8916f25437e2575028bd
-F src/vdbe.c 75da79cdcd58481825a06f045bc2f5ea3966eeae
+F src/vdbe.c 93b560a418a64c4a4dd6aadb30e79fad5c7eb0bc
F src/vdbe.h 18f581cac1f4339ec3299f3e0cc6e11aec654cdb
-F src/vdbeInt.h 986b6b11a13c517337355009e5438703ba5b0a40
+F src/vdbeInt.h 4554c3a3c7f061c0b6b64366b97f8c3949f4dfe9
F src/vdbeapi.c 88ea823bbcb4320f5a6607f39cd7c2d3cc4c26b1
F src/vdbeaux.c dce80038c3c41f2680e5ab4dd0f7e0d8b7ff9071
F src/vdbeblob.c 32f2a4899d67f69634ea4dd93e3f651936d732cb
F src/vdbemem.c cb55e84b8e2c15704968ee05f0fae25883299b74
-F src/vdbesort.c 0dc1b274dcb4d4c8e71b0b2b15261f286caba39b
+F src/vdbesort.c e4aa1c16532854ae312cc91fe9e373630aaa6463
F src/vdbetrace.c 8bd5da325fc90f28464335e4cc4ad1407fe30835
F src/vtab.c bb8ea3a26608bb1357538a5d2fc72beba6638998
F src/wal.c 9294df6f96aae5909ae1a9b733fd1e1b4736978b
@@ -709,7 +709,7 @@
F test/sidedelete.test f0ad71abe6233e3b153100f3b8d679b19a488329
F test/soak.test 0b5b6375c9f4110c828070b826b3b4b0bb65cd5f
F test/softheap1.test c16709a16ad79fa43b32929b2e623d1d117ccf53
-F test/sort.test 0e4456e729e5a92a625907c63dcdedfbe72c5dc5
+F test/sort.test 7adab2fcb2bd4ee8ee024cc8eab5a9192762fd75
F test/speed1.test f2974a91d79f58507ada01864c0e323093065452
F test/speed1p.explain d841e650a04728b39e6740296b852dccdca9b2cb
F test/speed1p.test c4a469f29f135f4d76c55b1f2a52f36e209466cc
@@ -1010,7 +1010,10 @@
F tool/warnings-clang.sh f6aa929dc20ef1f856af04a730772f59283631d4
F tool/warnings.sh fbc018d67fd7395f440c28f33ef0f94420226381
F tool/win/sqlite.vsix 67d8a99aceb56384a81b3f30d6c71743146d2cc9
-P 6730579cf5c6c74cb293e7237d896d3a3a36b691
-R 7ea1e8d09a42f7daed61f69d5f68ea70
-U drh
-Z 6a9fd56c83b0e144f530abdc3bddd4ea
+P a5431c86df442c6e6dfaeae8e8aa62b56d204e97
+R 2b92d91d7f760dd74de397b9ffc908eb
+T *branch * sorter-exp
+T *sym-sorter-exp *
+T -sym-trunk *
+U dan
+Z 040bce45e18c929d5310b23a13be7d4e
diff --git a/manifest.uuid b/manifest.uuid
index 4dc7f48..41835e0 100644
--- a/manifest.uuid
+++ b/manifest.uuid
@@ -1 +1 @@
-a5431c86df442c6e6dfaeae8e8aa62b56d204e97
\ No newline at end of file
+2bb8c4926136a1fcad107b09c8718d36d7101a51
\ No newline at end of file
diff --git a/src/vdbe.c b/src/vdbe.c
index 12e7325..96d1e2f 100644
--- a/src/vdbe.c
+++ b/src/vdbe.c
@@ -4150,13 +4150,11 @@
*/
case OP_SorterCompare: {
VdbeCursor *pC;
- int res;
pC = p->apCsr[pOp->p1];
assert( isSorter(pC) );
pIn3 = &aMem[pOp->p3];
- rc = sqlite3VdbeSorterCompare(pC, pIn3, &res);
- if( res ){
+ if( sqlite3VdbeSorterCompare(pC, pIn3) ){
pc = pOp->p2-1;
}
break;
diff --git a/src/vdbeInt.h b/src/vdbeInt.h
index 1f5694a..e50f327 100644
--- a/src/vdbeInt.h
+++ b/src/vdbeInt.h
@@ -427,7 +427,7 @@
# define sqlite3VdbeSorterRowkey(Y,Z) SQLITE_OK
# define sqlite3VdbeSorterRewind(X,Y,Z) SQLITE_OK
# define sqlite3VdbeSorterNext(X,Y,Z) SQLITE_OK
-# define sqlite3VdbeSorterCompare(X,Y,Z) SQLITE_OK
+# define sqlite3VdbeSorterCompare(X,Y) 0
#else
int sqlite3VdbeSorterInit(sqlite3 *, VdbeCursor *);
void sqlite3VdbeSorterClose(sqlite3 *, VdbeCursor *);
@@ -435,7 +435,7 @@
int sqlite3VdbeSorterNext(sqlite3 *, const VdbeCursor *, int *);
int sqlite3VdbeSorterRewind(sqlite3 *, const VdbeCursor *, int *);
int sqlite3VdbeSorterWrite(sqlite3 *, const VdbeCursor *, Mem *);
-int sqlite3VdbeSorterCompare(const VdbeCursor *, Mem *, int *);
+int sqlite3VdbeSorterCompare(const VdbeCursor *, Mem *);
#endif
#if !defined(SQLITE_OMIT_SHARED_CACHE) && SQLITE_THREADSAFE>0
diff --git a/src/vdbesort.c b/src/vdbesort.c
index ba1e9f0..57999e4 100644
--- a/src/vdbesort.c
+++ b/src/vdbesort.c
@@ -39,7 +39,7 @@
** The aIter[] array contains an iterator for each of the PMAs being merged.
** An aIter[] iterator either points to a valid key or else is at EOF. For
** the purposes of the paragraphs below, we assume that the array is actually
-** N elements in size, where N is the smallest power of 2 greater to or equal
+** N elements in size, where N is the smallest power of 2 greater than or equal
** to the number of iterators being merged. The extra aIter[] elements are
** treated as if they are empty (always at EOF).
**
@@ -104,10 +104,21 @@
VdbeSorterIter *aIter; /* Array of iterators to merge */
int *aTree; /* Current state of incremental merge */
sqlite3_file *pTemp1; /* PMA file 1 */
- SorterRecord *pRecord; /* Head of in-memory record list */
+ SorterRecord *aRec[9]; /* Nine different types of records */
+ SorterRecord **aLastRec[9]; /* Locations to write the next pointers to */
UnpackedRecord *pUnpacked; /* Used to unpack keys */
};
+#define SORTER_NULL 0
+#define SORTER_INT_NEG 1
+#define SORTER_INT_ZERO 2
+#define SORTER_INT_ONE 3
+#define SORTER_INT_POS 4
+#define SORTER_DOUBLE 5
+#define SORTER_TEXT 6
+#define SORTER_BLOB 7
+#define SORTER_LARGE 8
+
/*
** The following type is an iterator for a PMA. It caches the current key in
** variables nKey/aKey. If the iterator is at EOF, pFile==0.
@@ -366,7 +377,6 @@
return rc;
}
-
/*
** Compare key1 (buffer pKey1, size nKey1 bytes) with key2 (buffer pKey2,
** size nKey2 bytes). Argument pKeyInfo supplies the collation functions
@@ -374,43 +384,74 @@
** Otherwise, return SQLITE_OK and set *pRes to a negative, zero or positive
** value, depending on whether key1 is smaller, equal to or larger than key2.
**
-** If the bOmitRowid argument is non-zero, assume both keys end in a rowid
-** field. For the purposes of the comparison, ignore it. Also, if bOmitRowid
-** is true and key1 contains even a single NULL value, it is considered to
-** be less than key2. Even if key2 also contains NULL values.
-**
** If pKey2 is passed a NULL pointer, then it is assumed that the pCsr->aSpace
** has been allocated and contains an unpacked record that is used as key2.
*/
-static void vdbeSorterCompare(
- const VdbeCursor *pCsr, /* Cursor object (for pKeyInfo) */
- int bOmitRowid, /* Ignore rowid field at end of keys */
+static int vdbeSorterCompareRec(
+ void *p, /* VdbeCursor object */
const void *pKey1, int nKey1, /* Left side of comparison */
- const void *pKey2, int nKey2, /* Right side of comparison */
- int *pRes /* OUT: Result of comparison */
+ const void *pKey2, int nKey2 /* Right side of comparison */
){
+ const VdbeCursor *pCsr = (VdbeCursor *)p;
KeyInfo *pKeyInfo = pCsr->pKeyInfo;
- VdbeSorter *pSorter = pCsr->pSorter;
- UnpackedRecord *r2 = pSorter->pUnpacked;
- int i;
+ UnpackedRecord *r2 = pCsr->pSorter->pUnpacked;
if( pKey2 ){
sqlite3VdbeRecordUnpack(pKeyInfo, nKey2, pKey2, r2);
}
+ return sqlite3VdbeRecordCompare(nKey1, pKey1, r2);
+}
- if( bOmitRowid ){
- r2->nField = pKeyInfo->nField;
- assert( r2->nField>0 );
- for(i=0; i<r2->nField; i++){
- if( r2->aMem[i].flags & MEM_Null ){
- *pRes = -1;
- return;
- }
- }
- r2->flags |= UNPACKED_PREFIX_MATCH;
+/*
+** Buffers pKey1 and pKey2 both contain encoded records. The first elements
+** of each are both either negative integers (if p!=0) or positive integers
+** greater than 1 (if p==0). Return a value less than, equal to or greater
+** than zero if the first field in pKey1 is less than, equal to or greater
+** than the first field in pKey2, respectively.
+*/
+static int vdbeSorterCompareInt(
+ void *p,
+ const void *pKey1, int nKey1, /* Left side of comparison */
+ const void *pKey2, int nKey2 /* Right side of comparison */
+){
+ static const int aLen[] = {0, 1, 2, 3, 4, 6, 8};
+ const u8 *aKey1 = (u8 *)pKey1;
+ const u8 *aKey2 = (u8 *)pKey2;
+ int res = (int)aKey1[1] - (int)aKey2[1];
+
+ if( res==0 ){
+ res = memcmp(&aKey1[aKey1[0]], &aKey2[aKey2[0]], aLen[aKey1[1]]);
+ }else if( aKey1[aKey1[0]] & 0x80 ){
+ res = res * -1;
}
+ return res;
+}
- *pRes = sqlite3VdbeRecordCompare(nKey1, pKey1, r2);
+/*
+** Buffers pKey1 and pKey2 both contain encoded records. The first elements
+** of each are both either text or blob values.
+**
+** Argument p points to a CollSeq structure. If the pKey1 and pKey2 buffers
+** contain blobs, then this is always the BINARY collation sequence. Either
+** way, compare the contents of the two buffers and return an integer less
+** than, equal to or greater than zero if the value in pKey1 is less than,
+** equal to or greater than that in pKey2, respectively.
+*/
+static int vdbeSorterCompareString(
+ void *p, /* Pointer to CollSeq object */
+ const void *pKey1, int nKey1, /* Left side of comparison */
+ const void *pKey2, int nKey2 /* Right side of comparison */
+){
+ CollSeq *pColl = (CollSeq *)p;
+ const u8 *aKey1 = (u8 *)pKey1;
+ const u8 *aKey2 = (u8 *)pKey2;
+ int n1, n2;
+ int res;
+
+ n1 = (aKey1[1] - 12) / 2;
+ n2 = (aKey2[1] - 12) / 2;
+ res = pColl->xCmp(pColl->pUser, n1, &aKey1[aKey1[0]], n2, &aKey2[aKey2[0]]);
+ return res;
}
/*
@@ -446,8 +487,8 @@
}else{
int res;
assert( pCsr->pSorter->pUnpacked!=0 ); /* allocated in vdbeSorterMerge() */
- vdbeSorterCompare(
- pCsr, 0, p1->aKey, p1->nKey, p2->aKey, p2->nKey, &res
+ res = vdbeSorterCompareRec(
+ (void *)pCsr, p1->aKey, p1->nKey, p2->aKey, p2->nKey
);
if( res<=0 ){
iRes = i1;
@@ -460,6 +501,18 @@
return SQLITE_OK;
}
+
+/*
+** Set each entry of the aLastRec[] array to point to the corresponding entry
+** in the aRec[] array.
+*/
+static void vdbeSorterSetLastRec(VdbeSorter *pSorter){
+ int i;
+ for(i=0; i<ArraySize(pSorter->aLastRec); i++){
+ pSorter->aLastRec[i] = &pSorter->aRec[i];
+ }
+}
+
/*
** Initialize the temporary index cursor just opened as a sorter cursor.
*/
@@ -487,6 +540,7 @@
pSorter->mxPmaSize = mxCache * pgsz;
}
+ vdbeSorterSetLastRec(pSorter);
return SQLITE_OK;
}
@@ -508,8 +562,8 @@
void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){
VdbeSorter *pSorter = pCsr->pSorter;
if( pSorter ){
+ int i;
if( pSorter->aIter ){
- int i;
for(i=0; i<pSorter->nTree; i++){
vdbeSorterIterZero(db, &pSorter->aIter[i]);
}
@@ -518,7 +572,11 @@
if( pSorter->pTemp1 ){
sqlite3OsCloseFree(pSorter->pTemp1);
}
- vdbeSorterRecordFree(db, pSorter->pRecord);
+
+ for(i=0; i<ArraySize(pSorter->aRec); i++){
+ vdbeSorterRecordFree(db, pSorter->aRec[i]);
+ }
+
sqlite3DbFree(db, pSorter->pUnpacked);
sqlite3DbFree(db, pSorter);
pCsr->pSorter = 0;
@@ -545,6 +603,8 @@
*/
static void vdbeSorterMerge(
const VdbeCursor *pCsr, /* For pKeyInfo */
+ int (*xCmp)(void*,const void*,int,const void*,int),
+ void *pCtx, /* First argument to pass to xCmp() */
SorterRecord *p1, /* First list to merge */
SorterRecord *p2, /* Second list to merge */
SorterRecord **ppOut /* OUT: Head of merged list */
@@ -555,12 +615,15 @@
while( p1 && p2 ){
int res;
- vdbeSorterCompare(pCsr, 0, p1->pVal, p1->nVal, pVal2, p2->nVal, &res);
+ res = xCmp(pCtx, p1->pVal, p1->nVal, pVal2, p2->nVal);
+ if( xCmp!=vdbeSorterCompareRec && pCsr->pKeyInfo->aSortOrder[0] ){
+ res = res * -1;
+ }
if( res<=0 ){
*pp = p1;
pp = &p1->pNext;
p1 = p1->pNext;
- pVal2 = 0;
+ if( xCmp==vdbeSorterCompareRec ) pVal2 = 0;
}else{
*pp = p2;
pp = &p2->pNext;
@@ -574,39 +637,127 @@
}
/*
+** Concatenate the linked lists headed at elements iStart through iEnd
+** (inclusive) of the pSorter->aRec[] array. Store the result in
+** pSorter->aRec[iEnd]. Set entries iStart through iEnd-1 to zero.
+**
+** If parameter bReverse is false, the lists are concatenated so that
+** all the elements of list iStart occur before those of iStart+1, and
+** so on. Or, if bReverse is true, the original content of iEnd is at
+** the start of the result, followed by the content of iEnd-1, etc.
+*/
+static void vdbeSorterConcatLists(
+ VdbeSorter *pSorter,
+ int bReverse, /* True to concatenate in reverse order */
+ int iStart,
+ int iEnd
+){
+ int i;
+ for(i=iStart; i<iEnd; i++){
+ if( bReverse ){
+ *pSorter->aLastRec[i] = pSorter->aRec[iEnd];
+ pSorter->aRec[iEnd] = pSorter->aRec[i];
+ }else{
+ *pSorter->aLastRec[iEnd] = pSorter->aRec[i];
+ if( pSorter->aRec[i] ) pSorter->aLastRec[iEnd] = pSorter->aLastRec[i];
+ }
+ pSorter->aRec[i] = 0;
+ pSorter->aLastRec[i] = &pSorter->aRec[i];
+ }
+}
+
+/*
** Sort the linked list of records headed at pCsr->pRecord. Return SQLITE_OK
** if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if an error
** occurs.
*/
static int vdbeSorterSort(const VdbeCursor *pCsr){
int i;
+ int iRec;
SorterRecord **aSlot;
SorterRecord *p;
+ KeyInfo *pKeyInfo = pCsr->pKeyInfo;
VdbeSorter *pSorter = pCsr->pSorter;
+ int bReverse = pCsr->pKeyInfo->aSortOrder[0];
aSlot = (SorterRecord **)sqlite3MallocZero(64 * sizeof(SorterRecord *));
if( !aSlot ){
return SQLITE_NOMEM;
}
- p = pSorter->pRecord;
- while( p ){
- SorterRecord *pNext = p->pNext;
- p->pNext = 0;
- for(i=0; aSlot[i]; i++){
- vdbeSorterMerge(pCsr, p, aSlot[i], &p);
+ /* If there are one or more SORTER_LARGE records, or if there are
+ ** SORTER_TEXT records that must be converted to a different encoding
+ ** before they can be compared, move everything to the SORTER_LARGE slot. */
+ if( pSorter->aRec[SORTER_LARGE]
+ || (pSorter->aRec[SORTER_TEXT] && pKeyInfo->enc!=pKeyInfo->aColl[0]->enc)
+ ){
+ vdbeSorterConcatLists(pSorter, 0, 0, SORTER_LARGE);
+ }
+
+ /* If there are one or more SORTER_DOUBLE records, move all numeric
+ ** records to the SORTER_DOUBLE slot. */
+ if( pSorter->aRec[SORTER_DOUBLE] ){
+ vdbeSorterConcatLists(pSorter, 0, SORTER_INT_NEG, SORTER_DOUBLE);
+ }
+
+ for(iRec=0; iRec<ArraySize(pSorter->aRec); iRec++){
+ void *pCtx = 0;
+ int (*xCmp)(void*,const void*,int,const void*,int);
+ switch( iRec ){
+ case SORTER_NULL:
+ case SORTER_INT_ZERO:
+ case SORTER_INT_ONE:
+ xCmp = 0;
+ break;
+
+ case SORTER_INT_NEG:
+ case SORTER_INT_POS:
+ xCmp = vdbeSorterCompareInt;
+ break;
+
+ case SORTER_BLOB:
+ pCtx = (void *)(pCsr->pKeyInfo->db->pDfltColl);
+ xCmp = vdbeSorterCompareString;
+ break;
+
+ case SORTER_TEXT:
+ pCtx = (void *)(pCsr->pKeyInfo->aColl[0]);
+ xCmp = vdbeSorterCompareString;
+ break;
+
+ default:
+ pCtx = (void *)pCsr;
+ xCmp = vdbeSorterCompareRec;
+ break;
+ }
+ if( !xCmp ) continue;
+
+ p = pSorter->aRec[iRec];
+ while( p ){
+ SorterRecord *pNext = p->pNext;
+ p->pNext = 0;
+ for(i=0; aSlot[i]; i++){
+ vdbeSorterMerge(pCsr, xCmp, pCtx, aSlot[i], p, &p);
+ aSlot[i] = 0;
+ }
+ aSlot[i] = p;
+ p = pNext;
+ }
+ p = 0;
+ for(i=0; i<64; i++){
+ vdbeSorterMerge(pCsr, xCmp, pCtx, aSlot[i], p, &p);
aSlot[i] = 0;
}
- aSlot[i] = p;
- p = pNext;
+ pSorter->aRec[iRec] = p;
+ if( p ){
+ SorterRecord *pRec;
+ for(pRec=pSorter->aRec[iRec]; pRec->pNext; pRec=pRec->pNext);
+ pSorter->aLastRec[iRec] = &pRec->pNext;
+ }
}
- p = 0;
- for(i=0; i<64; i++){
- vdbeSorterMerge(pCsr, p, aSlot[i], &p);
- }
- pSorter->pRecord = p;
-
+ vdbeSorterConcatLists(pSorter, bReverse, 0, SORTER_LARGE);
+ vdbeSorterSetLastRec(pSorter);
sqlite3_free(aSlot);
return SQLITE_OK;
}
@@ -711,15 +862,14 @@
** key). The varint is the number of bytes in the blob of data.
*/
static int vdbeSorterListToPMA(sqlite3 *db, const VdbeCursor *pCsr){
- int rc = SQLITE_OK; /* Return code */
+ int rc; /* Return code */
VdbeSorter *pSorter = pCsr->pSorter;
FileWriter writer;
memset(&writer, 0, sizeof(FileWriter));
if( pSorter->nInMemory==0 ){
- assert( pSorter->pRecord==0 );
- return rc;
+ return SQLITE_OK;
}
rc = vdbeSorterSort(pCsr);
@@ -739,13 +889,13 @@
fileWriterInit(db, pSorter->pTemp1, &writer, pSorter->iWriteOff);
pSorter->nPMA++;
fileWriterWriteVarint(&writer, pSorter->nInMemory);
- for(p=pSorter->pRecord; p; p=pNext){
+ for(p=pSorter->aRec[SORTER_LARGE]; rc==SQLITE_OK && p; p=pNext){
pNext = p->pNext;
fileWriterWriteVarint(&writer, p->nVal);
fileWriterWrite(&writer, p->pVal, p->nVal);
sqlite3DbFree(db, p);
}
- pSorter->pRecord = p;
+ pSorter->aRec[SORTER_LARGE] = p;
rc = fileWriterFinish(db, &writer, &pSorter->iWriteOff);
}
@@ -771,11 +921,48 @@
if( pNew==0 ){
rc = SQLITE_NOMEM;
}else{
- pNew->pVal = (void *)&pNew[1];
+ int iSlot;
+ u8 *aVal = pNew->pVal = (void *)&pNew[1];
memcpy(pNew->pVal, pVal->z, pVal->n);
pNew->nVal = pVal->n;
- pNew->pNext = pSorter->pRecord;
- pSorter->pRecord = pNew;
+
+ u8 n = aVal[0];
+ u8 t = aVal[1];
+
+ if( pCsr->pKeyInfo->nField!=1 || (t & 0x80) || (n & 0x80) ){
+ iSlot = SORTER_LARGE;
+ }else{
+ u8 t = aVal[1];
+ switch( t ){
+ case 0:
+ iSlot = SORTER_NULL;
+ break;
+
+ case 1: case 2: case 3: case 4: case 5: case 6:
+ iSlot = (aVal[n] & 0x80) ? SORTER_INT_NEG : SORTER_INT_POS;
+ break;
+
+ case 7:
+ iSlot = SORTER_DOUBLE;
+ break;
+
+ case 8:
+ iSlot = SORTER_INT_ZERO;
+ break;
+
+ case 9:
+ iSlot = SORTER_INT_ONE;
+ break;
+
+ default:
+ iSlot = SORTER_BLOB - (t & 0x01);
+ break;
+ }
+ }
+
+ pNew->pNext = 0;
+ *pSorter->aLastRec[iSlot] = pNew;
+ pSorter->aLastRec[iSlot] = &(pNew->pNext);
}
/* See if the contents of the sorter should now be written out. They
@@ -854,7 +1041,7 @@
** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly
** from the in-memory list. */
if( pSorter->nPMA==0 ){
- *pbEof = !pSorter->pRecord;
+ *pbEof = (pSorter->nInMemory==0);
assert( pSorter->aTree==0 );
return vdbeSorterSort(pCsr);
}
@@ -963,11 +1150,11 @@
*pbEof = (pSorter->aIter[pSorter->aTree[1]].pFile==0);
}else{
- SorterRecord *pFree = pSorter->pRecord;
- pSorter->pRecord = pFree->pNext;
+ SorterRecord *pFree = pSorter->aRec[SORTER_LARGE];
+ pSorter->aRec[SORTER_LARGE] = pFree->pNext;
pFree->pNext = 0;
vdbeSorterRecordFree(db, pFree);
- *pbEof = !pSorter->pRecord;
+ *pbEof = !pSorter->aRec[SORTER_LARGE];
rc = SQLITE_OK;
}
return rc;
@@ -988,8 +1175,8 @@
*pnKey = pIter->nKey;
pKey = pIter->aKey;
}else{
- *pnKey = pSorter->pRecord->nVal;
- pKey = pSorter->pRecord->pVal;
+ *pnKey = pSorter->aRec[SORTER_LARGE]->nVal;
+ pKey = pSorter->aRec[SORTER_LARGE]->pVal;
}
return pKey;
}
@@ -1008,7 +1195,6 @@
pOut->n = nKey;
MemSetTypeFlag(pOut, MEM_Blob);
memcpy(pOut->z, pKey, nKey);
-
return SQLITE_OK;
}
@@ -1024,15 +1210,26 @@
*/
int sqlite3VdbeSorterCompare(
const VdbeCursor *pCsr, /* Sorter cursor */
- Mem *pVal, /* Value to compare to current sorter key */
- int *pRes /* OUT: Result of comparison */
+ Mem *pVal /* Value to compare to current sorter key */
){
+ KeyInfo *pKeyInfo = pCsr->pKeyInfo;
VdbeSorter *pSorter = pCsr->pSorter;
+ UnpackedRecord *r2 = pSorter->pUnpacked;
+ int i;
void *pKey; int nKey; /* Sorter key to compare pVal with */
pKey = vdbeSorterRowkey(pSorter, &nKey);
- vdbeSorterCompare(pCsr, 1, pVal->z, pVal->n, pKey, nKey, pRes);
- return SQLITE_OK;
+ assert( pKey && pVal->z );
+ sqlite3VdbeRecordUnpack(pKeyInfo, nKey, pKey, r2);
+
+ r2->nField = pKeyInfo->nField;
+ assert( r2->nField>0 );
+ for(i=0; i<r2->nField; i++){
+ if( r2->aMem[i].flags & MEM_Null ) return -1;
+ }
+ r2->flags |= UNPACKED_PREFIX_MATCH;
+
+ return sqlite3VdbeRecordCompare(pVal->n, pVal->z, r2);
}
#endif /* #ifndef SQLITE_OMIT_MERGE_SORT */
diff --git a/test/sort.test b/test/sort.test
index 08d496b..84793c8 100644
--- a/test/sort.test
+++ b/test/sort.test
@@ -464,4 +464,19 @@
}
} {1 2 xxx 1 3 yyy 1 1 zzz}
+foreach {tn shuffle} {
+ 1 {1 2 3 4} 2 {1 3 2 4}
+ 3 {4 3 2 1} 4 {1 3 2 4}
+} {
+ do_test sort-13.$tn {
+ execsql { DROP TABLE IF EXISTS w1 }
+ execsql { CREATE TABLE w1(x) }
+ foreach i $shuffle {
+ execsql { INSERT INTO w1 VALUES($i) }
+ }
+ execsql { SELECT x FROM w1 ORDER BY x }
+ } {1 2 3 4}
+}
+
finish_test
+