using Microsoft.Practices.Unity;
using MyCode.Project.Domain.Config;
using MyCode.Project.Domain.Model;
using MyCode.Project.Domain.Repositories;
using MyCode.Project.Infrastructure.Common;
using MyCode.Project.Infrastructure.Enumeration;
using MyCode.Project.Infrastructure.Exceptions;
using MyCode.Project.Infrastructure.Extensions;
using MyCode.Project.Infrastructure.UnityExtensions;
using MyCode.Project.Repositories.Common;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace MyCode.Project.Services.Implementation
{
    /// <summary>
    /// Creates, selects, executes and archives work-process (scheduled task) records.
    /// A task names a type (<c>TypePath</c>) and a method (<c>MethodName</c>) that is
    /// resolved from the Unity container and invoked by reflection.
    /// </summary>
    public class WorkProcessService : ServiceBase, IWorkProcessService
    {
        #region Initialization

        /// <summary>Number of pending tasks picked up by a single <see cref="Execute"/> call.</summary>
        private const int ExecuteBatchSize = 10;

        /// <summary>Minutes a task may stay in Running before it is reported as timed out.</summary>
        private const int RunningTimeoutMinutes = 10;

        /// <summary>Age in days after which task rows are moved to the history table.</summary>
        private const int HistoryRetentionDays = 7;

        private readonly ISysWorkprocessRepository _sysWorkprocessRepository;

        // Injected but not used by any member of this class as visible here.
        private readonly ISysWorkprocessHistoryRepository _sysWorkprocessHistoryRepository;

        /// <summary>
        /// Creates the service with the repositories it works on.
        /// </summary>
        /// <param name="sysWorkprocessRepository">Repository for work-process records.</param>
        /// <param name="sysWorkprocessHistoryRepository">Repository for archived work-process records.</param>
        public WorkProcessService(
            ISysWorkprocessRepository sysWorkprocessRepository,
            ISysWorkprocessHistoryRepository sysWorkprocessHistoryRepository)
        {
            _sysWorkprocessRepository = sysWorkprocessRepository;
            _sysWorkprocessHistoryRepository = sysWorkprocessHistoryRepository;
        }

        #endregion

        #region SelectInitWorkProcess (select the first pending tasks)

        /// <summary>
        /// Returns up to <paramref name="top"/> tasks whose status is Init,
        /// ordered by <c>Priority</c> and <c>EditTime</c>.
        /// </summary>
        /// <param name="top">Maximum number of tasks to return.</param>
        /// <returns>The selected tasks.</returns>
        public List<SysWorkprocess> SelectInitWorkProcess(int top)
        {
            // The limit is applied after the ordering so that it is the *sorted* set that gets
            // truncated. NOTE(review): two OrderBy calls are kept as in the original; this
            // presumably targets a SqlSugar-style queryable where successive OrderBy calls
            // accumulate - confirm against the repository's Queryable() return type.
            return _sysWorkprocessRepository.Queryable()
                .Where(p => p.Status == (int)WorkProcessStatus.Init)
                .OrderBy(p => p.Priority)
                .OrderBy(p => p.EditTime)
                .Take(top)
                .ToList();
        }

        #endregion

        #region Add (bulk insert)

        /// <summary>
        /// Inserts the given tasks through the repository.
        /// </summary>
        /// <param name="workProcess">Tasks to insert.</param>
        public void Add(List<SysWorkprocess> workProcess)
        {
            _sysWorkprocessRepository.Add(workProcess);
        }

        #endregion

        #region RestratStopProcess (re-enable every stopped task; no transaction because a failed update is harmless)

        /// <summary>
        /// Re-enables every task whose status is Stop by putting it back to Init so that
        /// <see cref="Execute"/> selects it again. (The method name keeps its original spelling
        /// because it is part of the public interface.)
        /// </summary>
        public void RestratStopProcess()
        {
            var stopped = _sysWorkprocessRepository.SelectList(p => p.Status == (int)WorkProcessStatus.Stop);
            if (stopped == null)
            {
                return;
            }

            foreach (var item in stopped)
            {
                // Init, not Running: Execute only selects Init tasks, so a task flagged Running
                // here would never be executed. This matches RestartStopProcess(long).
                item.Status = (int)WorkProcessStatus.Init;
                item.EditTime = DateTime.Now;
                _sysWorkprocessRepository.Update(item);
            }
        }

        #endregion

        #region RestartStopProcess (re-enable one stopped task)

        /// <summary>
        /// Puts a single stopped task back to Init.
        /// </summary>
        /// <param name="workprocessId">Id of the task to re-enable.</param>
        /// <exception cref="BaseException">The task does not exist or its status is not Stop.</exception>
        [TransactionCallHandler]
        public void RestartStopProcess(long workprocessId)
        {
            var workprocess = _sysWorkprocessRepository.SelectFirst(p => p.Id == workprocessId);
            if (workprocess == null)
            {
                throw new BaseException("进程不存在");
            }

            if (workprocess.Status != (int)WorkProcessStatus.Stop)
            {
                throw new BaseException("当前进程状态不是停止");
            }

            workprocess.Status = (int)WorkProcessStatus.Init;
            workprocess.EditTime = DateTime.Now;
            _sysWorkprocessRepository.Update(workprocess);
        }

        #endregion

        #region ProcessHandleExpire (report timed-out tasks)

        /// <summary>
        /// Counts tasks that are Running and whose <c>EditTime</c> is at least
        /// <see cref="RunningTimeoutMinutes"/> minutes old, and sends a message through
        /// <c>DingDingHelper.SendMsg</c> when there are any.
        /// </summary>
        public void ProcessHandleExpire()
        {
            var threshold = DateTime.Now.AddMinutes(-RunningTimeoutMinutes);
            var count = _sysWorkprocessRepository.Count(
                p => p.Status == (int)WorkProcessStatus.Running && p.EditTime <= threshold);

            if (count > 0)
            {
                DingDingHelper.SendMsg($"有超时的进程任务产生,条数:{count}");
            }
        }

        #endregion

        #region ClearHistoryTask (archive old tasks)

        /// <summary>
        /// Copies tasks created more than <see cref="HistoryRetentionDays"/> days ago into the
        /// <c>sys_workprocess_history</c> table and deletes them from the live table.
        /// </summary>
        [TransactionCallHandler]
        public void ClearHistoryTask()
        {
            var historyDate = DateTime.Now.AddDays(-HistoryRetentionDays);
            var historyData = _sysWorkprocessRepository.SelectList(p => p.CreateTime < historyDate);
            if (historyData == null || historyData.Count == 0)
            {
                return;
            }

            _sysWorkprocessRepository.Add(historyData, "sys_workprocess_history");
            _sysWorkprocessRepository.DeleteByIds(historyData.Select(p => p.Id).ToArray());
        }

        #endregion

        #region ExecuteSingle (run one task)

        /// <summary>
        /// Resolves the task's target type from the Unity container, invokes the target method
        /// by reflection and marks the task as Finished.
        /// </summary>
        /// <param name="process">Task to run.</param>
        /// <exception cref="ArgumentNullException"><paramref name="process"/> is null.</exception>
        /// <exception cref="BaseException">The type or the method named by the task cannot be found.</exception>
        public void ExecuteSingle(SysWorkprocess process)
        {
            if (process == null)
            {
                throw new ArgumentNullException(nameof(process));
            }

            // Type.GetType returns null for an unknown name; fail with a readable message
            // instead of an opaque null-argument error from the container.
            var targetType = Type.GetType(process.TypePath);
            if (targetType == null)
            {
                throw new BaseException($"找不到类型:{process.TypePath}");
            }

            var instance = UnityHelper.GetUnityContainer().Resolve(targetType);

            // The method is looked up on the resolved instance's runtime type, as in the original.
            MethodInfo method = instance.GetType().GetMethod(process.MethodName);
            if (method == null)
            {
                throw new BaseException($"找不到方法:{process.TypePath}.{process.MethodName}");
            }

            // A non-empty ParameterInfo is passed as the single (string) argument.
            object[] arguments = string.IsNullOrEmpty(process.ParameterInfo)
                ? new object[] { }
                : new object[] { process.ParameterInfo };
            method.Invoke(instance, arguments);

            process.Status = (int)WorkProcessStatus.Finished;
            process.EditTime = DateTime.Now;
            _sysWorkprocessRepository.Update(process);
        }

        #endregion

        #region Execute (scheduler entry point)

        /// <summary>
        /// Picks up to <see cref="ExecuteBatchSize"/> pending tasks, marks them as Running and
        /// runs them one by one, each inside its own transaction. A failing task is rolled back,
        /// marked as Stop with the error recorded in <c>ExceptionInfo</c>, and the error is sent
        /// through <c>DingDingHelper.SendMsg</c>.
        /// </summary>
        public void Execute()
        {
            var list = SelectInitWorkProcess(ExecuteBatchSize);
            if (list == null || list.Count == 0)
            {
                // Nothing to do; also avoids enumerating a null list below.
                return;
            }

            // NOTE(review): the generic argument was not visible in the reviewed source;
            // MyCodeSqlSugarClient is assumed from the Repositories.Common import - confirm.
            var client = UnityHelper.GetService<MyCodeSqlSugarClient>();

            // Mark the selected tasks as Running first; only Status and EditTime are updated.
            DateTime now = DateTime.Now;
            var runningMarkers = list
                .Select(p => new SysWorkprocess
                {
                    Id = p.Id,
                    Status = (int)WorkProcessStatus.Running,
                    EditTime = now
                })
                .ToList();
            _sysWorkprocessRepository.Update(runningMarkers, it => new { it.Status, it.EditTime });

            foreach (var process in list)
            {
                // One transaction per task, opened only now.
                client.Ado.BeginTran();

                try
                {
                    ExecuteSingle(process);
                    client.Ado.CommitTran();
                }
                catch (Exception ex)
                {
                    client.Ado.RollbackTran();

                    // Unwrap to the innermost exception (reflection wraps the real error).
                    var root = ex;
                    while (root.InnerException != null)
                    {
                        root = root.InnerException;
                    }

                    process.Status = (int)WorkProcessStatus.Stop;
                    process.EditTime = DateTime.Now;

                    // BaseException: message only; anything else: full details.
                    process.ExceptionInfo = root is BaseException ? root.Message : root.ToString();

                    _sysWorkprocessRepository.Update(process);
                    DingDingHelper.SendMsg(process.ExceptionInfo);
                }
                finally
                {
                    // Drop whatever is still queued on the client before the next task runs.
                    client.Ado.Context.Queues.Clear();
                }
            }
        }

        #endregion

        #region Add (schedule a task by generic type)

        /// <summary>
        /// Schedules a task that calls <paramref name="methodName"/> on <typeparamref name="T"/>.
        /// </summary>
        /// <typeparam name="T">Type that declares the method to execute.</typeparam>
        /// <param name="methodName">Name of the method to execute.</param>
        /// <param name="companyId">Company id stored on the task.</param>
        /// <param name="entity">Optional argument; see <see cref="Add(Type, long, string, string, object, FuncType, Priority)"/>.</param>
        /// <param name="remark">Free-text remark stored on the task.</param>
        /// <param name="funcType">Execution type stored on the task.</param>
        /// <param name="priority">Priority stored on the task.</param>
        public void Add<T>(string methodName, long companyId, object entity = null, string remark = "", FuncType funcType = FuncType.Function, Priority priority = Priority.Low)
        {
            Add(typeof(T), companyId, methodName, remark, entity, funcType, priority);
        }

        #endregion

        #region Add (schedule a task by Type)

        /// <summary>
        /// Schedules a task that calls <paramref name="methodName"/> on <paramref name="type"/>.
        /// </summary>
        /// <param name="type">Type that declares the method to execute.</param>
        /// <param name="companyId">Company id stored on the task.</param>
        /// <param name="methodName">Name of the method to execute.</param>
        /// <param name="remark">Free-text remark stored on the task.</param>
        /// <param name="entity">
        /// Optional argument. Strings, Guids and the numeric types int, long, decimal, float and
        /// double are stored via <c>ToString()</c>; any other object is stored via <c>ToJson()</c>.
        /// </param>
        /// <param name="funcType">Execution type stored on the task.</param>
        /// <param name="priority">Priority stored on the task.</param>
        /// <exception cref="ArgumentNullException"><paramref name="type"/> is null.</exception>
        public void Add(Type type, long companyId, string methodName, string remark = "", object entity = null, FuncType funcType = FuncType.Function, Priority priority = Priority.Low)
        {
            if (type == null)
            {
                throw new ArgumentNullException(nameof(type));
            }

            Add(BuildTypePath(type), companyId, methodName, remark, SerializeParameter(entity), funcType, priority);
        }

        #endregion

        #region AddQueue (schedule a task through the repository queue)

        /// <summary>
        /// Builds a task that calls <paramref name="methodName"/> on <typeparamref name="T"/> and
        /// hands it to the repository's <c>AddQueue</c> instead of <c>Add</c>.
        /// </summary>
        /// <typeparam name="T">Type that declares the method to execute.</typeparam>
        /// <param name="methodName">Name of the method to execute.</param>
        /// <param name="companyId">Company id stored on the task.</param>
        /// <param name="entity">Optional argument, stored the same way as in the <c>Add</c> overloads.</param>
        /// <param name="funcType">Execution type stored on the task.</param>
        /// <param name="priority">Priority stored on the task.</param>
        public void AddQueue<T>(string methodName, long companyId, object entity = null, FuncType funcType = FuncType.Function, Priority priority = Priority.Low)
        {
            // Single timestamp so CreateTime and EditTime of a new task are identical.
            var now = DateTime.Now;

            // No Remark is set on queued tasks, as in the original.
            var workProcess = new SysWorkprocess()
            {
                Id = IdHelper.GetNewId(),
                FuncType = (byte)funcType,
                TypePath = BuildTypePath(typeof(T)),
                MethodName = methodName,
                ParameterInfo = SerializeParameter(entity),
                Status = (int)WorkProcessStatus.Init,
                EditTime = now,
                Priority = (byte)priority,
                CreateTime = now,
                CompanyId = companyId
            };

            _sysWorkprocessRepository.AddQueue(workProcess);
        }

        #endregion

        #region Add (schedule a task by type path)

        /// <summary>
        /// Schedules a task from an already formatted type path.
        /// </summary>
        /// <param name="typePath">Type path, e.g. <c>Lxm.IServices.IWorkProcessService, Lxm.Services</c>.</param>
        /// <param name="companyId">Company id stored on the task.</param>
        /// <param name="methodName">Name of the method to execute.</param>
        /// <param name="remark">Free-text remark stored on the task.</param>
        /// <param name="paramInfo">Serialized argument passed to the method when non-empty.</param>
        /// <param name="funcType">Execution type stored on the task.</param>
        /// <param name="priority">Priority stored on the task.</param>
        public void Add(string typePath, long companyId, string methodName, string remark = "", string paramInfo = "", FuncType funcType = FuncType.Function, Priority priority = Priority.Low)
        {
            // Single timestamp so CreateTime and EditTime of a new task are identical.
            var now = DateTime.Now;

            var entity = new SysWorkprocess()
            {
                Id = IdHelper.GetNewId(),
                FuncType = (byte)funcType,
                TypePath = typePath,
                MethodName = methodName,
                ParameterInfo = paramInfo,
                Status = (int)WorkProcessStatus.Init,
                Remark = remark,
                EditTime = now,
                Priority = (byte)priority,
                CreateTime = now,
                CompanyId = companyId
            };

            _sysWorkprocessRepository.Add(entity);
        }

        #endregion

        #region Private helpers

        /// <summary>Formats a type as "FullName, AssemblyName", the form read back by <c>Type.GetType</c> in <see cref="ExecuteSingle"/>.</summary>
        private static string BuildTypePath(Type type)
        {
            return string.Format("{0}, {1}", type.FullName, type.Assembly.GetName().Name);
        }

        /// <summary>Turns a task argument into its stored string form: "" for null, ToString() for simple values, ToJson() otherwise.</summary>
        private static string SerializeParameter(object entity)
        {
            if (entity == null)
            {
                return "";
            }

            bool isSimpleValue = entity is string
                || entity is Guid
                || entity is int
                || entity is long
                || entity is decimal
                || entity is float
                || entity is double;

            return isSimpleValue ? entity.ToString() : entity.ToJson();
        }

        #endregion
    }
}