创作人 Leo
编辑时间 Sun Mar 14,2021 at 14:16
生产者 producer ,消费者 consumer
producer 产出消息,consumer 订阅自己需要的消息通道,producer 将产出的消息发送到对应的消息通道
Why MQ? 异步 单一进程能够以高吞吐量处理单一任务;解耦 大部分业务不确定性,通过消息队列,让业务模型解耦,比如订单生产者队列Q,现在被支付模块A和风控模块B订阅,B在不需要时可以随时取消订阅,如果用户模块C需要,也可以随时订阅Q;削峰 我们在项目初始一般会进行流量预估,比如预估了50wPV,那么我们就需要对超出流量进行限流,主要是为了保证在特殊的高流量情况下(比如1分钱抢购活动)系统的可用性,使用消息队列将请求缓存,然后由consumer分批依次处理
Why Not? 外部依赖造成的系统可用性降低;提高了系统复杂性;一致性问题,如何确保消息被成功并准确消费
点对点模式:消息只推给一个consumer,不会重复消费
发布-订阅:producer广播消息给所有已经订阅的consumerActiveMQ
RabbitMQ
RocketMQ
Kafka
NSQ
Broker 集群的一个节点,broker 保存 topic 数据,如果一个 topic 有多个 partition 则分布在多个 broker 上;假设有3个broker和三个partition,则三个partition与broker一一对应;如果有五个broker,则有两个broker空闲;如果有两个broker,则有一个broker负责两个partition,为了性能应该避免partition数量大于broker数量
Topic 一类消息,消费者订阅目标
Partition 每个 topic 至少有一个 partition,可以将 topic 配置成多个 partition,单个 partition 是有序的,如果 topic 被设置成多个 partition 则无法保证消息顺序;因此,如果消息必须有序处理,则必须设置为1个partition
每个 consumer 订阅不同 Partition 即可实现集群消费,即集群中订阅了 Partition 的节点会收到不同的数据,从而提升高并发情况下的消费速度
订阅了相同 Partition 的 consumer 会收到相同的数据,所以在指定单一 Partition 的情况下,就能够实现所有 consumer 收到全量的消息
启动 Zookeeper
cd ~/Downloads/apache-zookeeper-3.5.6-bin
./bin/zkServer.sh --config ./conf start
启动 Kafka
cd ~/Downloads/kafka_2.12-2.3.0
./bin/kafka-server-start.sh config/server.properties
查看 topic 列表:
./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
创建一个 topic:./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic 1-topic-demo
打开生产者交互终端:./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 1-topic-demo
打开一个消费者测试终端:./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 1-topic-demo --from-beginning
为了后续测试,我们创建两个 topic
单一 partition :./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic 1-topic-demo
两个 partition :./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic 2-topic-demo
我们先写一个生产者供后续程序使用
语言选择是Golang
代码包我们用 sarama
package main
import (
"github.com/Shopify/sarama"
"log"
)
func main() {
config := sarama.NewConfig()
config.Producer.Return.Successes = true
client, err := sarama.NewClient([]string{"localhost:9092"}, config)
if err != nil {
log.Fatalf("unable to create kafka client: %q", err)
}
producer, err := sarama.NewSyncProducerFromClient(client)
if err != nil {
log.Fatalf("unable to create kafka producer: %q", err)
}
defer producer.Close()
text := "hello"
topic := "1-topic-demo"
_, _, err = producer.SendMessage(&sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)})
if err != nil {
log.Fatalf("unable to produce message: %q", err)
}
}
func singlePartition() {
if len(os.Args)<3 {
fmt.Println("usage kafka-ser {topic} {partition-id}")
return
}
topic := os.Args[1]
partition,err := strconv.Atoi(os.Args[2])
if err != nil {
panic(err)
}
// 根据给定的代理地址和配置创建一个消费者
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
panic(err)
}
pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
defer pc.AsyncClose()
wg.Add(1)
go func(sarama.PartitionConsumer) {
defer wg.Done()
//Messages()该方法返回一个消费消息类型的只读通道,由代理产生
for msg := range pc.Messages() {
fmt.Printf("%s---Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Topic,msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
}
}(pc)
wg.Wait()
consumer.Close()
}
开启两个终端,设置分别监听不同的 partition
开启一个 producer ,输入从1到9,观察输出
结论:没有重复消费,不能保证处理顺序
适用于单一任务不能重复执行,但是可以让多个机器处理,做到并发处理多个任务
适用于异步通信(发短信,发邮件,发推送),流量控制,削峰
$ ./kafka-ser 2-topic-demo 0
2-topic-demo---Partition:0, Offset:9, Key:, Value:2
2-topic-demo---Partition:0, Offset:10, Key:, Value:4
2-topic-demo---Partition:0, Offset:11, Key:, Value:6
2-topic-demo---Partition:0, Offset:12, Key:, Value:8
$ ./kafka-ser 2-topic-demo 1
2-topic-demo---Partition:1, Offset:9, Key:, Value:1
2-topic-demo---Partition:1, Offset:10, Key:, Value:3
2-topic-demo---Partition:1, Offset:11, Key:, Value:5
2-topic-demo---Partition:1, Offset:12, Key:, Value:7
2-topic-demo---Partition:1, Offset:13, Key:, Value:9
// 单进程多Go程处理topic,
func multiPartition() {
if len(os.Args)<2 {
fmt.Println("usage kafka-ser {topic}")
return
}
topic := os.Args[1]
// 根据给定的代理地址和配置创建一个消费者
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
panic(err)
}
//Partitions(topic):该方法返回了该topic的所有分区id
partitionList, err := consumer.Partitions(topic)
if err != nil {
panic(err)
}
for partition := range partitionList {
//ConsumePartition方法根据主题,分区和给定的偏移量创建创建了相应的分区消费者
//如果该分区消费者已经消费了该信息将会返回error
//sarama.OffsetNewest:表明了为最新消息
pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
if err != nil {
panic(err)
}
defer pc.AsyncClose()
wg.Add(1)
go func(sarama.PartitionConsumer) {
defer wg.Done()
//Messages()该方法返回一个消费消息类型的只读通道,由代理产生
for msg := range pc.Messages() {
fmt.Printf("%s---Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Topic,msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
}
}(pc)
}
wg.Wait()
consumer.Close()
}
这次同样是订阅有两个 partition 的 topic
不同的是,我们在一个进程中设置了两个两个协程同时处理两个 partition
结论:没有重复消费,不能保证处理顺序
适用于单一任务不能重复执行,但是可以让多个机器处理,做到并发处理多个任务
适用于异步通信(发短信,发邮件,发推送),流量控制,削峰
$ ./kafka-ser 2-topic-demo
2-topic-demo---Partition:0, Offset:13, Key:, Value:1
2-topic-demo---Partition:0, Offset:14, Key:, Value:3
2-topic-demo---Partition:1, Offset:14, Key:, Value:2
2-topic-demo---Partition:1, Offset:15, Key:, Value:4
2-topic-demo---Partition:0, Offset:15, Key:, Value:5
2-topic-demo---Partition:0, Offset:16, Key:, Value:7
2-topic-demo---Partition:1, Offset:16, Key:, Value:6
2-topic-demo---Partition:1, Offset:17, Key:, Value:8
2-topic-demo---Partition:0, Offset:17, Key:, Value:9
这次订阅只有一个 partition 的 topic
结论:没有重复消费,保证处理顺序
适用于严格有序的数据,比如排队系统,抢购系统
leodeMacBook-Pro:kafka-ser leo$ ./kafka-ser 1-topic-demo
1-topic-demo---Partition:0, Offset:10, Key:, Value:1
1-topic-demo---Partition:0, Offset:11, Key:, Value:2
1-topic-demo---Partition:0, Offset:12, Key:, Value:3
1-topic-demo---Partition:0, Offset:13, Key:, Value:4
1-topic-demo---Partition:0, Offset:14, Key:, Value:5
1-topic-demo---Partition:0, Offset:15, Key:, Value:6
1-topic-demo---Partition:0, Offset:16, Key:, Value:7
1-topic-demo---Partition:0, Offset:17, Key:, Value:8
1-topic-demo---Partition:0, Offset:18, Key:, Value:9
消费组用来自动负载均衡多个消费者,比配置消费固定 Partition 实现的消费负载均衡更容易水平扩展
例:golang 消费组使用
func fetchMsgWithGroupTest() {
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"192.168.3.16:9092"},
Topic: "test_topic_1021_2",
//Partition: 0,
GroupID: "test_topic_1021_gid_1", // Partition 和 GroupID 不能同时设置
MinBytes: 1,
MaxBytes: 10e6,
})
//err := reader.SetOffset(1) // panic: unavailable when GroupID is set
//if err != nil {
// panic(err)
//}
for {
msg, err := reader.ReadMessage(context.Background())
// 使用了消费组的情况下需要用 ReadMessage ,因为它会在内部执行 CommitMessages
// broker 需要这个操作来确定是否可以保存已完成的 offset
if err != nil {
panic(err)
}
fmt.Println("msg ", string(msg.Value), " offset ", msg.Offset)
}
}
测试说明:
./bin/kafka-topics.sh --create --bootstrap-server 192.168.3.16:9092 --replication-factor 1 --partitions 2 --topic test_topic_1021_2
./bin/kafka-console-producer.sh --broker-list 192.168.3.16:9092 --topic test_topic_1021_2
安装 php-kafka 扩展 :
安装依赖 librdkafka
Mac OS 可以直接: brew install librdkafka
安装:
$ /usr/local/php/bin/phpize
$ ./configure --with-php-config=/usr/local/php/bin/php-config
$ make
$ sudo make install
安装后记得修改 php.ini 将扩展加进去
代码示例:
<?php
class KafkaDemo {
public static function consumer() {
// 生产者
$objRdKafka = new RdKafka\Consumer();
$objRdKafka->setLogLevel(LOG_DEBUG);
$objRdKafka->addBrokers("localhost:9092");
$oObjTopic = $objRdKafka->newTopic("1-topic-demo");
/**
* consumeStart
* 第一个参数标识分区,生产者是往分区0发送的消息,这里也从分区0拉取消息
* 第二个参数标识从什么位置开始拉取消息,可选值为
* RD_KAFKA_OFFSET_BEGINNING : 从开始拉取消息
* RD_KAFKA_OFFSET_END : 从当前位置开始拉取消息
* RD_KAFKA_OFFSET_STORED : 猜测跟RD_KAFKA_OFFSET_END一样
*/
$oObjTopic->consumeStart(0, RD_KAFKA_OFFSET_END);
while (true) {
// 第一个参数是分区,第二个参数是超时时间
$oMsg = $oObjTopic->consume(0, 1000);
// 没拉取到消息时,返回NULL
if (!$oMsg) {
usleep(10000);
continue;
}
if ($oMsg->err) {
echo $msg->errstr(), "\n";
break;
} else {
echo $oMsg->payload, "\n";
}
}
}
public static function producer2() {
// 这个设置了 debug = all 会打印更详细的调用信息
$conf = new RdKafka\Conf();
$conf->set('log_level', (string) LOG_DEBUG);
$conf->set('debug', 'all');
$rk = new RdKafka\Producer($conf);
$rk->addBrokers("127.0.0.1:9092");
$topic = $rk->newTopic("1-topic-demo");
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");
sleep(3); // kafka 是异步的这里让程序暂停,等待消息送达并回传
echo "done\n";
}
public static function producer() {
// 以阻塞的方式从终端读取输入,并发送到消息队列
$objRdKafka = new RdKafka\Producer();
$objRdKafka->setLogLevel(LOG_DEBUG);
$objRdKafka->addBrokers("localhost:9092");
$oObjTopic = $objRdKafka->newTopic("1-topic-demo");
// 从终端接收输入
$oInputHandler = fopen('php://stdin', 'r');
while (true) {
echo "\nEnter messages:\n";
$sMsg = trim(fgets($oInputHandler));
// 空消息意味着退出
if (empty($sMsg)) {
break;
}
// 发送消息
$oObjTopic->produce(RD_KAFKA_PARTITION_UA, 0, $sMsg);
}
echo "done\n";
}
}
$act = $argv[1] ;
KafkaDemo::$act();
运行方法:
开启消费者
php ./kafkademo.php consumer
开启持续生产者
php ./kafkademo.php producer
单次生产,查看详细Debug日志
php ./kafkademo.php producer2
#include "common.h"
#include <librdkafka/rdkafka.h>
rd_kafka_t *rk;
void initMessageQueue()
{
char errstr[512];
rd_kafka_conf_t *conf = rd_kafka_conf_new();
if (rd_kafka_conf_set(conf, "bootstrap.servers", server->kafkaBroker,
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
{
serverLog(LL_ERROR, "rd_kafka_conf_set failed %s", errstr);
exit(1);
}
rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!rk)
{
serverLog(LL_ERROR, "rd_kafka_conf_set Failed to create new producer: %s", errstr);
exit(1);
}
}
void mqSendMsg(char *topic, char *msg, size_t len, char *key, size_t klen)
{
rd_kafka_resp_err_t err;
rd_kafka_topic_t *rkt;
retry:
err = rd_kafka_producev(
/* Producer handle */
rk,
/* Topic name */
RD_KAFKA_V_TOPIC(topic),
/* Make a copy of the payload. */
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
/* Message value and length */
RD_KAFKA_V_VALUE(msg, len),
/* Per-Message opaque, provided in
* delivery report callback as
* msg_opaque. */
RD_KAFKA_V_OPAQUE(NULL),
/* Message key to split MSG to partitions
* key 可以用来自动分 partition */
RD_KAFKA_V_KEY(key, klen),
/* Unassigned partition
* 需要通过 key 分 partition 的情况下,这里必须设置 RD_KAFKA_PARTITION_UA */
RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA),
/* End sentinel */
RD_KAFKA_V_END);
if (err)
{
serverLog(LL_ERROR, "failed to produce to topic %s: %s", topic, rd_kafka_err2str(err));
if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL)
{
serverLog(LL_INFO, "internal queue is full");
}
// waiting for seconds and retry ; to avoid consume Message from redis MQ or it will lost Message.
rd_kafka_poll(rk, 1000 /*block for max 1000ms*/);
goto retry;
}
else
{
serverLog(LL_DEBUG, "MQ:enqueued message %s", msg);
}
rd_kafka_poll(rk, 0 /*non-blocking*/);
}
从官方网站下载最新版 RocketMQ 二进制可执行文件,并进入
linux(mac) 运行:
开启名字服务
sh bin/mqnamesrv
开启单机 broker
sh bin/mqbroker -n localhost:9876
简单测试
运行发送、接收消息的示例之前,需要为 client 提供名字服务器地址
RocketMQ 提供多种方式为 client 提供名字服务器地址
这里我们通过设置环境变量的方式实现
export NAMESRV_ADDR=localhost:9876
接收消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
发送消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer