恐龙书课后答案都看完了,还是不会做题?试试用Python模拟这些经典OS算法(附代码)

张开发
2026/4/29 22:38:13 15 分钟阅读

分享文章

恐龙书课后答案都看完了,还是不会做题?试试用Python模拟这些经典OS算法(附代码)
从理论到实践用Python模拟操作系统核心算法操作系统理论学习后的实践困境许多计算机科学专业的学生在研读《操作系统概念》这类经典教材时常常陷入一种困境课后习题答案都能看懂但面对实际问题或面试中的算法实现题目时却无从下手。这种现象在技术社区中被称为恐龙书综合征——理论了然于胸实践却举步维艰。操作系统核心算法如进程调度、内存管理和死锁处理等其理论描述往往抽象简洁。例如银行家算法的描述可能只有几段文字和简单的公式但要将这些文字转化为可运行的代码却需要跨越巨大的认知鸿沟。这种理论与实践之间的脱节正是许多学习者面临的痛点。为什么模拟实现算法如此重要可视化算法执行过程直观理解抽象概念验证理论假设发现边界条件和异常情况培养系统编程能力为实际开发打下基础面试中展示扎实的操作系统功底环境准备与基础工具1.1 Python环境配置我们将使用Python 3.7作为实现语言因其简洁的语法和丰富的库支持特别适合算法模拟。推荐使用以下工具链# 创建虚拟环境 python -m venv os-simulator source os-simulator/bin/activate # Linux/Mac os-simulator\Scripts\activate # Windows # 安装必要库 pip install numpy matplotlib tabulate1.2 模拟框架设计为保持代码结构清晰我们设计一个基础模拟类class OSSimulator: def __init__(self): self.time 0 # 模拟时钟 self.log [] # 事件日志 def log_event(self, event): 记录模拟事件 self.log.append((self.time, event)) def print_log(self): 输出模拟日志 from tabulate import tabulate print(tabulate(self.log, headers[Time, Event]))进程调度算法实现2.1 先来先服务(FCFS)调度FCFS是最简单的调度算法完美展示了调度器的基本结构class FCFSSimulator(OSSimulator): def __init__(self, processes): super().__init__() self.processes sorted(processes, keylambda x: x[arrival_time]) self.ready_queue [] def run(self): current_process None while self.processes or self.ready_queue or current_process: # 新进程到达 while self.processes and self.processes[0][arrival_time] self.time: p self.processes.pop(0) self.ready_queue.append(p) self.log_event(fProcess {p[pid]} arrived) # 分配CPU if not current_process and self.ready_queue: current_process self.ready_queue.pop(0) self.log_event(fProcess {current_process[pid]} started) # 执行进程 if current_process: current_process[remaining_time] - 1 if current_process[remaining_time] 0: self.log_event(fProcess {current_process[pid]} completed) current_process None self.time 12.2 最短作业优先(SJF)调度SJF算法需要预测进程的执行时间我们使用指数平均法进行预测class SJFSimulator(OSSimulator): def __init__(self, processes, alpha0.5): super().__init__() self.processes sorted(processes, keylambda x: x[arrival_time]) self.ready_queue [] self.alpha alpha # 预测权重系数 self.predicted_time 5 # 初始预测值 def predict_next(self, actual_time): 指数平均预测 self.predicted_time self.alpha * actual_time (1 - self.alpha) * self.predicted_time return self.predicted_time def run(self): current_process None while self.processes or self.ready_queue or current_process: # 新进程到达 while self.processes and self.processes[0][arrival_time] self.time: p self.processes.pop(0) p[predicted] self.predict_next(p[burst_time]) self.ready_queue.append(p) self.ready_queue.sort(keylambda x: x[predicted]) # 按预测时间排序 self.log_event(fProcess {p[pid]} arrived, predicted time: {p[predicted]:.1f}) # 分配CPU if not current_process and self.ready_queue: current_process self.ready_queue.pop(0) actual_time current_process[burst_time] self.predict_next(actual_time) # 更新预测模型 self.log_event(fProcess {current_process[pid]} started) # 执行进程 if current_process: current_process[remaining_time] - 1 if current_process[remaining_time] 0: self.log_event(fProcess {current_process[pid]} completed) current_process None self.time 12.3 时间片轮转(RR)调度RR算法引入了时间片概念是分时系统的核心class RRSimulator(OSSimulator): def __init__(self, processes, time_quantum4): super().__init__() self.processes sorted(processes, keylambda x: x[arrival_time]) self.ready_queue [] self.time_quantum time_quantum self.current_quantum 0 def run(self): current_process None while self.processes or self.ready_queue or current_process: # 新进程到达 while self.processes and self.processes[0][arrival_time] self.time: p self.processes.pop(0) self.ready_queue.append(p) self.log_event(fProcess {p[pid]} arrived) # 时间片用完或进程完成 if current_process and (self.current_quantum self.time_quantum or current_process[remaining_time] 0): if current_process[remaining_time] 0: self.ready_queue.append(current_process) self.log_event(fProcess {current_process[pid]} preempted) else: self.log_event(fProcess {current_process[pid]} completed) current_process None self.current_quantum 0 # 分配CPU if not current_process and self.ready_queue: current_process self.ready_queue.pop(0) self.log_event(fProcess {current_process[pid]} started) # 执行进程 if current_process: current_process[remaining_time] - 1 self.current_quantum 1 self.time 1内存管理算法实现3.1 页面置换算法最近最少使用(LRU)算法class LRUSimulator: def __init__(self, page_references, frame_count3): self.page_references page_references self.frame_count frame_count self.frames [] self.page_faults 0 self.access_order [] # 记录访问顺序 def run(self): for page in self.page_references: if page not in self.frames: self.page_faults 1 if len(self.frames) self.frame_count: # 找到最久未使用的页面 lru_page None for p in self.frames: if p not in self.access_order[-self.frame_count:]: lru_page p break if not lru_page: # 都在最近访问过取最早访问的 lru_page self.access_order[-self.frame_count] self.frames.remove(lru_page) self.frames.append(page) # 更新访问记录 if page in self.access_order: self.access_order.remove(page) self.access_order.append(page) print(fPage {page}: Frames {self.frames}, Faults {self.page_faults})先进先出(FIFO)算法class FIFOSimulator: def __init__(self, page_references, frame_count3): self.page_references page_references self.frame_count frame_count self.frames [] self.page_faults 0 self.queue [] # 维护替换顺序 def run(self): for page in self.page_references: if page not in self.frames: self.page_faults 1 if len(self.frames) self.frame_count: oldest self.queue.pop(0) self.frames.remove(oldest) self.frames.append(page) self.queue.append(page) print(fPage {page}: Frames {self.frames}, Faults {self.page_faults})3.2 银行家算法实现死锁避免的经典算法class BankersAlgorithm: def __init__(self, available, max_claim, allocation): self.available available self.max_claim max_claim self.allocation allocation self.n_process len(max_claim) self.n_resources len(available) def is_safe(self): work self.available.copy() finish [False] * self.n_process need [[self.max_claim[i][j] - self.allocation[i][j] for j in range(self.n_resources)] for i in range(self.n_process)] safe_sequence [] while True: found False for i in range(self.n_process): if not finish[i] and all(need[i][j] work[j] for j in range(self.n_resources)): # 模拟资源分配 work [work[j] self.allocation[i][j] for j in range(self.n_resources)] finish[i] True safe_sequence.append(i) found True break if not found: break if all(finish): print(Safe state. Sequence:, safe_sequence) return True else: print(Unsafe state detected!) return False def request_resources(self, process_id, request): need [self.max_claim[process_id][j] - self.allocation[process_id][j] for j in range(self.n_resources)] if not all(request[j] need[j] for j in range(self.n_resources)): print(Error: Process has exceeded its maximum claim) return False if not all(request[j] self.available[j] for j in range(self.n_resources)): print(Resources not available. Process must wait) return False # 尝试分配 new_available [self.available[j] - request[j] for j in range(self.n_resources)] new_allocation [self.allocation[i][:] for i in range(self.n_process)] new_allocation[process_id] [self.allocation[process_id][j] request[j] for j in range(self.n_resources)] # 检查安全性 temp_banker BankersAlgorithm(new_available, self.max_claim, new_allocation) if temp_banker.is_safe(): self.available new_available self.allocation new_allocation print(Request granted) return True else: print(Request denied - would lead to unsafe state) return False算法可视化与分析4.1 调度算法性能比较我们可以通过模拟不同负载下的调度算法来比较其性能def compare_schedulers(processes): print(\n FCFS Simulation ) fcfs FCFSSimulator([p.copy() for p in processes]) fcfs.run() fcfs.print_log() print(\n SJF Simulation ) sjf SJFSimulator([p.copy() for p in processes]) sjf.run() sjf.print_log() print(\n RR Simulation ) rr RRSimulator([p.copy() for p in processes]) rr.run() rr.print_log() # 测试数据 test_processes [ {pid: 1, arrival_time: 0, burst_time: 8}, {pid: 2, arrival_time: 1, burst_time: 4}, {pid: 3, arrival_time: 2, burst_time: 9}, {pid: 4, arrival_time: 3, burst_time: 5} ] compare_schedulers(test_processes)4.2 页面置换算法比较模拟不同页面引用串下的表现def compare_page_replacements(references, frame_counts[3, 4]): for frames in frame_counts: print(f\n LRU with {frames} frames ) lru LRUSimulator(references, frames) lru.run() print(fTotal page faults: {lru.page_faults}) print(f\n FIFO with {frames} frames ) fifo FIFOSimulator(references, frames) fifo.run() print(fTotal page faults: {fifo.page_faults}) # 测试数据 page_references [1, 2, 3, 4, 1, 2, 5, 1, 2, 3, 4, 5] compare_page_replacements(page_references)进阶挑战与扩展5.1 多级反馈队列实现结合了多种调度策略优点的复杂调度器class MFQSimulator(OSSimulator): def __init__(self, processes, queues3, base_quantum2): super().__init__() self.processes sorted(processes, keylambda x: x[arrival_time]) self.queues [[] for _ in range(queues)] # 多级队列 self.quantums [base_quantum * (2 ** i) for i in range(queues)] self.current_quantum 0 self.current_queue 0 def run(self): current_process None while self.processes or any(self.queues) or current_process: # 新进程到达 while self.processes and self.processes[0][arrival_time] self.time: p self.processes.pop(0) self.queues[0].append(p) # 新进程进入最高优先级队列 self.log_event(fProcess {p[pid]} arrived - Q0) # 时间片用完或进程完成 if current_process and (self.current_quantum self.quantums[self.current_queue] or current_process[remaining_time] 0): if current_process[remaining_time] 0: # 降级到下一队列 new_q min(self.current_queue 1, len(self.queues) - 1) self.queues[new_q].append(current_process) self.log_event(fProcess {current_process[pid]} demoted to Q{new_q}) else: self.log_event(fProcess {current_process[pid]} completed) current_process None self.current_quantum 0 # 分配CPU (从最高优先级非空队列选择) if not current_process: for q in range(len(self.queues)): if self.queues[q]: current_process self.queues[q].pop(0) self.current_queue q self.log_event(fProcess {current_process[pid]} started from Q{q}) break # 执行进程 if current_process: current_process[remaining_time] - 1 self.current_quantum 1 self.time 15.2 磁盘调度算法模拟实现SCAN和C-SCAN等磁盘调度策略class DiskScheduler: def __init__(self, requests, initial_position50, max_track199): self.requests requests self.position initial_position self.max_track max_track self.direction 1 # 1 for outward, -1 for inward self.total_movement 0 def fcfs(self): movement 0 for track in self.requests: movement abs(self.position - track) self.position track return movement def scan(self): movement 0 requests sorted(self.requests) # 分割请求 left [r for r in requests if r self.position] right [r for r in requests if r self.position] if self.direction 1: # 向外扫描 for track in right: movement abs(self.position - track) self.position track if right: # 移动到最外道 movement abs(self.position - self.max_track) self.position self.max_track for track in reversed(left): movement abs(self.position - track) self.position track else: # 向内扫描 for track in reversed(left): movement abs(self.position - track) self.position track if left: # 移动到最内道 movement abs(self.position - 0) self.position 0 for track in right: movement abs(self.position - track) self.position track return movement def c_scan(self): movement 0 requests sorted(self.requests) left [r for r in requests if r self.position] right [r for r in requests if r self.position] # 单向扫描 for track in right: movement abs(self.position - track) self.position track if right: # 移动到最外道 movement abs(self.position - self.max_track) self.position self.max_track # 直接跳回最内道 if left: movement abs(self.position - 0) self.position 0 for track in left: movement abs(self.position - track) self.position track return movement实战应用与面试准备6.1 常见面试题目实现生产者-消费者问题进程同步经典问题import threading import time import random class ProducerConsumer: def __init__(self, buffer_size5): self.buffer [] self.buffer_size buffer_size self.mutex threading.Semaphore(1) self.empty threading.Semaphore(buffer_size) self.full threading.Semaphore(0) def producer(self, id): for i in range(3): # 每个生产者生产3个产品 item random.randint(1, 100) self.empty.acquire() # 等待空位 self.mutex.acquire() # 进入临界区 self.buffer.append(item) print(fProducer {id} produced {item}. Buffer: {self.buffer}) self.mutex.release() self.full.release() # 增加一个满位 time.sleep(random.random()) def consumer(self, id): for i in range(3): # 每个消费者消费3个产品 self.full.acquire() # 等待有产品 self.mutex.acquire() # 进入临界区 item self.buffer.pop(0) print(fConsumer {id} consumed {item}. Buffer: {self.buffer}) self.mutex.release() self.empty.release() # 增加一个空位 time.sleep(random.random()) # 测试 pc ProducerConsumer() producers [threading.Thread(targetpc.producer, args(i,)) for i in range(2)] consumers [threading.Thread(targetpc.consumer, args(i,)) for i in range(2)] for p in producers: p.start() for c in consumers: c.start() for p in producers: p.join() for c in consumers: c.join()6.2 性能优化技巧当实现操作系统算法时性能优化需要考虑数据结构选择调度器常使用堆或优先队列管理就绪队列预计算如银行家算法中的Need矩阵可预先计算存储惰性计算页面置换算法中的访问位可批量更新空间换时间使用更多内存存储中间结果加速决策# 优化后的LRU实现使用OrderedDict from collections import OrderedDict class OptimizedLRU: def __init__(self, capacity): self.cache OrderedDict() self.capacity capacity def access(self, key): if key in self.cache: self.cache.move_to_end(key) return True else: if len(self.cache) self.capacity: self.cache.popitem(lastFalse) self.cache[key] True return False # 测试 lru OptimizedLRU(3) pages [1, 2, 3, 4, 1, 2, 5, 1, 2, 3, 4, 5] faults 0 for page in pages: if not lru.access(page): faults 1 print(fPage {page}: Faults {faults})

更多文章