From 324784a03fe0c1d8f8a9fd9ecca60b07b4e867d7 Mon Sep 17 00:00:00 2001
From: William Benton <willb@redhat.com>
Date: Thu, 27 Feb 2014 16:50:36 -0600
Subject: [PATCH 6/7] Remove functionality depending on stream-lib.

Most notably, this removes countApproxDistinctByKey, countApproxDistinct, and
the SerializableHyperLogLog wrapper.
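
Callers that relied on these approximate counts can fall back to exact
counting with the core RDD API. A minimal sketch follows; the method and
RDD names in it are illustrative and not part of this patch, only the
standard Spark operations (distinct, count, mapValues, reduceByKey) are
assumed:

    import org.apache.spark.SparkContext._ // implicit pair-RDD conversions
    import org.apache.spark.rdd.RDD

    // Hypothetical inputs: an RDD of elements and an RDD of (key, value)
    // pairs, using concrete String types for illustration.
    def exactDistinctCounts(rdd: RDD[String],
                            pairRdd: RDD[(String, String)]) = {
      // Exact stand-in for rdd.countApproxDistinct(relativeSD).
      val distinctCount: Long = rdd.distinct().count()

      // Exact stand-in for pairRdd.countApproxDistinctByKey(relativeSD):
      // de-duplicate (key, value) pairs, then count the surviving
      // values per key.
      val distinctPerKey: RDD[(String, Long)] =
        pairRdd.distinct().mapValues(_ => 1L).reduceByKey(_ + _)

      (distinctCount, distinctPerKey)
    }

These give exact answers, so they trade HyperLogLog's bounded memory for
a shuffle over the distinct elements.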
---
 .../org/apache/spark/api/java/JavaPairRDD.scala    | 36 ----------------
 .../org/apache/spark/api/java/JavaRDDLike.scala    | 10 -----
 .../org/apache/spark/rdd/PairRDDFunctions.scala    | 42 ------------------
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 16 +------
 .../spark/util/SerializableHyperLogLog.scala       | 50 ----------------------
 5 files changed, 1 insertion(+), 153 deletions(-)
 delete mode 100644 core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index f430a33..348ef04 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -611,42 +611,6 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
    */
   def values(): JavaRDD[V] = JavaRDD.fromRDD[V](rdd.map(_._2))
 
-  /**
-   * Return approximate number of distinct values for each key in this RDD.
-   * The accuracy of approximation can be controlled through the relative standard deviation
-   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
-   * more accurate counts but increase the memory footprint and vise versa. Uses the provided
-   * Partitioner to partition the output RDD.
-   */
-  def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaRDD[(K, Long)] = {
-    rdd.countApproxDistinctByKey(relativeSD, partitioner)
-  }
-
-  /**
-   * Return approximate number of distinct values for each key this RDD.
-   * The accuracy of approximation can be controlled through the relative standard deviation
-   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
-   * more accurate counts but increase the memory footprint and vise versa. The default value of
-   * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
-   * level.
-   */
-  def countApproxDistinctByKey(relativeSD: Double = 0.05): JavaRDD[(K, Long)] = {
-    rdd.countApproxDistinctByKey(relativeSD)
-  }
-
-
-  /**
-   * Return approximate number of distinct values for each key in this RDD.
-   * The accuracy of approximation can be controlled through the relative standard deviation
-   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
-   * more accurate counts but increase the memory footprint and vise versa. HashPartitions the
-   * output RDD into numPartitions.
-   *
-   */
-  def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = {
-    rdd.countApproxDistinctByKey(relativeSD, numPartitions)
-  }
-
   /** Assign a name to this RDD */
   def setName(name: String): JavaPairRDD[K, V] = {
     rdd.setName(name)
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index ebbbbd8..98834f7 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -450,15 +450,5 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
     takeOrdered(num, comp)
   }
 
-  /**
-   * Return approximate number of distinct elements in the RDD.
-   *
-   * The accuracy of approximation can be controlled through the relative standard deviation
-   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
-   * more accurate counts but increase the memory footprint and vise versa. The default value of
-   * relativeSD is 0.05.
-   */
-  def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD)
-
   def name(): String = rdd.name
 }
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 4148581..93190ed 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -38,8 +38,6 @@ import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob}
 import org.apache.hadoop.mapreduce.{RecordWriter => NewRecordWriter}
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
 
-import com.clearspring.analytics.stream.cardinality.HyperLogLog
-
 // SparkHadoopWriter and SparkHadoopMapReduceUtil are actually source files defined in Spark.
 import org.apache.hadoop.mapred.SparkHadoopWriter
 import org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil
@@ -47,7 +45,6 @@ import org.apache.spark._
 import org.apache.spark.SparkContext._
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.Partitioner.defaultPartitioner
-import org.apache.spark.util.SerializableHyperLogLog
 
 /**
  * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
@@ -210,45 +207,6 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
   }
 
   /**
-   * Return approximate number of distinct values for each key in this RDD.
-   * The accuracy of approximation can be controlled through the relative standard deviation
-   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
-   * more accurate counts but increase the memory footprint and vise versa. Uses the provided
-   * Partitioner to partition the output RDD.
-   */
-  def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
-    val createHLL = (v: V) => new SerializableHyperLogLog(new HyperLogLog(relativeSD)).add(v)
-    val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => hll.add(v)
-    val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2)
-
-    combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality())
-  }
-
-  /**
-   * Return approximate number of distinct values for each key in this RDD.
-   * The accuracy of approximation can be controlled through the relative standard deviation
-   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
-   * more accurate counts but increase the memory footprint and vise versa. HashPartitions the
-   * output RDD into numPartitions.
-   *
-   */
-  def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = {
-    countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions))
-  }
-
-  /**
-   * Return approximate number of distinct values for each key this RDD.
-   * The accuracy of approximation can be controlled through the relative standard deviation
-   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
-   * more accurate counts but increase the memory footprint and vise versa. The default value of
-   * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
-   * level.
-   */
-  def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = {
-    countApproxDistinctByKey(relativeSD, defaultPartitioner(self))
-  }
-
-  /**
    * Merge the values for each key using an associative reduce function. This will also perform
    * the merging locally on each mapper before sending results to a reducer, similarly to a
    * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index cd90a15..1bdb80d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -32,7 +32,6 @@ import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapred.TextOutputFormat
 
 import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
-import com.clearspring.analytics.stream.cardinality.HyperLogLog
 
 import org.apache.spark.Partitioner._
 import org.apache.spark.api.java.JavaRDD
@@ -41,7 +40,7 @@ import org.apache.spark.partial.CountEvaluator
 import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableHyperLogLog}
+import org.apache.spark.util.{Utils, BoundedPriorityQueue}
 
 import org.apache.spark.SparkContext._
 import org.apache.spark._
@@ -798,19 +797,6 @@ abstract class RDD[T: ClassTag](
   }
 
   /**
-   * Return approximate number of distinct elements in the RDD.
-   *
-   * The accuracy of approximation can be controlled through the relative standard deviation
-   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
-   * more accurate counts but increase the memory footprint and vise versa. The default value of
-   * relativeSD is 0.05.
-   */
-  def countApproxDistinct(relativeSD: Double = 0.05): Long = {
-    val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
-    aggregate(zeroCounter)(_.add(_), _.merge(_)).value.cardinality()
-  }
-
-  /**
    * Take the first num elements of the RDD. It works by first scanning one partition, and use the
    * results from that partition to estimate the number of additional partitions needed to satisfy
    * the limit.
diff --git a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
deleted file mode 100644
index 8b4e7c1..0000000
--- a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import java.io.{Externalizable, ObjectOutput, ObjectInput}
-import com.clearspring.analytics.stream.cardinality.{ICardinality, HyperLogLog}
-
-/**
- * A wrapper around [[com.clearspring.analytics.stream.cardinality.HyperLogLog]] that is serializable.
- */
-private[spark]
-class SerializableHyperLogLog(var value: ICardinality) extends Externalizable {
-
-  def this() = this(null)  // For deserialization
-
-  def merge(other: SerializableHyperLogLog) = new SerializableHyperLogLog(value.merge(other.value))
-
-  def add[T](elem: T) = {
-    this.value.offer(elem)
-    this
-  }
-
-  def readExternal(in: ObjectInput) {
-    val byteLength = in.readInt()
-    val bytes = new Array[Byte](byteLength)
-    in.readFully(bytes)
-    value = HyperLogLog.Builder.build(bytes)
-  }
-
-  def writeExternal(out: ObjectOutput) {
-    val bytes = value.getBytes()
-    out.writeInt(bytes.length)
-    out.write(bytes)
-  }
-}
-- 
1.8.3.4 (Apple Git-47)