技术栈:我们的老伙计NestJS
,以及ioredis
,kafka-node
最近在研究kafka消息队列,所以想写个秒杀来试试手,看了好几篇博客都没有具体的项目示例,所以参考了一下各种实现用nestjs写了一个可运行的项目。
第一步,创建项目
-
安装@nest/cli脚手架用于生成项目;
npm i -g @nest/cli #安装nest-cli
-
生成项目
nest new nest-seckill #使用nest cli生成项目
cd ./nest-seckill
yarn #安装依赖
yarn add -S redis ioredis nestjs-redis kafka-node mysql2 typeorm uuid-random #添加依赖
第二步,生成seckill模块
-
生成
seckill.module.ts
文件;用于创建kafka消费者,接收kafka消息,写入订单信息;
nest generate module seckill # 可以简写为 `nest g mo seckill`
-
生成
用于实现秒杀的RESTful接口;seckill.controller.ts
;nest g co seckill
-
生成
seckill.service.ts
;在
service
里使用redis乐观锁(watch)
与事务(mult)
实现秒杀逻辑, 再使用kafka的Producer
生产一条消费数据;nest g service seckill
-
生成
redis.service.ts
;用于连接redis;
nest g service redis
修改内容:
import { Injectable } from '@nestjs/common' import { RedisService } from 'nestjs-redis' @Injectable() export class RedisClientService { constructor(private readonly redisService: RedisService) {} // 连接配置已在app.module设置 async getSeckillRedisClient() { return await this.redisService.getClient('seckill') } }
第三步,编写秒杀逻辑;
-
定义秒杀接口:
在
seckill.controller.ts
里新增一个Post接口:import { Body, Controller, Post } from '@nestjs/common' import * as uuid from 'uuid-random' // 使用uuid生成订单号 import { CreateOrderDTO } from '../order/order.dto' // 新增订单字段定义 import { SeckillService } from './seckill.service' // 秒杀逻辑具体实现 import { awaitWrap } from '@/utils/index' // async返回值简化方法 @Controller('seckill') export class SeckillController { constructor(private readonly seckillService: SeckillService) {} @Post('/add') async addOrder(@Body() order: CreateOrderDTO) { const params: CreateOrderDTO = { ...order, openid: `${uuid()}-${new Date().valueOf()}`, } // 调用service的secKill方法,并等待完成 const [error, result] = await awaitWrap(this.seckillService.secKill(params)) return error || result } }
-
实现秒杀逻辑:
在
seckill.service.ts
里新增一个secKill
方法;使用redis
乐观锁(watch)
和事务(mult)
,实现并发下修改数据,详情可参考node redis文档;import { Injectable, Logger } from '@nestjs/common' import * as kafka from 'kafka-node' import * as Redis from 'ioredis' import { RedisClientService } from '../redis/redis.service' import { getConfig } from '@root/config/index' // redis和 kafka的连接配置 import { awaitWrap } from '@/utils' const { redisSeckill, kafkaConfig } = getConfig() // 创建kafka Client const kafkaClient = new kafka.KafkaClient({ kafkaHost: kafkaConfig.kafkaHost }) // 创建kafka生产者 const producer = new kafka.Producer(kafkaClient, { // Configuration for when to consider a message as acknowledged, default 1 requireAcks: 1, // The amount of time in milliseconds to wait for all acks before considered, default 100ms ackTimeoutMs: 100, // Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0 partitionerType: 2, }) @Injectable() export class SeckillService { logger = new Logger('SeckillService') // 创建nest自带的日志实例 seckillRedisClient!: Redis.Redis // redis连接实例 count = 0 // 当前请求的次数 constructor(private readonly redisClientService: RedisClientService) { // service 创建时异步初始化redis连接 this.redisClientService.getSeckillRedisClient().then(client => { this.seckillRedisClient = client }) } /* * *********************** * @desc 秒杀具体实现 * *********************** */ async secKill(params) { const { seckillCounterKey } = redisSeckill this.logger.log(`当前请求count:${this.count++}`) // tips:使用乐观锁解决并发 const [watchError] = await awaitWrap(this.seckillRedisClient.watch(seckillCounterKey)) //监听'counter'字段更改 watchError && this.logger.error(watchError) if (watchError) return watchError // 获取当前当前订单剩余数量 const [getError, reply] = await awaitWrap(this.seckillRedisClient.get(seckillCounterKey)) getError && this.logger.error(getError) if (getError) return getError if (parseInt(reply) <= 0) { this.logger.warn('已经卖光了') return '已经卖光了' } //tips: 使用redis事务修改redis的counter数量减一 const [execError, replies] = await awaitWrap(this.seckillRedisClient.multi().decr(seckillCounterKey).exec()) execError && this.logger.error(execError) if (execError) return execError // counter字段正在操作中,等待counter被其他释放 if (!replies) { this.logger.warn('counter被使用') this.secKill(params) // 自动重试 return } // kafka消费数据的内容 const payload = [ { topic: kafkaConfig.topic, partition: 0, messages: [JSON.stringify(params)], }, ] this.logger.log('生产数据payload:') this.logger.verbose(payload) // 异步等待发送kafka消费数据 return new Promise((resolve, reject) => { producer.send(payload, (err, kafkaProducerResponse) => { if (err) { this.logger.error(err) reject(err) return err } this.logger.verbose(kafkaProducerResponse) resolve({ payload, kafkaProducerResponse }) }) }) } }
-
监听kafka消息,消费订单队列消息;
在
seckill.module.ts
内新增handleListenerKafkaMessage()
方法,用于处理kafka消息;同时需要在
seckill
模块挂载(onApplicationBootstrap)
时调用此方法,开始订阅kafka消息;import { Logger, Module, OnApplicationBootstrap } from '@nestjs/common' import * as Redis from 'ioredis' import { awaitWrap } from '@/utils' import { CreateOrderDTO } from '../order/order.dto' import { OrderModule } from '../order/order.module' import { OrderService } from '../order/order.service' import { RedisClientService } from '../redis/redis.service' import { getKafkaConsumer } from './kafka-utils' import { SeckillController } from './seckill.controller' import { SeckillService } from './seckill.service' import { getConfig } from '@root/config' const { kafkaConfig } = getConfig() @Module({ imports: [OrderModule], providers: [RedisClientService, SeckillService], controllers: [SeckillController], }) export class SeckillModule implements OnApplicationBootstrap { logger = new Logger('SeckillModule') seckillRedisClient!: Redis.Redis constructor( private readonly orderService: OrderService, //处理订单的Service private readonly seckillService: SeckillService, //秒杀相关实现 private readonly redisClientService: RedisClientService //redis连接 ) { this.redisClientService.getSeckillRedisClient().then(client => { this.seckillRedisClient = client }) } async handleListenerKafkaMessage() { const kafkaConsumer = getKafkaConsumer() //抽取出创建消费者实现方法为函数 kafkaConsumer.on('message', async message => { this.logger.log('得到的生产者的数据为:') this.logger.verbose(message) let order!: CreateOrderDTO // 从kafka队列得到的订单数据,即service里producer.send的messages内容 if (typeof message.value === 'string') { order = JSON.parse(message.value) } else { order = JSON.parse(message.value.toString()) } // 写入数据库,完成订单创建 const [err, order] = await awaitWrap(this.orderService.saveOne(value)) if (err) { this.logger.error(err) return } this.logger.log(`订单【${order.id}】信息已存入数据库`) }) } async onApplicationBootstrap() { this.logger.log('onApplicationBootstrap: ') await this.seckillService.initCount() //重置redis里商品剩余库存数 this.handleListenerKafkaMessage() } }
-
kafka消费者
getKafkaConsumer
方法实现如下:在seckill模块文件夹下新增
kafka-utils.ts
文件:import * as kafka from 'kafka-node' import * as Redis from 'ioredis' import { getConfig } from '@root/config/index' import { awaitWrap } from '@/utils' const { kafkaConfig } = getConfig() let kafkaConsumer!: kafka.Consumer // 获取kafka client function getKafkaClient() { let kafkaClient!: kafka.KafkaClient return () => { if (!kafkaClient) { kafkaClient = new kafka.KafkaClient({ kafkaHost: kafkaConfig.kafkaHost, }) } return kafkaClient } } /** * @desc 获取消费者实例 */ export function getKafkaConsumer() { // consumer要订阅的topics配置 const topics = [ { topic: kafkaConfig.topic, partition: 0, offset: 0, }, ] const options = { // 自动提交配置 (false 不会提交偏移量,每次都从头开始读取) autoCommit: true, autoCommitIntervalMs: 5000, // 如果设置为true,则consumer将从有效负载中的给定偏移量中获取消息 fromOffset: false, } const kafkaClient = getKafkaClient()() if (!kafkaConsumer) { kafkaConsumer = new kafka.Consumer(kafkaClient, topics, options) } return kafkaConsumer }
最后我们得到的文件结构大概是这样:
运行项目:
yarn dev
一些说明
-
如果需要并发测试秒杀接口,可以使用
postman
的runner
多开;简单测试接口逻辑的话,可以打开项目默认配置的swagger-ui
页面http://localhost:3000/api-docs -
至此我们的主要秒杀逻辑就写的差不多了。由于我们主要为了实现秒杀逻辑,所有订单模块的代码就没有在这里展开了。我们只需要像第二步那样几行命令就可以简单创建Order模块,用于订单curd;
-
关于redis,mysql,kafka等服务的话可以编写
docker-compose.yaml
快速启动起来,具体可以参考本项目代码;kafka容器可能会由于centos的防火墙导致启动失败,解决办法是:先关闭宿主机防火墙再重启docker;
-
kafka容器创建后,需要我们在打开浏览器访问
kafka-manager
容器映射的9000
端口上kafka管理页面,创建cluster和我们的Topic,具体初始化操作较为简单,可自行搜索kafka-manager
;例如Kafka集群管理工具kafka-manager的安装使用
常见问题FAQ
- 免费下载或者VIP会员专享资源能否直接商用?
- 本站所有资源版权均属于原作者所有,这里所提供资源均只能用于参考学习用,请勿直接商用。若由于商用引起版权纠纷,一切责任均由使用者承担。更多说明请参考 VIP介绍。
- 提示下载完但解压或打开不了?
- 找不到素材资源介绍文章里的示例图片?
- 模板不会安装或需要功能定制以及二次开发?
发表评论
还没有评论,快来抢沙发吧!