dpdk从环形队列ring获取解析数据包,并使用hashtable统计

dpdk从环形队列ring获取解析数据包,并使用hashtable统计,第1张

简介

本文是上一篇的延续继dpdk从给定的port/queue捕获流量_晓镁的博客-CSDN博客

rte_ring_dequeue_burst

result = rte_ring_dequeue_burst(arg0,arg1,arg2,arg3)
将多个对象从一个环形队列ring取出,直到达到最大数量。
该函数调用多消费者或单消费者批量出队列,取决于在创建ring时指定的默认行为。

arg0为环形队列ring指针
arg1为 指用数据包填充的void*指针数组指针,示例为void* dequeued[]数组
arg2为从环出列到 obj_table 的对象数。
arg3如果如果非NULL,则为返回出队完成后剩余的环条目数。
返回值为出队的对象数。

#include 
#include 
#include 
#include 
#include 
#include 
#include 

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

#include "core_write.h"
#include "redis.h"
#define MIN(a,b) (((a)<(b))?(a):(b))

#define RTE_LOGTYPE_DPDKCAP RTE_LOGTYPE_USER1


#include 
#include 
#include 
#include "hash.h"
#include "redis.h"

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

#define RTE_LOGTYPE_DPDKCAP RTE_LOGTYPE_USER1


/*
 * process the packets form the ring
 */
int write_core(const struct core_write_config * config) {
  unsigned int packet_length, wire_packet_length = 0;
  int result;
  void* dequeued[DPDKCAP_WRITE_BURST_SIZE];
  struct rte_mbuf* bufptr;
  int retval = 0;
  //Init current time
  int i = 0;

  //Init stats
  *(config->stats) = (struct core_write_stats) {
    .core_id=rte_lcore_id(),
      .packets = 0,
      .bytes = 0,
      .compressed_bytes = 0,
  };


  RTE_LOG(INFO, DPDKCAP, "Core %d is process packet.\n",
      rte_lcore_id());

  for (;;) {
    if (unlikely(*(config->stop_condition))) {
      break;
    }

    //从环形队列ring获取packets
    result = rte_ring_dequeue_burst(config->ring,
        dequeued, DPDKCAP_WRITE_BURST_SIZE);
    if (result == 0) {
      continue;
    }

    //从环形ring取到数据计数
    config->stats->packets += result;

    for (i = 0; i < result; i++) {
      // 逐一处理数据包
      bufptr = dequeued[i];
  
      // 基于hashtable对数据包进行分析统计
      addPacket(config->table,bufptr);
  
      // 数据流量统计
      packet_length = wire_packet_length;
      config->stats->bytes += packet_length;

      // Free buffer
      rte_pktmbuf_free(bufptr);
    }
  }
  RTE_LOG(INFO, DPDKCAP, "count core %d\n", rte_lcore_id());
  return retval;
}

数据包分析hash表统计

addPacket()函数pkt为数据包结构体。

数据包分析时首先从以太头、ip报文、tcp/udp/icmp逐层offset偏移进行解包分析。
通过原目的ip计算哈希key值,value为struct info结构体,该结构体记录数据包分析结果。


如果哈希冲突怎么办?

如果哈希冲突,那么基于原有的哈希节点创建链表,把原有哈希节点的next指针指向冲突的节点。
这是典型的哈希冲突处理方式。

/* 计算hashtable下标 */
int getIndex(uint32_t src,uint32_t dst){
        uint32_t index = src+dst;
        return ((index>>16) + (index&65535))&65535;
}

/* 增加一个数据包 */
int addPacket(struct info **table,struct rte_mbuf *pkt){
	struct ipv4_hdr *ip;
	struct ether_hdr *eth_hdr = rte_pktmbuf_mtod_offset(pkt, struct ether_hdr *,0);
	ip = rte_pktmbuf_mtod_offset(pkt, struct ipv4_hdr *,sizeof(struct ether_hdr));
	/* 处理IPv4数据包 */
	if (eth_hdr->ether_type == 0x0008) {
		int index = getIndex(ip->src_addr,ip->dst_addr);
		struct info *p;
		/* 查找hash table中该数据包对应的节点 */
		if(table[index]==NULL){
			p = (struct info *)malloc(sizeof(struct info));
			p->src = ip->src_addr;
			p->dst = ip->dst_addr;
                        p->icmp = 0;
                        p->tcp = 0;
                        p->http = 0;
                        p->udp = 0;
                        p->syn = 0;
                        p->ack = 0;
                        p->count = 0;
                        p->other = 0;
			p->next = NULL;
			p->time = time(0);
			table[index] = p;
		}else{
			p = table[index];
			while(p->next != NULL){
				if(p->src == ip->src_addr && p->dst == ip->dst_addr){
					break;
				}
				p = p->next;
			}
			/* p是尾节点并且和当前数据包信息不匹配 */
			if(p->next != NULL){
				if(p->src != ip->src_addr || p->dst != ip->dst_addr){
					p->next =  (struct info *)malloc(sizeof(struct info));
					p = p->next;
					p->src = ip->src_addr;
					p->dst = ip->dst_addr;
					p->icmp = 0;
					p->tcp = 0;
					p->http = 0;
					p->udp = 0;
					p->syn = 0;
					p->ack = 0;
					p->count = 0;
					p->other = 0;
					p->time = time(0);
					p->next = NULL;
				}
			}
		}
		p->count += pkt->data_len;
		/* 处理tcp数据包 */
		if(ip->next_proto_id ==6){
			p->tcp++;
		struct tcp_hdr *tcp = rte_pktmbuf_mtod_offset(pkt, struct tcp_hdr *,
						   sizeof(struct ether_hdr)+sizeof(struct ipv4_hdr));
		if(ntohs(tcp->src_port) == 80 || ntohs(tcp->src_port) == 443 || ntohs(tcp->src_port) == 8080 || ntohs(tcp->dst_port)==443 || ntohs(tcp->dst_port)==80 || ntohs(tcp->dst_port)==8080){
			p->http++;
		}
			if(tcp->tcp_flags & 2){
				p->syn++;
			}else if(tcp->tcp_flags & 16){
				p->ack++;
			}
		/* 处理udp数据包 */
		}else if(ip->next_proto_id ==17){
			p->udp++;
		/* 处理icmp数据包 */
		}else if(ip->next_proto_id ==1){
			p->icmp++;
		/* 其它协议的包 */
		}else{
			p->other++;
		}
	} else if (RTE_ETH_IS_IPV6_HDR(pkt->packet_type)) {
		/* Fill acl structure */

	} else {
		/* Unknown type, drop the packet */
	}
	return 0;
}

void initTable(struct info **table){
	int i;
	for(i=0;i

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/717564.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-25
下一篇 2022-04-25

发表评论

登录后才能评论

评论列表(0条)