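Quartz is one of the leading open-source job schedulers and a common first choice for job scheduling. In a clustered environment, Quartz manages tasks through its API, which avoids the problems described above, but it has the following problems of its own:
Problem 1: Operating tasks by calling the API is not user-friendly;
Problem 2: The business QuartzJobBean has to be persisted to the underlying database tables, which is highly invasive to the system;
Problem 3: The scheduling logic and the QuartzJobBean are coupled in the same project. As the number of scheduled tasks grows and the task logic gets heavier, the performance of the scheduling system becomes severely constrained by the business code;
Problem 4: Quartz nodes compete "preemptively" for a DB lock, and the node that wins the lock runs the task, which can leave the load very uneven across nodes. XXL-JOB instead runs tasks in a "cooperative distribution" manner through its executors, making full use of the cluster and balancing the load across nodes.
XXL-JOB makes up for these shortcomings of Quartz.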
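To make the contrast in Problem 4 concrete, here is a minimal sketch of round-robin dispatch, one simple form of "cooperative distribution". The class name, method names, and executor names are hypothetical and used for illustration only; this is not XXL-JOB's actual routing code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration of round-robin dispatch; not XXL-JOB's real router. */
class RoundRobinDispatchSketch {

    private final List<String> executors;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinDispatchSketch(List<String> executors) {
        this.executors = new ArrayList<>(executors);
    }

    /** Picks the executor for the next trigger by cycling through the list. */
    String route() {
        int index = Math.floorMod(next.getAndIncrement(), executors.size());
        return executors.get(index);
    }

    /** Dispatches the given number of triggers and counts how many each executor received. */
    static Map<String, Integer> simulate(List<String> executors, int triggers) {
        RoundRobinDispatchSketch router = new RoundRobinDispatchSketch(executors);
        Map<String, Integer> load = new LinkedHashMap<>();
        for (String executor : executors) {
            load.put(executor, 0);
        }
        for (int i = 0; i < triggers; i++) {
            load.merge(router.route(), 1, Integer::sum);
        }
        return load;
    }

    public static void main(String[] args) {
        // prints {executor-a=3, executor-b=3, executor-c=3}
        System.out.println(simulate(Arrays.asList("executor-a", "executor-b", "executor-c"), 9));
    }
}
```

With lock preemption, nothing stops the same node from winning the lock repeatedly. A central dispatcher that chooses the target node, as sketched here, can spread triggers evenly.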
Other developers reported that an error kept appearing during use. The log shows the following:
06:28:22.377 logback [xxl-job, admin JobLogReportHelper] ERROR c.x.j.a.c.thread.JobLogReportHelper - >>>>>>>>>>> xxl-job, job log report thread error:{}
org.springframework.dao.DuplicateKeyException:
### Error updating database. Cause: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '2022-08-16 08:00:00' for key 'xxl_job_log_report.i_trigger_day'
### The error may exist in class path resource [mybatis-mapper/XxlJobLogReportMapper.xml]
### The error may involve com.xxl.job.admin.dao.XxlJobLogReportDao.save-Inline
### The error occurred while setting parameters
### SQL: INSERT INTO xxl_job_log_report ( `trigger_day`, `running_count`, `suc_count`, `fail_count` ) VALUES ( ?, ?, ?, ? );
### Cause: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '2022-08-16 08:00:00' for key 'xxl_job_log_report.i_trigger_day'
; Duplicate entry '2022-08-16 08:00:00' for key 'xxl_job_log_report.i_trigger_day'; nested exception is java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '2022-08-16 08:00:00' for key 'xxl_job_log_report.i_trigger_day'
    at org.springframework.jdbc.support.SQLErrorCodeSQLExceptionTranslator.doTranslate(SQLErrorCodeSQLExceptionTranslator.java:247)
    at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:70)
    at org.mybatis.spring.MyBatisExceptionTranslator.translateExceptionIfPossible(MyBatisExceptionTranslator.java:91)
    at org.mybatis.spring.SqlSessionTemplate$SqlSessionInterceptor.invoke(SqlSessionTemplate.java:441)
    at com.sun.proxy.$Proxy75.insert(Unknown Source)
    at org.mybatis.spring.SqlSessionTemplate.insert(SqlSessionTemplate.java:272)
    at org.apache.ibatis.binding.MapperMethod.execute(MapperMethod.java:62)
    at org.apache.ibatis.binding.MapperProxy$PlainMethodInvoker.invoke(MapperProxy.java:152)
    at org.apache.ibatis.binding.MapperProxy.invoke(MapperProxy.java:85)
    at com.sun.proxy.$Proxy80.save(Unknown Source)
    at com.xxl.job.admin.core.thread.JobLogReportHelper$1.run(JobLogReportHelper.java:86)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '2022-08-16 08:00:00' for key 'xxl_job_log_report.i_trigger_day'
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:117)
    at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953)
    at com.mysql.cj.jdbc.ClientPreparedStatement.execute(ClientPreparedStatement.java:370)
    at com.alibaba.druid.pool.DruidPooledPreparedStatement.execute(DruidPooledPreparedStatement.java:497)
    at org.apache.ibatis.executor.statement.PreparedStatementHandler.update(PreparedStatementHandler.java:47)
    at org.apache.ibatis.executor.statement.RoutingStatementHandler.update(RoutingStatementHandler.java:74)
    at org.apache.ibatis.executor.SimpleExecutor.doUpdate(SimpleExecutor.java:50)
    at org.apache.ibatis.executor.BaseExecutor.update(BaseExecutor.java:117)
    at org.apache.ibatis.executor.CachingExecutor.update(CachingExecutor.java:76)
    at org.apache.ibatis.session.defaults.DefaultSqlSession.update(DefaultSqlSession.java:197)
    at org.apache.ibatis.session.defaults.DefaultSqlSession.insert(DefaultSqlSession.java:184)
    at sun.reflect.GeneratedMethodAccessor91.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.mybatis.spring.SqlSessionTemplate$SqlSessionInterceptor.invoke(SqlSessionTemplate.java:427)
    ... 8 common frames omitted
Here is the structure of the table:
CREATE TABLE `xxl_job_log_report` (
  `id` int NOT NULL AUTO_INCREMENT,
  `trigger_day` datetime DEFAULT NULL COMMENT '调度-时间',
  `running_count` int NOT NULL DEFAULT '0' COMMENT '运行中-日志数量',
  `suc_count` int NOT NULL DEFAULT '0' COMMENT '执行成功-日志数量',
  `fail_count` int NOT NULL DEFAULT '0' COMMENT '执行失败-日志数量',
  `update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE,
  UNIQUE KEY `i_trigger_day` (`trigger_day`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=170924 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci ROW_FORMAT=COMPACT;
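Checking the table confirms that this row already exists, so why is an insert still attempted? The insert happens in a standalone thread that loops continuously to aggregate the log report. The code is as follows: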
public void start() {
    logrThread = new Thread(new Runnable() {

        @Override
        public void run() {

            // last clean log time
            long lastCleanLogTime = 0;

            while (!toStop) {

                // 1. log-report refresh: refresh log report in 3 days
                try {
                    for (int i = 0; i < 3; i++) {

                        // today
                        Calendar itemDay = Calendar.getInstance(TimeZone.getTimeZone(ZoneId.of("Asia/Shanghai")));
                        itemDay.add(Calendar.DAY_OF_MONTH, -i);
                        itemDay.set(Calendar.HOUR_OF_DAY, 0);
                        itemDay.set(Calendar.MINUTE, 0);
                        itemDay.set(Calendar.SECOND, 0);
                        itemDay.set(Calendar.MILLISECOND, 0);

                        Date todayFrom = itemDay.getTime();

                        itemDay.set(Calendar.HOUR_OF_DAY, 23);
                        itemDay.set(Calendar.MINUTE, 59);
                        itemDay.set(Calendar.SECOND, 59);
                        itemDay.set(Calendar.MILLISECOND, 999);

                        Date todayTo = itemDay.getTime();

                        // refresh log-report every minute
                        XxlJobLogReport xxlJobLogReport = new XxlJobLogReport();
                        xxlJobLogReport.setTriggerDay(todayFrom);
                        xxlJobLogReport.setRunningCount(0);
                        xxlJobLogReport.setSucCount(0);
                        xxlJobLogReport.setFailCount(0);

                        Map<String, Object> triggerCountMap = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLogReport(todayFrom, todayTo);
                        if (triggerCountMap != null && triggerCountMap.size() > 0) {
                            int triggerDayCount = triggerCountMap.containsKey("triggerDayCount") ? Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCount"))) : 0;
                            int triggerDayCountRunning = triggerCountMap.containsKey("triggerDayCountRunning") ? Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountRunning"))) : 0;
                            int triggerDayCountSuc = triggerCountMap.containsKey("triggerDayCountSuc") ? Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountSuc"))) : 0;
                            int triggerDayCountFail = triggerDayCount - triggerDayCountRunning - triggerDayCountSuc;

                            xxlJobLogReport.setRunningCount(triggerDayCountRunning);
                            xxlJobLogReport.setSucCount(triggerDayCountSuc);
                            xxlJobLogReport.setFailCount(triggerDayCountFail);
                        }

                        // do refresh
                        int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);
                        if (ret < 1) {
                            XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(xxlJobLogReport);
                        }
                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(">>>>>>>>>>> xxl-job, job log report thread error:{}", e);
                    }
                }

                // 2. log-clean: switch open & once each day
                if (XxlJobAdminConfig.getAdminConfig().getLogretentiondays() > 0
                        && System.currentTimeMillis() - lastCleanLogTime > 24 * 60 * 60 * 1000) {

                    // expire-time
                    Calendar expiredDay = Calendar.getInstance(TimeZone.getTimeZone(ZoneId.of("Asia/Shanghai")));
                    expiredDay.add(Calendar.DAY_OF_MONTH, -1 * XxlJobAdminConfig.getAdminConfig().getLogretentiondays());
                    expiredDay.set(Calendar.HOUR_OF_DAY, 0);
                    expiredDay.set(Calendar.MINUTE, 0);
                    expiredDay.set(Calendar.SECOND, 0);
                    expiredDay.set(Calendar.MILLISECOND, 0);
                    Date clearBeforeTime = expiredDay.getTime();

                    // clean expired log
                    List<Long> logIds = null;
                    do {
                        logIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findClearLogIds(0, 0, clearBeforeTime, 0, 1000);
                        if (logIds != null && logIds.size() > 0) {
                            XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().clearLog(logIds);
                        }
                    } while (logIds != null && logIds.size() > 0);

                    // update clean time
                    lastCleanLogTime = System.currentTimeMillis();
                }

                try {
                    TimeUnit.MINUTES.sleep(1);
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }

            logger.info(">>>>>>>>>>> xxl-job, job log report thread stop");
        }
    });
    logrThread.setDaemon(true);
    logrThread.setName("xxl-job, admin JobLogReportHelper");
    logrThread.start();
}
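One part of this code (under the "do refresh" comment) updates the table and, if the update count is less than 1, inserts a new row. This logic assumes that every update actually changes something. If the new values are identical to the existing ones, the update returns 0 (the behavior when the database reports changed rows rather than matched rows). In that case the row is not missing at all, so the subsequent insert fails with the duplicate-key error.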
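The failure can be reproduced without a database. The following is a minimal in-memory sketch, assuming an update that reports the number of changed rows; the class and method names are hypothetical stand-ins, not XXL-JOB's DAO.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for xxl_job_log_report (trigger_day -> suc_count). */
class UpdateThenInsertSketch {

    private final Map<String, Integer> rows = new HashMap<>();

    /** Mimics an UPDATE that reports changed rows: 0 if the row is missing OR the value is unchanged. */
    int update(String triggerDay, int sucCount) {
        Integer current = rows.get(triggerDay);
        if (current == null || current == sucCount) {
            return 0;
        }
        rows.put(triggerDay, sucCount);
        return 1;
    }

    /** Mimics an INSERT guarded by the unique key on trigger_day. */
    void save(String triggerDay, int sucCount) {
        if (rows.containsKey(triggerDay)) {
            throw new IllegalStateException("Duplicate entry '" + triggerDay + "' for key 'i_trigger_day'");
        }
        rows.put(triggerDay, sucCount);
    }

    /** Same shape as the original logic: update first, insert when the update count is below 1. */
    void refresh(String triggerDay, int sucCount) {
        if (update(triggerDay, sucCount) < 1) {
            save(triggerDay, sucCount);
        }
    }

    /** Returns true if refreshing twice with identical counts hits the duplicate-key failure. */
    static boolean secondRefreshFails() {
        UpdateThenInsertSketch table = new UpdateThenInsertSketch();
        table.refresh("2022-08-16", 5); // row is missing, so it is inserted
        try {
            table.refresh("2022-08-16", 5); // nothing changes: update returns 0, insert is attempted
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondRefreshFails()); // prints true
    }
}
```

Because the report thread runs every minute and the counts for past days rarely change, refreshing with unchanged values is the common case under this assumption.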
So this check is not rigorous. The fix is to first query whether the row exists: if it does not, insert it; if it does, update it.
In XxlJobLogReportMapper.xml:
<select id="queryLogReportByTriggerDay" resultType="java.lang.Integer">
    SELECT COUNT(*)
    FROM xxl_job_log_report
    WHERE `trigger_day` = #{triggerDay}
</select>
In XxlJobLogReportDao:
public int queryLogReportByTriggerDay(XxlJobLogReport xxlJobLogReport);
In the refresh logic:
// If the new values are identical to the existing ones, update returns 0 and the
// insert that follows fails, so check whether the row exists before choosing.
int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().queryLogReportByTriggerDay(xxlJobLogReport);
if (ret < 1) {
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(xxlJobLogReport);
} else {
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);
}
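As a rough check of the revised flow, here is the same kind of in-memory sketch with the existence query in front. All names are hypothetical, and the update again is assumed to report changed rows.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in showing the "query first, then insert or update" flow. */
class ExistsThenWriteSketch {

    private final Map<String, Integer> rows = new HashMap<>();
    int inserts = 0;
    int updateCalls = 0;

    /** Mimics SELECT COUNT(*) ... WHERE trigger_day = ?. */
    int countByTriggerDay(String triggerDay) {
        return rows.containsKey(triggerDay) ? 1 : 0;
    }

    /** Mimics an INSERT guarded by the unique key on trigger_day. */
    void save(String triggerDay, int sucCount) {
        if (rows.containsKey(triggerDay)) {
            throw new IllegalStateException("Duplicate entry '" + triggerDay + "' for key 'i_trigger_day'");
        }
        rows.put(triggerDay, sucCount);
        inserts++;
    }

    /** Mimics an UPDATE that reports changed rows; its return value no longer drives the flow. */
    int update(String triggerDay, int sucCount) {
        updateCalls++;
        Integer current = rows.get(triggerDay);
        if (current == null || current == sucCount) {
            return 0;
        }
        rows.put(triggerDay, sucCount);
        return 1;
    }

    /** Revised logic: insert only when the row is absent, otherwise update. */
    void refresh(String triggerDay, int sucCount) {
        if (countByTriggerDay(triggerDay) < 1) {
            save(triggerDay, sucCount);
        } else {
            update(triggerDay, sucCount);
        }
    }

    /** Refreshes the same day twice with identical counts; returns {inserts, updateCalls}. */
    static int[] refreshTwice() {
        ExistsThenWriteSketch table = new ExistsThenWriteSketch();
        table.refresh("2022-08-16", 5);
        table.refresh("2022-08-16", 5);
        return new int[] {table.inserts, table.updateCalls};
    }

    public static void main(String[] args) {
        int[] result = refreshTwice();
        System.out.println(result[0] + " insert, " + result[1] + " update"); // prints 1 insert, 1 update
    }
}
```

One caveat: query-then-write is not atomic. If several admin nodes run this thread at the same time, two of them could both see no row and both try to insert. MySQL's INSERT ... ON DUPLICATE KEY UPDATE is one possible way to close that gap, though it is not what this post uses.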
END
Java小强