当前位置:网站首页>flink问题整理

flink问题整理

2022-08-04 05:27:00 第一片心意

  1. 提交的flink程序重启失败
    1. 错误日志:
            
            从图中可以看出,flink程序在自动重启时,需要寻找 /tmp 下面的一些文件。但是由于linux系统的/tmp目录会被很多程序以及操作系统本身用到,所以很难避免文件的误删除操作。
    2. 解决方案
            在flink的 flink-conf.yaml 文件中,有个配置项叫 io.tmp.dirs ,该配置用于决定程序运行过程中一些临时文件保存的目录。建议将该目录配置为flink专用目录。
     
  2. flink集群无法通过 stop-cluster.sh 脚本停止
    1. 错误现象
            
           
            通过脚本停止集群,发现无法在对应的机器上找到对应的flink服务。
    2. 解决方案
            在flink的安装目录下的 /bin 目录下有个 config.sh 脚本文件,里面有一项配置用来配置flink服务的pid文件目录,配置名称为: DEFAULT_ENV_PID_DIR ,默认值为 /tmp 。由于linux系统的/tmp目录会被很多程序以及操作系统本身用到,所以很难避免文件的误删除操作。出现上述日志就是因为pid文件被删除,导致flink找不到机器上的进程pid编号所致。因此我们需要修改该默认配置为一个flink专用目录。
  3. flink sql on hive
    flink版本:1.11.1
    运行了一个sql语句,查询hive表的数据,有group by,然后对很多字段取最大值,示例代码如下:
    select user_id, max(column1), max(column2), max(column3)
    from test.test
    group by user_id;

    select后面的max字段有150多个,然后运行代码报错如下:
     

    java.lang.RuntimeException: Could not instantiate generated class 'LocalSortAggregateWithKeys$1975'
        at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:67)
        at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:40)
        at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:70)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:470)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:459)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:459)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:155)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:453)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
        at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68)
        at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
        at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:65)
        ... 14 more
    Caused by: org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
        at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66)
        ... 16 more
    Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
        at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81)
        at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66)
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
        ... 19 more
    Caused by: java.lang.StackOverflowError
        at org.codehaus.janino.CodeContext.extract16BitValue(CodeContext.java:700)
        at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:478)
        at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:557)
        at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:557)
    

    主要错误是:StackOverflowError,栈溢出。
    这个错误意思是说,方法嵌套太多。
    之后试了一下,发现是一个sql里面的聚合函数不能写太多,最大大概是120左右。建议聚合函数个数超过100个,就写两个sql,然后再把两个sql的结果进行合并。

  4. 调用太多次get_json_value函数

    将处理完的数据写入pulsar主题,主题字段特别多,超过200个,通过调用get_json_value(类似于hive中的get_json_object)函数,将处理完的结果数据(为json字符串)的内容提取出来,然后发现处理速度很慢。

    解决方式就是,结果就保存一个字段,将json字符串输入,下游处理时再通过get_json_value函数获取需要的字段值,或者是在自定义UDF中获取。

  5.  

原网站

版权声明
本文为[第一片心意]所创,转载请带上原文链接,感谢
https://blog.csdn.net/u012443641/article/details/103905059