Commit 2b71bfe7 by Sxy

feat: add requestId

parent e355c054
......@@ -10,6 +10,10 @@
#### API
> 注: 全链路追踪 请求头 X-Request-Id
##### 消息请求失败重试
> 重试的阶梯时间 在 setting中配置
......@@ -33,7 +37,13 @@ method : post
```
{
"messageId": "cc6a5cb3-baef-4718-9420-09e54899b232"
"code": 0,
"success": true,
"message": null,
"data": {
"messageId": "a102fa5c-5a5f-4282-9033-c9dfd9380584"
},
"requestId": "11adcc90-98c2-4ab8-8eb6-d48d57be1374"
}
```
备注:
......@@ -71,7 +81,13 @@ method : post
```
{
"messageId": "cc6a5cb3-baef-4718-9420-09e54899b232"
"code": 0,
"success": true,
"message": null,
"data": {
"messageId": "a102fa5c-5a5f-4282-9033-c9dfd9380584"
},
"requestId": "11adcc90-98c2-4ab8-8eb6-d48d57be1374"
}
```
......@@ -6,13 +6,20 @@ const onerror = require('koa-onerror')
const bodyparser = require('koa-bodyparser')
const logger = require('koa-logger')
const parameter = require('koa-parameter')
const moment = require("moment")
const rabbmitmqUtil = require("./utils/rabbitmq");
rabbmitmqUtil.initQueue()
const correlation = require('./middleware/correlation');
const createNameSpace = require('cls-hooked').createNamespace;
const logNameSpace = createNameSpace('logger');
const index = require('./routes/index')
const test = require('./routes/test')
// error handler
onerror(app)
......@@ -29,13 +36,15 @@ app.use(views(__dirname + '/views', {
extension: 'html'
}))
app.use(correlation(logNameSpace));
// logger
app.use(async (ctx, next) => {
const start = new Date()
await next()
const ms = new Date() - start
console.log(`${moment().format("YYYY-MM-DD HH:mm:ss")} ${ctx.method} ${ctx.url} - ${ms}ms`)
console.log(`${moment().format("YYYY-MM-DD HH:mm:ss")} ${logNameSpace.get("requestId")} ${ctx.method} ${ctx.url} - ${ms}ms`)
})
// routes
......@@ -43,6 +52,8 @@ app.use(index.routes(), index.allowedMethods())
app.use(test.routes(), test.allowedMethods())
// error-handling
app.on('error', (err, ctx) => {
console.error('server error', err, ctx)
......
const amqp = require('amqplib');
const { MQ } = require("../settings")
const logger = require("../utils/logger")
class RabbitmqClient {
constructor() {
......@@ -13,23 +14,23 @@ class RabbitmqClient {
try {
if (!this.connnection) {
const connnection = await amqp.connect(this.socketOptions);
console.log("--- MQ 连接 成功 ----")
logger.info("--- MQ 连接 成功 ----")
this.connnection = connnection;
this.connnection.on("error", (err) => {
console.log("[x]Error:", err);
logger.error("[x]Error:", err);
this.connnection = null;
});
this.connnection.on("close", (err) => {
console.log("[x]Rabbitmq is closed ", err);
logger.info("[x]Rabbitmq is closed ", err);
this.connnection = null;
});
process.once('SIGINT', this.connnection.close.bind(this.connnection));
}
return this.connnection;
} catch (err) {
console.log("--- MQ 连接 失败 ------");
console.log(err);
logger.info("--- MQ 连接 失败 ------");
logger.info(err);
throw new Error(err.message);
}
}
......@@ -41,20 +42,20 @@ class RabbitmqClient {
const confirmChannel = await connnection.createConfirmChannel();
this.confirmChannel = confirmChannel;
this.confirmChannel.on("error", (err) => {
console.log("[x]Error", err);
logger.error("[x]Error", err);
this.confirmChannel = null;
});
this.confirmChannel.on("close", (err) => {
console.log("[*]Rabbitmq channel is closed", err);
logger.info("[*]Rabbitmq channel is closed", err);
this.confirmChannel = null;
});
}
return this.confirmChannel;
} catch (err) {
console.log("--- 创建confirmChannel失败 ------");
console.log(err);
logger.error("--- 创建confirmChannel失败 ------");
logger.error(err);
throw new Error(err.message);
}
}
......@@ -66,18 +67,18 @@ class RabbitmqClient {
const channel = await connnection.createChannel();
this.channel = channel;
this.channel.on("error", (err) => {
console.log("[x]Error", err);
logger.error("[x]Error", err);
this.channel = null;
});
this.channel.on("close", (err) => {
console.log("[*]Rabbitmq channel is closed", err);
logger.info("[*]Rabbitmq channel is closed", err);
this.channel = null;
});
}
return this.channel;
} catch (err) {
console.log("--- 创建channe失败 ------");
console.log(err);
logger.error("--- 创建channe失败 ------");
logger.error(err);
throw new Error(err.message);
}
}
......
const { v4: uuidv4 } = require('uuid');
function correlation(namespace) {
return async (ctx, next) => {
const requestId = ctx.header['x-request-id'] || uuidv4();
namespace.run(async () => {
namespace.set('requestId', requestId);
await next();
});
}
}
module.exports = correlation;
\ No newline at end of file
......@@ -11,6 +11,7 @@
"dependencies": {
"amqplib": "^0.8.0",
"axios": "^0.21.1",
"cls-hooked": "^4.2.2",
"debug": "^4.1.1",
"koa": "^2.7.0",
"koa-bodyparser": "^4.2.1",
......@@ -24,7 +25,9 @@
"koa-views": "^6.2.0",
"moment": "^2.29.1",
"parameter": "^3.6.0",
"uuid": "^8.3.2"
"uuid": "^8.3.2",
"winston": "^3.3.3",
"winston-daily-rotate-file": "^4.5.5"
},
"devDependencies": {
"nodemon": "^1.19.1"
......
const router = require('koa-router')()
const { sendDelayMessage, sendRetryMessage } = require("../utils/rabbitmq")
const { ok, error } = require("../utils/system")
router.get('/', async (ctx, next) => {
await ctx.render('index')
})
......@@ -16,7 +17,8 @@ router.post('/sendRetryMessage', async (ctx, next) => {
data: { type: 'object', required: false },
timeout: { type: 'int', required: false },
})
ctx.body = await sendRetryMessage(ctx.request.body)
const result = await sendRetryMessage(ctx.request.body)
ctx.body = ok(result)
})
/**
......@@ -32,7 +34,8 @@ router.post('/sendDelayMessage', async (ctx, next) => {
delayTime: { type: 'int', required: true },
timeout: { type: 'int', required: false },
})
ctx.body = await sendDelayMessage(ctx.request.body)
const result = await sendDelayMessage(ctx.request.body)
ctx.body = ok(result)
})
......
const axios = require("axios");
const { v4: uuidv4 } = require('uuid');
/**
* 请求参数 要求
* url
......@@ -26,6 +26,7 @@ async function request(message) {
timeout: timeout * 1000,
headers: {
'Content-Type': 'application/json',
"X-Request-Id": uuidv4(),
...headers
}
});
......
const { createLogger, format, transports } = require('winston');
const getNamespace = require('cls-hooked').getNamespace;
const myFormat = format.printf(({ level, message, timestamp }) => {
const loggerNamespace = getNamespace('logger');
return `[${timestamp}] [${level}] ${loggerNamespace.get('requestId') ? ("[" + loggerNamespace.get('requestId') + "] :") : ''} ${message}`;
});
const logger = createLogger({
level: 'info',
format: format.combine(
format.timestamp(),
format.splat(),
myFormat
),
transports: [
new transports.Console()
],
});
module.exports = logger;
\ No newline at end of file
......@@ -21,6 +21,10 @@ const { v4: uuidv4 } = require('uuid');
const { request } = require("./axios");
const logger = require("../utils/logger")
const getNamespace = require('cls-hooked').getNamespace;
/**
* 重试机制
* 初始化 阶梯延迟队列
......@@ -51,7 +55,7 @@ async function initLadderQueue() {
await channel.bindQueue(consumerQueue, consumerExchange, consumerRoutingKey);
console.log("--- 阶梯延迟队列 && 死信队列 初始化成功 ----")
logger.info("--- 阶梯延迟队列 && 死信队列 初始化成功 ----")
}
......@@ -78,8 +82,8 @@ async function listenCustomerLadderQueue() {
channel.ack(msg);
} else {
// 重试次数用完还是失败
console.log("------ 重试 发送消息 彻底 ERROR ------")
console.log(msg.content.toString());
logger.error("------ 重试 发送消息 彻底 ERROR ------")
logger.error(msg.content.toString());
channel.ack(msg);
}
}
......@@ -97,6 +101,8 @@ async function listenCustomerLadderQueue() {
* 3. 将消息状态改为 已发送到mq
*/
async function sendRetryMessage(message, queue = consumerQueue) {
const logNameSpace = getNamespace('logger');
message.headers["X-Request-Id"] = logNameSpace.get("requestId");
if (!message.messageId) {
message.messageId = uuidv4();
}
......@@ -112,7 +118,7 @@ async function sendRetryMessage(message, queue = consumerQueue) {
// console.log(err, ok)
})
return {
messageId: message.messageId
messageId: message.messageId,
}
}
......@@ -134,7 +140,7 @@ async function initDelayedQueue() {
});
await channel.assertQueue(delayQueue, { exclusive: false, });
await channel.bindQueue(delayQueue, delayExchange, delayRoutingKey);
console.log("--- 延时插件队列 初始化成功 ---")
logger.info("--- 延时插件队列 初始化成功 ---")
}
/**
......@@ -154,8 +160,8 @@ async function listenCustomerQueue() {
await request(content);
channel.ack(msg);
} catch (err) {
console.log("------ 延时 发送消息 彻底 ERROR ------")
console.log(msg.content.toString());
logger.error("------ 延时 发送消息 彻底 ERROR ------")
logger.error(msg.content.toString());
channel.ack(msg);
}
},
......@@ -173,9 +179,14 @@ async function listenCustomerQueue() {
* 3. 将消息状态改为 已发送到mq
*/
async function sendDelayMessage(message) {
const logNameSpace = getNamespace('logger');
if (!message.messageId) {
message.messageId = uuidv4();
}
if (!message.headers) {
message.headers = {}
}
message.headers["X-Request-Id"] = logNameSpace.get("requestId");
const channel = await rabbitmqClient.getConfirmChannel();
await channel.publish(delayExchange, delayRoutingKey, Buffer.from(JSON.stringify(message)), {
persistent: true, // 持久化 消息
......@@ -187,7 +198,7 @@ async function sendDelayMessage(message) {
}, function (err, ok) {
})
return {
messageId: message.messageId
messageId: message.messageId,
}
}
......
const getNamespace = require('cls-hooked').getNamespace;
const logNameSpace = getNamespace('logger');
const ok = (data) => {
return {
code: 0,
success: true,
message: null,
data,
requestId: logNameSpace.get("requestId"),
}
}
const error = (msg = "未知错误", code = -1) => {
return {
code,
success: false,
message: msg,
data: null,
requestId: logNameSpace.get("requestId"),
}
}
module.exports = {
ok,
error
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment