Commit 27947086 by Sxy

feat: 第一版功能

parents
node_modules/
package-lock.json
const Koa = require('koa')
const app = new Koa()
const views = require('koa-views')
const json = require('koa-json')
const onerror = require('koa-onerror')
const bodyparser = require('koa-bodyparser')
const logger = require('koa-logger')
const parameter = require('koa-parameter')
const moment = require("moment")
const rabbmitmqUtil = require("./utils/rabbitmq");
rabbmitmqUtil.initQueue()
const index = require('./routes/index')
const test = require('./routes/test')
// error handler
onerror(app)
// middlewares
app.use(bodyparser({
enableTypes: ['json', 'form', 'text']
}))
app.use(parameter(app))
app.use(json())
app.use(logger())
app.use(require('koa-static')(__dirname + '/public'))
app.use(views(__dirname + '/views', {
extension: 'html'
}))
// logger
app.use(async (ctx, next) => {
const start = new Date()
await next()
const ms = new Date() - start
console.log(`${moment().format("YYYY-MM-DD HH:mm:ss")} ${ctx.method} ${ctx.url} - ${ms}ms`)
})
// routes
app.use(index.routes(), index.allowedMethods())
app.use(test.routes(), test.allowedMethods())
// error-handling
app.on('error', (err, ctx) => {
console.error('server error', err, ctx)
});
module.exports = app
#!/usr/bin/env node
/**
* Module dependencies.
*/
var app = require('../app');
var debug = require('debug')('demo:server');
var http = require('http');
/**
* Get port from environment and store in Express.
*/
var port = normalizePort(process.env.PORT || '3000');
// app.set('port', port);
/**
* Create HTTP server.
*/
var server = http.createServer(app.callback());
/**
* Listen on provided port, on all network interfaces.
*/
server.listen(port);
server.on('error', onError);
server.on('listening', onListening);
/**
* Normalize a port into a number, string, or false.
*/
function normalizePort(val) {
var port = parseInt(val, 10);
if (isNaN(port)) {
// named pipe
return val;
}
if (port >= 0) {
// port number
return port;
}
return false;
}
/**
* Event listener for HTTP server "error" event.
*/
function onError(error) {
if (error.syscall !== 'listen') {
throw error;
}
var bind = typeof port === 'string'
? 'Pipe ' + port
: 'Port ' + port;
// handle specific listen errors with friendly messages
switch (error.code) {
case 'EACCES':
console.error(bind + ' requires elevated privileges');
process.exit(1);
case 'EADDRINUSE':
console.error(bind + ' is already in use');
process.exit(1);
default:
throw error;
}
}
/**
* Event listener for HTTP server "listening" event.
*/
function onListening() {
var addr = server.address();
var bind = typeof addr === 'string'
? 'pipe ' + addr
: 'port ' + addr.port;
debug('Listening on ' + bind);
}
const amqp = require('amqplib');
const { MQ } = require("../settings")
class RabbitmqClient {
constructor() {
this.socketOptions = MQ
this.connnection = null;
this.confirmChannel = null;
this.channel = null;
}
async getConnnection() {
try {
if (!this.connnection) {
const connnection = await amqp.connect(this.socketOptions);
console.log("--- MQ 连接 成功 ----")
this.connnection = connnection;
this.connnection.on("error", (err) => {
console.log("[x]Error:", err);
this.connnection = null;
});
this.connnection.on("close", (err) => {
console.log("[x]Rabbitmq is closed ", err);
this.connnection = null;
});
process.once('SIGINT', this.connnection.close.bind(this.connnection));
}
return this.connnection;
} catch (err) {
console.log("--- MQ 连接 失败 ------");
console.log(err);
throw new Error(err.message);
}
}
async getConfirmChannel() {
try {
if (!this.confirmChannel) {
const connnection = await this.getConnnection();
const confirmChannel = await connnection.createConfirmChannel();
this.confirmChannel = confirmChannel;
this.confirmChannel.on("error", (err) => {
console.log("[x]Error", err);
this.confirmChannel = null;
});
this.confirmChannel.on("close", (err) => {
console.log("[*]Rabbitmq channel is closed", err);
this.confirmChannel = null;
});
}
return this.confirmChannel;
} catch (err) {
console.log("--- 创建confirmChannel失败 ------");
console.log(err);
throw new Error(err.message);
}
}
async getChannel() {
try {
if (!this.channel) {
const connnection = await this.getConnnection();
const channel = await connnection.createChannel();
this.channel = channel;
this.channel.on("error", (err) => {
console.log("[x]Error", err);
this.channel = null;
});
this.channel.on("close", (err) => {
console.log("[*]Rabbitmq channel is closed", err);
this.channel = null;
});
}
return this.channel;
} catch (err) {
console.log("--- 创建channe失败 ------");
console.log(err);
throw new Error(err.message);
}
}
}
module.exports = new RabbitmqClient();
\ No newline at end of file
{
"name": "message-call-service",
"version": "0.1.0",
"private": true,
"scripts": {
"start": "node bin/www",
"dev": "./node_modules/.bin/nodemon bin/www",
"prd": "pm2 start bin/www",
"test": "echo \"Error: no test specified\" && exit 1"
},
"dependencies": {
"amqplib": "^0.8.0",
"axios": "^0.21.1",
"debug": "^4.1.1",
"koa": "^2.7.0",
"koa-bodyparser": "^4.2.1",
"koa-convert": "^1.2.0",
"koa-json": "^2.0.2",
"koa-logger": "^3.2.0",
"koa-onerror": "^4.1.0",
"koa-parameter": "^3.0.1",
"koa-router": "^7.4.0",
"koa-static": "^5.0.0",
"koa-views": "^6.2.0",
"moment": "^2.29.1",
"parameter": "^3.6.0",
"uuid": "^8.3.2"
},
"devDependencies": {
"nodemon": "^1.19.1"
}
}
const router = require('koa-router')()
const { sendDelayMessage, sendRetryMessage } = require("../utils/rabbitmq")
router.get('/', async (ctx, next) => {
await ctx.render('index')
})
/**
* 发送消息 - 重试机制
*/
router.post('/sendRetryMessage', async (ctx, next) => {
ctx.verifyParams({
url: { type: 'string', required: true },
method: { type: 'string', required: false },
headers: { type: 'object', required: false },
params: { type: 'object', required: false },
data: { type: 'object', required: false },
timeout: { type: 'int', required: false },
})
ctx.body = await sendRetryMessage(ctx.request.body)
})
/**
* 发送消息 - 队列
*/
router.post('/sendDelayMessage', async (ctx, next) => {
ctx.verifyParams({
url: { type: 'string', required: true },
method: { type: 'string', required: false },
headers: { type: 'string', required: false },
params: { type: 'object', required: false },
data: { type: 'object', required: false },
delayTime: { type: 'int', required: true },
timeout: { type: 'int', required: false },
})
ctx.body = await sendDelayMessage(ctx.request.body)
})
module.exports = router
const router = require('koa-router')()
router.get('/test/error', async (ctx, next) => {
ctx.body = {
code: -1,
success: false,
message: "未知错误"
}
})
router.get('/test/success', async (ctx, next) => {
ctx.body = {
code: 0,
success: true,
message: "成功",
data: {}
}
})
module.exports = router
module.exports = {
MQ: {
protocol: 'amqp',
hostname: '192.168.18.102',
port: 5672,
username: 'guest',
password: 'guest',
locale: 'en_US',
frameMax: 0,
heartbeat: 0,
vhost: '/',
},
PREFETCH: 5,
LADDERDELAYOPTIONSL: {
time: [3, 15, 30, 60],// 阶梯延时时间
lodderExchangeDLX: "lodder.exchange.dlx", // 延时交换器
lodderQueueKeyDLX: "lodder.queueKey.dlx", //lodder.queueKey.dlx.3 lodder.queueKey.dlx.15
lodderRoutingKeyDLX: "lodder.routingkey.dlx", // lodder.routingkey.dlx.3 lodder.routingkey.dlx.15
consumerExchange: "consumer.exchange",// 正常消费交换器
consumerQueue: "consumer.queue",// 正常消费队列
consumerRoutingKey: "consumer.routingkey"
},
DELAYOPTIONSL: {
delayExchange: "delay.exchange",
delayQueue: "delay.queue",
delayRoutingKey: "delay.routingkey"
}
}
\ No newline at end of file
const axios = require("axios");
/**
* 请求参数 要求
* url
* method
* headers 'Content-Type': 'application/json',
* params 是与请求一起发送的 URL 参数
* data
* timeout
*
* 返回参数 要求
* code 成功时返回 0 ,失败时返回具体错误码
* success 是否成功
* data 成功时具体返回值,失败时为 null
* message 成功时返回 null ,失败时返回具体错误消息
*/
async function request(message) {
try {
const { url, method = "POST", headers = {}, params = {}, data = {}, timeout = 5 } = message;
const response = await axios({
method,
url,
params,
data,
timeout: timeout * 1000,
headers: {
'Content-Type': 'application/json',
...headers
}
});
if (response.data.code !== 0) {
throw new Error(response.data.message)
}
} catch (err) {
console.log(err.message);
throw new Error(err.message);
}
}
module.exports = {
request
}
\ No newline at end of file
const rabbitmqClient = require("../client/rabbitmq");
const {
PREFETCH,
LADDERDELAYOPTIONSL: {
time,
lodderExchangeDLX,
lodderQueueKeyDLX,
lodderRoutingKeyDLX,
consumerExchange,
consumerQueue,
consumerRoutingKey
},
DELAYOPTIONSL: {
delayExchange,
delayQueue,
delayRoutingKey
}
} = require("../settings")
const { v4: uuidv4 } = require('uuid');
const { request } = require("./axios");
/**
* 重试机制
* 初始化 阶梯延迟队列
* TTL + DLX
*/
async function initLadderQueue() {
const channel = await rabbitmqClient.getConfirmChannel();
// 初始化 阶梯延时队列
await channel.assertExchange(lodderExchangeDLX, 'direct', { durable: true });
for (let val of time) {
const queue = `${lodderQueueKeyDLX}.${val}`;
await channel.assertQueue(queue,
{
exclusive: false,
deadLetterExchange: consumerExchange,
deadLetterRoutingKey: consumerRoutingKey,
arguments: {
"x-message-ttl": val * 1000
}
});
await channel.bindQueue(queue, lodderExchangeDLX, `${lodderRoutingKeyDLX}.${val}`);
}
// 初始化死信队列
await channel.assertExchange(consumerExchange, 'direct', { durable: true });
await channel.assertQueue(consumerQueue, { exclusive: false, });
await channel.bindQueue(consumerQueue, consumerExchange, consumerRoutingKey);
console.log("--- 阶梯延迟队列 && 死信队列 初始化成功 ----")
}
/**
* 重试机制
* 消费者 监听处理 业务
*/
async function listenCustomerLadderQueue() {
await initLadderQueue();
const channel = await rabbitmqClient.getConfirmChannel();
await channel.prefetch(PREFETCH, true);
await channel.consume(consumerQueue, async (msg) => {
const content = JSON.parse(msg.content.toString());
try {
await request(content);
channel.ack(msg);
} catch (err) {
if (content.retryCount < time.length) {
await sendRetryMessage({
...content,
retryCount: content.retryCount + 1
}, `${lodderQueueKeyDLX}.${time[content.retryCount]}`)
channel.ack(msg);
} else {
// 重试次数用完还是失败
console.log("------ 重试 发送消息 彻底 ERROR ------")
console.log(msg.content.toString());
channel.ack(msg);
}
}
},
{ noAck: false }
);
}
/**
* 重试机制
* 发送消息
* 1. 先入库 消息状态为 未知
* 2. 发送消息
* 3. 将消息状态改为 已发送到mq
*/
async function sendRetryMessage(message, queue = consumerQueue) {
if (!message.messageId) {
message.messageId = uuidv4();
}
if (!message.retryCount) {
message.retryCount = 0;
}
const channel = await rabbitmqClient.getConfirmChannel();
await channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)), {
persistent: true, // 持久化 消息
messageId: message.messageId,
timestamp: new Date().getTime(),
}, function (err, ok) {
// console.log(err, ok)
})
return {
messageId: message.messageId
}
}
/**
* 延时
* 初始化 延时队列
* 根据 延时队列插件
*/
async function initDelayedQueue() {
const channel = await rabbitmqClient.getConfirmChannel();
await channel.assertExchange(delayExchange, 'x-delayed-message',
{
durable: true,
arguments: {
'x-delayed-type': 'direct'
}
});
await channel.assertQueue(delayQueue, { exclusive: false, });
await channel.bindQueue(delayQueue, delayExchange, delayRoutingKey);
console.log("--- 延时插件队列 初始化成功 ---")
}
/**
* 延时
* 消费者 监听处理 业务
* 延时插件
*/
async function listenCustomerQueue() {
await initDelayedQueue();
const channel = await rabbitmqClient.getConfirmChannel();
await channel.prefetch(PREFETCH, true);
await channel.consume(delayQueue, async (msg) => {
let content = JSON.parse(msg.content.toString());
try {
await request(content);
channel.ack(msg);
} catch (err) {
console.log("------ 延时 发送消息 彻底 ERROR ------")
console.log(msg.content.toString());
channel.ack(msg);
}
},
{ noAck: false }
);
}
/**
* 延时
* 发送消息 - 延时功能
* 1. 先入库 消息状态为 未知
* 2. 发送消息
* 3. 将消息状态改为 已发送到mq
*/
async function sendDelayMessage(message) {
if (!message.messageId) {
message.messageId = uuidv4();
}
const channel = await rabbitmqClient.getConfirmChannel();
await channel.publish(delayExchange, delayRoutingKey, Buffer.from(JSON.stringify(message)), {
persistent: true, // 持久化 消息
messageId: message.messageId,
timestamp: new Date().getTime(),
headers: {
'x-delay': message.delayTime * 1000, // 一定要设置,否则无效
}
}, function (err, ok) {
})
return {
messageId: message.messageId
}
}
async function initQueue() {
await listenCustomerLadderQueue()
await listenCustomerQueue()
}
module.exports = {
initQueue,
sendRetryMessage,
sendDelayMessage
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment