
[ PySpark Job throwing IOError ]

I have pasted the full stack trace below in case someone has seen this error before. I am trying to write a simple KNN job using PySpark on an HDFS cluster. The job reads very few input files, so I don't think it is a memory (space) issue. I don't explicitly broadcast anything in the code path that fails, so it is surprising to me that broadcast.py is what fails. I do, however, have Python dictionaries that the tasks share without an explicit broadcast.

Can anyone help me understand what is going on?

I have also pasted my entire code below (above the stack trace).

from pyspark.mllib.linalg import SparseVector
from pyspark import SparkContext
import glob
import sys
import time
import subprocess
from itertools import combinations
"""We create user and item indices starting from 0 to #users and 0 to #items respectively. This is done to store them in sparseVectors as dicts."""
def create_indices(inputdir):
    items=dict()
    user_id_to_idx=dict()
    user_idx_to_id=dict()
    item_idx_to_id=dict()
    item_id_to_idx=dict()
    item_idx=0
    user_idx=0

    cat=subprocess.Popen(["hadoop","fs","-cat","/user/hadoop/"+inputdir+"/*.txt"],stdout=subprocess.PIPE)
    for line in cat.stdout:
        toks = line.strip().split("\t")
        item = toks[0].strip()
        user = toks[1].strip()
        # strip() never returns None, so check for empty ids instead.
        if user and user not in user_id_to_idx:
            user_id_to_idx[user] = user_idx
            user_idx_to_id[user_idx] = user
            user_idx += 1
        if item and item not in item_id_to_idx:
            item_id_to_idx[item] = item_idx
            item_idx_to_id[item_idx] = item
            item_idx += 1
    return user_idx_to_id,user_id_to_idx,item_idx_to_id,item_id_to_idx,user_idx,item_idx

def concat_helper(a, b):
    # Merge two {user_idx: 1} dicts; either argument may be None.
    if a is not None and b is not None:
        temp = dict()
        temp.update(a)
        temp.update(b)
        return temp
    elif a is not None:
        return a
    else:
        return b

# Pass in the HDFS path to the input files and the Spark context.
def runKNN(inputdir, sc, user_id_to_idx, item_id_to_idx):
    rdd_text = sc.textFile(inputdir)
    # Transformations are lazy, so a driver-side try/except cannot catch a
    # KeyError raised inside the lambda on the workers; missing ids have to
    # be handled in the mapping itself.
    def to_pair(x):
        toks = x.strip().split("\t")
        return (item_id_to_idx[toks[0].strip()], {user_id_to_idx[toks[1].strip()]: 1})
    new_rdd = rdd_text.map(to_pair).reduceByKey(concat_helper).sortByKey()
    return new_rdd

if __name__=="__main__":
    sc = SparkContext()
    u_idx_to_id,u_id_to_idx,i_idx_to_id,i_id_to_idx,u_idx,i_idx=create_indices(sys.argv[1])

    u_idx_to_id_b=sc.broadcast(u_idx_to_id)
    u_id_to_idx_b=sc.broadcast(u_id_to_idx)
    i_idx_to_id_b=sc.broadcast(i_idx_to_id)
    i_id_to_idx_b=sc.broadcast(i_id_to_idx)
    num_users=sc.broadcast(u_idx)
    num_items=sc.broadcast(i_idx)
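    # Note: these broadcast variables are never used below; runKNN captures
    # the plain dicts in its closure instead of the broadcasts.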
    item_dict_rdd=runKNN(sys.argv[1],sc,u_id_to_idx,i_id_to_idx)

    # Each item vector is keyed by user index, so its dimension should be the
    # number of users (u_idx), not the number of items.
    item_dict_rdd_new=item_dict_rdd.map(lambda x: (x[0],SparseVector(u_idx,x[1])))
    item_dict_rdd_new.saveAsTextFile("hdfs://output_path")
    # The pairwise dot-product step is still a non-working draft, so the save
    # that depends on it stays commented out (dot_products_rdd is undefined).
    #dot_products_rdd=map(lambda (x,y): (x,y),combinations(item_dict_rdd_new.map(lambda x: x),2))
    #dot_products_rdd.saveAsTextFile("hdfs://output_path_2")

Stack Trace:

15/05/19 13:44:11 WARN spark.SparkConf: Setting 'spark.executor.extraClassPath' to '/home/hadoop/spark/conf:/home/hadoop/conf:/home/hadoop/spark/classpath/emr/*:/home/hadoop/spark/classpath/emrfs/*:/home/hadoop/share/hadoop/common/lib/*:/home/hadoop/share/hadoop/common/lib/hadoop-lzo.jar' as a work-around.
15/05/19 13:44:11 WARN spark.SparkConf: Setting 'spark.driver.extraClassPath' to '/home/hadoop/spark/conf:/home/hadoop/conf:/home/hadoop/spark/classpath/emr/*:/home/hadoop/spark/classpath/emrfs/*:/home/hadoop/share/hadoop/common/lib/*:/home/hadoop/share/hadoop/common/lib/hadoop-lzo.jar' as a work-around.
15/05/19 13:44:11 INFO spark.SecurityManager: Changing view acls to: hadoop
15/05/19 13:44:11 INFO spark.SecurityManager: Changing modify acls to: hadoop
15/05/19 13:44:11 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/05/19 13:44:12 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/05/19 13:44:12 INFO Remoting: Starting remoting
15/05/19 13:44:12 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@ip-.ec2.internal:38952]
15/05/19 13:44:12 INFO util.Utils: Successfully started service 'sparkDriver' on port 38952.
15/05/19 13:44:12 INFO spark.SparkEnv: Registering MapOutputTracker
15/05/19 13:44:12 INFO spark.SparkEnv: Registering BlockManagerMaster
15/05/19 13:44:12 INFO storage.DiskBlockManager: Created local directory at /mnt/spark/spark-7898e98c-1685-450b-a47a-2fbede361cf3/blockmgr-c3ce83af-b195-4ec9-8e1f-8d71ee0589b1
15/05/19 13:44:12 INFO storage.MemoryStore: MemoryStore started with capacity 265.4 MB
15/05/19 13:44:12 INFO spark.HttpFileServer: HTTP File server directory is /mnt/spark/spark-94ffb583-1f85-4505-8d4d-1f104d612be6/httpd-8e8e619f-c4e6-4813-ac0d-b8793e7ff84d
15/05/19 13:44:12 INFO spark.HttpServer: Starting HTTP Server
15/05/19 13:44:12 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/05/19 13:44:12 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:41841
15/05/19 13:44:12 INFO util.Utils: Successfully started service 'HTTP file server' on port 41841.
15/05/19 13:44:12 INFO spark.SparkEnv: Registering OutputCommitCoordinator
15/05/19 13:44:12 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/05/19 13:44:12 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/05/19 13:44:12 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
15/05/19 13:44:12 INFO ui.SparkUI: Started SparkUI at http://ip.ec2.internal:4040
15/05/19 13:44:13 INFO util.Utils: Copying /mnt/user/spark_practice/knn.py to /mnt/spark/spark-afbde2d3-d58a-468f-b84f-c131ecd708cd/userFiles-6918221d-be13-4ce6-adbc-a4fcbd787996/knn.py
15/05/19 13:44:13 INFO spark.SparkContext: Added file file:/mnt/user/spark_practice/knn.py at file:/mnt/user/spark_practice/knn.py with timestamp 1432043053065
15/05/19 13:44:13 INFO executor.Executor: Starting executor ID <driver> on host localhost
15/05/19 13:44:13 INFO util.AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@ip-.ec2.internal:38952/user/HeartbeatReceiver
15/05/19 13:44:13 INFO netty.NettyBlockTransferService: Server created on 44689
15/05/19 13:44:13 INFO storage.BlockManagerMaster: Trying to register BlockManager
15/05/19 13:44:13 INFO storage.BlockManagerMasterActor: Registering block manager localhost:44689 with 265.4 MB RAM, BlockManagerId(<driver>, localhost, 44689)
15/05/19 13:44:13 INFO storage.BlockManagerMaster: Registered BlockManager
7412 ==================================================
55134 ==================================================
15/05/19 13:44:16 INFO storage.MemoryStore: ensureFreeSpace(253503) called with curMem=0, maxMem=278302556
15/05/19 13:44:16 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 247.6 KB, free 265.2 MB)
15/05/19 13:44:16 INFO storage.MemoryStore: ensureFreeSpace(19226) called with curMem=253503, maxMem=278302556
15/05/19 13:44:16 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 18.8 KB, free 265.1 MB)
15/05/19 13:44:16 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:44689 (size: 18.8 KB, free: 265.4 MB)
15/05/19 13:44:16 INFO storage.BlockManagerMaster: Updated info of block broadcast_0_piece0
15/05/19 13:44:16 INFO spark.SparkContext: Created broadcast 0 from textFile at NativeMethodAccessorImpl.java:-2
15/05/19 13:44:17 INFO storage.MemoryStore: ensureFreeSpace(296) called with curMem=272729, maxMem=278302556
15/05/19 13:44:17 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 296.0 B, free 265.1 MB)
15/05/19 13:44:17 INFO storage.MemoryStore: ensureFreeSpace(2504167) called with curMem=273025, maxMem=278302556
15/05/19 13:44:17 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.4 MB, free 262.8 MB)
15/05/19 13:44:17 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:44689 (size: 2.4 MB, free: 263.0 MB)
15/05/19 13:44:17 INFO storage.BlockManagerMaster: Updated info of block broadcast_1_piece0
15/05/19 13:44:17 INFO spark.SparkContext: Created broadcast 1 from broadcast at PythonRDD.scala:399
15/05/19 13:44:17 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries
15/05/19 13:44:17 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 77cfa96225d62546008ca339b7c2076a3da91578]
15/05/19 13:44:17 INFO mapred.FileInputFormat: Total input paths to process : 5
15/05/19 13:44:18 INFO storage.MemoryStore: ensureFreeSpace(296) called with curMem=2777192, maxMem=278302556
15/05/19 13:44:18 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 296.0 B, free 262.8 MB)
15/05/19 13:44:18 INFO storage.MemoryStore: ensureFreeSpace(2506273) called with curMem=2777488, maxMem=278302556
15/05/19 13:44:18 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 2.4 MB, free 260.4 MB)
15/05/19 13:44:18 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:44689 (size: 2.4 MB, free: 260.6 MB)
15/05/19 13:44:18 INFO storage.BlockManagerMaster: Updated info of block broadcast_2_piece0
15/05/19 13:44:18 INFO spark.SparkContext: Created broadcast 2 from reduceByKey at /mnt/user/spark_practice/knn.py:59
15/05/19 13:44:18 INFO spark.SparkContext: Starting job: sortByKey at /mnt/user/spark_practice/knn.py:59
15/05/19 13:44:18 INFO scheduler.DAGScheduler: Registering RDD 4 (reduceByKey at /mnt/user/spark_practice/knn.py:59)
15/05/19 13:44:18 INFO scheduler.DAGScheduler: Got job 0 (sortByKey at /mnt/user/spark_practice/knn.py:59) with 5 output partitions (allowLocal=false)
15/05/19 13:44:18 INFO scheduler.DAGScheduler: Final stage: Stage 1(sortByKey at /mnt/user/spark_practice/knn.py:59)
15/05/19 13:44:18 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 0)
15/05/19 13:44:18 INFO scheduler.DAGScheduler: Missing parents: List(Stage 0)
15/05/19 13:44:18 INFO scheduler.DAGScheduler: Submitting Stage 0 (PairwiseRDD[4] at reduceByKey at /mnt/user/spark_practice/knn.py:59), which has no missing parents
15/05/19 13:44:18 INFO storage.MemoryStore: ensureFreeSpace(5288) called with curMem=5283761, maxMem=278302556
15/05/19 13:44:18 INFO storage.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 5.2 KB, free 260.4 MB)
15/05/19 13:44:18 INFO storage.MemoryStore: ensureFreeSpace(3048) called with curMem=5289049, maxMem=278302556
15/05/19 13:44:18 INFO storage.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 3.0 KB, free 260.4 MB)
15/05/19 13:44:18 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:44689 (size: 3.0 KB, free: 260.6 MB)
15/05/19 13:44:18 INFO storage.BlockManagerMaster: Updated info of block broadcast_3_piece0
15/05/19 13:44:18 INFO spark.SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:839
15/05/19 13:44:18 INFO scheduler.DAGScheduler: Submitting 5 missing tasks from Stage 0 (PairwiseRDD[4] at reduceByKey at /mnt/user/spark_practice/knn.py:59)
15/05/19 13:44:18 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 5 tasks
15/05/19 13:44:18 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, ANY, 1407 bytes)
15/05/19 13:44:18 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, ANY, 1407 bytes)
15/05/19 13:44:18 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, ANY, 1407 bytes)
15/05/19 13:44:18 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, localhost, ANY, 1407 bytes)
15/05/19 13:44:18 INFO scheduler.TaskSetManager: Starting task 4.0 in stage 0.0 (TID 4, localhost, ANY, 1407 bytes)
15/05/19 13:44:18 INFO executor.Executor: Running task 2.0 in stage 0.0 (TID 2)
15/05/19 13:44:18 INFO executor.Executor: Running task 1.0 in stage 0.0 (TID 1)
15/05/19 13:44:18 INFO executor.Executor: Running task 4.0 in stage 0.0 (TID 4)
15/05/19 13:44:18 INFO executor.Executor: Running task 3.0 in stage 0.0 (TID 3)
15/05/19 13:44:18 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
15/05/19 13:44:18 INFO executor.Executor: Fetching file:/mnt/user/spark_practice/knn.py with timestamp 1432043053065
15/05/19 13:44:18 INFO util.Utils: /mnt/user/spark_practice/knn.py has been previously copied to /mnt/spark/spark-afbde2d3-d58a-468f-b84f-c131ecd708cd/userFiles-6918221d-be13-4ce6-adbc-a4fcbd787996/knn.py
15/05/19 13:44:18 INFO rdd.HadoopRDD: Input split: hdfs://10.64.10.43:9000/user/hadoop/user/practice/knn/inputfile
15/05/19 13:44:18 INFO rdd.HadoopRDD: Input split: hdfs://10.64.10.43:9000/user/hadoop/user/practice/knn/inputfile2
15/05/19 13:44:18 INFO rdd.HadoopRDD: Input split: hdfs://10.64.10.43:9000/user/hadoop/user/practice/knn/inputfile3
15/05/19 13:44:18 INFO rdd.HadoopRDD: Input split: hdfs://10.64.10.43:9000/user/hadoop/user/practice/knn/inputfile4
15/05/19 13:44:18 INFO rdd.HadoopRDD: Input split: hdfs://10.64.10.43:9000/user/hadoop/user/practice/knn/inputfile5
15/05/19 13:44:18 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
15/05/19 13:44:18 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
15/05/19 13:44:18 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
15/05/19 13:44:18 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
15/05/19 13:44:18 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
15/05/19 13:44:19 INFO metrics.MetricsSaver: MetricsConfigRecord disabledInCluster: false instanceEngineCycleSec: 60 clusterEngineCycleSec: 60 disableClusterEngine: false
15/05/19 13:44:19 INFO metrics.MetricsSaver: Created MetricsSaver j-24SI04Y9O1ZVF:i-58ae838e:SparkSubmit:24683 period:60 /mnt/var/em/raw/i-58ae838e_20150519_SparkSubmit_24683_raw.bin
15/05/19 13:44:19 ERROR executor.Executor: Exception in task 4.0 in stage 0.0 (TID 4)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/hadoop/spark/python/pyspark/worker.py", line 90, in main
    command = pickleSer.loads(command.value)
  File "/home/hadoop/spark/python/pyspark/broadcast.py", line 106, in value
    self._value = self.load(self._path)
  File "/home/hadoop/spark/python/pyspark/broadcast.py", line 87, in load
    with open(path, 'rb', 1 << 20) as f:
IOError: [Errno 2] No such file or directory: '/mnt/spark/spark-ea646b94-3f68-47a5-8e1c-b23ac0799718/pyspark-7d842875-fae2-4563-b367-92d89d292b60/tmpjlEm1f'

    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135)
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:176)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:311)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
[Tasks 0, 1, 2 and 3 fail with the same PythonException / IOError traceback as task 4 above.]

Answer 1


You can read all the files that match the pattern directly with Spark and skip the hadoop fs -cat subprocess entirely. Set data_loc = "/user/hadoop/"+inputdir+"/*.txt" and pass that to Spark in one call; refer to this: read files recursively from sub directories with spark from s3 or local filesystem

If you load data_loc with sc.wholeTextFiles, the keys are the file names and the values are the contents of each file; a flatMap over the values then gives you the individual lines.

You don't need to run a command-line process.
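
For example, here is a minimal sketch of create_indices rewritten this way (my own illustration, assuming the same tab-separated "item<TAB>user" layout as the original code; note that wholeTextFiles holds each whole file in memory, which is fine for small inputs like these):

def create_indices(sc, inputdir):
    data_loc = "/user/hadoop/" + inputdir + "/*.txt"
    # wholeTextFiles yields (filename, contents) pairs; flatMap the contents
    # back into individual lines.
    lines = sc.wholeTextFiles(data_loc).flatMap(lambda kv: kv[1].splitlines())
    pairs = lines.map(lambda line: line.strip().split("\t"))
    # Collect the distinct ids on the driver and number them from 0.
    item_ids = pairs.map(lambda t: t[0].strip()).distinct().collect()
    user_ids = pairs.map(lambda t: t[1].strip()).distinct().collect()
    user_idx_to_id = dict(enumerate(user_ids))
    user_id_to_idx = dict((v, i) for i, v in enumerate(user_ids))
    item_idx_to_id = dict(enumerate(item_ids))
    item_id_to_idx = dict((v, i) for i, v in enumerate(item_ids))
    return (user_idx_to_id, user_id_to_idx, item_idx_to_id,
            item_id_to_idx, len(user_ids), len(item_ids))

For larger inputs, sc.textFile(data_loc) reads the same files line by line without materialising whole files in memory.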

Hope it helps