2025-04-21 14:10:27 +08:00

474 lines
17 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using Microsoft.Practices.Unity;
using MyCode.Project.Domain.Model;
using MyCode.Project.Domain.Repositories;
using MyCode.Project.Infrastructure.Common;
using MyCode.Project.Infrastructure.Enumeration;
using MyCode.Project.Infrastructure.Exceptions;
using MyCode.Project.Infrastructure.Extensions;
using MyCode.Project.Infrastructure.UnityExtensions;
using MyCode.Project.Repositories.Common;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
namespace MyCode.Project.Services.Implementation
{
/// <summary>
/// 工作调度模块 相关服务
/// </summary>
public class WorkProcessService : ServiceBase, IWorkProcessService
{
private ISysWorkProcessV2Repository _sysWorkProcessV2Repository;
private static object locker = new object(); //创建锁
public WorkProcessService(ISysWorkProcessV2Repository sysWorkProcessV2Repository)
{
_sysWorkProcessV2Repository = sysWorkProcessV2Repository;
}
#region Add()
/// <summary>
/// 添加调度
/// </summary>
/// <typeparam name="T">执行类</typeparam>
/// <param name="merchantId">商家ID</param>
/// <param name="methodName">方法名</param>
/// <param name="remark">备注</param>
/// <param name="entity">参数信息</param>
/// <param name="funcType">执行类型</param>
public void Add<T>(Guid merchantId, string methodName, string remark = "", object entity = null, int priority = 5, FuncType funcType = FuncType.Method) where T : class
{
Add(merchantId, typeof(T), methodName, remark, entity, funcType, priority);
}
/// <summary>
/// 添加调度
/// </summary>
/// <param name="merchantId">商家ID</param>
/// <param name="type">执行类</param>
/// <param name="methodName">方法名</param>
/// <param name="remark">备注</param>
/// <param name="entity">参数信息</param>
/// <param name="funcType">执行类型</param>
public void Add(Guid merchantId, Type type, string methodName, string remark = "", object entity = null, FuncType funcType = FuncType.Method, int priority = 5)
{
string typePath = string.Format("{0}, {1}", type.FullName, type.Assembly.GetName().Name );
string paramInfo = entity == null
? ""
: entity is string || entity is Guid || entity is int || entity is long || entity is decimal ||
entity is float || entity is double
? entity.SafeString()
: JsonHelper.ToJson(entity);
Add(merchantId, typePath, methodName, remark, paramInfo, funcType, priority);
}
/// <summary>
/// 添加调度任务
/// </summary>
/// <param name="merchantId">商家ID</param>
/// <param name="typePath">类型路径Lxm.IServices.IWorkProcessService, Lxm.Services</param>
/// <param name="methodName">方法名</param>
/// <param name="remark">备注</param>
/// <param name="paramInfo">参数信息</param>
/// <param name="funcType">执行类型</param>
public void Add(Guid merchantId, string typePath, string methodName, string remark = "", string paramInfo = "",
FuncType funcType = FuncType.Method, int priority = 5)
{
if (this._sysWorkProcessV2Repository.Exists(funcType, merchantId, typePath, methodName, paramInfo) && methodName!= "RunWechatVSKingDee")
{
return;
}
SysWorkProcessV2 entity = new SysWorkProcessV2();
entity.ID = Guid.NewGuid();
entity.MerchantID = merchantId;
entity.FuncType = funcType.Value();
entity.FuncClass = typePath;
entity.FuncMethod = methodName;
entity.ParamInfo = paramInfo;
entity.FuncStatus = FuncStatus.Waiting.Value();
entity.Remark = remark;
entity.RetryCount = 0;
entity.Creater = "系统调度";
entity.CreateTime = DateTime.Now;
entity.Editor = "系统调度";
entity.EditTime = DateTime.Now;
entity.Priority = priority;
this._sysWorkProcessV2Repository.Add(entity);
}
#endregion
#region SelectInitWorkProcess
public List<SysWorkProcessV2> SelectInitWorkProcess(int top)
{
return _sysWorkProcessV2Repository.Queryable()
.Where(p => p.FuncStatus == (int)WorkProcessStatus.Waiting && ( p.Priority==1 || p.Priority==null) )
.Take(top)
.OrderBy(p => p.Priority)
.OrderBy(p => p.CreateTime).ToList();
}
#endregion
#region Add()
public void Add(List<SysWorkProcessV2> workProcess)
{
_sysWorkProcessV2Repository.Add(workProcess);
}
#endregion
#region RestratStopProcess()
public void RestratStopProcess()
{
var list = _sysWorkProcessV2Repository.SelectList(p => p.FuncStatus == (int)WorkProcessStatus.Pause);
list.ForEach(x => {
x.FuncStatus = (int)WorkProcessStatus.Running;
x.EditTime = DateTime.Now;
});
_sysWorkProcessV2Repository.Update(list);
}
#endregion
#region RestartStopProcess
[TransactionCallHandler]
public void RestartStopProcess(Guid workprocessId)
{
var workprocess = _sysWorkProcessV2Repository.SelectFirst(p => p.ID == workprocessId);
if (workprocess.FuncStatus != (int)WorkProcessStatus.Pause) { throw new BaseException("当前进程状态不是停止"); }
workprocess.FuncStatus = (int)WorkProcessStatus.Running;
workprocess.EditTime = DateTime.Now;
_sysWorkProcessV2Repository.Update(workprocess);
}
#endregion
#region
public void ClearHistoryTask()
{
var historyDate = DateTime.Now.AddDays(-7);
_sysWorkProcessV2Repository.Delete(p => p.FuncStatus == (int)WorkProcessStatus.Complete && p.EditTime < historyDate);
}
#endregion
#region ExecuteSingle()
public void ExecuteSingle(SysWorkProcessV2 process)
{
var type = UnityHelper.GetUnityContainer().Resolve(Type.GetType(process.FuncClass));
MethodInfo method = type.GetType().GetMethod(process.FuncMethod);
if (!string.IsNullOrEmpty(process.ParamInfo))
{
method.Invoke(type, new object[] { process.ParamInfo });
}
else
{
method.Invoke(type, new object[] { });
}
process.FuncStatus = (int)WorkProcessStatus.Complete;
process.ExecuteTime = DateTime.Now;
process.ExceptionInfo = string.Empty;
_sysWorkProcessV2Repository.Update(process);
}
#endregion
#region Execute
public void Execute()
{
lock (locker)
{
var client = UnityHelper.GetService<MyCodeSqlSugarClient>();
var list = SelectInitWorkProcess(5);
//先将这10个任务改成运行中
if (list != null && list.Count > 0)
{
var updateList = list.Select(p => new SysWorkProcessV2 { ID = p.ID, FuncStatus = (int)WorkProcessStatus.Running });
_sysWorkProcessV2Repository.Update(updateList.ToList(), it => new { it.FuncStatus });
}
foreach (var process in list)
{
//这里开启事务,同时才开启
client.Ado.BeginTran();
#region
try
{
ExecuteSingle(process);
client.Ado.CommitTran();
}
catch (Exception ex)
{
client.Ado.RollbackTran();
while (ex.InnerException != null)
{
ex = ex.InnerException;
}
process.FuncStatus = (int)WorkProcessStatus.ExceptionStop;
process.EditTime = DateTime.Now;
process.ExecuteTime = DateTime.Now;
if (ex is BaseException)
{
process.ExceptionInfo = ex.Message;
}
else
{
process.ExceptionInfo = ex.ToString();
}
_sysWorkProcessV2Repository.Update(process);
}
#endregion
}
}
}
#endregion
#region SelectOtherInitWorkProcess2
/// <summary>
/// 获取优先级34,5的调度
/// </summary>
/// <param name="top"></param>
/// <returns></returns>
public List<SysWorkProcessV2> SelectOtherInitWorkProcess(int top)
{
return _sysWorkProcessV2Repository.Queryable()
.Where(p => p.FuncStatus == (int)WorkProcessStatus.Waiting && p.Priority > 2 && p.Priority <= 5)
.Take(top)
.OrderBy(p => p.Priority)
.OrderBy(p => p.CreateTime).ToList();
}
#endregion
#region ExecuteOther
/// <summary>
/// 调度执行优先级比较低的任务
/// </summary>
public void ExecuteOther()
{
var client = UnityHelper.GetService<MyCodeSqlSugarClient>();
var list = SelectOtherInitWorkProcess(20);
//先将这10个任务改成运行中
if (list != null && list.Count > 0)
{
var updateList = list.Select(p => new SysWorkProcessV2 { ID = p.ID, FuncStatus = (int)WorkProcessStatus.Running });
_sysWorkProcessV2Repository.Update(updateList.ToList(), it => new { it.FuncStatus });
}
foreach (var process in list)
{
//这里开启事务,同时才开启
client.Ado.BeginTran();
#region
try
{
ExecuteSingle(process);
client.Ado.CommitTran();
}
catch (Exception ex)
{
client.Ado.RollbackTran();
while (ex.InnerException != null)
{
ex = ex.InnerException;
}
process.FuncStatus = (int)WorkProcessStatus.ExceptionStop;
process.EditTime = DateTime.Now;
process.ExecuteTime = DateTime.Now;
if (ex is BaseException)
{
process.ExceptionInfo = ex.Message;
}
else
{
process.ExceptionInfo = ex.ToString();
}
_sysWorkProcessV2Repository.Update(process);
}
#endregion
}
}
#endregion
#region SelectPriority2InitWorkProcess2
/// <summary>
/// 返回优先级等于2的前几十条数据
/// </summary>
/// <param name="top"></param>
/// <returns></returns>
public List<SysWorkProcessV2> SelectPriority2InitWorkProcess(int top)
{
return _sysWorkProcessV2Repository.Queryable()
.Where(p => p.FuncStatus == (int)WorkProcessStatus.Waiting && p.Priority == 2)
.Take(top)
.OrderBy(p => p.Priority)
.OrderBy(p => p.CreateTime).ToList();
}
#endregion
#region ExecutePriority2Work2
/// <summary>
/// 调度执行优先级等于2的任务
/// </summary>
public void ExecutePriority2Work()
{
var client = UnityHelper.GetService<MyCodeSqlSugarClient>();
var list = SelectPriority2InitWorkProcess(10);
//先将这10个任务改成运行中
if (list != null && list.Count > 0)
{
var updateList = list.Select(p => new SysWorkProcessV2 { ID = p.ID, FuncStatus = (int)WorkProcessStatus.Running });
_sysWorkProcessV2Repository.Update(updateList.ToList(), it => new { it.FuncStatus });
}
foreach (var process in list)
{
//这里开启事务,同时才开启
client.Ado.BeginTran();
#region
try
{
ExecuteSingle(process);
client.Ado.CommitTran();
}
catch (Exception ex)
{
client.Ado.RollbackTran();
while (ex.InnerException != null)
{
ex = ex.InnerException;
}
process.FuncStatus = (int)WorkProcessStatus.ExceptionStop;
process.EditTime = DateTime.Now;
process.ExecuteTime = DateTime.Now;
if (ex is BaseException)
{
process.ExceptionInfo = ex.Message;
}
else
{
process.ExceptionInfo = ex.ToString();
}
_sysWorkProcessV2Repository.Update(process);
}
#endregion
//Thread.Sleep(300);
}
}
#endregion
#region SelectSmsInitWorkProcess6
/// <summary>
/// 获取优先级6的短信调度
/// </summary>
/// <param name="top"></param>
/// <returns></returns>
public List<SysWorkProcessV2> SelectPriority6WorkProcess(int top)
{
DateTime days = DateTime.Now.Date;
return _sysWorkProcessV2Repository.Queryable()
.Where(p => p.FuncStatus == (int)WorkProcessStatus.Waiting && p.Priority == 6)
.Take(top)
.OrderBy(p => p.Priority)
.OrderBy(p => p.CreateTime).ToList();
}
#endregion
#region ExecutePriority6=6
/// <summary>
/// 调度执行优先级=6的任务
/// </summary>
public void ExecutePriority6()
{
var client = UnityHelper.GetService<MyCodeSqlSugarClient>();
var list = SelectPriority6WorkProcess(20);
//先将这10个任务改成运行中
if (list != null && list.Count > 0)
{
var updateList = list.Select(p => new SysWorkProcessV2 { ID = p.ID, FuncStatus = (int)WorkProcessStatus.Running });
_sysWorkProcessV2Repository.Update(updateList.ToList(), it => new { it.FuncStatus });
}
foreach (var process in list)
{
//这里开启事务,同时才开启
client.Ado.BeginTran();
#region
try
{
ExecuteSingle(process);
client.Ado.CommitTran();
}
catch (Exception ex)
{
client.Ado.RollbackTran();
while (ex.InnerException != null)
{
ex = ex.InnerException;
}
process.FuncStatus = (int)WorkProcessStatus.ExceptionStop;
process.EditTime = DateTime.Now;
process.ExecuteTime = DateTime.Now;
if (ex is BaseException)
{
process.ExceptionInfo = ex.Message;
}
else
{
process.ExceptionInfo = ex.ToString();
}
_sysWorkProcessV2Repository.Update(process);
}
#endregion
}
}
#endregion
}
}