Apache Spark’s default serialization relies on Java serialization, using the default readObject(…) and writeObject(…) methods for all Serializable classes. This is a very fine default behavior as long as you don’t rely on it too much…
Why ? Because Java’s serialization framework is notoriously inefficient, consuming too much CPU, RAM and space to be a suitable large scale serialization format.
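To get a feel for that overhead, here is a minimal sketch that pushes a tiny object through plain Java serialization and prints how many bytes come out. The Point class and the JavaSerializationSize object are made-up names for illustration only, and the exact byte count will depend on your class and JVM.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical sample class, used only for illustration.
// Scala case classes are Serializable out of the box.
case class Point(x: Int, y: Int)

object JavaSerializationSize {
  // Serializes any object with plain Java serialization and returns the bytes.
  def javaSerialize(value: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    bytes.toByteArray
  }

  def main(args: Array[String]): Unit = {
    val payloadBytes = 2 * 4 // two Int fields, 4 bytes each
    val serialized = javaSerialize(Point(1, 2))
    println(s"payload: $payloadBytes bytes, Java-serialized: ${serialized.length} bytes")
  }
}
```

The serialized form carries a stream header and a full class descriptor on top of the two integers, which is where the extra bytes come from.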
OK, you might tell me that you, as an Apache Spark user, are not using Java’s serialization framework at all. The fact is that Apache Spark, as a system, relies on it a lot :
- Every task sent from the Driver to a Worker gets serialized : Closure serialization
- Every result from every task gets serialized at some point : Result serialization
What’s implied is that during closure serialization, every value referenced inside the closure gets serialized as well. For the record, this is also one of the main reasons to use Broadcast variables when closures would otherwise be serialized with big values.
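Here is a rough sketch of the difference between capturing a big value in a closure and broadcasting it. Everything named here (BroadcastSketch, run, the lookup table, the local[2] master) is an assumption made for the example, not something taken from a real job.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastSketch {
  // Returns (matches via closure capture, matches via broadcast variable).
  def run(sc: SparkContext): (Long, Long) = {
    // Hypothetical "big" lookup table living on the driver.
    val lookup: Map[Int, String] = (1 to 100000).map(i => i -> s"value-$i").toMap
    val keys = sc.parallelize(1 to 1000, numSlices = 8)

    // Captured in the closure: `lookup` is serialized along with every task.
    val viaClosure = keys.map(k => lookup.getOrElse(k, "?")).filter(_ != "?").count()

    // Broadcast: only a small handle travels with the closure;
    // the data itself is shipped to each executor once.
    val lookupBc = sc.broadcast(lookup)
    val viaBroadcast = keys.map(k => lookupBc.value.getOrElse(k, "?")).filter(_ != "?").count()

    (viaClosure, viaBroadcast)
  }

  def main(args: Array[String]): Unit = {
    // Local master and app name are assumptions for a standalone run.
    val conf = new SparkConf().setMaster("local[2]").setAppName("broadcast-sketch")
    val sc = new SparkContext(conf)
    try println(run(sc)) finally sc.stop()
  }
}
```

Both versions compute the same result; what changes is how often the lookup table has to go through the serializer.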
Kryo is a serialization project like Apache Avro or Google’s Protobuf (or its Java-oriented equivalent Protostuff, which I have not tested yet). I’m not a big fan of benchmarks, but they can be useful, and the Kryo project designed a few to measure the size and time of serialization. Here’s what such a benchmark looks like at the time of writing (i.e. early 2015) :
So how can you change Spark’s default serializer easily ? As usual, Spark is a pretty configurable system, so all you need to do is specify which serializer you want to use when you define your SparkContext, using the SparkConf like this :
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
And voilà ! But that’s not all. If you’ve got big objects to serialize, be prepared to face the consequences : you might get OutOfMemoryErrors or GC overhead errors. They happen very fast with Java’s default serialization (did I tell you it sucks for some reasons… ?) and won’t get resolved auto-magically by switching to Kryo.
Luckily you can define what buffer size Kryo will use by default :
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Now it's 24 MB of buffer by default instead of 0.064 MB
  .set("spark.kryoserializer.buffer.mb", "24")
If you’re even bolder you can customize all of these options :
- spark.kryoserializer.buffer.max.mb (64 MB by default) : useful if your default buffer size needs to go beyond 64 MB;
- spark.kryo.referenceTracking (true by default) : cf. reference tracking in Kryo
- spark.kryo.registrationRequired (false by default) : Kryo’s parameter to define whether all serializable classes must be registered
- spark.kryo.classesToRegister (empty string list by default) : a list of the fully qualified names of all classes that must be registered (cf. the previous parameter)
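To make these options a bit more concrete, here is a minimal sketch that combines them. The User and Event classes, the KryoConfSketch object and the 128 MB value are assumptions chosen for the example. It uses SparkConf.registerKryoClasses, which fills spark.kryo.classesToRegister for you. Note that the buffer property names are the ones from the time of writing; later Spark releases renamed them (for instance to spark.kryoserializer.buffer.max), so check the configuration page of your version.

```scala
import org.apache.spark.SparkConf

// Hypothetical domain classes, used only for illustration.
case class User(id: Long, name: String)
case class Event(userId: Long, kind: String)

object KryoConfSketch {
  def kryoConf(): SparkConf =
    new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Upper bound for the buffer; it must be larger than any single serialized object.
      .set("spark.kryoserializer.buffer.max.mb", "128")
      // Fail on unregistered classes instead of writing their full class names.
      .set("spark.kryo.registrationRequired", "true")
      // Convenience method that fills spark.kryo.classesToRegister.
      .registerKryoClasses(Array[Class[_]](classOf[User], classOf[Event]))
}
```

With registrationRequired set to true, forgetting a class shows up as an exception at serialization time rather than as a silent size penalty, which is probably what you want once you start tuning.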
The examples above are defined in Scala, but of course these parameters can be used in Java and Python as well.