golang的reflect.Select使用方法

  1. 1. 概述
    1. 1.1 使用select选择channel
    2. 1.2 关闭管道能够触发所有信号
    3. 1.3 非close信号只有一个接收者
    4. 1.4 for range遍历channel
  2. 2. reflect.Select等待多个channel

golang的reflect.Select使用方法

1. 概述

总结一下GO的channel使用方法

1.1 使用select选择channel

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan struct{})
    go func() {
        for {
            select {
            case <-time.Tick(2 * time.Second):
                ch <- struct{}{}
            }
        }
    }()

    for {
        select {
        case <-ch:
            fmt.Println("hello channel")
        }
    }
}

1.2 关闭管道能够触发所有信号

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan struct{})
    quit := make(chan struct{})
    go func() {
        for {
            select {
            case <-time.After(10 * time.Second):
                close(quit)
                fmt.Println("timeout")
                return
            }
        }
    }()
    go func() {
        for {
            select {
            case <-quit:
                fmt.Println("close timer")
                return
            case <-time.Tick(2 * time.Second):
                ch <- struct{}{}
            }
        }
    }()

Loop:
    for {
        select {
        case <-quit:
            fmt.Println("close channel")
            break Loop
        case <-ch:
            fmt.Println("hello channel")
        }
    }
    time.Sleep(2 * time.Second)
}

output:

hello channel
hello channel
hello channel
hello channel
timeout
close channel
close timer

1.3 非close信号只有一个接收者

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan struct{})
    quit := make(chan struct{})
    go func() {
        for {
            select {
            case <-time.After(10 * time.Second):
                quit <- struct{}{}
                fmt.Println("timeout")
                return
            }
        }
    }()
    go func() {
        for {
            select {
            case <-quit:
                fmt.Println("close timer")
                return
            case <-time.Tick(2 * time.Second):
                ch <- struct{}{}
            }
        }
    }()

Loop:
    for {
        select {
        case <-quit:
            fmt.Println("close channel")
            break Loop
        case <-ch:
            fmt.Println("hello channel")
        }
    }
    time.Sleep(2 * time.Second)
}

output:

hello channel
hello channel
hello channel
hello channel
close channel true
timeout

1.4 for range遍历channel

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan struct{})
    quit := make(chan struct{})
    go func() {
        for range time.After(10 * time.Second) {
            close(quit)
            fmt.Println("timeout")
            return
        }
    }()
    go func() {
        for {
            select {
            case <-quit:
                fmt.Println("close timer")
                return
            case <-time.Tick(2 * time.Second):
                ch <- struct{}{}
            }
        }
    }()

Loop:
    for {
        select {
        case <-quit:
            fmt.Println("close channel")
            break Loop
        case <-ch:
            fmt.Println("hello channel")
        }
    }
    time.Sleep(2 * time.Second)
}

output:

hello channel
hello channel
hello channel
hello channel
timeout
close channel
close timer

2. reflect.Select等待多个channel

当channel是动态增减的时候,有两种方法可以消费channel的数据。

  1. 每个channel单独启动一个协程,用于消费数据
  2. 使用reflect.Select批量接收数据进行消费
// Handle 连接处理
func (s *Server) Handle() (err error) {
    // ...
    s.cases = append(s.cases, reflect.SelectCase{
        Dir:  reflect.SelectRecv,
        Chan: reflect.ValueOf(ss.PubChan()),
    })
    s.delivery <- struct{}{}
    // ...
}

// Delivery 分发消息
func (s *Server) Delivery() (err error) {
    for {
        chosen, value, ok := reflect.Select(s.cases)
        logrus.Debugln("delivery", chosen, ok, value)
        switch chosen {
        case 0:
            // quit
            return
        case 1:
            // cases changed
            if !ok {
                return
            }
            logrus.Infoln("cases count", len(s.cases))
        default:
            if !ok {
                s.cases = append(s.cases[:chosen], s.cases[chosen+1:]...)
                logrus.Infoln("cases count", len(s.cases))
                continue
            }
            p, ok := value.Interface().(*packets.PublishPacket)
            if !ok {
                continue
            }
            for clientID, session := range s.sessions {
                for topic := range session.Topics {
                    if p.TopicName == topic {
                        logrus.Debugf("send %s %v", clientID, p)
                        _ = session.Send(p)
                        break
                    }
                }
            }
        }
    }
}

转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 wind.kaisa@gmail.com

💰

×

Help us with donation