程序员大数据 爬虫Python AI Sql架构算法设计模式和编程理论

共享内存数据库--设计及实现

2017-03-28  本文已影响535人  BlackChen

共享内存:

共享内存是三种IPC机制中的一种.
共享享内存允许两个或多个进程进程共享同一块内存(这块内存会映射到各个进程自己独立的地址空间).
共享内存会映射到进程的虚拟地址空间,进程对其可以直接访问,避免了数据的复制过程。
因此,共享内存是GNU/Linux现在可用的最快速的IPC机制。

我们利用共享内存可以让多个进程共享的特点,并且访问速度快的优点,设计实现共享内存数据库.

简单理解内存映射.png
具体内存映射关系,请查看内存映射,并且请查找关于内核空间,用户空间,虚拟内存映射相关的文章.

共享内存数据库:

说是数据库,只是一块连续的内存,就像malloc分配的一样,把数据放在内存中,然后根据一定的规则来进行数据的插入,删除,更新.

共享内存数据库结构:

共享内存数据库由三部分组成:

/*key 的结构体*/
typedef struct
{
    int iKeyOffSet;    /* key的偏移量,根据这个偏移量在一块内存中来找到key */
    int iKeyLen;       /* key的长度 */
    int iKeyAttr;      /* key的类型,int 类型 或者 char类型 */
    int iKeySortAttr;  /* 排序的方式,是正序排序,还是倒序排序 */
}TABLE_KEY_DEF;
typedef struct
{
    char sTableName[HEAD_TABLE_NAME_LEN]; /* 表的名称*/
    int  iRecordLen;      /* 每条记录的长度 每条记录都是一个结构体 */
    long lMaxRecordNum;   /* 最大数据条数,根据iRecordLen 和LMaxRecordNum 来开辟初始的内存大小 */
    int  iDoubleMemBufFlag; /*是否创建备库1 创建,0 不创建*/
    int  iCurrentMem;   /*当前指向的内存*/
    int  iKeyNum;         /* key的数量 */
    TABLE_KEY_DEF stuTableKeyBuf[MAX_KEY_NUM];   /* key的定义 */
}TABLE_HEAD_DEF;
typedef struct {
    long lRecordNum;      /* 当前记录条数 */
} MEM_HEAD_DEF;

共享内存数据库结构图:


共享内存结构.png

数据存储在DATA_SPACE中,根据iRecordLen来区分每条数据,根据key的信息,可以找到对应的数据.

共享内存同步:
我们使用信号量,来进行共享内存之间的同步,并且如果只是访问共享内存,不设锁,只有在更新共享内存,插入共享内存操作的时候,才进行互斥访问.

如何使用共享内存:
我们使用结构体MEMDATABASE来存放共享内存的信息,是使用内存数据库的句柄,通过MEMDATABASE,我们可以操作对应的共享内存.

typedef struct
{
    int iIpcKey;       /* 共享内存和信号量的 key*/
    int iShmId;        /* 共享内存ID*/
    int iSemId;        /* 信号量ID */
    char *pShmArea;    /* 指向共享内存首地址的指针 */
}MEMDATABASE;

进程与共享内存之间关系结构图:


进程与共享内存数据库的关系.png

具体实施步骤:

  1. 进程1,根据IPCKEY创建共享内存数据库,把数据库中的值fetch到共享内存数据库中.
  2. 进程2,根据IPCKEY连接到创建好的内存数据库中,访问内存数据库.操作完成后,进程退出.
  3. 进程3,根据IPCKEY连接到内存数据库,进行数据库访问,操作.....
    .....

共享内存数据库实现细节:
宏定义返回码:

返回码宏定义
int CreateMemDB(MEMDATABASE *pMemDb, char *pTableName, long iIpcKey, long lMaxRecord, int RecordLen, int iDoubleMemBufFlag)
{
  int         iRet;
  int         iMultiple = 1;
  memset((char*)&stuTableHeadBuf, 0, sizeof(TABLE_HEAD_DEF));
  memset((char*)&stuMemHeadBuf, 0, sizeof(MEM_HEAD_DEF));
  if (strlen(pTableName) > HEAD_TABLE_NAME_LEN)
           return T_MDB_ENAMELEN;

  /*TABLE_HEAD_DEF  初始化*/
  strcpy(stuTableHeadBuf.sTableName, pTableName);
  stuTableHeadBuf.iRecordLen = RecordLen;
  stuTableHeadBuf.lMaxRecordNum = lMaxRecord;
  stuTableHeadBuf.iKeyNum = 0;
  stuTableHeadBuf.iDoubleMemBufFlag = iDoubleMemBufFlag;
  stuTableHeadBuf.iCurrentMem = 0;
  
  /*是否创建备库*/
  if (pMemDb == NULL) return T_MDB_ENOMEM;
  if (iDoubleMemBufFlag == 1) iMultiple = 2;

  pMemDb->iIpcKey = iIpcKey;
//根据IPCKEY来申请共享内存数据库
  pMemDb->iShmId = shmget(iIpcKey, 
          sizeof(stuTableHeadBuf) + 
          (stuTableHeadBuf.iRecordLen * stuTableHeadBuf.lMaxRecordNum + 
           sizeof(MEM_HEAD_DEF)) * iMultiple,
          IPC_CREAT|MDB_IPC_MODE);
  if (pMemDb->iShmId < 0) {
      return T_MDB_ESHMGET;
  }   
  //获取共享内存的首地址
  pMemDb->pShmArea = (char*)shmat(pMemDb->iShmId, NULL, 0); 
  if ((long)pMemDb->pShmArea == - 1) {
      return T_MDB_ESHMAT;
  }
  memcpy(pMemDb->pShmArea, (char*)&stuTableHeadBuf, sizeof(stuTableHeadBuf));

  stuMemHeadBuf.lRecordNum = 0;
  memcpy(pMemDb->pShmArea + sizeof(stuTableHeadBuf), &stuMemHeadBuf, sizeof(stuMemHeadBuf)); 
  if (iDoubleMemBufFlag == 1)  
      memcpy(pMemDb->pShmArea + stuTableHeadBuf.iRecordLen * stuTableHeadBuf.lMaxRecordNum
              + sizeof(stuTableHeadBuf) + sizeof(stuMemHeadBuf), (char*)&stuMemHeadBuf, sizeof(stuMemHeadBuf));
  //设置信号量
  pMemDb->iSemId = semget(iIpcKey, 1, IPC_CREAT|MDB_IPC_MODE);
  if (pMemDb->iSemId < 0) {
      printf("CreateMemDB error, errno=[%d]", errno);
      return T_MDB_ESEMGET;
  }
//初始值是0,如果新增一个进程访问,信号量加一
  iRet = semctl(pMemDb->iSemId, 0, SETVAL, 0);
  if (iRet < 0) {
      return T_MDB_ESEMCTL;
  }

  return T_SUCCESS;
}
//向备库插入一条数据
int InsertMemDB(MEMDATABASE *pMemDb, char *pInBuffer)
{
    int iRet, iNum, iPosition;
    char *pRecordBuf;
    char *pShmArea;
    if (ShmLock(pMemDb->iSemId)) return T_MDB_ESHMLOCK;
    memset(&stuMemHeadBuf,0x00,sizeof(stuMemHeadBuf));
    GetHead(pMemDb->pShmArea);
    GetUnUseMemHead(pMemDb->pShmArea);
    pShmArea = LocateUnUseShm(pMemDb);
    iRet = SearchRecord(pMemDb, pShmArea, pInBuffer, &iPosition);
    if (iRet == T_SUCCESS) {
        if (ShmUnLock(pMemDb->iSemId)) return T_MDB_ESHMUNLOCK;
        return T_MDB_DUPKEY;
    }

    if (stuMemHeadBuf.lRecordNum >= stuTableHeadBuf.lMaxRecordNum) {
        if (ShmUnLock(pMemDb->iSemId)) return T_MDB_ESHMUNLOCK;
        return T_MDB_LACKSPACE;
    }

    stuMemHeadBuf.lRecordNum++;
    iNum = stuMemHeadBuf.lRecordNum - iPosition;
    if (iNum > 0) {
        memmove(LocateShm(pShmArea, iPosition + 1), LocateShm(pShmArea, iPosition),
                stuTableHeadBuf.iRecordLen * iNum);
    }
    memcpy(LocateShm(pShmArea, iPosition), pInBuffer, stuTableHeadBuf.iRecordLen);
    PutUnUseMemHead(pMemDb->pShmArea);
    if (ShmUnLock(pMemDb->iSemId)) return T_MDB_ESHMUNLOCK;
    return T_SUCCESS;
}
//信号量锁
//只有信号量为0才可以从阻塞状态中恢复,并增加一个信号量的值
int ShmLock(int iSemId)
{   
    int    iRet;
    struct sembuf stuSemBuf[2];
    
    stuSemBuf[0].sem_num = 0;
    stuSemBuf[0].sem_op =  0;
    stuSemBuf[0].sem_flg = 0;
    
    stuSemBuf[1].sem_num = 0;
    stuSemBuf[1].sem_op =  1;
    stuSemBuf[1].sem_flg = SEM_UNDO;
    iRet = semop(iSemId, &stuSemBuf[0], 2);
    return iRet;
}
//解锁,信号量的值减一
int ShmUnLock(int iSemId)
{   
    int iRet;
    struct sembuf stuSemBuf;
   
    stuSemBuf.sem_num = 0;
    stuSemBuf.sem_op = -1;
    stuSemBuf.sem_flg = IPC_NOWAIT|SEM_UNDO;
    iRet = semop(iSemId, &stuSemBuf, 1);
    return iRet;
}

获取结构体某字段偏移量和结构体大小的两个宏定义:

#define FPOS( type, field ) ( (int)&(( type *) 0)-> field )
#define FLEN( type, field ) ( sizeof((( type *) 0)-> field ))

FPOS: 是把0地址强制转换为type * 类型,然后取field的内存偏移地址,转化为int类型,就是field字段对结构体首地址的偏移量.画图:


结构体偏移量计算.png

以上是共享内存数据库中比较重要的事项,理解了共享内存数据库的结构,基本可以想出如何对共享内存数据库进行操作. 详细代码,在github中,并且附有一个测试案例.查看的话最好结合源码查看.本文只是一个梳理的过程.

共享内存数据库的详细信息都存储在共享内存头部的位置,每次操作,通过传入的MEMDATABASE中的共享内存指针,确定内存块位置,然后获取头部位置的信息(iRecordLen,iMaxRecordlen,key等..),通过头部信息来对数据块进行操作.所以理解头部位置存放的信息很重要.剩下的就是对指针的操作.

上一篇下一篇

猜你喜欢

热点阅读