
[ PySpark Broadcast Variable Join ]

I am performing a join, and my data is spread across over 100 nodes. I have a small list of key/value pairs that I am joining with another, much larger set of key/value pairs.

My list looks like such:

[[1, 0], [2, 0], [3, 0], [4, 0], [5, 0], [6, 0], [7, 0], [8, 0], [9, 0], [10, 0], [11, 0], [16, 0], [18, 0], [19, 0], [20, 0], [21, 0], [22, 0], [23, 0], [24, 0], [25, 0], [26, 0], [27, 0], [28, 0], [29, 0], [36, 0], [37, 0], [38, 0], [39, 0], [40, 0], [41, 0], [42, 0], [44, 0], [46, 0]]

I have broadcast variable:

numB = sc.broadcast(numValuesKV)

When I do my join:

numRDD = columnRDD.join(numB.value)

I get the following error:

AttributeError: 'list' object has no attribute 'map'

Answer 1


Can you try making numValuesKV a dictionary and see if that works?
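A minimal sketch of that idea, using the same [key, value] shape as the list in the question. The PySpark calls are shown in comments since the lookup logic itself is plain Python; the variable names mirror the question:

```python
# Small-side list, shaped like the one in the question
numValuesKV = [[1, 0], [2, 0], [3, 0]]

# Convert to a dict so each lookup is O(1) instead of a list scan
lookup = dict((k, v) for k, v in numValuesKV)
# In PySpark: numB = sc.broadcast(lookup)
# then:       columnRDD.map(lambda kv: (kv[0], (kv[1], numB.value[kv[0]])))

# The per-record lookup, applied to one example record:
record = (2, "x")
joined = (record[0], (record[1], lookup[record[0]]))
# joined is now (2, ("x", 0))
```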

Answer 2


rdd.join(other) is meant to join two RDDs, therefore it expects other to be an RDD. To use the efficient 'small table broadcast' join trick, you need to do the join 'by hand'. In Scala, it would look like this:

rdd.mapPartitions { iter =>
    // build the lookup map once per partition from the broadcast value
    val valueMap = numB.value.toMap
    iter.map { case (k, v) => (k, (v, valueMap(k))) }
}

This applies the join using the broadcast value to each partition of the RDD in a distributed manner.

PySpark code should be pretty similar.
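A rough PySpark translation might look like the following. The per-partition function is plain Python, so its logic can be checked without a cluster; the names columnRDD and numB come from the question, while join_partition is a made-up helper name:

```python
def join_partition(iterator, value_map):
    # Emit (k, (v, looked-up value)) for keys present in the broadcast map;
    # keys missing from value_map are dropped, like an inner join
    for k, v in iterator:
        if k in value_map:
            yield (k, (v, value_map[k]))

# In PySpark this would be driven as:
# numRDD = columnRDD.mapPartitions(
#     lambda it: join_partition(it, dict(numB.value)))

# Checking the per-partition logic on a plain iterator:
result = list(join_partition(iter([(1, "a"), (99, "b")]), {1: 0, 2: 0}))
# result is [(1, ("a", 0))]
```

Building the dict inside the mapPartitions lambda means the (cheap) list-to-dict conversion happens once per partition on the executors, while only the broadcast list is shipped over the network.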

Answer 3


You are broadcasting a list, which is absolutely fine.

What you need to do is:

b = sc.broadcast(lst)
rdd.filter(lambda t: t[0] in b.value)

Here t[0] should look like [1, 0], etc. But I hope you got the idea...
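A small sketch of that filter, with the PySpark calls in comments. Converting the list to a set of keys first is an assumption added here (it makes the membership test O(1) rather than a scan of the broadcast list), and the records are keyed by plain integers:

```python
lst = [[1, 0], [2, 0], [3, 0]]       # small-side list, as in the question
keys = set(k for k, _ in lst)        # set membership is O(1), list is O(n)

# In PySpark: b = sc.broadcast(keys)
#             rdd.filter(lambda t: t[0] in b.value)

# The same predicate applied to plain records:
rows = [(1, "a"), (5, "b"), (3, "c")]
kept = [t for t in rows if t[0] in keys]
# kept is [(1, "a"), (3, "c")]
```

Note that filtering only keeps the matching rows; to attach the small-side values as well, you would map each surviving record through a broadcast dict as in the other answers.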