]> git.baikalelectronics.ru Git - kernel.git/commitdiff
xprtrdma: Invoke rpcrdma_ep_create() in the connect worker
authorChuck Lever <chuck.lever@oracle.com>
Fri, 21 Feb 2020 22:00:12 +0000 (17:00 -0500)
committerAnna Schumaker <Anna.Schumaker@Netapp.com>
Fri, 27 Mar 2020 14:47:24 +0000 (10:47 -0400)
Refactor rpcrdma_ep_create(), rpcrdma_ep_disconnect(), and
rpcrdma_ep_destroy().

rpcrdma_ep_create will be invoked at connect time instead of at
transport set-up time. It will be responsible for allocating per-
connection resources. In this patch it allocates the CQs and
creates a QP. More to come.

rpcrdma_ep_destroy() is the inverse functionality that is
invoked at disconnect time. It will be responsible for releasing
the CQs and QP.

These changes should be safe to do because both connect and
disconnect is guaranteed to be serialized by the transport send
lock.

This takes us another step closer to resolving the address and route
only at connect time so that connection failover to another device
will work correctly.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
net/sunrpc/xprtrdma/transport.c
net/sunrpc/xprtrdma/verbs.c
net/sunrpc/xprtrdma/xprt_rdma.h

index 3cfeba68ee9a1136d15a953821f932558836f6f1..d915524a8e685db746f1cdb01589bfe6ee674bbd 100644 (file)
@@ -284,7 +284,7 @@ xprt_rdma_destroy(struct rpc_xprt *xprt)
 
        cancel_delayed_work_sync(&r_xprt->rx_connect_worker);
 
-       rpcrdma_ep_destroy(r_xprt);
+       rpcrdma_ep_disconnect(&r_xprt->rx_ep, &r_xprt->rx_ia);
        rpcrdma_buffer_destroy(&r_xprt->rx_buf);
        rpcrdma_ia_close(&r_xprt->rx_ia);
 
@@ -351,13 +351,9 @@ xprt_setup_rdma(struct xprt_create *args)
        if (rc)
                goto out1;
 
-       rc = rpcrdma_ep_create(new_xprt);
-       if (rc)
-               goto out2;
-
        rc = rpcrdma_buffer_create(new_xprt);
        if (rc)
-               goto out3;
+               goto out2;
 
        if (!try_module_get(THIS_MODULE))
                goto out4;
@@ -375,8 +371,6 @@ xprt_setup_rdma(struct xprt_create *args)
 out4:
        rpcrdma_buffer_destroy(&new_xprt->rx_buf);
        rc = -ENODEV;
-out3:
-       rpcrdma_ep_destroy(new_xprt);
 out2:
        rpcrdma_ia_close(&new_xprt->rx_ia);
 out1:
index 353f61ac8d51961720f5c915310dd07a97e45f20..042e6cc4f767fa462e0f4f4ac99089d5f049381f 100644 (file)
@@ -84,6 +84,7 @@ static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep);
 static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt);
 static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
 static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt);
+static void rpcrdma_ep_destroy(struct rpcrdma_xprt *r_xprt);
 static struct rpcrdma_regbuf *
 rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
                     gfp_t flags);
@@ -391,32 +392,17 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
 {
        struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
                                                   rx_ia);
-       struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 
-       /* This is similar to rpcrdma_ep_destroy, but:
-        * - Don't cancel the connect worker.
-        * - Don't call rpcrdma_ep_disconnect, which waits
-        *   for another conn upcall, which will deadlock.
-        * - rdma_disconnect is unneeded, the underlying
-        *   connection is already gone.
-        */
-       if (ia->ri_id->qp) {
+       if (ia->ri_id->qp)
                rpcrdma_xprt_drain(r_xprt);
-               rdma_destroy_qp(ia->ri_id);
-               ia->ri_id->qp = NULL;
-       }
-       ib_free_cq(ep->rep_attr.recv_cq);
-       ep->rep_attr.recv_cq = NULL;
-       ib_free_cq(ep->rep_attr.send_cq);
-       ep->rep_attr.send_cq = NULL;
 
-       /* The ULP is responsible for ensuring all DMA
-        * mappings and MRs are gone.
-        */
        rpcrdma_reps_unmap(r_xprt);
        rpcrdma_reqs_reset(r_xprt);
        rpcrdma_mrs_destroy(r_xprt);
        rpcrdma_sendctxs_destroy(r_xprt);
+
+       rpcrdma_ep_destroy(r_xprt);
+
        ib_dealloc_pd(ia->ri_pd);
        ia->ri_pd = NULL;
 
@@ -434,11 +420,8 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
 void
 rpcrdma_ia_close(struct rpcrdma_ia *ia)
 {
-       if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
-               if (ia->ri_id->qp)
-                       rdma_destroy_qp(ia->ri_id);
+       if (ia->ri_id && !IS_ERR(ia->ri_id))
                rdma_destroy_id(ia->ri_id);
-       }
        ia->ri_id = NULL;
 
        /* If the pd is still busy, xprtrdma missed freeing a resource */
@@ -447,25 +430,19 @@ rpcrdma_ia_close(struct rpcrdma_ia *ia)
        ia->ri_pd = NULL;
 }
 
-/**
- * rpcrdma_ep_create - Create unconnected endpoint
- * @r_xprt: transport to instantiate
- *
- * Returns zero on success, or a negative errno.
- */
-int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
+static int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt,
+                            struct rdma_cm_id *id)
 {
        struct rpcrdma_ep *ep = &r_xprt->rx_ep;
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private;
-       struct ib_cq *sendcq, *recvcq;
        int rc;
 
        ep->rep_max_requests = r_xprt->rx_xprt.max_reqs;
        ep->rep_inline_send = xprt_rdma_max_inline_write;
        ep->rep_inline_recv = xprt_rdma_max_inline_read;
 
-       rc = frwr_query_device(r_xprt, ia->ri_id->device);
+       rc = frwr_query_device(r_xprt, id->device);
        if (rc)
                return rc;
        r_xprt->rx_buf.rb_max_requests = cpu_to_be32(ep->rep_max_requests);
@@ -491,25 +468,22 @@ int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
        init_waitqueue_head(&ep->rep_connect_wait);
        ep->rep_receive_count = 0;
 
-       sendcq = ib_alloc_cq_any(ia->ri_id->device, r_xprt,
-                                ep->rep_attr.cap.max_send_wr + 1,
-                                IB_POLL_WORKQUEUE);
-       if (IS_ERR(sendcq)) {
-               rc = PTR_ERR(sendcq);
-               goto out1;
+       ep->rep_attr.send_cq = ib_alloc_cq_any(id->device, r_xprt,
+                                              ep->rep_attr.cap.max_send_wr,
+                                              IB_POLL_WORKQUEUE);
+       if (IS_ERR(ep->rep_attr.send_cq)) {
+               rc = PTR_ERR(ep->rep_attr.send_cq);
+               goto out_destroy;
        }
 
-       recvcq = ib_alloc_cq_any(ia->ri_id->device, NULL,
-                                ep->rep_attr.cap.max_recv_wr + 1,
-                                IB_POLL_WORKQUEUE);
-       if (IS_ERR(recvcq)) {
-               rc = PTR_ERR(recvcq);
-               goto out2;
+       ep->rep_attr.recv_cq = ib_alloc_cq_any(id->device, NULL,
+                                              ep->rep_attr.cap.max_recv_wr,
+                                              IB_POLL_WORKQUEUE);
+       if (IS_ERR(ep->rep_attr.recv_cq)) {
+               rc = PTR_ERR(ep->rep_attr.recv_cq);
+               goto out_destroy;
        }
 
-       ep->rep_attr.send_cq = sendcq;
-       ep->rep_attr.recv_cq = recvcq;
-
        /* Initialize cma parameters */
        memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));
 
@@ -525,7 +499,7 @@ int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
        /* Client offers RDMA Read but does not initiate */
        ep->rep_remote_cma.initiator_depth = 0;
        ep->rep_remote_cma.responder_resources =
-               min_t(int, U8_MAX, ia->ri_id->device->attrs.max_qp_rd_atom);
+               min_t(int, U8_MAX, id->device->attrs.max_qp_rd_atom);
 
        /* Limit transport retries so client can detect server
         * GID changes quickly. RPC layer handles re-establishing
@@ -540,45 +514,41 @@ int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
        ep->rep_remote_cma.flow_control = 0;
        ep->rep_remote_cma.rnr_retry_count = 0;
 
+       rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
+       if (rc)
+               goto out_destroy;
        return 0;
 
-out2:
-       ib_free_cq(sendcq);
-out1:
+out_destroy:
+       rpcrdma_ep_destroy(r_xprt);
        return rc;
 }
 
-/**
- * rpcrdma_ep_destroy - Disconnect and destroy endpoint.
- * @r_xprt: transport instance to shut down
- *
- */
-void rpcrdma_ep_destroy(struct rpcrdma_xprt *r_xprt)
+static void rpcrdma_ep_destroy(struct rpcrdma_xprt *r_xprt)
 {
        struct rpcrdma_ep *ep = &r_xprt->rx_ep;
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 
        if (ia->ri_id && ia->ri_id->qp) {
-               rpcrdma_ep_disconnect(ep, ia);
                rdma_destroy_qp(ia->ri_id);
                ia->ri_id->qp = NULL;
        }
 
        if (ep->rep_attr.recv_cq)
                ib_free_cq(ep->rep_attr.recv_cq);
+       ep->rep_attr.recv_cq = NULL;
        if (ep->rep_attr.send_cq)
                ib_free_cq(ep->rep_attr.send_cq);
+       ep->rep_attr.send_cq = NULL;
 }
 
 /* Re-establish a connection after a device removal event.
  * Unlike a normal reconnection, a fresh PD and a new set
  * of MRs and buffers is needed.
  */
-static int rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
-                                   struct ib_qp_init_attr *qp_init_attr)
+static int rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt)
 {
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-       struct rpcrdma_ep *ep = &r_xprt->rx_ep;
        int rc, err;
 
        trace_xprtrdma_reinsert(r_xprt);
@@ -587,39 +557,24 @@ static int rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
        if (rpcrdma_ia_open(r_xprt))
                goto out1;
 
-       rc = -ENOMEM;
-       err = rpcrdma_ep_create(r_xprt);
-       if (err) {
-               pr_err("rpcrdma: rpcrdma_ep_create returned %d\n", err);
-               goto out2;
-       }
-       memcpy(qp_init_attr, &ep->rep_attr, sizeof(*qp_init_attr));
-
        rc = -ENETUNREACH;
-       err = rdma_create_qp(ia->ri_id, ia->ri_pd, qp_init_attr);
-       if (err) {
-               pr_err("rpcrdma: rdma_create_qp returned %d\n", err);
-               goto out3;
-       }
+       err = rpcrdma_ep_create(r_xprt, ia->ri_id);
+       if (err)
+               goto out2;
        return 0;
 
-out3:
-       rpcrdma_ep_destroy(r_xprt);
 out2:
        rpcrdma_ia_close(ia);
 out1:
        return rc;
 }
 
-static int rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt,
-                               struct ib_qp_init_attr *qp_init_attr)
+static int rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt)
 {
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        struct rdma_cm_id *id, *old;
        int err, rc;
 
-       rpcrdma_ep_disconnect(&r_xprt->rx_ep, ia);
-
        rc = -EHOSTUNREACH;
        id = rpcrdma_create_id(r_xprt, ia);
        if (IS_ERR(id))
@@ -640,15 +595,14 @@ static int rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt,
                goto out_destroy;
        }
 
-       err = rdma_create_qp(id, ia->ri_pd, qp_init_attr);
+       err = rpcrdma_ep_create(r_xprt, id);
        if (err)
                goto out_destroy;
 
-       /* Atomically replace the transport's ID and QP. */
+       /* Atomically replace the transport's ID. */
        rc = 0;
        old = ia->ri_id;
        ia->ri_id = id;
-       rdma_destroy_qp(old);
 
 out_destroy:
        rdma_destroy_id(old);
@@ -665,26 +619,25 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
        struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
                                                   rx_ia);
        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
-       struct ib_qp_init_attr qp_init_attr;
        int rc;
 
 retry:
-       memcpy(&qp_init_attr, &ep->rep_attr, sizeof(qp_init_attr));
        switch (ep->rep_connected) {
        case 0:
-               rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &qp_init_attr);
-               if (rc) {
-                       rc = -ENETUNREACH;
+               rc = -ENETUNREACH;
+               if (rpcrdma_ep_create(r_xprt, ia->ri_id))
                        goto out_noupdate;
-               }
                break;
        case -ENODEV:
-               rc = rpcrdma_ep_recreate_xprt(r_xprt, &qp_init_attr);
+               rc = rpcrdma_ep_recreate_xprt(r_xprt);
                if (rc)
                        goto out_noupdate;
                break;
+       case 1:
+               rpcrdma_ep_disconnect(ep, ia);
+               /* fall through */
        default:
-               rc = rpcrdma_ep_reconnect(r_xprt, &qp_init_attr);
+               rc = rpcrdma_ep_reconnect(r_xprt);
                if (rc)
                        goto out;
        }
@@ -742,10 +695,14 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 {
        struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
                                                   rx_ep);
+       struct rdma_cm_id *id = ia->ri_id;
        int rc;
 
+       if (!id)
+               goto out;
+
        /* returns without wait if ID is not connected */
-       rc = rdma_disconnect(ia->ri_id);
+       rc = rdma_disconnect(id);
        if (!rc)
                wait_event_interruptible(ep->rep_connect_wait,
                                                        ep->rep_connected != 1);
@@ -753,10 +710,14 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
                ep->rep_connected = rc;
        trace_xprtrdma_disconnect(r_xprt, rc);
 
-       rpcrdma_xprt_drain(r_xprt);
+       if (id->qp)
+               rpcrdma_xprt_drain(r_xprt);
+out:
        rpcrdma_reqs_reset(r_xprt);
        rpcrdma_mrs_destroy(r_xprt);
        rpcrdma_sendctxs_destroy(r_xprt);
+
+       rpcrdma_ep_destroy(r_xprt);
 }
 
 /* Fixed-size circular FIFO queue. This implementation is wait-free and
index 37d5080c250b87b775e29a56d4c919ac3588175c..9a536319557ed44843bfd2fe9e39db8cf258d97a 100644 (file)
@@ -464,8 +464,6 @@ void rpcrdma_ia_close(struct rpcrdma_ia *);
 /*
  * Endpoint calls - xprtrdma/verbs.c
  */
-int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt);
-void rpcrdma_ep_destroy(struct rpcrdma_xprt *r_xprt);
 int rpcrdma_ep_connect(struct rpcrdma_ep *, struct rpcrdma_ia *);
 void rpcrdma_ep_disconnect(struct rpcrdma_ep *, struct rpcrdma_ia *);