Automatic compression level experiment

WIP: currently performs worse than the standard level.
diff --git a/s2/cmd/s2c/main.go b/s2/cmd/s2c/main.go
index 7bcef82..3eb9336 100644
--- a/s2/cmd/s2c/main.go
+++ b/s2/cmd/s2c/main.go
@@ -28,6 +28,7 @@
var (
faster = flag.Bool("faster", false, "Compress faster, but with a minor compression loss")
slower = flag.Bool("slower", false, "Compress more, but a lot slower")
+	auto      = flag.Bool("auto", false, "Automatic compression level that adapts to IO speed. Requires at least 4 threads.") // Off by default while experimental.
cpu = flag.Int("cpu", runtime.GOMAXPROCS(0), "Compress using this amount of threads")
blockSize = flag.String("blocksize", "4M", "Max block size. Examples: 64K, 256K, 1M, 4M. Must be power of two and <= 4MB")
safe = flag.Bool("safe", false, "Do not overwrite output files")
@@ -75,12 +76,18 @@
flag.PrintDefaults()
os.Exit(0)
}
- opts := []s2.WriterOption{s2.WriterBlockSize(int(sz)), s2.WriterConcurrency(*cpu), s2.WriterPadding(int(pad))}
- if !*faster {
- opts = append(opts, s2.WriterBetterCompression())
+	if *cpu < 4 { // Auto mode requires at least 4 threads.
+		*auto = false
}
- if *slower {
+	opts := []s2.WriterOption{s2.WriterBlockSize(int(sz)), s2.WriterConcurrency(*cpu), s2.WriterPadding(int(pad))}
+	switch { // Explicit -slower/-faster take precedence over -auto.
+	case *slower:
opts = append(opts, s2.WriterBestCompression())
+	case *faster: // Keep the writer's default level; no option needed.
+	case *auto:
+		opts = append(opts, s2.WriterAutoCompression())
+	default:
+		opts = append(opts, s2.WriterBetterCompression())
}
wr := s2.NewWriter(nil, opts...)
diff --git a/s2/encode.go b/s2/encode.go
index 8f89e21..eaeaf10 100644
--- a/s2/encode.go
+++ b/s2/encode.go
@@ -15,6 +15,7 @@
"math/bits"
"runtime"
"sync"
+ "sync/atomic"
)
// Encode returns the encoded form of src. The returned slice may be a sub-
@@ -301,8 +302,9 @@
// Writer is an io.Writer that can write Snappy-compressed bytes.
type Writer struct {
- errMu sync.Mutex
- errState error
+ selectedLevel int64 // Level currently chosen for levelAuto (sync/atomic). Keep as first field: 64-bit atomics need 64-bit alignment on 32-bit platforms.
+ errMu sync.Mutex
+ errState error
// ibuf is a buffer for the incoming (uncompressed) bytes.
ibuf []byte
@@ -330,6 +332,7 @@
levelFast
levelBetter
levelBest
+ levelAuto // Resolved per block from Writer.selectedLevel at encode time.
)
type result []byte
@@ -377,6 +380,12 @@
w.output = toWrite
w.writerWg.Add(1)
+	// State for levelAuto, captured by the writer goroutine below.
+	slowerAuto := cap(toWrite) / 4 // Threshold for queue length and for the faster counter.
+	faster := 0                     // >0 leans toward faster levels, <0 toward slower ones.
+	selected := levelBetter         // Currently selected level.
+	conseqAvailable := 0            // Consecutive blocks that were ready without waiting.
+	atomic.StoreInt64(&w.selectedLevel, levelBetter)
// Start a writer goroutine that will write all output in order.
go func() {
defer w.writerWg.Done()
@@ -384,7 +393,56 @@
// Get a queued write.
for write := range toWrite {
// Wait for the data to be available.
- in := <-write
+			var in result
+			switch w.level {
+			case levelAuto:
+				select {
+				case in = <-write:
+					// We did not have to wait.
+					// If few writes are queued, we are waiting for input and
+					// can afford to compress better.
+
+					if len(toWrite) < slowerAuto {
+						faster--
+					}
+
+					// If blocks are repeatedly ready when we ask for them, slow down;
+					// we are likely write limited.
+					if len(toWrite) >= cap(toWrite)/2 {
+						conseqAvailable++
+					}
+					if conseqAvailable > cap(toWrite) {
+						faster -= slowerAuto
+						conseqAvailable = 0
+					}
+				default:
+					// We have to wait for output to be compressed.
+					// If more than a quarter of the queue is filled, speed up compression.
+					if len(toWrite) > slowerAuto {
+						faster++
+					}
+					in = <-write
+					conseqAvailable = 0
+				}
+				if faster >= slowerAuto {
+					if selected > levelFast {
+						selected--
+						// Publish the faster level; encoders load it for each block.
+						atomic.StoreInt64(&w.selectedLevel, int64(selected))
+					}
+					faster = 0
+				}
+				if faster <= -slowerAuto {
+					if selected < levelBest {
+						selected++
+						// Publish the slower level; encoders load it for each block.
+						atomic.StoreInt64(&w.selectedLevel, int64(selected))
+					}
+					faster = 0
+				}
+			default:
+				in = <-write
+			}
if len(in) > 0 {
if w.err(nil) == nil {
// Don't expose data from previous buffers.
@@ -484,6 +542,26 @@
return n, w.err(nil)
}
+// encodeBlockStream compresses src into dst at the writer's level and returns
+// the encoded size; 0 tells the caller to store src uncompressed instead.
+// For levelAuto, the level last published in w.selectedLevel is used.
+func (w *Writer) encodeBlockStream(dst, src []byte) (n int) {
+	level := w.level
+	if level == levelAuto {
+		level = uint8(atomic.LoadInt64(&w.selectedLevel))
+	}
+	switch level {
+	case levelFast:
+		return encodeBlock(dst, src)
+	case levelBetter:
+		return encodeBlockBetter(dst, src)
+	case levelBest:
+		return encodeBlockBest(dst, src)
+	}
+	// levelUncompressed (or an unknown level): nothing was encoded.
+	return 0
+}
+
// EncodeBuffer will add a buffer to the stream.
// This is the fastest way to encode a stream,
// but the input buffer cannot be written to by the caller
@@ -538,15 +616,7 @@
// Attempt compressing.
n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
- var n2 int
- switch w.level {
- case levelFast:
- n2 = encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
- case levelBetter:
- n2 = encodeBlockBetter(obuf[obufHeaderLen+n:], uncompressed)
- case levelBest:
- n2 = encodeBlockBest(obuf[obufHeaderLen+n:], uncompressed)
- }
+ n2 := w.encodeBlockStream(obuf[obufHeaderLen+n:], uncompressed) // Resolves levelAuto per block.
// Check if we should use this, or store as uncompressed instead.
if n2 > 0 {
@@ -618,15 +688,7 @@
// Attempt compressing.
n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
- var n2 int
- switch w.level {
- case levelFast:
- n2 = encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
- case levelBetter:
- n2 = encodeBlockBetter(obuf[obufHeaderLen+n:], uncompressed)
- case levelBest:
- n2 = encodeBlockBest(obuf[obufHeaderLen+n:], uncompressed)
- }
+ n2 := w.encodeBlockStream(obuf[obufHeaderLen+n:], uncompressed) // Resolves levelAuto per block.
// Check if we should use this, or store as uncompressed instead.
if n2 > 0 {
@@ -697,15 +759,7 @@
// Attempt compressing.
n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
- var n2 int
- switch w.level {
- case levelFast:
- n2 = encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
- case levelBetter:
- n2 = encodeBlockBetter(obuf[obufHeaderLen+n:], uncompressed)
- case levelBest:
- n2 = encodeBlockBest(obuf[obufHeaderLen+n:], uncompressed)
- }
+ n2 := w.encodeBlockStream(obuf[obufHeaderLen+n:], uncompressed) // Resolves levelAuto per block.
// Check if we should use this, or store as uncompressed instead.
if n2 > 0 {
@@ -938,11 +992,26 @@
if n <= 0 {
return errors.New("concurrency must be at least 1")
}
+		if w.level == levelAuto && n < minAutoConcurrency {
+			return fmt.Errorf("with automatic compressed level, concurrency must be at least %d", minAutoConcurrency)
+		}
w.concurrency = n
return nil
}
}
+// WriterAutoCompression enables automatic compression: starting at the better level,
+// the level moves between fast and best based on output queue fill; concurrency is raised to at least 4.
+func WriterAutoCompression() WriterOption {
+	return func(w *Writer) error {
+		w.level = levelAuto
+		if w.concurrency < minAutoConcurrency {
+			w.concurrency = minAutoConcurrency
+		}
+		return nil
+	}
+}
+
// WriterBetterCompression will enable better compression.
// EncodeBetter compresses better than Encode but typically with a
// 10-40% speed decrease on both compression and decompression.
diff --git a/s2/encode_test.go b/s2/encode_test.go
index fd668c8..b8e9a00 100644
--- a/s2/encode_test.go
+++ b/s2/encode_test.go
@@ -21,6 +21,7 @@
"default": {},
"better": {WriterBetterCompression()},
"best": {WriterBestCompression()},
+ "auto": {WriterAutoCompression()}, // Level adapts to timing at runtime, so output may vary between runs.
"none": {WriterUncompressed()},
}
diff --git a/s2/s2.go b/s2/s2.go
index a3e4abf..ecc5f37 100644
--- a/s2/s2.go
+++ b/s2/s2.go
@@ -91,6 +91,9 @@
defaultBlockSize = 1 << 20
obufHeaderLen = checksumSize + chunkHeaderSize
+
+ // minAutoConcurrency is the minimum writer concurrency for automatic compression (levelAuto).
+ minAutoConcurrency = 4
)
const (