340 lines
10 KiB
C
340 lines
10 KiB
C
/*
* Copyright (c) 2012-2013 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
|
|
#include <stdio.h>
|
|
#include <fcntl.h>
|
|
#include <unistd.h>
|
|
#include <errno.h>
|
|
#include <stdlib.h>
|
|
#include <string.h>
|
|
|
|
#include <snappy-c.h>
|
|
|
|
#include "util.h"
|
|
#include "sparkey.h"
|
|
#include "logheader.h"
|
|
#include "endiantools.h"
|
|
#include "buf.h"
|
|
#include "sparkey-internal.h"
|
|
|
|
#define MAGIC_VALUE_LOGWRITER (0x2866211b)
|
|
|
|
|
|
/*
 * Encodes value as a variable-length quantity into buf.
 *
 * The value is emitted seven bits at a time, least-significant group first.
 * Every byte except the last has its high bit (0x80) set to signal that more
 * bytes follow.
 *
 * buf must have room for the whole encoding; a 64-bit value takes at most
 * 10 bytes. Returns the number of bytes written, which is always >= 1.
 */
static inline int write_vlq(uint8_t *buf, uint64_t value) {
  int written = 0;
  for (; value >= 0x80; value >>= 7) {
    buf[written++] = (value & 0x7f) | 0x80;
  }
  /* Final group: fits in seven bits, so the continuation bit stays clear. */
  buf[written++] = value;
  return written;
}
|
|
|
|
/*
 * Verifies that the writer still carries the "open" marker.
 *
 * Returns SPARKEY_SUCCESS when log->open_status equals MAGIC_VALUE_LOGWRITER
 * and SPARKEY_LOG_CLOSED for any other value. log must not be NULL.
 */
static sparkey_returncode assert_writer_open(sparkey_logwriter *log) {
  return (log->open_status == MAGIC_VALUE_LOGWRITER) ? SPARKEY_SUCCESS : SPARKEY_LOG_CLOSED;
}
|
|
|
|
/*
 * Creates a fresh log file at filename and hands back a writer for it.
 *
 * log_ref                 receives the new writer on success; it is left
 *                         untouched on failure.
 * filename                path of the log file. An existing file at that path
 *                         is removed first and a new one is created with
 *                         mode 0644.
 * compression_type        SPARKEY_COMPRESSION_NONE or
 *                         SPARKEY_COMPRESSION_SNAPPY; anything else yields
 *                         SPARKEY_INVALID_COMPRESSION_TYPE.
 * compression_block_size  forced to 0 for SPARKEY_COMPRESSION_NONE; must be
 *                         at least 10 for snappy, otherwise
 *                         SPARKEY_INVALID_COMPRESSION_BLOCK_SIZE is returned.
 *
 * Returns SPARKEY_SUCCESS, or an error code. On every error path the writer
 * struct, the compression scratch buffer, an already initialised file buffer
 * and the file descriptor acquired here are released before returning.
 */
sparkey_returncode sparkey_logwriter_create(sparkey_logwriter **log_ref, const char *filename, sparkey_compression_type compression_type, int compression_block_size) {
  sparkey_returncode returncode;
  /* -1 means "nothing opened yet"; 0 is a valid descriptor and must not be used as the sentinel. */
  int fd = -1;
  /* Set once file_buf has been initialised so the error path knows to release it. */
  int file_buf_ready = 0;

  sparkey_logwriter *l = malloc(sizeof *l);
  if (l == NULL) {
    TRY(SPARKEY_INTERNAL_ERROR, error);
  }
  /* Give the error path a well-defined pointer to free from the start. */
  l->compressed = NULL;

  switch (compression_type) {
  case SPARKEY_COMPRESSION_NONE:
    compression_block_size = 0;
    break;
  case SPARKEY_COMPRESSION_SNAPPY:
    if (compression_block_size < 10) {
      TRY(SPARKEY_INVALID_COMPRESSION_BLOCK_SIZE, error);
    }
    /* Scratch space large enough for the worst-case snappy output of one block. */
    l->max_compressed_size = snappy_max_compressed_length(compression_block_size);
    l->compressed = malloc(l->max_compressed_size);
    if (l->compressed == NULL) {
      TRY(SPARKEY_INTERNAL_ERROR, error);
    }
    break;
  default:
    TRY(SPARKEY_INVALID_COMPRESSION_TYPE, error);
  }

  // Try removing it first, to avoid overwriting existing files that readers may be using.
  if (remove(filename) < 0) {
    int e = errno;
    /* A missing file is the normal case, not an error. */
    if (e != ENOENT) {
      TRY(sparkey_remove_returncode(e), error);
    }
  }

  fd = open(filename, O_WRONLY | O_TRUNC | O_CREAT, 00644);
  if (fd == -1) {
    TRY(sparkey_create_returncode(errno), error);
  }
  l->fd = fd;

  l->header.compression_block_size = compression_block_size;
  l->header.compression_type = compression_type;

  TRY(rand32(&(l->header.file_identifier)), error);
  l->header.data_end = LOG_HEADER_SIZE;
  l->header.major_version = LOG_MAJOR_VERSION;
  l->header.minor_version = LOG_MINOR_VERSION;
  l->header.put_size = 0;
  l->header.delete_size = 0;
  l->header.num_puts = 0;
  l->header.num_deletes = 0;
  l->header.max_entries_per_block = 0;
  l->header.max_key_len = 0;
  l->header.max_value_len = 0;

  TRY(write_logheader(fd, &l->header), error);
  /* Sanity check: after the header the offset must sit exactly at LOG_HEADER_SIZE. */
  off_t pos = lseek(fd, 0, SEEK_CUR);
  if (pos != LOG_HEADER_SIZE) {
    TRY(SPARKEY_INTERNAL_ERROR, error);
  }

  TRY(buf_init(&l->file_buf, 1024*1024), error);
  file_buf_ready = 1;
  TRY(buf_init(&l->block_buf, compression_block_size), error);

  l->entry_count = 0;

  l->open_status = MAGIC_VALUE_LOGWRITER;
  *log_ref = l;
  return SPARKEY_SUCCESS;

error:
  if (l != NULL) {
    if (file_buf_ready) {
      buf_close(&l->file_buf);
    }
    free(l->compressed);
    free(l);
  }
  if (fd >= 0) {
    close(fd);
  }
  return returncode;
}
|
|
|
|
/*
 * Opens an existing log file for appending and hands back a writer for it.
 *
 * log_ref   receives the new writer on success; it is left untouched on
 *           failure.
 * filename  path of an existing log file. Its header is loaded with
 *           sparkey_load_logheader, and writing resumes at the header's
 *           data_end offset.
 *
 * Returns SPARKEY_SUCCESS, or:
 *   - SPARKEY_WRONG_LOG_MAJOR_VERSION / SPARKEY_UNSUPPORTED_LOG_MINOR_VERSION
 *     when the file's version does not match this build,
 *   - SPARKEY_INVALID_COMPRESSION_TYPE / SPARKEY_INVALID_COMPRESSION_BLOCK_SIZE
 *     when the header's compression settings are not usable,
 *   - SPARKEY_INTERNAL_ERROR on allocation failure or when seeking to
 *     data_end fails,
 *   - whatever sparkey_load_logheader, sparkey_create_returncode or buf_init
 *     report.
 * On every error path the writer struct, the compression scratch buffer, an
 * already initialised file buffer and the file descriptor are released.
 */
sparkey_returncode sparkey_logwriter_append(sparkey_logwriter **log_ref, const char *filename) {
  sparkey_returncode returncode;
  /* -1 means "nothing opened yet"; 0 is a valid descriptor and must not be used as the sentinel. */
  int fd = -1;
  /* Set once file_buf has been initialised so the error path knows to release it. */
  int file_buf_ready = 0;

  sparkey_logwriter *log = malloc(sizeof *log);
  if (log == NULL) {
    TRY(SPARKEY_INTERNAL_ERROR, error);
  }
  /* Give the error path a well-defined pointer to free from the start. */
  log->compressed = NULL;

  TRY(sparkey_load_logheader(&log->header, filename), error);

  if (log->header.major_version != LOG_MAJOR_VERSION) {
    TRY(SPARKEY_WRONG_LOG_MAJOR_VERSION, error);
  }
  if (log->header.minor_version != LOG_MINOR_VERSION) {
    TRY(SPARKEY_UNSUPPORTED_LOG_MINOR_VERSION, error);
  }

  switch (log->header.compression_type) {
  case SPARKEY_COMPRESSION_NONE:
    log->header.compression_block_size = 0;
    break;
  case SPARKEY_COMPRESSION_SNAPPY:
    if (log->header.compression_block_size < 10) {
      TRY(SPARKEY_INVALID_COMPRESSION_BLOCK_SIZE, error);
    }
    /* Scratch space large enough for the worst-case snappy output of one block. */
    log->max_compressed_size = snappy_max_compressed_length(log->header.compression_block_size);
    log->compressed = malloc(log->max_compressed_size);
    if (log->compressed == NULL) {
      TRY(SPARKEY_INTERNAL_ERROR, error);
    }
    break;
  default:
    TRY(SPARKEY_INVALID_COMPRESSION_TYPE, error);
  }

  fd = open(filename, O_WRONLY, 00644);
  if (fd == -1) {
    int e = errno;
    TRY(sparkey_create_returncode(e), error);
  }
  log->fd = fd;

  /*
   * Resume right after the last flushed entry. If the seek fails the offset
   * would still be 0 and the first write would clobber the header, so treat
   * it as a hard error.
   */
  if (lseek(fd, log->header.data_end, SEEK_SET) == (off_t) -1) {
    TRY(SPARKEY_INTERNAL_ERROR, error);
  }

  TRY(buf_init(&log->file_buf, 1024*1024), error);
  file_buf_ready = 1;
  TRY(buf_init(&log->block_buf, log->header.compression_block_size), error);

  log->entry_count = 0;

  log->open_status = MAGIC_VALUE_LOGWRITER;
  *log_ref = log;
  return SPARKEY_SUCCESS;

error:
  if (log != NULL) {
    if (file_buf_ready) {
      buf_close(&log->file_buf);
    }
    free(log->compressed);
    free(log);
  }
  if (fd >= 0) {
    close(fd);
  }
  return returncode;
}
|
|
|
|
/*
 * Compresses whatever is currently held in the block buffer and queues it on
 * the file buffer as one block: a VLQ-encoded compressed length followed by
 * the compressed bytes.
 *
 * Side effects, in order:
 *   - log->flushed is set to 1,
 *   - header.max_entries_per_block is raised to entry_count if that is larger,
 *   - entry_count is reset to 0,
 *   - on success the block buffer is rewound to its start.
 *
 * Returns SPARKEY_SUCCESS, SPARKEY_INTERNAL_ERROR when snappy_compress does
 * not report SNAPPY_OK, or the error reported by buf_add.
 */
static sparkey_returncode flush_snappy(sparkey_logwriter *log) {
  log->flushed = 1;

  // Remember the largest number of entries that ever went into one block.
  if (log->entry_count > (int) log->header.max_entries_per_block) {
    log->header.max_entries_per_block = log->entry_count;
  }
  log->entry_count = 0;

  // In: capacity of the scratch buffer. Out: number of compressed bytes produced.
  size_t out_len = (uint32_t) log->max_compressed_size;
  snappy_status rc = snappy_compress((char *) log->block_buf.start, buf_used(&log->block_buf), (char *) log->compressed, &out_len);
  if (rc != SNAPPY_OK) {
    return SPARKEY_INTERNAL_ERROR;
  }

  uint8_t len_prefix[10];
  ptrdiff_t len_prefix_size = write_vlq(len_prefix, out_len);
  RETHROW(buf_add(&log->file_buf, log->fd, len_prefix, len_prefix_size));
  RETHROW(buf_add(&log->file_buf, log->fd, log->compressed, out_len));

  // The block has been handed over; start filling it from the beginning again.
  log->block_buf.cur = log->block_buf.start;
  return SPARKEY_SUCCESS;
}
|
|
|
|
|
|
/*
 * Pushes all buffered data to the file and rewrites the header so that it
 * reflects the new end of data.
 *
 * Steps:
 *   1. compress and queue a partially filled block, if any,
 *   2. write the file buffer out,
 *   3. record the current file offset as header.data_end,
 *   4. rewrite the header at offset 0 and seek back to the end of the data.
 *
 * Returns SPARKEY_SUCCESS, SPARKEY_LOG_CLOSED if the writer is not open,
 * SPARKEY_INTERNAL_ERROR if a seek fails, or the error reported by
 * flush_snappy, buf_flushfile or write_logheader.
 */
sparkey_returncode sparkey_logwriter_flush(sparkey_logwriter *log) {
  RETHROW(assert_writer_open(log));
  if (buf_used(&log->block_buf) > 0) {
    RETHROW(flush_snappy(log));
  }
  if (buf_used(&log->file_buf) > 0) {
    RETHROW(buf_flushfile(&log->file_buf, log->fd));
  }

  /* A failed query must not end up in the header as a bogus data_end. */
  off_t pos = lseek(log->fd, 0, SEEK_CUR);
  if (pos == (off_t) -1) {
    return SPARKEY_INTERNAL_ERROR;
  }
  log->header.data_end = pos;

  /* If we cannot reach offset 0 the header would be written into the data area. */
  if (lseek(log->fd, 0, SEEK_SET) == (off_t) -1) {
    return SPARKEY_INTERNAL_ERROR;
  }
  sparkey_returncode header_status = write_logheader(log->fd, &log->header);

  /*
   * Always try to return to the end of the data, even when the header write
   * failed; otherwise later appends would land right after the header and
   * overwrite existing entries.
   */
  off_t restored = lseek(log->fd, pos, SEEK_SET);
  if (header_status != SPARKEY_SUCCESS) {
    return header_status;
  }
  if (restored == (off_t) -1) {
    return SPARKEY_INTERNAL_ERROR;
  }

  /* fsync(log->fd) is deliberately not called: can't build fsync support on lenny. */
  return SPARKEY_SUCCESS;
}
|
|
|
|
/*
 * Flushes and closes a writer, releases everything it owns and sets *log to
 * NULL.
 *
 * log  address of the writer pointer. A NULL log, a NULL *log, or a writer
 *      whose open marker is not set are all treated as "already closed" and
 *      return SPARKEY_SUCCESS without doing anything. Because *log is set to
 *      NULL on success, calling this function twice on the same variable is
 *      harmless.
 *
 * Returns SPARKEY_SUCCESS, or the error reported by sparkey_logwriter_flush.
 * When the flush fails nothing is released and *log is left unchanged, so
 * the caller still owns the writer.
 */
sparkey_returncode sparkey_logwriter_close(sparkey_logwriter **log) {
  /* A successful close stores NULL in *log; a repeated call must not dereference it. */
  if (log == NULL || *log == NULL) {
    return SPARKEY_SUCCESS;
  }

  sparkey_logwriter *l = *log;
  if (l->open_status != MAGIC_VALUE_LOGWRITER) {
    return SPARKEY_SUCCESS;
  }

  RETHROW(sparkey_logwriter_flush(l));
  close(l->fd);
  buf_close(&l->file_buf);
  buf_close(&l->block_buf);
  /* NULL when compression is disabled; free(NULL) is a no-op. */
  free(l->compressed);

  l->open_status = 0;
  free(l);
  *log = NULL;
  return SPARKEY_SUCCESS;
}
|
|
|
|
/*
 * Appends len bytes from data to the writer's block buffer, compressing and
 * queueing the block (flush_snappy) every time it fills up.
 *
 * log   writer whose block buffer receives the bytes.
 * data  source bytes; may be NULL only when len is 0.
 * len   number of bytes to append.
 *
 * Returns SPARKEY_SUCCESS, or the error reported by flush_snappy.
 */
static sparkey_returncode snappy_add(sparkey_logwriter *log, const uint8_t *data, ptrdiff_t len) {
  sparkey_buf *block_buf = &log->block_buf;

  /*
   * Delete entries pass data == NULL with len == 0. Handing a NULL pointer to
   * memcpy is undefined even for a zero length, so bail out before copying.
   */
  if (len == 0) {
    return SPARKEY_SUCCESS;
  }

  for (;;) {
    ptrdiff_t remaining = buf_remaining(block_buf);
    if (remaining >= len) {
      /* Everything that is left fits into the current block. */
      memcpy(block_buf->cur, data, len);
      block_buf->cur += len;
      return SPARKEY_SUCCESS;
    }

    /* Fill the block to the brim, flush it, and carry on with the rest. */
    memcpy(block_buf->cur, data, remaining);
    block_buf->cur += remaining;
    data += remaining;
    len -= remaining;
    RETHROW(flush_snappy(log));
  }
}
|
|
|
|
|
|
/*
 * Writes one log entry: VLQ(num1), VLQ(num2), then len1 bytes of data1 and
 * len2 bytes of data2.
 *
 * *datasize is always set to the entry's uncompressed size (both VLQ prefixes
 * plus len1 + len2), even when an error is returned afterwards.
 *
 * Without compression the four pieces go straight to the file buffer. With
 * snappy they go to the block buffer, and the block is flushed
 *   - before the entry, if the two VLQ prefixes do not fit in the space that
 *     is left, or if the whole entry would fit in an empty block but not in
 *     the space that is left;
 *   - after the entry, if adding it caused a flush and bytes are still
 *     pending in the block buffer.
 *
 * Returns SPARKEY_SUCCESS, SPARKEY_INTERNAL_ERROR for an unknown compression
 * type, or the error reported by buf_add / snappy_add / flush_snappy.
 */
static sparkey_returncode log_add(sparkey_logwriter *log, uint64_t num1, uint64_t num2, uint64_t len1, const uint8_t *data1, uint64_t len2, const uint8_t *data2, ptrdiff_t *datasize) {
  uint8_t prefix1[10];
  uint8_t prefix2[10];
  uint64_t prefix1_len = write_vlq(prefix1, num1);
  uint64_t prefix2_len = write_vlq(prefix2, num2);
  uint64_t prefixes_len = prefix1_len + prefix2_len;
  uint64_t entry_len = prefixes_len + len1 + len2;

  *datasize = entry_len;

  if (log->header.compression_type == SPARKEY_COMPRESSION_NONE) {
    RETHROW(buf_add(&log->file_buf, log->fd, prefix1, prefix1_len));
    RETHROW(buf_add(&log->file_buf, log->fd, prefix2, prefix2_len));
    RETHROW(buf_add(&log->file_buf, log->fd, data1, len1));
    RETHROW(buf_add(&log->file_buf, log->fd, data2, len2));
    return SPARKEY_SUCCESS;
  }

  if (log->header.compression_type != SPARKEY_COMPRESSION_SNAPPY) {
    return SPARKEY_INTERNAL_ERROR;
  }

  uint64_t space_left = buf_remaining(&log->block_buf);
  // todo: make it smarter by checking if it's better to flush directly
  uint64_t fits_in_empty_block = entry_len <= buf_size(&log->block_buf);
  uint64_t overflows_current = entry_len > buf_remaining(&log->block_buf);
  if ((space_left < prefixes_len) || (fits_in_empty_block && overflows_current)) {
    RETHROW(flush_snappy(log));
  }

  log->entry_count++;
  // flush_snappy sets this back to 1, which tells us below whether the entry crossed a block boundary.
  log->flushed = 0;
  RETHROW(snappy_add(log, prefix1, prefix1_len));
  RETHROW(snappy_add(log, prefix2, prefix2_len));
  RETHROW(snappy_add(log, data1, len1));
  RETHROW(snappy_add(log, data2, len2));

  // The entry spilled into a new block: flush its tail too.
  if (log->flushed && buf_used(&log->block_buf) > 0) {
    RETHROW(flush_snappy(log));
  }
  return SPARKEY_SUCCESS;
}
|
|
|
|
/*
 * Appends a put entry (key -> value) to the log.
 *
 * log       an open writer.
 * keylen    length of key in bytes.
 * key       key bytes.
 * valuelen  length of value in bytes.
 * value     value bytes.
 *
 * The entry's first number is keylen + 1 (delete entries use 0 in that
 * position), the second is valuelen. On success the header statistics are
 * updated: num_puts, put_size, and the running maxima max_key_len and
 * max_value_len.
 *
 * Returns SPARKEY_SUCCESS, SPARKEY_LOG_CLOSED if the writer is not open, or
 * the error reported by log_add.
 */
sparkey_returncode sparkey_logwriter_put(sparkey_logwriter *log, uint64_t keylen, const uint8_t *key, uint64_t valuelen, const uint8_t *value) {
  RETHROW(assert_writer_open(log));

  ptrdiff_t entry_size;
  RETHROW(log_add(log, keylen + 1, valuelen, keylen, key, valuelen, value, &entry_size));

  if (log->header.max_key_len < keylen) {
    log->header.max_key_len = keylen;
  }
  if (log->header.max_value_len < valuelen) {
    log->header.max_value_len = valuelen;
  }
  log->header.put_size += entry_size;
  log->header.num_puts++;
  return SPARKEY_SUCCESS;
}
|
|
|
|
/*
 * Appends a delete entry for key to the log.
 *
 * log     an open writer.
 * keylen  length of key in bytes.
 * key     key bytes.
 *
 * The entry's first number is 0 and its second is keylen; the only payload
 * is the key itself. On success header.num_deletes and header.delete_size
 * are updated.
 *
 * Returns SPARKEY_SUCCESS, SPARKEY_LOG_CLOSED if the writer is not open, or
 * the error reported by log_add.
 */
sparkey_returncode sparkey_logwriter_delete(sparkey_logwriter *log, uint64_t keylen, const uint8_t *key) {
  RETHROW(assert_writer_open(log));

  ptrdiff_t entry_size;
  RETHROW(log_add(log, 0, keylen, 0, NULL, keylen, key, &entry_size));

  log->header.delete_size += entry_size;
  log->header.num_deletes++;
  return SPARKEY_SUCCESS;
}
|
|
|