Commit 9fedbdec by Sxy

feat: 界面功能

parent 8361d500
......@@ -2,6 +2,7 @@
* 消息请求失败重试
* 消息请求延时发送
* 全链路日志
* 界面展示
#### 启动
......@@ -9,6 +10,12 @@
>
> 默认 3000端口
#### 界面
> http://127.0.0.1:3000/index.html
<img src="/Users/shaoxingye/Library/Application Support/typora-user-images/image-20210801203634043.png" alt="image-20210801203634043" style="zoom:50%;" />
#### API
> 注: 全链路追踪 请求头 X-Request-Id
......@@ -95,4 +102,3 @@ method : post
./filebeat -e -c {自定义}/filebeat.yml
```
......@@ -7,9 +7,9 @@ import bodyparser from 'koa-bodyparser'
// import logger from 'koa-logger'
import parameter from 'koa-parameter'
import moment from "moment"
import rabbmitmqUtil from "./utils/rabbitmq"
rabbmitmqUtil.initQueue()
import { initQueue } from "./utils/rabbitmq"
initQueue();
import correlation from './middleware/correlation'
import { createNamespace, getNamespace } from 'cls-hooked'
......@@ -31,14 +31,15 @@ onerror(app)
app.use(bodyparser({
enableTypes: ['json', 'form', 'text']
}))
app.use(parameter(app))
app.use(json())
// app.use(logger())
app.use(require('koa-static')(__dirname + '/public'))
app.use(views(__dirname + '/views', {
extension: 'html'
}))
// app.use(views(__dirname + '/views', {
// extension: 'html'
// }))
app.use(correlation(logNameSpace));
......@@ -56,6 +57,9 @@ app.use(async (ctx, next) => {
})
})
app.use(parameter(app))
// routes
app.use(index.routes(), index.allowedMethods())
app.use(test.routes(), test.allowedMethods())
......
import mongoose from 'mongoose'
import settings from "../settings"
import logger from "../utils/logger"
const { MONGO } = settings;
mongoose.connect(MONGO.url, {
useNewUrlParser: true,
useUnifiedTopology: true,
useCreateIndex: true
}, function (err) {
if (err) {
logger.error(` MongoDD 连接失败 ${err.message} `);
process.exit(1);
}
})
mongoose.connection.on('disconnected', () => {
mongoose.connect(MONGO.url)
})
mongoose.connection.on('error', err => {
logger.error(err)
})
mongoose.connection.on('open', async () => {
logger.info(`--- MongoDD 连接成功 ----`)
})
export default mongoose;
......@@ -2,10 +2,10 @@ import { v4 as uuidv4 } from 'uuid';
function correlation(namespace) {
return async (ctx, next) => {
await namespace.runPromise(() => {
const requestId = ctx.header['x-request-id'] || uuidv4();
namespace.run(async () => {
namespace.set('requestId', requestId);
await next();
return next();
});
}
}
......
import mongoose from "../client/mongo";
import messageSchema from "./message";
export const Message = mongoose.model('Message', messageSchema);
\ No newline at end of file
import mongoose from "mongoose"
const messageSchema = new mongoose.Schema({
messageId: { type: String, unique: true },// 发送ID
type: { type: Number, enum: [1, 2], index: true },// 发送类型 1延时 , 2重试
reqContent: { type: Object },// 发送内容
resContent: { type: Array },// 相应内容
status: { type: Number, enum: [-2, -1, 0, 1, 2], default: 0 }, // 发送状态 -2 发送失败 -1 投递失败 0 待发送 1 投递成功, 2 发送成功
}, {
timestamps: {
createdAt: 'createdAt',
updatedAt: 'updatedAt'
}
})
export default messageSchema
......@@ -24,6 +24,7 @@
"koa-static": "^5.0.0",
"koa-views": "^6.2.0",
"moment": "^2.29.1",
"mongoose": "^5.13.5",
"parameter": "^3.6.0",
"uuid": "^8.3.2",
"winston": "^3.3.3",
......
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<!-- import CSS -->
<link rel="stylesheet" href="https://unpkg.com/element-ui/lib/theme-chalk/index.css">
<style>
a {
color: #00A4FF;
text-decoration: none;
outline: none;
-webkit-tap-highlight-color: transparent;
}
a:link,
a:visited {
text-decoration: none;
/*超链接无下划线*/
}
a:hover {
text-decoration: none;
/*鼠标放上去有下划线*/
}
</style>
</head>
<body>
<div id="app">
<el-form :inline="true" :model="formInline" class="demo-form-inline">
<el-form-item label="消息Id">
<el-input v-model="formInline.messageId" placeholder="消息Id"></el-input>
</el-form-item>
<el-form-item>
<el-button type="primary" @click="onSubmit">查询</el-button>
</el-form-item>
<el-table :data="tableData" border style="width: 100%" max-height="700" v-loading="loading">
<el-table-column fixed prop="messageId" label="消息Id" width="300">
</el-table-column>
<el-table-column fixed prop="type" label="消息类型" width="150">
<template slot-scope="scope">
<el-tag v-if="scope.row.type === 1">延时</el-tag>
<el-tag v-if="scope.row.type === 2" color="#e1f3d8">重试</el-tag>
</template>
</el-table-column>
<el-table-column fixed prop="status" label="消息状态" width="200">
<template slot-scope="scope">
<el-tag v-if="scope.row.status === -2" color="#f56c6c">发送失败</el-tag>
<el-tag v-if="scope.row.status === -1" color="#fde2e2">投递失败</el-tag>
<el-tag v-if="scope.row.status === 0" color="#dcdfe6">待定</el-tag>
<el-tag v-if="scope.row.status === 1" color="#d9ecff">投递成功</el-tag>
<el-tag v-if="scope.row.status === 2" color="#e1f3d8">发送成功</el-tag>
</template>
</el-table-column>
<el-table-column prop="createdAt" label="创建时间" width="200">
<template slot-scope="scope">
{{formatDate(scope.row.createdAt)}}
</template>
</el-table-column>
<el-table-column prop="updatedAt" label="更新时间" width="200">
<template slot-scope="scope">
{{formatDate(scope.row.updatedAt)}}
</template>
</el-table-column>
<el-table-column fixed="right" label="详情">
<template slot-scope="scope">
<el-button @click="handleClick(scope.row)" type="text" size="small">详情</el-button>
</template>
</el-table>
<div style="text-align: center;margin-top: 15px;">
<el-pagination @size-change="handleSizeChange" @current-change="handleCurrentChange" :current-page="page"
:page-sizes="[10,30,50,100,200]" :page-size="limit" layout="total, sizes, prev, pager, next" :total="total">
</el-pagination>
</div>
<el-drawer title="消息详情" :visible.sync="drawer" size="50%">
<div style="font-size: 15px;white-space: pre-wrap;" v-html="jsonData">
</div>
</el-drawer>
</div>
</body>
<!-- import Vue before Element -->
<script src="https://unpkg.com/vue/dist/vue.js"></script>
<!-- import JavaScript -->
<script src="https://unpkg.com/element-ui/lib/index.js"></script>
<script src="https://unpkg.com/axios/dist/axios.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/moment@2.18.1/min/moment.min.js"></script>
<script>
new Vue({
el: '#app',
data: function () {
return {
formInline: {},
tableData: [],
total: 0,
limit: 10,
page: 1,
loading: true,
jsonData: {},
drawer: false
}
},
methods: {
async initData(filter) {
try {
this.loading = true;
let data = await axios.post("/list", {
page: this.page,
limit: this.limit,
filter
});
this.loading = false;
data = data.data;
if (data.code === 0) {
data = data.data;
this.tableData = data.list;
this.total = data.count;
} else {
throw new Error(data.message)
}
} catch (err) {
alert(err.message)
}
},
handleSizeChange(val) {
this.limit = val;
this.initData()
},
handleCurrentChange(val) {
this.page = val;
this.initData()
},
handleClick(row) {
this.jsonData = row
this.drawer = true
},
formatDate(date) {
return moment(date).format('YYYY-MM-DD HH:mm:ss')
},
onSubmit() {
const messageId = this.formInline.messageId
if (messageId && messageId !== "") {
this.page = 1
this.initData({
messageId
})
} else {
this.initData()
}
}
},
mounted() {
this.initData();
}
})
</script>
</html>
\ No newline at end of file
......@@ -2,11 +2,28 @@ import Router from 'koa-router'
const router = Router()
import { sendDelayMessage, sendRetryMessage } from "../utils/rabbitmq"
import { ok, error } from "../utils/system"
import { Message } from "../model/index"
router.get('/', async (ctx, next) => {
await ctx.render('index')
router.post('/list', async (ctx, next) => {
ctx.verifyParams({
page: { type: 'int', required: false },
limit: { type: 'int', required: false },
filter: { type: 'object', required: false },
})
const { page = 1, limit = 10, filter = {} } = ctx.request.body;
ctx.body = ok({
list: await Message.find(filter, {}, {
skip: (page - 1) * limit,
limit,
sort: {
createdAt: -1
}
}),
count: await Message.countDocuments(filter)
})
})
/**
* 发送消息 - 重试机制
*/
......@@ -19,8 +36,12 @@ router.post('/sendRetryMessage', async (ctx, next) => {
data: { type: 'object', required: false },
timeout: { type: 'int', required: false },
})
try {
const result = await sendRetryMessage(ctx.request.body)
ctx.body = ok(result)
} catch (err) {
ctx.body = error(err.message)
}
})
/**
......@@ -36,8 +57,12 @@ router.post('/sendDelayMessage', async (ctx, next) => {
delayTime: { type: 'int', required: true },
timeout: { type: 'int', required: false },
})
try {
const result = await sendDelayMessage(ctx.request.body)
ctx.body = ok(result)
} catch (err) {
ctx.body = error(err.message)
}
})
......
export default {
MONGO: {
url: "mongodb://192.168.124.20:27017/message-call"
},
MQ: {
protocol: 'amqp',
hostname: '192.168.18.102',
hostname: '192.168.124.20',
port: 5672,
username: 'guest',
password: 'guest',
......
......@@ -22,9 +22,10 @@ import logger from "../utils/logger"
export async function request(message) {
const { url, method = "POST", headers = {}, params = {}, data = {}, timeout = 5 } = message;
const start = new Date()
let response;
try {
const logNameSpace = getNamespace('logger');
const response = await axios({
response = await axios({
method,
url,
params,
......@@ -46,10 +47,11 @@ export async function request(message) {
requestId: headers["X-Request-Id"]
})
}
return response.data
} catch (err) {
logger.error({
request: message,
response: err.message,
response: response.data,
usedTime: new Date() - start,
requestId: headers["X-Request-Id"]
})
......
......@@ -25,6 +25,8 @@ import logger from "../utils/logger"
import { getNamespace } from 'cls-hooked';
import { Message } from "../model/index"
/**
* 重试机制
* 初始化 阶梯延迟队列
......@@ -71,14 +73,32 @@ async function listenCustomerLadderQueue() {
await channel.consume(consumerQueue, async (msg) => {
const content = JSON.parse(msg.content.toString());
try {
await request(content);
const response = await request(content);
await Message.updateOne({ messageId: content.messageId }, {
status: 2,
$push: {
resContent: response
}
});
channel.ack(msg);
} catch (err) {
await Message.updateOne({ messageId: content.messageId }, {
status: -2,
$push: {
resContent: {
errMsg: err.message || "未知"
}
}
});
if (content.retryCount < time.length) {
try {
await sendRetryMessage({
...content,
retryCount: content.retryCount + 1
}, `${lodderQueueKeyDLX}.${time[content.retryCount]}`)
} catch (err) {
log.error(err)
}
channel.ack(msg);
} else {
// 重试次数用完还是失败
......@@ -98,13 +118,7 @@ async function listenCustomerLadderQueue() {
* 2. 发送消息
* 3. 将消息状态改为 已发送到mq
*/
async function sendRetryMessage(message, queue = consumerQueue) {
if (!message.messageId) {
message.messageId = uuidv4();
}
if (!message.retryCount) {
message.retryCount = 0;
}
export async function sendRetryMessage(message, queue = consumerQueue) {
if (!message.headers) {
message.headers = {}
}
......@@ -112,17 +126,52 @@ async function sendRetryMessage(message, queue = consumerQueue) {
const logNameSpace = getNamespace('logger');
message.headers["X-Request-Id"] = logNameSpace.get("requestId") || uuidv4();
}
if (!message.messageId) {
message.messageId = uuidv4();
await new Message({
messageId: message.messageId,
type: 2,
reqContent: message,
status: 0,
}).save()
} else {
await Message.updateOne({ messageId: message.messageId }, {
messageId: message.messageId,
type: 2,
reqContent: message,
status: 0,
});
}
if (!message.retryCount) {
message.retryCount = 0;
}
const channel = await rabbitmqClient.getConfirmChannel();
await channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)), {
const fun = () => {
return new Promise((resolve, reject) => {
channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)), {
persistent: true, // 持久化 消息
messageId: message.messageId,
timestamp: new Date().getTime(),
}, function (err, ok) {
// console.log(err, ok)
}, async function (err, ok) {
if (err !== null) {
await Message.updateOne({ messageId: message.messageId, status: 0 }, {
status: -1
});
logger.error(err);
reject(new Error(`消息Id [${message.messageId}] 发送mq失败`))
} else {
await Message.updateOne({ messageId: message.messageId, status: 0 }, {
status: 1
});
resolve(message.messageId)
}
})
})
return {
messageId: message.messageId,
}
return fun()
}
......@@ -160,9 +209,23 @@ async function listenCustomerQueue() {
await channel.consume(delayQueue, async (msg) => {
let content = JSON.parse(msg.content.toString());
try {
await request(content);
const response = await request(content);
await Message.updateOne({ messageId: content.messageId }, {
status: 2,
$push: {
resContent: response
}
});
channel.ack(msg);
} catch (err) {
await Message.updateOne({ messageId: content.messageId }, {
status: -2,
$push: {
resContent: {
errMsg: err.message || "未知"
}
}
});
channel.ack(msg);
}
},
......@@ -179,10 +242,7 @@ async function listenCustomerQueue() {
* 2. 发送消息
* 3. 将消息状态改为 已发送到mq
*/
async function sendDelayMessage(message) {
if (!message.messageId) {
message.messageId = uuidv4();
}
export async function sendDelayMessage(message) {
if (!message.headers) {
message.headers = {}
}
......@@ -190,28 +250,56 @@ async function sendDelayMessage(message) {
const logNameSpace = getNamespace('logger');
message.headers["X-Request-Id"] = logNameSpace.get("requestId") || uuidv4();
}
if (!message.messageId) {
message.messageId = uuidv4();
await new Message({
messageId: message.messageId,
type: 1,
reqContent: message,
status: 0,
}).save();
} else {
await Message.updateOne({ messageId: message.messageId }, {
messageId: message.messageId,
type: 1,
reqContent: message,
status: 0,
});
}
const channel = await rabbitmqClient.getConfirmChannel();
await channel.publish(delayExchange, delayRoutingKey, Buffer.from(JSON.stringify(message)), {
const fun = () => {
return new Promise((resolve, reject) => {
channel.publish(delayExchange, delayRoutingKey, Buffer.from(JSON.stringify(message)), {
persistent: true, // 持久化 消息
messageId: message.messageId,
timestamp: new Date().getTime(),
headers: {
'x-delay': message.delayTime * 1000, // 一定要设置,否则无效
}
}, function (err, ok) {
}, async function (err, ok) {
if (err !== null) {
await Message.updateOne({ messageId: message.messageId, status: 0 }, {
status: -1
});
logger.error(err);
reject(new Error(`消息Id [${message.messageId}] 发送mq失败`))
} else {
await Message.updateOne({ messageId: message.messageId, status: 0 }, {
status: 1
});
resolve(message.messageId)
}
})
})
return {
messageId: message.messageId,
}
return fun()
}
async function initQueue() {
export async function initQueue() {
await listenCustomerLadderQueue()
await listenCustomerQueue()
}
module.exports = {
initQueue,
sendRetryMessage,
sendDelayMessage
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment