Commit b11fde54 by 蒋勇

d

parent db2bda0f
......@@ -4,52 +4,52 @@ class TaskBase{
this.redisClient=system.getObject("util.redisClient");
this.serviceName=className;
this.isThrough=false;
<<<<<<< HEAD:taskexecutor/app/base/task.base.js
this.isDaemon=false;
=======
this.cacheManager=system.getObject("db.common.cacheManager");
this.db = system.getObject("db.common.connection").getCon();
>>>>>>> 1bccc0e0e608e84ed0a0ddd5bc63e6a076047c01:taskexecutor/app/base/db/task.base.js
this.TASK_CHANNEL="k8stask";
}
async subBeforeTask(params){
console.log("前置操作......",this.serviceName);
}
async beforeTask(params){
console.log("前置操作......",this.serviceName);
var self=this;
await this.redisClient.subscribe(this.TASK_CHANNEL);
this.redisClient.subclient.on("message", async function (channel, message) {
await self.taskHandle(channel, message);
});
await this.subBeforeTask(params);
}
async postTask(params){
async taskHandle(channel, message){
await this.subBeforeTask(channel, message);
}
async subTaskHandle(channel, message){
console.log(channel, message);
}
async subPostTask(params){
console.log("后置操作......",this.serviceName);
}
async postTask(params){
await this.subPostTask();
}
async doTask(params){
try {
await this.beforeTask(params);
if(!this.isThrough){
await this.subDoTask(params);
await this.postTask(params);
if(this.isDaemon){
var http = require('http');
var server = http.createServer((req,res) => {
//设置 HTTP 头部,状态码是 200,文件类型是 html,字符集是 utf-8
res.writeHead(200,{"Content-Type":"text/html;charset='utf-8'"});
res.write('<h1>Node.js</h1>');
res.end('Hello World\\n');
this.reqHandler(req,res);
})
server.listen(3000)//监听端口号是3000的服务器
server.listen(process.env.TASK_PORT);//监听端口号是3000的服务器
console.log("is listenning........on "+process.env.TASK_PORT);
}
}
await this.postTask(params);
//日志记录
console.log(JSON.stringify({
optitle:this.serviceName+",任务成功执行完成",
op:"base/db/task.base.js",
content:"",
clientIp:"",
status:"ok"
}));
} catch (e) {
await this.redisClient.unsubscribe(this.TASK_CHANNEL);
//日志记录
console.log(JSON.stringify({
optitle:this.serviceName+",任务成功执行完成",
optitle:this.serviceName+",任务执行存在错误",
op:"base/db/task.base.js",
content:"",
clientIp:"",
......@@ -57,6 +57,15 @@ class TaskBase{
}));
}
}
async subReqHandler(req,res){
//设置 HTTP 头部,状态码是 200,文件类型是 html,字符集是 utf-8
res.writeHead(200,{"Content-Type":"text/html;charset='utf-8'"});
res.write('<h1>Node.js</h1>');
}
async reqHandler(req,res){
await this.subReqHandler(req,res);
res.end('Hello World\\n');
}
async subDoTask(params){
throw new Error("请在子类中重写此方法进行操作业务逻辑............................!");
}
......
......@@ -3,10 +3,11 @@ class TestTask extends TaskBase{
constructor(){
super(TaskBase.getServiceName(TestTask));
}
async beforeTask(params){
async subBeforeTask(params){
console.log("前置操作......",this.serviceName);
//this.isThrough=true;
//console.log(this.cacheManager);
this.isDaemon=true;
}
async subDoTask(params){
console.log(params);
......
......@@ -31,96 +31,23 @@ class RedisClient {
return Math.min(options.attempt * 100, 3000);
}
});
// return client.multi().get('foo').execAsync().then(function(res) {
// console.log(res); // => 'bar'
// });
this.client.on("error", function (err) {
console.log("Error " + err);
// //日志记录
// logCtl.error({
// optitle:"redis this.client.on异常:",
// op:"base/utils/redisClient/this.client.on",
// content:err,
// clientIp:""
// });
});
this.subclient = this.client.duplicate();
this.subclient.on("error", function (err) {
console.log("Error " + err);
// //日志记录
// logCtl.error({
// optitle:"redis this.subclient.on异常:",
// op:"base/utils/redisClient/this.subclient.on",
// content:err,
// clientIp:""
// });
});
var self = this;
this.subclient.on("message", async function (channel, message) {
console.log(channel, '------------- redis message ------------------- ');
if (self.taskmanager) {
if (channel == "task") {
if (message == "newtask") {
(async (that) => {
var msg2 = await that.rpop("tasklist");
if (msg2) {
console.log("taskName+++++" + msg2);
var msgs2 = msg2.split("_");
var action = msgs2[0];
var taskName = msgs2[1];
var exp = msgs2[2];
await that.taskmanager.addTask(taskName, exp);
}
})(self)
} else {
(async (msg, that) => {
var msgs = msg.split("_");
var action = msgs[0];
if (action == "delete") {
var taskName = msgs[1];
await that.taskmanager.deleteTask(taskName);
}
})(message, self);
}
}
}
if (self.chatserver) {
if (channel != "task") {
var message = JSON.parse(message);
console.log(message, "------------------------------------------ publish message");
if (channel == "brc") {//如果是广播频道,则发送广播到客户端
self.chatserver.server.emit("brc", message);
} else if (self.chatserver.users[channel]) {
if (message.type) {
self.chatserver.users[channel].client.emit(message.type, message.data);
} else {
//持久化
self.chatserver.users[channel].client.emit("chatmsg", message);
}
}
}
}
});
}
async subscribe(channel, chatserver) {
if (!this.chatserver) {
this.chatserver = chatserver;
}
async subscribe(channel) {
return this.subclient.subscribeAsync(channel);
}
async unsubscribe(channel) {
//this.chatserver=null;
return this.subclient.unsubscribeAsync(channel);
}
async subscribeTask(channel, taskmanager) {
if (!this.taskmanager) {
this.taskmanager = taskmanager;
}
return this.subclient.subscribeAsync(channel);
}
async publish(channel, msg) {
console.log(channel + ":" + msg);
return this.client.publishAsync(channel, msg);
......
......@@ -4,13 +4,12 @@ const fs=require("fs");
// var dbf=system.getObject("db.common.connection");
// con=dbf.getCon();
var taskName = process.env.TASK_NAME;
var params= process.env.TASK_PARAM;
var params= process.env.TASK_PARAM?process.env.TASK_PARAM:"";
var port=process.env.TASK_PORT;
if(taskName){
var task=system.getObject("task."+taskName);
(async()=>{
await task.doTask(params);
console.log("process over............");
})();
}else{
console.log("not find task,please check ............................");
......
var http = require('http');
var server = http.createServer((req,res) => {
//设置 HTTP 头部,状态码是 200,文件类型是 html,字符集是 utf-8
res.writeHead(200,{"Content-Type":"text/html;charset='utf-8'"});
res.write('<h1>Node.js</h1>');
res.end('Hello World\\n');
})
server.listen(3000);//监听端口号是3000的服务器
console.log("end");
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment