Commit 93769e8e by 蒋勇

d

parent 13ba8a9a
const system=require("../system") const system = require("../system")
class TaskBase{ class TaskBase {
constructor(className){ constructor(className) {
this.redisClient=system.getObject("util.redisClient"); this.redisClient = system.getObject("util.redisClient");
this.restS=system.getObject("util.restClient"); this.restS = system.getObject("util.restClient");
this.serviceName=className; this.serviceName = className;
this.isThrough=false; this.isThrough = false;
this.isDaemon=false; this.isDaemon = false;
this.TASK_CHANNEL="k8stask"; this.TASK_CHANNEL = "k8stask";
} this.dingurl = "https://oapi.dingtalk.com/robot/send?access_token=530ecf80f8465a68d54c92c4afc9793aa5ce1c349756fdbefe6eea94d6c3d7a7"
async subBeforeTask(params){ }
console.log("前置操作......",this.serviceName); async subBeforeTask (params) {
} console.log("前置操作......", this.serviceName);
async beforeTask(params){ }
var self=this; /**
//订阅任务频道 *
await this.redisClient.subscribe(this.TASK_CHANNEL); * @param {*} taskName
//频道事件处理函数 * 返回格式
this.redisClient.subclient.on("message", async function (channel, message) { * xxx|i|12233
if(channel==self.TASK_CHANNEL){ * i---标示间隔
await self.taskHandle(channel, message); * a--标示具体某个时间
} */
}); formatedTaskName (taskName) {
return ""
}
async beforeTask (params) {
var self = this;
//任务打点
//key taskuptime
let taskName = this.formatedTaskName(this.serviceName)
if (taskName && taskName.trim() != "") {
let nowtime = Date.now()
this.redisClient.hmset("taskuptime", { taskName: nowtime })
}
//订阅任务频道
await this.redisClient.subscribe(this.TASK_CHANNEL);
//频道事件处理函数
this.redisClient.subclient.on("message", async function (channel, message) {
if (channel == self.TASK_CHANNEL) {
await self.taskHandle(channel, message);
}
});
await this.subBeforeTask(params); await this.subBeforeTask(params);
} }
async taskHandle(channel, message){ async taskHandle (channel, message) {
await this.subTaskHandle(channel, message); await this.subTaskHandle(channel, message);
} }
async subTaskHandle(channel, message){ async subTaskHandle (channel, message) {
console.log(channel, message); console.log(channel, message);
} }
async subPostTask(params){ async subPostTask (params) {
console.log("后置操作......",this.serviceName); console.log("后置操作......", this.serviceName);
} }
async postTask(params){ async postTask (params) {
await this.subPostTask(); await this.subPostTask();
this.redisClient.client.quit(); this.redisClient.client.quit();
this.redisClient.subclient.quit(); this.redisClient.subclient.quit();
} }
async doTask(params){ async doTask (params) {
try { try {
await this.beforeTask(params); await this.beforeTask(params);
if(!this.isThrough){ if (!this.isThrough) {
await this.subDoTask(params); await this.subDoTask(params);
if(this.isDaemon){ if (this.isDaemon) {
var http = require('http'); var http = require('http');
var server = http.createServer((req,res) => { var server = http.createServer((req, res) => {
this.reqHandler(req,res); this.reqHandler(req, res);
}) })
server.listen(process.env.TASK_PORT);//监听端口号是3000的服务器 server.listen(process.env.TASK_PORT);//监听端口号是3000的服务器
console.log("is listenning........on "+process.env.TASK_PORT); console.log("is listenning........on " + process.env.TASK_PORT);
}else{ } else {
await this.postTask(params); await this.postTask(params);
} }
} }
} catch (e) { } catch (e) {
await this.redisClient.unsubscribe(this.TASK_CHANNEL); await this.redisClient.unsubscribe(this.TASK_CHANNEL);
console.log(e); console.log(e);
//日志记录 //日志记录
console.log(JSON.stringify({ console.log(JSON.stringify({
optitle:this.serviceName+",任务执行存在错误", optitle: this.serviceName + ",任务执行存在错误",
op:"base/db/task.base.js", op: "base/db/task.base.js",
content:"", content: "",
clientIp:"", clientIp: "",
status:"error" status: "error"
})); }));
} }
} }
async subReqHandler(req,res){ async subReqHandler (req, res) {
//设置 HTTP 头部,状态码是 200,文件类型是 html,字符集是 utf-8 //设置 HTTP 头部,状态码是 200,文件类型是 html,字符集是 utf-8
res.writeHead(200,{"Content-Type":"text/html;charset='utf-8'"}); res.writeHead(200, { "Content-Type": "text/html;charset='utf-8'" });
res.write('<h1>Node.js</h1>'); res.write('<h1>Node.js</h1>');
} }
async reqHandler(req,res){ async reqHandler (req, res) {
await this.subReqHandler(req,res); await this.subReqHandler(req, res);
res.end('Hello World\\n'); res.end('Hello World\\n');
} }
async subDoTask(params){ async subDoTask (params) {
throw new Error("请在子类中重写此方法进行操作业务逻辑............................!"); throw new Error("请在子类中重写此方法进行操作业务逻辑............................!");
} }
static getServiceName(ClassObj){ static getServiceName (ClassObj) {
return ClassObj["name"]; return ClassObj["name"];
} }
async apiCallWithAk(url,params){ async sendDing (text) {
var acck="task"; let c = text ? text : "我就是我, 是不一样的烟火"
this.apiCallWithAk(this.dingurl,
{
"msgtype": "text",
"text": { "content": "异常提醒:" + c }
}
)
}
async apiCallWithAk (url, params) {
var acck = "task";
//按照访问token //按照访问token
var restResult=await this.restS.execPostWithAK(params,url,acck); var restResult = await this.restS.execPostWithAK(params, url, acck);
console.log("restResultrestResultrestResultrestResultrestResultrestResultrestResultrestResult"); console.log("restResultrestResultrestResultrestResultrestResultrestResultrestResultrestResult");
console.log(restResult); console.log(restResult);
if(restResult){ if (restResult) {
return restResult; return restResult;
} }
return null; return null;
} }
sleep(milliSeconds) { sleep (milliSeconds) {
var startTime = new Date().getTime(); var startTime = new Date().getTime();
while (new Date().getTime() < startTime + milliSeconds); while (new Date().getTime() < startTime + milliSeconds);
} }
} }
module.exports=TaskBase; module.exports = TaskBase;
new TaskBase().sendDing()
const TaskBase = require("../../task.base");
const settings = require("../../../../config/settings");
var excel = require('exceljs');
const system = require("../../../system");
const fs = require('fs');
class MonitorTask extends TaskBase {
constructor() {
super(TaskBase.getServiceName(MonitorTask));
}
/**
* 每隔10分钟检查一次
* @param {*} taskName
*/
formatedTaskName (taskName) {
return taskName + "|i|10"
}
isFail (taskName, lastAccess) {
let n = Date.now()
let postMinute = (n - lastAccess) / 1000 / 60
let tnameparams = taskName.split("|")
let tname = tnameparams[0].trim()
let mtype = tnameparams[1].trim()
let pvalue = Number(tnameparams[2].trim())
if (mtype == "i") {//每几分钟执行一次
console.log(taskName, postMinute, "xxxxxxxxxxxxxxxxxxxxxxx")
//如果流失的时间大于循环调度时间,则报异常到丁丁
if (postMinute > pvalue) {
this.sendDing(tname + "可能失联啦...请检查")
} else {
if (tname == this.serviceName) {
this.sendDing(tname + "监控精灵依然在线,请放心....")
}
}
}
if (mtype == "a") {//
let onedaym = 24 * 60
if (postMinute > onedaym) {
this.sendDing(tname + "可能失联啦...请检查")
}
}
}
async subBeforeTask (params) {
console.log("前置操作......", this.serviceName);
}
async subDoTask (params) {
let x = await this.redisClient.hgetall("taskuptime")
if (x) {
for (let entry in x) {
this.isFail(entry, x[entry])
}
} else {
console.log("没有可以监控的任务.....");
}
console.log("MonitorTask.....");
}
}
module.exports = MonitorTask;
\ No newline at end of file
...@@ -18,6 +18,7 @@ class RedisClient { ...@@ -18,6 +18,7 @@ class RedisClient {
// // a individual error // // a individual error
// return new Error('The server refused the connection'); // return new Error('The server refused the connection');
// } // }
console.log("redis connecting.....try")
if (options.total_retry_time > 1000 * 60 * 60) { if (options.total_retry_time > 1000 * 60 * 60) {
// End reconnecting after a specific timeout and flush all commands // End reconnecting after a specific timeout and flush all commands
// with a individual error // with a individual error
...@@ -39,60 +40,60 @@ class RedisClient { ...@@ -39,60 +40,60 @@ class RedisClient {
console.log("Error " + err); console.log("Error " + err);
}); });
var self = this; var self = this;
} }
async subscribe(channel) { async subscribe (channel) {
return this.subclient.subscribeAsync(channel); return this.subclient.subscribeAsync(channel);
} }
async unsubscribe(channel) { async unsubscribe (channel) {
//this.chatserver=null; //this.chatserver=null;
return this.subclient.unsubscribeAsync(channel); return this.subclient.unsubscribeAsync(channel);
} }
async publish(channel, msg) { async publish (channel, msg) {
console.log("publish--"+channel + ":" + msg); console.log("publish--" + channel + ":" + msg);
return this.client.publishAsync(channel, msg); return this.client.publishAsync(channel, msg);
} }
async notifyConsume(channel, consumetarget,val) { async notifyConsume (channel, consumetarget, val) {
await this.client.rpush(consumetarget,val); await this.client.rpush(consumetarget, val);
console.log("publish--"+channel + ":" + consumetarget); console.log("publish--" + channel + ":" + consumetarget);
return this.client.publishAsync(channel, consumetarget); return this.client.publishAsync(channel, consumetarget);
} }
async rpush(key, val) { async rpush (key, val) {
return this.client.rpushAsync(key, val); return this.client.rpushAsync(key, val);
} }
async llen(key) { async llen (key) {
return this.client.llenAsync(key); return this.client.llenAsync(key);
} }
async rpushWithEx(key, val, t) { async rpushWithEx (key, val, t) {
var p = this.rpush(key, val); var p = this.rpush(key, val);
this.client.expire(key, t); this.client.expire(key, t);
return p; return p;
} }
async rpop(key) { async rpop (key) {
return this.client.rpopAsync(key); return this.client.rpopAsync(key);
} }
async lpop(key) { async lpop (key) {
return this.client.lpopAsync(key); return this.client.lpopAsync(key);
} }
async lrem(key, val) { async lrem (key, val) {
return this.client.lremAsync(key, 1, val); return this.client.lremAsync(key, 1, val);
} }
async ltrim(key, s, e) { async ltrim (key, s, e) {
return this.client.ltrimAsync(key, s, e); return this.client.ltrimAsync(key, s, e);
} }
async clearlist(key) { async clearlist (key) {
await this.client.ltrim(key, -1, -1); await this.client.ltrim(key, -1, -1);
await this.client.ltrim(key, 1, -1); await this.client.ltrim(key, 1, -1);
return 0; return 0;
} }
async flushall() { async flushall () {
console.log("sss"); console.log("sss");
return this.client.flushallAsync(); return this.client.flushallAsync();
} }
async keys(p) { async keys (p) {
return this.client.keysAsync(p); return this.client.keysAsync(p);
} }
async set(key, val) { async set (key, val) {
if (typeof val == "undefined" || typeof key == "undefined") { if (typeof val == "undefined" || typeof key == "undefined") {
console.log("......................cache val undefined"); console.log("......................cache val undefined");
console.log(key); console.log(key);
...@@ -100,99 +101,125 @@ class RedisClient { ...@@ -100,99 +101,125 @@ class RedisClient {
} }
return this.client.setAsync(key, val); return this.client.setAsync(key, val);
} }
async setWithEx(key, val, t) { async setWithEx (key, val, t) {
var p = this.client.setAsync(key, val); var p = this.client.setAsync(key, val);
this.client.expire(key, t); this.client.expire(key, t);
return p; return p;
} }
async get(key) { async get (key) {
return this.client.getAsync(key); return this.client.getAsync(key);
} }
async delete(key) { async delete (key) {
return this.client.delAsync(key); return this.client.delAsync(key);
} }
async hmset(key, jsonObj) { async hmset (key, jsonObj) {
return this.client.hmsetAsync(key, jsonObj); return this.client.hmsetAsync(key, jsonObj);
} }
async hmsetWithEx(key, jsonObj, t) { async hmsetWithEx (key, jsonObj, t) {
var p = this.client.hmsetAsync(key, jsonObj); var p = this.client.hmsetAsync(key, jsonObj);
this.client.expire(key, t); this.client.expire(key, t);
return p; return p;
} }
async hgetall(key) { async hgetall (key) {
return this.client.hgetallAsync(key); return this.client.hgetallAsync(key);
} }
async hincrby(key, f, n) { async hincrby (key, f, n) {
return this.client.hincrbyAsync(key, f, n); return this.client.hincrbyAsync(key, f, n);
} }
async sadd(key, vals) { async sadd (key, vals) {
await this.client.saddAsync(key, ...vals); await this.client.saddAsync(key, ...vals);
return this.scard(key); return this.scard(key);
} }
async scard(key) { async scard (key) {
return this.client.scardAsync(key); return this.client.scardAsync(key);
} }
async srem(key, val) { async srem (key, val) {
return this.client.sremAsync(key, val); return this.client.sremAsync(key, val);
} }
async sismember(key, val) { async sismember (key, val) {
return this.client.sismemberAsync(key, val); return this.client.sismemberAsync(key, val);
} }
async smembers(key) { async smembers (key) {
return this.client.smembersAsync(key); return this.client.smembersAsync(key);
} }
async exists(key) { async exists (key) {
return this.client.existsAsync(key); return this.client.existsAsync(key);
} }
async incr(key) { async incr (key) {
return this.client.incrAsync(key); return this.client.incrAsync(key);
} }
async close(){ async close () {
this.client.quit(); this.client.quit();
} }
} }
module.exports = RedisClient; module.exports = RedisClient;
// (async ()=>{ // (async () => {
// var c=new RedisClient(); // function isFail (taskName, lastAccess) {
// await c.set("testtest",1); // let n = Date.now()
// var tt=await c.get("testtest"); // let postMinute = (n - lastAccess) / 1000 / 60
// console.log("dddddddddddddddddddddddd"); // let tnameparams = taskName.split("|")
// console.log(tt); // let tname = tnameparams[0].trim()
// let mtype = tnameparams[1].trim()
// let pvalue = Number(tnameparams[2].trim())
// if (mtype == "i") {//每几分钟执行一次
// console.log(taskName, postMinute, "xxxxxxxxxxxxxxxxxxxxxxx")
// //如果流失的时间大于循环调度时间,则报异常到丁丁
// if (postMinute > pvalue) {
// this.sendDing(tname + "可能失联啦...请检查")
// }
// }
// if (mtype == "a") {//
// let onedaym = 24 * 60
// if (postMinute > onedaym) {
// this.sendDing(tname + "可能失联啦...请检查")
// }
// }
// }
// var client = new RedisClient();
// client.hmset("tasks", { "task1|i|1000": 1599793212470, "task2|i|1000": 1599793212476 }).then(function (r) {
// console.log(r);
// });
// let x = await client.hgetall("tasks")
// for (let entry in x) {
// isFail(entry, x[entry])
// }
// })() // })()
// client.keys('*').then(s=>{
// client.keys('*').then(s => {
// console.log(s); // console.log(s);
// }); // });
// let clients = {}; // let clients = {};
// clients.watcher = redis.createClient({ ... } ); // clients.watcher = redis.createClient({ ... });
// clients.alterer = clients.watcher.duplicate(); // clients.alterer = clients.watcher.duplicate();
// client.sadd("h",["ok","jy","ok"]).then(function(r){ // client.sadd("h", ["ok", "jy", "ok"]).then(function (r) {
// console.log(r); // console.log(r);
// }); // });
// client.smembers("h").then(function(r){ // client.smembers("h").then(function (r) {
// console.log(r); // console.log(r);
// }); // });
// client.sismember("h","ok").then(function(r){ // client.sismember("h", "ok").then(function (r) {
// console.log(r); // console.log(r);
// }); // });
// console.dir(client);ti.exec( callback )回调函数参数err:返回null或者Array,出错则返回对应命令序列链中发生错误的错误信息,这个数组中最后一个元素是源自exec本身的一个EXECABORT类型的错误 // console.dir(client); ti.exec(callback)回调函数参数err:返回null或者Array,出错则返回对应命令序列链中发生错误的错误信息,这个数组中最后一个元素是源自exec本身的一个EXECABORT类型的错误
// r.set("hello","oooo").then(function(result){ // r.set("hello", "oooo").then(function (result) {
// console.log(result); // console.log(result);
// }); // });
// r.get("hello").then(function(result){ // r.get("hello").then(function (result) {
// console.log(result); // console.log(result);
// }); // });
// client.hmset("user_1",{name:"jy",age:13}).then(function(r){ // client.hmset("user_1", { name: "jy", age: 13 }).then(function (r) {
// console.log(r); // console.log(r);
//
// }); // });
// client.hincrby("user_1","age",2).then(function(r){ // client.hincrby("user_1", "age", 2).then(function (r) {
// console.log(r); // console.log(r);
// setTimeout(function(){ // setTimeout(function () {
// client.hgetall("user_1").then(function(u){ // client.hgetall("user_1").then(function (u) {
// console.log(u); // console.log(u);
// }); // });
// },3000); // }, 3000);
// }); // });
var settings = { var settings = {
redis: { redis: {
host: "43.247.184.32", host: "192.168.4.119",
port: 8967, port: 6379,
password: "Gongsibao2018", password: "Gongsibao2018",
db: 15, db: 15,
}, },
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment