multiple interface input queues

multiple interface input queues

David Gwynne-5
this replaces the single mbuf_queue and task in struct ifnet with
a new ifiqueue structure modelled on ifqueues.

the main motivation behind this was to show mpsafe input counters.

ifiqueues, like ifqueues, allow a driver to configure multiple
queues. this in turn allows a driver with multiple rx rings to
have them queue and account for packets independently.

ifiq_input generally replaces if_input. if_input now simply queues
on the ifnet's builtin ifiqueue (ifp->if_rcv) to support the current
set of drivers that only configure a single rx context/ring.
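
for illustration, a minimal sketch of that unchanged single-ring rx
path (drv_rx_dequeue is a made-up placeholder, not part of the diff):

        struct mbuf_list ml = MBUF_LIST_INITIALIZER();
        struct mbuf *m;

        while ((m = drv_rx_dequeue(sc)) != NULL)
                ml_enqueue(&ml, m);

        /* if_input now just does ifiq_input(&ifp->if_rcv, &ml, 2048) */
        if_input(ifp, &ml);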

it is up to the driver to ensure that an ifiqueue is not used
concurrently. in practice this should be easy because nics generally
only rx packets by processing a single ring from a single interrupt
source.
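
eg, a hypothetical msi-x driver with one vector per ring (drv_rxintr,
struct rx_ring, and drv_rxeof are placeholder names) gets that
serialisation for free, because only that vector's handler ever
touches the ring and its ifiq:

        int
        drv_rxintr(void *arg)
        {
                struct rx_ring *rxr = arg;      /* one ring per vector */

                drv_rxeof(rxr);         /* the sole user of rxr->ifiq */
                return (1);
        }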

ifiq counters are updated and read using the same semantics as percpu
counters, ie, there's a generation number updated before and after
a counter update. this means writers don't block, but a reader may
loop a couple of times waiting for a writer to finish. readers call
yield(), which causes splasserts to fire if if_getdata is still
holding the kernel lock.
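
in miniature, the update/read protocol looks like this (same shape as
the ifiq_enter/ifiq_leave and ifiq_add_data code in the diff below):

        /* writer: never blocks */
        ifiq->ifiq_gen++;               /* odd: update in progress */
        membar_producer();
        ifiq->ifiq_packets += packets;
        membar_producer();
        ifiq->ifiq_gen++;               /* even: update finished */

        /* reader: retries until it sees a stable, even gen */
        do {
                while ((gen = ifiq->ifiq_gen) & 1)
                        yield();
                membar_consumer();
                packets = ifiq->ifiq_packets;
                membar_consumer();
        } while (ifiq->ifiq_gen != gen);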

ifiq_input is set up to interact with the interface rx ring moderation
code (ie, the if_rxring code). you pass in the current rxr watermark,
and it will look at the backlog of packets on that ifiq to determine
if the ring is producing too fast. if it is, it'll return 1 to slow
down packet reception. i have a later diff that adds functionality
to the if_rxring bits so a driver can say a ring is livelocked,
rather than relying on the system to detect it. so drv_rxeof would
have the following at the end of it:

        if (ifiq_input(rxr->ifiq, &ml, if_rxr_cwm(&rxr->rx_ring)))
                if_rxr_livelocked(&rxr->rx_ring);
        drv_rx_refill(rxr);

i've run with that on a hacked up ix(4) that runs with 8 rx rings,
and this works pretty well. if you're doing a single stream, you
see one rx ring grow the number of descs it will handle, but if you
flood it with a range of traffic you'll see that one ring scale
down and balance out with the rest of the rings. it turns out you
can still get reasonable throughput even if the ifiqs are dynamically
scaling themselves to only 100 packets. however, the interactivity
of the system improves a lot.

currently if one interface is being DoSed, it'll end up with 8192
packets on its if_inputqueue. it takes a while to process those
packets, which blocks the processing of packets from the other
interface. by scaling the input queues to relatively small counts,
softnet can service packets from other interfaces sooner.
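
(illustrative numbers: with the watermark moderated down to ~100
descriptors, ifiq_input starts telling the driver to slow down at a
backlog of 3 * 100 = 300 packets and drops outright at 5 * 100 = 500,
instead of letting 8192 build up.)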

Index: if.c
===================================================================
RCS file: /cvs/src/sys/net/if.c,v
retrieving revision 1.527
diff -u -p -r1.527 if.c
--- if.c 14 Nov 2017 04:08:11 -0000 1.527
+++ if.c 14 Nov 2017 04:18:41 -0000
@@ -156,7 +156,6 @@ int if_group_egress_build(void);
 
 void if_watchdog_task(void *);
 
-void if_input_process(void *);
 void if_netisr(void *);
 
 #ifdef DDB
@@ -437,8 +436,6 @@ if_attachsetup(struct ifnet *ifp)
 
  ifidx = ifp->if_index;
 
- mq_init(&ifp->if_inputqueue, 8192, IPL_NET);
- task_set(ifp->if_inputtask, if_input_process, (void *)ifidx);
  task_set(ifp->if_watchdogtask, if_watchdog_task, (void *)ifidx);
  task_set(ifp->if_linkstatetask, if_linkstate_task, (void *)ifidx);
 
@@ -563,6 +560,30 @@ if_attach_queues(struct ifnet *ifp, unsi
 }
 
 void
+if_attach_iqueues(struct ifnet *ifp, unsigned int niqs)
+{
+ struct ifiqueue **map;
+ struct ifiqueue *ifiq;
+ unsigned int i;
+
+ KASSERT(niqs != 0);
+
+ map = mallocarray(niqs, sizeof(*map), M_DEVBUF, M_WAITOK);
+
+ ifp->if_rcv.ifiq_softc = NULL;
+ map[0] = &ifp->if_rcv;
+
+ for (i = 1; i < niqs; i++) {
+ ifiq = malloc(sizeof(*ifiq), M_DEVBUF, M_WAITOK|M_ZERO);
+ ifiq_init(ifiq, ifp, i);
+ map[i] = ifiq;
+ }
+
+ ifp->if_iqs = map;
+ ifp->if_niqs = niqs;
+}
+
+void
 if_attach_common(struct ifnet *ifp)
 {
  KASSERT(ifp->if_ioctl != NULL);
@@ -587,6 +608,12 @@ if_attach_common(struct ifnet *ifp)
  ifp->if_ifqs = ifp->if_snd.ifq_ifqs;
  ifp->if_nifqs = 1;
 
+ ifiq_init(&ifp->if_rcv, ifp, 0);
+
+ ifp->if_rcv.ifiq_ifiqs[0] = &ifp->if_rcv;
+ ifp->if_iqs = ifp->if_rcv.ifiq_ifiqs;
+ ifp->if_niqs = 1;
+
  ifp->if_addrhooks = malloc(sizeof(*ifp->if_addrhooks),
     M_TEMP, M_WAITOK);
  TAILQ_INIT(ifp->if_addrhooks);
@@ -605,8 +632,6 @@ if_attach_common(struct ifnet *ifp)
     M_TEMP, M_WAITOK|M_ZERO);
  ifp->if_linkstatetask = malloc(sizeof(*ifp->if_linkstatetask),
     M_TEMP, M_WAITOK|M_ZERO);
- ifp->if_inputtask = malloc(sizeof(*ifp->if_inputtask),
-    M_TEMP, M_WAITOK|M_ZERO);
  ifp->if_llprio = IFQ_DEFPRIO;
 
  SRPL_INIT(&ifp->if_inputs);
@@ -694,47 +719,7 @@ if_enqueue(struct ifnet *ifp, struct mbu
 void
 if_input(struct ifnet *ifp, struct mbuf_list *ml)
 {
- struct mbuf *m;
- size_t ibytes = 0;
-#if NBPFILTER > 0
- caddr_t if_bpf;
-#endif
-
- if (ml_empty(ml))
- return;
-
- MBUF_LIST_FOREACH(ml, m) {
- m->m_pkthdr.ph_ifidx = ifp->if_index;
- m->m_pkthdr.ph_rtableid = ifp->if_rdomain;
- ibytes += m->m_pkthdr.len;
- }
-
- ifp->if_ipackets += ml_len(ml);
- ifp->if_ibytes += ibytes;
-
-#if NBPFILTER > 0
- if_bpf = ifp->if_bpf;
- if (if_bpf) {
- struct mbuf_list ml0;
-
- ml_init(&ml0);
- ml_enlist(&ml0, ml);
- ml_init(ml);
-
- while ((m = ml_dequeue(&ml0)) != NULL) {
- if (bpf_mtap_ether(if_bpf, m, BPF_DIRECTION_IN))
- m_freem(m);
- else
- ml_enqueue(ml, m);
- }
-
- if (ml_empty(ml))
- return;
- }
-#endif
-
- if (mq_enlist(&ifp->if_inputqueue, ml) == 0)
- task_add(net_tq(ifp->if_index), ifp->if_inputtask);
+ ifiq_input(&ifp->if_rcv, ml, 2048);
 }
 
 int
@@ -789,6 +774,24 @@ if_input_local(struct ifnet *ifp, struct
  return (0);
 }
 
+int
+if_output_local(struct ifnet *ifp, struct mbuf *m, sa_family_t af)
+{
+ struct ifiqueue *ifiq;
+ unsigned int flow = 0;
+
+ m->m_pkthdr.ph_family = af;
+ m->m_pkthdr.ph_ifidx = ifp->if_index;
+ m->m_pkthdr.ph_rtableid = ifp->if_rdomain;
+
+ if (ISSET(m->m_pkthdr.ph_flowid, M_FLOWID_VALID))
+ flow = m->m_pkthdr.ph_flowid & M_FLOWID_MASK;
+
+ ifiq = ifp->if_iqs[flow % ifp->if_niqs];
+
+ return (ifiq_enqueue(ifiq, m) == 0 ? 0 : ENOBUFS);
+}
+
 struct ifih {
  SRPL_ENTRY(ifih)  ifih_next;
  int (*ifih_input)(struct ifnet *, struct mbuf *,
@@ -873,26 +876,18 @@ if_ih_remove(struct ifnet *ifp, int (*in
 }
 
 void
-if_input_process(void *xifidx)
+if_input_process(struct ifnet *ifp, struct mbuf_list *ml)
 {
- unsigned int ifidx = (unsigned long)xifidx;
- struct mbuf_list ml;
  struct mbuf *m;
- struct ifnet *ifp;
  struct ifih *ifih;
  struct srp_ref sr;
  int s;
 
- ifp = if_get(ifidx);
- if (ifp == NULL)
+ if (ml_empty(ml))
  return;
 
- mq_delist(&ifp->if_inputqueue, &ml);
- if (ml_empty(&ml))
- goto out;
-
  if (!ISSET(ifp->if_xflags, IFXF_CLONED))
- add_net_randomness(ml_len(&ml));
+ add_net_randomness(ml_len(ml));
 
  /*
  * We grab the NET_LOCK() before processing any packet to
@@ -908,7 +903,7 @@ if_input_process(void *xifidx)
  */
  NET_RLOCK();
  s = splnet();
- while ((m = ml_dequeue(&ml)) != NULL) {
+ while ((m = ml_dequeue(ml)) != NULL) {
  /*
  * Pass this mbuf to all input handlers of its
  * interface until it is consumed.
@@ -924,8 +919,6 @@ if_input_process(void *xifidx)
  }
  splx(s);
  NET_RUNLOCK();
-out:
- if_put(ifp);
 }
 
 void
@@ -1033,10 +1026,6 @@ if_detach(struct ifnet *ifp)
  ifp->if_ioctl = if_detached_ioctl;
  ifp->if_watchdog = NULL;
 
- /* Remove the input task */
- task_del(net_tq(ifp->if_index), ifp->if_inputtask);
- mq_purge(&ifp->if_inputqueue);
-
  /* Remove the watchdog timeout & task */
  timeout_del(ifp->if_slowtimo);
  task_del(net_tq(ifp->if_index), ifp->if_watchdogtask);
@@ -1090,7 +1079,6 @@ if_detach(struct ifnet *ifp)
  free(ifp->if_slowtimo, M_TEMP, sizeof(*ifp->if_slowtimo));
  free(ifp->if_watchdogtask, M_TEMP, sizeof(*ifp->if_watchdogtask));
  free(ifp->if_linkstatetask, M_TEMP, sizeof(*ifp->if_linkstatetask));
- free(ifp->if_inputtask, M_TEMP, sizeof(*ifp->if_inputtask));
 
  for (i = 0; (dp = domains[i]) != NULL; i++) {
  if (dp->dom_ifdetach && ifp->if_afdata[dp->dom_family])
@@ -1113,6 +1101,17 @@ if_detach(struct ifnet *ifp)
  free(ifp->if_ifqs, M_DEVBUF,
     sizeof(struct ifqueue *) * ifp->if_nifqs);
  }
+
+ for (i = 0; i < ifp->if_niqs; i++)
+ ifiq_destroy(ifp->if_iqs[i]);
+ if (ifp->if_iqs != ifp->if_rcv.ifiq_ifiqs) {
+ for (i = 1; i < ifp->if_niqs; i++) {
+ free(ifp->if_iqs[i], M_DEVBUF,
+    sizeof(struct ifiqueue));
+ }
+ free(ifp->if_iqs, M_DEVBUF,
+    sizeof(struct ifiqueue *) * ifp->if_niqs);
+ }
 }
 
 /*
@@ -2280,11 +2279,21 @@ if_getdata(struct ifnet *ifp, struct if_
 
  *data = ifp->if_data;
 
+ NET_UNLOCK();
+
  for (i = 0; i < ifp->if_nifqs; i++) {
  struct ifqueue *ifq = ifp->if_ifqs[i];
 
  ifq_add_data(ifq, data);
  }
+
+ for (i = 0; i < ifp->if_niqs; i++) {
+ struct ifiqueue *ifiq = ifp->if_iqs[i];
+
+ ifiq_add_data(ifiq, data);
+ }
+
+ NET_LOCK();
 }
 
 /*
Index: if_var.h
===================================================================
RCS file: /cvs/src/sys/net/if_var.h,v
retrieving revision 1.83
diff -u -p -r1.83 if_var.h
--- if_var.h 31 Oct 2017 22:05:12 -0000 1.83
+++ if_var.h 14 Nov 2017 04:18:41 -0000
@@ -140,8 +140,6 @@ struct ifnet { /* and the entries */
  struct task *if_linkstatetask; /* task to do route updates */
 
  /* procedure handles */
- struct mbuf_queue if_inputqueue;
- struct task *if_inputtask; /* input task */
  SRPL_HEAD(, ifih) if_inputs; /* input routines (dequeue) */
 
  /* output routine (enqueue) */
@@ -164,6 +162,10 @@ struct ifnet { /* and the entries */
  void (*if_qstart)(struct ifqueue *);
  unsigned int if_nifqs;
 
+ struct ifiqueue if_rcv; /* rx/input queue */
+ struct ifiqueue **if_iqs;      /* pointer to the array of iqs */
+ unsigned int if_niqs;
+
  struct sockaddr_dl *if_sadl; /* pointer to our sockaddr_dl */
 
  void *if_afdata[AF_MAX];
@@ -303,7 +305,9 @@ void if_start(struct ifnet *);
 int if_enqueue_try(struct ifnet *, struct mbuf *);
 int if_enqueue(struct ifnet *, struct mbuf *);
 void if_input(struct ifnet *, struct mbuf_list *);
+void if_input_process(struct ifnet *, struct mbuf_list *);
 int if_input_local(struct ifnet *, struct mbuf *, sa_family_t);
+int if_output_local(struct ifnet *, struct mbuf *, sa_family_t);
 void if_rtrequest_dummy(struct ifnet *, int, struct rtentry *);
 void p2p_rtrequest(struct ifnet *, int, struct rtentry *);
 
Index: ifq.c
===================================================================
RCS file: /cvs/src/sys/net/ifq.c,v
retrieving revision 1.14
diff -u -p -r1.14 ifq.c
--- ifq.c 14 Nov 2017 04:08:11 -0000 1.14
+++ ifq.c 14 Nov 2017 04:18:41 -0000
@@ -16,15 +16,22 @@
  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  */
 
+#include "bpfilter.h"
+
 #include <sys/param.h>
 #include <sys/systm.h>
 #include <sys/socket.h>
 #include <sys/mbuf.h>
 #include <sys/proc.h>
+#include <sys/atomic.h>
 
 #include <net/if.h>
 #include <net/if_var.h>
 
+#if NBPFILTER > 0
+#include <net/bpf.h>
+#endif
+
 /*
  * priq glue
  */
@@ -457,6 +464,223 @@ ifq_mfreeml(struct ifqueue *ifq, struct
  ifq->ifq_len -= ml_len(ml);
  ifq->ifq_qdrops += ml_len(ml);
  ml_enlist(&ifq->ifq_free, ml);
+}
+
+/*
+ * ifiq
+ */
+
+struct ifiq_ref {
+ unsigned int gen;
+}; /* __upunused */
+
+static void ifiq_process(void *);
+
+void
+ifiq_init(struct ifiqueue *ifiq, struct ifnet *ifp, unsigned int idx)
+{
+ ifiq->ifiq_if = ifp;
+ ifiq->ifiq_softnet = net_tq(ifp->if_index); /* + idx */
+ ifiq->ifiq_softc = NULL;
+
+ mtx_init(&ifiq->ifiq_mtx, IPL_NET);
+ ml_init(&ifiq->ifiq_ml);
+ task_set(&ifiq->ifiq_task, ifiq_process, ifiq);
+
+ ifiq->ifiq_qdrops = 0;
+ ifiq->ifiq_packets = 0;
+ ifiq->ifiq_bytes = 0;
+ ifiq->ifiq_qdrops = 0;
+ ifiq->ifiq_errors = 0;
+
+ ifiq->ifiq_idx = idx;
+}
+
+void
+ifiq_destroy(struct ifiqueue *ifiq)
+{
+ if (!task_del(ifiq->ifiq_softnet, &ifiq->ifiq_task)) {
+ int netlocked = (rw_status(&netlock) == RW_WRITE);
+
+ if (netlocked) /* XXXSMP breaks atomicity */
+ NET_UNLOCK();
+
+ taskq_barrier(ifiq->ifiq_softnet);
+
+ if (netlocked)
+ NET_LOCK();
+ }
+
+ /* don't need to lock because this is the last use of the ifiq */
+ ml_purge(&ifiq->ifiq_ml);
+}
+
+static inline void
+ifiq_enter(struct ifiq_ref *ref, struct ifiqueue *ifiq)
+{
+ ref->gen = ++ifiq->ifiq_gen;
+ membar_producer();
+}
+
+static inline void
+ifiq_leave(struct ifiq_ref *ref, struct ifiqueue *ifiq)
+{
+ membar_producer();
+ ifiq->ifiq_gen = ++ref->gen;
+}
+
+static inline void
+ifiq_count(struct ifiqueue *ifiq, uint64_t packets, uint64_t bytes)
+{
+ struct ifiq_ref ref;
+
+ ifiq_enter(&ref, ifiq);
+ ifiq->ifiq_packets += packets;
+ ifiq->ifiq_bytes += bytes;
+ ifiq_leave(&ref, ifiq);
+}
+
+static inline void
+ifiq_qdrop(struct ifiqueue *ifiq, struct mbuf_list *ml)
+{
+ struct ifiq_ref ref;
+ unsigned int qdrops;
+
+ qdrops = ml_purge(ml);
+
+ ifiq_enter(&ref, ifiq);
+ ifiq->ifiq_qdrops += qdrops;
+ ifiq_leave(&ref, ifiq);
+}
+
+int
+ifiq_input(struct ifiqueue *ifiq, struct mbuf_list *ml, unsigned int cwm)
+{
+ struct ifnet *ifp = ifiq->ifiq_if;
+ struct mbuf *m;
+ uint64_t bytes = 0;
+#if NBPFILTER > 0
+ caddr_t if_bpf;
+#endif
+ int rv;
+
+ if (ml_empty(ml))
+ return (0);
+
+ MBUF_LIST_FOREACH(ml, m) {
+ m->m_pkthdr.ph_ifidx = ifp->if_index;
+ m->m_pkthdr.ph_rtableid = ifp->if_rdomain;
+ bytes += m->m_pkthdr.len;
+ }
+
+ ifiq_count(ifiq, ml_len(ml), bytes);
+
+#if NBPFILTER > 0
+ if_bpf = ifp->if_bpf;
+ if (if_bpf) {
+ struct mbuf_list ml0 = *ml;
+
+ ml_init(ml);
+
+ while ((m = ml_dequeue(&ml0)) != NULL) {
+ if (bpf_mtap_ether(if_bpf, m, BPF_DIRECTION_IN))
+ m_freem(m);
+ else
+ ml_enqueue(ml, m);
+ }
+
+ if (ml_empty(ml))
+ return (0);
+ }
+#endif
+
+ if (ifiq_len(ifiq) >= cwm * 5) {
+ /* the backlog is way too high, so drop these packets */
+ ifiq_qdrop(ifiq, ml);
+ return (1); /* tell the caller to slow down */
+ }
+
+ /* tell the caller to slow down if the backlog is getting high */
+ rv = (ifiq_len(ifiq) >= cwm * 3);
+
+ mtx_enter(&ifiq->ifiq_mtx);
+ ml_enlist(&ifiq->ifiq_ml, ml);
+ mtx_leave(&ifiq->ifiq_mtx);
+
+ task_add(ifiq->ifiq_softnet, &ifiq->ifiq_task);
+
+ return (rv);
+}
+
+void
+ifiq_add_data(struct ifiqueue *ifiq, struct if_data *data)
+{
+ unsigned int enter, leave;
+ uint64_t packets, bytes, qdrops;
+
+ enter = ifiq->ifiq_gen;
+ for (;;) {
+ /* the generation number is odd during an update */
+ while (enter & 1) {
+ yield();
+ enter = ifiq->ifiq_gen;
+ }
+
+ membar_consumer();
+ packets = ifiq->ifiq_packets;
+ bytes = ifiq->ifiq_bytes;
+ qdrops = ifiq->ifiq_qdrops;
+ membar_consumer();
+
+ leave = ifiq->ifiq_gen;
+
+ if (enter == leave)
+ break;
+
+ enter = leave;
+ }
+
+ data->ifi_ipackets += packets;
+ data->ifi_ibytes += bytes;
+ data->ifi_iqdrops += qdrops;
+}
+
+void
+ifiq_barrier(struct ifiqueue *ifiq)
+{
+ if (!task_del(ifiq->ifiq_softnet, &ifiq->ifiq_task))
+ taskq_barrier(ifiq->ifiq_softnet);
+}
+
+int
+ifiq_enqueue(struct ifiqueue *ifiq, struct mbuf *m)
+{
+ /* this can be called from anywhere at any time, so must lock */
+
+ mtx_enter(&ifiq->ifiq_mtx);
+ ml_enqueue(&ifiq->ifiq_ml, m);
+ mtx_leave(&ifiq->ifiq_mtx);
+
+ task_add(ifiq->ifiq_softnet, &ifiq->ifiq_task);
+
+ return (0);
+}
+
+static void
+ifiq_process(void *arg)
+{
+ struct ifiqueue *ifiq = arg;
+ struct mbuf_list ml;
+
+ if (ifiq_empty(ifiq))
+ return;
+
+ mtx_enter(&ifiq->ifiq_mtx);
+ ml = ifiq->ifiq_ml;
+ ml_init(&ifiq->ifiq_ml);
+ mtx_leave(&ifiq->ifiq_mtx);
+
+ if_input_process(ifiq->ifiq_if, &ml);
 }
 
 /*
Index: ifq.h
===================================================================
RCS file: /cvs/src/sys/net/ifq.h,v
retrieving revision 1.15
diff -u -p -r1.15 ifq.h
--- ifq.h 14 Nov 2017 04:08:11 -0000 1.15
+++ ifq.h 14 Nov 2017 04:18:41 -0000
@@ -69,6 +69,34 @@ struct ifqueue {
  unsigned int ifq_idx;
 };
 
+struct ifiqueue {
+ struct ifnet *ifiq_if;
+ struct taskq *ifiq_softnet;
+ union {
+ void *_ifiq_softc;
+ struct ifiqueue *_ifiq_ifiqs[1];
+ } _ifiq_ptr;
+#define ifiq_softc _ifiq_ptr._ifiq_softc
+#define ifiq_ifiqs _ifiq_ptr._ifiq_ifiqs
+
+ struct mutex ifiq_mtx;
+ struct mbuf_list ifiq_ml;
+ struct task ifiq_task;
+
+ /* counters */
+ unsigned int ifiq_gen;
+
+ uint64_t ifiq_packets;
+ uint64_t ifiq_bytes;
+ uint64_t ifiq_qdrops;
+ uint64_t ifiq_errors;
+ uint64_t ifiq_mcasts;
+ uint64_t ifiq_noproto;
+
+ /* properties */
+ unsigned int ifiq_idx;
+};
+
 #ifdef _KERNEL
 
 #define IFQ_MAXLEN 256
@@ -432,6 +460,18 @@ ifq_idx(struct ifqueue *ifq, unsigned in
 #define IFQ_ASSERT_SERIALIZED(_ifq) KASSERT(ifq_is_serialized(_ifq))
 
 extern const struct ifq_ops * const ifq_priq_ops;
+
+/* ifiq */
+
+void ifiq_init(struct ifiqueue *, struct ifnet *, unsigned int);
+void ifiq_destroy(struct ifiqueue *);
+int ifiq_input(struct ifiqueue *, struct mbuf_list *,
+     unsigned int);
+int ifiq_enqueue(struct ifiqueue *, struct mbuf *);
+void ifiq_add_data(struct ifiqueue *, struct if_data *);
+
+#define ifiq_len(_ifiq) ml_len(&(_ifiq)->ifiq_ml)
+#define ifiq_empty(_ifiq) ml_empty(&(_ifiq)->ifiq_ml)
 
 #endif /* _KERNEL */
 
Index: if_loop.c
===================================================================
RCS file: /cvs/src/sys/net/if_loop.c,v
retrieving revision 1.83
diff -u -p -r1.83 if_loop.c
--- if_loop.c 31 Oct 2017 22:05:12 -0000 1.83
+++ if_loop.c 14 Nov 2017 04:18:41 -0000
@@ -241,12 +241,7 @@ looutput(struct ifnet *ifp, struct mbuf
  if ((m->m_flags & M_LOOP) == 0)
  return (if_input_local(ifp, m, dst->sa_family));
 
- m->m_pkthdr.ph_family = dst->sa_family;
- if (mq_enqueue(&ifp->if_inputqueue, m))
- return ENOBUFS;
- task_add(net_tq(ifp->if_index), ifp->if_inputtask);
-
- return (0);
+ return (if_output_local(ifp, m, dst->sa_family));
 }
 
 void


Re: multiple interface input queues

Hrvoje Popovski
On 14.11.2017. 5:42, David Gwynne wrote:
> this replaces the single mbuf_queue and task in struct ifnet with
> a new ifiqueue structure modelled on ifqueues.


i've tested this diff with ix, myx, em and bge with down/up interfaces
and everything is working fine ...


Re: multiple interface input queues

David Gwynne-5
In reply to this post by David Gwynne-5
On Tue, Nov 14, 2017 at 02:42:30PM +1000, David Gwynne wrote:

> this replaces the single mbuf_queue and task in struct ifnet with
> a new ifiqueue structure modelled on ifqueues.
>
> the main motivation behind this was to show mpsafe input counters.
>
> ifiqueues, like ifqueues, allow a driver to configure multiple
> queues. this in turn allows a driver with multiple rx rings to
> have them queue and account for packets independently.
>
> ifiq_input generally replaces if_input. if_input now simply queues
> on the ifnet's builtin ifiqueue (ifp->if_rcv) to support the current
> set of drivers that only configure a single rx context/ring.
>
> it is up to the driver to ensure that an ifiqueue is not used
> concurrently. in practice this should be easy because nics generally
> only rx packets by processing a single ring from a single interrupt
> source.
>
> ifiq counters are updated and read using the same semantics as percpu
> counters, ie, there's a generation number updated before and after
> a counter update. this means writers don't block, but a reader may
> loop a couple of times waiting for a writer to finish. readers call
> yield(), which causes splasserts to fire if if_getdata is still
> holding the kernel lock.
>
> ifiq_input is set up to interact with the interface rx ring moderation
> code (ie, the if_rxring code). you pass in the current rxr watermark,
> and it will look at the backlog of packets on that ifiq to determine
> if the ring is producing too fast. if it is, it'll return 1 to slow
> down packet reception. i have a later diff that adds functionality
> to the if_rxring bits so a driver can say a ring is livelocked,
> rather than relying on the system to detect it. so drv_rxeof would
> have the following at the end of it:
>
> if (ifiq_input(rxr->ifiq, &ml, if_rxr_cwm(&rxr->rx_ring)))
> if_rxr_livelocked(&rxr->rx_ring);
> drv_rx_refill(rxr);
>
> i've run with that on a hacked up ix(4) that runs with 8 rx rings,
> and this works pretty well. if you're doing a single stream, you
> see one rx ring grow the number of descs it will handle, but if you
> flood it with a range of traffic you'll see that one ring scale
> down and balance out with the rest of the rings. it turns out you
> can still get reasonable throughput even if the ifiqs are dynamically
> scaling themselves to only 100 packets. however, the interactivity
> of the system improves a lot.
>
> currently if one interface is being DoSed, it'll end up with 8192
> packets on its if_inputqueue. it takes a while to process those
> packets, which blocks the processing of packets from the other
> interface. by scaling the input queues to relatively small counts,
> softnet can service packets from other interfaces sooner.

two things occurred to me while driving home today.

firstly, while it is true that hardware rings are the only source
of packets for an input queue, this is not true for pseudo drivers,
eg, vlan, gre, gif, etc. packets can spontaneously come in from
anywhere. the result of this is that the ifiq_gen dance won't work,
particularly if it gets stuck with an odd value, which will block
readers indefinitely.
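
eg, two concurrent enqueues can interleave like this (illustrative):

        /* cpu0 */                      /* cpu1 */
        gen = ++ifiq->ifiq_gen;  /* 1 */
                                        gen = ++ifiq->ifiq_gen;  /* 2 */
        ifiq->ifiq_gen = ++gen;  /* 2 */
                                        ifiq->ifiq_gen = ++gen;  /* 3 */

which leaves ifiq_gen stuck at an odd value, so ifiq_add_data yields
forever.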

secondly, the ifiq_gen stuff exists to avoid atomic ops, but
we use a mutex to protect the list of mbufs anyway.

the diff below gets rid of the ifiq_gen stuff and uses the ifiq_mutex
to protect the counters as well as the mbuf list.
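
(so ifiq_add_data below becomes a plain mtx_enter/copy/mtx_leave, and
readers no longer spin or yield on a generation number.)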

Index: if.c
===================================================================
RCS file: /cvs/src/sys/net/if.c,v
retrieving revision 1.527
diff -u -p -r1.527 if.c
--- if.c 14 Nov 2017 04:08:11 -0000 1.527
+++ if.c 14 Nov 2017 10:51:24 -0000
@@ -156,7 +156,6 @@ int if_group_egress_build(void);
 
 void if_watchdog_task(void *);
 
-void if_input_process(void *);
 void if_netisr(void *);
 
 #ifdef DDB
@@ -437,8 +436,6 @@ if_attachsetup(struct ifnet *ifp)
 
  ifidx = ifp->if_index;
 
- mq_init(&ifp->if_inputqueue, 8192, IPL_NET);
- task_set(ifp->if_inputtask, if_input_process, (void *)ifidx);
  task_set(ifp->if_watchdogtask, if_watchdog_task, (void *)ifidx);
  task_set(ifp->if_linkstatetask, if_linkstate_task, (void *)ifidx);
 
@@ -563,6 +560,30 @@ if_attach_queues(struct ifnet *ifp, unsi
 }
 
 void
+if_attach_iqueues(struct ifnet *ifp, unsigned int niqs)
+{
+ struct ifiqueue **map;
+ struct ifiqueue *ifiq;
+ unsigned int i;
+
+ KASSERT(niqs != 0);
+
+ map = mallocarray(niqs, sizeof(*map), M_DEVBUF, M_WAITOK);
+
+ ifp->if_rcv.ifiq_softc = NULL;
+ map[0] = &ifp->if_rcv;
+
+ for (i = 1; i < niqs; i++) {
+ ifiq = malloc(sizeof(*ifiq), M_DEVBUF, M_WAITOK|M_ZERO);
+ ifiq_init(ifiq, ifp, i);
+ map[i] = ifiq;
+ }
+
+ ifp->if_iqs = map;
+ ifp->if_niqs = niqs;
+}
+
+void
 if_attach_common(struct ifnet *ifp)
 {
  KASSERT(ifp->if_ioctl != NULL);
@@ -587,6 +608,12 @@ if_attach_common(struct ifnet *ifp)
  ifp->if_ifqs = ifp->if_snd.ifq_ifqs;
  ifp->if_nifqs = 1;
 
+ ifiq_init(&ifp->if_rcv, ifp, 0);
+
+ ifp->if_rcv.ifiq_ifiqs[0] = &ifp->if_rcv;
+ ifp->if_iqs = ifp->if_rcv.ifiq_ifiqs;
+ ifp->if_niqs = 1;
+
  ifp->if_addrhooks = malloc(sizeof(*ifp->if_addrhooks),
     M_TEMP, M_WAITOK);
  TAILQ_INIT(ifp->if_addrhooks);
@@ -605,8 +632,6 @@ if_attach_common(struct ifnet *ifp)
     M_TEMP, M_WAITOK|M_ZERO);
  ifp->if_linkstatetask = malloc(sizeof(*ifp->if_linkstatetask),
     M_TEMP, M_WAITOK|M_ZERO);
- ifp->if_inputtask = malloc(sizeof(*ifp->if_inputtask),
-    M_TEMP, M_WAITOK|M_ZERO);
  ifp->if_llprio = IFQ_DEFPRIO;
 
  SRPL_INIT(&ifp->if_inputs);
@@ -694,47 +719,7 @@ if_enqueue(struct ifnet *ifp, struct mbu
 void
 if_input(struct ifnet *ifp, struct mbuf_list *ml)
 {
- struct mbuf *m;
- size_t ibytes = 0;
-#if NBPFILTER > 0
- caddr_t if_bpf;
-#endif
-
- if (ml_empty(ml))
- return;
-
- MBUF_LIST_FOREACH(ml, m) {
- m->m_pkthdr.ph_ifidx = ifp->if_index;
- m->m_pkthdr.ph_rtableid = ifp->if_rdomain;
- ibytes += m->m_pkthdr.len;
- }
-
- ifp->if_ipackets += ml_len(ml);
- ifp->if_ibytes += ibytes;
-
-#if NBPFILTER > 0
- if_bpf = ifp->if_bpf;
- if (if_bpf) {
- struct mbuf_list ml0;
-
- ml_init(&ml0);
- ml_enlist(&ml0, ml);
- ml_init(ml);
-
- while ((m = ml_dequeue(&ml0)) != NULL) {
- if (bpf_mtap_ether(if_bpf, m, BPF_DIRECTION_IN))
- m_freem(m);
- else
- ml_enqueue(ml, m);
- }
-
- if (ml_empty(ml))
- return;
- }
-#endif
-
- if (mq_enlist(&ifp->if_inputqueue, ml) == 0)
- task_add(net_tq(ifp->if_index), ifp->if_inputtask);
+ ifiq_input(&ifp->if_rcv, ml, 2048);
 }
 
 int
@@ -789,6 +774,24 @@ if_input_local(struct ifnet *ifp, struct
  return (0);
 }
 
+int
+if_output_local(struct ifnet *ifp, struct mbuf *m, sa_family_t af)
+{
+ struct ifiqueue *ifiq;
+ unsigned int flow = 0;
+
+ m->m_pkthdr.ph_family = af;
+ m->m_pkthdr.ph_ifidx = ifp->if_index;
+ m->m_pkthdr.ph_rtableid = ifp->if_rdomain;
+
+ if (ISSET(m->m_pkthdr.ph_flowid, M_FLOWID_VALID))
+ flow = m->m_pkthdr.ph_flowid & M_FLOWID_MASK;
+
+ ifiq = ifp->if_iqs[flow % ifp->if_niqs];
+
+ return (ifiq_enqueue(ifiq, m) == 0 ? 0 : ENOBUFS);
+}
+
 struct ifih {
  SRPL_ENTRY(ifih)  ifih_next;
  int (*ifih_input)(struct ifnet *, struct mbuf *,
@@ -873,26 +876,18 @@ if_ih_remove(struct ifnet *ifp, int (*in
 }
 
 void
-if_input_process(void *xifidx)
+if_input_process(struct ifnet *ifp, struct mbuf_list *ml)
 {
- unsigned int ifidx = (unsigned long)xifidx;
- struct mbuf_list ml;
  struct mbuf *m;
- struct ifnet *ifp;
  struct ifih *ifih;
  struct srp_ref sr;
  int s;
 
- ifp = if_get(ifidx);
- if (ifp == NULL)
+ if (ml_empty(ml))
  return;
 
- mq_delist(&ifp->if_inputqueue, &ml);
- if (ml_empty(&ml))
- goto out;
-
  if (!ISSET(ifp->if_xflags, IFXF_CLONED))
- add_net_randomness(ml_len(&ml));
+ add_net_randomness(ml_len(ml));
 
  /*
  * We grab the NET_LOCK() before processing any packet to
@@ -908,7 +903,7 @@ if_input_process(void *xifidx)
  */
  NET_RLOCK();
  s = splnet();
- while ((m = ml_dequeue(&ml)) != NULL) {
+ while ((m = ml_dequeue(ml)) != NULL) {
  /*
  * Pass this mbuf to all input handlers of its
  * interface until it is consumed.
@@ -924,8 +919,6 @@ if_input_process(void *xifidx)
  }
  splx(s);
  NET_RUNLOCK();
-out:
- if_put(ifp);
 }
 
 void
@@ -1033,10 +1026,6 @@ if_detach(struct ifnet *ifp)
  ifp->if_ioctl = if_detached_ioctl;
  ifp->if_watchdog = NULL;
 
- /* Remove the input task */
- task_del(net_tq(ifp->if_index), ifp->if_inputtask);
- mq_purge(&ifp->if_inputqueue);
-
  /* Remove the watchdog timeout & task */
  timeout_del(ifp->if_slowtimo);
  task_del(net_tq(ifp->if_index), ifp->if_watchdogtask);
@@ -1090,7 +1079,6 @@ if_detach(struct ifnet *ifp)
  free(ifp->if_slowtimo, M_TEMP, sizeof(*ifp->if_slowtimo));
  free(ifp->if_watchdogtask, M_TEMP, sizeof(*ifp->if_watchdogtask));
  free(ifp->if_linkstatetask, M_TEMP, sizeof(*ifp->if_linkstatetask));
- free(ifp->if_inputtask, M_TEMP, sizeof(*ifp->if_inputtask));
 
  for (i = 0; (dp = domains[i]) != NULL; i++) {
  if (dp->dom_ifdetach && ifp->if_afdata[dp->dom_family])
@@ -1113,6 +1101,17 @@ if_detach(struct ifnet *ifp)
  free(ifp->if_ifqs, M_DEVBUF,
     sizeof(struct ifqueue *) * ifp->if_nifqs);
  }
+
+ for (i = 0; i < ifp->if_niqs; i++)
+ ifiq_destroy(ifp->if_iqs[i]);
+ if (ifp->if_iqs != ifp->if_rcv.ifiq_ifiqs) {
+ for (i = 1; i < ifp->if_niqs; i++) {
+ free(ifp->if_iqs[i], M_DEVBUF,
+    sizeof(struct ifiqueue));
+ }
+ free(ifp->if_iqs, M_DEVBUF,
+    sizeof(struct ifiqueue *) * ifp->if_niqs);
+ }
 }
 
 /*
@@ -2284,6 +2283,12 @@ if_getdata(struct ifnet *ifp, struct if_
  struct ifqueue *ifq = ifp->if_ifqs[i];
 
  ifq_add_data(ifq, data);
+ }
+
+ for (i = 0; i < ifp->if_niqs; i++) {
+ struct ifiqueue *ifiq = ifp->if_iqs[i];
+
+ ifiq_add_data(ifiq, data);
  }
 }
 
Index: if_var.h
===================================================================
RCS file: /cvs/src/sys/net/if_var.h,v
retrieving revision 1.83
diff -u -p -r1.83 if_var.h
--- if_var.h 31 Oct 2017 22:05:12 -0000 1.83
+++ if_var.h 14 Nov 2017 10:51:24 -0000
@@ -140,8 +140,6 @@ struct ifnet { /* and the entries */
  struct task *if_linkstatetask; /* task to do route updates */
 
  /* procedure handles */
- struct mbuf_queue if_inputqueue;
- struct task *if_inputtask; /* input task */
  SRPL_HEAD(, ifih) if_inputs; /* input routines (dequeue) */
 
  /* output routine (enqueue) */
@@ -164,6 +162,10 @@ struct ifnet { /* and the entries */
  void (*if_qstart)(struct ifqueue *);
  unsigned int if_nifqs;
 
+ struct ifiqueue if_rcv; /* rx/input queue */
+ struct ifiqueue **if_iqs; /* pointer to the array of iqs */
+ unsigned int if_niqs;
+
  struct sockaddr_dl *if_sadl; /* pointer to our sockaddr_dl */
 
  void *if_afdata[AF_MAX];
@@ -303,7 +305,9 @@ void if_start(struct ifnet *);
 int if_enqueue_try(struct ifnet *, struct mbuf *);
 int if_enqueue(struct ifnet *, struct mbuf *);
 void if_input(struct ifnet *, struct mbuf_list *);
+void if_input_process(struct ifnet *, struct mbuf_list *);
 int if_input_local(struct ifnet *, struct mbuf *, sa_family_t);
+int if_output_local(struct ifnet *, struct mbuf *, sa_family_t);
 void if_rtrequest_dummy(struct ifnet *, int, struct rtentry *);
 void p2p_rtrequest(struct ifnet *, int, struct rtentry *);
 
Index: ifq.c
===================================================================
RCS file: /cvs/src/sys/net/ifq.c,v
retrieving revision 1.15
diff -u -p -r1.15 ifq.c
--- ifq.c 14 Nov 2017 08:44:11 -0000 1.15
+++ ifq.c 14 Nov 2017 10:51:24 -0000
@@ -16,6 +16,8 @@
  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  */
 
+#include "bpfilter.h"
+
 #include <sys/param.h>
 #include <sys/systm.h>
 #include <sys/socket.h>
@@ -25,6 +27,10 @@
 #include <net/if.h>
 #include <net/if_var.h>
 
+#if NBPFILTER > 0
+#include <net/bpf.h>
+#endif
+
 /*
  * priq glue
  */
@@ -416,6 +422,166 @@ ifq_mfreeml(struct ifqueue *ifq, struct
  ifq->ifq_len -= ml_len(ml);
  ifq->ifq_qdrops += ml_len(ml);
  ml_enlist(&ifq->ifq_free, ml);
+}
+
+/*
+ * ifiq
+ */
+
+static void ifiq_process(void *);
+
+void
+ifiq_init(struct ifiqueue *ifiq, struct ifnet *ifp, unsigned int idx)
+{
+ ifiq->ifiq_if = ifp;
+ ifiq->ifiq_softnet = net_tq(ifp->if_index); /* + idx */
+ ifiq->ifiq_softc = NULL;
+
+ mtx_init(&ifiq->ifiq_mtx, IPL_NET);
+ ml_init(&ifiq->ifiq_ml);
+ task_set(&ifiq->ifiq_task, ifiq_process, ifiq);
+
+ ifiq->ifiq_qdrops = 0;
+ ifiq->ifiq_packets = 0;
+ ifiq->ifiq_bytes = 0;
+ ifiq->ifiq_qdrops = 0;
+ ifiq->ifiq_errors = 0;
+
+ ifiq->ifiq_idx = idx;
+}
+
+void
+ifiq_destroy(struct ifiqueue *ifiq)
+{
+ if (!task_del(ifiq->ifiq_softnet, &ifiq->ifiq_task)) {
+ int netlocked = (rw_status(&netlock) == RW_WRITE);
+
+ if (netlocked) /* XXXSMP breaks atomicity */
+ NET_UNLOCK();
+
+ taskq_barrier(ifiq->ifiq_softnet);
+
+ if (netlocked)
+ NET_LOCK();
+ }
+
+ /* don't need to lock because this is the last use of the ifiq */
+ ml_purge(&ifiq->ifiq_ml);
+}
+
+int
+ifiq_input(struct ifiqueue *ifiq, struct mbuf_list *ml, unsigned int cwm)
+{
+ struct ifnet *ifp = ifiq->ifiq_if;
+ struct mbuf *m;
+ uint64_t packets;
+ uint64_t bytes = 0;
+#if NBPFILTER > 0
+ caddr_t if_bpf;
+#endif
+ int rv = 1;
+
+ if (ml_empty(ml))
+ return (0);
+
+ MBUF_LIST_FOREACH(ml, m) {
+ m->m_pkthdr.ph_ifidx = ifp->if_index;
+ m->m_pkthdr.ph_rtableid = ifp->if_rdomain;
+ bytes += m->m_pkthdr.len;
+ }
+ packets = ml_len(ml);
+
+#if NBPFILTER > 0
+ if_bpf = ifp->if_bpf;
+ if (if_bpf) {
+ struct mbuf_list ml0 = *ml;
+
+ ml_init(ml);
+
+ while ((m = ml_dequeue(&ml0)) != NULL) {
+ if (bpf_mtap_ether(if_bpf, m, BPF_DIRECTION_IN))
+ m_freem(m);
+ else
+ ml_enqueue(ml, m);
+ }
+
+ if (ml_empty(ml)) {
+ mtx_enter(&ifiq->ifiq_mtx);
+ ifiq->ifiq_packets += packets;
+ ifiq->ifiq_bytes += bytes;
+ mtx_leave(&ifiq->ifiq_mtx);
+
+ return (0);
+ }
+ }
+#endif
+
+ mtx_enter(&ifiq->ifiq_mtx);
+ ifiq->ifiq_packets += packets;
+ ifiq->ifiq_bytes += bytes;
+
+ if (ifiq_len(ifiq) >= cwm * 5)
+ ifiq->ifiq_qdrops += ml_len(ml);
+ else {
+ rv = (ifiq_len(ifiq) >= cwm * 3);
+ ml_enlist(&ifiq->ifiq_ml, ml);
+ }
+ mtx_leave(&ifiq->ifiq_mtx);
+
+ if (ml_empty(ml))
+ task_add(ifiq->ifiq_softnet, &ifiq->ifiq_task);
+ else
+ ml_purge(ml);
+
+ return (rv);
+}
+
+void
+ifiq_add_data(struct ifiqueue *ifiq, struct if_data *data)
+{
+ mtx_enter(&ifiq->ifiq_mtx);
+ data->ifi_ipackets += ifiq->ifiq_packets;
+ data->ifi_ibytes += ifiq->ifiq_bytes;
+ data->ifi_iqdrops += ifiq->ifiq_qdrops;
+ mtx_leave(&ifiq->ifiq_mtx);
+}
+
+void
+ifiq_barrier(struct ifiqueue *ifiq)
+{
+ if (!task_del(ifiq->ifiq_softnet, &ifiq->ifiq_task))
+ taskq_barrier(ifiq->ifiq_softnet);
+}
+
+int
+ifiq_enqueue(struct ifiqueue *ifiq, struct mbuf *m)
+{
+ /* this can be called from anywhere at any time, so must lock */
+
+ mtx_enter(&ifiq->ifiq_mtx);
+ ml_enqueue(&ifiq->ifiq_ml, m);
+ mtx_leave(&ifiq->ifiq_mtx);
+
+ task_add(ifiq->ifiq_softnet, &ifiq->ifiq_task);
+
+ return (0);
+}
+
+static void
+ifiq_process(void *arg)
+{
+ struct ifiqueue *ifiq = arg;
+ struct mbuf_list ml;
+
+ if (ifiq_empty(ifiq))
+ return;
+
+ mtx_enter(&ifiq->ifiq_mtx);
+ ml = ifiq->ifiq_ml;
+ ml_init(&ifiq->ifiq_ml);
+ mtx_leave(&ifiq->ifiq_mtx);
+
+ if_input_process(ifiq->ifiq_if, &ml);
 }
 
 /*
Index: ifq.h
===================================================================
RCS file: /cvs/src/sys/net/ifq.h,v
retrieving revision 1.16
diff -u -p -r1.16 ifq.h
--- ifq.h 14 Nov 2017 08:44:11 -0000 1.16
+++ ifq.h 14 Nov 2017 10:51:24 -0000
@@ -67,6 +67,32 @@ struct ifqueue {
  unsigned int ifq_idx;
 };
 
+struct ifiqueue {
+ struct ifnet *ifiq_if;
+ struct taskq *ifiq_softnet;
+ union {
+ void *_ifiq_softc;
+ struct ifiqueue *_ifiq_ifiqs[1];
+ } _ifiq_ptr;
+#define ifiq_softc _ifiq_ptr._ifiq_softc
+#define ifiq_ifiqs _ifiq_ptr._ifiq_ifiqs
+
+ struct mutex ifiq_mtx;
+ struct mbuf_list ifiq_ml;
+ struct task ifiq_task;
+
+ /* counters */
+ uint64_t ifiq_packets;
+ uint64_t ifiq_bytes;
+ uint64_t ifiq_qdrops;
+ uint64_t ifiq_errors;
+ uint64_t ifiq_mcasts;
+ uint64_t ifiq_noproto;
+
+ /* properties */
+ unsigned int ifiq_idx;
+};
+
 #ifdef _KERNEL
 
 #define IFQ_MAXLEN 256
@@ -435,6 +461,18 @@ ifq_idx(struct ifqueue *ifq, unsigned in
 #define IFQ_ASSERT_SERIALIZED(_ifq) KASSERT(ifq_is_serialized(_ifq))
 
 extern const struct ifq_ops * const ifq_priq_ops;
+
+/* ifiq */
+
+void ifiq_init(struct ifiqueue *, struct ifnet *, unsigned int);
+void ifiq_destroy(struct ifiqueue *);
+int ifiq_input(struct ifiqueue *, struct mbuf_list *,
+     unsigned int);
+int ifiq_enqueue(struct ifiqueue *, struct mbuf *);
+void ifiq_add_data(struct ifiqueue *, struct if_data *);
+
+#define ifiq_len(_ifiq) ml_len(&(_ifiq)->ifiq_ml)
+#define ifiq_empty(_ifiq) ml_empty(&(_ifiq)->ifiq_ml)
 
 #endif /* _KERNEL */
 
Index: if_loop.c
===================================================================
RCS file: /cvs/src/sys/net/if_loop.c,v
retrieving revision 1.83
diff -u -p -r1.83 if_loop.c
--- if_loop.c 31 Oct 2017 22:05:12 -0000 1.83
+++ if_loop.c 14 Nov 2017 10:51:24 -0000
@@ -241,12 +241,7 @@ looutput(struct ifnet *ifp, struct mbuf
  if ((m->m_flags & M_LOOP) == 0)
  return (if_input_local(ifp, m, dst->sa_family));
 
- m->m_pkthdr.ph_family = dst->sa_family;
- if (mq_enqueue(&ifp->if_inputqueue, m))
- return ENOBUFS;
- task_add(net_tq(ifp->if_index), ifp->if_inputtask);
-
- return (0);
+ return (if_output_local(ifp, m, dst->sa_family));
 }
 
 void


Re: multiple interface input queues

Martin Pieuchot
In reply to this post by David Gwynne-5
On 14/11/17(Tue) 14:42, David Gwynne wrote:
> this replaces the single mbuf_queue and task in struct ifnet with
> a new ifiqueue structure modelled on ifqueues.

The name is confusing; should we rename ifqueues 'ifoqueue' then?

> the main motivation behind this was to show mpsafe input counters.

I like that you're sharing the whole direction you're going in.
Nice to know we don't need to address input counters :)

> ifiqueues, like ifqueues, allow a driver to configure multiple
> queues. this in turn allows a driver with multiple rx rings to
> have them queue and account for packets independently.

How does a driver decide to use multiple queues?  Aren't we missing
the interrupt on !CPU0 bits?  Once that's done how do we split traffic?

> ifiq_input generally replaces if_input. if_input now simply queues
> on the ifnet's builtin ifiqueue (ifp->if_rcv) to support the current
> set of drivers that only configure a single rx context/ring.

So we always use at least `ifp->if_rcv'.  Can't we add an argument to
if_attach_common() rather than adding a new interface?

> ifiq counters are updated and read using the same semantics as percpu
> counters, ie, there's a generation number updated before and after
> a counter update. this means writers don't block, but a reader may
> loop a couple of times waiting for a writer to finish.

Why do we need yet another copy of counters_enter()/leave()?  Something
feels wrong.  I *know* it is not per-CPU memory, but I wish we could do
better.  Otherwise we will end up with per data structure counter API.

> loop a couple of times waiting for a writer to finish. readers call
> yield(), which causes splasserts to fire if if_getdata is still
> holding the kernel lock.

You mean the NET_LOCK(), well this should be easily removed.  tb@ has a
diff that should go in then you can remove the NET_LOCK()/UNLOCK() dance
around if_getdata().

> ifiq_input is set up to interact with the interface rx ring moderation
> code (ie, the if_rxring code). you pass in the current rxr watermark,
> and it will look at the backlog of packets on that ifiq to determine
> if the ring is producing too fast. if it is, it'll return 1 to slow
> down packet reception. i have a later diff that adds functionality
> to the if_rxring bits so a driver can say a ring is livelocked,
> rather than relying on the system to detect it. so drv_rxeof would
> have the following at the end of it:
>
> if (ifiq_input(rxr->ifiq, &ml, if_rxr_cwm(&rxr->rx_ring)))
> if_rxr_livelocked(&rxr->rx_ring);
> drv_rx_refill(rxr);

This magic is worth discussing in a diff doing only that :)

> i've run with that on a hacked up ix(4) that runs with 8 rx rings,
> and this works pretty well. if you're doing a single stream, you
> see one rx ring grow the number of descs it will handle, but if you
> flood it with a range of traffic you'll see that one ring scale
> down and balance out with the rest of the rings. it turns out you
> can still get reasonable throughput even if the ifiqs are dynamically
> scaling themselves to only 100 packets. however, the interactivity
> of the system improves a lot.

So you're now introducing MCLGETI(9) back with support for multiple
rings.  Nice.

> currently if one interface is being DoSed, it'll end up with 8192
> packets on its if_inputqueue. it takes a while to process those
> packets, which blocks the processing of packets from the other
> interface. by scaling the input queues to relatively small counts,
> softnet can service packets from other interfaces sooner.

I see a lot of good stuff in this diff.  I like your direction.  I'm
not sure how/when you plan to add support for multiple input rings.

Now I'm afraid of a single diff doing refactoring + iqueues + counter +
MCLGETI :o)

>
> Index: if.c
> ===================================================================
> RCS file: /cvs/src/sys/net/if.c,v
> retrieving revision 1.527
> diff -u -p -r1.527 if.c
> --- if.c 14 Nov 2017 04:08:11 -0000 1.527
> +++ if.c 14 Nov 2017 04:18:41 -0000
> @@ -156,7 +156,6 @@ int if_group_egress_build(void);
>  
>  void if_watchdog_task(void *);
>  
> -void if_input_process(void *);
>  void if_netisr(void *);
>  
>  #ifdef DDB
> @@ -437,8 +436,6 @@ if_attachsetup(struct ifnet *ifp)
>  
>   ifidx = ifp->if_index;
>  
> - mq_init(&ifp->if_inputqueue, 8192, IPL_NET);
> - task_set(ifp->if_inputtask, if_input_process, (void *)ifidx);
>   task_set(ifp->if_watchdogtask, if_watchdog_task, (void *)ifidx);
>   task_set(ifp->if_linkstatetask, if_linkstate_task, (void *)ifidx);
>  
> @@ -563,6 +560,30 @@ if_attach_queues(struct ifnet *ifp, unsi
>  }
>  
>  void
> +if_attach_iqueues(struct ifnet *ifp, unsigned int niqs)
> +{
> + struct ifiqueue **map;
> + struct ifiqueue *ifiq;
> + unsigned int i;
> +
> + KASSERT(niqs != 0);
> +
> + map = mallocarray(niqs, sizeof(*map), M_DEVBUF, M_WAITOK);
> +
> + ifp->if_rcv.ifiq_softc = NULL;
> + map[0] = &ifp->if_rcv;
> +
> + for (i = 1; i < niqs; i++) {
> + ifiq = malloc(sizeof(*ifiq), M_DEVBUF, M_WAITOK|M_ZERO);
> + ifiq_init(ifiq, ifp, i);
> + map[i] = ifiq;
> + }
> +
> + ifp->if_iqs = map;
> + ifp->if_niqs = niqs;
> +}
> +
> +void
>  if_attach_common(struct ifnet *ifp)
>  {
>   KASSERT(ifp->if_ioctl != NULL);
> @@ -587,6 +608,12 @@ if_attach_common(struct ifnet *ifp)
>   ifp->if_ifqs = ifp->if_snd.ifq_ifqs;
>   ifp->if_nifqs = 1;
>  
> + ifiq_init(&ifp->if_rcv, ifp, 0);
> +
> + ifp->if_rcv.ifiq_ifiqs[0] = &ifp->if_rcv;
> + ifp->if_iqs = ifp->if_rcv.ifiq_ifiqs;
> + ifp->if_niqs = 1;
> +
>   ifp->if_addrhooks = malloc(sizeof(*ifp->if_addrhooks),
>      M_TEMP, M_WAITOK);
>   TAILQ_INIT(ifp->if_addrhooks);
> @@ -605,8 +632,6 @@ if_attach_common(struct ifnet *ifp)
>      M_TEMP, M_WAITOK|M_ZERO);
>   ifp->if_linkstatetask = malloc(sizeof(*ifp->if_linkstatetask),
>      M_TEMP, M_WAITOK|M_ZERO);
> - ifp->if_inputtask = malloc(sizeof(*ifp->if_inputtask),
> -    M_TEMP, M_WAITOK|M_ZERO);
>   ifp->if_llprio = IFQ_DEFPRIO;
>  
>   SRPL_INIT(&ifp->if_inputs);
> @@ -694,47 +719,7 @@ if_enqueue(struct ifnet *ifp, struct mbu
>  void
>  if_input(struct ifnet *ifp, struct mbuf_list *ml)
>  {
> - struct mbuf *m;
> - size_t ibytes = 0;
> -#if NBPFILTER > 0
> - caddr_t if_bpf;
> -#endif
> -
> - if (ml_empty(ml))
> - return;
> -
> - MBUF_LIST_FOREACH(ml, m) {
> - m->m_pkthdr.ph_ifidx = ifp->if_index;
> - m->m_pkthdr.ph_rtableid = ifp->if_rdomain;
> - ibytes += m->m_pkthdr.len;
> - }
> -
> - ifp->if_ipackets += ml_len(ml);
> - ifp->if_ibytes += ibytes;
> -
> -#if NBPFILTER > 0
> - if_bpf = ifp->if_bpf;
> - if (if_bpf) {
> - struct mbuf_list ml0;
> -
> - ml_init(&ml0);
> - ml_enlist(&ml0, ml);
> - ml_init(ml);
> -
> - while ((m = ml_dequeue(&ml0)) != NULL) {
> - if (bpf_mtap_ether(if_bpf, m, BPF_DIRECTION_IN))
> - m_freem(m);
> - else
> - ml_enqueue(ml, m);
> - }
> -
> - if (ml_empty(ml))
> - return;
> - }
> -#endif
> -
> - if (mq_enlist(&ifp->if_inputqueue, ml) == 0)
> - task_add(net_tq(ifp->if_index), ifp->if_inputtask);
> + ifiq_input(&ifp->if_rcv, ml, 2048);
>  }
>  
>  int
> @@ -789,6 +774,24 @@ if_input_local(struct ifnet *ifp, struct
>   return (0);
>  }
>  
> +int
> +if_output_local(struct ifnet *ifp, struct mbuf *m, sa_family_t af)
> +{
> + struct ifiqueue *ifiq;
> + unsigned int flow = 0;
> +
> + m->m_pkthdr.ph_family = af;
> + m->m_pkthdr.ph_ifidx = ifp->if_index;
> + m->m_pkthdr.ph_rtableid = ifp->if_rdomain;
> +
> + if (ISSET(m->m_pkthdr.ph_flowid, M_FLOWID_VALID))
> + flow = m->m_pkthdr.ph_flowid & M_FLOWID_MASK;
> +
> + ifiq = ifp->if_iqs[flow % ifp->if_niqs];
> +
> + return (ifiq_enqueue(ifiq, m) == 0 ? 0 : ENOBUFS);
> +}
> +
>  struct ifih {
>   SRPL_ENTRY(ifih)  ifih_next;
>   int (*ifih_input)(struct ifnet *, struct mbuf *,
> @@ -873,26 +876,18 @@ if_ih_remove(struct ifnet *ifp, int (*in
>  }
>  
>  void
> -if_input_process(void *xifidx)
> +if_input_process(struct ifnet *ifp, struct mbuf_list *ml)
>  {
> - unsigned int ifidx = (unsigned long)xifidx;
> - struct mbuf_list ml;
>   struct mbuf *m;
> - struct ifnet *ifp;
>   struct ifih *ifih;
>   struct srp_ref sr;
>   int s;
>  
> - ifp = if_get(ifidx);
> - if (ifp == NULL)
> + if (ml_empty(ml))
>   return;
>  
> - mq_delist(&ifp->if_inputqueue, &ml);
> - if (ml_empty(&ml))
> - goto out;
> -
>   if (!ISSET(ifp->if_xflags, IFXF_CLONED))
> - add_net_randomness(ml_len(&ml));
> + add_net_randomness(ml_len(ml));
>  
>   /*
>   * We grab the NET_LOCK() before processing any packet to
> @@ -908,7 +903,7 @@ if_input_process(void *xifidx)
>   */
>   NET_RLOCK();
>   s = splnet();
> - while ((m = ml_dequeue(&ml)) != NULL) {
> + while ((m = ml_dequeue(ml)) != NULL) {
>   /*
>   * Pass this mbuf to all input handlers of its
>   * interface until it is consumed.
> @@ -924,8 +919,6 @@ if_input_process(void *xifidx)
>   }
>   splx(s);
>   NET_RUNLOCK();
> -out:
> - if_put(ifp);
>  }
>  
>  void
> @@ -1033,10 +1026,6 @@ if_detach(struct ifnet *ifp)
>   ifp->if_ioctl = if_detached_ioctl;
>   ifp->if_watchdog = NULL;
>  
> - /* Remove the input task */
> - task_del(net_tq(ifp->if_index), ifp->if_inputtask);
> - mq_purge(&ifp->if_inputqueue);
> -
>   /* Remove the watchdog timeout & task */
>   timeout_del(ifp->if_slowtimo);
>   task_del(net_tq(ifp->if_index), ifp->if_watchdogtask);
> @@ -1090,7 +1079,6 @@ if_detach(struct ifnet *ifp)
>   free(ifp->if_slowtimo, M_TEMP, sizeof(*ifp->if_slowtimo));
>   free(ifp->if_watchdogtask, M_TEMP, sizeof(*ifp->if_watchdogtask));
>   free(ifp->if_linkstatetask, M_TEMP, sizeof(*ifp->if_linkstatetask));
> - free(ifp->if_inputtask, M_TEMP, sizeof(*ifp->if_inputtask));
>  
>   for (i = 0; (dp = domains[i]) != NULL; i++) {
>   if (dp->dom_ifdetach && ifp->if_afdata[dp->dom_family])
> @@ -1113,6 +1101,17 @@ if_detach(struct ifnet *ifp)
>   free(ifp->if_ifqs, M_DEVBUF,
>      sizeof(struct ifqueue *) * ifp->if_nifqs);
>   }
> +
> + for (i = 0; i < ifp->if_niqs; i++)
> + ifiq_destroy(ifp->if_iqs[i]);
> + if (ifp->if_iqs != ifp->if_rcv.ifiq_ifiqs) {
> + for (i = 1; i < ifp->if_niqs; i++) {
> + free(ifp->if_iqs[i], M_DEVBUF,
> +    sizeof(struct ifiqueue));
> + }
> + free(ifp->if_iqs, M_DEVBUF,
> +    sizeof(struct ifiqueue *) * ifp->if_niqs);
> + }
>  }
>  
>  /*
> @@ -2280,11 +2279,21 @@ if_getdata(struct ifnet *ifp, struct if_
>  
>   *data = ifp->if_data;
>  
> + NET_UNLOCK();
> +
>   for (i = 0; i < ifp->if_nifqs; i++) {
>   struct ifqueue *ifq = ifp->if_ifqs[i];
>  
>   ifq_add_data(ifq, data);
>   }
> +
> + for (i = 0; i < ifp->if_niqs; i++) {
> + struct ifiqueue *ifiq = ifp->if_iqs[i];
> +
> + ifiq_add_data(ifiq, data);
> + }
> +
> + NET_LOCK();
>  }
>  
>  /*
> Index: if_var.h
> ===================================================================
> RCS file: /cvs/src/sys/net/if_var.h,v
> retrieving revision 1.83
> diff -u -p -r1.83 if_var.h
> --- if_var.h 31 Oct 2017 22:05:12 -0000 1.83
> +++ if_var.h 14 Nov 2017 04:18:41 -0000
> @@ -140,8 +140,6 @@ struct ifnet { /* and the entries */
>   struct task *if_linkstatetask; /* task to do route updates */
>  
>   /* procedure handles */
> - struct mbuf_queue if_inputqueue;
> - struct task *if_inputtask; /* input task */
>   SRPL_HEAD(, ifih) if_inputs; /* input routines (dequeue) */
>  
>   /* output routine (enqueue) */
> @@ -164,6 +162,10 @@ struct ifnet { /* and the entries */
>   void (*if_qstart)(struct ifqueue *);
>   unsigned int if_nifqs;
>  
> + struct ifiqueue if_rcv; /* rx/input queue */
> + struct ifiqueue **if_iqs;      /* pointer to the array of iqs */
> + unsigned int if_niqs;
> +
>   struct sockaddr_dl *if_sadl; /* pointer to our sockaddr_dl */
>  
>   void *if_afdata[AF_MAX];
> @@ -303,7 +305,9 @@ void if_start(struct ifnet *);
>  int if_enqueue_try(struct ifnet *, struct mbuf *);
>  int if_enqueue(struct ifnet *, struct mbuf *);
>  void if_input(struct ifnet *, struct mbuf_list *);
> +void if_input_process(struct ifnet *, struct mbuf_list *);
>  int if_input_local(struct ifnet *, struct mbuf *, sa_family_t);
> +int if_output_local(struct ifnet *, struct mbuf *, sa_family_t);
>  void if_rtrequest_dummy(struct ifnet *, int, struct rtentry *);
>  void p2p_rtrequest(struct ifnet *, int, struct rtentry *);
>  
> Index: ifq.c
> ===================================================================
> RCS file: /cvs/src/sys/net/ifq.c,v
> retrieving revision 1.14
> diff -u -p -r1.14 ifq.c
> --- ifq.c 14 Nov 2017 04:08:11 -0000 1.14
> +++ ifq.c 14 Nov 2017 04:18:41 -0000
> @@ -16,15 +16,22 @@
>   * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
>   */
>  
> +#include "bpfilter.h"
> +
>  #include <sys/param.h>
>  #include <sys/systm.h>
>  #include <sys/socket.h>
>  #include <sys/mbuf.h>
>  #include <sys/proc.h>
> +#include <sys/atomic.h>
>  
>  #include <net/if.h>
>  #include <net/if_var.h>
>  
> +#if NBPFILTER > 0
> +#include <net/bpf.h>
> +#endif
> +
>  /*
>   * priq glue
>   */
> @@ -457,6 +464,223 @@ ifq_mfreeml(struct ifqueue *ifq, struct
>   ifq->ifq_len -= ml_len(ml);
>   ifq->ifq_qdrops += ml_len(ml);
>   ml_enlist(&ifq->ifq_free, ml);
> +}
> +
> +/*
> + * ifiq
> + */
> +
> +struct ifiq_ref {
> + unsigned int gen;
> +}; /* __upunused */
> +
> +static void ifiq_process(void *);
> +
> +void
> +ifiq_init(struct ifiqueue *ifiq, struct ifnet *ifp, unsigned int idx)
> +{
> + ifiq->ifiq_if = ifp;
> + ifiq->ifiq_softnet = net_tq(ifp->if_index); /* + idx */
> + ifiq->ifiq_softc = NULL;
> +
> + mtx_init(&ifiq->ifiq_mtx, IPL_NET);
> + ml_init(&ifiq->ifiq_ml);
> + task_set(&ifiq->ifiq_task, ifiq_process, ifiq);
> +
> + ifiq->ifiq_qdrops = 0;
> + ifiq->ifiq_packets = 0;
> + ifiq->ifiq_bytes = 0;
> + ifiq->ifiq_qdrops = 0;
> + ifiq->ifiq_errors = 0;
> +
> + ifiq->ifiq_idx = idx;
> +}
> +
> +void
> +ifiq_destroy(struct ifiqueue *ifiq)
> +{
> + if (!task_del(ifiq->ifiq_softnet, &ifiq->ifiq_task)) {
> + int netlocked = (rw_status(&netlock) == RW_WRITE);
> +
> + if (netlocked) /* XXXSMP breaks atomicity */
> + NET_UNLOCK();

This isn't called with the NET_LOCK() held.

> +
> + taskq_barrier(ifiq->ifiq_softnet);
> +
> + if (netlocked)
> + NET_LOCK();
> + }
> +
> + /* don't need to lock because this is the last use of the ifiq */
> + ml_purge(&ifiq->ifiq_ml);
> +}
> +
> +static inline void
> +ifiq_enter(struct ifiq_ref *ref, struct ifiqueue *ifiq)
> +{
> + ref->gen = ++ifiq->ifiq_gen;
> + membar_producer();
> +}
> +
> +static inline void
> +ifiq_leave(struct ifiq_ref *ref, struct ifiqueue *ifiq)
> +{
> + membar_producer();
> + ifiq->ifiq_gen = ++ref->gen;
> +}
> +
> +static inline void
> +ifiq_count(struct ifiqueue *ifiq, uint64_t packets, uint64_t bytes)
> +{
> + struct ifiq_ref ref;
> +
> + ifiq_enter(&ref, ifiq);
> + ifiq->ifiq_packets += packets;
> + ifiq->ifiq_bytes += bytes;
> + ifiq_leave(&ref, ifiq);
> +}
> +
> +static inline void
> +ifiq_qdrop(struct ifiqueue *ifiq, struct mbuf_list *ml)
> +{
> + struct ifiq_ref ref;
> + unsigned int qdrops;
> +
> + qdrops = ml_purge(ml);
> +
> + ifiq_enter(&ref, ifiq);
> + ifiq->ifiq_qdrops += qdrops;
> + ifiq_leave(&ref, ifiq);
> +}
> +
> +int
> +ifiq_input(struct ifiqueue *ifiq, struct mbuf_list *ml, unsigned int cwm)
> +{
> + struct ifnet *ifp = ifiq->ifiq_if;
> + struct mbuf *m;
> + uint64_t bytes = 0;
> +#if NBPFILTER > 0
> + caddr_t if_bpf;
> +#endif
> + int rv;
> +
> + if (ml_empty(ml))
> + return (0);
> +
> + MBUF_LIST_FOREACH(ml, m) {
> + m->m_pkthdr.ph_ifidx = ifp->if_index;
> + m->m_pkthdr.ph_rtableid = ifp->if_rdomain;
> + bytes += m->m_pkthdr.len;
> + }
> +
> + ifiq_count(ifiq, ml_len(ml), bytes);

No need to add a function which is used only once.

> +
> +#if NBPFILTER > 0
> + if_bpf = ifp->if_bpf;
> + if (if_bpf) {
> + struct mbuf_list ml0 = *ml;
> +
> + ml_init(ml);
> +
> + while ((m = ml_dequeue(&ml0)) != NULL) {
> + if (bpf_mtap_ether(if_bpf, m, BPF_DIRECTION_IN))
> + m_freem(m);
> + else
> + ml_enqueue(ml, m);
> + }
> +
> + if (ml_empty(ml))
> + return (0);
> + }
> +#endif
> +
> + if (ifiq_len(ifiq) >= cwm * 5) {
> + /* the backlog is way too high, so drop these packets */
> + ifiq_qdrop(ifiq, ml);

This function is also used only once.

> + return (1); /* tell the caller to slow down */
> + }
> +
> + /* tell the caller to slow down if the backlog is getting high */
> + rv = (ifiq_len(ifiq) >= cwm * 3);
> +
> + mtx_enter(&ifiq->ifiq_mtx);
> + ml_enlist(&ifiq->ifiq_ml, ml);
> + mtx_leave(&ifiq->ifiq_mtx);
> +
> + task_add(ifiq->ifiq_softnet, &ifiq->ifiq_task);
> +
> + return (rv);
> +}
> +
> +void
> +ifiq_add_data(struct ifiqueue *ifiq, struct if_data *data)
> +{
> + unsigned int enter, leave;
> + uint64_t packets, bytes, qdrops;
> +
> + enter = ifiq->ifiq_gen;
> + for (;;) {
> + /* the generation number is odd during an update */
> + while (enter & 1) {
> + yield();
> + enter = ifiq->ifiq_gen;
> + }
> +
> + membar_consumer();
> + packets = ifiq->ifiq_packets;
> + bytes = ifiq->ifiq_bytes;
> + qdrops = ifiq->ifiq_qdrops;
> + membar_consumer();
> +
> + leave = ifiq->ifiq_gen;
> +
> + if (enter == leave)
> + break;
> +
> + enter = leave;
> + }
> +
> + data->ifi_ipackets += packets;
> + data->ifi_ibytes += bytes;
> + data->ifi_iqdrops += qdrops;
> +}
> +
> +void
> +ifiq_barrier(struct ifiqueue *ifiq)
> +{
> + if (!task_del(ifiq->ifiq_softnet, &ifiq->ifiq_task))
> + taskq_barrier(ifiq->ifiq_softnet);
> +}
> +
> +int
> +ifiq_enqueue(struct ifiqueue *ifiq, struct mbuf *m)
> +{
> + /* this can be called from anywhere at any time, so must lock */
> +
> + mtx_enter(&ifiq->ifiq_mtx);
> + ml_enqueue(&ifiq->ifiq_ml, m);
> + mtx_leave(&ifiq->ifiq_mtx);
> +
> + task_add(ifiq->ifiq_softnet, &ifiq->ifiq_task);
> +
> + return (0);
> +}
> +
> +static void
> +ifiq_process(void *arg)
> +{
> + struct ifiqueue *ifiq = arg;
> + struct mbuf_list ml;
> +
> + if (ifiq_empty(ifiq))
> + return;
> +
> + mtx_enter(&ifiq->ifiq_mtx);
> + ml = ifiq->ifiq_ml;
> + ml_init(&ifiq->ifiq_ml);
> + mtx_leave(&ifiq->ifiq_mtx);
> +
> + if_input_process(ifiq->ifiq_if, &ml);
>  }
>  
>  /*
> Index: ifq.h
> ===================================================================
> RCS file: /cvs/src/sys/net/ifq.h,v
> retrieving revision 1.15
> diff -u -p -r1.15 ifq.h
> --- ifq.h 14 Nov 2017 04:08:11 -0000 1.15
> +++ ifq.h 14 Nov 2017 04:18:41 -0000
> @@ -69,6 +69,34 @@ struct ifqueue {
>   unsigned int ifq_idx;
>  };
>  
> +struct ifiqueue {
> + struct ifnet *ifiq_if;
> + struct taskq *ifiq_softnet;
> + union {
> + void *_ifiq_softc;
> + struct ifiqueue *_ifiq_ifiqs[1];
> + } _ifiq_ptr;
> +#define ifiq_softc _ifiq_ptr._ifiq_softc
> +#define ifiq_ifiqs _ifiq_ptr._ifiq_ifiqs
> +
> + struct mutex ifiq_mtx;
> + struct mbuf_list ifiq_ml;
> + struct task ifiq_task;
> +
> + /* counters */
> + unsigned int ifiq_gen;
> +
> + uint64_t ifiq_packets;
> + uint64_t ifiq_bytes;
> + uint64_t ifiq_qdrops;
> + uint64_t ifiq_errors;
> + uint64_t ifiq_mcasts;
> + uint64_t ifiq_noproto;
> +
> + /* properties */
> + unsigned int ifiq_idx;
> +};
> +
>  #ifdef _KERNEL
>  
>  #define IFQ_MAXLEN 256
> @@ -432,6 +460,18 @@ ifq_idx(struct ifqueue *ifq, unsigned in
>  #define IFQ_ASSERT_SERIALIZED(_ifq) KASSERT(ifq_is_serialized(_ifq))
>  
>  extern const struct ifq_ops * const ifq_priq_ops;
> +
> +/* ifiq */
> +
> +void ifiq_init(struct ifiqueue *, struct ifnet *, unsigned int);
> +void ifiq_destroy(struct ifiqueue *);
> +int ifiq_input(struct ifiqueue *, struct mbuf_list *,
> +     unsigned int);
> +int ifiq_enqueue(struct ifiqueue *, struct mbuf *);
> +void ifiq_add_data(struct ifiqueue *, struct if_data *);
> +
> +#define ifiq_len(_ifiq) ml_len(&(_ifiq)->ifiq_ml)
> +#define ifiq_empty(_ifiq) ml_empty(&(_ifiq)->ifiq_ml)
>  
>  #endif /* _KERNEL */
>  
> Index: if_loop.c
> ===================================================================
> RCS file: /cvs/src/sys/net/if_loop.c,v
> retrieving revision 1.83
> diff -u -p -r1.83 if_loop.c
> --- if_loop.c 31 Oct 2017 22:05:12 -0000 1.83
> +++ if_loop.c 14 Nov 2017 04:18:41 -0000
> @@ -241,12 +241,7 @@ looutput(struct ifnet *ifp, struct mbuf
>   if ((m->m_flags & M_LOOP) == 0)
>   return (if_input_local(ifp, m, dst->sa_family));
>  
> - m->m_pkthdr.ph_family = dst->sa_family;
> - if (mq_enqueue(&ifp->if_inputqueue, m))
> - return ENOBUFS;
> - task_add(net_tq(ifp->if_index), ifp->if_inputtask);
> -
> - return (0);
> + return (if_output_local(ifp, m, dst->sa_family));
>  }
>  
>  void
>


Re: multiple interface input queues

David Gwynne-5

> On 14 Nov 2017, at 23:17, Martin Pieuchot <[hidden email]> wrote:
>
> On 14/11/17(Tue) 14:42, David Gwynne wrote:
>> this replaces the single mbuf_queue and task in struct ifnet with
>> a new ifiqueue structure modelled on ifqueues.
>
> The name is confusing; should we rename ifqueues 'ifoqueue' then?

yes.

>
>> the main motivation behind this was to show mpsafe input counters.
>
> I like that you're sharing the whole direction you're going in.
> Nice to know we don't need to address input counters :)
>
>> ifiqueues, like ifqueues, allow a driver to configure multiple
>> queues. this in turn allows a driver with multiple rx rings to
>> have them queue and account for packets independently.
>
> How does a driver decide to use multiple queues?  Aren't we missing
> the interrupt on !CPU0 bits?  Once that's done how do we split traffic?

i don't know how to answer how a driver decides to use multiple queues, but i can show how it can use them once the decision is made. something like this:

        if_attach_queues(ifp, sc->num_queues);
        if_attach_iqueues(ifp, sc->num_queues);
        for (i = 0; i < sc->num_queues; i++) {
                struct ifqueue *ifq = ifp->if_ifqs[i];
                struct tx_ring *txr = &sc->tx_rings[i];
                struct ifiqueue *ifiq = ifp->if_iqs[i];
                struct rx_ring *rxr = &sc->rx_rings[i];

                ifq->ifq_softc = txr;
                txr->ifq = ifq;

                ifiq->ifiq_softc = rxr;
                rxr->ifiq = ifiq;
        }
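
the rx interrupt path for each ring then feeds its own ifiq. here's a rough sketch of that side, using the if_rxr bits from the later diff; drv_rxeof, drv_rx_dequeue, and drv_rx_refill are made-up names and the rx_ring layout is assumed:

        static void
        drv_rxeof(struct rx_ring *rxr)
        {
                struct mbuf_list ml = MBUF_LIST_INITIALIZER();
                struct mbuf *m;

                /* harvest completed descriptors off this ring */
                while ((m = drv_rx_dequeue(rxr)) != NULL)
                        ml_enqueue(&ml, m);

                /*
                 * hand the packets to this ring's own ifiq. if it
                 * reports backpressure, ask the rx ring moderation
                 * code to scale this ring down.
                 */
                if (ifiq_input(rxr->ifiq, &ml, if_rxr_cwm(&rxr->rx_ring)))
                        if_rxr_livelocked(&rxr->rx_ring);

                drv_rx_refill(rxr);
        }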


>
>> ifiq_input generally replaces if_input. if_input now simply queues
>> on ifnets builtin ifiqueue (ifp->if_rcv) to support the current set
>> of drivers that only configure a single rx context/ring.
>
> So we always use at least `ifp->if_rcv'.  Can't we add an argument to
> if_attach_common() rather than adding a new interface?

do you want to make if_rcv optional rather than unconditional?
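
while it's unconditional, a driver with only one rx context doesn't need to know about any of this; it keeps handing packets to if_input, which now just queues on the builtin if_rcv. eg, a minimal sketch with a made-up drv_intr and softc layout:

        int
        drv_intr(void *arg)
        {
                struct drv_softc *sc = arg;
                struct ifnet *ifp = &sc->sc_ac.ac_if;
                struct mbuf_list ml = MBUF_LIST_INITIALIZER();
                struct mbuf *m;

                /* pull completed packets off the single rx ring */
                while ((m = drv_rx_dequeue(sc)) != NULL)
                        ml_enqueue(&ml, m);

                /* if_input queues on ifp->if_rcv and schedules softnet */
                if_input(ifp, &ml);

                return (1);
        }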

>
>> ifiq counters are updated and read using the same semantic as percpu
>> counters, ie, there's a generation number updated before and after
>> a counter update. this means writers dont block, but a reader may
>> loop a couple of times waiting for a writer to finish.
>
> Why do we need yet another copy of counters_enter()/leave()?  Something
> feels wrong.  I *know* it is not per-CPU memory, but I wish we could do
> better.  Otherwise we will end up with a per-data-structure counter API.

i got rid of this in the next diff i sent.

if you're saying it would be nice to factor generation-based counter updates/reads out, i agree. apart from this there hasn't been a need for it though.
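
fwiw, if someone wanted to pull it out, i'd imagine something like this; gen_enter/gen_leave are made-up names, it's just the generation scheme from the diff factored out as a sketch:

        struct gen {
                unsigned int g_gen;
        };

        static inline unsigned int
        gen_enter(struct gen *g)
        {
                unsigned int gen;

                gen = ++g->g_gen;       /* odd while an update is in flight */
                membar_producer();      /* publish gen before the payload */

                return (gen);
        }

        static inline void
        gen_leave(struct gen *g, unsigned int gen)
        {
                membar_producer();      /* publish the payload before gen */
                g->g_gen = ++gen;       /* even again, update is visible */
        }

readers would spin (or yield) while the generation is odd or changes across their copy of the payload, the same way ifiq_add_data does below.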

>
>> loop a couple of times waiting for a writer to finish. readers call
>> yield(), which causes splasserts to fire if if_getdata is still
>> holding the kernel lock.
>
> You mean the NET_LOCK(); well, this should be easily removed.  tb@ has a
> diff that should go in, and then you can remove the NET_LOCK()/UNLOCK()
> dance around if_getdata().

it's moving to^W^R^R^Red to RLOCK?

>
>> ifiq_input is set up to interact with the interface rx ring moderation
>> code (ie, if_rxring code). you pass what the current rxr watermark
>> is, and it will look at the backlog of packets on that ifiq to
>> determine if the ring is producing too fast. if it is, itll return
>> 1 to slow down packet reception. i have a later diff that adds
>> functionality to the if_rxring bits so a driver can say a ring
>> is livelocked, rather than relying on the system to detect it. so
>> drv_rxeof would have the following at the end of it:
>>
>> if (ifiq_input(rxr->ifiq, &ml, if_rxr_cwm(&rxr->rx_ring)))
>> if_rxr_livelocked(&rxr->rx_ring);
>> drv_rx_refill(rxr);
>
> This magic is worth discussing in a diff doing only that :)

ill send if_rxr_livelocked and if_rxr_cwm out today.
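
as a preview, the guts are roughly this; the if_rxring field names here are from my tree and may change before that mail goes out:

        void
        if_rxr_livelocked(struct if_rxring *rxr)
        {
                extern int ticks;

                /* back the current watermark off, at most once per tick */
                if (ticks - rxr->rxr_adjusted >= 1) {
                        if (rxr->rxr_cwm > rxr->rxr_lwm)
                                rxr->rxr_cwm--;

                        rxr->rxr_adjusted = ticks;
                }
        }

        unsigned int
        if_rxr_cwm(struct if_rxring *rxr)
        {
                /* the watermark the ring is currently allowed to fill to */
                return (rxr->rxr_cwm);
        }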

>
>> ive run with that on a hacked up ix(4) that runs with 8 rx rings,
>> and this works pretty well. if you're doing a single stream, you
>> see one rx ring grow the number of descs it will handle, but if you
>> flood it with a range of traffic you'll see that one ring scale
>> down and balance out with the rest of the rings. it turns out you
>> can still get reasonable throughput even if the ifiqs are dynamically
>> scaling themselves to only 100 packets. however, the interactivity
>> of the system improves a lot.
>
> So you're now introducing MCLGETI(9) back with support for multiple
> rings.  Nice.

yes :D :D

>
>> currently if one interface is being DoSed, it'll end up with 8192
>> packets on if_inputqueue. it takes a while to process those packets,
>> which blocks the processing of packets from the other interface.
>> by scaling the input queues to relatively small counts, softnet can
>> service packets from other interfaces sooner.
>
> I see a lot of good stuff in this diff.  I like your direction.  I'm
> not sure how/when you plan to add support for multiple input rings.
>
> Now I'm afraid of a single diff doing refactoring + iqueues + counter +
> MCLGETI :o)

the update to this does not include counters, and the rxring moderation is implemented outside of this diff.

dlg

>
>>
>> Index: if.c
>> ===================================================================
>> RCS file: /cvs/src/sys/net/if.c,v
>> retrieving revision 1.527
>> diff -u -p -r1.527 if.c
>> --- if.c 14 Nov 2017 04:08:11 -0000 1.527
>> +++ if.c 14 Nov 2017 04:18:41 -0000
>> @@ -156,7 +156,6 @@ int if_group_egress_build(void);
>>
>> void if_watchdog_task(void *);
>>
>> -void if_input_process(void *);
>> void if_netisr(void *);
>>
>> #ifdef DDB
>> @@ -437,8 +436,6 @@ if_attachsetup(struct ifnet *ifp)
>>
>> ifidx = ifp->if_index;
>>
>> - mq_init(&ifp->if_inputqueue, 8192, IPL_NET);
>> - task_set(ifp->if_inputtask, if_input_process, (void *)ifidx);
>> task_set(ifp->if_watchdogtask, if_watchdog_task, (void *)ifidx);
>> task_set(ifp->if_linkstatetask, if_linkstate_task, (void *)ifidx);
>>
>> @@ -563,6 +560,30 @@ if_attach_queues(struct ifnet *ifp, unsi
>> }
>>
>> void
>> +if_attach_iqueues(struct ifnet *ifp, unsigned int niqs)
>> +{
>> + struct ifiqueue **map;
>> + struct ifiqueue *ifiq;
>> + unsigned int i;
>> +
>> + KASSERT(niqs != 0);
>> +
>> + map = mallocarray(niqs, sizeof(*map), M_DEVBUF, M_WAITOK);
>> +
>> + ifp->if_rcv.ifiq_softc = NULL;
>> + map[0] = &ifp->if_rcv;
>> +
>> + for (i = 1; i < niqs; i++) {
>> + ifiq = malloc(sizeof(*ifiq), M_DEVBUF, M_WAITOK|M_ZERO);
>> + ifiq_init(ifiq, ifp, i);
>> + map[i] = ifiq;
>> + }
>> +
>> + ifp->if_iqs = map;
>> + ifp->if_niqs = niqs;
>> +}
>> +
>> +void
>> if_attach_common(struct ifnet *ifp)
>> {
>> KASSERT(ifp->if_ioctl != NULL);
>> @@ -587,6 +608,12 @@ if_attach_common(struct ifnet *ifp)
>> ifp->if_ifqs = ifp->if_snd.ifq_ifqs;
>> ifp->if_nifqs = 1;
>>
>> + ifiq_init(&ifp->if_rcv, ifp, 0);
>> +
>> + ifp->if_rcv.ifiq_ifiqs[0] = &ifp->if_rcv;
>> + ifp->if_iqs = ifp->if_rcv.ifiq_ifiqs;
>> + ifp->if_niqs = 1;
>> +
>> ifp->if_addrhooks = malloc(sizeof(*ifp->if_addrhooks),
>>    M_TEMP, M_WAITOK);
>> TAILQ_INIT(ifp->if_addrhooks);
>> @@ -605,8 +632,6 @@ if_attach_common(struct ifnet *ifp)
>>    M_TEMP, M_WAITOK|M_ZERO);
>> ifp->if_linkstatetask = malloc(sizeof(*ifp->if_linkstatetask),
>>    M_TEMP, M_WAITOK|M_ZERO);
>> - ifp->if_inputtask = malloc(sizeof(*ifp->if_inputtask),
>> -    M_TEMP, M_WAITOK|M_ZERO);
>> ifp->if_llprio = IFQ_DEFPRIO;
>>
>> SRPL_INIT(&ifp->if_inputs);
>> @@ -694,47 +719,7 @@ if_enqueue(struct ifnet *ifp, struct mbu
>> void
>> if_input(struct ifnet *ifp, struct mbuf_list *ml)
>> {
>> - struct mbuf *m;
>> - size_t ibytes = 0;
>> -#if NBPFILTER > 0
>> - caddr_t if_bpf;
>> -#endif
>> -
>> - if (ml_empty(ml))
>> - return;
>> -
>> - MBUF_LIST_FOREACH(ml, m) {
>> - m->m_pkthdr.ph_ifidx = ifp->if_index;
>> - m->m_pkthdr.ph_rtableid = ifp->if_rdomain;
>> - ibytes += m->m_pkthdr.len;
>> - }
>> -
>> - ifp->if_ipackets += ml_len(ml);
>> - ifp->if_ibytes += ibytes;
>> -
>> -#if NBPFILTER > 0
>> - if_bpf = ifp->if_bpf;
>> - if (if_bpf) {
>> - struct mbuf_list ml0;
>> -
>> - ml_init(&ml0);
>> - ml_enlist(&ml0, ml);
>> - ml_init(ml);
>> -
>> - while ((m = ml_dequeue(&ml0)) != NULL) {
>> - if (bpf_mtap_ether(if_bpf, m, BPF_DIRECTION_IN))
>> - m_freem(m);
>> - else
>> - ml_enqueue(ml, m);
>> - }
>> -
>> - if (ml_empty(ml))
>> - return;
>> - }
>> -#endif
>> -
>> - if (mq_enlist(&ifp->if_inputqueue, ml) == 0)
>> - task_add(net_tq(ifp->if_index), ifp->if_inputtask);
>> + ifiq_input(&ifp->if_rcv, ml, 2048);
>> }
>>
>> int
>> @@ -789,6 +774,24 @@ if_input_local(struct ifnet *ifp, struct
>> return (0);
>> }
>>
>> +int
>> +if_output_local(struct ifnet *ifp, struct mbuf *m, sa_family_t af)
>> +{
>> + struct ifiqueue *ifiq;
>> + unsigned int flow = 0;
>> +
>> + m->m_pkthdr.ph_family = af;
>> + m->m_pkthdr.ph_ifidx = ifp->if_index;
>> + m->m_pkthdr.ph_rtableid = ifp->if_rdomain;
>> +
>> + if (ISSET(m->m_pkthdr.ph_flowid, M_FLOWID_VALID))
>> + flow = m->m_pkthdr.ph_flowid & M_FLOWID_MASK;
>> +
>> + ifiq = ifp->if_iqs[flow % ifp->if_niqs];
>> +
>> + return (ifiq_enqueue(ifiq, m) == 0 ? 0 : ENOBUFS);
>> +}
>> +
>> struct ifih {
>> SRPL_ENTRY(ifih)  ifih_next;
>> int (*ifih_input)(struct ifnet *, struct mbuf *,
>> @@ -873,26 +876,18 @@ if_ih_remove(struct ifnet *ifp, int (*in
>> }
>>
>> void
>> -if_input_process(void *xifidx)
>> +if_input_process(struct ifnet *ifp, struct mbuf_list *ml)
>> {
>> - unsigned int ifidx = (unsigned long)xifidx;
>> - struct mbuf_list ml;
>> struct mbuf *m;
>> - struct ifnet *ifp;
>> struct ifih *ifih;
>> struct srp_ref sr;
>> int s;
>>
>> - ifp = if_get(ifidx);
>> - if (ifp == NULL)
>> + if (ml_empty(ml))
>> return;
>>
>> - mq_delist(&ifp->if_inputqueue, &ml);
>> - if (ml_empty(&ml))
>> - goto out;
>> -
>> if (!ISSET(ifp->if_xflags, IFXF_CLONED))
>> - add_net_randomness(ml_len(&ml));
>> + add_net_randomness(ml_len(ml));
>>
>> /*
>> * We grab the NET_LOCK() before processing any packet to
>> @@ -908,7 +903,7 @@ if_input_process(void *xifidx)
>> */
>> NET_RLOCK();
>> s = splnet();
>> - while ((m = ml_dequeue(&ml)) != NULL) {
>> + while ((m = ml_dequeue(ml)) != NULL) {
>> /*
>> * Pass this mbuf to all input handlers of its
>> * interface until it is consumed.
>> @@ -924,8 +919,6 @@ if_input_process(void *xifidx)
>> }
>> splx(s);
>> NET_RUNLOCK();
>> -out:
>> - if_put(ifp);
>> }
>>
>> void
>> @@ -1033,10 +1026,6 @@ if_detach(struct ifnet *ifp)
>> ifp->if_ioctl = if_detached_ioctl;
>> ifp->if_watchdog = NULL;
>>
>> - /* Remove the input task */
>> - task_del(net_tq(ifp->if_index), ifp->if_inputtask);
>> - mq_purge(&ifp->if_inputqueue);
>> -
>> /* Remove the watchdog timeout & task */
>> timeout_del(ifp->if_slowtimo);
>> task_del(net_tq(ifp->if_index), ifp->if_watchdogtask);
>> @@ -1090,7 +1079,6 @@ if_detach(struct ifnet *ifp)
>> free(ifp->if_slowtimo, M_TEMP, sizeof(*ifp->if_slowtimo));
>> free(ifp->if_watchdogtask, M_TEMP, sizeof(*ifp->if_watchdogtask));
>> free(ifp->if_linkstatetask, M_TEMP, sizeof(*ifp->if_linkstatetask));
>> - free(ifp->if_inputtask, M_TEMP, sizeof(*ifp->if_inputtask));
>>
>> for (i = 0; (dp = domains[i]) != NULL; i++) {
>> if (dp->dom_ifdetach && ifp->if_afdata[dp->dom_family])
>> @@ -1113,6 +1101,17 @@ if_detach(struct ifnet *ifp)
>> free(ifp->if_ifqs, M_DEVBUF,
>>    sizeof(struct ifqueue *) * ifp->if_nifqs);
>> }
>> +
>> + for (i = 0; i < ifp->if_niqs; i++)
>> + ifiq_destroy(ifp->if_iqs[i]);
>> + if (ifp->if_iqs != ifp->if_rcv.ifiq_ifiqs) {
>> + for (i = 1; i < ifp->if_niqs; i++) {
>> + free(ifp->if_iqs[i], M_DEVBUF,
>> +    sizeof(struct ifiqueue));
>> + }
>> + free(ifp->if_iqs, M_DEVBUF,
>> +    sizeof(struct ifiqueue *) * ifp->if_niqs);
>> + }
>> }
>>
>> /*
>> @@ -2280,11 +2279,21 @@ if_getdata(struct ifnet *ifp, struct if_
>>
>> *data = ifp->if_data;
>>
>> + NET_UNLOCK();
>> +
>> for (i = 0; i < ifp->if_nifqs; i++) {
>> struct ifqueue *ifq = ifp->if_ifqs[i];
>>
>> ifq_add_data(ifq, data);
>> }
>> +
>> + for (i = 0; i < ifp->if_niqs; i++) {
>> + struct ifiqueue *ifiq = ifp->if_iqs[i];
>> +
>> + ifiq_add_data(ifiq, data);
>> + }
>> +
>> + NET_LOCK();
>> }
>>
>> /*
>> Index: if_var.h
>> ===================================================================
>> RCS file: /cvs/src/sys/net/if_var.h,v
>> retrieving revision 1.83
>> diff -u -p -r1.83 if_var.h
>> --- if_var.h 31 Oct 2017 22:05:12 -0000 1.83
>> +++ if_var.h 14 Nov 2017 04:18:41 -0000
>> @@ -140,8 +140,6 @@ struct ifnet { /* and the entries */
>> struct task *if_linkstatetask; /* task to do route updates */
>>
>> /* procedure handles */
>> - struct mbuf_queue if_inputqueue;
>> - struct task *if_inputtask; /* input task */
>> SRPL_HEAD(, ifih) if_inputs; /* input routines (dequeue) */
>>
>> /* output routine (enqueue) */
>> @@ -164,6 +162,10 @@ struct ifnet { /* and the entries */
>> void (*if_qstart)(struct ifqueue *);
>> unsigned int if_nifqs;
>>
>> + struct ifiqueue if_rcv; /* rx/input queue */
>> + struct ifiqueue **if_iqs;      /* pointer to the array of iqs */
>> + unsigned int if_niqs;
>> +
>> struct sockaddr_dl *if_sadl; /* pointer to our sockaddr_dl */
>>
>> void *if_afdata[AF_MAX];
>> @@ -303,7 +305,9 @@ void if_start(struct ifnet *);
>> int if_enqueue_try(struct ifnet *, struct mbuf *);
>> int if_enqueue(struct ifnet *, struct mbuf *);
>> void if_input(struct ifnet *, struct mbuf_list *);
>> +void if_input_process(struct ifnet *, struct mbuf_list *);
>> int if_input_local(struct ifnet *, struct mbuf *, sa_family_t);
>> +int if_output_local(struct ifnet *, struct mbuf *, sa_family_t);
>> void if_rtrequest_dummy(struct ifnet *, int, struct rtentry *);
>> void p2p_rtrequest(struct ifnet *, int, struct rtentry *);
>>
>> Index: ifq.c
>> ===================================================================
>> RCS file: /cvs/src/sys/net/ifq.c,v
>> retrieving revision 1.14
>> diff -u -p -r1.14 ifq.c
>> --- ifq.c 14 Nov 2017 04:08:11 -0000 1.14
>> +++ ifq.c 14 Nov 2017 04:18:41 -0000
>> @@ -16,15 +16,22 @@
>>  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
>>  */
>>
>> +#include "bpfilter.h"
>> +
>> #include <sys/param.h>
>> #include <sys/systm.h>
>> #include <sys/socket.h>
>> #include <sys/mbuf.h>
>> #include <sys/proc.h>
>> +#include <sys/atomic.h>
>>
>> #include <net/if.h>
>> #include <net/if_var.h>
>>
>> +#if NBPFILTER > 0
>> +#include <net/bpf.h>
>> +#endif
>> +
>> /*
>>  * priq glue
>>  */
>> @@ -457,6 +464,223 @@ ifq_mfreeml(struct ifqueue *ifq, struct
>> ifq->ifq_len -= ml_len(ml);
>> ifq->ifq_qdrops += ml_len(ml);
>> ml_enlist(&ifq->ifq_free, ml);
>> +}
>> +
>> +/*
>> + * ifiq
>> + */
>> +
>> +struct ifiq_ref {
>> + unsigned int gen;
>> +}; /* __upunused */
>> +
>> +static void ifiq_process(void *);
>> +
>> +void
>> +ifiq_init(struct ifiqueue *ifiq, struct ifnet *ifp, unsigned int idx)
>> +{
>> + ifiq->ifiq_if = ifp;
>> + ifiq->ifiq_softnet = net_tq(ifp->if_index); /* + idx */
>> + ifiq->ifiq_softc = NULL;
>> +
>> + mtx_init(&ifiq->ifiq_mtx, IPL_NET);
>> + ml_init(&ifiq->ifiq_ml);
>> + task_set(&ifiq->ifiq_task, ifiq_process, ifiq);
>> +
>> + ifiq->ifiq_packets = 0;
>> + ifiq->ifiq_bytes = 0;
>> + ifiq->ifiq_qdrops = 0;
>> + ifiq->ifiq_errors = 0;
>> +
>> + ifiq->ifiq_idx = idx;
>> +}
>> +
>> +void
>> +ifiq_destroy(struct ifiqueue *ifiq)
>> +{
>> + if (!task_del(ifiq->ifiq_softnet, &ifiq->ifiq_task)) {
>> + int netlocked = (rw_status(&netlock) == RW_WRITE);
>> +
>> + if (netlocked) /* XXXSMP breaks atomicity */
>> + NET_UNLOCK();
>
> This isn't called with the NET_LOCK() held.
>
>> +
>> + taskq_barrier(ifiq->ifiq_softnet);
>> +
>> + if (netlocked)
>> + NET_LOCK();
>> + }
>> +
>> + /* don't need to lock because this is the last use of the ifiq */
>> + ml_purge(&ifiq->ifiq_ml);
>> +}
>> +
>> +static inline void
>> +ifiq_enter(struct ifiq_ref *ref, struct ifiqueue *ifiq)
>> +{
>> + ref->gen = ++ifiq->ifiq_gen;
>> + membar_producer();
>> +}
>> +
>> +static inline void
>> +ifiq_leave(struct ifiq_ref *ref, struct ifiqueue *ifiq)
>> +{
>> + membar_producer();
>> + ifiq->ifiq_gen = ++ref->gen;
>> +}
>> +
>> +static inline void
>> +ifiq_count(struct ifiqueue *ifiq, uint64_t packets, uint64_t bytes)
>> +{
>> + struct ifiq_ref ref;
>> +
>> + ifiq_enter(&ref, ifiq);
>> + ifiq->ifiq_packets += packets;
>> + ifiq->ifiq_bytes += bytes;
>> + ifiq_leave(&ref, ifiq);
>> +}
>> +
>> +static inline void
>> +ifiq_qdrop(struct ifiqueue *ifiq, struct mbuf_list *ml)
>> +{
>> + struct ifiq_ref ref;
>> + unsigned int qdrops;
>> +
>> + qdrops = ml_purge(ml);
>> +
>> + ifiq_enter(&ref, ifiq);
>> + ifiq->ifiq_qdrops += qdrops;
>> + ifiq_leave(&ref, ifiq);
>> +}
>> +
>> +int
>> +ifiq_input(struct ifiqueue *ifiq, struct mbuf_list *ml, unsigned int cwm)
>> +{
>> + struct ifnet *ifp = ifiq->ifiq_if;
>> + struct mbuf *m;
>> + uint64_t bytes = 0;
>> +#if NBPFILTER > 0
>> + caddr_t if_bpf;
>> +#endif
>> + int rv;
>> +
>> + if (ml_empty(ml))
>> + return (0);
>> +
>> + MBUF_LIST_FOREACH(ml, m) {
>> + m->m_pkthdr.ph_ifidx = ifp->if_index;
>> + m->m_pkthdr.ph_rtableid = ifp->if_rdomain;
>> + bytes += m->m_pkthdr.len;
>> + }
>> +
>> + ifiq_count(ifiq, ml_len(ml), bytes);
>
> No need to add a function which is used only once.
>
>> +
>> +#if NBPFILTER > 0
>> + if_bpf = ifp->if_bpf;
>> + if (if_bpf) {
>> + struct mbuf_list ml0 = *ml;
>> +
>> + ml_init(ml);
>> +
>> + while ((m = ml_dequeue(&ml0)) != NULL) {
>> + if (bpf_mtap_ether(if_bpf, m, BPF_DIRECTION_IN))
>> + m_freem(m);
>> + else
>> + ml_enqueue(ml, m);
>> + }
>> +
>> + if (ml_empty(ml))
>> + return (0);
>> + }
>> +#endif
>> +
>> + if (ifiq_len(ifiq) >= cwm * 5) {
>> + /* the backlog is way too high, so drop these packets */
>> + ifiq_qdrop(ifiq, ml);
>
> This function is also used only once.
>
>> + return (1); /* tell the caller to slow down */
>> + }
>> +
>> + /* tell the caller to slow down if the backlog is getting high */
>> + rv = (ifiq_len(ifiq) >= cwm * 3);
>> +
>> + mtx_enter(&ifiq->ifiq_mtx);
>> + ml_enlist(&ifiq->ifiq_ml, ml);
>> + mtx_leave(&ifiq->ifiq_mtx);
>> +
>> + task_add(ifiq->ifiq_softnet, &ifiq->ifiq_task);
>> +
>> + return (rv);
>> +}
>> +
>> +void
>> +ifiq_add_data(struct ifiqueue *ifiq, struct if_data *data)
>> +{
>> + unsigned int enter, leave;
>> + uint64_t packets, bytes, qdrops;
>> +
>> + enter = ifiq->ifiq_gen;
>> + for (;;) {
>> + /* the generation number is odd during an update */
>> + while (enter & 1) {
>> + yield();
>> + enter = ifiq->ifiq_gen;
>> + }
>> +
>> + membar_consumer();
>> + packets = ifiq->ifiq_packets;
>> + bytes = ifiq->ifiq_bytes;
>> + qdrops = ifiq->ifiq_qdrops;
>> + membar_consumer();
>> +
>> + leave = ifiq->ifiq_gen;
>> +
>> + if (enter == leave)
>> + break;
>> +
>> + enter = leave;
>> + }
>> +
>> + data->ifi_ipackets += packets;
>> + data->ifi_ibytes += bytes;
>> + data->ifi_iqdrops += qdrops;
>> +}
>> +
>> +void
>> +ifiq_barrier(struct ifiqueue *ifiq)
>> +{
>> + if (!task_del(ifiq->ifiq_softnet, &ifiq->ifiq_task))
>> + taskq_barrier(ifiq->ifiq_softnet);
>> +}
>> +
>> +int
>> +ifiq_enqueue(struct ifiqueue *ifiq, struct mbuf *m)
>> +{
>> + /* this can be called from anywhere at any time, so must lock */
>> +
>> + mtx_enter(&ifiq->ifiq_mtx);
>> + ml_enqueue(&ifiq->ifiq_ml, m);
>> + mtx_leave(&ifiq->ifiq_mtx);
>> +
>> + task_add(ifiq->ifiq_softnet, &ifiq->ifiq_task);
>> +
>> + return (0);
>> +}
>> +
>> +static void
>> +ifiq_process(void *arg)
>> +{
>> + struct ifiqueue *ifiq = arg;
>> + struct mbuf_list ml;
>> +
>> + if (ifiq_empty(ifiq))
>> + return;
>> +
>> + mtx_enter(&ifiq->ifiq_mtx);
>> + ml = ifiq->ifiq_ml;
>> + ml_init(&ifiq->ifiq_ml);
>> + mtx_leave(&ifiq->ifiq_mtx);
>> +
>> + if_input_process(ifiq->ifiq_if, &ml);
>> }