]> git.baikalelectronics.ru Git - kernel.git/commitdiff
rxrpc: Delay terminal ACK transmission on a client call
authorDavid Howells <dhowells@redhat.com>
Fri, 24 Nov 2017 10:18:41 +0000 (10:18 +0000)
committerDavid Howells <dhowells@redhat.com>
Fri, 24 Nov 2017 10:18:41 +0000 (10:18 +0000)
Delay terminal ACK transmission on a client call by deferring it to the
connection processor.  This allows it to be skipped if we can send the next
call instead, the first DATA packet of which will implicitly ack this call.

Signed-off-by: David Howells <dhowells@redhat.com>
net/rxrpc/ar-internal.h
net/rxrpc/conn_client.c
net/rxrpc/conn_event.c
net/rxrpc/conn_object.c
net/rxrpc/recvmsg.c

index a972887b3f5dc928b3006e28267638a633a4873a..d1213d503f30b3ded509380c58acdcd5b9094f16 100644 (file)
@@ -338,8 +338,17 @@ enum rxrpc_conn_flag {
        RXRPC_CONN_DONT_REUSE,          /* Don't reuse this connection */
        RXRPC_CONN_COUNTED,             /* Counted by rxrpc_nr_client_conns */
        RXRPC_CONN_PROBING_FOR_UPGRADE, /* Probing for service upgrade */
+       RXRPC_CONN_FINAL_ACK_0,         /* Need final ACK for channel 0 */
+       RXRPC_CONN_FINAL_ACK_1,         /* Need final ACK for channel 1 */
+       RXRPC_CONN_FINAL_ACK_2,         /* Need final ACK for channel 2 */
+       RXRPC_CONN_FINAL_ACK_3,         /* Need final ACK for channel 3 */
 };
 
+#define RXRPC_CONN_FINAL_ACK_MASK ((1UL << RXRPC_CONN_FINAL_ACK_0) |   \
+                                  (1UL << RXRPC_CONN_FINAL_ACK_1) |    \
+                                  (1UL << RXRPC_CONN_FINAL_ACK_2) |    \
+                                  (1UL << RXRPC_CONN_FINAL_ACK_3))
+
 /*
  * Events that can be raised upon a connection.
  */
@@ -393,6 +402,7 @@ struct rxrpc_connection {
 #define RXRPC_ACTIVE_CHANS_MASK        ((1 << RXRPC_MAXCALLS) - 1)
        struct list_head        waiting_calls;  /* Calls waiting for channels */
        struct rxrpc_channel {
+               unsigned long           final_ack_at;   /* Time at which to issue final ACK */
                struct rxrpc_call __rcu *call;          /* Active call */
                u32                     call_id;        /* ID of current call */
                u32                     call_counter;   /* Call ID counter */
@@ -404,6 +414,7 @@ struct rxrpc_connection {
                };
        } channels[RXRPC_MAXCALLS];
 
+       struct timer_list       timer;          /* Conn event timer */
        struct work_struct      processor;      /* connection event processor */
        union {
                struct rb_node  client_node;    /* Node in local->client_conns */
@@ -861,6 +872,12 @@ static inline void rxrpc_put_connection(struct rxrpc_connection *conn)
                rxrpc_put_service_conn(conn);
 }
 
+static inline void rxrpc_reduce_conn_timer(struct rxrpc_connection *conn,
+                                          unsigned long expire_at)
+{
+       timer_reduce(&conn->timer, expire_at);
+}
+
 /*
  * conn_service.c
  */
index 5f9624bd311c6882bc6f751b393b4904222f5667..cfb997593da90f81afd3b31cea52b12ad4fcdca7 100644 (file)
@@ -554,6 +554,11 @@ static void rxrpc_activate_one_channel(struct rxrpc_connection *conn,
 
        trace_rxrpc_client(conn, channel, rxrpc_client_chan_activate);
 
+       /* Cancel the final ACK on the previous call if it hasn't been sent yet
+        * as the DATA packet will implicitly ACK it.
+        */
+       clear_bit(RXRPC_CONN_FINAL_ACK_0 + channel, &conn->flags);
+
        write_lock_bh(&call->state_lock);
        if (!test_bit(RXRPC_CALL_TX_LASTQ, &call->flags))
                call->state = RXRPC_CALL_CLIENT_SEND_REQUEST;
@@ -813,6 +818,19 @@ void rxrpc_disconnect_client_call(struct rxrpc_call *call)
                goto out_2;
        }
 
+       /* Schedule the final ACK to be transmitted in a short while so that it
+        * can be skipped if we find a follow-on call.  The first DATA packet
+        * of the follow on call will implicitly ACK this call.
+        */
+       if (test_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
+               unsigned long final_ack_at = jiffies + 2;
+
+               WRITE_ONCE(chan->final_ack_at, final_ack_at);
+               smp_wmb(); /* vs rxrpc_process_delayed_final_acks() */
+               set_bit(RXRPC_CONN_FINAL_ACK_0 + channel, &conn->flags);
+               rxrpc_reduce_conn_timer(conn, final_ack_at);
+       }
+
        /* Things are more complex and we need the cache lock.  We might be
         * able to simply idle the conn or it might now be lurking on the wait
         * list.  It might even get moved back to the active list whilst we're
index 59a51a56e7c88e78d69dfc0184ca1230ca797a2e..9e9a8db1bc9cd0f1afd3efd7e9e26c4d2890a7d3 100644 (file)
  * Retransmit terminal ACK or ABORT of the previous call.
  */
 static void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn,
-                                      struct sk_buff *skb)
+                                      struct sk_buff *skb,
+                                      unsigned int channel)
 {
-       struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+       struct rxrpc_skb_priv *sp = skb ? rxrpc_skb(skb) : NULL;
        struct rxrpc_channel *chan;
        struct msghdr msg;
        struct kvec iov;
@@ -48,7 +49,7 @@ static void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn,
 
        _enter("%d", conn->debug_id);
 
-       chan = &conn->channels[sp->hdr.cid & RXRPC_CHANNELMASK];
+       chan = &conn->channels[channel];
 
        /* If the last call got moved on whilst we were waiting to run, just
         * ignore this packet.
@@ -56,7 +57,7 @@ static void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn,
        call_id = READ_ONCE(chan->last_call);
        /* Sync with __rxrpc_disconnect_call() */
        smp_rmb();
-       if (call_id != sp->hdr.callNumber)
+       if (skb && call_id != sp->hdr.callNumber)
                return;
 
        msg.msg_name    = &conn->params.peer->srx.transport;
@@ -65,9 +66,9 @@ static void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn,
        msg.msg_controllen = 0;
        msg.msg_flags   = 0;
 
-       pkt.whdr.epoch          = htonl(sp->hdr.epoch);
-       pkt.whdr.cid            = htonl(sp->hdr.cid);
-       pkt.whdr.callNumber     = htonl(sp->hdr.callNumber);
+       pkt.whdr.epoch          = htonl(conn->proto.epoch);
+       pkt.whdr.cid            = htonl(conn->proto.cid);
+       pkt.whdr.callNumber     = htonl(call_id);
        pkt.whdr.seq            = 0;
        pkt.whdr.type           = chan->last_type;
        pkt.whdr.flags          = conn->out_clientflag;
@@ -87,11 +88,11 @@ static void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn,
                mtu = conn->params.peer->if_mtu;
                mtu -= conn->params.peer->hdrsize;
                pkt.ack.bufferSpace     = 0;
-               pkt.ack.maxSkew         = htons(skb->priority);
-               pkt.ack.firstPacket     = htonl(chan->last_seq);
-               pkt.ack.previousPacket  = htonl(chan->last_seq - 1);
-               pkt.ack.serial          = htonl(sp->hdr.serial);
-               pkt.ack.reason          = RXRPC_ACK_DUPLICATE;
+               pkt.ack.maxSkew         = htons(skb ? skb->priority : 0);
+               pkt.ack.firstPacket     = htonl(chan->last_seq + 1);
+               pkt.ack.previousPacket  = htonl(chan->last_seq);
+               pkt.ack.serial          = htonl(skb ? sp->hdr.serial : 0);
+               pkt.ack.reason          = skb ? RXRPC_ACK_DUPLICATE : RXRPC_ACK_IDLE;
                pkt.ack.nAcks           = 0;
                pkt.info.rxMTU          = htonl(rxrpc_rx_mtu);
                pkt.info.maxMTU         = htonl(mtu);
@@ -272,7 +273,8 @@ static int rxrpc_process_event(struct rxrpc_connection *conn,
        switch (sp->hdr.type) {
        case RXRPC_PACKET_TYPE_DATA:
        case RXRPC_PACKET_TYPE_ACK:
-               rxrpc_conn_retransmit_call(conn, skb);
+               rxrpc_conn_retransmit_call(conn, skb,
+                                          sp->hdr.cid & RXRPC_CHANNELMASK);
                return 0;
 
        case RXRPC_PACKET_TYPE_BUSY:
@@ -378,6 +380,48 @@ abort:
        _leave(" [aborted]");
 }
 
+/*
+ * Process delayed final ACKs that we haven't subsumed into a subsequent call.
+ */
+static void rxrpc_process_delayed_final_acks(struct rxrpc_connection *conn)
+{
+       unsigned long j = jiffies, next_j;
+       unsigned int channel;
+       bool set;
+
+again:
+       next_j = j + LONG_MAX;
+       set = false;
+       for (channel = 0; channel < RXRPC_MAXCALLS; channel++) {
+               struct rxrpc_channel *chan = &conn->channels[channel];
+               unsigned long ack_at;
+
+               if (!test_bit(RXRPC_CONN_FINAL_ACK_0 + channel, &conn->flags))
+                       continue;
+
+               smp_rmb(); /* vs rxrpc_disconnect_client_call */
+               ack_at = READ_ONCE(chan->final_ack_at);
+
+               if (time_before(j, ack_at)) {
+                       if (time_before(ack_at, next_j)) {
+                               next_j = ack_at;
+                               set = true;
+                       }
+                       continue;
+               }
+
+               if (test_and_clear_bit(RXRPC_CONN_FINAL_ACK_0 + channel,
+                                      &conn->flags))
+                       rxrpc_conn_retransmit_call(conn, NULL, channel);
+       }
+
+       j = jiffies;
+       if (time_before_eq(next_j, j))
+               goto again;
+       if (set)
+               rxrpc_reduce_conn_timer(conn, next_j);
+}
+
 /*
  * connection-level event processor
  */
@@ -394,6 +438,10 @@ void rxrpc_process_connection(struct work_struct *work)
        if (test_and_clear_bit(RXRPC_CONN_EV_CHALLENGE, &conn->events))
                rxrpc_secure_connection(conn);
 
+       /* Process delayed ACKs whose time has come. */
+       if (conn->flags & RXRPC_CONN_FINAL_ACK_MASK)
+               rxrpc_process_delayed_final_acks(conn);
+
        /* go through the conn-level event packets, releasing the ref on this
         * connection that each one has when we've finished with it */
        while ((skb = skb_dequeue(&conn->rx_queue))) {
index fe575798592fec5945148694b54de90a3d78c3f6..a01a3791f06ac481d695cd14c838f6cf31be9f97 100644 (file)
@@ -24,6 +24,14 @@ unsigned int rxrpc_connection_expiry = 10 * 60;
 
 static void rxrpc_destroy_connection(struct rcu_head *);
 
+static void rxrpc_connection_timer(struct timer_list *timer)
+{
+       struct rxrpc_connection *conn =
+               container_of(timer, struct rxrpc_connection, timer);
+
+       rxrpc_queue_conn(conn);
+}
+
 /*
  * allocate a new connection
  */
@@ -38,6 +46,7 @@ struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp)
                INIT_LIST_HEAD(&conn->cache_link);
                spin_lock_init(&conn->channel_lock);
                INIT_LIST_HEAD(&conn->waiting_calls);
+               timer_setup(&conn->timer, &rxrpc_connection_timer, 0);
                INIT_WORK(&conn->processor, &rxrpc_process_connection);
                INIT_LIST_HEAD(&conn->proc_link);
                INIT_LIST_HEAD(&conn->link);
@@ -332,6 +341,7 @@ static void rxrpc_destroy_connection(struct rcu_head *rcu)
 
        _net("DESTROY CONN %d", conn->debug_id);
 
+       del_timer_sync(&conn->timer);
        rxrpc_purge_queue(&conn->rx_queue);
 
        conn->security->clear(conn);
index 8510a98b87e1e224b5ba524f2fe75a21d9b41370..be0b9ae138933de21c5e31fd65c93399705d57f2 100644 (file)
@@ -144,11 +144,13 @@ static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
        trace_rxrpc_receive(call, rxrpc_receive_end, 0, call->rx_top);
        ASSERTCMP(call->rx_hard_ack, ==, call->rx_top);
 
+#if 0 // TODO: May want to transmit final ACK under some circumstances anyway
        if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY) {
                rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, 0, serial, true, false,
                                  rxrpc_propose_ack_terminal_ack);
                rxrpc_send_ack_packet(call, false);
        }
+#endif
 
        write_lock_bh(&call->state_lock);