1
This commit is contained in:
@@ -1,10 +0,0 @@
|
||||
using MyCode.Project.Infrastructure.JackYun;
|
||||
|
||||
namespace MyCode.Project.Services
|
||||
{
|
||||
public interface IJackYunService
|
||||
{
|
||||
JackyunResponse Call(string method, string version, BaseRequestBizData bizData);
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,10 @@
|
||||
using MyCode.Project.Infrastructure.JackYun;
|
||||
|
||||
namespace MyCode.Project.Services
|
||||
{
|
||||
public interface ILiQiongHaiService
|
||||
{
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,10 @@
|
||||
using MyCode.Project.Infrastructure.JackYun;
|
||||
|
||||
namespace MyCode.Project.Services
|
||||
{
|
||||
public interface IYuyuboService
|
||||
{
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,10 @@
|
||||
using MyCode.Project.Infrastructure.JackYun;
|
||||
|
||||
namespace MyCode.Project.Services
|
||||
{
|
||||
public interface IZhuBinService
|
||||
{
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
@@ -1,75 +0,0 @@
|
||||
using MyCode.Project.Domain.Message.Request.JackYun;
|
||||
using MyCode.Project.Infrastructure.Common;
|
||||
using MyCode.Project.Infrastructure.JackYun;
|
||||
using System;
|
||||
|
||||
namespace MyCode.Project.Services.Implementation
|
||||
{
|
||||
public class JackYunService : ServiceBase //, IJackYunService
|
||||
{
|
||||
/// <summary>
|
||||
/// 请求开放平台服务
|
||||
/// </summary>
|
||||
/// <param name="method">开放接口方法名</param>
|
||||
/// <param name="version">开放接口版本号(null表示默认)</param>
|
||||
/// <param name="bizData">请求业务数据</param>
|
||||
/// <returns>OpenResponse返回对象</returns>
|
||||
public JackyunResponse Call(string method, string version, BaseRequestBizData bizData)
|
||||
{
|
||||
//接口返回值
|
||||
JackyunResponse response = null;
|
||||
//返回值字符串
|
||||
string strResponse = null;
|
||||
try
|
||||
{
|
||||
//请求吉客云开放接口。
|
||||
strResponse = JackyunOpenHttpUtils.Post(method, version, bizData);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
response = new JackyunResponse();
|
||||
response.onFail(ex.Message, "CLIENT_EXCEPTION");
|
||||
return response;
|
||||
}
|
||||
|
||||
return JsonHelper.ToObject<JackyunResponse>(strResponse);
|
||||
}
|
||||
|
||||
|
||||
#region 订单查询
|
||||
|
||||
/// <summary>
|
||||
/// 订单查询
|
||||
/// </summary>
|
||||
|
||||
public void testTradeFullInfoGet()
|
||||
{
|
||||
Byte total = 0;
|
||||
OrderTradeFullInfoGetRequestBizData requestBizData = new OrderTradeFullInfoGetRequestBizData();
|
||||
string value = "2019-06-05 12:00:00";
|
||||
requestBizData.startModified = DateTime.ParseExact(value, "yyyy-MM-dd HH:mm:ss", System.Globalization.CultureInfo.InvariantCulture, System.Globalization.DateTimeStyles.None);
|
||||
string value1 = "2019-09-05 12:00:00";
|
||||
requestBizData.endModified = DateTime.ParseExact(value1, "yyyy-MM-dd HH:mm:ss", System.Globalization.CultureInfo.InvariantCulture, System.Globalization.DateTimeStyles.None);
|
||||
requestBizData.tradeNo = "JY201906050001,JY201906050002";
|
||||
requestBizData.hasTotal = total;
|
||||
requestBizData.pageSize = 50;
|
||||
requestBizData.pageIndex = 0;
|
||||
requestBizData.fields = "tradeNo,orderNo";
|
||||
requestBizData.startCreated = DateTime.ParseExact(value, "yyyy-MM-dd HH:mm:ss", System.Globalization.CultureInfo.InvariantCulture, System.Globalization.DateTimeStyles.None);
|
||||
requestBizData.endCreated = DateTime.ParseExact(value, "yyyy-MM-dd HH:mm:ss", System.Globalization.CultureInfo.InvariantCulture, System.Globalization.DateTimeStyles.None);
|
||||
requestBizData.startAuditTime = DateTime.ParseExact(value, "yyyy-MM-dd HH:mm:ss", System.Globalization.CultureInfo.InvariantCulture, System.Globalization.DateTimeStyles.None);
|
||||
requestBizData.endAuditTime = DateTime.ParseExact(value, "yyyy-MM-dd HH:mm:ss", System.Globalization.CultureInfo.InvariantCulture, System.Globalization.DateTimeStyles.None);
|
||||
requestBizData.startConsignTime = DateTime.ParseExact(value, "yyyy-MM-dd HH:mm:ss", System.Globalization.CultureInfo.InvariantCulture, System.Globalization.DateTimeStyles.None);
|
||||
requestBizData.endConsignTime = DateTime.ParseExact(value, "yyyy-MM-dd HH:mm:ss", System.Globalization.CultureInfo.InvariantCulture, System.Globalization.DateTimeStyles.None);
|
||||
requestBizData.tradeStatus = Convert.ToByte(1010);
|
||||
requestBizData.tradeType = 1;
|
||||
requestBizData.sourceTradeNos = "10,026,635,314";
|
||||
requestBizData.shopIds = new long[] { 378761130654261100, 378761130654261100 };
|
||||
|
||||
//JackyunResponse response = Call(EnumAttribute.GetAttribute(ApiEnum.TRADEFULLINFOGET).Value, "", requestBizData);
|
||||
//Console.Out.WriteLine("订单查询接口响应信息=" + JsonUtils.ToJson(response));
|
||||
}
|
||||
|
||||
#endregion
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
using MyCode.Project.Domain.Message.Request.JackYun;
|
||||
using MyCode.Project.Domain.Repositories;
|
||||
using MyCode.Project.Infrastructure.Common;
|
||||
using MyCode.Project.Infrastructure.JackYun;
|
||||
using System;
|
||||
|
||||
namespace MyCode.Project.Services.Implementation
|
||||
{
|
||||
public class LiQiongHaiService : ServiceBase //, ILiQiongHaiService
|
||||
{
|
||||
|
||||
private IPushKingDeeOrderRepository _pushKingDeeOrderRepository;
|
||||
|
||||
public LiQiongHaiService(IPushKingDeeOrderRepository pushKingDeeOrderRepository)
|
||||
{
|
||||
_pushKingDeeOrderRepository = pushKingDeeOrderRepository;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
@@ -7,6 +7,7 @@ using MyCode.Project.Infrastructure.Enumeration;
|
||||
using MyCode.Project.Infrastructure.Exceptions;
|
||||
using MyCode.Project.Infrastructure.Extensions;
|
||||
using MyCode.Project.Infrastructure.UnityExtensions;
|
||||
using MyCode.Project.Repositories;
|
||||
using MyCode.Project.Repositories.Common;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
@@ -20,118 +21,169 @@ namespace MyCode.Project.Services.Implementation
|
||||
{
|
||||
public class WorkProcessService : ServiceBase, IWorkProcessService
|
||||
{
|
||||
#region 初始化
|
||||
private readonly ISysWorkprocessRepository _sysWorkprocessRepository;
|
||||
private readonly ISysWorkprocessHistoryRepository _sysWorkprocessHistoryRepository;
|
||||
private ISysWorkProcessRepository _SysWorkProcessRepository;
|
||||
|
||||
public WorkProcessService(ISysWorkprocessRepository sysWorkprocessRepository,
|
||||
ISysWorkprocessHistoryRepository sysWorkprocessHistoryRepository)
|
||||
private static object locker = new object(); //创建锁
|
||||
public WorkProcessService(ISysWorkProcessRepository SysWorkProcessRepository)
|
||||
{
|
||||
_sysWorkprocessRepository = sysWorkprocessRepository;
|
||||
_sysWorkprocessHistoryRepository = sysWorkprocessHistoryRepository;
|
||||
_SysWorkProcessRepository = SysWorkProcessRepository;
|
||||
}
|
||||
|
||||
|
||||
#region Add(添加调度)
|
||||
/// <summary>
|
||||
/// 添加调度
|
||||
/// </summary>
|
||||
/// <typeparam name="T">执行类</typeparam>
|
||||
/// <param name="merchantId">商家ID</param>
|
||||
/// <param name="methodName">方法名</param>
|
||||
/// <param name="remark">备注</param>
|
||||
/// <param name="entity">参数信息</param>
|
||||
/// <param name="funcType">执行类型</param>
|
||||
public void Add<T>(Guid merchantId, string methodName, string remark = "", object entity = null, int priority = 5, FuncType funcType = FuncType.Method) where T : class
|
||||
{
|
||||
Add(merchantId, typeof(T), methodName, remark, entity, funcType, priority);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 添加调度
|
||||
/// </summary>
|
||||
/// <param name="merchantId">商家ID</param>
|
||||
/// <param name="type">执行类</param>
|
||||
/// <param name="methodName">方法名</param>
|
||||
/// <param name="remark">备注</param>
|
||||
/// <param name="entity">参数信息</param>
|
||||
/// <param name="funcType">执行类型</param>
|
||||
public void Add(Guid merchantId, Type type, string methodName, string remark = "", object entity = null, FuncType funcType = FuncType.Method, int priority = 5)
|
||||
{
|
||||
string typePath = string.Format("{0}, {1}", type.FullName, type.Assembly.GetName().Name);
|
||||
string paramInfo = entity == null
|
||||
? ""
|
||||
: entity is string || entity is Guid || entity is int || entity is long || entity is decimal ||
|
||||
entity is float || entity is double
|
||||
? entity.SafeString()
|
||||
: JsonHelper.ToJson(entity);
|
||||
Add(merchantId, typePath, methodName, remark, paramInfo, funcType, priority);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 添加调度任务
|
||||
/// </summary>
|
||||
/// <param name="merchantId">商家ID</param>
|
||||
/// <param name="typePath">类型路径,如:Lxm.IServices.IWorkProcessService, Lxm.Services</param>
|
||||
/// <param name="methodName">方法名</param>
|
||||
/// <param name="remark">备注</param>
|
||||
/// <param name="paramInfo">参数信息</param>
|
||||
/// <param name="funcType">执行类型</param>
|
||||
public void Add(Guid merchantId, string typePath, string methodName, string remark = "", string paramInfo = "",
|
||||
FuncType funcType = FuncType.Method, int priority = 5)
|
||||
{
|
||||
if (this._SysWorkProcessRepository.Exists(funcType, merchantId, typePath, methodName, paramInfo) && methodName != "RunWechatVSKingDee")
|
||||
{
|
||||
return;
|
||||
}
|
||||
SysWorkProcess entity = new SysWorkProcess();
|
||||
entity.ID = Guid.NewGuid();
|
||||
entity.MerchantID = merchantId;
|
||||
entity.FuncType = funcType.Value();
|
||||
entity.FuncClass = typePath;
|
||||
entity.FuncMethod = methodName;
|
||||
entity.ParamInfo = paramInfo;
|
||||
entity.FuncStatus = FuncStatus.Waiting.Value();
|
||||
entity.Remark = remark;
|
||||
entity.RetryCount = 0;
|
||||
entity.Creater = "系统调度";
|
||||
entity.CreateTime = DateTime.Now;
|
||||
entity.Editor = "系统调度";
|
||||
entity.EditTime = DateTime.Now;
|
||||
entity.Priority = priority;
|
||||
this._SysWorkProcessRepository.Add(entity);
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
|
||||
|
||||
#region SelectInitWorkProcess(返回前几十条数据)
|
||||
public List<SysWorkprocess> SelectInitWorkProcess(int top)
|
||||
public List<SysWorkProcess> SelectInitWorkProcess(int top)
|
||||
{
|
||||
var result = _sysWorkprocessRepository.Queryable()
|
||||
.Where(p => p.Status == (int)WorkProcessStatus.Init)
|
||||
|
||||
return _SysWorkProcessRepository.Queryable()
|
||||
.Where(p => p.FuncStatus == (int)WorkProcessStatus.Waiting && (p.Priority == 1 || p.Priority == null))
|
||||
.Take(top)
|
||||
.OrderBy(p => p.Priority)
|
||||
.OrderBy(p => p.EditTime).ToList();
|
||||
|
||||
return result;
|
||||
.OrderBy(p => p.CreateTime).ToList();
|
||||
}
|
||||
#endregion
|
||||
|
||||
#region Add(批量添加)
|
||||
public void Add(List<SysWorkprocess> workProcess)
|
||||
public void Add(List<SysWorkProcess> workProcess)
|
||||
{
|
||||
_sysWorkprocessRepository.Add(workProcess);
|
||||
_SysWorkProcessRepository.Add(workProcess);
|
||||
}
|
||||
#endregion
|
||||
|
||||
#region RestratStopProcess(重新启用所有暂停了的调度,这里不需要事务,因为修改失败也不影响)
|
||||
public void RestratStopProcess()
|
||||
{
|
||||
var list = _sysWorkprocessRepository.SelectList(p => p.Status == (int)WorkProcessStatus.Stop);
|
||||
var list = _SysWorkProcessRepository.SelectList(p => p.FuncStatus == (int)WorkProcessStatus.Pause);
|
||||
list.ForEach(x => {
|
||||
x.Status = (int)WorkProcessStatus.Running;
|
||||
x.FuncStatus = (int)WorkProcessStatus.Running;
|
||||
x.EditTime = DateTime.Now;
|
||||
_sysWorkprocessRepository.Update(x);
|
||||
|
||||
});
|
||||
_SysWorkProcessRepository.Update(list);
|
||||
}
|
||||
#endregion
|
||||
|
||||
#region RestartStopProcess(重新启用某个暂停了的调度)
|
||||
[TransactionCallHandler]
|
||||
public void RestartStopProcess(long workprocessId)
|
||||
public void RestartStopProcess(Guid workprocessId)
|
||||
{
|
||||
var workprocess = _SysWorkProcessRepository.SelectFirst(p => p.ID == workprocessId);
|
||||
|
||||
var workprocess = _sysWorkprocessRepository.SelectFirst(p => p.Id == workprocessId);
|
||||
if (workprocess.FuncStatus != (int)WorkProcessStatus.Pause) { throw new BaseException("当前进程状态不是停止"); }
|
||||
|
||||
if (workprocess.Status != (int)WorkProcessStatus.Stop) { throw new BaseException("当前进程状态不是停止"); }
|
||||
|
||||
workprocess.Status = (int)WorkProcessStatus.Init;
|
||||
workprocess.FuncStatus = (int)WorkProcessStatus.Running;
|
||||
workprocess.EditTime = DateTime.Now;
|
||||
_sysWorkprocessRepository.Update(workprocess);
|
||||
_SysWorkProcessRepository.Update(workprocess);
|
||||
|
||||
}
|
||||
#endregion
|
||||
|
||||
#region ProcessHandleExpire(进行执行超时)
|
||||
/// <summary>
|
||||
/// 进程执行超时
|
||||
/// </summary>
|
||||
public void ProcessHandleExpire()
|
||||
{
|
||||
var time = DateTime.Now.AddMinutes(-10);
|
||||
|
||||
var count = _sysWorkprocessRepository.Count(p => p.Status == (int)WorkProcessStatus.Running && p.EditTime <= time);
|
||||
|
||||
if (count > 0) { DingDingHelper.SendMsg($"有超时的进程任务产生,条数:{count}"); }
|
||||
}
|
||||
#endregion
|
||||
|
||||
#region 清空历史数据
|
||||
[TransactionCallHandler]
|
||||
public void ClearHistoryTask()
|
||||
{
|
||||
var historyDate = DateTime.Now.AddDays(-7);
|
||||
|
||||
var historyData = _sysWorkprocessRepository.SelectList(p => p.CreateTime < historyDate);
|
||||
|
||||
if (historyData == null || historyData.Count == 0) { return; }
|
||||
|
||||
_sysWorkprocessRepository.Add(historyData, "sys_workprocess_history");
|
||||
|
||||
_sysWorkprocessRepository.DeleteByIds(historyData.Select(p => p.Id).ToArray());
|
||||
_SysWorkProcessRepository.Delete(p => p.FuncStatus == (int)WorkProcessStatus.Complete && p.EditTime < historyDate);
|
||||
}
|
||||
#endregion
|
||||
|
||||
#region ExecuteSingle(执行单个)
|
||||
public void ExecuteSingle(SysWorkprocess process)
|
||||
public void ExecuteSingle(SysWorkProcess process)
|
||||
{
|
||||
|
||||
var type = UnityHelper.GetUnityContainer().Resolve(Type.GetType(process.TypePath));
|
||||
var type = UnityHelper.GetUnityContainer().Resolve(Type.GetType(process.FuncClass));
|
||||
|
||||
MethodInfo method = type.GetType().GetMethod(process.MethodName);
|
||||
MethodInfo method = type.GetType().GetMethod(process.FuncMethod);
|
||||
object result = new object();
|
||||
|
||||
if (!string.IsNullOrEmpty(process.ParameterInfo))
|
||||
if (!string.IsNullOrEmpty(process.ParamInfo))
|
||||
{
|
||||
method.Invoke(type, new object[] { process.ParameterInfo });
|
||||
result = method.Invoke(type, new object[] { process.ParamInfo });
|
||||
}
|
||||
else
|
||||
{
|
||||
method.Invoke(type, new object[] { });
|
||||
result = method.Invoke(type, new object[] { });
|
||||
}
|
||||
|
||||
process.Status = (int)WorkProcessStatus.Finished;
|
||||
if (result == null)
|
||||
result = "";
|
||||
process.FuncStatus = (int)WorkProcessStatus.Complete;
|
||||
process.ExecuteTime = DateTime.Now;
|
||||
process.ExceptionInfo = string.Empty;
|
||||
process.EditTime = DateTime.Now;
|
||||
_sysWorkprocessRepository.Update(process);
|
||||
process.Result = JsonHelper.ToJson(result);
|
||||
_SysWorkProcessRepository.Update(process);
|
||||
|
||||
}
|
||||
#endregion
|
||||
@@ -140,18 +192,94 @@ namespace MyCode.Project.Services.Implementation
|
||||
|
||||
public void Execute()
|
||||
{
|
||||
lock (locker)
|
||||
{
|
||||
var client = UnityHelper.GetService<MyCodeSqlSugarClient>();
|
||||
|
||||
var list = SelectInitWorkProcess(5);
|
||||
|
||||
//先将这10个任务改成运行中
|
||||
if (list != null && list.Count > 0)
|
||||
{
|
||||
var updateList = list.Select(p => new SysWorkProcess { ID = p.ID, FuncStatus = (int)WorkProcessStatus.Running });
|
||||
|
||||
_SysWorkProcessRepository.Update(updateList.ToList(), it => new { it.FuncStatus });
|
||||
}
|
||||
|
||||
foreach (var process in list)
|
||||
{
|
||||
//这里开启事务,同时才开启
|
||||
client.Ado.BeginTran();
|
||||
|
||||
#region 执行一个任务
|
||||
try
|
||||
{
|
||||
ExecuteSingle(process);
|
||||
|
||||
client.Ado.CommitTran();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
client.Ado.RollbackTran();
|
||||
|
||||
while (ex.InnerException != null)
|
||||
{
|
||||
ex = ex.InnerException;
|
||||
}
|
||||
|
||||
process.FuncStatus = (int)WorkProcessStatus.ExceptionStop;
|
||||
process.EditTime = DateTime.Now;
|
||||
process.ExecuteTime = DateTime.Now;
|
||||
if (ex is BaseException)
|
||||
{
|
||||
process.ExceptionInfo = ex.Message;
|
||||
}
|
||||
else
|
||||
{
|
||||
process.ExceptionInfo = ex.ToString();
|
||||
}
|
||||
|
||||
_SysWorkProcessRepository.Update(process);
|
||||
}
|
||||
#endregion
|
||||
}
|
||||
}
|
||||
}
|
||||
#endregion
|
||||
|
||||
#region SelectOtherInitWorkProcess(返回优先级大于2的前几十条数据)
|
||||
/// <summary>
|
||||
/// 获取优先级3,4,5的调度
|
||||
/// </summary>
|
||||
/// <param name="top"></param>
|
||||
/// <returns></returns>
|
||||
public List<SysWorkProcess> SelectOtherInitWorkProcess(int top)
|
||||
{
|
||||
|
||||
return _SysWorkProcessRepository.Queryable()
|
||||
.Where(p => p.FuncStatus == (int)WorkProcessStatus.Waiting && p.Priority > 2 && p.Priority <= 5)
|
||||
.Take(top)
|
||||
.OrderBy(p => p.Priority)
|
||||
.OrderBy(p => p.CreateTime).ToList();
|
||||
}
|
||||
#endregion
|
||||
|
||||
#region ExecuteOther(调度执行优先级比较低的任务)
|
||||
/// <summary>
|
||||
/// 调度执行优先级比较低的任务
|
||||
/// </summary>
|
||||
public void ExecuteOther()
|
||||
{
|
||||
var client = UnityHelper.GetService<MyCodeSqlSugarClient>();
|
||||
|
||||
var list = SelectInitWorkProcess(10);
|
||||
var list = SelectOtherInitWorkProcess(20);
|
||||
|
||||
//先将这20个任务改成运行中
|
||||
//先将这10个任务改成运行中
|
||||
if (list != null && list.Count > 0)
|
||||
{
|
||||
DateTime now = DateTime.Now;
|
||||
var updateList = list.Select(p => new SysWorkprocess { Id = p.Id, Status = (int)WorkProcessStatus.Running, EditTime = now });
|
||||
var updateList = list.Select(p => new SysWorkProcess { ID = p.ID, FuncStatus = (int)WorkProcessStatus.Running });
|
||||
|
||||
_sysWorkprocessRepository.Update(updateList.ToList(), it => new { it.Status, it.EditTime });
|
||||
_SysWorkProcessRepository.Update(updateList.ToList(), it => new { it.FuncStatus });
|
||||
}
|
||||
|
||||
foreach (var process in list)
|
||||
@@ -175,8 +303,9 @@ namespace MyCode.Project.Services.Implementation
|
||||
ex = ex.InnerException;
|
||||
}
|
||||
|
||||
process.Status = (int)WorkProcessStatus.Stop;
|
||||
process.FuncStatus = (int)WorkProcessStatus.ExceptionStop;
|
||||
process.EditTime = DateTime.Now;
|
||||
process.ExecuteTime = DateTime.Now;
|
||||
if (ex is BaseException)
|
||||
{
|
||||
process.ExceptionInfo = ex.Message;
|
||||
@@ -186,124 +315,181 @@ namespace MyCode.Project.Services.Implementation
|
||||
process.ExceptionInfo = ex.ToString();
|
||||
}
|
||||
|
||||
_sysWorkprocessRepository.Update(process);
|
||||
|
||||
DingDingHelper.SendMsg(process.ExceptionInfo);
|
||||
}
|
||||
finally
|
||||
{
|
||||
client.Ado.Context.Queues.Clear();
|
||||
_SysWorkProcessRepository.Update(process);
|
||||
}
|
||||
#endregion
|
||||
}
|
||||
}
|
||||
#endregion
|
||||
|
||||
#region SelectPriority2InitWorkProcess(返回优先级等于2的前几十条数据)
|
||||
/// <summary>
|
||||
/// 返回优先级等于2的前几十条数据
|
||||
/// </summary>
|
||||
/// <param name="top"></param>
|
||||
/// <returns></returns>
|
||||
public List<SysWorkProcess> SelectPriority2InitWorkProcess(int top)
|
||||
{
|
||||
|
||||
return _SysWorkProcessRepository.Queryable()
|
||||
.Where(p => p.FuncStatus == (int)WorkProcessStatus.Waiting && p.Priority == 2)
|
||||
.Take(top)
|
||||
.OrderBy(p => p.Priority)
|
||||
.OrderBy(p => p.CreateTime).ToList();
|
||||
}
|
||||
#endregion
|
||||
|
||||
#region ExecutePriority2Work(调度执行优先级等于2的任务)
|
||||
/// <summary>
|
||||
/// 调度执行优先级等于2的任务
|
||||
/// </summary>
|
||||
public void ExecutePriority2Work()
|
||||
{
|
||||
var client = UnityHelper.GetService<MyCodeSqlSugarClient>();
|
||||
|
||||
var list = SelectPriority2InitWorkProcess(10);
|
||||
|
||||
//先将这10个任务改成运行中
|
||||
if (list != null && list.Count > 0)
|
||||
{
|
||||
var updateList = list.Select(p => new SysWorkProcess { ID = p.ID, FuncStatus = (int)WorkProcessStatus.Running });
|
||||
|
||||
_SysWorkProcessRepository.Update(updateList.ToList(), it => new { it.FuncStatus });
|
||||
}
|
||||
|
||||
}
|
||||
#endregion
|
||||
|
||||
#region Add(添加调度)
|
||||
|
||||
/// <summary>
|
||||
/// 添加调度
|
||||
/// </summary>
|
||||
/// <typeparam name="T">执行类</typeparam>
|
||||
/// <param name="merchantId">商家ID</param>
|
||||
/// <param name="methodName">方法名</param>
|
||||
/// <param name="remark">备注</param>
|
||||
/// <param name="entity">参数信息</param>
|
||||
/// <param name="funcType">执行类型</param>
|
||||
public void Add<T>(string methodName,long companyId, object entity = null, string remark = "", FuncType funcType = FuncType.Function, Priority priority = Priority.Low)
|
||||
{
|
||||
Add(typeof(T), companyId,methodName, remark, entity, funcType, priority);
|
||||
}
|
||||
#endregion
|
||||
|
||||
#region Add(添加调度)
|
||||
/// <summary>
|
||||
/// 添加调度
|
||||
/// </summary>
|
||||
/// <param name="merchantId">商家ID</param>
|
||||
/// <param name="type">执行类</param>
|
||||
/// <param name="methodName">方法名</param>
|
||||
/// <param name="remark">备注</param>
|
||||
/// <param name="entity">参数信息</param>
|
||||
/// <param name="funcType">执行类型</param>
|
||||
public void Add(Type type,long companyId, string methodName, string remark = "", object entity = null, FuncType funcType = FuncType.Function, Priority priority = Priority.Low)
|
||||
{
|
||||
string typePath = string.Format("{0}, {1}", type.FullName, type.Assembly.GetName().Name);
|
||||
string paramInfo = entity == null
|
||||
? ""
|
||||
: entity is string || entity is Guid || entity is int || entity is long || entity is decimal ||
|
||||
entity is float || entity is double
|
||||
? entity.ToString()
|
||||
: entity.ToJson();
|
||||
Add(typePath,companyId, methodName, remark, paramInfo, funcType, priority);
|
||||
}
|
||||
#endregion
|
||||
|
||||
#region AddQueue(添加进程调度)
|
||||
public void AddQueue<T>(string methodName, long companyId,object entity = null,FuncType funcType = FuncType.Function, Priority priority = Priority.Low)
|
||||
{
|
||||
var type = typeof(T);
|
||||
|
||||
string typePath = string.Format("{0}, {1}", type.FullName, type.Assembly.GetName().Name);
|
||||
|
||||
string paramInfo = entity == null
|
||||
? ""
|
||||
: entity is string || entity is Guid || entity is int || entity is long || entity is decimal ||
|
||||
entity is float || entity is double
|
||||
? entity.ToString()
|
||||
: entity.ToJson();
|
||||
|
||||
var workProcess = new SysWorkprocess()
|
||||
foreach (var process in list)
|
||||
{
|
||||
Id = IdHelper.GetNewId(),
|
||||
FuncType = (byte)funcType,
|
||||
TypePath = typePath,
|
||||
MethodName = methodName,
|
||||
ParameterInfo = paramInfo,
|
||||
Status = (int)WorkProcessStatus.Init,
|
||||
EditTime = DateTime.Now,
|
||||
Priority = (byte)priority,
|
||||
CreateTime = DateTime.Now,
|
||||
CompanyId = companyId
|
||||
|
||||
};
|
||||
//这里开启事务,同时才开启
|
||||
client.Ado.BeginTran();
|
||||
|
||||
_sysWorkprocessRepository.AddQueue(workProcess);
|
||||
#region 执行一个任务
|
||||
try
|
||||
{
|
||||
ExecuteSingle(process);
|
||||
|
||||
client.Ado.CommitTran();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
client.Ado.RollbackTran();
|
||||
|
||||
while (ex.InnerException != null)
|
||||
{
|
||||
ex = ex.InnerException;
|
||||
}
|
||||
|
||||
process.FuncStatus = (int)WorkProcessStatus.ExceptionStop;
|
||||
process.EditTime = DateTime.Now;
|
||||
process.ExecuteTime = DateTime.Now;
|
||||
if (ex is BaseException)
|
||||
{
|
||||
process.ExceptionInfo = ex.Message;
|
||||
}
|
||||
else
|
||||
{
|
||||
process.ExceptionInfo = ex.ToString();
|
||||
}
|
||||
|
||||
_SysWorkProcessRepository.Update(process);
|
||||
}
|
||||
#endregion
|
||||
//Thread.Sleep(300);
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
#endregion
|
||||
|
||||
#region Add(添加调度)
|
||||
#region SelectSmsInitWorkProcess(获取优先级6的短信调度)
|
||||
/// <summary>
|
||||
/// 添加调度任务
|
||||
/// 获取优先级6的短信调度
|
||||
/// </summary>
|
||||
/// <param name="merchantId">商家ID</param>
|
||||
/// <param name="typePath">类型路径,如:Lxm.IServices.IWorkProcessService, Lxm.Services</param>
|
||||
/// <param name="methodName">方法名</param>
|
||||
/// <param name="remark">备注</param>
|
||||
/// <param name="paramInfo">参数信息</param>
|
||||
/// <param name="funcType">执行类型</param>
|
||||
public void Add(string typePath,long companyId, string methodName, string remark = "", string paramInfo = "",
|
||||
FuncType funcType = FuncType.Function,
|
||||
Priority priority = Priority.Low)
|
||||
/// <param name="top"></param>
|
||||
/// <returns></returns>
|
||||
public List<SysWorkProcess> SelectPriority6WorkProcess(int top)
|
||||
{
|
||||
DateTime days = DateTime.Now.Date;
|
||||
return _SysWorkProcessRepository.Queryable()
|
||||
.Where(p => p.FuncStatus == (int)WorkProcessStatus.Waiting && p.Priority == 6)
|
||||
.Take(top)
|
||||
.OrderBy(p => p.Priority)
|
||||
.OrderBy(p => p.CreateTime).ToList();
|
||||
}
|
||||
#endregion
|
||||
|
||||
var entity = new SysWorkprocess()
|
||||
#region ExecutePriority6(调度执行优先级=6的任务)
|
||||
/// <summary>
|
||||
/// 调度执行优先级=6的任务
|
||||
/// </summary>
|
||||
public void ExecutePriority6()
|
||||
{
|
||||
var client = UnityHelper.GetService<MyCodeSqlSugarClient>();
|
||||
|
||||
var list = SelectPriority6WorkProcess(20);
|
||||
|
||||
//先将这10个任务改成运行中
|
||||
if (list != null && list.Count > 0)
|
||||
{
|
||||
Id = IdHelper.GetNewId(),
|
||||
FuncType = (byte)funcType,
|
||||
TypePath = typePath,
|
||||
MethodName = methodName,
|
||||
ParameterInfo = paramInfo,
|
||||
Status = (int)WorkProcessStatus.Init,
|
||||
Remark = remark,
|
||||
EditTime = DateTime.Now,
|
||||
Priority = (byte)priority,
|
||||
CreateTime = DateTime.Now,
|
||||
CompanyId = companyId
|
||||
};
|
||||
_sysWorkprocessRepository.Add(entity);
|
||||
var updateList = list.Select(p => new SysWorkProcess { ID = p.ID, FuncStatus = (int)WorkProcessStatus.Running });
|
||||
|
||||
_SysWorkProcessRepository.Update(updateList.ToList(), it => new { it.FuncStatus });
|
||||
}
|
||||
|
||||
foreach (var process in list)
|
||||
{
|
||||
//这里开启事务,同时才开启
|
||||
client.Ado.BeginTran();
|
||||
|
||||
#region 执行一个任务
|
||||
try
|
||||
{
|
||||
ExecuteSingle(process);
|
||||
|
||||
client.Ado.CommitTran();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
client.Ado.RollbackTran();
|
||||
|
||||
while (ex.InnerException != null)
|
||||
{
|
||||
ex = ex.InnerException;
|
||||
}
|
||||
|
||||
process.FuncStatus = (int)WorkProcessStatus.ExceptionStop;
|
||||
process.EditTime = DateTime.Now;
|
||||
process.ExecuteTime = DateTime.Now;
|
||||
if (ex is BaseException)
|
||||
{
|
||||
process.ExceptionInfo = ex.Message;
|
||||
}
|
||||
else
|
||||
{
|
||||
process.ExceptionInfo = ex.ToString();
|
||||
}
|
||||
|
||||
_SysWorkProcessRepository.Update(process);
|
||||
}
|
||||
#endregion
|
||||
}
|
||||
}
|
||||
#endregion
|
||||
|
||||
#region RetryTask(重试失败的任务)
|
||||
/// <summary>
|
||||
/// 重试失败的任务
|
||||
/// </summary>
|
||||
public void RetryTask()
|
||||
{
|
||||
DateTime today = DateTime.Now.Date.AddDays(-1);
|
||||
var list = _SysWorkProcessRepository.Queryable().Where(t => t.FuncStatus == 4 && t.RetryCount <= 10
|
||||
&& t.CreateTime >= today).OrderBy(t => t.EditTime).Take(20).ToList();
|
||||
list.ForEach(t =>
|
||||
{
|
||||
t.RetryCount = t.RetryCount + 1;
|
||||
t.FuncStatus = 0;
|
||||
});
|
||||
_SysWorkProcessRepository.Update(list);
|
||||
}
|
||||
#endregion
|
||||
}
|
||||
|
||||
@@ -0,0 +1,20 @@
|
||||
using MyCode.Project.Domain.Message.Request.JackYun;
|
||||
using MyCode.Project.Domain.Repositories;
|
||||
using MyCode.Project.Infrastructure.Common;
|
||||
using MyCode.Project.Infrastructure.JackYun;
|
||||
using System;
|
||||
|
||||
namespace MyCode.Project.Services.Implementation
|
||||
{
|
||||
public class YuyuboService : ServiceBase //,// IJackYunService
|
||||
{
|
||||
|
||||
private IJackOrdersRepository _jackOrdersRepository;
|
||||
|
||||
public YuyuboService(IJackOrdersRepository jackOrdersRepository)
|
||||
{
|
||||
_jackOrdersRepository = jackOrdersRepository;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
using MyCode.Project.Domain.Message.Request.JackYun;
|
||||
using MyCode.Project.Domain.Repositories;
|
||||
using MyCode.Project.Infrastructure.Common;
|
||||
using MyCode.Project.Infrastructure.JackYun;
|
||||
using System;
|
||||
|
||||
namespace MyCode.Project.Services.Implementation
|
||||
{
|
||||
public class ZhuBinService : ServiceBase //, IZhuBinService
|
||||
{
|
||||
|
||||
private IYTKJTShopParameterRepository _yTKJTShopParameterRepository;
|
||||
|
||||
public ZhuBinService(IYTKJTShopParameterRepository yTKJTShopParameterRepository)
|
||||
{
|
||||
_yTKJTShopParameterRepository = yTKJTShopParameterRepository;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
@@ -115,19 +115,21 @@
|
||||
<Compile Include="BLL\WebSocketBLL.cs" />
|
||||
<Compile Include="Implementation\AnsyDataProcessService.cs" />
|
||||
<Compile Include="Implementation\ApiLogService.cs" />
|
||||
<Compile Include="Implementation\JackYunService.cs" />
|
||||
<Compile Include="Implementation\ZhuBinService.cs" />
|
||||
<Compile Include="Implementation\LiQiongHaiService.cs" />
|
||||
<Compile Include="Implementation\YuyuboService.cs" />
|
||||
<Compile Include="Implementation\ReportService.cs" />
|
||||
<Compile Include="Implementation\QueueProcessService.cs" />
|
||||
<Compile Include="Implementation\WebSocketService.cs" />
|
||||
<Compile Include="Implementation\WorkProcessService.cs" />
|
||||
<Compile Include="IServices\IAnsyDataProcessService.cs" />
|
||||
<Compile Include="IServices\IJackYunService.cs" />
|
||||
<Compile Include="IServices\ILiQiongHaiService.cs" />
|
||||
<Compile Include="IServices\IZhuBinService.cs" />
|
||||
<Compile Include="IServices\IYuyuboService.cs" />
|
||||
<Compile Include="IServices\IApiLogService.cs" />
|
||||
<Compile Include="IServices\IItemListService.cs" />
|
||||
<Compile Include="IServices\IQueueProcessService.cs" />
|
||||
<Compile Include="IServices\IReportService.cs" />
|
||||
<Compile Include="IServices\IWebSocketService.cs" />
|
||||
<Compile Include="IServices\IWorkProcessService.cs" />
|
||||
<Compile Include="Properties\AssemblyInfo.cs" />
|
||||
<Compile Include="ServiceBase.cs" />
|
||||
</ItemGroup>
|
||||
|
||||
Reference in New Issue
Block a user