From c321a448cd80331df07cda0f5f20955b3b148aac Mon Sep 17 00:00:00 2001
From: William Benton <willb@redhat.com>
Date: Thu, 27 Feb 2014 17:05:12 -0600
Subject: [PATCH 7/7] Removed Mesos support
---
.../main/scala/org/apache/spark/SparkContext.scala | 15 -
.../main/scala/org/apache/spark/TaskState.scala | 21 --
.../spark/executor/MesosExecutorBackend.scala | 104 -------
.../mesos/CoarseMesosSchedulerBackend.scala | 289 -----------------
.../cluster/mesos/MesosSchedulerBackend.scala | 344 ---------------------
5 files changed, 773 deletions(-)
delete mode 100644 core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
delete mode 100644 core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
delete mode 100644 core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 566472e..f3b2941 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -36,7 +36,6 @@ import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, Sequence
TextInputFormat}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
-import org.apache.mesos.MesosNativeLibrary
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
@@ -44,7 +43,6 @@ import org.apache.spark.rdd._
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
SparkDeploySchedulerBackend, SimrSchedulerBackend}
-import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import org.apache.spark.scheduler.local.LocalBackend
import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
import org.apache.spark.ui.SparkUI
@@ -1281,19 +1279,6 @@ object SparkContext {
scheduler.initialize(backend)
scheduler
- case mesosUrl @ MESOS_REGEX(_) =>
- MesosNativeLibrary.load()
- val scheduler = new TaskSchedulerImpl(sc)
- val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", false)
- val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
- val backend = if (coarseGrained) {
- new CoarseMesosSchedulerBackend(scheduler, sc, url, appName)
- } else {
- new MesosSchedulerBackend(scheduler, sc, url, appName)
- }
- scheduler.initialize(backend)
- scheduler
-
case SIMR_REGEX(simrUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl)
diff --git a/core/src/main/scala/org/apache/spark/TaskState.scala b/core/src/main/scala/org/apache/spark/TaskState.scala
index 0bf1e4a..cdd8baf 100644
--- a/core/src/main/scala/org/apache/spark/TaskState.scala
+++ b/core/src/main/scala/org/apache/spark/TaskState.scala
@@ -17,8 +17,6 @@
package org.apache.spark
-import org.apache.mesos.Protos.{TaskState => MesosTaskState}
-
private[spark] object TaskState extends Enumeration {
val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value
@@ -28,23 +26,4 @@ private[spark] object TaskState extends Enumeration {
type TaskState = Value
def isFinished(state: TaskState) = FINISHED_STATES.contains(state)
-
- def toMesos(state: TaskState): MesosTaskState = state match {
- case LAUNCHING => MesosTaskState.TASK_STARTING
- case RUNNING => MesosTaskState.TASK_RUNNING
- case FINISHED => MesosTaskState.TASK_FINISHED
- case FAILED => MesosTaskState.TASK_FAILED
- case KILLED => MesosTaskState.TASK_KILLED
- case LOST => MesosTaskState.TASK_LOST
- }
-
- def fromMesos(mesosState: MesosTaskState): TaskState = mesosState match {
- case MesosTaskState.TASK_STAGING => LAUNCHING
- case MesosTaskState.TASK_STARTING => LAUNCHING
- case MesosTaskState.TASK_RUNNING => RUNNING
- case MesosTaskState.TASK_FINISHED => FINISHED
- case MesosTaskState.TASK_FAILED => FAILED
- case MesosTaskState.TASK_KILLED => KILLED
- case MesosTaskState.TASK_LOST => LOST
- }
}
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
deleted file mode 100644
index b56d8c9..0000000
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.executor
-
-import java.nio.ByteBuffer
-
-import com.google.protobuf.ByteString
-
-import org.apache.mesos.{Executor => MesosExecutor, MesosExecutorDriver, MesosNativeLibrary, ExecutorDriver}
-import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
-
-import org.apache.spark.Logging
-import org.apache.spark.TaskState
-import org.apache.spark.TaskState.TaskState
-import org.apache.spark.util.Utils
-
-
-private[spark] class MesosExecutorBackend
- extends MesosExecutor
- with ExecutorBackend
- with Logging {
-
- var executor: Executor = null
- var driver: ExecutorDriver = null
-
- override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
- val mesosTaskId = TaskID.newBuilder().setValue(taskId.toString).build()
- driver.sendStatusUpdate(MesosTaskStatus.newBuilder()
- .setTaskId(mesosTaskId)
- .setState(TaskState.toMesos(state))
- .setData(ByteString.copyFrom(data))
- .build())
- }
-
- override def registered(
- driver: ExecutorDriver,
- executorInfo: ExecutorInfo,
- frameworkInfo: FrameworkInfo,
- slaveInfo: SlaveInfo) {
- logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
- this.driver = driver
- val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
- executor = new Executor(
- executorInfo.getExecutorId.getValue,
- slaveInfo.getHostname,
- properties)
- }
-
- override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
- val taskId = taskInfo.getTaskId.getValue.toLong
- if (executor == null) {
- logError("Received launchTask but executor was null")
- } else {
- executor.launchTask(this, taskId, taskInfo.getData.asReadOnlyByteBuffer)
- }
- }
-
- override def error(d: ExecutorDriver, message: String) {
- logError("Error from Mesos: " + message)
- }
-
- override def killTask(d: ExecutorDriver, t: TaskID) {
- if (executor == null) {
- logError("Received KillTask but executor was null")
- } else {
- executor.killTask(t.getValue.toLong)
- }
- }
-
- override def reregistered(d: ExecutorDriver, p2: SlaveInfo) {}
-
- override def disconnected(d: ExecutorDriver) {}
-
- override def frameworkMessage(d: ExecutorDriver, data: Array[Byte]) {}
-
- override def shutdown(d: ExecutorDriver) {}
-}
-
-/**
- * Entry point for Mesos executor.
- */
-private[spark] object MesosExecutorBackend {
- def main(args: Array[String]) {
- MesosNativeLibrary.load()
- // Create a new Executor and start it running
- val runner = new MesosExecutorBackend()
- new MesosExecutorDriver(runner).run()
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
deleted file mode 100644
index c27049b..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ /dev/null
@@ -1,289 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import java.io.File
-import java.util.{ArrayList => JArrayList, List => JList}
-import java.util.Collections
-
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.collection.JavaConversions._
-
-import com.google.protobuf.ByteString
-import org.apache.mesos.{Scheduler => MScheduler}
-import org.apache.mesos._
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
-
-import org.apache.spark.{SparkException, Logging, SparkContext, TaskState}
-import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-
-/**
- * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
- * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever
- * a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the
- * CoarseGrainedSchedulerBackend mechanism. This class is useful for lower and more predictable
- * latency.
- *
- * Unfortunately this has a bit of duplication from MesosSchedulerBackend, but it seems hard to
- * remove this.
- */
-private[spark] class CoarseMesosSchedulerBackend(
- scheduler: TaskSchedulerImpl,
- sc: SparkContext,
- master: String,
- appName: String)
- extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
- with MScheduler
- with Logging {
-
- val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures
-
- // Lock used to wait for scheduler to be registered
- var isRegistered = false
- val registeredLock = new Object()
-
- // Driver for talking to Mesos
- var driver: SchedulerDriver = null
-
- // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
- val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
-
- // Cores we have acquired with each Mesos task ID
- val coresByTaskId = new HashMap[Int, Int]
- var totalCoresAcquired = 0
-
- val slaveIdsWithExecutors = new HashSet[String]
-
- val taskIdToSlaveId = new HashMap[Int, String]
- val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed
-
- val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
- "Spark home is not set; set it through the spark.home system " +
- "property, the SPARK_HOME environment variable or the SparkContext constructor"))
-
- val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0)
-
- var nextMesosTaskId = 0
-
- def newMesosTaskId(): Int = {
- val id = nextMesosTaskId
- nextMesosTaskId += 1
- id
- }
-
- override def start() {
- super.start()
-
- synchronized {
- new Thread("CoarseMesosSchedulerBackend driver") {
- setDaemon(true)
- override def run() {
- val scheduler = CoarseMesosSchedulerBackend.this
- val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
- driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
- try { {
- val ret = driver.run()
- logInfo("driver.run() returned with code " + ret)
- }
- } catch {
- case e: Exception => logError("driver.run() failed", e)
- }
- }
- }.start()
-
- waitForRegister()
- }
- }
-
- def createCommand(offer: Offer, numCores: Int): CommandInfo = {
- val environment = Environment.newBuilder()
- sc.executorEnvs.foreach { case (key, value) =>
- environment.addVariables(Environment.Variable.newBuilder()
- .setName(key)
- .setValue(value)
- .build())
- }
- val command = CommandInfo.newBuilder()
- .setEnvironment(environment)
- val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
- conf.get("spark.driver.host"),
- conf.get("spark.driver.port"),
- CoarseGrainedSchedulerBackend.ACTOR_NAME)
- val uri = conf.get("spark.executor.uri", null)
- if (uri == null) {
- val runScript = new File(sparkHome, "./bin/spark-class").getCanonicalPath
- command.setValue(
- "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d".format(
- runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
- } else {
- // Grab everything to the first '.'. We'll use that and '*' to
- // glob the directory "correctly".
- val basename = uri.split('/').last.split('.').head
- command.setValue(
- "cd %s*; ./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d"
- .format(basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
- command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
- }
- command.build()
- }
-
- override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
-
- override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
- logInfo("Registered as framework ID " + frameworkId.getValue)
- registeredLock.synchronized {
- isRegistered = true
- registeredLock.notifyAll()
- }
- }
-
- def waitForRegister() {
- registeredLock.synchronized {
- while (!isRegistered) {
- registeredLock.wait()
- }
- }
- }
-
- override def disconnected(d: SchedulerDriver) {}
-
- override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
-
- /**
- * Method called by Mesos to offer resources on slaves. We respond by launching an executor,
- * unless we've already launched more than we wanted to.
- */
- override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
- synchronized {
- val filters = Filters.newBuilder().setRefuseSeconds(-1).build()
-
- for (offer <- offers) {
- val slaveId = offer.getSlaveId.toString
- val mem = getResource(offer.getResourcesList, "mem")
- val cpus = getResource(offer.getResourcesList, "cpus").toInt
- if (totalCoresAcquired < maxCores && mem >= sc.executorMemory && cpus >= 1 &&
- failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
- !slaveIdsWithExecutors.contains(slaveId)) {
- // Launch an executor on the slave
- val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
- totalCoresAcquired += cpusToUse
- val taskId = newMesosTaskId()
- taskIdToSlaveId(taskId) = slaveId
- slaveIdsWithExecutors += slaveId
- coresByTaskId(taskId) = cpusToUse
- val task = MesosTaskInfo.newBuilder()
- .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
- .setSlaveId(offer.getSlaveId)
- .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
- .setName("Task " + taskId)
- .addResources(createResource("cpus", cpusToUse))
- .addResources(createResource("mem", sc.executorMemory))
- .build()
- d.launchTasks(offer.getId, Collections.singletonList(task), filters)
- } else {
- // Filter it out
- d.launchTasks(offer.getId, Collections.emptyList[MesosTaskInfo](), filters)
- }
- }
- }
- }
-
- /** Helper function to pull out a resource from a Mesos Resources protobuf */
- private def getResource(res: JList[Resource], name: String): Double = {
- for (r <- res if r.getName == name) {
- return r.getScalar.getValue
- }
- // If we reached here, no resource with the required name was present
- throw new IllegalArgumentException("No resource called " + name + " in " + res)
- }
-
- /** Build a Mesos resource protobuf object */
- private def createResource(resourceName: String, quantity: Double): Protos.Resource = {
- Resource.newBuilder()
- .setName(resourceName)
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
- .build()
- }
-
- /** Check whether a Mesos task state represents a finished task */
- private def isFinished(state: MesosTaskState) = {
- state == MesosTaskState.TASK_FINISHED ||
- state == MesosTaskState.TASK_FAILED ||
- state == MesosTaskState.TASK_KILLED ||
- state == MesosTaskState.TASK_LOST
- }
-
- override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
- val taskId = status.getTaskId.getValue.toInt
- val state = status.getState
- logInfo("Mesos task " + taskId + " is now " + state)
- synchronized {
- if (isFinished(state)) {
- val slaveId = taskIdToSlaveId(taskId)
- slaveIdsWithExecutors -= slaveId
- taskIdToSlaveId -= taskId
- // Remove the cores we have remembered for this task, if it's in the hashmap
- for (cores <- coresByTaskId.get(taskId)) {
- totalCoresAcquired -= cores
- coresByTaskId -= taskId
- }
- // If it was a failure, mark the slave as failed for blacklisting purposes
- if (state == MesosTaskState.TASK_FAILED || state == MesosTaskState.TASK_LOST) {
- failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1
- if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) {
- logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " +
- "is Spark installed on it?")
- }
- }
- driver.reviveOffers() // In case we'd rejected everything before but have now lost a node
- }
- }
- }
-
- override def error(d: SchedulerDriver, message: String) {
- logError("Mesos error: " + message)
- scheduler.error(message)
- }
-
- override def stop() {
- super.stop()
- if (driver != null) {
- driver.stop()
- }
- }
-
- override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
-
- override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
- logInfo("Mesos slave lost: " + slaveId.getValue)
- synchronized {
- if (slaveIdsWithExecutors.contains(slaveId.getValue)) {
- // Note that the slave ID corresponds to the executor ID on that slave
- slaveIdsWithExecutors -= slaveId.getValue
- removeExecutor(slaveId.getValue, "Mesos slave lost")
- }
- }
- }
-
- override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
- logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
- slaveLost(d, s)
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
deleted file mode 100644
index 4978148..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import java.io.File
-import java.util.{ArrayList => JArrayList, List => JList}
-import java.util.Collections
-
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.collection.JavaConversions._
-
-import com.google.protobuf.ByteString
-import org.apache.mesos.{Scheduler => MScheduler}
-import org.apache.mesos._
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
-
-import org.apache.spark.{Logging, SparkException, SparkContext, TaskState}
-import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SchedulerBackend, SlaveLost,
- TaskDescription, TaskSchedulerImpl, WorkerOffer}
-import org.apache.spark.util.Utils
-
-/**
- * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
- * separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks
- * from multiple apps can run on different cores) and in time (a core can switch ownership).
- */
-private[spark] class MesosSchedulerBackend(
- scheduler: TaskSchedulerImpl,
- sc: SparkContext,
- master: String,
- appName: String)
- extends SchedulerBackend
- with MScheduler
- with Logging {
-
- // Lock used to wait for scheduler to be registered
- var isRegistered = false
- val registeredLock = new Object()
-
- // Driver for talking to Mesos
- var driver: SchedulerDriver = null
-
- // Which slave IDs we have executors on
- val slaveIdsWithExecutors = new HashSet[String]
- val taskIdToSlaveId = new HashMap[Long, String]
-
- // An ExecutorInfo for our tasks
- var execArgs: Array[Byte] = null
-
- var classLoader: ClassLoader = null
-
- override def start() {
- synchronized {
- classLoader = Thread.currentThread.getContextClassLoader
-
- new Thread("MesosSchedulerBackend driver") {
- setDaemon(true)
- override def run() {
- val scheduler = MesosSchedulerBackend.this
- val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
- driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
- try {
- val ret = driver.run()
- logInfo("driver.run() returned with code " + ret)
- } catch {
- case e: Exception => logError("driver.run() failed", e)
- }
- }
- }.start()
-
- waitForRegister()
- }
- }
-
- def createExecutorInfo(execId: String): ExecutorInfo = {
- val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
- "Spark home is not set; set it through the spark.home system " +
- "property, the SPARK_HOME environment variable or the SparkContext constructor"))
- val environment = Environment.newBuilder()
- sc.executorEnvs.foreach { case (key, value) =>
- environment.addVariables(Environment.Variable.newBuilder()
- .setName(key)
- .setValue(value)
- .build())
- }
- val command = CommandInfo.newBuilder()
- .setEnvironment(environment)
- val uri = sc.conf.get("spark.executor.uri", null)
- if (uri == null) {
- command.setValue(new File(sparkHome, "/sbin/spark-executor").getCanonicalPath)
- } else {
- // Grab everything to the first '.'. We'll use that and '*' to
- // glob the directory "correctly".
- val basename = uri.split('/').last.split('.').head
- command.setValue("cd %s*; ./sbin/spark-executor".format(basename))
- command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
- }
- val memory = Resource.newBuilder()
- .setName("mem")
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(sc.executorMemory).build())
- .build()
- ExecutorInfo.newBuilder()
- .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
- .setCommand(command)
- .setData(ByteString.copyFrom(createExecArg()))
- .addResources(memory)
- .build()
- }
-
- /**
- * Create and serialize the executor argument to pass to Mesos. Our executor arg is an array
- * containing all the spark.* system properties in the form of (String, String) pairs.
- */
- private def createExecArg(): Array[Byte] = {
- if (execArgs == null) {
- val props = new HashMap[String, String]
- val iterator = System.getProperties.entrySet.iterator
- while (iterator.hasNext) {
- val entry = iterator.next
- val (key, value) = (entry.getKey.toString, entry.getValue.toString)
- if (key.startsWith("spark.")) {
- props(key) = value
- }
- }
- // Serialize the map as an array of (String, String) pairs
- execArgs = Utils.serialize(props.toArray)
- }
- execArgs
- }
-
- private def setClassLoader(): ClassLoader = {
- val oldClassLoader = Thread.currentThread.getContextClassLoader
- Thread.currentThread.setContextClassLoader(classLoader)
- oldClassLoader
- }
-
- private def restoreClassLoader(oldClassLoader: ClassLoader) {
- Thread.currentThread.setContextClassLoader(oldClassLoader)
- }
-
- override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
-
- override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
- val oldClassLoader = setClassLoader()
- try {
- logInfo("Registered as framework ID " + frameworkId.getValue)
- registeredLock.synchronized {
- isRegistered = true
- registeredLock.notifyAll()
- }
- } finally {
- restoreClassLoader(oldClassLoader)
- }
- }
-
- def waitForRegister() {
- registeredLock.synchronized {
- while (!isRegistered) {
- registeredLock.wait()
- }
- }
- }
-
- override def disconnected(d: SchedulerDriver) {}
-
- override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
-
- /**
- * Method called by Mesos to offer resources on slaves. We resond by asking our active task sets
- * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
- * tasks are balanced across the cluster.
- */
- override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
- val oldClassLoader = setClassLoader()
- try {
- synchronized {
- // Build a big list of the offerable workers, and remember their indices so that we can
- // figure out which Offer to reply to for each worker
- val offerableIndices = new ArrayBuffer[Int]
- val offerableWorkers = new ArrayBuffer[WorkerOffer]
-
- def enoughMemory(o: Offer) = {
- val mem = getResource(o.getResourcesList, "mem")
- val slaveId = o.getSlaveId.getValue
- mem >= sc.executorMemory || slaveIdsWithExecutors.contains(slaveId)
- }
-
- for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
- offerableIndices += index
- offerableWorkers += new WorkerOffer(
- offer.getSlaveId.getValue,
- offer.getHostname,
- getResource(offer.getResourcesList, "cpus").toInt)
- }
-
- // Call into the ClusterScheduler
- val taskLists = scheduler.resourceOffers(offerableWorkers)
-
- // Build a list of Mesos tasks for each slave
- val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]())
- for ((taskList, index) <- taskLists.zipWithIndex) {
- if (!taskList.isEmpty) {
- val offerNum = offerableIndices(index)
- val slaveId = offers(offerNum).getSlaveId.getValue
- slaveIdsWithExecutors += slaveId
- mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size)
- for (taskDesc <- taskList) {
- taskIdToSlaveId(taskDesc.taskId) = slaveId
- mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId))
- }
- }
- }
-
- // Reply to the offers
- val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
- for (i <- 0 until offers.size) {
- d.launchTasks(offers(i).getId, mesosTasks(i), filters)
- }
- }
- } finally {
- restoreClassLoader(oldClassLoader)
- }
- }
-
- /** Helper function to pull out a resource from a Mesos Resources protobuf */
- def getResource(res: JList[Resource], name: String): Double = {
- for (r <- res if r.getName == name) {
- return r.getScalar.getValue
- }
- // If we reached here, no resource with the required name was present
- throw new IllegalArgumentException("No resource called " + name + " in " + res)
- }
-
- /** Turn a Spark TaskDescription into a Mesos task */
- def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = {
- val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
- val cpuResource = Resource.newBuilder()
- .setName("cpus")
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(1).build())
- .build()
- MesosTaskInfo.newBuilder()
- .setTaskId(taskId)
- .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
- .setExecutor(createExecutorInfo(slaveId))
- .setName(task.name)
- .addResources(cpuResource)
- .setData(ByteString.copyFrom(task.serializedTask))
- .build()
- }
-
- /** Check whether a Mesos task state represents a finished task */
- def isFinished(state: MesosTaskState) = {
- state == MesosTaskState.TASK_FINISHED ||
- state == MesosTaskState.TASK_FAILED ||
- state == MesosTaskState.TASK_KILLED ||
- state == MesosTaskState.TASK_LOST
- }
-
- override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
- val oldClassLoader = setClassLoader()
- try {
- val tid = status.getTaskId.getValue.toLong
- val state = TaskState.fromMesos(status.getState)
- synchronized {
- if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
- // We lost the executor on this slave, so remember that it's gone
- slaveIdsWithExecutors -= taskIdToSlaveId(tid)
- }
- if (isFinished(status.getState)) {
- taskIdToSlaveId.remove(tid)
- }
- }
- scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer)
- } finally {
- restoreClassLoader(oldClassLoader)
- }
- }
-
- override def error(d: SchedulerDriver, message: String) {
- val oldClassLoader = setClassLoader()
- try {
- logError("Mesos error: " + message)
- scheduler.error(message)
- } finally {
- restoreClassLoader(oldClassLoader)
- }
- }
-
- override def stop() {
- if (driver != null) {
- driver.stop()
- }
- }
-
- override def reviveOffers() {
- driver.reviveOffers()
- }
-
- override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
-
- private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) {
- val oldClassLoader = setClassLoader()
- try {
- logInfo("Mesos slave lost: " + slaveId.getValue)
- synchronized {
- slaveIdsWithExecutors -= slaveId.getValue
- }
- scheduler.executorLost(slaveId.getValue, reason)
- } finally {
- restoreClassLoader(oldClassLoader)
- }
- }
-
- override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
- recordSlaveLost(d, slaveId, SlaveLost())
- }
-
- override def executorLost(d: SchedulerDriver, executorId: ExecutorID,
- slaveId: SlaveID, status: Int) {
- logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue,
- slaveId.getValue))
- recordSlaveLost(d, slaveId, ExecutorExited(status))
- }
-
- // TODO: query Mesos for number of cores
- override def defaultParallelism() = sc.conf.getInt("spark.default.parallelism", 8)
-}
--
1.8.3.4 (Apple Git-47)