using Microsoft.Practices.Unity; using MyCode.Project.Domain.Model; using MyCode.Project.Domain.Repositories; using MyCode.Project.Infrastructure.Common; using MyCode.Project.Infrastructure.Enumeration; using MyCode.Project.Infrastructure.Exceptions; using MyCode.Project.Infrastructure.Extensions; using MyCode.Project.Infrastructure.UnityExtensions; using MyCode.Project.Repositories.Common; using System; using System.Collections.Generic; using System.Linq; using System.Reflection; using System.Threading; namespace MyCode.Project.Services.Implementation { /// /// 工作调度模块 相关服务 /// public class WorkProcessService : ServiceBase, IWorkProcessService { private ISysWorkProcessV2Repository _sysWorkProcessV2Repository; private static object locker = new object(); //创建锁 public WorkProcessService(ISysWorkProcessV2Repository sysWorkProcessV2Repository) { _sysWorkProcessV2Repository = sysWorkProcessV2Repository; } #region Add(添加调度) /// /// 添加调度 /// /// 执行类 /// 商家ID /// 方法名 /// 备注 /// 参数信息 /// 执行类型 public void Add(Guid merchantId, string methodName, string remark = "", object entity = null, int priority = 5, FuncType funcType = FuncType.Method) where T : class { Add(merchantId, typeof(T), methodName, remark, entity, funcType, priority); } /// /// 添加调度 /// /// 商家ID /// 执行类 /// 方法名 /// 备注 /// 参数信息 /// 执行类型 public void Add(Guid merchantId, Type type, string methodName, string remark = "", object entity = null, FuncType funcType = FuncType.Method, int priority = 5) { string typePath = string.Format("{0}, {1}", type.FullName, type.Assembly.GetName().Name ); string paramInfo = entity == null ? "" : entity is string || entity is Guid || entity is int || entity is long || entity is decimal || entity is float || entity is double ? entity.SafeString() : JsonHelper.ToJson(entity); Add(merchantId, typePath, methodName, remark, paramInfo, funcType, priority); } /// /// 添加调度任务 /// /// 商家ID /// 类型路径,如:Lxm.IServices.IWorkProcessService, Lxm.Services /// 方法名 /// 备注 /// 参数信息 /// 执行类型 public void Add(Guid merchantId, string typePath, string methodName, string remark = "", string paramInfo = "", FuncType funcType = FuncType.Method, int priority = 5) { if (this._sysWorkProcessV2Repository.Exists(funcType, merchantId, typePath, methodName, paramInfo) && methodName!= "RunWechatVSKingDee") { return; } SysWorkProcessV2 entity = new SysWorkProcessV2(); entity.ID = Guid.NewGuid(); entity.MerchantID = merchantId; entity.FuncType = funcType.Value(); entity.FuncClass = typePath; entity.FuncMethod = methodName; entity.ParamInfo = paramInfo; entity.FuncStatus = FuncStatus.Waiting.Value(); entity.Remark = remark; entity.RetryCount = 0; entity.Creater = "系统调度"; entity.CreateTime = DateTime.Now; entity.Editor = "系统调度"; entity.EditTime = DateTime.Now; entity.Priority = priority; this._sysWorkProcessV2Repository.Add(entity); } #endregion #region SelectInitWorkProcess(返回前几十条数据) public List SelectInitWorkProcess(int top) { return _sysWorkProcessV2Repository.Queryable() .Where(p => p.FuncStatus == (int)WorkProcessStatus.Waiting && ( p.Priority==1 || p.Priority==null) ) .Take(top) .OrderBy(p => p.Priority) .OrderBy(p => p.CreateTime).ToList(); } #endregion #region Add(批量添加) public void Add(List workProcess) { _sysWorkProcessV2Repository.Add(workProcess); } #endregion #region RestratStopProcess(重新启用所有暂停了的调度,这里不需要事务,因为修改失败也不影响) public void RestratStopProcess() { var list = _sysWorkProcessV2Repository.SelectList(p => p.FuncStatus == (int)WorkProcessStatus.Pause); list.ForEach(x => { x.FuncStatus = (int)WorkProcessStatus.Running; x.EditTime = DateTime.Now; }); _sysWorkProcessV2Repository.Update(list); } #endregion #region RestartStopProcess(重新启用某个暂停了的调度) [TransactionCallHandler] public void RestartStopProcess(Guid workprocessId) { var workprocess = _sysWorkProcessV2Repository.SelectFirst(p => p.ID == workprocessId); if (workprocess.FuncStatus != (int)WorkProcessStatus.Pause) { throw new BaseException("当前进程状态不是停止"); } workprocess.FuncStatus = (int)WorkProcessStatus.Running; workprocess.EditTime = DateTime.Now; _sysWorkProcessV2Repository.Update(workprocess); } #endregion #region 清空历史数据 public void ClearHistoryTask() { var historyDate = DateTime.Now.AddDays(-7); _sysWorkProcessV2Repository.Delete(p => p.FuncStatus == (int)WorkProcessStatus.Complete && p.EditTime < historyDate); } #endregion #region ExecuteSingle(执行单个) public void ExecuteSingle(SysWorkProcessV2 process) { var type = UnityHelper.GetUnityContainer().Resolve(Type.GetType(process.FuncClass)); MethodInfo method = type.GetType().GetMethod(process.FuncMethod); if (!string.IsNullOrEmpty(process.ParamInfo)) { method.Invoke(type, new object[] { process.ParamInfo }); } else { method.Invoke(type, new object[] { }); } process.FuncStatus = (int)WorkProcessStatus.Complete; process.ExecuteTime = DateTime.Now; process.ExceptionInfo = string.Empty; _sysWorkProcessV2Repository.Update(process); } #endregion #region Execute(调度执行) public void Execute() { lock (locker) { var client = UnityHelper.GetService(); var list = SelectInitWorkProcess(5); //先将这10个任务改成运行中 if (list != null && list.Count > 0) { var updateList = list.Select(p => new SysWorkProcessV2 { ID = p.ID, FuncStatus = (int)WorkProcessStatus.Running }); _sysWorkProcessV2Repository.Update(updateList.ToList(), it => new { it.FuncStatus }); } foreach (var process in list) { //这里开启事务,同时才开启 client.Ado.BeginTran(); #region 执行一个任务 try { ExecuteSingle(process); client.Ado.CommitTran(); } catch (Exception ex) { client.Ado.RollbackTran(); while (ex.InnerException != null) { ex = ex.InnerException; } process.FuncStatus = (int)WorkProcessStatus.ExceptionStop; process.EditTime = DateTime.Now; process.ExecuteTime = DateTime.Now; if (ex is BaseException) { process.ExceptionInfo = ex.Message; } else { process.ExceptionInfo = ex.ToString(); } _sysWorkProcessV2Repository.Update(process); } #endregion } } } #endregion #region SelectOtherInitWorkProcess(返回优先级大于2的前几十条数据) /// /// 获取优先级3,4,5的调度 /// /// /// public List SelectOtherInitWorkProcess(int top) { return _sysWorkProcessV2Repository.Queryable() .Where(p => p.FuncStatus == (int)WorkProcessStatus.Waiting && p.Priority > 2 && p.Priority <= 5) .Take(top) .OrderBy(p => p.Priority) .OrderBy(p => p.CreateTime).ToList(); } #endregion #region ExecuteOther(调度执行优先级比较低的任务) /// /// 调度执行优先级比较低的任务 /// public void ExecuteOther() { var client = UnityHelper.GetService(); var list = SelectOtherInitWorkProcess(20); //先将这10个任务改成运行中 if (list != null && list.Count > 0) { var updateList = list.Select(p => new SysWorkProcessV2 { ID = p.ID, FuncStatus = (int)WorkProcessStatus.Running }); _sysWorkProcessV2Repository.Update(updateList.ToList(), it => new { it.FuncStatus }); } foreach (var process in list) { //这里开启事务,同时才开启 client.Ado.BeginTran(); #region 执行一个任务 try { ExecuteSingle(process); client.Ado.CommitTran(); } catch (Exception ex) { client.Ado.RollbackTran(); while (ex.InnerException != null) { ex = ex.InnerException; } process.FuncStatus = (int)WorkProcessStatus.ExceptionStop; process.EditTime = DateTime.Now; process.ExecuteTime = DateTime.Now; if (ex is BaseException) { process.ExceptionInfo = ex.Message; } else { process.ExceptionInfo = ex.ToString(); } _sysWorkProcessV2Repository.Update(process); } #endregion } } #endregion #region SelectPriority2InitWorkProcess(返回优先级等于2的前几十条数据) /// /// 返回优先级等于2的前几十条数据 /// /// /// public List SelectPriority2InitWorkProcess(int top) { return _sysWorkProcessV2Repository.Queryable() .Where(p => p.FuncStatus == (int)WorkProcessStatus.Waiting && p.Priority == 2) .Take(top) .OrderBy(p => p.Priority) .OrderBy(p => p.CreateTime).ToList(); } #endregion #region ExecutePriority2Work(调度执行优先级等于2的任务) /// /// 调度执行优先级等于2的任务 /// public void ExecutePriority2Work() { var client = UnityHelper.GetService(); var list = SelectPriority2InitWorkProcess(10); //先将这10个任务改成运行中 if (list != null && list.Count > 0) { var updateList = list.Select(p => new SysWorkProcessV2 { ID = p.ID, FuncStatus = (int)WorkProcessStatus.Running }); _sysWorkProcessV2Repository.Update(updateList.ToList(), it => new { it.FuncStatus }); } foreach (var process in list) { //这里开启事务,同时才开启 client.Ado.BeginTran(); #region 执行一个任务 try { ExecuteSingle(process); client.Ado.CommitTran(); } catch (Exception ex) { client.Ado.RollbackTran(); while (ex.InnerException != null) { ex = ex.InnerException; } process.FuncStatus = (int)WorkProcessStatus.ExceptionStop; process.EditTime = DateTime.Now; process.ExecuteTime = DateTime.Now; if (ex is BaseException) { process.ExceptionInfo = ex.Message; } else { process.ExceptionInfo = ex.ToString(); } _sysWorkProcessV2Repository.Update(process); } #endregion //Thread.Sleep(300); } } #endregion #region SelectSmsInitWorkProcess(获取优先级6的短信调度) /// /// 获取优先级6的短信调度 /// /// /// public List SelectPriority6WorkProcess(int top) { DateTime days = DateTime.Now.Date; return _sysWorkProcessV2Repository.Queryable() .Where(p => p.FuncStatus == (int)WorkProcessStatus.Waiting && p.Priority == 6) .Take(top) .OrderBy(p => p.Priority) .OrderBy(p => p.CreateTime).ToList(); } #endregion #region ExecutePriority6(调度执行优先级=6的任务) /// /// 调度执行优先级=6的任务 /// public void ExecutePriority6() { var client = UnityHelper.GetService(); var list = SelectPriority6WorkProcess(20); //先将这10个任务改成运行中 if (list != null && list.Count > 0) { var updateList = list.Select(p => new SysWorkProcessV2 { ID = p.ID, FuncStatus = (int)WorkProcessStatus.Running }); _sysWorkProcessV2Repository.Update(updateList.ToList(), it => new { it.FuncStatus }); } foreach (var process in list) { //这里开启事务,同时才开启 client.Ado.BeginTran(); #region 执行一个任务 try { ExecuteSingle(process); client.Ado.CommitTran(); } catch (Exception ex) { client.Ado.RollbackTran(); while (ex.InnerException != null) { ex = ex.InnerException; } process.FuncStatus = (int)WorkProcessStatus.ExceptionStop; process.EditTime = DateTime.Now; process.ExecuteTime = DateTime.Now; if (ex is BaseException) { process.ExceptionInfo = ex.Message; } else { process.ExceptionInfo = ex.ToString(); } _sysWorkProcessV2Repository.Update(process); } #endregion } } #endregion #region RetryTask(重试失败的任务) /// /// 重试失败的任务 /// public void RetryTask() { DateTime today = DateTime.Now.Date.AddDays(-1); var list = _sysWorkProcessV2Repository.Queryable().Where(t => t.FuncStatus == 4 && t.RetryCount <= 10 && t.CreateTime >= today).OrderBy(t => t.EditTime).Take(20).ToList(); list.ForEach(t => { t.RetryCount = t.RetryCount + 1; t.FuncStatus = 0; }); _sysWorkProcessV2Repository.Update(list); } #endregion } }