Migrating Pelops composite keys to Astyanax

Recently we switched our flagship app’s Cassandra client library from Pelops to a slightly more elegant solution: Astyanax. I have used the other alternative, Hector, the forerunner among Java Cassandra APIs, extensively in other projects. My impression of it is that it is very mature, is usually the first to implement new Cassandra features, and has a small yet thriving community of developers maintaining it. Astyanax, developed by Netflix, is heavily inspired by Hector. It comes with a slightly cleaner (more fluent) and very well-designed API, is maintained regularly, and ships with an extensive suite of unit tests (though I have found and patched a few minor bugs myself).

Astyanax release cycles, albeit less regular than Hector’s, tend to be frequent, and pull requests are merged very quickly, usually within a week.

Pelops, on the other hand, had trouble handling stale and timed-out connections. From time to time it exhausted its connection pool on our heavily used cluster, which made parts of our app very slow while they waited for stale connections to time out. Pelops no longer fit our needs, and raw Thrift was too low-level for us. We therefore had to switch to something higher level that would abstract away all the nitty-gritty Thrift details. The choice was between Astyanax and Hector. I had already used Hector in a couple of previous projects and really wanted experience with another Cassandra API under my belt, so we decided to give Astyanax a try.

Luckily, our whole Cassandra DAO layer was fully tested. All we had to do was reimplement the DAOs with Astyanax and hope that all the unit tests passed. Reimplementing roughly 300 DAO calls was a breeze: it took me about a day, and at the end of it all tests passed.

However, I then built the project and deployed it to our staging environment. It ran, and most of the Cassandra data showed up in the interface, but some of it did not. Baffled, I took a closer look and noticed an interesting pattern: all of the missing data was stored under aggregate (composite) keys.

Astyanax, like Hector, has the concept of composite columns. In Pelops you had to manage such columns manually. You couldn’t make an instance of a class and use it as a column key; instead you had to build the key by hand with Pelops’ Bytes class. Here is a typical composite column key creation in Pelops:

ByteBuffer fromPelops = Bytes.composite()
.addInt(SITE_ID_ONE)
.addLong(TIMESTAMP)
.addInt(CAMPAIGN_ID_TWO)
.addInt(DETAILID).build().getBytes();

This doesn’t look very object-oriented. Let us assume that this column describes a hotel. The first int is the id of the location, the long is some external id, and the last two ints are the country code and the city code. You can implement the same key in Astyanax in the following way:

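import com.netflix.astyanax.annotations.Component;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;

@NoArgsConstructor
@AllArgsConstructor
@Data
@Accessors(chain = true)
public class HotelKey {
    @Component(ordinal = 0) private Integer locationCode;
    @Component(ordinal = 1) private Long externalCode;
    @Component(ordinal = 2) private Integer countryCode;
    @Component(ordinal = 3) private Integer cityCode;
}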

The annotations at the top of the class are common Lombok annotations. They generate getters, chainable setters (thanks to @Accessors(chain = true)), and no-argument and all-argument constructors. The @Component annotation is the relevant one. It tells Astyanax’s serializers which fields make up the composite and, through ordinal, in what order they are serialized and deserialized to produce the byte value of the column.
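The ordinal is what fixes the byte order of the components. Java reflection makes no guarantee about the order in which getDeclaredFields returns fields, so declaration order alone cannot be relied on. Below is a minimal, self-contained sketch of that reflection step. The @Part annotation and the ComponentOrder and DemoHotelKey classes are hypothetical stand-ins for illustration, not Astyanax’s own classes.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for Astyanax's @Component: only the ordinal matters here.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface Part {
    int ordinal();
}

final class ComponentOrder {
    private ComponentOrder() {}

    // Collects the annotated fields of a class and sorts them by ordinal,
    // which fixes the order in which components are written to the key.
    static List<Field> orderedComponents(Class<?> clazz) {
        List<Field> fields = new ArrayList<>();
        for (Field field : clazz.getDeclaredFields()) {
            if (field.isAnnotationPresent(Part.class)) {
                fields.add(field);
            }
        }
        fields.sort(Comparator.comparingInt(f -> f.getAnnotation(Part.class).ordinal()));
        return fields;
    }
}

// Hypothetical key whose fields are declared out of order on purpose.
final class DemoHotelKey {
    @Part(ordinal = 2) Integer countryCode;
    @Part(ordinal = 0) Integer locationCode;
    @Part(ordinal = 3) Integer cityCode;
    @Part(ordinal = 1) Long externalCode;
}
```

Sorting by ordinal returns locationCode, externalCode, countryCode and cityCode in that order, regardless of how the fields are declared.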

The Pelops way and the Astyanax way seemed to be equivalent, so I couldn’t see why things were not working. I had to dig into the default implementation of Astyanax’s AnnotatedCompositeSerializer and Pelops’ Bytes class. What I found was pretty interesting.

Pelops serializes the components as raw bytes, back to back, with no metadata and no wasted space. An int is serialized into 4 bytes, a long into 8 bytes, and a String into its UTF-8 encoded bytes. That is very straightforward, but it creates a well-known limitation: you can use only a single String in your composite column, and only at its end. Because the components are written with no metadata, the only way to tell where one ends is to know its fixed width. For a key made of an int followed by a String, you would read:

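import java.nio.charset.StandardCharsets;
// buffer holds the raw composite: read the fixed-width int, then decode the rest as UTF-8
int first = buffer.getInt();
String secondPart = StandardCharsets.UTF_8.decode(buffer).toString();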

If you have a String followed by an int, though, you are out of luck. You would have to manually carve the last 4 bytes off the end of the buffer, read them as the int, and treat everything before them as the String. It gets even messier if you have, for example, two Strings in a row. Pelops saves no metadata, so there is no way to know how long each String was.
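The following is a small, self-contained sketch that mimics this raw, metadata-free layout with plain java.nio. It shows why two Strings in a row cannot be told apart. RawConcat and its methods are hypothetical names, not Pelops API. The sketch also shows the one workable trick for a (String, int) key: read the fixed-width int from the end first.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Mimics a metadata-free composite layout: components are written back to
// back with no lengths or separators (hypothetical helper, not Pelops API).
final class RawConcat {
    private RawConcat() {}

    // Simple holder for a decoded (String, int) key.
    static final class StringIntKey {
        final String text;
        final int value;

        StringIntKey(String text, int value) {
            this.text = text;
            this.value = value;
        }
    }

    // Concatenates the UTF-8 bytes of each String with nothing in between.
    static byte[] encodeStrings(String... parts) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String part : parts) {
            byte[] bytes = part.getBytes(StandardCharsets.UTF_8);
            out.write(bytes, 0, bytes.length);
        }
        return out.toByteArray();
    }

    // Writes a String followed by a 4-byte big-endian int, again with no metadata.
    static byte[] encodeStringThenInt(String text, int value) {
        byte[] textBytes = text.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(textBytes.length + Integer.BYTES)
                .put(textBytes)
                .putInt(value)
                .array();
    }

    // The String has no length marker, so take the fixed-width int from the
    // END of the buffer and treat everything before it as the text.
    static StringIntKey decodeStringThenInt(byte[] raw) {
        if (raw.length < Integer.BYTES) {
            throw new IllegalArgumentException("too short for a trailing int");
        }
        int intStart = raw.length - Integer.BYTES;
        int value = ByteBuffer.wrap(raw).getInt(intStart);
        String text = new String(raw, 0, intStart, StandardCharsets.UTF_8);
        return new StringIntKey(text, value);
    }
}
```

Both encodeStrings("ab", "c") and encodeStrings("a", "bc") return the same three bytes, so a reader has no way of telling the two keys apart.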

Astyanax addresses these problems by also storing metadata for each component. Its AnnotatedCompositeSerializer follows Cassandra’s CompositeType layout. Each component is written as a 2-byte length (how many bytes the value spans), then the payload, then a single end-of-component byte. This framing applies to every component, not only Strings. As a result, even a key made solely of ints and longs produces different bytes than Pelops did. This makes Astyanax composite keys less compact than Pelops ones, but it certainly makes them less error-prone and easier on the developer, because complex composites can now be handled.
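The framing described above can be sketched in plain Java. This is a minimal illustration of the <length><value><end-of-component> layout only, with hypothetical class and method names. It is not Astyanax’s implementation, and it always writes 0 as the end-of-component byte.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Sketch of a length-prefixed composite layout in the style of Cassandra's
// CompositeType: per component, a 2-byte big-endian length, the payload,
// then one end-of-component byte (always 0 in this sketch).
final class LengthPrefixedComposite {
    private static final byte END_OF_COMPONENT = 0;

    private LengthPrefixedComposite() {}

    static byte[] encode(List<byte[]> components) {
        int size = 0;
        for (byte[] c : components) {
            if (c.length > 0xFFFF) {
                throw new IllegalArgumentException("component longer than 65535 bytes");
            }
            size += 2 + c.length + 1;
        }
        ByteBuffer buffer = ByteBuffer.allocate(size);
        for (byte[] c : components) {
            buffer.putShort((short) c.length); // unsigned 16-bit length
            buffer.put(c);
            buffer.put(END_OF_COMPONENT);
        }
        return buffer.array();
    }

    static List<byte[]> decode(byte[] raw) {
        ByteBuffer buffer = ByteBuffer.wrap(raw);
        List<byte[]> components = new ArrayList<>();
        while (buffer.hasRemaining()) {
            int length = buffer.getShort() & 0xFFFF; // read the length as unsigned
            byte[] c = new byte[length];
            buffer.get(c);
            buffer.get(); // skip the end-of-component byte
            components.add(c);
        }
        return components;
    }

    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    static byte[] int32(int v) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(v).array();
    }
}
```

With this framing, ("ab", "c") and ("a", "bc") encode to different bytes, and decode recovers both components. The cost is 3 extra bytes per component. For the hotel key above (int, long, int, int), that means 20 bytes in the Pelops layout versus 32 bytes in this one.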

Knowing all that, it became apparent that the default serialization/deserialization in AnnotatedCompositeSerializer was incompatible with Pelops. We already had two years’ worth of data, so we had two options. We could convert the data with Hadoop jobs, during which parts of the interface would behave strangely. Or we could make our Astyanax DAOs compatible with Pelops. The first option meant downtime and more development, while the second meant implementing a single class, so I went ahead and implemented a serializer compatible with Pelops.

Below is the relevant code:

/**
 * A utility serializer for data that must stay backwards compatible with Pelops. Use {@link AnnotatedCompositeSerializer} for all new DAOs.
 * As a closer look at this implementation reveals, it is error-prone: like Pelops, it doesn't store component sizes.
 * Fixed-width types are read by size and a String consumes everything that is left. For example, a composite key with two String
 * components cannot be deserialized.
 * @param <T> the annotated composite key class
 */
public class AbstractCompositeSerializerPelopsCompatible<T> extends AnnotatedCompositeSerializer<T> {
    // Marker returned by getByteSizeOf for variable-length types (String) that consume the rest of the buffer
    private static final int READ_THE_REST = -1;
    private final List<ComponentSerializer<?>> components;
    private final Class<T> clazz;

    public AbstractCompositeSerializerPelopsCompatible(Class<T> clazz) {
        super(clazz);
        this.clazz = clazz;
        this.components = new ArrayList<ComponentSerializer<?>>();
        // Collect every @Component field together with the serializer inferred from its type
        for (Field field : ReflUtils.getAllDeclaredFields(clazz, true)) {
            Component annotation = field.getAnnotation(Component.class);
            if (annotation != null) {
                Serializer s = SerializerTypeInferer.getSerializer(field.getType());
                components.add(makeComponent(field, s, annotation.ordinal()));
            }
        }
        // Sort by ordinal so components are written and read in the declared order
        Collections.sort(this.components);
    }

    @Override
    public ByteBuffer toByteBuffer(T obj) {
        int totalSize = 0;
        List<ByteBuffer> buffList = new ArrayList<ByteBuffer>(components.size());
        for (ComponentSerializer<?> serializer : components) {
            try {
                // First, serialize the ByteBuffer for this component
                ByteBuffer cb = serializer.serialize(obj);
                if (cb == null) {
                    cb = ByteBuffer.allocate(0);
                }
                totalSize += cb.remaining();
                buffList.add(cb);
            } catch (Exception ex) {
                throw new RuntimeException("Could not convert obj to bytebuffer", ex);
            }
        }
        // Unlike AnnotatedCompositeSerializer, concatenate the raw bytes with no length prefix or end-of-component byte
        ByteBuffer bb = ByteBuffer.allocate(totalSize);
        for (ByteBuffer buff : buffList) {
            bb.put(buff.duplicate());
        }
        bb.flip();
        return bb;
    }

    @Override
    public T fromByteBuffer(ByteBuffer byteBuffer) {
        byteBuffer = byteBuffer.duplicate();
        try {
            T obj = createContents(clazz);
            for (ComponentSerializer<?> serializer : components) {
                int bytesToRead = getByteSizeOf(serializer.getField().getType());
                if (bytesToRead == READ_THE_REST) {
                    bytesToRead = byteBuffer.remaining();
                }
                if (byteBuffer.remaining() < bytesToRead) {
                    throw new RuntimeException("Missing component data in composite type");
                }
                serializer.deserialize(obj, getBytes(byteBuffer, bytesToRead));
            }
            return obj;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // Size in bytes of each fixed-width component type as written by Pelops
    private static int getByteSizeOf(final Class<?> clazz) {
        Class<?> classToCheck = clazz;
        if (clazz.isPrimitive()) {
            classToCheck = ClassUtils.primitiveToWrapper(clazz);
        }
        if (classToCheck.equals(Integer.class)) {
            return Integer.SIZE / Byte.SIZE;
        } else if (classToCheck.equals(Long.class)) {
            return Long.SIZE / Byte.SIZE;
        } else if (classToCheck.equals(UUID.class)) {
            return (Long.SIZE + Long.SIZE) / Byte.SIZE;
        } else if (classToCheck.equals(Boolean.class)) {
            return 1;
        } else if (classToCheck.equals(String.class)) {
            return READ_THE_REST;
        } else {
            throw new RuntimeException("Unsupported conversion type " + clazz);
        }
    }

    private T createContents(Class<T> clazz) throws InstantiationException, IllegalAccessException {
        return clazz.newInstance();
    }

    private static <P> ComponentSerializer<P> makeComponent(Field field, Serializer<P> serializer, int ordinal) {
        return new ComponentSerializer<P>(field, serializer, ordinal);
    }

    // Returns a view of the next `length` bytes and advances bb past them
    private static ByteBuffer getBytes(ByteBuffer bb, int length) {
        ByteBuffer copy = bb.duplicate();
        copy.limit(copy.position() + length);
        bb.position(bb.position() + length);
        return copy;
    }
}

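The decoding strategy of fromByteBuffer does not itself depend on Astyanax. Here is a minimal, self-contained sketch of the same idea over plain java.nio; RawCompositeDecoder is a hypothetical name. Fixed-width types consume the sizes listed in getByteSizeOf, and a String consumes whatever is left. The sketch assumes that a UUID is stored as its most significant 64 bits followed by its least significant 64 bits. It also assumes that a Boolean is a single byte in which any non-zero value means true.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Decodes a raw, metadata-free composite given its component types in order.
// Fixed-width types consume their size; a String consumes everything left,
// so it is only safe as the last component (hypothetical helper).
final class RawCompositeDecoder {
    private RawCompositeDecoder() {}

    static List<Object> decode(ByteBuffer input, List<Class<?>> types) {
        ByteBuffer buffer = input.duplicate(); // leave the caller's position untouched
        List<Object> values = new ArrayList<>(types.size());
        for (Class<?> type : types) {
            if (type == Integer.class) {
                values.add(buffer.getInt());
            } else if (type == Long.class) {
                values.add(buffer.getLong());
            } else if (type == UUID.class) {
                // Assumed layout: most significant bits first, then least significant bits
                values.add(new UUID(buffer.getLong(), buffer.getLong()));
            } else if (type == Boolean.class) {
                values.add(buffer.get() != 0); // assumed: one byte, non-zero means true
            } else if (type == String.class) {
                values.add(StandardCharsets.UTF_8.decode(buffer).toString());
            } else {
                throw new IllegalArgumentException("Unsupported component type " + type);
            }
        }
        return values;
    }
}
```

This sketch takes the component types explicitly instead of reading them from @Component annotations. That keeps it free of Astyanax, at the cost of keeping the type list in sync with the key class by hand.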
And because no code is complete without unit tests, here are a couple:

import static org.junit.Assert.assertEquals;

import java.nio.ByteBuffer;
import org.junit.Test;
import org.scale7.cassandra.pelops.Bytes;

public class AbstractCompositeSerializerPelopsCompatibleTest {
    private static final String SOME_STRING = "someKey------WordAsd";
    private static final Long SOME_LONG = 31414141L;

    private final AbstractCompositeSerializerPelopsCompatible<SomeKey> someKeySerializer =
            new AbstractCompositeSerializerPelopsCompatible<SomeKey>(SomeKey.class);

    @Test
    public void testToByteBuffer() {
        SomeKey key = new SomeKey(SOME_LONG, SOME_STRING);
        ByteBuffer fromPelops = Bytes.composite()
                .addLong(SOME_LONG)
                .addUTF8(SOME_STRING)
                .build().getBytes();
        ByteBuffer fromCompositeSerializer = someKeySerializer.toByteBuffer(key);
        assertEquals(fromPelops, fromCompositeSerializer);
    }

    @Test
    public void testFromByteBuffer() {
        ByteBuffer fromPelops = Bytes.composite()
                .addLong(SOME_LONG)
                .addUTF8(SOME_STRING)
                .build().getBytes();
        SomeKey indexKey = someKeySerializer.fromByteBuffer(fromPelops);
        assertEquals(SOME_LONG, indexKey.getLongVal());
        assertEquals(SOME_STRING, indexKey.getStringVal());
    }
}

// SomeKey.java: a top-level class, so it can be instantiated reflectively;
// the first component is a Long to match addLong() above
import com.netflix.astyanax.annotations.Component;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class SomeKey {
    @Component(ordinal = 0) private Long longVal;
    @Component(ordinal = 1) private String stringVal;
}
