
Why can't the redis module be imported in the function passed to DataFrame's foreachPartition() method?

2022-06-09 18:52:00 · Alibaba Cloud Q&A

I would like to ask: why can't the redis module be imported inside the function passed to a DataFrame's foreachPartition() method, and how can I solve this? My programming environment is Jupyter. The concrete scenario is to use the DataFrame foreachPartition() method with custom functions to write the user-profile DF and the ad-feature DF into a Redis database.

Here is my code:

feature_cols_from_ad = [
    "price"
]
feature_cols_from_user = [
    "cms_groupid_id",
    "final_gender_code",
    "age_level",
    "shopping_level",
    "occupation",
    "pvalue_level",
    "new_user_class_level"
]

def foreachPartition(partition):
    import redis
    import json
    client01 = redis.StrictRedis(host="192.168.56.5", port=6379, db=10)
    for r in partition:
        data = {
            "price": r.price
        }
        client01.hset("ad_features", r.adgroudId, json.dumps(data))

def foreachPartition2(partition):
    import redis
    import json
    client02 = redis.StrictRedis(host="192.168.56.5", port=6379, db=10)
    for r in partition:
        data = {
            "cms_group_id": r.cms_group_id,
            "final_gender_code": r.final_gender_code,
            "age_level": r.age_level,
            "shopping_level": r.shopping_level,
            "occupation": r.occupation,
            "pvalue_level": r.pvalue_level,
            "new_user_class_level": r.new_user_class_level
        }
        client02.hset("user_features", r.userId, json.dumps(data))

ad_feature_df.foreachPartition(foreachPartition)
user_profile_df.foreachPartition(foreachPartition2)

Here is the full error output:

22/06/02 21:37:52 WARN TaskSetManager: Lost task 1.0 in stage 376.0 (TID 1054) (192.168.56.6 executor 0): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/sft/spark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/usr/sft/spark/python/lib/pyspark.zip/pyspark/worker.py", line 609, in process
    out_iter = func(split_index, iterator)
  File "/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py", line 2918, in pipeline_func
  File "/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py", line 2918, in pipeline_func
  File "/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py", line 2918, in pipeline_func
  File "/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py", line 417, in func
  File "/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py", line 933, in func
  File "/tmp/ipykernel_2896/4051490123.py", line 17, in foreachPartition
ModuleNotFoundError: No module named 'redis'
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:555)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:713)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:695)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:508)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
22/06/02 21:37:52 ERROR TaskSetManager: Task 0 in stage 376.0 failed 4 times; aborting job
22/06/02 21:37:52 WARN TaskSetManager: Lost task 1.3 in stage 376.0 (TID 1060) (192.168.56.6 executor 0): TaskKilled (Stage cancelled)
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/tmp/ipykernel_2896/2204389708.py in <module>
----> 1 ad_feature_df.foreachPartition(foreachPartition)
      2 user_profile_df.foreachPartition(foreachPartition2)

/usr/local/python3/lib/python3.7/site-packages/pyspark/sql/dataframe.py in foreachPartition(self, f)
    791         >>> df.foreachPartition(f)
    792         """
--> 793         self.rdd.foreachPartition(f)
    794
    795     def cache(self):

/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py in foreachPartition(self, f)
    936             except TypeError:
    937                 return iter([])
--> 938         self.mapPartitions(func).count()  # Force evaluation
    939
    940     def collect(self):

/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py in count(self)
   1235         3
   1236         """
-> 1237         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
   1238
   1239     def stats(self):

/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py in sum(self)
   1224         6.0
   1225         """
-> 1226         return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
   1227
   1228     def count(self):

/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py in fold(self, zeroValue, op)
   1078         # zeroValue provided to each partition is unique from the one provided
   1079         # to the final reduce call
-> 1080         vals = self.mapPartitions(func).collect()
   1081         return reduce(op, vals, zeroValue)
   1082

/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py in collect(self)
    948         """
    949         with SCCallSiteSync(self.context) as css:
--> 950             sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    951         return list(_load_from_socket(sock_info, self._jrdd_deserializer))
    952

/usr/local/python3/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1320         answer = self.gateway_client.send_command(command)
   1321         return_value = get_return_value(
-> 1322             answer, self.gateway_client, self.target_id, self.name)
   1323
   1324         for temp_arg in temp_args:

/usr/local/python3/lib/python3.7/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
    109     def deco(*a, **kw):
    110         try:
--> 111             return f(*a, **kw)
    112         except py4j.protocol.Py4JJavaError as e:
    113             converted = convert_exception(e.java_exception)

/usr/local/python3/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 376.0 failed 4 times, most recent failure: Lost task 0.3 in stage 376.0 (TID 1059) (192.168.56.6 executor 0): org.apache.spark.api.python.PythonException:
ModuleNotFoundError: No module named 'redis'
(Python traceback and executor stack trace identical to the WARN shown above)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at sun.reflect.GeneratedMethodAccessor184.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.api.python.PythonException:
ModuleNotFoundError: No module named 'redis'
(Python traceback and executor stack trace identical to the WARN shown above)
    ... 1 more
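From the traceback, the ModuleNotFoundError is raised inside the Python worker on executor 0 (192.168.56.6), not in the Jupyter driver process, which suggests the Python interpreter used by the executors may not have the redis package installed even though the notebook's interpreter does. A minimal diagnostic sketch for checking this (my own illustration, not part of the failing job; it assumes sc is the active SparkContext of this session and the probe helper name is made up) would be:

def probe(_):
    # Runs on each executor: report which Python interpreter the worker uses
    # and whether that interpreter can import the redis package.
    import sys
    import importlib.util
    spec = importlib.util.find_spec("redis")   # None if redis is not importable there
    yield (sys.executable, spec.origin if spec else None)

# Two tiny partitions so the probe runs on the executors rather than on the driver.
print(sc.parallelize(range(2), 2).mapPartitions(probe).collect())

If the interpreter reported for the executors differs from the one Jupyter uses, installing redis into that interpreter on every worker node, or pointing PYSPARK_PYTHON / spark.pyspark.python at an environment that already has it, would be the usual way to resolve this kind of error.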


Copyright notice
This article was created by [Alibaba Cloud Q&A]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/160/202206030201556509.html