This commit is contained in:
2025-07-07 13:54:21 +08:00
parent e1e5a82566
commit 38140986e4
18 changed files with 1103 additions and 284 deletions

View File

@@ -20,118 +20,165 @@ namespace MyCode.Project.Services.Implementation
{
public class WorkProcessService : ServiceBase, IWorkProcessService
{
#region
private readonly ISysWorkprocessRepository _sysWorkprocessRepository;
private readonly ISysWorkprocessHistoryRepository _sysWorkprocessHistoryRepository;
private ISysWorkProcessRepository _SysWorkProcessRepository;
public WorkProcessService(ISysWorkprocessRepository sysWorkprocessRepository,
ISysWorkprocessHistoryRepository sysWorkprocessHistoryRepository)
private static object locker = new object(); //创建锁
public WorkProcessService(ISysWorkProcessRepository SysWorkProcessRepository)
{
_sysWorkprocessRepository = sysWorkprocessRepository;
_sysWorkprocessHistoryRepository = sysWorkprocessHistoryRepository;
_SysWorkProcessRepository = SysWorkProcessRepository;
}
#region Add()
/// <summary>
/// 添加调度
/// </summary>
/// <typeparam name="T">执行类</typeparam>
/// <param name="merchantId">商家ID</param>
/// <param name="methodName">方法名</param>
/// <param name="remark">备注</param>
/// <param name="entity">参数信息</param>
/// <param name="funcType">执行类型</param>
public void Add<T>(Guid merchantId, string methodName, string remark = "", object entity = null, int priority = 5, FuncType funcType = FuncType.Method) where T : class
{
Add(merchantId, typeof(T), methodName, remark, entity, funcType, priority);
}
/// <summary>
/// 添加调度
/// </summary>
/// <param name="merchantId">商家ID</param>
/// <param name="type">执行类</param>
/// <param name="methodName">方法名</param>
/// <param name="remark">备注</param>
/// <param name="entity">参数信息</param>
/// <param name="funcType">执行类型</param>
public void Add(Guid merchantId, Type type, string methodName, string remark = "", object entity = null, FuncType funcType = FuncType.Method, int priority = 5)
{
string typePath = string.Format("{0}, {1}", type.FullName, type.Assembly.GetName().Name);
string paramInfo = entity == null
? ""
: entity is string || entity is Guid || entity is int || entity is long || entity is decimal ||
entity is float || entity is double
? entity.SafeString()
: JsonHelper.ToJson(entity);
Add(merchantId, typePath, methodName, remark, paramInfo, funcType, priority);
}
/// <summary>
/// 添加调度任务
/// </summary>
/// <param name="merchantId">商家ID</param>
/// <param name="typePath">类型路径Lxm.IServices.IWorkProcessService, Lxm.Services</param>
/// <param name="methodName">方法名</param>
/// <param name="remark">备注</param>
/// <param name="paramInfo">参数信息</param>
/// <param name="funcType">执行类型</param>
public void Add(Guid merchantId, string typePath, string methodName, string remark = "", string paramInfo = "",
FuncType funcType = FuncType.Method, int priority = 5)
{
if (this._SysWorkProcessRepository.Exists(funcType, merchantId, typePath, methodName, paramInfo) && methodName != "RunWechatVSKingDee")
{
return;
}
SysWorkProcess entity = new SysWorkProcess();
entity.ID = Guid.NewGuid();
entity.MerchantID = merchantId;
entity.FuncType = funcType.Value();
entity.FuncClass = typePath;
entity.FuncMethod = methodName;
entity.ParamInfo = paramInfo;
entity.FuncStatus = FuncStatus.Waiting.Value();
entity.Remark = remark;
entity.RetryCount = 0;
entity.Creater = "系统调度";
entity.CreateTime = DateTime.Now;
entity.Editor = "系统调度";
entity.EditTime = DateTime.Now;
entity.Priority = priority;
this._SysWorkProcessRepository.Add(entity);
}
#endregion
#region SelectInitWorkProcess
public List<SysWorkprocess> SelectInitWorkProcess(int top)
public List<SysWorkProcess> SelectInitWorkProcess(int top)
{
var result = _sysWorkprocessRepository.Queryable()
.Where(p => p.Status == (int)WorkProcessStatus.Init)
return _SysWorkProcessRepository.Queryable()
.Where(p => p.FuncStatus == (int)WorkProcessStatus.Waiting && (p.Priority == 1 || p.Priority == null))
.Take(top)
.OrderBy(p => p.Priority)
.OrderBy(p => p.EditTime).ToList();
return result;
.OrderBy(p => p.CreateTime).ToList();
}
#endregion
#region Add()
public void Add(List<SysWorkprocess> workProcess)
public void Add(List<SysWorkProcess> workProcess)
{
_sysWorkprocessRepository.Add(workProcess);
_SysWorkProcessRepository.Add(workProcess);
}
#endregion
#region RestratStopProcess()
public void RestratStopProcess()
{
var list = _sysWorkprocessRepository.SelectList(p => p.Status == (int)WorkProcessStatus.Stop);
var list = _SysWorkProcessRepository.SelectList(p => p.FuncStatus == (int)WorkProcessStatus.Pause);
list.ForEach(x => {
x.Status = (int)WorkProcessStatus.Running;
x.FuncStatus = (int)WorkProcessStatus.Running;
x.EditTime = DateTime.Now;
_sysWorkprocessRepository.Update(x);
});
_SysWorkProcessRepository.Update(list);
}
#endregion
#region RestartStopProcess
[TransactionCallHandler]
public void RestartStopProcess(long workprocessId)
public void RestartStopProcess(Guid workprocessId)
{
var workprocess = _SysWorkProcessRepository.SelectFirst(p => p.ID == workprocessId);
var workprocess = _sysWorkprocessRepository.SelectFirst(p => p.Id == workprocessId);
if (workprocess.FuncStatus != (int)WorkProcessStatus.Pause) { throw new BaseException("当前进程状态不是停止"); }
if (workprocess.Status != (int)WorkProcessStatus.Stop) { throw new BaseException("当前进程状态不是停止"); }
workprocess.Status = (int)WorkProcessStatus.Init;
workprocess.FuncStatus = (int)WorkProcessStatus.Running;
workprocess.EditTime = DateTime.Now;
_sysWorkprocessRepository.Update(workprocess);
_SysWorkProcessRepository.Update(workprocess);
}
#endregion
#region ProcessHandleExpire()
/// <summary>
/// 进程执行超时
/// </summary>
public void ProcessHandleExpire()
{
var time = DateTime.Now.AddMinutes(-10);
var count = _sysWorkprocessRepository.Count(p => p.Status == (int)WorkProcessStatus.Running && p.EditTime <= time);
if (count > 0) { DingDingHelper.SendMsg($"有超时的进程任务产生,条数:{count}"); }
}
#endregion
#region
[TransactionCallHandler]
public void ClearHistoryTask()
{
var historyDate = DateTime.Now.AddDays(-7);
var historyData = _sysWorkprocessRepository.SelectList(p => p.CreateTime < historyDate);
if (historyData == null || historyData.Count == 0) { return; }
_sysWorkprocessRepository.Add(historyData, "sys_workprocess_history");
_sysWorkprocessRepository.DeleteByIds(historyData.Select(p => p.Id).ToArray());
_SysWorkProcessRepository.Delete(p => p.FuncStatus == (int)WorkProcessStatus.Complete && p.EditTime < historyDate);
}
#endregion
#region ExecuteSingle()
public void ExecuteSingle(SysWorkprocess process)
public void ExecuteSingle(SysWorkProcess process)
{
var type = UnityHelper.GetUnityContainer().Resolve(Type.GetType(process.TypePath));
var type = UnityHelper.GetUnityContainer().Resolve(Type.GetType(process.FuncClass));
MethodInfo method = type.GetType().GetMethod(process.MethodName);
MethodInfo method = type.GetType().GetMethod(process.FuncMethod);
if (!string.IsNullOrEmpty(process.ParameterInfo))
if (!string.IsNullOrEmpty(process.ParamInfo))
{
method.Invoke(type, new object[] { process.ParameterInfo });
method.Invoke(type, new object[] { process.ParamInfo });
}
else
{
method.Invoke(type, new object[] { });
}
process.Status = (int)WorkProcessStatus.Finished;
process.EditTime = DateTime.Now;
_sysWorkprocessRepository.Update(process);
process.FuncStatus = (int)WorkProcessStatus.Complete;
process.ExecuteTime = DateTime.Now;
process.ExceptionInfo = string.Empty;
_SysWorkProcessRepository.Update(process);
}
#endregion
@@ -140,18 +187,94 @@ namespace MyCode.Project.Services.Implementation
public void Execute()
{
lock (locker)
{
var client = UnityHelper.GetService<MyCodeSqlSugarClient>();
var list = SelectInitWorkProcess(5);
//先将这10个任务改成运行中
if (list != null && list.Count > 0)
{
var updateList = list.Select(p => new SysWorkProcess { ID = p.ID, FuncStatus = (int)WorkProcessStatus.Running });
_SysWorkProcessRepository.Update(updateList.ToList(), it => new { it.FuncStatus });
}
foreach (var process in list)
{
//这里开启事务,同时才开启
client.Ado.BeginTran();
#region
try
{
ExecuteSingle(process);
client.Ado.CommitTran();
}
catch (Exception ex)
{
client.Ado.RollbackTran();
while (ex.InnerException != null)
{
ex = ex.InnerException;
}
process.FuncStatus = (int)WorkProcessStatus.ExceptionStop;
process.EditTime = DateTime.Now;
process.ExecuteTime = DateTime.Now;
if (ex is BaseException)
{
process.ExceptionInfo = ex.Message;
}
else
{
process.ExceptionInfo = ex.ToString();
}
_SysWorkProcessRepository.Update(process);
}
#endregion
}
}
}
#endregion
#region SelectOtherInitWorkProcess2
/// <summary>
/// 获取优先级34,5的调度
/// </summary>
/// <param name="top"></param>
/// <returns></returns>
public List<SysWorkProcess> SelectOtherInitWorkProcess(int top)
{
return _SysWorkProcessRepository.Queryable()
.Where(p => p.FuncStatus == (int)WorkProcessStatus.Waiting && p.Priority > 2 && p.Priority <= 5)
.Take(top)
.OrderBy(p => p.Priority)
.OrderBy(p => p.CreateTime).ToList();
}
#endregion
#region ExecuteOther
/// <summary>
/// 调度执行优先级比较低的任务
/// </summary>
public void ExecuteOther()
{
var client = UnityHelper.GetService<MyCodeSqlSugarClient>();
var list = SelectInitWorkProcess(10);
var list = SelectOtherInitWorkProcess(20);
//先将这20个任务改成运行中
//先将这10个任务改成运行中
if (list != null && list.Count > 0)
{
DateTime now = DateTime.Now;
var updateList = list.Select(p => new SysWorkprocess { Id = p.Id, Status = (int)WorkProcessStatus.Running, EditTime = now });
var updateList = list.Select(p => new SysWorkProcess { ID = p.ID, FuncStatus = (int)WorkProcessStatus.Running });
_sysWorkprocessRepository.Update(updateList.ToList(), it => new { it.Status, it.EditTime });
_SysWorkProcessRepository.Update(updateList.ToList(), it => new { it.FuncStatus });
}
foreach (var process in list)
@@ -175,8 +298,9 @@ namespace MyCode.Project.Services.Implementation
ex = ex.InnerException;
}
process.Status = (int)WorkProcessStatus.Stop;
process.FuncStatus = (int)WorkProcessStatus.ExceptionStop;
process.EditTime = DateTime.Now;
process.ExecuteTime = DateTime.Now;
if (ex is BaseException)
{
process.ExceptionInfo = ex.Message;
@@ -186,124 +310,181 @@ namespace MyCode.Project.Services.Implementation
process.ExceptionInfo = ex.ToString();
}
_sysWorkprocessRepository.Update(process);
DingDingHelper.SendMsg(process.ExceptionInfo);
}
finally
{
client.Ado.Context.Queues.Clear();
_SysWorkProcessRepository.Update(process);
}
#endregion
}
}
#endregion
#region SelectPriority2InitWorkProcess2
/// <summary>
/// 返回优先级等于2的前几十条数据
/// </summary>
/// <param name="top"></param>
/// <returns></returns>
public List<SysWorkProcess> SelectPriority2InitWorkProcess(int top)
{
return _SysWorkProcessRepository.Queryable()
.Where(p => p.FuncStatus == (int)WorkProcessStatus.Waiting && p.Priority == 2)
.Take(top)
.OrderBy(p => p.Priority)
.OrderBy(p => p.CreateTime).ToList();
}
#endregion
#region ExecutePriority2Work2
/// <summary>
/// 调度执行优先级等于2的任务
/// </summary>
public void ExecutePriority2Work()
{
var client = UnityHelper.GetService<MyCodeSqlSugarClient>();
var list = SelectPriority2InitWorkProcess(10);
//先将这10个任务改成运行中
if (list != null && list.Count > 0)
{
var updateList = list.Select(p => new SysWorkProcess { ID = p.ID, FuncStatus = (int)WorkProcessStatus.Running });
_SysWorkProcessRepository.Update(updateList.ToList(), it => new { it.FuncStatus });
}
}
#endregion
#region Add()
/// <summary>
/// 添加调度
/// </summary>
/// <typeparam name="T">执行类</typeparam>
/// <param name="merchantId">商家ID</param>
/// <param name="methodName">方法名</param>
/// <param name="remark">备注</param>
/// <param name="entity">参数信息</param>
/// <param name="funcType">执行类型</param>
public void Add<T>(string methodName,long companyId, object entity = null, string remark = "", FuncType funcType = FuncType.Function, Priority priority = Priority.Low)
{
Add(typeof(T), companyId,methodName, remark, entity, funcType, priority);
}
#endregion
#region Add()
/// <summary>
/// 添加调度
/// </summary>
/// <param name="merchantId">商家ID</param>
/// <param name="type">执行类</param>
/// <param name="methodName">方法名</param>
/// <param name="remark">备注</param>
/// <param name="entity">参数信息</param>
/// <param name="funcType">执行类型</param>
public void Add(Type type,long companyId, string methodName, string remark = "", object entity = null, FuncType funcType = FuncType.Function, Priority priority = Priority.Low)
{
string typePath = string.Format("{0}, {1}", type.FullName, type.Assembly.GetName().Name);
string paramInfo = entity == null
? ""
: entity is string || entity is Guid || entity is int || entity is long || entity is decimal ||
entity is float || entity is double
? entity.ToString()
: entity.ToJson();
Add(typePath,companyId, methodName, remark, paramInfo, funcType, priority);
}
#endregion
#region AddQueue()
public void AddQueue<T>(string methodName, long companyId,object entity = null,FuncType funcType = FuncType.Function, Priority priority = Priority.Low)
{
var type = typeof(T);
string typePath = string.Format("{0}, {1}", type.FullName, type.Assembly.GetName().Name);
string paramInfo = entity == null
? ""
: entity is string || entity is Guid || entity is int || entity is long || entity is decimal ||
entity is float || entity is double
? entity.ToString()
: entity.ToJson();
var workProcess = new SysWorkprocess()
foreach (var process in list)
{
Id = IdHelper.GetNewId(),
FuncType = (byte)funcType,
TypePath = typePath,
MethodName = methodName,
ParameterInfo = paramInfo,
Status = (int)WorkProcessStatus.Init,
EditTime = DateTime.Now,
Priority = (byte)priority,
CreateTime = DateTime.Now,
CompanyId = companyId
};
//这里开启事务,同时才开启
client.Ado.BeginTran();
_sysWorkprocessRepository.AddQueue(workProcess);
#region
try
{
ExecuteSingle(process);
client.Ado.CommitTran();
}
catch (Exception ex)
{
client.Ado.RollbackTran();
while (ex.InnerException != null)
{
ex = ex.InnerException;
}
process.FuncStatus = (int)WorkProcessStatus.ExceptionStop;
process.EditTime = DateTime.Now;
process.ExecuteTime = DateTime.Now;
if (ex is BaseException)
{
process.ExceptionInfo = ex.Message;
}
else
{
process.ExceptionInfo = ex.ToString();
}
_SysWorkProcessRepository.Update(process);
}
#endregion
//Thread.Sleep(300);
}
}
#endregion
#region Add()
#region SelectSmsInitWorkProcess6
/// <summary>
/// 添加调度任务
/// 获取优先级6的短信调度
/// </summary>
/// <param name="merchantId">商家ID</param>
/// <param name="typePath">类型路径Lxm.IServices.IWorkProcessService, Lxm.Services</param>
/// <param name="methodName">方法名</param>
/// <param name="remark">备注</param>
/// <param name="paramInfo">参数信息</param>
/// <param name="funcType">执行类型</param>
public void Add(string typePath,long companyId, string methodName, string remark = "", string paramInfo = "",
FuncType funcType = FuncType.Function,
Priority priority = Priority.Low)
/// <param name="top"></param>
/// <returns></returns>
public List<SysWorkProcess> SelectPriority6WorkProcess(int top)
{
DateTime days = DateTime.Now.Date;
return _SysWorkProcessRepository.Queryable()
.Where(p => p.FuncStatus == (int)WorkProcessStatus.Waiting && p.Priority == 6)
.Take(top)
.OrderBy(p => p.Priority)
.OrderBy(p => p.CreateTime).ToList();
}
#endregion
var entity = new SysWorkprocess()
#region ExecutePriority6=6
/// <summary>
/// 调度执行优先级=6的任务
/// </summary>
public void ExecutePriority6()
{
var client = UnityHelper.GetService<MyCodeSqlSugarClient>();
var list = SelectPriority6WorkProcess(20);
//先将这10个任务改成运行中
if (list != null && list.Count > 0)
{
Id = IdHelper.GetNewId(),
FuncType = (byte)funcType,
TypePath = typePath,
MethodName = methodName,
ParameterInfo = paramInfo,
Status = (int)WorkProcessStatus.Init,
Remark = remark,
EditTime = DateTime.Now,
Priority = (byte)priority,
CreateTime = DateTime.Now,
CompanyId = companyId
};
_sysWorkprocessRepository.Add(entity);
var updateList = list.Select(p => new SysWorkProcess { ID = p.ID, FuncStatus = (int)WorkProcessStatus.Running });
_SysWorkProcessRepository.Update(updateList.ToList(), it => new { it.FuncStatus });
}
foreach (var process in list)
{
//这里开启事务,同时才开启
client.Ado.BeginTran();
#region
try
{
ExecuteSingle(process);
client.Ado.CommitTran();
}
catch (Exception ex)
{
client.Ado.RollbackTran();
while (ex.InnerException != null)
{
ex = ex.InnerException;
}
process.FuncStatus = (int)WorkProcessStatus.ExceptionStop;
process.EditTime = DateTime.Now;
process.ExecuteTime = DateTime.Now;
if (ex is BaseException)
{
process.ExceptionInfo = ex.Message;
}
else
{
process.ExceptionInfo = ex.ToString();
}
_SysWorkProcessRepository.Update(process);
}
#endregion
}
}
#endregion
#region RetryTask()
/// <summary>
/// 重试失败的任务
/// </summary>
public void RetryTask()
{
DateTime today = DateTime.Now.Date.AddDays(-1);
var list = _SysWorkProcessRepository.Queryable().Where(t => t.FuncStatus == 4 && t.RetryCount <= 10
&& t.CreateTime >= today).OrderBy(t => t.EditTime).Take(20).ToList();
list.ForEach(t =>
{
t.RetryCount = t.RetryCount + 1;
t.FuncStatus = 0;
});
_SysWorkProcessRepository.Update(list);
}
#endregion
}