单调栈与单调队列:从滑动窗口最值到 DP 优化的工程映射
2026/6/12 11:57:52 网站建设 项目流程

单调栈与单调队列:从滑动窗口最值到 DP 优化的工程映射

一、暴力枚举的"时间黑洞":O(n²) 窗口扫描 vs O(n) 单调维护

滑动窗口最值是算法竞赛和工程数据处理中的高频问题。给定长度为 n 的数组和窗口大小 k,求每个窗口的最大值或最小值。朴素解法对每个窗口遍历 k 个元素,总时间复杂度 O(nk),当 n=10⁶、k=10⁵ 时耗时超过 10 秒。某实时数据监控系统中,每秒需对 50 万条指标计算滑动窗口极值,暴力扫描直接导致处理延迟飙升,P99 从 200ms 恶化到 3.5s。

单调栈与单调队列的核心思想是:维护一个单调递增或递减的序列,利用"淘汰不可能成为答案的元素"策略,将每个元素最多入栈/出栈一次,从而将整体复杂度降至 O(n)。

二、单调栈与单调队列的底层机制

flowchart TB subgraph 单调栈["单调栈(Monotonic Stack)"] direction TB S1[新元素入栈] --> S2{栈顶元素 ≤ 新元素?} S2 -->|是| S3[弹出栈顶,继续比较] S3 --> S2 S2 -->|否| S4[新元素入栈] S4 --> S5[栈底到栈顶单调递减] end subgraph 单调队列["单调队列(Monotonic Queue)"] direction TB Q1[新元素入队] --> Q2{队尾元素 ≤ 新元素?} Q2 -->|是| Q3[弹出队尾,继续比较] Q3 --> Q2 Q2 -->|否| Q4[新元素入队尾] Q4 --> Q5{队首元素超出窗口?} Q5 -->|是| Q6[弹出队首] Q6 --> Q7[队首即为窗口最大值] Q5 -->|否| Q7 end 单调栈 -->|适用场景| A[下一个更大/更小元素] 单调队列 -->|适用场景| B[滑动窗口最值] style 单调栈 fill:#eef,stroke:#333 style 单调队列 fill:#efe,stroke:#333

单调栈和单调队列的区别在于:单调栈只在一端操作,适合求解"下一个更大/更小元素"类问题;单调队列在两端操作,额外维护窗口范围约束,适合求解"滑动窗口最值"类问题。两者共享同一个核心不变量——序列的单调性。

三、生产级代码实现与应用

from collections import deque from typing import List, Tuple, Optional # ============ 核心1:单调栈 ============ def next_greater_element(nums: List[int]) -> List[int]: """ 求每个元素右侧第一个比它大的元素索引 单调栈维护递减序列,弹出时记录答案 时间复杂度 O(n),每个元素最多入栈出栈各一次 """ n = len(nums) result = [-1] * n stack = [] # 存储索引,栈底到栈顶对应的值单调递减 for i in range(n): # 当前元素比栈顶大,栈顶的"下一个更大元素"就是当前元素 while stack and nums[stack[-1]] < nums[i]: idx = stack.pop() result[idx] = i stack.append(i) return result def largest_rectangle_area(heights: List[int]) -> int: """ 柱状图中的最大矩形面积 单调栈求每个柱子左右两侧第一个比它矮的位置 这是单调栈最经典的应用之一 """ n = len(heights) # 左边界:左侧第一个比 heights[i] 小的位置 left = [-1] * n stack = [] for i in range(n): while stack and heights[stack[-1]] >= heights[i]: stack.pop() left[i] = stack[-1] if stack else -1 stack.append(i) # 右边界:右侧第一个比 heights[i] 小的位置 right = [n] * n stack = [] for i in range(n - 1, -1, -1): while stack and heights[stack[-1]] >= heights[i]: stack.pop() right[i] = stack[-1] if stack else n stack.append(i) max_area = 0 for i in range(n): width = right[i] - left[i] - 1 max_area = max(max_area, heights[i] * width) return max_area # ============ 核心2:单调队列 ============ class MonotonicQueue: """ 单调队列:维护滑动窗口内的最大值 队列从队首到队尾单调递减 """ def __init__(self): self._deque = deque() # 存储索引 def push(self, idx: int, val: int, window_left: int): """ 入队:淘汰队尾所有比当前值小的元素 这些被淘汰的元素永远不会成为窗口最大值 """ # 淘汰队尾更小的元素 while self._deque and self._deque_val(self._deque[-1]) <= val: self._deque.pop() self._deque.append((idx, val)) # 淘汰超出窗口的队首元素 while self._deque and self._deque[0][0] < window_left: self._deque.popleft() def _deque_val(self, item): return item[1] if isinstance(item, tuple) else item def max(self) -> Optional[int]: """获取当前窗口最大值""" if not self._deque: return None return self._deque[0][1] def min_queue(self): """返回最小值队列(单调递增)""" return self # 实际使用时反转比较逻辑即可 def sliding_window_max(nums: List[int], k: int) -> List[int]: """ 滑动窗口最大值 时间复杂度 O(n),空间复杂度 O(k) """ if not nums or k <= 0: return [] result = [] dq = deque() # 存储索引,对应值从队首到队尾单调递减 for i in range(len(nums)): # 淘汰队尾比当前元素小的——它们不可能成为后续窗口的最大值 while dq and nums[dq[-1]] <= nums[i]: dq.pop() dq.append(i) # 淘汰超出窗口范围的队首元素 if dq[0] <= i - k: dq.popleft() # 窗口形成后,队首即为最大值 if i >= k - 1: result.append(nums[dq[0]]) return result # ============ 应用1:股票价格跨度 ============ class StockSpanner: """ 股票价格跨度:连续小于等于今日价格的天数(含今日) 等价于求左侧第一个比今日价格大的位置 """ def __init__(self): self._stack = [] # (价格, 跨度) self._day = 0 def next(self, price: int) -> int: span = 1 # 弹出所有比当前价格小的,累加它们的跨度 while self._stack and self._stack[-1][0] <= price: _, s = self._stack.pop() span += s self._stack.append((price, span)) self._day += 1 return span # ============ 应用2:DP 优化——最长上升子序列 O(n log n) ============ def length_of_lis_binary(nums: List[int]) -> int: """ 最长上升子序列:贪心 + 二分 维护一个单调递增数组 tails,tails[i] 表示长度为 i+1 的上升子序列的最小末尾元素 时间复杂度 O(n log n) """ tails = [] for num in nums: # 二分查找第一个 >= num 的位置 lo, hi = 0, len(tails) while lo < hi: mid = (lo + hi) // 2 if tails[mid] < num: lo = mid + 1 else: hi = mid if lo == len(tails): tails.append(num) else: tails[lo] = num # 替换,保持更小的末尾 return len(tails) # ============ 应用3:DP 优化——滑动窗口最大值优化 ============ def constrained_subset_sum(nums: List[int], k: int) -> int: """ 带窗口约束的最大子序列和 dp[i] = nums[i] + max(0, dp[i-k], ..., dp[i-1]) 单调队列优化 dp 转移,从 O(nk) 降到 O(n) """ n = len(nums) dp = [0] * n dq = deque() # 存储索引,dp 值单调递减 for i in range(n): # 淘汰超出窗口的队首 while dq and dq[0] < i - k: dq.popleft() # 状态转移 dp[i] = nums[i] if dq and dp[dq[0]] > 0: dp[i] += dp[dq[0]] # 维护单调性 while dq and dp[dq[-1]] <= dp[i]: dq.pop() dq.append(i) return max(dp) # ============ 应用4:实时指标监控 ============ class SlidingWindowMonitor: """ 实时数据监控:滑动窗口极值与均值 适用于时序数据流场景 """ def __init__(self, window_size: int): self._k = window_size self._max_dq = deque() # 存 (idx, val),单调递减 self._min_dq = deque() # 存 (idx, val),单调递增 self._sum = 0 self._count = 0 self._data = [] def push(self, value: float): """推入新数据点""" idx = self._count self._count += 1 self._data.append(value) self._sum += value # 维护最大值队列 while self._max_dq and self._max_dq[-1][1] <= value: self._max_dq.pop() self._max_dq.append((idx, value)) # 维护最小值队列 while self._min_dq and self._min_dq[-1][1] >= value: self._min_dq.pop() self._min_dq.append((idx, value)) # 淘汰超出窗口的元素 left = idx - self._k + 1 if left > 0: self._sum -= self._data[left - 1] while self._max_dq and self._max_dq[0][0] < left: self._max_dq.popleft() while self._min_dq and self._min_dq[0][0] < left: self._min_dq.popleft() def get_max(self) -> Optional[float]: if not self._max_dq: return None return self._max_dq[0][1] def get_min(self) -> Optional[float]: if not self._min_dq: return None return self._min_dq[0][1] def get_avg(self) -> Optional[float]: window_len = min(self._count, self._k) if window_len == 0: return None return self._sum / window_len

四、单调栈与单调队列的 Trade-offs

元素淘汰的不可逆性。单调栈弹出元素后无法恢复,这意味着只能求解"下一个"满足条件的位置,无法回溯"上一个"。如果需要双向查询,必须分别从左到右和从右到左各扫描一次。

等值元素的处理策略。当序列中存在相等元素时,单调性的定义(严格单调 vs 非严格单调)直接影响结果。求"下一个更大元素"时用严格大于弹出,求"下一个更大或相等元素"时用大于等于弹出。不同题意需要不同的弹出条件,混淆会导致结果错误。

空间开销与窗口约束。单调队列需要同时维护索引和值,空间复杂度为 O(k)。在极端情况下(如严格递增序列),队列会存储窗口内所有元素。对于超大规模数据流,需要评估内存占用是否可接受。

DP 优化中的适用范围。单调队列优化 DP 仅适用于转移方程形如dp[i] = f(nums[i]) + max/min(dp[j])(i-k ≤ j < i)的场景。对于更复杂的转移(如涉及乘法、非线性函数),单调队列无法直接应用。

五、总结

单调栈与单调队列通过维护序列的单调性,将暴力 O(n²) 的扫描降为 O(n),核心策略是"淘汰不可能成为答案的元素"。单调栈适用于"下一个更大/更小元素"和"最大矩形面积"类问题;单调队列适用于"滑动窗口最值"和"DP 转移优化"类问题。在工程实践中,滑动窗口监控器是单调队列的直接应用场景,股票价格跨度是单调栈的典型应用。使用时需注意等值元素的处理策略、空间开销评估以及 DP 优化中对转移方程形式的约束。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询