從 Linux 源碼看 socket 的阻塞和非阻塞

Linux愛好者2018-04-13 10:30:01

(點擊上方公眾號,可快速關注)


來源:無毀的湖光-Al

my.oschina.net/alchemystar/blog/1791017


筆者一直覺得如果能知道從應用到框架再到操作系統的每一處,是一件Exciting的事情。


大部分高性能網絡框架採用的是模式。筆者這次就從linux的角度來闡述socket阻塞(block)和非阻塞(non_block)的區別。 本文源碼均來自採用Linux-2.6.24內核版本。


一個TCP非阻塞client端簡單的例子


如果我們要產生一個非阻塞的socket,在C語言中如下代碼所示:


// 創建socket

int sock_fd = socket(AF_INET, SOCK_STREAM, 0);

...

// 更改socket為nonblock

fcntl(sock_fd, F_SETFL, fdflags | O_NONBLOCK);

// connect

....

while(1)  {  

    int recvlen = recv(sock_fd, recvbuf, RECV_BUF_SIZE) ;

    ......

}

...


由於網絡協議非常複雜,內核裏面用到了大量的面向對象的技巧,所以我們從創建連接開始,一步一步追述到最後代碼的點。


socket的創建


很明顯,內核的第一步應該是通過AF_INET、SOCK_STREAM以及最後一個參數0定位到需要創建一個TCP的socket,如下圖綠線所示:



我們跟蹤源碼調用


socket(AF_INET, SOCK_STREAM, 0)

|->sys_socket 進入系統調用

|->sock_create

|->__sock_create


進一步分析__sock_create的代碼判斷:


const struct net_proto_family *pf;

// RCU(Read-Copy Update)是linux的一種內核同步方法,在此不闡述

// family=INET

pf = rcu_dereference(net_families[family]);

err = pf->create(net, sock, protocol);


const struct net_proto_family *pf; // RCU(Read-Copy Update)是linux的一種內核同步方法,在此不闡述 // family=INET pf = rcu_dereference(net_families[family]); err = pf->create(net, sock, protocol);



則通過源碼可知,由於是AF_INET(PF_INET),所以net_families[PF_INET].create=inet_create(以後我們都用PF_INET表示),即
pf->create = inet_create; 進一步追溯調用:


inet_create(struct net *net, struct socket *sock, int protocol){

Socksock;

......

// 此處是尋找對應協議處理器的過程

lookup_protocol:

// 迭代尋找protocol==answer->protocol的情況

list_for_each_rcu(p, &inetsw[sock->type]) answer = list_entry(p, struct inet_protosw, list);

 

/* Check the non-wild match. */

if (protocol == answer->protocol) {

if (protocol != IPPROTO_IP)

break;

}

......

// 這邊answer指的是SOCK_STREAM

sock->ops = answer->ops;

answer_no_check = answer->no_check;

// 這邊sk->prot就是answer_prot=>tcp_prot

sk = sk_alloc(net, PF_INET, GFP_KERNEL, answer_prot);

sock_init_data(sock, sk);

......

}


上面的代碼就是在INET中尋找SOCK_STREAM的過程了 我們再看一下inetsw[SOCK_STREAM]的具體配置:


static struct inet_protosw inetsw_array[] =

{

{

.type =       SOCK_STREAM,

.protocol =   IPPROTO_TCP,

.prot =       &tcp_prot,

.ops =        &inet_stream_ops,

.capability = -1,

.no_check =   0,

.flags =      INET_PROTOSW_PERMANENT |

      INET_PROTOSW_ICSK,

},

......

}


這邊也用了重載,AF_INET有TCP、UDP以及Raw三種:



從上述代碼,我們可以清楚的發現sock->ops=&inet_stream_ops;


const struct proto_ops inet_stream_ops = {

.family    = PF_INET,

.owner    = THIS_MODULE,

......

.sendmsg    = tcp_sendmsg,

.recvmsg    = sock_common_recvmsg,

......

}


即sock->ops->recvmsg = sock_common_recvmsg;


同時sock->sk->sk_prot = tcp_prot;


我們再看下tcp_prot中的各個函數重載的定義:


struct proto tcp_prot = {

.name"TCP",

.closetcp_close,

.connecttcp_v4_connect,

.disconnecttcp_disconnect,

.acceptinet_csk_accept,

......

// 我們重點考察tcp的讀

.recvmsgtcp_recvmsg,

......

}


fcntl控制socket的阻塞\非阻塞狀態


我們用fcntl修改socket的阻塞\非阻塞狀態。 事實上: fcntl的作用就是將O_NONBLOCK標誌位存儲在sock_fd對應的filp結構的f_lags裏,如下圖所示。



fcntl(sock_fd, F_SETFL, fdflags | O_NONBLOCK);

|->setfl


追蹤setfl代碼:


static int setfl(int fd, struct file * filp, unsigned long arg) {

......

filp->f_flags = (arg & SETFL_MASK) | (filp->f_flags & ~SETFL_MASK);

......

}


上圖中,由sock_fd在task_struct(進程結構體)->files_struct->fd_array中找到對應的socket的file描述符,再修改file->flags


在調用socket.recv的時候


我們跟蹤源碼調用:


socket.recv

|->sys_recv

|->sys_recvfrom

|->sock_recvmsg

|->__sock_recvmsg

|->sock->ops->recvmsg


由上文可知: sock->ops->recvmsg = sock_common_recvmsg;


sock


值得注意的是,在sock_recmsg中,有對標識O_NONBLOCK的處理


if (sock->file->f_flags & O_NONBLOCK)

flags |= MSG_DONTWAIT;


上述代碼中sock關聯的file中獲取其f_flags,如果flags有O_NONBLOCK標識,那麼就設置msg_flags為MSG_DONTWAIT(不等待)。


fcntl與socket就是通過其共同操作File結構關聯起來的。


繼續跟蹤調用


sock_common_recvmsg


int sock_common_recvmsg(struct kiocb *iocb, struct socket *sock,

struct msghdr *msg, size_t size, int flags) {

......

// 如果flags的MSG_DONTWAIT標識置位,則傳給recvmsg的第5個參數為正,否則為0

err = sk->sk_prot->recvmsg(iocb, sk, msg, size, flags & MSG_DONTWAIT,

   flags & ~MSG_DONTWAIT, &addr_len);

.....   

}


由上文可知: sk->sk_prot->recvmsg 其中sk_prot=tcp_prot,即最終調用的是tcp_prot->tcp_recvmsg,上面的代碼可以看出,如果fcntl(O_NONBLOCK)=>MSG_DONTWAIT置位=>(flags & MSG_DONTWAIT)>0, 再結合tcp_recvmsg的函數簽名,即如果設置了O_NONBLOCK的話,設置給tcp_recvmsg的nonblock參數>0,關係如下圖所示:



最終的調用邏輯tcp_recvmsg


首先我們看下tcp_recvmsg的函數簽名:


int tcp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,

size_t len, int nonblock, int flags, int *addr_len)


顯然我們關注焦點在(int nonblock這個參數上):


int tcp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,

size_t len, int nonblock, int flags, int *addr_len){

......

// copied是指向用户空間拷貝了多少字節,即讀了多少

int copied;

// target指的是期望多少字節

int target;

// 等效為timo = noblock ? 0 : sk->sk_rcvtimeo;

timeo = sock_rcvtimeo(sk, nonblock);

......

// 如果設置了MSG_WAITALL標識target=需要讀的長度

// 如果未設置,則為最低低水位值

target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);

......

 

do{

// 表明讀到數據

if (copied) {

// 注意,這邊只要!timeo,即nonblock設置了就會跳出循環

if (sk->sk_err ||

    sk->sk_state == TCP_CLOSE ||

    (sk->sk_shutdown & RCV_SHUTDOWN) ||

    !timeo ||

    signal_pending(current) ||

    (flags & MSG_PEEK))

break;

}else{

// 到這裏,表明沒有讀到任何數據

// 且nonblock設置了導致timeo=0,則返回-EAGAIN,符合我們的預期

if (!timeo) {

copied = -EAGAIN;

break;

}

// 這邊如果堵到了期望的數據,繼續,否則當前進程阻塞在sk_wait_data上

if (copied >= target) {

/* Do not sleep, just process backlog. */

release_sock(sk);

lock_sock(sk);

} else

sk_wait_data(sk, &timeo);

} while (len > 0);

......

return copied

}


上面的邏輯歸結起來就是:


(1)在設置了nonblock的時候,如果copied>0,則返回讀了多少字節,如果copied=0,則返回-EAGAIN,提示應用重複調用。


(2)如果沒有設置nonblock,如果讀取的數據>=期望,則返回讀取了多少字節。如果沒有則用sk_wait_data將當前進程等待。


如下流程圖所示:



阻塞函數sk_wait_data


sk_wait_data代碼-函數為:


// 將進程狀態設置為可打斷INTERRUPTIBLE

prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);

set_bit(SOCK_ASYNC_WAITDATA, &sk->sk_socket->flags);

// 通過調用schedule_timeout讓出CPU,然後進行睡眠

rc = sk_wait_event(sk, timeo, !skb_queue_empty(&sk->sk_receive_queue));

// 到這裏的時候,有網絡事件或超時事件喚醒了此進程,繼續運行

clear_bit(SOCK_ASYNC_WAITDATA, &sk->sk_socket->flags);

finish_wait(sk->sk_sleep, &wait);


該函數調用schedule_timeout進入睡眠,其進一步調用了schedule函數,首先從運行隊列刪除,其次加入到等待隊列,最後調用和體系結構相關的switch_to宏來完成進程間的切換。


如下圖所示:



阻塞後什麼時候恢復運行呢


情況1:有對應的網絡數據到來


首先我們看下網絡分組到來的內核路徑,網卡發起中斷後調用netif_rx將事件掛入CPU的等待隊列,並喚起軟中斷(soft_irq),再通過linux的軟中斷機制調用net_rx_action,如下圖所示:


注:上圖來自PLKA(<<深入Linux內核架構>>)


緊接着跟蹤next_rx_action


next_rx_action

|-process_backlog

......

|->packet_type->func 在這裏我們考慮ip_rcv

|->ipprot->handler 在這裏ipprot重載為tcp_protocol

(handler 即為tcp_v4_rcv)


緊接着tcp_v4_rcv:


tcp_input.c

tcp_v4_rcv

|-tcp_v4_do_rcv

|-tcp_rcv_state_process

|-tcp_data_queue

|-sk->sk_data_ready=sock_def_readable

|-wake_up_interruptible

|-__wake_up

|-__wake_up_common


在這裏__wake_up_common將停在當前wait_queue_head_t中的進程喚醒,即狀態改為task_running,等待CFS調度以進行下一步的動作,如下圖所示。



情況2:設定的超時時間到來


在前面調用sk_wait_event中調用了schedule_timeout


fastcall signed long __sched schedule_timeout(signed long timeout) {

......

// 設定超時的回掉函數為process_timeout

setup_timer(&timer, process_timeout, (unsigned long)current);

__mod_timer(&timer, expire);

// 這邊讓出CPU

schedule();

del_singleshot_timer_sync(&timer);

timeout = expire - jiffies;

out:

// 返回經過了多長事件

return timeout < 0 ? 0 : timeout;

}


process_timeout函數即是將此進程重新喚醒


static void process_timeout(unsigned long __data)

{

wake_up_process((struct task_struct *)__data);

}


總結


linux內核源代碼博大精深,閲讀其代碼很費周折。希望筆者這篇文章能幫助到閲讀linux網絡協議棧代碼的人。



看完本文有收穫?請分享給更多人

關注「Linux 愛好者」,提升Linux技能

閲讀原文

TAGS: