pulsar简介
家电维修 2023-07-16 19:17www.caominkang.com家电维修技术
pulsar简介
pulsar是和kafak同类型的消息处理平台,这两年开始走入大众视野。相比于老牌kafka,新秀pulsar带来了一些让我喜欢的新功能
- 多种订阅方式: 独占、灾备、共享、Key_Shared
- 延迟发送: 消息发送到ic后,consumer过一段时间才消费到这条消息
- 易用: 不需要像kafka那样去计算分区、副本数量等,pulsar不够用了直接无脑扩broker
详细的说明就不赘述了,我们来快速使用体验下。
安装在安装前需要有java环境,本次使用的系统是Ubuntu 20.04, java版本是17.02。
本次使用的pulsar版本是2.10.1。
- 获取安装包
$ get https://archive.apache./dist/pulsar/pulsar-2.10.1/apache-pulsar-2.10.1-bin.tar.gz
- 解压
$ tar xvfz apache-pulsar-2.10.1-bin.tar.gz $ cd apache-pulsar-2.10.1
- 启动服务端
$ ./bin/pulsar standalone
看到如下信息就是启动成功了
2022-08-13T21:34:46,652+0800 [orker-scheduler-0] INFO .apache.pulsar.functions.orker.SchedulerManager - Schedule summary - execution time: 0.033926031 sec | total unassigned: 0 | stats: {"Added": 0, "Updated": 0, "removed": 0} { "c-standalone-f-localhost-8080" : { "originalNumAssignments" : 0, "finalNumAssignments" : 0, "instancesAdded" : 0, "instancesRemoved" : 0, "instancesUpdated" : 0, "alive" : true } }
如果想后台启动可以执行如下命令
$ ./bin/pulsar-daemon start standalone使用
- 查看健康状态
$ ./bin/pulsar-admin brokers healthcheck ok
- 发送一个消息
$ ./bin/pulsar-client produce k-ic --messages "hello test"
如果正常发送的话可以看到如下提示
2022-08-13T21:43:50,559+0800 [main] INFO .apache.pulsar.client.cli.PulsarClientTool - 1 messages suessfully produced
- 消费一个消息
$ ./bin/pulsar-client consume j-ic -s "first-subscription"
如果正常发送的话可以看到刚才发的那个消息
----- got message ----- key:[null], properties:[], content:hello test
可以看到整个过程我们不需要预先做其他的设置,不需要手动去处理zookeeper,启动pulsar后就可以直接使用了。
Go使用pulsar官方客户端除了有java之外,还有Python、go、c++、nodejs、C#,还支持ebsocket、以及REST api
来发送消息,可以说是很方便了。
我们来用go试试。
- 准备好相关扩展
创建一个测试目录,并初始化go mod:
$ mkdir test_dir && cd test_dir $ go mod init test_dir $ go mod tidy
安装pulsar客户端:
$ go get -u "github./apache/pulsar-client-go/pulsar"
- 创建生产者
package main import ( "context" "fmt" "github./apache/pulsar-client-go/pulsar" "log" "time" ) func main() { // 创建客户端 client, err := pulsar.NeClient(pulsar.ClientOptions{ URL: "pulsar://localhost:6650", OperationTimeout: 30 time.Second, ConnectionTimeout: 30 time.Second, }) if err != nil { log.Fatalf("Could not instantiate Pulsar client: %v", err) } // 创建生产者 producer, err := client.CreateProducer(pulsar.ProducerOptions{ Topic: "go-ic", }) if err != nil { log.Fatal(err) } defer client.Close() // 每隔3秒发一次消息 num := 0 for { msg := fmt.Sprintf("hello %d", num) _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{ Payload: []byte(msg), }) num += 1 time.Sleep(time.Duration(3) time.Second) } }
- 创建消费者
package main import ( "context" "fmt" "github./apache/pulsar-client-go/pulsar" "log" "time" ) func main() { // 创建客户端 client, err := pulsar.NeClient(pulsar.ClientOptions{ URL: "pulsar://localhost:6650", OperationTimeout: 30 time.Second, ConnectionTimeout: 30 time.Second, }) if err != nil { log.Fatalf("Could not instantiate Pulsar client: %v", err) } // 创建消费者 consumer, err := client.Subscribe(pulsar.ConsumerOptions{ Topic: "go-ic", SubscriptionName: "my-sub", Type: pulsar.Shared, }) if err != nil { log.Fatal(err) } defer consumer.Close() // 消费消息 for { msg, err := consumer.Receive(context.Background()) if err != nil { log.Fatal(err) } fmt.Printf("Received message msgId: %#v -- content: '%s'n", msg.ID(), string(msg.Payload())) consumer.Ack(msg) } }
- 打包测试
$ go build -o consumer consumer.go $ go build -o produce produce.go $ chmod +x consumer produce
启动一个生产者
$ ./produce INFO[0000] [Connecting to broker] remote_addr="pulsar://localhost:6650" INFO[0000] [TCP connection established] local_addr="127.0.0.1:33210" remote_addr="pulsar://localhost:6650" INFO[0000] [Connection is ready] local_addr="127.0.0.1:33210" remote_addr="pulsar://localhost:6650" INFO[0000] [Created producer] x="127.0.0.1:33210 -> 127.0.0.1:6650" producerID=1 producer_name=standalone-2-9 ic="persistent://public/default/go-ic"
再启动一个消费者,获取消息
$ ./consumer INFO[0000] [Connecting to broker] remote_addr="pulsar://localhost:6650" INFO[0000] [TCP connection established] local_addr="127.0.0.1:33212" remote_addr="pulsar://localhost:6650" INFO[0000] [Connection is ready] local_addr="127.0.0.1:33212" remote_addr="pulsar://localhost:6650" INFO[0000] [Connected consumer] consumerID=1 name=xgqua subscription=my-sub ic="persistent://public/default/go-ic" INFO[0000] [Created consumer] consumerID=1 name=xgqua subscription=my-sub ic="persistent://public/default/go-ic" Received message msgId: pulsar.trackingMessageID{messageID:pulsar.messageID{ledgerID:242, entryID:5, batchIdx:0, partitionIdx:0}, tracker:(pulsar.ackTracker)(nil), consumer:(pulsar.partitionConsumer)(0xc000274ea0), receivedTime:time.Time{all:0xc0b60af1958c2114, ext:2499737025, loc:(time.Location)(0x1024400)}} -- content: 'hello 4' Received message msgId: pulsar.trackingMessageID{messageID:pulsar.messageID{ledgerID:242, entryID:6, batchIdx:0, partitionIdx:0}, tracker:(pulsar.ackTracker)(nil), consumer:(pulsar.partitionConsumer)(0xc000274ea0), receivedTime:time.Time{all:0xc0b60af255a034ea, ext:5501052821, loc:(time.Location)(0x1024400)}} -- content: 'hello 5' Received message msgId: pulsar.trackingMessageID{messageID:pulsar.messageID{ledgerID:242, entryID:7, batchIdx:0, partitionIdx:0}, tracker:(pulsar.ackTracker)(nil), consumer:(pulsar.partitionConsumer)(0xc000274ea0), receivedTime:time.Time{all:0xc0b60af315c8fc87, ext:8503725362, loc:(time.Location)(0x1024400)}} -- content: 'hello 6'
今天的介绍就到这里了,感谢阅读。