ETL调度系统

ETL调度系统,第1张

概述ETL调度系统

下面是内存溢出 jb51.cc 通过网络收集整理的代码片段。

内存溢出小编现在分享给大家,也给大家做个参考。

CREATE OR REPLACE PACKAGE PKG_ETL_CTL IS  -- 调用主程序,调用存储过程  PROCEDURE SP_EXEC_PROC(I_JOB_ID NUMBER);  -- 创建新的时间周期,运行数据周期作业及作业流状态处理  PROCEDURE SP_FLOW_RUN_DEAL;  -- 创建新的时间周期  PROCEDURE SP_FLOW_CREATE_NEW_PERIOD;  -- 运行数据周期内的作业流及作业  PROCEDURE SP_FLOW_RUN_NEW_PERIOD;  -- 同步作业流运行状态  PROCEDURE SP_FLOW_RUN_STATUS;  -- 失败作业流及作业重置为未处理  PROCEDURE SP_FLOW_RUN_ERROR_reset;  -- 作业状态更新  PROCEDURE SP_JOB_RUN_STATUS  (    I_JOB_ID NUMBER,I_ORG_ID VARCHAR2,I_JOB_RUN_STATUS VARCHAR2,I_JOB_RUN_INFO VARCHAR2  );  -- 作业流日志处理  PROCEDURE SP_ETL_FLOW_LOG_INFO(I_FLOW_ID NUMBER);  -- 作业日志处理  PROCEDURE SP_ETL_JOB_LOG_INFO(I_JOB_ID NUMBER);  PROCEDURE SP_INSERT_MONITOR_SMS  (    O_RESulT_FLAG OUT VARCHAR2 /*过程执行结果返回给调度 9 成功 2 失败*/,O_RESulT_MSG OUT VARCHAR2 /*过程执行结果信息返回给调度*/  );  PROCEDURE SP_SEND_MONITOR_SMS;  -- 获取作业流前置依赖  FUNCTION FN_GET_FLOW_DEPEND(I_FLOW_ID NUMBER) RETURN VARCHAR2;  -- 获取作业前置依赖  FUNCTION FN_GET_JOB_DEPEND(I_JOB_ID NUMBER) RETURN VARCHAR2;  -- 获取作业流下属作业运行状态  FUNCTION FN_GET_FLOW_RUN_STATUS(I_FLOW_ID NUMBER) RETURN VARCHAR2;  -- 获取下个数据日期  FUNCTION FN_GET_NEXT_DATA_TIME(I_FLOW_ID NUMBER) RETURN DATE;  -- 获取上级作业流运行状态  FUNCTION FN_GET_SUPER_FLOW_RUN_STATUS(I_FLOW_ID NUMBER) RETURN VARCHAR2;  -- 获取上级作业流数据开始时间  FUNCTION FN_GET_SUPER_DATA_START_TIME(I_FLOW_ID NUMBER) RETURN DATE;  -- 获取上级作业流数据结束时间  FUNCTION FN_GET_SUPER_DATA_END_TIME(I_FLOW_ID NUMBER) RETURN DATE;  -- 获取周期代码  FUNCTION FN_GET_CYC_CODE(I_FLOW_ID VARCHAR2) RETURN VARCHAR2;END PKG_ETL_CTL;/CREATE OR REPLACE PACKAGE BODY PKG_ETL_CTL IS  /*******************************************************************    程序名   :SP_EXEC_PROC    创建人   : zhuyh    创建时间 : 2013/8/20    功能描述 : 调用主程序,调用存储过程    修改人   :    修改时间 :    修改原因 :  *******************************************************************/  PROCEDURE SP_EXEC_PROC(I_JOB_ID NUMBER) IS    VAR_PRO_name        VARCHAR2(100);    VAR_DATA_START_TIME VARCHAR2(20);    VAR_DATA_END_TIME   VARCHAR2(20);    VAR_sql             VARCHAR2(4000);    VAR_ParaMS          VARCHAR2(1000);    VAR_ORG_ID          VARCHAR2(10);    VAR_JOB_RUN_DESC    VARCHAR2(100);    VAR_JOB_ERR_DESC    VARCHAR2(100);    BEGIN    -- 获取作业正在运行描述    SELECT T.ETL_Para_VAL INTO VAR_JOB_RUN_DESC FROM ETL_CTL_Para T WHERE UPPER(T.ETL_Para_name) = 'ETL_JOB_RUN_DESC';      -- 获取作业运行失败描述    SELECT T.ETL_Para_VAL INTO VAR_JOB_ERR_DESC FROM ETL_CTL_Para T WHERE UPPER(T.ETL_Para_name) = 'ETL_JOB_ERR_DESC';      -- 获取作业所调用的存储过程,数据开始时间,数据结束时间    SELECT T.ETL_JOB_PROC,TO_CHAR(A.ETL_DATA_START_TIME,'YYYYMMDDHH24MISS'),TO_CHAR(A.ETL_DATA_END_TIME,'YYYYMMDDHH24MISS')      INTO VAR_PRO_name,VAR_DATA_START_TIME,VAR_DATA_END_TIME      FROM ETL_CTL_JOB_INFO T,ETL_JOB_RUN_STS A     WHERE T.ETL_JOB_ID = A.ETL_JOB_ID       AND T.ETL_JOB_ID = I_JOB_ID;      -- 获取作业全部参数拼接到一起    FOR LOOP_ParaM IN (SELECT T.ETL_Para_name,DECODE(T.ETL_Para_TYPE,2,'TO_DATE(' || T.ETL_Para_VAL || ',''YYYYMMDDHH24MISS'')' -- 日期类型参数转化成日期格式,T.ETL_Para_VAL) ETL_Para_VAL,T.ETL_Para_TYPE                         FROM ETL_JOB_Para T                        WHERE T.ETL_JOB_ID = I_JOB_ID) LOOP      -- 获取机构号      IF LOOP_ParaM.ETL_Para_name = 'I_ORG_ID'      THEN        VAR_ORG_ID := LOOP_ParaM.ETL_Para_VAL;      END IF;      -- 参数拼接      VAR_ParaMS := VAR_ParaMS || LOOP_ParaM.ETL_Para_name || ' => ' || LOOP_ParaM.ETL_Para_VAL || ',';    END LOOP;      -- 参数加上输出参数(存储过程运行结果和运行信息)    VAR_ParaMS := UPPER(VAR_ParaMS) || 'O_RESulT_FLAG => LO_RESulT_FLAG,O_RESulT_MSG => LO_RESulT_MSG';    -- 参数替换为变量    VAR_ParaMS := REPLACE(VAR_ParaMS,'#$I_DATA_START_TIME#',VAR_DATA_START_TIME);      VAR_ParaMS := REPLACE(VAR_ParaMS,'#$I_DATA_END_TIME#',VAR_DATA_END_TIME);      -- 拼接存储过程进行调用    VAR_sql := 'DECLARE  LO_RESulT_FLAG VARCHAR2(10);LO_RESulT_MSG VARCHAR2(300);BEGIN ' || VAR_PRO_name || '(' ||               VAR_ParaMS || ');PKG_ETL_CTL.SP_JOB_RUN_STATUS(' || I_JOB_ID || ',''' || VAR_ORG_ID ||               ''',LO_RESulT_FLAG,LO_RESulT_MSG);END;';      -- 修改作业状态为正在运行    SP_JOB_RUN_STATUS(I_JOB_ID,VAR_ORG_ID,'1',VAR_JOB_RUN_DESC);    -- 运行存储过程    EXECUTE IMMEDIATE VAR_sql;    EXCEPTION    WHEN OTHERS THEN      -- 运行出错,返回错误信息      SP_JOB_RUN_STATUS(I_JOB_ID,'2',sqlERRM);  END;  /*******************************************************************    程序名   :SP_FLOW_RUN_DEAL    创建人   : zhuyh    创建时间 : 2013/8/20    功能描述 : 创建新的时间周期,运行数据周期作业及作业流状态处理    修改人   :    修改时间 :    修改原因 :  *******************************************************************/  PROCEDURE SP_FLOW_RUN_DEAL IS  BEGIN      -- 创建新的时间周期    SP_FLOW_CREATE_NEW_PERIOD;    -- 运行时间周期内的作业    SP_FLOW_RUN_NEW_PERIOD;    -- 更新作业流状态    SP_FLOW_RUN_STATUS;    -- 失败任务重置    SP_FLOW_RUN_ERROR_reset;  END;  /*******************************************************************    程序名   :SP_FLOW_CREATE_NEW_PERIOD    创建人   : zhuyh    创建时间 : 2013/8/20    功能描述 : 生成新的数据周期运行新周期的数据    修改人   :    修改时间 :    修改原因 :  *******************************************************************/  PROCEDURE SP_FLOW_CREATE_NEW_PERIOD IS    DTE_DATA_NEXT_TIME  DATE;    DTE_DATA_START_TIME DATE;    DTE_DATA_END_TIME   DATE;  BEGIN      FOR LOOP_FLOW IN (SELECT T.ETL_FLOW_ID,A.ETL_NEXT_EXPIRY_TIME                        FROM ETL_CTL_JOB_FLOW T,ETL_FLOW_RUN_STS A                       WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID                         AND T.ETL_FLOW_LEVEL = 1                         AND T.ETL_FLOW_STATUS = 1) LOOP      DTE_DATA_NEXT_TIME := FN_GET_NEXT_DATA_TIME(LOOP_FLOW.ETL_FLOW_ID);      IF DTE_DATA_NEXT_TIME <= LOOP_FLOW.ETL_NEXT_EXPIRY_TIME      THEN        UPDATE ETL_FLOW_RUN_STS T           SET T.ETL_NEXT_DATA_TIME = DTE_DATA_NEXT_TIME         WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;        COMMIT;      END IF;    END LOOP;      FOR LOOP_FLOW IN (SELECT T.ETL_FLOW_ID,T.ETL_DATA_SUCC_TIME,T.ETL_DATA_START_TIME,T.ETL_DATA_END_TIME,T.ETL_NEXT_DATA_TIME,T.ETL_FLOW_RUN_STATUS,A.ETL_CYC_CODE,A.ETL_FLOW_LEVEL,A.ETL_FLOW_STATUS                        FROM ETL_FLOW_RUN_STS T,ETL_CTL_JOB_FLOW A                       WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID                         AND A.ETL_FLOW_STATUS = 1                       ORDER BY A.ETL_FLOW_LEVEL) LOOP      IF LOOP_FLOW.ETL_FLOW_LEVEL = 1         AND LOOP_FLOW.ETL_FLOW_STATUS = 1         AND LOOP_FLOW.ETL_FLOW_RUN_STATUS = 9         AND LOOP_FLOW.ETL_DATA_SUCC_TIME < LOOP_FLOW.ETL_NEXT_DATA_TIME      THEN        UPDATE ETL_FLOW_RUN_STS T           SET T.ETL_DATA_START_TIME = CASE                                         WHEN LOOP_FLOW.ETL_CYC_CODE = '01' THEN                                          T.ETL_DATA_SUCC_TIME + 1                                         WHEN LOOP_FLOW.ETL_CYC_CODE = '02' THEN                                          T.ETL_DATA_SUCC_TIME + 1 / 24 / 60 / 60                                         WHEN LOOP_FLOW.ETL_CYC_CODE = '03' THEN                                          T.ETL_DATA_SUCC_TIME + 1                                       END,T.ETL_DATA_END_TIME = T.ETL_NEXT_DATA_TIME,T.ETL_START_TIME = NulL,T.ETL_END_TIME = NulL,T.ETL_FLOW_RUN_STATUS = 0,T.ETL_reset_TIME = 0         WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;        COMMIT;      ELSIF LOOP_FLOW.ETL_FLOW_LEVEL > 1            AND LOOP_FLOW.ETL_FLOW_STATUS = 1            AND FN_GET_SUPER_FLOW_RUN_STATUS(LOOP_FLOW.ETL_FLOW_ID) = 0      THEN        DTE_DATA_START_TIME := FN_GET_SUPER_DATA_START_TIME(LOOP_FLOW.ETL_FLOW_ID);        DTE_DATA_END_TIME   := FN_GET_SUPER_DATA_END_TIME(LOOP_FLOW.ETL_FLOW_ID);        UPDATE ETL_FLOW_RUN_STS T           SET T.ETL_DATA_START_TIME = DTE_DATA_START_TIME,T.ETL_DATA_END_TIME = DTE_DATA_END_TIME,T.ETL_FLOW_RUN_STATUS = 0         WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;        COMMIT;      END IF;    END LOOP;      FOR LOOP_JOB IN (SELECT A.ETL_JOB_ID,T.ETL_DATA_START_TIME FLOW_DATA_START_TIME,T.ETL_DATA_END_TIME FLOW_DATA_END_TIME,A.ETL_DATA_START_TIME JOB_DATA_START_TIME,A.ETL_DATA_END_TIME JOB_DATA_END_TIME,B.ETL_JOB_STATUS                       FROM ETL_FLOW_RUN_STS T,ETL_JOB_RUN_STS A,ETL_CTL_JOB_INFO B                      WHERE T.ETL_FLOW_ID = B.ETL_FLOW_ID                        AND A.ETL_JOB_ID = B.ETL_JOB_ID                        AND B.ETL_JOB_STATUS = 1) LOOP      IF LOOP_JOB.ETL_FLOW_RUN_STATUS = 0         AND LOOP_JOB.ETL_JOB_STATUS = 1      THEN        UPDATE ETL_JOB_RUN_STS T           SET T.ETL_DATA_START_TIME = LOOP_JOB.FLOW_DATA_START_TIME,T.ETL_DATA_END_TIME = LOOP_JOB.FLOW_DATA_END_TIME,T.ETL_JOB_RUN_STATUS = 0,T.ETL_SESSION_ID = NulL,T.ETL_LOG_DESC = NulL         WHERE T.ETL_JOB_ID = LOOP_JOB.ETL_JOB_ID;      END IF;    END LOOP;    COMMIT;  END;  /*******************************************************************    程序名   :SP_FLOW_RUN_NEW_PERIOD    创建人   : zhuyh    创建时间 : 2013/8/20    功能描述 : 运行新周期的数据    修改人   :    修改时间 :    修改原因 :  *******************************************************************/  PROCEDURE SP_FLOW_RUN_NEW_PERIOD IS    BEGIN    FOR LOOP_FLOW IN (SELECT T.ETL_FLOW_ID,ETL_CTL_JOB_FLOW A                       WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID                         AND A.ETL_FLOW_STATUS = 1                       ORDER BY A.ETL_FLOW_LEVEL) LOOP      IF LOOP_FLOW.ETL_FLOW_LEVEL = 1         AND LOOP_FLOW.ETL_FLOW_RUN_STATUS IN (0,3)         AND FN_GET_FLOW_DEPEND(LOOP_FLOW.ETL_FLOW_ID) = 1      THEN        UPDATE ETL_FLOW_RUN_STS T           SET T.ETL_START_TIME = SYSDATE,T.ETL_FLOW_RUN_STATUS = 1         WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;        COMMIT;      ELSIF LOOP_FLOW.ETL_FLOW_LEVEL > 1            AND FN_GET_SUPER_FLOW_RUN_STATUS(LOOP_FLOW.ETL_FLOW_ID) = 1            AND LOOP_FLOW.ETL_FLOW_RUN_STATUS IN (0,3)            AND FN_GET_FLOW_DEPEND(LOOP_FLOW.ETL_FLOW_ID) = 1      THEN        UPDATE ETL_FLOW_RUN_STS T           SET T.ETL_START_TIME = SYSDATE,T.ETL_FLOW_RUN_STATUS = 1         WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;        COMMIT;      END IF;    END LOOP;    END;  /*******************************************************************    程序名   :SP_FLOW_RUN_STATUS    创建人   : zhuyh    创建时间 : 2013/8/20    功能描述 : 作业流状态处理    修改人   :    修改时间 :    修改原因 :  *******************************************************************/  PROCEDURE SP_FLOW_RUN_STATUS IS    -- VAR_JOB_NO_SESSION    VARCHAR2(100); -- 数据库进程不存在    VAR_JOB_OVERTIME_DESC VARCHAR2(100); -- 作业运行超时描述    INT_JOB_OVERTIME      NUMBER; -- 作业超时告警时间    INT_LAST_RUNTIME      NUMBER; -- 作业上次运行时间    -- INT_JOB_DEAD_TIME     NUMBER; -- 调度作业数据库进程不存在运行判定时间  BEGIN    -- 获取数据库进程不存在描述    /*SELECT T.ETL_Para_VAL     INTO VAR_JOB_NO_SESSION     FROM ETL_CTL_Para T    WHERE UPPER(T.ETL_Para_name) = 'ETL_JOB_NO_SESSION_DESC';*/      -- 获取作业运行超时告警时间    SELECT T.ETL_Para_VAL INTO INT_JOB_OVERTIME FROM ETL_CTL_Para T WHERE UPPER(T.ETL_Para_name) = 'ETL_JOB_OVERTIME';      -- 调度作业数据库进程不存在运行判定时间    /*SELECT T.ETL_Para_VAL     INTO INT_JOB_DEAD_TIME     FROM ETL_CTL_Para T    WHERE UPPER(T.ETL_Para_name) = 'ETL_JOB_DEAD_TIME';*/      -- 获取作业运行超时描述    SELECT T.ETL_Para_VAL      INTO VAR_JOB_OVERTIME_DESC      FROM ETL_CTL_Para T     WHERE UPPER(T.ETL_Para_name) = 'ETL_JOB_OVERTIME_DESC';      FOR LOOP_JOB IN (SELECT T.ETL_JOB_ID,A.ETL_OVERTIME_REM_WAY                            --,T.ETL_SESSION_ID,(SYSDATE - T.ETL_START_TIME) RUNTIME                       FROM ETL_JOB_RUN_STS T,ETL_CTL_JOB_INFO A                      WHERE T.ETL_JOB_ID = A.ETL_JOB_ID                        AND T.ETL_JOB_RUN_STATUS = 1) LOOP      -- 作业为正在运行,但数据库进程已经不存在(10分钟)的作业置为运行失败      /*IF FN_GET_SESSION_STATUS(LOOP_JOB.ETL_SESSION_ID) = 0         AND LOOP_JOB.RUNTIME * 24 * 60 >= INT_JOB_DEAD_TIME      THEN        SP_JOB_RUN_STATUS(LOOP_JOB.ETL_JOB_ID,'',VAR_JOB_NO_SESSION);        -- 超时提醒方式为超过上次运行时间      ELS*/      IF LOOP_JOB.ETL_OVERTIME_REM_WAY = 1         AND INT_JOB_OVERTIME > 0      THEN        BEGIN          SELECT RUNTIME            INTO INT_LAST_RUNTIME            FROM (SELECT T.ETL_LOGID,T.ETL_JOB_ID,(T.ETL_END_TIME - T.ETL_START_TIME) * 24 * 60 RUNTIME,ROW_NUMBER() OVER(ORDER BY T.ETL_LOGID DESC) ROW_NUM                    FROM ETL_JOB_RUN_LOG T                   WHERE T.ETL_JOB_ID = LOOP_JOB.ETL_JOB_ID                     AND T.ETL_JOB_RUN_STATUS = 9)           WHERE ROW_NUM = 1;          IF LOOP_JOB.RUNTIME - INT_LAST_RUNTIME > INT_JOB_OVERTIME          THEN            -- 修改作业状态为运行超时            SP_JOB_RUN_STATUS(LOOP_JOB.ETL_JOB_ID,4,VAR_JOB_OVERTIME_DESC);          END IF;        EXCEPTION          WHEN OTHERS THEN            NulL;        END;        -- 超时提醒方式为超过前三次的平均值      ELSIF LOOP_JOB.ETL_OVERTIME_REM_WAY = 2            AND INT_JOB_OVERTIME > 0      THEN        BEGIN          SELECT AVG(RUNTIME)            INTO INT_LAST_RUNTIME            FROM (SELECT T.ETL_LOGID,ROW_NUMBER() OVER(ORDER BY T.ETL_LOGID DESC) ROW_NUM                    FROM ETL_JOB_RUN_LOG T                   WHERE T.ETL_JOB_ID = LOOP_JOB.ETL_JOB_ID                     AND T.ETL_JOB_RUN_STATUS = 9)           WHERE ROW_NUM <= 3;          IF LOOP_JOB.RUNTIME - INT_LAST_RUNTIME > INT_JOB_OVERTIME          THEN            -- 修改作业状态为运行超时            SP_JOB_RUN_STATUS(LOOP_JOB.ETL_JOB_ID,VAR_JOB_OVERTIME_DESC);          END IF;        EXCEPTION          WHEN OTHERS THEN            NulL;        END;      END IF;    END LOOP;      FOR LOOP_FLOW IN (SELECT T.ETL_FLOW_ID,T.ETL_DATA_END_TIME                        FROM ETL_FLOW_RUN_STS T,ETL_CTL_JOB_FLOW A                       WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID                         AND T.ETL_FLOW_RUN_STATUS = 1                       ORDER BY A.ETL_FLOW_LEVEL DESC) LOOP      -- 下属作业运行成功将作业流状态置为成功      IF FN_GET_FLOW_RUN_STATUS(LOOP_FLOW.ETL_FLOW_ID) = 9      THEN        UPDATE ETL_FLOW_RUN_STS T           SET T.ETL_DATA_SUCC_TIME = LOOP_FLOW.ETL_DATA_END_TIME,T.ETL_END_TIME = SYSDATE,T.ETL_FLOW_RUN_STATUS = 9         WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;        COMMIT;        SP_ETL_FLOW_LOG_INFO(LOOP_FLOW.ETL_FLOW_ID);        -- 下属作业运行失败将作业流置为失败      ELSIF FN_GET_FLOW_RUN_STATUS(LOOP_FLOW.ETL_FLOW_ID) = 2      THEN        UPDATE ETL_FLOW_RUN_STS T           SET T.ETL_END_TIME = SYSDATE,T.ETL_FLOW_RUN_STATUS = 2         WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;        COMMIT;        SP_ETL_FLOW_LOG_INFO(LOOP_FLOW.ETL_FLOW_ID);      END IF;    END LOOP;  END;  /*******************************************************************    程序名   :SP_FLOW_RUN_ERROR_reset    创建人   : zhuyh    创建时间 : 2013/8/20    功能描述 : 失败任务重置,等待重新运行    修改人   :    修改时间 :    修改原因 :  *******************************************************************/  PROCEDURE SP_FLOW_RUN_ERROR_reset IS    VAR_MAX_reset_TIME NUMBER;  BEGIN    -- 获取最大任务重置次数    SELECT T.ETL_Para_VAL      INTO VAR_MAX_reset_TIME      FROM ETL_CTL_Para T     WHERE UPPER(T.ETL_Para_name) = 'ETL_MAX_reset_TIME';      FOR LOOP_FLOW IN (SELECT T.ETL_FLOW_ID,T.ETL_reset_TIME                        FROM ETL_FLOW_RUN_STS T,ETL_CTL_JOB_FLOW A                       WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID                         AND A.ETL_FLOW_STATUS = 1                         AND T.ETL_FLOW_RUN_STATUS = 2                       ORDER BY A.ETL_FLOW_LEVEL) LOOP      IF LOOP_FLOW.ETL_FLOW_LEVEL = 1         AND (VAR_MAX_reset_TIME = 0 OR LOOP_FLOW.ETL_reset_TIME < VAR_MAX_reset_TIME)      THEN        UPDATE ETL_FLOW_RUN_STS T           SET /*T.ETL_START_TIME = NulL,*/ T.ETL_END_TIME = NulL,T.ETL_FLOW_RUN_STATUS = 3,T.ETL_reset_TIME = T.ETL_reset_TIME + 1         WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;        COMMIT;      ELSIF LOOP_FLOW.ETL_FLOW_LEVEL > 1            AND FN_GET_SUPER_FLOW_RUN_STATUS(LOOP_FLOW.ETL_FLOW_ID) = 3      THEN        UPDATE ETL_FLOW_RUN_STS T           SET /*T.ETL_START_TIME = NulL,T.ETL_FLOW_RUN_STATUS = 3         WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;        COMMIT;      END IF;    END LOOP;      FOR LOOP_JOB IN (SELECT A.ETL_JOB_ID,A.ETL_JOB_RUN_STATUS                       FROM ETL_FLOW_RUN_STS T,ETL_CTL_JOB_INFO B                      WHERE T.ETL_FLOW_ID = B.ETL_FLOW_ID                        AND A.ETL_JOB_ID = B.ETL_JOB_ID                        AND B.ETL_JOB_STATUS = 1) LOOP      -- 将运行失败的作业置为重新运行,等待重新运行      IF LOOP_JOB.ETL_FLOW_RUN_STATUS = 3         AND LOOP_JOB.ETL_JOB_RUN_STATUS = 2      THEN        UPDATE ETL_JOB_RUN_STS T           SET /*T.ETL_START_TIME = NulL,T.ETL_JOB_RUN_STATUS = 3,T.ETL_LOG_DESC = NulL         WHERE T.ETL_JOB_ID = LOOP_JOB.ETL_JOB_ID;      END IF;    END LOOP;    COMMIT;  END;  /*******************************************************************    程序名   :SP_JOB_RUN_STATUS    创建人   : zhuyh    创建时间 : 2013/8/20    功能描述 : 作业运行状态处理    修改人   : zhuyh    修改时间 :2013/9/30    修改原因 : 进程ID由记录数据库进程改为记录 *** 作系统进程  *******************************************************************/  PROCEDURE SP_JOB_RUN_STATUS  (    I_JOB_ID NUMBER,I_JOB_RUN_INFO VARCHAR2  ) IS    -- VAR_SESSION_ID    VARCHAR2(10);    DTE_DATA_END_TIME DATE;    VAR_JOB_SUCC_DESC VARCHAR2(100);    VAR_JOB_ERR_DESC  VARCHAR2(100);    BEGIN    -- 获取作业运行成功描述    SELECT T.ETL_Para_VAL      INTO VAR_JOB_SUCC_DESC      FROM ETL_CTL_Para T     WHERE UPPER(T.ETL_Para_name) = 'ETL_JOB_SUCC_DESC';      -- 获取作业运行失败描述    SELECT T.ETL_Para_VAL INTO VAR_JOB_ERR_DESC FROM ETL_CTL_Para T WHERE UPPER(T.ETL_Para_name) = 'ETL_JOB_ERR_DESC';      SELECT ETL_DATA_END_TIME INTO DTE_DATA_END_TIME FROM ETL_JOB_RUN_STS T WHERE T.ETL_JOB_ID = I_JOB_ID;      -- 正在运行    IF I_JOB_RUN_STATUS = 1    THEN      -- del by zhuyh 2013/9/30 进程ID由记录数据库进程改为记录 *** 作系统进程      -- 获取数据库进程      -- SELECT SYS_CONTEXT('USERENV','SID') INTO VAR_SESSION_ID FROM DUAL;      UPDATE ETL_JOB_RUN_STS T         SET /*T.ETL_START_TIME = SYSDATE,*/ T.ETL_DATA_ORG_ID = I_ORG_ID,T.ETL_JOB_RUN_STATUS = I_JOB_RUN_STATUS             -- del by zhuyh 2013/9/30 进程ID由记录数据库进程改为记录 *** 作系统进程             --,T.ETL_SESSION_ID = VAR_SESSION_ID,T.ETL_LOG_DESC = I_JOB_RUN_INFO       WHERE T.ETL_JOB_ID = I_JOB_ID;      COMMIT;      -- 运行失败    ELSIF I_JOB_RUN_STATUS = 2    THEN      UPDATE ETL_JOB_RUN_STS T         SET T.ETL_END_TIME = SYSDATE,T.ETL_JOB_RUN_STATUS = I_JOB_RUN_STATUS,T.ETL_LOG_DESC = VAR_JOB_ERR_DESC || I_JOB_RUN_INFO       WHERE T.ETL_JOB_ID = I_JOB_ID;      COMMIT;      -- 写失败日志      SP_ETL_JOB_LOG_INFO(I_JOB_ID);      -- 运行成功    ELSIF I_JOB_RUN_STATUS = 9    THEN      UPDATE ETL_JOB_RUN_STS T         SET T.ETL_DATA_SUCC_TIME = DTE_DATA_END_TIME,T.ETL_LOG_DESC = VAR_JOB_SUCC_DESC || I_JOB_RUN_INFO       WHERE T.ETL_JOB_ID = I_JOB_ID;      COMMIT;      -- 写失败日志      SP_ETL_JOB_LOG_INFO(I_JOB_ID);    END IF;  END;  /*******************************************************************    程序名   :SP_ETL_FLOW_LOG_INFO    创建人   : zhuyh    创建时间 : 2013/8/20    功能描述 : 记录作业流运行日志    修改人   :    修改时间 :    修改原因 :  *******************************************************************/  PROCEDURE SP_ETL_FLOW_LOG_INFO(I_FLOW_ID NUMBER) IS    DTE_DATA_START_TIME DATE;    DTE_DATA_END_TIME   DATE;    INT_COUNT           NUMBER;    VAR_ETL_LOGID       VARCHAR2(30); -- 日志序号在每个数据周期内排序  BEGIN    SELECT T.ETL_DATA_START_TIME,T.ETL_DATA_END_TIME      INTO DTE_DATA_START_TIME,DTE_DATA_END_TIME      FROM ETL_FLOW_RUN_STS T     WHERE T.ETL_FLOW_ID = I_FLOW_ID;      -- 检查该数据周期有没有运行过    SELECT COUNT(*)      INTO INT_COUNT      FROM ETL_FLOW_RUN_LOG T     WHERE T.ETL_FLOW_ID = I_FLOW_ID       AND T.ETL_DATA_START_TIME = DTE_DATA_START_TIME       AND T.ETL_DATA_END_TIME = DTE_DATA_END_TIME;      -- 未运行过的使用数据开始时间从新编号    IF INT_COUNT = 0    THEN      VAR_ETL_LOGID := TO_CHAR(DTE_DATA_START_TIME,'YYYYMMDDHH24MISS') ||                       TO_CHAR(DTE_DATA_END_TIME,'YYYYMMDDHH24MISS') || '01';    ELSE      -- 运行过的用最大编号加1      SELECT MAX(ETL_LOGID)        INTO VAR_ETL_LOGID        FROM ETL_FLOW_RUN_LOG T       WHERE T.ETL_FLOW_ID = I_FLOW_ID         AND T.ETL_DATA_START_TIME = DTE_DATA_START_TIME         AND T.ETL_DATA_END_TIME = DTE_DATA_END_TIME;          VAR_ETL_LOGID := VAR_ETL_LOGID + 1;    END IF;      -- 记日志表    INSERT INTO ETL_FLOW_RUN_LOG      (ETL_LOGID,ETL_FLOW_ID,ETL_DATA_START_TIME,ETL_DATA_END_TIME,ETL_START_TIME,ETL_END_TIME,ETL_FLOW_RUN_STATUS,ETL_LOG_DESC)      SELECT VAR_ETL_LOGID,T.ETL_FLOW_ID,T.ETL_START_TIME,T.ETL_END_TIME,T.ETL_LOG_DESC        FROM ETL_FLOW_RUN_STS T       WHERE T.ETL_FLOW_ID = I_FLOW_ID;    COMMIT;  END;  /*******************************************************************    程序名   :SP_ETL_JOB_LOG_INFO    创建人   : zhuyh    创建时间 : 2013/8/20    功能描述 : 记录作业运行日志    修改人   :    修改时间 :    修改原因 :  *******************************************************************/  PROCEDURE SP_ETL_JOB_LOG_INFO(I_JOB_ID NUMBER) IS    DTE_DATA_START_TIME DATE;    DTE_DATA_END_TIME   DATE;    INT_COUNT           NUMBER;    VAR_ETL_LOGID       VARCHAR2(30); -- 日志序号在每个数据周期内排序  BEGIN    SELECT T.ETL_DATA_START_TIME,DTE_DATA_END_TIME      FROM ETL_JOB_RUN_STS T     WHERE T.ETL_JOB_ID = I_JOB_ID;      -- 检测该数据周期任务有没有运行过    SELECT COUNT(*)      INTO INT_COUNT      FROM ETL_JOB_RUN_LOG T     WHERE T.ETL_JOB_ID = I_JOB_ID       AND T.ETL_DATA_START_TIME = DTE_DATA_START_TIME       AND T.ETL_DATA_END_TIME = DTE_DATA_END_TIME;      -- 未运行过的作业使用数据结束时间重新编号    IF INT_COUNT = 0    THEN      VAR_ETL_LOGID := TO_CHAR(DTE_DATA_START_TIME,'YYYYMMDDHH24MISS') || '01';    ELSE      -- 运行过的作业用最大编号加1      SELECT MAX(ETL_LOGID)        INTO VAR_ETL_LOGID        FROM ETL_JOB_RUN_LOG T       WHERE T.ETL_JOB_ID = I_JOB_ID         AND T.ETL_DATA_START_TIME = DTE_DATA_START_TIME         AND T.ETL_DATA_END_TIME = DTE_DATA_END_TIME;          VAR_ETL_LOGID := VAR_ETL_LOGID + 1;    END IF;      -- 记日志表    INSERT INTO ETL_JOB_RUN_LOG      (ETL_LOGID,ETL_JOB_ID,ETL_DATA_ORG_ID,ETL_JOB_RUN_STATUS,T.ETL_DATA_ORG_ID,T.ETL_JOB_RUN_STATUS,T.ETL_LOG_DESC        FROM ETL_JOB_RUN_STS T       WHERE T.ETL_JOB_ID = I_JOB_ID;    COMMIT;  END;  /*******************************************************************    程序名   :SP_INSERT_JOB_FAIL_SMS    创建人   : zhuyh    创建时间 : 2013/8/20    功能描述 : 生成失败作业短信    修改人   :    修改时间 :    修改原因 :  *******************************************************************/  PROCEDURE SP_INSERT_MONITOR_SMS  (    O_RESulT_FLAG OUT VARCHAR2 /*过程执行结果返回给调度 9 成功 2 失败*/,O_RESulT_MSG OUT VARCHAR2 /*过程执行结果信息返回给调度*/  ) IS    V_DTE_RUN_BEGIN_DT DATE; /*程序每一步骤运行开始*/    V_DTE_RUN_END_DT   DATE; /*程序每一步骤运行结束时间*/    V_INT_STEP         NUMBER := 0; /*程序执行步骤*/    /*步骤描述信息*/    V_VAR_STEP_DESC VARCHAR2(1000);    /*步骤所执行的DML类型*/    V_VAR_STEP_DML_TYPE VARCHAR2(10);    /*受影响行数*/    V_INT_ROW_CNT INTEGER := 0;    /*过程名称*/    V_VAR_PROC_name VARCHAR2(70) := 'PKG_SEND_SMS.SP_INSERT_SMS';    BEGIN      V_INT_STEP          := V_INT_STEP + 1; /*第一步骤*/    V_VAR_STEP_DESC     := V_INT_STEP || '.0:作业运行失败发送短信给运营人员 ';    V_VAR_STEP_DML_TYPE := 'INSERT'; /* *** 作类型*/      /*DML开始运行时间*/    V_DTE_RUN_BEGIN_DT := SYSDATE;      /*执行相应的SQL语句*/    FOR LOOP_JOB IN (SELECT C.MOBLIE_PHONE,T.ETL_LOGID,A.ETL_JOB_ID,A.ETL_JOB_name,A.ETL_JOB_DESC,T.ETL_LOG_DESC                       FROM ETL_JOB_RUN_LOG T,ETL_CTL_JOB_INFO A,ETL_SEND_SMS_List C                      WHERE T.ETL_JOB_ID = A.ETL_JOB_ID                        AND T.ETL_JOB_RUN_STATUS = 2                        AND T.ETL_SEND_FLAG = 0                        AND C.SEND_MONITOR_SMS = 1) LOOP      INSERT INTO ETL_SEND_SMS_LOG        (ETL_LOGID,MOBLIE_PHONE,SMS_CONTENT,SMS_LEVEL,SEND_TIME,ETL_DATE)      VALUES        (LOOP_JOB.ETL_LOGID,LOOP_JOB.ETL_JOB_ID,LOOP_JOB.MOBLIE_PHONE,'作业ID号[' || LOOP_JOB.ETL_JOB_ID || '],作业名称[' || LOOP_JOB.ETL_JOB_name || '],作业描述[' || LOOP_JOB.ETL_JOB_DESC ||         '],数据周期[' || TO_CHAR(LOOP_JOB.ETL_DATA_START_TIME,'YYYYMMDDHH24MISS') || '-' ||         TO_CHAR(LOOP_JOB.ETL_DATA_END_TIME,'YYYYMMDDHH24MISS') || '],机构号[' || LOOP_JOB.ETL_DATA_ORG_ID || '],运行描述[' ||         LOOP_JOB.ETL_LOG_DESC || ']',1,Trunc(SYSDATE),SYSDATE);      /*获取受影响行数*/      V_INT_ROW_CNT := V_INT_ROW_CNT + 1;      UPDATE ETL_JOB_RUN_LOG T         SET T.ETL_SEND_FLAG = 1       WHERE T.ETL_JOB_ID = LOOP_JOB.ETL_JOB_ID         AND T.ETL_LOGID = LOOP_JOB.ETL_LOGID;      COMMIT; ---提交DML *** 作    END LOOP;    /*DML运行结束时间*/    V_DTE_RUN_END_DT := SYSDATE;      /*记录成功的日志信息*/    IF V_INT_ROW_CNT > 0    THEN      PKG_PUBliC.SP_ETL_LOAD_DML_LOG(V_INT_STEP,SYSDATE /*数据开始日期*/,SYSDATE /*数据结束日期*/,1 /*机构代码*/,V_VAR_PROC_name /*存储过程名称*/,V_VAR_STEP_DESC /* *** 作步骤描述*/,V_VAR_STEP_DML_TYPE /* *** 作类型*/,V_INT_ROW_CNT /*受影响行数*/,1 /*执行结果*/,V_DTE_RUN_BEGIN_DT /*运行开始时间*/,V_DTE_RUN_END_DT /*运行结束时间*/,'' /*运行结果详细信息*/);    END IF;    /*整个过程执行成功*/    O_RESulT_FLAG := 9;    /*整个过程运行结果描述信息*/    O_RESulT_MSG := '';      /*异常处理部分*/  EXCEPTION    WHEN OTHERS THEN      ---回滚DML *** 作      RolLBACK;      O_RESulT_FLAG := 2; ----失败      O_RESulT_MSG  := sqlERRM;      ---记录异常日志信息      PKG_PUBliC.SP_ETL_LOAD_DML_LOG(V_INT_STEP,V_VAR_STEP_DML_TYPE /* *** 作步骤类型*/,V_INT_ROW_CNT /*返回的受影响行数*/,0 /*运行结果 0 失败; 1 成功*/,sqlERRM /*运行结果详细信息*/);  END;  /*******************************************************************    程序名   :SP_SEND_JOB_FAIL_SMS    创建人   : zhuyh    创建时间 : 2013/8/20    功能描述 : 发送失败作业短信    修改人   :    修改时间 :    修改原因 :  *******************************************************************/  PROCEDURE SP_SEND_MONITOR_SMS IS    V_VAR_RESulT_FLAG   CHAR(1);    V_VAR_RESulT_MSG    CHAR(300);    V_VAR_RETURN_STATUS INT;      CURSOR C_DATA IS      SELECT T.ETL_LOGID,T.MOBLIE_PHONE,T.SMS_CONTENT,T.SMS_LEVEL,T.SEND_TIME        FROM ETL_SEND_SMS_LOG T       WHERE T.SEND_STATUS = 0;    V_USER_CODE VARCHAR2(100) DEFAulT 'MIS';    V_PASSWORD  VARCHAR2(100) DEFAulT 'MIS#2013';  BEGIN      SP_INSERT_MONITOR_SMS(V_VAR_RESulT_FLAG,V_VAR_RESulT_MSG);      IF V_VAR_RESulT_FLAG = 9    THEN          FOR CC_DATA IN C_DATA LOOP              PKG_SMS_INTERFACE.SEND_SMS(V_USER_CODE,V_PASSWORD,CC_DATA.MOBLIE_PHONE,CC_DATA.SMS_CONTENT,CC_DATA.SMS_LEVEL,CC_DATA.SEND_TIME,V_VAR_RETURN_STATUS);        --发送成功,更新标志        IF V_VAR_RETURN_STATUS > 0        THEN          UPDATE ETL_SEND_SMS_LOG T             SET T.SEND_STATUS = '1',T.RECEIVE_STATUS = V_VAR_RETURN_STATUS           WHERE T.ETL_LOGID = CC_DATA.ETL_LOGID             AND T.ETL_JOB_ID = CC_DATA.ETL_JOB_ID             AND T.MOBLIE_PHONE = CC_DATA.MOBLIE_PHONE;        END IF;      END LOOP;    END IF;    COMMIT;  END;  /*******************************************************************    程序名   :FN_GET_FLOW_DEPEND    创建人   : zhuyh    创建时间 : 2013/8/20    功能描述 : 获取作业流前置依赖    修改人   :    修改时间 :    修改原因 :  *******************************************************************/  FUNCTION FN_GET_FLOW_DEPEND(I_FLOW_ID NUMBER) RETURN VARCHAR2 IS    VAR_DEP_STS VARCHAR2(100) := 1;    BEGIN    FOR LOOP_DEP IN (SELECT A.ETL_DATA_END_TIME,NVL(B.ETL_DATA_SUCC_TIME,DATE '1900-1-1') ETL_DEP_SUCC_TIME,PKG_ETL_CTL.FN_GET_CYC_CODE(T.ETL_FLOW_ID) ETL_CYC_CODE,PKG_ETL_CTL.FN_GET_CYC_CODE(T.ETL_DEPD_FLOW_ID) DEP_CYC_CODE                       FROM ETL_CTL_FLOW_DEPD T,ETL_FLOW_RUN_STS A,ETL_FLOW_RUN_STS B                      WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID                        AND T.ETL_DEPD_FLOW_ID = B.ETL_FLOW_ID                        AND T.ETL_FLOW_ID = I_FLOW_ID) LOOP      -- 数据结束时间大于所依赖的数据成功时间      IF (LOOP_DEP.ETL_DATA_END_TIME > LOOP_DEP.ETL_DEP_SUCC_TIME AND LOOP_DEP.ETL_CYC_CODE = LOOP_DEP.DEP_CYC_CODE)         OR (LOOP_DEP.ETL_DATA_END_TIME >= Trunc(LOOP_DEP.ETL_DEP_SUCC_TIME) AND LOOP_DEP.ETL_CYC_CODE <> '02' AND         LOOP_DEP.DEP_CYC_CODE = '02')         OR (LOOP_DEP.ETL_DATA_END_TIME > LOOP_DEP.ETL_DEP_SUCC_TIME AND LOOP_DEP.DEP_CYC_CODE <> '02')      THEN        VAR_DEP_STS := 0;        RETURN VAR_DEP_STS;      END IF;    END LOOP;    RETURN VAR_DEP_STS;  EXCEPTION    WHEN OTHERS THEN      RETURN NulL;  END;  /*******************************************************************    程序名   :FN_GET_JOB_DEPEND    创建人   : zhuyh    创建时间 : 2013/8/20    功能描述 : 获取作业前置依赖    修改人   :    修改时间 :    修改原因 :  *******************************************************************/  FUNCTION FN_GET_JOB_DEPEND(I_JOB_ID NUMBER) RETURN VARCHAR2 IS    VAR_DEP_STS VARCHAR2(100) := 1;    BEGIN    FOR LOOP_DEP IN (SELECT A.ETL_DATA_END_TIME,DATE '1900-1-1') ETL_DEP_SUCC_TIME                       FROM ETL_CTL_JOB_DEPD T,ETL_JOB_RUN_STS B                      WHERE T.ETL_JOB_ID = A.ETL_JOB_ID                        AND T.ETL_DEPD_JOB_ID = B.ETL_JOB_ID                        AND T.ETL_JOB_ID = I_JOB_ID) LOOP      -- 数据结束时间大于所依赖的数据成功时间      IF LOOP_DEP.ETL_DATA_END_TIME > LOOP_DEP.ETL_DEP_SUCC_TIME      THEN        VAR_DEP_STS := 0;        RETURN VAR_DEP_STS;      END IF;    END LOOP;    RETURN VAR_DEP_STS;  END;  /*******************************************************************    程序名   :FN_GET_FLOW_RUN_STATUS    创建人   : zhuyh    创建时间 : 2013/8/20    功能描述 : 获取作业流运行状态    修改人   :    修改时间 :    修改原因 :  *******************************************************************/  FUNCTION FN_GET_FLOW_RUN_STATUS(I_FLOW_ID NUMBER) RETURN VARCHAR2 IS    VAR_FLOW_RUN_STATUS VARCHAR2(100) := 1;    VAR_CHILD_FLAG      CHAR(1);    INT_STS_NOT_9       NUMBER;    INT_STS_0_3         NUMBER;    INT_STS_1_4         NUMBER;    INT_STS_2           NUMBER;  BEGIN    SELECT T.ETL_CHILD_FLAG INTO VAR_CHILD_FLAG FROM ETL_CTL_JOB_FLOW T WHERE T.ETL_FLOW_ID = I_FLOW_ID;      -- 不成功的数量    IF VAR_CHILD_FLAG = 0    THEN      SELECT COUNT(*)        INTO INT_STS_NOT_9        FROM ETL_FLOW_RUN_STS T,ETL_CTL_JOB_FLOW A       WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID         AND A.ETL_FLOW_STATUS = 1         AND A.ETL_SUPER_FLOW_ID = I_FLOW_ID         AND ETL_FLOW_RUN_STATUS <> 9;          -- 正在运行的数量      SELECT COUNT(*)        INTO INT_STS_1_4        FROM ETL_FLOW_RUN_STS T,ETL_CTL_JOB_FLOW A       WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID         AND A.ETL_FLOW_STATUS = 1         AND A.ETL_SUPER_FLOW_ID = I_FLOW_ID         AND ETL_FLOW_RUN_STATUS IN (1,4);          -- 满足条件但未运行的数量      SELECT COUNT(*)        INTO INT_STS_0_3        FROM ETL_FLOW_RUN_STS T,ETL_CTL_JOB_FLOW A       WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID         AND A.ETL_FLOW_STATUS = 1         AND A.ETL_SUPER_FLOW_ID = I_FLOW_ID         AND ETL_FLOW_RUN_STATUS IN (0,3)         AND FN_GET_FLOW_DEPEND(T.ETL_FLOW_ID) = 1;          -- 运行失败的数量      SELECT COUNT(*)        INTO INT_STS_2        FROM ETL_FLOW_RUN_STS T,ETL_CTL_JOB_FLOW A       WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID         AND A.ETL_FLOW_STATUS = 1         AND A.ETL_SUPER_FLOW_ID = I_FLOW_ID         AND ETL_FLOW_RUN_STATUS = 2;    ELSIF VAR_CHILD_FLAG = 1    THEN      SELECT COUNT(*)        INTO INT_STS_NOT_9        FROM ETL_JOB_RUN_STS T,ETL_CTL_JOB_INFO A       WHERE T.ETL_JOB_ID = A.ETL_JOB_ID         AND A.ETL_JOB_STATUS = 1         AND A.ETL_FLOW_ID = I_FLOW_ID         AND ETL_JOB_RUN_STATUS <> 9;          -- 正在运行的数量      SELECT COUNT(*)        INTO INT_STS_1_4        FROM ETL_JOB_RUN_STS T,ETL_CTL_JOB_INFO A       WHERE T.ETL_JOB_ID = A.ETL_JOB_ID         AND A.ETL_JOB_STATUS = 1         AND A.ETL_FLOW_ID = I_FLOW_ID         AND ETL_JOB_RUN_STATUS IN (1,4);          -- 满足条件但未运行的数量      SELECT COUNT(*)        INTO INT_STS_0_3        FROM ETL_JOB_RUN_STS T,ETL_CTL_JOB_INFO A       WHERE T.ETL_JOB_ID = A.ETL_JOB_ID         AND A.ETL_JOB_STATUS = 1         AND A.ETL_FLOW_ID = I_FLOW_ID         AND ETL_JOB_RUN_STATUS IN (0,3)         AND FN_GET_FLOW_DEPEND(T.ETL_JOB_ID) = 1;          -- 运行失败的数量      SELECT COUNT(*)        INTO INT_STS_2        FROM ETL_JOB_RUN_STS T,ETL_CTL_JOB_INFO A       WHERE T.ETL_JOB_ID = A.ETL_JOB_ID         AND A.ETL_JOB_STATUS = 1         AND A.ETL_FLOW_ID = I_FLOW_ID         AND ETL_JOB_RUN_STATUS = 2;    END IF;      -- 不成功的数量为0,则全部成功    IF INT_STS_NOT_9 = 0    THEN      VAR_FLOW_RUN_STATUS := 9;      -- 不成功的不为0,正在运行的为0,运行失败的数量大于0    ELSIF INT_STS_1_4 = 0          AND INT_STS_0_3 = 0          AND INT_STS_2 > 0    THEN      VAR_FLOW_RUN_STATUS := 2;    END IF;      RETURN VAR_FLOW_RUN_STATUS;  END;  /*******************************************************************    程序名   :FN_GET_NEXT_DATA_TIME    创建人   : zhuyh    创建时间 : 2013/8/20    功能描述 : 获取下个数据时间    修改人   :    修改时间 :    修改原因 :  *******************************************************************/  FUNCTION FN_GET_NEXT_DATA_TIME(I_FLOW_ID NUMBER) RETURN DATE IS    VAR_CTL_CYC_CODE   VARCHAR2(2);    VAR_sql            VARCHAR2(1000);    VAR_FREQ_TIME      VARCHAR2(100);    VAR_SRC_DB         VARCHAR2(100);    DTE_SRC_SYS_TIME   DATE;    DTE_NEXT_DATA_TIME DATE;    DTE_DATA_SUCC_TIME DATE;    BEGIN    -- 获取运行周期、源数据库,下次运行时间、数据成功时间    SELECT T.ETL_CYC_CODE,T.ETL_SRC_DB,NVL(A.ETL_NEXT_DATA_TIME,A.ETL_DATA_END_TIME),NVL(A.ETL_DATA_SUCC_TIME,A.ETL_DATA_END_TIME)      INTO VAR_CTL_CYC_CODE,VAR_SRC_DB,DTE_NEXT_DATA_TIME,DTE_DATA_SUCC_TIME      FROM ETL_CTL_JOB_FLOW T,ETL_FLOW_RUN_STS A     WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID       AND T.ETL_FLOW_ID = I_FLOW_ID;      -- 非本数据库    VAR_SRC_DB := VAR_SRC_DB || 'dual';      -- 每天运行的作业流    IF VAR_CTL_CYC_CODE = '01'    THEN      VAR_FREQ_TIME := 1;      VAR_sql       := 'SELECT Trunc(SYSDATE) FROM ' || VAR_SRC_DB;      EXECUTE IMMEDIATE VAR_sql        INTO DTE_SRC_SYS_TIME;      DTE_NEXT_DATA_TIME := DTE_DATA_SUCC_TIME + VAR_FREQ_TIME;      IF DTE_NEXT_DATA_TIME < DTE_SRC_SYS_TIME      THEN        RETURN DTE_NEXT_DATA_TIME;      ELSE        RETURN DTE_DATA_SUCC_TIME;      END IF;      -- 每十分钟运行的作业流    ELSIF VAR_CTL_CYC_CODE = '02'    THEN      VAR_FREQ_TIME := 1 / 24 / 6;      VAR_sql       := 'SELECT SYSDATE - 1 / 24 / 6 FROM ' || VAR_SRC_DB;      EXECUTE IMMEDIATE VAR_sql        INTO DTE_SRC_SYS_TIME;          WHILE DTE_NEXT_DATA_TIME < DTE_SRC_SYS_TIME LOOP        DTE_NEXT_DATA_TIME := DTE_NEXT_DATA_TIME + VAR_FREQ_TIME;      END LOOP;      RETURN DTE_NEXT_DATA_TIME;        ELSIF VAR_CTL_CYC_CODE = '03'    THEN      VAR_FREQ_TIME := 1;      VAR_sql       := 'SELECT Trunc(SYSDATE) FROM ' || VAR_SRC_DB;      EXECUTE IMMEDIATE VAR_sql        INTO DTE_SRC_SYS_TIME;      DTE_NEXT_DATA_TIME := ADD_MONTHS(DTE_DATA_SUCC_TIME,VAR_FREQ_TIME);      IF DTE_NEXT_DATA_TIME < DTE_SRC_SYS_TIME      THEN        RETURN DTE_NEXT_DATA_TIME;      ELSE        RETURN DTE_DATA_SUCC_TIME;      END IF;    END IF;  END;  /*******************************************************************    程序名   :FN_GET_SUPER_FLOW_RUN_STATUS    创建人   : zhuyh    创建时间 : 2013/8/20    功能描述 : 获取上级作业流运行状态    修改人   :    修改时间 :    修改原因 :  *******************************************************************/  FUNCTION FN_GET_SUPER_FLOW_RUN_STATUS(I_FLOW_ID NUMBER) RETURN VARCHAR2 IS    VAR_SUPER_FLOW_RUN_STATUS VARCHAR2(10);  BEGIN    SELECT ETL_FLOW_RUN_STATUS      INTO VAR_SUPER_FLOW_RUN_STATUS      FROM ETL_CTL_JOB_FLOW T,ETL_FLOW_RUN_STS A     WHERE T.ETL_SUPER_FLOW_ID = A.ETL_FLOW_ID       AND T.ETL_FLOW_ID = I_FLOW_ID;    RETURN VAR_SUPER_FLOW_RUN_STATUS;  END;  /*******************************************************************    程序名   :FN_GET_SUPER_DATA_START_TIME    创建人   : zhuyh    创建时间 : 2013/8/20    功能描述 : 获取上级作业流数据开始时间    修改人   :    修改时间 :    修改原因 :  *******************************************************************/  FUNCTION FN_GET_SUPER_DATA_START_TIME(I_FLOW_ID NUMBER) RETURN DATE IS    VAR_SUPER_FLOW_DATA_START_TIME DATE;  BEGIN    SELECT A.ETL_DATA_START_TIME      INTO VAR_SUPER_FLOW_DATA_START_TIME      FROM ETL_CTL_JOB_FLOW T,ETL_FLOW_RUN_STS A     WHERE T.ETL_SUPER_FLOW_ID = A.ETL_FLOW_ID       AND T.ETL_FLOW_ID = I_FLOW_ID;    RETURN VAR_SUPER_FLOW_DATA_START_TIME;  END;  /*******************************************************************    程序名   :FN_GET_SUPER_DATA_END_TIME    创建人   : zhuyh    创建时间 : 2013/8/20    功能描述 : 获取上级作业流数据结束时间    修改人   :    修改时间 :    修改原因 :  *******************************************************************/  FUNCTION FN_GET_SUPER_DATA_END_TIME(I_FLOW_ID NUMBER) RETURN DATE IS    VAR_SUPER_FLOW_DATA_END_TIME DATE;  BEGIN    SELECT A.ETL_DATA_END_TIME      INTO VAR_SUPER_FLOW_DATA_END_TIME      FROM ETL_CTL_JOB_FLOW T,ETL_FLOW_RUN_STS A     WHERE T.ETL_SUPER_FLOW_ID = A.ETL_FLOW_ID       AND T.ETL_FLOW_ID = I_FLOW_ID;    RETURN VAR_SUPER_FLOW_DATA_END_TIME;  END;  /*******************************************************************    程序名   :FN_GET_CYC_CODE    创建人   : zhuyh    创建时间 : 2013/8/26    功能描述 : 获取周期代码    修改人   :    修改时间 :    修改原因 :  *******************************************************************/  FUNCTION FN_GET_CYC_CODE(I_FLOW_ID VARCHAR2) RETURN VARCHAR2 IS    VAR_CYC_CODE VARCHAR2(100);  BEGIN    SELECT T.ETL_CYC_CODE INTO VAR_CYC_CODE FROM ETL_CTL_JOB_FLOW T WHERE T.ETL_FLOW_ID = I_FLOW_ID;    RETURN VAR_CYC_CODE;  END;END PKG_ETL_CTL;/

以上是内存溢出(jb51.cc)为你收集整理的全部代码内容,希望文章能够帮你解决所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

总结

以上是内存溢出为你收集整理的ETL调度系统全部内容,希望文章能够帮你解决ETL调度系统所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/sjk/1164543.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-01
下一篇 2022-06-01

发表评论

登录后才能评论

评论列表(0条)

保存