Amazon Kinesis Garbage Data

Are you seeing “garbage data” coming out of Amazon Kinesis, records that look like a raw memory dump or are simply corrupted? Chances are you are writing data with the Kinesis Producer Library (KPL), which performs automatic aggregation. Aggregation means writing Protobuf blobs that pack multiple sequential messages into a single Kinesis record, instead of writing one message at a time. Unfortunately, this is only transparent to consumers that use the Kinesis Client Library (KCL). The data is stored in this aggregated form in Kinesis itself, and the consumer is expected to detect and decode these bulk messages. This process is called de-aggregation.

To handle de-aggregation, you either do it manually with the SDK or use the KCL. For AWS Lambda, there is also the kinesis-deaggregation project.

To de-aggregate manually when using the SDK, use:

    List<UserRecord> deaggregatedRecords = UserRecord.deaggregate(potentiallyAggregatedRecords);

Interestingly enough, the automatic de-aggregation in the KCL seems to be triggered by a magic header (from UserRecord.java):

    private static final byte[] AGGREGATED_RECORD_MAGIC = new byte[] {-13, -119, -102, -62 };

As far as I can tell, this could cause trouble if a record you write yourself happens to start with these four bytes.
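To make the magic-header check concrete, here is a minimal sketch of a prefix test. The class and method names (AggregationSniffer, hasAggregationMagic) are hypothetical; this is not the KCL's code, which may validate more than the prefix before treating a record as aggregated. The second sample illustrates the concern above: a plain, non-aggregated payload that just happens to begin with the magic bytes is flagged anyway.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper illustrating a magic-prefix check.
// NOT the KCL implementation; the real UserRecord.deaggregate may do more validation.
public final class AggregationSniffer {

    // Same values as AGGREGATED_RECORD_MAGIC in the KCL's UserRecord.java (0xF3899AC2).
    static final byte[] AGGREGATED_RECORD_MAGIC = new byte[] {-13, -119, -102, -62};

    private AggregationSniffer() {}

    /** Returns true if the payload starts with the four-byte aggregation magic. */
    public static boolean hasAggregationMagic(byte[] payload) {
        if (payload == null || payload.length < AGGREGATED_RECORD_MAGIC.length) {
            return false;
        }
        byte[] prefix = Arrays.copyOfRange(payload, 0, AGGREGATED_RECORD_MAGIC.length);
        return Arrays.equals(prefix, AGGREGATED_RECORD_MAGIC);
    }

    public static void main(String[] args) {
        byte[] plain = "hello".getBytes(StandardCharsets.UTF_8);
        // A non-aggregated payload that happens to start with the magic bytes.
        byte[] unlucky = {(byte) 0xF3, (byte) 0x89, (byte) 0x9A, (byte) 0xC2, (byte) 'h', (byte) 'i'};
        System.out.println(hasAggregationMagic(plain));   // false
        System.out.println(hasAggregationMagic(unlucky)); // true
    }
}
```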

We hit this problem using the kinesis-storm-spout, which doesn’t seem to support de-aggregation out of the box.

In hex, the header is f389 9ac2 (binary 11110011 10001001 10011010 11000010), but you might instead see efbf bdef bfbd or fffd: a reader that decodes the bytes as Unicode text (UTF-8 or UTF-16) may replace the invalid sequences with the replacement character U+FFFD.
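If you want to check this yourself, the following self-contained sketch prints the KCL magic bytes in hex and binary, then decodes them as UTF-8 the way a text-oriented consumer might. MagicAsText, toHex and toBinary are hypothetical names chosen for illustration. Java's String constructor with StandardCharsets.UTF_8 substitutes U+FFFD for malformed input, so re-encoding the result yields ef bf bd sequences rather than the original header.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative sketch: the KCL magic bytes as hex, binary, and (wrongly) as UTF-8 text.
public final class MagicAsText {

    // Same values as AGGREGATED_RECORD_MAGIC in the KCL's UserRecord.java.
    static final byte[] MAGIC = new byte[] {-13, -119, -102, -62};

    /** Lowercase hex, two digits per byte, e.g. "f3899ac2". */
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b & 0xFF));
        }
        return sb.toString();
    }

    /** Space-separated, zero-padded 8-bit binary groups. */
    static String toBinary(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            String bits = Integer.toBinaryString(b & 0xFF);
            sb.append("00000000".substring(bits.length())).append(bits);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(toHex(MAGIC));    // f3899ac2
        System.out.println(toBinary(MAGIC)); // 11110011 10001001 10011010 11000010

        // Malformed UTF-8 input is replaced with U+FFFD during decoding.
        String decoded = new String(MAGIC, StandardCharsets.UTF_8);
        byte[] reencoded = decoded.getBytes(StandardCharsets.UTF_8);
        // Each U+FFFD re-encodes as ef bf bd; the original header bytes are lost.
        System.out.println(toHex(reencoded));
        System.out.println(Arrays.equals(MAGIC, reencoded)); // false
    }
}
```

The lossy round trip is the point: once a consumer has turned the record into a string, the magic header can no longer be recognized, so de-aggregation has to happen on the raw bytes.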

If you want to disable aggregation on the producer side, you can use the KPL setting KinesisProducerConfiguration#setAggregationEnabled, although Amazon does seem to discourage this.

I hope this saved you some time mucking about with Kinesis, debugging the SDK and recording HTTPS requests :)
