1
0
mirror of https://github.com/ceph/ceph-csi.git synced 2025-01-12 06:49:45 +00:00
ceph-csi/vendor/github.com/pierrec/lz4/writer.go
Niels de Vos 91774fc936 rebase: vendor dependencies for Vault API
Uses github.com/libopenstorage/secrets to communicate with Vault. This
removes the need for maintaining our own limited Vault APIs.

By adding the new dependency, several other packages got updated in the
process. Unused indirect dependencies have been removed from go.mod.

Signed-off-by: Niels de Vos <ndevos@redhat.com>
2020-11-29 04:03:59 +00:00

409 lines
10 KiB
Go

package lz4
import (
"encoding/binary"
"fmt"
"github.com/pierrec/lz4/internal/xxh32"
"io"
"runtime"
)
// zResult contains the results of compressing a block.
// It is the value a block compression routine sends to the writer goroutine
// over the per-block channel.
type zResult struct {
size uint32 // Block header: payload length, with compressedBlockFlag set when the payload is stored uncompressed.
data []byte // Block payload: the compressed bytes, or the input itself when compression did not shrink it.
checksum uint32 // Checksum of data; only computed when the header enables block checksums.
}
// Writer implements the LZ4 frame encoder.
// Input passed to Write is gathered into blocks of Header.BlockMaxSize bytes,
// each block is compressed and then written to the destination io.Writer.
type Writer struct {
Header
// Handler called when a block has been successfully written out.
// It provides the number of bytes written.
// In concurrent mode it is invoked from the writer goroutine with the length
// of the block payload.
OnBlockDone func(size int)
buf [19]byte // magic number(4) + header(flags(2)+[Size(8)+DictID(4)]+checksum(1)) does not exceed 19 bytes; its first 4 bytes are also the scratch space of writeUint32.
dst io.Writer // Destination.
checksum xxh32.XXHZero // Frame checksum.
data []byte // Data to be compressed + buffer for compressed data.
idx int // Index into data: number of bytes currently buffered and not yet compressed.
hashtable [winSize]int // Hash table used in CompressBlock().
// For concurrency.
c chan chan zResult // Channel for block compression goroutines and writer goroutine; nil when compression is done synchronously.
err error // Any error encountered while writing to the underlying destination (set by the writer goroutine).
}
// NewWriter creates an LZ4 frame encoder that sends its output to dst.
// Nothing is written to dst by this call: the frame header is only emitted,
// and the Header settings only validated, on the first Write.
// The Header may therefore be adjusted before that first Write, after which it
// must be left alone until Reset() is called.
func NewWriter(dst io.Writer) *Writer {
	w := &Writer{}
	w.Reset(dst)
	return w
}
// WithConcurrency sets the number of concurrent go routines used for compression.
// A negative value sets the concurrency to GOMAXPROCS.
// A value of 0 or 1 disables concurrency: blocks are then compressed and
// written synchronously by Write and Flush.
//
// When concurrency is enabled, a single writer goroutine is started. It
// receives one result channel per block, in submission order, and writes the
// blocks to the destination in that same order. Write errors are recorded in
// z.err (first error wins) and reported when the Writer is closed.
//
// Any writer goroutine started by a previous call is drained and stopped
// first, so that calling WithConcurrency repeatedly neither leaks goroutines
// nor leaves two goroutines writing to the same destination.
// It returns z to allow call chaining.
func (z *Writer) WithConcurrency(n int) *Writer {
	// Stop the previous writer goroutine, if any. The returned error is the
	// sticky z.err, which is left in place, so nothing is lost by ignoring it here.
	_ = z.close()

	switch {
	case n == 0 || n == 1:
		z.c = nil
		return z
	case n < 0:
		n = runtime.GOMAXPROCS(0)
	}

	// Keep the channel in a local: the goroutine must keep draining the channel
	// it was created for, even if z.c is reassigned before it gets scheduled.
	jobs := make(chan chan zResult, n)
	z.c = jobs

	// Writer goroutine managing concurrent block compression goroutines.
	go func() {
		// Process next block compression item.
		for c := range jobs {
			// Read the next compressed block result.
			// Waiting here ensures that the blocks are output in the order they were sent.
			// The incoming channel is always closed as it indicates to the caller that
			// the block has been processed.
			res := <-c
			size := len(res.data)
			if size == 0 {
				// Notify the block compression routine that we are done with its result.
				// This is used when a sentinel block is sent to terminate the compression.
				close(c)
				return
			}
			// Write the block: header, payload, then the optional block checksum.
			// Only the first error is kept; later blocks are still drained so that
			// producers never block forever.
			if err := z.writeUint32(res.size); err != nil && z.err == nil {
				z.err = err
			}
			if _, err := z.dst.Write(res.data); err != nil && z.err == nil {
				z.err = err
			}
			if z.BlockChecksum {
				if err := z.writeUint32(res.checksum); err != nil && z.err == nil {
					z.err = err
				}
			}
			if isCompressed := res.size&compressedBlockFlag == 0; isCompressed {
				// It is now safe to release the buffer as no longer in use by any goroutine.
				putBuffer(cap(res.data), res.data)
			}
			if h := z.OnBlockDone; h != nil {
				h(size)
			}
			close(c)
		}
	}()
	return z
}
// newBuffers fetches a buffer sized for the Header's BlockMaxSize and installs
// it as z.data, with its length limited to BlockMaxSize.
// The capacity beyond that length is what compressBlock slices off as the
// destination for compressed output.
func (z *Writer) newBuffers() {
	size := z.Header.BlockMaxSize
	// Only the first part is exposed as the uncompressed input buffer.
	z.data = getBuffer(size)[:size]
}
// freeBuffers hands the writer's working buffer, if any, back to the buffer
// pool and drops the writer's reference to it.
func (z *Writer) freeBuffers() {
	released := z.data
	z.data = nil
	putBuffer(z.Header.BlockMaxSize, released)
}
// writeHeader assembles the frame preamble (magic number followed by the frame
// descriptor) in z.buf and writes it to the underlying io.Writer.
// An unset BlockMaxSize defaults to 4Mb; an invalid one is reported as an error
// before anything is written. On success the Header is marked as done.
func (z *Writer) writeHeader() error {
	// Fall back to 4Mb blocks when the caller did not choose a size.
	if z.Header.BlockMaxSize == 0 {
		z.Header.BlockMaxSize = blockSize4M
	}
	// The block size is the only setting that requires validation.
	blockMax := z.Header.BlockMaxSize
	if !isValidBlockSize(blockMax) {
		return fmt.Errorf("lz4: invalid block max size: %d", blockMax)
	}

	// The block size is now known: acquire the working buffer.
	z.newBuffers()
	z.idx = 0

	hdr := z.buf[:]
	withSize := z.Header.Size > 0 // The content size is an optional field.

	// Fixed part: magic number, flags byte and block max size byte.
	binary.LittleEndian.PutUint32(hdr[0:], frameMagic)
	flg := byte(Version << 6)
	flg |= 1 << 5 // No block dependency.
	if z.Header.BlockChecksum {
		flg |= 1 << 4
	}
	if withSize {
		flg |= 1 << 3
	}
	if !z.Header.NoChecksum {
		flg |= 1 << 2
	}
	hdr[4] = flg
	hdr[5] = blockSizeValueToIndex(blockMax) << 4

	// So far: magic(4) + flags(1) + block max size(1).
	end := 6
	if withSize {
		binary.LittleEndian.PutUint64(hdr[end:], z.Header.Size)
		end += 8
	}

	// The header checksum covers everything after the magic number; only the
	// second byte of the hash is stored.
	sum := xxh32.ChecksumZero(hdr[4:end])
	hdr[end] = byte(sum >> 8 & 0xFF)
	z.checksum.Reset()

	// Header ready, write it out.
	if _, err := z.dst.Write(hdr[:end+1]); err != nil {
		return err
	}
	z.Header.done = true
	if debugFlag {
		debug("wrote header %v", z.Header)
	}
	return nil
}
// Write compresses the bytes in buf and sends them to the underlying io.Writer.
// The frame header is written first if that has not happened yet.
// Input is cut into blocks of len(z.data) bytes: a full block available
// directly in buf is compressed without copying, anything else is accumulated
// in z.data until a block is complete. A trailing partial block stays buffered
// until more data arrives or Flush/Close is called.
// It returns the number of bytes consumed from buf and the first error met.
func (z *Writer) Write(buf []byte) (int, error) {
	if !z.Header.done {
		if err := z.writeHeader(); err != nil {
			return 0, err
		}
	}
	if debugFlag {
		debug("input buffer len=%d index=%d", len(buf), z.idx)
	}

	blockLen := len(z.data)
	consumed := 0
	for len(buf) > 0 {
		if z.idx != 0 || len(buf) < blockLen {
			// Either a partial block is already pending or buf cannot fill a
			// whole block: accumulate the data to be compressed.
			copied := copy(z.data[z.idx:], buf)
			consumed += copied
			z.idx += copied
			buf = buf[copied:]
			if debugFlag {
				debug("%d bytes copied to buf, current index %d", consumed, z.idx)
			}

			if z.idx < len(z.data) {
				// Buffer not filled.
				if debugFlag {
					debug("need more data for compression")
				}
				return consumed, nil
			}

			// Buffer full.
			if err := z.compressBlock(z.data); err != nil {
				return consumed, err
			}
			z.idx = 0
			continue
		}

		// Nothing pending and a whole block is available: compress it straight
		// from the caller's buffer, avoiding a copy.
		if err := z.compressBlock(buf[:blockLen]); err != nil {
			return consumed, err
		}
		consumed += blockLen
		buf = buf[blockLen:]
	}
	return consumed, nil
}
// compressBlock compresses one block of input and emits it.
//
// The frame checksum is always updated with the uncompressed data first
// (unless disabled by NoChecksum).
//
// In concurrent mode (z.c != nil) the block is handed to a new goroutine and
// the method returns immediately with a nil error; the result channel is queued
// on z.c beforehand so that blocks are written in submission order.
//
// Otherwise the block is compressed synchronously into the spare capacity of
// z.data and written to the destination as: block size, payload, and the block
// checksum when BlockChecksum is set. A block that does not shrink is stored
// uncompressed, flagged with compressedBlockFlag. The first write error is
// returned.
func (z *Writer) compressBlock(data []byte) error {
	if !z.NoChecksum {
		// The result is deliberately ignored: this only feeds the running hash
		// (presumably it cannot fail, as with hash.Hash — confirm against xxh32).
		_, _ = z.checksum.Write(data)
	}

	if z.c != nil {
		c := make(chan zResult)
		z.c <- c // Send now to guarantee order.

		// data aliases either the buffer supplied to Write or z.data. Both may be
		// overwritten as soon as this method returns, while the goroutine is
		// still reading them, so the goroutine gets its own private copy.
		block := make([]byte, len(data))
		copy(block, data)
		go writerCompressBlock(c, z.Header, block)
		return nil
	}

	// The compressed output goes into the capacity of z.data beyond BlockMaxSize.
	zdata := z.data[z.Header.BlockMaxSize:cap(z.data)]
	// The compressed block size cannot exceed the input's.
	var zn int
	if level := z.Header.CompressionLevel; level != 0 {
		zn, _ = CompressBlockHC(data, zdata, level)
	} else {
		zn, _ = CompressBlock(data, zdata, z.hashtable[:])
	}

	var bLen uint32
	if debugFlag {
		debug("block compression %d => %d", len(data), zn)
	}
	if zn > 0 && zn < len(data) {
		// Compressible and compressed size smaller than uncompressed: ok!
		bLen = uint32(zn)
		zdata = zdata[:zn]
	} else {
		// Uncompressed block: store the input as is and flag it in the block size.
		bLen = uint32(len(data)) | compressedBlockFlag
		zdata = data
	}
	if debugFlag {
		debug("block compression to be written len=%d data len=%d", bLen, len(zdata))
	}

	// Write the block.
	if err := z.writeUint32(bLen); err != nil {
		return err
	}
	written, err := z.dst.Write(zdata)
	if err != nil {
		return err
	}
	if h := z.OnBlockDone; h != nil {
		h(written)
	}

	if !z.BlockChecksum {
		if debugFlag {
			debug("current frame checksum %x", z.checksum.Sum32())
		}
		return nil
	}
	// The block checksum covers the payload exactly as written.
	checksum := xxh32.ChecksumZero(zdata)
	if debugFlag {
		debug("block checksum %x", checksum)
		defer func() { debug("current frame checksum %x", z.checksum.Sum32()) }()
	}
	return z.writeUint32(checksum)
}
// Flush flushes any pending compressed data to the underlying writer.
// Flush does not return until the data has been written.
// When no data is buffered, Flush returns immediately.
//
// In synchronous mode, an error returned by the underlying writer is returned
// by Flush. In concurrent mode, write errors are recorded by the writer
// goroutine in z.err and reported when the Writer is closed; Flush itself
// returns nil.
func (z *Writer) Flush() error {
	if debugFlag {
		debug("flush with index %d", z.idx)
	}
	if z.idx == 0 {
		return nil
	}

	data := z.data[:z.idx]
	z.idx = 0
	if z.c == nil {
		return z.compressBlock(data)
	}

	if !z.NoChecksum {
		// The result is deliberately ignored: this only feeds the running hash.
		_, _ = z.checksum.Write(data)
	}
	c := make(chan zResult)
	z.c <- c
	// Compress in this goroutine; this returns once the writer goroutine has
	// picked up the result.
	writerCompressBlock(c, z.Header, data)
	// The result may reference z.data (uncompressed block). Wait until the
	// writer goroutine has written it out, which it signals by closing c, so
	// that a subsequent Write cannot overwrite z.data while it is being written.
	<-c
	return nil
}
// close stops the writer goroutine used in concurrent mode and reports the
// first error it recorded while writing to the destination.
// It is a no-op returning nil when no writer goroutine is in use.
func (z *Writer) close() error {
	if z.c != nil {
		// A sentinel block (no data to compress) tells the writer goroutine to
		// terminate once every block queued before it has been processed.
		sentinel := make(chan zResult)
		z.c <- sentinel
		sentinel <- zResult{}
		// The writer goroutine closes the sentinel channel right before returning.
		<-sentinel
		// At this point the writer goroutine has shut down or is about to return.
		z.c = nil
		return z.err
	}
	return nil
}
// Close finishes the LZ4 frame: it writes the header if that has not been done
// yet, flushes buffered data, stops the writer goroutine (if any), releases the
// working buffer and finally writes the end mark followed by the frame checksum
// (unless NoChecksum is set).
// The underlying io.Writer is not closed.
func (z *Writer) Close() error {
	if !z.Header.done {
		if err := z.writeHeader(); err != nil {
			return err
		}
	}

	// Push out what is still buffered, then wait for the writer goroutine.
	err := z.Flush()
	if err == nil {
		err = z.close()
	}
	if err != nil {
		return err
	}
	z.freeBuffers()

	if debugFlag {
		debug("writing last empty block")
	}
	// A zero block size marks the end of the frame.
	if err := z.writeUint32(0); err != nil {
		return err
	}
	if !z.NoChecksum {
		sum := z.checksum.Sum32()
		if debugFlag {
			debug("stream checksum %x", sum)
		}
		return z.writeUint32(sum)
	}
	return nil
}
// Reset clears the state of the Writer z such that it is equivalent to its
// initial state from NewWriter, but instead writing to w.
// No access to the underlying io.Writer is performed.
// The concurrency level previously set with WithConcurrency is preserved.
func (z *Writer) Reset(w io.Writer) {
	// Remember the concurrency level before close() drops the channel.
	n := cap(z.c)
	// Stop the writer goroutine, if any. Its error belongs to the stream being
	// abandoned and is deliberately discarded.
	_ = z.close()
	z.freeBuffers()
	z.Header.Reset()
	z.dst = w
	z.checksum.Reset()
	z.idx = 0
	z.err = nil
	// Clear the hash table left over from the previous stream: stale entries
	// would influence which matches are found, making the output of a reset
	// Writer differ from that of a fresh one for the same input.
	for i := range z.hashtable {
		z.hashtable[i] = 0
	}
	z.WithConcurrency(n)
}
// writeUint32 encodes x as 4 little-endian bytes, using the start of z.buf as
// scratch space, and writes them to the underlying writer.
// It returns the error reported by that write, if any.
func (z *Writer) writeUint32(x uint32) error {
	scratch := z.buf[:4]
	binary.LittleEndian.PutUint32(scratch, x)
	if _, err := z.dst.Write(scratch); err != nil {
		return err
	}
	return nil
}
// writerCompressBlock compresses data into a pooled buffer and writes its result
// out to the input channel.
//
// header supplies the block size (used to pick the pooled buffer), the
// compression level (non-zero selects CompressBlockHC) and whether a block
// checksum must be computed.
// When compression does not shrink the input, the result references data itself
// and carries compressedBlockFlag in its size; the pooled buffer is then
// returned to the pool right away since nothing references it anymore.
// The send on c blocks until the receiver takes the result.
func writerCompressBlock(c chan zResult, header Header, data []byte) {
	zdata := getBuffer(header.BlockMaxSize)
	// The compressed block size cannot exceed the input's.
	var zn int
	if level := header.CompressionLevel; level != 0 {
		zn, _ = CompressBlockHC(data, zdata, level)
	} else {
		// Each call uses its own hash table as calls may run concurrently.
		var hashTable [winSize]int
		zn, _ = CompressBlock(data, zdata, hashTable[:])
	}

	var res zResult
	if zn > 0 && zn < len(data) {
		// Compressed payload: the pooled buffer is released by the receiver
		// once it has been written out.
		res.size = uint32(zn)
		res.data = zdata[:zn]
	} else {
		// Stored uncompressed: the pooled buffer was not used, release it now
		// instead of leaving it to the garbage collector.
		res.size = uint32(len(data)) | compressedBlockFlag
		res.data = data
		putBuffer(header.BlockMaxSize, zdata)
	}
	if header.BlockChecksum {
		res.checksum = xxh32.ChecksumZero(res.data)
	}
	c <- res
}