python浮点数网络字节序编码

2021-02-22  本文已影响0人  help_youself

 使用 Python 的 struct 模块,将浮点数编码成网络字节序(大端序),便于在网络上进行数据交换。
 byte_codec.py

import struct
# Type tags: each encoded element starts with one of these single-byte
# markers, which tells the decoder how to read the bytes that follow.
INT8_TYPE = 0x01    # unsigned integer, 1 payload byte
INT16_TYPE = 0x02   # unsigned integer, 2 payload bytes, big-endian
INT32_TYPE = 0x03   # unsigned integer, 4 payload bytes, big-endian
FLOAT_TYPE = 0x04   # 32-bit float, big-endian
DOUBLE_TYPE = 0x05  # 64-bit float, big-endian
STRING_TYPE = 0x06  # varint length prefix followed by UTF-8 bytes
def varint_encode(number):
    """Encode a non-negative integer as a base-128 varint.

    The value is emitted least-significant 7-bit group first. Every byte
    except the last has its top bit (0x80) set to signal that more bytes
    follow.

    Args:
        number: the non-negative integer to encode.

    Returns:
        The encoded value as ``bytes`` (at least one byte, even for 0).

    Raises:
        ValueError: if ``number`` is negative. Floor division of a negative
            value never reaches zero, so encoding would never terminate.
    """
    if number < 0:
        raise ValueError("varint_encode requires a non-negative integer")
    out = bytearray()
    while True:
        number, low7 = divmod(number, 128)
        if number > 0:
            # More groups follow: flag this byte with the continuation bit.
            out.append(low7 | 0x80)
        else:
            out.append(low7)
            return bytes(out)
def varint_decode(buffer):
    """Decode one base-128 varint from the front of ``buffer``.

    Bytes are consumed until one without the continuation bit (0x80) is
    found; each byte contributes its low 7 bits, least-significant group
    first.

    Args:
        buffer: a bytes-like object that starts with a varint.

    Returns:
        A tuple ``(number, remain)`` where ``number`` is the decoded integer
        and ``remain`` is the unconsumed tail of ``buffer`` as ``bytes``
        (empty when nothing is left).

    Raises:
        ValueError: if ``buffer`` ends before a terminating byte is seen
            (including an empty buffer), instead of returning a partial
            value.
    """
    number = 0
    scale = 1
    for pos, byte in enumerate(buffer):
        number += (byte & 0x7F) * scale
        if not byte & 0x80:
            # Terminating byte reached; everything after it is left over.
            return number, bytes(buffer[pos + 1:])
        scale *= 128
    raise ValueError("truncated varint: no terminating byte found")
def peek_int_encode_len(data):
    """Return how many bytes ``encode_int`` would produce for ``data``.

    The result counts the one-byte type tag plus the payload: 2 for values
    up to 255, 3 for values up to 65535 and 5 for values up to 4294967295.

    Raises:
        Exception: if ``data`` is negative or larger than 4294967295.
    """
    if data < 0:
        raise Exception("data less zero")
    # (inclusive upper bound, total encoded size in bytes)
    size_table = ((255, 2), (65535, 3), (4294967295, 5))
    for upper_bound, size in size_table:
        if data <= upper_bound:
            return size
    raise Exception("out of range")
def encode_int(data):
    """Encode a non-negative integer using the smallest fitting width.

    The output is a one-byte type tag (INT8_TYPE, INT16_TYPE or INT32_TYPE)
    followed by the value as an unsigned big-endian integer of 1, 2 or 4
    bytes respectively.

    Raises:
        Exception: if ``data`` is negative or larger than 4294967295.
    """
    if data < 0:
        raise Exception("data less zero")
    if data <= 255:
        return struct.pack("!BB", INT8_TYPE, data)
    if data <= 65535:
        return struct.pack("!BH", INT16_TYPE, data)
    if data <= 4294967295:
        return struct.pack("!BI", INT32_TYPE, data)
    raise Exception("out of range")
def encode_float(data):
    """Encode ``data`` as a FLOAT_TYPE tag byte plus a big-endian 32-bit float."""
    # "!" selects network byte order without padding, so this is 1 + 4 bytes.
    return struct.pack("!Bf", FLOAT_TYPE, data)
def encode_double(data):
    """Encode ``data`` as a DOUBLE_TYPE tag byte plus a big-endian 64-bit float."""
    # "!" selects network byte order without padding, so this is 1 + 8 bytes.
    return struct.pack("!Bd", DOUBLE_TYPE, data)
def encode_string(content):
    """Encode a text string as tag byte + varint length + UTF-8 bytes.

    Args:
        content: the ``str`` to encode.

    Returns:
        ``bytes`` consisting of the STRING_TYPE tag, the payload length as a
        varint, and the UTF-8 encoded payload.
    """
    payload = bytes(content, 'utf-8')
    # The length prefix must count encoded bytes, not characters: for
    # non-ASCII text the UTF-8 form is longer than len(content), and sizing
    # the payload by character count would truncate it.
    return struct.pack("B", STRING_TYPE) + varint_encode(len(payload)) + payload
def decode_one_element(buffer):
    """Decode the single tagged element at the front of ``buffer``.

    The first byte is the type tag. Fixed-width tags (INT8/INT16/INT32/
    FLOAT/DOUBLE) are followed by a big-endian payload; STRING_TYPE is
    followed by a varint length and that many payload bytes.

    Args:
        buffer: bytes-like object beginning with an encoded element.

    Returns:
        A tuple ``(value, remain)``. ``value`` is an ``int`` or ``float`` for
        numeric tags and the raw payload bytes (not decoded to ``str``) for
        strings. ``remain`` is whatever follows the element (empty when the
        buffer is exhausted).

    Raises:
        ValueError: if ``buffer`` is empty, the tag is unknown, or the
            buffer is too short for the element it announces.
    """
    if len(buffer) == 0:
        raise ValueError("cannot decode an element from an empty buffer")
    tag = buffer[0]

    if tag == STRING_TYPE:
        size, rest = varint_decode(buffer[1:])
        if size > len(rest):
            raise ValueError("truncated string payload")
        return rest[:size], rest[size:]

    # Fixed-width payloads, all in network (big-endian) byte order.
    fixed_formats = {
        INT8_TYPE: "!B",
        INT16_TYPE: "!H",
        INT32_TYPE: "!I",
        FLOAT_TYPE: "!f",
        DOUBLE_TYPE: "!d",
    }
    fmt = fixed_formats.get(tag)
    if fmt is None:
        # Fail loudly rather than returning None, which would only surface
        # later as an unpacking error in the caller.
        raise ValueError("unknown type tag: %d" % tag)
    end = 1 + struct.calcsize(fmt)
    if len(buffer) < end:
        raise ValueError("truncated element payload")
    value, = struct.unpack_from(fmt, buffer, 1)
    return value, buffer[end:]

 py_codec.py

import struct
import byte_codec as bc
def test_data_deserialization(buffer):
    """Decode every element in ``buffer`` in order and print each value."""
    pending = buffer
    while len(pending) != 0:
        value, pending = bc.decode_one_element(pending)
        print(value)
def test_data_serialization():
    """Build a sample packet: varint body length followed by four elements.

    The body holds, in order, an integer (1234), a double (456.123456), a
    string ("1234567890") and another integer (12).
    """
    body = b"".join((
        bc.encode_int(1234),
        bc.encode_double(456.123456),
        bc.encode_string("1234567890"),
        bc.encode_int(12),
    ))
    # Prefix the body with its own length so a receiver can frame it.
    return bc.varint_encode(len(body)) + body
if __name__ == '__main__':
    # Hex dumps of a big-endian 32-bit float and a big-endian 64-bit double.
    print(struct.pack(">f", 1234.45).hex())
    print(struct.pack(">d", 4321.391).hex())

    # A 12-byte packet: a 4-byte big-endian float followed by 8 more bytes
    # that are then interpreted as a big-endian double.
    packet = b'\x44\x9a\x4e\x66\x40\xb0\xe1\x64\x18\x93\x74\xbc'
    tail_len = len(packet) - 4
    head, tail = struct.unpack(">f" + str(tail_len) + "s", packet)
    print(type(head))
    # Decode the double twice: from the unpacked tail and from a plain slice.
    tail_value, = struct.unpack(">d", tail)
    print(tail_value)
    sliced_value, = struct.unpack(">d", packet[4:])
    print(sliced_value)

    # Round trip through byte_codec: serialize, strip the varint length
    # prefix, then decode and print every element.
    encoded = test_data_serialization()
    print(encoded.hex())
    _, elements = bc.varint_decode(encoded)
    test_data_deserialization(elements)

 我也实现了 C++ 版本,便于交叉验证。
pack.cpp

//https://www.cnblogs.com/luxiaoxun/archive/2012/09/05/2671697.html
#include <iostream>
#include <stdio.h>
#include <memory.h>
#include <stdint.h>
// Host <-> network (big-endian) byte-order conversion helpers.
// A byte swap is only correct on little-endian hosts; on a big-endian host
// the value is already in network order and must be returned unchanged.
#if defined(__BYTE_ORDER__) && defined(__ORDER_BIG_ENDIAN__) && (__BYTE_ORDER__ == __ORDER_BIG_ENDIAN__)
static uint16_t HostToNet16(uint16_t x) { return x; }
static uint32_t HostToNet32(uint32_t x) { return x; }
static uint64_t HostToNet64(uint64_t x) { return x; }
#else
static uint16_t HostToNet16(uint16_t x) { return __builtin_bswap16(x); }
static uint32_t HostToNet32(uint32_t x) { return __builtin_bswap32(x); }
static uint64_t HostToNet64(uint64_t x) { return __builtin_bswap64(x); }
#endif
// The conversion is its own inverse, so network-to-host reuses it.
static uint16_t NetToHost16(uint16_t x) { return HostToNet16(x); }
static uint32_t NetToHost32(uint32_t x) { return HostToNet32(x); }
static uint64_t NetToHost64(uint64_t x) { return HostToNet64(x); }
using namespace std;
// Write the float pointed to by v into buffer as 4 bytes in network byte
// order. buffer must have room for at least 4 bytes.
void pack_float(float *v,char *buffer){
    // Use a fixed-width unsigned type (as pack_double does) so the bit
    // pattern never passes through a signed int of platform-dependent size.
    uint32_t bits=0;
    memcpy(&bits,v,sizeof(bits));
    bits=HostToNet32(bits);
    memcpy(buffer,&bits,sizeof(bits));
}
// Read 4 bytes in network byte order from buffer and return them as a float.
float unpack_float(char *buffer){
    // Use a fixed-width unsigned type (as unpack_double does) so the bit
    // pattern never passes through a signed int of platform-dependent size.
    uint32_t bits=0;
    memcpy(&bits,buffer,sizeof(bits));
    bits=NetToHost32(bits);
    float v=0;
    memcpy(&v,&bits,sizeof(bits));
    return v;
}
// Write the double pointed to by v into buffer as 8 bytes in network byte
// order. buffer must have room for at least 8 bytes.
void pack_double(double *v,char*buffer){
    uint64_t bits=0;
    memcpy(&bits,v,sizeof(bits));
    const uint64_t wire=HostToNet64(bits);
    memcpy(buffer,&wire,sizeof(wire));
}
// Read 8 bytes in network byte order from buffer and return them as a double.
double unpack_double(char *buffer){
    uint64_t wire=0;
    memcpy(&wire,buffer,sizeof(wire));
    const uint64_t bits=NetToHost64(wire);
    double result=0;
    memcpy(&result,&bits,sizeof(bits));
    return result;
}
// Print the network-order bytes of 0x12345678 as hex, followed by a newline.
void test_int(){
    uint32_t a=0x12345678;
    uint32_t b=HostToNet32(a);
    const unsigned char *p=(const unsigned char*)(&b);
    for(int i=0;i<4;i++){
        // Zero-pad to two digits so every byte occupies a fixed width;
        // without padding, bytes below 0x10 make the dump ambiguous.
        printf("%02hhx",p[i]);
    }
    printf("\n");
}
// Round-trip a float through pack_float/unpack_float, print the decoded
// value, then print the 4 encoded bytes as hex.
void test_float_codec(){
    float v=1234.45;
    char buffer[4];
    pack_float(&v,buffer);
    float decode=unpack_float(buffer);
    std::cout<<decode<<std::endl;
    for(int i=0;i<4;i++){
        // Zero-pad to two digits so every byte occupies a fixed width;
        // without padding, bytes below 0x10 make the dump ambiguous.
        printf("%02hhx",(unsigned char)buffer[i]);
    }
    printf("\n");
}
// Round-trip a double through pack_double/unpack_double, print the decoded
// value, then print the 8 encoded bytes as hex.
void test_double_codec(){
    double v=1234.45;
    char buffer[8];
    pack_double(&v,buffer);
    double decode=unpack_double(buffer);
    std::cout<<decode<<std::endl;
    for(int i=0;i<8;i++){
        // Zero-pad to two digits so every byte occupies a fixed width;
        // without padding, bytes below 0x10 make the dump ambiguous.
        printf("%02hhx",(unsigned char)buffer[i]);
    }
    printf("\n");
}
// Entry point: run the float round-trip demo, then the double one.
int main(){
    test_float_codec();
    test_double_codec();
    return 0;
}

上一篇下一篇

猜你喜欢

热点阅读