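Quartz, a leader among open-source job schedulers, is the first choice for job scheduling. In a cluster environment, however, Quartz manages tasks through its API, which avoids the problems described above but has the following problems of its own:
Problem 1: Operating tasks by calling the API is not user-friendly;
Problem 2: The business QuartzJobBean must be persisted to the underlying data tables, which is highly invasive to the system;
Problem 3: The scheduling logic and the QuartzJobBean are coupled in the same project. As the number of scheduled tasks grows and the task logic becomes heavier, the scheduling system's performance becomes severely constrained by the business logic;
Problem 4: Under the hood, Quartz acquires a DB lock "preemptively", and the node that wins the lock runs the task, which can leave node loads highly uneven. XXL-JOB instead runs tasks through executors in a "cooperative allocation" manner, making full use of the cluster and balancing load across nodes.
XXL-JOB makes up for these shortcomings of Quartz.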

Other developers reported that an error kept being logged during use. The log is as follows:
06:28:22.377 logback [xxl-job, admin JobLogReportHelper] ERROR c.x.j.a.c.thread.JobLogReportHelper - >>>>>>>>>>> xxl-job, job log report thread error:{}
org.springframework.dao.DuplicateKeyException:
### Error updating database. Cause: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '2022-08-16 08:00:00' for key 'xxl_job_log_report.i_trigger_day'
### The error may exist in class path resource [mybatis-mapper/XxlJobLogReportMapper.xml]
### The error may involve com.xxl.job.admin.dao.XxlJobLogReportDao.save-Inline
### The error occurred while setting parameters
### SQL: INSERT INTO xxl_job_log_report ( `trigger_day`, `running_count`, `suc_count`, `fail_count` ) VALUES ( ?, ?, ?, ? );
### Cause: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '2022-08-16 08:00:00' for key 'xxl_job_log_report.i_trigger_day'
; Duplicate entry '2022-08-16 08:00:00' for key 'xxl_job_log_report.i_trigger_day'; nested exception is java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '2022-08-16 08:00:00' for key 'xxl_job_log_report.i_trigger_day'
at org.springframework.jdbc.support.SQLErrorCodeSQLExceptionTranslator.doTranslate(SQLErrorCodeSQLExceptionTranslator.java:247)
at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:70)
at org.mybatis.spring.MyBatisExceptionTranslator.translateExceptionIfPossible(MyBatisExceptionTranslator.java:91)
at org.mybatis.spring.SqlSessionTemplate$SqlSessionInterceptor.invoke(SqlSessionTemplate.java:441)
at com.sun.proxy.$Proxy75.insert(Unknown Source)
at org.mybatis.spring.SqlSessionTemplate.insert(SqlSessionTemplate.java:272)
at org.apache.ibatis.binding.MapperMethod.execute(MapperMethod.java:62)
at org.apache.ibatis.binding.MapperProxy$PlainMethodInvoker.invoke(MapperProxy.java:152)
at org.apache.ibatis.binding.MapperProxy.invoke(MapperProxy.java:85)
at com.sun.proxy.$Proxy80.save(Unknown Source)
at com.xxl.job.admin.core.thread.JobLogReportHelper$1.run(JobLogReportHelper.java:86)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '2022-08-16 08:00:00' for key 'xxl_job_log_report.i_trigger_day'
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:117)
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953)
at com.mysql.cj.jdbc.ClientPreparedStatement.execute(ClientPreparedStatement.java:370)
at com.alibaba.druid.pool.DruidPooledPreparedStatement.execute(DruidPooledPreparedStatement.java:497)
at org.apache.ibatis.executor.statement.PreparedStatementHandler.update(PreparedStatementHandler.java:47)
at org.apache.ibatis.executor.statement.RoutingStatementHandler.update(RoutingStatementHandler.java:74)
at org.apache.ibatis.executor.SimpleExecutor.doUpdate(SimpleExecutor.java:50)
at org.apache.ibatis.executor.BaseExecutor.update(BaseExecutor.java:117)
at org.apache.ibatis.executor.CachingExecutor.update(CachingExecutor.java:76)
at org.apache.ibatis.session.defaults.DefaultSqlSession.update(DefaultSqlSession.java:197)
at org.apache.ibatis.session.defaults.DefaultSqlSession.insert(DefaultSqlSession.java:184)
at sun.reflect.GeneratedMethodAccessor91.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.mybatis.spring.SqlSessionTemplate$SqlSessionInterceptor.invoke(SqlSessionTemplate.java:427)
... 8 common frames omitted
Looking at the structure of this table:
CREATE TABLE `xxl_job_log_report` (
    `id` int NOT NULL AUTO_INCREMENT,
    `trigger_day` datetime DEFAULT NULL COMMENT '调度-时间',
    `running_count` int NOT NULL DEFAULT '0' COMMENT '运行中-日志数量',
    `suc_count` int NOT NULL DEFAULT '0' COMMENT '执行成功-日志数量',
    `fail_count` int NOT NULL DEFAULT '0' COMMENT '执行失败-日志数量',
    `update_time` datetime DEFAULT NULL,
    PRIMARY KEY (`id`) USING BTREE,
    UNIQUE KEY `i_trigger_day` (`trigger_day`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=170924 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci ROW_FORMAT=COMPACT;
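The table does indeed already contain this row, so why is it still being inserted? The insert happens in a standalone thread that loops continuously to aggregate the log report. The code is as follows: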
public void start() {
    logrThread = new Thread(new Runnable() {
        @Override
        public void run() {
            // last clean log time
            long lastCleanLogTime = 0;
            while (!toStop) {
                // 1、log-report refresh: refresh log report in 3 days
                try {
                    for (int i = 0; i < 3; i++) {
                        // today
                        Calendar itemDay = Calendar.getInstance(TimeZone.getTimeZone(ZoneId.of("Asia/Shanghai")));
                        itemDay.add(Calendar.DAY_OF_MONTH, -i);
                        itemDay.set(Calendar.HOUR_OF_DAY, 0);
                        itemDay.set(Calendar.MINUTE, 0);
                        itemDay.set(Calendar.SECOND, 0);
                        itemDay.set(Calendar.MILLISECOND, 0);
                        Date todayFrom = itemDay.getTime();
                        itemDay.set(Calendar.HOUR_OF_DAY, 23);
                        itemDay.set(Calendar.MINUTE, 59);
                        itemDay.set(Calendar.SECOND, 59);
                        itemDay.set(Calendar.MILLISECOND, 999);
                        Date todayTo = itemDay.getTime();
                        // refresh log-report every minute
                        XxlJobLogReport xxlJobLogReport = new XxlJobLogReport();
                        xxlJobLogReport.setTriggerDay(todayFrom);
                        xxlJobLogReport.setRunningCount(0);
                        xxlJobLogReport.setSucCount(0);
                        xxlJobLogReport.setFailCount(0);
                        Map<String, Object> triggerCountMap = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLogReport(todayFrom, todayTo);
                        if (triggerCountMap != null && triggerCountMap.size() > 0) {
                            int triggerDayCount = triggerCountMap.containsKey("triggerDayCount") ? Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCount"))) : 0;
                            int triggerDayCountRunning = triggerCountMap.containsKey("triggerDayCountRunning") ? Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountRunning"))) : 0;
                            int triggerDayCountSuc = triggerCountMap.containsKey("triggerDayCountSuc") ? Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountSuc"))) : 0;
                            int triggerDayCountFail = triggerDayCount - triggerDayCountRunning - triggerDayCountSuc;
                            xxlJobLogReport.setRunningCount(triggerDayCountRunning);
                            xxlJobLogReport.setSucCount(triggerDayCountSuc);
                            xxlJobLogReport.setFailCount(triggerDayCountFail);
                        }
                        // do refresh
                        int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);
                        if (ret < 1) {
                            XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(xxlJobLogReport);
                        }
                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(">>>>>>>>>>> xxl-job, job log report thread error:{}", e);
                    }
                }
                // 2、log-clean: switch open & once each day
                if (XxlJobAdminConfig.getAdminConfig().getLogretentiondays() > 0
                        && System.currentTimeMillis() - lastCleanLogTime > 24 * 60 * 60 * 1000) {
                    // expire-time
                    Calendar expiredDay = Calendar.getInstance(TimeZone.getTimeZone(ZoneId.of("Asia/Shanghai")));
                    expiredDay.add(Calendar.DAY_OF_MONTH, -1 * XxlJobAdminConfig.getAdminConfig().getLogretentiondays());
                    expiredDay.set(Calendar.HOUR_OF_DAY, 0);
                    expiredDay.set(Calendar.MINUTE, 0);
                    expiredDay.set(Calendar.SECOND, 0);
                    expiredDay.set(Calendar.MILLISECOND, 0);
                    Date clearBeforeTime = expiredDay.getTime();
                    // clean expired log
                    List<Long> logIds = null;
                    do {
                        logIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findClearLogIds(0, 0, clearBeforeTime, 0, 1000);
                        if (logIds != null && logIds.size() > 0) {
                            XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().clearLog(logIds);
                        }
                    } while (logIds != null && logIds.size() > 0);
                    // update clean time
                    lastCleanLogTime = System.currentTimeMillis();
                }
                try {
                    TimeUnit.MINUTES.sleep(1);
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, job log report thread stop");
        }
    });
    logrThread.setDaemon(true);
    logrThread.setName("xxl-job, admin JobLogReportHelper");
    logrThread.start();
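}
One line in this code updates the table and, if the update reports no rows, inserts a new row. That logic assumes every update actually changes something. If the new values are identical to the stored ones, the update returns 0 even though the row exists, and the subsequent insert fails with the duplicate-key error. This applies when the driver reports changed rows rather than matched rows, for example MySQL Connector/J with useAffectedRows=true.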
The check is therefore not rigorous. To handle it, we first query whether the row exists: if it does not, insert; if it does, update.
In XxlJobLogReportMapper.xml:
<select id="queryLogReportByTriggerDay" resultType="java.lang.Integer">
    SELECT COUNT(*) FROM xxl_job_log_report WHERE `trigger_day` = #{triggerDay}
</select>
In XxlJobLogReportDao:
public int queryLogReportByTriggerDay(XxlJobLogReport xxlJobLogReport);
In the refresh logic:
// If the values to write are identical to the stored ones, update returns 0,
// and an insert at that point would fail. So check for existence first.
int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().queryLogReportByTriggerDay(xxlJobLogReport);
if (ret < 1) {
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(xxlJobLogReport);
} else {
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);
}
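The difference between the two checks can be reproduced without a database. The sketch below is an in-memory stand-in, not XXL-JOB code: `ReportTable`, `refreshByUpdateResult`, and `refreshByExistence` are hypothetical names, and it assumes `update` returns the number of changed rows rather than matched rows, which is the condition under which the error above appears.

```java
import java.util.HashMap;
import java.util.Map;

class UpsertCheckDemo {

    /** Hypothetical in-memory stand-in for xxl_job_log_report, keyed by trigger_day. */
    static class ReportTable {
        private final Map<String, Integer> rows = new HashMap<>();

        /** Assumption: returns changed rows, so 0 when the row is missing OR already holds this value. */
        int update(String triggerDay, int sucCount) {
            Integer current = rows.get(triggerDay);
            if (current == null || current == sucCount) {
                return 0;
            }
            rows.put(triggerDay, sucCount);
            return 1;
        }

        /** Mimics the unique key on trigger_day by rejecting a second row for the same day. */
        void save(String triggerDay, int sucCount) {
            if (rows.containsKey(triggerDay)) {
                throw new IllegalStateException("Duplicate entry '" + triggerDay + "'");
            }
            rows.put(triggerDay, sucCount);
        }

        /** Counterpart of the COUNT(*) query: 1 if the day has a row, otherwise 0. */
        int countByTriggerDay(String triggerDay) {
            return rows.containsKey(triggerDay) ? 1 : 0;
        }
    }

    /** Original flow: update first, insert when update reports 0 rows. */
    static void refreshByUpdateResult(ReportTable table, String day, int sucCount) {
        if (table.update(day, sucCount) < 1) {
            table.save(day, sucCount);
        }
    }

    /** Revised flow: check existence first, then insert or update. */
    static void refreshByExistence(ReportTable table, String day, int sucCount) {
        if (table.countByTriggerDay(day) < 1) {
            table.save(day, sucCount);
        } else {
            table.update(day, sucCount);
        }
    }

    public static void main(String[] args) {
        String day = "2022-08-16";

        ReportTable original = new ReportTable();
        refreshByUpdateResult(original, day, 5); // row missing, so the insert succeeds
        try {
            refreshByUpdateResult(original, day, 5); // same counts: update returns 0, insert is retried
            System.out.println("original flow: ok");
        } catch (IllegalStateException e) {
            System.out.println("original flow failed: " + e.getMessage());
        }

        ReportTable revised = new ReportTable();
        refreshByExistence(revised, day, 5);
        refreshByExistence(revised, day, 5); // row exists, so only update runs
        System.out.println("revised flow: ok");
    }
}
```

In this sketch the second pass of the original flow hits the simulated unique key because the counts have not changed, while the revised flow never calls `save` for a day that already has a row. Note that the sketch is single-threaded; it does not model several admin nodes refreshing the same day at once.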
Java小强