Why can't the redis module be referenced in the function called by the foreachPartition method of a DataFrame?
2022-06-09 18:52:00 【Alibaba Cloud Q&A】
I would like to ask: why can't the redis module be referenced (imported) in the function called by a DataFrame's foreachPartition() method, and how can I solve this? My programming environment is Jupyter. The specific scenario is that I use the DataFrame foreachPartition method with custom functions to write the information from the user profile DF and the ad feature DF into a Redis database.
Here is my code:

feature_cols_from_ad = ["price"]
feature_cols_from_user = ["cms_groupid_id", "final_gender_code", "age_level", "shopping_level",
                          "occupation", "pvalue_level", "new_user_class_level"]

def foreachPartition(partition):
    import redis
    import json
    client01 = redis.StrictRedis(host="192.168.56.5", port=6379, db=10)
    for r in partition:
        data = {"price": r.price}
        client01.hset("ad_features", r.adgroudId, json.dumps(data))

def foreachPartition2(partition):
    import redis
    import json
    client02 = redis.StrictRedis(host="192.168.56.5", port=6379, db=10)
    for r in partition:
        data = {
            "cms_group_id": r.cms_group_id,
            "final_gender_code": r.final_gender_code,
            "age_level": r.age_level,
            "shopping_level": r.shopping_level,
            "occupation": r.occupation,
            "pvalue_level": r.pvalue_level,
            "new_user_class_level": r.new_user_class_level
        }
        client02.hset("user_features", r.userId, json.dumps(data))

ad_feature_df.foreachPartition(foreachPartition)
user_profile_df.foreachPartition(foreachPartition2)
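(For comparison, the same per-partition write pattern is sometimes batched with a redis-py pipeline, so that each partition's HSET commands are sent to Redis in one batch. The sketch below is illustrative only; it assumes the same host, port, database and ad_feature_df columns used above.)

def foreachPartition_batched(partition):
    # Sketch: one client per partition; HSET calls are buffered in a pipeline
    # and flushed once at the end of the partition.
    import redis
    import json
    client = redis.StrictRedis(host="192.168.56.5", port=6379, db=10)
    pipe = client.pipeline()
    for r in partition:
        pipe.hset("ad_features", r.adgroudId, json.dumps({"price": r.price}))
    pipe.execute()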
The following is the specific error information:

22/06/02 21:37:52 WARN TaskSetManager: Lost task 1.0 in stage 376.0 (TID 1054) (192.168.56.6 executor 0): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/sft/spark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/usr/sft/spark/python/lib/pyspark.zip/pyspark/worker.py", line 609, in process
    out_iter = func(split_index, iterator)
  File "/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py", line 2918, in pipeline_func
  File "/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py", line 2918, in pipeline_func
  File "/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py", line 2918, in pipeline_func
  File "/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py", line 417, in func
  File "/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py", line 933, in func
  File "/tmp/ipykernel_2896/4051490123.py", line 17, in foreachPartition
ModuleNotFoundError: No module named 'redis'

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:555)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:713)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:695)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:508)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

22/06/02 21:37:52 ERROR TaskSetManager: Task 0 in stage 376.0 failed 4 times; aborting job
22/06/02 21:37:52 WARN TaskSetManager: Lost task 1.3 in stage 376.0 (TID 1060) (192.168.56.6 executor 0): TaskKilled (Stage cancelled)

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/tmp/ipykernel_2896/2204389708.py in <module>
----> 1 ad_feature_df.foreachPartition(foreachPartition)
      2 user_profile_df.foreachPartition(foreachPartition2)

/usr/local/python3/lib/python3.7/site-packages/pyspark/sql/dataframe.py in foreachPartition(self, f)
    791         >>> df.foreachPartition(f)
    792         """
--> 793         self.rdd.foreachPartition(f)
    794
    795     def cache(self):

/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py in foreachPartition(self, f)
    936             except TypeError:
    937                 return iter([])
--> 938         self.mapPartitions(func).count()  # Force evaluation
    939
    940     def collect(self):

/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py in count(self)
   1235         3
   1236         """
-> 1237         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
   1238
   1239     def stats(self):

/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py in sum(self)
   1224         6.0
   1225         """
-> 1226         return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
   1227
   1228     def count(self):

/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py in fold(self, zeroValue, op)
   1078         # zeroValue provided to each partition is unique from the one provided
   1079         # to the final reduce call
-> 1080         vals = self.mapPartitions(func).collect()
   1081         return reduce(op, vals, zeroValue)
   1082

/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py in collect(self)
    948         """
    949         with SCCallSiteSync(self.context) as css:
--> 950             sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    951         return list(_load_from_socket(sock_info, self._jrdd_deserializer))
    952

/usr/local/python3/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1320         answer = self.gateway_client.send_command(command)
   1321         return_value = get_return_value(
-> 1322             answer, self.gateway_client, self.target_id, self.name)
   1323
   1324         for temp_arg in temp_args:

/usr/local/python3/lib/python3.7/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
    109     def deco(*a, **kw):
    110         try:
--> 111             return f(*a, **kw)
    112         except py4j.protocol.Py4JJavaError as e:
    113             converted = convert_exception(e.java_exception)

/usr/local/python3/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 376.0 failed 4 times, most recent failure: Lost task 0.3 in stage 376.0 (TID 1059) (192.168.56.6 executor 0): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/sft/spark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/usr/sft/spark/python/lib/pyspark.zip/pyspark/worker.py", line 609, in process
    out_iter = func(split_index, iterator)
  File "/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py", line 2918, in pipeline_func
  File "/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py", line 2918, in pipeline_func
  File "/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py", line 2918, in pipeline_func
  File "/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py", line 417, in func
  File "/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py", line 933, in func
  File "/tmp/ipykernel_2896/4051490123.py", line 17, in foreachPartition
ModuleNotFoundError: No module named 'redis'

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:555)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:713)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:695)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:508)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at sun.reflect.GeneratedMethodAccessor184.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/sft/spark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/usr/sft/spark/python/lib/pyspark.zip/pyspark/worker.py", line 609, in process
    out_iter = func(split_index, iterator)
  File "/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py", line 2918, in pipeline_func
  File "/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py", line 2918, in pipeline_func
  File "/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py", line 2918, in pipeline_func
  File "/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py", line 417, in func
  File "/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py", line 933, in func
  File "/tmp/ipykernel_2896/4051490123.py", line 17, in foreachPartition
ModuleNotFoundError: No module named 'redis'

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:555)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:713)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:695)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:508)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
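Note that the ModuleNotFoundError above is raised by the Python worker on the executor (192.168.56.6, executor 0), not by the Jupyter kernel: the function passed to foreachPartition is serialized and executed on the executors, so the import redis statement inside it is resolved against the executors' Python environment. Having redis installed in the notebook's environment therefore does not guarantee it is importable on the workers. Below is a minimal diagnostic sketch (it assumes the active SparkSession is named spark, as in a typical PySpark notebook) that reports which interpreter each executor task runs under and whether redis can be imported there:

def probe(_):
    # Runs on an executor: report its interpreter and whether `redis` is importable there.
    import importlib.util
    import sys
    return (sys.executable, importlib.util.find_spec("redis") is not None)

print(spark.sparkContext.parallelize(range(4), 4).map(probe).distinct().collect())

If the probe shows that redis is missing on the worker side, the usual fix is to install the redis package into that interpreter on every worker node (with that interpreter's pip), or to point the executors at an environment that already has it via the PYSPARK_PYTHON environment variable or the spark.pyspark.python configuration, and then restart the Spark application.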