Stay Hungry.Stay Foolish.
集群环境实现定时任务只在单个节点执行

场景

机器是k8s环境集群部署的,有同步数据的任务需要定时执行,如果同时有两个节点去执行任务,就可能导致数据出错,所以需要有一种方式来保证定时任务同一个时间只能在一台机器上去执行

解决方案

利用MySQL InnoDb的行级锁和事务处理

  1. 建表

+———————-+——————+———+——-+————-+————————+

| Field | Type | Null | Key | Default | Extra |

+———————-+——————+———+——-+————-+————————+

| id | bigint(20) | NO | PRI | NULL | auto_increment |

| status | bigint(20) | NO | | 0 | |

| datetime | datetime | NO | | NULL | |

| intervallen | bigint(20) | NO | | 0 | |

| schedule_type | bigint(20) | NO | UNI | 0 | |

| createtime | datetime | NO | | NULL | |

| updatetime | datetime | NO | | NULL | |

+———————-+——————+———+——-+————-+————————+

插入初始化SQL

insert into schedule_job(status, datetime, intervallen, schedule_type,createtime,updatetime) values(1, "2023-03-23 11:49:44", 1800, 1, "2023-03-23 11:49:44", "2023-03-23 11:49:44")

利用MySQL事务和

func GetAndSetScheduleStatus(schedule_type int64) (bool, error) {
    var status int64
    var isovertime int64
    o := orm.NewOrm()
    tx, err := o.Begin()
    if err != nil {
        return false, err
    }
    sql := fmt.Sprintf("select status,case when (UNIX_TIMESTAMP('%s') - UNIX_TIMESTAMP(datetime)) > intervallen then 1 else 0 end isovertime "+
        "from schedule_job t where t.schedule_type=%d for update", time.Now().Format("2006/01/02 15:04:05"), schedule_type)
    //fmt.Println(sql)
    if err := tx.Raw(sql).QueryRow(&status, &isovertime); err != nil {
        tx.Rollback()
        return false, err
    }
    if status == 1 || isovertime == 1 {
        // 将job设置为不可执行
        _, err := tx.Raw("update schedule_job set status=?,datetime=? where schedule_type=?", 0, time.Now(), schedule_type).Exec()
        if err != nil {
            tx.Rollback()
            return false, err
        }
        tx.Commit()
        return true, nil
    }
    tx.Commit()
    return false, nil
}

func UpdateScheduleStatus(status int64, schedule_type int64) {
    o := orm.NewOrm()
    schedule_job := ScheduleJob{ScheduleType: schedule_type}
    o.Read(&schedule_job, "ScheduleType")
    schedule_job.Datetime = time.Now()
    schedule_job.Status = status
    schedule_job.Updatetime = time.Now()
    o.Update(&schedule_job)
}

func VulSyncTask() error {
    isRunning, err := models.GetAndSetScheduleStatus(1)
    if err != nil {
        logs.Error(err.Error())
    }
    if !isRunning {
        logs.Info("已经有任务在执行,跳过")
    } else {
        doAction()
        if isRunning {
            models.UpdateScheduleStatus(1, 1)
        }
    }
    return nil
}
自由转载-非商用-非衍生-保持署名(创意共享3.0许可证
评论

暂无评论~~