为什么要对这段代码重构

上周对一段代码进行了重构,这段代码是参加一个小组的code review时读到的,这段代码并不长,只有100多行,我花了几个小时(也许两个小时),但是没有读明白这段代码是如何工作的。因为代码可读性不好,代码阅读者的时间就这样被浪费了而没有任何收获。而且代码的书写者在一段时间后也会面临同样的问题,他/她至少需要一点时间才能明白代码如何工作的。还有很重要的一点是,从长远来看,这样的代码可维护性也不高,引入新的改动会比较难或者比较容易引入新的bug。为了避免下一个代码阅读者掉入同样的一个“坑”,我提出需要对代码重构。

为什么可读性不高

原始代码请看后面,它的可读性不好主要体现在以下几点:

  • 变量的命名可读性。比如dummy命名为isdummy,peek_dummy命名为next_msg_isdummy的可读性会更好,sleep_entry实际上就是!q->msg_queue_pending_msgs变量有重复之嫌。关于如何命名可以参考clean code
  • do … while的循环条件是一个长条件(((*head == NULL) && !sleep_entry) || (dummy && sleep_entry) || unlikely(peek_dummy)),难读懂,这是造成可读性不好的主要原因,其实际体现出来的是代码的逻辑(或者流程)不清晰
  • 函数的长度很长。并不是说长函数的可读性一定有问题,但是长函数往往是有问题,它通常反映代码书写者对代码没有进行很好的抽象。抽象是书写代码最终要的一种能力,如果我们打开备受Peter Norvig推崇的Structure and Interpretation of Computer Programs一书,它的第一章就是Building Abstractions with Procedures
  • 注释太少,一些需要注释的而没有,例如上面提到的do … while循环的三个条件

通过与拥有这段代码的小组讨论之后,代码的意图实际上如下面的流程图所示:

流程图没有经过简化,有一部分模块是重复的,但是可以让我们理解原代码的意图是什么。可以看到原代码将整个流程都在一个do … while中实现了,这就导致代码结构不清晰。

代码重构中遵循的一些原则

重构后的代码请看后面,那么重构中遵循了哪些原则呢?并不复杂,只要写代码过程中,留意下面几条原则,代码的可读性就可以得到极大的提高。

  • 给变量和函数一个可读性好的名字,正如我们在clean code中学到的
  • 给if, when, switch等中的逻辑条件(condition)一个体现意图的可读性强的名字,这个condition可以是变量判断或者函数返回值实现的。
  • 保持函数在一个合理的长度
  • 在必要的时候画流程图可以解决混乱,注意这里流程图画的是我们希望代码做什么,而不是我们已经写的代码的流程图,这点很重要,因为我们要写出是执行我们意图的代码
  • DRY principle,即不要在不同的代码重复相同的代码和信息
  • 就象写完文章之前最后再把文章读一边,在提交代码前把代码再读一边,通常我们都可以发现一些改进的点。不要担心这会导致无止无=尽的修改,最终我们会到达一个我们能接受的点,另外deadline也会迫使我们在适当的时候停下。

重构之后的一些反思

  • 我始终认为代码的可读性是最重要的,尤其是现在代码的开发都是团队合作的结果。虽然重构后的代码有将近200多行,这是因为有更多的函数,注释和空格。但新的代码可读性更好,长远看更容易维护(意味着成本更低)
  • 写可读性强的代码需要时间,所以在计划和估计任务时请考虑这点
  • 可读性好的代码不是一蹴而就的,好的代码通常是通过反复的、慢慢的改进而来,我想这就是为什么需要经常性的代码重构的思想的来源
  • 可读性好的代码让读代码的人获得乐趣,写代码的人也会因为写了一份好的代码而获得乐趣
  • 也许一开始我们写可读性好的代码会有难度或者觉得花费时间,但是只要坚持上述提到的原则,早晚我们可以又快又好的写出漂亮的代码
  • 保持stupid simple,就象功夫,花巧并不一定好,真正的高手只用基础招数的组合
  • 没有UT也可以重构,也不影响写出一个好的代码,但是看下一条
  • 当代码提交后,新代码review之后,碰到的一个问题是:是否应该将新代码提交,这样的犹豫是因为没有足够的UT和funtional测试。完全可以理解,所以这就是为什么需要UT和CI。当然我这里也有个疑惑就是Linux Kernel开发没有UT,那么在Linuxe Kernel的演化过程中,它的开发人员是如何获得这样的信心的

重构前的代码

[原始代码] (recv_whole_msg_origin.c) download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
int msg_recv_whole_msg(struct msg_client *client, struct msg_frag **head, int nonblock,
                          struct timespec *ts)
{
   struct msg_frag *cur_msg;
   struct msg_queue *q;
   struct ext_wait_queue wait;
   struct timeval tv;
   int ret = 0;
   long timeout = 0;
   bool dummy, peek_dummy, sleep_entry;
   bool first_entry = true;
   buffer_bottom_t* bb = NULL;
   *head = NULL;

  if(g_enable_timestamps)
     do_gettimeofday(&tv);

   rcu_read_lock(); // make sure q is not freed (in RCU call back)
   q = client->q;
   if (unlikely(!q)) {
      rcu_read_unlock();
      return -ENOTCONN;
   }

   prefetch(q);
   timeout = __prepare_timeout(ts);
   do {
      spin_lock_bh(&q->lock);
      if(likely(first_entry)){
         rcu_read_unlock(); //we can unlock RCU because spin_lock_bh already disabled preempt
         first_entry = false;
      }
      else {
         /* Make sure next message is dummy because other threads may fetch it. */
         if (!__next_msg_is_dummy(q) && ((*head) != NULL)) {
            spin_unlock_bh(&q->lock);
            break;
         }
      }

      if (q->msg_queue_pending_msgs) {
         sleep_entry = false;
         DTRACE(3, "msg_recv_whole_msg(): Msg priority queue not empty.\n");

         cur_msg = __get_first_msg(q, false);
         if (unlikely(cur_msg == NULL)) {
            spin_unlock_bh(&q->lock);
            EPRINTF("Msg Q list empty while counter for msg is %u\n",
               q->msg_queue_pending_msgs);
            return -ERANGE;
         }
         dummy = MSG_IS_DUMMY_MSG(MSGHEADER(cur_msg));

         bb = (buffer_bottom_t *)(cur_msg->data);

         q->msg_queue_pending_msgs--;

         if (dummy) {
            q->msg_queue_pending_dummy_msgs--;
         }

         if (need_input_sync(client, cur_msg)) {
            client->q->msg_queue_pending_wosp_msgs--;
         }

         if (!MSG_IS_SYNC_MSG(bb)){
            q->msg_queue_size -= cur_msg->n_bufs;
            q->msg_queue_msgs--;
            msg_pid_queue_stat_dec(q, bb->process_id, cur_msg->n_bufs);
         }

         recv_input_sync_prepare(client, cur_msg);

         // must put behind sync msg send out, for sync msg will use filler2.
         if (!dummy){
            bb->filler2 = (bb->filler2 & 0xffff0000)| get_and_inc_hand_seq(q, bb->process_id);
         }

         /* Take a peek whether the next message is a dummy message. */
         peek_dummy = __next_msg_is_dummy(q);

         spin_unlock_bh(&q->lock);
         recv_input_sync(client, cur_msg);
         ret = 0;
      }
      else {
         cur_msg = NULL;
         dummy = peek_dummy = false;
         sleep_entry = true;
         if (unlikely(nonblock)) {
            DPRINTF(3, "msg_recv_whole_msg(): nonblock, don't sleep\n");
            spin_unlock_bh(&q->lock);
            ret = -EAGAIN;
         }
         else if (unlikely(timeout < 0)) {
            DPRINTF(3, "msg_recv_whole_msg(): timeout < 0: %ld\n", timeout);
            spin_unlock_bh(&q->lock);
            ret = -ETIMEDOUT;
         }
         else {
            wait.task = current;
            wait.state = STATE_NONE;
            DPRINTF(3, "msg_recv_whole_msg(): going to sleep.\n");
            ret = __wq_sleep(&client->q, &timeout, &wait);
            if (ret == 0) {
               cur_msg = wait.head;
               recv_input_sync(client, cur_msg);
               if(unlikely(MSG_IS_DUMMY_MSG(MSGHEADER(cur_msg)))){
                  WPRINTF("Unexpected dummy message received in wait task. From %x.%x.\n",
                     MSGHEADER(cur_msg)->computer,MSGHEADER(cur_msg)->family);
                  dummy = true;
               }
            }
         }
      }
      DTRACE(3, "msg_recv_whole_msg() ret: %d, msg:(%p)\n", ret, cur_msg);

      if (dummy)
         msg_free_msg(&cur_msg);
      else if ((*head) == NULL) //Store the non-dummy message to receiver
         *head = cur_msg;
   } while (((*head == NULL) && !sleep_entry) || (dummy && sleep_entry) || unlikely(peek_dummy));

   /* restore bb->phys_computer because all input-sync related logic is completed here
    restore it so that monitor tools sees compliant data as cIPA */
   if (*head) {
      bb = (buffer_bottom_t *)(*head)->data;
      bb->phys_computer = bb->next_phys_computer;
      if(g_enable_timestamps){
         set_time_rec(*head, TIMEREC_ENTER_IOCTL, &tv);
      }
   }

   __msg_monitor(*head, MON_POINTS_T_RECEIVE_C);

   return ret;
}

重构后的代码

[重构后的代码] (recv_whole_msg_r2.c) download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
/**
 * Aquire the q's lock need RCU read lock proctect because the q can be release
 *   in the mid without that proctect. After the q's lock is got, rcu read lock
 *   is released because q cannot be released anymore
 *
 * return NULL: failure
 * Otherwise: pointer to q is returned and q is locked
 */
STATIC INLINE struct msg_queue *client_q_lock_protect(struct msg_client *client) {
  struct msg_queue *q = NULL;

  rcu_read_lock();
  q = client->q;
  if (unlikely(!q)) {
    rcu_read_unlock();
    return NULL;
  }
  spin_lock_bh(&q->lock);
  rcu_read_unlock();
  return q;
}

#define client_q_unlock(q) spin_unlock_bh(&q->lock)


/**
 * Get message from pending queue.
 *
 * Caller should hold the q's lock and the q shouldn't be empty
 * 
 * The q's lock will be freed in this function when it returns
 * 
 * Return: 0 is successful otherwise failure
 * Output: In case of success, *head helds the pointer of the received msg
 */
STATIC INLINE int __get_a_msg_from_q(struct msg_client *client, struct msg_frag **head)
{
  bool dummy;
  buffer_bottom_t     * bb = NULL;
  struct msg_queue * q = client->q;
  struct msg_frag *cur_msg = NULL;

  cur_msg = __get_first_msg(q, false);

  if (unlikely(!cur_msg)) {
    EPRINTF("Msg Q list empty while counter for msg is %u\n", q->msg_queue_pending_msgs);
    client_q_unlock(q);
    return -ERANGE;
  }

  q->msg_queue_pending_msgs--;
  dummy = MSG_IS_DUMMY_MSG(MSGHEADER(cur_msg));
  if (dummy) {
    q->msg_queue_pending_dummy_msgs--;
  }

  if (need_input_sync(client, cur_msg)) {
    client->q->msg_queue_pending_wosp_msgs--;
  }

  bb = (buffer_bottom_t *)(cur_msg)->data;
  if (!MSG_IS_SYNC_MSG(bb)){
    q->msg_queue_size -= cur_msg->n_bufs;
    q->msg_queue_msgs--;
    msg_pid_queue_stat_dec(q, bb->process_id, cur_msg->n_bufs);
  }

  recv_input_sync_prepare(client, cur_msg);

  // must put behind sync msg send out, for sync msg will use filler2.
  if (!dummy){
    bb->filler2 = (bb->filler2 & 0xffff0000)| get_and_inc_hand_seq(q, bb->process_id);
  }

  client_q_unlock(q);
  *head = cur_msg;
  return 0;
}


/**
 * Waiting for new message, it will sleep while no message is arrival.
 * Caller should hold the q's lock
 * The __wq_sleep will free the q's lock when it returns from.
 */
STATIC INLINE int __waiting_for_new_msg(struct msg_queue *q, int nonblock,
              struct timespec *ts, struct msg_frag **head)
{
  int    ret = 0;
  long   timeout = 0;
  struct ext_wait_queue wait;

  *head = NULL;
  timeout = __prepare_timeout(ts);
  if (unlikely(nonblock)) {
    DPRINTF(3, "%s(): nonblock, don't sleep\n", __func__);
    spin_unlock_bh(&q->lock);
    ret = -EAGAIN;
  }
  else if (unlikely(timeout < 0)) {
    DPRINTF(3, "%s(): timeout < 0: %ld\n", __func__, timeout);
    spin_unlock_bh(&q->lock);
    ret = -ETIMEDOUT;
  }
  else {
    wait.task = current;
    wait.state = STATE_NONE;
    DPRINTF(3, "%s(): going to sleep.\n", __func__);
    ret = __wq_sleep(&q, &timeout, &wait);
    if (ret == 0) {
      *head = wait.head;
    }
  }

  return ret;
}


/**
 * return 0 until it received a non dummy message, the recevied msg is saved
 *   in the parameter cur_msg. If it received a dummy message, it handles 
 *   this dummy message and back to receive next message again 
 * 
 * return non-zero if there are errors
 */
STATIC INLINE int msg_recv_a_nondummy_msg(struct msg_client *client,
                 struct msg_frag * cur_msg, int nonblock,
                 struct timespec *ts)
{
  bool dummy = false; // the received msg is a dummy message
  int  ret = 0;
  struct msg_queue * q;

  cur_msg = NULL;
  do {  /* Get first un-dummy messages. */
    q = client_q_lock_protect(client);
    if (unlikely(!q)){
      return -ENOTCONN;
    }

    if (q->msg_queue_pending_msgs) {
      DTRACE(3, "%s(): Msg priority queue NOT empty.\n", __func__);
      ret = __get_a_msg_from_q(client, &cur_msg);
    }
    else {
      DTRACE(3, "%s(): Msg priority queue empty.\n", __func__);
      ret = __waiting_for_new_msg(q, nonblock, ts, &cur_msg);

    }

    if (unlikely(ret != 0)) {
      return ret;
    }

    recv_input_sync(client, cur_msg);

    dummy = MSG_IS_DUMMY_MSG(MSGHEADER(cur_msg));
    if (dummy) {
      msg_free_msg(&cur_msg);
    }
  } while (dummy);

  // a non dummy message is received, return OK
  return 0;
}


/**
 * exhaust all dummy messages in the queue until the queue is empty or the head
 * message isn't a dummy message.
 *
 * Up to now, no need to know whether this function succeeds or not. Therefore,
 * no error code is returned even if there are errors
 */
STATIC INLINE void exhaust_dummy_msgs_from_queue(struct msg_client *client)
{
  struct msg_queue * q;
  struct msg_frag * cur_msg = 0;

  for(;;) {
    q = client_q_lock_protect(client);
    if (unlikely(!q)) {
      break;
    }

    if (!__next_msg_is_dummy(q)) {
      client_q_unlock(q);
      break;
    }

    if (unlikely( __get_a_msg_from_q(client, &cur_msg))) {
      break;
    }

    msg_free_msg(&cur_msg);
  }
}

/*
 *
 */
int msg_recv_whole_msg(struct msg_client *client, struct msg_frag **head,
            int nonblock, struct timespec *ts)
{
  int  ret = 0;
  struct timeval tv;
  buffer_bottom_t * bb = NULL;
  struct msg_frag * cur_msg = 0;

  *head = NULL;

  if(g_enable_timestamps)
    do_gettimeofday(&tv);

  ret = msg_recv_a_nondummy_msg(client, cur_msg, nonblock, ts);
  if (unlikely(!ret)){
    return ret;
  }

  /* restore bb->phys_computer because all input-sync related logic is completed here
   * restore it so that monitor tools sees compliant data as cIPA */
  *head = cur_msg;
  cur_msg = NULL;
  bb = (buffer_bottom_t *)(*head)->data;
  bb->phys_computer = bb->next_phys_computer;

  if(g_enable_timestamps){
    set_time_rec(*head, TIMEREC_ENTER_IOCTL, &tv);
  }

  /* exhaust dummy msgs before give the received msg to user 
   * for better latency of relative messages
   */
  exhaust_dummy_msgs_from_queue(client);

  return ret;
}