diff --git a/net/rds/tcp.c b/net/rds/tcp.c index 31e7425e2da9..45484a93d75f 100644 --- a/net/rds/tcp.c +++ b/net/rds/tcp.c @@ -384,6 +384,7 @@ static int rds_tcp_conn_alloc(struct rds_connection *conn, gfp_t gfp) tc->t_tinc = NULL; tc->t_tinc_hdr_rem = sizeof(struct rds_header); tc->t_tinc_data_rem = 0; + init_waitqueue_head(&tc->t_recv_done_waitq); conn->c_path[i].cp_transport_data = tc; tc->t_cpath = &conn->c_path[i]; diff --git a/net/rds/tcp.h b/net/rds/tcp.h index 7c91974fcde7..b36af0865a07 100644 --- a/net/rds/tcp.h +++ b/net/rds/tcp.h @@ -55,6 +55,9 @@ struct rds_tcp_connection { u32 t_last_sent_nxt; u32 t_last_expected_una; u32 t_last_seen_una; + + /* for rds_tcp_conn_path_shutdown */ + wait_queue_head_t t_recv_done_waitq; }; struct rds_tcp_statistics { @@ -105,6 +108,7 @@ void rds_tcp_xmit_path_prepare(struct rds_conn_path *cp); void rds_tcp_xmit_path_complete(struct rds_conn_path *cp); int rds_tcp_xmit(struct rds_connection *conn, struct rds_message *rm, unsigned int hdr_off, unsigned int sg, unsigned int off); +int rds_tcp_is_acked(struct rds_message *rm, uint64_t ack); void rds_tcp_write_space(struct sock *sk); /* tcp_stats.c */ diff --git a/net/rds/tcp_connect.c b/net/rds/tcp_connect.c index 4947ee73bad0..b77c88ffb199 100644 --- a/net/rds/tcp_connect.c +++ b/net/rds/tcp_connect.c @@ -75,8 +75,16 @@ void rds_tcp_state_change(struct sock *sk) rds_connect_path_complete(cp, RDS_CONN_CONNECTING); } break; + case TCP_CLOSING: + case TCP_TIME_WAIT: + if (wq_has_sleeper(&tc->t_recv_done_waitq)) + wake_up(&tc->t_recv_done_waitq); + break; case TCP_CLOSE_WAIT: + case TCP_LAST_ACK: case TCP_CLOSE: + if (wq_has_sleeper(&tc->t_recv_done_waitq)) + wake_up(&tc->t_recv_done_waitq); rds_conn_path_drop(cp, false); break; default: @@ -226,18 +234,58 @@ void rds_tcp_conn_path_shutdown(struct rds_conn_path *cp) { struct rds_tcp_connection *tc = cp->cp_transport_data; struct socket *sock = tc->t_sock; + struct sock *sk; + unsigned int rounds; rdsdebug("shutting down conn %p tc %p sock %p\n", cp->cp_conn, tc, sock); if (sock) { + sk = sock->sk; if (rds_destroy_pending(cp->cp_conn)) - sock_no_linger(sock->sk); - sock->ops->shutdown(sock, RCV_SHUTDOWN | SEND_SHUTDOWN); - lock_sock(sock->sk); + sock_no_linger(sk); + + sock->ops->shutdown(sock, SHUT_WR); + + /* after sending FIN, + * wait until we processed all incoming messages + * and we're sure that there won't be any more: + * i.e. state CLOSING, TIME_WAIT, CLOSE_WAIT, + * LAST_ACK, or CLOSE (RFC 793). + * + * Give up waiting after 5 seconds and allow messages + * to theoretically get dropped, if the TCP transition + * didn't happen. + */ + rounds = 0; + do { + /* we need to ensure messages are dequeued here + * since "rds_recv_worker" only dispatches messages + * while the connection is still in RDS_CONN_UP + * and there is no guarantee that "rds_tcp_data_ready" + * was called nor that "sk_data_ready" still points to + * it. + */ + rds_tcp_recv_path(cp); + } while (!wait_event_timeout(tc->t_recv_done_waitq, + (sk->sk_state == TCP_CLOSING || + sk->sk_state == TCP_TIME_WAIT || + sk->sk_state == TCP_CLOSE_WAIT || + sk->sk_state == TCP_LAST_ACK || + sk->sk_state == TCP_CLOSE) && + skb_queue_empty_lockless(&sk->sk_receive_queue), + msecs_to_jiffies(100)) && + ++rounds < 50); + lock_sock(sk); + + /* discard messages that the peer received already */ + tc->t_last_seen_una = rds_tcp_snd_una(tc); + rds_send_path_drop_acked(cp, rds_tcp_snd_una(tc), + rds_tcp_is_acked); + rds_tcp_restore_callbacks(sock, tc); /* tc->tc_sock = NULL */ - release_sock(sock->sk); + release_sock(sk); sock_release(sock); } diff --git a/net/rds/tcp_listen.c b/net/rds/tcp_listen.c index 8129ea9da31c..492dcc6568bf 100644 --- a/net/rds/tcp_listen.c +++ b/net/rds/tcp_listen.c @@ -278,6 +278,20 @@ int rds_tcp_accept_one(struct rds_tcp_net *rtn) rds_tcp_set_callbacks(new_sock, cp); rds_connect_path_complete(cp, RDS_CONN_CONNECTING); } + + /* Since "rds_tcp_set_callbacks" happens this late + * the connection may already have been closed without + * "rds_tcp_state_change" doing its due diligence. + * + * If that's the case, we simply drop the path, + * knowing that "rds_tcp_conn_path_shutdown" will + * dequeue pending messages. + */ + if (new_sock->sk->sk_state == TCP_CLOSE_WAIT || + new_sock->sk->sk_state == TCP_LAST_ACK || + new_sock->sk->sk_state == TCP_CLOSE) + rds_conn_path_drop(cp, 0); + new_sock = NULL; ret = 0; if (conn->c_npaths == 0) diff --git a/net/rds/tcp_recv.c b/net/rds/tcp_recv.c index b7cf7f451430..49f96ee0c40f 100644 --- a/net/rds/tcp_recv.c +++ b/net/rds/tcp_recv.c @@ -278,6 +278,10 @@ static int rds_tcp_read_sock(struct rds_conn_path *cp, gfp_t gfp) rdsdebug("tcp_read_sock for tc %p gfp 0x%x returned %d\n", tc, gfp, desc.error); + if (skb_queue_empty_lockless(&sock->sk->sk_receive_queue) && + wq_has_sleeper(&tc->t_recv_done_waitq)) + wake_up(&tc->t_recv_done_waitq); + return desc.error; } diff --git a/net/rds/tcp_send.c b/net/rds/tcp_send.c index 4e82c9644aa6..7c52acc749cf 100644 --- a/net/rds/tcp_send.c +++ b/net/rds/tcp_send.c @@ -169,7 +169,7 @@ int rds_tcp_xmit(struct rds_connection *conn, struct rds_message *rm, * unacked byte of the TCP sequence space. We have to do very careful * wrapping 32bit comparisons here. */ -static int rds_tcp_is_acked(struct rds_message *rm, uint64_t ack) +int rds_tcp_is_acked(struct rds_message *rm, uint64_t ack) { if (!test_bit(RDS_MSG_HAS_ACK_SEQ, &rm->m_flags)) return 0;