消息队列

创作人 Leo


编辑时间 Sun Mar 14,2021 at 14:16


消息队列

生产者 producer ,消费者 consumer
producer 产出消息,consumer 订阅自己需要的消息通道,producer 将产出的消息发送到对应的消息通道
Why MQ? 异步 单一进程能够以高吞吐量处理单一任务;解耦 大部分业务不确定性,通过消息队列,让业务模型解耦,比如订单生产者队列Q,现在被支付模块A和风控模块B订阅,B在不需要时可以随时取消订阅,如果用户模块C需要,也可以随时订阅Q;削峰 我们在项目初始一般会进行流量预估,比如预估了50wPV,那么我们就需要对超出流量进行限流,主要是为了保证在特殊的高流量情况下(比如1分钱抢购活动)系统的可用性,使用消息队列将请求缓存,然后由consumer分批依次处理
Why Not? 外部依赖造成的系统可用性降低;提高了系统复杂性;一致性问题,如何确保消息被成功并准确消费
点对点模式:消息只推给一个consumer,不会重复消费
发布-订阅:producer广播消息给所有已经订阅的consumer

ActiveMQ
RabbitMQ
RocketMQ
Kafka
NSQ

Kafka

术语

Broker 集群的一个节点,broker 保存 topic 数据,如果一个 topic 有多个 partition 则分布在多个 broker 上;假设有3个broker和三个partition,则三个partition与broker一一对应;如果有五个broker,则有两个broker空闲;如果有两个broker,则有一个broker负责两个partition,为了性能应该避免partition数量大于broker数量
Topic 一类消息,消费者订阅目标
Partition 每个 topic 至少有一个 partition,可以将 topic 配置成多个 partition,单个 partition 是有序的,如果 topic 被设置成多个 partition 则无法保证消息顺序;因此,如果消息必须有序处理,则必须设置为1个partition

集群消费

每个 consumer 订阅不同 Partition 即可实现集群消费,即集群中订阅了 Partition 的节点会收到不同的数据,从而提升高并发情况下的消费速度

广播消费

订阅了相同 Partition 的 consumer 会收到相同的数据,所以在指定单一 Partition 的情况下,就能够实现所有 consumer 收到全量的消息

入门

启动 Zookeeper
cd ~/Downloads/apache-zookeeper-3.5.6-bin
./bin/zkServer.sh --config ./conf start

启动 Kafka
cd ~/Downloads/kafka_2.12-2.3.0
./bin/kafka-server-start.sh config/server.properties

查看 topic 列表:./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
创建一个 topic:./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic 1-topic-demo
打开生产者交互终端:./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 1-topic-demo
打开一个消费者测试终端:./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 1-topic-demo --from-beginning
为了后续测试,我们创建两个 topic
单一 partition :./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic 1-topic-demo
两个 partition :./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic 2-topic-demo

例,Golang 生产者

我们先写一个生产者供后续程序使用
语言选择是Golang
代码包我们用 sarama

package main

import (
	"github.com/Shopify/sarama"
	"log"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	client, err := sarama.NewClient([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalf("unable to create kafka client: %q", err)
	}

	producer, err := sarama.NewSyncProducerFromClient(client)
	if err != nil {
		log.Fatalf("unable to create kafka producer: %q", err)
	}
	defer producer.Close()

	text := "hello"
	topic := "1-topic-demo"
	_, _, err = producer.SendMessage(&sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)})
	if err != nil {
		log.Fatalf("unable to produce message: %q", err)
	}
}

例,Golang 多进程处理多个 partition

func singlePartition() {

	if len(os.Args)<3 {
		fmt.Println("usage kafka-ser {topic} {partition-id}")
		return
	}

	topic := os.Args[1]
	partition,err := strconv.Atoi(os.Args[2])
	if err != nil {
		panic(err)
	}

	// 根据给定的代理地址和配置创建一个消费者
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		panic(err)
	}

	pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
	defer pc.AsyncClose()
	wg.Add(1)
	go func(sarama.PartitionConsumer) {
		defer wg.Done()
		//Messages()该方法返回一个消费消息类型的只读通道,由代理产生
		for msg := range pc.Messages() {
			fmt.Printf("%s---Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Topic,msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
		}
	}(pc)

	wg.Wait()
	consumer.Close()
}

开启两个终端,设置分别监听不同的 partition
开启一个 producer ,输入从1到9,观察输出
结论:没有重复消费,不能保证处理顺序
适用于单一任务不能重复执行,但是可以让多个机器处理,做到并发处理多个任务
适用于异步通信(发短信,发邮件,发推送),流量控制,削峰

$ ./kafka-ser 2-topic-demo 0
2-topic-demo---Partition:0, Offset:9, Key:, Value:2
2-topic-demo---Partition:0, Offset:10, Key:, Value:4
2-topic-demo---Partition:0, Offset:11, Key:, Value:6
2-topic-demo---Partition:0, Offset:12, Key:, Value:8

$ ./kafka-ser 2-topic-demo 1
2-topic-demo---Partition:1, Offset:9, Key:, Value:1
2-topic-demo---Partition:1, Offset:10, Key:, Value:3
2-topic-demo---Partition:1, Offset:11, Key:, Value:5
2-topic-demo---Partition:1, Offset:12, Key:, Value:7
2-topic-demo---Partition:1, Offset:13, Key:, Value:9

例,Golang 单进程多Go程处理多个 partition

// 单进程多Go程处理topic,
func multiPartition() {

	if len(os.Args)<2 {
		fmt.Println("usage kafka-ser {topic}")
		return
	}

	topic := os.Args[1]

	// 根据给定的代理地址和配置创建一个消费者
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		panic(err)
	}

	//Partitions(topic):该方法返回了该topic的所有分区id
	partitionList, err := consumer.Partitions(topic)
	if err != nil {
		panic(err)
	}

	for partition := range partitionList {
		//ConsumePartition方法根据主题,分区和给定的偏移量创建创建了相应的分区消费者
		//如果该分区消费者已经消费了该信息将会返回error
		//sarama.OffsetNewest:表明了为最新消息
		pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
		if err != nil {
			panic(err)
		}
		defer pc.AsyncClose()
		wg.Add(1)
		go func(sarama.PartitionConsumer) {
			defer wg.Done()
			//Messages()该方法返回一个消费消息类型的只读通道,由代理产生
			for msg := range pc.Messages() {
				fmt.Printf("%s---Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Topic,msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
			}
		}(pc)
	}
	wg.Wait()
	consumer.Close()
}

这次同样是订阅有两个 partition 的 topic
不同的是,我们在一个进程中设置了两个两个协程同时处理两个 partition
结论:没有重复消费,不能保证处理顺序
适用于单一任务不能重复执行,但是可以让多个机器处理,做到并发处理多个任务
适用于异步通信(发短信,发邮件,发推送),流量控制,削峰

$ ./kafka-ser 2-topic-demo
2-topic-demo---Partition:0, Offset:13, Key:, Value:1
2-topic-demo---Partition:0, Offset:14, Key:, Value:3
2-topic-demo---Partition:1, Offset:14, Key:, Value:2
2-topic-demo---Partition:1, Offset:15, Key:, Value:4
2-topic-demo---Partition:0, Offset:15, Key:, Value:5
2-topic-demo---Partition:0, Offset:16, Key:, Value:7
2-topic-demo---Partition:1, Offset:16, Key:, Value:6
2-topic-demo---Partition:1, Offset:17, Key:, Value:8
2-topic-demo---Partition:0, Offset:17, Key:, Value:9

这次订阅只有一个 partition 的 topic
结论:没有重复消费,保证处理顺序
适用于严格有序的数据,比如排队系统,抢购系统

leodeMacBook-Pro:kafka-ser leo$ ./kafka-ser 1-topic-demo
1-topic-demo---Partition:0, Offset:10, Key:, Value:1
1-topic-demo---Partition:0, Offset:11, Key:, Value:2
1-topic-demo---Partition:0, Offset:12, Key:, Value:3
1-topic-demo---Partition:0, Offset:13, Key:, Value:4
1-topic-demo---Partition:0, Offset:14, Key:, Value:5
1-topic-demo---Partition:0, Offset:15, Key:, Value:6
1-topic-demo---Partition:0, Offset:16, Key:, Value:7
1-topic-demo---Partition:0, Offset:17, Key:, Value:8
1-topic-demo---Partition:0, Offset:18, Key:, Value:9

消费组

消费组用来自动负载均衡多个消费者,比配置消费固定 Partition 实现的消费负载均衡更容易水平扩展

  1. 设置 GroupID 配置消费组
  2. GroupID 和 Partition 不能同时配置
  3. 客户端配置了 GroupID ,就不能设置 Offset
  4. Topic 下 Partition 的消息只会被组内一个 Consumer 消费(如果创建 TOPIC 时设置的 Partition 是 1 则组内同时只有一个 Consumer 在工作),当然也可以被不同消费组消费
  5. 当 Consumer 离线,服务端会自动将 Partition 的消息分配给其他 Consumer(使用消费组的主要好处在于此,不需要也不能手动管理 offset,多开几个 Consumer 能够防止程序中断造成特定 Partition 无法被消费)
  6. 需要注意的是,当 Consumer 突然中断,服务端需要一段时间才能重新找到组中的可用节点,并将 Partition 代理过去,测试大概几秒钟

例:golang 消费组使用

func fetchMsgWithGroupTest() {
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"192.168.3.16:9092"},
		Topic:   "test_topic_1021_2",
		//Partition: 0,
		GroupID:  "test_topic_1021_gid_1", // Partition 和 GroupID 不能同时设置
		MinBytes: 1,
		MaxBytes: 10e6,
	})

	//err := reader.SetOffset(1) // panic: unavailable when GroupID is set
	//if err != nil {
	//	panic(err)
	//}

	for {
		msg, err := reader.ReadMessage(context.Background())
		// 使用了消费组的情况下需要用 ReadMessage ,因为它会在内部执行 CommitMessages
		// broker 需要这个操作来确定是否可以保存已完成的 offset
		if err != nil {
			panic(err)
		}
		fmt.Println("msg ", string(msg.Value), " offset ", msg.Offset)
	}
}

测试说明:

  1. 首先创建 Topic,这里指定 2 个 Partition,并打开一个测试的写交互程序
./bin/kafka-topics.sh --create --bootstrap-server 192.168.3.16:9092 --replication-factor 1 --partitions 2 --topic test_topic_1021_2
./bin/kafka-console-producer.sh --broker-list 192.168.3.16:9092 --topic test_topic_1021_2
  1. 编译 go 程序,并同时打开 3 个读进程;由于我们只有两个 Partition ,所以说消费时有一个进程没有消费数据
  2. 关闭一个有消费的进程;服务端会在几秒后自动将 Partition 转移到一个闲置的消费进程
  3. 再次关闭一个有消费的进程;服务端会在几秒后自动将全部两个 Partition 转移到同一个消费进程

PHP 使用 Kafka

安装 php-kafka 扩展 :
安装依赖 librdkafka
Mac OS 可以直接: brew install librdkafka

安装:

$ /usr/local/php/bin/phpize
$ ./configure --with-php-config=/usr/local/php/bin/php-config
$ make
$ sudo make install

安装后记得修改 php.ini 将扩展加进去

代码示例:

<?php

class KafkaDemo {
    public static function consumer() {
        // 生产者
        $objRdKafka = new RdKafka\Consumer();
        $objRdKafka->setLogLevel(LOG_DEBUG);
        $objRdKafka->addBrokers("localhost:9092");

        $oObjTopic = $objRdKafka->newTopic("1-topic-demo");

        /**
         * consumeStart
         *   第一个参数标识分区,生产者是往分区0发送的消息,这里也从分区0拉取消息
         *   第二个参数标识从什么位置开始拉取消息,可选值为
         *     RD_KAFKA_OFFSET_BEGINNING : 从开始拉取消息
         *     RD_KAFKA_OFFSET_END : 从当前位置开始拉取消息
         *     RD_KAFKA_OFFSET_STORED : 猜测跟RD_KAFKA_OFFSET_END一样
         */
        $oObjTopic->consumeStart(0, RD_KAFKA_OFFSET_END);

        while (true) {
            // 第一个参数是分区,第二个参数是超时时间
            $oMsg = $oObjTopic->consume(0, 1000);

            // 没拉取到消息时,返回NULL
            if (!$oMsg) {
                usleep(10000);
                continue;
            }

            if ($oMsg->err) {
                echo $msg->errstr(), "\n";
                break;
            } else {
                echo $oMsg->payload, "\n";
            }
        }
    }

    public static function producer2() {
        // 这个设置了 debug = all 会打印更详细的调用信息

        $conf = new RdKafka\Conf();
        $conf->set('log_level', (string) LOG_DEBUG);
        $conf->set('debug', 'all');
        $rk = new RdKafka\Producer($conf);
        $rk->addBrokers("127.0.0.1:9092");

        $topic = $rk->newTopic("1-topic-demo");
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");

        sleep(3); // kafka 是异步的这里让程序暂停,等待消息送达并回传
        echo "done\n";
    }


    public static function producer() {
        // 以阻塞的方式从终端读取输入,并发送到消息队列

        $objRdKafka = new RdKafka\Producer();
        $objRdKafka->setLogLevel(LOG_DEBUG);
        $objRdKafka->addBrokers("localhost:9092");

        $oObjTopic = $objRdKafka->newTopic("1-topic-demo");

        // 从终端接收输入 
        $oInputHandler = fopen('php://stdin', 'r');

        while (true) {
            echo "\nEnter  messages:\n";
            $sMsg = trim(fgets($oInputHandler));

            // 空消息意味着退出
            if (empty($sMsg)) {
                break;
            }

            // 发送消息
            $oObjTopic->produce(RD_KAFKA_PARTITION_UA, 0, $sMsg);
        }

        echo "done\n";
    }

}

$act = $argv[1] ;

KafkaDemo::$act();

运行方法:
开启消费者
php ./kafkademo.php consumer
开启持续生产者
php ./kafkademo.php producer
单次生产,查看详细Debug日志 php ./kafkademo.php producer2

Clang kafka 生产者

#include "common.h"
#include <librdkafka/rdkafka.h>

rd_kafka_t *rk;

void initMessageQueue()
{
    char errstr[512];
    rd_kafka_conf_t *conf = rd_kafka_conf_new();

    if (rd_kafka_conf_set(conf, "bootstrap.servers", server->kafkaBroker,
                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
    {
        serverLog(LL_ERROR, "rd_kafka_conf_set failed %s", errstr);
        exit(1);
    }

    rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!rk)
    {
        serverLog(LL_ERROR, "rd_kafka_conf_set Failed to create new producer: %s", errstr);
        exit(1);
    }
}

void mqSendMsg(char *topic, char *msg, size_t len, char *key, size_t klen)
{

    rd_kafka_resp_err_t err;
    rd_kafka_topic_t *rkt;

retry:

    err = rd_kafka_producev(
        /* Producer handle */
        rk,
        /* Topic name */
        RD_KAFKA_V_TOPIC(topic),
        /* Make a copy of the payload. */
        RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
        /* Message value and length */
        RD_KAFKA_V_VALUE(msg, len),
        /* Per-Message opaque, provided in
                * delivery report callback as
                * msg_opaque. */
        RD_KAFKA_V_OPAQUE(NULL),
        /* Message key to split MSG to partitions 
        		* key 可以用来自动分 partition */
        RD_KAFKA_V_KEY(key, klen),
        /* Unassigned partition 
        		* 需要通过 key 分 partition 的情况下,这里必须设置 RD_KAFKA_PARTITION_UA */
        RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA),
        /* End sentinel */
        RD_KAFKA_V_END);

    if (err)
    {
        serverLog(LL_ERROR, "failed to produce to topic %s: %s", topic, rd_kafka_err2str(err));
        if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL)
        {
            serverLog(LL_INFO, "internal queue is full");
        }

        // waiting for seconds and retry ; to avoid consume Message from redis MQ or it will lost Message.
        rd_kafka_poll(rk, 1000 /*block for max 1000ms*/);
        goto retry;
    }
    else
    {
        serverLog(LL_DEBUG, "MQ:enqueued message %s", msg);
    }

    rd_kafka_poll(rk, 0 /*non-blocking*/);
}

多 broker 集群

http://kafka.apache.org/quickstart#quickstart_multibroker

RocketMQ

官方文档

从官方网站下载最新版 RocketMQ 二进制可执行文件,并进入

linux(mac) 运行:

  1. 开启名字服务
    sh bin/mqnamesrv

  2. 开启单机 broker
    sh bin/mqbroker -n localhost:9876

简单测试

运行发送、接收消息的示例之前,需要为 client 提供名字服务器地址
RocketMQ 提供多种方式为 client 提供名字服务器地址
这里我们通过设置环境变量的方式实现

export NAMESRV_ADDR=localhost:9876

接收消息

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

发送消息

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

NSQ


阅读:880
搜索
  • Linux 高性能网络编程库 Libevent 简介和示例 2557
  • Mac系统编译PHP7【20190929更新】 2290
  • zksync 和 layer2 2191
  • web rtc 学习笔记(一) 2172
  • Hadoop 高可用集群搭建 (Hadoop HA) 2160
  • Hadoop Map Reduce 案例:好友推荐 2104
  • react 学习笔记(一) 2065
  • Linux 常用命令 2057
  • 小白鼠问题 2040
  • 安徽黄山游 2038
简介
不定期分享软件开发经验,生活经验