try: index = self._next_index() except StopIteration: return for _ in range(self._num_workers): # find the next active worker, if any worker_queue_idx = next(self._worker_queue_idx_cycle) if self._workers_status[worker_queue_idx]: if self._in_order: break elif self._workers_num_tasks[worker_queue_idx] < max_tasks // sum( self._workers_status ): break else: # not found (i.e., didn't break) return
while watchdog.is_alive(): try: r = index_queue.get(timeout=MP_STATUS_CHECK_INTERVAL) # 等待任务(队列阻塞时等待,其余时间直接瞬时取得) except queue.Empty: continue # 处理各种信号... idx, index = r # 解包任务 data = fetcher.fetch(index) # 立即获取数据 data_queue.put((idx, data)) # 立即放入输出队列 del data, idx, index, r # 立即清理内存