egg-mqc
amqp-plugin
Last updated 3 months ago by adrian82 .
MIT · Repository · Bugs · Original npm · Tarball · package.json
$ cnpm install egg-mqc 
SYNC missed versions from official npm registry.

egg-mqc

NPM version build status Test coverage David deps Known Vulnerabilities npm download

Install

$ npm i egg-mqc --save

Usage

// {app_root}/config/plugin.js
// Registers the plugin under the key `mqc`.
// NOTE(review): presumably Egg loads the plugin from the `egg-mqc` npm package
// named in `package` whenever `enable` is true — confirm against the Egg plugin docs.
exports.mqc = {
  enable: true,
  package: 'egg-mqc',
};

Configuration

// {app_root}/config/config.default.js
exports.amqp = {
  protocol: 'amqp',
  hostname: 'localhost',
  port: 5672,
  username: 'Thomas',
  password: 'Thomas.A.Edison',
  vhost: '/',
  // http://www.squaremobius.net/amqp.node/ssl.html
  // opts: {
  //   cert: certificateAsBuffer,      client cert
  //   key: privateKeyAsBuffer,        client key
  //   passphrase: 'MySecretPassword', passphrase for key
  //   ca: [caCertAsBuffer],           array of trusted CA certs
  // },
  pool: {
    max: 10, // maximum size of the pool
    min: 2, // minimum size of the pool
    acquireTimeoutMillis?: number,
  },
};

See config/config.default.js for more details.

Example

  • app/extend/application.ts
  /**
   * Publish a message to the built-in `amq.topic` exchange.
   *
   * A fresh channel is created for every call and is always closed again,
   * whether or not publishing succeeded. Failures are logged and reported
   * as `false` rather than thrown.
   *
   * @param this the Egg application the method is mixed into
   * @param topicKey routing key the message is published with
   * @param data message payload
   * @returns the value returned by `channel.publish`, or `false` when
   *          connecting, creating the channel or publishing failed
   */
  async mqPublish(this: Application, topicKey: string, data: Buffer): Promise<boolean> {
    const exchange = 'amq.topic';
    try {
      const conn = await this.amqp();
      const channel = await conn.createChannel();
      try {
        return channel.publish(exchange, topicKey, data, { persistent: true });
      } finally {
        // Wait for the close so the caller only continues after the channel has
        // shut down. A failed close is logged separately so that it does not
        // overwrite the result of the publish itself.
        await channel.close().catch((err: unknown) => {
          this.logger.error(`mqPublish: closing channel failed: ${err instanceof Error ? err.message : String(err)}`);
        });
      }
    } catch (err) {
      this.logger.error(`mqPublish to ${topicKey} failed: ${err instanceof Error ? err.message : String(err)}`);
      return false;
    }
  }
  • publish message to topic
    // mqPublish resolves to the value returned by channel.publish, or to false if any step failed.
    const rst = await this.app.mqPublish('this_is_your_topic', Buffer.from('hello there'));
  • consume in app.ts
  /**
   * Runs once the server is listening: subscribes to every routing key ("#")
   * on the queue "fund.queue", logs each delivery and acknowledges it.
   */
  async serverDidReady() {
    const onMessage = (channel: Channel, msg: ConsumeMessage | null) => {
      // Nothing to log or acknowledge when no message was delivered.
      if (!msg) {
        return;
      }
      const body = msg.content.toString();
      const { exchange, routingKey } = msg.fields;
      this.app.logger.info(` - [${exchange}, ${routingKey}] ${body}`);
      channel.ack(msg);
    };

    subscribe(this.app, "#", "fund.queue", onMessage);
  }

Define AmqpConsumer in the file app/lib/amqpconsumer:

import { Connection, ConsumeMessage, Channel } from 'amqplib';
import { Application } from 'egg';

/**
 * Keeps a consumer running on top of `app.amqp()`.
 *
 * The handler is invoked with every connection that is obtained. When the
 * connection closes, or when connecting or the handler fails, a new attempt
 * is scheduled after `retryDelayMs`.
 */
export class AmqpConsumer {
  private readonly tag: string;
  private readonly app: Application;
  private readonly handler: (conn: Connection, tag: string) => unknown;
  private readonly retryDelayMs: number;

  /**
   * @param app Egg application providing `amqp()` and `logger`
   * @param handler sets up the consumer on a connection; may return a promise,
   *                in which case a rejection also triggers a retry
   * @param tag optional label, wrapped in brackets and prefixed to log lines
   * @param retryDelayMs pause before reconnecting, in milliseconds
   */
  constructor(
    app: Application,
    handler: (conn: Connection, tag: string) => unknown,
    tag?: string,
    retryDelayMs = 1000,
  ) {
    this.tag = tag ? `[${tag}]` : '';
    this.app = app;
    this.handler = handler;
    this.retryDelayMs = retryDelayMs;
    this.process();
  }

  /** Obtain a connection, run the handler on it and arrange for a restart on failure or close. */
  public process(): void {
    this.app.logger.info(`${this.tag} start consume...`);

    // A single attempt can fail in the handler and later see its connection
    // close as well; restart at most once per attempt so consumers are not duplicated.
    let restartScheduled = false;
    const restart = (): void => {
      if (restartScheduled) {
        return;
      }
      restartScheduled = true;
      // Delay the retry so an unreachable broker does not cause a busy loop.
      setTimeout(() => this.process(), this.retryDelayMs);
    };

    this.app.amqp()
      .then((conn: Connection) => {
        // An 'error' event without a listener would be thrown by the emitter.
        conn.on('error', (err: unknown) => {
          this.app.logger.error(`${this.tag} connection error: ${AmqpConsumer.describe(err)}`);
        });
        conn.on('close', () => {
          this.app.logger.info(`${this.tag} consume is closed!!!`);
          restart();
        });
        this.app.logger.info(`${this.tag} consuming`);
        // Returning the result lets a rejected handler promise reach the catch below.
        return this.handler(conn, this.tag);
      })
      .catch((err: unknown) => {
        this.app.logger.error(`${this.tag} error! ${AmqpConsumer.describe(err)}`);
        restart();
      });
  }

  /** Render an unknown thrown value as text for the log. */
  private static describe(err: unknown): string {
    return err instanceof Error ? err.stack ?? err.message : String(err);
  }
}

/**
 * Subscribe to messages routed through the `amq.topic` exchange.
 *
 * Usage example:
 *
 *   subscribe(this.app, "#", "fund.queue", (channel: Channel, msg: ConsumeMessage | null) => {
 *     if (msg) {
 *       let text = msg.content.toString();
 *       let { exchange, routingKey } = msg.fields;
 *       this.app.logger.info(`[${exchange}, ${routingKey}] ${text}`);
 *       channel.ack(msg);
 *     }
 *   });
 *
 * Setup failures (channel creation, assertions, binding, consuming) are
 * logged through `app.logger.error` instead of being left as unhandled
 * promise rejections.
 *
 * @param app Egg application used for the connection and for logging
 * @param topic topic name; used as the binding key and as the consumer's log tag
 * @param queue name of the message queue
 * @param handler called for every delivery; it must call channel.ack(msg)
 *                once the message has been processed
 */
export function subscribe(app: Application, topic: string, queue: string, handler: (channel: Channel, msg: ConsumeMessage | null) => void): void {
  const exchange = 'amq.topic';

  new AmqpConsumer(app, async (conn: Connection) => {
    try {
      const channel = await conn.createChannel();
      // An 'error' event without a listener would be thrown by the emitter.
      channel.on('error', (err: unknown) => {
        app.logger.error(`channel for ${queue} error ${err}`);
      });

      // When consuming, bind the queue to the exchange; each step is awaited
      // so a failure surfaces in the catch below.
      await channel.assertExchange(exchange, 'topic', { durable: true });
      await channel.assertQueue(queue, { durable: true });
      await channel.bindQueue(queue, exchange, topic);
      await channel.consume(queue, (msg) => {
        try {
          handler(channel, msg);
        } catch (err) {
          app.logger.error(`consume ${queue} error ${err}`);
        }
      });
    } catch (err) {
      app.logger.error(`subscribe ${queue} (${topic}) failed: ${err}`);
    }
  }, topic);
}

Questions & Suggestions

Please open an issue here.

License

MIT

Current Tags

  • 1.2.2                                ...           latest (3 months ago)

8 Versions

  • 1.2.2                                ...           3 months ago
  • 1.2.1                                ...           5 months ago
  • 1.2.0                                ...           5 months ago
  • 1.1.3                                ...           5 months ago
  • 1.1.2                                ...           5 months ago
  • 1.1.0                                ...           5 months ago
  • 1.0.2                                ...           5 months ago
  • 1.0.0                                ...           5 months ago
Maintainers (1)
Downloads
Today 0
This Week 9
This Month 10
Last Day 0
Last Week 1
Last Month 6
Dependencies (4)
Dev Dependencies (8)
Dependents (0)
None

Copyright 2014 - 2016 © taobao.org |