机器是k8s环境集群部署的,有同步数据的任务需要定时执行,如果同时有两个节点去执行任务,就可能导致数据出错,所以需要有一种方式来保证定时任务同一个时间只能在一台机器上去执行
利用MySQL InnoDb的行级锁和事务处理
+———————-+——————+———+——-+————-+————————+
| Field | Type | Null | Key | Default | Extra |
+———————-+——————+———+——-+————-+————————+
| id | bigint(20) | NO | PRI | NULL | auto_increment |
| status | bigint(20) | NO | | 0 | |
| datetime | datetime | NO | | NULL | |
| intervallen | bigint(20) | NO | | 0 | |
| schedule_type | bigint(20) | NO | UNI | 0 | |
| createtime | datetime | NO | | NULL | |
| updatetime | datetime | NO | | NULL | |
+———————-+——————+———+——-+————-+————————+
插入初始化SQL
insert into schedule_job(status, datetime, intervallen, schedule_type,createtime,updatetime) values(1, "2023-03-23 11:49:44", 1800, 1, "2023-03-23 11:49:44", "2023-03-23 11:49:44")
利用MySQL事务和
func GetAndSetScheduleStatus(schedule_type int64) (bool, error) {
var status int64
var isovertime int64
o := orm.NewOrm()
tx, err := o.Begin()
if err != nil {
return false, err
}
sql := fmt.Sprintf("select status,case when (UNIX_TIMESTAMP('%s') - UNIX_TIMESTAMP(datetime)) > intervallen then 1 else 0 end isovertime "+
"from schedule_job t where t.schedule_type=%d for update", time.Now().Format("2006/01/02 15:04:05"), schedule_type)
//fmt.Println(sql)
if err := tx.Raw(sql).QueryRow(&status, &isovertime); err != nil {
tx.Rollback()
return false, err
}
if status == 1 || isovertime == 1 {
// 将job设置为不可执行
_, err := tx.Raw("update schedule_job set status=?,datetime=? where schedule_type=?", 0, time.Now(), schedule_type).Exec()
if err != nil {
tx.Rollback()
return false, err
}
tx.Commit()
return true, nil
}
tx.Commit()
return false, nil
}
func UpdateScheduleStatus(status int64, schedule_type int64) {
o := orm.NewOrm()
schedule_job := ScheduleJob{ScheduleType: schedule_type}
o.Read(&schedule_job, "ScheduleType")
schedule_job.Datetime = time.Now()
schedule_job.Status = status
schedule_job.Updatetime = time.Now()
o.Update(&schedule_job)
}
func VulSyncTask() error {
isRunning, err := models.GetAndSetScheduleStatus(1)
if err != nil {
logs.Error(err.Error())
}
if !isRunning {
logs.Info("已经有任务在执行,跳过")
} else {
doAction()
if isRunning {
models.UpdateScheduleStatus(1, 1)
}
}
return nil
}
评论
暂无评论~~