From fbef0a15fe7c91bee424b5d74ee872899c65a754 Mon Sep 17 00:00:00 2001 From: Al Date: Thu, 9 Jul 2015 15:26:11 -0400 Subject: [PATCH] [geodb] Adding sparkey dependency --- src/sparkey/LICENSE | 201 ++++++++++ src/sparkey/MurmurHash3.c | 349 +++++++++++++++++ src/sparkey/MurmurHash3.h | 35 ++ src/sparkey/buf.c | 77 ++++ src/sparkey/buf.h | 48 +++ src/sparkey/endiantools.c | 141 +++++++ src/sparkey/endiantools.h | 79 ++++ src/sparkey/hashalgorithms.c | 50 +++ src/sparkey/hashalgorithms.h | 29 ++ src/sparkey/hashheader.c | 136 +++++++ src/sparkey/hashheader.h | 99 +++++ src/sparkey/hashiter.c | 46 +++ src/sparkey/hashiter.h | 26 ++ src/sparkey/hashreader.c | 255 +++++++++++++ src/sparkey/hashwriter.c | 521 +++++++++++++++++++++++++ src/sparkey/logheader.c | 124 ++++++ src/sparkey/logheader.h | 68 ++++ src/sparkey/logreader.c | 511 +++++++++++++++++++++++++ src/sparkey/logwriter.c | 339 +++++++++++++++++ src/sparkey/returncodes.c | 60 +++ src/sparkey/sparkey-internal.h | 90 +++++ src/sparkey/sparkey.h | 673 +++++++++++++++++++++++++++++++++ src/sparkey/util.c | 92 +++++ src/sparkey/util.h | 85 +++++ 24 files changed, 4134 insertions(+) create mode 100644 src/sparkey/LICENSE create mode 100644 src/sparkey/MurmurHash3.c create mode 100644 src/sparkey/MurmurHash3.h create mode 100644 src/sparkey/buf.c create mode 100644 src/sparkey/buf.h create mode 100644 src/sparkey/endiantools.c create mode 100644 src/sparkey/endiantools.h create mode 100644 src/sparkey/hashalgorithms.c create mode 100644 src/sparkey/hashalgorithms.h create mode 100644 src/sparkey/hashheader.c create mode 100644 src/sparkey/hashheader.h create mode 100644 src/sparkey/hashiter.c create mode 100644 src/sparkey/hashiter.h create mode 100644 src/sparkey/hashreader.c create mode 100644 src/sparkey/hashwriter.c create mode 100644 src/sparkey/logheader.c create mode 100644 src/sparkey/logheader.h create mode 100644 src/sparkey/logreader.c create mode 100644 src/sparkey/logwriter.c create mode 100644 
src/sparkey/returncodes.c create mode 100644 src/sparkey/sparkey-internal.h create mode 100644 src/sparkey/sparkey.h create mode 100644 src/sparkey/util.c create mode 100644 src/sparkey/util.h diff --git a/src/sparkey/LICENSE b/src/sparkey/LICENSE new file mode 100644 index 00000000..4e39dd77 --- /dev/null +++ b/src/sparkey/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. 
+ + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. 
+ + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright 2012-2013 Spotify AB + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/src/sparkey/MurmurHash3.c b/src/sparkey/MurmurHash3.c new file mode 100644 index 00000000..6f6b2f50 --- /dev/null +++ b/src/sparkey/MurmurHash3.c @@ -0,0 +1,349 @@ +//----------------------------------------------------------------------------- +// MurmurHash3 was written by Austin Appleby, and is placed in the public +// domain. The author hereby disclaims copyright to this source code. + +// Note - The x86 and x64 versions do _not_ produce the same results, as the +// algorithms are optimized for their respective platforms. You can still +// compile and run any of them on any platform, but your performance with the +// non-native version will be less than optimal. 
+
+#include "MurmurHash3.h"
+#include "endiantools.h"
+
+//-----------------------------------------------------------------------------
+// Platform-specific functions and macros
+
+// Microsoft Visual Studio
+
+#if defined(_MSC_VER)
+
+#define FORCE_INLINE __forceinline
+
+// _rotl / _rotl64 are declared by <stdlib.h> on MSVC.
+#include <stdlib.h>
+
+#define ROTL32(x,y) _rotl(x,y)
+#define ROTL64(x,y) _rotl64(x,y)
+
+#define BIG_CONSTANT(x) (x)
+
+// Other compilers
+
+#else // defined(_MSC_VER)
+
+#define FORCE_INLINE __attribute__((always_inline)) inline
+
+// Rotate x left by r bits. Every call site in this file passes a constant
+// in the range 1..31, so neither shift count reaches the type width.
+static inline uint32_t rotl32 ( uint32_t x, int8_t r )
+{
+  return (x << r) | (x >> (32 - r));
+}
+
+// 64-bit counterpart of rotl32; call sites pass constants in 1..63.
+static inline uint64_t rotl64 ( uint64_t x, int8_t r )
+{
+  return (x << r) | (x >> (64 - r));
+}
+
+#define ROTL32(x,y) rotl32(x,y)
+#define ROTL64(x,y) rotl64(x,y)
+
+#define BIG_CONSTANT(x) (x##LLU)
+
+#endif // !defined(_MSC_VER)
+
+//-----------------------------------------------------------------------------
+// Block read - if your platform needs to do endian-swapping or can only
+// handle aligned reads, do the conversion here
+
+// Returns the i-th 32-bit little-endian block relative to p. i may be
+// negative (the x86 variants index backwards from the end of the body), so
+// the element address is formed with pointer arithmetic first instead of
+// converting a negative int into an unsigned 64-bit byte offset.
+FORCE_INLINE uint32_t getblock32 ( const uint32_t * p, int i )
+{
+  return read_little_endian32((const uint8_t *)(p + i), 0);
+}
+
+// Returns the i-th 64-bit little-endian block relative to p.
+FORCE_INLINE uint64_t getblock64 ( const uint64_t * p, int i )
+{
+  return read_little_endian64((const uint8_t *)(p + i), 0);
+}
+
+//-----------------------------------------------------------------------------
+// Finalization mix - force all bits of a hash block to avalanche
+
+FORCE_INLINE uint32_t fmix32 ( uint32_t h )
+{
+  h ^= h >> 16;
+  h *= 0x85ebca6b;
+  h ^= h >> 13;
+  h *= 0xc2b2ae35;
+  h ^= h >> 16;
+
+  return h;
+}
+
+//----------
+
+FORCE_INLINE uint64_t fmix64 ( uint64_t k )
+{
+  k ^= k >> 33;
+  k *= BIG_CONSTANT(0xff51afd7ed558ccd);
+  k ^= k >> 33;
+  k *= BIG_CONSTANT(0xc4ceb9fe1a85ec53);
+  k ^= k >> 33;
+
+  return k;
+}
+
+//-----------------------------------------------------------------------------
+// 32-bit MurmurHash3 of len bytes at key. The result is stored as one
+// uint32_t at out, which must have room for 4 bytes.
+
+void MurmurHash3_x86_32 ( const void * key, int len,
+                          uint32_t seed, void * out )
+{
+  const uint8_t * data = (const uint8_t*)key;
+  const int nblocks = len / 4;
+
+  uint32_t h1 = seed;
+
+  const uint32_t c1 = 0xcc9e2d51;
+  const uint32_t c2 = 0x1b873593;
+
+  //----------
+  // body
+
+  // blocks points one past the last full block; the loop walks up from
+  // -nblocks to 0, i.e. over the blocks in their natural order.
+  const uint32_t * blocks = (const uint32_t *)(data + nblocks*4);
+
+  for(int i = -nblocks; i; i++)
+  {
+    uint32_t k1 = getblock32(blocks,i);
+
+    k1 *= c1;
+    k1 = ROTL32(k1,15);
+    k1 *= c2;
+
+    h1 ^= k1;
+    h1 = ROTL32(h1,13);
+    h1 = h1*5+0xe6546b64;
+  }
+
+  //----------
+  // tail
+
+  const uint8_t * tail = (const uint8_t*)(data + nblocks*4);
+
+  uint32_t k1 = 0;
+
+  // Every case deliberately falls through to the ones below it. Bytes are
+  // widened to uint32_t before shifting so the arithmetic stays unsigned.
+  switch(len & 3)
+  {
+  case 3: k1 ^= (uint32_t)tail[2] << 16;
+  case 2: k1 ^= (uint32_t)tail[1] << 8;
+  case 1: k1 ^= (uint32_t)tail[0];
+          k1 *= c1; k1 = ROTL32(k1,15); k1 *= c2; h1 ^= k1;
+  }
+
+  //----------
+  // finalization
+
+  h1 ^= len;
+
+  h1 = fmix32(h1);
+
+  *(uint32_t*)out = h1;
+}
+
+//-----------------------------------------------------------------------------
+// 128-bit MurmurHash3 built from 32-bit operations. The result is stored as
+// four uint32_t values at out, which must have room for 16 bytes.
+
+void MurmurHash3_x86_128 ( const void * key, const int len,
+                           uint32_t seed, void * out )
+{
+  const uint8_t * data = (const uint8_t*)key;
+  const int nblocks = len / 16;
+
+  uint32_t h1 = seed;
+  uint32_t h2 = seed;
+  uint32_t h3 = seed;
+  uint32_t h4 = seed;
+
+  const uint32_t c1 = 0x239b961b;
+  const uint32_t c2 = 0xab0e9789;
+  const uint32_t c3 = 0x38b34ae5;
+  const uint32_t c4 = 0xa1e38b93;
+
+  //----------
+  // body
+
+  const uint32_t * blocks = (const uint32_t *)(data + nblocks*16);
+
+  for(int i = -nblocks; i; i++)
+  {
+    uint32_t k1 = getblock32(blocks,i*4+0);
+    uint32_t k2 = getblock32(blocks,i*4+1);
+    uint32_t k3 = getblock32(blocks,i*4+2);
+    uint32_t k4 = getblock32(blocks,i*4+3);
+
+    k1 *= c1; k1 = ROTL32(k1,15); k1 *= c2; h1 ^= k1;
+
+    h1 = ROTL32(h1,19); h1 += h2; h1 = h1*5+0x561ccd1b;
+
+    k2 *= c2; k2 = ROTL32(k2,16); k2 *= c3; h2 ^= k2;
+
+    h2 = ROTL32(h2,17); h2 += h3; h2 = h2*5+0x0bcaa747;
+
+    k3 *= c3; k3 = ROTL32(k3,17); k3 *= c4; h3 ^= k3;
+
+    h3 = ROTL32(h3,15); h3 += h4; h3 = h3*5+0x96cd1c35;
+
+    k4 *= c4; k4 = ROTL32(k4,18); k4 *= c1; h4 ^= k4;
+
+    h4 = ROTL32(h4,13); h4 += h1; h4 = h4*5+0x32ac3b17;
+  }
+
+  //----------
+  // tail
+
+  const uint8_t * tail = (const uint8_t*)(data + nblocks*16);
+
+  uint32_t k1 = 0;
+  uint32_t k2 = 0;
+  uint32_t k3 = 0;
+  uint32_t k4 = 0;
+
+  // Every case deliberately falls through to the ones below it. Bytes are
+  // widened to uint32_t before shifting: shifting a promoted int left by 24
+  // is undefined when the byte is >= 0x80.
+  switch(len & 15)
+  {
+  case 15: k4 ^= (uint32_t)tail[14] << 16;
+  case 14: k4 ^= (uint32_t)tail[13] << 8;
+  case 13: k4 ^= (uint32_t)tail[12] << 0;
+           k4 *= c4; k4 = ROTL32(k4,18); k4 *= c1; h4 ^= k4;
+
+  case 12: k3 ^= (uint32_t)tail[11] << 24;
+  case 11: k3 ^= (uint32_t)tail[10] << 16;
+  case 10: k3 ^= (uint32_t)tail[ 9] << 8;
+  case  9: k3 ^= (uint32_t)tail[ 8] << 0;
+           k3 *= c3; k3 = ROTL32(k3,17); k3 *= c4; h3 ^= k3;
+
+  case  8: k2 ^= (uint32_t)tail[ 7] << 24;
+  case  7: k2 ^= (uint32_t)tail[ 6] << 16;
+  case  6: k2 ^= (uint32_t)tail[ 5] << 8;
+  case  5: k2 ^= (uint32_t)tail[ 4] << 0;
+           k2 *= c2; k2 = ROTL32(k2,16); k2 *= c3; h2 ^= k2;
+
+  case  4: k1 ^= (uint32_t)tail[ 3] << 24;
+  case  3: k1 ^= (uint32_t)tail[ 2] << 16;
+  case  2: k1 ^= (uint32_t)tail[ 1] << 8;
+  case  1: k1 ^= (uint32_t)tail[ 0] << 0;
+           k1 *= c1; k1 = ROTL32(k1,15); k1 *= c2; h1 ^= k1;
+  }
+
+  //----------
+  // finalization
+
+  h1 ^= len; h2 ^= len; h3 ^= len; h4 ^= len;
+
+  h1 += h2; h1 += h3; h1 += h4;
+  h2 += h1; h3 += h1; h4 += h1;
+
+  h1 = fmix32(h1);
+  h2 = fmix32(h2);
+  h3 = fmix32(h3);
+  h4 = fmix32(h4);
+
+  h1 += h2; h1 += h3; h1 += h4;
+  h2 += h1; h3 += h1; h4 += h1;
+
+  ((uint32_t*)out)[0] = h1;
+  ((uint32_t*)out)[1] = h2;
+  ((uint32_t*)out)[2] = h3;
+  ((uint32_t*)out)[3] = h4;
+}
+
+//-----------------------------------------------------------------------------
+// 128-bit MurmurHash3 built from 64-bit operations. The result is stored as
+// two uint64_t values at out, which must have room for 16 bytes.
+
+void MurmurHash3_x64_128 ( const void * key, const int len,
+                           const uint32_t seed, void * out )
+{
+  const uint8_t * data = (const uint8_t*)key;
+  const int nblocks = len / 16;
+
+  uint64_t h1 = seed;
+  uint64_t h2 = seed;
+
+  const uint64_t c1 = BIG_CONSTANT(0x87c37b91114253d5);
+  const uint64_t c2 = BIG_CONSTANT(0x4cf5ad432745937f);
+
+  //----------
+  // body
+
+  const uint64_t * blocks = (const uint64_t *)(data);
+
+  for(int i = 0; i < nblocks; i++)
+  {
+    uint64_t k1 = getblock64(blocks,i*2+0);
+    uint64_t k2 = getblock64(blocks,i*2+1);
+
+    k1 *= c1; k1 = ROTL64(k1,31); k1 *= c2; h1 ^= k1;
+
+    h1 = ROTL64(h1,27); h1 += h2; h1 = h1*5+0x52dce729;
+
+    k2 *= c2; k2 = ROTL64(k2,33); k2 *= c1; h2 ^= k2;
+
+    h2 = ROTL64(h2,31); h2 += h1; h2 = h2*5+0x38495ab5;
+  }
+
+  //----------
+  // tail
+
+  const uint8_t * tail = (const uint8_t*)(data + nblocks*16);
+
+  uint64_t k1 = 0;
+  uint64_t k2 = 0;
+
+  // Every case deliberately falls through to the ones below it.
+  switch(len & 15)
+  {
+  case 15: k2 ^= ((uint64_t)tail[14]) << 48;
+  case 14: k2 ^= ((uint64_t)tail[13]) << 40;
+  case 13: k2 ^= ((uint64_t)tail[12]) << 32;
+  case 12: k2 ^= ((uint64_t)tail[11]) << 24;
+  case 11: k2 ^= ((uint64_t)tail[10]) << 16;
+  case 10: k2 ^= ((uint64_t)tail[ 9]) << 8;
+  case  9: k2 ^= ((uint64_t)tail[ 8]) << 0;
+           k2 *= c2; k2 = ROTL64(k2,33); k2 *= c1; h2 ^= k2;
+
+  case  8: k1 ^= ((uint64_t)tail[ 7]) << 56;
+  case  7: k1 ^= ((uint64_t)tail[ 6]) << 48;
+  case  6: k1 ^= ((uint64_t)tail[ 5]) << 40;
+  case  5: k1 ^= ((uint64_t)tail[ 4]) << 32;
+  case  4: k1 ^= ((uint64_t)tail[ 3]) << 24;
+  case  3: k1 ^= ((uint64_t)tail[ 2]) << 16;
+  case  2: k1 ^= ((uint64_t)tail[ 1]) << 8;
+  case  1: k1 ^= ((uint64_t)tail[ 0]) << 0;
+           k1 *= c1; k1 = ROTL64(k1,31); k1 *= c2; h1 ^= k1;
+  }
+
+  //----------
+  // finalization
+
+  h1 ^= len; h2 ^= len;
+
+  h1 += h2;
+  h2 += h1;
+
+  h1 = fmix64(h1);
+  h2 = fmix64(h2);
+
+  h1 += h2;
+  h2 += h1;
+
+  ((uint64_t*)out)[0] = h1;
+  ((uint64_t*)out)[1] = h2;
+}
+
+//-----------------------------------------------------------------------------
+// Wrappers with the uniform (buf, len, seed) -> uint64_t signature.
+// NOTE: the MurmurHash3_* functions take the length as int, so len is
+// narrowed here; inputs longer than INT_MAX bytes are not supported.
+
+// 32-bit hash of buf, zero-extended to 64 bits.
+uint64_t murmurhash32_hash(const uint8_t *buf, uint64_t len, uint32_t seed) {
+  uint32_t res;
+  MurmurHash3_x86_32(buf, (int)len, seed, &res);
+  return res;
+}
+
+// First 64 bits of the 128-bit x64 hash of buf.
+uint64_t murmurhash64_hash(const uint8_t *buf, uint64_t len, uint32_t seed) {
+  uint64_t res[2];
+  MurmurHash3_x64_128(buf, (int)len, seed, res);
+  return res[0];
+}
+
+
diff --git a/src/sparkey/MurmurHash3.h b/src/sparkey/MurmurHash3.h
new file mode 100644
index 00000000..b395ee18
--- /dev/null
+++ b/src/sparkey/MurmurHash3.h
@@ -0,0 +1,35 @@
+//-----------------------------------------------------------------------------
+// MurmurHash3 was written by Austin Appleby, and is placed in the public
+// domain. The author hereby disclaims copyright to this source code.
+
+// Guard named like the other sparkey headers; the previous name started with
+// an underscore followed by an upper-case letter, which is reserved.
+#ifndef MURMURHASH3_H_INCLUDED
+#define MURMURHASH3_H_INCLUDED
+
+//-----------------------------------------------------------------------------
+// Platform-specific functions and macros
+
+// Microsoft Visual Studio
+
+#if defined(_MSC_VER)
+
+typedef unsigned char uint8_t;
+typedef unsigned long uint32_t;
+typedef unsigned __int64 uint64_t;
+
+// Other compilers
+
+#else // defined(_MSC_VER)
+
+#include <stdint.h>
+
+#endif // !defined(_MSC_VER)
+
+//-----------------------------------------------------------------------------
+
+// Hashes len bytes at buf with the given seed and returns the 32-bit
+// MurmurHash3 result widened to uint64_t.
+uint64_t murmurhash32_hash(const uint8_t *buf, uint64_t len, uint32_t seed);
+
+// Hashes len bytes at buf with the given seed and returns the first 64 bits
+// of the 128-bit MurmurHash3 (x64 variant) result.
+uint64_t murmurhash64_hash(const uint8_t *buf, uint64_t len, uint32_t seed);
+
+//-----------------------------------------------------------------------------
+
+#endif // MURMURHASH3_H_INCLUDED
diff --git a/src/sparkey/buf.c b/src/sparkey/buf.c
new file mode 100644
index 00000000..33044f95
--- /dev/null
+++ b/src/sparkey/buf.c
@@ -0,0 +1,77 @@
+/*
+* Copyright (c) 2012-2013 Spotify AB
+*
+* Licensed under the Apache License, Version 2.0 (the "License"); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+/* NOTE(review): the header names below were missing from the patch text;
+ * they are reconstructed from what this file uses (ptrdiff_t, uint8_t,
+ * malloc/free, memcpy). Confirm against upstream sparkey. */
+#include <stddef.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include "util.h"
+#include "endiantools.h"
+#include "buf.h"
+
+/* Allocates size bytes of backing storage and resets the write cursor to the
+ * start. Returns SPARKEY_INTERNAL_ERROR if malloc fails. */
+sparkey_returncode buf_init(sparkey_buf *buf, ptrdiff_t size) {
+  buf->start = malloc(size);
+  if (buf->start == NULL) {
+    return SPARKEY_INTERNAL_ERROR;
+  }
+  buf->cur = buf->start;
+  buf->end = buf->start + size;
+  return SPARKEY_SUCCESS;
+}
+
+/* Frees the backing storage and clears all three pointers. Pending bytes are
+ * NOT flushed; call buf_flushfile first if they matter. */
+void buf_close(sparkey_buf *buf) {
+  free(buf->start);
+  buf->start = NULL;
+  buf->cur = NULL;
+  buf->end = NULL;
+}
+
+/* Total capacity in bytes. */
+uint64_t buf_size(sparkey_buf *buf) {
+  return buf->end - buf->start;
+}
+
+/* Bytes that can still be appended before the buffer is full. */
+uint64_t buf_remaining(sparkey_buf *buf) {
+  return buf->end - buf->cur;
+}
+
+/* Bytes appended since the last flush. */
+uint64_t buf_used(sparkey_buf *buf) {
+  return buf->cur - buf->start;
+}
+
+/* Writes the pending bytes to fd and rewinds the cursor. On a write error the
+ * error code is returned and the cursor is left untouched. */
+sparkey_returncode buf_flushfile(sparkey_buf *buf, int fd) {
+  RETHROW(write_full(fd, buf->start, buf_used(buf)));
+  buf->cur = buf->start;
+  return SPARKEY_SUCCESS;
+}
+
+/* Appends len bytes from data, flushing to fd whenever the buffer fills up.
+ * Returns SPARKEY_INTERNAL_ERROR for a negative len or when asked to append
+ * to a zero-capacity (or closed) buffer, otherwise SPARKEY_SUCCESS or the
+ * error reported by the flush. */
+sparkey_returncode buf_add(sparkey_buf *buf, int fd, const uint8_t *data, ptrdiff_t len) {
+  if (len < 0) {
+    /* A negative length would be converted to a huge size_t by memcpy. */
+    return SPARKEY_INTERNAL_ERROR;
+  }
+  if (len > 0 && buf->end == buf->start) {
+    /* Zero capacity: flushing can never make room, so the loop below would
+     * spin forever. */
+    return SPARKEY_INTERNAL_ERROR;
+  }
+  while (1) {
+    ptrdiff_t remaining = buf_remaining(buf);
+    if (remaining >= len) {
+      memcpy(buf->cur, data, len);
+      buf->cur += len;
+      return SPARKEY_SUCCESS;
+    }
+    /* Fill what is left, flush, and go around again with the rest. */
+    memcpy(buf->cur, data, remaining);
+    buf->cur += remaining;
+    data += remaining;
+    len -= remaining;
+    RETHROW(buf_flushfile(buf, fd));
+  }
+}
+
diff --git a/src/sparkey/buf.h b/src/sparkey/buf.h
new file mode 100644
index 00000000..02473f57
--- /dev/null
+++ b/src/sparkey/buf.h
@@ -0,0 +1,48 @@
+/*
+* Copyright (c) 2012-2013 Spotify AB
+*
+* Licensed under the Apache License, Version 2.0 (the "License"); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+#ifndef BUF_H_INCLUDED
+#define BUF_H_INCLUDED
+
+/* NOTE(review): the header names below were missing from the patch text;
+ * they are reconstructed from what the declarations need (ptrdiff_t and the
+ * fixed-width integer types). Confirm against upstream sparkey. */
+#include <stddef.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include "util.h"
+#include "endiantools.h"
+
+/**
+ * Write buffer backed by one heap allocation.
+ * start is the beginning of the allocation, cur the next byte to be written,
+ * and end one past the last byte of the allocation.
+ */
+typedef struct {
+  uint8_t *start;
+  uint8_t *cur;
+  uint8_t *end;
+} sparkey_buf;
+
+/**
+ * Allocates size bytes for buf and resets its cursor.
+ * @returns SPARKEY_SUCCESS, or SPARKEY_INTERNAL_ERROR if allocation fails.
+ */
+sparkey_returncode buf_init(sparkey_buf *buf, ptrdiff_t size);
+
+/**
+ * Frees the storage of buf and clears its pointers. Does not flush.
+ */
+void buf_close(sparkey_buf *buf);
+
+/** @returns the total capacity of buf in bytes. */
+uint64_t buf_size(sparkey_buf *buf);
+
+/** @returns the number of bytes that still fit in buf. */
+uint64_t buf_remaining(sparkey_buf *buf);
+
+/** @returns the number of bytes currently held in buf. */
+uint64_t buf_used(sparkey_buf *buf);
+
+/**
+ * Writes the bytes held in buf to file descriptor fd and empties buf.
+ * @returns SPARKEY_SUCCESS, or the error code of the failed write.
+ */
+sparkey_returncode buf_flushfile(sparkey_buf *buf, int fd);
+
+/**
+ * Appends len bytes of data to buf, flushing to fd each time buf fills up.
+ * @returns SPARKEY_SUCCESS, or the error code of a failed flush.
+ */
+sparkey_returncode buf_add(sparkey_buf *buf, int fd, const uint8_t *data, ptrdiff_t len);
+
+#endif
+
diff --git a/src/sparkey/endiantools.c b/src/sparkey/endiantools.c
new file mode 100644
index 00000000..17eee630
--- /dev/null
+++ b/src/sparkey/endiantools.c
@@ -0,0 +1,141 @@
+/*
+* Copyright (c) 2012-2013 Spotify AB
+*
+* Licensed under the Apache License, Version 2.0 (the "License"); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+/* Header names reconstructed: the #error text below names the two
+ * platform headers. */
+#if defined(__linux)
+#include <byteswap.h>
+#elif defined(__APPLE__)
+#include <libkern/OSByteOrder.h>
+#define bswap_32 OSSwapInt32
+#define bswap_64 OSSwapInt64
+#else
+#error "no byteswap.h or libkern/OSByteOrder.h"
+#endif
+
+/* NOTE(review): these header names were missing from the patch text; they
+ * are reconstructed from what this file uses (errno, PRIu64, fprintf/FILE,
+ * write/ssize_t). Confirm against upstream sparkey. */
+#include <errno.h>
+#include <inttypes.h>
+#include <stdio.h>
+#include <unistd.h>
+
+#include "util.h"
+#include "endiantools.h"
+#include "sparkey.h"
+
+/* Writes exactly count bytes of buf to fd, retrying on short writes and on
+ * EINTR/EAGAIN. Known errno values are mapped to sparkey error codes; any
+ * other failure is logged to stderr and reported as SPARKEY_INTERNAL_ERROR. */
+static sparkey_returncode _write_full(int fd, uint8_t *buf, size_t count) {
+  while (count > 0) {
+    ssize_t actual = write(fd, buf, count);
+    if (actual < 0) {
+      /* Capture errno before any other library call can overwrite it. */
+      int err = errno;
+      switch (err) {
+      case EINTR:
+      case EAGAIN: continue; /* restarts the enclosing while loop */
+      case ENOSPC: return SPARKEY_OUT_OF_DISK;
+      case EFBIG: return SPARKEY_FILE_SIZE_EXCEEDED;
+      case EBADF: return SPARKEY_FILE_CLOSED;
+      default:
+        /* actual is negative here, so it is printed as a signed value. */
+        fprintf(stderr, "_write_full():%d bug: actual_written = %"PRId64", wanted = %"PRIu64", errno = %d\n", __LINE__, (int64_t)actual, (uint64_t)count, err);
+        return SPARKEY_INTERNAL_ERROR;
+      }
+    }
+    count -= actual;
+    buf += actual;
+  }
+  return SPARKEY_SUCCESS;
+}
+
+/* Writes count bytes of buf to fd, handing the data to write() in pieces of
+ * at most 256 MiB. */
+sparkey_returncode write_full(int fd, uint8_t *buf, size_t count) {
+  const size_t block_size = 256*1024*1024;
+  size_t fullruns = count / block_size;
+  while (fullruns > 0) {
+    RETHROW(_write_full(fd, buf, block_size));
+    buf += block_size;
+    fullruns--;
+  }
+  return _write_full(fd, buf, count % block_size);
+}
+
+void write_little_endian32(uint8_t *buf, uint32_t value) {
+  buf[0] = (value >> 0) & 0xFF;
+  buf[1] = (value >> 8) & 0xFF;
+  buf[2] = (value >> 16) & 0xFF;
+  buf[3] = (value >> 24) & 0xFF;
+}
+
+sparkey_returncode fwrite_little_endian32(int fd, uint32_t value) {
+  uint8_t buf[4];
+  write_little_endian32(buf, value);
+  return write_full(fd, buf, 4);
+}
+
+void write_little_endian64(uint8_t *buf, uint64_t value) {
+  buf[0] = (value >> 0) & 0xFF;
+  buf[1] = (value >> 8) & 0xFF;
+  buf[2] = (value >> 16) & 0xFF;
+  buf[3] = (value >> 24) & 0xFF;
+  buf[4] = (value >> 32) & 0xFF;
+  buf[5] = (value >> 40) & 0xFF;
+  buf[6] = (value >> 48) & 0xFF;
+  buf[7] = (value >> 56) & 0xFF;
+}
+
+sparkey_returncode fwrite_little_endian64(int fd, uint64_t value) {
+  uint8_t buf[8];
+  write_little_endian64(buf, value);
+  return write_full(fd, buf, 8);
+}
+
+/* The readers assemble the value byte by byte, mirroring the writers above.
+ * This gives the same result on every host byte order, works at any
+ * alignment, and avoids reading through a cast uint32_t/uint64_t pointer.
+ * It also removes the compile-time endianness test, which relied on
+ * defined(__LITTLE_ENDIAN) even though glibc defines that macro on both
+ * little- and big-endian hosts. */
+
+/* Returns the 32-bit little-endian value stored at array + pos. */
+uint32_t read_little_endian32(const uint8_t * array, uint64_t pos) {
+  const uint8_t *p = array + pos;
+  return ((uint32_t)p[0] << 0)
+       | ((uint32_t)p[1] << 8)
+       | ((uint32_t)p[2] << 16)
+       | ((uint32_t)p[3] << 24);
+}
+
+/* Returns the 64-bit little-endian value stored at array + pos. */
+uint64_t read_little_endian64(const uint8_t * array, uint64_t pos) {
+  const uint8_t *p = array + pos;
+  return ((uint64_t)p[0] << 0)
+       | ((uint64_t)p[1] << 8)
+       | ((uint64_t)p[2] << 16)
+       | ((uint64_t)p[3] << 24)
+       | ((uint64_t)p[4] << 32)
+       | ((uint64_t)p[5] << 40)
+       | ((uint64_t)p[6] << 48)
+       | ((uint64_t)p[7] << 56);
+}
+
+
+/* Always reports success; no runtime endianness probing is performed. */
+sparkey_returncode correct_endian_platform(void) {
+  return SPARKEY_SUCCESS;
+}
+
+/* Reads one little-endian 32-bit value from fp into *res. Returns
+ * SPARKEY_UNEXPECTED_EOF if the four bytes could not be read. */
+sparkey_returncode fread_little_endian32(FILE *fp, uint32_t *res) {
+  uint8_t data[4];
+  size_t count = fread(data, 4, 1, fp);
+  if (count < 1) {
+    return SPARKEY_UNEXPECTED_EOF;
+  }
+  *res = read_little_endian32(data, 0);
+  return SPARKEY_SUCCESS;
+}
+
+/* Reads one little-endian 64-bit value from fp into *res. Returns
+ * SPARKEY_UNEXPECTED_EOF if the eight bytes could not be read. */
+sparkey_returncode fread_little_endian64(FILE *fp, uint64_t *res) {
+  uint8_t data[8];
+  size_t count = fread(data, 8, 1, fp);
+  if (count < 1) {
+    return SPARKEY_UNEXPECTED_EOF;
+  }
+  *res = read_little_endian64(data, 0);
+  return SPARKEY_SUCCESS;
+}
+
diff --git a/src/sparkey/endiantools.h b/src/sparkey/endiantools.h
new file mode 100644
index 00000000..ea2df7da
--- /dev/null
+++ b/src/sparkey/endiantools.h
@@ -0,0 +1,79 @@
+/*
+* Copyright (c) 2012-2013 Spotify AB
+*
+* Licensed under the Apache License, Version 2.0 (the "License"); you may not
+* use this file except in compliance with the License.
You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+#ifndef ENDIANTOOLS_H_INCLUDED
+#define ENDIANTOOLS_H_INCLUDED
+
+/* NOTE(review): the header names below were missing from the patch text;
+ * they are reconstructed from what the declarations need (size_t, the
+ * fixed-width integer types, FILE). Confirm against upstream sparkey. */
+#include <stddef.h>
+#include <stdint.h>
+#include <stdio.h>
+
+#include "sparkey.h"
+
+/** Overlay of a 32-bit integer and its four bytes. */
+typedef union {
+  uint32_t i;
+  uint8_t c[4];
+} endian_union;
+
+/**
+ * Writes count bytes of buf to a file with file descriptor fd
+ * @param fd file descriptor of a file to write to.
+ * @param buf bytes to write to file.
+ * Must point to a block of memory at least count long.
+ * @param count number of bytes to write.
+ * @returns SPARKEY_SUCCESS if all goes well, otherwise a sparkey error code.
+ */
+sparkey_returncode write_full(int fd, uint8_t *buf, size_t count);
+
+/**
+ * Write a 32 bit value to buf in little endian.
+ * @param buf buf to write to. Must be at least 4 bytes long.
+ * @param value the value to write.
+ */
+void write_little_endian32(uint8_t *buf, uint32_t value);
+
+/**
+ * Write a 32 bit value to file in little endian.
+ * @param fd file descriptor of file open for write.
+ * @param value the value to write.
+ * @returns SPARKEY_SUCCESS if all goes well, a sparkey error if the
+ * writing to file fails.
+ */
+sparkey_returncode fwrite_little_endian32(int fd, uint32_t value);
+
+/**
+ * Write a 64 bit value to buf in little endian.
+ * @param buf buf to write to. Must be at least 8 bytes long.
+ * @param value the value to write.
+ */
+void write_little_endian64(uint8_t *buf, uint64_t value);
+
+/**
+ * Write a 64 bit value to file in little endian.
+ * @param fd file descriptor of file open for write.
+ * @param value the value to write.
+ * @returns SPARKEY_SUCCESS if all goes well, a sparkey error if the
+ * writing to file fails.
+ */
+sparkey_returncode fwrite_little_endian64(int fd, uint64_t value);
+
+/**
+ * Read a 32 bit little endian value from memory.
+ * @param array base address to read from.
+ * @param pos byte offset from array; array + pos must have 4 readable bytes.
+ * @returns the decoded value.
+ */
+uint32_t read_little_endian32(const uint8_t * array, uint64_t pos);
+
+/**
+ * Read a 64 bit little endian value from memory.
+ * @param array base address to read from.
+ * @param pos byte offset from array; array + pos must have 8 readable bytes.
+ * @returns the decoded value.
+ */
+uint64_t read_little_endian64(const uint8_t * array, uint64_t pos);
+
+/**
+ * Platform endianness check.
+ * @returns SPARKEY_SUCCESS.
+ */
+sparkey_returncode correct_endian_platform(void);
+
+/**
+ * Read a 32 bit little endian value from a stdio stream.
+ * @param fp stream open for reading.
+ * @param res receives the decoded value on success.
+ * @returns SPARKEY_SUCCESS, or SPARKEY_UNEXPECTED_EOF if 4 bytes could not
+ * be read.
+ */
+sparkey_returncode fread_little_endian32(FILE *fp, uint32_t *res);
+
+/**
+ * Read a 64 bit little endian value from a stdio stream.
+ * @param fp stream open for reading.
+ * @param res receives the decoded value on success.
+ * @returns SPARKEY_SUCCESS, or SPARKEY_UNEXPECTED_EOF if 8 bytes could not
+ * be read.
+ */
+sparkey_returncode fread_little_endian64(FILE *fp, uint64_t *res);
+
+#endif /* ENDIANTOOLS_H_INCLUDED */
+
diff --git a/src/sparkey/hashalgorithms.c b/src/sparkey/hashalgorithms.c
new file mode 100644
index 00000000..fab4a563
--- /dev/null
+++ b/src/sparkey/hashalgorithms.c
@@ -0,0 +1,50 @@
+/*
+* Copyright (c) 2012-2013 Spotify AB
+*
+* Licensed under the Apache License, Version 2.0 (the "License"); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+#include "hashalgorithms.h"
+#include "MurmurHash3.h"
+
+/* Adapter: exposes the 32 bit little endian read through the 64 bit read_hash signature. */
+static uint64_t read_hash32(const uint8_t *data, uint64_t pos) {
+  uint32_t stored = read_little_endian32(data, pos);
+  return stored;
+}
+
+/* Adapter: exposes the 32 bit little endian write through the 64 bit write_hash signature. */
+static void write_hash32(uint8_t *data, uint64_t hash) {
+  write_little_endian32(data, hash);
+}
+
+/* Function table used for 4 byte hashes. */
+static sparkey_hash_algorithm murmurhash32 = {
+  .hash = &murmurhash32_hash,
+  .read_hash = &read_hash32,
+  .write_hash = &write_hash32
+};
+
+/* Function table used for 8 byte hashes. */
+static sparkey_hash_algorithm murmurhash64 = {
+  .hash = &murmurhash64_hash,
+  .read_hash = &read_little_endian64,
+  .write_hash = &write_little_endian64
+};
+
+/* Returned for every unsupported hash size; all members are NULL. */
+static sparkey_hash_algorithm invalid = {
+  .hash = NULL,
+  .read_hash = NULL,
+  .write_hash = NULL
+};
+
+/**
+ * Select the hash function table for a hash width given in bytes.
+ * @param hash_size 4 or 8.
+ * @returns a copy of the matching table, or a table whose members are all
+ *          NULL when hash_size is neither 4 nor 8.
+ */
+sparkey_hash_algorithm sparkey_get_hash_algorithm(uint32_t hash_size) {
+  if (hash_size == 4) {
+    return murmurhash32;
+  }
+  if (hash_size == 8) {
+    return murmurhash64;
+  }
+  return invalid;
+}
+
diff --git a/src/sparkey/hashalgorithms.h b/src/sparkey/hashalgorithms.h
new file mode 100644
index 00000000..b60a537f
--- /dev/null
+++ b/src/sparkey/hashalgorithms.h
@@ -0,0 +1,29 @@
+/*
+* Copyright (c) 2012-2013 Spotify AB
+*
+* Licensed under the Apache License, Version 2.0 (the "License"); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+#ifndef SPARKEY_HASHALGORITHM_H_INCLUDED
+#define SPARKEY_HASHALGORITHM_H_INCLUDED
+
+#include "endiantools.h"
+
+/**
+ * Table of the three operations that depend on the hash width:
+ * hashing a key, reading a stored hash and writing a stored hash.
+ * Hashes are always passed around as uint64_t, whatever their stored width.
+ */
+typedef struct {
+  uint64_t (*hash)(const uint8_t *data, uint64_t len, uint32_t seed);
+  uint64_t (*read_hash)(const uint8_t *data, uint64_t pos);
+  void (*write_hash)(uint8_t *data, uint64_t hash);
+} sparkey_hash_algorithm;
+
+/**
+ * Returns the operation table for the given hash width.
+ * @param hash_size hash width in bytes.
+ * @returns the table by value. Callers in this patch treat a NULL hash
+ *          member as "unsupported hash size".
+ */
+sparkey_hash_algorithm sparkey_get_hash_algorithm(uint32_t hash_size);
+
+#endif
diff --git a/src/sparkey/hashheader.c b/src/sparkey/hashheader.c
new file mode 100644
index 00000000..5176e487
--- /dev/null
+++ b/src/sparkey/hashheader.c
@@ -0,0 +1,136 @@
+/*
+* Copyright (c) 2012-2013 Spotify AB
+*
+* Licensed under the Apache License, Version 2.0 (the "License"); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+#include
+#include
+#include
+#include
+
+#include "hashheader.h"
+#include "endiantools.h"
+#include "util.h"
+#include "sparkey.h"
+
+/**
+ * Dumps a human readable representation of the header to stdout.
+ * @param header an initialized header struct
+ */
+void print_hashheader(sparkey_hashheader *header) {
+  /* An empty table has no displacement; avoid printing the NaN produced by 0/0. */
+  double average_displacement = 0.0;
+  if (header->num_entries > 0) {
+    average_displacement = (double) header->total_displacement / (double) header->num_entries;
+  }
+
+  /* uint32_t members are printed with the matching <inttypes.h> macros rather than %d / %x. */
+  printf("Hash file version %"PRIu32".%"PRIu32"\n", header->major_version, header->minor_version);
+  printf("Identifier: %08"PRIx32"\n", header->file_identifier);
+  printf("Max key size: %"PRIu64", Max value size: %"PRIu64"\n", header->max_key_len, header->max_value_len);
+  printf("Hash size: %"PRIu32" bit Murmurhash3\n", 8*header->hash_size);
+  printf("Num entries: %"PRIu64", Capacity: %"PRIu64"\n", header->num_entries, header->hash_capacity);
+  printf("Num collisions: %"PRIu64", Max displacement: %"PRIu64", Average displacement: %.2f\n", header->hash_collisions, header->max_displacement, average_displacement);
+  printf("Data size: %"PRIu64", Garbage size: %"PRIu64"\n", header->data_end, header->garbage_size);
+}
+
+/**
+ * Reads the header fields that follow the magic number and version words,
+ * in on-disk order, and validates them.
+ * @param header header struct to fill
+ * @param fp stream positioned just after the minor version field
+ * @returns SPARKEY_SUCCESS, the error of a failed read, or
+ *          SPARKEY_HASH_HEADER_CORRUPT if a field is inconsistent.
+ */
+static sparkey_returncode hashheader_version0(sparkey_hashheader *header, FILE *fp) {
+  RETHROW(fread_little_endian32(fp, &header->file_identifier));
+  RETHROW(fread_little_endian32(fp, &header->hash_seed));
+  RETHROW(fread_little_endian64(fp, &header->data_end));
+  RETHROW(fread_little_endian64(fp, &header->max_key_len));
+  RETHROW(fread_little_endian64(fp, &header->max_value_len));
+  RETHROW(fread_little_endian64(fp, &header->num_puts));
+  RETHROW(fread_little_endian64(fp, &header->garbage_size));
+  RETHROW(fread_little_endian64(fp, &header->num_entries));
+
+  RETHROW(fread_little_endian32(fp, &header->address_size));
+  RETHROW(fread_little_endian32(fp, &header->hash_size));
+  RETHROW(fread_little_endian64(fp, &header->hash_capacity));
+  RETHROW(fread_little_endian64(fp, &header->max_displacement));
+  RETHROW(fread_little_endian32(fp, &header->entry_block_bits));
+  /* The mask is 32 bits wide; a larger shift count can only come from a
+   * corrupt file and would be undefined behaviour. */
+  if (header->entry_block_bits >= 32) {
+    return SPARKEY_HASH_HEADER_CORRUPT;
+  }
+  header->entry_block_bitmask = ((uint32_t) 1 << header->entry_block_bits) - 1;
+  RETHROW(fread_little_endian64(fp, &header->hash_collisions));
+  RETHROW(fread_little_endian64(fp, &header->total_displacement));
+  header->header_size = HASH_HEADER_SIZE;
+
+  header->hash_algorithm = sparkey_get_hash_algorithm(header->hash_size);
+  if (header->hash_algorithm.hash == NULL) {
+    return SPARKEY_HASH_HEADER_CORRUPT;
+  }
+  // Some basic consistency checks
+  if (header->hash_capacity == 0) {
+    // Lookups compute hash % hash_capacity, so zero can never be valid.
+    return SPARKEY_HASH_HEADER_CORRUPT;
+  }
+  if (header->num_entries > header->num_puts) {
+    return SPARKEY_HASH_HEADER_CORRUPT;
+  }
+  if (header->max_displacement > header->num_entries) {
+    return SPARKEY_HASH_HEADER_CORRUPT;
+  }
+  if (header->hash_collisions > header->num_entries) {
+    return SPARKEY_HASH_HEADER_CORRUPT;
+  }
+
+  return SPARKEY_SUCCESS;
+}
+
+
+typedef sparkey_returncode (*loader)(sparkey_hashheader *header, FILE *fp);
+
+/* Indexed by minor version; both supported minor versions share one layout. */
+static loader loaders[2] = { hashheader_version0, hashheader_version0 };
+
+/**
+ * Fills a hashheader struct from the beginning of a hash file.
+ *
+ * The file is closed on every path, including when a read comes up short.
+ *
+ * @param header header struct to fill
+ * @param filename a hash file
+ * @returns SPARKEY_SUCCESS, or an error code if the file could not be opened,
+ *          is truncated, has the wrong magic number or version, or fails
+ *          validation.
+ */
+sparkey_returncode sparkey_load_hashheader(sparkey_hashheader *header, const char *filename) {
+  FILE *fp = fopen(filename, "rb");
+  if (fp == NULL) {
+    return sparkey_open_returncode(errno);
+  }
+
+  uint32_t magic;
+  sparkey_returncode returncode = fread_little_endian32(fp, &magic);
+  if (returncode != SPARKEY_SUCCESS) {
+    goto close_file;
+  }
+  if (magic != HASH_MAGIC_NUMBER) {
+    returncode = SPARKEY_WRONG_HASH_MAGIC_NUMBER;
+    goto close_file;
+  }
+
+  returncode = fread_little_endian32(fp, &header->major_version);
+  if (returncode != SPARKEY_SUCCESS) {
+    goto close_file;
+  }
+  if (header->major_version != HASH_MAJOR_VERSION) {
+    returncode = SPARKEY_WRONG_HASH_MAJOR_VERSION;
+    goto close_file;
+  }
+
+  returncode = fread_little_endian32(fp, &header->minor_version);
+  if (returncode != SPARKEY_SUCCESS) {
+    goto close_file;
+  }
+  if (header->minor_version > HASH_MINOR_VERSION) {
+    returncode = SPARKEY_UNSUPPORTED_HASH_MINOR_VERSION;
+    goto close_file;
+  }
+
+  /* minor_version <= HASH_MINOR_VERSION was checked above, so the index is in range. */
+  loader l = loaders[header->minor_version];
+  if (l == NULL) {
+    returncode = SPARKEY_INTERNAL_ERROR;
+    goto close_file;
+  }
+  returncode = (*l)(header, fp);
+
+close_file:
+  fclose(fp);
+  return returncode;
+}
+
+/**
+ * Writes a header to the current position in the file, in on-disk field order.
+ * @param fd a file descriptor pointing to a file open for writing
+ * @param header the header to write
+ * @returns SPARKEY_SUCCESS, or the error of the first write that failed.
+ */
+sparkey_returncode write_hashheader(int fd, sparkey_hashheader *header) {
+  RETHROW(fwrite_little_endian32(fd, HASH_MAGIC_NUMBER));
+  RETHROW(fwrite_little_endian32(fd, HASH_MAJOR_VERSION));
+  RETHROW(fwrite_little_endian32(fd, HASH_MINOR_VERSION));
+  RETHROW(fwrite_little_endian32(fd, header->file_identifier));
+  RETHROW(fwrite_little_endian32(fd, header->hash_seed));
+  RETHROW(fwrite_little_endian64(fd, header->data_end));
+  RETHROW(fwrite_little_endian64(fd, header->max_key_len));
+  RETHROW(fwrite_little_endian64(fd, header->max_value_len));
+  RETHROW(fwrite_little_endian64(fd, header->num_puts));
+  RETHROW(fwrite_little_endian64(fd, header->garbage_size));
+  RETHROW(fwrite_little_endian64(fd, header->num_entries));
+  RETHROW(fwrite_little_endian32(fd, header->address_size));
+  RETHROW(fwrite_little_endian32(fd, header->hash_size));
+  RETHROW(fwrite_little_endian64(fd, header->hash_capacity));
+  RETHROW(fwrite_little_endian64(fd, header->max_displacement));
+  RETHROW(fwrite_little_endian32(fd, header->entry_block_bits));
+  RETHROW(fwrite_little_endian64(fd, header->hash_collisions));
+  RETHROW(fwrite_little_endian64(fd, header->total_displacement));
+
+  return SPARKEY_SUCCESS;
+}
+
+
+
diff --git a/src/sparkey/hashheader.h b/src/sparkey/hashheader.h
new file mode 100644
index 00000000..f9ce699a
--- /dev/null
+++ b/src/sparkey/hashheader.h
@@ -0,0 +1,99 @@
+/*
+* Copyright (c) 2012-2013 Spotify AB
+*
+* Licensed under the Apache License, Version 2.0 (the "License"); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+#ifndef SPARKEY_HASHHEADER_H_INCLUDED
+#define SPARKEY_HASHHEADER_H_INCLUDED
+
+#include
+#include "endiantools.h"
+
+#include "sparkey.h"
+#include "hashalgorithms.h"
+
+#define HASH_MAGIC_NUMBER (0x9a11318f)
+#define HASH_MAJOR_VERSION (1)
+#define HASH_MINOR_VERSION (1)
+#define HASH_HEADER_SIZE (112)
+
+typedef struct {
+  uint32_t major_version;
+  uint32_t minor_version;
+  uint32_t file_identifier;
+  uint32_t hash_seed;
+  uint32_t header_size;
+
+  uint64_t data_end;
+  uint64_t max_key_len;
+  uint64_t max_value_len;
+
+  uint64_t garbage_size;
+  uint64_t num_entries;
+  uint32_t address_size;
+  uint32_t hash_size;
+  uint64_t hash_capacity;
+  uint64_t max_displacement;
+  uint64_t num_puts;
+  uint32_t entry_block_bits;
+  uint32_t entry_block_bitmask;
+  uint64_t hash_collisions;
+  uint64_t total_displacement;
+  sparkey_hash_algorithm hash_algorithm;
+} sparkey_hashheader;
+
+/**
+ * Populates a hashheader struct from the start of a hash file.
+ * @param header header struct to fill
+ * @param filename a hash file
+ * @returns an error code if it could not load the file.
+ */
+sparkey_returncode sparkey_load_hashheader(sparkey_hashheader *header, const char *filename);
+
+/**
+ * Prints a human readable representation of the header to stdout
+ * @param header an initialized header struct
+ */
+void print_hashheader(sparkey_hashheader *header);
+
+/**
+ * Writes a header to the current position in the file
+ * @param fd a file descriptor pointing to a file open for writing
+ * @param header the header to write
+ * @returns an error code if it could not write to file.
+ */
+sparkey_returncode write_hashheader(int fd, sparkey_hashheader *header);
+
+/**
+ * Distance, in slots and with wrap-around, between the slot an entry
+ * occupies and the slot its hash maps to (hash % capacity).
+ * @param capacity number of slots in the table; must be non-zero.
+ * @param slot slot the entry occupies.
+ * @param hash hash of the entry.
+ */
+static inline uint64_t get_displacement(uint64_t capacity, uint64_t slot, uint64_t hash) {
+  const uint64_t home_slot = hash % capacity;
+  const uint64_t offset = slot - home_slot;
+  return (capacity + offset) % capacity;
+}
+
+/**
+ * Reads a little endian address of address_size bytes at hashtable[pos].
+ * @returns the address, or (uint64_t) -1 when address_size is neither 4 nor 8.
+ */
+static inline uint64_t read_addr(uint8_t *hashtable, uint64_t pos, int address_size) {
+  if (address_size == 4) {
+    return read_little_endian32(hashtable, pos);
+  }
+  if (address_size == 8) {
+    return read_little_endian64(hashtable, pos);
+  }
+  return -1;
+}
+
+/**
+ * Writes value to buf as a little endian address of address_size bytes.
+ * Nothing is written when address_size is neither 4 nor 8.
+ */
+static inline void write_addr(uint8_t *buf, uint64_t value, int address_size) {
+  if (address_size == 4) {
+    write_little_endian32(buf, value);
+  } else if (address_size == 8) {
+    write_little_endian64(buf, value);
+  }
+}
+
+
+#endif
+
diff --git a/src/sparkey/hashiter.c b/src/sparkey/hashiter.c
new file mode 100644
index 00000000..27aef4be
--- /dev/null
+++ b/src/sparkey/hashiter.c
@@ -0,0 +1,46 @@
+/*
+* Copyright (c) 2012-2013 Spotify AB
+*
+* Licensed under the Apache License, Version 2.0 (the "License"); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+#include
+#include
+
+#include "sparkey.h"
+#include "sparkey-internal.h"
+#include "hashiter.h"
+
+/**
+ * Hashes the key of the entry the iterator currently points at, using the
+ * algorithm and seed stored in hash_header.
+ *
+ * When the key is returned as one chunk it is hashed in place. Otherwise it
+ * is first assembled in a temporary heap buffer.
+ *
+ * @returns the hash, or 0 if the key could not be read or the temporary
+ *          buffer could not be allocated.
+ */
+uint64_t sparkey_iter_hash(sparkey_hashheader *hash_header, sparkey_logiter *iter, sparkey_logreader *log) {
+  /* Shift an unsigned 64 bit one: 1 << 31 overflows a 32 bit int. */
+  const uint64_t max_chunk = (uint64_t) 1 << 31;
+
+  uint8_t *buf;
+  uint64_t len;
+  sparkey_returncode returncode = sparkey_logiter_keychunk(iter, log, max_chunk, &buf, &len);
+  if (returncode != SPARKEY_SUCCESS) {
+    return 0;
+  }
+  if (len == iter->keylen) {
+    return hash_header->hash_algorithm.hash(buf, len, hash_header->hash_seed);
+  }
+
+  uint8_t *keybuf = malloc(iter->keylen);
+  if (keybuf == NULL) {
+    return 0;
+  }
+  memcpy(keybuf, buf, len);
+
+  /* Ask only for what is still missing, so the write cannot run past keybuf. */
+  uint64_t len2 = 0;
+  returncode = sparkey_logiter_fill_key(iter, log, iter->keylen - len, keybuf + len, &len2);
+  if (returncode != SPARKEY_SUCCESS || len + len2 != iter->keylen) {
+    free(keybuf);
+    return 0;
+  }
+
+  uint64_t hash = hash_header->hash_algorithm.hash(keybuf, iter->keylen, hash_header->hash_seed);
+  free(keybuf);
+  return hash;
+}
+
diff --git a/src/sparkey/hashiter.h b/src/sparkey/hashiter.h
new file mode 100644
index 00000000..70bd6d5a
--- /dev/null
+++ b/src/sparkey/hashiter.h
@@ -0,0 +1,26 @@
+/*
+* Copyright (c) 2012-2013 Spotify AB
+*
+* Licensed under the Apache License, Version 2.0 (the "License"); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+#ifndef SPARKEY_HASHITER_H_INCLUDED
+#define SPARKEY_HASHITER_H_INCLUDED
+
+#include "sparkey.h"
+
+#include "hashheader.h"
+
+/**
+ * Hashes the key of the entry that iter currently points at.
+ * @param hash_header supplies the hash algorithm and the hash seed.
+ * @param iter iterator positioned on the entry whose key is hashed.
+ * @param log log reader that iter reads from.
+ * @returns the hash of the key; 0 is returned when the key could not be read.
+ */
+uint64_t sparkey_iter_hash(sparkey_hashheader *hash_header, sparkey_logiter *iter, sparkey_logreader *log);
+
+#endif
+
diff --git a/src/sparkey/hashreader.c b/src/sparkey/hashreader.c
new file mode 100644
index 00000000..4fd2bd83
--- /dev/null
+++ b/src/sparkey/hashreader.c
@@ -0,0 +1,255 @@
+/*
+* Copyright (c) 2012-2013 Spotify AB
+*
+* Licensed under the Apache License, Version 2.0 (the "License"); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "hashheader.h"
+#include "hashiter.h"
+#include "util.h"
+#include "endiantools.h"
+#include "sparkey.h"
+#include "sparkey-internal.h"
+
+#define MAGIC_VALUE_HASHREADER (0x75103df9)
+
+/**
+ * Opens a hash file together with its log file and maps the hash table
+ * read-only.
+ *
+ * On failure everything acquired so far (file descriptor, log reader and
+ * the reader allocation) is released here and *reader_ref is left untouched.
+ * The cleanup does not go through sparkey_hash_close(), because that function
+ * ignores readers whose open_status has not yet been set.
+ *
+ * @param reader_ref receives the new reader on success.
+ * @param hash_filename path of the hash file.
+ * @param log_filename path of the log file the hash file indexes.
+ * @returns SPARKEY_SUCCESS or an error code.
+ */
+sparkey_returncode sparkey_hash_open(sparkey_hashreader **reader_ref, const char *hash_filename, const char *log_filename) {
+  RETHROW(correct_endian_platform());
+
+  sparkey_returncode returncode;
+
+  sparkey_hashreader *reader = malloc(sizeof *reader);
+  if (reader == NULL) {
+    return SPARKEY_INTERNAL_ERROR;
+  }
+  /* Start from a known state; malloc() does not clear the memory. */
+  reader->open_status = 0;
+  reader->fd = -1;
+  reader->data = NULL;
+
+  TRY(sparkey_load_hashheader(&reader->header, hash_filename), free_reader);
+  TRY(sparkey_logreader_open_noalloc(&reader->log, log_filename), free_reader);
+  if (reader->header.file_identifier != reader->log.header.file_identifier) {
+    returncode = SPARKEY_FILE_IDENTIFIER_MISMATCH;
+    goto close_log;
+  }
+  if (reader->header.data_end > reader->log.header.data_end) {
+    returncode = SPARKEY_HASH_HEADER_CORRUPT;
+    goto close_log;
+  }
+  if (reader->header.max_key_len > reader->log.header.max_key_len) {
+    returncode = SPARKEY_HASH_HEADER_CORRUPT;
+    goto close_log;
+  }
+  if (reader->header.max_value_len > reader->log.header.max_value_len) {
+    returncode = SPARKEY_HASH_HEADER_CORRUPT;
+    goto close_log;
+  }
+
+  reader->fd = open(hash_filename, O_RDONLY);
+  if (reader->fd < 0) {
+    int e = errno;
+    returncode = sparkey_open_returncode(e);
+    goto close_log;
+  }
+
+  reader->data_len = reader->header.header_size + reader->header.hash_capacity * (reader->header.hash_size + reader->header.address_size);
+
+  struct stat s;
+  if (stat(hash_filename, &s) != 0) {
+    /* Without a valid st_size the length check below would be meaningless. */
+    int e = errno;
+    returncode = sparkey_open_returncode(e);
+    goto close_fd;
+  }
+  if (reader->data_len > (uint64_t) s.st_size) {
+    returncode = SPARKEY_HASH_TOO_SMALL;
+    goto close_fd;
+  }
+
+  reader->data = mmap(NULL, reader->data_len, PROT_READ, MAP_SHARED, reader->fd, 0);
+  if (reader->data == MAP_FAILED) {
+    reader->data = NULL;
+    returncode = SPARKEY_MMAP_FAILED;
+    goto close_fd;
+  }
+
+  *reader_ref = reader;
+  reader->open_status = MAGIC_VALUE_HASHREADER;
+  return SPARKEY_SUCCESS;
+
+close_fd:
+  close(reader->fd);
+
+close_log:
+  sparkey_logreader_close_nodealloc(&reader->log);
+
+free_reader:
+  free(reader);
+  return returncode;
+}
+
+/**
+ * Closes a reader opened with sparkey_hash_open() and sets *reader_ref to NULL.
+ * Does nothing when reader_ref or *reader_ref is NULL, or when the reader is
+ * not marked as open.
+ */
+void sparkey_hash_close(sparkey_hashreader **reader_ref) {
+  if (reader_ref == NULL) {
+    return;
+  }
+  sparkey_hashreader *reader = *reader_ref;
+  if (reader == NULL) {
+    return;
+  }
+
+  if (reader->open_status != MAGIC_VALUE_HASHREADER) {
+    return;
+  }
+  sparkey_logreader_close_nodealloc(&reader->log);
+
+  reader->open_status = 0;
+  if (reader->data != NULL) {
+    munmap(reader->data, reader->data_len);
+    reader->data = NULL;
+  }
+  close(reader->fd);
+  reader->fd = -1;
+  free(reader);
+  *reader_ref = NULL;
+}
+
+/* Returns SPARKEY_HASH_CLOSED unless the reader carries the "open" marker. */
+static sparkey_returncode assert_reader_open(sparkey_hashreader *reader) {
+  if (reader->open_status != MAGIC_VALUE_HASHREADER) {
+    return SPARKEY_HASH_CLOSED;
+  }
+  return SPARKEY_SUCCESS;
+}
+
+/**
+ * Looks up key in the hash table.
+ *
+ * Probing starts at slot hash % capacity and moves forward with wrap-around.
+ * It stops at an empty slot, or at a slot whose entry is displaced less than
+ * the probe has travelled. In both of those cases iter->state is set to
+ * SPARKEY_ITER_INVALID and SPARKEY_SUCCESS is returned. On a match, iter is
+ * left positioned on the matching log entry.
+ */
+sparkey_returncode sparkey_hash_get(sparkey_hashreader *reader, const uint8_t *key, uint64_t keylen, sparkey_logiter *iter) {
+  RETHROW(assert_reader_open(reader));
+  uint64_t hash = reader->header.hash_algorithm.hash(key, keylen, reader->header.hash_seed);
+  uint64_t wanted_slot = hash % reader->header.hash_capacity;
+
+  int slot_size = reader->header.address_size + reader->header.hash_size;
+  uint64_t pos = wanted_slot * slot_size;
+
+  uint64_t displacement = 0;
+  uint64_t slot = wanted_slot;
+
+  uint8_t *hashtable = reader->data + reader->header.header_size;
+
+  while (1) {
+    uint64_t hash2 = reader->header.hash_algorithm.read_hash(hashtable, pos);
+    uint64_t position2 = read_addr(hashtable, pos + reader->header.hash_size, reader->header.address_size);
+    if (position2 == 0) {
+      iter->state = SPARKEY_ITER_INVALID;
+      return SPARKEY_SUCCESS;
+    }
+    /* The low entry_block_bits bits hold the entry index within the block. */
+    int entry_index2 = (int) (position2) & reader->header.entry_block_bitmask;
+    position2 >>= reader->header.entry_block_bits;
+    if (hash == hash2) {
+      RETHROW(sparkey_logiter_seek(iter, &reader->log, position2));
+      RETHROW(sparkey_logiter_skip(iter, &reader->log, entry_index2));
+      RETHROW(sparkey_logiter_next(iter, &reader->log));
+      uint64_t keylen2 = iter->keylen;
+      if (iter->type != SPARKEY_ENTRY_PUT) {
+        iter->state = SPARKEY_ITER_INVALID;
+        return SPARKEY_INTERNAL_ERROR;
+      }
+      if (keylen == keylen2) {
+        /* Equal hashes are not enough: compare the stored key chunk by chunk. */
+        uint64_t pos2 = 0;
+        int equals = 1;
+        while (pos2 < keylen) {
+          uint8_t *buf2;
+          uint64_t len2;
+          RETHROW(sparkey_logiter_keychunk(iter, &reader->log, keylen, &buf2, &len2));
+          if (memcmp(&key[pos2], buf2, len2) != 0) {
+            equals = 0;
+            break;
+          }
+          pos2 += len2;
+        }
+        if (equals) {
+          return SPARKEY_SUCCESS;
+        }
+      }
+    }
+    uint64_t other_displacement = get_displacement(reader->header.hash_capacity, slot, hash2);
+    if (displacement > other_displacement) {
+      iter->state = SPARKEY_ITER_INVALID;
+      return SPARKEY_SUCCESS;
+    }
+    pos += slot_size;
+    displacement++;
+    slot++;
+    if (slot >= reader->header.hash_capacity) {
+      pos = 0;
+      slot = 0;
+    }
+  }
+  iter->state = SPARKEY_ITER_INVALID;
+  return SPARKEY_INTERNAL_ERROR;
+}
+
+/**
+ * Advances iter to the next put entry in the log that the hash table still
+ * points at, skipping entries of other types and puts whose position is not
+ * in the table.
+ * Returns SPARKEY_SUCCESS with iter->state != SPARKEY_ITER_ACTIVE when the
+ * iterator stops being active.
+ */
+sparkey_returncode sparkey_logiter_hashnext(sparkey_logiter *iter, sparkey_hashreader *reader) {
+  RETHROW(assert_reader_open(reader));
+
+  uint8_t *hashtable = reader->data + reader->header.header_size;
+  int slot_size = reader->header.address_size + reader->header.hash_size;
+
+  while (1) {
+    RETHROW(sparkey_logiter_next(iter, &reader->log));
+    if (iter->state != SPARKEY_ITER_ACTIVE) {
+      return SPARKEY_SUCCESS;
+    }
+    if (iter->type != SPARKEY_ENTRY_PUT) {
+      continue;
+    }
+    /* Same encoding as the table: block position in the high bits, entry index in the low bits. */
+    uint64_t position = (iter->entry_block_position << reader->header.entry_block_bits) | iter->entry_count;
+
+    uint64_t key_hash = sparkey_iter_hash(&reader->header, iter, &reader->log);
+    uint64_t wanted_slot = key_hash % reader->header.hash_capacity;
+
+    uint64_t pos = wanted_slot * slot_size;
+
+    uint64_t displacement = 0;
+    uint64_t slot = wanted_slot;
+
+    while (1) {
+      uint64_t hash2 = reader->header.hash_algorithm.read_hash(hashtable, pos);
+      uint64_t position2 = read_addr(hashtable, pos + reader->header.hash_size, reader->header.address_size);
+      if (position2 == 0) {
+        break;
+      }
+      if (position == position2) {
+        // Found a match! Just reset the iterator
+        RETHROW(sparkey_logiter_reset(iter, &reader->log));
+        return SPARKEY_SUCCESS;
+      }
+      uint64_t other_displacement = get_displacement(reader->header.hash_capacity, slot, hash2);
+      if (displacement > other_displacement) {
+        break;
+      }
+      pos += slot_size;
+      displacement++;
+      slot++;
+      if (slot >= reader->header.hash_capacity) {
+        pos = 0;
+        slot = 0;
+      }
+    }
+  }
+}
+
+/** Returns the log reader embedded in the hash reader. */
+sparkey_logreader * sparkey_hash_getreader(sparkey_hashreader *reader) {
+  return &reader->log;
+}
+
+/** Returns the num_entries field of the hash header. */
+uint64_t sparkey_hash_numentries(sparkey_hashreader *reader) {
+  return reader->header.num_entries;
+}
+
+/** Returns the hash_collisions field of the hash header. */
+uint64_t sparkey_hash_numcollisions(sparkey_hashreader *reader) {
+  return reader->header.hash_collisions;
+}
+
diff --git a/src/sparkey/hashwriter.c b/src/sparkey/hashwriter.c
new file mode 100644
index 00000000..9a57ae2d
--- /dev/null
+++ b/src/sparkey/hashwriter.c
@@ -0,0 +1,521 @@
+/*
+* Copyright (c) 2012-2013 Spotify AB
+*
+* Licensed under the Apache License, Version 2.0 (the "License"); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "sparkey.h"
+#include "sparkey-internal.h"
+
+#include "logheader.h"
+#include "endiantools.h"
+#include "util.h"
+#include "hashheader.h"
+#include "hashiter.h"
+
+/*
+ * Number of bits needed to represent x (0 for x == 0). This is
+ * floor(log2(x)) + 1 for x > 0, not the plain base-2 logarithm.
+ */
+static uint32_t int_log2(uint32_t x) {
+  uint32_t count = 0;
+  while (x > 0) {
+    x >>= 1;
+    count++;
+  }
+  return count;
+}
+
+/* Number of 7-bit groups (1..10) needed to hold value. */
+static int unsigned_vlq_size(uint64_t value) {
+  if (value < 1ULL << 7ULL) {
+    return 1;
+  }
+  if (value < 1ULL << 14ULL) {
+    return 2;
+  }
+  if (value < 1ULL << 21ULL) {
+    return 3;
+  }
+  if (value < 1ULL << 28ULL) {
+    return 4;
+  }
+  if (value < 1ULL << 35ULL) {
+    return 5;
+  }
+  if (value < 1ULL << 42ULL) {
+    return 6;
+  }
+  if (value < 1ULL << 49ULL) {
+    return 7;
+  }
+  if (value < 1ULL << 56ULL) {
+    return 8;
+  }
+  if (value < 1ULL << 63ULL) {
+    return 9;
+  }
+  return 10;
+}
+
+
+/* Bookkeeping for a put that occupied a previously empty slot. */
+static void added_entry(sparkey_hashheader *hash_header) {
+  hash_header->num_entries++;
+}
+
+/* Bookkeeping for a put that replaced an entry: the old entry's bytes become garbage. */
+static void replaced_entry(sparkey_hashheader *hash_header, uint64_t keylen, uint64_t valuelen) {
+  hash_header->garbage_size += keylen + valuelen + unsigned_vlq_size(keylen + 1) + unsigned_vlq_size(valuelen);
+}
+
+/* Bookkeeping for a removed entry: its bytes become garbage and the entry count drops. */
+static void deleted_entry(sparkey_hashheader *hash_header, uint64_t keylen, uint64_t valuelen) {
+  hash_header->garbage_size += keylen + valuelen + unsigned_vlq_size(keylen + 1) + unsigned_vlq_size(valuelen);
+  hash_header->num_entries--;
+}
+
+/*
+ * Removes the table entry whose key equals the key iter points at, if any.
+ * After the removal, following entries are shifted back one slot until an
+ * empty slot or an entry already in its wanted slot is reached. Returns
+ * SPARKEY_SUCCESS whether or not the key was present.
+ */
+static sparkey_returncode hash_delete(uint64_t wanted_slot, uint64_t hash, uint8_t *hashtable, sparkey_hashheader *hash_header, sparkey_logiter *iter, sparkey_logiter *ra_iter, sparkey_logreader *log) {
+  int slot_size = hash_header->address_size + hash_header->hash_size;
+  uint64_t pos = wanted_slot * slot_size;
+
+  uint64_t displacement = 0;
+  uint64_t slot = wanted_slot;
+
+  while (1) {
+    uint64_t hash2 = hash_header->hash_algorithm.read_hash(hashtable, pos);
+    uint64_t position2 = read_addr(hashtable, pos + hash_header->hash_size, hash_header->address_size);
+    if (position2 == 0) {
+      return SPARKEY_SUCCESS;
+    }
+    int entry_index2 = (int) (position2) & hash_header->entry_block_bitmask;
+    position2 >>= hash_header->entry_block_bits;
+    if (position2 < log->header.header_size || position2 >= log->header.data_end ) {
+      fprintf(stderr, "hash_delete():%d bug: found pointer outside of range %"PRIu64"\n", __LINE__, position2);
+      return SPARKEY_INTERNAL_ERROR;
+    }
+    if (hash == hash2) {
+      RETHROW(sparkey_logiter_seek(ra_iter, log, position2));
+      RETHROW(sparkey_logiter_skip(ra_iter, log, entry_index2));
+      RETHROW(sparkey_logiter_next(ra_iter, log));
+      uint64_t keylen2 = ra_iter->keylen;
+      uint64_t valuelen2 = ra_iter->valuelen;
+      if (ra_iter->type != SPARKEY_ENTRY_PUT) {
+        fprintf(stderr, "hash_delete():%d bug: expected a put entry but found %d\n", __LINE__, ra_iter->type);
+        return SPARKEY_INTERNAL_ERROR;
+      }
+      if (iter->keylen == keylen2) {
+        RETHROW(sparkey_logiter_reset(iter, log));
+        int cmp;
+        RETHROW(sparkey_logiter_keycmp(iter, ra_iter, log, &cmp));
+        if (cmp == 0) {
+          // TODO: possibly optimize this to read and write stuff to move in chunks instead of one by one, to decrease number of seeks.
+          while (1) {
+            uint64_t next_slot = (slot + 1) % hash_header->hash_capacity;
+            uint64_t next_pos = next_slot * slot_size;
+
+            uint64_t hash3 = hash_header->hash_algorithm.read_hash(hashtable, next_pos);
+            uint64_t position3 = read_addr(hashtable, next_pos + hash_header->hash_size, hash_header->address_size);
+            if (position3 == 0) {
+              break;
+            }
+            if ((hash3 % hash_header->hash_capacity) == next_slot) {
+              break;
+            }
+
+            uint64_t pos3 = slot * slot_size;
+            hash_header->hash_algorithm.write_hash(&hashtable[pos3], hash3);
+            write_addr(&hashtable[pos3 + hash_header->hash_size], position3, hash_header->address_size);
+
+            slot = next_slot;
+          }
+
+          uint64_t pos3 = slot * slot_size;
+          hash_header->hash_algorithm.write_hash(&hashtable[pos3], 0);
+          write_addr(&hashtable[pos3 + hash_header->hash_size], 0, hash_header->address_size);
+          deleted_entry(hash_header, keylen2, valuelen2);
+
+          return SPARKEY_SUCCESS;
+
+        }
+      }
+    }
+    uint64_t other_displacement = get_displacement(hash_header->hash_capacity, slot, hash2);
+    if (displacement > other_displacement) {
+      return SPARKEY_SUCCESS;
+    }
+    pos += slot_size;
+    displacement++;
+    slot++;
+    if (slot >= hash_header->hash_capacity) {
+      pos = 0;
+      slot = 0;
+    }
+  }
+  fprintf(stderr, "hash_delete():%d bug: unreachable statement\n", __LINE__);
+  return SPARKEY_INTERNAL_ERROR;
+}
+
+/*
+ * Inserts (hash, position) into the table, starting the probe at wanted_slot.
+ * An existing entry with an equal key is overwritten in place. An entry that
+ * is displaced less than the one being inserted gives up its slot and is
+ * re-inserted further along. When iter, ra_iter or log is NULL, no key
+ * comparison is done.
+ */
+static sparkey_returncode hash_put(uint64_t wanted_slot, uint64_t hash, uint8_t *hashtable, sparkey_hashheader *hash_header, sparkey_logiter *iter, sparkey_logiter *ra_iter, sparkey_logreader *log, uint64_t position) {
+  int slot_size = hash_header->address_size + hash_header->hash_size;
+  uint64_t pos = wanted_slot * slot_size;
+
+  uint64_t displacement = 0;
+  uint64_t slot = wanted_slot;
+
+  int might_be_collision = iter != NULL && ra_iter != NULL && log != NULL;
+  while (1) {
+    uint64_t hash2 = hash_header->hash_algorithm.read_hash(hashtable, pos);
+    uint64_t position2 = read_addr(hashtable, pos + hash_header->hash_size, hash_header->address_size);
+    if (position2 == 0) {
+      hash_header->hash_algorithm.write_hash(&hashtable[pos], hash);
+      write_addr(&hashtable[pos + hash_header->hash_size], position, hash_header->address_size);
+      added_entry(hash_header);
+      return SPARKEY_SUCCESS;
+    }
+
+    int entry_index2 = (int) (position2) & hash_header->entry_block_bitmask;
+    uint64_t position3 = position2 >> hash_header->entry_block_bits;
+
+    if (might_be_collision && hash == hash2) {
+      RETHROW(sparkey_logiter_seek(ra_iter, log, position3));
+      RETHROW(sparkey_logiter_skip(ra_iter, log, entry_index2));
+      RETHROW(sparkey_logiter_next(ra_iter, log));
+      uint64_t keylen2 = ra_iter->keylen;
+      uint64_t valuelen2 = ra_iter->valuelen;
+      if (ra_iter->type != SPARKEY_ENTRY_PUT) {
+        fprintf(stderr, "hash_put():%d bug: expected a put entry but found %d\n", __LINE__, ra_iter->type);
+        return SPARKEY_INTERNAL_ERROR;
+      }
+      if (iter->keylen == keylen2) {
+        RETHROW(sparkey_logiter_reset(iter, log));
+        int cmp;
+        RETHROW(sparkey_logiter_keycmp(iter, ra_iter, log, &cmp));
+        if (cmp == 0) {
+          hash_header->hash_algorithm.write_hash(&hashtable[pos], hash);
+          write_addr(&hashtable[pos + hash_header->hash_size], position, hash_header->address_size);
+          replaced_entry(hash_header, keylen2, valuelen2);
+          return SPARKEY_SUCCESS;
+        }
+      }
+    }
+
+    uint64_t other_displacement = get_displacement(hash_header->hash_capacity, slot, hash2);
+    if (displacement > other_displacement) {
+      // Steal the slot, and move the other one
+      hash_header->hash_algorithm.write_hash(&hashtable[pos], hash);
+      write_addr(&hashtable[pos + hash_header->hash_size], position, hash_header->address_size);
+      position = position2;
+      displacement = other_displacement;
+      hash = hash2;
+      might_be_collision = 0;
+    }
+    pos += slot_size;
+    displacement++;
+    slot++;
+    if (slot >= hash_header->hash_capacity) {
+      pos = 0;
+      slot = 0;
+    }
+  }
+  fprintf(stderr, "hash_put():%d bug: unreachable statement\n", __LINE__);
+  return SPARKEY_INTERNAL_ERROR;
+}
+
+/*
+ * Scans the whole table and stores the total displacement, the maximum
+ * displacement and the number of equal hashes in adjacent occupied slots
+ * (counting last-to-first wrap-around) in hash_header.
+ */
+static void calculate_max_displacement(sparkey_hashheader *hash_header, uint8_t *hashtable) {
+  uint64_t capacity = hash_header->hash_capacity;
+  int hash_size = hash_header->hash_size;
+  int slot_size = hash_header->address_size + hash_size;
+
+  uint64_t max_displacement = 0;
+  uint64_t num_hash_collisions = 0;
+  uint64_t total_displacement = 0;
+
+  int has_first = 0;
+  uint64_t first_hash = 0;
+
+  int has_last = 0;
+  uint64_t last_hash = 0;
+
+  int has_prev = 0;
+  uint64_t prev_hash = -1;
+  for (uint64_t slot = 0; slot < capacity; slot++) {
+    uint64_t hash = hash_header->hash_algorithm.read_hash(hashtable, slot * slot_size);
+    if (has_prev && prev_hash == hash) {
+      num_hash_collisions++;
+    }
+    uint64_t position = read_addr(hashtable, slot * slot_size + hash_size, hash_header->address_size);
+    if (position != 0) {
+      prev_hash = hash;
+      has_prev = 1;
+      uint64_t displacement = get_displacement(capacity, slot, hash);
+      total_displacement += displacement;
+      if (displacement > max_displacement) {
+        max_displacement = displacement;
+      }
+      if (slot == 0) {
+        first_hash = hash;
+        has_first = 1;
+      }
+      if (slot == capacity - 1) {
+        last_hash = hash;
+        has_last = 1;
+      }
+    } else {
+      has_prev = 0;
+    }
+  }
+  if (has_first && has_last && first_hash == last_hash) {
+    num_hash_collisions++;
+  }
+  hash_header->total_displacement = total_displacement;
+  hash_header->max_displacement = max_displacement;
+  hash_header->hash_collisions = num_hash_collisions;
+}
+
+/*
+ * Reads exactly count bytes from fd into buf.
+ *
+ * read() may return fewer bytes than requested, so the destination pointer
+ * is advanced after every call. A return of 0 means end of file and is
+ * reported as SPARKEY_UNEXPECTED_EOF instead of retrying forever.
+ */
+static sparkey_returncode read_fully(int fd, uint8_t *buf, size_t count) {
+  while (count > 0) {
+    ssize_t actual_read = read(fd, buf, count);
+    if (actual_read < 0) {
+      fprintf(stderr, "read_fully():%d bug: actual_read = %lld, errno = %d\n", __LINE__, (long long) actual_read, errno);
+      return SPARKEY_INTERNAL_ERROR;
+    }
+    if (actual_read == 0) {
+      return SPARKEY_UNEXPECTED_EOF;
+    }
+    buf += actual_read;
+    count -= (size_t) actual_read;
+  }
+  return SPARKEY_SUCCESS;
+}
+
+/*
+ * Re-inserts every occupied slot found in buf (laid out per old_header) into
+ * hashtable (laid out per new_header). No key comparison is requested, since
+ * the entries come from an existing table.
+ */
+static sparkey_returncode hash_copy(uint8_t *hashtable, uint8_t *buf, size_t buffer_size, sparkey_hashheader *old_header, sparkey_hashheader *new_header) {
+  int slot_size = old_header->address_size + old_header->hash_size;
+  /* size_t index: an unsigned int could wrap before reaching a large buffer_size. */
+  for (size_t i = 0; i < buffer_size; i += slot_size) {
+    uint64_t hash = old_header->hash_algorithm.read_hash(buf, i);
+    uint64_t position = read_addr(buf, i + old_header->hash_size, old_header->address_size);
+
+    int entry_index = (int) (position) & old_header->entry_block_bitmask;
+    position >>= old_header->entry_block_bits;
+
+    uint64_t wanted_slot = hash % new_header->hash_capacity;
+    if (position != 0) {
+      RETHROW(hash_put(wanted_slot, hash, hashtable, new_header, NULL, NULL, NULL, (position << new_header->entry_block_bits) | entry_index));
+    }
+  }
+  return SPARKEY_SUCCESS;
+}
+
+/*
+ * Streams the slots of an existing hash file into hashtable, 1024 slots at a
+ * time. The file descriptor and the buffer are released on every path.
+ */
+static sparkey_returncode fill_hash(uint8_t *hashtable, const char *hash_filename, sparkey_hashheader *old_header, sparkey_hashheader *new_header) {
+  int fd = open(hash_filename, O_RDONLY);
+  if (fd < 0) {
+    return sparkey_open_returncode(errno);
+  }
+
+  sparkey_returncode returncode = SPARKEY_SUCCESS;
+  uint8_t *buf = NULL;
+
+  if (lseek(fd, old_header->header_size, SEEK_SET) < 0) {
+    fprintf(stderr, "fill_hash():%d bug: could not seek in file. errno = %d\n", __LINE__, errno);
+    returncode = SPARKEY_INTERNAL_ERROR;
+    goto cleanup;
+  }
+
+  int slot_size = old_header->address_size + old_header->hash_size;
+  uint64_t buffer_size = slot_size * 1024;
+  buf = malloc(buffer_size);
+  if (buf == NULL) {
+    fprintf(stderr, "fill_hash():%d bug: could not malloc %"PRIu64" bytes\n", __LINE__, buffer_size);
+    returncode = SPARKEY_INTERNAL_ERROR;
+    goto cleanup;
+  }
+
+  uint64_t remaining = old_header->hash_capacity * slot_size;
+  while (buffer_size <= remaining) {
+    TRY(read_fully(fd, buf, buffer_size), cleanup);
+    TRY(hash_copy(hashtable, buf, buffer_size, old_header, new_header), cleanup);
+    remaining -= buffer_size;
+  }
+  TRY(read_fully(fd, buf, remaining), cleanup);
+  TRY(hash_copy(hashtable, buf, remaining, old_header, new_header), cleanup);
+
+cleanup:
+  free(buf);
+  if (close(fd) < 0) {
+    if (returncode == SPARKEY_SUCCESS) {
+      fprintf(stderr, "fill_hash():%d bug: could not close file. errno = %d\n", __LINE__, errno);
+      returncode = SPARKEY_INTERNAL_ERROR;
+    }
+  }
+
+  return returncode;
+}
+
+sparkey_returncode sparkey_hash_write(const char *hash_filename, const char *log_filename, int hash_size) {
+  sparkey_logheader log_header;
+  sparkey_logreader *log;
+  sparkey_logiter *iter = NULL;
+  sparkey_logiter *ra_iter = NULL;
+
+  RETHROW(sparkey_load_logheader(&log_header, log_filename));
+
+  RETHROW(sparkey_logreader_open(&log, log_filename));
+  sparkey_returncode returncode = SPARKEY_SUCCESS;
+  TRY(sparkey_logiter_create(&iter, log), close_reader);
+  TRY(sparkey_logiter_create(&ra_iter, log), close_iter);
+
+  sparkey_hashheader hash_header;
+  sparkey_hashheader old_header;
+
+  double cap;
+  uint64_t start;
+  uint32_t hash_seed;
+  int copy_old;
+  uint32_t old_hash_size = 0;
+  returncode = sparkey_load_hashheader(&old_header, hash_filename);
+  if (returncode == SPARKEY_SUCCESS &&
+      old_header.file_identifier == log_header.file_identifier &&
+      old_header.major_version == HASH_MAJOR_VERSION &&
+      old_header.minor_version == HASH_MINOR_VERSION) {
+    // Prepare to copy stuff from old header
+    cap = ((log_header.num_puts - old_header.num_puts) + old_header.num_entries) * 1.3;
+    start = old_header.data_end;
+    hash_seed = old_header.hash_seed;
+    hash_header.garbage_size = old_header.garbage_size;
+
+    copy_old = 1;
+    old_hash_size = old_header.hash_size;
+  } else {
+    cap = log_header.num_puts * 1.3;
+    start = log_header.header_size;
+    TRY(rand32(&hash_seed), close_iter);
+    hash_header.garbage_size = 0;
+    copy_old = 0;
+    returncode = SPARKEY_SUCCESS;
+  }
+
+  hash_header.hash_capacity = 1 | (uint64_t) cap;
+
+  hash_header.hash_seed = hash_seed;
+  hash_header.max_key_len = log_header.max_key_len;
+  hash_header.max_value_len = log_header.max_value_len;
+  hash_header.data_end = log_header.data_end;
+  hash_header.num_puts = log_header.num_puts;
+
+  hash_header.entry_block_bits = int_log2(log_header.max_entries_per_block);
+  hash_header.entry_block_bitmask = (1 <<
hash_header.entry_block_bits) - 1; + + if (hash_header.data_end < (1ULL << (32 - hash_header.entry_block_bits))) { + hash_header.address_size = 4; + } else { + hash_header.address_size = 8; + } + if (old_hash_size == 8 || hash_header.hash_capacity >= (1 << 23)) { + hash_header.hash_size = 8; + } else { + hash_header.hash_size = 4; + } + if (hash_size != 0) { + if (hash_size == 4 || hash_size == 8) { + hash_header.hash_size = hash_size; + } else { + returncode = SPARKEY_HASH_SIZE_INVALID; + goto close_iter; + } + } + if (hash_header.hash_size != old_hash_size) { + copy_old = 0; + } + hash_header.hash_algorithm = sparkey_get_hash_algorithm(hash_header.hash_size); + + int slot_size = hash_header.hash_size + hash_header.address_size; + uint64_t hashsize = slot_size * hash_header.hash_capacity; + uint8_t *hashtable = malloc(hashsize); + if (hashtable == NULL) { + fprintf(stderr, "sparkey_hash_write():%d bug: could not malloc %"PRIu64" bytes\n", __LINE__, hashsize); + returncode = SPARKEY_INTERNAL_ERROR; + goto close_iter; + } + memset(hashtable, 0, hashsize); + + hash_header.max_displacement = 0; + hash_header.total_displacement = 0; + hash_header.num_entries = 0; + hash_header.hash_collisions = 0; + + if (copy_old) { + if (old_header.data_end == log->header.data_end) { + // Nothing needs to be done - just exit + goto close_iter; + } + TRY(fill_hash(hashtable, hash_filename, &old_header, &hash_header), free_hashtable); + TRY(sparkey_logiter_seek(iter, log, start), free_hashtable); + } + + while (1) { + TRY(sparkey_logiter_next(iter, log), free_hashtable); + switch (iter->state) { + case SPARKEY_ITER_CLOSED: + goto normal_exit; + break; + case SPARKEY_ITER_ACTIVE: + break; + default: + fprintf(stderr, "sparkey_hash_write():%d bug: invalid iter state: %d\n", __LINE__, iter->state); + returncode = SPARKEY_INTERNAL_ERROR; + goto free_hashtable; + break; + } + + uint64_t iter_block_start = iter->block_position; + uint64_t iter_entry_count = iter->entry_count; + + uint64_t 
key_hash = sparkey_iter_hash(&hash_header, iter, log); + uint64_t wanted_slot = key_hash % hash_header.hash_capacity; + + switch (iter->type) { + case SPARKEY_ENTRY_PUT: + TRY(hash_put(wanted_slot, key_hash, hashtable, &hash_header, iter, ra_iter, log, (iter_block_start << hash_header.entry_block_bits) | iter_entry_count), free_hashtable); + break; + case SPARKEY_ENTRY_DELETE: + hash_header.garbage_size += 1 + unsigned_vlq_size(iter->keylen) + iter->keylen; + TRY(hash_delete(wanted_slot, key_hash, hashtable, &hash_header, iter, ra_iter, log), free_hashtable); + break; + } + } +normal_exit: + + calculate_max_displacement(&hash_header, hashtable); + + // Try removing it first, to avoid overwriting existing files that readers may be using. + if (remove(hash_filename) < 0) { + int e = errno; + if (e != ENOENT) { + returncode = sparkey_remove_returncode(e); + goto free_hashtable; + } + } + int fd = creat(hash_filename, 00644); + hash_header.major_version = HASH_MAJOR_VERSION; + hash_header.minor_version = HASH_MINOR_VERSION; + hash_header.file_identifier = log_header.file_identifier; + hash_header.data_end = log_header.data_end; + + TRY(write_hashheader(fd, &hash_header), close_hash); + TRY(write_full(fd, hashtable, hashsize), close_hash); + +close_hash: + close(fd); + +free_hashtable: + free(hashtable); + +close_iter: + sparkey_logiter_close(&iter); + sparkey_logiter_close(&ra_iter); + +close_reader: + sparkey_logreader_close(&log); + + return returncode; +} + diff --git a/src/sparkey/logheader.c b/src/sparkey/logheader.c new file mode 100644 index 00000000..2a2e6a1a --- /dev/null +++ b/src/sparkey/logheader.c @@ -0,0 +1,124 @@ +/* +* Copyright (c) 2012-2013 Spotify AB +* +* Licensed under the Apache License, Version 2.0 (the "License"); you may not +* use this file except in compliance with the License. 
You may obtain a copy of +* the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +* License for the specific language governing permissions and limitations under +* the License. +*/ +#include +#include +#include +#include + +#include "logheader.h" +#include "endiantools.h" +#include "util.h" + +static char * compression_types[] = { "Uncompressed", "Snappy", NULL }; + +void print_logheader(sparkey_logheader *header) { + printf("Log file version %d.%d\n", header->major_version, + header->minor_version); + printf("Identifier: %08x\n", header->file_identifier); + printf("Puts: %"PRIu64", Deletes: %"PRIu64"\n", header->num_puts, header->num_deletes); + printf("Max key size: %"PRIu64", Max value size: %"PRIu64"\n", header->max_key_len, header->max_value_len); + printf("Compression: %s, block size: %d\n", + compression_types[header->compression_type], + header->compression_block_size); +} + +static sparkey_returncode logheader_version0(sparkey_logheader *header, FILE *fp) { + RETHROW(fread_little_endian32(fp, &header->file_identifier)); + RETHROW(fread_little_endian64(fp, &header->num_puts)); + RETHROW(fread_little_endian64(fp, &header->num_deletes)); + RETHROW(fread_little_endian64(fp, &header->data_end)); + RETHROW(fread_little_endian64(fp, &header->max_key_len)); + RETHROW(fread_little_endian64(fp, &header->max_value_len)); + RETHROW(fread_little_endian64(fp, &header->delete_size)); + RETHROW(fread_little_endian32(fp, &header->compression_type)); + RETHROW(fread_little_endian32(fp, &header->compression_block_size)); + RETHROW(fread_little_endian64(fp, &header->put_size)); + RETHROW(fread_little_endian32(fp, &header->max_entries_per_block)); + header->header_size = LOG_HEADER_SIZE; + + // Some basic consistency checks + if 
(header->data_end < header->header_size) { + return SPARKEY_LOG_HEADER_CORRUPT; + } + if (header->num_puts > header->data_end) { + return SPARKEY_LOG_HEADER_CORRUPT; + } + if (header->num_deletes > header->data_end) { + return SPARKEY_LOG_HEADER_CORRUPT; + } + if (header->compression_type > SPARKEY_COMPRESSION_SNAPPY) { + return SPARKEY_LOG_HEADER_CORRUPT; + } + return SPARKEY_SUCCESS; +} + + +typedef sparkey_returncode (*loader)(sparkey_logheader *header, FILE *fp); + +static loader loaders[1] = { logheader_version0 }; + +sparkey_returncode sparkey_load_logheader(sparkey_logheader *header, const char *filename) { + FILE *fp = fopen(filename, "r"); + if (fp == NULL) { + return sparkey_open_returncode(errno); + } + + uint32_t tmp; + RETHROW(fread_little_endian32(fp, &tmp)); + if (tmp != LOG_MAGIC_NUMBER) { + fclose(fp); + return SPARKEY_WRONG_LOG_MAGIC_NUMBER; + } + RETHROW(fread_little_endian32(fp, &header->major_version)); + if (header->major_version != LOG_MAJOR_VERSION) { + fclose(fp); + return SPARKEY_WRONG_LOG_MAJOR_VERSION; + } + RETHROW(fread_little_endian32(fp, &header->minor_version)); + if (header->minor_version > LOG_MINOR_VERSION) { + fclose(fp); + return SPARKEY_UNSUPPORTED_LOG_MINOR_VERSION; + } + int version = header->minor_version; + loader l = loaders[version]; + if (l == NULL) { + fclose(fp); + return SPARKEY_INTERNAL_ERROR; + } + sparkey_returncode x = (*l)(header, fp); + fclose(fp); + return x; +} + +sparkey_returncode write_logheader(int fd, sparkey_logheader *header) { + RETHROW(fwrite_little_endian32(fd, LOG_MAGIC_NUMBER)); + RETHROW(fwrite_little_endian32(fd, LOG_MAJOR_VERSION)); + RETHROW(fwrite_little_endian32(fd, LOG_MINOR_VERSION)); + RETHROW(fwrite_little_endian32(fd, header->file_identifier)); + RETHROW(fwrite_little_endian64(fd, header->num_puts)); + RETHROW(fwrite_little_endian64(fd, header->num_deletes)); + RETHROW(fwrite_little_endian64(fd, header->data_end)); + RETHROW(fwrite_little_endian64(fd, header->max_key_len)); + 
RETHROW(fwrite_little_endian64(fd, header->max_value_len)); + RETHROW(fwrite_little_endian64(fd, header->delete_size)); + RETHROW(fwrite_little_endian32(fd, header->compression_type)); + RETHROW(fwrite_little_endian32(fd, header->compression_block_size)); + RETHROW(fwrite_little_endian64(fd, header->put_size)); + RETHROW(fwrite_little_endian32(fd, header->max_entries_per_block)); + return SPARKEY_SUCCESS; +} + + diff --git a/src/sparkey/logheader.h b/src/sparkey/logheader.h new file mode 100644 index 00000000..3ebe3cc7 --- /dev/null +++ b/src/sparkey/logheader.h @@ -0,0 +1,68 @@ +/* +* Copyright (c) 2012-2013 Spotify AB +* +* Licensed under the Apache License, Version 2.0 (the "License"); you may not +* use this file except in compliance with the License. You may obtain a copy of +* the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +* License for the specific language governing permissions and limitations under +* the License. +*/ +#ifndef SPARKEY_LOGHEADER_H_INCLUDED +#define SPARKEY_LOGHEADER_H_INCLUDED + +#include + +#include "sparkey.h" + +#define LOG_MAGIC_NUMBER (0x49b39c95) +#define LOG_MAJOR_VERSION (1) +#define LOG_MINOR_VERSION (0) +#define LOG_HEADER_SIZE (84) + +typedef struct { + uint32_t major_version; + uint32_t minor_version; + uint32_t file_identifier; + uint64_t num_puts; + uint64_t num_deletes; + uint64_t data_end; + uint64_t max_key_len; + uint64_t max_value_len; + uint64_t delete_size; + sparkey_compression_type compression_type; + uint32_t compression_block_size; + uint64_t put_size; + uint32_t header_size; + uint32_t max_entries_per_block; +} sparkey_logheader; + +/** + * fills up a logheader struct based on the contents at the beginning of the file. 
+ * @param header header struct to fill + * @param filename a log file + * @returns an error code if it could not load the file. + */ +sparkey_returncode sparkey_load_logheader(sparkey_logheader *header, const char *filename); + +/** + * Dumps a human readable representation of the header to stdout + * @param header an initialized header struct + */ +void print_logheader(sparkey_logheader *header); + +/** + * Writes a header to the current position in the file + * @param fd a file descripter pointing to a file open for writing + * @param header the header to write + * @returns an error code if it could not write to file. + */ +sparkey_returncode write_logheader(int fd, sparkey_logheader *header); + +#endif + diff --git a/src/sparkey/logreader.c b/src/sparkey/logreader.c new file mode 100644 index 00000000..19897dc5 --- /dev/null +++ b/src/sparkey/logreader.c @@ -0,0 +1,511 @@ +/* +* Copyright (c) 2012-2013 Spotify AB +* +* Licensed under the Apache License, Version 2.0 (the "License"); you may not +* use this file except in compliance with the License. You may obtain a copy of +* the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +* License for the specific language governing permissions and limitations under +* the License. 
+*/ +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "sparkey.h" +#include "sparkey-internal.h" +#include "logheader.h" +#include "endiantools.h" +#include "util.h" + +#define MAGIC_VALUE_LOGITER (0xd765c8cc) +#define MAGIC_VALUE_LOGREADER (0xe93356c4) + +static inline uint64_t min64(uint64_t a, uint64_t b) { + if (a < b) { + return a; + } + return b; +} + +static inline uint64_t read_vlq(uint8_t * array, uint64_t *position) { + uint64_t res = 0; + uint64_t shift = 0; + uint64_t tmp, tmp2; + while (1) { + tmp = array[(*position)++]; + tmp2 = tmp & 0x7f; + if (tmp == tmp2) { + return res | tmp << shift; + } + res |= tmp2 << shift; + shift += 7; + } + return res; +} + +sparkey_returncode sparkey_logreader_open_noalloc(sparkey_logreader *log, const char *filename) { + int fd = 0; + sparkey_returncode returncode; + TRY(sparkey_load_logheader(&log->header, filename), cleanup); + log->data_len = log->header.data_end; + + struct stat s; + stat(filename, &s); + if (log->data_len > (uint64_t) s.st_size) { + returncode = SPARKEY_LOG_TOO_SMALL; + goto cleanup; + } + + fd = open(filename, O_RDONLY); + if (fd < 0) { + returncode = sparkey_open_returncode(errno); + goto cleanup; + } + log->fd = fd; + + log->data = mmap(NULL, log->data_len, PROT_READ, MAP_SHARED, fd, 0); + if (log->data == MAP_FAILED) { + returncode = SPARKEY_MMAP_FAILED; + goto cleanup; + } + + log->open_status = MAGIC_VALUE_LOGREADER; + return SPARKEY_SUCCESS; + +cleanup: + if (fd > 0) close(fd); + return returncode; +} + +sparkey_returncode sparkey_logreader_open(sparkey_logreader **log_ref, const char *filename) { + RETHROW(correct_endian_platform()); + + sparkey_logreader *log = malloc(sizeof(sparkey_logreader)); + if (log == NULL) { + return SPARKEY_INTERNAL_ERROR; + } + + sparkey_returncode returncode; + TRY(sparkey_logreader_open_noalloc(log, filename), cleanup); + + *log_ref = log; + return SPARKEY_SUCCESS; + +cleanup: + free(log); + return returncode; +} + 
+void sparkey_logreader_close_nodealloc(sparkey_logreader *log) { + if (log == NULL) { + return; + } + if (log->open_status != MAGIC_VALUE_LOGREADER) { + return; + } + log->open_status = 0; + if (log->data != NULL) { + munmap(log->data, log->data_len); + log->data = NULL; + } + close(log->fd); + log->fd = -1; +} + +void sparkey_logreader_close(sparkey_logreader **log_ref) { + if (log_ref == NULL) { + return; + } + sparkey_logreader *log = *log_ref; + sparkey_logreader_close_nodealloc(log); + free(log); + *log_ref = NULL; +} + +static sparkey_returncode assert_log_open(sparkey_logreader *log) { + if (log->open_status != MAGIC_VALUE_LOGREADER) { + return SPARKEY_LOG_CLOSED; + } + return SPARKEY_SUCCESS; +} + +static sparkey_returncode assert_iter_open(sparkey_logiter *iter, sparkey_logreader *log) { + RETHROW(assert_log_open(log)); + if (iter->open_status != MAGIC_VALUE_LOGITER) { + return SPARKEY_LOG_ITERATOR_CLOSED; + } + if (iter->file_identifier != log->header.file_identifier) { + return SPARKEY_LOG_ITERATOR_MISMATCH; + } + return SPARKEY_SUCCESS; +} + +sparkey_returncode sparkey_logiter_create(sparkey_logiter **iter_ref, sparkey_logreader *log) { + RETHROW(assert_log_open(log)); + + sparkey_logiter *iter = malloc(sizeof(sparkey_logiter)); + if (iter == NULL) { + return SPARKEY_INTERNAL_ERROR; + } + + iter->open_status = MAGIC_VALUE_LOGITER; + iter->file_identifier = log->header.file_identifier; + iter->block_position = 0; + iter->next_block_position = log->header.header_size; + iter->block_offset = 0; + iter->block_len = 0; + iter->state = SPARKEY_ITER_NEW; + + switch (log->header.compression_type) { + case SPARKEY_COMPRESSION_NONE: + iter->compression_buf_allocated = 0; + break; + case SPARKEY_COMPRESSION_SNAPPY: + iter->compression_buf_allocated = 1; + iter->compression_buf = malloc(log->header.compression_block_size); + if (iter->compression_buf == NULL) { + free(iter); + return SPARKEY_INTERNAL_ERROR; + } + break; + default: + free(iter); + return 
SPARKEY_INTERNAL_ERROR; + } + + *iter_ref = iter; + return SPARKEY_SUCCESS; +} + +void sparkey_logiter_close(sparkey_logiter **iter_ref) { + if (iter_ref == NULL) { + return; + } + sparkey_logiter *iter = *iter_ref; + if (iter == NULL) { + return; + } + if (iter->open_status != MAGIC_VALUE_LOGITER) { + return; + } + iter->open_status = 0; + + if (iter->compression_buf_allocated) { + free(iter->compression_buf); + } + free(iter); + *iter_ref = NULL; +} + +static sparkey_returncode seekblock(sparkey_logiter *iter, sparkey_logreader *log, uint64_t position) { + iter->block_offset = 0; + if (iter->block_position == position) { + return SPARKEY_SUCCESS; + } + if (log->header.compression_type == SPARKEY_COMPRESSION_NONE) { + iter->compression_buf = &log->data[position]; + iter->block_position = position; + iter->next_block_position = log->header.data_end; + iter->block_len = log->data_len - position; + return SPARKEY_SUCCESS; + } + if (log->header.compression_type == SPARKEY_COMPRESSION_SNAPPY) { + uint64_t pos = position; + // TODO: assert that size_t >= uint64_t + size_t compressed_size = read_vlq(log->data, &pos); + uint64_t next_pos = pos + compressed_size; + const char *input = (char *) &log->data[pos]; + + size_t uncompressed_size = log->header.compression_block_size; + snappy_status status = snappy_uncompress(input, compressed_size, (char *) iter->compression_buf, &uncompressed_size); + switch (status) { + case SNAPPY_OK: break; + case SNAPPY_INVALID_INPUT: + return SPARKEY_INTERNAL_ERROR; + case SNAPPY_BUFFER_TOO_SMALL: + return SPARKEY_INTERNAL_ERROR; + default: + return SPARKEY_INTERNAL_ERROR; + } + iter->block_position = position; + iter->next_block_position = next_pos; + iter->block_len = uncompressed_size; + return SPARKEY_SUCCESS; + } + + return SPARKEY_INTERNAL_ERROR; +} + +sparkey_returncode sparkey_logiter_seek(sparkey_logiter *iter, sparkey_logreader *log, uint64_t position) { + RETHROW(assert_iter_open(iter, log)); + if (position == 
log->header.data_end) { + iter->state = SPARKEY_ITER_CLOSED; + return SPARKEY_SUCCESS; + } + RETHROW(seekblock(iter, log, position)); + iter->entry_count = -1; + iter->state = SPARKEY_ITER_NEW; + return SPARKEY_SUCCESS; +} + +static sparkey_returncode ensure_available(sparkey_logiter *iter, sparkey_logreader *log) { + if (iter->block_offset < iter->block_len) { + return SPARKEY_SUCCESS; + } + + if (iter->next_block_position >= log->header.data_end) { + iter->block_position = 0; + iter->block_offset = 0; + iter->block_len = 0; + return SPARKEY_SUCCESS; + } + RETHROW(seekblock(iter, log, iter->next_block_position)); + iter->entry_count = -1; + + return SPARKEY_SUCCESS; +} + +static sparkey_returncode skip(sparkey_logiter *iter, sparkey_logreader *log, uint64_t len) { + while (len > 0) { + RETHROW(ensure_available(iter, log)); + uint64_t m = min64(len, iter->block_len - iter->block_offset); + len -= m; + iter->block_offset += m; + } + return SPARKEY_SUCCESS; +} + +sparkey_returncode sparkey_logiter_next(sparkey_logiter *iter, sparkey_logreader *log) { + if (iter->state == SPARKEY_ITER_CLOSED) { + return SPARKEY_SUCCESS; + } + uint64_t key_remaining = 0; + uint64_t value_remaining = 0; + if (iter->state == SPARKEY_ITER_ACTIVE) { + key_remaining = iter->key_remaining; + value_remaining = iter->value_remaining; + } + + iter->state = SPARKEY_ITER_INVALID; + iter->key_remaining = 0; + iter->value_remaining = 0; + iter->keylen = 0; + iter->valuelen = 0; + + RETHROW(assert_iter_open(iter, log)); + RETHROW(skip(iter, log, key_remaining)); + RETHROW(skip(iter, log, value_remaining)); + + RETHROW(ensure_available(iter, log)); + if (iter->block_len - iter->block_offset == 0) { + // Reached end of data + iter->state = SPARKEY_ITER_CLOSED; + return SPARKEY_SUCCESS; + } + + if (log->header.compression_type == SPARKEY_COMPRESSION_NONE) { + iter->block_position += iter->block_offset; + iter->block_len -= iter->block_offset; + iter->block_offset = 0; + iter->compression_buf = 
&log->data[iter->block_position]; + iter->entry_count = -1; + } + + iter->entry_count++; + + uint64_t a = read_vlq(iter->compression_buf, &iter->block_offset); + uint64_t b = read_vlq(iter->compression_buf, &iter->block_offset); + if (a == 0) { + iter->keylen = iter->key_remaining = b; + iter->valuelen = iter->value_remaining = 0; + iter->type = SPARKEY_ENTRY_DELETE; + } else { + iter->keylen = iter->key_remaining = a - 1; + iter->valuelen = iter->value_remaining = b; + iter->type = SPARKEY_ENTRY_PUT; + } + + iter->entry_block_position = iter->block_position; + iter->entry_block_offset = iter->block_offset; + + iter->state = SPARKEY_ITER_ACTIVE; + + return SPARKEY_SUCCESS; +} + +sparkey_returncode sparkey_logiter_reset(sparkey_logiter *iter, sparkey_logreader *log) { + if (iter->state != SPARKEY_ITER_ACTIVE) { + return SPARKEY_LOG_ITERATOR_INACTIVE; + } + RETHROW(seekblock(iter, log, iter->entry_block_position)); + + iter->key_remaining = iter->keylen; + iter->value_remaining = iter->valuelen; + iter->block_offset = iter->entry_block_offset; + return SPARKEY_SUCCESS; +} + +sparkey_returncode sparkey_logiter_skip(sparkey_logiter *iter, sparkey_logreader *log, int count) { + while (count > 0) { + count--; + RETHROW(sparkey_logiter_next(iter, log)); + } + return SPARKEY_SUCCESS; +} + + + +static sparkey_returncode sparkey_logiter_chunk(sparkey_logiter *iter, sparkey_logreader *log, uint64_t maxlen, uint64_t *len, uint8_t ** res, uint64_t *var) { + RETHROW(assert_iter_open(iter, log)); + + if (iter->state != SPARKEY_ITER_ACTIVE) { + return SPARKEY_LOG_ITERATOR_INACTIVE; + } + + if (*var > 0) { + RETHROW(ensure_available(iter, log)); + uint64_t m = min64(*var, iter->block_len - iter->block_offset); + m = min64(maxlen, m); + *len = m; + *res = &iter->compression_buf[iter->block_offset]; + iter->block_offset += m; + *var -= m; + return SPARKEY_SUCCESS; + } + *len = 0; + return SPARKEY_SUCCESS; +} + +sparkey_returncode sparkey_logiter_keychunk(sparkey_logiter *iter, 
sparkey_logreader *log, uint64_t maxlen, uint8_t ** res, uint64_t *len) { + return sparkey_logiter_chunk(iter, log, maxlen, len, res, &iter->key_remaining); +} + +sparkey_returncode sparkey_logiter_valuechunk(sparkey_logiter *iter, sparkey_logreader *log, uint64_t maxlen, uint8_t ** res, uint64_t *len) { + RETHROW(skip(iter, log, iter->key_remaining)); + iter->key_remaining = 0; + return sparkey_logiter_chunk(iter, log, maxlen, len, res, &iter->value_remaining); +} + +sparkey_returncode sparkey_logiter_fill_key(sparkey_logiter *iter, sparkey_logreader *log, uint64_t maxlen, uint8_t *buf, uint64_t *len) { + *len = 0; + while (maxlen > 0) { + uint8_t *buf2; + uint64_t len2; + RETHROW(sparkey_logiter_keychunk(iter, log, maxlen, &buf2, &len2)); + if (len2 == 0) { + return SPARKEY_SUCCESS; + } + memcpy(buf, buf2, len2); + buf += len2; + *len += len2; + maxlen -= len2; + } + return SPARKEY_SUCCESS; +} + +sparkey_returncode sparkey_logiter_fill_value(sparkey_logiter *iter, sparkey_logreader *log, uint64_t maxlen, uint8_t *buf, uint64_t *len) { + *len = 0; + while (maxlen > 0) { + uint8_t *buf2; + uint64_t len2; + RETHROW(sparkey_logiter_valuechunk(iter, log, maxlen, &buf2, &len2)); + if (len2 == 0) { + return SPARKEY_SUCCESS; + } + memcpy(buf, buf2, len2); + buf += len2; + *len += len2; + maxlen -= len2; + } + return SPARKEY_SUCCESS; +} + +sparkey_returncode sparkey_logiter_keycmp(sparkey_logiter *iter1, sparkey_logiter *iter2, sparkey_logreader *log, int *res) { + uint8_t *first; + uint64_t first_len; + uint8_t *second; + uint64_t second_len; + + RETHROW(sparkey_logiter_keychunk(iter1, log, 1 << 30, &first, &first_len)); + RETHROW(sparkey_logiter_keychunk(iter2, log, 1 << 30, &second, &second_len)); + + while (1) { + if (first_len == 0 && second_len == 0) { + break; + } + if (first_len == 0) { + *res = -1; + return SPARKEY_SUCCESS; + } + if (second_len == 0) { + *res = 1; + return SPARKEY_SUCCESS; + } + + uint64_t cmp_len = min64(first_len, second_len); + int v = 
memcmp(first, second, cmp_len); + if (v) { + *res = v; + return SPARKEY_SUCCESS; + } + first += cmp_len; + first_len -= cmp_len; + second += cmp_len; + second_len -= cmp_len; + + if (first_len == 0) { + RETHROW(sparkey_logiter_keychunk(iter1, log, 1 << 30, &first, &first_len)); + } + if (second_len == 0) { + RETHROW(sparkey_logiter_keychunk(iter2, log, 1 << 30, &second, &second_len)); + } + } + *res = 0; + return SPARKEY_SUCCESS; +} + + +uint64_t sparkey_logreader_maxkeylen(sparkey_logreader *log) { + return log->header.max_key_len; +} + +uint64_t sparkey_logreader_maxvaluelen(sparkey_logreader *log) { + return log->header.max_value_len; +} + +int sparkey_logreader_get_compression_blocksize(sparkey_logreader *log) { + return log->header.compression_block_size; +} + +sparkey_compression_type sparkey_logreader_get_compression_type(sparkey_logreader *log) { + return log->header.compression_type; +} + +sparkey_iter_state sparkey_logiter_state(sparkey_logiter *iter) { + return iter->state; +} + +sparkey_entry_type sparkey_logiter_type(sparkey_logiter *iter) { + return iter->type; +} + +uint64_t sparkey_logiter_keylen(sparkey_logiter *iter) { + return iter->keylen; +} + +uint64_t sparkey_logiter_valuelen(sparkey_logiter *iter) { + return iter->valuelen; +} + diff --git a/src/sparkey/logwriter.c b/src/sparkey/logwriter.c new file mode 100644 index 00000000..019330d2 --- /dev/null +++ b/src/sparkey/logwriter.c @@ -0,0 +1,339 @@ +/* +* Copyright (c) 2012-2013 Spotify AB +* +* Licensed under the Apache License, Version 2.0 (the "License"); you may not +* use this file except in compliance with the License. You may obtain a copy of +* the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the +* License for the specific language governing permissions and limitations under +* the License. +*/ +#include +#include +#include +#include +#include +#include + +#include + +#include "util.h" +#include "sparkey.h" +#include "logheader.h" +#include "endiantools.h" +#include "buf.h" +#include "sparkey-internal.h" + +#define MAGIC_VALUE_LOGWRITER (0x2866211b) + + +static inline int write_vlq(uint8_t *buf, uint64_t value) { + int count = 1; + while (value >= 1 << 7) { + *buf = (value & 0x7f) | 0x80; + value >>= 7; + count++; + buf++; + } + *buf = value; + return count; +} + +static sparkey_returncode assert_writer_open(sparkey_logwriter *log) { + if (log->open_status != MAGIC_VALUE_LOGWRITER) { + return SPARKEY_LOG_CLOSED; + } + return SPARKEY_SUCCESS; +} + +sparkey_returncode sparkey_logwriter_create(sparkey_logwriter **log_ref, const char *filename, sparkey_compression_type compression_type, int compression_block_size) { + sparkey_returncode returncode; + int fd = 0; + sparkey_logwriter *l = malloc(sizeof(sparkey_logwriter)); + if (l == NULL) { + TRY(SPARKEY_INTERNAL_ERROR, error); + } + switch (compression_type) { + case SPARKEY_COMPRESSION_NONE: + compression_block_size = 0; + l->compressed = NULL; + break; + case SPARKEY_COMPRESSION_SNAPPY: + if (compression_block_size < 10) { + TRY(SPARKEY_INVALID_COMPRESSION_BLOCK_SIZE, error); + } + l->max_compressed_size = snappy_max_compressed_length(compression_block_size); + l->compressed = malloc(l->max_compressed_size); + if (l->compressed == NULL) { + TRY(SPARKEY_INTERNAL_ERROR, error); + } + break; + default: + TRY(SPARKEY_INVALID_COMPRESSION_TYPE, error); + } + + // Try removing it first, to avoid overwriting existing files that readers may be using. 
+ if (remove(filename) < 0) { + int e = errno; + if (e != ENOENT) { + TRY(sparkey_remove_returncode(e), error); + } + } + + fd = open(filename, O_WRONLY | O_TRUNC | O_CREAT, 00644); + if (fd == -1) { + TRY(sparkey_create_returncode(errno), error); + } + l->fd = fd; + + l->header.compression_block_size = compression_block_size; + l->header.compression_type = compression_type; + + TRY(rand32(&(l->header.file_identifier)), error); + l->header.data_end = LOG_HEADER_SIZE; + l->header.major_version = LOG_MAJOR_VERSION; + l->header.minor_version = LOG_MINOR_VERSION; + l->header.put_size = 0; + l->header.delete_size = 0; + l->header.num_puts = 0; + l->header.num_deletes = 0; + l->header.max_entries_per_block = 0; + l->header.max_key_len = 0; + l->header.max_value_len = 0; + + TRY(write_logheader(fd, &l->header), error); + off_t pos = lseek(fd, 0, SEEK_CUR); + if (pos != LOG_HEADER_SIZE) { + TRY(SPARKEY_INTERNAL_ERROR, error); + } + + TRY(buf_init(&l->file_buf, 1024*1024), error); + TRY(buf_init(&l->block_buf, compression_block_size), error); + + l->entry_count = 0; + + l->open_status = MAGIC_VALUE_LOGWRITER; + *log_ref = l; + return SPARKEY_SUCCESS; +error: + free(l); + if (fd > 0) close(fd); + return returncode; +} + +sparkey_returncode sparkey_logwriter_append(sparkey_logwriter **log_ref, const char *filename) { + sparkey_returncode returncode; + int fd = 0; + sparkey_logwriter *log = malloc(sizeof(sparkey_logwriter)); + if (log == NULL) { + TRY(SPARKEY_INTERNAL_ERROR, error); + } + TRY(sparkey_load_logheader(&log->header, filename), error); + + if (log->header.major_version != LOG_MAJOR_VERSION) { + TRY(SPARKEY_WRONG_LOG_MAJOR_VERSION, error); + } + if (log->header.minor_version != LOG_MINOR_VERSION) { + TRY(SPARKEY_UNSUPPORTED_LOG_MINOR_VERSION, error); + } + + switch (log->header.compression_type) { + case SPARKEY_COMPRESSION_NONE: + log->header.compression_block_size = 0; + log->compressed = NULL; + break; + case SPARKEY_COMPRESSION_SNAPPY: + if 
(log->header.compression_block_size < 10) { + TRY(SPARKEY_INVALID_COMPRESSION_BLOCK_SIZE, error); + } + log->max_compressed_size = snappy_max_compressed_length(log->header.compression_block_size); + log->compressed = malloc(log->max_compressed_size); + break; + default: + TRY(SPARKEY_INVALID_COMPRESSION_TYPE, error); + } + + fd = open(filename, O_WRONLY, 00644); + if (fd == -1) { + int e = errno; + TRY(sparkey_create_returncode(e), error); + } + log->fd = fd; + + lseek(fd, log->header.data_end, SEEK_SET); + + TRY(buf_init(&log->file_buf, 1024*1024), error); + TRY(buf_init(&log->block_buf, log->header.compression_block_size), error); + + log->entry_count = 0; + + log->open_status = MAGIC_VALUE_LOGWRITER; + *log_ref = log; + return SPARKEY_SUCCESS; +error: + free(log); + if (fd > 0) close(fd); + return returncode; +} + +static sparkey_returncode flush_snappy(sparkey_logwriter *log) { + log->flushed = 1; + if (log->entry_count > (int) log->header.max_entries_per_block) { + log->header.max_entries_per_block = log->entry_count; + } + log->entry_count = 0; + sparkey_buf *block_buf = &log->block_buf; + uint8_t *compressed = log->compressed; + uint32_t max_compressed_size = log->max_compressed_size; + sparkey_buf *file_buf = &log->file_buf; + int fd = log->fd; + + size_t compressed_size = max_compressed_size; + snappy_status status = snappy_compress((char *) block_buf->start, buf_used(block_buf), (char *) compressed, &compressed_size); + switch (status) { + case SNAPPY_OK: break; + case SNAPPY_INVALID_INPUT: + case SNAPPY_BUFFER_TOO_SMALL: + default: + return SPARKEY_INTERNAL_ERROR; + } + uint8_t buf1[10]; + ptrdiff_t written1 = write_vlq(buf1, compressed_size); + RETHROW(buf_add(file_buf, fd, buf1, written1)); + RETHROW(buf_add(file_buf, fd, compressed, compressed_size)); + block_buf->cur = block_buf->start; + return SPARKEY_SUCCESS; +} + + +sparkey_returncode sparkey_logwriter_flush(sparkey_logwriter *log) { + RETHROW(assert_writer_open(log)); + if 
(buf_used(&log->block_buf) > 0) { + RETHROW(flush_snappy(log)); + } + if (buf_used(&log->file_buf) > 0) { + RETHROW(buf_flushfile(&log->file_buf, log->fd)); + } + off_t pos = lseek(log->fd, 0, SEEK_CUR); + log->header.data_end = pos; + lseek(log->fd, 0, SEEK_SET); + RETHROW(write_logheader(log->fd, &log->header)); + lseek(log->fd, pos, SEEK_SET); + + /* Can't build fsync support on lenny */ + /* fsync(log->fd); */ + return SPARKEY_SUCCESS; +} + +sparkey_returncode sparkey_logwriter_close(sparkey_logwriter **log) { + sparkey_logwriter *l = *log; + if (l->open_status != MAGIC_VALUE_LOGWRITER) { + return SPARKEY_SUCCESS; + } + + RETHROW(sparkey_logwriter_flush(l)); + close(l->fd); + buf_close(&l->file_buf); + buf_close(&l->block_buf); + if (l->compressed != NULL) { + free(l->compressed); + } + + l->open_status = 0; + free(l); + *log = NULL; + return SPARKEY_SUCCESS; +} + +static sparkey_returncode snappy_add(sparkey_logwriter *log, const uint8_t *data, ptrdiff_t len) { + sparkey_buf *block_buf = &log->block_buf; + + while (1) { + ptrdiff_t remaining = buf_remaining(block_buf); + if (remaining >= len) { + memcpy(block_buf->cur, data, len); + block_buf->cur += len; + return SPARKEY_SUCCESS; + } else { + memcpy(block_buf->cur, data, remaining); + block_buf->cur += remaining; + data += remaining; + len -= remaining; + RETHROW(flush_snappy(log)); + } + } + return SPARKEY_SUCCESS; +} + + +static sparkey_returncode log_add(sparkey_logwriter *log, uint64_t num1, uint64_t num2, uint64_t len1, const uint8_t *data1, uint64_t len2, const uint8_t *data2, ptrdiff_t *datasize) { + uint8_t buf1[10]; + uint8_t buf2[10]; + + uint64_t written1 = write_vlq(buf1, num1); + uint64_t written2 = write_vlq(buf2, num2); + + *datasize = written1 + written2 + len1 + len2; + uint64_t remaining; + switch (log->header.compression_type) { + case SPARKEY_COMPRESSION_NONE: + RETHROW(buf_add(&log->file_buf, log->fd, buf1, written1)); + RETHROW(buf_add(&log->file_buf, log->fd, buf2, written2)); + 
RETHROW(buf_add(&log->file_buf, log->fd, data1, len1)); + RETHROW(buf_add(&log->file_buf, log->fd, data2, len2)); + break; + case SPARKEY_COMPRESSION_SNAPPY: + remaining = buf_remaining(&log->block_buf); + // todo: make it smarter by checking if it's better to flush directly + uint64_t fits_in_one = written1 + written2 + len1 + len2 <= buf_size(&log->block_buf); + uint64_t doesnt_fit_this = written1 + written2 + len1 + len2 > buf_remaining(&log->block_buf); + if ((remaining < written1 + written2) || (fits_in_one && doesnt_fit_this)) { + RETHROW(flush_snappy(log)); + } + log->entry_count++; + log->flushed = 0; + RETHROW(snappy_add(log, buf1, written1)); + RETHROW(snappy_add(log, buf2, written2)); + RETHROW(snappy_add(log, data1, len1)); + RETHROW(snappy_add(log, data2, len2)); + if (log->flushed && buf_used(&log->block_buf) > 0) { + RETHROW(flush_snappy(log)); + } + break; + default: + return SPARKEY_INTERNAL_ERROR; + } + return SPARKEY_SUCCESS; +} + +sparkey_returncode sparkey_logwriter_put(sparkey_logwriter *log, uint64_t keylen, const uint8_t *key, uint64_t valuelen, const uint8_t *value) { + RETHROW(assert_writer_open(log)); + ptrdiff_t datasize; + RETHROW(log_add(log, keylen + 1, valuelen, keylen, key, valuelen, value, &datasize)); + + log->header.num_puts++; + log->header.put_size += datasize; + if (keylen > log->header.max_key_len) { + log->header.max_key_len = keylen; + } + if (valuelen > log->header.max_value_len) { + log->header.max_value_len = valuelen; + } + return SPARKEY_SUCCESS; +} + +sparkey_returncode sparkey_logwriter_delete(sparkey_logwriter *log, uint64_t keylen, const uint8_t *key) { + RETHROW(assert_writer_open(log)); + ptrdiff_t datasize; + RETHROW(log_add(log, 0, keylen, 0, NULL, keylen, key, &datasize)); + + log->header.num_deletes++; + log->header.delete_size += datasize; + return SPARKEY_SUCCESS; +} + diff --git a/src/sparkey/returncodes.c b/src/sparkey/returncodes.c new file mode 100644 index 00000000..c9833646 --- /dev/null +++ 
b/src/sparkey/returncodes.c @@ -0,0 +1,60 @@ +/* +* Copyright (c) 2012-2013 Spotify AB +* +* Licensed under the Apache License, Version 2.0 (the "License"); you may not +* use this file except in compliance with the License. You may obtain a copy of +* the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +* License for the specific language governing permissions and limitations under +* the License. +*/ +#include "sparkey.h" + +const char * sparkey_errstring(sparkey_returncode code) { + switch (code) { + case SPARKEY_SUCCESS: return "Success"; + case SPARKEY_INTERNAL_ERROR: return "Internal error"; + case SPARKEY_FILE_NOT_FOUND: return "File not found"; + case SPARKEY_PERMISSION_DENIED: return "Permission denied"; + case SPARKEY_TOO_MANY_OPEN_FILES: return "Too many open files"; + case SPARKEY_FILE_TOO_LARGE: return "File is too large"; + case SPARKEY_FILE_ALREADY_EXISTS: return "File already exists"; + case SPARKEY_FILE_BUSY: return "File is busy"; + case SPARKEY_FILE_IS_DIRECTORY: return "File is a directory"; + case SPARKEY_FILE_SIZE_EXCEEDED: return "Maximum file size exceeded"; + case SPARKEY_FILE_CLOSED: return "File is closed"; + case SPARKEY_OUT_OF_DISK: return "Out of free disk space"; + case SPARKEY_UNEXPECTED_EOF: return "Encountered unexpected end of file"; + case SPARKEY_MMAP_FAILED: return "mmap failed - running on 32 bit system?"; + + case SPARKEY_WRONG_LOG_MAGIC_NUMBER: return "Wrong magic number of log file"; + case SPARKEY_WRONG_LOG_MAJOR_VERSION: return "Wrong major version of log file"; + case SPARKEY_UNSUPPORTED_LOG_MINOR_VERSION: return "Unsupported minor version of log file"; + case SPARKEY_LOG_TOO_SMALL: return "Corrupt log file - smaller than the header indicates"; + case SPARKEY_LOG_CLOSED: return "Log 
file is closed"; + case SPARKEY_LOG_ITERATOR_INACTIVE: return "Log iterator is inactive"; + case SPARKEY_LOG_ITERATOR_MISMATCH: return "The iterator is not associated with the log"; + case SPARKEY_LOG_ITERATOR_CLOSED: return "Log iterator is closed"; + case SPARKEY_LOG_HEADER_CORRUPT: return "Log header is corrupt"; + + case SPARKEY_INVALID_COMPRESSION_BLOCK_SIZE: return "Invalid compression block size"; + case SPARKEY_INVALID_COMPRESSION_TYPE: return "Invalid compression type"; + + case SPARKEY_WRONG_HASH_MAGIC_NUMBER: return "Wrong magic number of hash file"; + case SPARKEY_WRONG_HASH_MAJOR_VERSION: return "Wrong major version of hash file"; + case SPARKEY_UNSUPPORTED_HASH_MINOR_VERSION: return "Unsupported minor version of hash file"; + case SPARKEY_HASH_TOO_SMALL: return "Corrupt hash file - smaller than the header indicates"; + case SPARKEY_HASH_CLOSED: return "Hash file is closed"; + case SPARKEY_FILE_IDENTIFIER_MISMATCH: return "File identifier differs between hash file and log file"; + case SPARKEY_HASH_HEADER_CORRUPT: return "Hash header is corrupt"; + case SPARKEY_HASH_SIZE_INVALID: return "Hash size is invalid"; + + default: return "Unknown error"; + } +} + diff --git a/src/sparkey/sparkey-internal.h b/src/sparkey/sparkey-internal.h new file mode 100644 index 00000000..509122c7 --- /dev/null +++ b/src/sparkey/sparkey-internal.h @@ -0,0 +1,90 @@ +/* +* Copyright (c) 2012-2013 Spotify AB +* +* Licensed under the Apache License, Version 2.0 (the "License"); you may not +* use this file except in compliance with the License. You may obtain a copy of +* the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +* License for the specific language governing permissions and limitations under +* the License. 
+*/ +#ifndef SPARKEY_INTERNAL_H +#define SPARKEY_INTERNAL_H +#include + +#include "sparkey.h" + +#include "logheader.h" +#include "hashheader.h" +#include "buf.h" + +struct sparkey_logreader { + uint32_t open_status; + sparkey_logheader header; + int fd; + + uint64_t data_len; + uint8_t *data; +}; + +struct sparkey_logiter { + uint32_t open_status; + uint32_t file_identifier; + + // position in reader + uint64_t block_position; + uint64_t next_block_position; + uint64_t block_offset; + uint64_t block_len; + int entry_count; + + // compression buffer + int compression_buf_allocated; + uint8_t *compression_buf; + + // current entry + uint64_t entry_block_position; + uint64_t entry_block_offset; + sparkey_entry_type type; + sparkey_iter_state state; + uint64_t keylen; + uint64_t valuelen; + uint64_t key_remaining; + uint64_t value_remaining; +}; + +struct sparkey_logwriter { + uint32_t open_status; + sparkey_logheader header; + int fd; + + sparkey_buf block_buf; + uint32_t max_compressed_size; + uint8_t *compressed; + sparkey_buf file_buf; + int flushed; + + int entry_count; +}; + +struct sparkey_hashreader { + uint32_t open_status; + sparkey_hashheader header; + sparkey_logreader log; + + int fd; + + uint64_t data_len; + uint8_t *data; + +}; + +sparkey_returncode sparkey_logreader_open_noalloc(sparkey_logreader *log, const char *filename); +void sparkey_logreader_close_nodealloc(sparkey_logreader *log); + +#endif diff --git a/src/sparkey/sparkey.h b/src/sparkey/sparkey.h new file mode 100644 index 00000000..e1a6f1cc --- /dev/null +++ b/src/sparkey/sparkey.h @@ -0,0 +1,673 @@ +/* +* Copyright (c) 2012-2013 Spotify AB +* +* Licensed under the Apache License, Version 2.0 (the "License"); you may not +* use this file except in compliance with the License. 
You may obtain a copy of +* the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +* License for the specific language governing permissions and limitations under +* the License. +*/ +#ifndef SPOTIFY_SPARKEY_H_INCLUDED +#define SPOTIFY_SPARKEY_H_INCLUDED + +/** + * \mainpage Sparkey C API + * \section intro_sec Getting started + * + * For a complete listing of available functions, see sparkey.h . + * + * \section logwriter Writing to a log file + * + * This section contains all functions relevant for writing entries to a log. + * Writing to the same log file is not thread safe. Only use the writer objects + * from one thread at a time, and make sure to only write to a file from one process. + * The library will not do any form of locking or checking for other writers, so be + * careful. + * + * Basic workflow: + * - Create and initialize the logwriter: + * \code + * sparkey_logwriter *mywriter; + * sparkey_returncode returncode = sparkey_logwriter_create(&mywriter, "mylog.spl", SPARKEY_COMPRESSION_NONE, 0); + * // TODO: check the returncode + * \endcode + * - Write to the log: + * \code + * const char *mykey = "mykey"; + * const char *myvalue = "this is my value"; + * sparkey_returncode returncode = sparkey_logwriter_put(mywriter, strlen(mykey), (uint8_t*)mykey, strlen(myvalue), (uint8_t*)myvalue); + * // TODO: check the returncode + * \endcode + * - Close it when you're done: + * \code + * sparkey_returncode returncode = sparkey_logwriter_close(&mywriter); + * // TODO: check the returncode + * \endcode + * + * \section logreader Reading from a log file + * + * This section contains all functions relevant for reading entries from a log. 
+ * A sparkey_logreader may be shared between multiple threads, but \ref sparkey_logreader_open + * and \ref sparkey_logreader_close are not thread safe. + * + * The logreader is not useful by itself. You also need a sparkey_logiter to iterate through the entries. + * This is a highly mutable struct and should not be shared between threads. It is not threadsafe. + * + * Here is a basic workflow for iterating through all entries in a logfile: + * - Create a logreader + * \code + * sparkey_logreader *myreader; + * sparkey_returncode returncode = sparkey_logreader_open(&myreader, "mylog.spl"); + * \endcode + * - Create a logiter + * \code + * sparkey_logiter *myiter; + * sparkey_returncode returncode = sparkey_logiter_create(&myiter, myreader); + * \endcode + * - Perform the iteration: + * \code + * while (1) { + * sparkey_returncode returncode = sparkey_logiter_next(myiter, myreader); + * // TODO: check the returncode + * if (sparkey_logiter_state(myiter) != SPARKEY_ITER_ACTIVE) { + * break; + * } + * uint64_t wanted_keylen = sparkey_logiter_keylen(myiter); + * uint8_t *keybuf = malloc(wanted_keylen); + * uint64_t actual_keylen; + * returncode = sparkey_logiter_fill_key(myiter, myreader, wanted_keylen, keybuf, &actual_keylen); + * // TODO: check the returncode + * // TODO: assert actual_keylen == wanted_keylen + * uint64_t wanted_valuelen = sparkey_logiter_valuelen(myiter); + * uint8_t *valuebuf = malloc(wanted_valuelen); + * uint64_t actual_valuelen; + * returncode = sparkey_logiter_fill_value(myiter, myreader, wanted_valuelen, valuebuf, &actual_valuelen); + * // TODO: check the returncode + * // TODO: assert actual_valuelen == wanted_valuelen + * // Do stuff with key and value + * free(keybuf); + * free(valuebuf); + * } + * \endcode + * Note that you have to allocate memory for the key and value manually - Sparkey does not allocate memory except for when + * creating readers, writers and iterators. 
+ * + * - Alternatively, you can preallocate the buffers by using + * max_key_len and max_value_len provided by the log header: + * \code + * uint8_t *keybuf = malloc(sparkey_logreader_maxkeylen(myreader)); + * uint8_t *valuebuf = malloc(sparkey_logreader_maxvaluelen(myreader)); + * while (1) { + * sparkey_returncode returncode = sparkey_logiter_next(myiter, myreader); + * // TODO: check the returncode + * if (sparkey_logiter_state(myiter) != SPARKEY_ITER_ACTIVE) { + * break; + * } + * uint64_t wanted_keylen = sparkey_logiter_keylen(myiter); + * uint64_t actual_keylen; + * returncode = sparkey_logiter_fill_key(myiter, myreader, wanted_keylen, keybuf, &actual_keylen); + * // TODO: check the returncode + * // TODO: assert actual_keylen == wanted_keylen + * uint64_t wanted_valuelen = sparkey_logiter_valuelen(myiter); + * uint64_t actual_valuelen; + * returncode = sparkey_logiter_fill_value(myiter, myreader, wanted_valuelen, valuebuf, &actual_valuelen); + * // TODO: check the returncode + * // TODO: assert actual_valuelen == wanted_valuelen + * // Do stuff with key and value + * } + * free(keybuf); + * free(valuebuf); + * \endcode + * - You can also skip allocating at all, if you can process the key and/or value in chunks. 
Here's an example for processing a key in chunks, + * but the same can be applied for values: + * \code + * uint64_t total_len = sparkey_logiter_keylen(myiter); + * while (total_len > 0) { + * uint8_t *buf; + * uint64_t len; + * sparkey_returncode returncode = sparkey_logiter_keychunk(myiter, myreader, total_len, &buf, &len); + * // TODO: check the returncode + * // Example: use the chunks to write to standard out + * fwrite(buf, 1, len, stdout); + * total_len -= len; + * } + * \endcode + * - Close everything when you're done: + * \code + * sparkey_logreader_close(&myreader); + * sparkey_logiter_close(&myiter); + * \endcode + * + * \section hashwriter Creating hash files from log files + * + * This section only covers the function sparkey_hash_write, which creates a hash file. + * + * This is all you need to do to create a hash file based on an existing log file: + * \code + * sparkey_returncode returncode = sparkey_hash_write("mylog.spi", "mylog.spl", 0); + * // TODO: check the returncode + * \endcode + * + * \section hashreader Reading from a hash-file/log-file pair + * + * This section contains all functions relevant for reading live key/value pairs from a log and hash file. + * The documentation is very similar to the one for reading from a log file, because this api is an extension. + * Random lookup is the only feature that's added, and iteration simply skips dead entries. + * + * A sparkey_hashreader may be shared between multiple threads, but \ref sparkey_hash_open + * and \ref sparkey_hash_close are not thread safe. + * + * The hashreader is not useful by itself. You also need a sparkey_logiter to do random lookups and + * iterate through the entries. + * This is a highly mutable struct and should not be shared between threads. It is not threadsafe. 
+ * + * Here is a basic workflow for iterating through all live entries in a log and hash file: + * - Create a hashreader + * \code + * sparkey_hashreader *myreader; + * sparkey_returncode returncode = sparkey_hash_open(&myreader, "mylog.spi", "mylog.spl"); + * // TODO: check the returncode + * \endcode + * \code + * sparkey_logiter *myiter; + * sparkey_returncode returncode = sparkey_logiter_create(&myiter, sparkey_hash_getreader(myreader)); + * // TODO: check the returncode + * \endcode + * - Iteration is exactly as described previously, but uses \ref sparkey_logiter_hashnext instead of + * \ref sparkey_logiter_next. + * - Random lookup + * \code + * sparkey_returncode returncode = sparkey_hash_get(myreader, (uint8_t*)"mykey", 5, myiter); + * if (sparkey_logiter_state(myiter) != SPARKEY_ITER_ACTIVE) { + * // Entry not found; + * } else { + * // Extracting value is done the same as when iterating. + * uint64_t wanted_valuelen = sparkey_logiter_valuelen(myiter); + * uint8_t *valuebuf = malloc(wanted_valuelen); + * uint64_t actual_valuelen; + * returncode = sparkey_logiter_fill_value(myiter, sparkey_hash_getreader(myreader), wanted_valuelen, valuebuf, &actual_valuelen); + * } + * \endcode + * Note that this API allows you to do a random seek and then iterate through the following entries. This may be + * useful when you insert groups of entries in order and quickly want to access all of them. 
+ * - Close everything when you're done: + * \code + * sparkey_hash_close(&myreader); + * sparkey_logiter_close(&myiter); + * \endcode + */ + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +typedef enum { + SPARKEY_SUCCESS = 0, + SPARKEY_INTERNAL_ERROR = -1, + + SPARKEY_FILE_NOT_FOUND = -100, + SPARKEY_PERMISSION_DENIED = -101, + SPARKEY_TOO_MANY_OPEN_FILES = -102, + SPARKEY_FILE_TOO_LARGE = -103, + SPARKEY_FILE_ALREADY_EXISTS = -104, + SPARKEY_FILE_BUSY = -105, + SPARKEY_FILE_IS_DIRECTORY = -106, + SPARKEY_FILE_SIZE_EXCEEDED = -107, + SPARKEY_FILE_CLOSED = -108, + SPARKEY_OUT_OF_DISK = -109, + SPARKEY_UNEXPECTED_EOF = -110, + SPARKEY_MMAP_FAILED = -111, + + SPARKEY_WRONG_LOG_MAGIC_NUMBER = -200, + SPARKEY_WRONG_LOG_MAJOR_VERSION = -201, + SPARKEY_UNSUPPORTED_LOG_MINOR_VERSION = -202, + SPARKEY_LOG_TOO_SMALL = -203, + SPARKEY_LOG_CLOSED = -204, + SPARKEY_LOG_ITERATOR_INACTIVE = -205, + SPARKEY_LOG_ITERATOR_MISMATCH = -206, + SPARKEY_LOG_ITERATOR_CLOSED = -207, + SPARKEY_LOG_HEADER_CORRUPT = -208, + SPARKEY_INVALID_COMPRESSION_BLOCK_SIZE = -209, + SPARKEY_INVALID_COMPRESSION_TYPE = -210, + + SPARKEY_WRONG_HASH_MAGIC_NUMBER = -300, + SPARKEY_WRONG_HASH_MAJOR_VERSION = -301, + SPARKEY_UNSUPPORTED_HASH_MINOR_VERSION = -302, + SPARKEY_HASH_TOO_SMALL = -303, + SPARKEY_HASH_CLOSED = -304, + SPARKEY_FILE_IDENTIFIER_MISMATCH = -305, + SPARKEY_HASH_HEADER_CORRUPT = -306, + SPARKEY_HASH_SIZE_INVALID = -307, + +} sparkey_returncode; + +/** + * Get a human readable string from a return code. + * @param code a return code + * @returns a string representing the return code. + */ +const char * sparkey_errstring(sparkey_returncode code); + +/* logwriter */ + +/** + * A structure holding all the data necessary to add entries to a log file. 
+ */ +struct sparkey_logwriter; +typedef struct sparkey_logwriter sparkey_logwriter; + +typedef enum { + SPARKEY_COMPRESSION_NONE, + SPARKEY_COMPRESSION_SNAPPY +} sparkey_compression_type; + +typedef enum { + SPARKEY_ENTRY_PUT, + SPARKEY_ENTRY_DELETE +} sparkey_entry_type; + +typedef enum { + SPARKEY_ITER_NEW, + SPARKEY_ITER_ACTIVE, + SPARKEY_ITER_CLOSED, + SPARKEY_ITER_INVALID +} sparkey_iter_state; + +struct sparkey_logreader; +typedef struct sparkey_logreader sparkey_logreader; + +struct sparkey_logiter; +typedef struct sparkey_logiter sparkey_logiter; + +struct sparkey_hashreader; +typedef struct sparkey_hashreader sparkey_hashreader; + + +/** + * Creates a new Sparkey log file, possibly overwriting an already existing one. + * @param log a double reference to a sparkey_logwriter structure that gets allocated and initialized by this call. + * @param filename the file to create. + * @param compression_type NONE or SNAPPY, specifies if block compression should be used or not. + * @param compression_block_size is only relevant if compression type is not NONE. + * It represents the maximum number of bytes of an uncompressed block. + * @return SPARKEY_SUCCESS if all goes well. + */ +sparkey_returncode sparkey_logwriter_create(sparkey_logwriter **log, const char *filename, sparkey_compression_type compression_type, int compression_block_size); + +/** + * Append to an existing Sparkey log file. + * @param log a double reference to a sparkey_logwriter structure that gets allocated and initialized by this call. + * @param filename the file to open for appending. + * The compression type and block size are taken from the header of the existing log. + * @return SPARKEY_SUCCESS if all goes well. + */ +sparkey_returncode sparkey_logwriter_append(sparkey_logwriter **log, const char *filename); + +/** + * Append a key/value pair to the log file + * @param log a reference to an open log writer. 
+ * @param keylen the number of bytes of the key data block + * @param key a pointer to a block of continuous data where the key can be found. + * Does not need to be NUL-terminated. + * @param valuelen the number of bytes of the value data block + * @param value a pointer to a block of continuous data where the value can be found. + * Does not need to be NUL-terminated. + * @return SPARKEY_SUCCESS if all goes well. + */ +sparkey_returncode sparkey_logwriter_put(sparkey_logwriter *log, uint64_t keylen, const uint8_t *key, uint64_t valuelen, const uint8_t *value); + +/** + * Append a delete operation for a key to the log file + * @param log a reference to an open log writer. + * @param keylen the number of bytes of the key data block + * @param key a pointer to a block of continuous data where the key can be found. + * Does not need to be NUL-terminated. + * @return SPARKEY_SUCCESS if all goes well. + */ +sparkey_returncode sparkey_logwriter_delete(sparkey_logwriter *log, uint64_t keylen, const uint8_t *key); + +/** + * Flush any open compression block to file buffer. + * Flush any open file buffer to disk. + * Rewrite the header on disk. + * This enables readers to read from the log. + * @param log a reference to an open log writer. + * @return SPARKEY_SUCCESS if all goes well. + */ +sparkey_returncode sparkey_logwriter_flush(sparkey_logwriter *log); + +/** + * Flushes the log, then closes the file and marks the log as closed. + * The log will be closed after this, the sparkey_logwriter struct + * referenced will be freed and *log will be set to NULL. + * @param log a double reference to an open log writer. + * @return SPARKEY_SUCCESS if all goes well. + */ +sparkey_returncode sparkey_logwriter_close(sparkey_logwriter **log); + +/* logreader */ + +/** + * Opens a log file for reading. The logreader is threadsafe, except during opening or closing. + * @param log a double reference to a logreader. + * @param filename a filename of a file containing a sparkey log. 
+ * @returns SPARKEY_SUCCESS if all goes well. Otherwise a return code indicating the error. + */ +sparkey_returncode sparkey_logreader_open(sparkey_logreader **log, const char *filename); + +/** + * Closes a logreader. + * It's allowed to close a logreader while there are open logiterators. + * Further operations on such logiterators will fail. + * This is a failsafe operation. + * @param log a double reference to a logreader + * This will be set to NULL after close. + */ +void sparkey_logreader_close(sparkey_logreader **log); + +/** + * Get the size of the largest key in the log. + * @param log a reference to a logreader. + * @returns the size in bytes of the largest key in the log. + */ +uint64_t sparkey_logreader_maxkeylen(sparkey_logreader *log); + +/** + * Get the size of the largest value in the log. + * @param log a reference to a logreader. + * @returns the size in bytes of the largest value in the log. + */ +uint64_t sparkey_logreader_maxvaluelen(sparkey_logreader *log); + +/** + * Get the blocksize for a reader + * @param log a reference to a logreader. + * @returns the blocksize + */ +int sparkey_logreader_get_compression_blocksize(sparkey_logreader *log); + +/** + * Get the compression type for a reader + * @param log a reference to a logreader. + * @returns the compression type + */ +sparkey_compression_type sparkey_logreader_get_compression_type(sparkey_logreader *log); + +/** + * Initializes a logiter and associates it with a logreader. + * The logreader must be open. The logiter is not threadsafe. + * @param iter a double reference to an uninitialized logiter. Will be set on success. + * @param log an open logreader + * @returns SPARKEY_SUCCESS if all goes well. Otherwise a returncode indicating the error. + */ +sparkey_returncode sparkey_logiter_create(sparkey_logiter **iter, sparkey_logreader *log); + +/** + * Closes a log iterator. + * This is a failsafe operation. + * @param iter a double reference to a log iterator. + * This will be set to NULL after close. 
+ */ +void sparkey_logiter_close(sparkey_logiter **iter); + +/** + * Skips to a specific block in the logfile. + * The position must be a valid block start, but that will not be verified by the function. + * If an illegal position is used, all other operations on this logiterator are undefined, + * and may even segfault. + * @param iter an open log iterator. + * @param log an open logreader associated with iter. + * @param position an offset into the logfile where a block begins. + * @returns SPARKEY_SUCCESS if all goes well. Otherwise a returncode indicating the error. + */ +sparkey_returncode sparkey_logiter_seek(sparkey_logiter *iter, sparkey_logreader *log, uint64_t position); + +/** + * Skip a number of entries. + * This is equivalent to calling sparkey_logiter_next count number of times. + * @param iter an open logiter + * @param log an open logreader associated with iter. + * @param count the number of entries to skip. + * @returns SPARKEY_SUCCESS if all goes well. Otherwise a returncode indicating the error. + */ +sparkey_returncode sparkey_logiter_skip(sparkey_logiter *iter, sparkey_logreader *log, int count); + +/** + * Prepares the logiter to start reading from the next entry. + * iter->state will be SPARKEY_ITER_CLOSED if the last entry has been passed. + * iter->state will be SPARKEY_ITER_INVALID if anything goes wrong. + * iter->state will be SPARKEY_ITER_ACTIVE if it successfully reached the next entry. + * @param iter an open logiter + * @param log an open logreader associated with iter. + * @returns SPARKEY_SUCCESS if all goes well. Otherwise a returncode indicating the error. + */ +sparkey_returncode sparkey_logiter_next(sparkey_logiter *iter, sparkey_logreader *log); + +/** + * Resets the iterator to the start of the current entry. This is only valid if + * iter->state is SPARKEY_ITER_ACTIVE. + * @param iter an open logiter + * @param log an open logreader associated with iter. + * @returns SPARKEY_SUCCESS if all goes well. 
Otherwise a returncode indicating the error. + */ +sparkey_returncode sparkey_logiter_reset(sparkey_logiter *iter, sparkey_logreader *log); + +/** + * Consumes and returns part of or all of the key of the current entry. + * Usage example: + * uint8_t *res; + * uint64_t len; + * sparkey_returncode code = sparkey_logiter_keychunk(iter, log, 1 << 30, &res, &len); + * + * @param iter an open logiter + * @param log an open logreader associated with iter. + * @param maxlen a limit for how much data you want to handle. + * @param res (output parameter) reference to a read only array of data. The array is of size *len, and is not NUL-terminated. + * You cannot use this as a string, and you may not modify it. The data in the array is valid until the next operation on the + * logiter or until the log is closed. + * @param len (output parameter) reference to a variable holding the size of the array returned in res. + * @returns SPARKEY_SUCCESS if all goes well. Otherwise a returncode indicating the error. + */ +sparkey_returncode sparkey_logiter_keychunk(sparkey_logiter *iter, sparkey_logreader *log, uint64_t maxlen, uint8_t ** res, uint64_t *len); + +/** + * First consumes and discards any remaining key parts. + * Then consumes and returns part of or all of the value of the current entry. + * Usage example: + * uint8_t *res; + * uint64_t len; + * sparkey_returncode code = sparkey_logiter_valuechunk(iter, log, 1 << 30, &res, &len); + * + * @param iter an open logiter + * @param log an open logreader associated with iter. + * @param maxlen a limit for how much data you want to handle. + * @param res (output parameter) reference to a read only array of data. The array is of size *len, and is not NUL-terminated. + * You cannot use this as a string, and you may not modify it. The data in the array is valid until the next operation on the + * logiter or until the log is closed. + * @param len (output parameter) reference to a variable holding the size of the array returned in res. 
+ * @returns SPARKEY_SUCCESS if all goes well. Otherwise a returncode indicating the error. + */ +sparkey_returncode sparkey_logiter_valuechunk(sparkey_logiter *iter, sparkey_logreader *log, uint64_t maxlen, uint8_t ** res, uint64_t *len); + +/** + * Convenience function around sparkey_logiter_keychunk. + * Takes a user allocated buffer and fills it as much as possible by consuming parts of the key of the current entry. + * No NUL will be appended after the data, so you may not use it as a string unless you add the NUL manually. + * Usage example: + * uint8_t *buf = malloc(sparkey_logiter_keylen(iter)); + * uint64_t len; + * sparkey_returncode code = sparkey_logiter_fill_key(iter, log, sparkey_logiter_keylen(iter), buf, &len); + * + * @param iter an open logiter + * @param log an open logreader associated with iter. + * @param maxlen a limit for how much data you want to handle. + * @param buf a writable array of data. The array must at least be of size maxlen. + * @param len (output parameter) reference to a variable holding the amount of data written to buf. + * @returns SPARKEY_SUCCESS if all goes well. Otherwise a returncode indicating the error. + */ +sparkey_returncode sparkey_logiter_fill_key(sparkey_logiter *iter, sparkey_logreader *log, uint64_t maxlen, uint8_t *buf, uint64_t *len); + +/** + * Convenience function around sparkey_logiter_valuechunk. + * Takes a user allocated buffer and fills it as much as possible by consuming parts of the value of the current entry. + * No NUL will be appended after the data, so you may not use it as a string unless you add the NUL manually. + * Usage example: + * uint8_t *buf = malloc(sparkey_logiter_valuelen(iter)); + * uint64_t len; + * sparkey_returncode code = sparkey_logiter_fill_value(iter, log, sparkey_logiter_valuelen(iter), buf, &len); + * + * @param iter an open logiter + * @param log an open logreader associated with iter. + * @param maxlen a limit for how much data you want to handle. + * @param buf a writable array of data. The array must at least be of size maxlen. 
+ * @param len (output parameter) reference to a variable holding the amount of data written to buf. + * @returns SPARKEY_SUCCESS if all goes well. Otherwise a returncode indicating the error. + */ +sparkey_returncode sparkey_logiter_fill_value(sparkey_logiter *iter, sparkey_logreader *log, uint64_t maxlen, uint8_t *buf, uint64_t *len); + +/** + * Compares the keys of two iterators pointing to the same log. + * It assumes that the iterators are both clean, i.e. nothing has been consumed from the current entry. + * + * @param iter1 an open logiter + * @param iter2 an open logiter + * @param log an open logreader associated with iter1 and iter2. + * @param res (output parameter) reference to a variable holding the result of the comparison. + * It will be zero if the keys are equal, negative if key1 is smaller than key2 and positive if key1 is larger than key2. + * The behaviour is thus like memcmp. + * @returns SPARKEY_SUCCESS if all goes well. Otherwise a returncode indicating the error. + */ +sparkey_returncode sparkey_logiter_keycmp(sparkey_logiter *iter1, sparkey_logiter *iter2, sparkey_logreader *log, int *res); + +/** + * Get the state for an iterator. + * @returns iter->state + */ +sparkey_iter_state sparkey_logiter_state(sparkey_logiter *iter); + +/** + * Get the type of an iterator. + * @returns iter->type + */ +sparkey_entry_type sparkey_logiter_type(sparkey_logiter *iter); + +/** + * Get the keylen of an iterator. + * @returns iter->keylen + */ +uint64_t sparkey_logiter_keylen(sparkey_logiter *iter); + +/** + * Get the valuelen of an iterator. + * @returns iter->valuelen + */ +uint64_t sparkey_logiter_valuelen(sparkey_logiter *iter); + +/* hashwriter */ + +/** + * Creates a hash table for a specific log file. + * It's safe and efficient to run this multiple times.
+ * If the hash file already exists, it will be used to speed up the creation of the new file + * by reusing the existing entries, and only update the new hash table based on + * the entries in the log that are new since the last hash was built. + * Note that the hash file is never overwritten, instead the old file is unlinked from + * the filesystem and the new one is created. Thus, it's safe to rewrite the hash table while + * other processes are reading from it. + * @param hash_filename the file to create and put the sparkey hash table in. + * @param log_filename a file that must exist and be a sparkey log file. + * @param hash_size size of the hashes for keys. + Valid values are 4 (32 bit murmurhash3_x86_32) and + 8 (lower 64-bit part of murmurhash3_x64_128). + A value of zero will make it autoselect hash size, depending on number of entries. + * @returns SPARKEY_SUCCESS if all goes well. Otherwise a returncode indicating the error. + */ +sparkey_returncode sparkey_hash_write(const char *hash_filename, const char *log_filename, int hash_size); + +/* hashreader */ +/** + * Opens a hash file and a log file for reading. The hashreader is threadsafe, except during opening or closing. + * @param reader a double reference to an uninitialized hashreader. Will be set on success. + * @param hash_filename a filename of a file containing a sparkey hash table. + * @param log_filename a filename of a file containing a sparkey log. + * @returns SPARKEY_SUCCESS if all goes well. Otherwise a return code indicating the error. + */ +sparkey_returncode sparkey_hash_open(sparkey_hashreader **reader, const char *hash_filename, const char *log_filename); + +/** + * Gets the logreader that is referenced by the hashreader + * @param reader an open reader. + * @returns the associated logreader + */ +sparkey_logreader * sparkey_hash_getreader(sparkey_hashreader *reader); + +/** + * Closes a hashreader.
+ * It's allowed to close a hashreader while there are open logiterators associated with it. + * Further operations on such logiterators will fail. + * This is a failsafe operation. + * @param reader a double reference to a hashreader + */ +void sparkey_hash_close(sparkey_hashreader **reader); + +/** + * Performs a hash table lookup of a key. If the key is found, + * the iterator will have state SPARKEY_ITER_ACTIVE and the key chunk will be consumed. + * Otherwise, the iterator will have state SPARKEY_ITER_INVALID. + * @param reader an open reader. + * @param key a buffer containing the key. It does not have to be NUL terminated. + * @param keylen the length of the key. + * @param iter an iterator associated with the reader. Will be mutated. + * @returns SPARKEY_SUCCESS if all goes well. Otherwise a return code indicating the error. + */ +sparkey_returncode sparkey_hash_get(sparkey_hashreader *reader, const uint8_t *key, uint64_t keylen, sparkey_logiter *iter); + +/** + * Works the same as sparkey_logiter_next, except it skips entries that are not of type SPARKEY_ENTRY_PUT + * and entries that have been overwritten or deleted. Thus it only stops at live entries. + * iter->state will be SPARKEY_ITER_CLOSED if the last entry has been passed. + * iter->state will be SPARKEY_ITER_INVALID if anything goes wrong. + * iter->state will be SPARKEY_ITER_ACTIVE if it successfully reached the next entry. + * @see sparkey_logiter_next + * @param iter an open logiter + * @param reader an open reader associated with iter. + * @returns SPARKEY_SUCCESS if all goes well. Otherwise a returncode indicating the error. + */ +sparkey_returncode sparkey_logiter_hashnext(sparkey_logiter *iter, sparkey_hashreader *reader); + +uint64_t sparkey_hash_numentries(sparkey_hashreader *reader); + +uint64_t sparkey_hash_numcollisions(sparkey_hashreader *reader); + +/* util */ + +/** + * Allocates and creates a string denoting a log file from an index file.
+ * This is simply a string replacement of .spi$ to .spl$ + * @param index_filename the filename representing the index file + * @returns a newly allocated string that the caller must free(), or NULL if the index_filename does not end with ".spi" + */ +char * sparkey_create_log_filename(const char *index_filename); + +/** + * Allocates and creates a string denoting an index file from a log file. + * This is simply a string replacement of .spl$ to .spi$ + * @param log_filename the filename representing the log file + * @returns a newly allocated string that the caller must free(), or NULL if the log_filename does not end with ".spl" + */ +char * sparkey_create_index_filename(const char *log_filename); + +#ifdef __cplusplus +} +#endif + +#endif + diff --git a/src/sparkey/util.c b/src/sparkey/util.c new file mode 100644 index 00000000..1db613f6 --- /dev/null +++ b/src/sparkey/util.c @@ -0,0 +1,92 @@ +/* +* Copyright (c) 2012-2013 Spotify AB +* +* Licensed under the Apache License, Version 2.0 (the "License"); you may not +* use this file except in compliance with the License. You may obtain a copy of +* the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +* License for the specific language governing permissions and limitations under +* the License.
+*/ +#include +#include + +#include "util.h" +#include "sparkey.h" + +#include +#include + +sparkey_returncode sparkey_open_returncode(int e) { + switch (e) { + case EPERM: + case EACCES: return SPARKEY_PERMISSION_DENIED; + case ENFILE: return SPARKEY_TOO_MANY_OPEN_FILES; + case ENOENT: return SPARKEY_FILE_NOT_FOUND; + case EOVERFLOW: return SPARKEY_FILE_TOO_LARGE; + default: + fprintf(stderr, "_sparkey_open_returncode():%d error: errno = %d\n", __LINE__, e); + return SPARKEY_INTERNAL_ERROR; + } +} + +sparkey_returncode sparkey_create_returncode(int e) { + switch (e) { + case EPERM: + case EROFS: + case EACCES: return SPARKEY_PERMISSION_DENIED; + case EEXIST: return SPARKEY_FILE_ALREADY_EXISTS; + case EISDIR: return SPARKEY_FILE_IS_DIRECTORY; + case ENFILE: + case EMFILE: return SPARKEY_TOO_MANY_OPEN_FILES; + default: + fprintf(stderr, "_sparkey_create_returncode():%d error: errno = %d\n", __LINE__, e); + return SPARKEY_INTERNAL_ERROR; + } +} + +sparkey_returncode sparkey_remove_returncode(int e) { + switch (e) { + case EPERM: + case EROFS: + case EACCES: return SPARKEY_PERMISSION_DENIED; + case EBUSY: return SPARKEY_FILE_BUSY; // Can't happen on linux + case EISDIR: return SPARKEY_FILE_IS_DIRECTORY; + case EOVERFLOW: return SPARKEY_FILE_TOO_LARGE; + default: + fprintf(stderr, "_sparkey_remove_returncode():%d error: errno = %d\n", __LINE__, e); + return SPARKEY_INTERNAL_ERROR; + } +} + +static inline char * _create_filename(const char *input, const char *from, char to) { + if (input == NULL) return NULL; + size_t l = strlen(input); + + // Paranoia - avoid ridiculously long filenames. 
+ if (l > 10000) return NULL; + + // Too short to contain from + if (l < strlen(from)) return NULL; + + if (memcmp(&input[l - strlen(from)], from, strlen(from))) return NULL; + + char *output = strdup(input); + if (output == NULL) return NULL; + + output[l - 1] = to; + return output; +} + +char * sparkey_create_log_filename(const char *index_filename) { + return _create_filename(index_filename, ".spi", 'l'); +} + +char * sparkey_create_index_filename(const char *log_filename) { + return _create_filename(log_filename, ".spl", 'i'); +} diff --git a/src/sparkey/util.h b/src/sparkey/util.h new file mode 100644 index 00000000..e1a6a08c --- /dev/null +++ b/src/sparkey/util.h @@ -0,0 +1,85 @@ +/* +* Copyright (c) 2012-2013 Spotify AB +* +* Licensed under the Apache License, Version 2.0 (the "License"); you may not +* use this file except in compliance with the License. You may obtain a copy of +* the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +* License for the specific language governing permissions and limitations under +* the License. +*/ +#ifndef SPARKEY_UTIL_H_INCLUDED +#define SPARKEY_UTIL_H_INCLUDED + +#include +#include +#include +#include + +#include "sparkey.h" + +/** + * This macro sort of behaves like a new keyword. + * Input must be an expression that returns sparkey_returncode. + * The macro must be executed in a function that returns sparkey_returncode. + * If the expression evaluates to something else than SPARKEY_SUCCESS, + * then the containing function returns that value directly. 
+ */ +#define RETHROW(f) do { sparkey_returncode returncode = (f); if (returncode != SPARKEY_SUCCESS) return returncode; } while (0); + +/** + * This macro requires that a sparkey_returncode returncode; is already defined in the function. + * It evaluates the first argument which must return sparkey_returncode. + * If that evaluates to something else than SPARKEY_SUCCESS, + * it sets the returncode to that and jumps to the specified label. + */ +#define TRY(f, label) do { returncode = (f); if (returncode != SPARKEY_SUCCESS) goto label; } while (0); + +/** + * Convert error codes generated by open and fopen into sparkey return codes. + * @param e an error code + * @returns a sparkey_returncode corresponding to the error, or SPARKEY_INTERNAL_ERROR + */ +sparkey_returncode sparkey_open_returncode(int e); + +/** + * Convert error codes generated by creat into sparkey return codes. + * @param e an error code + * @returns a sparkey_returncode corresponding to the error, or SPARKEY_INTERNAL_ERROR + */ +sparkey_returncode sparkey_create_returncode(int e); + +/** + * Convert error codes generated by remove or unlink into sparkey return codes. + * @param e an error code + * @returns a sparkey_returncode corresponding to the error, or SPARKEY_INTERNAL_ERROR + */ +sparkey_returncode sparkey_remove_returncode(int e); + +/** + * Fetches a 32 bit unsigned value from a pseudorandom source. + * + * @param output a pointer to an uint32_t where the random value is written + * @returns a sparkey_returncode SPARKEY_SUCCESS or, in case of error SPARKEY_INTERNAL_ERROR. + */ +static inline sparkey_returncode rand32(uint32_t *output) { + int fd = open("/dev/urandom", O_RDONLY); + if (fd < 0) { + return SPARKEY_INTERNAL_ERROR; + } + int actual = read(fd, output, 4); + close(fd); + if (actual < 4) { + return SPARKEY_INTERNAL_ERROR; + } + return SPARKEY_SUCCESS; +} + + +#endif +