Distributed Task Scheduling Platform XXL-JOB: Error When Generating the Scheduling Report

Open Source | 2022-8-16

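Quartz is a leading open-source job scheduler and is often the first choice for job scheduling. In a clustered environment, however, Quartz manages jobs through its API, which brings the following problems:

Problem 1: Operating jobs by calling the API is not user-friendly.

Problem 2: The business QuartzJobBean has to be persisted to the underlying data tables, which is highly invasive to the system.

Problem 3: The scheduling logic and the QuartzJobBean are coupled in the same project. As the number of scheduled jobs grows and their logic becomes heavier, the performance of the scheduling system is increasingly constrained by the business code.

Problem 4: Under the hood, Quartz acquires a DB lock preemptively, and the node that wins the lock runs the job, which can leave node loads very uneven. XXL-JOB instead runs jobs through executors in a cooperative, distributed manner, making full use of the cluster and balancing load across nodes.

XXL-JOB addresses these shortcomings of Quartz.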


Other developers reported that an error keeps appearing during use. The log shows:


06:28:22.377 logback [xxl-job, admin JobLogReportHelper] ERROR c.x.j.a.c.thread.JobLogReportHelper - >>>>>>>>>>> xxl-job, job log report thread error:{}
org.springframework.dao.DuplicateKeyException: 
### Error updating database.  Cause: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '2022-08-16 08:00:00' for key 'xxl_job_log_report.i_trigger_day'
### The error may exist in class path resource [mybatis-mapper/XxlJobLogReportMapper.xml]
### The error may involve com.xxl.job.admin.dao.XxlJobLogReportDao.save-Inline
### The error occurred while setting parameters
### SQL: INSERT INTO xxl_job_log_report (    `trigger_day`,    `running_count`,    `suc_count`,    `fail_count`   ) VALUES (    ?,    ?,    ?,    ?   );
### Cause: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '2022-08-16 08:00:00' for key 'xxl_job_log_report.i_trigger_day'
; Duplicate entry '2022-08-16 08:00:00' for key 'xxl_job_log_report.i_trigger_day'; nested exception is java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '2022-08-16 08:00:00' for key 'xxl_job_log_report.i_trigger_day'
	at org.springframework.jdbc.support.SQLErrorCodeSQLExceptionTranslator.doTranslate(SQLErrorCodeSQLExceptionTranslator.java:247)
	at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:70)
	at org.mybatis.spring.MyBatisExceptionTranslator.translateExceptionIfPossible(MyBatisExceptionTranslator.java:91)
	at org.mybatis.spring.SqlSessionTemplate$SqlSessionInterceptor.invoke(SqlSessionTemplate.java:441)
	at com.sun.proxy.$Proxy75.insert(Unknown Source)
	at org.mybatis.spring.SqlSessionTemplate.insert(SqlSessionTemplate.java:272)
	at org.apache.ibatis.binding.MapperMethod.execute(MapperMethod.java:62)
	at org.apache.ibatis.binding.MapperProxy$PlainMethodInvoker.invoke(MapperProxy.java:152)
	at org.apache.ibatis.binding.MapperProxy.invoke(MapperProxy.java:85)
	at com.sun.proxy.$Proxy80.save(Unknown Source)
	at com.xxl.job.admin.core.thread.JobLogReportHelper$1.run(JobLogReportHelper.java:86)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '2022-08-16 08:00:00' for key 'xxl_job_log_report.i_trigger_day'
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:117)
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953)
	at com.mysql.cj.jdbc.ClientPreparedStatement.execute(ClientPreparedStatement.java:370)
	at com.alibaba.druid.pool.DruidPooledPreparedStatement.execute(DruidPooledPreparedStatement.java:497)
	at org.apache.ibatis.executor.statement.PreparedStatementHandler.update(PreparedStatementHandler.java:47)
	at org.apache.ibatis.executor.statement.RoutingStatementHandler.update(RoutingStatementHandler.java:74)
	at org.apache.ibatis.executor.SimpleExecutor.doUpdate(SimpleExecutor.java:50)
	at org.apache.ibatis.executor.BaseExecutor.update(BaseExecutor.java:117)
	at org.apache.ibatis.executor.CachingExecutor.update(CachingExecutor.java:76)
	at org.apache.ibatis.session.defaults.DefaultSqlSession.update(DefaultSqlSession.java:197)
	at org.apache.ibatis.session.defaults.DefaultSqlSession.insert(DefaultSqlSession.java:184)
	at sun.reflect.GeneratedMethodAccessor91.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.mybatis.spring.SqlSessionTemplate$SqlSessionInterceptor.invoke(SqlSessionTemplate.java:427)
	... 8 common frames omitted


Here is the structure of this table:


CREATE TABLE `xxl_job_log_report` (
  `id` int NOT NULL AUTO_INCREMENT,
  `trigger_day` datetime DEFAULT NULL COMMENT 'schedule - time',
  `running_count` int NOT NULL DEFAULT '0' COMMENT 'running - log count',
  `suc_count` int NOT NULL DEFAULT '0' COMMENT 'succeeded - log count',
  `fail_count` int NOT NULL DEFAULT '0' COMMENT 'failed - log count',
  `update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE,
  UNIQUE KEY `i_trigger_day` (`trigger_day`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=170924 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci ROW_FORMAT=COMPACT;


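The table does already contain this row, so why is it being inserted again? The insert happens in a dedicated thread that loops continuously to aggregate the log report. The code is as follows: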

public void start() {
    logrThread = new Thread(new Runnable() {
        @Override
        public void run() {
            // last clean log time
            long lastCleanLogTime = 0;
            while (!toStop) {
                // 1、log-report refresh: refresh log report in 3 days
                try {
                    for (int i = 0; i < 3; i++) {
                        // today
                        Calendar itemDay = Calendar.getInstance(TimeZone.getTimeZone(ZoneId.of("Asia/Shanghai")));
                        itemDay.add(Calendar.DAY_OF_MONTH, -i);
                        itemDay.set(Calendar.HOUR_OF_DAY, 0);
                        itemDay.set(Calendar.MINUTE, 0);
                        itemDay.set(Calendar.SECOND, 0);
                        itemDay.set(Calendar.MILLISECOND, 0);

                        Date todayFrom = itemDay.getTime();

                        itemDay.set(Calendar.HOUR_OF_DAY, 23);
                        itemDay.set(Calendar.MINUTE, 59);
                        itemDay.set(Calendar.SECOND, 59);
                        itemDay.set(Calendar.MILLISECOND, 999);

                        Date todayTo = itemDay.getTime();

                        // refresh log-report every minute
                        XxlJobLogReport xxlJobLogReport = new XxlJobLogReport();
                        xxlJobLogReport.setTriggerDay(todayFrom);
                        xxlJobLogReport.setRunningCount(0);
                        xxlJobLogReport.setSucCount(0);
                        xxlJobLogReport.setFailCount(0);

                        Map<String, Object> triggerCountMap = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLogReport(todayFrom, todayTo);
                        if (triggerCountMap != null && triggerCountMap.size() > 0) {
                            int triggerDayCount = triggerCountMap.containsKey("triggerDayCount") ? Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCount"))) : 0;
                            int triggerDayCountRunning = triggerCountMap.containsKey("triggerDayCountRunning") ? Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountRunning"))) : 0;
                            int triggerDayCountSuc = triggerCountMap.containsKey("triggerDayCountSuc") ? Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountSuc"))) : 0;
                            int triggerDayCountFail = triggerDayCount - triggerDayCountRunning - triggerDayCountSuc;

                            xxlJobLogReport.setRunningCount(triggerDayCountRunning);
                            xxlJobLogReport.setSucCount(triggerDayCountSuc);
                            xxlJobLogReport.setFailCount(triggerDayCountFail);
                        }
                        // do refresh
                        int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);
                        if (ret < 1) {
                            XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(xxlJobLogReport);
                        }
                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(">>>>>>>>>>> xxl-job, job log report thread error:{}", e);
                    }
                }

                // 2、log-clean: switch open & once each day
                if (XxlJobAdminConfig.getAdminConfig().getLogretentiondays() > 0
                        && System.currentTimeMillis() - lastCleanLogTime > 24 * 60 * 60 * 1000) {
                    // expire-time
                    Calendar expiredDay = Calendar.getInstance(TimeZone.getTimeZone(ZoneId.of("Asia/Shanghai")));
                    expiredDay.add(Calendar.DAY_OF_MONTH, -1 * XxlJobAdminConfig.getAdminConfig().getLogretentiondays());
                    expiredDay.set(Calendar.HOUR_OF_DAY, 0);
                    expiredDay.set(Calendar.MINUTE, 0);
                    expiredDay.set(Calendar.SECOND, 0);
                    expiredDay.set(Calendar.MILLISECOND, 0);
                    Date clearBeforeTime = expiredDay.getTime();

                    // clean expired log
                    List<Long> logIds = null;
                    do {
                        logIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findClearLogIds(0, 0, clearBeforeTime, 0, 1000);
                        if (logIds != null && logIds.size() > 0) {
                            XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().clearLog(logIds);
                        }
                    } while (logIds != null && logIds.size() > 0);
                    // update clean time
                    lastCleanLogTime = System.currentTimeMillis();
                }

                try {
                    TimeUnit.MINUTES.sleep(1);
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, job log report thread stop");

        }
    });
    logrThread.setDaemon(true);
    logrThread.setName("xxl-job, admin JobLogReportHelper");
    logrThread.start();
}
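
The start and end of each day in the loop above are built by mutating a `Calendar`. As a side note, the same window can be sketched with `java.time`. This is only an illustrative equivalent, not code from XXL-JOB; the class `DayWindowSketch` and the method `dayWindow` are hypothetical names.

```java
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Date;

// Hypothetical helper, not part of XXL-JOB: computes the same [from, to] day window
// that the Calendar code above builds for "today minus i days".
class DayWindowSketch {

    /** Returns {from, to} for the day that lies daysAgo days before `now`, in now's zone. */
    static Date[] dayWindow(ZonedDateTime now, int daysAgo) {
        ZoneId zone = now.getZone();
        LocalDate day = now.toLocalDate().minusDays(daysAgo);
        // 00:00:00.000 of that day
        ZonedDateTime from = day.atStartOfDay(zone);
        // 23:59:59.999 of that day, matching the Calendar fields set in the original loop
        ZonedDateTime to = day.atTime(LocalTime.of(23, 59, 59, 999_000_000)).atZone(zone);
        return new Date[] { Date.from(from.toInstant()), Date.from(to.toInstant()) };
    }

    public static void main(String[] args) {
        ZonedDateTime now = ZonedDateTime.now(ZoneId.of("Asia/Shanghai"));
        for (int i = 0; i < 3; i++) {
            Date[] window = dayWindow(now, i);
            System.out.println(window[0].toInstant() + " .. " + window[1].toInstant());
        }
    }
}
```

Passing `now` in as a parameter keeps the calculation independent of the system clock, which makes the window easy to check for a fixed date.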


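One line in this code updates the table and, if nothing was updated, inserts a new row. This logic assumes that every update actually changes the row. If the values being written are identical to what is already stored, the update returns 0 even though the row exists, and the insert that follows fails with the duplicate-key error.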
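
This failure mode can be reproduced without a database. The sketch below is a hypothetical in-memory model (the class `ReportTableSketch` and its methods are not XXL-JOB code). It assumes that `update` reports the number of rows actually changed rather than the number of rows matched, which is how MySQL counts affected rows for UPDATE unless the connection asks for matched rows.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for xxl_job_log_report, for illustration only.
class ReportTableSketch {
    // trigger_day -> {running, suc, fail}; the map key plays the role of the unique index
    private final Map<String, int[]> rows = new HashMap<>();

    /** Mimics an UPDATE that reports rows actually changed (assumption), not rows matched. */
    int update(String triggerDay, int running, int suc, int fail) {
        int[] old = rows.get(triggerDay);
        if (old == null) {
            return 0; // no such row
        }
        if (old[0] == running && old[1] == suc && old[2] == fail) {
            return 0; // row exists, but nothing changed
        }
        rows.put(triggerDay, new int[] { running, suc, fail });
        return 1;
    }

    /** Mimics an INSERT guarded by the unique key on trigger_day. */
    int save(String triggerDay, int running, int suc, int fail) {
        if (rows.containsKey(triggerDay)) {
            throw new IllegalStateException(
                    "Duplicate entry '" + triggerDay + "' for key 'i_trigger_day'");
        }
        rows.put(triggerDay, new int[] { running, suc, fail });
        return 1;
    }

    /** The original "update, then insert if nothing was updated" logic. */
    void refresh(String triggerDay, int running, int suc, int fail) {
        int ret = update(triggerDay, running, suc, fail);
        if (ret < 1) {
            save(triggerDay, running, suc, fail);
        }
    }

    public static void main(String[] args) {
        ReportTableSketch table = new ReportTableSketch();
        table.refresh("2022-08-16 08:00:00", 0, 5, 1); // first pass inserts the row
        try {
            table.refresh("2022-08-16 08:00:00", 0, 5, 1); // same counts one minute later
        } catch (IllegalStateException e) {
            System.out.println("second pass failed: " + e.getMessage());
        }
    }
}
```

In this model the second pass with unchanged counts takes the insert branch and hits the unique key, while a pass with different counts updates normally.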

So this check is not rigorous. We change it to query first: if the row is not found, insert it; if it is found, update it.


In XxlJobLogReportMapper.xml:

<select id="queryLogReportByTriggerDay" resultType="java.lang.Integer">
    SELECT COUNT(*) FROM xxl_job_log_report WHERE `trigger_day` = #{triggerDay}
</select>


In XxlJobLogReportDao:

public int queryLogReportByTriggerDay(XxlJobLogReport xxlJobLogReport);


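In the processing logic:

// If the values to update are identical to the stored ones, update returns 0, and inserting at that point fails; so query for the row first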
int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().queryLogReportByTriggerDay(xxlJobLogReport);
if (ret < 1) {
	XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(xxlJobLogReport);
} else {
	XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);
}
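
To see the effect of the change in isolation, the query-first logic can be run against a small in-memory model. This is only a sketch under assumptions: the class `QueryFirstSketch` is hypothetical, the map stands in for the table and its unique key, and `update` is assumed to report changed rows. It also covers a single writer only; if more than one admin instance ran this thread at the same time, two of them could still both see a count of 0, and that case is not handled here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the query-first logic, for illustration only.
class QueryFirstSketch {
    // trigger_day -> {running, suc, fail}; the map key plays the role of the unique index
    private final Map<String, int[]> rows = new HashMap<>();

    /** Mimics SELECT COUNT(*) ... WHERE trigger_day = ? */
    int queryLogReportByTriggerDay(String triggerDay) {
        return rows.containsKey(triggerDay) ? 1 : 0;
    }

    /** Mimics an UPDATE that reports rows actually changed (assumption). */
    int update(String triggerDay, int running, int suc, int fail) {
        int[] old = rows.get(triggerDay);
        if (old == null || (old[0] == running && old[1] == suc && old[2] == fail)) {
            return 0;
        }
        rows.put(triggerDay, new int[] { running, suc, fail });
        return 1;
    }

    /** Mimics an INSERT guarded by the unique key on trigger_day. */
    int save(String triggerDay, int running, int suc, int fail) {
        if (rows.containsKey(triggerDay)) {
            throw new IllegalStateException(
                    "Duplicate entry '" + triggerDay + "' for key 'i_trigger_day'");
        }
        rows.put(triggerDay, new int[] { running, suc, fail });
        return 1;
    }

    /** Query first: insert only when the row is missing, otherwise update. */
    void refresh(String triggerDay, int running, int suc, int fail) {
        int count = queryLogReportByTriggerDay(triggerDay);
        if (count < 1) {
            save(triggerDay, running, suc, fail);
        } else {
            update(triggerDay, running, suc, fail);
        }
    }

    /** Returns the stored counts for a day, or null if the row does not exist. */
    int[] get(String triggerDay) {
        return rows.get(triggerDay);
    }

    public static void main(String[] args) {
        QueryFirstSketch table = new QueryFirstSketch();
        table.refresh("2022-08-16 08:00:00", 0, 5, 1); // inserts
        table.refresh("2022-08-16 08:00:00", 0, 5, 1); // unchanged values, no error
        table.refresh("2022-08-16 08:00:00", 1, 6, 1); // changed values, updates
        System.out.println("suc_count = " + table.get("2022-08-16 08:00:00")[1]);
    }
}
```

Because the branch is chosen by whether the row exists rather than by the update's return value, a pass with unchanged counts no longer reaches the insert.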

