Using Java's default serialization
Testing with MEMORY_ONLY
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ListBuffer

object UserApp {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("CacheApp").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)

    // Build 10 million User objects as the test data set.
    val users = new ListBuffer[User]
    for (i <- 1 to 10000000) {
      users += new User(i, "name" + i, i.toString)
    }

    val usersRDD = sc.parallelize(users)
    // MEMORY_ONLY caches the RDD as deserialized objects on the heap.
    usersRDD.persist(StorageLevel.MEMORY_ONLY)
    usersRDD.foreach(println)

    // Keep the driver alive so the cached size can be read from the Web UI's Storage tab.
    Thread.sleep(1000000)
    sc.stop()
  }

  class User(id: Int, username: String, age: String) extends Serializable
}
The raw test data is 151MB. The test result is 191MB.
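The sizes in this post are presumably read from the Storage tab of the Spark Web UI, which is why each run ends with Thread.sleep to keep the application (and its UI) alive. As an alternative, here is a minimal sketch that logs the cached size from the driver, reusing sc from the example above and SparkContext.getRDDStorageInfo (a DeveloperApi); it must run after an action has materialized the cache:

// Minimal sketch: getRDDStorageInfo only reports RDDs that currently
// have cached partitions, so call it after an action such as foreach.
sc.getRDDStorageInfo.foreach { info =>
  println(s"RDD ${info.id} (${info.name}): ${info.memSize / 1024 / 1024} MB in memory")
}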
Testing with MEMORY_ONLY_SER
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ListBuffer

object UserApp {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("CacheApp").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)

    // Build 10 million User objects as the test data set.
    val users = new ListBuffer[User]
    for (i <- 1 to 10000000) {
      users += new User(i, "name" + i, i.toString)
    }

    val usersRDD = sc.parallelize(users)
    // MEMORY_ONLY_SER caches the RDD as serialized bytes, one byte array per partition.
    usersRDD.persist(StorageLevel.MEMORY_ONLY_SER)
    usersRDD.foreach(println)

    // Keep the driver alive so the cached size can be read from the Web UI's Storage tab.
    Thread.sleep(1000000)
    sc.stop()
  }

  class User(id: Int, username: String, age: String) extends Serializable
}
The test result is 60.5MB.
Using Kryo serialization, without registration
Testing with MEMORY_ONLY
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ListBuffer

object UserApp {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("CacheApp").setMaster("local[2]")
    // Switch to Kryo, but deliberately leave the User class unregistered.
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // sparkConf.registerKryoClasses(Array(classOf[User]))
    val sc = new SparkContext(sparkConf)

    // Build 10 million User objects as the test data set.
    val users = new ListBuffer[User]
    for (i <- 1 to 10000000) {
      users += new User(i, "name" + i, i.toString)
    }

    // MEMORY_ONLY caches the RDD as deserialized objects on the heap.
    val usersRDD = sc.parallelize(users).persist(StorageLevel.MEMORY_ONLY)
    usersRDD.foreach(println)

    // Keep the driver alive so the cached size can be read from the Web UI's Storage tab.
    Thread.sleep(1000000)
    sc.stop()
  }

  class User(id: Int, username: String, age: String) extends Serializable
}
The test result is 191MB, the same as with Java serialization: MEMORY_ONLY stores deserialized objects, so the configured serializer is never used.
Testing with MEMORY_ONLY_SER
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ListBuffer

object UserApp {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("CacheApp").setMaster("local[2]")
    // Switch to Kryo, but deliberately leave the User class unregistered.
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // sparkConf.registerKryoClasses(Array(classOf[User]))
    val sc = new SparkContext(sparkConf)

    // Build 10 million User objects as the test data set.
    val users = new ListBuffer[User]
    for (i <- 1 to 10000000) {
      users += new User(i, "name" + i, i.toString)
    }

    // MEMORY_ONLY_SER caches the RDD as serialized bytes.
    val usersRDD = sc.parallelize(users).persist(StorageLevel.MEMORY_ONLY_SER)
    usersRDD.foreach(println)

    // Keep the driver alive so the cached size can be read from the Web UI's Storage tab.
    Thread.sleep(1000000)
    sc.stop()
  }

  class User(id: Int, username: String, age: String) extends Serializable
}
The test result is 238.4MB, larger even than Java serialization (60.5MB): without registration, Kryo has to store the full class name with every object.
Using Kryo serialization, with registration
Testing with MEMORY_ONLY
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ListBuffer

object UserApp {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("CacheApp").setMaster("local[2]")
    // Switch to Kryo and register the User class, so only a short class ID is written.
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.registerKryoClasses(Array(classOf[User]))
    val sc = new SparkContext(sparkConf)

    // Build 10 million User objects as the test data set.
    val users = new ListBuffer[User]
    for (i <- 1 to 10000000) {
      users += new User(i, "name" + i, i.toString)
    }

    // MEMORY_ONLY caches the RDD as deserialized objects on the heap.
    val usersRDD = sc.parallelize(users).persist(StorageLevel.MEMORY_ONLY)
    usersRDD.foreach(println)

    // Keep the driver alive so the cached size can be read from the Web UI's Storage tab.
    Thread.sleep(1000000)
    sc.stop()
  }

  class User(id: Int, username: String, age: String) extends Serializable
}
The test result is 190.7MB.
Testing with MEMORY_ONLY_SER
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ListBuffer

object UserApp {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("CacheApp").setMaster("local[2]")
    // Switch to Kryo and register the User class, so only a short class ID is written.
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.registerKryoClasses(Array(classOf[User]))
    val sc = new SparkContext(sparkConf)

    // Build 10 million User objects as the test data set.
    val users = new ListBuffer[User]
    for (i <- 1 to 10000000) {
      users += new User(i, "name" + i, i.toString)
    }

    // MEMORY_ONLY_SER caches the RDD as serialized bytes.
    val usersRDD = sc.parallelize(users).persist(StorageLevel.MEMORY_ONLY_SER)
    usersRDD.foreach(println)

    // Keep the driver alive so the cached size can be read from the Web UI's Storage tab.
    Thread.sleep(1000000)
    sc.stop()
  }

  class User(id: Int, username: String, age: String) extends Serializable
}
The test result is 19.1MB.
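One practical note: forgetting the registerKryoClasses call silently falls back to writing full class names, which is what inflated the unregistered run above. A minimal sketch of how to fail fast instead, assuming the standard spark.kryo.registrationRequired setting from Spark's tuning guide:

val sparkConf = new SparkConf().setAppName("CacheApp").setMaster("local[2]")
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// With registrationRequired, Kryo throws at runtime when it meets an
// unregistered class, instead of silently storing class names.
sparkConf.set("spark.kryo.registrationRequired", "true")
sparkConf.registerKryoClasses(Array(classOf[User]))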
Summary:
With MEMORY_ONLY the cached size is essentially the same (about 191MB) no matter which serializer is configured, because MEMORY_ONLY stores deserialized objects on the heap and the serializer is never invoked. The serializer only matters with MEMORY_ONLY_SER: Kryo with registration (19.1MB) is roughly 10x smaller than the deserialized cache and about 3x smaller than Java serialization (60.5MB), while Kryo without registration (238.4MB) is the worst result of all.
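The six runs side by side:

Serializer            MEMORY_ONLY    MEMORY_ONLY_SER
Java (default)        191MB          60.5MB
Kryo, unregistered    191MB          238.4MB
Kryo, registered      190.7MB        19.1MB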