From 6172c60fe82dae51155ca97db39cd75bc42fcba3 Mon Sep 17 00:00:00 2001
From: William Benton
Date: Thu, 27 Feb 2014 16:43:44 -0600
Subject: [PATCH 5/9] Removed code depending on Kryo

Conflicts:
	core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
---
 .../apache/spark/serializer/KryoSerializer.scala   | 175 ---------------------
 .../apache/spark/storage/StoragePerfTester.scala   | 103 ------------
 .../org/apache/spark/storage/ThreadingTest.scala   | 115 --------------
 .../util/collection/ExternalAppendOnlyMap.scala    |   1 +
 .../apache/spark/graphx/GraphKryoRegistrator.scala |  48 ------
 .../apache/spark/mllib/recommendation/ALS.scala    |  12 --
 .../spark/streaming/util/RawTextSender.scala       |  82 ----------
 7 files changed, 1 insertion(+), 535 deletions(-)
 delete mode 100644 core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
 delete mode 100644 core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
 delete mode 100644 core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
 delete mode 100644 graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
 delete mode 100644 streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala

diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
deleted file mode 100644
index c14cd47..0000000
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.serializer
-
-import java.nio.ByteBuffer
-import java.io.{EOFException, InputStream, OutputStream}
-
-import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
-import com.esotericsoftware.kryo.{KryoException, Kryo}
-import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
-import com.twitter.chill.{EmptyScalaKryoInstantiator, AllScalaRegistrar}
-
-import org.apache.spark._
-import org.apache.spark.broadcast.HttpBroadcast
-import org.apache.spark.scheduler.MapStatus
-import org.apache.spark.storage._
-import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock}
-
-/**
- * A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
- */
-class KryoSerializer(conf: SparkConf) extends org.apache.spark.serializer.Serializer with Logging {
-  private val bufferSize = {
-    conf.getInt("spark.kryoserializer.buffer.mb", 2) * 1024 * 1024
-  }
-
-  def newKryoOutput() = new KryoOutput(bufferSize)
-
-  def newKryo(): Kryo = {
-    val instantiator = new EmptyScalaKryoInstantiator
-    val kryo = instantiator.newKryo()
-    val classLoader = Thread.currentThread.getContextClassLoader
-
-    // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops.
-    // Do this before we invoke the user registrator so the user registrator can override this.
-    kryo.setReferences(conf.getBoolean("spark.kryo.referenceTracking", true))
-
-    for (cls <- KryoSerializer.toRegister) kryo.register(cls)
-
-    // Allow sending SerializableWritable
-    kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
-    kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
-
-    // Allow the user to register their own classes by setting spark.kryo.registrator
-    try {
-      for (regCls <- conf.getOption("spark.kryo.registrator")) {
-        logDebug("Running user registrator: " + regCls)
-        val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
-        reg.registerClasses(kryo)
-      }
-    } catch {
-      case e: Exception => logError("Failed to run spark.kryo.registrator", e)
-    }
-
-    // Register Chill's classes; we do this after our ranges and the user's own classes to let
-    // our code override the generic serializers in Chill for things like Seq
-    new AllScalaRegistrar().apply(kryo)
-
-    kryo.setClassLoader(classLoader)
-    kryo
-  }
-
-  def newInstance(): SerializerInstance = {
-    new KryoSerializerInstance(this)
-  }
-}
-
-private[spark]
-class KryoSerializationStream(kryo: Kryo, outStream: OutputStream) extends SerializationStream {
-  val output = new KryoOutput(outStream)
-
-  def writeObject[T](t: T): SerializationStream = {
-    kryo.writeClassAndObject(output, t)
-    this
-  }
-
-  def flush() { output.flush() }
-  def close() { output.close() }
-}
-
-private[spark]
-class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends DeserializationStream {
-  val input = new KryoInput(inStream)
-
-  def readObject[T](): T = {
-    try {
-      kryo.readClassAndObject(input).asInstanceOf[T]
-    } catch {
-      // DeserializationStream uses the EOF exception to indicate stopping condition.
-      case _: KryoException => throw new EOFException
-    }
-  }
-
-  def close() {
-    // Kryo's Input automatically closes the input stream it is using.
-    input.close()
-  }
-}
-
-private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
-  val kryo = ks.newKryo()
-
-  // Make these lazy vals to avoid creating a buffer unless we use them
-  lazy val output = ks.newKryoOutput()
-  lazy val input = new KryoInput()
-
-  def serialize[T](t: T): ByteBuffer = {
-    output.clear()
-    kryo.writeClassAndObject(output, t)
-    ByteBuffer.wrap(output.toBytes)
-  }
-
-  def deserialize[T](bytes: ByteBuffer): T = {
-    input.setBuffer(bytes.array)
-    kryo.readClassAndObject(input).asInstanceOf[T]
-  }
-
-  def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = {
-    val oldClassLoader = kryo.getClassLoader
-    kryo.setClassLoader(loader)
-    input.setBuffer(bytes.array)
-    val obj = kryo.readClassAndObject(input).asInstanceOf[T]
-    kryo.setClassLoader(oldClassLoader)
-    obj
-  }
-
-  def serializeStream(s: OutputStream): SerializationStream = {
-    new KryoSerializationStream(kryo, s)
-  }
-
-  def deserializeStream(s: InputStream): DeserializationStream = {
-    new KryoDeserializationStream(kryo, s)
-  }
-}
-
-/**
- * Interface implemented by clients to register their classes with Kryo when using Kryo
- * serialization.
- */
-trait KryoRegistrator {
-  def registerClasses(kryo: Kryo)
-}
-
-private[serializer] object KryoSerializer {
-  // Commonly used classes.
-  private val toRegister: Seq[Class[_]] = Seq(
-    ByteBuffer.allocate(1).getClass,
-    classOf[StorageLevel],
-    classOf[PutBlock],
-    classOf[GotBlock],
-    classOf[GetBlock],
-    classOf[MapStatus],
-    classOf[BlockManagerId],
-    classOf[Array[Byte]],
-    (1 to 10).getClass,
-    (1 until 10).getClass,
-    (1L to 10L).getClass,
-    (1L until 10L).getClass
-  )
-}
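For context on what this deletion takes away: user code hooked into the serializer above by implementing the `KryoRegistrator` trait and naming the class in `spark.kryo.registrator`; `newKryo()` then looked the class up by name and invoked `registerClasses()` on every fresh Kryo instance. A minimal sketch of that registration path (`MyRegistrator` and `MyCaseClass` are hypothetical names):

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical application class that benefits from Kryo registration.
case class MyCaseClass(id: Long, payload: Array[Byte])

// Implements the trait deleted above.
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[MyCaseClass])
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", classOf[MyRegistrator].getName)
```

This is the same pattern the ALS changes further down remove.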
diff --git a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
deleted file mode 100644
index 40734aa..0000000
--- a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.util.concurrent.atomic.AtomicLong
-import java.util.concurrent.{CountDownLatch, Executors}
-
-import org.apache.spark.serializer.KryoSerializer
-import org.apache.spark.SparkContext
-import org.apache.spark.util.Utils
-
-/**
- * Utility for micro-benchmarking shuffle write performance.
- *
- * Writes simulated shuffle output from several threads and records the observed throughput.
- */
-object StoragePerfTester {
-  def main(args: Array[String]) = {
-    /** Total amount of data to generate. Distributed evenly amongst maps and reduce splits. */
-    val dataSizeMb = Utils.memoryStringToMb(sys.env.getOrElse("OUTPUT_DATA", "1g"))
-
-    /** Number of map tasks. All tasks execute concurrently. */
-    val numMaps = sys.env.get("NUM_MAPS").map(_.toInt).getOrElse(8)
-
-    /** Number of reduce splits for each map task. */
-    val numOutputSplits = sys.env.get("NUM_REDUCERS").map(_.toInt).getOrElse(500)
-
-    val recordLength = 1000 // ~1KB records
-    val totalRecords = dataSizeMb * 1000
-    val recordsPerMap = totalRecords / numMaps
-
-    val writeData = "1" * recordLength
-    val executor = Executors.newFixedThreadPool(numMaps)
-
-    System.setProperty("spark.shuffle.compress", "false")
-    System.setProperty("spark.shuffle.sync", "true")
-
-    // This is only used to instantiate a BlockManager. All thread scheduling is done manually.
-    val sc = new SparkContext("local[4]", "Write Tester")
-    val blockManager = sc.env.blockManager
-
-    def writeOutputBytes(mapId: Int, total: AtomicLong) = {
-      val shuffle = blockManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits,
-        new KryoSerializer(sc.conf))
-      val writers = shuffle.writers
-      for (i <- 1 to recordsPerMap) {
-        writers(i % numOutputSplits).write(writeData)
-      }
-      writers.map {w =>
-        w.commit()
-        total.addAndGet(w.fileSegment().length)
-        w.close()
-      }
-
-      shuffle.releaseWriters(true)
-    }
-
-    val start = System.currentTimeMillis()
-    val latch = new CountDownLatch(numMaps)
-    val totalBytes = new AtomicLong()
-    for (task <- 1 to numMaps) {
-      executor.submit(new Runnable() {
-        override def run() = {
-          try {
-            writeOutputBytes(task, totalBytes)
-            latch.countDown()
-          } catch {
-            case e: Exception =>
-              println("Exception in child thread: " + e + " " + e.getMessage)
-              System.exit(1)
-          }
-        }
-      })
-    }
-    latch.await()
-    val end = System.currentTimeMillis()
-    val time = (end - start) / 1000.0
-    val bytesPerSecond = totalBytes.get() / time
-    val bytesPerFile = (totalBytes.get() / (numOutputSplits * numMaps.toDouble)).toLong
-
-    System.err.println("files_total\t\t%s".format(numMaps * numOutputSplits))
-    System.err.println("bytes_per_file\t\t%s".format(Utils.bytesToString(bytesPerFile)))
-    System.err.println("agg_throughput\t\t%s/s".format(Utils.bytesToString(bytesPerSecond.toLong)))
-
-    executor.shutdown()
-    sc.stop()
-  }
-}
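The benchmark's sizing arithmetic is worth spelling out, since every knob arrives through an environment variable. A standalone sketch with the defaults above (`OUTPUT_DATA=1g`, `NUM_MAPS=8`, `NUM_REDUCERS=500`); the object name is hypothetical:

```scala
// Reproduces StoragePerfTester's sizing math with its default knobs.
object PerfTesterMath extends App {
  val dataSizeMb      = 1024  // OUTPUT_DATA = "1g"
  val numMaps         = 8     // NUM_MAPS default
  val numOutputSplits = 500   // NUM_REDUCERS default
  val recordLength    = 1000  // ~1KB records

  val totalRecords  = dataSizeMb.toLong * 1000                  // 1,024,000 records
  val recordsPerMap = totalRecords / numMaps                    // 128,000 per map task
  val filesTotal    = numMaps * numOutputSplits                 // 4,000 shuffle files
  val bytesPerFile  = totalRecords * recordLength / filesTotal  // 256,000 bytes

  println(s"files_total=$filesTotal recordsPerMap=$recordsPerMap bytesPerFile=$bytesPerFile")
}
```

The round-robin `writers(i % numOutputSplits)` in the harness is what spreads each map's records evenly across the reducer files, so `bytesPerFile` comes out uniform.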
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
deleted file mode 100644
index 729ba2c..0000000
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import akka.actor._
-
-import java.util.concurrent.ArrayBlockingQueue
-import util.Random
-import org.apache.spark.serializer.KryoSerializer
-import org.apache.spark.{SparkConf, SparkContext}
-
-/**
- * This class tests the BlockManager and MemoryStore for thread safety and
- * deadlocks. It spawns a number of producer and consumer threads. Producer
- * threads continuously push blocks into the BlockManager and consumer
- * threads continuously retrieve the blocks from the BlockManager, testing
- * whether each block is correct or not.
- */
-private[spark] object ThreadingTest {
-
-  val numProducers = 5
-  val numBlocksPerProducer = 20000
-
-  private[spark] class ProducerThread(manager: BlockManager, id: Int) extends Thread {
-    val queue = new ArrayBlockingQueue[(BlockId, Seq[Int])](100)
-
-    override def run() {
-      for (i <- 1 to numBlocksPerProducer) {
-        val blockId = TestBlockId("b-" + id + "-" + i)
-        val blockSize = Random.nextInt(1000)
-        val block = (1 to blockSize).map(_ => Random.nextInt())
-        val level = randomLevel()
-        val startTime = System.currentTimeMillis()
-        manager.put(blockId, block.iterator, level, true)
-        println("Pushed block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms")
-        queue.add((blockId, block))
-      }
-      println("Producer thread " + id + " terminated")
-    }
-
-    def randomLevel(): StorageLevel = {
-      math.abs(Random.nextInt()) % 4 match {
-        case 0 => StorageLevel.MEMORY_ONLY
-        case 1 => StorageLevel.MEMORY_ONLY_SER
-        case 2 => StorageLevel.MEMORY_AND_DISK
-        case 3 => StorageLevel.MEMORY_AND_DISK_SER
-      }
-    }
-  }
-
-  private[spark] class ConsumerThread(
-      manager: BlockManager,
-      queue: ArrayBlockingQueue[(BlockId, Seq[Int])]
-    ) extends Thread {
-    var numBlockConsumed = 0
-
-    override def run() {
-      println("Consumer thread started")
-      while(numBlockConsumed < numBlocksPerProducer) {
-        val (blockId, block) = queue.take()
-        val startTime = System.currentTimeMillis()
-        manager.get(blockId) match {
-          case Some(retrievedBlock) =>
-            assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList,
-              "Block " + blockId + " did not match")
-            println("Got block " + blockId + " in " +
-              (System.currentTimeMillis - startTime) + " ms")
-          case None =>
-            assert(false, "Block " + blockId + " could not be retrieved")
-        }
-        numBlockConsumed += 1
-      }
-      println("Consumer thread terminated")
-    }
-  }
-
-  def main(args: Array[String]) {
-    System.setProperty("spark.kryoserializer.buffer.mb", "1")
-    val actorSystem = ActorSystem("test")
-    val conf = new SparkConf()
-    val serializer = new KryoSerializer(conf)
-    val blockManagerMaster = new BlockManagerMaster(
-      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf)
-    val blockManager = new BlockManager(
-      "<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf)
-    val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
-    val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue))
-    producers.foreach(_.start)
-    consumers.foreach(_.start)
-    producers.foreach(_.join)
-    consumers.foreach(_.join)
-    blockManager.stop()
-    blockManagerMaster.stop()
-    actorSystem.shutdown()
-    actorSystem.awaitTermination()
-    println("Everything stopped.")
-    println(
-      "It will take some time for the JVM to clean all temporary files and shut down. Sit tight.")
-  }
-}
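The test's core concurrency structure is a bounded handoff: each producer owns an `ArrayBlockingQueue` (capacity 100 above) that applies backpressure when its paired consumer falls behind. A self-contained sketch of that pattern, independent of BlockManager (assumes Scala 2.12+ for the lambda-as-Runnable; names are illustrative):

```scala
import java.util.concurrent.ArrayBlockingQueue

object HandoffSketch {
  def main(args: Array[String]): Unit = {
    // Bounded queue: put() blocks when full, so a slow consumer throttles the producer.
    val queue = new ArrayBlockingQueue[(String, Seq[Int])](100)

    val producer = new Thread(() => {
      for (i <- 1 to 1000) queue.put(("b-1-" + i, Seq.fill(10)(i)))
    })
    val consumer = new Thread(() => {
      for (_ <- 1 to 1000) {
        val (blockId, block) = queue.take()
        // Stand-in for the BlockManager round-trip check in ConsumerThread.
        assert(block.forall(_ == block.head), "Block " + blockId + " did not match")
      }
    })

    producer.start(); consumer.start()
    producer.join(); consumer.join()
    println("Everything stopped.")
  }
}
```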
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 6f36817..c4f3efe 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -30,6 +30,7 @@ import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.storage.{BlockId, BlockManager}
 
+
 /**
  * An append-only map that spills sorted content to disk when there is insufficient space for it
  * to grow.
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
deleted file mode 100644
index dd380d8..0000000
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.graphx
-
-import com.esotericsoftware.kryo.Kryo
-
-import org.apache.spark.graphx.impl._
-import org.apache.spark.serializer.KryoRegistrator
-import org.apache.spark.util.collection.BitSet
-import org.apache.spark.util.BoundedPriorityQueue
-
-/**
- * Registers GraphX classes with Kryo for improved performance.
- */
-class GraphKryoRegistrator extends KryoRegistrator {
-
-  def registerClasses(kryo: Kryo) {
-    kryo.register(classOf[Edge[Object]])
-    kryo.register(classOf[MessageToPartition[Object]])
-    kryo.register(classOf[VertexBroadcastMsg[Object]])
-    kryo.register(classOf[(VertexId, Object)])
-    kryo.register(classOf[EdgePartition[Object]])
-    kryo.register(classOf[BitSet])
-    kryo.register(classOf[VertexIdToIndexMap])
-    kryo.register(classOf[VertexAttributeBlock[Object]])
-    kryo.register(classOf[PartitionStrategy])
-    kryo.register(classOf[BoundedPriorityQueue[Object]])
-    kryo.register(classOf[EdgeDirection])
-
-    // This avoids a large number of hash table lookups.
-    kryo.setReferences(false)
-  }
-}
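The final `kryo.setReferences(false)` is the optimization the comment alludes to: with reference tracking off, Kryo skips the per-object identity lookup it would otherwise perform to detect shared or cyclic references. The cost is that a cyclic object graph now recurses without bound. A sketch of the trade-off with plain Kryo (the `Node` class is hypothetical; GraphX's flat edge/vertex data has no cycles, which is why it can take this shortcut):

```scala
import java.io.ByteArrayOutputStream

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Output

class Node(var next: Node)

object ReferenceTrackingSketch {
  def main(args: Array[String]): Unit = {
    val kryo = new Kryo()
    kryo.setReferences(false)  // same setting GraphKryoRegistrator applied
    kryo.register(classOf[Node])

    val out = new Output(new ByteArrayOutputStream())

    // Acyclic data serializes fine and skips per-object reference bookkeeping.
    kryo.writeObject(out, new Node(new Node(null)))

    // A cycle, however, would recurse until StackOverflowError once
    // tracking is disabled:
    val a = new Node(null); val b = new Node(a); a.next = b
    // kryo.writeObject(out, a)  // would throw StackOverflowError
  }
}
```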
Sit tight.") - } -} diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 6f36817..c4f3efe 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -30,6 +30,7 @@ import org.apache.spark.{Logging, SparkEnv} import org.apache.spark.serializer.Serializer import org.apache.spark.storage.{BlockId, BlockManager} + /** * An append-only map that spills sorted content to disk when there is insufficient space for it * to grow. diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala deleted file mode 100644 index dd380d8..0000000 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.graphx - -import com.esotericsoftware.kryo.Kryo - -import org.apache.spark.graphx.impl._ -import org.apache.spark.serializer.KryoRegistrator -import org.apache.spark.util.collection.BitSet -import org.apache.spark.util.BoundedPriorityQueue - -/** - * Registers GraphX classes with Kryo for improved performance. - */ -class GraphKryoRegistrator extends KryoRegistrator { - - def registerClasses(kryo: Kryo) { - kryo.register(classOf[Edge[Object]]) - kryo.register(classOf[MessageToPartition[Object]]) - kryo.register(classOf[VertexBroadcastMsg[Object]]) - kryo.register(classOf[(VertexId, Object)]) - kryo.register(classOf[EdgePartition[Object]]) - kryo.register(classOf[BitSet]) - kryo.register(classOf[VertexIdToIndexMap]) - kryo.register(classOf[VertexAttributeBlock[Object]]) - kryo.register(classOf[PartitionStrategy]) - kryo.register(classOf[BoundedPriorityQueue[Object]]) - kryo.register(classOf[EdgeDirection]) - - // This avoids a large number of hash table lookups. 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
deleted file mode 100644
index 684b38e..0000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.util
-
-import java.io.IOException
-import java.net.ServerSocket
-import java.nio.ByteBuffer
-
-import scala.io.Source
-
-import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-
-import org.apache.spark.{SparkConf, Logging}
-import org.apache.spark.serializer.KryoSerializer
-import org.apache.spark.util.IntParam
-
-/**
- * A helper program that sends blocks of Kryo-serialized text strings out on a socket at a
- * specified rate. Used to feed data into RawInputDStream.
- */
-private[streaming]
-object RawTextSender extends Logging {
-  def main(args: Array[String]) {
-    if (args.length != 4) {
-      System.err.println("Usage: RawTextSender <port> <file> <blockSize> <bytesPerSec>")
-      System.exit(1)
-    }
-    // Parse the arguments using a pattern match
-    val Array(IntParam(port), file, IntParam(blockSize), IntParam(bytesPerSec)) = args
-
-    // Repeat the input data multiple times to fill in a buffer
-    val lines = Source.fromFile(file).getLines().toArray
-    val bufferStream = new FastByteArrayOutputStream(blockSize + 1000)
-    val ser = new KryoSerializer(new SparkConf()).newInstance()
-    val serStream = ser.serializeStream(bufferStream)
-    var i = 0
-    while (bufferStream.position < blockSize) {
-      serStream.writeObject(lines(i))
-      i = (i + 1) % lines.length
-    }
-    bufferStream.trim()
-    val array = bufferStream.array
-
-    val countBuf = ByteBuffer.wrap(new Array[Byte](4))
-    countBuf.putInt(array.length)
-    countBuf.flip()
-
-    val serverSocket = new ServerSocket(port)
-    logInfo("Listening on port " + port)
-
-    while (true) {
-      val socket = serverSocket.accept()
-      logInfo("Got a new connection")
-      val out = new RateLimitedOutputStream(socket.getOutputStream, bytesPerSec)
-      try {
-        while (true) {
-          out.write(countBuf.array)
-          out.write(array)
-        }
-      } catch {
-        case e: IOException =>
-          logError("Client disconnected")
-          socket.close()
-      }
-    }
-  }
-}
-- 
1.8.3.4 (Apple Git-47)