njs使用方法

  1. 1. 物联网模型(用户-产品-设备)
  2. 2. 问题点:
  3. 3. TCP协议(mqtt)
  4. 4. UDP协议(coap)

使用nginx和njs进行负载均衡。

1. 物联网模型(用户-产品-设备)

物联网模型中进行了用户、产品和设备的建模。每个用户包含很多产品,每个产品包含很多设备。每个产品都会有相应的服务。
这样可以保证不同用户的产品和设备互不影响,并能对服务负载进行分担。

njs

2. 问题点:

用户-产品-设备模型将一个服务要做的事情分成了很多服务,但是对外使用时不能暴露不同的端口和域名进行访问。
所以需要有另一个服务进行调度和负载均衡。下面就需要使用nginx和njs进行解决。

3. TCP协议(mqtt)

mqtt是TCP协议。

下面是nginx官方博客介绍怎样对mqtt进行调度:
NGINX Plus for the IoT: Load Balancing MQTT

由于mqtt属于长连接形式的TCP连接,所以只需要在第一次连接时能够进行调度就可以了。

njs日志:

njs中s.log(string)和s.error(string)打出的log都在error_log中,如果需要查看不同等级的log,需要配置error_log的参数。

error_log /var/log/nginx/mqtt_error.log debug;

nginx配置

js_include /etc/nginx/stream_conf.d/steam.js;

js_set     $mqtt_server getMqttServer;

log_format mqtt '$remote_addr [$time_local] $protocol $status $bytes_received '
                '$bytes_sent $upstream_addr $mqtt_server';

server {
    listen 1883;
    preread_buffer_size 1k; # Big enough to read CONNECT packet header
    js_preread parserMqttProductID;
    proxy_pass $mqtt_server;
    proxy_connect_timeout 1s;
    proxy_timeout 1m;

    access_log /var/log/nginx/mqtt_access.log mqtt;
    error_log  /var/log/nginx/mqtt_error.log;
}

njs文件steam.js

///////////////////////////////mqtt///////////////////////////////////////
var client_messages = 1;
var mqttProductID = "";

function getStringLen(s, offset) {
    var len_msb = s.buffer.charCodeAt(offset).toString(16);
    var len_lsb = s.buffer.charCodeAt(offset + 1).toString(16);
    if ( len_lsb.length < 2 ) len_lsb = "0" + len_lsb;
    return parseInt(len_msb + len_lsb, 16);
}

function parserMqttProductID(s) {
    if ( !s.fromUpstream ) {
        if ( s.buffer.toString().length == 0  ) { // Initial calls may contain no data, so
            return s.AGAIN;                       // ask that we get called again
        } else if ( client_messages == 1 ) { // CONNECT is first packet from the client
            // CONNECT packet is 1, using upper 4 bits (00010000 to 00011111)
            var packet_type_flags_byte = s.buffer.charCodeAt(0);
            if ( packet_type_flags_byte >= 16 && packet_type_flags_byte < 32 ) {
                // Calculate remaining length with variable encoding scheme
                var multiplier = 1;
                var remaining_len_val = 0;
                var remaining_len_byte;
                for (var remaining_len_pos = 1; remaining_len_pos < 5; remaining_len_pos++ ) {
                    remaining_len_byte = s.buffer.charCodeAt(remaining_len_pos);
                    if ( remaining_len_byte == 0 ) break; // Stop decoding on 0
                    remaining_len_val += (remaining_len_byte & 127) * multiplier;
                    multiplier *= 128;
                }

                var connect_flags = s.buffer.charCodeAt(remaining_len_pos + 7);
                if((connect_flags & 0x40) == 0 || (connect_flags & 0x80) == 0) {
                    s.log("miss username or password");
                    return s.AGAIN;
                }

                // Extract ClientId based on length defined by 2-byte encoding
                var payload_offset = remaining_len_pos + 10; // Skip fixed header
                payload_offset += 2 + getStringLen(s, payload_offset);

                if(connect_flags & 0x04) {
                    payload_offset += 2 + getStringLen(s, payload_offset);
                    payload_offset += 2 + getStringLen(s, payload_offset);
                }
                var username_len_int = getStringLen(s, payload_offset);
                var username_str = s.buffer.substr(payload_offset + 2, username_len_int);

                var str_arr = username_str.split("&");
                if (str_arr.length > 1) {
                    mqttProductID = str_arr[1];
                } else {
                    s.log("Wrong username format")
                    return s.AGAIN;
                }

            } else {
                s.log("Received unexpected MQTT packet type + flags: " + packet_type_flags_byte.toString());
                return s.AGAIN;
            }
        }
        client_messages++;
    }
    return s.OK;
}

function getMqttServer() {
    return "broker-" + mqttProductID + "." + "default" + ".svc.cluster.local:1883";
}

4. UDP协议(coap)

coap是UDP协议,是http协议的变种。

由于coap协议是短连接协议,所以需要在每次连接时都需要调度。

njs日志:

njs中s.log(string)和s.error(string)打出的log都在error_log中,如果需要查看不同等级的log,需要配置error_log的参数。

error_log /var/log/nginx/mqtt_error.log debug;

nginx配置

js_include /etc/nginx/stream_conf.d/steam.js;

js_set     $coap_server getCoapServer;

log_format coap '$remote_addr [$time_local] $protocol $status $bytes_received '
                '$bytes_sent $upstream_addr $coap_server';

server {
    listen 5683 udp;
    preread_buffer_size 1k; # Big enough to read CONNECT packet header
    js_preread parserCoapProductID;
    proxy_pass $coap_server;
    proxy_connect_timeout 1s;
    proxy_timeout 1m;

    access_log /var/log/nginx/coap_access.log coap;
    error_log  /var/log/nginx/coap_error.log;
}

njs文件steam.js

///////////////////////////////coap///////////////////////////////////////
var coapProductID = "";

function parseChar(c) {
    var hex = '0x' + c.toString('hex');
    return parseInt(hex);
}

function parserCoapProductID(s) {

    s.log("buffer:" + s.buffer.toString('hex'));
    s.log("remoteAddress:" + s.remoteAddress.toString());
    s.log("fromUpstream:" + s.fromUpstream.toString());

    if (s.fromUpstream) {
        return s.OK;
    }

    var data = s.buffer;
    s.log('dataLen:' + data.length.toString());
    if (data.length < 4) {
        s.error("truncated");
        return s.ERROR;
    }

    var data0 = parseChar(data[0]);
    if (data0 >> 6 != 1) {
        s.error("version is error, version need to be 1, but " + (data0 >> 6).toString(10));
        return s.ERROR;
    }

    var tokenLen = data0 & 0x0f;
    s.log("tokenLen:" + tokenLen);
    if (tokenLen > 8) {
        s.error("token length error" + tokenLen.toString());
        return s.ERROR;
    }

    if (data.length < (4 + tokenLen)) {
        s.error("truncated");
        return s.ERROR;
    }

    var b = data.slice(4 + tokenLen, data.length);
    var prev = 0

    var parseExtOpt = function(opt) {
        switch (opt) {
        case 13: // extoptByteCode
            if (b.length < 1) {
                return -1;
            }
            opt = parseChar(b[0]) + 13;
            b = b.slice(1, b.length);
            break;
        case 14: // extoptWordCode
            if (b.length < 2) {
                return -1;
            }
            var b0 = parseChar(b[0])
            var b1 = parseChar(b[1])
            opt = (b1 | (b0 << 8)) + 269;
            b = b.slice(2, b.length);
            break;
        }
        return opt
    }

    while (b.length > 0) {
        var b0 = parseChar(b[0]);
        if (b0 == 0xff) {
            b = b.slice(1, b.length);
            break;
        }

        var delta = b0 >> 4; // b[0] >> 4
        var length = b0 & 0x0f;

        // 0xF0 || 0x0F
        if ((delta == 15) || (length == 15)) {
            return s.ERROR;
        }
        b = b.slice(1, b.length);

        delta = parseExtOpt(delta);
        if (delta == -1) {
            return s.ERROR;
        }
        length = parseExtOpt(length);
        if (length == -1) {
            return s.ERROR;
        }

        if (b.length < length) {
            s.error("truncated");
            return s.ERROR;
        }

        var oid = prev + delta;
        var valueBuf = b.slice(0, length);
        s.log("valueBuf:" + valueBuf.toString('hex'));
        s.log("oid:" + oid.toString())
        if (oid != 11) {
            b = b.slice(length, b.length);
            prev = oid;
            continue;
        }

        if ((valueBuf.length < 0) || (valueBuf.length > 255)) {
            s.error("valueBuf.length:" + valueBuf.length.toString());
            return s.ERROR;
        }
        coapProductID = valueBuf.toString()
        s.log("coapProductID:" + coapProductID);
        return s.OK;
    }
    return s.OK;
}

function getCoapServer() {
    return "broker-" + coapProductID + "." + "default" + ".svc.cluster.local:5683";
}

转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 wind.kaisa@gmail.com

文章标题:njs使用方法

本文作者:kaisawind

发布时间:2019-03-14, 10:48:37

最后更新:2020-11-18, 15:55:44

原始链接:https://kaisawind.gitee.io/2019/03/14/2019-03-14-njsuse/

版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。

目录
×

喜欢就点赞,疼爱就打赏