消息队列

创作人 Leo


编辑时间 Wed Aug 26,2020 at 17:49


消息队列

生产者 producer ,消费者 consumer
producer 产出消息,consumer 订阅自己需要的消息通道,producer 将产出的消息发送到对应的消息通道
Why MQ? 异步 单一进程能够以高吞吐量处理单一任务;解耦 大部分业务不确定性,通过消息队列,让业务模型解耦,比如订单生产者队列Q,现在被支付模块A和风控模块B订阅,B在不需要时可以随时取消订阅,如果用户模块C需要,也可以随时订阅Q;削峰 我们在项目初始一般会进行流量预估,比如预估了50wPV,那么我们就需要对超出流量进行限流,主要是为了保证在特殊的高流量情况下(比如1分钱抢购活动)系统的可用性,使用消息队列将请求缓存,然后由consumer分批依次处理
Why Not? 外部依赖造成的系统可用性降低;提高了系统复杂性;一致性问题,如何确保消息被成功并准确消费
点对点模式:消息只推给一个consumer,不会重复消费
发布-订阅:producer广播消息给所有已经订阅的consumer

ActiveMQ
RabbitMQ
RocketMQ
Kafka
NSQ

Kafka

术语

Broker 集群的一个节点,broker 保存 topic 数据,如果一个 topic 有多个 partition 则分布在多个 broker 上;假设有3个broker和三个partition,则三个partition与broker一一对应;如果有五个broker,则有两个broker空闲;如果有两个broker,则有一个broker负责两个partition,为了性能应该避免partition数量大于broker数量
Topic 一类消息,消费者订阅目标
Partition 每个 topic 至少有一个 partition,可以将 topic 配置成多个 partition,单个 partition 是有序的,如果 topic 被设置成多个 partition 则无法保证消息顺序;因此,如果消息必须有序处理,则必须设置为1个partition

集群消费

每个 consumer 订阅不同 Partition 即可实现集群消费,即集群中订阅了 Partition 的节点会收到不同的数据,从而提升高并发情况下的消费速度

广播消费

订阅了相同 Partition 的 consumer 会收到相同的数据,所以在指定单一 Partition 的情况下,就能够实现所有 consumer 收到全量的消息

入门

启动 Zookeeper
cd ~/Downloads/apache-zookeeper-3.5.6-bin
./bin/zkServer.sh --config ./conf start

启动 Kafka
cd ~/Downloads/kafka_2.12-2.3.0
./bin/kafka-server-start.sh config/server.properties

查看 topic 列表:./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
创建一个 topic:./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic 1-topic-demo
打开生产者交互终端:./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 1-topic-demo
打开一个消费者测试终端:./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 1-topic-demo --from-beginning
为了后续测试,我们创建两个 topic
单一 partition :./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic 1-topic-demo
两个 partition :./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic 2-topic-demo

例,Golang 生产者

我们先写一个生产者供后续程序使用
语言选择是Golang
代码包我们用 sarama

package main

import (
	"github.com/Shopify/sarama"
	"log"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	client, err := sarama.NewClient([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalf("unable to create kafka client: %q", err)
	}

	producer, err := sarama.NewSyncProducerFromClient(client)
	if err != nil {
		log.Fatalf("unable to create kafka producer: %q", err)
	}
	defer producer.Close()

	text := "hello"
	topic := "1-topic-demo"
	_, _, err = producer.SendMessage(&sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)})
	if err != nil {
		log.Fatalf("unable to produce message: %q", err)
	}
}

例,Golang 多进程处理多个 partition

func singlePartition() {

	if len(os.Args)<3 {
		fmt.Println("usage kafka-ser {topic} {partition-id}")
		return
	}

	topic := os.Args[1]
	partition,err := strconv.Atoi(os.Args[2])
	if err != nil {
		panic(err)
	}

	// 根据给定的代理地址和配置创建一个消费者
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		panic(err)
	}

	pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
	defer pc.AsyncClose()
	wg.Add(1)
	go func(sarama.PartitionConsumer) {
		defer wg.Done()
		//Messages()该方法返回一个消费消息类型的只读通道,由代理产生
		for msg := range pc.Messages() {
			fmt.Printf("%s---Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Topic,msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
		}
	}(pc)

	wg.Wait()
	consumer.Close()
}

开启两个终端,设置分别监听不同的 partition
开启一个 producer ,输入从1到9,观察输出
结论:没有重复消费,不能保证处理顺序
适用于单一任务不能重复执行,但是可以让多个机器处理,做到并发处理多个任务
适用于异步通信(发短信,发邮件,发推送),流量控制,削峰

$ ./kafka-ser 2-topic-demo 0
2-topic-demo---Partition:0, Offset:9, Key:, Value:2
2-topic-demo---Partition:0, Offset:10, Key:, Value:4
2-topic-demo---Partition:0, Offset:11, Key:, Value:6
2-topic-demo---Partition:0, Offset:12, Key:, Value:8

$ ./kafka-ser 2-topic-demo 1
2-topic-demo---Partition:1, Offset:9, Key:, Value:1
2-topic-demo---Partition:1, Offset:10, Key:, Value:3
2-topic-demo---Partition:1, Offset:11, Key:, Value:5
2-topic-demo---Partition:1, Offset:12, Key:, Value:7
2-topic-demo---Partition:1, Offset:13, Key:, Value:9

例,Golang 单进程多Go程处理多个 partition

// 单进程多Go程处理topic,
func multiPartition() {

	if len(os.Args)<2 {
		fmt.Println("usage kafka-ser {topic}")
		return
	}

	topic := os.Args[1]

	// 根据给定的代理地址和配置创建一个消费者
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		panic(err)
	}

	//Partitions(topic):该方法返回了该topic的所有分区id
	partitionList, err := consumer.Partitions(topic)
	if err != nil {
		panic(err)
	}

	for partition := range partitionList {
		//ConsumePartition方法根据主题,分区和给定的偏移量创建创建了相应的分区消费者
		//如果该分区消费者已经消费了该信息将会返回error
		//sarama.OffsetNewest:表明了为最新消息
		pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
		if err != nil {
			panic(err)
		}
		defer pc.AsyncClose()
		wg.Add(1)
		go func(sarama.PartitionConsumer) {
			defer wg.Done()
			//Messages()该方法返回一个消费消息类型的只读通道,由代理产生
			for msg := range pc.Messages() {
				fmt.Printf("%s---Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Topic,msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
			}
		}(pc)
	}
	wg.Wait()
	consumer.Close()
}

这次同样是订阅有两个 partition 的 topic
不同的是,我们在一个进程中设置了两个两个协程同时处理两个 partition
结论:没有重复消费,不能保证处理顺序
适用于单一任务不能重复执行,但是可以让多个机器处理,做到并发处理多个任务
适用于异步通信(发短信,发邮件,发推送),流量控制,削峰

$ ./kafka-ser 2-topic-demo
2-topic-demo---Partition:0, Offset:13, Key:, Value:1
2-topic-demo---Partition:0, Offset:14, Key:, Value:3
2-topic-demo---Partition:1, Offset:14, Key:, Value:2
2-topic-demo---Partition:1, Offset:15, Key:, Value:4
2-topic-demo---Partition:0, Offset:15, Key:, Value:5
2-topic-demo---Partition:0, Offset:16, Key:, Value:7
2-topic-demo---Partition:1, Offset:16, Key:, Value:6
2-topic-demo---Partition:1, Offset:17, Key:, Value:8
2-topic-demo---Partition:0, Offset:17, Key:, Value:9

这次订阅只有一个 partition 的 topic
结论:没有重复消费,保证处理顺序
适用于严格有序的数据,比如排队系统,抢购系统

leodeMacBook-Pro:kafka-ser leo$ ./kafka-ser 1-topic-demo
1-topic-demo---Partition:0, Offset:10, Key:, Value:1
1-topic-demo---Partition:0, Offset:11, Key:, Value:2
1-topic-demo---Partition:0, Offset:12, Key:, Value:3
1-topic-demo---Partition:0, Offset:13, Key:, Value:4
1-topic-demo---Partition:0, Offset:14, Key:, Value:5
1-topic-demo---Partition:0, Offset:15, Key:, Value:6
1-topic-demo---Partition:0, Offset:16, Key:, Value:7
1-topic-demo---Partition:0, Offset:17, Key:, Value:8
1-topic-demo---Partition:0, Offset:18, Key:, Value:9

PHP 使用 Kafka

安装 php-kafka 扩展 :
安装依赖 librdkafka
Mac OS 可以直接: brew install librdkafka

安装:

$ /usr/local/php/bin/phpize
$ ./configure --with-php-config=/usr/local/php/bin/php-config
$ make
$ sudo make install

安装后记得修改 php.ini 将扩展加进去

代码示例:

<?php

class KafkaDemo {
    public static function consumer() {
        // 生产者
        $objRdKafka = new RdKafka\Consumer();
        $objRdKafka->setLogLevel(LOG_DEBUG);
        $objRdKafka->addBrokers("localhost:9092");

        $oObjTopic = $objRdKafka->newTopic("1-topic-demo");

        /**
         * consumeStart
         *   第一个参数标识分区,生产者是往分区0发送的消息,这里也从分区0拉取消息
         *   第二个参数标识从什么位置开始拉取消息,可选值为
         *     RD_KAFKA_OFFSET_BEGINNING : 从开始拉取消息
         *     RD_KAFKA_OFFSET_END : 从当前位置开始拉取消息
         *     RD_KAFKA_OFFSET_STORED : 猜测跟RD_KAFKA_OFFSET_END一样
         */
        $oObjTopic->consumeStart(0, RD_KAFKA_OFFSET_END);

        while (true) {
            // 第一个参数是分区,第二个参数是超时时间
            $oMsg = $oObjTopic->consume(0, 1000);

            // 没拉取到消息时,返回NULL
            if (!$oMsg) {
                usleep(10000);
                continue;
            }

            if ($oMsg->err) {
                echo $msg->errstr(), "\n";
                break;
            } else {
                echo $oMsg->payload, "\n";
            }
        }
    }

    public static function producer2() {
        // 这个设置了 debug = all 会打印更详细的调用信息

        $conf = new RdKafka\Conf();
        $conf->set('log_level', (string) LOG_DEBUG);
        $conf->set('debug', 'all');
        $rk = new RdKafka\Producer($conf);
        $rk->addBrokers("127.0.0.1:9092");

        $topic = $rk->newTopic("1-topic-demo");
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");

        sleep(3); // kafka 是异步的这里让程序暂停,等待消息送达并回传
        echo "done\n";
    }


    public static function producer() {
        // 以阻塞的方式从终端读取输入,并发送到消息队列

        $objRdKafka = new RdKafka\Producer();
        $objRdKafka->setLogLevel(LOG_DEBUG);
        $objRdKafka->addBrokers("localhost:9092");

        $oObjTopic = $objRdKafka->newTopic("1-topic-demo");

        // 从终端接收输入 
        $oInputHandler = fopen('php://stdin', 'r');

        while (true) {
            echo "\nEnter  messages:\n";
            $sMsg = trim(fgets($oInputHandler));

            // 空消息意味着退出
            if (empty($sMsg)) {
                break;
            }

            // 发送消息
            $oObjTopic->produce(RD_KAFKA_PARTITION_UA, 0, $sMsg);
        }

        echo "done\n";
    }

}

$act = $argv[1] ;

KafkaDemo::$act();

运行方法:
开启消费者
php ./kafkademo.php consumer
开启持续生产者
php ./kafkademo.php producer
单次生产,查看详细Debug日志 php ./kafkademo.php producer2

多 broker 集群

http://kafka.apache.org/quickstart#quickstart_multibroker

RocketMQ

官方文档

从官方网站下载最新版 RocketMQ 二进制可执行文件,并进入

linux(mac) 运行:

  1. 开启名字服务
    sh bin/mqnamesrv

  2. 开启单机 broker
    sh bin/mqbroker -n localhost:9876

简单测试

运行发送、接收消息的示例之前,需要为 client 提供名字服务器地址
RocketMQ 提供多种方式为 client 提供名字服务器地址
这里我们通过设置环境变量的方式实现

export NAMESRV_ADDR=localhost:9876

接收消息

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

发送消息

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

NSQ


阅读:82
搜索
  • Linux 高性能网络编程库 Libevent 简介和示例 1927
  • Mac系统编译PHP7【20190929更新】 1796
  • Windows 安装Swoole 1558
  • Hadoop 高可用集群搭建 (Hadoop HA) 1472
  • Hadoop 高可用YARN 配置 1402
  • 小白鼠问题 1324
  • Hadoop Map Reduce 案例:好友推荐 1267
  • 自动化测试工具 Selenium 1131
  • GIT 分支管理 1064
  • Golang 使用 Grpc 985
简介
不定期分享软件开发经验,生活经验