diff --git a/.gitignore b/.gitignore index 51de6b0..d5686f9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /v0.9.0-incubating.tar.gz /v0.9.1-rc1.tar.gz /v0.9.1-rc2.tar.gz +/v0.9.1-rc3.tar.gz diff --git a/sources b/sources index ecb82b8..609994a 100644 --- a/sources +++ b/sources @@ -1 +1 @@ -54c3557e17435510b5112f931ccfd4ef v0.9.1-rc2.tar.gz +fa1ce7b3838246e7c0ff181863277406 v0.9.1-rc3.tar.gz diff --git a/spark-v0.9.1-rc2-0001-Replace-lift-json-with-json4s-jackson.patch b/spark-v0.9.1-rc2-0001-Replace-lift-json-with-json4s-jackson.patch deleted file mode 100644 index 37278a4..0000000 --- a/spark-v0.9.1-rc2-0001-Replace-lift-json-with-json4s-jackson.patch +++ /dev/null @@ -1,253 +0,0 @@ -From 5d70618a8c585a1d7cb0bbb18d5c90681e8872e9 Mon Sep 17 00:00:00 2001 -From: William Benton -Date: Sun, 23 Feb 2014 17:22:07 -0600 -Subject: [PATCH 1/9] Replace lift-json with json4s-jackson. - -The aim of the Json4s project is to provide a common API for -Scala JSON libraries. It is Apache-licensed, easier for -downstream distributions to package, and mostly API-compatible -with lift-json. Furthermore, the Jackson-backed implementation -parses faster than lift-json on all but the smallest inputs. - -Backported patch from master to 0.9.0. - -Conflicts: - core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala - core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala - core/src/main/scala/org/apache/spark/ui/JettyUtils.scala - core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala ---- - core/pom.xml | 5 +++-- - .../apache/spark/deploy/FaultToleranceTest.scala | 9 ++++---- - .../org/apache/spark/deploy/JsonProtocol.scala | 2 +- - .../spark/deploy/master/ui/ApplicationPage.scala | 2 +- - .../apache/spark/deploy/master/ui/IndexPage.scala | 2 +- - .../apache/spark/deploy/worker/ui/IndexPage.scala | 2 +- - .../scala/org/apache/spark/ui/JettyUtils.scala | 8 ++++---- - .../apache/spark/deploy/JsonProtocolSuite.scala | 24 ++++++++++++++++++---- - project/SparkBuild.scala | 2 +- - 9 files changed, 37 insertions(+), 19 deletions(-) - -diff --git a/core/pom.xml b/core/pom.xml -index 8111400..e7aa19a 100644 ---- a/core/pom.xml -+++ b/core/pom.xml -@@ -104,8 +104,9 @@ - scala-library - - -- net.liftweb -- lift-json_${scala.binary.version} -+ org.json4s -+ json4s-jackson_${scala.binary.version} -+ 3.2.6 - - - it.unimi.dsi -diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala -index 4dfb19e..60a87af 100644 ---- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala -+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala -@@ -29,7 +29,8 @@ import scala.concurrent.ExecutionContext.Implicits.global - import scala.collection.mutable.ListBuffer - import scala.sys.process._ - --import net.liftweb.json.JsonParser -+import org.json4s._ -+import org.json4s.jackson.JsonMethods - - import org.apache.spark.{Logging, SparkContext} - import org.apache.spark.deploy.master.RecoveryState -@@ -312,7 +313,7 @@ private[spark] object FaultToleranceTest extends App with Logging { - private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile: File) - extends Logging { - -- implicit val formats = net.liftweb.json.DefaultFormats -+ implicit val formats = org.json4s.DefaultFormats - var state: RecoveryState.Value = _ - var liveWorkerIPs: List[String] = _ - var numLiveApps = 0 -@@ -322,7 +323,7 @@ private[spark] class 
TestMasterInfo(val ip: String, val dockerId: DockerId, val - def readState() { - try { - val masterStream = new InputStreamReader(new URL("http://%s:8080/json".format(ip)).openStream) -- val json = JsonParser.parse(masterStream, closeAutomatically = true) -+ val json = JsonMethods.parse(masterStream) - - val workers = json \ "workers" - val liveWorkers = workers.children.filter(w => (w \ "state").extract[String] == "ALIVE") -@@ -350,7 +351,7 @@ private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val - private[spark] class TestWorkerInfo(val ip: String, val dockerId: DockerId, val logFile: File) - extends Logging { - -- implicit val formats = net.liftweb.json.DefaultFormats -+ implicit val formats = org.json4s.DefaultFormats - - logDebug("Created worker: " + this) - -diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala -index e607b8c..a43d004 100644 ---- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala -+++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala -@@ -17,7 +17,7 @@ - - package org.apache.spark.deploy - --import net.liftweb.json.JsonDSL._ -+import org.json4s.JsonDSL._ - - import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse} - import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo} -diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala -index f29a6ad..cba89dc 100644 ---- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala -+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala -@@ -22,7 +22,7 @@ import scala.xml.Node - - import akka.pattern.ask - import javax.servlet.http.HttpServletRequest --import net.liftweb.json.JsonAST.JValue -+import org.json4s.JValue - - import org.apache.spark.deploy.JsonProtocol - import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} -diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala -index b549825..aa8beff 100644 ---- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala -+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala -@@ -23,7 +23,7 @@ import scala.xml.Node - - import akka.pattern.ask - import javax.servlet.http.HttpServletRequest --import net.liftweb.json.JsonAST.JValue -+import org.json4s.JValue - - import org.apache.spark.deploy.JsonProtocol - import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} -diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala -index 925c6fb..de356dc 100644 ---- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala -+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala -@@ -22,7 +22,7 @@ import scala.xml.Node - - import akka.pattern.ask - import javax.servlet.http.HttpServletRequest --import net.liftweb.json.JsonAST.JValue -+import org.json4s.JValue - - import org.apache.spark.deploy.JsonProtocol - import org.apache.spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse} -diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala -index 
7211dbc..4e43fd5 100644 ---- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala -+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala -@@ -23,10 +23,10 @@ import scala.annotation.tailrec - import scala.util.{Try, Success, Failure} - import scala.xml.Node - --import net.liftweb.json.{JValue, pretty, render} -- --import org.eclipse.jetty.server.{Server, Request, Handler} --import org.eclipse.jetty.server.handler.{ResourceHandler, HandlerList, ContextHandler, AbstractHandler} -+import org.json4s.JValue -+import org.json4s.jackson.JsonMethods.{pretty, render} -+import org.eclipse.jetty.server.{Handler, Request, Server} -+import org.eclipse.jetty.server.handler.{AbstractHandler, ContextHandler, HandlerList, ResourceHandler} - import org.eclipse.jetty.util.thread.QueuedThreadPool - - import org.apache.spark.Logging -diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala -index d05bbd6..8f1df8a 100644 ---- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala -+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala -@@ -20,8 +20,12 @@ package org.apache.spark.deploy - import java.io.File - import java.util.Date - --import net.liftweb.json.{JsonAST, JsonParser} --import net.liftweb.json.JsonAST.JValue -+import org.json4s._ -+ -+import org.json4s.JValue -+import org.json4s.jackson.JsonMethods -+import com.fasterxml.jackson.core.JsonParseException -+ - import org.scalatest.FunSuite - - import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse} -@@ -32,21 +36,31 @@ class JsonProtocolSuite extends FunSuite { - test("writeApplicationInfo") { - val output = JsonProtocol.writeApplicationInfo(createAppInfo()) - assertValidJson(output) -+ assertValidDataInJson(output, JsonMethods.parse(JsonConstants.appInfoJsonStr)) - } - - test("writeWorkerInfo") { - val output = JsonProtocol.writeWorkerInfo(createWorkerInfo()) - assertValidJson(output) -+ assertValidDataInJson(output, JsonMethods.parse(JsonConstants.workerInfoJsonStr)) - } - - test("writeApplicationDescription") { - val output = JsonProtocol.writeApplicationDescription(createAppDesc()) - assertValidJson(output) -+ assertValidDataInJson(output, JsonMethods.parse(JsonConstants.appDescJsonStr)) - } - - test("writeExecutorRunner") { - val output = JsonProtocol.writeExecutorRunner(createExecutorRunner()) - assertValidJson(output) -+ assertValidDataInJson(output, JsonMethods.parse(JsonConstants.executorRunnerJsonStr)) -+ } -+ -+ test("writeDriverInfo") { -+ val output = JsonProtocol.writeDriverInfo(createDriverInfo()) -+ assertValidJson(output) -+ assertValidDataInJson(output, JsonMethods.parse(JsonConstants.driverInfoJsonStr)) - } - - test("writeMasterState") { -@@ -59,6 +73,7 @@ class JsonProtocolSuite extends FunSuite { - activeDrivers, completedDrivers, RecoveryState.ALIVE) - val output = JsonProtocol.writeMasterState(stateResponse) - assertValidJson(output) -+ assertValidDataInJson(output, JsonMethods.parse(JsonConstants.masterStateJsonStr)) - } - - test("writeWorkerState") { -@@ -70,6 +85,7 @@ class JsonProtocolSuite extends FunSuite { - finishedExecutors, drivers, finishedDrivers, "masterUrl", 4, 1234, 4, 1234, "masterWebUiUrl") - val output = JsonProtocol.writeWorkerState(stateResponse) - assertValidJson(output) -+ assertValidDataInJson(output, JsonMethods.parse(JsonConstants.workerStateJsonStr)) - } - - def createAppDesc(): ApplicationDescription = { -@@ -106,9 +122,9 @@ class 
JsonProtocolSuite extends FunSuite { - - def assertValidJson(json: JValue) { - try { -- JsonParser.parse(JsonAST.compactRender(json)) -+ JsonMethods.parse(JsonMethods.compact(json)) - } catch { -- case e: JsonParser.ParseException => fail("Invalid Json detected", e) -+ case e: JsonParseException => fail("Invalid Json detected", e) - } - } - } -diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala -index 52e894e..6346b29 100644 ---- a/project/SparkBuild.scala -+++ b/project/SparkBuild.scala -@@ -274,7 +274,7 @@ object SparkBuild extends Build { - "org.spark-project.akka" %% "akka-remote" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty), - "org.spark-project.akka" %% "akka-slf4j" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty), - "org.spark-project.akka" %% "akka-testkit" % "2.2.3-shaded-protobuf" % "test", -- "net.liftweb" %% "lift-json" % "2.5.1" excludeAll(excludeNetty), -+ "org.json4s" %% "json4s-jackson" % "3.2.6", - "it.unimi.dsi" % "fastutil" % "6.4.4", - "colt" % "colt" % "1.2.0", - "org.apache.mesos" % "mesos" % "0.13.0", --- -1.8.3.4 (Apple Git-47) - diff --git a/spark-v0.9.1-rc2-0002-use-sbt-0.13.1.patch b/spark-v0.9.1-rc2-0002-use-sbt-0.13.1.patch deleted file mode 100644 index d68e035..0000000 --- a/spark-v0.9.1-rc2-0002-use-sbt-0.13.1.patch +++ /dev/null @@ -1,22 +0,0 @@ -From 92143b68e18754528f927f40e9a0eab9442fe3e8 Mon Sep 17 00:00:00 2001 -From: William Benton -Date: Thu, 27 Feb 2014 14:25:34 -0600 -Subject: [PATCH 2/9] use sbt 0.13.1 - ---- - project/build.properties | 2 +- - 1 file changed, 1 insertion(+), 1 deletion(-) - -diff --git a/project/build.properties b/project/build.properties -index 839f5fb..4b52bb9 100644 ---- a/project/build.properties -+++ b/project/build.properties -@@ -14,4 +14,4 @@ - # See the License for the specific language governing permissions and - # limitations under the License. - # --sbt.version=0.12.4 -+sbt.version=0.13.1 --- -1.8.3.4 (Apple Git-47) - diff --git a/spark-v0.9.1-rc2-0003-Removed-sbt-plugins.patch b/spark-v0.9.1-rc2-0003-Removed-sbt-plugins.patch deleted file mode 100644 index c092784..0000000 --- a/spark-v0.9.1-rc2-0003-Removed-sbt-plugins.patch +++ /dev/null @@ -1,175 +0,0 @@ -From 496a085091397f0d40d4ee63b058909334457f53 Mon Sep 17 00:00:00 2001 -From: William Benton -Date: Thu, 27 Feb 2014 15:46:41 -0600 -Subject: [PATCH 3/9] Removed sbt plugins. - -Patch ported from Fedora Spark 0.9.0 package. 
- -Conflicts: - project/SparkBuild.scala - project/project/SparkPluginBuild.scala ---- - project/SparkBuild.scala | 44 ++++------------------------------ - project/plugins.sbt | 18 -------------- - project/project/SparkPluginBuild.scala | 24 ------------------- - 3 files changed, 5 insertions(+), 81 deletions(-) - delete mode 100644 project/plugins.sbt - delete mode 100644 project/project/SparkPluginBuild.scala - -diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala -index 6346b29..1eaa755 100644 ---- a/project/SparkBuild.scala -+++ b/project/SparkBuild.scala -@@ -18,8 +18,6 @@ - import sbt._ - import sbt.Classpaths.publishTask - import Keys._ --import sbtassembly.Plugin._ --import AssemblyKeys._ - import scala.util.Properties - // For Sonatype publishing - //import com.jsuereth.pgp.sbtplugin.PgpKeys._ -@@ -60,17 +58,6 @@ object SparkBuild extends Build { - - lazy val mllib = Project("mllib", file("mllib"), settings = mllibSettings) dependsOn(core) - -- lazy val assemblyProj = Project("assembly", file("assembly"), settings = assemblyProjSettings) -- .dependsOn(core, graphx, bagel, mllib, repl, streaming) dependsOn(maybeYarn: _*) dependsOn(maybeGanglia: _*) -- -- lazy val assembleDeps = TaskKey[Unit]("assemble-deps", "Build assembly of dependencies and packages Spark projects") -- -- // A dummy command so we can run the Jenkins pull request builder for older versions of Spark. -- lazy val scalastyle = TaskKey[Unit]("scalastyle", "Dummy scalastyle check") -- val scalastyleTask = scalastyle := { -- println("scalastyle is not configured for this version of Spark project.") -- } -- - // A configuration to set an alternative publishLocalConfiguration - lazy val MavenCompile = config("m2r") extend(Compile) - lazy val publishLocalBoth = TaskKey[Unit]("publish-local", "publish local for m2 and ivy") -@@ -130,7 +117,7 @@ object SparkBuild extends Build { - // Everything except assembly, tools and examples belong to packageProjects - lazy val packageProjects = Seq[ProjectReference](core, repl, bagel, streaming, mllib, graphx) ++ maybeYarnRef ++ maybeGangliaRef - -- lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](examples, tools, assemblyProj) -+ lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](examples, tools) - - def sharedSettings = Defaults.defaultSettings ++ Seq( - organization := "org.apache.spark", -@@ -143,7 +130,6 @@ object SparkBuild extends Build { - retrieveManaged := true, - retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]", - transitiveClassifiers in Scope.GlobalScope := Seq("sources"), -- testListeners <<= target.map(t => Seq(new eu.henkelmann.sbt.JUnitXmlTestsListener(t.getAbsolutePath))), - - // Fork new JVMs for tests and set Java options for those - fork := true, -@@ -245,8 +231,8 @@ object SparkBuild extends Build { - publishMavenStyle in MavenCompile := true, - publishLocal in MavenCompile <<= publishTask(publishLocalConfiguration in MavenCompile, deliverLocal), - publishLocalBoth <<= Seq(publishLocal in MavenCompile, publishLocal).dependOn -- ) ++ net.virtualvoid.sbt.graph.Plugin.graphSettings -- -+ ) -+ - val slf4jVersion = "1.7.2" - - val excludeCglib = ExclusionRule(organization = "org.sonatype.sisu.inject") -@@ -322,11 +308,11 @@ object SparkBuild extends Build { - excludeAll(excludeSnappy) - excludeAll(excludeCglib) - ) -- ) ++ assemblySettings ++ extraAssemblySettings -+ ) - - def toolsSettings = sharedSettings ++ Seq( - name := "spark-tools" -- ) ++ assemblySettings ++ 
extraAssemblySettings -+ ) - - def graphxSettings = sharedSettings ++ Seq( - name := "spark-graphx", -@@ -395,26 +381,6 @@ object SparkBuild extends Build { - ) - ) - -- def assemblyProjSettings = sharedSettings ++ Seq( -- libraryDependencies += "net.sf.py4j" % "py4j" % "0.8.1", -- name := "spark-assembly", -- scalastyleTask, -- assembleDeps in Compile <<= (packageProjects.map(packageBin in Compile in _) ++ Seq(packageDependency in Compile)).dependOn, -- jarName in assembly <<= version map { v => "spark-assembly-" + v + "-hadoop" + hadoopVersion + ".jar" }, -- jarName in packageDependency <<= version map { v => "spark-assembly-" + v + "-hadoop" + hadoopVersion + "-deps.jar" } -- ) ++ assemblySettings ++ extraAssemblySettings -- -- def extraAssemblySettings() = Seq( -- test in assembly := {}, -- mergeStrategy in assembly := { -- case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard -- case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard -- case "log4j.properties" => MergeStrategy.discard -- case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines -- case "reference.conf" => MergeStrategy.concat -- case _ => MergeStrategy.first -- } -- ) - - def twitterSettings() = sharedSettings ++ Seq( - name := "spark-streaming-twitter", -diff --git a/project/plugins.sbt b/project/plugins.sbt -deleted file mode 100644 -index 4ba0e42..0000000 ---- a/project/plugins.sbt -+++ /dev/null -@@ -1,18 +0,0 @@ --resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns) -- --resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/" -- --resolvers += "Spray Repository" at "http://repo.spray.cc/" -- --addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.9.2") -- --addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.2.0") -- --addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.5.1") -- --// For Sonatype publishing --//resolvers += Resolver.url("sbt-plugin-releases", new URL("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases/"))(Resolver.ivyStylePatterns) -- --//addSbtPlugin("com.jsuereth" % "xsbt-gpg-plugin" % "0.6") -- --addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.3") -diff --git a/project/project/SparkPluginBuild.scala b/project/project/SparkPluginBuild.scala -deleted file mode 100644 -index 4853be2..0000000 ---- a/project/project/SparkPluginBuild.scala -+++ /dev/null -@@ -1,24 +0,0 @@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one or more -- * contributor license agreements. See the NOTICE file distributed with -- * this work for additional information regarding copyright ownership. -- * The ASF licenses this file to You under the Apache License, Version 2.0 -- * (the "License"); you may not use this file except in compliance with -- * the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. 
-- */ -- --import sbt._ -- --object SparkPluginDef extends Build { -- lazy val root = Project("plugins", file(".")) dependsOn(junitXmlListener) -- /* This is not published in a Maven repository, so we get it from GitHub directly */ -- lazy val junitXmlListener = uri("https://github.com/ijuma/junit_xml_listener.git#fe434773255b451a38e8d889536ebc260f4225ce") --} --- -1.8.3.4 (Apple Git-47) - diff --git a/spark-v0.9.1-rc2-0004-removed-examples.patch b/spark-v0.9.1-rc2-0004-removed-examples.patch deleted file mode 100644 index 21e86fa..0000000 --- a/spark-v0.9.1-rc2-0004-removed-examples.patch +++ /dev/null @@ -1,126 +0,0 @@ -From cc24412b35a4f3118bcb4ad3e57a5ce33549c809 Mon Sep 17 00:00:00 2001 -From: William Benton -Date: Thu, 27 Feb 2014 16:01:11 -0600 -Subject: [PATCH 4/9] removed examples - -ported patch from Fedora Spark 0.9.0 package - -Conflicts: - project/SparkBuild.scala ---- - project/SparkBuild.scala | 83 ++---------------------------------------------- - 1 file changed, 3 insertions(+), 80 deletions(-) - -diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala -index 1eaa755..3925475 100644 ---- a/project/SparkBuild.scala -+++ b/project/SparkBuild.scala -@@ -93,31 +93,13 @@ object SparkBuild extends Build { - lazy val maybeYarn: Seq[ClasspathDependency] = if (isYarnEnabled) Seq(if (isNewHadoop) yarn else yarnAlpha) else Seq() - lazy val maybeYarnRef: Seq[ProjectReference] = if (isYarnEnabled) Seq(if (isNewHadoop) yarn else yarnAlpha) else Seq() - -- lazy val externalTwitter = Project("external-twitter", file("external/twitter"), settings = twitterSettings) -- .dependsOn(streaming % "compile->compile;test->test") -- -- lazy val externalKafka = Project("external-kafka", file("external/kafka"), settings = kafkaSettings) -- .dependsOn(streaming % "compile->compile;test->test") -- -- lazy val externalFlume = Project("external-flume", file("external/flume"), settings = flumeSettings) -- .dependsOn(streaming % "compile->compile;test->test") -- -- lazy val externalZeromq = Project("external-zeromq", file("external/zeromq"), settings = zeromqSettings) -- .dependsOn(streaming % "compile->compile;test->test") -- -- lazy val externalMqtt = Project("external-mqtt", file("external/mqtt"), settings = mqttSettings) -- .dependsOn(streaming % "compile->compile;test->test") -- -- lazy val allExternal = Seq[ClasspathDependency](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt) -- lazy val allExternalRefs = Seq[ProjectReference](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt) -- -- lazy val examples = Project("examples", file("examples"), settings = examplesSettings) -- .dependsOn(core, mllib, graphx, bagel, streaming, externalTwitter) dependsOn(allExternal: _*) -+ lazy val allExternal = Seq[ClasspathDependency]() -+ lazy val allExternalRefs = Seq[ProjectReference]() - - // Everything except assembly, tools and examples belong to packageProjects - lazy val packageProjects = Seq[ProjectReference](core, repl, bagel, streaming, mllib, graphx) ++ maybeYarnRef ++ maybeGangliaRef - -- lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](examples, tools) -+ lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](tools) - - def sharedSettings = Defaults.defaultSettings ++ Seq( - organization := "org.apache.spark", -@@ -291,25 +273,6 @@ object SparkBuild extends Build { - libraryDependencies <+= scalaVersion(v => "org.scala-lang" % "scala-reflect" % v ) - ) - -- def examplesSettings = 
sharedSettings ++ Seq( -- name := "spark-examples", -- libraryDependencies ++= Seq( -- "com.twitter" %% "algebird-core" % "0.1.11", -- "org.apache.hbase" % "hbase" % "0.94.6" excludeAll(excludeNetty, excludeAsm), -- "org.apache.hbase" % "hbase" % HBASE_VERSION excludeAll(excludeNetty, excludeAsm), -- "org.apache.cassandra" % "cassandra-all" % "1.2.6" -- exclude("com.google.guava", "guava") -- exclude("com.googlecode.concurrentlinkedhashmap", "concurrentlinkedhashmap-lru") -- exclude("com.ning","compress-lzf") -- exclude("io.netty", "netty") -- exclude("jline","jline") -- exclude("log4j","log4j") -- exclude("org.apache.cassandra.deps", "avro") -- excludeAll(excludeSnappy) -- excludeAll(excludeCglib) -- ) -- ) -- - def toolsSettings = sharedSettings ++ Seq( - name := "spark-tools" - ) -@@ -380,44 +343,4 @@ object SparkBuild extends Build { - "org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib) - ) - ) -- -- -- def twitterSettings() = sharedSettings ++ Seq( -- name := "spark-streaming-twitter", -- libraryDependencies ++= Seq( -- "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty) -- ) -- ) -- -- def kafkaSettings() = sharedSettings ++ Seq( -- name := "spark-streaming-kafka", -- libraryDependencies ++= Seq( -- "com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty), -- "org.apache.kafka" %% "kafka" % "0.8.0" -- exclude("com.sun.jdmk", "jmxtools") -- exclude("com.sun.jmx", "jmxri") -- exclude("net.sf.jopt-simple", "jopt-simple") -- excludeAll(excludeNetty) -- ) -- ) -- -- def flumeSettings() = sharedSettings ++ Seq( -- name := "spark-streaming-flume", -- libraryDependencies ++= Seq( -- "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy) -- ) -- ) -- -- def zeromqSettings() = sharedSettings ++ Seq( -- name := "spark-streaming-zeromq", -- libraryDependencies ++= Seq( -- "org.spark-project.akka" %% "akka-zeromq" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty) -- ) -- ) -- -- def mqttSettings() = streamingSettings ++ Seq( -- name := "spark-streaming-mqtt", -- resolvers ++= Seq("Eclipse Repo" at "https://repo.eclipse.org/content/repositories/paho-releases/"), -- libraryDependencies ++= Seq("org.eclipse.paho" % "mqtt-client" % "0.4.0") -- ) - } --- -1.8.3.4 (Apple Git-47) - diff --git a/spark-v0.9.1-rc2-0005-Removed-code-depending-on-Kryo.patch b/spark-v0.9.1-rc2-0005-Removed-code-depending-on-Kryo.patch deleted file mode 100644 index 1cd34d0..0000000 --- a/spark-v0.9.1-rc2-0005-Removed-code-depending-on-Kryo.patch +++ /dev/null @@ -1,629 +0,0 @@ -From 6172c60fe82dae51155ca97db39cd75bc42fcba3 Mon Sep 17 00:00:00 2001 -From: William Benton -Date: Thu, 27 Feb 2014 16:43:44 -0600 -Subject: [PATCH 5/9] Removed code depending on Kryo - -Conflicts: - core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---- - .../apache/spark/serializer/KryoSerializer.scala | 175 --------------------- - .../apache/spark/storage/StoragePerfTester.scala | 103 ------------ - .../org/apache/spark/storage/ThreadingTest.scala | 115 -------------- - .../util/collection/ExternalAppendOnlyMap.scala | 1 + - .../apache/spark/graphx/GraphKryoRegistrator.scala | 48 ------ - .../apache/spark/mllib/recommendation/ALS.scala | 12 -- - .../spark/streaming/util/RawTextSender.scala | 82 ---------- - 7 files changed, 1 insertion(+), 535 deletions(-) - delete mode 100644 core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala - delete mode 100644 
core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala - delete mode 100644 core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala - delete mode 100644 graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala - delete mode 100644 streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala - -diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala -deleted file mode 100644 -index c14cd47..0000000 ---- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala -+++ /dev/null -@@ -1,175 +0,0 @@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one or more -- * contributor license agreements. See the NOTICE file distributed with -- * this work for additional information regarding copyright ownership. -- * The ASF licenses this file to You under the Apache License, Version 2.0 -- * (the "License"); you may not use this file except in compliance with -- * the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- --package org.apache.spark.serializer -- --import java.nio.ByteBuffer --import java.io.{EOFException, InputStream, OutputStream} -- --import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer} --import com.esotericsoftware.kryo.{KryoException, Kryo} --import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} --import com.twitter.chill.{EmptyScalaKryoInstantiator, AllScalaRegistrar} -- --import org.apache.spark._ --import org.apache.spark.broadcast.HttpBroadcast --import org.apache.spark.scheduler.MapStatus --import org.apache.spark.storage._ --import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock} -- --/** -- * A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]]. -- */ --class KryoSerializer(conf: SparkConf) extends org.apache.spark.serializer.Serializer with Logging { -- private val bufferSize = { -- conf.getInt("spark.kryoserializer.buffer.mb", 2) * 1024 * 1024 -- } -- -- def newKryoOutput() = new KryoOutput(bufferSize) -- -- def newKryo(): Kryo = { -- val instantiator = new EmptyScalaKryoInstantiator -- val kryo = instantiator.newKryo() -- val classLoader = Thread.currentThread.getContextClassLoader -- -- // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops. -- // Do this before we invoke the user registrator so the user registrator can override this. 
-- kryo.setReferences(conf.getBoolean("spark.kryo.referenceTracking", true)) -- -- for (cls <- KryoSerializer.toRegister) kryo.register(cls) -- -- // Allow sending SerializableWritable -- kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer()) -- kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer()) -- -- // Allow the user to register their own classes by setting spark.kryo.registrator -- try { -- for (regCls <- conf.getOption("spark.kryo.registrator")) { -- logDebug("Running user registrator: " + regCls) -- val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator] -- reg.registerClasses(kryo) -- } -- } catch { -- case e: Exception => logError("Failed to run spark.kryo.registrator", e) -- } -- -- // Register Chill's classes; we do this after our ranges and the user's own classes to let -- // our code override the generic serialziers in Chill for things like Seq -- new AllScalaRegistrar().apply(kryo) -- -- kryo.setClassLoader(classLoader) -- kryo -- } -- -- def newInstance(): SerializerInstance = { -- new KryoSerializerInstance(this) -- } --} -- --private[spark] --class KryoSerializationStream(kryo: Kryo, outStream: OutputStream) extends SerializationStream { -- val output = new KryoOutput(outStream) -- -- def writeObject[T](t: T): SerializationStream = { -- kryo.writeClassAndObject(output, t) -- this -- } -- -- def flush() { output.flush() } -- def close() { output.close() } --} -- --private[spark] --class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends DeserializationStream { -- val input = new KryoInput(inStream) -- -- def readObject[T](): T = { -- try { -- kryo.readClassAndObject(input).asInstanceOf[T] -- } catch { -- // DeserializationStream uses the EOF exception to indicate stopping condition. -- case _: KryoException => throw new EOFException -- } -- } -- -- def close() { -- // Kryo's Input automatically closes the input stream it is using. -- input.close() -- } --} -- --private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance { -- val kryo = ks.newKryo() -- -- // Make these lazy vals to avoid creating a buffer unless we use them -- lazy val output = ks.newKryoOutput() -- lazy val input = new KryoInput() -- -- def serialize[T](t: T): ByteBuffer = { -- output.clear() -- kryo.writeClassAndObject(output, t) -- ByteBuffer.wrap(output.toBytes) -- } -- -- def deserialize[T](bytes: ByteBuffer): T = { -- input.setBuffer(bytes.array) -- kryo.readClassAndObject(input).asInstanceOf[T] -- } -- -- def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = { -- val oldClassLoader = kryo.getClassLoader -- kryo.setClassLoader(loader) -- input.setBuffer(bytes.array) -- val obj = kryo.readClassAndObject(input).asInstanceOf[T] -- kryo.setClassLoader(oldClassLoader) -- obj -- } -- -- def serializeStream(s: OutputStream): SerializationStream = { -- new KryoSerializationStream(kryo, s) -- } -- -- def deserializeStream(s: InputStream): DeserializationStream = { -- new KryoDeserializationStream(kryo, s) -- } --} -- --/** -- * Interface implemented by clients to register their classes with Kryo when using Kryo -- * serialization. -- */ --trait KryoRegistrator { -- def registerClasses(kryo: Kryo) --} -- --private[serializer] object KryoSerializer { -- // Commonly used classes. 
-- private val toRegister: Seq[Class[_]] = Seq( -- ByteBuffer.allocate(1).getClass, -- classOf[StorageLevel], -- classOf[PutBlock], -- classOf[GotBlock], -- classOf[GetBlock], -- classOf[MapStatus], -- classOf[BlockManagerId], -- classOf[Array[Byte]], -- (1 to 10).getClass, -- (1 until 10).getClass, -- (1L to 10L).getClass, -- (1L until 10L).getClass -- ) --} -diff --git a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala -deleted file mode 100644 -index 40734aa..0000000 ---- a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala -+++ /dev/null -@@ -1,103 +0,0 @@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one or more -- * contributor license agreements. See the NOTICE file distributed with -- * this work for additional information regarding copyright ownership. -- * The ASF licenses this file to You under the Apache License, Version 2.0 -- * (the "License"); you may not use this file except in compliance with -- * the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- --package org.apache.spark.storage -- --import java.util.concurrent.atomic.AtomicLong --import java.util.concurrent.{CountDownLatch, Executors} -- --import org.apache.spark.serializer.KryoSerializer --import org.apache.spark.SparkContext --import org.apache.spark.util.Utils -- --/** -- * Utility for micro-benchmarking shuffle write performance. -- * -- * Writes simulated shuffle output from several threads and records the observed throughput. -- */ --object StoragePerfTester { -- def main(args: Array[String]) = { -- /** Total amount of data to generate. Distributed evenly amongst maps and reduce splits. */ -- val dataSizeMb = Utils.memoryStringToMb(sys.env.getOrElse("OUTPUT_DATA", "1g")) -- -- /** Number of map tasks. All tasks execute concurrently. */ -- val numMaps = sys.env.get("NUM_MAPS").map(_.toInt).getOrElse(8) -- -- /** Number of reduce splits for each map task. */ -- val numOutputSplits = sys.env.get("NUM_REDUCERS").map(_.toInt).getOrElse(500) -- -- val recordLength = 1000 // ~1KB records -- val totalRecords = dataSizeMb * 1000 -- val recordsPerMap = totalRecords / numMaps -- -- val writeData = "1" * recordLength -- val executor = Executors.newFixedThreadPool(numMaps) -- -- System.setProperty("spark.shuffle.compress", "false") -- System.setProperty("spark.shuffle.sync", "true") -- -- // This is only used to instantiate a BlockManager. All thread scheduling is done manually. 
-- val sc = new SparkContext("local[4]", "Write Tester") -- val blockManager = sc.env.blockManager -- -- def writeOutputBytes(mapId: Int, total: AtomicLong) = { -- val shuffle = blockManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits, -- new KryoSerializer(sc.conf)) -- val writers = shuffle.writers -- for (i <- 1 to recordsPerMap) { -- writers(i % numOutputSplits).write(writeData) -- } -- writers.map {w => -- w.commit() -- total.addAndGet(w.fileSegment().length) -- w.close() -- } -- -- shuffle.releaseWriters(true) -- } -- -- val start = System.currentTimeMillis() -- val latch = new CountDownLatch(numMaps) -- val totalBytes = new AtomicLong() -- for (task <- 1 to numMaps) { -- executor.submit(new Runnable() { -- override def run() = { -- try { -- writeOutputBytes(task, totalBytes) -- latch.countDown() -- } catch { -- case e: Exception => -- println("Exception in child thread: " + e + " " + e.getMessage) -- System.exit(1) -- } -- } -- }) -- } -- latch.await() -- val end = System.currentTimeMillis() -- val time = (end - start) / 1000.0 -- val bytesPerSecond = totalBytes.get() / time -- val bytesPerFile = (totalBytes.get() / (numOutputSplits * numMaps.toDouble)).toLong -- -- System.err.println("files_total\t\t%s".format(numMaps * numOutputSplits)) -- System.err.println("bytes_per_file\t\t%s".format(Utils.bytesToString(bytesPerFile))) -- System.err.println("agg_throughput\t\t%s/s".format(Utils.bytesToString(bytesPerSecond.toLong))) -- -- executor.shutdown() -- sc.stop() -- } --} -diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala -deleted file mode 100644 -index 729ba2c..0000000 ---- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala -+++ /dev/null -@@ -1,115 +0,0 @@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one or more -- * contributor license agreements. See the NOTICE file distributed with -- * this work for additional information regarding copyright ownership. -- * The ASF licenses this file to You under the Apache License, Version 2.0 -- * (the "License"); you may not use this file except in compliance with -- * the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- --package org.apache.spark.storage -- --import akka.actor._ -- --import java.util.concurrent.ArrayBlockingQueue --import util.Random --import org.apache.spark.serializer.KryoSerializer --import org.apache.spark.{SparkConf, SparkContext} -- --/** -- * This class tests the BlockManager and MemoryStore for thread safety and -- * deadlocks. It spawns a number of producer and consumer threads. Producer -- * threads continuously pushes blocks into the BlockManager and consumer -- * threads continuously retrieves the blocks form the BlockManager and tests -- * whether the block is correct or not. 
-- */ --private[spark] object ThreadingTest { -- -- val numProducers = 5 -- val numBlocksPerProducer = 20000 -- -- private[spark] class ProducerThread(manager: BlockManager, id: Int) extends Thread { -- val queue = new ArrayBlockingQueue[(BlockId, Seq[Int])](100) -- -- override def run() { -- for (i <- 1 to numBlocksPerProducer) { -- val blockId = TestBlockId("b-" + id + "-" + i) -- val blockSize = Random.nextInt(1000) -- val block = (1 to blockSize).map(_ => Random.nextInt()) -- val level = randomLevel() -- val startTime = System.currentTimeMillis() -- manager.put(blockId, block.iterator, level, true) -- println("Pushed block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms") -- queue.add((blockId, block)) -- } -- println("Producer thread " + id + " terminated") -- } -- -- def randomLevel(): StorageLevel = { -- math.abs(Random.nextInt()) % 4 match { -- case 0 => StorageLevel.MEMORY_ONLY -- case 1 => StorageLevel.MEMORY_ONLY_SER -- case 2 => StorageLevel.MEMORY_AND_DISK -- case 3 => StorageLevel.MEMORY_AND_DISK_SER -- } -- } -- } -- -- private[spark] class ConsumerThread( -- manager: BlockManager, -- queue: ArrayBlockingQueue[(BlockId, Seq[Int])] -- ) extends Thread { -- var numBlockConsumed = 0 -- -- override def run() { -- println("Consumer thread started") -- while(numBlockConsumed < numBlocksPerProducer) { -- val (blockId, block) = queue.take() -- val startTime = System.currentTimeMillis() -- manager.get(blockId) match { -- case Some(retrievedBlock) => -- assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList, -- "Block " + blockId + " did not match") -- println("Got block " + blockId + " in " + -- (System.currentTimeMillis - startTime) + " ms") -- case None => -- assert(false, "Block " + blockId + " could not be retrieved") -- } -- numBlockConsumed += 1 -- } -- println("Consumer thread terminated") -- } -- } -- -- def main(args: Array[String]) { -- System.setProperty("spark.kryoserializer.buffer.mb", "1") -- val actorSystem = ActorSystem("test") -- val conf = new SparkConf() -- val serializer = new KryoSerializer(conf) -- val blockManagerMaster = new BlockManagerMaster( -- actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf) -- val blockManager = new BlockManager( -- "", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf) -- val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i)) -- val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue)) -- producers.foreach(_.start) -- consumers.foreach(_.start) -- producers.foreach(_.join) -- consumers.foreach(_.join) -- blockManager.stop() -- blockManagerMaster.stop() -- actorSystem.shutdown() -- actorSystem.awaitTermination() -- println("Everything stopped.") -- println( -- "It will take sometime for the JVM to clean all temporary files and shutdown. 
Sit tight.") -- } --} -diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala -index 6f36817..c4f3efe 100644 ---- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala -+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala -@@ -30,6 +30,7 @@ import org.apache.spark.{Logging, SparkEnv} - import org.apache.spark.serializer.Serializer - import org.apache.spark.storage.{BlockId, BlockManager} - -+ - /** - * An append-only map that spills sorted content to disk when there is insufficient space for it - * to grow. -diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala -deleted file mode 100644 -index dd380d8..0000000 ---- a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala -+++ /dev/null -@@ -1,48 +0,0 @@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one or more -- * contributor license agreements. See the NOTICE file distributed with -- * this work for additional information regarding copyright ownership. -- * The ASF licenses this file to You under the Apache License, Version 2.0 -- * (the "License"); you may not use this file except in compliance with -- * the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- --package org.apache.spark.graphx -- --import com.esotericsoftware.kryo.Kryo -- --import org.apache.spark.graphx.impl._ --import org.apache.spark.serializer.KryoRegistrator --import org.apache.spark.util.collection.BitSet --import org.apache.spark.util.BoundedPriorityQueue -- --/** -- * Registers GraphX classes with Kryo for improved performance. -- */ --class GraphKryoRegistrator extends KryoRegistrator { -- -- def registerClasses(kryo: Kryo) { -- kryo.register(classOf[Edge[Object]]) -- kryo.register(classOf[MessageToPartition[Object]]) -- kryo.register(classOf[VertexBroadcastMsg[Object]]) -- kryo.register(classOf[(VertexId, Object)]) -- kryo.register(classOf[EdgePartition[Object]]) -- kryo.register(classOf[BitSet]) -- kryo.register(classOf[VertexIdToIndexMap]) -- kryo.register(classOf[VertexAttributeBlock[Object]]) -- kryo.register(classOf[PartitionStrategy]) -- kryo.register(classOf[BoundedPriorityQueue[Object]]) -- kryo.register(classOf[EdgeDirection]) -- -- // This avoids a large number of hash table lookups. 
-- kryo.setReferences(false) -- } --} -diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala -index 44db51c..f13781a 100644 ---- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala -+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala -@@ -26,10 +26,8 @@ import org.apache.spark.broadcast.Broadcast - import org.apache.spark.{Logging, HashPartitioner, Partitioner, SparkContext, SparkConf} - import org.apache.spark.storage.StorageLevel - import org.apache.spark.rdd.RDD --import org.apache.spark.serializer.KryoRegistrator - import org.apache.spark.SparkContext._ - --import com.esotericsoftware.kryo.Kryo - import org.jblas.{DoubleMatrix, SimpleBlas, Solve} - - -@@ -641,12 +639,6 @@ object ALS { - trainImplicit(ratings, rank, iterations, 0.01, -1, 1.0) - } - -- private class ALSRegistrator extends KryoRegistrator { -- override def registerClasses(kryo: Kryo) { -- kryo.register(classOf[Rating]) -- } -- } -- - def main(args: Array[String]) { - if (args.length < 5 || args.length > 9) { - println("Usage: ALS " + -@@ -660,10 +652,6 @@ object ALS { - val alpha = if (args.length >= 8) args(7).toDouble else 1 - val blocks = if (args.length == 9) args(8).toInt else -1 - val conf = new SparkConf() -- .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") -- .set("spark.kryo.registrator", classOf[ALSRegistrator].getName) -- .set("spark.kryo.referenceTracking", "false") -- .set("spark.kryoserializer.buffer.mb", "8") - .set("spark.locality.wait", "10000") - val sc = new SparkContext(master, "ALS", conf) - -diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala -deleted file mode 100644 -index 684b38e..0000000 ---- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala -+++ /dev/null -@@ -1,82 +0,0 @@ --/* -- * Licensed to the Apache Software Foundation (ASF) under one or more -- * contributor license agreements. See the NOTICE file distributed with -- * this work for additional information regarding copyright ownership. -- * The ASF licenses this file to You under the Apache License, Version 2.0 -- * (the "License"); you may not use this file except in compliance with -- * the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ -- --package org.apache.spark.streaming.util -- --import java.io.IOException --import java.net.ServerSocket --import java.nio.ByteBuffer -- --import scala.io.Source -- --import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream -- --import org.apache.spark.{SparkConf, Logging} --import org.apache.spark.serializer.KryoSerializer --import org.apache.spark.util.IntParam -- --/** -- * A helper program that sends blocks of Kryo-serialized text strings out on a socket at a -- * specified rate. Used to feed data into RawInputDStream. 
-- */ --private[streaming] --object RawTextSender extends Logging { -- def main(args: Array[String]) { -- if (args.length != 4) { -- System.err.println("Usage: RawTextSender ") -- System.exit(1) -- } -- // Parse the arguments using a pattern match -- val Array(IntParam(port), file, IntParam(blockSize), IntParam(bytesPerSec)) = args -- -- // Repeat the input data multiple times to fill in a buffer -- val lines = Source.fromFile(file).getLines().toArray -- val bufferStream = new FastByteArrayOutputStream(blockSize + 1000) -- val ser = new KryoSerializer(new SparkConf()).newInstance() -- val serStream = ser.serializeStream(bufferStream) -- var i = 0 -- while (bufferStream.position < blockSize) { -- serStream.writeObject(lines(i)) -- i = (i + 1) % lines.length -- } -- bufferStream.trim() -- val array = bufferStream.array -- -- val countBuf = ByteBuffer.wrap(new Array[Byte](4)) -- countBuf.putInt(array.length) -- countBuf.flip() -- -- val serverSocket = new ServerSocket(port) -- logInfo("Listening on port " + port) -- -- while (true) { -- val socket = serverSocket.accept() -- logInfo("Got a new connection") -- val out = new RateLimitedOutputStream(socket.getOutputStream, bytesPerSec) -- try { -- while (true) { -- out.write(countBuf.array) -- out.write(array) -- } -- } catch { -- case e: IOException => -- logError("Client disconnected") -- socket.close() -- } -- } -- } --} --- -1.8.3.4 (Apple Git-47) - diff --git a/spark-v0.9.1-rc2-0006-remove-unavailable-and-unnecessary-deps.patch b/spark-v0.9.1-rc2-0006-remove-unavailable-and-unnecessary-deps.patch deleted file mode 100644 index ca5c9b6..0000000 --- a/spark-v0.9.1-rc2-0006-remove-unavailable-and-unnecessary-deps.patch +++ /dev/null @@ -1,58 +0,0 @@ -From 204fa5821bf7f20b49297d5a9019ec6a5bd3da87 Mon Sep 17 00:00:00 2001 -From: William Benton -Date: Fri, 28 Feb 2014 09:00:04 -0600 -Subject: [PATCH 6/9] remove unavailable and unnecessary deps - -Conflicts: - project/SparkBuild.scala ---- - project/SparkBuild.scala | 14 ++------------ - 1 file changed, 2 insertions(+), 12 deletions(-) - -diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala -index 3925475..c028cd7 100644 ---- a/project/SparkBuild.scala -+++ b/project/SparkBuild.scala -@@ -191,13 +191,7 @@ object SparkBuild extends Build { - "io.netty" % "netty-all" % "4.0.13.Final", - "org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106", - /** Workaround for SPARK-959. Dependency used by org.eclipse.jetty. Fixed in ivy 2.3.0. 
*/ -- "org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar"), -- "org.scalatest" %% "scalatest" % "1.9.1" % "test", -- "org.scalacheck" %% "scalacheck" % "1.10.0" % "test", -- "com.novocode" % "junit-interface" % "0.10" % "test", -- "org.easymock" % "easymock" % "3.1" % "test", -- "org.mockito" % "mockito-all" % "1.8.5" % "test", -- "commons-io" % "commons-io" % "2.4" % "test" -+ "org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar") - ), - - testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"), -@@ -241,14 +235,12 @@ object SparkBuild extends Build { - "org.xerial.snappy" % "snappy-java" % "1.0.5", - "org.spark-project.akka" %% "akka-remote" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty), - "org.spark-project.akka" %% "akka-slf4j" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty), -- "org.spark-project.akka" %% "akka-testkit" % "2.2.3-shaded-protobuf" % "test", - "org.json4s" %% "json4s-jackson" % "3.2.6", - "it.unimi.dsi" % "fastutil" % "6.4.4", - "colt" % "colt" % "1.2.0", - "org.apache.mesos" % "mesos" % "0.13.0", - "net.java.dev.jets3t" % "jets3t" % "0.7.1", -- "org.apache.derby" % "derby" % "10.4.2.0" % "test", -- "org.apache.hadoop" % hadoopClient % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib), -+ "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib), - "org.apache.avro" % "avro" % "1.7.4", - "org.apache.avro" % "avro-ipc" % "1.7.4" excludeAll(excludeNetty), - "org.apache.zookeeper" % "zookeeper" % "3.4.5" excludeAll(excludeNetty), -@@ -256,8 +248,6 @@ object SparkBuild extends Build { - "com.codahale.metrics" % "metrics-jvm" % "3.0.0", - "com.codahale.metrics" % "metrics-json" % "3.0.0", - "com.codahale.metrics" % "metrics-graphite" % "3.0.0", -- "com.twitter" %% "chill" % "0.3.1", -- "com.twitter" % "chill-java" % "0.3.1", - "com.clearspring.analytics" % "stream" % "2.5.1" - ) - ) --- -1.8.3.4 (Apple Git-47) - diff --git a/spark-v0.9.1-rc2-0007-use-Jetty-8.patch b/spark-v0.9.1-rc2-0007-use-Jetty-8.patch deleted file mode 100644 index abf8d58..0000000 --- a/spark-v0.9.1-rc2-0007-use-Jetty-8.patch +++ /dev/null @@ -1,27 +0,0 @@ -From 80144d10b48dd1c936e5ef37ad2a9766fbbb0936 Mon Sep 17 00:00:00 2001 -From: William Benton -Date: Fri, 28 Feb 2014 15:16:45 -0600 -Subject: [PATCH 7/9] use Jetty 8 - ---- - project/SparkBuild.scala | 4 +--- - 1 file changed, 1 insertion(+), 3 deletions(-) - -diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala -index c028cd7..9fdf9f8 100644 ---- a/project/SparkBuild.scala -+++ b/project/SparkBuild.scala -@@ -189,9 +189,7 @@ object SparkBuild extends Build { - - libraryDependencies ++= Seq( - "io.netty" % "netty-all" % "4.0.13.Final", -- "org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106", -- /** Workaround for SPARK-959. Dependency used by org.eclipse.jetty. Fixed in ivy 2.3.0. 
*/ -- "org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar") -+ "org.eclipse.jetty" % "jetty-server" % "8.1.14.v20131031" - ), - - testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"), --- -1.8.3.4 (Apple Git-47) - diff --git a/spark-v0.9.1-rc2-0008-use-Akka-2.3.0-RC2.patch b/spark-v0.9.1-rc2-0008-use-Akka-2.3.0-RC2.patch deleted file mode 100644 index 973d27a..0000000 --- a/spark-v0.9.1-rc2-0008-use-Akka-2.3.0-RC2.patch +++ /dev/null @@ -1,84 +0,0 @@ -From 2cbc1a0bf275759bd17033e8844fc18ddcbbed2d Mon Sep 17 00:00:00 2001 -From: William Benton -Date: Fri, 28 Feb 2014 15:31:52 -0600 -Subject: [PATCH 8/9] use Akka 2.3.0-RC2 - ---- - core/src/main/scala/org/apache/spark/deploy/Client.scala | 2 +- - core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala | 2 +- - .../main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala | 2 +- - .../main/scala/org/apache/spark/util/IndestructibleActorSystem.scala | 2 +- - project/SparkBuild.scala | 5 +++-- - 5 files changed, 7 insertions(+), 6 deletions(-) - -diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala -index 9987e23..7fda886 100644 ---- a/core/src/main/scala/org/apache/spark/deploy/Client.scala -+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala -@@ -116,7 +116,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends - println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.") - System.exit(-1) - -- case AssociationErrorEvent(cause, _, remoteAddress, _) => -+ case AssociationErrorEvent(cause, _, remoteAddress, _, _) => - println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.") - println(s"Cause was: $cause") - System.exit(-1) -diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala -index 1415e2f..8d732db 100644 ---- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala -+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala -@@ -153,7 +153,7 @@ private[spark] class AppClient( - logWarning(s"Connection to $address failed; waiting for master to reconnect...") - markDisconnected() - -- case AssociationErrorEvent(cause, _, address, _) if isPossibleMaster(address) => -+ case AssociationErrorEvent(cause, _, address, _, _) if isPossibleMaster(address) => - logWarning(s"Could not connect to $address: $cause") - - case StopAppClient => -diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala -index 1dc39c4..732a1d7 100644 ---- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala -+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala -@@ -52,7 +52,7 @@ private[spark] class WorkerWatcher(workerUrl: String) extends Actor - case AssociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) => - logInfo(s"Successfully connected to $workerUrl") - -- case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound) -+ case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound, _) - if isWorker(remoteAddress) => - // These logs may not be seen if the worker (and associated pipe) has died - logError(s"Could not initialize connection to worker $workerUrl. 
Exiting.") -diff --git a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala -index bf71882..08d703e 100644 ---- a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala -+++ b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala -@@ -39,7 +39,7 @@ private[akka] class IndestructibleActorSystemImpl( - override val name: String, - applicationConfig: Config, - classLoader: ClassLoader) -- extends ActorSystemImpl(name, applicationConfig, classLoader) { -+ extends ActorSystemImpl(name, applicationConfig, classLoader, None) { - - protected override def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = { - val fallbackHandler = super.uncaughtExceptionHandler -diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala -index 9fdf9f8..9f9962d 100644 ---- a/project/SparkBuild.scala -+++ b/project/SparkBuild.scala -@@ -231,8 +231,9 @@ object SparkBuild extends Build { - "commons-daemon" % "commons-daemon" % "1.0.10", // workaround for bug HADOOP-9407 - "com.ning" % "compress-lzf" % "1.0.0", - "org.xerial.snappy" % "snappy-java" % "1.0.5", -- "org.spark-project.akka" %% "akka-remote" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty), -- "org.spark-project.akka" %% "akka-slf4j" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty), -+ "org.ow2.asm" % "asm" % "4.0", -+ "com.typesafe.akka" %% "akka-remote" % "2.3.0-RC2" excludeAll(excludeNetty), -+ "com.typesafe.akka" %% "akka-slf4j" % "2.3.0-RC2" excludeAll(excludeNetty), - "org.json4s" %% "json4s-jackson" % "3.2.6", - "it.unimi.dsi" % "fastutil" % "6.4.4", - "colt" % "colt" % "1.2.0", --- -1.8.3.4 (Apple Git-47) - diff --git a/spark-v0.9.1-rc2-0009-fedora-only-resolver-changes.patch b/spark-v0.9.1-rc2-0009-fedora-only-resolver-changes.patch deleted file mode 100644 index f2784d4..0000000 --- a/spark-v0.9.1-rc2-0009-fedora-only-resolver-changes.patch +++ /dev/null @@ -1,53 +0,0 @@ -From 378005da937aff3b1b548508d0fcbb2041bc939d Mon Sep 17 00:00:00 2001 -From: William Benton -Date: Fri, 28 Feb 2014 16:39:51 -0600 -Subject: [PATCH 9/9] fedora-only resolver changes - ---- - project/SparkBuild.scala | 15 ++++----------- - 1 file changed, 4 insertions(+), 11 deletions(-) - -diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala -index 9f9962d..6a3f40f 100644 ---- a/project/SparkBuild.scala -+++ b/project/SparkBuild.scala -@@ -101,7 +101,11 @@ object SparkBuild extends Build { - - lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](tools) - -+ val ivyLocal = Resolver.file("local", file("IVY_LOCAL"))(Resolver.ivyStylePatterns) -+ - def sharedSettings = Defaults.defaultSettings ++ Seq( -+ externalResolvers := Seq(new sbt.RawRepository(new org.fedoraproject.maven.connector.ivy.IvyResolver), ivyLocal), -+ - organization := "org.apache.spark", - version := "0.9.1", - scalaVersion := "2.10.3", -@@ -131,13 +135,6 @@ object SparkBuild extends Build { - // Only allow one test at a time, even across projects, since they run in the same JVM - concurrentRestrictions in Global += Tags.limit(Tags.Test, 1), - -- // also check the local Maven repository ~/.m2 -- resolvers ++= Seq(Resolver.file("Local Maven Repo", file(Path.userHome + "/.m2/repository"))), -- -- // For Sonatype publishing -- resolvers ++= Seq("sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots", -- "sonatype-staging" at "https://oss.sonatype.org/service/local/staging/deploy/maven2/"), -- - 
publishMavenStyle := true, - - //useGpg in Global := true, -@@ -217,10 +214,6 @@ object SparkBuild extends Build { - - def coreSettings = sharedSettings ++ Seq( - name := "spark-core", -- resolvers ++= Seq( -- "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/", -- "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/" -- ), - - libraryDependencies ++= Seq( - "com.google.guava" % "guava" % "14.0.1", --- -1.8.3.4 (Apple Git-47) - diff --git a/spark-v0.9.1-rc3-0001-Replace-lift-json-with-json4s-jackson.patch b/spark-v0.9.1-rc3-0001-Replace-lift-json-with-json4s-jackson.patch new file mode 100644 index 0000000..b871f4a --- /dev/null +++ b/spark-v0.9.1-rc3-0001-Replace-lift-json-with-json4s-jackson.patch @@ -0,0 +1,253 @@ +From 0b4c1430341d2f0d44f1339d4f8c7972af1d2821 Mon Sep 17 00:00:00 2001 +From: William Benton +Date: Sun, 23 Feb 2014 17:22:07 -0600 +Subject: [PATCH 1/9] Replace lift-json with json4s-jackson. + +The aim of the Json4s project is to provide a common API for +Scala JSON libraries. It is Apache-licensed, easier for +downstream distributions to package, and mostly API-compatible +with lift-json. Furthermore, the Jackson-backed implementation +parses faster than lift-json on all but the smallest inputs. + +Backported patch from master to 0.9.0. + +Conflicts: + core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala + core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala + core/src/main/scala/org/apache/spark/ui/JettyUtils.scala + core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +--- + core/pom.xml | 5 +++-- + .../apache/spark/deploy/FaultToleranceTest.scala | 9 ++++---- + .../org/apache/spark/deploy/JsonProtocol.scala | 2 +- + .../spark/deploy/master/ui/ApplicationPage.scala | 2 +- + .../apache/spark/deploy/master/ui/IndexPage.scala | 2 +- + .../apache/spark/deploy/worker/ui/IndexPage.scala | 2 +- + .../scala/org/apache/spark/ui/JettyUtils.scala | 8 ++++---- + .../apache/spark/deploy/JsonProtocolSuite.scala | 24 ++++++++++++++++++---- + project/SparkBuild.scala | 2 +- + 9 files changed, 37 insertions(+), 19 deletions(-) + +diff --git a/core/pom.xml b/core/pom.xml +index 8111400..e7aa19a 100644 +--- a/core/pom.xml ++++ b/core/pom.xml +@@ -104,8 +104,9 @@ + scala-library + + +- net.liftweb +- lift-json_${scala.binary.version} ++ org.json4s ++ json4s-jackson_${scala.binary.version} ++ 3.2.6 + + + it.unimi.dsi +diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala +index 4dfb19e..60a87af 100644 +--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala ++++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala +@@ -29,7 +29,8 @@ import scala.concurrent.ExecutionContext.Implicits.global + import scala.collection.mutable.ListBuffer + import scala.sys.process._ + +-import net.liftweb.json.JsonParser ++import org.json4s._ ++import org.json4s.jackson.JsonMethods + + import org.apache.spark.{Logging, SparkContext} + import org.apache.spark.deploy.master.RecoveryState +@@ -312,7 +313,7 @@ private[spark] object FaultToleranceTest extends App with Logging { + private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile: File) + extends Logging { + +- implicit val formats = net.liftweb.json.DefaultFormats ++ implicit val formats = org.json4s.DefaultFormats + var state: RecoveryState.Value = _ + var liveWorkerIPs: 
List[String] = _ + var numLiveApps = 0 +@@ -322,7 +323,7 @@ private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val + def readState() { + try { + val masterStream = new InputStreamReader(new URL("http://%s:8080/json".format(ip)).openStream) +- val json = JsonParser.parse(masterStream, closeAutomatically = true) ++ val json = JsonMethods.parse(masterStream) + + val workers = json \ "workers" + val liveWorkers = workers.children.filter(w => (w \ "state").extract[String] == "ALIVE") +@@ -350,7 +351,7 @@ private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val + private[spark] class TestWorkerInfo(val ip: String, val dockerId: DockerId, val logFile: File) + extends Logging { + +- implicit val formats = net.liftweb.json.DefaultFormats ++ implicit val formats = org.json4s.DefaultFormats + + logDebug("Created worker: " + this) + +diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala +index e607b8c..a43d004 100644 +--- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala ++++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala +@@ -17,7 +17,7 @@ + + package org.apache.spark.deploy + +-import net.liftweb.json.JsonDSL._ ++import org.json4s.JsonDSL._ + + import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse} + import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo} +diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +index f29a6ad..cba89dc 100644 +--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala ++++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +@@ -22,7 +22,7 @@ import scala.xml.Node + + import akka.pattern.ask + import javax.servlet.http.HttpServletRequest +-import net.liftweb.json.JsonAST.JValue ++import org.json4s.JValue + + import org.apache.spark.deploy.JsonProtocol + import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} +diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala +index b549825..aa8beff 100644 +--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala ++++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala +@@ -23,7 +23,7 @@ import scala.xml.Node + + import akka.pattern.ask + import javax.servlet.http.HttpServletRequest +-import net.liftweb.json.JsonAST.JValue ++import org.json4s.JValue + + import org.apache.spark.deploy.JsonProtocol + import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} +diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala +index 925c6fb..de356dc 100644 +--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala ++++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala +@@ -22,7 +22,7 @@ import scala.xml.Node + + import akka.pattern.ask + import javax.servlet.http.HttpServletRequest +-import net.liftweb.json.JsonAST.JValue ++import org.json4s.JValue + + import org.apache.spark.deploy.JsonProtocol + import org.apache.spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse} +diff --git 
a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +index 7211dbc..4e43fd5 100644 +--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala ++++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +@@ -23,10 +23,10 @@ import scala.annotation.tailrec + import scala.util.{Try, Success, Failure} + import scala.xml.Node + +-import net.liftweb.json.{JValue, pretty, render} +- +-import org.eclipse.jetty.server.{Server, Request, Handler} +-import org.eclipse.jetty.server.handler.{ResourceHandler, HandlerList, ContextHandler, AbstractHandler} ++import org.json4s.JValue ++import org.json4s.jackson.JsonMethods.{pretty, render} ++import org.eclipse.jetty.server.{Handler, Request, Server} ++import org.eclipse.jetty.server.handler.{AbstractHandler, ContextHandler, HandlerList, ResourceHandler} + import org.eclipse.jetty.util.thread.QueuedThreadPool + + import org.apache.spark.Logging +diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +index d05bbd6..8f1df8a 100644 +--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala ++++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +@@ -20,8 +20,12 @@ package org.apache.spark.deploy + import java.io.File + import java.util.Date + +-import net.liftweb.json.{JsonAST, JsonParser} +-import net.liftweb.json.JsonAST.JValue ++import org.json4s._ ++ ++import org.json4s.JValue ++import org.json4s.jackson.JsonMethods ++import com.fasterxml.jackson.core.JsonParseException ++ + import org.scalatest.FunSuite + + import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse} +@@ -32,21 +36,31 @@ class JsonProtocolSuite extends FunSuite { + test("writeApplicationInfo") { + val output = JsonProtocol.writeApplicationInfo(createAppInfo()) + assertValidJson(output) ++ assertValidDataInJson(output, JsonMethods.parse(JsonConstants.appInfoJsonStr)) + } + + test("writeWorkerInfo") { + val output = JsonProtocol.writeWorkerInfo(createWorkerInfo()) + assertValidJson(output) ++ assertValidDataInJson(output, JsonMethods.parse(JsonConstants.workerInfoJsonStr)) + } + + test("writeApplicationDescription") { + val output = JsonProtocol.writeApplicationDescription(createAppDesc()) + assertValidJson(output) ++ assertValidDataInJson(output, JsonMethods.parse(JsonConstants.appDescJsonStr)) + } + + test("writeExecutorRunner") { + val output = JsonProtocol.writeExecutorRunner(createExecutorRunner()) + assertValidJson(output) ++ assertValidDataInJson(output, JsonMethods.parse(JsonConstants.executorRunnerJsonStr)) ++ } ++ ++ test("writeDriverInfo") { ++ val output = JsonProtocol.writeDriverInfo(createDriverInfo()) ++ assertValidJson(output) ++ assertValidDataInJson(output, JsonMethods.parse(JsonConstants.driverInfoJsonStr)) + } + + test("writeMasterState") { +@@ -59,6 +73,7 @@ class JsonProtocolSuite extends FunSuite { + activeDrivers, completedDrivers, RecoveryState.ALIVE) + val output = JsonProtocol.writeMasterState(stateResponse) + assertValidJson(output) ++ assertValidDataInJson(output, JsonMethods.parse(JsonConstants.masterStateJsonStr)) + } + + test("writeWorkerState") { +@@ -70,6 +85,7 @@ class JsonProtocolSuite extends FunSuite { + finishedExecutors, drivers, finishedDrivers, "masterUrl", 4, 1234, 4, 1234, "masterWebUiUrl") + val output = JsonProtocol.writeWorkerState(stateResponse) + assertValidJson(output) ++ assertValidDataInJson(output, 
JsonMethods.parse(JsonConstants.workerStateJsonStr)) + } + + def createAppDesc(): ApplicationDescription = { +@@ -106,9 +122,9 @@ class JsonProtocolSuite extends FunSuite { + + def assertValidJson(json: JValue) { + try { +- JsonParser.parse(JsonAST.compactRender(json)) ++ JsonMethods.parse(JsonMethods.compact(json)) + } catch { +- case e: JsonParser.ParseException => fail("Invalid Json detected", e) ++ case e: JsonParseException => fail("Invalid Json detected", e) + } + } + } +diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala +index 52e894e..6346b29 100644 +--- a/project/SparkBuild.scala ++++ b/project/SparkBuild.scala +@@ -274,7 +274,7 @@ object SparkBuild extends Build { + "org.spark-project.akka" %% "akka-remote" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty), + "org.spark-project.akka" %% "akka-slf4j" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty), + "org.spark-project.akka" %% "akka-testkit" % "2.2.3-shaded-protobuf" % "test", +- "net.liftweb" %% "lift-json" % "2.5.1" excludeAll(excludeNetty), ++ "org.json4s" %% "json4s-jackson" % "3.2.6", + "it.unimi.dsi" % "fastutil" % "6.4.4", + "colt" % "colt" % "1.2.0", + "org.apache.mesos" % "mesos" % "0.13.0", +-- +1.8.3.4 (Apple Git-47) + diff --git a/spark-v0.9.1-rc3-0002-use-sbt-0.13.1.patch b/spark-v0.9.1-rc3-0002-use-sbt-0.13.1.patch new file mode 100644 index 0000000..2aeb835 --- /dev/null +++ b/spark-v0.9.1-rc3-0002-use-sbt-0.13.1.patch @@ -0,0 +1,22 @@ +From 3707debe60b331f2c6ad2451b82b7ba8bf2ed6b8 Mon Sep 17 00:00:00 2001 +From: William Benton +Date: Thu, 27 Feb 2014 14:25:34 -0600 +Subject: [PATCH 2/9] use sbt 0.13.1 + +--- + project/build.properties | 2 +- + 1 file changed, 1 insertion(+), 1 deletion(-) + +diff --git a/project/build.properties b/project/build.properties +index 839f5fb..4b52bb9 100644 +--- a/project/build.properties ++++ b/project/build.properties +@@ -14,4 +14,4 @@ + # See the License for the specific language governing permissions and + # limitations under the License. + # +-sbt.version=0.12.4 ++sbt.version=0.13.1 +-- +1.8.3.4 (Apple Git-47) + diff --git a/spark-v0.9.1-rc3-0003-Removed-sbt-plugins.patch b/spark-v0.9.1-rc3-0003-Removed-sbt-plugins.patch new file mode 100644 index 0000000..35620ea --- /dev/null +++ b/spark-v0.9.1-rc3-0003-Removed-sbt-plugins.patch @@ -0,0 +1,175 @@ +From 97173e00a590b7c0d247143e38e6bc7a1919fbae Mon Sep 17 00:00:00 2001 +From: William Benton +Date: Thu, 27 Feb 2014 15:46:41 -0600 +Subject: [PATCH 3/9] Removed sbt plugins. + +Patch ported from Fedora Spark 0.9.0 package. 
+ +Conflicts: + project/SparkBuild.scala + project/project/SparkPluginBuild.scala +--- + project/SparkBuild.scala | 44 ++++------------------------------ + project/plugins.sbt | 18 -------------- + project/project/SparkPluginBuild.scala | 24 ------------------- + 3 files changed, 5 insertions(+), 81 deletions(-) + delete mode 100644 project/plugins.sbt + delete mode 100644 project/project/SparkPluginBuild.scala + +diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala +index 6346b29..1eaa755 100644 +--- a/project/SparkBuild.scala ++++ b/project/SparkBuild.scala +@@ -18,8 +18,6 @@ + import sbt._ + import sbt.Classpaths.publishTask + import Keys._ +-import sbtassembly.Plugin._ +-import AssemblyKeys._ + import scala.util.Properties + // For Sonatype publishing + //import com.jsuereth.pgp.sbtplugin.PgpKeys._ +@@ -60,17 +58,6 @@ object SparkBuild extends Build { + + lazy val mllib = Project("mllib", file("mllib"), settings = mllibSettings) dependsOn(core) + +- lazy val assemblyProj = Project("assembly", file("assembly"), settings = assemblyProjSettings) +- .dependsOn(core, graphx, bagel, mllib, repl, streaming) dependsOn(maybeYarn: _*) dependsOn(maybeGanglia: _*) +- +- lazy val assembleDeps = TaskKey[Unit]("assemble-deps", "Build assembly of dependencies and packages Spark projects") +- +- // A dummy command so we can run the Jenkins pull request builder for older versions of Spark. +- lazy val scalastyle = TaskKey[Unit]("scalastyle", "Dummy scalastyle check") +- val scalastyleTask = scalastyle := { +- println("scalastyle is not configured for this version of Spark project.") +- } +- + // A configuration to set an alternative publishLocalConfiguration + lazy val MavenCompile = config("m2r") extend(Compile) + lazy val publishLocalBoth = TaskKey[Unit]("publish-local", "publish local for m2 and ivy") +@@ -130,7 +117,7 @@ object SparkBuild extends Build { + // Everything except assembly, tools and examples belong to packageProjects + lazy val packageProjects = Seq[ProjectReference](core, repl, bagel, streaming, mllib, graphx) ++ maybeYarnRef ++ maybeGangliaRef + +- lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](examples, tools, assemblyProj) ++ lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](examples, tools) + + def sharedSettings = Defaults.defaultSettings ++ Seq( + organization := "org.apache.spark", +@@ -143,7 +130,6 @@ object SparkBuild extends Build { + retrieveManaged := true, + retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]", + transitiveClassifiers in Scope.GlobalScope := Seq("sources"), +- testListeners <<= target.map(t => Seq(new eu.henkelmann.sbt.JUnitXmlTestsListener(t.getAbsolutePath))), + + // Fork new JVMs for tests and set Java options for those + fork := true, +@@ -245,8 +231,8 @@ object SparkBuild extends Build { + publishMavenStyle in MavenCompile := true, + publishLocal in MavenCompile <<= publishTask(publishLocalConfiguration in MavenCompile, deliverLocal), + publishLocalBoth <<= Seq(publishLocal in MavenCompile, publishLocal).dependOn +- ) ++ net.virtualvoid.sbt.graph.Plugin.graphSettings +- ++ ) ++ + val slf4jVersion = "1.7.2" + + val excludeCglib = ExclusionRule(organization = "org.sonatype.sisu.inject") +@@ -322,11 +308,11 @@ object SparkBuild extends Build { + excludeAll(excludeSnappy) + excludeAll(excludeCglib) + ) +- ) ++ assemblySettings ++ extraAssemblySettings ++ ) + + def toolsSettings = sharedSettings ++ Seq( + name := "spark-tools" +- ) ++ assemblySettings ++ 
extraAssemblySettings ++ ) + + def graphxSettings = sharedSettings ++ Seq( + name := "spark-graphx", +@@ -395,26 +381,6 @@ object SparkBuild extends Build { + ) + ) + +- def assemblyProjSettings = sharedSettings ++ Seq( +- libraryDependencies += "net.sf.py4j" % "py4j" % "0.8.1", +- name := "spark-assembly", +- scalastyleTask, +- assembleDeps in Compile <<= (packageProjects.map(packageBin in Compile in _) ++ Seq(packageDependency in Compile)).dependOn, +- jarName in assembly <<= version map { v => "spark-assembly-" + v + "-hadoop" + hadoopVersion + ".jar" }, +- jarName in packageDependency <<= version map { v => "spark-assembly-" + v + "-hadoop" + hadoopVersion + "-deps.jar" } +- ) ++ assemblySettings ++ extraAssemblySettings +- +- def extraAssemblySettings() = Seq( +- test in assembly := {}, +- mergeStrategy in assembly := { +- case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard +- case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard +- case "log4j.properties" => MergeStrategy.discard +- case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines +- case "reference.conf" => MergeStrategy.concat +- case _ => MergeStrategy.first +- } +- ) + + def twitterSettings() = sharedSettings ++ Seq( + name := "spark-streaming-twitter", +diff --git a/project/plugins.sbt b/project/plugins.sbt +deleted file mode 100644 +index 4ba0e42..0000000 +--- a/project/plugins.sbt ++++ /dev/null +@@ -1,18 +0,0 @@ +-resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns) +- +-resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/" +- +-resolvers += "Spray Repository" at "http://repo.spray.cc/" +- +-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.9.2") +- +-addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.2.0") +- +-addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.5.1") +- +-// For Sonatype publishing +-//resolvers += Resolver.url("sbt-plugin-releases", new URL("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases/"))(Resolver.ivyStylePatterns) +- +-//addSbtPlugin("com.jsuereth" % "xsbt-gpg-plugin" % "0.6") +- +-addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.3") +diff --git a/project/project/SparkPluginBuild.scala b/project/project/SparkPluginBuild.scala +deleted file mode 100644 +index 4853be2..0000000 +--- a/project/project/SparkPluginBuild.scala ++++ /dev/null +@@ -1,24 +0,0 @@ +-/* +- * Licensed to the Apache Software Foundation (ASF) under one or more +- * contributor license agreements. See the NOTICE file distributed with +- * this work for additional information regarding copyright ownership. +- * The ASF licenses this file to You under the Apache License, Version 2.0 +- * (the "License"); you may not use this file except in compliance with +- * the License. You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, software +- * distributed under the License is distributed on an "AS IS" BASIS, +- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- * See the License for the specific language governing permissions and +- * limitations under the License. 
+- */ +- +-import sbt._ +- +-object SparkPluginDef extends Build { +- lazy val root = Project("plugins", file(".")) dependsOn(junitXmlListener) +- /* This is not published in a Maven repository, so we get it from GitHub directly */ +- lazy val junitXmlListener = uri("https://github.com/ijuma/junit_xml_listener.git#fe434773255b451a38e8d889536ebc260f4225ce") +-} +-- +1.8.3.4 (Apple Git-47) + diff --git a/spark-v0.9.1-rc3-0004-removed-examples.patch b/spark-v0.9.1-rc3-0004-removed-examples.patch new file mode 100644 index 0000000..547b1ba --- /dev/null +++ b/spark-v0.9.1-rc3-0004-removed-examples.patch @@ -0,0 +1,126 @@ +From 62b6bf1de54e846982609666f5c635e6e85e41ba Mon Sep 17 00:00:00 2001 +From: William Benton +Date: Thu, 27 Feb 2014 16:01:11 -0600 +Subject: [PATCH 4/9] removed examples + +ported patch from Fedora Spark 0.9.0 package + +Conflicts: + project/SparkBuild.scala +--- + project/SparkBuild.scala | 83 ++---------------------------------------------- + 1 file changed, 3 insertions(+), 80 deletions(-) + +diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala +index 1eaa755..3925475 100644 +--- a/project/SparkBuild.scala ++++ b/project/SparkBuild.scala +@@ -93,31 +93,13 @@ object SparkBuild extends Build { + lazy val maybeYarn: Seq[ClasspathDependency] = if (isYarnEnabled) Seq(if (isNewHadoop) yarn else yarnAlpha) else Seq() + lazy val maybeYarnRef: Seq[ProjectReference] = if (isYarnEnabled) Seq(if (isNewHadoop) yarn else yarnAlpha) else Seq() + +- lazy val externalTwitter = Project("external-twitter", file("external/twitter"), settings = twitterSettings) +- .dependsOn(streaming % "compile->compile;test->test") +- +- lazy val externalKafka = Project("external-kafka", file("external/kafka"), settings = kafkaSettings) +- .dependsOn(streaming % "compile->compile;test->test") +- +- lazy val externalFlume = Project("external-flume", file("external/flume"), settings = flumeSettings) +- .dependsOn(streaming % "compile->compile;test->test") +- +- lazy val externalZeromq = Project("external-zeromq", file("external/zeromq"), settings = zeromqSettings) +- .dependsOn(streaming % "compile->compile;test->test") +- +- lazy val externalMqtt = Project("external-mqtt", file("external/mqtt"), settings = mqttSettings) +- .dependsOn(streaming % "compile->compile;test->test") +- +- lazy val allExternal = Seq[ClasspathDependency](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt) +- lazy val allExternalRefs = Seq[ProjectReference](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt) +- +- lazy val examples = Project("examples", file("examples"), settings = examplesSettings) +- .dependsOn(core, mllib, graphx, bagel, streaming, externalTwitter) dependsOn(allExternal: _*) ++ lazy val allExternal = Seq[ClasspathDependency]() ++ lazy val allExternalRefs = Seq[ProjectReference]() + + // Everything except assembly, tools and examples belong to packageProjects + lazy val packageProjects = Seq[ProjectReference](core, repl, bagel, streaming, mllib, graphx) ++ maybeYarnRef ++ maybeGangliaRef + +- lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](examples, tools) ++ lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](tools) + + def sharedSettings = Defaults.defaultSettings ++ Seq( + organization := "org.apache.spark", +@@ -291,25 +273,6 @@ object SparkBuild extends Build { + libraryDependencies <+= scalaVersion(v => "org.scala-lang" % "scala-reflect" % v ) + ) + +- def examplesSettings = 
sharedSettings ++ Seq( +- name := "spark-examples", +- libraryDependencies ++= Seq( +- "com.twitter" %% "algebird-core" % "0.1.11", +- "org.apache.hbase" % "hbase" % "0.94.6" excludeAll(excludeNetty, excludeAsm), +- "org.apache.hbase" % "hbase" % HBASE_VERSION excludeAll(excludeNetty, excludeAsm), +- "org.apache.cassandra" % "cassandra-all" % "1.2.6" +- exclude("com.google.guava", "guava") +- exclude("com.googlecode.concurrentlinkedhashmap", "concurrentlinkedhashmap-lru") +- exclude("com.ning","compress-lzf") +- exclude("io.netty", "netty") +- exclude("jline","jline") +- exclude("log4j","log4j") +- exclude("org.apache.cassandra.deps", "avro") +- excludeAll(excludeSnappy) +- excludeAll(excludeCglib) +- ) +- ) +- + def toolsSettings = sharedSettings ++ Seq( + name := "spark-tools" + ) +@@ -380,44 +343,4 @@ object SparkBuild extends Build { + "org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib) + ) + ) +- +- +- def twitterSettings() = sharedSettings ++ Seq( +- name := "spark-streaming-twitter", +- libraryDependencies ++= Seq( +- "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty) +- ) +- ) +- +- def kafkaSettings() = sharedSettings ++ Seq( +- name := "spark-streaming-kafka", +- libraryDependencies ++= Seq( +- "com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty), +- "org.apache.kafka" %% "kafka" % "0.8.0" +- exclude("com.sun.jdmk", "jmxtools") +- exclude("com.sun.jmx", "jmxri") +- exclude("net.sf.jopt-simple", "jopt-simple") +- excludeAll(excludeNetty) +- ) +- ) +- +- def flumeSettings() = sharedSettings ++ Seq( +- name := "spark-streaming-flume", +- libraryDependencies ++= Seq( +- "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy) +- ) +- ) +- +- def zeromqSettings() = sharedSettings ++ Seq( +- name := "spark-streaming-zeromq", +- libraryDependencies ++= Seq( +- "org.spark-project.akka" %% "akka-zeromq" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty) +- ) +- ) +- +- def mqttSettings() = streamingSettings ++ Seq( +- name := "spark-streaming-mqtt", +- resolvers ++= Seq("Eclipse Repo" at "https://repo.eclipse.org/content/repositories/paho-releases/"), +- libraryDependencies ++= Seq("org.eclipse.paho" % "mqtt-client" % "0.4.0") +- ) + } +-- +1.8.3.4 (Apple Git-47) + diff --git a/spark-v0.9.1-rc3-0005-Removed-code-depending-on-Kryo.patch b/spark-v0.9.1-rc3-0005-Removed-code-depending-on-Kryo.patch new file mode 100644 index 0000000..845a292 --- /dev/null +++ b/spark-v0.9.1-rc3-0005-Removed-code-depending-on-Kryo.patch @@ -0,0 +1,629 @@ +From 39f1a360d8d8d7f688e0bd238c91d676240e55ad Mon Sep 17 00:00:00 2001 +From: William Benton +Date: Thu, 27 Feb 2014 16:43:44 -0600 +Subject: [PATCH 5/9] Removed code depending on Kryo + +Conflicts: + core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +--- + .../apache/spark/serializer/KryoSerializer.scala | 175 --------------------- + .../apache/spark/storage/StoragePerfTester.scala | 103 ------------ + .../org/apache/spark/storage/ThreadingTest.scala | 115 -------------- + .../util/collection/ExternalAppendOnlyMap.scala | 1 + + .../apache/spark/graphx/GraphKryoRegistrator.scala | 48 ------ + .../apache/spark/mllib/recommendation/ALS.scala | 12 -- + .../spark/streaming/util/RawTextSender.scala | 82 ---------- + 7 files changed, 1 insertion(+), 535 deletions(-) + delete mode 100644 core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala + delete mode 100644 
core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala + delete mode 100644 core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala + delete mode 100644 graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala + delete mode 100644 streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala + +diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +deleted file mode 100644 +index c14cd47..0000000 +--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ++++ /dev/null +@@ -1,175 +0,0 @@ +-/* +- * Licensed to the Apache Software Foundation (ASF) under one or more +- * contributor license agreements. See the NOTICE file distributed with +- * this work for additional information regarding copyright ownership. +- * The ASF licenses this file to You under the Apache License, Version 2.0 +- * (the "License"); you may not use this file except in compliance with +- * the License. You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, software +- * distributed under the License is distributed on an "AS IS" BASIS, +- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- * See the License for the specific language governing permissions and +- * limitations under the License. +- */ +- +-package org.apache.spark.serializer +- +-import java.nio.ByteBuffer +-import java.io.{EOFException, InputStream, OutputStream} +- +-import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer} +-import com.esotericsoftware.kryo.{KryoException, Kryo} +-import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} +-import com.twitter.chill.{EmptyScalaKryoInstantiator, AllScalaRegistrar} +- +-import org.apache.spark._ +-import org.apache.spark.broadcast.HttpBroadcast +-import org.apache.spark.scheduler.MapStatus +-import org.apache.spark.storage._ +-import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock} +- +-/** +- * A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]]. +- */ +-class KryoSerializer(conf: SparkConf) extends org.apache.spark.serializer.Serializer with Logging { +- private val bufferSize = { +- conf.getInt("spark.kryoserializer.buffer.mb", 2) * 1024 * 1024 +- } +- +- def newKryoOutput() = new KryoOutput(bufferSize) +- +- def newKryo(): Kryo = { +- val instantiator = new EmptyScalaKryoInstantiator +- val kryo = instantiator.newKryo() +- val classLoader = Thread.currentThread.getContextClassLoader +- +- // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops. +- // Do this before we invoke the user registrator so the user registrator can override this. 
+- kryo.setReferences(conf.getBoolean("spark.kryo.referenceTracking", true)) +- +- for (cls <- KryoSerializer.toRegister) kryo.register(cls) +- +- // Allow sending SerializableWritable +- kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer()) +- kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer()) +- +- // Allow the user to register their own classes by setting spark.kryo.registrator +- try { +- for (regCls <- conf.getOption("spark.kryo.registrator")) { +- logDebug("Running user registrator: " + regCls) +- val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator] +- reg.registerClasses(kryo) +- } +- } catch { +- case e: Exception => logError("Failed to run spark.kryo.registrator", e) +- } +- +- // Register Chill's classes; we do this after our ranges and the user's own classes to let +- // our code override the generic serialziers in Chill for things like Seq +- new AllScalaRegistrar().apply(kryo) +- +- kryo.setClassLoader(classLoader) +- kryo +- } +- +- def newInstance(): SerializerInstance = { +- new KryoSerializerInstance(this) +- } +-} +- +-private[spark] +-class KryoSerializationStream(kryo: Kryo, outStream: OutputStream) extends SerializationStream { +- val output = new KryoOutput(outStream) +- +- def writeObject[T](t: T): SerializationStream = { +- kryo.writeClassAndObject(output, t) +- this +- } +- +- def flush() { output.flush() } +- def close() { output.close() } +-} +- +-private[spark] +-class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends DeserializationStream { +- val input = new KryoInput(inStream) +- +- def readObject[T](): T = { +- try { +- kryo.readClassAndObject(input).asInstanceOf[T] +- } catch { +- // DeserializationStream uses the EOF exception to indicate stopping condition. +- case _: KryoException => throw new EOFException +- } +- } +- +- def close() { +- // Kryo's Input automatically closes the input stream it is using. +- input.close() +- } +-} +- +-private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance { +- val kryo = ks.newKryo() +- +- // Make these lazy vals to avoid creating a buffer unless we use them +- lazy val output = ks.newKryoOutput() +- lazy val input = new KryoInput() +- +- def serialize[T](t: T): ByteBuffer = { +- output.clear() +- kryo.writeClassAndObject(output, t) +- ByteBuffer.wrap(output.toBytes) +- } +- +- def deserialize[T](bytes: ByteBuffer): T = { +- input.setBuffer(bytes.array) +- kryo.readClassAndObject(input).asInstanceOf[T] +- } +- +- def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = { +- val oldClassLoader = kryo.getClassLoader +- kryo.setClassLoader(loader) +- input.setBuffer(bytes.array) +- val obj = kryo.readClassAndObject(input).asInstanceOf[T] +- kryo.setClassLoader(oldClassLoader) +- obj +- } +- +- def serializeStream(s: OutputStream): SerializationStream = { +- new KryoSerializationStream(kryo, s) +- } +- +- def deserializeStream(s: InputStream): DeserializationStream = { +- new KryoDeserializationStream(kryo, s) +- } +-} +- +-/** +- * Interface implemented by clients to register their classes with Kryo when using Kryo +- * serialization. +- */ +-trait KryoRegistrator { +- def registerClasses(kryo: Kryo) +-} +- +-private[serializer] object KryoSerializer { +- // Commonly used classes. 
+- private val toRegister: Seq[Class[_]] = Seq( +- ByteBuffer.allocate(1).getClass, +- classOf[StorageLevel], +- classOf[PutBlock], +- classOf[GotBlock], +- classOf[GetBlock], +- classOf[MapStatus], +- classOf[BlockManagerId], +- classOf[Array[Byte]], +- (1 to 10).getClass, +- (1 until 10).getClass, +- (1L to 10L).getClass, +- (1L until 10L).getClass +- ) +-} +diff --git a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala +deleted file mode 100644 +index 40734aa..0000000 +--- a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala ++++ /dev/null +@@ -1,103 +0,0 @@ +-/* +- * Licensed to the Apache Software Foundation (ASF) under one or more +- * contributor license agreements. See the NOTICE file distributed with +- * this work for additional information regarding copyright ownership. +- * The ASF licenses this file to You under the Apache License, Version 2.0 +- * (the "License"); you may not use this file except in compliance with +- * the License. You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, software +- * distributed under the License is distributed on an "AS IS" BASIS, +- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- * See the License for the specific language governing permissions and +- * limitations under the License. +- */ +- +-package org.apache.spark.storage +- +-import java.util.concurrent.atomic.AtomicLong +-import java.util.concurrent.{CountDownLatch, Executors} +- +-import org.apache.spark.serializer.KryoSerializer +-import org.apache.spark.SparkContext +-import org.apache.spark.util.Utils +- +-/** +- * Utility for micro-benchmarking shuffle write performance. +- * +- * Writes simulated shuffle output from several threads and records the observed throughput. +- */ +-object StoragePerfTester { +- def main(args: Array[String]) = { +- /** Total amount of data to generate. Distributed evenly amongst maps and reduce splits. */ +- val dataSizeMb = Utils.memoryStringToMb(sys.env.getOrElse("OUTPUT_DATA", "1g")) +- +- /** Number of map tasks. All tasks execute concurrently. */ +- val numMaps = sys.env.get("NUM_MAPS").map(_.toInt).getOrElse(8) +- +- /** Number of reduce splits for each map task. */ +- val numOutputSplits = sys.env.get("NUM_REDUCERS").map(_.toInt).getOrElse(500) +- +- val recordLength = 1000 // ~1KB records +- val totalRecords = dataSizeMb * 1000 +- val recordsPerMap = totalRecords / numMaps +- +- val writeData = "1" * recordLength +- val executor = Executors.newFixedThreadPool(numMaps) +- +- System.setProperty("spark.shuffle.compress", "false") +- System.setProperty("spark.shuffle.sync", "true") +- +- // This is only used to instantiate a BlockManager. All thread scheduling is done manually. 
+- val sc = new SparkContext("local[4]", "Write Tester") +- val blockManager = sc.env.blockManager +- +- def writeOutputBytes(mapId: Int, total: AtomicLong) = { +- val shuffle = blockManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits, +- new KryoSerializer(sc.conf)) +- val writers = shuffle.writers +- for (i <- 1 to recordsPerMap) { +- writers(i % numOutputSplits).write(writeData) +- } +- writers.map {w => +- w.commit() +- total.addAndGet(w.fileSegment().length) +- w.close() +- } +- +- shuffle.releaseWriters(true) +- } +- +- val start = System.currentTimeMillis() +- val latch = new CountDownLatch(numMaps) +- val totalBytes = new AtomicLong() +- for (task <- 1 to numMaps) { +- executor.submit(new Runnable() { +- override def run() = { +- try { +- writeOutputBytes(task, totalBytes) +- latch.countDown() +- } catch { +- case e: Exception => +- println("Exception in child thread: " + e + " " + e.getMessage) +- System.exit(1) +- } +- } +- }) +- } +- latch.await() +- val end = System.currentTimeMillis() +- val time = (end - start) / 1000.0 +- val bytesPerSecond = totalBytes.get() / time +- val bytesPerFile = (totalBytes.get() / (numOutputSplits * numMaps.toDouble)).toLong +- +- System.err.println("files_total\t\t%s".format(numMaps * numOutputSplits)) +- System.err.println("bytes_per_file\t\t%s".format(Utils.bytesToString(bytesPerFile))) +- System.err.println("agg_throughput\t\t%s/s".format(Utils.bytesToString(bytesPerSecond.toLong))) +- +- executor.shutdown() +- sc.stop() +- } +-} +diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala +deleted file mode 100644 +index 729ba2c..0000000 +--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala ++++ /dev/null +@@ -1,115 +0,0 @@ +-/* +- * Licensed to the Apache Software Foundation (ASF) under one or more +- * contributor license agreements. See the NOTICE file distributed with +- * this work for additional information regarding copyright ownership. +- * The ASF licenses this file to You under the Apache License, Version 2.0 +- * (the "License"); you may not use this file except in compliance with +- * the License. You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, software +- * distributed under the License is distributed on an "AS IS" BASIS, +- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- * See the License for the specific language governing permissions and +- * limitations under the License. +- */ +- +-package org.apache.spark.storage +- +-import akka.actor._ +- +-import java.util.concurrent.ArrayBlockingQueue +-import util.Random +-import org.apache.spark.serializer.KryoSerializer +-import org.apache.spark.{SparkConf, SparkContext} +- +-/** +- * This class tests the BlockManager and MemoryStore for thread safety and +- * deadlocks. It spawns a number of producer and consumer threads. Producer +- * threads continuously pushes blocks into the BlockManager and consumer +- * threads continuously retrieves the blocks form the BlockManager and tests +- * whether the block is correct or not. 
+- */ +-private[spark] object ThreadingTest { +- +- val numProducers = 5 +- val numBlocksPerProducer = 20000 +- +- private[spark] class ProducerThread(manager: BlockManager, id: Int) extends Thread { +- val queue = new ArrayBlockingQueue[(BlockId, Seq[Int])](100) +- +- override def run() { +- for (i <- 1 to numBlocksPerProducer) { +- val blockId = TestBlockId("b-" + id + "-" + i) +- val blockSize = Random.nextInt(1000) +- val block = (1 to blockSize).map(_ => Random.nextInt()) +- val level = randomLevel() +- val startTime = System.currentTimeMillis() +- manager.put(blockId, block.iterator, level, true) +- println("Pushed block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms") +- queue.add((blockId, block)) +- } +- println("Producer thread " + id + " terminated") +- } +- +- def randomLevel(): StorageLevel = { +- math.abs(Random.nextInt()) % 4 match { +- case 0 => StorageLevel.MEMORY_ONLY +- case 1 => StorageLevel.MEMORY_ONLY_SER +- case 2 => StorageLevel.MEMORY_AND_DISK +- case 3 => StorageLevel.MEMORY_AND_DISK_SER +- } +- } +- } +- +- private[spark] class ConsumerThread( +- manager: BlockManager, +- queue: ArrayBlockingQueue[(BlockId, Seq[Int])] +- ) extends Thread { +- var numBlockConsumed = 0 +- +- override def run() { +- println("Consumer thread started") +- while(numBlockConsumed < numBlocksPerProducer) { +- val (blockId, block) = queue.take() +- val startTime = System.currentTimeMillis() +- manager.get(blockId) match { +- case Some(retrievedBlock) => +- assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList, +- "Block " + blockId + " did not match") +- println("Got block " + blockId + " in " + +- (System.currentTimeMillis - startTime) + " ms") +- case None => +- assert(false, "Block " + blockId + " could not be retrieved") +- } +- numBlockConsumed += 1 +- } +- println("Consumer thread terminated") +- } +- } +- +- def main(args: Array[String]) { +- System.setProperty("spark.kryoserializer.buffer.mb", "1") +- val actorSystem = ActorSystem("test") +- val conf = new SparkConf() +- val serializer = new KryoSerializer(conf) +- val blockManagerMaster = new BlockManagerMaster( +- actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf) +- val blockManager = new BlockManager( +- "", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf) +- val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i)) +- val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue)) +- producers.foreach(_.start) +- consumers.foreach(_.start) +- producers.foreach(_.join) +- consumers.foreach(_.join) +- blockManager.stop() +- blockManagerMaster.stop() +- actorSystem.shutdown() +- actorSystem.awaitTermination() +- println("Everything stopped.") +- println( +- "It will take sometime for the JVM to clean all temporary files and shutdown. 
Sit tight.") +- } +-} +diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +index 6f36817..c4f3efe 100644 +--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ++++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +@@ -30,6 +30,7 @@ import org.apache.spark.{Logging, SparkEnv} + import org.apache.spark.serializer.Serializer + import org.apache.spark.storage.{BlockId, BlockManager} + ++ + /** + * An append-only map that spills sorted content to disk when there is insufficient space for it + * to grow. +diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala +deleted file mode 100644 +index dd380d8..0000000 +--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala ++++ /dev/null +@@ -1,48 +0,0 @@ +-/* +- * Licensed to the Apache Software Foundation (ASF) under one or more +- * contributor license agreements. See the NOTICE file distributed with +- * this work for additional information regarding copyright ownership. +- * The ASF licenses this file to You under the Apache License, Version 2.0 +- * (the "License"); you may not use this file except in compliance with +- * the License. You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, software +- * distributed under the License is distributed on an "AS IS" BASIS, +- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- * See the License for the specific language governing permissions and +- * limitations under the License. +- */ +- +-package org.apache.spark.graphx +- +-import com.esotericsoftware.kryo.Kryo +- +-import org.apache.spark.graphx.impl._ +-import org.apache.spark.serializer.KryoRegistrator +-import org.apache.spark.util.collection.BitSet +-import org.apache.spark.util.BoundedPriorityQueue +- +-/** +- * Registers GraphX classes with Kryo for improved performance. +- */ +-class GraphKryoRegistrator extends KryoRegistrator { +- +- def registerClasses(kryo: Kryo) { +- kryo.register(classOf[Edge[Object]]) +- kryo.register(classOf[MessageToPartition[Object]]) +- kryo.register(classOf[VertexBroadcastMsg[Object]]) +- kryo.register(classOf[(VertexId, Object)]) +- kryo.register(classOf[EdgePartition[Object]]) +- kryo.register(classOf[BitSet]) +- kryo.register(classOf[VertexIdToIndexMap]) +- kryo.register(classOf[VertexAttributeBlock[Object]]) +- kryo.register(classOf[PartitionStrategy]) +- kryo.register(classOf[BoundedPriorityQueue[Object]]) +- kryo.register(classOf[EdgeDirection]) +- +- // This avoids a large number of hash table lookups. 
+- kryo.setReferences(false) +- } +-} +diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +index 44db51c..f13781a 100644 +--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala ++++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +@@ -26,10 +26,8 @@ import org.apache.spark.broadcast.Broadcast + import org.apache.spark.{Logging, HashPartitioner, Partitioner, SparkContext, SparkConf} + import org.apache.spark.storage.StorageLevel + import org.apache.spark.rdd.RDD +-import org.apache.spark.serializer.KryoRegistrator + import org.apache.spark.SparkContext._ + +-import com.esotericsoftware.kryo.Kryo + import org.jblas.{DoubleMatrix, SimpleBlas, Solve} + + +@@ -641,12 +639,6 @@ object ALS { + trainImplicit(ratings, rank, iterations, 0.01, -1, 1.0) + } + +- private class ALSRegistrator extends KryoRegistrator { +- override def registerClasses(kryo: Kryo) { +- kryo.register(classOf[Rating]) +- } +- } +- + def main(args: Array[String]) { + if (args.length < 5 || args.length > 9) { + println("Usage: ALS " + +@@ -660,10 +652,6 @@ object ALS { + val alpha = if (args.length >= 8) args(7).toDouble else 1 + val blocks = if (args.length == 9) args(8).toInt else -1 + val conf = new SparkConf() +- .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") +- .set("spark.kryo.registrator", classOf[ALSRegistrator].getName) +- .set("spark.kryo.referenceTracking", "false") +- .set("spark.kryoserializer.buffer.mb", "8") + .set("spark.locality.wait", "10000") + val sc = new SparkContext(master, "ALS", conf) + +diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala +deleted file mode 100644 +index 684b38e..0000000 +--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala ++++ /dev/null +@@ -1,82 +0,0 @@ +-/* +- * Licensed to the Apache Software Foundation (ASF) under one or more +- * contributor license agreements. See the NOTICE file distributed with +- * this work for additional information regarding copyright ownership. +- * The ASF licenses this file to You under the Apache License, Version 2.0 +- * (the "License"); you may not use this file except in compliance with +- * the License. You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, software +- * distributed under the License is distributed on an "AS IS" BASIS, +- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- * See the License for the specific language governing permissions and +- * limitations under the License. +- */ +- +-package org.apache.spark.streaming.util +- +-import java.io.IOException +-import java.net.ServerSocket +-import java.nio.ByteBuffer +- +-import scala.io.Source +- +-import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream +- +-import org.apache.spark.{SparkConf, Logging} +-import org.apache.spark.serializer.KryoSerializer +-import org.apache.spark.util.IntParam +- +-/** +- * A helper program that sends blocks of Kryo-serialized text strings out on a socket at a +- * specified rate. Used to feed data into RawInputDStream. 
+- */
+-private[streaming]
+-object RawTextSender extends Logging {
+-  def main(args: Array[String]) {
+-    if (args.length != 4) {
+-      System.err.println("Usage: RawTextSender <port> <file> <blockSize> <bytesPerSec>")
+-      System.exit(1)
+-    }
+-    // Parse the arguments using a pattern match
+-    val Array(IntParam(port), file, IntParam(blockSize), IntParam(bytesPerSec)) = args
+-
+-    // Repeat the input data multiple times to fill in a buffer
+-    val lines = Source.fromFile(file).getLines().toArray
+-    val bufferStream = new FastByteArrayOutputStream(blockSize + 1000)
+-    val ser = new KryoSerializer(new SparkConf()).newInstance()
+-    val serStream = ser.serializeStream(bufferStream)
+-    var i = 0
+-    while (bufferStream.position < blockSize) {
+-      serStream.writeObject(lines(i))
+-      i = (i + 1) % lines.length
+-    }
+-    bufferStream.trim()
+-    val array = bufferStream.array
+-
+-    val countBuf = ByteBuffer.wrap(new Array[Byte](4))
+-    countBuf.putInt(array.length)
+-    countBuf.flip()
+-
+-    val serverSocket = new ServerSocket(port)
+-    logInfo("Listening on port " + port)
+-
+-    while (true) {
+-      val socket = serverSocket.accept()
+-      logInfo("Got a new connection")
+-      val out = new RateLimitedOutputStream(socket.getOutputStream, bytesPerSec)
+-      try {
+-        while (true) {
+-          out.write(countBuf.array)
+-          out.write(array)
+-        }
+-      } catch {
+-        case e: IOException =>
+-          logError("Client disconnected")
+-          socket.close()
+-      }
+-    }
+-  }
+-}
+-- 
+1.8.3.4 (Apple Git-47)
+
diff --git a/spark-v0.9.1-rc3-0006-remove-unavailable-and-unnecessary-deps.patch b/spark-v0.9.1-rc3-0006-remove-unavailable-and-unnecessary-deps.patch
new file mode 100644
index 0000000..305e4a0
--- /dev/null
+++ b/spark-v0.9.1-rc3-0006-remove-unavailable-and-unnecessary-deps.patch
@@ -0,0 +1,58 @@
+From e110f954300a46c391a91cd61d027b8ce92e831d Mon Sep 17 00:00:00 2001
+From: William Benton
+Date: Fri, 28 Feb 2014 09:00:04 -0600
+Subject: [PATCH 6/9] remove unavailable and unnecessary deps
+
+Conflicts:
+	project/SparkBuild.scala
+---
+ project/SparkBuild.scala | 14 ++------------
+ 1 file changed, 2 insertions(+), 12 deletions(-)
+
+diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
+index 3925475..c028cd7 100644
+--- a/project/SparkBuild.scala
++++ b/project/SparkBuild.scala
+@@ -191,13 +191,7 @@ object SparkBuild extends Build {
+       "io.netty" % "netty-all" % "4.0.13.Final",
+       "org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106",
+       /** Workaround for SPARK-959. Dependency used by org.eclipse.jetty. Fixed in ivy 2.3.0. */
+-      "org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar"),
+-      "org.scalatest" %% "scalatest" % "1.9.1" % "test",
+-      "org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
+-      "com.novocode" % "junit-interface" % "0.10" % "test",
+-      "org.easymock" % "easymock" % "3.1" % "test",
+-      "org.mockito" % "mockito-all" % "1.8.5" % "test",
+-      "commons-io" % "commons-io" % "2.4" % "test"
++      "org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar")
+     ),
+ 
+     testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
+@@ -241,14 +235,12 @@ object SparkBuild extends Build {
+       "org.xerial.snappy" % "snappy-java" % "1.0.5",
+       "org.spark-project.akka" %% "akka-remote" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty),
+       "org.spark-project.akka" %% "akka-slf4j" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty),
+-      "org.spark-project.akka" %% "akka-testkit" % "2.2.3-shaded-protobuf" % "test",
+       "org.json4s" %% "json4s-jackson" % "3.2.6",
+       "it.unimi.dsi" % "fastutil" % "6.4.4",
+       "colt" % "colt" % "1.2.0",
+       "org.apache.mesos" % "mesos" % "0.13.0",
+       "net.java.dev.jets3t" % "jets3t" % "0.7.1",
+-      "org.apache.derby" % "derby" % "10.4.2.0" % "test",
+-      "org.apache.hadoop" % hadoopClient % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
++      "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
+       "org.apache.avro" % "avro" % "1.7.4",
+       "org.apache.avro" % "avro-ipc" % "1.7.4" excludeAll(excludeNetty),
+       "org.apache.zookeeper" % "zookeeper" % "3.4.5" excludeAll(excludeNetty),
+@@ -256,8 +248,6 @@ object SparkBuild extends Build {
+       "com.codahale.metrics" % "metrics-jvm" % "3.0.0",
+       "com.codahale.metrics" % "metrics-json" % "3.0.0",
+       "com.codahale.metrics" % "metrics-graphite" % "3.0.0",
+-      "com.twitter" %% "chill" % "0.3.1",
+-      "com.twitter" % "chill-java" % "0.3.1",
+       "com.clearspring.analytics" % "stream" % "2.5.1"
+     )
+   )
+-- 
+1.8.3.4 (Apple Git-47)
+
diff --git a/spark-v0.9.1-rc3-0007-use-Jetty-8.patch b/spark-v0.9.1-rc3-0007-use-Jetty-8.patch
new file mode 100644
index 0000000..f6ea209
--- /dev/null
+++ b/spark-v0.9.1-rc3-0007-use-Jetty-8.patch
@@ -0,0 +1,27 @@
+From 972a16c320ceda6ed667e984591e3e5262d6dbcc Mon Sep 17 00:00:00 2001
+From: William Benton
+Date: Fri, 28 Feb 2014 15:16:45 -0600
+Subject: [PATCH 7/9] use Jetty 8
+
+---
+ project/SparkBuild.scala | 4 +---
+ 1 file changed, 1 insertion(+), 3 deletions(-)
+
+diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
+index c028cd7..9fdf9f8 100644
+--- a/project/SparkBuild.scala
++++ b/project/SparkBuild.scala
+@@ -189,9 +189,7 @@ object SparkBuild extends Build {
+ 
+     libraryDependencies ++= Seq(
+       "io.netty" % "netty-all" % "4.0.13.Final",
+-      "org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106",
+-      /** Workaround for SPARK-959. Dependency used by org.eclipse.jetty. Fixed in ivy 2.3.0. */
+-      "org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar")
++      "org.eclipse.jetty" % "jetty-server" % "8.1.14.v20131031"
+     ),
+ 
+     testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
+-- 
+1.8.3.4 (Apple Git-47)
+
diff --git a/spark-v0.9.1-rc3-0008-use-Akka-2.3.0-RC2.patch b/spark-v0.9.1-rc3-0008-use-Akka-2.3.0-RC2.patch
new file mode 100644
index 0000000..0762514
--- /dev/null
+++ b/spark-v0.9.1-rc3-0008-use-Akka-2.3.0-RC2.patch
@@ -0,0 +1,84 @@
+From c0050cf7511547e3009416c5d448ed1ce19be5d7 Mon Sep 17 00:00:00 2001
+From: William Benton
+Date: Fri, 28 Feb 2014 15:31:52 -0600
+Subject: [PATCH 8/9] use Akka 2.3.0-RC2
+
+---
+ core/src/main/scala/org/apache/spark/deploy/Client.scala | 2 +-
+ core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala | 2 +-
+ .../main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala | 2 +-
+ .../main/scala/org/apache/spark/util/IndestructibleActorSystem.scala | 2 +-
+ project/SparkBuild.scala | 5 +++--
+ 5 files changed, 7 insertions(+), 6 deletions(-)
+
+diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
+index 9987e23..7fda886 100644
+--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
++++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
+@@ -116,7 +116,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
+       println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
+       System.exit(-1)
+ 
+-    case AssociationErrorEvent(cause, _, remoteAddress, _) =>
++    case AssociationErrorEvent(cause, _, remoteAddress, _, _) =>
+       println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
+       println(s"Cause was: $cause")
+       System.exit(-1)
+diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+index 1415e2f..8d732db 100644
+--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
++++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+@@ -153,7 +153,7 @@ private[spark] class AppClient(
+       logWarning(s"Connection to $address failed; waiting for master to reconnect...")
+       markDisconnected()
+ 
+-    case AssociationErrorEvent(cause, _, address, _) if isPossibleMaster(address) =>
++    case AssociationErrorEvent(cause, _, address, _, _) if isPossibleMaster(address) =>
+       logWarning(s"Could not connect to $address: $cause")
+ 
+     case StopAppClient =>
+diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
+index 1dc39c4..732a1d7 100644
+--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
++++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
+@@ -52,7 +52,7 @@ private[spark] class WorkerWatcher(workerUrl: String) extends Actor
+     case AssociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) =>
+       logInfo(s"Successfully connected to $workerUrl")
+ 
+-    case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound)
++    case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound, _)
+         if isWorker(remoteAddress) =>
+       // These logs may not be seen if the worker (and associated pipe) has died
+       logError(s"Could not initialize connection to worker $workerUrl. Exiting.")
+diff --git a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
+index bf71882..08d703e 100644
+--- a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
++++ b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
+@@ -39,7 +39,7 @@ private[akka] class IndestructibleActorSystemImpl(
+     override val name: String,
+     applicationConfig: Config,
+     classLoader: ClassLoader)
+-  extends ActorSystemImpl(name, applicationConfig, classLoader) {
++  extends ActorSystemImpl(name, applicationConfig, classLoader, None) {
+ 
+   protected override def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = {
+     val fallbackHandler = super.uncaughtExceptionHandler
+diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
+index 9fdf9f8..9f9962d 100644
+--- a/project/SparkBuild.scala
++++ b/project/SparkBuild.scala
+@@ -231,8 +231,9 @@ object SparkBuild extends Build {
+       "commons-daemon" % "commons-daemon" % "1.0.10", // workaround for bug HADOOP-9407
+       "com.ning" % "compress-lzf" % "1.0.0",
+       "org.xerial.snappy" % "snappy-java" % "1.0.5",
+-      "org.spark-project.akka" %% "akka-remote" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty),
+-      "org.spark-project.akka" %% "akka-slf4j" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty),
++      "org.ow2.asm" % "asm" % "4.0",
++      "com.typesafe.akka" %% "akka-remote" % "2.3.0-RC2" excludeAll(excludeNetty),
++      "com.typesafe.akka" %% "akka-slf4j" % "2.3.0-RC2" excludeAll(excludeNetty),
+       "org.json4s" %% "json4s-jackson" % "3.2.6",
+       "it.unimi.dsi" % "fastutil" % "6.4.4",
+       "colt" % "colt" % "1.2.0",
+-- 
+1.8.3.4 (Apple Git-47)
+
diff --git a/spark-v0.9.1-rc3-0009-xmvn.patch b/spark-v0.9.1-rc3-0009-xmvn.patch
new file mode 100644
index 0000000..b995a0a
--- /dev/null
+++ b/spark-v0.9.1-rc3-0009-xmvn.patch
@@ -0,0 +1,53 @@
+From f07ad8324eb75b2f196ce83317c7dcb844f93d11 Mon Sep 17 00:00:00 2001
+From: William Benton
+Date: Fri, 28 Feb 2014 16:39:51 -0600
+Subject: [PATCH 9/9] fedora-only resolver changes
+
+---
+ project/SparkBuild.scala | 15 ++++-----------
+ 1 file changed, 4 insertions(+), 11 deletions(-)
+
+diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
+index 9f9962d..6a3f40f 100644
+--- a/project/SparkBuild.scala
++++ b/project/SparkBuild.scala
+@@ -101,7 +101,11 @@ object SparkBuild extends Build {
+ 
+   lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](tools)
+ 
++  val ivyLocal = Resolver.file("local", file("IVY_LOCAL"))(Resolver.ivyStylePatterns)
++
+   def sharedSettings = Defaults.defaultSettings ++ Seq(
++    externalResolvers := Seq(new sbt.RawRepository(new org.fedoraproject.maven.connector.ivy.IvyResolver), ivyLocal),
++
+     organization := "org.apache.spark",
+     version := "0.9.1",
+     scalaVersion := "2.10.3",
+@@ -131,13 +135,6 @@ object SparkBuild extends Build {
+     // Only allow one test at a time, even across projects, since they run in the same JVM
+     concurrentRestrictions in Global += Tags.limit(Tags.Test, 1),
+ 
+-    // also check the local Maven repository ~/.m2
+-    resolvers ++= Seq(Resolver.file("Local Maven Repo", file(Path.userHome + "/.m2/repository"))),
+-
+-    // For Sonatype publishing
+-    resolvers ++= Seq("sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots",
+-      "sonatype-staging" at "https://oss.sonatype.org/service/local/staging/deploy/maven2/"),
+-
+     publishMavenStyle := true,
+ 
+     //useGpg in Global := true,
+@@ -217,10 +214,6 @@ object SparkBuild extends Build {
+ 
+   def coreSettings = sharedSettings ++ Seq(
+     name := "spark-core",
+-    resolvers ++= Seq(
+-      "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
+-      "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
+-    ),
+ 
+     libraryDependencies ++= Seq(
+       "com.google.guava" % "guava" % "14.0.1",
+-- 
+1.8.3.4 (Apple Git-47)
+