Changes to make the multi-threaded sorter sort stably.

FossilOrigin-Name: 83a105c864247a02b723c84069055765bf373703
diff --git a/manifest b/manifest
index 0b2bd63..5d63a50 100644
--- a/manifest
+++ b/manifest
@@ -1,5 +1,5 @@
-C Fix\sa\sbroken\sassert()\sin\svdbesort.c.
-D 2014-03-29T10:01:58.802
+C Changes\sto\smake\sthe\smulti-threaded\ssorter\ssort\sstably.
+D 2014-03-29T19:48:15.410
 F Makefile.arm-wince-mingw32ce-gcc d6df77f1f48d690bd73162294bbba7f59507c72f
 F Makefile.in ad0921c4b2780d01868cf69b419a4f102308d125
 F Makefile.linux-gcc 91d710bdc4998cb015f39edf3cb314ec4f4d7e23
@@ -286,7 +286,7 @@
 F src/vdbeaux.c 1153175fb57a8454e1c8cf79b59b7bf92b26779d
 F src/vdbeblob.c 15377abfb59251bccedd5a9c7d014a895f0c04aa
 F src/vdbemem.c 6fc77594c60f6155404f3f8d71bf36d1fdeb4447
-F src/vdbesort.c 2881297f4acdba5908078c5d7f00635288a1ca08
+F src/vdbesort.c d42a32a4fff1a1bac6033abc4252f93e6ef3282b
 F src/vdbetrace.c 6f52bc0c51e144b7efdcfb2a8f771167a8816767
 F src/vtab.c 21b932841e51ebd7d075e2d0ad1415dce8d2d5fd
 F src/wal.c 76e7fc6de229bea8b30bb2539110f03a494dc3a8
@@ -1160,7 +1160,7 @@
 F tool/warnings-clang.sh f6aa929dc20ef1f856af04a730772f59283631d4
 F tool/warnings.sh d1a6de74685f360ab718efda6265994b99bbea01
 F tool/win/sqlite.vsix 030f3eeaf2cb811a3692ab9c14d021a75ce41fff
-P a683c05f6250389e84b980b16559e162ba1a27c2
-R d8e63408790a442d33eec2a57272c54d
+P 18d1b402f2dbe78f1a1113bb356b710e348365ef
+R 9fe7387082ff9660756870dd3cb64af4
 U dan
-Z 6c5e71d99f167a1ded5f09353b3e0513
+Z 65bd6f7e332292b10d887f31007d4273
diff --git a/manifest.uuid b/manifest.uuid
index 1bf7c26..005734c 100644
--- a/manifest.uuid
+++ b/manifest.uuid
@@ -1 +1 @@
-18d1b402f2dbe78f1a1113bb356b710e348365ef
\ No newline at end of file
+83a105c864247a02b723c84069055765bf373703
\ No newline at end of file
diff --git a/src/vdbesort.c b/src/vdbesort.c
index c6927f8..08887e0 100644
--- a/src/vdbesort.c
+++ b/src/vdbesort.c
@@ -91,8 +91,10 @@
   SorterRecord *pList;            /* List of records for pThread to sort */
   int nInMemory;                  /* Expected size of PMA based on pList */
   u8 *aListMemory;                /* Records memory (or NULL) */
+  u32 iSeq;                       /* Sequence number for PMA */
 
   int nPMA;                       /* Number of PMAs currently in pTemp1 */
+  int bEmbeddedSeq;               /* True if pTemp1 contains embedded seq. */
   i64 iTemp1Off;                  /* Offset to write to in pTemp1 */
   sqlite3_file *pTemp1;           /* File to write PMAs to, or NULL */
 };
@@ -187,6 +189,7 @@
   u8 *aMemory;                    /* Block of memory to alloc records from */
   int iMemory;                    /* Offset of first free byte in aMemory */
   int nMemory;                    /* Size of aMemory allocation in bytes */
+  u32 iNextSeq;                   /* Sequence number for next PMA */
   SorterThread aThread[SQLITE_MAX_SORTER_THREAD];
 };
 
@@ -197,6 +200,8 @@
 struct VdbeSorterIter {
   i64 iReadOff;                   /* Current read offset */
   i64 iEof;                       /* 1 byte past EOF for this iterator */
+  int bEmbeddedSeq;               /* True if records have sequence values */
+  int iSeq;                       /* Current sequence value */
   int nAlloc;                     /* Bytes of space at aAlloc */
   int nKey;                       /* Number of bytes in key */
   sqlite3_file *pFile;            /* File iterator is reading from */
@@ -417,6 +422,11 @@
   }
 
   rc = vdbeSorterIterVarint(pIter, &nRec);
+  if( rc==SQLITE_OK && pIter->bEmbeddedSeq ){
+    u64 iSeq;
+    rc = vdbeSorterIterVarint(pIter, &iSeq);
+    pIter->iSeq = (int)iSeq;
+  }
   if( rc==SQLITE_OK ){
     pIter->nKey = (int)nRec;
     rc = vdbeSorterIterRead(pIter, (int)nRec, &pIter->aKey);
@@ -448,6 +458,7 @@
   pIter->iReadOff = iStart;
   pIter->nAlloc = 128;
   pIter->aAlloc = (u8*)sqlite3Malloc(pIter->nAlloc);
+  pIter->bEmbeddedSeq = pThread->bEmbeddedSeq;
 
   /* Try to xFetch() a mapping of the entire temp file. If this is possible,
   ** the PMA will be read via the mapping. Otherwise, use xRead().  */
@@ -470,7 +481,7 @@
           }
           rc = sqlite3OsRead(
               pThread->pTemp1, &pIter->aBuffer[iBuf], nRead, iStart
-              );
+          );
           assert( rc!=SQLITE_IOERR_SHORT_READ );
         }
       }
@@ -480,9 +491,20 @@
   if( rc==SQLITE_OK ){
     u64 nByte;                    /* Size of PMA in bytes */
     pIter->iEof = pThread->iTemp1Off;
-    rc = vdbeSorterIterVarint(pIter, &nByte);
-    pIter->iEof = pIter->iReadOff + nByte;
-    *pnByte += nByte;
+    if( pIter->bEmbeddedSeq==0 ){
+      u64 iSeq, nElem;
+      rc = vdbeSorterIterVarint(pIter, &iSeq);
+      pIter->iSeq = (int)iSeq;
+      if( rc==SQLITE_OK ){
+        rc = vdbeSorterIterVarint(pIter, &nElem);
+        *pnByte += (nElem * sqlite3VarintLen(iSeq));
+      }
+    }
+    if( rc==SQLITE_OK ){
+      rc = vdbeSorterIterVarint(pIter, &nByte);
+      pIter->iEof = pIter->iReadOff + nByte;
+      *pnByte += nByte;
+    }
   }
 
   if( rc==SQLITE_OK ){
@@ -751,6 +773,7 @@
   pSorter->nInMemory = 0;
   pSorter->bUsePMA = 0;
   pSorter->iMemory = 0;
+  pSorter->iNextSeq = 0;
 }
 
 /*
@@ -824,11 +847,15 @@
 ** Sort the linked list of records headed at pThread->pList. Return 
 ** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if 
 ** an error occurs.
+**
+** If no error occurs, set *pnElem to the total number of elements in the
+** list. pnElem must not be NULL; it is written to unconditionally.
 */
-static int vdbeSorterSort(SorterThread *pThread){
+static int vdbeSorterSort(SorterThread *pThread, i64 *pnElem){
   int i;
   SorterRecord **aSlot;
   SorterRecord *p;
+  i64 nElem = 0;
 
   aSlot = (SorterRecord **)sqlite3MallocZero(64 * sizeof(SorterRecord *));
   if( !aSlot ){
@@ -856,6 +883,7 @@
     }
     aSlot[i] = p;
     p = pNext;
+    nElem++;
   }
 
   p = 0;
@@ -864,6 +892,7 @@
   }
   pThread->pList = p;
 
+  *pnElem = nElem;
   sqlite3_free(aSlot);
   return SQLITE_OK;
 }
@@ -989,7 +1018,7 @@
 **       Each record consists of a varint followed by a blob of data (the 
 **       key). The varint is the number of bytes in the blob of data.
 */
-static int vdbeSorterListToPMA(SorterThread *pThread){
+static int vdbeSorterListToPMA(SorterThread *pThread, i64 nElem){
   int rc = SQLITE_OK;             /* Return code */
   FileWriter writer;              /* Object used to write to the file */
 
@@ -1002,12 +1031,13 @@
     assert( rc!=SQLITE_OK || pThread->pTemp1 );
     assert( pThread->iTemp1Off==0 );
     assert( pThread->nPMA==0 );
+    assert( pThread->bEmbeddedSeq==0 );
   }
 
   /* Try to get the file to memory map */
   if( rc==SQLITE_OK ){
     rc = vdbeSorterExtendFile(
-        pThread->pTemp1, pThread->iTemp1Off + pThread->nInMemory + 9
+        pThread->pTemp1, pThread->iTemp1Off + 9 + 9 + 9 + pThread->nInMemory
     );
   }
 
@@ -1017,6 +1047,8 @@
 
     fileWriterInit(pThread->pTemp1, &writer, pThread->pgsz, pThread->iTemp1Off);
     pThread->nPMA++;
+    fileWriterWriteVarint(&writer, (u64)pThread->iSeq);
+    fileWriterWriteVarint(&writer, (u64)nElem);
     fileWriterWriteVarint(&writer, pThread->nInMemory);
     for(p=pThread->pList; p; p=pNext){
       pNext = p->u.pNext;
@@ -1091,7 +1123,7 @@
       ** PMA should be considered smaller. The VdbeSorter.aIter[] array
       ** is sorted from oldest to newest, so pIter1 contains older values
       ** than pIter2 iff (pIter1<pIter2).  */
-      if( iRes<0 || (iRes==0 && pIter1<pIter2) ){
+      if( iRes<0 || (iRes==0 && pIter1->iSeq < pIter2->iSeq) ){
         pMerger->aTree[i] = (int)(pIter1 - pMerger->aIter);
         pIter2 = &pMerger->aIter[ pMerger->aTree[i ^ 0x0001] ];
         pKey2 = pIter2->aKey;
@@ -1188,6 +1220,7 @@
           VdbeSorterIter *pIter = &pMerger->aIter[ pMerger->aTree[1] ];
           assert( pIter->pFile!=0 );        /* pIter is not at EOF */
           fileWriterWriteVarint(&writer, pIter->nKey);
+          fileWriterWriteVarint(&writer, (u64)pIter->iSeq);
           fileWriterWrite(&writer, pIter->aKey, pIter->nKey);
           rc = vdbeSorterNext(pThread, pMerger, &bEof);
         }
@@ -1200,19 +1233,25 @@
       pThread->pTemp1 = pTemp2;
       pThread->nPMA = (i / SORTER_MAX_MERGE_COUNT);
       pThread->iTemp1Off = iWriteOff;
+      pThread->bEmbeddedSeq = 1;
+      sqlite3OsUnfetch(pTemp2, 0, 0);
     }
   }else{
+    i64 nElem;
+
     /* Sort the pThread->pList list */
-    rc = vdbeSorterSort(pThread);
+    rc = vdbeSorterSort(pThread, &nElem);
 
     /* If required, write the list out to a PMA. */
     if( rc==SQLITE_OK && pThread->eWork==SORTER_THREAD_TO_PMA ){
 #ifdef SQLITE_DEBUG
       i64 nExpect = pThread->nInMemory
         + sqlite3VarintLen(pThread->nInMemory)
+        + sqlite3VarintLen(pThread->iSeq)
+        + sqlite3VarintLen(nElem)
         + pThread->iTemp1Off;
 #endif
-      rc = vdbeSorterListToPMA(pThread);
+      rc = vdbeSorterListToPMA(pThread, nElem);
       assert( rc!=SQLITE_OK || (nExpect==pThread->iTemp1Off) );
     }
   }
@@ -1268,6 +1307,7 @@
     pThread->eWork = SORTER_THREAD_TO_PMA;
     pThread->pList = pSorter->pRecord;
     pThread->nInMemory = pSorter->nInMemory;
+    pThread->iSeq = pSorter->iNextSeq++;
     pSorter->nInMemory = 0;
     pSorter->pRecord = 0;