Blob Blame History Raw
From 39f1a360d8d8d7f688e0bd238c91d676240e55ad Mon Sep 17 00:00:00 2001
From: William Benton <willb@redhat.com>
Date: Thu, 27 Feb 2014 16:43:44 -0600
Subject: [PATCH 5/9] Removed code depending on Kryo

Conflicts:
	core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
---
 .../apache/spark/serializer/KryoSerializer.scala   | 175 ---------------------
 .../apache/spark/storage/StoragePerfTester.scala   | 103 ------------
 .../org/apache/spark/storage/ThreadingTest.scala   | 115 --------------
 .../util/collection/ExternalAppendOnlyMap.scala    |   1 +
 .../apache/spark/graphx/GraphKryoRegistrator.scala |  48 ------
 .../apache/spark/mllib/recommendation/ALS.scala    |  12 --
 .../spark/streaming/util/RawTextSender.scala       |  82 ----------
 7 files changed, 1 insertion(+), 535 deletions(-)
 delete mode 100644 core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
 delete mode 100644 core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
 delete mode 100644 core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
 delete mode 100644 graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
 delete mode 100644 streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala

diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
deleted file mode 100644
index c14cd47..0000000
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.serializer
-
-import java.nio.ByteBuffer
-import java.io.{EOFException, InputStream, OutputStream}
-
-import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
-import com.esotericsoftware.kryo.{KryoException, Kryo}
-import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
-import com.twitter.chill.{EmptyScalaKryoInstantiator, AllScalaRegistrar}
-
-import org.apache.spark._
-import org.apache.spark.broadcast.HttpBroadcast
-import org.apache.spark.scheduler.MapStatus
-import org.apache.spark.storage._
-import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock}
-
-/**
- * A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
- */
-class KryoSerializer(conf: SparkConf) extends org.apache.spark.serializer.Serializer with Logging {
-  private val bufferSize = {
-    conf.getInt("spark.kryoserializer.buffer.mb", 2) * 1024 * 1024
-  }
-
-  def newKryoOutput() = new KryoOutput(bufferSize)
-
-  def newKryo(): Kryo = {
-    val instantiator = new EmptyScalaKryoInstantiator
-    val kryo = instantiator.newKryo()
-    val classLoader = Thread.currentThread.getContextClassLoader
-
-    // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops.
-    // Do this before we invoke the user registrator so the user registrator can override this.
-    kryo.setReferences(conf.getBoolean("spark.kryo.referenceTracking", true))
-
-    for (cls <- KryoSerializer.toRegister) kryo.register(cls)
-
-    // Allow sending SerializableWritable
-    kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
-    kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
-
-    // Allow the user to register their own classes by setting spark.kryo.registrator
-    try {
-      for (regCls <- conf.getOption("spark.kryo.registrator")) {
-        logDebug("Running user registrator: " + regCls)
-        val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
-        reg.registerClasses(kryo)
-      }
-    } catch {
-      case e: Exception => logError("Failed to run spark.kryo.registrator", e)
-    }
-
-    // Register Chill's classes; we do this after our ranges and the user's own classes to let
-    // our code override the generic serialziers in Chill for things like Seq
-    new AllScalaRegistrar().apply(kryo)
-
-    kryo.setClassLoader(classLoader)
-    kryo
-  }
-
-  def newInstance(): SerializerInstance = {
-    new KryoSerializerInstance(this)
-  }
-}
-
-private[spark]
-class KryoSerializationStream(kryo: Kryo, outStream: OutputStream) extends SerializationStream {
-  val output = new KryoOutput(outStream)
-
-  def writeObject[T](t: T): SerializationStream = {
-    kryo.writeClassAndObject(output, t)
-    this
-  }
-
-  def flush() { output.flush() }
-  def close() { output.close() }
-}
-
-private[spark]
-class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends DeserializationStream {
-  val input = new KryoInput(inStream)
-
-  def readObject[T](): T = {
-    try {
-      kryo.readClassAndObject(input).asInstanceOf[T]
-    } catch {
-      // DeserializationStream uses the EOF exception to indicate stopping condition.
-      case _: KryoException => throw new EOFException
-    }
-  }
-
-  def close() {
-    // Kryo's Input automatically closes the input stream it is using.
-    input.close()
-  }
-}
-
-private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
-  val kryo = ks.newKryo()
-
-  // Make these lazy vals to avoid creating a buffer unless we use them
-  lazy val output = ks.newKryoOutput()
-  lazy val input = new KryoInput()
-
-  def serialize[T](t: T): ByteBuffer = {
-    output.clear()
-    kryo.writeClassAndObject(output, t)
-    ByteBuffer.wrap(output.toBytes)
-  }
-
-  def deserialize[T](bytes: ByteBuffer): T = {
-    input.setBuffer(bytes.array)
-    kryo.readClassAndObject(input).asInstanceOf[T]
-  }
-
-  def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = {
-    val oldClassLoader = kryo.getClassLoader
-    kryo.setClassLoader(loader)
-    input.setBuffer(bytes.array)
-    val obj = kryo.readClassAndObject(input).asInstanceOf[T]
-    kryo.setClassLoader(oldClassLoader)
-    obj
-  }
-
-  def serializeStream(s: OutputStream): SerializationStream = {
-    new KryoSerializationStream(kryo, s)
-  }
-
-  def deserializeStream(s: InputStream): DeserializationStream = {
-    new KryoDeserializationStream(kryo, s)
-  }
-}
-
-/**
- * Interface implemented by clients to register their classes with Kryo when using Kryo
- * serialization.
- */
-trait KryoRegistrator {
-  def registerClasses(kryo: Kryo)
-}
-
-private[serializer] object KryoSerializer {
-  // Commonly used classes.
-  private val toRegister: Seq[Class[_]] = Seq(
-    ByteBuffer.allocate(1).getClass,
-    classOf[StorageLevel],
-    classOf[PutBlock],
-    classOf[GotBlock],
-    classOf[GetBlock],
-    classOf[MapStatus],
-    classOf[BlockManagerId],
-    classOf[Array[Byte]],
-    (1 to 10).getClass,
-    (1 until 10).getClass,
-    (1L to 10L).getClass,
-    (1L until 10L).getClass
-  )
-}
diff --git a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
deleted file mode 100644
index 40734aa..0000000
--- a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.util.concurrent.atomic.AtomicLong
-import java.util.concurrent.{CountDownLatch, Executors}
-
-import org.apache.spark.serializer.KryoSerializer
-import org.apache.spark.SparkContext
-import org.apache.spark.util.Utils
-
-/**
- * Utility for micro-benchmarking shuffle write performance.
- *
- * Writes simulated shuffle output from several threads and records the observed throughput.
- */
-object StoragePerfTester {
-  def main(args: Array[String]) = {
-    /** Total amount of data to generate. Distributed evenly amongst maps and reduce splits. */
-    val dataSizeMb = Utils.memoryStringToMb(sys.env.getOrElse("OUTPUT_DATA", "1g"))
-
-    /** Number of map tasks. All tasks execute concurrently. */
-    val numMaps = sys.env.get("NUM_MAPS").map(_.toInt).getOrElse(8)
-
-    /** Number of reduce splits for each map task. */
-    val numOutputSplits = sys.env.get("NUM_REDUCERS").map(_.toInt).getOrElse(500)
-
-    val recordLength = 1000 // ~1KB records
-    val totalRecords = dataSizeMb * 1000
-    val recordsPerMap = totalRecords / numMaps
-
-    val writeData = "1" * recordLength
-    val executor = Executors.newFixedThreadPool(numMaps)
-
-    System.setProperty("spark.shuffle.compress", "false")
-    System.setProperty("spark.shuffle.sync", "true")
-
-    // This is only used to instantiate a BlockManager. All thread scheduling is done manually.
-    val sc = new SparkContext("local[4]", "Write Tester")
-    val blockManager = sc.env.blockManager
-
-    def writeOutputBytes(mapId: Int, total: AtomicLong) = {
-      val shuffle = blockManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits,
-        new KryoSerializer(sc.conf))
-      val writers = shuffle.writers
-      for (i <- 1 to recordsPerMap) {
-        writers(i % numOutputSplits).write(writeData)
-      }
-      writers.map {w =>
-        w.commit()
-        total.addAndGet(w.fileSegment().length)
-        w.close()
-      }
-
-      shuffle.releaseWriters(true)
-    }
-
-    val start = System.currentTimeMillis()
-    val latch = new CountDownLatch(numMaps)
-    val totalBytes = new AtomicLong()
-    for (task <- 1 to numMaps) {
-      executor.submit(new Runnable() {
-        override def run() = {
-          try {
-            writeOutputBytes(task, totalBytes)
-            latch.countDown()
-          } catch {
-            case e: Exception =>
-              println("Exception in child thread: " + e + " " + e.getMessage)
-              System.exit(1)
-          }
-        }
-      })
-    }
-    latch.await()
-    val end = System.currentTimeMillis()
-    val time = (end - start) / 1000.0
-    val bytesPerSecond = totalBytes.get() / time
-    val bytesPerFile = (totalBytes.get() / (numOutputSplits * numMaps.toDouble)).toLong
-
-    System.err.println("files_total\t\t%s".format(numMaps * numOutputSplits))
-    System.err.println("bytes_per_file\t\t%s".format(Utils.bytesToString(bytesPerFile)))
-    System.err.println("agg_throughput\t\t%s/s".format(Utils.bytesToString(bytesPerSecond.toLong)))
-
-    executor.shutdown()
-    sc.stop()
-  }
-}
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
deleted file mode 100644
index 729ba2c..0000000
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import akka.actor._
-
-import java.util.concurrent.ArrayBlockingQueue
-import util.Random
-import org.apache.spark.serializer.KryoSerializer
-import org.apache.spark.{SparkConf, SparkContext}
-
-/**
- * This class tests the BlockManager and MemoryStore for thread safety and
- * deadlocks. It spawns a number of producer and consumer threads. Producer
- * threads continuously pushes blocks into the BlockManager and consumer
- * threads continuously retrieves the blocks form the BlockManager and tests
- * whether the block is correct or not.
- */
-private[spark] object ThreadingTest {
-
-  val numProducers = 5
-  val numBlocksPerProducer = 20000
-
-  private[spark] class ProducerThread(manager: BlockManager, id: Int) extends Thread {
-    val queue = new ArrayBlockingQueue[(BlockId, Seq[Int])](100)
-
-    override def run() {
-      for (i <- 1 to numBlocksPerProducer) {
-        val blockId = TestBlockId("b-" + id + "-" + i)
-        val blockSize = Random.nextInt(1000)
-        val block = (1 to blockSize).map(_ => Random.nextInt())
-        val level = randomLevel()
-        val startTime = System.currentTimeMillis()
-        manager.put(blockId, block.iterator, level, true)
-        println("Pushed block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms")
-        queue.add((blockId, block))
-      }
-      println("Producer thread " + id + " terminated")
-    }
-
-    def randomLevel(): StorageLevel = {
-      math.abs(Random.nextInt()) % 4 match {
-        case 0 => StorageLevel.MEMORY_ONLY
-        case 1 => StorageLevel.MEMORY_ONLY_SER
-        case 2 => StorageLevel.MEMORY_AND_DISK
-        case 3 => StorageLevel.MEMORY_AND_DISK_SER
-      }
-    }
-  }
-
-  private[spark] class ConsumerThread(
-      manager: BlockManager,
-      queue: ArrayBlockingQueue[(BlockId, Seq[Int])]
-    ) extends Thread {
-    var numBlockConsumed = 0
-
-    override def run() {
-      println("Consumer thread started")
-      while(numBlockConsumed < numBlocksPerProducer) {
-        val (blockId, block) = queue.take()
-        val startTime = System.currentTimeMillis()
-        manager.get(blockId) match {
-          case Some(retrievedBlock) =>
-            assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList,
-              "Block " + blockId + " did not match")
-            println("Got block " + blockId + " in " +
-              (System.currentTimeMillis - startTime) + " ms")
-          case None =>
-            assert(false, "Block " + blockId + " could not be retrieved")
-        }
-        numBlockConsumed += 1
-      }
-      println("Consumer thread terminated")
-    }
-  }
-
-  def main(args: Array[String]) {
-    System.setProperty("spark.kryoserializer.buffer.mb", "1")
-    val actorSystem = ActorSystem("test")
-    val conf = new SparkConf()
-    val serializer = new KryoSerializer(conf)
-    val blockManagerMaster = new BlockManagerMaster(
-      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf)
-    val blockManager = new BlockManager(
-      "<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf)
-    val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
-    val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue))
-    producers.foreach(_.start)
-    consumers.foreach(_.start)
-    producers.foreach(_.join)
-    consumers.foreach(_.join)
-    blockManager.stop()
-    blockManagerMaster.stop()
-    actorSystem.shutdown()
-    actorSystem.awaitTermination()
-    println("Everything stopped.")
-    println(
-      "It will take sometime for the JVM to clean all temporary files and shutdown. Sit tight.")
-  }
-}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 6f36817..c4f3efe 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -30,6 +30,7 @@ import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.storage.{BlockId, BlockManager}
 
+
 /**
  * An append-only map that spills sorted content to disk when there is insufficient space for it
  * to grow.
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
deleted file mode 100644
index dd380d8..0000000
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.graphx
-
-import com.esotericsoftware.kryo.Kryo
-
-import org.apache.spark.graphx.impl._
-import org.apache.spark.serializer.KryoRegistrator
-import org.apache.spark.util.collection.BitSet
-import org.apache.spark.util.BoundedPriorityQueue
-
-/**
- * Registers GraphX classes with Kryo for improved performance.
- */
-class GraphKryoRegistrator extends KryoRegistrator {
-
-  def registerClasses(kryo: Kryo) {
-    kryo.register(classOf[Edge[Object]])
-    kryo.register(classOf[MessageToPartition[Object]])
-    kryo.register(classOf[VertexBroadcastMsg[Object]])
-    kryo.register(classOf[(VertexId, Object)])
-    kryo.register(classOf[EdgePartition[Object]])
-    kryo.register(classOf[BitSet])
-    kryo.register(classOf[VertexIdToIndexMap])
-    kryo.register(classOf[VertexAttributeBlock[Object]])
-    kryo.register(classOf[PartitionStrategy])
-    kryo.register(classOf[BoundedPriorityQueue[Object]])
-    kryo.register(classOf[EdgeDirection])
-
-    // This avoids a large number of hash table lookups.
-    kryo.setReferences(false)
-  }
-}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index 44db51c..f13781a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -26,10 +26,8 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.{Logging, HashPartitioner, Partitioner, SparkContext, SparkConf}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.rdd.RDD
-import org.apache.spark.serializer.KryoRegistrator
 import org.apache.spark.SparkContext._
 
-import com.esotericsoftware.kryo.Kryo
 import org.jblas.{DoubleMatrix, SimpleBlas, Solve}
 
 
@@ -641,12 +639,6 @@ object ALS {
     trainImplicit(ratings, rank, iterations, 0.01, -1, 1.0)
   }
 
-  private class ALSRegistrator extends KryoRegistrator {
-    override def registerClasses(kryo: Kryo) {
-      kryo.register(classOf[Rating])
-    }
-  }
-
   def main(args: Array[String]) {
     if (args.length < 5 || args.length > 9) {
       println("Usage: ALS <master> <ratings_file> <rank> <iterations> <output_dir> " +
@@ -660,10 +652,6 @@ object ALS {
     val alpha = if (args.length >= 8) args(7).toDouble else 1
     val blocks = if (args.length == 9) args(8).toInt else -1
     val conf = new SparkConf()
-      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-      .set("spark.kryo.registrator",  classOf[ALSRegistrator].getName)
-      .set("spark.kryo.referenceTracking", "false")
-      .set("spark.kryoserializer.buffer.mb", "8")
       .set("spark.locality.wait", "10000")
     val sc = new SparkContext(master, "ALS", conf)
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
deleted file mode 100644
index 684b38e..0000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.util
-
-import java.io.IOException
-import java.net.ServerSocket
-import java.nio.ByteBuffer
-
-import scala.io.Source
-
-import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-
-import org.apache.spark.{SparkConf, Logging}
-import org.apache.spark.serializer.KryoSerializer
-import org.apache.spark.util.IntParam
-
-/**
- * A helper program that sends blocks of Kryo-serialized text strings out on a socket at a
- * specified rate. Used to feed data into RawInputDStream.
- */
-private[streaming]
-object RawTextSender extends Logging {
-  def main(args: Array[String]) {
-    if (args.length != 4) {
-      System.err.println("Usage: RawTextSender <port> <file> <blockSize> <bytesPerSec>")
-      System.exit(1)
-    }
-    // Parse the arguments using a pattern match
-    val Array(IntParam(port), file, IntParam(blockSize), IntParam(bytesPerSec)) = args
-
-    // Repeat the input data multiple times to fill in a buffer
-    val lines = Source.fromFile(file).getLines().toArray
-    val bufferStream = new FastByteArrayOutputStream(blockSize + 1000)
-    val ser = new KryoSerializer(new SparkConf()).newInstance()
-    val serStream = ser.serializeStream(bufferStream)
-    var i = 0
-    while (bufferStream.position < blockSize) {
-      serStream.writeObject(lines(i))
-      i = (i + 1) % lines.length
-    }
-    bufferStream.trim()
-    val array = bufferStream.array
-
-    val countBuf = ByteBuffer.wrap(new Array[Byte](4))
-    countBuf.putInt(array.length)
-    countBuf.flip()
-
-    val serverSocket = new ServerSocket(port)
-    logInfo("Listening on port " + port)
-
-    while (true) {
-      val socket = serverSocket.accept()
-      logInfo("Got a new connection")
-      val out = new RateLimitedOutputStream(socket.getOutputStream, bytesPerSec)
-      try {
-        while (true) {
-          out.write(countBuf.array)
-          out.write(array)
-        }
-      } catch {
-        case e: IOException =>
-          logError("Client disconnected")
-          socket.close()
-      }
-    }
-  }
-}
-- 
1.8.3.4 (Apple Git-47)