The following is the field definition from Linux man proc(5).

SigPnd, ShdPnd: Number of signals pending for thread and for process as a whole (see pthreads(7) and signal(7)).

SigBlk, SigIgn, SigCgt: Masks indicating signals being blocked, ignored, and caught (see signal(7)).

But what they look like and how to interpret? Here is an output example:

SigPnd: 0000000000000000 ShdPnd: 0000000000000000 SigBlk: fffffffe7ffbfeff SigIgn: 0000000000000000 SigCgt: 0000000000000000

They are displayed in hex format as 8 bytes. The right most 4 bytes represent stardard signals, the left is Linux real-time signal extension. Each bit in the 8 bytes represents one corresponding signal. If the bits index starts from zero(the right most bit in the above), The corresponding signal is represented by bit[signalValue-1]. An example is that the signal SIGHUP, whose value is 1, is represented the bit 0.

Take the above SigBlk as example, the first two bytes are

0xfeff

The binary format is

1111,1110,1111,1111

It means all signals from 1 to 16 are blocked except the signal 9 (SIGKILL). This is true because SIGKILL cannot be blocked or ignored.

Last week, I did a small experiment of using oprofile to profile two piece of code. In this small experiment, I learned some basic ideas of using oprofile. In particular, I used oprofile to profile memory cache behaviour and the performance effect. In this experiment, I got ideas of how memory cache invalidation can affect the speed of running program. The following is the detail.

The Experiment Configuration

I ran the experiment in an Thinkpad T400 with a Ubuntu installation. The experiment needs to run on a multi-core processor because of profiling memory cache invalidation cost. In a single core processor, there is no such memory cache invalidation cost.

1
2
3
4
5
6
7
8
9
10
$sudo lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description:    Ubuntu 12.04 LTS
Release:        12.04
Codename:       precise

$ cat /proc/cpuinfo |grep "model name"
model name      : Intel(R) Core(TM)2 Duo CPU     P8600  @ 2.40GHz
model name      : Intel(R) Core(TM)2 Duo CPU     P8600  @ 2.40GHz

Source Code

There are two pieces of programs, which are “no_alignment” and “alignment”. They are compiled with GNU GCC. Each program clones a child sharing the same memory address space, which means the parent and the child updating different fileds of the same global data. The difference is that the program “alignment” optimizing the fields of the shared_data to the cache line size alignment. In this way, the two fields are able to be fetched on different cache lines. Therefore, when the parent and the child runs on different cores, the “no_alignment” program has the cache invlidation costs between two cores, but the “alignment” program doesn’t.

The following are the source codes of the two programs. The only difference is the definition of struct shared_data.

no_alignment
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#define _GNU_SOURCE
#include <sched.h>
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>

// shared data
struct shared_data {
  unsigned int num_proc1;
  unsigned int num_proc2;
};

struct shared_data shared;

// define loops to run for a while
int loop_i = 100000, loop_j = 100000;

int child(void *);

#define STACK_SIZE (8 * 1024)

int main(void)
{
  pid_t pid;
  int i, j;

  /* Stack */
  char *stack = (char *)malloc(STACK_SIZE);
  if (!stack) {
    perror("malloc");
    exit(1);
  }

  printf("main: shared %p %p\n",
         &shared.num_proc1, &shared.num_proc2);

  /* clone a thread sharing memory space with the parent process */
  if ((pid = clone(child, stack + STACK_SIZE, CLONE_VM, NULL)) < 0) {
    perror("clone");
    exit(1);
  }

  for (i = 0; i < loop_i; i++) {
    for(j = 0; j < loop_j; j++) {
      shared.num_proc1++;
    }
  }
}

int child(void *arg)
{
  int i, j;

  printf("child: shared %p %p\n",
         &shared.num_proc1, &shared.num_proc2);

  for (i = 0; i < loop_i; i++) {
    for (j = 0; j < loop_j; j++) {
      shared.num_proc2++;
    }
  }
}
alignment
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#define _GNU_SOURCE
#include <sched.h>
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>

// cache line size
// hardware dependent value
// it can checks from /proc/cpuinfo
#define CACHE_LINE_SIZE 64

// shared data aligned with cache line size
struct shared_data {
  unsigned int __attribute__ ((aligned (CACHE_LINE_SIZE))) num_proc1;
  unsigned int __attribute__ ((aligned (CACHE_LINE_SIZE))) num_proc2;
};

struct shared_data shared;

// define loops to run for a while
int loop_i = 100000, loop_j = 100000;

int child(void *);

#define STACK_SIZE (8 * 1024)

int main(void)
{
  pid_t pid;
  int i, j;

  /* Stack */
  char *stack = (char *)malloc(STACK_SIZE);
  if (!stack) {
    perror("malloc");
    exit(1);
  }

  printf("main: shared %p %p\n",
         &shared.num_proc1, &shared.num_proc2);

  /* clone a thread sharing memory space with the parent process */
  if ((pid = clone(child, stack + STACK_SIZE, CLONE_VM, NULL)) < 0) {
    perror("clone");
    exit(1);
  }

  for (i = 0; i < loop_i; i++) {
    for(j = 0; j < loop_j; j++) {
      shared.num_proc1++;
    }
  }
}

int child(void *arg)
{
  int i, j;

  printf("child: shared %p %p\n",
         &shared.num_proc1, &shared.num_proc2);

  for (i = 0; i < loop_i; i++) {
    for (j = 0; j < loop_j; j++) {
      shared.num_proc2++;
    }
  }
}

Testing

I was using the Oprofile 0.99 which has ‘operf’ program that allows non-root users to profile a specified individual process with less setup. Different CPUs have different supported events. The supported events, their meanings and accepted event format by operf can be checked from ophelp and the CPU’s hardware manual. In this experiment, the CLOCK event is CPU_CLK_UNHALTED and L2_LINES_IN is the number of allocated lines in L2 which is L2 cache missing number. As examples, the “CPU_CLK_UNHALTED:100000” tells operf to sample every 100000 unhalted clock with default mask(unhalted core cycles). The “L2_LINES_IN:100000:0xf0” tells operf to sample every 1000000 number of allocated lines in L2 on all cores with all prefetch inclusive.

The following is the experiment results.

profiling result of alignment
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
$time operf --event=CPU_CLK_UNHALTED:100000,L2_LINES_IN:100000:0xf0 ./alignment

operf: Profiler started
main: shared 0x6010c0 0x601100
child: shared 0x6010c0 0x601100

profiled app exited with the following status: 160

Profiling done.

real    0m41.373s
user    0m54.015s
sys     0m0.728s


$opreport
Using /home/work/oprof_cache/oprofile_data/samples/ for samples directory.
CPU: Core 2, speed 2.401e+06 MHz (estimated)
Counted CPU_CLK_UNHALTED events (Clock cycles when not halted) with a unit mask of 0x00 (Unhalted core cycles) count 100000
Counted L2_LINES_IN events (number of allocated lines in L2) with a unit mask of 0xf0 (multiple flags) count 100000
CPU_CLK_UNHALT...|L2_LINES_IN:10...|
  samples|      %|  samples|      %|
------------------------------------
   939093 100.000    396933 100.000 alignment
        CPU_CLK_UNHALT...|L2_LINES_IN:10...|
          samples|      %|  samples|      %|
        ------------------------------------
           936430 99.7164    395920 99.7448 alignment
             2657  0.2829      1013  0.2552 no-vmlinux
                5 5.3e-04         0       0 ld-2.15.so
                1 1.1e-04         0       0 libc-2.15.so
profiling result of no_alignment
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
$time operf --event=CPU_CLK_UNHALTED:100000,L2_LINES_IN:100000:0xf0 ./no_alignment
operf: Profiler started
main: shared 0x601058 0x60105c
child: shared 0x601058 0x60105c
profiled app exited with the following status: 160

Profiling done.

real    1m24.739s
user    1m59.167s
sys     0m1.344s

$opreport
Using /home/work/oprof_cache/oprofile_data/samples/ for samples directory.
CPU: Core 2, speed 2.401e+06 MHz (estimated)
Counted CPU_CLK_UNHALTED events (Clock cycles when not halted) with a unit mask of 0x00 (Unhalted core cycles) count 100000
Counted L2_LINES_IN events (number of allocated lines in L2) with a unit mask of 0xf0 (multiple flags) count 100000
CPU_CLK_UNHALT...|L2_LINES_IN:10...|
  samples|      %|  samples|      %|
------------------------------------
  1423030 100.000   1177262 100.000 no_alignment
        CPU_CLK_UNHALT...|L2_LINES_IN:10...|
          samples|      %|  samples|      %|
        ------------------------------------
          1419249 99.7343   1174038 99.7261 no_alignment
             3778  0.2655      3224  0.2739 no-vmlinux
                2 1.4e-04         0       0 ld-2.15.so
                1 7.0e-05         0       0 libc-2.15.so

Analysis and Conclusion

I drawed the table for the analysising.

Here is the conclusion:

  • The cache missing rate of no_alignment was 82.73%. It became 42.27% with alignment optimization. The cache missing rate was reduced to almost half.
  • The no_alignment ran 120.5 seconds and the alignment ran 55 seconds. It was a bit more than double faster with alignment optimization.

NOTE: The real time is less than user + sys because the program runs on mutlecores parallel

为什么要对这段代码重构

上周对一段代码进行了重构,这段代码是参加一个小组的code review时读到的,这段代码并不长,只有100多行,我花了几个小时(也许两个小时),但是没有读明白这段代码是如何工作的。因为代码可读性不好,代码阅读者的时间就这样被浪费了而没有任何收获。而且代码的书写者在一段时间后也会面临同样的问题,他/她至少需要一点时间才能明白代码如何工作的。还有很重要的一点是,从长远来看,这样的代码可维护性也不高,引入新的改动会比较难或者比较容易引入新的bug。为了避免下一个代码阅读者掉入同样的一个“坑”,我提出需要对代码重构。

为什么可读性不高

原始代码请看后面,它的可读性不好主要体现在以下几点:

  • 变量的命名可读性。比如dummy命名为isdummy,peek_dummy命名为next_msg_isdummy的可读性会更好,sleep_entry实际上就是!q->msg_queue_pending_msgs变量有重复之嫌。关于如何命名可以参考clean code
  • do … while的循环条件是一个长条件(((*head == NULL) && !sleep_entry) || (dummy && sleep_entry) || unlikely(peek_dummy)),难读懂,这是造成可读性不好的主要原因,其实际体现出来的是代码的逻辑(或者流程)不清晰
  • 函数的长度很长。并不是说长函数的可读性一定有问题,但是长函数往往是有问题,它通常反映代码书写者对代码没有进行很好的抽象。抽象是书写代码最终要的一种能力,如果我们打开备受Peter Norvig推崇的Structure and Interpretation of Computer Programs一书,它的第一章就是Building Abstractions with Procedures
  • 注释太少,一些需要注释的而没有,例如上面提到的do … while循环的三个条件

通过与拥有这段代码的小组讨论之后,代码的意图实际上如下面的流程图所示:

流程图没有经过简化,有一部分模块是重复的,但是可以让我们理解原代码的意图是什么。可以看到原代码将整个流程都在一个do … while中实现了,这就导致代码结构不清晰。

代码重构中遵循的一些原则

重构后的代码请看后面,那么重构中遵循了哪些原则呢?并不复杂,只要写代码过程中,留意下面几条原则,代码的可读性就可以得到极大的提高。

  • 给变量和函数一个可读性好的名字,正如我们在clean code中学到的
  • 给if, when, switch等中的逻辑条件(condition)一个体现意图的可读性强的名字,这个condition可以是变量判断或者函数返回值实现的。
  • 保持函数在一个合理的长度
  • 在必要的时候画流程图可以解决混乱,注意这里流程图画的是我们希望代码做什么,而不是我们已经写的代码的流程图,这点很重要,因为我们要写出是执行我们意图的代码
  • DRY principle,即不要在不同的代码重复相同的代码和信息
  • 就象写完文章之前最后再把文章读一边,在提交代码前把代码再读一边,通常我们都可以发现一些改进的点。不要担心这会导致无止无=尽的修改,最终我们会到达一个我们能接受的点,另外deadline也会迫使我们在适当的时候停下。

重构之后的一些反思

  • 我始终认为代码的可读性是最重要的,尤其是现在代码的开发都是团队合作的结果。虽然重构后的代码有将近200多行,这是因为有更多的函数,注释和空格。但新的代码可读性更好,长远看更容易维护(意味着成本更低)
  • 写可读性强的代码需要时间,所以在计划和估计任务时请考虑这点
  • 可读性好的代码不是一蹴而就的,好的代码通常是通过反复的、慢慢的改进而来,我想这就是为什么需要经常性的代码重构的思想的来源
  • 可读性好的代码让读代码的人获得乐趣,写代码的人也会因为写了一份好的代码而获得乐趣
  • 也许一开始我们写可读性好的代码会有难度或者觉得花费时间,但是只要坚持上述提到的原则,早晚我们可以又快又好的写出漂亮的代码
  • 保持stupid simple,就象功夫,花巧并不一定好,真正的高手只用基础招数的组合
  • 没有UT也可以重构,也不影响写出一个好的代码,但是看下一条
  • 当代码提交后,新代码review之后,碰到的一个问题是:是否应该将新代码提交,这样的犹豫是因为没有足够的UT和funtional测试。完全可以理解,所以这就是为什么需要UT和CI。当然我这里也有个疑惑就是Linux Kernel开发没有UT,那么在Linuxe Kernel的演化过程中,它的开发人员是如何获得这样的信心的

重构前的代码

[原始代码] (recv_whole_msg_origin.c) download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
int msg_recv_whole_msg(struct msg_client *client, struct msg_frag **head, int nonblock,
                          struct timespec *ts)
{
   struct msg_frag *cur_msg;
   struct msg_queue *q;
   struct ext_wait_queue wait;
   struct timeval tv;
   int ret = 0;
   long timeout = 0;
   bool dummy, peek_dummy, sleep_entry;
   bool first_entry = true;
   buffer_bottom_t* bb = NULL;
   *head = NULL;

  if(g_enable_timestamps)
     do_gettimeofday(&tv);

   rcu_read_lock(); // make sure q is not freed (in RCU call back)
   q = client->q;
   if (unlikely(!q)) {
      rcu_read_unlock();
      return -ENOTCONN;
   }

   prefetch(q);
   timeout = __prepare_timeout(ts);
   do {
      spin_lock_bh(&q->lock);
      if(likely(first_entry)){
         rcu_read_unlock(); //we can unlock RCU because spin_lock_bh already disabled preempt
         first_entry = false;
      }
      else {
         /* Make sure next message is dummy because other threads may fetch it. */
         if (!__next_msg_is_dummy(q) && ((*head) != NULL)) {
            spin_unlock_bh(&q->lock);
            break;
         }
      }

      if (q->msg_queue_pending_msgs) {
         sleep_entry = false;
         DTRACE(3, "msg_recv_whole_msg(): Msg priority queue not empty.\n");

         cur_msg = __get_first_msg(q, false);
         if (unlikely(cur_msg == NULL)) {
            spin_unlock_bh(&q->lock);
            EPRINTF("Msg Q list empty while counter for msg is %u\n",
               q->msg_queue_pending_msgs);
            return -ERANGE;
         }
         dummy = MSG_IS_DUMMY_MSG(MSGHEADER(cur_msg));

         bb = (buffer_bottom_t *)(cur_msg->data);

         q->msg_queue_pending_msgs--;

         if (dummy) {
            q->msg_queue_pending_dummy_msgs--;
         }

         if (need_input_sync(client, cur_msg)) {
            client->q->msg_queue_pending_wosp_msgs--;
         }

         if (!MSG_IS_SYNC_MSG(bb)){
            q->msg_queue_size -= cur_msg->n_bufs;
            q->msg_queue_msgs--;
            msg_pid_queue_stat_dec(q, bb->process_id, cur_msg->n_bufs);
         }

         recv_input_sync_prepare(client, cur_msg);

         // must put behind sync msg send out, for sync msg will use filler2.
         if (!dummy){
            bb->filler2 = (bb->filler2 & 0xffff0000)| get_and_inc_hand_seq(q, bb->process_id);
         }

         /* Take a peek whether the next message is a dummy message. */
         peek_dummy = __next_msg_is_dummy(q);

         spin_unlock_bh(&q->lock);
         recv_input_sync(client, cur_msg);
         ret = 0;
      }
      else {
         cur_msg = NULL;
         dummy = peek_dummy = false;
         sleep_entry = true;
         if (unlikely(nonblock)) {
            DPRINTF(3, "msg_recv_whole_msg(): nonblock, don't sleep\n");
            spin_unlock_bh(&q->lock);
            ret = -EAGAIN;
         }
         else if (unlikely(timeout < 0)) {
            DPRINTF(3, "msg_recv_whole_msg(): timeout < 0: %ld\n", timeout);
            spin_unlock_bh(&q->lock);
            ret = -ETIMEDOUT;
         }
         else {
            wait.task = current;
            wait.state = STATE_NONE;
            DPRINTF(3, "msg_recv_whole_msg(): going to sleep.\n");
            ret = __wq_sleep(&client->q, &timeout, &wait);
            if (ret == 0) {
               cur_msg = wait.head;
               recv_input_sync(client, cur_msg);
               if(unlikely(MSG_IS_DUMMY_MSG(MSGHEADER(cur_msg)))){
                  WPRINTF("Unexpected dummy message received in wait task. From %x.%x.\n",
                     MSGHEADER(cur_msg)->computer,MSGHEADER(cur_msg)->family);
                  dummy = true;
               }
            }
         }
      }
      DTRACE(3, "msg_recv_whole_msg() ret: %d, msg:(%p)\n", ret, cur_msg);

      if (dummy)
         msg_free_msg(&cur_msg);
      else if ((*head) == NULL) //Store the non-dummy message to receiver
         *head = cur_msg;
   } while (((*head == NULL) && !sleep_entry) || (dummy && sleep_entry) || unlikely(peek_dummy));

   /* restore bb->phys_computer because all input-sync related logic is completed here
    restore it so that monitor tools sees compliant data as cIPA */
   if (*head) {
      bb = (buffer_bottom_t *)(*head)->data;
      bb->phys_computer = bb->next_phys_computer;
      if(g_enable_timestamps){
         set_time_rec(*head, TIMEREC_ENTER_IOCTL, &tv);
      }
   }

   __msg_monitor(*head, MON_POINTS_T_RECEIVE_C);

   return ret;
}

重构后的代码

[重构后的代码] (recv_whole_msg_r2.c) download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
/**
 * Aquire the q's lock need RCU read lock proctect because the q can be release
 *   in the mid without that proctect. After the q's lock is got, rcu read lock
 *   is released because q cannot be released anymore
 *
 * return NULL: failure
 * Otherwise: pointer to q is returned and q is locked
 */
STATIC INLINE struct msg_queue *client_q_lock_protect(struct msg_client *client) {
  struct msg_queue *q = NULL;

  rcu_read_lock();
  q = client->q;
  if (unlikely(!q)) {
    rcu_read_unlock();
    return NULL;
  }
  spin_lock_bh(&q->lock);
  rcu_read_unlock();
  return q;
}

#define client_q_unlock(q) spin_unlock_bh(&q->lock)


/**
 * Get message from pending queue.
 *
 * Caller should hold the q's lock and the q shouldn't be empty
 * 
 * The q's lock will be freed in this function when it returns
 * 
 * Return: 0 is successful otherwise failure
 * Output: In case of success, *head helds the pointer of the received msg
 */
STATIC INLINE int __get_a_msg_from_q(struct msg_client *client, struct msg_frag **head)
{
  bool dummy;
  buffer_bottom_t     * bb = NULL;
  struct msg_queue * q = client->q;
  struct msg_frag *cur_msg = NULL;

  cur_msg = __get_first_msg(q, false);

  if (unlikely(!cur_msg)) {
    EPRINTF("Msg Q list empty while counter for msg is %u\n", q->msg_queue_pending_msgs);
    client_q_unlock(q);
    return -ERANGE;
  }

  q->msg_queue_pending_msgs--;
  dummy = MSG_IS_DUMMY_MSG(MSGHEADER(cur_msg));
  if (dummy) {
    q->msg_queue_pending_dummy_msgs--;
  }

  if (need_input_sync(client, cur_msg)) {
    client->q->msg_queue_pending_wosp_msgs--;
  }

  bb = (buffer_bottom_t *)(cur_msg)->data;
  if (!MSG_IS_SYNC_MSG(bb)){
    q->msg_queue_size -= cur_msg->n_bufs;
    q->msg_queue_msgs--;
    msg_pid_queue_stat_dec(q, bb->process_id, cur_msg->n_bufs);
  }

  recv_input_sync_prepare(client, cur_msg);

  // must put behind sync msg send out, for sync msg will use filler2.
  if (!dummy){
    bb->filler2 = (bb->filler2 & 0xffff0000)| get_and_inc_hand_seq(q, bb->process_id);
  }

  client_q_unlock(q);
  *head = cur_msg;
  return 0;
}


/**
 * Waiting for new message, it will sleep while no message is arrival.
 * Caller should hold the q's lock
 * The __wq_sleep will free the q's lock when it returns from.
 */
STATIC INLINE int __waiting_for_new_msg(struct msg_queue *q, int nonblock,
              struct timespec *ts, struct msg_frag **head)
{
  int    ret = 0;
  long   timeout = 0;
  struct ext_wait_queue wait;

  *head = NULL;
  timeout = __prepare_timeout(ts);
  if (unlikely(nonblock)) {
    DPRINTF(3, "%s(): nonblock, don't sleep\n", __func__);
    spin_unlock_bh(&q->lock);
    ret = -EAGAIN;
  }
  else if (unlikely(timeout < 0)) {
    DPRINTF(3, "%s(): timeout < 0: %ld\n", __func__, timeout);
    spin_unlock_bh(&q->lock);
    ret = -ETIMEDOUT;
  }
  else {
    wait.task = current;
    wait.state = STATE_NONE;
    DPRINTF(3, "%s(): going to sleep.\n", __func__);
    ret = __wq_sleep(&q, &timeout, &wait);
    if (ret == 0) {
      *head = wait.head;
    }
  }

  return ret;
}


/**
 * return 0 until it received a non dummy message, the recevied msg is saved
 *   in the parameter cur_msg. If it received a dummy message, it handles 
 *   this dummy message and back to receive next message again 
 * 
 * return non-zero if there are errors
 */
STATIC INLINE int msg_recv_a_nondummy_msg(struct msg_client *client,
                 struct msg_frag * cur_msg, int nonblock,
                 struct timespec *ts)
{
  bool dummy = false; // the received msg is a dummy message
  int  ret = 0;
  struct msg_queue * q;

  cur_msg = NULL;
  do {  /* Get first un-dummy messages. */
    q = client_q_lock_protect(client);
    if (unlikely(!q)){
      return -ENOTCONN;
    }

    if (q->msg_queue_pending_msgs) {
      DTRACE(3, "%s(): Msg priority queue NOT empty.\n", __func__);
      ret = __get_a_msg_from_q(client, &cur_msg);
    }
    else {
      DTRACE(3, "%s(): Msg priority queue empty.\n", __func__);
      ret = __waiting_for_new_msg(q, nonblock, ts, &cur_msg);

    }

    if (unlikely(ret != 0)) {
      return ret;
    }

    recv_input_sync(client, cur_msg);

    dummy = MSG_IS_DUMMY_MSG(MSGHEADER(cur_msg));
    if (dummy) {
      msg_free_msg(&cur_msg);
    }
  } while (dummy);

  // a non dummy message is received, return OK
  return 0;
}


/**
 * exhaust all dummy messages in the queue until the queue is empty or the head
 * message isn't a dummy message.
 *
 * Up to now, no need to know whether this function succeeds or not. Therefore,
 * no error code is returned even if there are errors
 */
STATIC INLINE void exhaust_dummy_msgs_from_queue(struct msg_client *client)
{
  struct msg_queue * q;
  struct msg_frag * cur_msg = 0;

  for(;;) {
    q = client_q_lock_protect(client);
    if (unlikely(!q)) {
      break;
    }

    if (!__next_msg_is_dummy(q)) {
      client_q_unlock(q);
      break;
    }

    if (unlikely( __get_a_msg_from_q(client, &cur_msg))) {
      break;
    }

    msg_free_msg(&cur_msg);
  }
}

/*
 *
 */
int msg_recv_whole_msg(struct msg_client *client, struct msg_frag **head,
            int nonblock, struct timespec *ts)
{
  int  ret = 0;
  struct timeval tv;
  buffer_bottom_t * bb = NULL;
  struct msg_frag * cur_msg = 0;

  *head = NULL;

  if(g_enable_timestamps)
    do_gettimeofday(&tv);

  ret = msg_recv_a_nondummy_msg(client, cur_msg, nonblock, ts);
  if (unlikely(!ret)){
    return ret;
  }

  /* restore bb->phys_computer because all input-sync related logic is completed here
   * restore it so that monitor tools sees compliant data as cIPA */
  *head = cur_msg;
  cur_msg = NULL;
  bb = (buffer_bottom_t *)(*head)->data;
  bb->phys_computer = bb->next_phys_computer;

  if(g_enable_timestamps){
    set_time_rec(*head, TIMEREC_ENTER_IOCTL, &tv);
  }

  /* exhaust dummy msgs before give the received msg to user 
   * for better latency of relative messages
   */
  exhaust_dummy_msgs_from_queue(client);

  return ret;
}

双向链表是一个简单的数据结构,使用非常广泛,应该人人知道,会用,打开编辑器马上可以写一个。但是Linux的双向链表实现非常巧妙,它使用了一个小小的编程技巧,使得开发人员定义自己的数据结构时添加一个struct list_head字段(list_head是Linux Kernel定义的双向链表的数据结构),就可以使自己的数据结构具备双向链表功能,采用Linux Kernel提供的操作函数,使得使用双向链表非常简单。如果我们在v3.9-rc8中grep list_head,竟然有多达11400多条匹配,可见双向链表在Linux kernel中的江湖地位,实际上,我们在读Linux源码时,到处可以见到以list_head组织起来的数据结构。

下面我们将详细看看list_head结构及其相关函数,并且用一个简单例子学习如何使用它们。

list_head结构

数据结构是代码的核心,我们先来看看双向链表的数据结构。linux kernel在<linux/list.h>中定义了双向链表的数据结构struct list_head。list_head定义很简单,只有两个成员函数next和prev,用于指向list_head:

struct list_head的定义
1
2
3
struct list_head {
       struct list_head *next, *prev;
};

函数列表

在list_head定义的基础上,Linux kernel提供了丰富的链表操作函数,下表列出了其中一些主要的。所有双向链表的操作都应该使用操作函数,而不应该直接访问list_head内部的next和prev指针,这样当list_head的实现改变时,使用的代码可以不受到影响。

使用

Linux kernel的双向列表实现巧妙,只要在我们的数据结构中定义一个struct list_head的字段就可以使我们的数据结构具备双向链表的功能。通过链表提供的操作函数,我们可以创建自己的双向链表。一个创建好的链表为下图所示的组织形式:

我们可以通过一个例子来具体的理解和体会这个结构的方便简单的使用方式。

例子

定义自己的数据结构

定义我们的struct my_struct_t结构:

定义my_struct_t
1
2
3
4
5
typedef struct my_struct {
  int id;
  struct list_head lists;
  char *name;
} my_struct_t;

struct list_head可以在数据结构中任意位置。上面代码中,我们定义了一个my_struct_t的结构,有三个字段id, lists和name。其中lists申明为struct list_head.

定义链表头
定义链表头my_struct_list
1
LIST_HEAD(my_struct_list);

LIST_HEAD是一个宏,它实际上声明了一个struct list_head变量my_struct_list,并且初始化字段next和prev指向自身。初始化完成之后,my_struct_list实际为如下图所示:

插入一个链表项

我们可以用list_add(new, head)向在链表头部插入一个链表项。

插入my_struct2到链表
1
2
3
my_struct_t my_struct2 = {.id = 2,
                          .name = "struct2"};
list_add(&my_struct2.lists, &my_struct_list);

插入my_struct2之后,我们看到的链表结构如下图所示:

再插入一个链表项
插入my_struct1到链表
1
2
3
my_struct_t my_struct1 = {.id = 1,
                          .name = "struct1"};
list_add(&my_struct1.lists, &my_struct_list);

插入my_struct1之后,我们看到的链表结构如下图所示:

从链表项指针得到原数据结构的指针

从链表项指针到原数据结构的指针是双向链表使用便利的关键,如果我们得到了一个链表项,可以通过list_entry(ptr, type, member)来得到指向struct list_head嵌入的数据结构的指针。其中ptr是一个指向struct list_head的指针,type是struct list_head嵌入的数据结构,memeber是struct list_head嵌入的字段名。看一个例子会容易明白:

得到指向原数据结构的指针
1
my_struct_t *my_structp = list_entry(my_struct_list.next, my_struct_t, lists);

my_structp会指向上述链表结构中my_struct1,和下述赋值语句等效

1
my_structp = &my_struct1;

大家可能对list_entry如何实现比较好奇,下面是简化的源代码,使用了c中一个常见的技巧:

list_entry的实现
1
2
3
4
#define list_entry(ptr, type, member) \
          (type *) ((char *) ptr - offset(type, memeber);

#define offsetof(TYPE, MEMBER) ((size_t) &((TYPE *)0)->MEMBER)

offsetof计算字段MEMBER在类型TYPE中的偏移,这个偏移在编译的时候就由编译器计算得到。list_entry将链表项的指针减去list_head字段的偏移就得到了原数据结构的指针。

遍历整个链表list_for_each和list_for_each_entry

使用list_for_each(pos, head)可以遍历整个链表的链表项,head是链表头,pos是指向当前链表项的指针,每次循环时,pos都会更新为下一个链表项,下面例子打印列表中的id和name:

遍历一
1
2
3
4
5
6
7
struct list_head *pos;
my_struct_t *my_structp;

list_for_each(pos, &my_struct_list) {
  my_structp = list_entry(pos, my_struct_t, lists);
  printf("id=%d, name=%s\n", my_structp->id, my_structp->name);
}

输出

id=1, name=struct1 id=2, name=struct2

list_for_each_entry(pos, head, member)可以遍历整个链表的原数据结构,head是链表头,pos为指向当前链表项所嵌入的原数据结构的指针,member是被嵌入数据结构struct list_head申明的字段名。实际上是list_for_each和list_entry的组合,所以上段代码也可以写成下面的方式,看起来更加简洁:

遍历二
1
2
3
4
5
my_struct_t *my_structp;

list_for_each_entry(my_structp, &my_struct_list, lists) {
  printf("id=%d, name=%s\n", my_structp->id, my_structp-name);
}

总结

  1. 使用双向链表需要在自己的数据结构中申明一个struct list_head的字段,定义的数据结构通过这个字段链接起来
  2. 使用LIST_HEAD宏申明一个链表头,这个链表头指向双向链表
  3. 使用<linux/list.h>提供的add,delete等操作函数管理链表
  4. 通过list_entry及其他_entry结尾的函数得到原数据结构的指针,list_entry的实现采用了一个小技巧,是链表方便易用的原因