最新公告
  • 欢迎您光临起源地模板网,本站秉承服务宗旨 履行“站长”责任,销售只是起点 服务永无止境!立即加入钻石VIP
  • 使用NestJS+Redis+Kafka实现简单秒杀系统

    正文概述 掘金(wenqieqiu)   2020-11-24   1747

    技术栈:我们的老伙计NestJS,以及iorediskafka-node

    最近在研究kafka消息队列,所以想写个秒杀来试试手,看了好几篇博客都没有具体的项目示例,所以参考了一下各种实现用nestjs写了一个可运行的项目。

    第一步,创建项目

    1. 安装@nest/cli脚手架用于生成项目;

    npm i -g @nest/cli   #安装nest-cli
    

    1. 生成项目

    nest new nest-seckill   #使用nest cli生成项目
    cd ./nest-seckill
    yarn                    #安装依赖
    yarn add -S redis ioredis nestjs-redis kafka-node mysql2 typeorm uuid-random    #添加依赖
    

    第二步,生成seckill模块


    1. 生成 seckill.module.ts文件;

      用于创建kafka消费者,接收kafka消息,写入订单信息;

      nest generate module seckill 
      # 可以简写为 `nest g mo seckill`
      

    1. 生成 seckill.controller.ts

      用于实现秒杀的RESTful接口;
      nest g co seckill
      

    1. 生成 seckill.service.ts;

      service里使用redis乐观锁(watch)事务(mult)实现秒杀逻辑, 再使用kafka的Producer生产一条消费数据;

      nest g service seckill
      

    1. 生成 redis.service.ts;

      用于连接redis;

      nest g service redis
      

      修改内容:

       import { Injectable } from '@nestjs/common'
       import { RedisService } from 'nestjs-redis'
      
       @Injectable()
       export class RedisClientService {
         constructor(private readonly redisService: RedisService) {}
      
         // 连接配置已在app.module设置
         async getSeckillRedisClient() {
           return await this.redisService.getClient('seckill')
         }
       }
      

    第三步,编写秒杀逻辑;


    1. 定义秒杀接口:

      seckill.controller.ts里新增一个Post接口:

       import { Body, Controller, Post } from '@nestjs/common'
       import * as uuid from 'uuid-random'                   // 使用uuid生成订单号
       import { CreateOrderDTO } from '../order/order.dto'   // 新增订单字段定义
       import { SeckillService } from './seckill.service'   // 秒杀逻辑具体实现
       import { awaitWrap } from '@/utils/index'            // async返回值简化方法
      
      @Controller('seckill')
       export class SeckillController {
        constructor(private readonly seckillService: SeckillService) {}
           
         @Post('/add')
         async addOrder(@Body() order: CreateOrderDTO) {
           const params: CreateOrderDTO = {
             ...order,
             openid: `${uuid()}-${new Date().valueOf()}`,
           }
      
           // 调用service的secKill方法,并等待完成
           const [error, result] = await awaitWrap(this.seckillService.secKill(params))
           return error || result
         }
       }
      

    1. 实现秒杀逻辑:

      seckill.service.ts里新增一个secKill方法;

      使用redis乐观锁(watch)事务(mult),实现并发下修改数据,详情可参考node redis文档;

      import { Injectable, Logger } from '@nestjs/common'
      import * as kafka from 'kafka-node'
      import * as Redis from 'ioredis'
      import { RedisClientService } from '../redis/redis.service'
      import { getConfig } from '@root/config/index' // redis和 kafka的连接配置
      import { awaitWrap } from '@/utils'
      
      const { redisSeckill, kafkaConfig } = getConfig()
      
      // 创建kafka Client
      const kafkaClient = new kafka.KafkaClient({ kafkaHost: kafkaConfig.kafkaHost })
      // 创建kafka生产者
      const producer = new kafka.Producer(kafkaClient, {
        // Configuration for when to consider a message as acknowledged, default 1
        requireAcks: 1,
        // The amount of time in milliseconds to wait for all acks before considered, default 100ms
        ackTimeoutMs: 100,
        // Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0
        partitionerType: 2,
      })
      
      @Injectable()
      export class SeckillService {
        logger = new Logger('SeckillService') // 创建nest自带的日志实例
        seckillRedisClient!: Redis.Redis // redis连接实例
        count = 0 // 当前请求的次数
      
        constructor(private readonly redisClientService: RedisClientService) {
          // service 创建时异步初始化redis连接
          this.redisClientService.getSeckillRedisClient().then(client => {
            this.seckillRedisClient = client
          })
        }
      
        /*
         * ***********************
         * @desc 秒杀具体实现
         * ***********************
         */
        async secKill(params) {
          const { seckillCounterKey } = redisSeckill
          this.logger.log(`当前请求count:${this.count++}`)
      
          // tips:使用乐观锁解决并发
          const [watchError] = await awaitWrap(this.seckillRedisClient.watch(seckillCounterKey)) //监听'counter'字段更改
          watchError && this.logger.error(watchError)
          if (watchError) return watchError
      
          // 获取当前当前订单剩余数量
          const [getError, reply] = await awaitWrap(this.seckillRedisClient.get(seckillCounterKey))
          getError && this.logger.error(getError)
          if (getError) return getError
          if (parseInt(reply) <= 0) {
            this.logger.warn('已经卖光了')
            return '已经卖光了'
          }
      
          //tips: 使用redis事务修改redis的counter数量减一
          const [execError, replies] = await awaitWrap(this.seckillRedisClient.multi().decr(seckillCounterKey).exec())
          execError && this.logger.error(execError)
          if (execError) return execError
      
          // counter字段正在操作中,等待counter被其他释放
          if (!replies) {
            this.logger.warn('counter被使用')
            this.secKill(params) // 自动重试
            return
          }
      
          // kafka消费数据的内容
          const payload = [
            {
              topic: kafkaConfig.topic,
              partition: 0,
              messages: [JSON.stringify(params)],
            },
          ]
      
          this.logger.log('生产数据payload:')
          this.logger.verbose(payload)
      
          // 异步等待发送kafka消费数据
          return new Promise((resolve, reject) => {
            producer.send(payload, (err, kafkaProducerResponse) => {
              if (err) {
                this.logger.error(err)
                reject(err)
                return err
              }
      
              this.logger.verbose(kafkaProducerResponse)
              resolve({ payload, kafkaProducerResponse })
            })
          })
        }
           
      }
      

    1. 监听kafka消息,消费订单队列消息;

      seckill.module.ts内新增handleListenerKafkaMessage()方法,用于处理kafka消息;

      同时需要在seckill模块挂载(onApplicationBootstrap)时调用此方法,开始订阅kafka消息;

      import { Logger, Module, OnApplicationBootstrap } from '@nestjs/common'
      import * as Redis from 'ioredis'
      import { awaitWrap } from '@/utils'
      import { CreateOrderDTO } from '../order/order.dto'
      import { OrderModule } from '../order/order.module'
      import { OrderService } from '../order/order.service'
      import { RedisClientService } from '../redis/redis.service'
      import { getKafkaConsumer } from './kafka-utils'
      import { SeckillController } from './seckill.controller'
      import { SeckillService } from './seckill.service'
      import { getConfig } from '@root/config'
      
      const { kafkaConfig } = getConfig()
      
      @Module({
        imports: [OrderModule],
        providers: [RedisClientService, SeckillService],
        controllers: [SeckillController],
      })
      export class SeckillModule implements OnApplicationBootstrap {
        logger = new Logger('SeckillModule')
        seckillRedisClient!: Redis.Redis
      
        constructor(
          private readonly orderService: OrderService, //处理订单的Service
          private readonly seckillService: SeckillService, //秒杀相关实现
          private readonly redisClientService: RedisClientService //redis连接
        ) {
          this.redisClientService.getSeckillRedisClient().then(client => {
            this.seckillRedisClient = client
          })
        }
      
        async handleListenerKafkaMessage() {
          const kafkaConsumer = getKafkaConsumer() //抽取出创建消费者实现方法为函数
      
          kafkaConsumer.on('message', async message => {
            this.logger.log('得到的生产者的数据为:')
            this.logger.verbose(message)
      
            let order!: CreateOrderDTO // 从kafka队列得到的订单数据,即service里producer.send的messages内容
      
            if (typeof message.value === 'string') {
              order = JSON.parse(message.value)
            } else {
              order = JSON.parse(message.value.toString())
            }
      
            // 写入数据库,完成订单创建
            const [err, order] = await awaitWrap(this.orderService.saveOne(value))
            if (err) {
              this.logger.error(err)
              return
            }
            this.logger.log(`订单【${order.id}】信息已存入数据库`)
          })
        }
      
        async onApplicationBootstrap() {
          this.logger.log('onApplicationBootstrap: ')
          await this.seckillService.initCount()         //重置redis里商品剩余库存数
          this.handleListenerKafkaMessage()
        }
      }
      
      

    1. kafka消费者getKafkaConsumer方法实现如下:

      在seckill模块文件夹下新增kafka-utils.ts文件:

      import * as kafka from 'kafka-node'
      import * as Redis from 'ioredis'
      import { getConfig } from '@root/config/index'
      import { awaitWrap } from '@/utils'
      
      const { kafkaConfig } = getConfig()
      let kafkaConsumer!: kafka.Consumer
      
      // 获取kafka client
      function getKafkaClient() {
        let kafkaClient!: kafka.KafkaClient
      
        return () => {
          if (!kafkaClient) {
            kafkaClient = new kafka.KafkaClient({
              kafkaHost: kafkaConfig.kafkaHost,
            })
          }
      
          return kafkaClient
        }
      }
      
          /**
         * @desc 获取消费者实例
         */
        export function getKafkaConsumer() {
          // consumer要订阅的topics配置
          const topics = [
            {
              topic: kafkaConfig.topic,
              partition: 0,
              offset: 0,
            },
          ]
      
          const options = {
            //  自动提交配置   (false 不会提交偏移量,每次都从头开始读取)
            autoCommit: true,
            autoCommitIntervalMs: 5000,
            //  如果设置为true,则consumer将从有效负载中的给定偏移量中获取消息
            fromOffset: false,
          }
          const kafkaClient = getKafkaClient()()
      
          if (!kafkaConsumer) {
            kafkaConsumer = new kafka.Consumer(kafkaClient, topics, options)
          }
      
          return kafkaConsumer
        }
      

    最后我们得到的文件结构大概是这样: 使用NestJS+Redis+Kafka实现简单秒杀系统

    运行项目:

    yarn dev
    

    一些说明

    1. 如果需要并发测试秒杀接口,可以使用postmanrunner多开;简单测试接口逻辑的话,可以打开项目默认配置的swagger-ui页面http://localhost:3000/api-docs

    2. 至此我们的主要秒杀逻辑就写的差不多了。由于我们主要为了实现秒杀逻辑,所有订单模块的代码就没有在这里展开了。我们只需要像第二步那样几行命令就可以简单创建Order模块,用于订单curd;

    3. 关于redis,mysql,kafka等服务的话可以编写docker-compose.yaml快速启动起来,具体可以参考本项目代码;

      kafka容器可能会由于centos的防火墙导致启动失败,解决办法是:先关闭宿主机防火墙再重启docker;
    4. kafka容器创建后,需要我们在打开浏览器访问kafka-manager容器映射的9000端口上kafka管理页面,创建cluster和我们的Topic,具体初始化操作较为简单,可自行搜索kafka-manager

      例如Kafka集群管理工具kafka-manager的安装使用


    起源地下载网 » 使用NestJS+Redis+Kafka实现简单秒杀系统

    常见问题FAQ

    免费下载或者VIP会员专享资源能否直接商用?
    本站所有资源版权均属于原作者所有,这里所提供资源均只能用于参考学习用,请勿直接商用。若由于商用引起版权纠纷,一切责任均由使用者承担。更多说明请参考 VIP介绍。
    提示下载完但解压或打开不了?
    最常见的情况是下载不完整: 可对比下载完压缩包的与网盘上的容量,若小于网盘提示的容量则是这个原因。这是浏览器下载的bug,建议用百度网盘软件或迅雷下载。若排除这种情况,可在对应资源底部留言,或 联络我们.。
    找不到素材资源介绍文章里的示例图片?
    对于PPT,KEY,Mockups,APP,网页模版等类型的素材,文章内用于介绍的图片通常并不包含在对应可供下载素材包内。这些相关商业图片需另外购买,且本站不负责(也没有办法)找到出处。 同样地一些字体文件也是这种情况,但部分素材会在素材包内有一份字体下载链接清单。
    模板不会安装或需要功能定制以及二次开发?
    请QQ联系我们

    发表评论

    还没有评论,快来抢沙发吧!

    如需帝国cms功能定制以及二次开发请联系我们

    联系作者

    请选择支付方式

    ×
    迅虎支付宝
    迅虎微信
    支付宝当面付
    余额支付
    ×
    微信扫码支付 0 元