Python 高手编程系列三千五百零二:处理错误与速率限制

张开发
2026/6/14 22:23:54 15 分钟阅读

分享文章

Python 高手编程系列三千五百零二:处理错误与速率限制
在处理这些问题时你可能会遇到的最后一个问题是外部服务提供商施加的速率限制。以使用 Google Maps API 为例在撰写本书时免费和未经身份验证的请求的官方费率限制为每秒 10 个请求和每天 2,500 个请求。当使用多线程时很容易耗尽这样的限制。更严重的问题是因为我们没有覆盖任何故障的场景而处理多线程 Python 代码中的异常比平常更复杂。当客户端超过 Google 的速率时api.geocode()函数将抛出异常这是个好消息。但是这个异常是单独引发的不会导致整个程序崩溃。工作线程当然会立即退出但是主线程将等待 work_queue 上存储的所有任务完成使用 work_queue.join()调用。这意味着我们的工作线程应该优雅地处理可能的异常并确保队列中的所有项目都被处理。如果不做进一步的改进我们可能会遇到一些情况一些工作线程崩溃程序永远不会退出。让我们对我们的代码进行一些小的改动以便为可能出现的任何问题做好准备。在工作线程中的异常情况下我们可以在 results_queue 队列中放置一个错误实例并将当前任务标记为完成与没有错误时一样。这样我们确保主线程在 work_queue.join()中等待时不会无限期地锁定。主线程然后可以检查结果并重新提出在结果队列中发现的任何异常。下面是可以以更安全的方式处理异常的 worker()和 main()函数的改进版本def worker(work_queue, results_queue):while True:try:item work_queue.get(blockFalse)except Empty:breakelse:try:result fetch_place(item)except Exception as err:results_queue.put(err)else:results_queue.put(result)finally:work_queue.task_done()def main():work_queue Queue()results_queue Queue()for place in PLACES:work_queue.put(place)threads [Thread(targetworker, args(work_queue, results_queue))for _ in range(THREAD_POOL_SIZE)]for thread in threads:thread.start()work_queue.join()while threads:threads.pop().join()while not results_queue.empty():result results_queue.get()if isinstance(result, Exception):raise resultpresent_result(result)当我们准备好处理异常时是时候打破我们的代码并超过速率限制。我们可以通过修改一些初始条件轻松地做到这一点。我们可以增加地理编码的位数和线程池的大小如下所示PLACES (‘Reykjavik’, ‘Vien’, ‘Zadar’, ‘Venice’,‘Wrocław’, ‘Bolognia’, ‘Berlin’, ‘Słubice’,‘New York’, ‘Dehli’,) * 10THREAD_POOL_SIZE 10如果你的执行环境足够快你应该很快就会得到类似的错误如下$ python3 threadpool_with_errors.pyNew York, NY, USA, 40.71, -74.01Berlin, Germany, 52.52, 13.40Wrocław, Poland, 51.11, 17.04Zadar, Croatia, 44.12, 15.23Vienna, Austria, 48.21, 16.37Bologna, Italy, 44.49, 11.34Reykjavík, Iceland, 64.13, -21.82Venice, Italy, 45.44, 12.32Dehli, Gujarat, India, 21.57, 73.22Slubice, Poland, 52.35, 14.56Vienna, Austria, 48.21, 16.37Zadar, Croatia, 44.12, 15.23Venice, Italy, 45.44, 12.32Reykjavík, Iceland, 64.13, -21.82Traceback (most recent call last):File “threadpool_with_errors.py”, line 83, inmain()File “threadpool_with_errors.py”, line 76, in mainraise resultFile “threadpool_with_errors.py”, line 43, in workerresult fetch_place(item)File “threadpool_with_errors.py”, line 23, in fetch_placereturn api.geocode(place)[0]File “…\site-packages\gmaps\geocoding.py”, line 37, in geocodereturn self._make_request(self.GEOCODE_URL, parameters, “results”)File “…\site-packages\gmaps\client.py”, line 89, in _make_request)(response)gmaps.errors.RateLimitExceeded: {‘status’: ‘OVER_QUERY_LIMIT’, ‘results’: [],‘error_message’: ‘You have exceeded your rate-limit for this API.’, ‘url’:‘https://maps.googleapis.com/maps/api/geocode/json?addressWroc%C5%82awsensorfalse’}前面的异常当然不是错误代码的结果。对于这个免费的服务这个程序有点过快。它产生了太多的并发请求为了正常工作我们需要一种方法来限制它们的速率。对工作速度的限制通常被称为节流。PyPI 上有几个包可以限制任何类型的工作的速率并且易于使用。但是我们不会在这里使用任何外部代码。节流是一个很好的用于介绍一些线程的锁定原语的机会所以我们将尝试从头开始构建一个解决方案。我们将使用的算法有时被称为令牌桶token bucket并且非常简单。• 存在具有预定量的令牌的桶。• 每个令牌响应单个权限以处理一项工作。• 每次工作者要求一个或多个令牌权限时○ 我们测量从上次我们重新装满桶所花费的时间○ 如果时间差允许它我们用对这个时间差响应的令牌量重新填充桶○ 如果存储的令牌的数量大于或等于请求的数量我们减少存储的令牌的数量并返回那个值○ 如果存储的令牌的数量小于请求的数量我们返回零。两个重要的事情是总是用零令牌来初始化令牌桶并且从不允许它用根据我们的标准量化时间以令牌表示的更多的令牌来填充令牌桶。如果我们不遵守这些预防措施我们可以释放超过速率限制的令牌。因为在我们的情况下速率限制以每秒的请求数表示我们不需要处理任意时间。我们假设我们测量的基础是一秒钟因此我们永远不会存储更多的令牌比允许的那个时间量的请求数。下面是允许使用令牌桶算法进行调节的类的示例实现from threading import Lockclass Throttle:definit(self, rate):self._consume_lock Lock()self.rate rateself.tokens 0self.last 0def consume(self, amount1):with self._consume_lock:now time.time()时间测量在第一令牌请求上初始化以避免初始突发if self.last 0:self.last nowelapsed now - self.last请确保传递时间的量足够大以添加新的令牌if int(elapsed * self.rate):self.tokens int(elapsed * self.rate)self.last now不要过度填满桶self.tokens (self.rateif self.tokens self.rateelse self.tokens)如果可用最终分派令牌if self.tokens amount:self.tokens - amountelse:amount 0return amount这个类的用法很简单。假设我们在主线程中只创建了一个 Throttle 实例例如Throttle(10)并将其作为位置参数传递给每个工作线程。在不同线程中使用相同的数据结构是安全的因为我们使用来自 threading 模块的 Lock 类的实例防止其内部状态的操作。我们现在可以更新 worker()函数实现等待每个项目直到 throttle 释放一个新的令牌如下所示def worker(work_queue, results_queue, throttle):while True:try:item work_queue.get(blockFalse)except Empty:breakelse:while not throttle.consume():passtry:result fetch_place(item)except Exception as err:results_queue.put(err)else:results_queue.put(result)finally:work_queue.task_done()

更多文章