共享内存数据库--设计及实现
共享内存:
共享内存是三种IPC机制中的一种.
共享享内存允许两个或多个进程进程共享同一块内存(这块内存会映射到各个进程自己独立的地址空间).
共享内存会映射到进程的虚拟地址空间,进程对其可以直接访问,避免了数据的复制过程。
因此,共享内存是GNU/Linux现在可用的最快速的IPC机制。
我们利用共享内存可以让多个进程共享的特点,并且访问速度快的优点,设计实现共享内存数据库.
简单理解内存映射.png具体内存映射关系,请查看内存映射,并且请查找关于内核空间,用户空间,虚拟内存映射相关的文章.
共享内存数据库:
说是数据库,只是一块连续的内存,就像malloc分配的一样,把数据放在内存中,然后根据一定的规则来进行数据的插入,删除,更新.
共享内存数据库结构:
共享内存数据库由三部分组成:
- TABLE_HEAD_DEF --内存表头,记录当前内存中的详细信息
/*key 的结构体*/
typedef struct
{
int iKeyOffSet; /* key的偏移量,根据这个偏移量在一块内存中来找到key */
int iKeyLen; /* key的长度 */
int iKeyAttr; /* key的类型,int 类型 或者 char类型 */
int iKeySortAttr; /* 排序的方式,是正序排序,还是倒序排序 */
}TABLE_KEY_DEF;
typedef struct
{
char sTableName[HEAD_TABLE_NAME_LEN]; /* 表的名称*/
int iRecordLen; /* 每条记录的长度 每条记录都是一个结构体 */
long lMaxRecordNum; /* 最大数据条数,根据iRecordLen 和LMaxRecordNum 来开辟初始的内存大小 */
int iDoubleMemBufFlag; /*是否创建备库1 创建,0 不创建*/
int iCurrentMem; /*当前指向的内存*/
int iKeyNum; /* key的数量 */
TABLE_KEY_DEF stuTableKeyBuf[MAX_KEY_NUM]; /* key的定义 */
}TABLE_HEAD_DEF;
- MEM_HEAD_DEF --内存头--记录当前空间的记录条数
typedef struct {
long lRecordNum; /* 当前记录条数 */
} MEM_HEAD_DEF;
- DATA_SPACE --数据存储区
共享内存数据库结构图:
共享内存结构.png
数据存储在DATA_SPACE中,根据iRecordLen来区分每条数据,根据key的信息,可以找到对应的数据.
共享内存同步:
我们使用信号量,来进行共享内存之间的同步,并且如果只是访问共享内存,不设锁,只有在更新共享内存,插入共享内存操作的时候,才进行互斥访问.
如何使用共享内存:
我们使用结构体MEMDATABASE
来存放共享内存的信息,是使用内存数据库的句柄,通过MEMDATABASE,我们可以操作对应的共享内存.
typedef struct
{
int iIpcKey; /* 共享内存和信号量的 key*/
int iShmId; /* 共享内存ID*/
int iSemId; /* 信号量ID */
char *pShmArea; /* 指向共享内存首地址的指针 */
}MEMDATABASE;
进程与共享内存之间关系结构图:
进程与共享内存数据库的关系.png
具体实施步骤:
- 进程1,根据IPCKEY创建共享内存数据库,把数据库中的值fetch到共享内存数据库中.
- 进程2,根据IPCKEY连接到创建好的内存数据库中,访问内存数据库.操作完成后,进程退出.
- 进程3,根据IPCKEY连接到内存数据库,进行数据库访问,操作.....
.....
共享内存数据库实现细节:
宏定义返回码:
- 创建数据库:
int CreateMemDB(MEMDATABASE *pMemDb, char *pTableName, long iIpcKey, long lMaxRecord, int RecordLen, int iDoubleMemBufFlag)
{
int iRet;
int iMultiple = 1;
memset((char*)&stuTableHeadBuf, 0, sizeof(TABLE_HEAD_DEF));
memset((char*)&stuMemHeadBuf, 0, sizeof(MEM_HEAD_DEF));
if (strlen(pTableName) > HEAD_TABLE_NAME_LEN)
return T_MDB_ENAMELEN;
/*TABLE_HEAD_DEF 初始化*/
strcpy(stuTableHeadBuf.sTableName, pTableName);
stuTableHeadBuf.iRecordLen = RecordLen;
stuTableHeadBuf.lMaxRecordNum = lMaxRecord;
stuTableHeadBuf.iKeyNum = 0;
stuTableHeadBuf.iDoubleMemBufFlag = iDoubleMemBufFlag;
stuTableHeadBuf.iCurrentMem = 0;
/*是否创建备库*/
if (pMemDb == NULL) return T_MDB_ENOMEM;
if (iDoubleMemBufFlag == 1) iMultiple = 2;
pMemDb->iIpcKey = iIpcKey;
//根据IPCKEY来申请共享内存数据库
pMemDb->iShmId = shmget(iIpcKey,
sizeof(stuTableHeadBuf) +
(stuTableHeadBuf.iRecordLen * stuTableHeadBuf.lMaxRecordNum +
sizeof(MEM_HEAD_DEF)) * iMultiple,
IPC_CREAT|MDB_IPC_MODE);
if (pMemDb->iShmId < 0) {
return T_MDB_ESHMGET;
}
//获取共享内存的首地址
pMemDb->pShmArea = (char*)shmat(pMemDb->iShmId, NULL, 0);
if ((long)pMemDb->pShmArea == - 1) {
return T_MDB_ESHMAT;
}
memcpy(pMemDb->pShmArea, (char*)&stuTableHeadBuf, sizeof(stuTableHeadBuf));
stuMemHeadBuf.lRecordNum = 0;
memcpy(pMemDb->pShmArea + sizeof(stuTableHeadBuf), &stuMemHeadBuf, sizeof(stuMemHeadBuf));
if (iDoubleMemBufFlag == 1)
memcpy(pMemDb->pShmArea + stuTableHeadBuf.iRecordLen * stuTableHeadBuf.lMaxRecordNum
+ sizeof(stuTableHeadBuf) + sizeof(stuMemHeadBuf), (char*)&stuMemHeadBuf, sizeof(stuMemHeadBuf));
//设置信号量
pMemDb->iSemId = semget(iIpcKey, 1, IPC_CREAT|MDB_IPC_MODE);
if (pMemDb->iSemId < 0) {
printf("CreateMemDB error, errno=[%d]", errno);
return T_MDB_ESEMGET;
}
//初始值是0,如果新增一个进程访问,信号量加一
iRet = semctl(pMemDb->iSemId, 0, SETVAL, 0);
if (iRet < 0) {
return T_MDB_ESEMCTL;
}
return T_SUCCESS;
}
- 插入
插入的时候,如果设置了备库标志,则插入操作是向备库中插入,插入成功后,我们需要修改指向主库的指针,来重新插入.
//向备库插入一条数据
int InsertMemDB(MEMDATABASE *pMemDb, char *pInBuffer)
{
int iRet, iNum, iPosition;
char *pRecordBuf;
char *pShmArea;
if (ShmLock(pMemDb->iSemId)) return T_MDB_ESHMLOCK;
memset(&stuMemHeadBuf,0x00,sizeof(stuMemHeadBuf));
GetHead(pMemDb->pShmArea);
GetUnUseMemHead(pMemDb->pShmArea);
pShmArea = LocateUnUseShm(pMemDb);
iRet = SearchRecord(pMemDb, pShmArea, pInBuffer, &iPosition);
if (iRet == T_SUCCESS) {
if (ShmUnLock(pMemDb->iSemId)) return T_MDB_ESHMUNLOCK;
return T_MDB_DUPKEY;
}
if (stuMemHeadBuf.lRecordNum >= stuTableHeadBuf.lMaxRecordNum) {
if (ShmUnLock(pMemDb->iSemId)) return T_MDB_ESHMUNLOCK;
return T_MDB_LACKSPACE;
}
stuMemHeadBuf.lRecordNum++;
iNum = stuMemHeadBuf.lRecordNum - iPosition;
if (iNum > 0) {
memmove(LocateShm(pShmArea, iPosition + 1), LocateShm(pShmArea, iPosition),
stuTableHeadBuf.iRecordLen * iNum);
}
memcpy(LocateShm(pShmArea, iPosition), pInBuffer, stuTableHeadBuf.iRecordLen);
PutUnUseMemHead(pMemDb->pShmArea);
if (ShmUnLock(pMemDb->iSemId)) return T_MDB_ESHMUNLOCK;
return T_SUCCESS;
}
- 加锁解锁
PV操作和平时的正好相反,有进程使用共享内存,则信号量加一,如果此时有进程需要加锁共享内存,则必须等信号量的值为0后,才可以从阻塞状态中返回.
//信号量锁
//只有信号量为0才可以从阻塞状态中恢复,并增加一个信号量的值
int ShmLock(int iSemId)
{
int iRet;
struct sembuf stuSemBuf[2];
stuSemBuf[0].sem_num = 0;
stuSemBuf[0].sem_op = 0;
stuSemBuf[0].sem_flg = 0;
stuSemBuf[1].sem_num = 0;
stuSemBuf[1].sem_op = 1;
stuSemBuf[1].sem_flg = SEM_UNDO;
iRet = semop(iSemId, &stuSemBuf[0], 2);
return iRet;
}
//解锁,信号量的值减一
int ShmUnLock(int iSemId)
{
int iRet;
struct sembuf stuSemBuf;
stuSemBuf.sem_num = 0;
stuSemBuf.sem_op = -1;
stuSemBuf.sem_flg = IPC_NOWAIT|SEM_UNDO;
iRet = semop(iSemId, &stuSemBuf, 1);
return iRet;
}
获取结构体某字段偏移量和结构体大小的两个宏定义:
#define FPOS( type, field ) ( (int)&(( type *) 0)-> field )
#define FLEN( type, field ) ( sizeof((( type *) 0)-> field ))
FPOS: 是把0地址强制转换为type * 类型,然后取field的内存偏移地址,转化为int类型,就是field字段对结构体首地址的偏移量.画图:
结构体偏移量计算.png
以上是共享内存数据库中比较重要的事项,理解了共享内存数据库的结构,基本可以想出如何对共享内存数据库进行操作. 详细代码,在github中,并且附有一个测试案例.查看的话最好结合源码查看.本文只是一个梳理的过程.
共享内存数据库的详细信息都存储在共享内存头部的位置,每次操作,通过传入的MEMDATABASE中的共享内存指针,确定内存块位置,然后获取头部位置的信息(iRecordLen,iMaxRecordlen,key等..),通过头部信息来对数据块进行操作.所以理解头部位置存放的信息很重要.剩下的就是对指针的操作.