踩坑填坑-个人总结持续更新

踩坑填坑-个人总结持续更新,第1张

踩坑填坑-个人总结持续更新

===================================== AirFlow 部分========================================

1.Airflow 任务假死问题

版本:Airflow 1.10.9
现象:
日志出现: dependency ‘Task Instance State’ FAILED: Task is in the ‘running’ state which is not a valid state for execution. The task must be cleared in order to be run.
然后就会一直hang住在这里,这样这个进程就会一直挂在后台,一直不会被清掉,如果这样的情况多了 ,那么对于服务器压力压力还是很大的。

开始排查:看到这个问题,首先就去看说他依赖到上一个任务,发现上一个任务是完全运行OK 的.这个就很奇怪了,这个作业本身就是一个简单的调用脚本的 *** 作。
后来在看到官方的git上有一个这样的 issue


简单来说就是老版本的bug,这个已经被修复合并到新版本里了。
对于这个结果有两个问题:1.我们其他也有很多5分钟跑的任务也都没出现这个问题 ;2.版本升级暂时不太好处理;
后来索性直接在dag上加了一个超时时间

'execution_timeout': timedelta(seconds=3600)

尽管没能从根本上解决这个问题,但是暂时能避免失败任务堆积的问题;

2.依赖外部Dag Task的一个demo:
import datetime
from airflow import DAG
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.sensors.external_task_sensor import ExternalTaskMarker, ExternalTaskSensor

import pendulum
from tools.utils import list2bash

local_tz = pendulum.timezone("Asia/Shanghai")

default_args = {
    'owner': 'liuge36',
    'depends_on_past': False,
    'start_date': datetime.datetime(2021, 10, 8, 0, tzinfo=local_tz),
    'email': [
        'liuge36@163.com'
    ],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'execution_timeout': datetime.timedelta(seconds=600),
    'retry_delay': datetime.timedelta(seconds=10),
}

ssh_hook_emr = SSHHook(ssh_conn_id='ssh_emr_master',timeout=30)

# 每5分钟
dag = DAG(
    'process_doris_5min',
    default_args=default_args,
    catchup=False,
    schedule_interval="*/5 2-23 * * *"
)


sensor_xx_task = ExternalTaskSensor(
    task_id="sensor_xx_task",
    external_dag_id="process_doris",
    external_task_id="xx",
    allowed_states=['success'],
    execution_delta=datetime.timedelta(minutes=0), # 注意这里的写法
    dag=dag
)

export_xx_mysql = SSHOperator(
    task_id='export_xx_mysql ',
    ssh_hook=ssh_hook_emr,
    retries=1,
    command=list2bash([
        'ssh user@ip "python /datax/bin/datax.py /datax/ads_xx.json "'
    ]),
    dag=dag
)

sensor_xx_task >> export_xx_mysql 


===================================== DataX 部分 ========================================

1.DataX 任务正常启动后,数据写入一直不变,也不报错

现象: INFO StandAloneJobContainerCommunicator - Total 3421 records, 2496950 bytes | Speed 0B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 211.342s | All Task WaitReaderTime 0.814s | Percentage 0.00%

我这边是将 DorisDB中的结果数据导出到MySQL中出现了这个问题,结果表的总数据量大约在600w左右,每当跑到190w左右的时候就会出现这个现象,然后我就把内存调大到4G ,如下:

python /datax/bin/datax.py --jvm="-Xms4g -Xmx4g"  /datax/ads_xxx_all.json

然而并没有什么用。

同样,在网上搜索了一下发现,也有小伙伴遇到类似的情况 issue

ps:
jstack 定位死循环、线程阻塞、死锁等问题
当我们运行java程序时,发现程序不动,但又不知道是哪里出问题时,可以使用JDK自带的jstack工具去定位

找到datax的进程号 ,果然发现是 reader 线程卡在了 close connection

总结:
问题原因:由于拉取数据过大,且sql语句未走索引,导致每次拉取数据都要 全表扫描 导致数据库性能达到瓶颈,最终无法响应closeDBResources()方法。
解决方案: 优化sql语句走索引,分段拉取

最后调整如下:

{

    "core": {
        "transport": {
            "channel": {
                "speed": {
                    "byte": 5242880
                }
            }
        }
    },
    "job": {
        "setting": {
            "speed": {
                "channel":1,
                "batchSize": 2048
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "xxx",
                        "password": "xxx",
                        "column":["account_id","lecture_id","liveroom_id","activity_id","xxxx"],
                        "connection":[
                            {
                                "table":[
                                    "ads_xxx"
                                ],
                                "jdbcUrl":[
                                    "jdbc:mysql://ip:9030/warehouse"
                                ]
                            }
                        ],
                        "splitPk": "lecture_id"
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "update",
                        "username": "xxx",
                        "password": "xxx",
                        "column":["account_id","lecture_id","liveroom_id","activity_id","xxxx"],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://ip:3306/weike_aggregation",
                                "table": [
                                    "ads_xxx"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}


=================== Git部分 ======================

1.git 放弃本地修改,强制拉取更新

git fetch --all
git reset --hard origin/master
git pull //可以省略

git fetch 指令是下载远程仓库最新内容,不做合并
git reset 指令把HEAD指向master最新版本

===================================== Flink 部分 ========================================

1.ERROR [com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction] - Reporting error:java.lang.NoSuchMethodError: sun.misc.Unsafe.monitorEnter(Ljava/lang/Object;)V

Flink版本:1.12.0
如题:使用Flink CDC同步MySQL数据的时候出现上面报错;

这个问题我这边是由于本地JDK版本不适配的问题,之前使用的是JDK11,后面调整到JDK8就没有报这个错误了;

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/4827394.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-10
下一篇 2022-11-10

发表评论

登录后才能评论

评论列表(0条)

保存