From c321a448cd80331df07cda0f5f20955b3b148aac Mon Sep 17 00:00:00 2001
From: William Benton <willb@redhat.com>
Date: Thu, 27 Feb 2014 17:05:12 -0600
Subject: [PATCH 7/7] Removed mesos
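
This drops the Mesos integration: the mesos:// master-URL handling and
the MesosNativeLibrary dependency in SparkContext, the Mesos task-state
conversions in TaskState, and the MesosExecutorBackend,
CoarseMesosSchedulerBackend, and MesosSchedulerBackend classes.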

---
 .../main/scala/org/apache/spark/SparkContext.scala |  15 -
 .../main/scala/org/apache/spark/TaskState.scala    |  21 --
 .../spark/executor/MesosExecutorBackend.scala      | 104 -------
 .../mesos/CoarseMesosSchedulerBackend.scala        | 289 -----------------
 .../cluster/mesos/MesosSchedulerBackend.scala      | 344 ---------------------
 5 files changed, 773 deletions(-)
 delete mode 100644 core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
 delete mode 100644 core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 delete mode 100644 core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 566472e..f3b2941 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -36,7 +36,6 @@ import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, Sequence
   TextInputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
-import org.apache.mesos.MesosNativeLibrary
 
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
@@ -44,7 +43,6 @@ import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
   SparkDeploySchedulerBackend, SimrSchedulerBackend}
-import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
 import org.apache.spark.ui.SparkUI
@@ -1281,19 +1279,6 @@ object SparkContext {
         scheduler.initialize(backend)
         scheduler
 
-      case mesosUrl @ MESOS_REGEX(_) =>
-        MesosNativeLibrary.load()
-        val scheduler = new TaskSchedulerImpl(sc)
-        val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", false)
-        val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
-        val backend = if (coarseGrained) {
-          new CoarseMesosSchedulerBackend(scheduler, sc, url, appName)
-        } else {
-          new MesosSchedulerBackend(scheduler, sc, url, appName)
-        }
-        scheduler.initialize(backend)
-        scheduler
-
       case SIMR_REGEX(simrUrl) =>
         val scheduler = new TaskSchedulerImpl(sc)
         val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl)
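
For context, the removed case above matched mesos:// master URLs by regex,
stripped the scheme, and chose between the coarse- and fine-grained backends
via the spark.mesos.coarse flag. A minimal, hedged sketch of that dispatch
shape (the pattern below is an assumption; Spark's actual MESOS_REGEX is
defined elsewhere in SparkContext):

    object MasterUrlDispatch {
      // Assumed single-group pattern mirroring the shape of MESOS_REGEX.
      val MESOS_REGEX = """(mesos://.*)""".r

      def describe(master: String): String = master match {
        case mesosUrl @ MESOS_REGEX(_) =>
          // Strip the scheme, as the removed code did.
          val url = mesosUrl.stripPrefix("mesos://")
          s"mesos backend for $url"
        case other =>
          s"no mesos handling for $other"
      }
    }

Here describe("mesos://host:5050") would yield "mesos backend for host:5050".
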
diff --git a/core/src/main/scala/org/apache/spark/TaskState.scala b/core/src/main/scala/org/apache/spark/TaskState.scala
index 0bf1e4a..cdd8baf 100644
--- a/core/src/main/scala/org/apache/spark/TaskState.scala
+++ b/core/src/main/scala/org/apache/spark/TaskState.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark
 
-import org.apache.mesos.Protos.{TaskState => MesosTaskState}
-
 private[spark] object TaskState extends Enumeration {
 
   val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value
@@ -28,23 +26,4 @@ private[spark] object TaskState extends Enumeration {
   type TaskState = Value
 
   def isFinished(state: TaskState) = FINISHED_STATES.contains(state)
-
-  def toMesos(state: TaskState): MesosTaskState = state match {
-    case LAUNCHING => MesosTaskState.TASK_STARTING
-    case RUNNING => MesosTaskState.TASK_RUNNING
-    case FINISHED => MesosTaskState.TASK_FINISHED
-    case FAILED => MesosTaskState.TASK_FAILED
-    case KILLED => MesosTaskState.TASK_KILLED
-    case LOST => MesosTaskState.TASK_LOST
-  }
-
-  def fromMesos(mesosState: MesosTaskState): TaskState = mesosState match {
-    case MesosTaskState.TASK_STAGING => LAUNCHING
-    case MesosTaskState.TASK_STARTING => LAUNCHING
-    case MesosTaskState.TASK_RUNNING => RUNNING
-    case MesosTaskState.TASK_FINISHED => FINISHED
-    case MesosTaskState.TASK_FAILED => FAILED
-    case MesosTaskState.TASK_KILLED => KILLED
-    case MesosTaskState.TASK_LOST => LOST
-  }
 }
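
The hunk above leaves TaskState purely Spark-internal. For reference,
FINISHED_STATES is defined in a part of the file the hunk does not show; a
hedged sketch of the full Enumeration pattern, assuming FINISHED_STATES is
the set of terminal states that isFinished tests:

    private[spark] object TaskStateSketch extends Enumeration {
      val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value

      // Assumption: the four terminal states, per isFinished above.
      val FINISHED_STATES = Set(FINISHED, FAILED, KILLED, LOST)

      type TaskState = Value

      def isFinished(state: TaskState) = FINISHED_STATES.contains(state)
    }
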
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
deleted file mode 100644
index b56d8c9..0000000
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.executor
-
-import java.nio.ByteBuffer
-
-import com.google.protobuf.ByteString
-
-import org.apache.mesos.{Executor => MesosExecutor, MesosExecutorDriver, MesosNativeLibrary, ExecutorDriver}
-import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
-
-import org.apache.spark.Logging
-import org.apache.spark.TaskState
-import org.apache.spark.TaskState.TaskState
-import org.apache.spark.util.Utils
-
-
-private[spark] class MesosExecutorBackend
-  extends MesosExecutor
-  with ExecutorBackend
-  with Logging {
-
-  var executor: Executor = null
-  var driver: ExecutorDriver = null
-
-  override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
-    val mesosTaskId = TaskID.newBuilder().setValue(taskId.toString).build()
-    driver.sendStatusUpdate(MesosTaskStatus.newBuilder()
-      .setTaskId(mesosTaskId)
-      .setState(TaskState.toMesos(state))
-      .setData(ByteString.copyFrom(data))
-      .build())
-  }
-
-  override def registered(
-      driver: ExecutorDriver,
-      executorInfo: ExecutorInfo,
-      frameworkInfo: FrameworkInfo,
-      slaveInfo: SlaveInfo) {
-    logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
-    this.driver = driver
-    val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
-    executor = new Executor(
-      executorInfo.getExecutorId.getValue,
-      slaveInfo.getHostname,
-      properties)
-  }
-
-  override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
-    val taskId = taskInfo.getTaskId.getValue.toLong
-    if (executor == null) {
-      logError("Received launchTask but executor was null")
-    } else {
-      executor.launchTask(this, taskId, taskInfo.getData.asReadOnlyByteBuffer)
-    }
-  }
-
-  override def error(d: ExecutorDriver, message: String) {
-    logError("Error from Mesos: " + message)
-  }
-
-  override def killTask(d: ExecutorDriver, t: TaskID) {
-    if (executor == null) {
-      logError("Received KillTask but executor was null")
-    } else {
-      executor.killTask(t.getValue.toLong)
-    }
-  }
-
-  override def reregistered(d: ExecutorDriver, p2: SlaveInfo) {}
-
-  override def disconnected(d: ExecutorDriver) {}
-
-  override def frameworkMessage(d: ExecutorDriver, data: Array[Byte]) {}
-
-  override def shutdown(d: ExecutorDriver) {}
-}
-
-/**
- * Entry point for Mesos executor.
- */
-private[spark] object MesosExecutorBackend {
-  def main(args: Array[String]) {
-    MesosNativeLibrary.load()
-    // Create a new Executor and start it running
-    val runner = new MesosExecutorBackend()
-    new MesosExecutorDriver(runner).run()
-  }
-}
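
(The main method above was the slave-side entry point: Mesos launched
executors through the sbin/spark-executor script that createExecutorInfo,
in the MesosSchedulerBackend diff below, points task executors at.)
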
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
deleted file mode 100644
index c27049b..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ /dev/null
@@ -1,289 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import java.io.File
-import java.util.{ArrayList => JArrayList, List => JList}
-import java.util.Collections
-
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.collection.JavaConversions._
-
-import com.google.protobuf.ByteString
-import org.apache.mesos.{Scheduler => MScheduler}
-import org.apache.mesos._
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
-
-import org.apache.spark.{SparkException, Logging, SparkContext, TaskState}
-import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-
-/**
- * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
- * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever
- * a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the
- * CoarseGrainedSchedulerBackend mechanism. This class is useful for lower and more predictable
- * latency.
- *
- * Unfortunately this has a bit of duplication from MesosSchedulerBackend, but it seems hard to
- * remove this.
- */
-private[spark] class CoarseMesosSchedulerBackend(
-    scheduler: TaskSchedulerImpl,
-    sc: SparkContext,
-    master: String,
-    appName: String)
-  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
-  with MScheduler
-  with Logging {
-
-  val MAX_SLAVE_FAILURES = 2     // Blacklist a slave after this many failures
-
-  // Lock used to wait for scheduler to be registered
-  var isRegistered = false
-  val registeredLock = new Object()
-
-  // Driver for talking to Mesos
-  var driver: SchedulerDriver = null
-
-  // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
-  val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
-
-  // Cores we have acquired with each Mesos task ID
-  val coresByTaskId = new HashMap[Int, Int]
-  var totalCoresAcquired = 0
-
-  val slaveIdsWithExecutors = new HashSet[String]
-
-  val taskIdToSlaveId = new HashMap[Int, String]
-  val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed
-
-  val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
-    "Spark home is not set; set it through the spark.home system " +
-    "property, the SPARK_HOME environment variable or the SparkContext constructor"))
-
-  val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0)
-
-  var nextMesosTaskId = 0
-
-  def newMesosTaskId(): Int = {
-    val id = nextMesosTaskId
-    nextMesosTaskId += 1
-    id
-  }
-
-  override def start() {
-    super.start()
-
-    synchronized {
-      new Thread("CoarseMesosSchedulerBackend driver") {
-        setDaemon(true)
-        override def run() {
-          val scheduler = CoarseMesosSchedulerBackend.this
-          val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
-          driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
-          try {
-            val ret = driver.run()
-            logInfo("driver.run() returned with code " + ret)
-          }
-          catch {
-            case e: Exception => logError("driver.run() failed", e)
-          }
-        }
-      }.start()
-
-      waitForRegister()
-    }
-  }
-
-  def createCommand(offer: Offer, numCores: Int): CommandInfo = {
-    val environment = Environment.newBuilder()
-    sc.executorEnvs.foreach { case (key, value) =>
-      environment.addVariables(Environment.Variable.newBuilder()
-        .setName(key)
-        .setValue(value)
-        .build())
-    }
-    val command = CommandInfo.newBuilder()
-      .setEnvironment(environment)
-    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
-      conf.get("spark.driver.host"),
-      conf.get("spark.driver.port"),
-      CoarseGrainedSchedulerBackend.ACTOR_NAME)
-    val uri = conf.get("spark.executor.uri", null)
-    if (uri == null) {
-      val runScript = new File(sparkHome, "./bin/spark-class").getCanonicalPath
-      command.setValue(
-        "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d".format(
-          runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
-    } else {
-      // Grab everything to the first '.'. We'll use that and '*' to
-      // glob the directory "correctly".
-      val basename = uri.split('/').last.split('.').head
-      command.setValue(
-        "cd %s*; ./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d"
-          .format(basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
-      command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
-    }
-    command.build()
-  }
-
-  override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
-
-  override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
-    logInfo("Registered as framework ID " + frameworkId.getValue)
-    registeredLock.synchronized {
-      isRegistered = true
-      registeredLock.notifyAll()
-    }
-  }
-
-  def waitForRegister() {
-    registeredLock.synchronized {
-      while (!isRegistered) {
-        registeredLock.wait()
-      }
-    }
-  }
-
-  override def disconnected(d: SchedulerDriver) {}
-
-  override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
-
-  /**
-   * Method called by Mesos to offer resources on slaves. We respond by launching an executor,
-   * unless we've already launched more than we wanted to.
-   */
-  override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
-    synchronized {
-      val filters = Filters.newBuilder().setRefuseSeconds(-1).build()
-
-      for (offer <- offers) {
-        val slaveId = offer.getSlaveId.toString
-        val mem = getResource(offer.getResourcesList, "mem")
-        val cpus = getResource(offer.getResourcesList, "cpus").toInt
-        if (totalCoresAcquired < maxCores && mem >= sc.executorMemory && cpus >= 1 &&
-            failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
-            !slaveIdsWithExecutors.contains(slaveId)) {
-          // Launch an executor on the slave
-          val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
-          totalCoresAcquired += cpusToUse
-          val taskId = newMesosTaskId()
-          taskIdToSlaveId(taskId) = slaveId
-          slaveIdsWithExecutors += slaveId
-          coresByTaskId(taskId) = cpusToUse
-          val task = MesosTaskInfo.newBuilder()
-            .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
-            .setSlaveId(offer.getSlaveId)
-            .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
-            .setName("Task " + taskId)
-            .addResources(createResource("cpus", cpusToUse))
-            .addResources(createResource("mem", sc.executorMemory))
-            .build()
-          d.launchTasks(offer.getId, Collections.singletonList(task), filters)
-        } else {
-          // Filter it out
-          d.launchTasks(offer.getId, Collections.emptyList[MesosTaskInfo](), filters)
-        }
-      }
-    }
-  }
-
-  /** Helper function to pull out a resource from a Mesos Resources protobuf */
-  private def getResource(res: JList[Resource], name: String): Double = {
-    for (r <- res if r.getName == name) {
-      return r.getScalar.getValue
-    }
-    // If we reached here, no resource with the required name was present
-    throw new IllegalArgumentException("No resource called " + name + " in " + res)
-  }
-
-  /** Build a Mesos resource protobuf object */
-  private def createResource(resourceName: String, quantity: Double): Protos.Resource = {
-    Resource.newBuilder()
-      .setName(resourceName)
-      .setType(Value.Type.SCALAR)
-      .setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
-      .build()
-  }
-
-  /** Check whether a Mesos task state represents a finished task */
-  private def isFinished(state: MesosTaskState) = {
-    state == MesosTaskState.TASK_FINISHED ||
-      state == MesosTaskState.TASK_FAILED ||
-      state == MesosTaskState.TASK_KILLED ||
-      state == MesosTaskState.TASK_LOST
-  }
-
-  override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
-    val taskId = status.getTaskId.getValue.toInt
-    val state = status.getState
-    logInfo("Mesos task " + taskId + " is now " + state)
-    synchronized {
-      if (isFinished(state)) {
-        val slaveId = taskIdToSlaveId(taskId)
-        slaveIdsWithExecutors -= slaveId
-        taskIdToSlaveId -= taskId
-        // Remove the cores we have remembered for this task, if it's in the hashmap
-        for (cores <- coresByTaskId.get(taskId)) {
-          totalCoresAcquired -= cores
-          coresByTaskId -= taskId
-        }
-        // If it was a failure, mark the slave as failed for blacklisting purposes
-        if (state == MesosTaskState.TASK_FAILED || state == MesosTaskState.TASK_LOST) {
-          failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1
-          if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) {
-            logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " +
-                "is Spark installed on it?")
-          }
-        }
-        driver.reviveOffers() // In case we'd rejected everything before but have now lost a node
-      }
-    }
-  }
-
-  override def error(d: SchedulerDriver, message: String) {
-    logError("Mesos error: " + message)
-    scheduler.error(message)
-  }
-
-  override def stop() {
-    super.stop()
-    if (driver != null) {
-      driver.stop()
-    }
-  }
-
-  override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
-
-  override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
-    logInfo("Mesos slave lost: " + slaveId.getValue)
-    synchronized {
-      if (slaveIdsWithExecutors.contains(slaveId.getValue)) {
-        // Note that the slave ID corresponds to the executor ID on that slave
-        slaveIdsWithExecutors -= slaveId.getValue
-        removeExecutor(slaveId.getValue, "Mesos slave lost")
-      }
-    }
-  }
-
-  override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
-    logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
-    slaveLost(d, s)
-  }
-}
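
As the class comment above notes, the coarse-grained mode held each Mesos
node for the duration of the job. A hedged sketch of how an application of
this era opted in; the master URL and app name are placeholders, and the
property name and default come from the removed SparkContext hunk:

    import org.apache.spark.SparkConf

    object CoarseModeConfig {
      // Hypothetical values; "spark.mesos.coarse" defaulted to false.
      val conf = new SparkConf()
        .setMaster("mesos://host:5050")
        .setAppName("example")
        .set("spark.mesos.coarse", "true")
    }
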
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
deleted file mode 100644
index 4978148..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import java.io.File
-import java.util.{ArrayList => JArrayList, List => JList}
-import java.util.Collections
-
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.collection.JavaConversions._
-
-import com.google.protobuf.ByteString
-import org.apache.mesos.{Scheduler => MScheduler}
-import org.apache.mesos._
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
-
-import org.apache.spark.{Logging, SparkException, SparkContext, TaskState}
-import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SchedulerBackend, SlaveLost,
-  TaskDescription, TaskSchedulerImpl, WorkerOffer}
-import org.apache.spark.util.Utils
-
-/**
- * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
- * separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks
- * from multiple apps can run on different cores) and in time (a core can switch ownership).
- */
-private[spark] class MesosSchedulerBackend(
-    scheduler: TaskSchedulerImpl,
-    sc: SparkContext,
-    master: String,
-    appName: String)
-  extends SchedulerBackend
-  with MScheduler
-  with Logging {
-
-  // Lock used to wait for scheduler to be registered
-  var isRegistered = false
-  val registeredLock = new Object()
-
-  // Driver for talking to Mesos
-  var driver: SchedulerDriver = null
-
-  // Which slave IDs we have executors on
-  val slaveIdsWithExecutors = new HashSet[String]
-  val taskIdToSlaveId = new HashMap[Long, String]
-
-  // An ExecutorInfo for our tasks
-  var execArgs: Array[Byte] = null
-
-  var classLoader: ClassLoader = null
-
-  override def start() {
-    synchronized {
-      classLoader = Thread.currentThread.getContextClassLoader
-
-      new Thread("MesosSchedulerBackend driver") {
-        setDaemon(true)
-        override def run() {
-          val scheduler = MesosSchedulerBackend.this
-          val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
-          driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
-          try {
-            val ret = driver.run()
-            logInfo("driver.run() returned with code " + ret)
-          } catch {
-            case e: Exception => logError("driver.run() failed", e)
-          }
-        }
-      }.start()
-
-      waitForRegister()
-    }
-  }
-
-  def createExecutorInfo(execId: String): ExecutorInfo = {
-    val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
-      "Spark home is not set; set it through the spark.home system " +
-      "property, the SPARK_HOME environment variable or the SparkContext constructor"))
-    val environment = Environment.newBuilder()
-    sc.executorEnvs.foreach { case (key, value) =>
-      environment.addVariables(Environment.Variable.newBuilder()
-        .setName(key)
-        .setValue(value)
-        .build())
-    }
-    val command = CommandInfo.newBuilder()
-      .setEnvironment(environment)
-    val uri = sc.conf.get("spark.executor.uri", null)
-    if (uri == null) {
-      command.setValue(new File(sparkHome, "/sbin/spark-executor").getCanonicalPath)
-    } else {
-      // Grab everything to the first '.'. We'll use that and '*' to
-      // glob the directory "correctly".
-      val basename = uri.split('/').last.split('.').head
-      command.setValue("cd %s*; ./sbin/spark-executor".format(basename))
-      command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
-    }
-    val memory = Resource.newBuilder()
-      .setName("mem")
-      .setType(Value.Type.SCALAR)
-      .setScalar(Value.Scalar.newBuilder().setValue(sc.executorMemory).build())
-      .build()
-    ExecutorInfo.newBuilder()
-      .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
-      .setCommand(command)
-      .setData(ByteString.copyFrom(createExecArg()))
-      .addResources(memory)
-      .build()
-  }
-
-  /**
-   * Create and serialize the executor argument to pass to Mesos. Our executor arg is an array
-   * containing all the spark.* system properties in the form of (String, String) pairs.
-   */
-  private def createExecArg(): Array[Byte] = {
-    if (execArgs == null) {
-      val props = new HashMap[String, String]
-      val iterator = System.getProperties.entrySet.iterator
-      while (iterator.hasNext) {
-        val entry = iterator.next
-        val (key, value) = (entry.getKey.toString, entry.getValue.toString)
-        if (key.startsWith("spark.")) {
-          props(key) = value
-        }
-      }
-      // Serialize the map as an array of (String, String) pairs
-      execArgs = Utils.serialize(props.toArray)
-    }
-    execArgs
-  }
-
-  private def setClassLoader(): ClassLoader = {
-    val oldClassLoader = Thread.currentThread.getContextClassLoader
-    Thread.currentThread.setContextClassLoader(classLoader)
-    oldClassLoader
-  }
-
-  private def restoreClassLoader(oldClassLoader: ClassLoader) {
-    Thread.currentThread.setContextClassLoader(oldClassLoader)
-  }
-
-  override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
-
-  override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
-    val oldClassLoader = setClassLoader()
-    try {
-      logInfo("Registered as framework ID " + frameworkId.getValue)
-      registeredLock.synchronized {
-        isRegistered = true
-        registeredLock.notifyAll()
-      }
-    } finally {
-      restoreClassLoader(oldClassLoader)
-    }
-  }
-
-  def waitForRegister() {
-    registeredLock.synchronized {
-      while (!isRegistered) {
-        registeredLock.wait()
-      }
-    }
-  }
-
-  override def disconnected(d: SchedulerDriver) {}
-
-  override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
-
-  /**
- * Method called by Mesos to offer resources on slaves. We respond by asking our active task sets
-   * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
-   * tasks are balanced across the cluster.
-   */
-  override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
-    val oldClassLoader = setClassLoader()
-    try {
-      synchronized {
-        // Build a big list of the offerable workers, and remember their indices so that we can
-        // figure out which Offer to reply to for each worker
-        val offerableIndices = new ArrayBuffer[Int]
-        val offerableWorkers = new ArrayBuffer[WorkerOffer]
-
-        def enoughMemory(o: Offer) = {
-          val mem = getResource(o.getResourcesList, "mem")
-          val slaveId = o.getSlaveId.getValue
-          mem >= sc.executorMemory || slaveIdsWithExecutors.contains(slaveId)
-        }
-
-        for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
-          offerableIndices += index
-          offerableWorkers += new WorkerOffer(
-            offer.getSlaveId.getValue,
-            offer.getHostname,
-            getResource(offer.getResourcesList, "cpus").toInt)
-        }
-
-        // Call into the ClusterScheduler
-        val taskLists = scheduler.resourceOffers(offerableWorkers)
-
-        // Build a list of Mesos tasks for each slave
-        val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]())
-        for ((taskList, index) <- taskLists.zipWithIndex) {
-          if (!taskList.isEmpty) {
-            val offerNum = offerableIndices(index)
-            val slaveId = offers(offerNum).getSlaveId.getValue
-            slaveIdsWithExecutors += slaveId
-            mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size)
-            for (taskDesc <- taskList) {
-              taskIdToSlaveId(taskDesc.taskId) = slaveId
-              mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId))
-            }
-          }
-        }
-
-        // Reply to the offers
-        val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
-        for (i <- 0 until offers.size) {
-          d.launchTasks(offers(i).getId, mesosTasks(i), filters)
-        }
-      }
-    } finally {
-      restoreClassLoader(oldClassLoader)
-    }
-  }
-
-  /** Helper function to pull out a resource from a Mesos Resources protobuf */
-  def getResource(res: JList[Resource], name: String): Double = {
-    for (r <- res if r.getName == name) {
-      return r.getScalar.getValue
-    }
-    // If we reached here, no resource with the required name was present
-    throw new IllegalArgumentException("No resource called " + name + " in " + res)
-  }
-
-  /** Turn a Spark TaskDescription into a Mesos task */
-  def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = {
-    val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
-    val cpuResource = Resource.newBuilder()
-      .setName("cpus")
-      .setType(Value.Type.SCALAR)
-      .setScalar(Value.Scalar.newBuilder().setValue(1).build())
-      .build()
-    MesosTaskInfo.newBuilder()
-      .setTaskId(taskId)
-      .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
-      .setExecutor(createExecutorInfo(slaveId))
-      .setName(task.name)
-      .addResources(cpuResource)
-      .setData(ByteString.copyFrom(task.serializedTask))
-      .build()
-  }
-
-  /** Check whether a Mesos task state represents a finished task */
-  def isFinished(state: MesosTaskState) = {
-    state == MesosTaskState.TASK_FINISHED ||
-      state == MesosTaskState.TASK_FAILED ||
-      state == MesosTaskState.TASK_KILLED ||
-      state == MesosTaskState.TASK_LOST
-  }
-
-  override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
-    val oldClassLoader = setClassLoader()
-    try {
-      val tid = status.getTaskId.getValue.toLong
-      val state = TaskState.fromMesos(status.getState)
-      synchronized {
-        if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
-          // We lost the executor on this slave, so remember that it's gone
-          slaveIdsWithExecutors -= taskIdToSlaveId(tid)
-        }
-        if (isFinished(status.getState)) {
-          taskIdToSlaveId.remove(tid)
-        }
-      }
-      scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer)
-    } finally {
-      restoreClassLoader(oldClassLoader)
-    }
-  }
-
-  override def error(d: SchedulerDriver, message: String) {
-    val oldClassLoader = setClassLoader()
-    try {
-      logError("Mesos error: " + message)
-      scheduler.error(message)
-    } finally {
-      restoreClassLoader(oldClassLoader)
-    }
-  }
-
-  override def stop() {
-    if (driver != null) {
-      driver.stop()
-    }
-  }
-
-  override def reviveOffers() {
-    driver.reviveOffers()
-  }
-
-  override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
-
-  private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) {
-    val oldClassLoader = setClassLoader()
-    try {
-      logInfo("Mesos slave lost: " + slaveId.getValue)
-      synchronized {
-        slaveIdsWithExecutors -= slaveId.getValue
-      }
-      scheduler.executorLost(slaveId.getValue, reason)
-    } finally {
-      restoreClassLoader(oldClassLoader)
-    }
-  }
-
-  override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
-    recordSlaveLost(d, slaveId, SlaveLost())
-  }
-
-  override def executorLost(d: SchedulerDriver, executorId: ExecutorID,
-                            slaveId: SlaveID, status: Int) {
-    logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue,
-                                                                 slaveId.getValue))
-    recordSlaveLost(d, slaveId, ExecutorExited(status))
-  }
-
-  // TODO: query Mesos for number of cores
-  override def defaultParallelism() = sc.conf.getInt("spark.default.parallelism", 8)
-}
-- 
1.8.3.4 (Apple Git-47)