512 lines
14 KiB
C
512 lines
14 KiB
C
/*
|
|
* Copyright (c) 2012-2013 Spotify AB
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
|
|
* use this file except in compliance with the License. You may obtain a copy of
|
|
* the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
* License for the specific language governing permissions and limitations under
|
|
* the License.
|
|
*/
|
|
#include <string.h>
|
|
#include <errno.h>
|
|
#include <stdlib.h>
|
|
#include <sys/types.h>
|
|
#include <sys/stat.h>
|
|
#include <fcntl.h>
|
|
#include <sys/mman.h>
|
|
|
|
#include <snappy-c.h>
|
|
|
|
#include "sparkey.h"
|
|
#include "sparkey-internal.h"
|
|
#include "logheader.h"
|
|
#include "endiantools.h"
|
|
#include "util.h"
|
|
|
|
/* Value kept in sparkey_logiter.open_status while an iterator is open; cleared on close. */
#define MAGIC_VALUE_LOGITER 0xd765c8ccU
/* Value kept in sparkey_logreader.open_status while a reader is open; cleared on close. */
#define MAGIC_VALUE_LOGREADER 0xe93356c4U
|
|
|
|
/* Return the smaller of two unsigned 64-bit values. */
static inline uint64_t min64(uint64_t a, uint64_t b) {
  return (b < a) ? b : a;
}
|
|
|
|
/*
 * Decode one variable-length quantity: 7 payload bits per byte, least
 * significant group first, with the high bit set on every byte except the last.
 *
 * array:    buffer to decode from. No bounds checking is done here; callers
 *           must validate *position against their buffer length afterwards.
 * position: in/out byte offset into array, advanced past the encoded value.
 *
 * Payload bits that would land at bit 64 or above are discarded instead of
 * shifted, so an overlong (corrupt) encoding cannot trigger an out-of-range
 * shift (undefined behavior).
 */
static inline uint64_t read_vlq(const uint8_t *array, uint64_t *position) {
  uint64_t res = 0;
  uint64_t shift = 0;
  for (;;) {
    uint64_t byte = array[(*position)++];
    if (shift < 64) {
      res |= (byte & 0x7f) << shift;
    }
    if ((byte & 0x80) == 0) {
      return res;
    }
    shift += 7;
  }
}
|
|
|
|
/*
 * Open filename as a log into a caller-provided sparkey_logreader.
 *
 * Loads the log header, checks that the file is at least header.data_end bytes
 * long, and maps that prefix read-only. On success the reader is marked open
 * (open_status = MAGIC_VALUE_LOGREADER). On failure, any descriptor opened here
 * is closed, and the reader is left closed with fd = -1 and data = NULL.
 */
sparkey_returncode sparkey_logreader_open_noalloc(sparkey_logreader *log, const char *filename) {
  sparkey_returncode returncode;
  int fd = -1;  // -1, not 0: descriptor 0 is a valid fd and must not be leaked.
  struct stat s;

  log->open_status = 0;
  log->fd = -1;
  log->data = NULL;

  TRY(sparkey_load_logheader(&log->header, filename), cleanup);
  log->data_len = log->header.data_end;

  fd = open(filename, O_RDONLY);
  if (fd < 0) {
    returncode = sparkey_open_returncode(errno);
    goto cleanup;
  }

  // fstat the descriptor we are about to map (not the path), so the size check
  // applies to the same file and its result is never read uninitialized.
  if (fstat(fd, &s) < 0) {
    returncode = sparkey_open_returncode(errno);
    goto cleanup;
  }
  if (log->data_len > (uint64_t) s.st_size) {
    returncode = SPARKEY_LOG_TOO_SMALL;
    goto cleanup;
  }

  log->data = mmap(NULL, log->data_len, PROT_READ, MAP_SHARED, fd, 0);
  if (log->data == MAP_FAILED) {
    log->data = NULL;
    returncode = SPARKEY_MMAP_FAILED;
    goto cleanup;
  }

  log->fd = fd;
  log->open_status = MAGIC_VALUE_LOGREADER;
  return SPARKEY_SUCCESS;

cleanup:
  if (fd >= 0) {
    close(fd);
  }
  return returncode;
}
|
|
|
|
/*
 * Allocate a sparkey_logreader on the heap and open filename into it.
 *
 * Fails early if the platform endianness check fails. On success *log_ref owns
 * the new reader; on any failure the allocation is released and *log_ref is
 * left untouched.
 */
sparkey_returncode sparkey_logreader_open(sparkey_logreader **log_ref, const char *filename) {
  RETHROW(correct_endian_platform());

  sparkey_logreader *reader = malloc(sizeof *reader);
  if (reader == NULL) {
    return SPARKEY_INTERNAL_ERROR;
  }

  sparkey_returncode rc = sparkey_logreader_open_noalloc(reader, filename);
  if (rc != SPARKEY_SUCCESS) {
    free(reader);
    return rc;
  }

  *log_ref = reader;
  return SPARKEY_SUCCESS;
}
|
|
|
|
/*
 * Close a reader without freeing the struct itself: unmap the data and close
 * the descriptor. NULL pointers and readers that are not open are ignored,
 * so calling this twice is harmless.
 */
void sparkey_logreader_close_nodealloc(sparkey_logreader *log) {
  if (log == NULL || log->open_status != MAGIC_VALUE_LOGREADER) {
    return;
  }

  log->open_status = 0;

  if (log->data) {
    munmap(log->data, log->data_len);
    log->data = NULL;
  }

  close(log->fd);
  log->fd = -1;
}
|
|
|
|
/*
 * Close and free a reader created by sparkey_logreader_open(), then clear the
 * caller's pointer. A NULL log_ref, or a NULL *log_ref, is accepted.
 */
void sparkey_logreader_close(sparkey_logreader **log_ref) {
  if (log_ref != NULL) {
    sparkey_logreader *reader = *log_ref;
    sparkey_logreader_close_nodealloc(reader);
    free(reader);
    *log_ref = NULL;
  }
}
|
|
|
|
/* SPARKEY_SUCCESS if the reader is currently open, SPARKEY_LOG_CLOSED otherwise. */
static sparkey_returncode assert_log_open(sparkey_logreader *log) {
  return (log->open_status == MAGIC_VALUE_LOGREADER) ? SPARKEY_SUCCESS : SPARKEY_LOG_CLOSED;
}
|
|
|
|
/*
 * Validate an iterator against the reader it is used with. Checks run in this
 * order: the reader is open, the iterator is open, and the iterator was
 * created for this reader's file (matching file_identifier).
 */
static sparkey_returncode assert_iter_open(sparkey_logiter *iter, sparkey_logreader *log) {
  RETHROW(assert_log_open(log));

  if (iter->open_status == MAGIC_VALUE_LOGITER) {
    return (iter->file_identifier == log->header.file_identifier)
        ? SPARKEY_SUCCESS
        : SPARKEY_LOG_ITERATOR_MISMATCH;
  }
  return SPARKEY_LOG_ITERATOR_CLOSED;
}
|
|
|
|
/*
 * Allocate a new iterator over log, positioned before the first entry
 * (state SPARKEY_ITER_NEW, next block at header.header_size).
 *
 * For snappy-compressed logs, a decompression buffer of
 * header.compression_block_size bytes is allocated; the iterator owns it.
 * Returns SPARKEY_INTERNAL_ERROR if allocation fails or the compression type
 * is unknown. *iter_ref is written only on success.
 */
sparkey_returncode sparkey_logiter_create(sparkey_logiter **iter_ref, sparkey_logreader *log) {
  RETHROW(assert_log_open(log));

  // calloc so that every field not set explicitly below (key/value lengths,
  // entry type, entry positions, ...) reads as zero rather than garbage if an
  // accessor is called before the first sparkey_logiter_next().
  sparkey_logiter *iter = calloc(1, sizeof *iter);
  if (iter == NULL) {
    return SPARKEY_INTERNAL_ERROR;
  }

  iter->open_status = MAGIC_VALUE_LOGITER;
  iter->file_identifier = log->header.file_identifier;
  iter->block_position = 0;
  iter->next_block_position = log->header.header_size;
  iter->block_offset = 0;
  iter->block_len = 0;
  iter->entry_count = -1;
  iter->compression_buf = NULL;
  iter->compression_buf_allocated = 0;
  iter->state = SPARKEY_ITER_NEW;

  switch (log->header.compression_type) {
  case SPARKEY_COMPRESSION_NONE:
    // compression_buf will point straight into the mapped log (see seekblock).
    break;
  case SPARKEY_COMPRESSION_SNAPPY:
    iter->compression_buf = malloc(log->header.compression_block_size);
    if (iter->compression_buf == NULL) {
      free(iter);
      return SPARKEY_INTERNAL_ERROR;
    }
    iter->compression_buf_allocated = 1;
    break;
  default:
    free(iter);
    return SPARKEY_INTERNAL_ERROR;
  }

  *iter_ref = iter;
  return SPARKEY_SUCCESS;
}
|
|
|
|
/*
 * Close an iterator created by sparkey_logiter_create(). This frees its owned
 * decompression buffer (if any) and the iterator, then clears *iter_ref.
 * NULL pointers and iterators that are not open are ignored.
 */
void sparkey_logiter_close(sparkey_logiter **iter_ref) {
  sparkey_logiter *it = (iter_ref == NULL) ? NULL : *iter_ref;
  if (it == NULL || it->open_status != MAGIC_VALUE_LOGITER) {
    return;
  }

  it->open_status = 0;
  if (it->compression_buf_allocated) {
    free(it->compression_buf);
  }
  free(it);
  *iter_ref = NULL;
}
|
|
|
|
/*
 * Load the block starting at byte `position` of the log into the iterator and
 * reset block_offset to 0. If that block is already loaded, nothing else happens.
 *
 * Uncompressed logs: the "block" is the rest of the data region, referenced
 * in place inside the mapping.
 * Snappy logs: the block is a VLQ compressed length followed by that many
 * compressed bytes, which are decoded into iter->compression_buf.
 *
 * Returns SPARKEY_INTERNAL_ERROR for positions outside the mapped data,
 * compressed blocks that extend past the mapping, decode failures, or an
 * unknown compression type.
 */
static sparkey_returncode seekblock(sparkey_logiter *iter, sparkey_logreader *log, uint64_t position) {
  iter->block_offset = 0;
  if (iter->block_position == position) {
    return SPARKEY_SUCCESS;
  }

  // sparkey_logiter_seek() passes caller-supplied positions through; without
  // this check block_len below would underflow (uncompressed) or the
  // compressed-size read would start outside the mapping.
  if (position >= log->data_len) {
    return SPARKEY_INTERNAL_ERROR;
  }

  if (log->header.compression_type == SPARKEY_COMPRESSION_NONE) {
    iter->compression_buf = &log->data[position];
    iter->block_position = position;
    iter->next_block_position = log->header.data_end;
    iter->block_len = log->data_len - position;
    return SPARKEY_SUCCESS;
  }

  if (log->header.compression_type == SPARKEY_COMPRESSION_SNAPPY) {
    uint64_t pos = position;
    uint64_t compressed_size = read_vlq(log->data, &pos);
    // The compressed payload must lie entirely inside the mapping. Once this
    // holds, compressed_size is bounded by the mapped length and fits in size_t.
    if (pos > log->data_len || compressed_size > log->data_len - pos) {
      return SPARKEY_INTERNAL_ERROR;
    }
    uint64_t next_pos = pos + compressed_size;
    const char *input = (const char *) &log->data[pos];

    size_t uncompressed_size = log->header.compression_block_size;
    snappy_status status = snappy_uncompress(input, (size_t) compressed_size,
                                             (char *) iter->compression_buf, &uncompressed_size);
    if (status != SNAPPY_OK) {
      // The decoder may have partially overwritten compression_buf, so it no
      // longer holds the previously loaded block. Forget that block so a later
      // seek back to it does not take the "already loaded" shortcut above.
      iter->block_position = 0;
      iter->block_len = 0;
      return SPARKEY_INTERNAL_ERROR;
    }

    iter->block_position = position;
    iter->next_block_position = next_pos;
    iter->block_len = uncompressed_size;
    return SPARKEY_SUCCESS;
  }

  return SPARKEY_INTERNAL_ERROR;
}
|
|
|
|
/*
 * Reposition the iterator at the block starting at `position`.
 *
 * Seeking to header.data_end closes the iterator (no more entries). Any other
 * position loads that block and leaves the iterator NEW, so the following
 * sparkey_logiter_next() reads the entry there.
 */
sparkey_returncode sparkey_logiter_seek(sparkey_logiter *iter, sparkey_logreader *log, uint64_t position) {
  RETHROW(assert_iter_open(iter, log));

  if (position != log->header.data_end) {
    RETHROW(seekblock(iter, log, position));
    iter->entry_count = -1;
    iter->state = SPARKEY_ITER_NEW;
  } else {
    iter->state = SPARKEY_ITER_CLOSED;
  }
  return SPARKEY_SUCCESS;
}
|
|
|
|
/*
 * Make sure the current block has unread bytes, loading the next block if
 * the current one is used up.
 *
 * When there is no next block (next_block_position >= header.data_end), the
 * block is reset to empty (position/offset/len all 0) and SPARKEY_SUCCESS is
 * returned; callers detect end-of-data by block_len - block_offset == 0.
 */
static sparkey_returncode ensure_available(sparkey_logiter *iter, sparkey_logreader *log) {
  if (iter->block_offset >= iter->block_len) {
    if (iter->next_block_position < log->header.data_end) {
      RETHROW(seekblock(iter, log, iter->next_block_position));
      iter->entry_count = -1;
    } else {
      iter->block_position = 0;
      iter->block_offset = 0;
      iter->block_len = 0;
    }
  }
  return SPARKEY_SUCCESS;
}
|
|
|
|
/*
 * Advance the iterator by `len` bytes, crossing block boundaries as needed.
 *
 * Returns SPARKEY_INTERNAL_ERROR if the log data ends before `len` bytes
 * could be skipped (e.g. a corrupt or truncated entry length). Previously
 * that case spun forever, because each round made zero progress.
 */
static sparkey_returncode skip(sparkey_logiter *iter, sparkey_logreader *log, uint64_t len) {
  while (len > 0) {
    RETHROW(ensure_available(iter, log));
    uint64_t available = iter->block_len - iter->block_offset;
    if (available == 0 && iter->next_block_position >= log->header.data_end) {
      // ensure_available() found no further block: out of data.
      return SPARKEY_INTERNAL_ERROR;
    }
    // available may still be 0 for an empty block that has a successor; the
    // next round's ensure_available() then moves on to that block.
    uint64_t m = min64(len, available);
    len -= m;
    iter->block_offset += m;
  }
  return SPARKEY_SUCCESS;
}
|
|
|
|
/*
 * Advance iter to the next entry of log.
 *
 * Unread key/value bytes of the current entry (if ACTIVE) are skipped first.
 * On success the iterator is either ACTIVE, positioned at the start of the
 * new entry's key with type/keylen/valuelen filled in, or CLOSED when the end
 * of the data is reached. Calling this on a CLOSED iterator is a no-op. On
 * error the iterator is left INVALID with zeroed lengths.
 */
sparkey_returncode sparkey_logiter_next(sparkey_logiter *iter, sparkey_logreader *log) {
  if (iter->state == SPARKEY_ITER_CLOSED) {
    return SPARKEY_SUCCESS;
  }

  // Unconsumed bytes of the current entry; only meaningful while ACTIVE.
  uint64_t key_remaining = 0;
  uint64_t value_remaining = 0;
  if (iter->state == SPARKEY_ITER_ACTIVE) {
    key_remaining = iter->key_remaining;
    value_remaining = iter->value_remaining;
  }

  // Stay INVALID until a complete entry header has been parsed, so every
  // early error return below leaves the iterator in a defined state.
  iter->state = SPARKEY_ITER_INVALID;
  iter->key_remaining = 0;
  iter->value_remaining = 0;
  iter->keylen = 0;
  iter->valuelen = 0;

  RETHROW(assert_iter_open(iter, log));
  RETHROW(skip(iter, log, key_remaining));
  RETHROW(skip(iter, log, value_remaining));

  RETHROW(ensure_available(iter, log));
  if (iter->block_len - iter->block_offset == 0) {
    // Reached end of data
    iter->state = SPARKEY_ITER_CLOSED;
    return SPARKEY_SUCCESS;
  }

  if (log->header.compression_type == SPARKEY_COMPRESSION_NONE) {
    // Uncompressed logs use a single block that runs to the end of the data.
    // Rebase it at the current entry so block_position/entry_block_position
    // point at this entry's header.
    iter->block_position += iter->block_offset;
    iter->block_len -= iter->block_offset;
    iter->block_offset = 0;
    iter->compression_buf = &log->data[iter->block_position];
    iter->entry_count = -1;
  }

  iter->entry_count++;

  // Entry header: two VLQs read from the current block. a == 0 marks a delete
  // whose key length is b; otherwise the key length is a - 1 and the value
  // length is b.
  uint64_t a = read_vlq(iter->compression_buf, &iter->block_offset);
  uint64_t b = read_vlq(iter->compression_buf, &iter->block_offset);
  if (iter->block_offset > iter->block_len) {
    // The header ran past the end of the block, so the data is corrupt. Stop
    // here (state stays INVALID); otherwise later block_len - block_offset
    // computations would underflow and the chunk readers would copy out of
    // bounds.
    return SPARKEY_INTERNAL_ERROR;
  }

  if (a == 0) {
    iter->keylen = iter->key_remaining = b;
    iter->valuelen = iter->value_remaining = 0;
    iter->type = SPARKEY_ENTRY_DELETE;
  } else {
    iter->keylen = iter->key_remaining = a - 1;
    iter->valuelen = iter->value_remaining = b;
    iter->type = SPARKEY_ENTRY_PUT;
  }

  // Remember where this entry's key starts so sparkey_logiter_reset() can
  // rewind to it.
  iter->entry_block_position = iter->block_position;
  iter->entry_block_offset = iter->block_offset;

  iter->state = SPARKEY_ITER_ACTIVE;

  return SPARKEY_SUCCESS;
}
|
|
|
|
/*
 * Rewind an ACTIVE iterator to the start of its current entry's key, so the
 * key and value can be read again from the beginning.
 *
 * Like the other iterator operations, this first checks that the reader and
 * iterator are open and belong together. Without that check, a reset against
 * a closed or foreign log would seek into memory that is not this iterator's
 * data. Returns SPARKEY_LOG_ITERATOR_INACTIVE if no entry is current.
 */
sparkey_returncode sparkey_logiter_reset(sparkey_logiter *iter, sparkey_logreader *log) {
  RETHROW(assert_iter_open(iter, log));
  if (iter->state != SPARKEY_ITER_ACTIVE) {
    return SPARKEY_LOG_ITERATOR_INACTIVE;
  }
  RETHROW(seekblock(iter, log, iter->entry_block_position));

  iter->key_remaining = iter->keylen;
  iter->value_remaining = iter->valuelen;
  iter->block_offset = iter->entry_block_offset;
  return SPARKEY_SUCCESS;
}
|
|
|
|
/*
 * Call sparkey_logiter_next() `count` times, stopping at the first error.
 * A count of zero or less does nothing.
 */
sparkey_returncode sparkey_logiter_skip(sparkey_logiter *iter, sparkey_logreader *log, int count) {
  for (int remaining = count; remaining > 0; remaining--) {
    RETHROW(sparkey_logiter_next(iter, log));
  }
  return SPARKEY_SUCCESS;
}
|
|
|
|
|
|
|
|
/*
 * Return the next contiguous run of bytes of one entry field (key or value).
 *
 * var points at that field's remaining-byte counter. The run returned in
 * *res / *len is limited by maxlen, by *var, and by what is left of the current
 * block. The iterator and *var advance past it. *len is 0 once the field is
 * exhausted (or when no further data is available).
 */
static sparkey_returncode sparkey_logiter_chunk(sparkey_logiter *iter, sparkey_logreader *log, uint64_t maxlen, uint64_t *len, uint8_t ** res, uint64_t *var) {
  RETHROW(assert_iter_open(iter, log));
  if (iter->state != SPARKEY_ITER_ACTIVE) {
    return SPARKEY_LOG_ITERATOR_INACTIVE;
  }

  if (*var == 0) {
    *len = 0;
    return SPARKEY_SUCCESS;
  }

  RETHROW(ensure_available(iter, log));
  uint64_t in_block = iter->block_len - iter->block_offset;
  uint64_t take = min64(maxlen, min64(*var, in_block));

  *len = take;
  *res = &iter->compression_buf[iter->block_offset];
  iter->block_offset += take;
  *var -= take;
  return SPARKEY_SUCCESS;
}
|
|
|
|
/*
 * Next contiguous run (at most maxlen bytes) of the current entry's key.
 * *res points into the iterator's block data; *len is 0 when the key is used up.
 */
sparkey_returncode sparkey_logiter_keychunk(sparkey_logiter *iter, sparkey_logreader *log, uint64_t maxlen, uint8_t ** res, uint64_t *len) {
  uint64_t *remaining = &iter->key_remaining;
  return sparkey_logiter_chunk(iter, log, maxlen, len, res, remaining);
}
|
|
|
|
/*
 * Next contiguous run (at most maxlen bytes) of the current entry's value.
 * Any unread key bytes are skipped first. *res points into the iterator's
 * block data; *len is 0 when the value is used up.
 *
 * The iterator is validated (open, matching log, ACTIVE) BEFORE skipping. A
 * stale key_remaining left over after sparkey_logiter_seek() would otherwise
 * be skipped silently, moving the iterator off the entry boundary the next
 * sparkey_logiter_next() expects.
 */
sparkey_returncode sparkey_logiter_valuechunk(sparkey_logiter *iter, sparkey_logreader *log, uint64_t maxlen, uint8_t ** res, uint64_t *len) {
  RETHROW(assert_iter_open(iter, log));
  if (iter->state != SPARKEY_ITER_ACTIVE) {
    return SPARKEY_LOG_ITERATOR_INACTIVE;
  }
  RETHROW(skip(iter, log, iter->key_remaining));
  iter->key_remaining = 0;
  return sparkey_logiter_chunk(iter, log, maxlen, len, res, &iter->value_remaining);
}
|
|
|
|
sparkey_returncode sparkey_logiter_fill_key(sparkey_logiter *iter, sparkey_logreader *log, uint64_t maxlen, uint8_t *buf, uint64_t *len) {
|
|
*len = 0;
|
|
while (maxlen > 0) {
|
|
uint8_t *buf2;
|
|
uint64_t len2;
|
|
RETHROW(sparkey_logiter_keychunk(iter, log, maxlen, &buf2, &len2));
|
|
if (len2 == 0) {
|
|
return SPARKEY_SUCCESS;
|
|
}
|
|
memcpy(buf, buf2, len2);
|
|
buf += len2;
|
|
*len += len2;
|
|
maxlen -= len2;
|
|
}
|
|
return SPARKEY_SUCCESS;
|
|
}
|
|
|
|
sparkey_returncode sparkey_logiter_fill_value(sparkey_logiter *iter, sparkey_logreader *log, uint64_t maxlen, uint8_t *buf, uint64_t *len) {
|
|
*len = 0;
|
|
while (maxlen > 0) {
|
|
uint8_t *buf2;
|
|
uint64_t len2;
|
|
RETHROW(sparkey_logiter_valuechunk(iter, log, maxlen, &buf2, &len2));
|
|
if (len2 == 0) {
|
|
return SPARKEY_SUCCESS;
|
|
}
|
|
memcpy(buf, buf2, len2);
|
|
buf += len2;
|
|
*len += len2;
|
|
maxlen -= len2;
|
|
}
|
|
return SPARKEY_SUCCESS;
|
|
}
|
|
|
|
/*
 * Compare the remaining key bytes of iter1 and iter2 in memcmp order, chunk
 * by chunk. A key that runs out first (a proper prefix of the other) sorts
 * first. *res is set to < 0, 0 or > 0. The compared key bytes are consumed
 * from both iterators.
 */
sparkey_returncode sparkey_logiter_keycmp(sparkey_logiter *iter1, sparkey_logiter *iter2, sparkey_logreader *log, int *res) {
  const uint64_t chunk_limit = (uint64_t) 1 << 30;
  uint8_t *lhs;
  uint8_t *rhs;
  uint64_t lhs_len;
  uint64_t rhs_len;

  RETHROW(sparkey_logiter_keychunk(iter1, log, chunk_limit, &lhs, &lhs_len));
  RETHROW(sparkey_logiter_keychunk(iter2, log, chunk_limit, &rhs, &rhs_len));

  while (lhs_len != 0 || rhs_len != 0) {
    if (lhs_len == 0 || rhs_len == 0) {
      // Exactly one side is exhausted: the shorter key sorts first.
      *res = (lhs_len == 0) ? -1 : 1;
      return SPARKEY_SUCCESS;
    }

    uint64_t overlap = min64(lhs_len, rhs_len);
    int diff = memcmp(lhs, rhs, overlap);
    if (diff != 0) {
      *res = diff;
      return SPARKEY_SUCCESS;
    }

    lhs += overlap;
    lhs_len -= overlap;
    rhs += overlap;
    rhs_len -= overlap;

    if (lhs_len == 0) {
      RETHROW(sparkey_logiter_keychunk(iter1, log, chunk_limit, &lhs, &lhs_len));
    }
    if (rhs_len == 0) {
      RETHROW(sparkey_logiter_keychunk(iter2, log, chunk_limit, &rhs, &rhs_len));
    }
  }

  *res = 0;
  return SPARKEY_SUCCESS;
}
|
|
|
|
|
|
uint64_t sparkey_logreader_maxkeylen(sparkey_logreader *log) {
|
|
return log->header.max_key_len;
|
|
}
|
|
|
|
uint64_t sparkey_logreader_maxvaluelen(sparkey_logreader *log) {
|
|
return log->header.max_value_len;
|
|
}
|
|
|
|
int sparkey_logreader_get_compression_blocksize(sparkey_logreader *log) {
|
|
return log->header.compression_block_size;
|
|
}
|
|
|
|
sparkey_compression_type sparkey_logreader_get_compression_type(sparkey_logreader *log) {
|
|
return log->header.compression_type;
|
|
}
|
|
|
|
sparkey_iter_state sparkey_logiter_state(sparkey_logiter *iter) {
|
|
return iter->state;
|
|
}
|
|
|
|
sparkey_entry_type sparkey_logiter_type(sparkey_logiter *iter) {
|
|
return iter->type;
|
|
}
|
|
|
|
uint64_t sparkey_logiter_keylen(sparkey_logiter *iter) {
|
|
return iter->keylen;
|
|
}
|
|
|
|
uint64_t sparkey_logiter_valuelen(sparkey_logiter *iter) {
|
|
return iter->valuelen;
|
|
}
|
|
|