CherieLi Student

libaio学习

2023-04-10
CherieLi

  • content

libaio是linux版本的aio库,aio指的是异步io。libaio的使用,主要过程为:libaio的初始化,io请求的下发和回收,libaio销毁。

libaio的接口

libaio的初始化

int io_setup(int maxevents, io_context_t *ctxp);
// maxevents表示能处理的最大事件数,同时初始化结构io_context_t,调用之前ctxp的内容必须初始化为0

提交IO读写请求

long io_submit (io_context_t ctx_id, long nr, struct iocb **iocbpp);
// 提交任务前,必须先填充iocb结构体。可以使用io_prep_pwrite和io_prep_pread接口生成iocb的结构体,这其中涉及到fd,需要先open对应的文件,并设置为O_DIRECT标志。另外,buff需要先对齐。

struct io_iocb_common {
	PADDEDptr(void	*buf, __pad1); // 缓存空间
	PADDEDul(nbytes, __pad2); // 字节数
	long long	offset; // 偏移
	long long	__pad3;
	unsigned	flags;
	unsigned	resfd;
};	/* result code is the amount read or -'ve errno */

struct iocb {
	PADDEDptr(void *data, __pad1);	/* Return in the io completion event */ 用户回调函数可以通过io_set_callback设置
	PADDED(unsigned key, __pad2);	/* For use in identifying io requests */
 
	short		aio_lio_opcode;	
	short		aio_reqprio;
	int		aio_fildes;
 
	union {
		struct io_iocb_common		c;
		struct io_iocb_vector		v;
		struct io_iocb_poll		poll;
		struct io_iocb_sockaddr	saddr;
	} u;
};

void io_prep_pread(struct iocb *iocb, int fd, void *buf, size_t count, long long offset)
{
    memset(iocb, 0, sizeof(*iocb));
    iocb->aio_fildes = fd;
    iocb->aio_lio_opcode = IO_CMD_PREAD;
    iocb->aio_reqprio = 0;
    iocb->u.c.buf = buf;
    iocb->u.c.nbytes = count;
    iocb->u.c.offset = offset;
}

void io_prep_pwrite(struct iocb *iocb, int fd, void *buf, size_t count, long long offset)
{
    memset(iocb, 0, sizeof(*iocb));
    iocb->aio_fildes = fd;
    iocb->aio_lio_opcode = IO_CMD_PWRITE;
    iocb->aio_reqprio = 0;
    iocb->u.c.buf = buf; // 缓存空间
    iocb->u.c.nbytes = count; // 字节数
    iocb->u.c.offset = offset; // 偏移
}
// 这里需要注意:buf必须是按照扇区对齐的(512的倍数),可以使用posix_memalign进行对齐。

(1)为什么是O_DIRECT标志

O_DIRECT标志指采用DIRECT I/O,相对于此,有个Buffered I/O。后者需要将用户空间的数据拷贝到内核缓存区,然后再拷贝到磁盘,这操作造成了CPU资源和内存资源的浪费。而DIRECT I/O,不需要通过内核缓存数据,直接将用户空间数据拷贝到磁盘。所以选择O_DIRECT标志,利于减少内存拷贝和CPU利用率。但是这样做的话,需要用户保证内存对齐,否则数据是无法正常写入的,可以使用posix_memalign进行内存对齐。读写缓冲区的地址、内容的大小、文件偏移必须是扇区的倍数(通常是512字节)。

(2)posix_memalign的使用

#include <stdlib.h>
int posix_memalign (void **memptr,
                    size_t alignment,
                    size_t size);

如果调用posix_memalign()成功时,会返回size字节的动态内存,并且这块内存的地址是alignment的倍数。参数alignment必须是2的幂,还是void指针的大小的倍数。返回的内存块的地址放在了memptr里面,函数返回值是0。

如果调用失败时,没有内存会被分配,memptr的值没有被定义,返回如下错误码之一:

EINVAL:参数不是2的幂,或者不是void指针的倍数。

ENOMEM:没有足够的内存去满足函数的请求。

要注意的是,对于这个函数,errno不会被设置,只能通过返回值得到。

由posix_memalign()获得的内存通过free( )释放。

IO请求返回

读写io请求下发后,使用io_getevents函数等待io结束信号。

int io_getevents(io_context_t ctx_id, long min_nr, long nr, struct io_event *events, struct timespec *timeout);
// 返回events数组,events为数组首地址;
// nr为数组长度,即最大返回的event数;
// min_nr为最少返回的event数;
// timieout表示超时时间,填写NULL表示无等待超时

// io_event的结构体内容:
struct io_event {undefined
    PADDEDptr(void *data, __pad1);
    PADDEDptr(struct iocb *obj,  __pad2);
    PADDEDul(res,  __pad3);
    PADDEDul(res2, __pad4);
};
// res为实际完成的字节数
// res2位读写成功的状态,0表示成功
// obj为io_submit中下发的内容

IO取消或者销毁

int io_destroy(io_context_t ctx); // 跟io_setup对应
int io_cancel(io_context_t ctx, struct iocb *iocb, struct io_event *evt); // 取消之前通过io_submit传入的iocb

实例

单独使用aio

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <libaio.h>
#include <fcntl.h>
#include <errno.h>

int aio_write(void *buff, struct iocb* p)
{
    strcpy(buff, "hello world");
    int fd = open("./aiotest.txt", O_WRONLY | O_CREAT | O_APPEND | O_DIRECT, 0644);
    io_prep_pwrite(p, fd, buff, 1024, 0);
    return fd;
}

int aio_read(void *buff, struct iocb* p)
{
    int fd = open("./aiotest.txt", O_RDONLY | O_CREAT | O_APPEND | O_DIRECT, 0644);
    io_prep_pread(p, fd, buff, 1024, 0);
    return fd;
}

int main()
{
    io_context_t ctx;
    memset(&ctx, 0, sizeof(ctx));
    int ret = io_setup(10, &ctx);
    if (ret != 0)
    {
        printf("io_setup fail: errno=%d, ret=%d\n", errno, ret);// 注意,errno并没有记录相关的错误
        return -1;
    }

    struct iocb iocbpp;
    struct iocb *p = &iocbpp;
    void *buff = NULL;
    posix_memalign(&buff, 512, 1024);
    int ret1 = aio_write(buff, p);
    int fd = aio_read(buff, p); // aio_write(buff, p);

    ret = io_submit(ctx, 1, &p);
    if (ret != 1)
    {
        io_destroy(ctx);
        free(buff);
        printf("io_submit fail, errno=%d, ret=%d\n", errno, ret);
        return -1;
    }

    struct io_event events[10];
    if (io_getevents(ctx, 1, 10, events, NULL) != 1)
    {
        printf("io_getevents fail\n");
        close(fd);
        io_destroy(ctx);
        free(buff);
        return -1;
    }

    printf("read result: %s\n", events[0].obj->u.c.buf);
    printf("read res: %s\n", buff);

    close(fd);
    io_destroy(ctx);
    free(buff);

    return 0;
}

遇到的问题1:fatal error: libaio.h: No such file or directory

原因:缺少 libaio-devel

解决方法:yum install libaio-devel

遇到的问题2:error: invalid conversion from ‘void’ to ‘char’ [-fpermissive]

解决办法:编译选项中添加 -fpermissive

遇到的问题3:

aio_test.cpp:(.text.startup+0x21): undefined reference to `io_setup'
/usr/bin/ld: aio_test.cpp:(.text.startup+0x75): undefined reference to `io_submit'
/usr/bin/ld: aio_test.cpp:(.text.startup+0x99): undefined reference to `io_getevents'
/usr/bin/ld: aio_test.cpp:(.text.startup+0xd8): undefined reference to `io_destroy'
/usr/bin/ld: aio_test.cpp:(.text.startup+0x107): undefined reference to `io_destroy'
/usr/bin/ld: aio_test.cpp:(.text.startup+0x146): undefined reference to `io_destroy'

解决办法:编译选项中添加-laio

最后的编译选项:g++ -O2 -fpermissive -laio aio_test.cpp

输出结果:

read result: hello world
read res: hello world

aio和epoll结合

需要使用到io_set_eventfd,将evenfd写到iocb中,io_set_eventfd的定义如下:

void io_set_eventfd(struct iocb *iocb, int eventfd)
{
    iocb->u.c.flags |= (1 << 0) /* IOCB_FLAG_RESFD */;
    iocb->u.c.resfd = eventfd;
}

具体步骤:

(1)创建eventfd

efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);

(2)将eventfd设置到iocb中

io_set_eventfd(iocb, efd);

(3)提交aio请求

io_submit(ctx, NUM_EVENTS, iocb);

(4)创建epollfd,并将eventfd加到epoll中

epfd = epoll_create(1);
epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &epevent);
epoll_wait(epfd, &epevent, 1, -1);

(5)当eventfd可读时,从eventfd读出完成IO请求的数量,并用io_getevents获取这些IO

read(efd, &finished_aio, sizeof(finished_aio);
r = io_getevents(ctx, 1, NUM_EVENTS, events, &tms);

具体实例

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <libaio.h>
#include <fcntl.h>
#include <errno.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>

int aio_write(void *buff, struct iocb* p)
{
    strcpy((char*)buff, "hello world");
    int fd = open("./aiotest.txt", O_WRONLY | O_CREAT | O_APPEND | O_DIRECT, 0644);
    io_prep_pwrite(p, fd, buff, 1024, 0);
    return fd;
}

int aio_read(void *buff, struct iocb* p)
{
    int fd = open("./aiotest.txt", O_RDONLY | O_CREAT | O_APPEND | O_DIRECT, 0644);
    io_prep_pread(p, fd, buff, 1024, 0);
    return fd;
}

int main()
{
    io_context_t ctx;
    memset(&ctx, 0, sizeof(ctx));
    int ret = io_setup(10, &ctx);
    if (ret != 0)
    {
        printf("io_setup fail: errno=%d, ret=%d\n", errno, ret);
        return -1;
    }

    struct iocb iocbpp;
    struct iocb *p = &iocbpp;
    void *buff = NULL;
    posix_memalign(&buff, 512, 1024);
    int ret1 = aio_write(buff, p);
    int fd = aio_read(buff, p); // aio_write(buff, p);

    int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    io_set_eventfd(p, efd);

    ret = io_submit(ctx, 1, &p);
    if (ret != 1)
    {
        io_destroy(ctx);
        free(buff);
        printf("io_submit fail, errno=%d, ret=%d\n", errno, ret);
        return -1;
    }

    int epfd = epoll_create(1);
    struct epoll_event event;
    event.events = EPOLLET | EPOLLIN;
    event.data.fd = efd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &event);
    uint64_t data;

    while(true)
    {
        ret = epoll_wait(epfd, &event, 1, -1);
        if (ret > 0)
        {
            ret = read(efd, &data, sizeof(data)); // 这里会阻塞
            if (ret < 0)
            {
                printf("read fail\n");
            }
            printf("success read from efd, read %d bytes, data is %d\n", ret, data);
            struct io_event events[10];
            if (io_getevents(ctx, 1, 10, events, NULL) != 1)
            {
                printf("io_getevents fail\n");
                close(fd);
                io_destroy(ctx);
                free(buff);
                return -1;
            }
             printf("read result: %s\n", events[0].obj->u.c.buf);
             break;
        }
        else if (ret == 0)
        {
            printf("epoll wait timeout\n");
            break;
        }
        else
        {
            printf("epoll wait error\n");
            break;
        }
    }

    close(fd);
    io_destroy(ctx);
    free(buff);

    return 0;
}

编译选项:g++ -O2 -fpermissive -laio aio.cpp

运行结果:

success read from efd, read 8 bytes, data is 1
read result: hello world

参考文档

https://bbs.huaweicloud.com/blogs/327536

https://blog.csdn.net/enlaihe/article/details/112320114?spm=1001.2014.3001.5506

https://blog.csdn.net/cjsycyl/article/details/9332241

https://www.bilibili.com/read/cv16197439

https://bbs.huaweicloud.com/blogs/327536

https://zhuanlan.zhihu.com/p/145394613?from_voters_page=true

https://blog.csdn.net/cllcsy/article/details/107029887

https://cloud.tencent.com/developer/article/1810605

https://bbs.huaweicloud.com/blogs/327109 linux epoll的使用


上一篇 vscode

下一篇 clion学习

Comments

Content