From 324784a03fe0c1d8f8a9fd9ecca60b07b4e867d7 Mon Sep 17 00:00:00 2001
From: William Benton <willb@redhat.com>
Date: Thu, 27 Feb 2014 16:50:36 -0600
Subject: [PATCH 6/7] Remove functionality depending on stream-lib.

Most notably, this removes countApproxDistinct and countApproxDistinctByKey,
along with the SerializableHyperLogLog wrapper they depend on.
---
.../org/apache/spark/api/java/JavaPairRDD.scala | 36 ----------------
.../org/apache/spark/api/java/JavaRDDLike.scala | 10 -----
.../org/apache/spark/rdd/PairRDDFunctions.scala | 42 ------------------
core/src/main/scala/org/apache/spark/rdd/RDD.scala | 16 +------
.../spark/util/SerializableHyperLogLog.scala | 50 ----------------------
5 files changed, 1 insertion(+), 153 deletions(-)
delete mode 100644 core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
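
Note for applications that used the removed operations: equivalent results can
be recovered on the application side. Below is a minimal sketch, assuming
com.clearspring.analytics:stream (stream-lib) is on the application classpath
and that the deleted SerializableHyperLogLog class (reproduced at the end of
this patch) has been copied into application code; the helper names are
illustrative, not Spark API.

    import scala.reflect.ClassTag
    import com.clearspring.analytics.stream.cardinality.HyperLogLog
    import org.apache.spark.rdd.RDD
    import org.apache.spark.SparkContext._  // implicit pair-RDD conversions

    // Exact (but potentially far more expensive) replacements:
    def exactDistinct[T](rdd: RDD[T]): Long = rdd.distinct().count()

    def exactDistinctByKey[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): RDD[(K, Long)] =
      rdd.distinct().mapValues(_ => 1L).reduceByKey(_ + _)

    // Approximate replacement that mirrors the removed RDD.countApproxDistinct,
    // using an application-side copy of SerializableHyperLogLog:
    def approxDistinct[T](rdd: RDD[T], relativeSD: Double = 0.05): Long = {
      val zero = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
      rdd.aggregate(zero)(_.add(_), _.merge(_)).value.cardinality()
    }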
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index f430a33..348ef04 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -611,42 +611,6 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
*/
def values(): JavaRDD[V] = JavaRDD.fromRDD[V](rdd.map(_._2))
- /**
- * Return approximate number of distinct values for each key in this RDD.
- * The accuracy of approximation can be controlled through the relative standard deviation
- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
- * more accurate counts but increase the memory footprint and vice versa. Uses the provided
- * Partitioner to partition the output RDD.
- */
- def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaRDD[(K, Long)] = {
- rdd.countApproxDistinctByKey(relativeSD, partitioner)
- }
-
- /**
- * Return approximate number of distinct values for each key in this RDD.
- * The accuracy of approximation can be controlled through the relative standard deviation
- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
- * more accurate counts but increase the memory footprint and vice versa. The default value of
- * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
- * level.
- */
- def countApproxDistinctByKey(relativeSD: Double = 0.05): JavaRDD[(K, Long)] = {
- rdd.countApproxDistinctByKey(relativeSD)
- }
-
-
- /**
- * Return approximate number of distinct values for each key in this RDD.
- * The accuracy of approximation can be controlled through the relative standard deviation
- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
- * more accurate counts but increase the memory footprint and vice versa. Hash-partitions the
- * output RDD into numPartitions partitions.
- *
- */
- def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = {
- rdd.countApproxDistinctByKey(relativeSD, numPartitions)
- }
-
/** Assign a name to this RDD */
def setName(name: String): JavaPairRDD[K, V] = {
rdd.setName(name)
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index ebbbbd8..98834f7 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -450,15 +450,5 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
takeOrdered(num, comp)
}
- /**
- * Return approximate number of distinct elements in the RDD.
- *
- * The accuracy of approximation can be controlled through the relative standard deviation
- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
- * more accurate counts but increase the memory footprint and vice versa. The default value of
- * relativeSD is 0.05.
- */
- def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD)
-
def name(): String = rdd.name
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 4148581..93190ed 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -38,8 +38,6 @@ import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob}
import org.apache.hadoop.mapreduce.{RecordWriter => NewRecordWriter}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
-import com.clearspring.analytics.stream.cardinality.HyperLogLog
-
// SparkHadoopWriter and SparkHadoopMapReduceUtil are actually source files defined in Spark.
import org.apache.hadoop.mapred.SparkHadoopWriter
import org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil
@@ -47,7 +45,6 @@ import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.Partitioner.defaultPartitioner
-import org.apache.spark.util.SerializableHyperLogLog
/**
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
@@ -210,45 +207,6 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
}
/**
- * Return approximate number of distinct values for each key in this RDD.
- * The accuracy of approximation can be controlled through the relative standard deviation
- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
- * more accurate counts but increase the memory footprint and vice versa. Uses the provided
- * Partitioner to partition the output RDD.
- */
- def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
- val createHLL = (v: V) => new SerializableHyperLogLog(new HyperLogLog(relativeSD)).add(v)
- val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => hll.add(v)
- val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2)
-
- combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality())
- }
-
- /**
- * Return approximate number of distinct values for each key in this RDD.
- * The accuracy of approximation can be controlled through the relative standard deviation
- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
- * more accurate counts but increase the memory footprint and vice versa. Hash-partitions the
- * output RDD into numPartitions partitions.
- *
- */
- def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = {
- countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions))
- }
-
- /**
- * Return approximate number of distinct values for each key in this RDD.
- * The accuracy of approximation can be controlled through the relative standard deviation
- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
- * more accurate counts but increase the memory footprint and vice versa. The default value of
- * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
- * level.
- */
- def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = {
- countApproxDistinctByKey(relativeSD, defaultPartitioner(self))
- }
-
- /**
* Merge the values for each key using an associative reduce function. This will also perform
* the merging locally on each mapper before sending results to a reducer, similarly to a
* "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index cd90a15..1bdb80d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -32,7 +32,6 @@ import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.TextOutputFormat
import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
-import com.clearspring.analytics.stream.cardinality.HyperLogLog
import org.apache.spark.Partitioner._
import org.apache.spark.api.java.JavaRDD
@@ -41,7 +40,7 @@ import org.apache.spark.partial.CountEvaluator
import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableHyperLogLog}
+import org.apache.spark.util.{Utils, BoundedPriorityQueue}
import org.apache.spark.SparkContext._
import org.apache.spark._
@@ -798,19 +797,6 @@ abstract class RDD[T: ClassTag](
}
/**
- * Return approximate number of distinct elements in the RDD.
- *
- * The accuracy of approximation can be controlled through the relative standard deviation
- * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
- * more accurate counts but increase the memory footprint and vice versa. The default value of
- * relativeSD is 0.05.
- */
- def countApproxDistinct(relativeSD: Double = 0.05): Long = {
- val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
- aggregate(zeroCounter)(_.add(_), _.merge(_)).value.cardinality()
- }
-
- /**
* Take the first num elements of the RDD. It works by first scanning one partition, and use the
* results from that partition to estimate the number of additional partitions needed to satisfy
* the limit.
diff --git a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
deleted file mode 100644
index 8b4e7c1..0000000
--- a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import java.io.{Externalizable, ObjectOutput, ObjectInput}
-import com.clearspring.analytics.stream.cardinality.{ICardinality, HyperLogLog}
-
-/**
- * A wrapper around [[com.clearspring.analytics.stream.cardinality.HyperLogLog]] that is serializable.
- */
-private[spark]
-class SerializableHyperLogLog(var value: ICardinality) extends Externalizable {
-
- def this() = this(null) // For deserialization
-
- def merge(other: SerializableHyperLogLog) = new SerializableHyperLogLog(value.merge(other.value))
-
- def add[T](elem: T) = {
- this.value.offer(elem)
- this
- }
-
- def readExternal(in: ObjectInput) {
- val byteLength = in.readInt()
- val bytes = new Array[Byte](byteLength)
- in.readFully(bytes)
- value = HyperLogLog.Builder.build(bytes)
- }
-
- def writeExternal(out: ObjectOutput) {
- val bytes = value.getBytes()
- out.writeInt(bytes.length)
- out.write(bytes)
- }
-}
--
1.8.3.4 (Apple Git-47)