blob: 40f95f5a4b83fdcc1a6b4c34f5f7182825c30ab7 [file] [log] [blame] [edit]
//===-- QueueStreamer.h - Stream data from external source ------*- C++ -*-===//
//
// The LLVM Compiler Infrastructure
//
// This file is distributed under the University of Illinois Open Source
// License. See LICENSE.TXT for details.
//
//===----------------------------------------------------------------------===//
//
// Implements LLVM's interface for fetching data from a stream source
// (DataStreamer). Typically, bytes are pushed by an external source.
// Typically, there is a handler thread that waits for push requests
// and calls PutBytes, and a consumer thread that calls GetBytes. The bytes
// are buffered until the consumer calls GetBytes to remove them.
// The blocking behavior of GetBytes and PutBytes means that if the consumer
// is faster than the producer, then the whole consumer pipeline can block
// waiting for the producer. Similarly, if the consumer is slower, then PutBytes
// will block, and the external source (producer) will know how far along
// the consumer has advanced, modulo the amount in the bounded buffer of
// the QueueStreamer.
//
//===----------------------------------------------------------------------===//
#ifndef LLVM_SUPPORT_QUEUESTREAMER_H
#define LLVM_SUPPORT_QUEUESTREAMER_H
#include <condition_variable>
#include <mutex>
#include <vector>
#include "llvm/Support/DataStream.h"
namespace llvm {
class QueueStreamer : public DataStreamer {
  // Copying is disabled for this class.
  QueueStreamer(const QueueStreamer &) = delete;
  QueueStreamer &operator=(const QueueStreamer &) = delete;

  enum {
    // Number of bytes the queue's buffer starts out with.
    BaseSize = 64 * 1024,
    // Upper bound on the queue's size. PutBytes and GetBytes may block, but
    // because both copy partially, progress is still made (and deadlock is
    // avoided) even when a single Put/Get request asks for more bytes than
    // this limit. The limit is kept "small" so that the external source can
    // estimate how far the consumer has advanced, modulo this buffer amount.
    MaxSize = 256 * 1024
  };

public:
  // Start with an empty, not-yet-done queue backed by BaseSize bytes.
  QueueStreamer()
      : Done(false), Bytes(static_cast<size_t>(BaseSize)), Prod(0), Cons(0) {}

  // Copy Len bytes out of the QueueStreamer into Buf. When fewer than Len
  // bytes are available, copy what is there, signal the thread that may be
  // blocked in PutBytes, and block waiting for the remainder. Once all bytes
  // have been received (SetDone has been called) and fewer than Len bytes
  // remain, copy all of the remaining bytes.
  // Returns the number of bytes copied.
  size_t GetBytes(unsigned char *Buf, size_t Len) override;

  // Copy Len bytes from Buf into the QueueStreamer. When the queue lacks
  // space, copy as many bytes as fit, signal the thread that may be blocked
  // in GetBytes, and block until there is room for the remainder.
  // Returns the number of bytes copied.
  size_t PutBytes(unsigned char *Buf, size_t Len);

  // Marks the end of the data stream, which may unblock GetBytes. Called by
  // the same thread that calls PutBytes.
  void SetDone();

private:
  bool Done;

  using LockType = std::mutex;
  LockType Mutex;
  std::condition_variable Cond;

  // State and helpers for the circular queue.
  std::vector<unsigned char> Bytes;
  size_t Prod; // Queue producer index
  size_t Cons; // Queue consumer index

  // Number of bytes currently held in the queue.
  size_t queueSize() const {
    // Producer index has wrapped around behind the consumer index.
    if (Cons > Prod)
      return Bytes.size() - (Cons - Prod);
    return Prod - Cons;
  }

  // Number of bytes that can still be stored without resizing. One slot is
  // always held back: Prod == Cons is read by queueSize() as an empty queue.
  size_t capacityRemaining() const {
    return Bytes.size() - queueSize() - 1;
  }

  // Circular-queue operations; defined out of line.
  void queueResize();
  void queuePut(unsigned char *Buf, size_t Len);
  void queueGet(unsigned char *Buf, size_t Len);
};
} // end of namespace llvm
#endif // LLVM_SUPPORT_QUEUESTREAMER_H