This page contains some statistics functions (mean, stdev, variance, etc.) but it does not contain the median. How can I calculate the exact median?
Scala – How to calculate exact median with Apache Spark
apache-spark hadoop scala
Related Solutions
Ongoing work
SPARK-30569 - Add DSL functions invoking percentile_approx
Spark 2.0+:
You can use the approxQuantile method, which implements the Greenwald-Khanna algorithm:
Python:
df.approxQuantile("x", [0.5], 0.25)
Scala:
df.stat.approxQuantile("x", Array(0.5), 0.25)
where the last parameter is the relative error. The lower the number, the more accurate the results and the more expensive the computation.
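If you actually need the exact median (as asked) and can afford the cost, the relative error can be set to 0; approxQuantile then computes the exact quantiles, which can be very expensive on large data. A minimal Python sketch, reusing the df from above:

exact_median = df.approxQuantile("x", [0.5], 0.0)[0]  # relativeError = 0 -> exact, but costly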
Since Spark 2.2 (SPARK-14352) it supports estimation on multiple columns:
df.approxQuantile(["x", "y", "z"], [0.5], 0.25)
and
df.approxQuantile(Array("x", "y", "z"), Array(0.5), 0.25)
The underlying methods can also be used in SQL aggregations (both global and grouped) using the approx_percentile function:
> SELECT approx_percentile(10.0, array(0.5, 0.4, 0.1), 100);
[10.0,10.0,10.0]
> SELECT approx_percentile(10.0, 0.5, 100);
10.0
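The same function works per group as well. A minimal sketch, assuming a hypothetical registered table df with a numeric column x and a grouping column g:

spark.sql("""
    SELECT g, approx_percentile(x, 0.5, 100) AS approx_median
    FROM df
    GROUP BY g
""").show()  # one approximate median per value of g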
Spark < 2.0
Python
As I've mentioned in the comments, it is most likely not worth all the fuss. If the data is relatively small, like in your case, then simply collect and compute the median locally:
import numpy as np
np.random.seed(323)
rdd = sc.parallelize(np.random.randint(1000000, size=700000))
%time np.median(rdd.collect())
np.array(rdd.collect()).nbytes
It takes around 0.01 seconds on my few-years-old computer and around 5.5 MB of memory.
If the data is much larger, sorting will be a limiting factor, so instead of getting an exact value it is probably better to sample, collect, and compute locally. But if you really want to use Spark, something like this should do the trick (if I didn't mess anything up):
from numpy import floor
import time

def quantile(rdd, p, sample=None, seed=None):
    """Compute a quantile of order p ∈ [0, 1]
    :rdd a numeric rdd
    :p quantile (between 0 and 1)
    :sample fraction of the rdd to use. If not provided we use the whole dataset
    :seed random number generator seed to be used with sample
    """
    assert 0 <= p <= 1
    assert sample is None or 0 < sample <= 1

    seed = seed if seed is not None else time.time()
    rdd = rdd if sample is None else rdd.sample(False, sample, seed)

    # sort, index, and key by position so values can be looked up by rank
    rddSortedWithIndex = (rdd
        .sortBy(lambda x: x)
        .zipWithIndex()
        .map(lambda xi: (xi[1], xi[0]))  # Python 3 friendly (no tuple unpacking in lambdas)
        .cache())

    n = rddSortedWithIndex.count()
    h = (n - 1) * p

    # values at the ranks surrounding h (clamped so that p = 1 does not go out of range)
    rddX, rddXPlusOne = (
        rddSortedWithIndex.lookup(min(int(floor(h)) + i, n - 1))[0]
        for i in (0, 1))

    # linear interpolation between the two neighbouring values
    return rddX + (h - floor(h)) * (rddXPlusOne - rddX)
And some tests:
np.median(rdd.collect()), quantile(rdd, 0.5)
## (500184.5, 500184.5)
np.percentile(rdd.collect(), 25), quantile(rdd, 0.25)
## (250506.75, 250506.75)
np.percentile(rdd.collect(), 75), quantile(rdd, 0.75)
## (750069.25, 750069.25)
Finally, let's define the median:
from functools import partial
median = partial(quantile, p=0.5)
So far so good, but it takes 4.66 s in local mode without any network communication. There is probably a way to improve this, but why even bother?
Language independent (Hive UDAF):
If you use HiveContext you can also use Hive UDAFs. With continuous (floating point) values use percentile_approx:
rdd.map(lambda x: (float(x), )).toDF(["x"]).registerTempTable("df")
sqlContext.sql("SELECT percentile_approx(x, 0.5) FROM df")
With integral values you can use percentile, which is exact:
sqlContext.sql("SELECT percentile(CAST(x AS BIGINT), 0.5) FROM df")
In percentile_approx you can pass an additional argument which controls the approximation accuracy at the cost of memory (higher values yield better approximations; the default is 10,000).
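For example, reusing the df temporary table registered above, one way to trade memory for accuracy is to raise that third argument:

sqlContext.sql("SELECT percentile_approx(x, 0.5, 100000) FROM df")  # more accurate than the default 10000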
Here is an example implementation with the DataFrame API in Python (Spark 1.6+).
import pyspark.sql.functions as F
import numpy as np
from pyspark.sql.types import FloatType
Let's assume we have monthly salaries for customers in a "salaries" Spark DataFrame such as:
month | customer_id | salary
and we would like to find the median salary per customer across all the months.
Step 1: Write a user-defined function to calculate the median
def find_median(values_list):
    try:
        median = np.median(values_list)  # get the median of values in a list in each row
        return round(float(median), 2)
    except Exception:
        return None  # if there is anything wrong with the given values

median_finder = F.udf(find_median, FloatType())
Step 2: Aggregate on the salary column by collecting the salaries into a list in each row:
salaries_list = salaries.groupBy("customer_id").agg(F.collect_list("salary").alias("salaries"))
Step 3: Call the median_finder UDF on the salaries column to add the median values as a new column:
salaries_list = salaries_list.withColumn("median", median_finder("salaries"))
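Putting the three steps together, here is a minimal end-to-end sketch with made-up data (the sqlContext and the toy rows are assumptions, matching the month | customer_id | salary schema above):

# toy data in the month | customer_id | salary layout described above
salaries = sqlContext.createDataFrame(
    [(1, "a", 100.0), (2, "a", 200.0), (3, "a", 400.0),
     (1, "b", 100.0), (2, "b", 300.0)],
    ["month", "customer_id", "salary"])

salaries_list = salaries.groupBy("customer_id").agg(
    F.collect_list("salary").alias("salaries"))
salaries_list = salaries_list.withColumn("median", median_finder("salaries"))
salaries_list.show()
## customer "a" -> median 200.0, customer "b" -> median 200.0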
Best Solution
You need to sort the RDD and take the element in the middle or the average of the two middle elements. Here is an example with RDD[Int]: