from random import sample
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("probabilistic counts").getOrCreate()
clients_per_day = 1000

def get_n_distinct_integers(n):
    # Sample n distinct integers from a pool 10x larger, so clients can recur across days
    return sample(range(n * 10), n)

def get_sample_day(day):
    # Build one day of data as (day, client) rows for clients_per_day distinct clients
    return spark.createDataFrame(
        map(lambda x: (day, x), get_n_distinct_integers(clients_per_day)),
        ['day', 'client']
    )

def generate_data(days=7):
    # Union one DataFrame per day into a single (day, client) DataFrame
    out = get_sample_day(0)
    for day in range(1, days):
        out = out.unionAll(get_sample_day(day))
    return out
dd = generate_data()
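
# Sanity check (not part of the original snippet): the exact per-day distinct
# counts can be computed with countDistinct, confirming that every simulated
# day contains exactly clients_per_day unique clients.
from pyspark.sql.functions import countDistinct

exact_counts = (
    dd
    .groupBy('day')
    .agg(countDistinct('client').alias('exact_count'))
    .orderBy('day')
)
exact_counts.show()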

from pyspark.sql.functions import approxCountDistinct, col

# Relative error of the approximate distinct count against the known
# clients_per_day clients generated for each day
errors = (
    dd
    .groupBy('day')
    .agg(approxCountDistinct('client').alias('approx_count'))
    .orderBy('day')
    .withColumn('error', col('approx_count') / clients_per_day - 1)
    .toPandas()
)
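
# A small variation, not in the original: approx_count_distinct (the
# non-deprecated name for approxCountDistinct since Spark 2.1) takes an
# optional rsd argument, the maximum relative standard deviation allowed
# (default 0.05), so a tighter estimate can be requested at the cost of
# more memory.
from pyspark.sql.functions import approx_count_distinct

tighter = (
    dd
    .groupBy('day')
    .agg(approx_count_distinct('client', rsd=0.01).alias('approx_count_1pct'))
    .orderBy('day')
)
tighter.show()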
from plotnine import ggplot, geom_line, geom_hline, aes
# Plot the per-day relative error; the dotted line marks a perfect estimate
ggplot(errors, aes('day', 'error')) + geom_line() + geom_hline(yintercept=0, linetype='dotted')
errors