Serialization in Spark

Posted by JustDoDT on May 17, 2019

All of the tests below build ten million User objects, cache them, and read the resulting size from the Storage tab of the Spark UI; only the serializer and the storage level change between runs.

Using Java's default serialization

Testing with MEMORY_ONLY

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer

object UserApp {
  def main(args: Array[String]): Unit = {
    // Variant 1: Java serialization (the default), StorageLevel.MEMORY_ONLY.
    val sparkConf = new SparkConf().setAppName("CacheApp").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)

    // Build ten million User objects and cache them as deserialized JVM objects.
    val users = new ListBuffer[User]
    for (i <- 1 to 10000000) {
      users += new User(i, "name" + i, i.toString)
    }
    val usersRDD = sc.parallelize(users)
    usersRDD.persist(StorageLevel.MEMORY_ONLY)

    usersRDD.foreach(println)

    // Keep the application alive so the cached size can be read from the
    // Storage tab of the Spark UI (http://localhost:4040) before it exits.
    Thread.sleep(1000000)
    sc.stop()
  }

  class User(id: Int, username: String, age: String) extends Serializable
}
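
The sizes quoted below were read from the Storage tab of the Spark UI. If you would rather read them programmatically, SparkContext also exposes getRDDStorageInfo (a DeveloperApi, so treat it as unstable); a minimal sketch, to be dropped in before the sleep:

// Print the in-memory size of every cached RDD; memSize is in bytes.
sc.getRDDStorageInfo.foreach { info =>
  println(f"RDD ${info.id} (${info.name}): ${info.memSize / 1024.0 / 1024.0}%.1f MB in memory")
}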

The test data is 151 MB.

The measured cache size is 191 MB.


Testing with MEMORY_ONLY_SER

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer

object UserApp {
  def main(args: Array[String]): Unit = {
    // Variant 2: Java serialization (the default), StorageLevel.MEMORY_ONLY_SER.
    val sparkConf = new SparkConf().setAppName("CacheApp").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)

    val users = new ListBuffer[User]
    for (i <- 1 to 10000000) {
      users += new User(i, "name" + i, i.toString)
    }
    val usersRDD = sc.parallelize(users)
    // MEMORY_ONLY_SER keeps each partition as a single serialized byte array.
    usersRDD.persist(StorageLevel.MEMORY_ONLY_SER)

    usersRDD.foreach(println)
    Thread.sleep(1000000)
    sc.stop()
  }

  class User(id: Int, username: String, age: String) extends Serializable
}

The measured cache size is 60.5 MB, far below the 191 MB deserialized cache, since MEMORY_ONLY_SER stores each partition in compact serialized form.


Using Kryo serialization, without registering classes

Testing with MEMORY_ONLY

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer

object UserApp {
  def main(args: Array[String]): Unit = {
    // Variant 3: Kryo serializer enabled, User NOT registered, MEMORY_ONLY.
    val sparkConf = new SparkConf().setAppName("CacheApp").setMaster("local[2]")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // Registration deliberately left off for this test:
    // sparkConf.registerKryoClasses(Array(classOf[User]))
    val sc = new SparkContext(sparkConf)

    val users = new ListBuffer[User]
    for (i <- 1 to 10000000) {
      users += new User(i, "name" + i, i.toString)
    }
    val usersRDD = sc.parallelize(users).persist(StorageLevel.MEMORY_ONLY)
    usersRDD.foreach(println)
    Thread.sleep(1000000)
    sc.stop()
  }

  class User(id: Int, username: String, age: String) extends Serializable
}

The measured cache size is 191 MB, identical to the Java run: MEMORY_ONLY caches deserialized objects, so the configured serializer is never invoked.


Testing with MEMORY_ONLY_SER

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer

object UserApp {
  def main(args: Array[String]): Unit = {
    // Variant 4: Kryo serializer enabled, User NOT registered, MEMORY_ONLY_SER.
    val sparkConf = new SparkConf().setAppName("CacheApp").setMaster("local[2]")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // Registration deliberately left off for this test:
    // sparkConf.registerKryoClasses(Array(classOf[User]))
    val sc = new SparkContext(sparkConf)

    val users = new ListBuffer[User]
    for (i <- 1 to 10000000) {
      users += new User(i, "name" + i, i.toString)
    }
    val usersRDD = sc.parallelize(users).persist(StorageLevel.MEMORY_ONLY_SER)
    usersRDD.foreach(println)
    Thread.sleep(1000000)
    sc.stop()
  }

  class User(id: Int, username: String, age: String) extends Serializable
}

The measured cache size is 238.4 MB, noticeably worse than the 60.5 MB Java serialization produced. Without registration, Kryo has to write each object's fully qualified class name alongside its data, which wipes out its size advantage.


Using Kryo serialization, with classes registered

Testing with MEMORY_ONLY

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer

object UserApp {
  def main(args: Array[String]): Unit = {
    // Variant 5: Kryo serializer with User registered, MEMORY_ONLY.
    val sparkConf = new SparkConf().setAppName("CacheApp").setMaster("local[2]")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // Registering lets Kryo write a small class ID instead of the full class name.
    sparkConf.registerKryoClasses(Array(classOf[User]))
    val sc = new SparkContext(sparkConf)

    val users = new ListBuffer[User]
    for (i <- 1 to 10000000) {
      users += new User(i, "name" + i, i.toString)
    }
    val usersRDD = sc.parallelize(users).persist(StorageLevel.MEMORY_ONLY)
    usersRDD.foreach(println)
    Thread.sleep(1000000)
    sc.stop()
  }

  class User(id: Int, username: String, age: String) extends Serializable
}

The measured cache size is 190.7 MB, again essentially unchanged: registration cannot help while the objects are cached in deserialized form.


Testing with MEMORY_ONLY_SER

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer

object UserApp {
  def main(args: Array[String]): Unit = {
    // Variant 6: Kryo serializer with User registered, MEMORY_ONLY_SER.
    val sparkConf = new SparkConf().setAppName("CacheApp").setMaster("local[2]")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // Registering lets Kryo write a small class ID instead of the full class name.
    sparkConf.registerKryoClasses(Array(classOf[User]))
    val sc = new SparkContext(sparkConf)

    val users = new ListBuffer[User]
    for (i <- 1 to 10000000) {
      users += new User(i, "name" + i, i.toString)
    }
    val usersRDD = sc.parallelize(users).persist(StorageLevel.MEMORY_ONLY_SER)
    usersRDD.foreach(println)
    Thread.sleep(1000000)
    sc.stop()
  }

  class User(id: Int, username: String, age: String) extends Serializable
}

The measured cache size is 19.1 MB.


Summary

Serializer           Storage level     Cached size
Java (default)       MEMORY_ONLY       191 MB
Java (default)       MEMORY_ONLY_SER   60.5 MB
Kryo, unregistered   MEMORY_ONLY       191 MB
Kryo, unregistered   MEMORY_ONLY_SER   238.4 MB
Kryo, registered     MEMORY_ONLY       190.7 MB
Kryo, registered     MEMORY_ONLY_SER   19.1 MB

With MEMORY_ONLY the cached size is essentially the same (about 191 MB) no matter which serializer is configured, because deserialized caching never invokes the serializer. The serializer only matters with a serialized storage level, and there Kryo pays off only once the classes are registered: unregistered Kryo (238.4 MB) is actually worse than Java serialization (60.5 MB), while registered Kryo shrinks the cache to 19.1 MB, roughly a tenth of the deserialized size and about a third of Java's serialized result.
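
In practice the space win therefore comes from combining three things: the Kryo serializer, explicit class registration, and a serialized storage level. A minimal sketch of such a configuration; setting spark.kryo.registrationRequired is optional, but it makes a forgotten registration fail loudly instead of silently falling back to writing full class names:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("KryoCacheApp")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Throw instead of silently serializing unregistered classes.
  .set("spark.kryo.registrationRequired", "true")

// Register every class that will be serialized (User from the tests above).
conf.registerKryoClasses(Array(classOf[User]))

// Kryo only pays off when a serialized storage level is used:
// usersRDD.persist(StorageLevel.MEMORY_ONLY_SER)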
