From 42acb0c396f7c9e51679f3c30cb0ca4aa50d159e Mon Sep 17 00:00:00 2001
From: William Benton
Date: Mar 14 2014 19:35:08 +0000
Subject: initial import

---

diff --git a/.gitignore b/.gitignore
index e69de29..6407f31 100644
--- a/.gitignore
+++ b/.gitignore
@@ -0,0 +1 @@
+/v0.9.0-incubating.tar.gz
diff --git a/sources b/sources
index e69de29..a17853d 100644
--- a/sources
+++ b/sources
@@ -0,0 +1 @@
+f31821e3d54e0335e55eccbceda49b6c v0.9.0-incubating.tar.gz
diff --git a/spark-v0.9.0-0001-Replace-lift-json-with-json4s-jackson.patch b/spark-v0.9.0-0001-Replace-lift-json-with-json4s-jackson.patch
new file mode 100644
index 0000000..73479d2
--- /dev/null
+++ b/spark-v0.9.0-0001-Replace-lift-json-with-json4s-jackson.patch
@@ -0,0 +1,253 @@
+From ab4e57f1acd2ee95622a0d264f9b81c4bbb43f9e Mon Sep 17 00:00:00 2001
+From: William Benton
+Date: Sun, 23 Feb 2014 17:22:07 -0600
+Subject: [PATCH 1/7] Replace lift-json with json4s-jackson.
+
+The aim of the Json4s project is to provide a common API for
+Scala JSON libraries. It is Apache-licensed, easier for
+downstream distributions to package, and mostly API-compatible
+with lift-json. Furthermore, the Jackson-backed implementation
+parses faster than lift-json on all but the smallest inputs.
+
+Backported patch from master to 0.9.0.
+
+Conflicts:
+	core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+	core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+	core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+	core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+---
+ core/pom.xml | 5 +++--
+ .../apache/spark/deploy/FaultToleranceTest.scala | 9 ++++----
+ .../org/apache/spark/deploy/JsonProtocol.scala | 2 +-
+ .../spark/deploy/master/ui/ApplicationPage.scala | 2 +-
+ .../apache/spark/deploy/master/ui/IndexPage.scala | 2 +-
+ .../apache/spark/deploy/worker/ui/IndexPage.scala | 2 +-
+ .../scala/org/apache/spark/ui/JettyUtils.scala | 8 ++++----
+ .../apache/spark/deploy/JsonProtocolSuite.scala | 24 ++++++++++++++++++----
+ project/SparkBuild.scala | 2 +-
+ 9 files changed, 37 insertions(+), 19 deletions(-)
+
+diff --git a/core/pom.xml b/core/pom.xml
+index 62ceba1..afae171 100644
+--- a/core/pom.xml
++++ b/core/pom.xml
+@@ -108,8 +108,9 @@
+       <artifactId>scala-library</artifactId>
+     </dependency>
+     <dependency>
+-      <groupId>net.liftweb</groupId>
+-      <artifactId>lift-json_${scala.binary.version}</artifactId>
++      <groupId>org.json4s</groupId>
++      <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
++      <version>3.2.6</version>
+     </dependency>
+     <dependency>
+       <groupId>it.unimi.dsi</groupId>
+diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+index 4dfb19e..60a87af 100644
+--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
++++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+@@ -29,7 +29,8 @@ import scala.concurrent.ExecutionContext.Implicits.global
+ import scala.collection.mutable.ListBuffer
+ import scala.sys.process._
+ 
+-import net.liftweb.json.JsonParser
++import org.json4s._
++import org.json4s.jackson.JsonMethods
+ 
+ import org.apache.spark.{Logging, SparkContext}
+ import org.apache.spark.deploy.master.RecoveryState
+@@ -312,7 +313,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
+ private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile: File)
+   extends Logging {
+ 
+-  implicit val formats = net.liftweb.json.DefaultFormats
++  implicit val formats = org.json4s.DefaultFormats
+   var state: RecoveryState.Value = _
+   var liveWorkerIPs: List[String] = _
+   var numLiveApps = 0
+@@ -322,7 +323,7 
@@ private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val + def readState() { + try { + val masterStream = new InputStreamReader(new URL("http://%s:8080/json".format(ip)).openStream) +- val json = JsonParser.parse(masterStream, closeAutomatically = true) ++ val json = JsonMethods.parse(masterStream) + + val workers = json \ "workers" + val liveWorkers = workers.children.filter(w => (w \ "state").extract[String] == "ALIVE") +@@ -350,7 +351,7 @@ private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val + private[spark] class TestWorkerInfo(val ip: String, val dockerId: DockerId, val logFile: File) + extends Logging { + +- implicit val formats = net.liftweb.json.DefaultFormats ++ implicit val formats = org.json4s.DefaultFormats + + logDebug("Created worker: " + this) + +diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala +index e607b8c..a43d004 100644 +--- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala ++++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala +@@ -17,7 +17,7 @@ + + package org.apache.spark.deploy + +-import net.liftweb.json.JsonDSL._ ++import org.json4s.JsonDSL._ + + import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse} + import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo} +diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +index 9485bfd..1b234d6 100644 +--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala ++++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +@@ -22,7 +22,7 @@ import scala.xml.Node + + import akka.pattern.ask + import javax.servlet.http.HttpServletRequest +-import net.liftweb.json.JsonAST.JValue ++import org.json4s.JValue + + import org.apache.spark.deploy.JsonProtocol + import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} +diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala +index a9af8df..a55264b 100644 +--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala ++++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala +@@ -23,7 +23,7 @@ import scala.xml.Node + + import akka.pattern.ask + import javax.servlet.http.HttpServletRequest +-import net.liftweb.json.JsonAST.JValue ++import org.json4s.JValue + + import org.apache.spark.deploy.{DeployWebUI, JsonProtocol} + import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} +diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala +index 925c6fb..de356dc 100644 +--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala ++++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala +@@ -22,7 +22,7 @@ import scala.xml.Node + + import akka.pattern.ask + import javax.servlet.http.HttpServletRequest +-import net.liftweb.json.JsonAST.JValue ++import org.json4s.JValue + + import org.apache.spark.deploy.JsonProtocol + import org.apache.spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse} +diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala 
b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +index 7211dbc..4e43fd5 100644 +--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala ++++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +@@ -23,10 +23,10 @@ import scala.annotation.tailrec + import scala.util.{Try, Success, Failure} + import scala.xml.Node + +-import net.liftweb.json.{JValue, pretty, render} +- +-import org.eclipse.jetty.server.{Server, Request, Handler} +-import org.eclipse.jetty.server.handler.{ResourceHandler, HandlerList, ContextHandler, AbstractHandler} ++import org.json4s.JValue ++import org.json4s.jackson.JsonMethods.{pretty, render} ++import org.eclipse.jetty.server.{Handler, Request, Server} ++import org.eclipse.jetty.server.handler.{AbstractHandler, ContextHandler, HandlerList, ResourceHandler} + import org.eclipse.jetty.util.thread.QueuedThreadPool + + import org.apache.spark.Logging +diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +index d05bbd6..8f1df8a 100644 +--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala ++++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +@@ -20,8 +20,12 @@ package org.apache.spark.deploy + import java.io.File + import java.util.Date + +-import net.liftweb.json.{JsonAST, JsonParser} +-import net.liftweb.json.JsonAST.JValue ++import org.json4s._ ++ ++import org.json4s.JValue ++import org.json4s.jackson.JsonMethods ++import com.fasterxml.jackson.core.JsonParseException ++ + import org.scalatest.FunSuite + + import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse} +@@ -32,21 +36,31 @@ class JsonProtocolSuite extends FunSuite { + test("writeApplicationInfo") { + val output = JsonProtocol.writeApplicationInfo(createAppInfo()) + assertValidJson(output) ++ assertValidDataInJson(output, JsonMethods.parse(JsonConstants.appInfoJsonStr)) + } + + test("writeWorkerInfo") { + val output = JsonProtocol.writeWorkerInfo(createWorkerInfo()) + assertValidJson(output) ++ assertValidDataInJson(output, JsonMethods.parse(JsonConstants.workerInfoJsonStr)) + } + + test("writeApplicationDescription") { + val output = JsonProtocol.writeApplicationDescription(createAppDesc()) + assertValidJson(output) ++ assertValidDataInJson(output, JsonMethods.parse(JsonConstants.appDescJsonStr)) + } + + test("writeExecutorRunner") { + val output = JsonProtocol.writeExecutorRunner(createExecutorRunner()) + assertValidJson(output) ++ assertValidDataInJson(output, JsonMethods.parse(JsonConstants.executorRunnerJsonStr)) ++ } ++ ++ test("writeDriverInfo") { ++ val output = JsonProtocol.writeDriverInfo(createDriverInfo()) ++ assertValidJson(output) ++ assertValidDataInJson(output, JsonMethods.parse(JsonConstants.driverInfoJsonStr)) + } + + test("writeMasterState") { +@@ -59,6 +73,7 @@ class JsonProtocolSuite extends FunSuite { + activeDrivers, completedDrivers, RecoveryState.ALIVE) + val output = JsonProtocol.writeMasterState(stateResponse) + assertValidJson(output) ++ assertValidDataInJson(output, JsonMethods.parse(JsonConstants.masterStateJsonStr)) + } + + test("writeWorkerState") { +@@ -70,6 +85,7 @@ class JsonProtocolSuite extends FunSuite { + finishedExecutors, drivers, finishedDrivers, "masterUrl", 4, 1234, 4, 1234, "masterWebUiUrl") + val output = JsonProtocol.writeWorkerState(stateResponse) + assertValidJson(output) ++ assertValidDataInJson(output, JsonMethods.parse(JsonConstants.workerStateJsonStr)) + } + + def 
createAppDesc(): ApplicationDescription = {
+@@ -106,9 +122,9 @@ class JsonProtocolSuite extends FunSuite {
+ 
+   def assertValidJson(json: JValue) {
+     try {
+-      JsonParser.parse(JsonAST.compactRender(json))
++      JsonMethods.parse(JsonMethods.compact(json))
+     } catch {
+-      case e: JsonParser.ParseException => fail("Invalid Json detected", e)
++      case e: JsonParseException => fail("Invalid Json detected", e)
+     }
+   }
+ }
+diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
+index b891ffa..398f5ec 100644
+--- a/project/SparkBuild.scala
++++ b/project/SparkBuild.scala
+@@ -260,7 +260,7 @@ object SparkBuild extends Build {
+         "org.spark-project.akka" %% "akka-remote" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty),
+         "org.spark-project.akka" %% "akka-slf4j" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty),
+         "org.spark-project.akka" %% "akka-testkit" % "2.2.3-shaded-protobuf" % "test",
+-        "net.liftweb" %% "lift-json" % "2.5.1" excludeAll(excludeNetty),
++        "org.json4s" %% "json4s-jackson" % "3.2.6",
+         "it.unimi.dsi" % "fastutil" % "6.4.4",
+         "colt" % "colt" % "1.2.0",
+         "org.apache.mesos" % "mesos" % "0.13.0",
+-- 
+1.8.3.4 (Apple Git-47)
+
diff --git a/spark-v0.9.0-0002-use-sbt-0.13.1.patch b/spark-v0.9.0-0002-use-sbt-0.13.1.patch
new file mode 100644
index 0000000..fb38677
--- /dev/null
+++ b/spark-v0.9.0-0002-use-sbt-0.13.1.patch
@@ -0,0 +1,22 @@
+From 64d595b9cc0a92ab133f1e13512c5d7da2982fdb Mon Sep 17 00:00:00 2001
+From: William Benton
+Date: Thu, 27 Feb 2014 14:25:34 -0600
+Subject: [PATCH 2/7] use sbt 0.13.1
+
+---
+ project/build.properties | 2 +-
+ 1 file changed, 1 insertion(+), 1 deletion(-)
+
+diff --git a/project/build.properties b/project/build.properties
+index 839f5fb..4b52bb9 100644
+--- a/project/build.properties
++++ b/project/build.properties
+@@ -14,4 +14,4 @@
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+ #
+-sbt.version=0.12.4
++sbt.version=0.13.1
+-- 
+1.8.3.4 (Apple Git-47)
+
diff --git a/spark-v0.9.0-0003-Removed-sbt-plugins.patch b/spark-v0.9.0-0003-Removed-sbt-plugins.patch
new file mode 100644
index 0000000..7e86a1f
--- /dev/null
+++ b/spark-v0.9.0-0003-Removed-sbt-plugins.patch
@@ -0,0 +1,164 @@
+From 88286b1bffc4eb65f6a259c71e613fb76667470b Mon Sep 17 00:00:00 2001
+From: William Benton
+Date: Thu, 27 Feb 2014 15:46:41 -0600
+Subject: [PATCH 3/7] Removed sbt plugins.
+ +--- + project/SparkBuild.scala | 38 +++++----------------------------- + project/plugins.sbt | 18 ---------------- + project/project/SparkPluginBuild.scala | 24 --------------------- + 3 files changed, 5 insertions(+), 75 deletions(-) + delete mode 100644 project/plugins.sbt + delete mode 100644 project/project/SparkPluginBuild.scala + +diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala +index 398f5ec..d401c71 100644 +--- a/project/SparkBuild.scala ++++ b/project/SparkBuild.scala +@@ -18,8 +18,6 @@ + import sbt._ + import sbt.Classpaths.publishTask + import Keys._ +-import sbtassembly.Plugin._ +-import AssemblyKeys._ + import scala.util.Properties + // For Sonatype publishing + //import com.jsuereth.pgp.sbtplugin.PgpKeys._ +@@ -60,11 +58,6 @@ object SparkBuild extends Build { + + lazy val mllib = Project("mllib", file("mllib"), settings = mllibSettings) dependsOn(core) + +- lazy val assemblyProj = Project("assembly", file("assembly"), settings = assemblyProjSettings) +- .dependsOn(core, graphx, bagel, mllib, repl, streaming) dependsOn(maybeYarn: _*) +- +- lazy val assembleDeps = TaskKey[Unit]("assemble-deps", "Build assembly of dependencies and packages Spark projects") +- + // A configuration to set an alternative publishLocalConfiguration + lazy val MavenCompile = config("m2r") extend(Compile) + lazy val publishLocalBoth = TaskKey[Unit]("publish-local", "publish local for m2 and ivy") +@@ -116,7 +109,7 @@ object SparkBuild extends Build { + // Everything except assembly, tools and examples belong to packageProjects + lazy val packageProjects = Seq[ProjectReference](core, repl, bagel, streaming, mllib, graphx) ++ maybeYarnRef + +- lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](examples, tools, assemblyProj) ++ lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](examples, tools) + + def sharedSettings = Defaults.defaultSettings ++ Seq( + organization := "org.apache.spark", +@@ -129,7 +122,6 @@ object SparkBuild extends Build { + retrieveManaged := true, + retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]", + transitiveClassifiers in Scope.GlobalScope := Seq("sources"), +- testListeners <<= target.map(t => Seq(new eu.henkelmann.sbt.JUnitXmlTestsListener(t.getAbsolutePath))), + + // Fork new JVMs for tests and set Java options for those + fork := true, +@@ -230,8 +222,8 @@ object SparkBuild extends Build { + publishMavenStyle in MavenCompile := true, + publishLocal in MavenCompile <<= publishTask(publishLocalConfiguration in MavenCompile, deliverLocal), + publishLocalBoth <<= Seq(publishLocal in MavenCompile, publishLocal).dependOn +- ) ++ net.virtualvoid.sbt.graph.Plugin.graphSettings +- ++ ) ++ + val slf4jVersion = "1.7.2" + + val excludeCglib = ExclusionRule(organization = "org.sonatype.sisu.inject") +@@ -309,11 +301,11 @@ object SparkBuild extends Build { + excludeAll(excludeSnappy) + excludeAll(excludeCglib) + ) +- ) ++ assemblySettings ++ extraAssemblySettings ++ ) + + def toolsSettings = sharedSettings ++ Seq( + name := "spark-tools" +- ) ++ assemblySettings ++ extraAssemblySettings ++ ) + + def graphxSettings = sharedSettings ++ Seq( + name := "spark-graphx", +@@ -377,26 +369,6 @@ object SparkBuild extends Build { + ) + ) + +- def assemblyProjSettings = sharedSettings ++ Seq( +- libraryDependencies += "net.sf.py4j" % "py4j" % "0.8.1", +- name := "spark-assembly", +- assembleDeps in Compile <<= (packageProjects.map(packageBin in Compile in _) ++ Seq(packageDependency in 
Compile)).dependOn, +- jarName in assembly <<= version map { v => "spark-assembly-" + v + "-hadoop" + hadoopVersion + ".jar" }, +- jarName in packageDependency <<= version map { v => "spark-assembly-" + v + "-hadoop" + hadoopVersion + "-deps.jar" } +- ) ++ assemblySettings ++ extraAssemblySettings +- +- def extraAssemblySettings() = Seq( +- test in assembly := {}, +- mergeStrategy in assembly := { +- case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard +- case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard +- case "log4j.properties" => MergeStrategy.discard +- case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines +- case "reference.conf" => MergeStrategy.concat +- case _ => MergeStrategy.first +- } +- ) +- + def twitterSettings() = sharedSettings ++ Seq( + name := "spark-streaming-twitter", + libraryDependencies ++= Seq( +diff --git a/project/plugins.sbt b/project/plugins.sbt +deleted file mode 100644 +index 4ba0e42..0000000 +--- a/project/plugins.sbt ++++ /dev/null +@@ -1,18 +0,0 @@ +-resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns) +- +-resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/" +- +-resolvers += "Spray Repository" at "http://repo.spray.cc/" +- +-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.9.2") +- +-addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.2.0") +- +-addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.5.1") +- +-// For Sonatype publishing +-//resolvers += Resolver.url("sbt-plugin-releases", new URL("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases/"))(Resolver.ivyStylePatterns) +- +-//addSbtPlugin("com.jsuereth" % "xsbt-gpg-plugin" % "0.6") +- +-addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.3") +diff --git a/project/project/SparkPluginBuild.scala b/project/project/SparkPluginBuild.scala +deleted file mode 100644 +index 6a66bd1..0000000 +--- a/project/project/SparkPluginBuild.scala ++++ /dev/null +@@ -1,24 +0,0 @@ +-/* +- * Licensed to the Apache Software Foundation (ASF) under one or more +- * contributor license agreements. See the NOTICE file distributed with +- * this work for additional information regarding copyright ownership. +- * The ASF licenses this file to You under the Apache License, Version 2.0 +- * (the "License"); you may not use this file except in compliance with +- * the License. You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, software +- * distributed under the License is distributed on an "AS IS" BASIS, +- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- * See the License for the specific language governing permissions and +- * limitations under the License. 
+- */ +- +-import sbt._ +- +-object SparkPluginDef extends Build { +- lazy val root = Project("plugins", file(".")) dependsOn(junitXmlListener) +- /* This is not published in a Maven repository, so we get it from GitHub directly */ +- lazy val junitXmlListener = uri("git://github.com/ijuma/junit_xml_listener.git#fe434773255b451a38e8d889536ebc260f4225ce") +-} +-- +1.8.3.4 (Apple Git-47) + diff --git a/spark-v0.9.0-0004-removed-examples.patch b/spark-v0.9.0-0004-removed-examples.patch new file mode 100644 index 0000000..ac75621 --- /dev/null +++ b/spark-v0.9.0-0004-removed-examples.patch @@ -0,0 +1,121 @@ +From e0d10f720e46f3c20a517079cf05642edd18e2bf Mon Sep 17 00:00:00 2001 +From: William Benton +Date: Thu, 27 Feb 2014 16:01:11 -0600 +Subject: [PATCH 4/7] removed examples + +--- + project/SparkBuild.scala | 82 ++---------------------------------------------- + 1 file changed, 3 insertions(+), 79 deletions(-) + +diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala +index d401c71..068bf74 100644 +--- a/project/SparkBuild.scala ++++ b/project/SparkBuild.scala +@@ -85,31 +85,13 @@ object SparkBuild extends Build { + lazy val maybeYarn = if (isYarnEnabled) Seq[ClasspathDependency](if (isNewHadoop) yarn else yarnAlpha) else Seq[ClasspathDependency]() + lazy val maybeYarnRef = if (isYarnEnabled) Seq[ProjectReference](if (isNewHadoop) yarn else yarnAlpha) else Seq[ProjectReference]() + +- lazy val externalTwitter = Project("external-twitter", file("external/twitter"), settings = twitterSettings) +- .dependsOn(streaming % "compile->compile;test->test") +- +- lazy val externalKafka = Project("external-kafka", file("external/kafka"), settings = kafkaSettings) +- .dependsOn(streaming % "compile->compile;test->test") +- +- lazy val externalFlume = Project("external-flume", file("external/flume"), settings = flumeSettings) +- .dependsOn(streaming % "compile->compile;test->test") +- +- lazy val externalZeromq = Project("external-zeromq", file("external/zeromq"), settings = zeromqSettings) +- .dependsOn(streaming % "compile->compile;test->test") +- +- lazy val externalMqtt = Project("external-mqtt", file("external/mqtt"), settings = mqttSettings) +- .dependsOn(streaming % "compile->compile;test->test") +- +- lazy val allExternal = Seq[ClasspathDependency](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt) +- lazy val allExternalRefs = Seq[ProjectReference](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt) +- +- lazy val examples = Project("examples", file("examples"), settings = examplesSettings) +- .dependsOn(core, mllib, graphx, bagel, streaming, externalTwitter) dependsOn(allExternal: _*) ++ lazy val allExternal = Seq[ClasspathDependency]() ++ lazy val allExternalRefs = Seq[ProjectReference]() + + // Everything except assembly, tools and examples belong to packageProjects + lazy val packageProjects = Seq[ProjectReference](core, repl, bagel, streaming, mllib, graphx) ++ maybeYarnRef + +- lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](examples, tools) ++ lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](tools) + + def sharedSettings = Defaults.defaultSettings ++ Seq( + organization := "org.apache.spark", +@@ -284,25 +266,6 @@ object SparkBuild extends Build { + libraryDependencies <+= scalaVersion(v => "org.scala-lang" % "scala-reflect" % v ) + ) + +- def examplesSettings = sharedSettings ++ Seq( +- name := "spark-examples", +- libraryDependencies ++= Seq( +- "com.twitter" 
%% "algebird-core" % "0.1.11", +- "org.apache.hbase" % "hbase" % "0.94.6" excludeAll(excludeNetty, excludeAsm), +- "org.apache.hbase" % "hbase" % HBASE_VERSION excludeAll(excludeNetty, excludeAsm), +- "org.apache.cassandra" % "cassandra-all" % "1.2.6" +- exclude("com.google.guava", "guava") +- exclude("com.googlecode.concurrentlinkedhashmap", "concurrentlinkedhashmap-lru") +- exclude("com.ning","compress-lzf") +- exclude("io.netty", "netty") +- exclude("jline","jline") +- exclude("log4j","log4j") +- exclude("org.apache.cassandra.deps", "avro") +- excludeAll(excludeSnappy) +- excludeAll(excludeCglib) +- ) +- ) +- + def toolsSettings = sharedSettings ++ Seq( + name := "spark-tools" + ) +@@ -368,43 +331,4 @@ object SparkBuild extends Build { + "org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib) + ) + ) +- +- def twitterSettings() = sharedSettings ++ Seq( +- name := "spark-streaming-twitter", +- libraryDependencies ++= Seq( +- "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty) +- ) +- ) +- +- def kafkaSettings() = sharedSettings ++ Seq( +- name := "spark-streaming-kafka", +- libraryDependencies ++= Seq( +- "com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty), +- "org.apache.kafka" %% "kafka" % "0.8.0" +- exclude("com.sun.jdmk", "jmxtools") +- exclude("com.sun.jmx", "jmxri") +- exclude("net.sf.jopt-simple", "jopt-simple") +- excludeAll(excludeNetty) +- ) +- ) +- +- def flumeSettings() = sharedSettings ++ Seq( +- name := "spark-streaming-flume", +- libraryDependencies ++= Seq( +- "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy) +- ) +- ) +- +- def zeromqSettings() = sharedSettings ++ Seq( +- name := "spark-streaming-zeromq", +- libraryDependencies ++= Seq( +- "org.spark-project.akka" %% "akka-zeromq" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty) +- ) +- ) +- +- def mqttSettings() = streamingSettings ++ Seq( +- name := "spark-streaming-mqtt", +- resolvers ++= Seq("Eclipse Repo" at "https://repo.eclipse.org/content/repositories/paho-releases/"), +- libraryDependencies ++= Seq("org.eclipse.paho" % "mqtt-client" % "0.4.0") +- ) + } +-- +1.8.3.4 (Apple Git-47) + diff --git a/spark-v0.9.0-0005-Removed-code-depending-on-Kryo.patch b/spark-v0.9.0-0005-Removed-code-depending-on-Kryo.patch new file mode 100644 index 0000000..18ed918 --- /dev/null +++ b/spark-v0.9.0-0005-Removed-code-depending-on-Kryo.patch @@ -0,0 +1,643 @@ +From d5dfc0ec1e07c9eebae4fac002e0046b8d0c4f8e Mon Sep 17 00:00:00 2001 +From: William Benton +Date: Thu, 27 Feb 2014 16:43:44 -0600 +Subject: [PATCH 5/7] Removed code depending on Kryo + +--- + .../apache/spark/serializer/KryoSerializer.scala | 175 --------------------- + .../apache/spark/storage/StoragePerfTester.scala | 103 ------------ + .../org/apache/spark/storage/ThreadingTest.scala | 115 -------------- + .../util/collection/ExternalAppendOnlyMap.scala | 10 +- + .../apache/spark/graphx/GraphKryoRegistrator.scala | 48 ------ + .../apache/spark/mllib/recommendation/ALS.scala | 12 -- + .../spark/streaming/util/RawTextSender.scala | 82 ---------- + 7 files changed, 1 insertion(+), 544 deletions(-) + delete mode 100644 core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala + delete mode 100644 core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala + delete mode 100644 core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala + delete mode 100644 
graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala + delete mode 100644 streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala + +diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +deleted file mode 100644 +index c14cd47..0000000 +--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ++++ /dev/null +@@ -1,175 +0,0 @@ +-/* +- * Licensed to the Apache Software Foundation (ASF) under one or more +- * contributor license agreements. See the NOTICE file distributed with +- * this work for additional information regarding copyright ownership. +- * The ASF licenses this file to You under the Apache License, Version 2.0 +- * (the "License"); you may not use this file except in compliance with +- * the License. You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, software +- * distributed under the License is distributed on an "AS IS" BASIS, +- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- * See the License for the specific language governing permissions and +- * limitations under the License. +- */ +- +-package org.apache.spark.serializer +- +-import java.nio.ByteBuffer +-import java.io.{EOFException, InputStream, OutputStream} +- +-import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer} +-import com.esotericsoftware.kryo.{KryoException, Kryo} +-import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} +-import com.twitter.chill.{EmptyScalaKryoInstantiator, AllScalaRegistrar} +- +-import org.apache.spark._ +-import org.apache.spark.broadcast.HttpBroadcast +-import org.apache.spark.scheduler.MapStatus +-import org.apache.spark.storage._ +-import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock} +- +-/** +- * A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]]. +- */ +-class KryoSerializer(conf: SparkConf) extends org.apache.spark.serializer.Serializer with Logging { +- private val bufferSize = { +- conf.getInt("spark.kryoserializer.buffer.mb", 2) * 1024 * 1024 +- } +- +- def newKryoOutput() = new KryoOutput(bufferSize) +- +- def newKryo(): Kryo = { +- val instantiator = new EmptyScalaKryoInstantiator +- val kryo = instantiator.newKryo() +- val classLoader = Thread.currentThread.getContextClassLoader +- +- // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops. +- // Do this before we invoke the user registrator so the user registrator can override this. 
+- kryo.setReferences(conf.getBoolean("spark.kryo.referenceTracking", true)) +- +- for (cls <- KryoSerializer.toRegister) kryo.register(cls) +- +- // Allow sending SerializableWritable +- kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer()) +- kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer()) +- +- // Allow the user to register their own classes by setting spark.kryo.registrator +- try { +- for (regCls <- conf.getOption("spark.kryo.registrator")) { +- logDebug("Running user registrator: " + regCls) +- val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator] +- reg.registerClasses(kryo) +- } +- } catch { +- case e: Exception => logError("Failed to run spark.kryo.registrator", e) +- } +- +- // Register Chill's classes; we do this after our ranges and the user's own classes to let +- // our code override the generic serialziers in Chill for things like Seq +- new AllScalaRegistrar().apply(kryo) +- +- kryo.setClassLoader(classLoader) +- kryo +- } +- +- def newInstance(): SerializerInstance = { +- new KryoSerializerInstance(this) +- } +-} +- +-private[spark] +-class KryoSerializationStream(kryo: Kryo, outStream: OutputStream) extends SerializationStream { +- val output = new KryoOutput(outStream) +- +- def writeObject[T](t: T): SerializationStream = { +- kryo.writeClassAndObject(output, t) +- this +- } +- +- def flush() { output.flush() } +- def close() { output.close() } +-} +- +-private[spark] +-class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends DeserializationStream { +- val input = new KryoInput(inStream) +- +- def readObject[T](): T = { +- try { +- kryo.readClassAndObject(input).asInstanceOf[T] +- } catch { +- // DeserializationStream uses the EOF exception to indicate stopping condition. +- case _: KryoException => throw new EOFException +- } +- } +- +- def close() { +- // Kryo's Input automatically closes the input stream it is using. +- input.close() +- } +-} +- +-private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance { +- val kryo = ks.newKryo() +- +- // Make these lazy vals to avoid creating a buffer unless we use them +- lazy val output = ks.newKryoOutput() +- lazy val input = new KryoInput() +- +- def serialize[T](t: T): ByteBuffer = { +- output.clear() +- kryo.writeClassAndObject(output, t) +- ByteBuffer.wrap(output.toBytes) +- } +- +- def deserialize[T](bytes: ByteBuffer): T = { +- input.setBuffer(bytes.array) +- kryo.readClassAndObject(input).asInstanceOf[T] +- } +- +- def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = { +- val oldClassLoader = kryo.getClassLoader +- kryo.setClassLoader(loader) +- input.setBuffer(bytes.array) +- val obj = kryo.readClassAndObject(input).asInstanceOf[T] +- kryo.setClassLoader(oldClassLoader) +- obj +- } +- +- def serializeStream(s: OutputStream): SerializationStream = { +- new KryoSerializationStream(kryo, s) +- } +- +- def deserializeStream(s: InputStream): DeserializationStream = { +- new KryoDeserializationStream(kryo, s) +- } +-} +- +-/** +- * Interface implemented by clients to register their classes with Kryo when using Kryo +- * serialization. +- */ +-trait KryoRegistrator { +- def registerClasses(kryo: Kryo) +-} +- +-private[serializer] object KryoSerializer { +- // Commonly used classes. 
+- private val toRegister: Seq[Class[_]] = Seq( +- ByteBuffer.allocate(1).getClass, +- classOf[StorageLevel], +- classOf[PutBlock], +- classOf[GotBlock], +- classOf[GetBlock], +- classOf[MapStatus], +- classOf[BlockManagerId], +- classOf[Array[Byte]], +- (1 to 10).getClass, +- (1 until 10).getClass, +- (1L to 10L).getClass, +- (1L until 10L).getClass +- ) +-} +diff --git a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala +deleted file mode 100644 +index 40734aa..0000000 +--- a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala ++++ /dev/null +@@ -1,103 +0,0 @@ +-/* +- * Licensed to the Apache Software Foundation (ASF) under one or more +- * contributor license agreements. See the NOTICE file distributed with +- * this work for additional information regarding copyright ownership. +- * The ASF licenses this file to You under the Apache License, Version 2.0 +- * (the "License"); you may not use this file except in compliance with +- * the License. You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, software +- * distributed under the License is distributed on an "AS IS" BASIS, +- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- * See the License for the specific language governing permissions and +- * limitations under the License. +- */ +- +-package org.apache.spark.storage +- +-import java.util.concurrent.atomic.AtomicLong +-import java.util.concurrent.{CountDownLatch, Executors} +- +-import org.apache.spark.serializer.KryoSerializer +-import org.apache.spark.SparkContext +-import org.apache.spark.util.Utils +- +-/** +- * Utility for micro-benchmarking shuffle write performance. +- * +- * Writes simulated shuffle output from several threads and records the observed throughput. +- */ +-object StoragePerfTester { +- def main(args: Array[String]) = { +- /** Total amount of data to generate. Distributed evenly amongst maps and reduce splits. */ +- val dataSizeMb = Utils.memoryStringToMb(sys.env.getOrElse("OUTPUT_DATA", "1g")) +- +- /** Number of map tasks. All tasks execute concurrently. */ +- val numMaps = sys.env.get("NUM_MAPS").map(_.toInt).getOrElse(8) +- +- /** Number of reduce splits for each map task. */ +- val numOutputSplits = sys.env.get("NUM_REDUCERS").map(_.toInt).getOrElse(500) +- +- val recordLength = 1000 // ~1KB records +- val totalRecords = dataSizeMb * 1000 +- val recordsPerMap = totalRecords / numMaps +- +- val writeData = "1" * recordLength +- val executor = Executors.newFixedThreadPool(numMaps) +- +- System.setProperty("spark.shuffle.compress", "false") +- System.setProperty("spark.shuffle.sync", "true") +- +- // This is only used to instantiate a BlockManager. All thread scheduling is done manually. 
+- val sc = new SparkContext("local[4]", "Write Tester") +- val blockManager = sc.env.blockManager +- +- def writeOutputBytes(mapId: Int, total: AtomicLong) = { +- val shuffle = blockManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits, +- new KryoSerializer(sc.conf)) +- val writers = shuffle.writers +- for (i <- 1 to recordsPerMap) { +- writers(i % numOutputSplits).write(writeData) +- } +- writers.map {w => +- w.commit() +- total.addAndGet(w.fileSegment().length) +- w.close() +- } +- +- shuffle.releaseWriters(true) +- } +- +- val start = System.currentTimeMillis() +- val latch = new CountDownLatch(numMaps) +- val totalBytes = new AtomicLong() +- for (task <- 1 to numMaps) { +- executor.submit(new Runnable() { +- override def run() = { +- try { +- writeOutputBytes(task, totalBytes) +- latch.countDown() +- } catch { +- case e: Exception => +- println("Exception in child thread: " + e + " " + e.getMessage) +- System.exit(1) +- } +- } +- }) +- } +- latch.await() +- val end = System.currentTimeMillis() +- val time = (end - start) / 1000.0 +- val bytesPerSecond = totalBytes.get() / time +- val bytesPerFile = (totalBytes.get() / (numOutputSplits * numMaps.toDouble)).toLong +- +- System.err.println("files_total\t\t%s".format(numMaps * numOutputSplits)) +- System.err.println("bytes_per_file\t\t%s".format(Utils.bytesToString(bytesPerFile))) +- System.err.println("agg_throughput\t\t%s/s".format(Utils.bytesToString(bytesPerSecond.toLong))) +- +- executor.shutdown() +- sc.stop() +- } +-} +diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala +deleted file mode 100644 +index 729ba2c..0000000 +--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala ++++ /dev/null +@@ -1,115 +0,0 @@ +-/* +- * Licensed to the Apache Software Foundation (ASF) under one or more +- * contributor license agreements. See the NOTICE file distributed with +- * this work for additional information regarding copyright ownership. +- * The ASF licenses this file to You under the Apache License, Version 2.0 +- * (the "License"); you may not use this file except in compliance with +- * the License. You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, software +- * distributed under the License is distributed on an "AS IS" BASIS, +- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- * See the License for the specific language governing permissions and +- * limitations under the License. +- */ +- +-package org.apache.spark.storage +- +-import akka.actor._ +- +-import java.util.concurrent.ArrayBlockingQueue +-import util.Random +-import org.apache.spark.serializer.KryoSerializer +-import org.apache.spark.{SparkConf, SparkContext} +- +-/** +- * This class tests the BlockManager and MemoryStore for thread safety and +- * deadlocks. It spawns a number of producer and consumer threads. Producer +- * threads continuously pushes blocks into the BlockManager and consumer +- * threads continuously retrieves the blocks form the BlockManager and tests +- * whether the block is correct or not. 
+- */ +-private[spark] object ThreadingTest { +- +- val numProducers = 5 +- val numBlocksPerProducer = 20000 +- +- private[spark] class ProducerThread(manager: BlockManager, id: Int) extends Thread { +- val queue = new ArrayBlockingQueue[(BlockId, Seq[Int])](100) +- +- override def run() { +- for (i <- 1 to numBlocksPerProducer) { +- val blockId = TestBlockId("b-" + id + "-" + i) +- val blockSize = Random.nextInt(1000) +- val block = (1 to blockSize).map(_ => Random.nextInt()) +- val level = randomLevel() +- val startTime = System.currentTimeMillis() +- manager.put(blockId, block.iterator, level, true) +- println("Pushed block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms") +- queue.add((blockId, block)) +- } +- println("Producer thread " + id + " terminated") +- } +- +- def randomLevel(): StorageLevel = { +- math.abs(Random.nextInt()) % 4 match { +- case 0 => StorageLevel.MEMORY_ONLY +- case 1 => StorageLevel.MEMORY_ONLY_SER +- case 2 => StorageLevel.MEMORY_AND_DISK +- case 3 => StorageLevel.MEMORY_AND_DISK_SER +- } +- } +- } +- +- private[spark] class ConsumerThread( +- manager: BlockManager, +- queue: ArrayBlockingQueue[(BlockId, Seq[Int])] +- ) extends Thread { +- var numBlockConsumed = 0 +- +- override def run() { +- println("Consumer thread started") +- while(numBlockConsumed < numBlocksPerProducer) { +- val (blockId, block) = queue.take() +- val startTime = System.currentTimeMillis() +- manager.get(blockId) match { +- case Some(retrievedBlock) => +- assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList, +- "Block " + blockId + " did not match") +- println("Got block " + blockId + " in " + +- (System.currentTimeMillis - startTime) + " ms") +- case None => +- assert(false, "Block " + blockId + " could not be retrieved") +- } +- numBlockConsumed += 1 +- } +- println("Consumer thread terminated") +- } +- } +- +- def main(args: Array[String]) { +- System.setProperty("spark.kryoserializer.buffer.mb", "1") +- val actorSystem = ActorSystem("test") +- val conf = new SparkConf() +- val serializer = new KryoSerializer(conf) +- val blockManagerMaster = new BlockManagerMaster( +- actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf) +- val blockManager = new BlockManager( +- "", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf) +- val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i)) +- val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue)) +- producers.foreach(_.start) +- consumers.foreach(_.start) +- producers.foreach(_.join) +- consumers.foreach(_.join) +- blockManager.stop() +- blockManagerMaster.stop() +- actorSystem.shutdown() +- actorSystem.awaitTermination() +- println("Everything stopped.") +- println( +- "It will take sometime for the JVM to clean all temporary files and shutdown. 
Sit tight.") +- } +-} +diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +index 3d9b09e..63ec782 100644 +--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ++++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +@@ -27,7 +27,7 @@ import it.unimi.dsi.fastutil.io.FastBufferedInputStream + + import org.apache.spark.{Logging, SparkEnv} + import org.apache.spark.io.LZFCompressionCodec +-import org.apache.spark.serializer.{KryoDeserializationStream, Serializer} ++import org.apache.spark.serializer.Serializer + import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockObjectWriter} + + /** +@@ -378,14 +378,6 @@ private[spark] class ExternalAppendOnlyMap[K, V, C]( + try { + if (objectsRead == serializerBatchSize) { + val newInputStream = deserializeStream match { +- case stream: KryoDeserializationStream => +- // Kryo's serializer stores an internal buffer that pre-fetches from the underlying +- // stream. We need to capture this buffer and feed it to the new serialization +- // stream so that the bytes are not lost. +- val kryoInput = stream.input +- val remainingBytes = kryoInput.limit() - kryoInput.position() +- val extraBuf = kryoInput.readBytes(remainingBytes) +- new SequenceInputStream(new ByteArrayInputStream(extraBuf), compressedStream) + case _ => compressedStream + } + deserializeStream = ser.deserializeStream(newInputStream) +diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala +deleted file mode 100644 +index dd380d8..0000000 +--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala ++++ /dev/null +@@ -1,48 +0,0 @@ +-/* +- * Licensed to the Apache Software Foundation (ASF) under one or more +- * contributor license agreements. See the NOTICE file distributed with +- * this work for additional information regarding copyright ownership. +- * The ASF licenses this file to You under the Apache License, Version 2.0 +- * (the "License"); you may not use this file except in compliance with +- * the License. You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, software +- * distributed under the License is distributed on an "AS IS" BASIS, +- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- * See the License for the specific language governing permissions and +- * limitations under the License. +- */ +- +-package org.apache.spark.graphx +- +-import com.esotericsoftware.kryo.Kryo +- +-import org.apache.spark.graphx.impl._ +-import org.apache.spark.serializer.KryoRegistrator +-import org.apache.spark.util.collection.BitSet +-import org.apache.spark.util.BoundedPriorityQueue +- +-/** +- * Registers GraphX classes with Kryo for improved performance. 
+- */ +-class GraphKryoRegistrator extends KryoRegistrator { +- +- def registerClasses(kryo: Kryo) { +- kryo.register(classOf[Edge[Object]]) +- kryo.register(classOf[MessageToPartition[Object]]) +- kryo.register(classOf[VertexBroadcastMsg[Object]]) +- kryo.register(classOf[(VertexId, Object)]) +- kryo.register(classOf[EdgePartition[Object]]) +- kryo.register(classOf[BitSet]) +- kryo.register(classOf[VertexIdToIndexMap]) +- kryo.register(classOf[VertexAttributeBlock[Object]]) +- kryo.register(classOf[PartitionStrategy]) +- kryo.register(classOf[BoundedPriorityQueue[Object]]) +- kryo.register(classOf[EdgeDirection]) +- +- // This avoids a large number of hash table lookups. +- kryo.setReferences(false) +- } +-} +diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +index 89ee070..66d581d 100644 +--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala ++++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +@@ -25,10 +25,8 @@ import org.apache.spark.broadcast.Broadcast + import org.apache.spark.{Logging, HashPartitioner, Partitioner, SparkContext, SparkConf} + import org.apache.spark.storage.StorageLevel + import org.apache.spark.rdd.RDD +-import org.apache.spark.serializer.KryoRegistrator + import org.apache.spark.SparkContext._ + +-import com.esotericsoftware.kryo.Kryo + import org.jblas.{DoubleMatrix, SimpleBlas, Solve} + + +@@ -560,12 +558,6 @@ object ALS { + trainImplicit(ratings, rank, iterations, 0.01, -1, 1.0) + } + +- private class ALSRegistrator extends KryoRegistrator { +- override def registerClasses(kryo: Kryo) { +- kryo.register(classOf[Rating]) +- } +- } +- + def main(args: Array[String]) { + if (args.length < 5 || args.length > 9) { + println("Usage: ALS " + +@@ -579,10 +571,6 @@ object ALS { + val alpha = if (args.length >= 8) args(7).toDouble else 1 + val blocks = if (args.length == 9) args(8).toInt else -1 + val conf = new SparkConf() +- .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") +- .set("spark.kryo.registrator", classOf[ALSRegistrator].getName) +- .set("spark.kryo.referenceTracking", "false") +- .set("spark.kryoserializer.buffer.mb", "8") + .set("spark.locality.wait", "10000") + val sc = new SparkContext(master, "ALS", conf) + +diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala +deleted file mode 100644 +index 684b38e..0000000 +--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala ++++ /dev/null +@@ -1,82 +0,0 @@ +-/* +- * Licensed to the Apache Software Foundation (ASF) under one or more +- * contributor license agreements. See the NOTICE file distributed with +- * this work for additional information regarding copyright ownership. +- * The ASF licenses this file to You under the Apache License, Version 2.0 +- * (the "License"); you may not use this file except in compliance with +- * the License. You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, software +- * distributed under the License is distributed on an "AS IS" BASIS, +- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- * See the License for the specific language governing permissions and +- * limitations under the License. 
+- */ +- +-package org.apache.spark.streaming.util +- +-import java.io.IOException +-import java.net.ServerSocket +-import java.nio.ByteBuffer +- +-import scala.io.Source +- +-import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream +- +-import org.apache.spark.{SparkConf, Logging} +-import org.apache.spark.serializer.KryoSerializer +-import org.apache.spark.util.IntParam +- +-/** +- * A helper program that sends blocks of Kryo-serialized text strings out on a socket at a +- * specified rate. Used to feed data into RawInputDStream. +- */ +-private[streaming] +-object RawTextSender extends Logging { +- def main(args: Array[String]) { +- if (args.length != 4) { +- System.err.println("Usage: RawTextSender ") +- System.exit(1) +- } +- // Parse the arguments using a pattern match +- val Array(IntParam(port), file, IntParam(blockSize), IntParam(bytesPerSec)) = args +- +- // Repeat the input data multiple times to fill in a buffer +- val lines = Source.fromFile(file).getLines().toArray +- val bufferStream = new FastByteArrayOutputStream(blockSize + 1000) +- val ser = new KryoSerializer(new SparkConf()).newInstance() +- val serStream = ser.serializeStream(bufferStream) +- var i = 0 +- while (bufferStream.position < blockSize) { +- serStream.writeObject(lines(i)) +- i = (i + 1) % lines.length +- } +- bufferStream.trim() +- val array = bufferStream.array +- +- val countBuf = ByteBuffer.wrap(new Array[Byte](4)) +- countBuf.putInt(array.length) +- countBuf.flip() +- +- val serverSocket = new ServerSocket(port) +- logInfo("Listening on port " + port) +- +- while (true) { +- val socket = serverSocket.accept() +- logInfo("Got a new connection") +- val out = new RateLimitedOutputStream(socket.getOutputStream, bytesPerSec) +- try { +- while (true) { +- out.write(countBuf.array) +- out.write(array) +- } +- } catch { +- case e: IOException => +- logError("Client disconnected") +- socket.close() +- } +- } +- } +-} +-- +1.8.3.4 (Apple Git-47) + diff --git a/spark-v0.9.0-0006-Remove-functionality-depending-on-stream-lib.patch b/spark-v0.9.0-0006-Remove-functionality-depending-on-stream-lib.patch new file mode 100644 index 0000000..cbc9a44 --- /dev/null +++ b/spark-v0.9.0-0006-Remove-functionality-depending-on-stream-lib.patch @@ -0,0 +1,249 @@ +From 324784a03fe0c1d8f8a9fd9ecca60b07b4e867d7 Mon Sep 17 00:00:00 2001 +From: William Benton +Date: Thu, 27 Feb 2014 16:50:36 -0600 +Subject: [PATCH 6/7] Remove functionality depending on stream-lib. 
+ +Most notably, countApproxDistinctByKey +--- + .../org/apache/spark/api/java/JavaPairRDD.scala | 36 ---------------- + .../org/apache/spark/api/java/JavaRDDLike.scala | 10 ----- + .../org/apache/spark/rdd/PairRDDFunctions.scala | 42 ------------------ + core/src/main/scala/org/apache/spark/rdd/RDD.scala | 16 +------ + .../spark/util/SerializableHyperLogLog.scala | 50 ---------------------- + 5 files changed, 1 insertion(+), 153 deletions(-) + delete mode 100644 core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala + +diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +index f430a33..348ef04 100644 +--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala ++++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +@@ -611,42 +611,6 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K + */ + def values(): JavaRDD[V] = JavaRDD.fromRDD[V](rdd.map(_._2)) + +- /** +- * Return approximate number of distinct values for each key in this RDD. +- * The accuracy of approximation can be controlled through the relative standard deviation +- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in +- * more accurate counts but increase the memory footprint and vise versa. Uses the provided +- * Partitioner to partition the output RDD. +- */ +- def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaRDD[(K, Long)] = { +- rdd.countApproxDistinctByKey(relativeSD, partitioner) +- } +- +- /** +- * Return approximate number of distinct values for each key this RDD. +- * The accuracy of approximation can be controlled through the relative standard deviation +- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in +- * more accurate counts but increase the memory footprint and vise versa. The default value of +- * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism +- * level. +- */ +- def countApproxDistinctByKey(relativeSD: Double = 0.05): JavaRDD[(K, Long)] = { +- rdd.countApproxDistinctByKey(relativeSD) +- } +- +- +- /** +- * Return approximate number of distinct values for each key in this RDD. +- * The accuracy of approximation can be controlled through the relative standard deviation +- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in +- * more accurate counts but increase the memory footprint and vise versa. HashPartitions the +- * output RDD into numPartitions. +- * +- */ +- def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = { +- rdd.countApproxDistinctByKey(relativeSD, numPartitions) +- } +- + /** Assign a name to this RDD */ + def setName(name: String): JavaPairRDD[K, V] = { + rdd.setName(name) +diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +index ebbbbd8..98834f7 100644 +--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala ++++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +@@ -450,15 +450,5 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { + takeOrdered(num, comp) + } + +- /** +- * Return approximate number of distinct elements in the RDD. 
+- * +- * The accuracy of approximation can be controlled through the relative standard deviation +- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in +- * more accurate counts but increase the memory footprint and vise versa. The default value of +- * relativeSD is 0.05. +- */ +- def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD) +- + def name(): String = rdd.name + } +diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +index 4148581..93190ed 100644 +--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ++++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +@@ -38,8 +38,6 @@ import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob} + import org.apache.hadoop.mapreduce.{RecordWriter => NewRecordWriter} + import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat} + +-import com.clearspring.analytics.stream.cardinality.HyperLogLog +- + // SparkHadoopWriter and SparkHadoopMapReduceUtil are actually source files defined in Spark. + import org.apache.hadoop.mapred.SparkHadoopWriter + import org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil +@@ -47,7 +45,6 @@ import org.apache.spark._ + import org.apache.spark.SparkContext._ + import org.apache.spark.partial.{BoundedDouble, PartialResult} + import org.apache.spark.Partitioner.defaultPartitioner +-import org.apache.spark.util.SerializableHyperLogLog + + /** + * Extra functions available on RDDs of (key, value) pairs through an implicit conversion. +@@ -210,45 +207,6 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) + } + + /** +- * Return approximate number of distinct values for each key in this RDD. +- * The accuracy of approximation can be controlled through the relative standard deviation +- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in +- * more accurate counts but increase the memory footprint and vise versa. Uses the provided +- * Partitioner to partition the output RDD. +- */ +- def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = { +- val createHLL = (v: V) => new SerializableHyperLogLog(new HyperLogLog(relativeSD)).add(v) +- val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => hll.add(v) +- val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2) +- +- combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality()) +- } +- +- /** +- * Return approximate number of distinct values for each key in this RDD. +- * The accuracy of approximation can be controlled through the relative standard deviation +- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in +- * more accurate counts but increase the memory footprint and vise versa. HashPartitions the +- * output RDD into numPartitions. +- * +- */ +- def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = { +- countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions)) +- } +- +- /** +- * Return approximate number of distinct values for each key this RDD. +- * The accuracy of approximation can be controlled through the relative standard deviation +- * (relativeSD) parameter, which also controls the amount of memory used. 
Lower values result in +- * more accurate counts but increase the memory footprint and vise versa. The default value of +- * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism +- * level. +- */ +- def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = { +- countApproxDistinctByKey(relativeSD, defaultPartitioner(self)) +- } +- +- /** + * Merge the values for each key using an associative reduce function. This will also perform + * the merging locally on each mapper before sending results to a reducer, similarly to a + * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions. +diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala +index cd90a15..1bdb80d 100644 +--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala ++++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala +@@ -32,7 +32,6 @@ import org.apache.hadoop.io.Text + import org.apache.hadoop.mapred.TextOutputFormat + + import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap} +-import com.clearspring.analytics.stream.cardinality.HyperLogLog + + import org.apache.spark.Partitioner._ + import org.apache.spark.api.java.JavaRDD +@@ -41,7 +40,7 @@ import org.apache.spark.partial.CountEvaluator + import org.apache.spark.partial.GroupedCountEvaluator + import org.apache.spark.partial.PartialResult + import org.apache.spark.storage.StorageLevel +-import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableHyperLogLog} ++import org.apache.spark.util.{Utils, BoundedPriorityQueue} + + import org.apache.spark.SparkContext._ + import org.apache.spark._ +@@ -798,19 +797,6 @@ abstract class RDD[T: ClassTag]( + } + + /** +- * Return approximate number of distinct elements in the RDD. +- * +- * The accuracy of approximation can be controlled through the relative standard deviation +- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in +- * more accurate counts but increase the memory footprint and vise versa. The default value of +- * relativeSD is 0.05. +- */ +- def countApproxDistinct(relativeSD: Double = 0.05): Long = { +- val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD)) +- aggregate(zeroCounter)(_.add(_), _.merge(_)).value.cardinality() +- } +- +- /** + * Take the first num elements of the RDD. It works by first scanning one partition, and use the + * results from that partition to estimate the number of additional partitions needed to satisfy + * the limit. +diff --git a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala +deleted file mode 100644 +index 8b4e7c1..0000000 +--- a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala ++++ /dev/null +@@ -1,50 +0,0 @@ +-/* +- * Licensed to the Apache Software Foundation (ASF) under one or more +- * contributor license agreements. See the NOTICE file distributed with +- * this work for additional information regarding copyright ownership. +- * The ASF licenses this file to You under the Apache License, Version 2.0 +- * (the "License"); you may not use this file except in compliance with +- * the License. 
You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, software +- * distributed under the License is distributed on an "AS IS" BASIS, +- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- * See the License for the specific language governing permissions and +- * limitations under the License. +- */ +- +-package org.apache.spark.util +- +-import java.io.{Externalizable, ObjectOutput, ObjectInput} +-import com.clearspring.analytics.stream.cardinality.{ICardinality, HyperLogLog} +- +-/** +- * A wrapper around [[com.clearspring.analytics.stream.cardinality.HyperLogLog]] that is serializable. +- */ +-private[spark] +-class SerializableHyperLogLog(var value: ICardinality) extends Externalizable { +- +- def this() = this(null) // For deserialization +- +- def merge(other: SerializableHyperLogLog) = new SerializableHyperLogLog(value.merge(other.value)) +- +- def add[T](elem: T) = { +- this.value.offer(elem) +- this +- } +- +- def readExternal(in: ObjectInput) { +- val byteLength = in.readInt() +- val bytes = new Array[Byte](byteLength) +- in.readFully(bytes) +- value = HyperLogLog.Builder.build(bytes) +- } +- +- def writeExternal(out: ObjectOutput) { +- val bytes = value.getBytes() +- out.writeInt(bytes.length) +- out.write(bytes) +- } +-} +-- +1.8.3.4 (Apple Git-47) + diff --git a/spark-v0.9.0-0007-Removed-mesos.patch b/spark-v0.9.0-0007-Removed-mesos.patch new file mode 100644 index 0000000..5241ae9 --- /dev/null +++ b/spark-v0.9.0-0007-Removed-mesos.patch @@ -0,0 +1,851 @@ +From c321a448cd80331df07cda0f5f20955b3b148aac Mon Sep 17 00:00:00 2001 +From: William Benton +Date: Thu, 27 Feb 2014 17:05:12 -0600 +Subject: [PATCH 7/7] Removed mesos + +--- + .../main/scala/org/apache/spark/SparkContext.scala | 15 - + .../main/scala/org/apache/spark/TaskState.scala | 21 -- + .../spark/executor/MesosExecutorBackend.scala | 104 ------- + .../mesos/CoarseMesosSchedulerBackend.scala | 289 ----------------- + .../cluster/mesos/MesosSchedulerBackend.scala | 344 --------------------- + 5 files changed, 773 deletions(-) + delete mode 100644 core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala + delete mode 100644 core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala + delete mode 100644 core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala + +diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala +index 566472e..f3b2941 100644 +--- a/core/src/main/scala/org/apache/spark/SparkContext.scala ++++ b/core/src/main/scala/org/apache/spark/SparkContext.scala +@@ -36,7 +36,6 @@ import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, Sequence + TextInputFormat} + import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob} + import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat} +-import org.apache.mesos.MesosNativeLibrary + + import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} + import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} +@@ -44,7 +43,6 @@ import org.apache.spark.rdd._ + import org.apache.spark.scheduler._ + import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, + SparkDeploySchedulerBackend, SimrSchedulerBackend} +-import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, 
MesosSchedulerBackend} + import org.apache.spark.scheduler.local.LocalBackend + import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils} + import org.apache.spark.ui.SparkUI +@@ -1281,19 +1279,6 @@ object SparkContext { + scheduler.initialize(backend) + scheduler + +- case mesosUrl @ MESOS_REGEX(_) => +- MesosNativeLibrary.load() +- val scheduler = new TaskSchedulerImpl(sc) +- val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", false) +- val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs +- val backend = if (coarseGrained) { +- new CoarseMesosSchedulerBackend(scheduler, sc, url, appName) +- } else { +- new MesosSchedulerBackend(scheduler, sc, url, appName) +- } +- scheduler.initialize(backend) +- scheduler +- + case SIMR_REGEX(simrUrl) => + val scheduler = new TaskSchedulerImpl(sc) + val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl) +diff --git a/core/src/main/scala/org/apache/spark/TaskState.scala b/core/src/main/scala/org/apache/spark/TaskState.scala +index 0bf1e4a..cdd8baf 100644 +--- a/core/src/main/scala/org/apache/spark/TaskState.scala ++++ b/core/src/main/scala/org/apache/spark/TaskState.scala +@@ -17,8 +17,6 @@ + + package org.apache.spark + +-import org.apache.mesos.Protos.{TaskState => MesosTaskState} +- + private[spark] object TaskState extends Enumeration { + + val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value +@@ -28,23 +26,4 @@ private[spark] object TaskState extends Enumeration { + type TaskState = Value + + def isFinished(state: TaskState) = FINISHED_STATES.contains(state) +- +- def toMesos(state: TaskState): MesosTaskState = state match { +- case LAUNCHING => MesosTaskState.TASK_STARTING +- case RUNNING => MesosTaskState.TASK_RUNNING +- case FINISHED => MesosTaskState.TASK_FINISHED +- case FAILED => MesosTaskState.TASK_FAILED +- case KILLED => MesosTaskState.TASK_KILLED +- case LOST => MesosTaskState.TASK_LOST +- } +- +- def fromMesos(mesosState: MesosTaskState): TaskState = mesosState match { +- case MesosTaskState.TASK_STAGING => LAUNCHING +- case MesosTaskState.TASK_STARTING => LAUNCHING +- case MesosTaskState.TASK_RUNNING => RUNNING +- case MesosTaskState.TASK_FINISHED => FINISHED +- case MesosTaskState.TASK_FAILED => FAILED +- case MesosTaskState.TASK_KILLED => KILLED +- case MesosTaskState.TASK_LOST => LOST +- } + } +diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala +deleted file mode 100644 +index b56d8c9..0000000 +--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala ++++ /dev/null +@@ -1,104 +0,0 @@ +-/* +- * Licensed to the Apache Software Foundation (ASF) under one or more +- * contributor license agreements. See the NOTICE file distributed with +- * this work for additional information regarding copyright ownership. +- * The ASF licenses this file to You under the Apache License, Version 2.0 +- * (the "License"); you may not use this file except in compliance with +- * the License. You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, software +- * distributed under the License is distributed on an "AS IS" BASIS, +- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- * See the License for the specific language governing permissions and +- * limitations under the License. 
+- */ +- +-package org.apache.spark.executor +- +-import java.nio.ByteBuffer +- +-import com.google.protobuf.ByteString +- +-import org.apache.mesos.{Executor => MesosExecutor, MesosExecutorDriver, MesosNativeLibrary, ExecutorDriver} +-import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _} +- +-import org.apache.spark.Logging +-import org.apache.spark.TaskState +-import org.apache.spark.TaskState.TaskState +-import org.apache.spark.util.Utils +- +- +-private[spark] class MesosExecutorBackend +- extends MesosExecutor +- with ExecutorBackend +- with Logging { +- +- var executor: Executor = null +- var driver: ExecutorDriver = null +- +- override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) { +- val mesosTaskId = TaskID.newBuilder().setValue(taskId.toString).build() +- driver.sendStatusUpdate(MesosTaskStatus.newBuilder() +- .setTaskId(mesosTaskId) +- .setState(TaskState.toMesos(state)) +- .setData(ByteString.copyFrom(data)) +- .build()) +- } +- +- override def registered( +- driver: ExecutorDriver, +- executorInfo: ExecutorInfo, +- frameworkInfo: FrameworkInfo, +- slaveInfo: SlaveInfo) { +- logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue) +- this.driver = driver +- val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) +- executor = new Executor( +- executorInfo.getExecutorId.getValue, +- slaveInfo.getHostname, +- properties) +- } +- +- override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) { +- val taskId = taskInfo.getTaskId.getValue.toLong +- if (executor == null) { +- logError("Received launchTask but executor was null") +- } else { +- executor.launchTask(this, taskId, taskInfo.getData.asReadOnlyByteBuffer) +- } +- } +- +- override def error(d: ExecutorDriver, message: String) { +- logError("Error from Mesos: " + message) +- } +- +- override def killTask(d: ExecutorDriver, t: TaskID) { +- if (executor == null) { +- logError("Received KillTask but executor was null") +- } else { +- executor.killTask(t.getValue.toLong) +- } +- } +- +- override def reregistered(d: ExecutorDriver, p2: SlaveInfo) {} +- +- override def disconnected(d: ExecutorDriver) {} +- +- override def frameworkMessage(d: ExecutorDriver, data: Array[Byte]) {} +- +- override def shutdown(d: ExecutorDriver) {} +-} +- +-/** +- * Entry point for Mesos executor. +- */ +-private[spark] object MesosExecutorBackend { +- def main(args: Array[String]) { +- MesosNativeLibrary.load() +- // Create a new Executor and start it running +- val runner = new MesosExecutorBackend() +- new MesosExecutorDriver(runner).run() +- } +-} +diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +deleted file mode 100644 +index c27049b..0000000 +--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ++++ /dev/null +@@ -1,289 +0,0 @@ +-/* +- * Licensed to the Apache Software Foundation (ASF) under one or more +- * contributor license agreements. See the NOTICE file distributed with +- * this work for additional information regarding copyright ownership. +- * The ASF licenses this file to You under the Apache License, Version 2.0 +- * (the "License"); you may not use this file except in compliance with +- * the License. 
You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, software +- * distributed under the License is distributed on an "AS IS" BASIS, +- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- * See the License for the specific language governing permissions and +- * limitations under the License. +- */ +- +-package org.apache.spark.scheduler.cluster.mesos +- +-import java.io.File +-import java.util.{ArrayList => JArrayList, List => JList} +-import java.util.Collections +- +-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} +-import scala.collection.JavaConversions._ +- +-import com.google.protobuf.ByteString +-import org.apache.mesos.{Scheduler => MScheduler} +-import org.apache.mesos._ +-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _} +- +-import org.apache.spark.{SparkException, Logging, SparkContext, TaskState} +-import org.apache.spark.scheduler.TaskSchedulerImpl +-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend +- +-/** +- * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds +- * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever +- * a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the +- * CoarseGrainedSchedulerBackend mechanism. This class is useful for lower and more predictable +- * latency. +- * +- * Unfortunately this has a bit of duplication from MesosSchedulerBackend, but it seems hard to +- * remove this. +- */ +-private[spark] class CoarseMesosSchedulerBackend( +- scheduler: TaskSchedulerImpl, +- sc: SparkContext, +- master: String, +- appName: String) +- extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) +- with MScheduler +- with Logging { +- +- val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures +- +- // Lock used to wait for scheduler to be registered +- var isRegistered = false +- val registeredLock = new Object() +- +- // Driver for talking to Mesos +- var driver: SchedulerDriver = null +- +- // Maximum number of cores to acquire (TODO: we'll need more flexible controls here) +- val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt +- +- // Cores we have acquired with each Mesos task ID +- val coresByTaskId = new HashMap[Int, Int] +- var totalCoresAcquired = 0 +- +- val slaveIdsWithExecutors = new HashSet[String] +- +- val taskIdToSlaveId = new HashMap[Int, String] +- val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed +- +- val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException( +- "Spark home is not set; set it through the spark.home system " + +- "property, the SPARK_HOME environment variable or the SparkContext constructor")) +- +- val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0) +- +- var nextMesosTaskId = 0 +- +- def newMesosTaskId(): Int = { +- val id = nextMesosTaskId +- nextMesosTaskId += 1 +- id +- } +- +- override def start() { +- super.start() +- +- synchronized { +- new Thread("CoarseMesosSchedulerBackend driver") { +- setDaemon(true) +- override def run() { +- val scheduler = CoarseMesosSchedulerBackend.this +- val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build() +- driver = new MesosSchedulerDriver(scheduler, fwInfo, master) +- try { { +- val ret = driver.run() +- 
logInfo("driver.run() returned with code " + ret) +- } +- } catch { +- case e: Exception => logError("driver.run() failed", e) +- } +- } +- }.start() +- +- waitForRegister() +- } +- } +- +- def createCommand(offer: Offer, numCores: Int): CommandInfo = { +- val environment = Environment.newBuilder() +- sc.executorEnvs.foreach { case (key, value) => +- environment.addVariables(Environment.Variable.newBuilder() +- .setName(key) +- .setValue(value) +- .build()) +- } +- val command = CommandInfo.newBuilder() +- .setEnvironment(environment) +- val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( +- conf.get("spark.driver.host"), +- conf.get("spark.driver.port"), +- CoarseGrainedSchedulerBackend.ACTOR_NAME) +- val uri = conf.get("spark.executor.uri", null) +- if (uri == null) { +- val runScript = new File(sparkHome, "./bin/spark-class").getCanonicalPath +- command.setValue( +- "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d".format( +- runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)) +- } else { +- // Grab everything to the first '.'. We'll use that and '*' to +- // glob the directory "correctly". +- val basename = uri.split('/').last.split('.').head +- command.setValue( +- "cd %s*; ./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d" +- .format(basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)) +- command.addUris(CommandInfo.URI.newBuilder().setValue(uri)) +- } +- command.build() +- } +- +- override def offerRescinded(d: SchedulerDriver, o: OfferID) {} +- +- override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { +- logInfo("Registered as framework ID " + frameworkId.getValue) +- registeredLock.synchronized { +- isRegistered = true +- registeredLock.notifyAll() +- } +- } +- +- def waitForRegister() { +- registeredLock.synchronized { +- while (!isRegistered) { +- registeredLock.wait() +- } +- } +- } +- +- override def disconnected(d: SchedulerDriver) {} +- +- override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {} +- +- /** +- * Method called by Mesos to offer resources on slaves. We respond by launching an executor, +- * unless we've already launched more than we wanted to. 
+- */ +- override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { +- synchronized { +- val filters = Filters.newBuilder().setRefuseSeconds(-1).build() +- +- for (offer <- offers) { +- val slaveId = offer.getSlaveId.toString +- val mem = getResource(offer.getResourcesList, "mem") +- val cpus = getResource(offer.getResourcesList, "cpus").toInt +- if (totalCoresAcquired < maxCores && mem >= sc.executorMemory && cpus >= 1 && +- failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && +- !slaveIdsWithExecutors.contains(slaveId)) { +- // Launch an executor on the slave +- val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired) +- totalCoresAcquired += cpusToUse +- val taskId = newMesosTaskId() +- taskIdToSlaveId(taskId) = slaveId +- slaveIdsWithExecutors += slaveId +- coresByTaskId(taskId) = cpusToUse +- val task = MesosTaskInfo.newBuilder() +- .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build()) +- .setSlaveId(offer.getSlaveId) +- .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave)) +- .setName("Task " + taskId) +- .addResources(createResource("cpus", cpusToUse)) +- .addResources(createResource("mem", sc.executorMemory)) +- .build() +- d.launchTasks(offer.getId, Collections.singletonList(task), filters) +- } else { +- // Filter it out +- d.launchTasks(offer.getId, Collections.emptyList[MesosTaskInfo](), filters) +- } +- } +- } +- } +- +- /** Helper function to pull out a resource from a Mesos Resources protobuf */ +- private def getResource(res: JList[Resource], name: String): Double = { +- for (r <- res if r.getName == name) { +- return r.getScalar.getValue +- } +- // If we reached here, no resource with the required name was present +- throw new IllegalArgumentException("No resource called " + name + " in " + res) +- } +- +- /** Build a Mesos resource protobuf object */ +- private def createResource(resourceName: String, quantity: Double): Protos.Resource = { +- Resource.newBuilder() +- .setName(resourceName) +- .setType(Value.Type.SCALAR) +- .setScalar(Value.Scalar.newBuilder().setValue(quantity).build()) +- .build() +- } +- +- /** Check whether a Mesos task state represents a finished task */ +- private def isFinished(state: MesosTaskState) = { +- state == MesosTaskState.TASK_FINISHED || +- state == MesosTaskState.TASK_FAILED || +- state == MesosTaskState.TASK_KILLED || +- state == MesosTaskState.TASK_LOST +- } +- +- override def statusUpdate(d: SchedulerDriver, status: TaskStatus) { +- val taskId = status.getTaskId.getValue.toInt +- val state = status.getState +- logInfo("Mesos task " + taskId + " is now " + state) +- synchronized { +- if (isFinished(state)) { +- val slaveId = taskIdToSlaveId(taskId) +- slaveIdsWithExecutors -= slaveId +- taskIdToSlaveId -= taskId +- // Remove the cores we have remembered for this task, if it's in the hashmap +- for (cores <- coresByTaskId.get(taskId)) { +- totalCoresAcquired -= cores +- coresByTaskId -= taskId +- } +- // If it was a failure, mark the slave as failed for blacklisting purposes +- if (state == MesosTaskState.TASK_FAILED || state == MesosTaskState.TASK_LOST) { +- failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1 +- if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) { +- logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " + +- "is Spark installed on it?") +- } +- } +- driver.reviveOffers() // In case we'd rejected everything before but have now lost a node +- } +- } +- } +- +- override def error(d: SchedulerDriver, message: String) { +- 
logError("Mesos error: " + message) +- scheduler.error(message) +- } +- +- override def stop() { +- super.stop() +- if (driver != null) { +- driver.stop() +- } +- } +- +- override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {} +- +- override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) { +- logInfo("Mesos slave lost: " + slaveId.getValue) +- synchronized { +- if (slaveIdsWithExecutors.contains(slaveId.getValue)) { +- // Note that the slave ID corresponds to the executor ID on that slave +- slaveIdsWithExecutors -= slaveId.getValue +- removeExecutor(slaveId.getValue, "Mesos slave lost") +- } +- } +- } +- +- override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) { +- logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue)) +- slaveLost(d, s) +- } +-} +diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +deleted file mode 100644 +index 4978148..0000000 +--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala ++++ /dev/null +@@ -1,344 +0,0 @@ +-/* +- * Licensed to the Apache Software Foundation (ASF) under one or more +- * contributor license agreements. See the NOTICE file distributed with +- * this work for additional information regarding copyright ownership. +- * The ASF licenses this file to You under the Apache License, Version 2.0 +- * (the "License"); you may not use this file except in compliance with +- * the License. You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, software +- * distributed under the License is distributed on an "AS IS" BASIS, +- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- * See the License for the specific language governing permissions and +- * limitations under the License. +- */ +- +-package org.apache.spark.scheduler.cluster.mesos +- +-import java.io.File +-import java.util.{ArrayList => JArrayList, List => JList} +-import java.util.Collections +- +-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} +-import scala.collection.JavaConversions._ +- +-import com.google.protobuf.ByteString +-import org.apache.mesos.{Scheduler => MScheduler} +-import org.apache.mesos._ +-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _} +- +-import org.apache.spark.{Logging, SparkException, SparkContext, TaskState} +-import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SchedulerBackend, SlaveLost, +- TaskDescription, TaskSchedulerImpl, WorkerOffer} +-import org.apache.spark.util.Utils +- +-/** +- * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a +- * separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks +- * from multiple apps can run on different cores) and in time (a core can switch ownership). 
+- */ +-private[spark] class MesosSchedulerBackend( +- scheduler: TaskSchedulerImpl, +- sc: SparkContext, +- master: String, +- appName: String) +- extends SchedulerBackend +- with MScheduler +- with Logging { +- +- // Lock used to wait for scheduler to be registered +- var isRegistered = false +- val registeredLock = new Object() +- +- // Driver for talking to Mesos +- var driver: SchedulerDriver = null +- +- // Which slave IDs we have executors on +- val slaveIdsWithExecutors = new HashSet[String] +- val taskIdToSlaveId = new HashMap[Long, String] +- +- // An ExecutorInfo for our tasks +- var execArgs: Array[Byte] = null +- +- var classLoader: ClassLoader = null +- +- override def start() { +- synchronized { +- classLoader = Thread.currentThread.getContextClassLoader +- +- new Thread("MesosSchedulerBackend driver") { +- setDaemon(true) +- override def run() { +- val scheduler = MesosSchedulerBackend.this +- val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build() +- driver = new MesosSchedulerDriver(scheduler, fwInfo, master) +- try { +- val ret = driver.run() +- logInfo("driver.run() returned with code " + ret) +- } catch { +- case e: Exception => logError("driver.run() failed", e) +- } +- } +- }.start() +- +- waitForRegister() +- } +- } +- +- def createExecutorInfo(execId: String): ExecutorInfo = { +- val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException( +- "Spark home is not set; set it through the spark.home system " + +- "property, the SPARK_HOME environment variable or the SparkContext constructor")) +- val environment = Environment.newBuilder() +- sc.executorEnvs.foreach { case (key, value) => +- environment.addVariables(Environment.Variable.newBuilder() +- .setName(key) +- .setValue(value) +- .build()) +- } +- val command = CommandInfo.newBuilder() +- .setEnvironment(environment) +- val uri = sc.conf.get("spark.executor.uri", null) +- if (uri == null) { +- command.setValue(new File(sparkHome, "/sbin/spark-executor").getCanonicalPath) +- } else { +- // Grab everything to the first '.'. We'll use that and '*' to +- // glob the directory "correctly". +- val basename = uri.split('/').last.split('.').head +- command.setValue("cd %s*; ./sbin/spark-executor".format(basename)) +- command.addUris(CommandInfo.URI.newBuilder().setValue(uri)) +- } +- val memory = Resource.newBuilder() +- .setName("mem") +- .setType(Value.Type.SCALAR) +- .setScalar(Value.Scalar.newBuilder().setValue(sc.executorMemory).build()) +- .build() +- ExecutorInfo.newBuilder() +- .setExecutorId(ExecutorID.newBuilder().setValue(execId).build()) +- .setCommand(command) +- .setData(ByteString.copyFrom(createExecArg())) +- .addResources(memory) +- .build() +- } +- +- /** +- * Create and serialize the executor argument to pass to Mesos. Our executor arg is an array +- * containing all the spark.* system properties in the form of (String, String) pairs. 
+- */ +- private def createExecArg(): Array[Byte] = { +- if (execArgs == null) { +- val props = new HashMap[String, String] +- val iterator = System.getProperties.entrySet.iterator +- while (iterator.hasNext) { +- val entry = iterator.next +- val (key, value) = (entry.getKey.toString, entry.getValue.toString) +- if (key.startsWith("spark.")) { +- props(key) = value +- } +- } +- // Serialize the map as an array of (String, String) pairs +- execArgs = Utils.serialize(props.toArray) +- } +- execArgs +- } +- +- private def setClassLoader(): ClassLoader = { +- val oldClassLoader = Thread.currentThread.getContextClassLoader +- Thread.currentThread.setContextClassLoader(classLoader) +- oldClassLoader +- } +- +- private def restoreClassLoader(oldClassLoader: ClassLoader) { +- Thread.currentThread.setContextClassLoader(oldClassLoader) +- } +- +- override def offerRescinded(d: SchedulerDriver, o: OfferID) {} +- +- override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { +- val oldClassLoader = setClassLoader() +- try { +- logInfo("Registered as framework ID " + frameworkId.getValue) +- registeredLock.synchronized { +- isRegistered = true +- registeredLock.notifyAll() +- } +- } finally { +- restoreClassLoader(oldClassLoader) +- } +- } +- +- def waitForRegister() { +- registeredLock.synchronized { +- while (!isRegistered) { +- registeredLock.wait() +- } +- } +- } +- +- override def disconnected(d: SchedulerDriver) {} +- +- override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {} +- +- /** +- * Method called by Mesos to offer resources on slaves. We resond by asking our active task sets +- * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that +- * tasks are balanced across the cluster. +- */ +- override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { +- val oldClassLoader = setClassLoader() +- try { +- synchronized { +- // Build a big list of the offerable workers, and remember their indices so that we can +- // figure out which Offer to reply to for each worker +- val offerableIndices = new ArrayBuffer[Int] +- val offerableWorkers = new ArrayBuffer[WorkerOffer] +- +- def enoughMemory(o: Offer) = { +- val mem = getResource(o.getResourcesList, "mem") +- val slaveId = o.getSlaveId.getValue +- mem >= sc.executorMemory || slaveIdsWithExecutors.contains(slaveId) +- } +- +- for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) { +- offerableIndices += index +- offerableWorkers += new WorkerOffer( +- offer.getSlaveId.getValue, +- offer.getHostname, +- getResource(offer.getResourcesList, "cpus").toInt) +- } +- +- // Call into the ClusterScheduler +- val taskLists = scheduler.resourceOffers(offerableWorkers) +- +- // Build a list of Mesos tasks for each slave +- val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]()) +- for ((taskList, index) <- taskLists.zipWithIndex) { +- if (!taskList.isEmpty) { +- val offerNum = offerableIndices(index) +- val slaveId = offers(offerNum).getSlaveId.getValue +- slaveIdsWithExecutors += slaveId +- mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size) +- for (taskDesc <- taskList) { +- taskIdToSlaveId(taskDesc.taskId) = slaveId +- mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId)) +- } +- } +- } +- +- // Reply to the offers +- val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout? 
+- for (i <- 0 until offers.size) { +- d.launchTasks(offers(i).getId, mesosTasks(i), filters) +- } +- } +- } finally { +- restoreClassLoader(oldClassLoader) +- } +- } +- +- /** Helper function to pull out a resource from a Mesos Resources protobuf */ +- def getResource(res: JList[Resource], name: String): Double = { +- for (r <- res if r.getName == name) { +- return r.getScalar.getValue +- } +- // If we reached here, no resource with the required name was present +- throw new IllegalArgumentException("No resource called " + name + " in " + res) +- } +- +- /** Turn a Spark TaskDescription into a Mesos task */ +- def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = { +- val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build() +- val cpuResource = Resource.newBuilder() +- .setName("cpus") +- .setType(Value.Type.SCALAR) +- .setScalar(Value.Scalar.newBuilder().setValue(1).build()) +- .build() +- MesosTaskInfo.newBuilder() +- .setTaskId(taskId) +- .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build()) +- .setExecutor(createExecutorInfo(slaveId)) +- .setName(task.name) +- .addResources(cpuResource) +- .setData(ByteString.copyFrom(task.serializedTask)) +- .build() +- } +- +- /** Check whether a Mesos task state represents a finished task */ +- def isFinished(state: MesosTaskState) = { +- state == MesosTaskState.TASK_FINISHED || +- state == MesosTaskState.TASK_FAILED || +- state == MesosTaskState.TASK_KILLED || +- state == MesosTaskState.TASK_LOST +- } +- +- override def statusUpdate(d: SchedulerDriver, status: TaskStatus) { +- val oldClassLoader = setClassLoader() +- try { +- val tid = status.getTaskId.getValue.toLong +- val state = TaskState.fromMesos(status.getState) +- synchronized { +- if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) { +- // We lost the executor on this slave, so remember that it's gone +- slaveIdsWithExecutors -= taskIdToSlaveId(tid) +- } +- if (isFinished(status.getState)) { +- taskIdToSlaveId.remove(tid) +- } +- } +- scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer) +- } finally { +- restoreClassLoader(oldClassLoader) +- } +- } +- +- override def error(d: SchedulerDriver, message: String) { +- val oldClassLoader = setClassLoader() +- try { +- logError("Mesos error: " + message) +- scheduler.error(message) +- } finally { +- restoreClassLoader(oldClassLoader) +- } +- } +- +- override def stop() { +- if (driver != null) { +- driver.stop() +- } +- } +- +- override def reviveOffers() { +- driver.reviveOffers() +- } +- +- override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {} +- +- private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) { +- val oldClassLoader = setClassLoader() +- try { +- logInfo("Mesos slave lost: " + slaveId.getValue) +- synchronized { +- slaveIdsWithExecutors -= slaveId.getValue +- } +- scheduler.executorLost(slaveId.getValue, reason) +- } finally { +- restoreClassLoader(oldClassLoader) +- } +- } +- +- override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) { +- recordSlaveLost(d, slaveId, SlaveLost()) +- } +- +- override def executorLost(d: SchedulerDriver, executorId: ExecutorID, +- slaveId: SlaveID, status: Int) { +- logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue, +- slaveId.getValue)) +- recordSlaveLost(d, slaveId, ExecutorExited(status)) +- } +- +- // TODO: query Mesos for number of cores +- override def defaultParallelism() = 
sc.conf.getInt("spark.default.parallelism", 8) +-} +-- +1.8.3.4 (Apple Git-47) + diff --git a/spark-v0.9.0-0008-remove-unavailable-and-unnecessary-deps.patch b/spark-v0.9.0-0008-remove-unavailable-and-unnecessary-deps.patch new file mode 100644 index 0000000..6832c2c --- /dev/null +++ b/spark-v0.9.0-0008-remove-unavailable-and-unnecessary-deps.patch @@ -0,0 +1,57 @@ +From 73a6adeaa4f763b09ff25451825f05faa3caafb4 Mon Sep 17 00:00:00 2001 +From: William Benton +Date: Fri, 28 Feb 2014 09:00:04 -0600 +Subject: [PATCH] remove unavailable and unnecessary deps + +--- + project/SparkBuild.scala | 15 ++------------- + 1 file changed, 2 insertions(+), 13 deletions(-) + +diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala +index 068bf74..956312f 100644 +--- a/project/SparkBuild.scala ++++ b/project/SparkBuild.scala +@@ -183,13 +183,7 @@ object SparkBuild extends Build { + "io.netty" % "netty-all" % "4.0.13.Final", + "org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106", + /** Workaround for SPARK-959. Dependency used by org.eclipse.jetty. Fixed in ivy 2.3.0. */ +- "org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar"), +- "org.scalatest" %% "scalatest" % "1.9.1" % "test", +- "org.scalacheck" %% "scalacheck" % "1.10.0" % "test", +- "com.novocode" % "junit-interface" % "0.9" % "test", +- "org.easymock" % "easymock" % "3.1" % "test", +- "org.mockito" % "mockito-all" % "1.8.5" % "test", +- "commons-io" % "commons-io" % "2.4" % "test" ++ "org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar") + ), + + parallelExecution := true, +@@ -233,13 +227,11 @@ object SparkBuild extends Build { + "org.ow2.asm" % "asm" % "4.0", + "org.spark-project.akka" %% "akka-remote" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty), + "org.spark-project.akka" %% "akka-slf4j" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty), +- "org.spark-project.akka" %% "akka-testkit" % "2.2.3-shaded-protobuf" % "test", + "org.json4s" %% "json4s-jackson" % "3.2.6", + "it.unimi.dsi" % "fastutil" % "6.4.4", + "colt" % "colt" % "1.2.0", + "org.apache.mesos" % "mesos" % "0.13.0", + "net.java.dev.jets3t" % "jets3t" % "0.7.1", +- "org.apache.derby" % "derby" % "10.4.2.0" % "test", + "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib), + "org.apache.avro" % "avro" % "1.7.4", + "org.apache.avro" % "avro-ipc" % "1.7.4" excludeAll(excludeNetty), +@@ -248,10 +240,7 @@ object SparkBuild extends Build { + "com.codahale.metrics" % "metrics-jvm" % "3.0.0", + "com.codahale.metrics" % "metrics-json" % "3.0.0", + "com.codahale.metrics" % "metrics-ganglia" % "3.0.0", +- "com.codahale.metrics" % "metrics-graphite" % "3.0.0", +- "com.twitter" %% "chill" % "0.3.1", +- "com.twitter" % "chill-java" % "0.3.1", +- "com.clearspring.analytics" % "stream" % "2.5.1" ++ "com.codahale.metrics" % "metrics-graphite" % "3.0.0" + ) + ) + +-- +1.8.3.4 (Apple Git-47) + diff --git a/spark-v0.9.0-0009-use-Jetty-8.patch b/spark-v0.9.0-0009-use-Jetty-8.patch new file mode 100644 index 0000000..6f79761 --- /dev/null +++ b/spark-v0.9.0-0009-use-Jetty-8.patch @@ -0,0 +1,27 @@ +From cbc6b801cbf717ecab12f6cbad48e8bfd60c5e82 Mon Sep 17 00:00:00 2001 +From: William Benton +Date: Fri, 28 Feb 2014 15:16:45 -0600 +Subject: [PATCH 1/2] use Jetty 8 + +--- + project/SparkBuild.scala | 4 +--- + 1 file changed, 1 insertion(+), 3 deletions(-) + +diff --git a/project/SparkBuild.scala 
b/project/SparkBuild.scala +index 956312f..63f5297 100644 +--- a/project/SparkBuild.scala ++++ b/project/SparkBuild.scala +@@ -181,9 +181,7 @@ object SparkBuild extends Build { + + libraryDependencies ++= Seq( + "io.netty" % "netty-all" % "4.0.13.Final", +- "org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106", +- /** Workaround for SPARK-959. Dependency used by org.eclipse.jetty. Fixed in ivy 2.3.0. */ +- "org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar") ++ "org.eclipse.jetty" % "jetty-server" % "8.1.14.v20131031" + ), + + parallelExecution := true, +-- +1.8.3.4 (Apple Git-47) + diff --git a/spark-v0.9.0-0010-use-Akka-2.3.0-RC2.patch b/spark-v0.9.0-0010-use-Akka-2.3.0-RC2.patch new file mode 100644 index 0000000..d0cd68f --- /dev/null +++ b/spark-v0.9.0-0010-use-Akka-2.3.0-RC2.patch @@ -0,0 +1,83 @@ +From 474037191b66da67aef3d2038e85448294d848c1 Mon Sep 17 00:00:00 2001 +From: William Benton +Date: Fri, 28 Feb 2014 15:31:52 -0600 +Subject: [PATCH 2/2] use Akka 2.3.0-RC2 + +--- + core/src/main/scala/org/apache/spark/deploy/Client.scala | 2 +- + core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala | 2 +- + .../src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala | 2 +- + .../main/scala/org/apache/spark/util/IndestructibleActorSystem.scala | 2 +- + project/SparkBuild.scala | 4 ++-- + 5 files changed, 6 insertions(+), 6 deletions(-) + +diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala +index 9987e23..7fda886 100644 +--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala ++++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala +@@ -116,7 +116,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends + println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.") + System.exit(-1) + +- case AssociationErrorEvent(cause, _, remoteAddress, _) => ++ case AssociationErrorEvent(cause, _, remoteAddress, _, _) => + println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.") + println(s"Cause was: $cause") + System.exit(-1) +diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +index 1415e2f..8d732db 100644 +--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala ++++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +@@ -153,7 +153,7 @@ private[spark] class AppClient( + logWarning(s"Connection to $address failed; waiting for master to reconnect...") + markDisconnected() + +- case AssociationErrorEvent(cause, _, address, _) if isPossibleMaster(address) => ++ case AssociationErrorEvent(cause, _, address, _, _) if isPossibleMaster(address) => + logWarning(s"Could not connect to $address: $cause") + + case StopAppClient => +diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala +index 1dc39c4..732a1d7 100644 +--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala ++++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala +@@ -52,7 +52,7 @@ private[spark] class WorkerWatcher(workerUrl: String) extends Actor + case AssociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) => + logInfo(s"Successfully connected to $workerUrl") + +- case 
AssociationErrorEvent(cause, localAddress, remoteAddress, inbound) ++ case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound, _) + if isWorker(remoteAddress) => + // These logs may not be seen if the worker (and associated pipe) has died + logError(s"Could not initialize connection to worker $workerUrl. Exiting.") +diff --git a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala +index bf71882..08d703e 100644 +--- a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala ++++ b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala +@@ -39,7 +39,7 @@ private[akka] class IndestructibleActorSystemImpl( + override val name: String, + applicationConfig: Config, + classLoader: ClassLoader) +- extends ActorSystemImpl(name, applicationConfig, classLoader) { ++ extends ActorSystemImpl(name, applicationConfig, classLoader, None) { + + protected override def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = { + val fallbackHandler = super.uncaughtExceptionHandler +diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala +index 63f5297..2bfa6b5 100644 +--- a/project/SparkBuild.scala ++++ b/project/SparkBuild.scala +@@ -223,8 +223,8 @@ object SparkBuild extends Build { + "com.ning" % "compress-lzf" % "1.0.0", + "org.xerial.snappy" % "snappy-java" % "1.0.5", + "org.ow2.asm" % "asm" % "4.0", +- "org.spark-project.akka" %% "akka-remote" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty), +- "org.spark-project.akka" %% "akka-slf4j" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty), ++ "com.typesafe.akka" %% "akka-remote" % "2.3.0-RC2" excludeAll(excludeNetty), ++ "com.typesafe.akka" %% "akka-slf4j" % "2.3.0-RC2" excludeAll(excludeNetty), + "org.json4s" %% "json4s-jackson" % "3.2.6", + "it.unimi.dsi" % "fastutil" % "6.4.4", + "colt" % "colt" % "1.2.0", +-- +1.8.3.4 (Apple Git-47) + diff --git a/spark-v0.9.0-0011-xmvn.patch b/spark-v0.9.0-0011-xmvn.patch new file mode 100644 index 0000000..1426527 --- /dev/null +++ b/spark-v0.9.0-0011-xmvn.patch @@ -0,0 +1,53 @@ +From eab118d8acf17f64cb76a966715bbe8f15397da5 Mon Sep 17 00:00:00 2001 +From: William Benton +Date: Fri, 28 Feb 2014 16:39:51 -0600 +Subject: [PATCH] fedora-only resolver changes + +--- + project/SparkBuild.scala | 15 ++++----------- + 1 file changed, 4 insertions(+), 11 deletions(-) + +diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala +index 2bfa6b5..895c4bb 100644 +--- a/project/SparkBuild.scala ++++ b/project/SparkBuild.scala +@@ -93,7 +93,11 @@ object SparkBuild extends Build { + + lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](tools) + ++ val ivyLocal = Resolver.file("local", file("IVY_LOCAL"))(Resolver.ivyStylePatterns) ++ + def sharedSettings = Defaults.defaultSettings ++ Seq( ++ externalResolvers := Seq(new sbt.RawRepository(new org.fedoraproject.maven.connector.ivy.IvyResolver), ivyLocal), ++ + organization := "org.apache.spark", + version := "0.9.0-incubating", + scalaVersion := "2.10.3", +@@ -123,13 +127,6 @@ object SparkBuild extends Build { + // Only allow one test at a time, even across projects, since they run in the same JVM + concurrentRestrictions in Global += Tags.limit(Tags.Test, 1), + +- // also check the local Maven repository ~/.m2 +- resolvers ++= Seq(Resolver.file("Local Maven Repo", file(Path.userHome + "/.m2/repository"))), +- +- // For Sonatype publishing +- resolvers ++= Seq("sonatype-snapshots" at 
"https://oss.sonatype.org/content/repositories/snapshots", +- "sonatype-staging" at "https://oss.sonatype.org/service/local/staging/deploy/maven2/"), +- + publishMavenStyle := true, + + //useGpg in Global := true, +@@ -208,10 +205,6 @@ object SparkBuild extends Build { + + def coreSettings = sharedSettings ++ Seq( + name := "spark-core", +- resolvers ++= Seq( +- "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/", +- "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/" +- ), + + libraryDependencies ++= Seq( + "com.google.guava" % "guava" % "14.0.1", +-- +1.8.3.4 (Apple Git-47) + diff --git a/spark.spec b/spark.spec new file mode 100644 index 0000000..863a539 --- /dev/null +++ b/spark.spec @@ -0,0 +1,273 @@ +%global spark_version 0.9.0 +%global spark_version_suffix -incubating +%global scala_version 2.10 + +%if 0%{?fedora} >= 20 +%global want_hadoop 1 +%else +%global want_hadoop 0 +%endif + +%global remap_version_to_installed() sed -i -e 's/"%{1}"[\t ]*%%[\t ]*"%{2}"[\t ]*%%[\t ]*"[^"]*"/"%{1}" %% "%{2}" %% "'$(rpm -q --qf "%%%%{version}" $(rpm -q --whatprovides "mvn(%{1}:%{2})" ))'"/g' %{3} + +%global climbing_nemesis() ./climbing-nemesis.py %{1} %{2} ivy-local --log debug --version $(rpm -q --qf "%%%%{version}" $(rpm -q --whatprovides "mvn(%{1}:%{2})" )) + +Name: spark +Version: %{spark_version} +Release: 0.3%{?dist} +Summary: Lightning-fast cluster computing + +License: ASL 2.0 +URL: http://spark.apache.org +Source0: https://github.com/apache/spark/archive/v%{spark_version}%{spark_version_suffix}.tar.gz +Source1: https://raw.github.com/willb/rpm-packaging/v%{spark_version}/spark-packaging/xmvn-sbt +Source2: https://raw.github.com/willb/rpm-packaging/v%{spark_version}/spark-packaging/xmvn-sbt.properties + +Patch0: spark-v0.9.0-0001-Replace-lift-json-with-json4s-jackson.patch +Patch1: spark-v0.9.0-0002-use-sbt-0.13.1.patch +Patch2: spark-v0.9.0-0003-Removed-sbt-plugins.patch +Patch3: spark-v0.9.0-0004-removed-examples.patch +Patch4: spark-v0.9.0-0005-Removed-code-depending-on-Kryo.patch +Patch5: spark-v0.9.0-0006-Remove-functionality-depending-on-stream-lib.patch +Patch6: spark-v0.9.0-0007-Removed-mesos.patch +Patch7: spark-v0.9.0-0008-remove-unavailable-and-unnecessary-deps.patch +Patch8: spark-v0.9.0-0009-use-Jetty-8.patch +Patch9: spark-v0.9.0-0010-use-Akka-2.3.0-RC2.patch +Patch10: spark-v0.9.0-0011-xmvn.patch + +BuildArch: noarch +BuildRequires: sbt >= 0.13.1-5 +BuildRequires: scala +BuildRequires: python +BuildRequires: maven-local +BuildRequires: javapackages-tools +Requires: javapackages-tools +Requires: scala + +BuildRequires: jetty8 +Requires: jetty8 + +BuildRequires: plexus-containers-component-annotations + +BuildRequires: mvn(org.json4s:json4s-jackson_%{scala_version}) +Requires: mvn(org.json4s:json4s-jackson_%{scala_version}) + +BuildRequires: mvn(com.thoughtworks.paranamer:paranamer) +Requires: mvn(com.thoughtworks.paranamer:paranamer) + +BuildRequires: mvn(com.codahale.metrics:metrics-core) +BuildRequires: mvn(com.codahale.metrics:metrics-ganglia) +BuildRequires: mvn(com.codahale.metrics:metrics-graphite) +BuildRequires: mvn(com.codahale.metrics:metrics-json) +BuildRequires: mvn(com.codahale.metrics:metrics-jvm) +BuildRequires: mvn(com.google.code.findbugs:jsr305) +BuildRequires: mvn(com.google.guava:guava) +BuildRequires: mvn(commons-daemon:commons-daemon) +BuildRequires: mvn(com.ning:compress-lzf) +BuildRequires: mvn(io.netty:netty-all) +BuildRequires: mvn(it.unimi.dsi:fastutil) +BuildRequires: 
mvn(log4j:log4j) +BuildRequires: mvn(net.java.dev.jets3t:jets3t) +%if %{want_hadoop} +BuildRequires: mvn(org.apache.hadoop:hadoop-client) +%endif +BuildRequires: mvn(org.easymock:easymock) +BuildRequires: mvn(org.eclipse.jetty:jetty-server) +BuildRequires: mvn(org.eclipse.jetty.orbit:javax.servlet) +BuildRequires: mvn(org.jblas:jblas) +BuildRequires: mvn(org.ow2.asm:asm) +BuildRequires: mvn(org.slf4j:slf4j-api) +BuildRequires: mvn(org.slf4j:slf4j-log4j12) +BuildRequires: mvn(com.typesafe.akka:akka-actor_%{scala_version}) +BuildRequires: mvn(com.typesafe.akka:akka-remote_%{scala_version}) +BuildRequires: mvn(org.xerial.snappy:snappy-java) +BuildRequires: mvn(com.freevariable.lancer:lancer) + +Requires: mvn(com.codahale.metrics:metrics-core) +Requires: mvn(com.codahale.metrics:metrics-ganglia) +Requires: mvn(com.codahale.metrics:metrics-graphite) +Requires: mvn(com.codahale.metrics:metrics-json) +Requires: mvn(com.codahale.metrics:metrics-jvm) +Requires: mvn(com.google.code.findbugs:jsr305) +Requires: mvn(com.google.guava:guava) +Requires: mvn(commons-daemon:commons-daemon) +Requires: mvn(com.ning:compress-lzf) +Requires: mvn(io.netty:netty-all) +Requires: mvn(it.unimi.dsi:fastutil) +Requires: mvn(log4j:log4j) +Requires: mvn(net.java.dev.jets3t:jets3t) +%if %{want_hadoop} +Requires: mvn(org.apache.hadoop:hadoop-client) +%endif +Requires: mvn(org.apache.zookeeper:zookeeper) +Requires: mvn(org.easymock:easymock) +Requires: mvn(org.eclipse.jetty:jetty-server) +Requires: mvn(org.eclipse.jetty.orbit:javax.servlet) +Requires: mvn(org.jblas:jblas) +Requires: mvn(org.ow2.asm:asm) +Requires: mvn(org.slf4j:slf4j-api) +Requires: mvn(org.slf4j:slf4j-log4j12) +Requires: mvn(com.typesafe.akka:akka-actor_%{scala_version}) +Requires: mvn(com.typesafe.akka:akka-remote_%{scala_version}) +Requires: mvn(org.xerial.snappy:snappy-java) +Requires: mvn(com.freevariable.lancer:lancer) + +%description + +Apache Spark is a fast and general engine for large-scale data processing. + +%package javadoc +Summary: Javadoc for %{name} + +%description javadoc +Javadoc for %{name}. + +%prep +%setup -q -n %{name}-%{spark_version}%{spark_version_suffix} +%patch0 -p1 +%patch1 -p1 +%patch2 -p1 +%patch3 -p1 +%patch4 -p1 +%patch5 -p1 +%patch6 -p1 +%patch7 -p1 +%patch8 -p1 +%patch9 -p1 +%patch10 -p1 + +sed -i -e 's/\(val [A-Z]\+_JVM_VERSION =[^1]\+\)1.6"/\11.7"/' project/SparkBuild.scala + +# replace Colt with Lancer +sed -i -e 's/"colt.*1[.]2[.]0"/"com.freevariable.lancer" % "lancer" % "0.0.1"/' project/SparkBuild.scala + +for jetfile in $(find . 
-name \*.scala | xargs grep -l cern\\.jet) ; do + sed -i -e 's|cern[.]jet[.]random[.]engine|com.freevariable.lancer.random|' $jetfile + sed -i -e 's|cern[.]jet[.]random|com.freevariable.lancer.random|' $jetfile + sed -i -e 's|cern[.]jet[.]stat|com.freevariable.lancer.stat|' $jetfile +done + +# remove examples dependent upon Colt functionality not yet available in Lancer +rm ./examples/src/main/scala/org/apache/spark/examples/LocalALS.scala +rm ./examples/src/main/scala/org/apache/spark/examples/SparkALS.scala + +# remove chill dependency (not available yet) +sed -i -e '/com.twitter.*chill/d' project/SparkBuild.scala + +# remove stream-lib dependency (not available yet) +sed -i -e '/com.clearspring.*stream/d' project/SparkBuild.scala + +# remove avro because it's only used for flume (which we don't build) +sed -i -e '/org.apache.*avro/d' project/SparkBuild.scala + +# remove mesos dependency (java support not available yet) +sed -i -e '/org[.]apache.*mesos/d' project/SparkBuild.scala + +# remove all test deps for now +sed -i -e '/%[[:space:]]*"test"/d' project/SparkBuild.scala + +# fix up json4s-jackson version +sed -i -e 's|\(json4s-jackson"[^"]*"\)3[.]2[.]6|\13.2.7|' project/SparkBuild.scala + +# don't use scala bundled jline +sed -i -e 's|"org.scala-lang".*"jline"|"jline" % "jline"|g' project/SparkBuild.scala + +mkdir boot + +# remove bundled sbt script +rm -rf sbt + +cp %{SOURCE1} sbt-xmvn +chmod 755 sbt-xmvn + +cp %{SOURCE2} xmvn-sbt.properties + +%build + +export XMVN_CLASSPATH=$(build-classpath aether/api guava ivy maven/maven-model plexus-classworlds plexus-containers/plexus-container-default plexus/utils xbean/xbean-reflect xmvn/xmvn-connector xmvn/xmvn-core atinject google-guice-no_aop) + +export SPARK_HADOOP_VERSION=2.2.0 +export DEFAULT_IS_NEW_HADOOP=true + +mkdir ivy-local +cp -r /usr/share/sbt/ivy-local/* ivy-local + +export SBT_BOOT_DIR=boot +export SBT_IVY_DIR=ivy-local + +mkdir lib + +for f in $(echo ${XMVN_CLASSPATH} | tr : \ ); do + cp $f lib +done + +cp /usr/share/java/plexus/containers-component-annotations.jar lib + +for sub in project tools bagel mllib streaming core graphx repl; do + ln -s $(pwd)/lib $sub/lib +done + +# HACK HACK HACK +(echo q | SBT_BOOT_PROPERTIES=/etc/sbt/rpmbuild-sbt.boot.properties sbt quit) || true +cp lib/* boot/scala-2.10.3/lib/ + +alltargets() { for f in "$@" ; do echo $f/package $f/makePom $f/doc $f/publishLocal; done } + +export SBT_BOOT_PROPERTIES=xmvn-sbt.properties + +# ./sbt-xmvn core/package core/makePom core/doc core/publishLocal +# NB: repl doesn't build atm due to jline +./sbt-xmvn $(alltargets core mllib graphx bagel streaming) + +%install +mkdir -p %{buildroot}/%{_javadir}/%{name} +mkdir -p %{buildroot}/%{_mavenpomdir} + +mkdir -p %{buildroot}/%{_javadocdir}/%{name} +for apidir in $(find . -name api -type d) ; do + pushd $apidir + mod=$(echo $apidir | cut -f2 -d/) + mkdir -p %{buildroot}/%{_javadocdir}/%{name}/$mod + cp -rp . %{buildroot}/%{_javadocdir}/%{name}/$mod + popd +done + +for jar in $(find . -name \*.jar | grep target | grep _%{scala_version}-%{spark_version}%{spark_version_suffix}.jar) ; do + install -m 644 $jar %{buildroot}/%{_javadir}/%{name}/$(echo $jar | cut -f5 -d/ | cut -f1 -d_).jar +done + +declare -a shortnames + +for pom in $(find . 
-name \*.pom | grep target | grep _%{scala_version}-%{spark_version}%{spark_version_suffix}.pom ) ; do + shortname=$(echo $pom | cut -f5 -d/ | cut -f1 -d_) + echo installing POM $pom to %{_mavenpomdir}/JPP.%{name}-${shortname}.pom + install -pm 644 $pom %{buildroot}/%{_mavenpomdir}/JPP.%{name}-${shortname}.pom + echo %{_mavenpomdir}/JPP.%{name}-${shortname}.pom >> .rpm_pomfiles + shortnames=( "${shortnames[@]}" $shortname ) +done + +for sub in ${shortnames[@]} ; do + echo running add_maven_depmap JPP.%{name}-${sub}.pom %{name}/${sub}.jar + %add_maven_depmap JPP.%{name}-${sub}.pom %{name}/${sub}.jar +done + +%files -f .mfiles +%dir %{_javadir}/%{name} + +%doc LICENSE README.md + +%files javadoc +%{_javadocdir}/%{name} +%doc LICENSE + + +%changelog + +* Tue Mar 11 2014 William Benton - 0.9.0-0.3 +- fixes to work with newer Fedora sbt package + +* Sat Mar 1 2014 William Benton - 0.9.0-0.2 +- include mllib, bagel, streaming, and graphx + +* Mon Feb 10 2014 William Benton - 0.9.0-0.1 +- initial package diff --git a/xmvn-sbt b/xmvn-sbt new file mode 100644 index 0000000..873b630 --- /dev/null +++ b/xmvn-sbt @@ -0,0 +1,18 @@ +#!/bin/sh + +if \[ $(rpm -q --queryformat '%{version}' fedora-release) -ge 21 \] ; then +export JLINE=jline +else +export JLINE=jline2 +fi + +export SBT_BOOT_DIR=${SBT_BOOT_DIR:-/usr/share/sbt/boot} +export SBT_IVY_DIR=${SBT_IVY_DIR:-/usr/share/sbt/ivy-local} +export SBT_BOOT_PROPERTIES=${SBT_BOOT_PROPERTIES:-/etc/sbt/sbt.boot.properties} +export BASE_CLASSPATH=$(build-classpath scala ivy sbt ${JLINE} jansi test-interface sbinary) +export XMVN_CLASSPATH=$(build-classpath aether/api guava ivy maven/maven-model plexus-classworlds plexus-containers/plexus-container-default plexus/utils xbean/xbean-reflect xmvn/xmvn-connector xmvn/xmvn-core xmvn atinject google-guice-no_aop) +export JAVA_OPTS="-Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled" + +echo CLASSPATH is $BASE_CLASSPATH:${XMVN_CLASSPATH} + +java -Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -Dsisu.debug=1 -Divy.mode=local -Divysettings.xml=/etc/ivy/ivysettings.xml -Dfedora.sbt.boot.dir=${SBT_BOOT_DIR} -Dfedora.sbt.ivy.dir=${SBT_IVY_DIR} -Dsbt.boot.properties=${SBT_BOOT_PROPERTIES} -cp ${BASE_CLASSPATH}:${XMVN_CLASSPATH} xsbt.boot.Boot "$@" diff --git a/xmvn-sbt.properties b/xmvn-sbt.properties new file mode 100644 index 0000000..d9cabb4 --- /dev/null +++ b/xmvn-sbt.properties @@ -0,0 +1,20 @@ +[scala] + version: 2.10.3 + +[app] + org: ${sbt.organization-org.scala-sbt} + name: sbt + version: ${sbt.version-read(sbt.version)[0.13.1]} + class: ${sbt.main.class-sbt.xMain} + components: xsbti,extra + cross-versioned: ${sbt.cross.versioned-false} + +[boot] + directory: ${fedora.sbt.boot.dir-/usr/share/sbt/boot} + +[log] + level: debug + +[ivy] + ivy-home: ${fedora.sbt.ivy.dir-ivy-local} + checksums: ${sbt.checksums-sha1,md5}
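
A minimal sketch of how the pieces above fit together at build time, assuming a Fedora host with the packages named in the spec's BuildRequires installed and a spark source tree already prepared as in %prep (patches applied, sbt-xmvn and xmvn-sbt.properties copied in). It only restates what %build and the xmvn-sbt wrapper already do, with a shortened target list for brevity:

  # seed a writable ivy repository and boot directory, as %build does
  mkdir -p ivy-local boot
  cp -r /usr/share/sbt/ivy-local/* ivy-local

  # point the xmvn-sbt wrapper at the local dirs and the boot properties file;
  # the wrapper itself assembles BASE_CLASSPATH/XMVN_CLASSPATH via build-classpath
  export SBT_BOOT_DIR=boot
  export SBT_IVY_DIR=ivy-local
  export SBT_BOOT_PROPERTIES=xmvn-sbt.properties

  # package, generate the POM and javadoc, and publish the core module locally;
  # the spec runs the same four targets for mllib, graphx, bagel and streaming
  ./sbt-xmvn core/package core/makePom core/doc core/publishLocal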