kaisawind's blog
  • 关于
  • 所有帖子

go-nsq简单教程 - Tue, Jul 7, 2020

go-nsq简单教程

1. 概述

go-nsq是nsq的GO语言客户端。

2. 示例

2.1 单nsqd

生产者Producer

package main

import (
	"os"
	"os/signal"
	"time"

	"github.com/nsqio/go-nsq"
	"github.com/sirupsen/logrus"
)

func main() {
	c := make(chan os.Signal)
	signal.Notify(c, os.Interrupt)

	producer, err := nsq.NewProducer("192.168.1.168:4150", nsq.NewConfig())
	if err != nil {
		logrus.WithError(err).Errorln("new producer error")
		return
	}
	defer producer.Stop()
	err = producer.Ping()
	if err != nil {
		logrus.WithError(err).Errorln("producer ping err")
		return
	}
Loop:
	for {
		select {
		case <-c:
			break Loop
		case <-time.Tick(1 * time.Second):
			err = producer.Publish("test", []byte("hello world"))
			if err != nil {
				logrus.WithError(err).Errorln("producer publish error")
				break Loop
			}
			logrus.Infoln("publish ok")
		}
	}
}

消费者Consumer

package main

import (
	"github.com/nsqio/go-nsq"
	"github.com/sirupsen/logrus"
)

// MyHandler consumer handler
type MyHandler struct {
	Title string
}

// HandleMessage interface for consumer
func (h *MyHandler) HandleMessage(message *nsq.Message) (err error) {
	logrus.Infoln(message)
	return
}

func main() {
	consumer, err := nsq.NewConsumer("test", "consumer1", nsq.NewConfig())
	if err != nil {
		logrus.WithError(err).Errorln("new consumer error")
		return
	}
	defer consumer.Stop()
	consumer.AddHandler(&MyHandler{Title: "test consumer"})
	err = consumer.ConnectToNSQD("192.168.1.168:4150")
	if err != nil {
		logrus.WithError(err).Errorln("connect to nsq lookup error")
		return
	}
	select {}
}

2.2 单nsqlookupd

生产者Producer(没变化)

package main

import (
	"os"
	"os/signal"
	"time"

	"github.com/nsqio/go-nsq"
	"github.com/sirupsen/logrus"
)

func main() {
	c := make(chan os.Signal)
	signal.Notify(c, os.Interrupt)

	producer, err := nsq.NewProducer("192.168.1.168:4150", nsq.NewConfig())
	if err != nil {
		logrus.WithError(err).Errorln("new producer error")
		return
	}
	defer producer.Stop()
	err = producer.Ping()
	if err != nil {
		logrus.WithError(err).Errorln("producer ping err")
		return
	}
Loop:
	for {
		select {
		case <-c:
			break Loop
		case <-time.Tick(1 * time.Second):
			err = producer.Publish("test", []byte("hello world"))
			if err != nil {
				logrus.WithError(err).Errorln("producer publish error")
				break Loop
			}
			logrus.Infoln("publish ok")
		}
	}
}

消费者Consumer

package main

import (
	"github.com/nsqio/go-nsq"
	"github.com/sirupsen/logrus"
)

// MyHandler consumer handler
type MyHandler struct {
	Title string
}

// HandleMessage interface for consumer
func (h *MyHandler) HandleMessage(message *nsq.Message) (err error) {
	logrus.Infoln(message)
	return
}

func main() {
	consumer, err := nsq.NewConsumer("test", "consumer1", nsq.NewConfig())
	if err != nil {
		logrus.WithError(err).Errorln("new consumer error")
		return
	}
	defer consumer.Stop()
	consumer.AddHandler(&MyHandler{Title: "test consumer"})
	err = consumer.ConnectToNSQLookupd("192.168.1.168:4161") // different
	if err != nil {
		logrus.WithError(err).Errorln("connect to nsq lookup error")
		return
	}
	select {}
}


辽ICP备2021007608号 | © 2025 | kaisawind

Facebook Twitter GitHub

▄