语法结构

条件分支控制流

掌握如何写出好的条件分支代码非常重要,它可以帮助我们用更简洁、更清晰的代码来表达复杂逻辑。

分支惯用写法

当某个对象作为主角出现在 if 分支里时,解释器会主动对它进行真值测试,也就是调用 bool() 函数获取它的布尔值:

1
2
>>> bool([]), bool([1, 2, 3])
(False, True)

不要因为过度追求简写而引入其他逻辑问题:

1
2
3
4
5
6
7
# 更精准:只有为 0 的时候,才会满足分支条件
if containers_count == 0:
pass

# 更宽泛:为 0、None、空字符串等时,都可以满足分支条件
if not containers_count:
pass

修改对象的布尔值

为类定义 __len__ 魔法方法,实际上就是为它实现了 Python 世界的长度协议,类的实例就可以直接用于真值测试:

1
2
3
4
5
6
7
8
9
10
11
class UserCollection:
def __init__(self, users):
self.items = users

def __len__(self):
return len(self.items)

users = UserCollection(['ethan', 'yang'])

if users:
print("There're some users in the collection!")

不过,定义 __len__ 并非影响布尔值结果的唯一方法,还有一个魔法方法 __bool__ 和对象的布尔值息息相关:

1
2
3
4
5
6
class ScoreJudger:
def __init__(self, score):
this.score = score

def __bool__(self):
return self.score >= 60
1
2
3
4
>>> bool(ScoreJudger(60))
True
>>> bool(ScoreJudger(59))
False

与 None 比较时使用 is 运算符

对于自定义对象来说,它们在进行 == 运算时行为是可操纵的——只要实现类型的 __eq__ 魔法方法就行:

1
2
3
class EqualWithAnything:
def __eq__(self, other):
return True
1
2
3
>>> foo = EqualWithAnything()
>>> foo == None
True

== 和 is 的本质区别:

  • == 对比两个对象的值是否相等,行为可被 __eq__ 方法重载;
  • is 判断两个对象是否是内存里的同一个东西,无法被重载

仅当你需要判断某个对象是否是 None、True、False 时使用 is,因为除了它们外,其他类型的对象在 Python 中并不是严格以单例模式存在的。Python 语言使用了一种名为整数驻留(Integer Interning)的底层优化技术,对于 -5 到 256 的这些常用小整数,Python 会将它们缓存在内存里的一个数组中。

编程建议

要竭尽所能地避免分支嵌套,多层嵌套可以用提前返回的技巧来优化。当你编写分支时,首先找到那些会中断执行的条件,把它们移到函数的最前面,然后在分支里直接使用 return 或 raise 结束执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
def buy_fruit(nerd, store):
if store.is_open():
if store.has_stocks('apple'):
if nerd.can_afford(store.price('apple', amount=1)):
nerd.buy(store, 'apple', amount=1)
return
else:
nerd.go_home_and_get_money()
return buy_fruit(nerd, store)
else:
raise MadAtNoFruit('no apple in the store!')
else:
raise MadAtNoFruit('store is closed!')
1
2
3
4
5
6
7
8
9
10
11
12
13
def buy_fruit(nerd, store):
if not store.is_open():
raise MadAtNoFruit('store is closed!')

if not store.has_stocks('apple'):
raise MadAtNoFruit('no apple in the store!')

if nerd.can_afford(store.price('apple', amount=1)):
nerd.buy(store, 'apple', amount=1)
return
else:
nerd.go_home_and_get_money()
return buy_fruit(nerd, store)

我们可以使用 Python 函数的动态关键字参数(**kwargs)特性,降低分支内代码的相似性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
if user.no_profile_exists:
create_user_profile(
username=data.username,
gender=data.gender,
email=data.email,
age=data.age,
address=data.address,
points=0,
created=now(),
)
else:
update_user_profile(
username=data.username,
gender=data.gender,
email=data.email,
age=data.age,
address=data.address,
updated=now(),
)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
if user.no_profile_exists:
_update_or_create = create_user_profile
extra_args = {'points': 0, 'created': now()}
else:
_update_or_create = update_user_profile
extra_args = {'updated': now()}

_update_or_create(
username=user.username,
gender=user.gender,
email=user.email,
age=user.age,
address=user.address,
**extra_args,
)

德摩根定律:not A or not B 等价于 not (A and B):

1
2
if not user.has_logged_in or not user.is_from_chrome:
return 'our service is only open for chrome logged in user'
1
2
if not (user.has_logged_in and user.is_from_chrome):
return 'our service is only open for chrome logged in user'

all() 和 any() 接受一个可迭代对象作为参数,返回一个布尔值结果:

1
2
3
4
5
6
7
8
def all_numbers_gt_10(numbers):
if not numbers:
return False

for n in numbers:
if n <= 10:
return False
return True
1
2
def all_numbers_gt_10(numbers):
return bool(numbers) and all(n > 10 for n in numbers)

or 最有趣的地方是它的短路求值特性,使用 a or b 来表示“a 为空时用 b 代替”的写法非常常见,你在各种编程语言、各类项目源码里都能发现它的影子。但在这种写法下,其实藏着一个陷阱:

1
2
3
# 假如 config.timeout 的值被主动配置成 0 秒,timeout 也会被重新赋值为 60
# 正确的配置反而被忽略了
timeout = config.timeout or 60

异常与错误处理

如果能善用异常机制优雅地处理好程序里的错误,我们就能用更少、更清晰的代码,写出更健壮的程序。

优先使用异常捕获

两种截然不同的编程风格:

  • LBYL (Look Before You Leap):三思而后行,在执行一个可能会出错的操作时,先做一些关键的条件判断,仅当条件满足时才进行操作;
  • EAFP (Easier to Ask for Forgiveness than Permission):获取原谅比许可简单,不做任何事前检查直接执行操作,但在外层用 try 来捕获可能发生的异常;

和 LBYL 相比,EAFP 编程风格更为简单直接,它总是直奔主流程而去,把意外情况都放在异常处理 try/except 块内消化掉:

1
2
3
4
5
6
7
def incr_by_one(value):
if isinstance(value, int):
return value + 1
elif isinstance(value, str) and value.isdigit():
return int(value) + 1
else:
print(f'Unable to perform incr for value: {value}')
1
2
3
4
5
def incr_by_one(value):
try:
return int(value) + 1
except (TypeError, ValueError) as e:
print(f'Unable to perform incr for value: {value}, error: {e}')

try 语句常用知识

Python 的内置异常类之间存在许多继承关系,比如 BaseException -> Exception -> LookupError -> KeyError,要把更精确的 except 语句放在前面。

异常捕获语句里的 else 表示:仅当 try 语句块里没有抛出任何异常时,才执行 else 分支下的内容,效果就像在 try 最后增加一个标记变量一样:

1
2
3
4
5
6
7
8
9
10
def sync_user_profile(user):
sync_succeeded = False
try:
sync_profile(user.profile, to_external=True)
sync_succeeded = True
except Exception as e:
print("Error while syncing user profile")

if sync_succeeded:
send_notification(user, 'profile sync succeeded')
1
2
3
4
5
6
7
def sync_user_profile(user):
try:
sync_profile(user.profile, to_external=True)
except Exception as e:
print("Error while syncing user profile")
else:
send_notification(user, 'profile sync succeeded')

和 finally 语句不同,假如程序在执行 try 代码块时碰到了 return 或 break 等跳转语句,中断了本次异常捕获,那么即便代码没抛出任何异常,else 分支内的逻辑也不会被执行。

如果仅仅想记录下某个异常,然后把它重新抛出,交由上层处理。这时,不带任何参数的 raise 语句可以派上用场:

1
2
3
4
5
6
def incr_by_key(d, key):
try:
d[key] += 1
except KeyError:
print(f'key {key} does not exists, re-raise the exception')
raise

抛出异常而不是返回错误

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def create_item(name):
if len(name) > MAX_LENGTH_OF_NAME:
return None, 'name of item is too long'
if len(get_current_items()) > MAX_ITEMS_QUOTA:
return None, 'items is full'
return Item(name=name), ''

def create_from_input():
name = input()
_, err_msg = create_item(name)
if err_msg:
print(f'create item failed: {err_msg}')
else:
print('item<{name}> created')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class CreateItemError(Exception):
"""创建 Item 失败"""

def create_item(name):
if len(name) > MAX_LENGTH_OF_NAME:
raise CreateItemError('name of item is too long')
if len(get_current_items()) > MAX_ITEMS_QUOTA:
raise CreateItemError('items is full')
return Item(name=name)

def create_from_input():
name = input()
try:
_ = create_item(name)
except CreateItemError as e:
print(f'create item failed: {e}')
else:
print(f'item<{name}> created')

用抛出异常替代返回错误后,整个代码结构乍看上去变化不大,但细节上的改变其实非常多:

  • 新函数拥有更稳定的返回值类型,它永远只会返回 Item 类型或是抛出异常;
  • 最好在函数文档里说明可能抛出的异常类型;
  • 不同于返回值,异常在被捕获前会不断往调用栈上层汇报。但假如程序缺少一个顶级的统一异常处理逻辑,那么某个被所有人忽视了的异常可能会层层上报,最终弄垮整个程序

使用上下文管理器

with 是一个神奇的关键字,它可以在代码中开辟一段由它管理的上下文,并控制程序在进入和退出这段上下文时的行为。只有满足上下文管理器(Context Manager)协议的对象才可以配合 with 使用,要创建一个上下文管理器只要实现 __enter__ 和 __exit__ 两个魔法方法即可:

1
2
3
4
5
6
7
8
9
10
class DummyContext:
def __init__(self, name):
self.name = name

def __enter__(self):
return f'{self.name}-{random.random()}'

def __exit__(self, exc_type, exc_value, traceback):
print('Exiting DummyContext')
return False

上下文管理器用于替代 finally 语句清理资源:

1
2
3
4
5
6
7
conn = create_conn(host, port, timeout=None)
try:
conn.send_text('Hello, world!')
except Exception as e:
print(f'Unable to use connection: {e}')
finally:
conn.close()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class create_conn_obj:
def __init__(self, host, port, timeout=None):
self.conn = create_conn(host, port, timeout=timeout)

def __enter__(self):
return self.conn

def __exit__(self, exc_type, exc_val, exc_tb):
self.conn.close()
return False

with create_conn_obj(host, port, timeout=None) as conn:
try:
conn.send_text('Hello, world!')
except Exception as e:
print(f'Unable to use connection: {e}')

程序的行为取决于 __exit__ 方法的返回值:

  • __exit__ 返回了 True,那么这个异常就会被当前的 with 语句压制住,不再继续抛出,达到忽略异常的效果;
  • __exit__ 返回了 False,那这个异常就会被正常抛出,交由调用方处理;

使用上下文管理器,我们可以很方便地实现可复用的忽略异常功能:

1
2
3
4
5
6
7
8
class ignore_closed:
def __enter__(self):
pass

def __exit__(self, exc_type, exc_value, traceback):
if exc_type == AlreadyClosedError:
return True
return False

__exit__ 接收的三个参数:

  • exc_type:异常类型;
  • exc_value:异常对象;
  • traceback:错误的堆栈对象;

在日常工作中,我们用到的大多数上下文管理器,可以直接通过生成器函数 + @contextmanager 的方式来定义,这比创建一个符合协议的类要简单得多:

1
2
3
4
5
6
7
8
9
from contextlib import contextmanager

@contextmanager
def create_conn_obj(host, port, timeout=None):
conn = create_conn(host, port, timeout=timeout)
try:
yield conn
finally:
conn.close()

以 yield 关键字为界,yield 前的逻辑会进入管理器时执行(类似于 __enter__),yield 后的逻辑会在退出管理器时执行(类似于 __exit__)。

编程建议

在代码中捕获异常,表面上是避免程序因为异常而直接崩溃,但它的核心,其实是编码者对处于程序主流程之外的、已知或未知情况的一种妥当处置:

  • 永远只捕获那些可能会抛出异常的语句块;
  • 尽量只捕获精确的异常类型,而不是模糊的 Exception;
  • 如果出现了预期外的异常,让程序早点儿崩溃也未必是件坏事;

避免抛出抽象级别高于当前模块的异常:

  • 让模块只抛出与当前抽象级别一致的异常;
  • 在必要的地方进行异常包装与转换;
1
2
3
4
5
def process_image(fp):
try:
image = Image.open(fp)
except Exception:
raise error_codes.INVALID_IMAGE_UPLOADED
1
2
3
4
5
6
7
8
9
10
class ImageOpenError(Exception):
def __init__(self, exc):
self.exc = exc
super().__init__(f'Image open error: {self.exc}')

def process_image(fp):
try:
image = Image.open(fp)
except Exception as e:
raise ImageOpenError(exc=e)

我们同样应该避免泄漏低于当前抽象级别的异常,比如 urllib3 模块是 requests 依赖的低层实现细节。

除了极少数情况外,不要直接忽略异常,通过日志记录下这个异常总会更好。面对异常,调用方可以:

  • 在 except 语句里捕获并处理它,继续执行后面的代码;
  • 在 except 语句里捕获它,将错误通知给终端用户,中断执行;
  • 不捕获异常,让异常继续往堆栈上层走,最终可能导致程序崩溃;

当开发者编写自定义异常类时,遵循的常见原则:

  • 要继承 Exception 而不是 BaseException;
  • 异常类名最好以 Error 或 Exception 结尾;
  • 调用方是否能清晰区分各种异常;

我们可以利用异常间的继承关系,设计一些更精准的异常子类:

1
2
3
4
5
class CreateItemError(Exception):
"""创建 Item 失败"""

class CreateErrorItemsFull(CreateItemError):
"""当前的 Item 容器已满"""

还可以创建一些包含额外属性的异常类,比如包含错误代码:

1
2
3
4
5
class CreateItemError(Exception):
def __init__(self, error_code, message):
self.error_code = error_code
self.message = message
super().__init__(f'{self.error_code} - {self.message}')

assert 是一个专供开发者调试程序的关键字,它所提供的断言检查,可以在执行 Python 时使用 -O 选项直接跳过。请不要拿 assert 来做参数校验,用 raise 语句来替代它吧。

对于所有编写代码的程序员来说,错误处理永远是一种在代码主流程之外的额外负担。空对象模式(Null Object Pattern)在本该返回 None 值或抛出异常时,返回一个符合正确结果接口的特制空类型对象来代替,以此免去调用方的错误处理工作。

循环与可迭代对象

对于一些常见的循环任务,使用 for 比 while 要方便得多。要把循环代码写得漂亮,有时关键不在循环结构自身,而在于另一个用来配合循环的主角:可迭代对象。

迭代器与可迭代对象

iter() 函数和 bool() 很像,调用 iter() 会尝试返回一个迭代器对象。迭代器最鲜明的特征是:

  • 不断执行 next() 函数会返回下一次迭代结果,当迭代器没有更多值可以返回时,便会抛出 StopIteration 异常;
  • 对迭代器执行 iter() 函数,尝试获取迭代器的迭代器对象时,返回的结果一定是迭代器本身;

下面这两段循环代码是等价的:

1
2
3
4
names = ['foo', 'bar', 'baz']

for name in names:
print(name)
1
2
3
4
5
6
7
iterator = iter(names)
while True:
try:
name = next(iterator)
print(name)
except StopIteration:
break

要定义一个迭代器类型,关键在于实现 __iter__ 和 __next__ 两个魔法方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class Range7:
def __init__(self, start, end):
self.start = start
self.end = end
self.current = start

def __iter__(self):
return self

def __next__(self):
while True:
if self.current >= self.end:
raise StopIteration

if self.num_is_valid(self.current):
ret = self.current
self.current += 1
return ret
self.current += 1

迭代器是可迭代对象的一种,它最常出现的场景是在迭代其他对象时,作为一种介质或工具对象存在。每个迭代器都对应一次完整的迭代过程,因此它自身必须保存与当前迭代相关的状态——迭代位置。如果想让 Range7 对象在每次迭代时都返回完整的结果,我们必须把现在的代码拆成两部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class Range7:
def __init__(self, start, end):
self.start = start
self.end = end

def __iter__(self):
return Range7Iterator(self)

class Range7Iterator:
def __init__(self, range_obj):
self.range_obj = range_obj
self.current = range_obj.start

def __iter__(self):
return self

def __next__(self):
while True:
if self.current >= self.range_obj.end:
raise StopIteration

if self.num_is_valid(self.current):
ret = self.current
self.current += 1
return ret
self.current += 1

迭代器与迭代对象的区别:

  • 可迭代对象不一定是迭代器,但迭代器一定是可迭代对象;
  • 对可迭代对象使用 iter() 会返回迭代器,迭代器则会返回其自身;
  • 每个迭代器的被迭代过程是一次性的,可迭代对象则不一定;
  • 可迭代对象只需要实现 __iter__ 方法,而迭代器要额外实现 __next__ 方法;

生成器是一种懒惰的可迭代对象,使用它来替代传统列表可以节约内存,提升执行效率。生成器还是一种简化的迭代器实现,使用它可以大大降低实现传统迭代器的编码成本:

1
2
3
4
5
6
def range_7_gen(start, end):
num = start
while num < end:
if num != 0 and (num % 7 == 0 or '7' in str(num)):
yield num
num += 1

我们可以用 iter() 和 next() 函数来验证生成器就是迭代器这个事实:

1
2
3
4
5
6
7
8
9
>>> nums = range_7_gen(0, 20)
>>> iter(nums)
<generator object range_7_gen at 0x10a0b84a0>
>>> iter(nums) is nums
True
>>> next(nums)
7
>>> next(nums)
14

修饰可迭代对象优化循环

虽然 enumerate() 函数很简单,但它其实代表了一种循环代码优化思路:通过修饰可迭代对象来优化循环。用生成器(或普通的迭代器)在循环外部包装原本的循环主体,完成一些原本必须在循环内部执行的工作:

1
2
3
4
5
6
def sum_even_only(numbers):
result = 0
for num in numbers:
if num % 2 == 0:
result += num
return result
1
2
3
4
5
6
7
8
9
10
def even_only(numbers):
for num in numbers:
if num % 2 == 0:
yield num

def sum_even_only(numbers):
result = 0
for num in even_only(numbers):
result += num
return result

使用 itertools 模块优化循环

使用 product() 扁平化多层嵌套循环,product() 接收多个可迭代对象作为参数,然后根据它们的笛卡尔积不断生成结果:

1
2
3
4
5
6
def find_twelve(num_list1, num_list2, num_list3):
for num1 in num_list1:
for num2 in num_list2:
for num3 in num_list3:
if num1 + num2 + num3 == 12:
return num1, num2, num3
1
2
3
4
5
6
from itertools import product

def find_twelve(num_list1, num_list2, num_list3):
for num1, num2, num3 in product(num_list1, num_list2, num_list3):
if num1 + num2 + num3 == 12:
return num1, num2, num3

使用 islice() 实现循环内隔行处理,islice(seq, start, end, step) 函数和数组切片操作接收的参数几乎完全一致:

1
2
3
4
5
def parse_titles(filename):
with open(filename, 'r') as fp:
for i, line in enumerate(fp):
if i % 2 == 0:
yield line.strip()
1
2
3
4
5
6
from itertools import islice

def parse_titles(filename):
with open(filename, 'r') as fp:
for line in islice(fp, 0, None, 2):
yield line.strip()

使用 takewhile() 替代 break 语句,takewhile(predicate, iterable) 会在迭代第二个参数 iterable 的过程中,不断使用当前值作为参数调用 predicate() 函数,并对返回结果进行真值测试:

1
2
3
4
for user in users:
if not is_qualified(user):
break
# TODO: 进行处理
1
2
3
4
from itertools import takewhile

for user in takewhile(is_qualified, users):
# TODO: 进行处理

循环语句的 else 关键字

for 循环(和 while 循环)后的 else 关键字,代表如果循环没有碰到任何 break,便执行该分支内的语句。因此,老式的“循环 + 标记变量”代码,就可以简写为“循环 + else 分支”:

1
2
3
4
5
6
7
8
9
10
11
12
def process_tasks(tasks):
non_pending_found = False
for task in tasks:
if not task.is_pending():
non_pending_found = True
break
process(task)

if non_pending_found:
notify_admin('Found non-pending task, processing aborted.')
else:
notify_admin('All tasks was processed.')
1
2
3
4
5
6
7
8
def process_tasks(tasks):
for task in tasks:
if not task.is_pending():
notify_admin('Found non-pending task, processing aborted.')
break
process(task)
else:
notify_admin('All tasks was processed.')

编程建议

Python 语言不支持带标签的 break 语句,无法用一个 break 跳出多层循环。如果想快速从嵌套循环里跳出,需要把循环代码拆分成一个新函数,然后直接使用 return:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def print_first_word(fp, prefix):
first_word = None
for line in fp:
for word in line.split():
if word.startswith(prefix):
first_word = word
break
if first_word:
break

if first_word:
print(f'Found the first word startswith {prefix}: {first_word}.')
else:
print(f'Word starts with {prefix} was not found.')
1
2
3
4
5
6
7
8
9
10
11
12
13
def find_first_word(fp, prefix):
for line in fp:
for word in line.split():
if word.startswith(prefix):
return word
return

def print_first_word(fp, prefix):
first_word = find_first_word(fp, prefix)
if first_word:
print(f'Found the first word startswith {prefix}: {first_word}.')
else:
print(f'Word starts with {prefix} was not found.')

拿到字典 d 的第一个 key,先用 iter() 获取一个 d.keys() 的迭代器,再对它调用 next():

1
2
3
>>> d = {'foo': 1, 'bar': 2}
>>> next(iter(d.keys()))
'foo'

找到列表 nums 里面第一个可以被 7 整除的数字,直接用 next() 配合生成器表达式:

1
2
3
>>> nums = [3, 6, 8, 2, 21, 30, 42]
>>> next(i for i in nums if i % 7 == 0)
21

你需要将生成器(迭代器)可被一次性耗尽的特点铭记于心,避免写出由它所导致的 bug。假如要重复使用一个生成器,可以调用 list() 函数将它转成列表后再使用。