===================================== AirFlow 部分========================================
1.Airflow 任务假死问题版本:Airflow 1.10.9
现象:
日志出现: dependency ‘Task Instance State’ FAILED: Task is in the ‘running’ state which is not a valid state for execution. The task must be cleared in order to be run.
然后就会一直hang住在这里,这样这个进程就会一直挂在后台,一直不会被清掉,如果这样的情况多了 ,那么对于服务器压力压力还是很大的。
开始排查:看到这个问题,首先就去看说他依赖到上一个任务,发现上一个任务是完全运行OK 的.这个就很奇怪了,这个作业本身就是一个简单的调用脚本的 *** 作。
后来在看到官方的git上有一个这样的 issue
简单来说就是老版本的bug,这个已经被修复合并到新版本里了。
对于这个结果有两个问题:1.我们其他也有很多5分钟跑的任务也都没出现这个问题 ;2.版本升级暂时不太好处理;
后来索性直接在dag上加了一个超时时间
'execution_timeout': timedelta(seconds=3600)
尽管没能从根本上解决这个问题,但是暂时能避免失败任务堆积的问题;
2.依赖外部Dag Task的一个demo:import datetime from airflow import DAG from airflow.contrib.hooks.ssh_hook import SSHHook from airflow.contrib.operators.ssh_operator import SSHOperator from airflow.sensors.external_task_sensor import ExternalTaskMarker, ExternalTaskSensor import pendulum from tools.utils import list2bash local_tz = pendulum.timezone("Asia/Shanghai") default_args = { 'owner': 'liuge36', 'depends_on_past': False, 'start_date': datetime.datetime(2021, 10, 8, 0, tzinfo=local_tz), 'email': [ 'liuge36@163.com' ], 'email_on_failure': True, 'email_on_retry': False, 'retries': 2, 'execution_timeout': datetime.timedelta(seconds=600), 'retry_delay': datetime.timedelta(seconds=10), } ssh_hook_emr = SSHHook(ssh_conn_id='ssh_emr_master',timeout=30) # 每5分钟 dag = DAG( 'process_doris_5min', default_args=default_args, catchup=False, schedule_interval="*/5 2-23 * * *" ) sensor_xx_task = ExternalTaskSensor( task_id="sensor_xx_task", external_dag_id="process_doris", external_task_id="xx", allowed_states=['success'], execution_delta=datetime.timedelta(minutes=0), # 注意这里的写法 dag=dag ) export_xx_mysql = SSHOperator( task_id='export_xx_mysql ', ssh_hook=ssh_hook_emr, retries=1, command=list2bash([ 'ssh user@ip "python /datax/bin/datax.py /datax/ads_xx.json "' ]), dag=dag ) sensor_xx_task >> export_xx_mysql
===================================== DataX 部分 ========================================
1.DataX 任务正常启动后,数据写入一直不变,也不报错现象: INFO StandAloneJobContainerCommunicator - Total 3421 records, 2496950 bytes | Speed 0B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 211.342s | All Task WaitReaderTime 0.814s | Percentage 0.00%
我这边是将 DorisDB中的结果数据导出到MySQL中出现了这个问题,结果表的总数据量大约在600w左右,每当跑到190w左右的时候就会出现这个现象,然后我就把内存调大到4G ,如下:
python /datax/bin/datax.py --jvm="-Xms4g -Xmx4g" /datax/ads_xxx_all.json
然而并没有什么用。
同样,在网上搜索了一下发现,也有小伙伴遇到类似的情况 issue
ps: jstack 定位死循环、线程阻塞、死锁等问题 当我们运行java程序时,发现程序不动,但又不知道是哪里出问题时,可以使用JDK自带的jstack工具去定位
找到datax的进程号 ,果然发现是 reader 线程卡在了 close connection
总结:
问题原因:由于拉取数据过大,且sql语句未走索引,导致每次拉取数据都要 全表扫描 导致数据库性能达到瓶颈,最终无法响应closeDBResources()方法。
解决方案: 优化sql语句走索引,分段拉取
最后调整如下:
{ "core": { "transport": { "channel": { "speed": { "byte": 5242880 } } } }, "job": { "setting": { "speed": { "channel":1, "batchSize": 2048 } }, "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username": "xxx", "password": "xxx", "column":["account_id","lecture_id","liveroom_id","activity_id","xxxx"], "connection":[ { "table":[ "ads_xxx" ], "jdbcUrl":[ "jdbc:mysql://ip:9030/warehouse" ] } ], "splitPk": "lecture_id" } }, "writer": { "name": "mysqlwriter", "parameter": { "writeMode": "update", "username": "xxx", "password": "xxx", "column":["account_id","lecture_id","liveroom_id","activity_id","xxxx"], "connection": [ { "jdbcUrl": "jdbc:mysql://ip:3306/weike_aggregation", "table": [ "ads_xxx" ] } ] } } } ] } }
=================== Git部分 ======================
1.git 放弃本地修改,强制拉取更新git fetch --all
git reset --hard origin/master
git pull //可以省略
git fetch 指令是下载远程仓库最新内容,不做合并
git reset 指令把HEAD指向master最新版本
===================================== Flink 部分 ========================================
1.ERROR [com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction] - Reporting error:java.lang.NoSuchMethodError: sun.misc.Unsafe.monitorEnter(Ljava/lang/Object;)VFlink版本:1.12.0
如题:使用Flink CDC同步MySQL数据的时候出现上面报错;
这个问题我这边是由于本地JDK版本不适配的问题,之前使用的是JDK11,后面调整到JDK8就没有报这个错误了;
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)