2018-08-19 本文已影响45人
1. 概述
2. queue_store实现
// Appends `message` to the queue named `queue_name`.
// The numeric queue ID selects an IO thread (queueID % IO_THREAD) and a slot inside that thread
// (queueID / IO_THREAD); the message is then serialized into that thread's in-memory index block.
// NOTE(review): this listing has lost lines (closing braces and the end of the function are not
// visible) - confirm the control flow against the original source.
void queue_store::put(const std::string &queue_name, const MemBlock &message) {
auto queueID = static_cast<unsigned long>(getQueueID(queue_name));
auto thread_id = static_cast<int>(queueID % IO_THREAD);
asyncfileio_thread_t *ioThread = asyncfileio->work_threads_object[thread_id];
uint64_t which_queue_in_this_io_thread = queueID / IO_THREAD;
// Post-increment: queue_offset is the position of this message within its queue.
uint64_t queue_offset = ioThread->queue_counter[which_queue_in_this_io_thread]++;
// Index entries are grouped in runs of CLUSTER_SIZE per queue; one "row" of clusters holds one
// cluster for each queue slot of this IO thread (CLUSTER_SIZE * QUEUE_NUM_PER_IO_THREAD entries).
uint64_t chunk_id = ((queue_offset / CLUSTER_SIZE) * (CLUSTER_SIZE * QUEUE_NUM_PER_IO_THREAD) +
(which_queue_in_this_io_thread * CLUSTER_SIZE) +
queue_offset % CLUSTER_SIZE);
// Byte offset of the entry in the raw index stream, split into (block number, offset inside block).
uint64_t idx_file_offset = INDEX_ENTRY_SIZE * chunk_id;
int which_mapped_chunk = static_cast<int>(idx_file_offset / INDEX_MAPPED_BLOCK_RAW_SIZE);
uint64_t offset_in_mapped_chunk = idx_file_offset % INDEX_MAPPED_BLOCK_RAW_SIZE;
// If the target block has no buffer yet, block on its condition variable until one is assigned.
if (ioThread->index_file_memory_block[which_mapped_chunk] == nullptr) {
std::unique_lock<std::mutex> lock(ioThread->mapped_block_mtx[which_mapped_chunk]);
ioThread->mapped_block_cond[which_mapped_chunk].wait(lock, [ioThread, which_mapped_chunk]() -> bool {
return ioThread->index_file_memory_block[which_mapped_chunk] != nullptr;
char *buf = ioThread->index_file_memory_block[which_mapped_chunk] + offset_in_mapped_chunk;
// Messages up to RAW_NORMAL_MESSAGE_SIZE are serialized straight into the index entry.
if (message.size <= RAW_NORMAL_MESSAGE_SIZE) {
serialize_base36_decoding_skip_index((uint8_t *) message.ptr, message.size,
(uint8_t *) buf);
} else {
// Larger messages are serialized into a stack buffer and appended to the data file; the index
// entry then stores the data-file offset at byte 4 and the serialized length at byte 12.
// NOTE(review): nothing visible here bounds the serialized length by the 4096-byte buffer.
unsigned char large_msg_buf[4096];
uint64_t length = (uint64_t) serialize_base36_decoding_skip_index((uint8_t *) message.ptr, message.size,
(uint8_t *) large_msg_buf);
// fetch_add reserves a region of the data file for this write.
uint64_t offset = ioThread->data_file_current_size.fetch_add(length);
pwrite(ioThread->data_file_fd, large_msg_buf, length, offset);
memcpy(buf + 4, &offset, 8);
memcpy(buf + 12, &length, 8);
// Frees the message buffer passed in by the caller.
delete[] ((char *) (message.ptr));
// Count writes into this block; reaching INDEX_BLOCK_WRITE_TIMES_TO_FULL creates an IO task
// (what is done with the task is not visible in this listing).
int write_times = ++(ioThread->index_mapped_block_write_counter[which_mapped_chunk]);
if (write_times == INDEX_BLOCK_WRITE_TIMES_TO_FULL) {
asyncio_task_t *task = new asyncio_task_t(0);
// Read entry point: dispatches to doPhase2 or doPhase3 based on a per-thread id.
// NOTE(review): closing braces are missing from this listing; presumably the doPhase3 call is the
// path taken when tid >= CHECK_THREAD_NUM - confirm.
vector<MemBlock> queue_store::get(const std::string &queue_name, long offset, long number) {
// Each calling thread takes the next value of tidCounter the first time it executes this line.
static thread_local int tid = ++tidCounter;
if (tid < CHECK_THREAD_NUM) {
return doPhase2(tid, queue_name, offset, number);
return doPhase3(tid, queue_name, offset, number);
// Reads messages of queue `queue_name` starting at `offset`, walking the index one cluster run at a time
// and decoding every index entry back into a message.
// NOTE(review): several statements in this listing are truncated (the std::min call, the memalign call,
// both branches that fill index_record, the large-message pread) and closing braces are missing.
std::vector<MemBlock> queue_store::doPhase2(int tid, const std::string &queue_name, long offset, long number) {
// Per-thread vector that persists across calls.
// NOTE(review): no clear() is visible in this listing - confirm it is reset before being filled.
static thread_local vector<MemBlock> result;
auto queueID = static_cast<unsigned long>(getQueueID(queue_name));
int threadID = queueID % IO_THREAD;
asyncfileio_thread_t *asyncfileio_thread = asyncfileio->work_threads_object[threadID];
uint32_t which_queue_in_this_io_thread = queueID / IO_THREAD;
auto max_offset = std::min(static_cast<uint32_t>(offset + number),
uint64_t chunk_offset = (which_queue_in_this_io_thread * CLUSTER_SIZE);
// Per-thread scratch buffer aligned to FILESYSTEM_BLOCK_SIZE, allocated once per thread.
static thread_local unsigned char *index_record = (unsigned char *) memalign(FILESYSTEM_BLOCK_SIZE,
// Each iteration handles the part of the request that lies inside one cluster.
for (auto queue_offset = static_cast<uint64_t>(offset); queue_offset < max_offset;) {
uint64_t chunk_id = ((queue_offset / CLUSTER_SIZE) * (CLUSTER_SIZE * QUEUE_NUM_PER_IO_THREAD) + chunk_offset +
queue_offset % CLUSTER_SIZE);
// Entries left in the current cluster, capped by what is still requested.
auto remaining_num = static_cast<uint32_t>(CLUSTER_SIZE - queue_offset % CLUSTER_SIZE); // >= 1
if (max_offset - queue_offset < remaining_num) {
remaining_num = static_cast<uint32_t >(max_offset - queue_offset);
uint64_t idx_file_offset = INDEX_ENTRY_SIZE * chunk_id;
// Translate the raw offset to a file offset: each INDEX_MAPPED_BLOCK_RAW_SIZE bytes of entries
// occupy INDEX_MAPPED_BLOCK_ALIGNED_SIZE bytes in the file.
idx_file_offset = (idx_file_offset / INDEX_MAPPED_BLOCK_RAW_SIZE * INDEX_MAPPED_BLOCK_ALIGNED_SIZE) +
(idx_file_offset % INDEX_MAPPED_BLOCK_RAW_SIZE);
// Round down to a FILESYSTEM_BLOCK_SIZE boundary.
uint64_t idx_file_offset_aligned_start = (idx_file_offset / FILESYSTEM_BLOCK_SIZE * FILESYSTEM_BLOCK_SIZE);
size_t which_mapped_chunk = idx_file_offset_aligned_start / INDEX_MAPPED_BLOCK_ALIGNED_SIZE;
// Blocks below index_mapped_flush_start_chunkID are read from the index file; the other branch
// reads from the in-memory block (the call itself is not visible in this listing).
if (which_mapped_chunk < asyncfileio_thread->index_mapped_flush_start_chunkID) {
pread(asyncfileio->index_fds[queueID % IO_THREAD], index_record,
((INDEX_ENTRY_SIZE * remaining_num + (idx_file_offset - idx_file_offset_aligned_start)) /
} else {
asyncfileio_thread->index_file_memory_block[which_mapped_chunk] +
(idx_file_offset_aligned_start % INDEX_MAPPED_BLOCK_ALIGNED_SIZE),
(INDEX_ENTRY_SIZE * remaining_num) + (idx_file_offset - idx_file_offset_aligned_start));
// Decode each entry of this run.
for (uint32_t i = 0; i < remaining_num; i++) {
char *output_buf = nullptr;
int output_length;
// The entry sits at its slot plus the distance between the aligned start and the real offset.
unsigned char *serialized =
index_record + INDEX_ENTRY_SIZE * i + idx_file_offset - idx_file_offset_aligned_start;
// The upper six bits of the first byte are compared with LARGE_MESSAGE_MAGIC_CHAR to tell the two entry kinds apart.
if ((serialized[0] & 0xff) >> 2 != LARGE_MESSAGE_MAGIC_CHAR) {
output_buf = (char *) deserialize_base36_encoding_add_index(serialized, INDEX_ENTRY_SIZE,
output_length, queue_offset + i);
} else { // Long message (the evaluation data has no long messages, so this path appears not to have been optimized)
log_info("big msg");
size_t large_msg_size;
size_t large_msg_offset;
// Data-file offset is read from byte 4 and length from byte 12 of the entry.
memcpy(&large_msg_offset, serialized + 4, 8);
memcpy(&large_msg_size, serialized + 12, 8);
// NOTE(review): large_msg_size is not checked against the 4096-byte buffer in the visible code.
unsigned char large_msg_buf[4096];
pread(asyncfileio->data_fds[queueID % IO_THREAD], large_msg_buf, large_msg_size,
output_buf = (char *) deserialize_base36_encoding_add_index((uint8_t *) large_msg_buf, large_msg_size,
output_length, queue_offset + i);
result.emplace_back(output_buf, (size_t) output_length);
queue_offset += remaining_num;
return result;
// Set to true inside the barrier callback of doPhase3 and tested before entering that barrier.
// NOTE(review): `volatile` does not provide inter-thread synchronization in C++; confirm whether
// std::atomic<bool> is needed here.
volatile bool startedReaderThreadFlag = false;
// Reads up to 10 messages of queue `queue_name` starting at `offset`, keeping a per-thread, per-queue
// buffer that is refilled whenever the read position reaches the start of a cluster.
// NOTE(review): this listing is truncated in several places (memalign call, start-offset assignment,
// both buffer-fill branches, the `serialized` expression, the large-message pread) and closing braces are missing.
std::vector<MemBlock> queue_store::doPhase3(int tid, const std::string &queue_name, long offset, long number) {
auto queueID = static_cast<unsigned long>(getQueueID(queue_name));
// Per-thread vector that persists across calls.
// NOTE(review): no clear() is visible in this listing - confirm it is reset before being filled.
static thread_local vector<MemBlock> result;
int threadID = queueID % IO_THREAD;
asyncfileio_thread_t *asyncfileio_thread = asyncfileio->work_threads_object[threadID];
uint32_t which_queue_in_this_io_thread = queueID / IO_THREAD;
size_t max_queue_offset = asyncfileio_thread->queue_counter[which_queue_in_this_io_thread];
// At most 10 results per call.
// NOTE(review): the subtraction is evaluated as size_t, so offset > max_queue_offset wraps to a large value.
size_t max_result_num = 10 < (max_queue_offset - offset) ? 10 : (max_queue_offset - offset);
// On a read from offset 0 while the flag is still false, wait on barrier1; its callback reopens
// every index file read-only with O_DIRECT and then sets the flag.
if (offset == 0 && !startedReaderThreadFlag) {
barrier1->Wait([this] {
for (int i = 0; i < IO_THREAD; i++) {
// for (size_t chunkID = asyncfileio->work_threads_object[i]->index_mapped_flush_start_chunkID;
// chunkID < asyncfileio->work_threads_object[i]->index_mapped_flush_end_chunkID; chunkID++) {
// //free(asyncfileio->work_threads_object[i]->index_file_memory_block[chunkID]);
// log_debug("free thread %d chunk id %ld", i, chunkID);
// }
string tmp_str = asyncfileio->file_prefix + "_" + std::to_string(i) + ".idx";
asyncfileio->index_fds[i] = open(tmp_str.c_str(), O_RDONLY | O_DIRECT, S_IRUSR | S_IWUSR);
// posix_fadvise(asyncfileio->index_fds[i], 0,
// asyncfileio->mapped_index_files_length[i],
printf("phase3 start\n");
startedReaderThreadFlag = true;
// max_result_num is unsigned, so this only matches zero.
if (max_result_num <= 0) {
return result;
// Per-thread tables indexed by queue ID: a lazily allocated buffer and the offset of the first
// entry inside that buffer. Both arrays are value-initialized (null / 0).
static thread_local unsigned char **reader_hash_buffer = new unsigned char *[TOTAL_QUEUE_NUM]();
static thread_local short *reader_hash_buffer_start_offset = new short[TOTAL_QUEUE_NUM]();
if (reader_hash_buffer[queueID] == nullptr) {
reader_hash_buffer[queueID] = (unsigned char *) memalign(FILESYSTEM_BLOCK_SIZE,
size_t read_num_left = max_result_num;
for (size_t new_offset = offset; new_offset < offset + max_result_num;) {
// Never read across a cluster boundary in a single step.
size_t this_max_read = std::min<size_t>(read_num_left, CLUSTER_SIZE - (new_offset % CLUSTER_SIZE));
// The buffer is (re)loaded only when the position is at the first entry of a cluster.
if (new_offset % CLUSTER_SIZE == 0) {
uint64_t chunk_offset = (which_queue_in_this_io_thread * CLUSTER_SIZE);
uint64_t chunk_id = ((new_offset / CLUSTER_SIZE) * (CLUSTER_SIZE * QUEUE_NUM_PER_IO_THREAD) + chunk_offset +
new_offset % CLUSTER_SIZE);
uint64_t idx_file_offset = INDEX_ENTRY_SIZE * chunk_id;
// Translate the raw offset to a file offset: each INDEX_MAPPED_BLOCK_RAW_SIZE bytes of entries
// occupy INDEX_MAPPED_BLOCK_ALIGNED_SIZE bytes in the file.
idx_file_offset = (idx_file_offset / INDEX_MAPPED_BLOCK_RAW_SIZE * INDEX_MAPPED_BLOCK_ALIGNED_SIZE) +
(idx_file_offset % INDEX_MAPPED_BLOCK_RAW_SIZE);
// Round down to a FILESYSTEM_BLOCK_SIZE boundary.
uint64_t idx_file_offset_aligned_start = (idx_file_offset / FILESYSTEM_BLOCK_SIZE * FILESYSTEM_BLOCK_SIZE);
reader_hash_buffer_start_offset[queueID] = static_cast<short>(idx_file_offset -
size_t which_mapped_chunk = idx_file_offset_aligned_start / INDEX_MAPPED_BLOCK_ALIGNED_SIZE;
// Blocks below index_mapped_flush_start_chunkID are read from the index file; the other branch
// reads from the in-memory block (the call itself is not visible in this listing).
if (which_mapped_chunk < asyncfileio_thread->index_mapped_flush_start_chunkID) {
pread(asyncfileio->index_fds[threadID], reader_hash_buffer[queueID],
((INDEX_ENTRY_SIZE * CLUSTER_SIZE + (idx_file_offset - idx_file_offset_aligned_start)) /
} else {
asyncfileio_thread->index_file_memory_block[which_mapped_chunk] +
(idx_file_offset_aligned_start % INDEX_MAPPED_BLOCK_ALIGNED_SIZE),
(INDEX_ENTRY_SIZE * CLUSTER_SIZE) + (idx_file_offset - idx_file_offset_aligned_start));
long in_cluster_offset = new_offset % CLUSTER_SIZE;
// Decode this_max_read entries starting at the in-cluster position.
for (uint32_t i = 0; i < this_max_read; i++) {
char *output_buf = nullptr;
int output_length;
unsigned char *serialized = reader_hash_buffer[queueID] + INDEX_ENTRY_SIZE * (in_cluster_offset + i) +
// The upper six bits of the first byte are compared with LARGE_MESSAGE_MAGIC_CHAR to tell the two entry kinds apart.
if ((serialized[0] & 0xff) >> 2 != LARGE_MESSAGE_MAGIC_CHAR) {
output_buf = (char *) deserialize_base36_encoding_add_index(serialized, INDEX_ENTRY_SIZE,
output_length, new_offset + i);
} else { // Long message
size_t large_msg_size;
size_t large_msg_offset;
// Data-file offset is read from byte 4 and length from byte 12 of the entry.
memcpy(&large_msg_offset, serialized + 4, 8);
memcpy(&large_msg_size, serialized + 12, 8);
// NOTE(review): large_msg_size is not checked against the 4096-byte buffer in the visible code.
unsigned char large_msg_buf[4096];
pread(asyncfileio->data_fds[queueID % IO_THREAD], large_msg_buf, large_msg_size,
output_buf = (char *) deserialize_base36_encoding_add_index((uint8_t *) large_msg_buf, large_msg_size,
output_length, new_offset + i);
result.emplace_back(output_buf, (size_t) output_length);
new_offset += this_max_read;
read_num_left -= this_max_read;
return result;
// Creates the barrier used by doPhase3 (CHECK_THREAD_NUM participants) and the async file IO
// object constructed from DATA_FILE_PATH.
queue_store::queue_store() {
barrier1 = new Barrier(CHECK_THREAD_NUM);
asyncfileio = new asyncfileio_t(DATA_FILE_PATH);
3. asyncfileio实现
// State owned by one IO thread: its data file, its index file, the in-memory index blocks
// and the per-queue message counters.
// NOTE(review): access specifiers, closing braces and parts of the constructor and doIO are missing
// from this listing.
class asyncfileio_thread_t {
const int thread_id;
// Current end of the data file; put() reserves space in it with fetch_add.
atomic<long> data_file_current_size;
int data_file_fd;
size_t index_file_size;
int index_file_fd;
size_t current_index_mapped_start_offset;
size_t current_index_mapped_end_offset;
// First block that has not yet been written out by doIO.
size_t current_index_mapped_start_chunk;
size_t index_mapped_flush_start_chunkID;
size_t index_mapped_flush_end_chunkID;
// Per-block arrays (MAX_MAPED_CHUNK_NUM elements each): write counter, mutex, condition variable, buffer pointer.
atomic<int> *index_mapped_block_write_counter;
std::mutex *mapped_block_mtx;
std::condition_variable *mapped_block_cond;
char **index_file_memory_block;
// Number of messages written so far, one counter per queue slot of this thread.
uint32_t *queue_counter;
BlockingQueue<asyncio_task_t *> *blockingQueue;
enum asyncfileio_thread_status status;
// Allocates the per-block arrays, gives buffers to the first MAX_CONCURRENT_INDEX_MAPPED_BLOCK_NUM
// blocks, and opens/truncates "<file_prefix>_<tid>.data" and "<file_prefix>_<tid>.idx".
asyncfileio_thread_t(int tid, std::string file_prefix) : thread_id(tid) {
this->index_file_size = 0;
index_mapped_block_write_counter = new atomic<int>[MAX_MAPED_CHUNK_NUM];
index_file_memory_block = new char *[MAX_MAPED_CHUNK_NUM];
for (int i = 0; i < MAX_MAPED_CHUNK_NUM; i++) {
index_file_memory_block[i] = nullptr;
// Only this many blocks have a buffer at any time; doIO hands a buffer on to a later block after writing it.
for (int i = 0; i < MAX_CONCURRENT_INDEX_MAPPED_BLOCK_NUM; i++) {
index_file_memory_block[i] = (char *) memalign(FILESYSTEM_BLOCK_SIZE, INDEX_MAPPED_BLOCK_ALIGNED_SIZE);
//index_file_memory_block[i] = ;new char[INDEX_MAPPED_BLOCK_SIZE];
queue_counter = new uint32_t[QUEUE_NUM_PER_IO_THREAD];
memset(queue_counter, 0, sizeof(uint32_t) * QUEUE_NUM_PER_IO_THREAD);
mapped_block_mtx = new std::mutex[MAX_MAPED_CHUNK_NUM];
mapped_block_cond = new std::condition_variable[MAX_MAPED_CHUNK_NUM];
this->blockingQueue = new BlockingQueue<asyncio_task_t *>;
string tmp_str = file_prefix + "_" + std::to_string(tid) + ".data";
this->data_file_fd = open(tmp_str.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
// Truncate to zero, then extend to DATA_FILE_MAX_SIZE.
ftruncate(data_file_fd, 0);
ftruncate(data_file_fd, DATA_FILE_MAX_SIZE);
tmp_str = file_prefix + "_" + std::to_string(tid) + ".idx";
this->index_file_fd = open(tmp_str.c_str(), O_WRONLY | O_CREAT | O_DIRECT | O_SYNC, S_IRUSR | S_IWUSR);
ftruncate(index_file_fd, 0);
this->current_index_mapped_start_chunk = 0;
this->current_index_mapped_start_offset = 0;
this->current_index_mapped_end_offset =
this->index_file_size = 0;
// Walks forward from current_index_mapped_start_chunk, writing blocks to the index file and
// passing each written block's buffer to the block MAX_CONCURRENT_INDEX_MAPPED_BLOCK_NUM positions later.
// NOTE(review): the condition, the pwrite length argument, the notification and the else branch
// are truncated in this listing.
void doIO(asyncio_task_t *asyncio_task) {
for (; current_index_mapped_start_chunk < MAX_MAPED_CHUNK_NUM; current_index_mapped_start_chunk++) {
if (index_mapped_block_write_counter[current_index_mapped_start_chunk].load() >=
ftruncate(index_file_fd, index_file_size);
pwrite(index_file_fd, index_file_memory_block[current_index_mapped_start_chunk],
INDEX_MAPPED_BLOCK_ALIGNED_SIZE * current_index_mapped_start_chunk);
int next_chunk = current_index_mapped_start_chunk + MAX_CONCURRENT_INDEX_MAPPED_BLOCK_NUM;
current_index_mapped_start_offset += INDEX_MAPPED_BLOCK_ALIGNED_SIZE;
current_index_mapped_end_offset += INDEX_MAPPED_BLOCK_ALIGNED_SIZE;
// Under next_chunk's mutex: move the buffer pointer to next_chunk and clear the old slot.
std::unique_lock<std::mutex> lock(mapped_block_mtx[next_chunk]);
index_file_memory_block[next_chunk] = index_file_memory_block[current_index_mapped_start_chunk];
index_file_memory_block[current_index_mapped_start_chunk] = nullptr;
log_info("io thread %d advanced to %d", this->thread_id, next_chunk);
} else {
// State used by the flush logic: ioFinished is checked and set in asyncfileio_t::waitFinishIO;
// barrier and finish_thread_counter are assigned in the asyncfileio_t constructor.
// NOTE(review): whether these are file-scope globals or members is not clear from this listing.
bool allFlushFlag = false;
bool ioFinished = false;
Barrier *barrier;
atomic<int> *finish_thread_counter;
// Body of an IO thread: repeatedly takes tasks from the thread's blocking queue.
// When the thread status is AT_CLOSING or the task's global_offset is -1, it performs the final flush
// steps shown below.
// NOTE(review): closing braces, the pwrite arguments, the normal-task branch and the loop exit are
// missing from this listing.
void ioThreadFunction(asyncfileio_thread_t *args) {
asyncfileio_thread_t *work_thread = args;
work_thread->status = AT_RUNNING;
for (;;) {
asyncio_task_t *task = work_thread->blockingQueue->take();
if (work_thread->status == AT_CLOSING || task->global_offset == -1) {
// Number of extra blocks to write out now; the visible code sets it to 1 under the thread_id < 1 check.
size_t force_flush_chunk_num = 0;
if (work_thread->thread_id < 1) {
force_flush_chunk_num = 1;
// Readers compare block numbers with index_mapped_flush_start_chunkID to choose between the
// index file and the in-memory blocks.
work_thread->index_mapped_flush_start_chunkID =
work_thread->current_index_mapped_start_chunk + force_flush_chunk_num;
work_thread->index_mapped_flush_end_chunkID = work_thread->index_mapped_flush_start_chunkID;
// Write blocks from the current start block up to (not including) the flush start block, stopping at a null buffer.
for (size_t i = work_thread->current_index_mapped_start_chunk;
i < work_thread->index_mapped_flush_start_chunkID &&
work_thread->index_file_memory_block[i] != nullptr; i++) {
work_thread->index_file_size += INDEX_MAPPED_BLOCK_ALIGNED_SIZE;
ftruncate(work_thread->index_file_fd, work_thread->index_file_size);
pwrite(work_thread->index_file_fd, work_thread->index_file_memory_block[i],
// Shrink the data file to the number of bytes actually reserved.
ftruncate(work_thread->data_file_fd, work_thread->data_file_current_size.load());
delete task;
// Owns the IO_THREAD worker objects and their std::thread handles, and coordinates the final flush.
// NOTE(review): access specifiers, closing braces and several statements are missing from this listing.
class asyncfileio_t {
// Creates the shared finish counter, a barrier for SEND_THREAD_NUM participants, and one worker object per IO thread.
asyncfileio_t(std::string file_prefix) {
this->file_prefix = file_prefix;
finish_thread_counter = new atomic<int>(0);
barrier = new Barrier(SEND_THREAD_NUM);
for (int i = 0; i < IO_THREAD; i++) {
work_threads_object[i] = new asyncfileio_thread_t(i, file_prefix);
// Starts one std::thread per worker object, each running ioThreadFunction.
void startIOThread() {
for (int i = 0; i < IO_THREAD; i++) {
work_threads_handle[i] = std::thread(ioThreadFunction, work_threads_object[i]);
// Flush rendezvous; only runs while ioFinished is false. The first barrier callback creates a task
// with global_offset -1 per IO thread; callers then wait for finish_thread_counter to reach IO_THREAD;
// the second barrier callback opens the index files read-only and records descriptors and lengths.
// Timing messages are printed only for tid == 0.
void waitFinishIO(int tid) {
if (!ioFinished) {
if (tid == 0) {
printf("in wait_flush function %ld\n", getCurrentTimeInMS());
barrier->Wait([this] {
printf("start send flush cmd %ld\n", getCurrentTimeInMS());
for (int i = 0; i < IO_THREAD; i++) {
asyncio_task_t *task = new asyncio_task_t(-1);
printf("after send flush cmd %ld\n", getCurrentTimeInMS());
if (tid == 0) {
printf("before wait flush finish %ld\n", getCurrentTimeInMS());
// Busy-wait (empty loop body) until the counter reaches IO_THREAD.
while (finish_thread_counter->load() < IO_THREAD) {};
if (tid == 0) {
printf("after wait flush finish %ld\n", getCurrentTimeInMS());
barrier->Wait([this] {
for (int i = 0; i < IO_THREAD; i++) {
string tmp_str = file_prefix + "_" + std::to_string(i) + ".idx";
index_fds[i] = open(tmp_str.c_str(), O_RDONLY, S_IRUSR | S_IWUSR);
// The data file descriptor opened by the worker is reused for reads.
data_fds[i] = work_threads_object[i]->data_file_fd;
mapped_index_files_length[i] = work_threads_object[i]->index_file_size;
int ret = posix_fadvise(index_fds[i], 0,
printf("ret %d\n", ret);
if (tid == 0) {
printf("finish wait_flush function %ld\n", getCurrentTimeInMS());
ioFinished = true;
// Starts a thread running flush_all_func on this object (what happens to the thread afterwards is not visible here).
~asyncfileio_t() {
std::thread flush_thread(flush_all_func, this);
string file_prefix;
// Per-IO-thread tables, indexed by thread id.
asyncfileio_thread_t *work_threads_object[IO_THREAD];
std::thread work_threads_handle[IO_THREAD];
size_t mapped_index_files_length[IO_THREAD];
int index_fds[IO_THREAD];
int data_fds[IO_THREAD];
// For every IO thread, writes the in-memory index blocks starting at index_mapped_flush_start_chunkID
// to the index file, stopping at the first block without a buffer. `args` is cast to asyncfileio_t*.
// NOTE(review): the pwrite arguments and the closing braces are missing from this listing.
void flush_all_func(void *args) {
asyncfileio_t *asyncfileio = (asyncfileio_t *) args;
for (int tid = 0; tid < IO_THREAD; tid++) {
asyncfileio_thread_t *work_thread = asyncfileio->work_threads_object[tid];
for (size_t i = work_thread->index_mapped_flush_start_chunkID;
i < MAX_MAPED_CHUNK_NUM && work_thread->index_file_memory_block[i] != nullptr; i++) {
// Grow the index file by one aligned block before writing it.
work_thread->index_file_size += INDEX_MAPPED_BLOCK_ALIGNED_SIZE;
ftruncate(work_thread->index_file_fd, work_thread->index_file_size);
pwrite(work_thread->index_file_fd, work_thread->index_file_memory_block[i],
4. Barrier实现
// Barrier whose Wait() takes a callback.
// NOTE(review): this listing is missing lines (most of the constructor's initializer list, the
// statements between resetting mCount and the else branch, access specifiers, closing braces).
// Presumably the last arriving thread runs `func`, bumps mGeneration and notifies - confirm.
class Barrier {
explicit Barrier(std::size_t iCount) :
mGeneration(0) {
void Wait(std::function<void()> func) {
std::unique_lock<std::mutex> lLock{mMutex};
// Remember the generation at entry so the wait predicate can detect when it changes.
auto lGen = mGeneration;
// The thread that brings the count to zero resets it to the threshold.
if (!--mCount) {
mCount = mThreshold;
} else {
// Other threads block until mGeneration differs from the value seen at entry.
mCond.wait(lLock, [this, lGen] { return lGen != mGeneration; });
std::mutex mMutex;
std::condition_variable mCond;
std::size_t mThreshold;
std::size_t mCount;
std::size_t mGeneration;
5. fast_base64实现
// Compacts a message whose first (len - FIXED_PART_LEN) bytes are base64 text: the text is
// base64-decoded into `serialized`, followed by the FIXED_PART_LEN trailing bytes and one byte
// holding the number of padding characters added. Returns the number of bytes written.
// NOTE: `message` is modified (padding characters overwrite the start of its fixed part).
// NOTE(review): `#else`/`#endif` and the closing brace are missing from this listing.
int serialize_base64_decoding(uint8_t *message, uint16_t len, uint8_t *serialized) {
auto serialize_len = len - FIXED_PART_LEN;
// Characters needed to make the text length a multiple of 4 (0..3).
int padding_chars = (4 - serialize_len % 4) % 4;
uint8_t *buf = message;
// Decoded size: 3 bytes per group of 4 characters, rounded up.
size_t estimated_length = 3 * (serialize_len / 4 + (serialize_len % 4 == 0 ? 0 : 1));
// Save the fixed part first, because the next memcpy overwrites its beginning in `message`.
memcpy(serialized + estimated_length, message + serialize_len, FIXED_PART_LEN);
// attention: add padding chars, assume following chars enough >= 3
memcpy(message + serialize_len, "BLINK", padding_chars);
#ifdef __AVX2__
fast_avx2_base64_decode(reinterpret_cast<char *>(serialized),
reinterpret_cast<const char *>(buf),
serialize_len + padding_chars);
chromium_base64_decode(reinterpret_cast<char *>(serialized),
reinterpret_cast<const char *>(buf),
serialize_len + padding_chars);
// Last byte records how many padding characters were appended.
serialized[estimated_length + FIXED_PART_LEN] = padding_chars;
return estimated_length + FIXED_PART_LEN + 1;
// Inverse of serialize_base64_decoding: base64-encodes the leading bytes of `serialized`, drops the
// padding characters counted in its last byte, and appends the FIXED_PART_LEN trailing bytes.
// Returns a buffer allocated with new[] (the caller must delete[] it); `len` receives the message length.
// NOTE(review): `#else`/`#endif` and the closing brace are missing from this listing.
uint8_t *deserialize_base64_encoding(const uint8_t *serialized, uint16_t total_serialized_len, int &len) {
// Everything except the fixed part and the trailing padding-count byte.
auto serialize_len = total_serialized_len - FIXED_PART_LEN - 1;
auto *deserialized = new uint8_t[total_serialized_len / 3 * 4 + 16];
#ifdef __AVX2__
size_t length = fast_avx2_base64_encode(reinterpret_cast<char *>(deserialized),
reinterpret_cast<const char *>(serialized), serialize_len);
size_t length = chromium_base64_encode(reinterpret_cast<char *>(deserialized),
reinterpret_cast<const char *>(serialized), serialize_len);
// serialized[total_serialized_len - 1] is the padding count; the fixed part overwrites the padding.
memcpy(deserialized + length - serialized[total_serialized_len - 1], serialized + serialize_len, FIXED_PART_LEN);
len = length - serialized[total_serialized_len - 1] + FIXED_PART_LEN;
return deserialized;
// Byte sizes used by the serializers below. The *_skip_index serializers copy BASE64_INFO_LEN bytes,
// skip INDEX_LEN bytes, and copy further bytes from the FIXED_PART_LEN-byte tail of a message.
#define BASE64_INFO_LEN (2u)
#define INDEX_LEN (4u)
#define FIXED_PART_LEN (10u)
// Skip index =================================================================================================
// Like serialize_base64_decoding, but the INDEX_LEN bytes that follow the first BASE64_INFO_LEN bytes
// of the fixed part are not stored. Returns the number of bytes written, including one trailing byte
// that holds the padding count.
// NOTE: `message` is modified (padding characters overwrite the start of its fixed part).
// NOTE(review): the second memcpy is truncated and `#else`/`#endif`/closing brace are missing from this listing.
int serialize_base64_decoding_skip_index(uint8_t *message, uint16_t len, uint8_t *serialized) {
auto serialize_len = len - FIXED_PART_LEN;
// Characters needed to make the text length a multiple of 4 (0..3).
int padding_chars = (4 - serialize_len % 4) % 4;
uint8_t *buf = message;
// Decoded size: 3 bytes per group of 4 characters, rounded up.
size_t estimated_length = 3 * (serialize_len / 4 + (serialize_len % 4 == 0 ? 0 : 1));
// Copy the first BASE64_INFO_LEN bytes of the fixed part, then the bytes located after the index field.
memcpy(serialized + estimated_length, message + serialize_len, BASE64_INFO_LEN);
memcpy(serialized + estimated_length + BASE64_INFO_LEN, message + serialize_len + BASE64_INFO_LEN + INDEX_LEN,
// attention: add padding chars, assume following chars enough >= 3
memcpy(message + serialize_len, "BLINK", padding_chars);
#ifdef __AVX2__
fast_avx2_base64_decode(reinterpret_cast<char *>(serialized),
reinterpret_cast<const char *>(buf),
serialize_len + padding_chars);
chromium_base64_decode(reinterpret_cast<char *>(serialized),
reinterpret_cast<const char *>(buf),
serialize_len + padding_chars);
// Last byte records how many padding characters were appended.
serialized[estimated_length + FIXED_PART_LEN - INDEX_LEN] = padding_chars;
return estimated_length + FIXED_PART_LEN - INDEX_LEN + 1;
// Inverse of serialize_base64_decoding_skip_index: re-encodes the text, drops the padding characters
// counted in the last byte of `serialized`, and rebuilds the fixed part, writing `idx` in the index position.
// Returns a buffer allocated with new[] (the caller must delete[] it); `deserialized_len` receives the length.
// NOTE(review): `#else`/`#endif` and the closing brace are missing from this listing.
uint8_t *deserialize_base64_encoding_add_index(const uint8_t *serialized, uint16_t total_serialized_len,
int &deserialized_len, int32_t idx) {
// Everything except the stored fixed bytes and the trailing padding-count byte.
auto serialize_len = total_serialized_len - (FIXED_PART_LEN - INDEX_LEN) - 1;
auto *deserialized = new uint8_t[total_serialized_len / 3 * 4 + 16];
#ifdef __AVX2__
size_t length = fast_avx2_base64_encode(reinterpret_cast<char *>(deserialized),
reinterpret_cast<const char *>(serialized), serialize_len);
size_t length = chromium_base64_encode(reinterpret_cast<char *>(deserialized),
reinterpret_cast<const char *>(serialized), serialize_len);
// Start writing the fixed part where the padding characters begin.
size_t offset = length - serialized[total_serialized_len - 1];
memcpy(deserialized + offset, serialized + serialize_len, BASE64_INFO_LEN);
offset += BASE64_INFO_LEN;
// The index is not stored; it is supplied by the caller.
memcpy(deserialized + offset, &idx, sizeof(int32_t));
offset += INDEX_LEN;
memcpy(deserialized + offset, serialized + serialize_len + BASE64_INFO_LEN, VARYING_VERIFY_LEN);
deserialized_len = length - serialized[total_serialized_len - 1] + FIXED_PART_LEN;
return deserialized;
// Same as deserialize_base64_encoding_add_index, but writes into the caller-provided `deserialized`
// buffer instead of allocating one; `deserialized_len` receives the message length.
// NOTE(review): `#else`/`#endif` and the closing brace are missing from this listing.
void deserialize_base64_encoding_add_index_in_place(const uint8_t *serialized, uint16_t total_serialized_len,
uint8_t *deserialized, int &deserialized_len, int32_t idx) {
// Everything except the stored fixed bytes and the trailing padding-count byte.
auto serialize_len = total_serialized_len - (FIXED_PART_LEN - INDEX_LEN) - 1;
#ifdef __AVX2__
size_t length = fast_avx2_base64_encode(reinterpret_cast<char *>(deserialized),
reinterpret_cast<const char *>(serialized), serialize_len);
size_t length = chromium_base64_encode(reinterpret_cast<char *>(deserialized),
reinterpret_cast<const char *>(serialized), serialize_len);
// Start writing the fixed part where the padding characters begin.
size_t offset = length - serialized[total_serialized_len - 1];
memcpy(deserialized + offset, serialized + serialize_len, BASE64_INFO_LEN);
offset += BASE64_INFO_LEN;
// The index is not stored; it is supplied by the caller.
memcpy(deserialized + offset, &idx, sizeof(int32_t));
offset += INDEX_LEN;
memcpy(deserialized + offset, serialized + serialize_len + BASE64_INFO_LEN, VARYING_VERIFY_LEN);
deserialized_len = length - serialized[total_serialized_len - 1] + FIXED_PART_LEN;
// End of Skip index ========================================================================================
// ------------------------------- Begin of Base36 -------------------------------------------------------------
// Variant of serialize_base64_decoding_skip_index that stores no padding-count byte; the matching
// deserializer strips trailing 'A'-'Z' characters instead. Returns the number of bytes written.
// NOTE: `message` is modified (padding characters overwrite the start of its fixed part).
// NOTE(review): the second memcpy is truncated and `#else`/`#endif`/closing brace are missing from this listing.
int serialize_base36_decoding_skip_index(uint8_t *message, uint16_t len, uint8_t *serialized) {
auto serialize_len = len - FIXED_PART_LEN;
// Characters needed to make the text length a multiple of 4 (0..3).
int padding_chars = (4 - serialize_len % 4) % 4;
uint8_t *buf = message;
// Decoded size: 3 bytes per group of 4 characters, rounded up.
size_t estimated_length = 3 * (serialize_len / 4 + (serialize_len % 4 == 0 ? 0 : 1));
// Copy the first BASE64_INFO_LEN bytes of the fixed part, then the bytes located after the index field.
memcpy(serialized + estimated_length, message + serialize_len, BASE64_INFO_LEN);
memcpy(serialized + estimated_length + BASE64_INFO_LEN, message + serialize_len + BASE64_INFO_LEN + INDEX_LEN,
// attention: add padding chars, assume following chars enough >= 3
memcpy(message + serialize_len, "BLINK", padding_chars);
#ifdef __AVX2__
fast_avx2_base64_decode(reinterpret_cast<char *>(serialized),
reinterpret_cast<const char *>(buf),
serialize_len + padding_chars);
chromium_base64_decode(reinterpret_cast<char *>(serialized),
reinterpret_cast<const char *>(buf),
serialize_len + padding_chars);
return estimated_length + FIXED_PART_LEN - INDEX_LEN;
uint8_t *deserialize_base36_encoding_add_index(const uint8_t *serialized, uint16_t total_serialized_len,
int &deserialized_len, int32_t idx) {
auto serialize_len = total_serialized_len - (FIXED_PART_LEN - INDEX_LEN);
auto *deserialized = new uint8_t[total_serialized_len / 3 * 4 + 16];
// 1st: deserialize preparation: base64 encoding
#ifdef __AVX2__
size_t length = fast_avx2_base64_encode(reinterpret_cast<char *>(deserialized),
reinterpret_cast<const char *>(serialized), serialize_len);
size_t length = chromium_base64_encode(reinterpret_cast<char *>(deserialized),
reinterpret_cast<const char *>(serialized), serialize_len);
// 2nd: skip padding (padding could be 'A'-'Z', '+', '/', '=')
for (; deserialized[length - 1] >= 'A' && deserialized[length - 1] <= 'Z' && length >= 0; length--) {}
// 3rd: append other info
size_t offset = length;
memcpy(deserialized + offset, serialized + serialize_len, BASE64_INFO_LEN);
offset += BASE64_INFO_LEN;
memcpy(deserialized + offset, &idx, sizeof(int32_t));
offset += INDEX_LEN;
memcpy(deserialized + offset, serialized + serialize_len + BASE64_INFO_LEN, VARYING_VERIFY_LEN);
// 4th: assign the correct length
deserialized_len = length + FIXED_PART_LEN;
return deserialized;
// ------------------------------ End of Base36, do not support A-Z yet --------------------------------------------
// Bit-packs the variable part of a message whose characters are 'a'-'z' or '0'-'9'.
// Output layout: a 1- or 2-byte length header, one 5-bit code per character, then 3 extra bits for each
// character whose value is >= MAX_FIVE_BITS_INT, then the FIXED_PART_LEN trailing bytes copied verbatim.
// Returns the total number of bytes written. `message` is overwritten during the conversion.
// NOTE(review): closing braces are missing from this listing.
int serialize(uint8_t *message, uint16_t len, uint8_t *serialized) {
// add the header to indicate raw message varying-length part size
int serialize_len = len - FIXED_PART_LEN;
if (len < 128) {
serialized[0] = static_cast<uint8_t>(len - FIXED_PART_LEN);
serialized += 1;
} else {
// Two-byte header: high bit of the first byte set, 7 bits of the length in each byte.
uint16_t tmp = (len - FIXED_PART_LEN);
serialized[0] = static_cast<uint8_t>((tmp >> 7u) | 0x80); // assume < 32767
serialized[1] = static_cast<uint8_t>(tmp & (uint8_t) 0x7f);
serialized += 2;
// The extra-bit area starts right after the 5-bit codes.
uint32_t next_extra_3bits_idx = 5u * serialize_len;
uint32_t next_5bits_idx = 0;
// attention: message is not usable later
// Map 'a'..'z' to 0..25 and '0'..'9' to 26..35, in place.
for (int i = 0; i < serialize_len; i++) {
message[i] = message[i] >= 'a' ? message[i] - 'a' : message[i] - '0' + (uint8_t) 26;
// attention: must clear to be correct
memset(serialized, 0, (len - FIXED_PART_LEN));
// 1) construct the compressed part
for (int i = 0; i < serialize_len; i++) {
uint16_t cur_uchar = message[i];
// Put the 5-bit code (saturated at MAX_FIVE_BITS_INT) in the top bits of a 16-bit word, then shift
// it to the current bit position and OR it into two adjacent output bytes.
uint16_t expand_uchar = cur_uchar < MAX_FIVE_BITS_INT ? (cur_uchar << 11u) : (MAX_FIVE_BITS_INT << 11u);
int shift_bits = (next_5bits_idx & 0x7u);
expand_uchar >>= shift_bits;
int idx = (next_5bits_idx >> 3u);
serialized[idx] |= (expand_uchar >> 8u);
serialized[idx + 1] |= (expand_uchar & 0xffu);
next_5bits_idx += 5;
if (cur_uchar >= MAX_FIVE_BITS_INT) {
// do extra bits operations
// The remainder (value - MAX_FIVE_BITS_INT) is stored as 3 bits in the extra area.
expand_uchar = ((cur_uchar - MAX_FIVE_BITS_INT) << 13u);
shift_bits = (next_extra_3bits_idx & 0x7u);
expand_uchar >>= shift_bits;
// assume little-endian
idx = (next_extra_3bits_idx >> 3u);
serialized[idx] |= (expand_uchar >> 8u);
serialized[idx + 1] |= (expand_uchar & 0xffu);
next_extra_3bits_idx += 3;
// 2) left FIXED_PART_LEN, should use memcpy
// Round the bit position up to the next whole byte.
int start_copy_byte_idx = (next_extra_3bits_idx >> 3u) + ((next_extra_3bits_idx & 0x7u) != 0);
memcpy(serialized + start_copy_byte_idx, message + serialize_len, FIXED_PART_LEN);
return start_copy_byte_idx + FIXED_PART_LEN + (len < 128 ? 1 : 2);
/**
 * Inverse of serialize(): unpacks a bit-packed message.
 *
 * Input layout: a 1- or 2-byte header giving the number of characters, one 5-bit code per
 * character, 3 extra bits for every code equal to MAX_FIVE_BITS_INT, then FIXED_PART_LEN raw bytes
 * starting at the next byte boundary.
 *
 * @param serialized  packed message
 * @param len         receives the length of the unpacked message (characters + FIXED_PART_LEN)
 * @return buffer allocated with new[] holding the message; the caller must delete[] it
 */
uint8_t *deserialize(const uint8_t *serialized, int &len) {
    // get the length of varying part
    uint16_t varying_byte_len;
    if ((serialized[0] & 0x80u) == 0) {
        varying_byte_len = serialized[0];
        serialized += 1;
    } else {
        // Two-byte header: 7 bits from each byte.
        varying_byte_len = static_cast<uint16_t>(((serialized[0] & 0x7fu) << 7u) + serialized[1]);
        serialized += 2;
    }

    uint32_t next_extra_3bits_idx = 5u * varying_byte_len;
    uint32_t next_5bits_idx = 0;

    // The buffer must hold the characters plus the whole fixed part. Sizing it as
    // varying_byte_len + 8 is too small when FIXED_PART_LEN is 10: the final memcpy
    // would then write past the end of the allocation.
    auto *deserialized = new uint8_t[varying_byte_len + FIXED_PART_LEN];
    len = varying_byte_len + FIXED_PART_LEN;

    // deserialize
    for (int i = 0; i < varying_byte_len; i++) {
        int idx = (next_5bits_idx >> 3u);
        // Read two adjacent bytes as a big-endian 16-bit word and extract the 5-bit code.
        uint16_t value = (serialized[idx] << 8u) + serialized[idx + 1];
        value = (value >> (11u - (next_5bits_idx & 0x7u))) & MAX_FIVE_BITS_INT;
        if (value != MAX_FIVE_BITS_INT) {
            // 0..25 -> 'a'..'z', 26.. -> '0'..
            deserialized[i] = static_cast<uint8_t>(value < 26 ? 'a' + value : value - 26 + '0');
        } else {
            // Saturated code: the remainder is stored as 3 bits in the extra area.
            idx = (next_extra_3bits_idx >> 3u);
            value = (serialized[idx] << 8u) + serialized[idx + 1];
            value = (value >> (13u - (next_extra_3bits_idx & 0x7u))) & (uint8_t) 0x7;
            deserialized[i] = value + '5';
            next_extra_3bits_idx += 3;
        }
        next_5bits_idx += 5;
    }

    // 2) copy the fixed part, which starts at the next whole byte after the extra bits
    memcpy(deserialized + varying_byte_len,
           serialized + (next_extra_3bits_idx >> 3u) + ((next_extra_3bits_idx & 0x7u) != 0), FIXED_PART_LEN);
    return deserialized;
}
#define BADCHAR 0x01FFFFFF
/*
 * You can control whether padding is used by commenting out the
 * next line. However, using padding is highly recommended; disabling
 * it should only be done for compatibility with a 3rd party.
 * Also, 'no padding' is not tested!
 */
#define DOPAD 1
/*
 * If we aren't doing padding,
 * set the pad character to NUL ('\0').
 */
#ifndef DOPAD
#undef CHARPAD
#define CHARPAD '\0'
// Base64-encodes `len` bytes of `str` into `dest` using the lookup tables e0/e1/e2, writes a
// terminating '\0', and returns the number of characters written (not counting the terminator).
// NOTE(review): the opening brace, closing braces and the `break` statements of the switch are
// missing from this listing.
size_t chromium_base64_encode(char* dest, const char* str, size_t len)
size_t i = 0;
uint8_t* p = (uint8_t*) dest;
/* unsigned here is important! */
uint8_t t1, t2, t3;
// Main loop: every 3 input bytes produce 4 output characters. The len > 2 guard keeps the
// unsigned expression len - 2 from wrapping.
if (len > 2) {
for (; i < len - 2; i += 3) {
t1 = str[i]; t2 = str[i+1]; t3 = str[i+2];
*p++ = e0[t1];
*p++ = e1[((t1 & 0x03) << 4) | ((t2 >> 4) & 0x0F)];
*p++ = e1[((t2 & 0x0F) << 2) | ((t3 >> 6) & 0x03)];
*p++ = e2[t3];
// Tail: 0, 1 or 2 leftover bytes.
switch (len - i) {
case 0:
case 1:
// One leftover byte: two characters plus two pad characters.
t1 = str[i];
*p++ = e0[t1];
*p++ = e1[(t1 & 0x03) << 4];
*p++ = CHARPAD;
*p++ = CHARPAD;
default: /* case 2 */
// Two leftover bytes: three characters plus one pad character.
t1 = str[i]; t2 = str[i+1];
*p++ = e0[t1];
*p++ = e1[((t1 & 0x03) << 4) | ((t2 >> 4) & 0x0F)];
*p++ = e2[(t2 & 0x0F) << 2];
*p++ = CHARPAD;
*p = '\0';
return p - (uint8_t*)dest;
// Base64-decodes `len` characters of `src` into `dest` using the lookup tables d0..d3.
// Returns the number of bytes produced, or MODP_B64_ERROR on invalid input.
// NOTE(review): the opening brace, closing braces, `#endif`, the bodies of the two CHARPAD checks
// and the `break` statements of the switch are missing from this listing.
size_t chromium_base64_decode(char* dest, const char* src, size_t len)
if (len == 0) return 0;
#ifdef DOPAD
* if padding is used, then the message must be at least
* 4 chars and be a multiple of 4
if (len < 4 || (len % 4 != 0)) {
return MODP_B64_ERROR; /* error */
/* there can be at most 2 pad chars at the end */
if (src[len-1] == CHARPAD) {
if (src[len -1] == CHARPAD) {
size_t i;
// Characters beyond the last complete group of 4.
int leftover = len % 4;
// When the length is a multiple of 4, the final group is handled by case 0 of the switch below.
size_t chunks = (leftover == 0) ? len / 4 - 1 : len /4;
uint8_t* p = (uint8_t*)dest;
uint32_t x = 0;
const uint8_t* y = (uint8_t*)src;
// Each group of 4 characters is combined through the four tables into one word, whose first
// three bytes in memory are written to the output.
for (i = 0; i < chunks; ++i, y += 4) {
x = d0[y[0]] | d1[y[1]] | d2[y[2]] | d3[y[3]];
if (x >= BADCHAR) return MODP_B64_ERROR;
*p++ = ((uint8_t*)(&x))[0];
*p++ = ((uint8_t*)(&x))[1];
*p++ = ((uint8_t*)(&x))[2];
switch (leftover) {
case 0:
x = d0[y[0]] | d1[y[1]] | d2[y[2]] | d3[y[3]];
if (x >= BADCHAR) return MODP_B64_ERROR;
*p++ = ((uint8_t*)(&x))[0];
*p++ = ((uint8_t*)(&x))[1];
*p = ((uint8_t*)(&x))[2];
return (chunks+1)*3;
case 1: /* with padding this is an impossible case */
x = d0[y[0]];
*p = *((uint8_t*)(&x)); // i.e. first char/byte in int
case 2: // * case 2, 1 output byte */
x = d0[y[0]] | d1[y[1]];
*p = *((uint8_t*)(&x)); // i.e. first char
default: /* case 3, 2 output bytes */
x = d0[y[0]] | d1[y[1]] | d2[y[2]]; /* 0x3c */
*p++ = ((uint8_t*)(&x))[0];
*p = ((uint8_t*)(&x))[1];
if (x >= BADCHAR) return MODP_B64_ERROR;
// 3 bytes per full group plus 0, 1 or 2 bytes for the leftover characters.
return 3*chunks + (6*leftover)/8;
#include "fastavxbase64.h"
#include <x86intrin.h>
#include <stdbool.h>
/**
 * This code borrows from Wojciech Mula's library at
 * https://github.com/WojciechMula/base64simd (published under BSD)
 * as well as code from Alfred Klomp's library https://github.com/aklomp/base64 (published under BSD)
 * Note: Hardware such as Knights Landing might do poorly with this AVX2 code since it relies on shuffles. Alternatives might be faster.
 */
// Encoder step: rearranges the input bytes with a byte shuffle, then uses mask/multiply pairs
// to move bit fields within each 32-bit lane and ORs the two partial results together.
// NOTE(review): the closing `));` of the shuffle call and the closing brace are missing from this listing.
static inline __m256i enc_reshuffle(const __m256i input) {
// translation from SSE into AVX2 of procedure
// https://github.com/WojciechMula/base64simd/blob/master/encode/unpack_bigendian.cpp
const __m256i in = _mm256_shuffle_epi8(input, _mm256_set_epi8(
10, 11, 9, 10,
7, 8, 6, 7,
4, 5, 3, 4,
1, 2, 0, 1,
14, 15, 13, 14,
11, 12, 10, 11,
8, 9, 7, 8,
5, 6, 4, 5
// t0/t1: fields selected by mask 0x0fc0fc00, moved with a high 16-bit multiply.
const __m256i t0 = _mm256_and_si256(in, _mm256_set1_epi32(0x0fc0fc00));
const __m256i t1 = _mm256_mulhi_epu16(t0, _mm256_set1_epi32(0x04000040));
// t2/t3: fields selected by mask 0x003f03f0, moved with a low 16-bit multiply.
const __m256i t2 = _mm256_and_si256(in, _mm256_set1_epi32(0x003f03f0));
const __m256i t3 = _mm256_mullo_epi16(t2, _mm256_set1_epi32(0x01000010));
return _mm256_or_si256(t1, t3);
// Encoder step: adds a per-byte offset taken from a lookup table to each input byte; the table index
// is derived from the byte value with a saturating subtract of 51 and a compare against 25.
// NOTE(review): the closing brace is missing from this listing.
static inline __m256i enc_translate(const __m256i in) {
// Offsets to add; the same 16 values are repeated for both 128-bit halves.
const __m256i lut = _mm256_setr_epi8(
65, 71, -4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -19, -16, 0, 0, 65, 71,
-4, -4, -4, -4, -4, -4, -4, -4, -4, -4, -19, -16, 0, 0);
// Saturating subtract: values up to 51 become 0.
__m256i indices = _mm256_subs_epu8(in, _mm256_set1_epi8(51));
// mask is all-ones (-1) for bytes greater than 25; subtracting it adds 1 to the index.
__m256i mask = _mm256_cmpgt_epi8((in), _mm256_set1_epi8(25));
indices = _mm256_sub_epi8(indices, mask);
__m256i out = _mm256_add_epi8(in, _mm256_shuffle_epi8(lut, indices));
return out;
// Decoder step: merges the decoded values with two multiply-add instructions, packs the useful bytes
// of each 128-bit half together with a byte shuffle, then gathers them with a 32-bit lane permute.
// NOTE(review): the closing `));` of the shuffle call and the closing brace are missing from this listing.
static inline __m256i dec_reshuffle(__m256i in) {
// inlined procedure pack_madd from https://github.com/WojciechMula/base64simd/blob/master/decode/pack.avx2.cpp
// The only difference is that elements are reversed,
// only the multiplication constants were changed.
const __m256i merge_ab_and_bc = _mm256_maddubs_epi16(in, _mm256_set1_epi32(0x01400140)); //_mm256_maddubs_epi16 is likely expensive
__m256i out = _mm256_madd_epi16(merge_ab_and_bc, _mm256_set1_epi32(0x00011000));
// end of inlined
// Pack bytes together within 32-bit words, discarding words 3 and 7:
out = _mm256_shuffle_epi8(out, _mm256_setr_epi8(
2, 1, 0, 6, 5, 4, 10, 9, 8, 14, 13, 12, -1, -1, -1, -1,
2, 1, 0, 6, 5, 4, 10, 9, 8, 14, 13, 12, -1, -1, -1, -1
// the call to _mm256_permutevar8x32_epi32 could be replaced by a call to _mm256_storeu2_m128i but it is doubtful that it would help
return _mm256_permutevar8x32_epi32(
out, _mm256_setr_epi32(0, 1, 2, 4, 5, 6, -1, -1));
// Base64-encodes `len` bytes of `str` into `dest`: 24 input bytes become 32 output characters per
// vector step, and the remainder is handed to chromium_base64_encode. Returns the total number of
// characters written, or MODP_B64_ERROR if the scalar encoder reports an error.
// NOTE(review): most of the mask constant, closing braces and the loop exit are missing from this listing.
size_t fast_avx2_base64_encode(char* dest, const char* str, size_t len) {
const char* const dest_orig = dest;
// The vector path is taken only when at least 28 input bytes are available.
if(len >= 32 - 4) {
// first load is masked
// The load address is str - 4; the mask keeps the 4 bytes before `str` from being loaded.
__m256i inputvector = _mm256_maskload_epi32((int const*)(str - 4), _mm256_set_epi32(
0x00000000 // we do not load the first 4 bytes
// Intel docs: Faults occur only due to mask-bit required memory accesses that caused the faults.
// Faults will not occur due to referencing any memory location if the corresponding mask bit for
//that memory location is 0. For example, no faults will be detected if the mask bits are all zero.
while(true) {
inputvector = enc_reshuffle(inputvector);
inputvector = enc_translate(inputvector);
_mm256_storeu_si256((__m256i *)dest, inputvector);
// 24 input bytes consumed, 32 output characters produced.
str += 24;
dest += 32;
len -= 24;
if(len >= 32) {
inputvector = _mm256_loadu_si256((__m256i *)(str - 4)); // no need for a mask here
// we could do a mask load as long as len >= 24
} else {
// Remaining bytes are encoded by the scalar routine.
size_t scalarret = chromium_base64_encode(dest, str, len);
if(scalarret == MODP_B64_ERROR) return MODP_B64_ERROR;
return (dest - dest_orig) + scalarret;
// Base64-decodes `srclen` characters of `src` into `out`: 32 input characters become 24 output bytes
// per vector step while at least 45 characters remain, and the rest is handed to chromium_base64_decode.
// Returns the total number of bytes written, or MODP_B64_ERROR if the scalar decoder reports an error.
// NOTE(review): the closing `);` of the lookup-table initializers, the body of the validity check and
// closing braces are missing from this listing.
size_t fast_avx2_base64_decode(char *out, const char *src, size_t srclen) {
char* out_orig = out;
while (srclen >= 45) {
// The input consists of six character sets in the Base64 alphabet,
// which we need to map back to the 6-bit values they represent.
// There are three ranges, two singles, and then there's the rest.
// # From To Add Characters
// 1 [43] [62] +19 +
// 2 [47] [63] +16 /
// 3 [48..57] [52..61] +4 0..9
// 4 [65..90] [0..25] -65 A..Z
// 5 [97..122] [26..51] -71 a..z
// (6) Everything else => invalid input
__m256i str = _mm256_loadu_si256((__m256i *)src);
// code by @aqrit from
// https://github.com/WojciechMula/base64simd/issues/3#issuecomment-271137490
// translated into AVX2
const __m256i lut_lo = _mm256_setr_epi8(
0x15, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11,
0x11, 0x11, 0x13, 0x1A, 0x1B, 0x1B, 0x1B, 0x1A,
0x15, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11,
0x11, 0x11, 0x13, 0x1A, 0x1B, 0x1B, 0x1B, 0x1A
const __m256i lut_hi = _mm256_setr_epi8(
0x10, 0x10, 0x01, 0x02, 0x04, 0x08, 0x04, 0x08,
0x10, 0x10, 0x10, 0x10, 0x10, 0x10, 0x10, 0x10,
0x10, 0x10, 0x01, 0x02, 0x04, 0x08, 0x04, 0x08,
0x10, 0x10, 0x10, 0x10, 0x10, 0x10, 0x10, 0x10
// Per-class offsets that are added to the characters (compare the "Add" column above).
const __m256i lut_roll = _mm256_setr_epi8(
0, 16, 19, 4, -65, -65, -71, -71,
0, 0, 0, 0, 0, 0, 0, 0,
0, 16, 19, 4, -65, -65, -71, -71,
0, 0, 0, 0, 0, 0, 0, 0
const __m256i mask_2F = _mm256_set1_epi8(0x2f);
// lookup
__m256i hi_nibbles = _mm256_srli_epi32(str, 4);
__m256i lo_nibbles = _mm256_and_si256(str, mask_2F);
const __m256i lo = _mm256_shuffle_epi8(lut_lo, lo_nibbles);
// eq_2F is all-ones for bytes equal to 0x2f ('/'); adding it to the high nibble selects a different roll entry.
const __m256i eq_2F = _mm256_cmpeq_epi8(str, mask_2F);
hi_nibbles = _mm256_and_si256(hi_nibbles, mask_2F);
const __m256i hi = _mm256_shuffle_epi8(lut_hi, hi_nibbles);
const __m256i roll = _mm256_shuffle_epi8(lut_roll, _mm256_add_epi8(eq_2F, hi_nibbles));
// Non-zero (lo AND hi) is the condition tested here; what the branch does is not visible in this listing.
if (!_mm256_testz_si256(lo, hi)) {
str = _mm256_add_epi8(str, roll);
// end of copied function
srclen -= 32;
src += 32;
// end of inlined function
// Reshuffle the input to packed 12-byte output format:
str = dec_reshuffle(str);
// A full 32-byte store is issued although `out` only advances by 24 bytes.
_mm256_storeu_si256((__m256i *)out, str);
out += 24;
// Remaining characters are decoded by the scalar routine.
size_t scalarret = chromium_base64_decode(out, src, srclen);
if(scalarret == MODP_B64_ERROR) return MODP_B64_ERROR;
return (out - out_orig) + scalarret;