优秀的编程知识分享平台

网站首页 > 技术文章 正文

dpdk内存管理—rte_malloc实现(dpdk memzone)

nanyue 2024-07-25 05:58:38 技术文章 26 ℃

DPDK以两种方式对外提供内存管理方法,一个是rte_mempool,主要用于网卡数据包的收发;一个是rte_malloc,主要为应用程序提供内存使用接口。这里我们主要讲一下rte_malloc函数。

rte_malloc实现的大体流程如下图所示。

下面我们逐个函数分析。

rte_malloc

/*
 * Allocate memory on default heap.
 */
void *
rte_malloc(const char *type, size_t size, unsigned align)
{
         return rte_malloc_socket(type, size, align, SOCKET_ID_ANY);
}

这个函数没什么可说的,直接调用rte_malloc_socket,但注意传入的socketid参数为SOCKET_ID_ANY。

rte_malloc_socket

从这个函数的入口检查可以看出,如果传入的分配内存大小size为0或对其align不是2次方的倍数就返回NULL。

void *
rte_malloc_socket(const char *type, size_t size, unsigned align, int socket_arg)
{
         struct rte_mem_config *mcfg = rte_eal_get_configuration()->mem_config;
         int socket, i;
         void *ret;
 
         /* return NULL if size is 0 or alignment is not power-of-2 */
         if (size == 0 || (align && !rte_is_power_of_2(align)))
                   return NULL;
 
         if (!rte_eal_has_hugepages())
                   socket_arg = SOCKET_ID_ANY;
    /*如果传入的socket参数为SOCKET_ID_ANY ,则会先尝试在当前socket上分配内存*/
         if (socket_arg == SOCKET_ID_ANY)
                   socket = malloc_get_numa_socket(); /*获取当前socket_id*/
         else
                   socket = socket_arg;
 
         /* Check socket parameter */
         if (socket >= RTE_MAX_NUMA_NODES)
                   return NULL;
    /*尝试在当前socket上分配内存,如果分配成功则返回*/
         ret = malloc_heap_alloc(&mcfg->malloc_heaps[socket], type,
                                     size, 0, align == 0 ? 1 : align, 0);
         if (ret != NULL || socket_arg != SOCKET_ID_ANY)
                   return ret;
    /*尝试在其他socket上分配内存,直到分配成功或者所有socket都尝试失败*/
         /* try other heaps */
         for (i = 0; i < RTE_MAX_NUMA_NODES; i++) {
                   /* we already tried this one */
                   if (i == socket)
                            continue;
 
                   ret = malloc_heap_alloc(&mcfg->malloc_heaps[i], type,
                                               size, 0, align == 0 ? 1 : align, 0);
                   if (ret != NULL)
                            return ret;
         }
 
         return NULL;
}

到这里我们可以得到一个结论,在开启NUMA时rte_malloc会优先在当前socket上分配内存,如果分配失败再尝试在其他socket上分配内存

malloc_heap_alloc

这个函数用来模拟从heap中(也就是struct malloc_heap)分配内存,其调用逻辑图如下:

void *
malloc_heap_alloc(struct malloc_heap *heap,
                   const char *type __attribute__((unused)), size_t size, unsigned flags,
                   size_t align, size_t bound)
{
         struct malloc_elem *elem;
    /*将size调整为cache line对齐*/
         size = RTE_CACHE_LINE_ROUNDUP(size);
         align = RTE_CACHE_LINE_ROUNDUP(align);
 
         rte_spinlock_lock(&heap->lock);
    /*找到合适的malloc_elem结构*/
         elem = find_suitable_element(heap, size, flags, align, bound);
         if (elem != NULL) {
                   elem = malloc_elem_alloc(elem, size, align, bound);
                   /* increase heap's count of allocated elements */
                   heap->alloc_count++; /*计数加一*/
         }
         rte_spinlock_unlock(&heap->lock);
 
         return elem == NULL ? NULL : (void *)(&elem[1]);
}

注意最后的返回值,返回的是elem[1]的地址,而不是elem的地址。elem[1]是什么呢?其实就是elem+1。说的直观点,rte_malloc其实就是分配了一个内存块,也可以说是分配了一个malloc_elem,这个malloc_elem作为这个内存块的一部分(存放在开头),相当于这个内存块的描述符,真正可以使用的内存是malloc_elem之后的内存区域。如下图所示。

在补一张内存初始化中讲到的数据结构关系图。


下面看下find_suitable_element函数是如何找到合适的malloc_elem的。

【文章福利】:小编整理了一些个人觉得比较好的学习书籍、视频资料共享在qun文件里面,有需要的可以自行添加哦!832218493

find_suitable_elemen

static struct malloc_elem *
find_suitable_element(struct malloc_heap *heap, size_t size,
                   unsigned flags, size_t align, size_t bound)
{
         size_t idx;
         struct malloc_elem *elem, *alt_elem = NULL;
    /*根据申请内存的大小,在struct malloc_heap->free_head数组中找到合适的idx*/
         for (idx = malloc_elem_free_list_index(size);
                            idx < RTE_HEAP_NUM_FREELISTS; idx++) {
                   /*在heap->free_head[idx]链表中找到合适的malloc_elem*/
                   for (elem = LIST_FIRST(&heap->free_head[idx]);
                                     !!elem; elem = LIST_NEXT(elem, free_list)) {
                            if (malloc_elem_can_hold(elem, size, align, bound)) {
                                     if (check_hugepage_sz(flags, elem->ms->hugepage_sz))
                                               return elem;
                                     if (alt_elem == NULL)
                                               alt_elem = elem;
                            }
                   }
         }
 
         if ((alt_elem != NULL) && (flags & RTE_MEMZONE_SIZE_HINT_ONLY))
                   return alt_elem;
 
         return NULL;
}

我们知道malloc_elem的组织结构是个二维的链表,如下图所示。所以第一步要找到合适的一维链表。也就是在struct malloc_heap->free_head数组中找到合适的idx。

我们在前面介绍过,struct malloc_heap->free_head数组的下标和数组中malloc_elem的大小有类似如下对应关系。所以malloc_elem_free_list_index就是返回能够满足申请大小size的最小的idx。

heap->free_head[0] - (0 , 2^8]

heap->free_head[1] - (2^8 , 2^10]

heap->free_head[2] - (2^10 ,2^12]

heap->free_head[3] - (2^12, 2^14]

heap->free_head[4] - (2^14, MAX_SIZE]

之后尝试heap->free_head[idx]上的malloc_elem分配内存,如果分配失败,再尝试更大一点的(idx++)。

下面malloc_elem_can_hold负责在heap->free_head[idx]找到一个合适的malloc_elem。而其内部只是调用了elem_start_pt。

elem_start_pt

static void *
elem_start_pt(struct malloc_elem *elem, size_t size, unsigned align,
                   size_t bound)
{
         const size_t bmask = ~(bound - 1);
         /*在debug模式下MALLOC_ELEM_TRAILER_LEN为cacheline大小,正常为0*/
         uintptr_t end_pt = (uintptr_t)elem +
                            elem->size - MALLOC_ELEM_TRAILER_LEN;
         uintptr_t new_data_start = RTE_ALIGN_FLOOR((end_pt - size), align);
         uintptr_t new_elem_start;
 
         /* check boundary */
         if ((new_data_start & bmask) != ((end_pt - 1) & bmask)) {
                   end_pt = RTE_ALIGN_FLOOR(end_pt, bound);
                   new_data_start = RTE_ALIGN_FLOOR((end_pt - size), align);
                   if (((end_pt - 1) & bmask) != (new_data_start & bmask))
                            return NULL;
         }
 
         new_elem_start = new_data_start - MALLOC_ELEM_HEADER_LEN;
 
         /* if the new start point is before the exist start, it won't fit */
         return (new_elem_start < (uintptr_t)elem) ? NULL : (void *)new_elem_start;
}

代码中的几个指针如下如所示,其本质就是在当前malloc_elem中尝试按照size分配一个新的malloc_elem,看下其起始地址是否越界。如果不越界就将当前malloc_elem返回(不是新的malloc_elem,这时还没有真的分配新malloc_elem)。

找到合适的malloc_elem后,就调用malloc_elem_alloc从此malloc_elem分配新的满足size大小的malloc_elem。

malloc_elem_alloc

struct malloc_elem *
malloc_elem_alloc(struct malloc_elem *elem, size_t size, unsigned align,
                   size_t bound)
{
         struct malloc_elem *new_elem = elem_start_pt(elem, size, align, bound);
         const size_t old_elem_size = (uintptr_t)new_elem - (uintptr_t)elem;
         /*trailer_size就是align-MALLOC_ELEM_TRAILER_LEN的大小,而MALLOC_ELEM_TRAILER_LEN在debug下为cacheline,否则为0*/
         const size_t trailer_size = elem->size - old_elem_size - size -
                   MALLOC_ELEM_OVERHEAD;
    /*将老的elem从链表中删除*/
         elem_free_list_remove(elem);
 
         if (trailer_size > MALLOC_ELEM_OVERHEAD + MIN_DATA_SIZE) {
                   /* split it, too much free space after elem */
                   struct malloc_elem *new_free_elem =
                                     RTE_PTR_ADD(new_elem, size + MALLOC_ELEM_OVERHEAD);
 
                   split_elem(elem, new_free_elem);
                   malloc_elem_free_list_insert(new_free_elem);
         }
 
    /*如果old_elem_size太小,就将老的elem状态设置为ELEM_BUSY*/
         if (old_elem_size < MALLOC_ELEM_OVERHEAD + MIN_DATA_SIZE) {
                   /* don't split it, pad the element instead */
                   elem->state = ELEM_BUSY;
                   elem->pad = old_elem_size;
 
                   /* put a dummy header in padding, to point to real element header */
                   if (elem->pad > 0){ /* pad will be at least 64-bytes, as everything
                                        * is cache-line aligned */
                            new_elem->pad = elem->pad;
                            new_elem->state = ELEM_PAD;
                            new_elem->size = elem->size - elem->pad;/*elem->size -old_elem_size*/
                            set_header(new_elem);
                   }
 
                   return new_elem;
         }
 
         /* we are going to split the element in two. The original element
          * remains free, and the new element is the one allocated.
          * Re-insert original element, in case its new size makes it
          * belong on a different list.
          */
         /*如果old_elem_size足够大则将原有的elem分隔成两个elem,分别设置elem,new_elem的size*/
         split_elem(elem, new_elem);
         new_elem->state = ELEM_BUSY;/*设置new_elem的状态*/
         malloc_elem_free_list_insert(elem);/*根据原有的elem调整后的size再找到合适的idx,将其插入heap->free_head[idx]*/
 
         return new_elem;
}

elem分裂前后对比如下图所示:

rte_free

rte_free的过程就是rte_malloc的逆过程,也就是上述分裂elem的逆过程,这里不再展开。

Tags:

最近发表
标签列表