본문 바로가기
공부 정리/알고리즘

우선순위 큐

by 블로그별명 2023. 9. 15.

우선순위 큐 개념정리

우선순위 큐의 기능

우선순위 큐는 주로 다음과 같은 연산을 지원한다

  1. Insertion(삽입): 원소를 큐에 삽입할때는 해당 원소의 우선순위도 함께 지정된다
  2. Deletion(삭제): 큐에서 원소를 삭제할때는 가장 높은(또는 낮은) 우선순위를 가진 원소가 제거된다

 

우선순위 큐는 언제 사용될수있을까

우선순위 큐는 다양한 상황에서 요소들 사이의 상대적인 '중요성' 또는 '우선순위'를 고려하여 처리해야 할 때 유용하게 사용된다. 병원의 응급실은 이에 대한 아주 좋은 예다.
응급실에서 환자들은 단순히 먼저 도착한 순서대로 진료를 받는 것이 아니라, 그들의 상태나 '중증도'에 따라 진료 우선순위가 결정된다. 이러한 방식은 특히 생명을 구하는 긴급한 상황에서 필수적이다. 단순한 감기 증상으로 응급실을 찾은 환자보다는 심장 마비나 중증 외상을 입은 환자가 먼저 치료를 받는것이 옳기 때문이다

 

 

안정적 우선순위 큐 vs 불안정 우선순위 큐

만약 우선순위 큐에 같은 우선순위를 가진 요소들이 여러개 있다면 어떻게 처리해야할까
안정적 우선순위 큐는 동일한 우선순위를 가진 요소들이 존재할 경우, 요소들이 큐에 삽입된 순서를 고려한다. 즉, 먼저 삽입된 요소가 먼저 제거된다. 반면에 불안정 우선순위 큐는 동일한 우선순위를 가진 요소들의 삽입 순서를 고려하지 않는다. 

 

우선순위 큐 구현

우선순위 큐를 구현할수있는 더 다양한 방법이 있지만 간단하게 두가지 방법(배열을이용한 방법, 이진힙을 이용한 방법)만 알아보겠다

 

배열을 이용한 우선순위 큐

  • 삽입과정: 배열에 단순 append로 O(1)의 시간복잡도를 가진다
  • 제거과정: 배열안의 모든 요소를 순회하며 argmax 또는 argmin을 찾은후 제거한다 따라서 O(n)의 시간복잡도를 가진다 

 

class PriorityQueue:
    def __init__(self):
        self.queue = []

    def insert(self, item):
        self.queue.append(item)

    def delete(self):
        if len(self.queue) == 0:
            return "Queue is empty"

        min_index = 0
        for i in range(len(self.queue)):
            if self.queue[i] < self.queue[min_index]:
                min_index = i

        return self.queue.pop(min_index)

# 예제 사용
pq = PriorityQueue()
pq.insert(3)
pq.insert(1)
pq.insert(4)
pq.insert(1)
pq.insert(5)

print(pq.delete())  # 1
print(pq.delete())  # 1
print(pq.delete())  # 3
print(pq.delete())  # 4
print(pq.delete())  # 5
print(pq.delete())  # Queue is empty

 

이진힙을 이용한 우선순위 큐

이진힙(BinaryHeap)의 개념

이진 힙은 완전 이진 트리(complete binary tree)의 한 형태이다. 이진힙자체로는 안정적 우선순위큐를 보장하지않는다.

  • 노드: 트리의 기본단위(동그라미), 트리는 여러 노드로 구성된 구조이다
  • 루트노드&리프노드: 가장 상위에 있는 노드를 루트노드 그리고 가장 하위에 있는 노드(들)을 리프 노드라고 한다
  • 이진트리: 각 노드가 최대 두개의 자식노드를 가지는 트리
  • 완전이진트리: 모든레벨이 꽉 차있는 이진트리, 다만 마지막 레벨은 왼쪽부터 차례대로 채워져있어야한다
  • 부모노드&자식노드: 부모노드는 특정 노드보다 한단계상위에 위치한 노드를 의미한다, 자식노드는 특정노드보다 한단계 하위에 위치한 노드를 의미한다 4의 부모노드는 2, 자식노드는 8과9이다
  • 최소 이진 힙(Min Binary Heap): 부모 노드의 값이 자식 노드들의 값보다 작거나 같은 완전 이진 트리
  • 최대 이진 힙(Max Binary Heap): 부모 노드의 값이 자식 노드들의 값보다 크거나 같은 완전 이진 트리

 

이진힙의 기능

삽입과정

새로운 노드는 이진 힙의 가장 하단, 가장 오른쪽에 임시로 추가된다. 그다음 상향힙화(Heapify-Up)을통해 힙을 재정렬한다. 이 과정은 O(log n)의 시간 복잡도를 가진다. 상향힙화는 다음과 같이 이루어진다

  1. 새 노드와 그 부모노드를 비교한다. 만약 더 이상 부모 노드가 없을경우 과정을 종료한다
  2. 새노드가 힙의 속성을 만족한다면 과정을 종료한다
  3. 새노드가 힙의 속성을 만족하지않는경우 부모노드와 위치를 교환한다. 그후, 다시 첫번째 단계로 돌아가 과정을 반복한다

 

제거과정

제거하려는 노드는 힙의 루트 노드이다(우선순위가 가장 높은 노드). 루트 노드를 제거한 후에는 힙의 가장 하단, 가장 오른쪽에 위치한 노드를 임시적으로 루트노드로 옮긴다. 그다음 하향힙화(Heapify-Down)을 통해 힙을 재정렬한다. 이 과정 역시 O(log n)의 시간 복잡도를 가진다. 하향 힙화는 다음과 같이 이루어진다

  1. 루트노드를 자식 노드들과 비교한다. 만약 더 이상 자식 노드가 없을경우 과정을 종료한다
  2. 루트노드가 힙의 속성을 만족한다면 과정을 종료한다
  3. 루트노드가 힙의 속성을 만족하지않는다면, 힙의 속성을 만족할수있게 자식노드중 하나와 위치를 교환한다. 그후, 다시 첫번째 단계로 돌아가 과정을 반복한다.

 

heapq 라이브러리를 사용한 우선순위 큐 

현재 코드는 불안정 우선순위 큐로, 우선순위만을 입력으로 주고있지만 (우선순위, 아이템)형태로 입력을 주게된다면 자동으로 안정적 우선순위큐로 변환된다

import heapq

class PriorityQueue:
    def __init__(self):
        self.elements = []

    def is_empty(self):
        return not self.elements

    def push(self, priority):
        heapq.heappush(self.elements, priority)

    def pop(self):
        if self.is_empty():
            return "Queue is empty"
        return heapq.heappop(self.elements)

# 우선순위 큐 사용 예시
pq = PriorityQueue()

# 요소 삽입
pq.push(3)
pq.push(5)
pq.push(1)
pq.push(4)
pq.push(2)

# 가장 우선순위가 높은 요소 제거 및 반환
print(pq.pop())  # 1
print(pq.pop())  # 2
print(pq.pop())  # 3
print(pq.pop())  # 4
print(pq.pop())  # 5
print(pq.pop())  # Queue is empty

 

heapq 라이브러리를 사용하지않은 우선순위 큐

이진힙을 배열로 저장하는 방법

트리의 노드를 위에서 아래로, 그리고 각 레벨에서 왼쪽에서 오른쪽으로 배열에 저장한다. 예를들어 다음과 같은 최소힙이 있다고 가정했을때 배열에 저장된모습은 다음과 같다

그림1

배열에서 자식인덱스와 부모 인덱스를 찾는 공식

  • 왼쪽 자식의 인덱스: $2i + 1$
  • 오른쪽 자식의 인덱스: $2i + 2$
  • 부모 노드의 인덱스: $(i - 1) \div 2$

예를들어 5의 자식인덱스와 부모 인덱스를 각각 찾고싶다면 $i$에 5의 인덱스인 2를 대입하면된다

신기하게도 공식을 이용해 부모노드와 자식노드의 인덱스를 찾을수있는것을 확인할수있다

 

배열에서 자식인덱스와 부모 인덱스를 찾는 공식의 유도 과정

앞선 공식이 어떻게 유도되었는지를 짧게 다루고 넘어가려고 한다
그림1의 최소힙은 3개의 레벨로 구성되어있다 각 레벨의 구성요소는 다음과 같다

  1. 레벨 0: [1]
  2. 레벨 1: [3,8]
  3. 레벨 3: [7,4,9,12]

이진힙의 특성상, 레벨이 올라갈때마다 구성요소의 개수는 2배씩 늘어난다 따라서

완전히 꽉 찬 이진 트리에서 레벨$n$까지의 노드 개수는 $2^0 + 2^1 + \ldots + 2^n$개 이다 
이때 $2^{n+1} - 1 = 2^0 + 2^1 + \ldots + 2^n$이다 왜냐하면


$S = 2^0 + 2^1 + \ldots + 2^n$

$2S = 2^1 + 2^2 + \ldots + 2^{n+1}$

$2S - S = 2^{n+1} - 2^0$

$S = 2^{n+1} - 1$


 

인덱스 $x$와 해당 인덱스가 속한 $\text{level}$을 알면 인덱스 $x$가 $\text{level}$의 몇번째 노드인지 알수있다.

바로 $x - (2^{\text{level}} - 2)$번째 노드이다

1이아닌 2를 빼주는 이유는 인덱스는 0부터 시작하기때문에 실제 노드의 개수보다 1이 더 적기때문이다

이를 가지고 왼쪽 자식노드와 오른쪽 자식노드 그리고 부모노드의 인덱스를 알수있다

오른쪽자식 인덱스

$(2^{\text{level}+1} - 2) + 2(x - (2^{\text{level}} - 2))$

$= 2 \cdot 2^{\text{level}} - 2 + 2x - 2 \cdot 2^{\text{level}} + 4$

$= 2x + 2$

왼쪽자식 인덱스

$2x + 2 - 1$

$= 2x + 1$

 

부모인덱스

자식노드의 인덱스 $i$가 주어졌을때 부모 노드의 인덱스 $x$를 찾는식을 찾아보자

  • 왼쪽 자식의 경우:
    $2x + 1 = i$
    $2x = i - 1$
    $x = (i - 1) \div 2$
  • 오른쪽 자식의 경우
    $2x + 2 = i$
    $2x = i - 2$
    $x = (i - 2) \div 2$

문제는 우리는 실제 공식을 적용할때 자식노드가 왼쪽 자식인지 오른쪽 자식인지 알수없다는것이다.

 

자식노드에 부모노드의 인덱스를 찾는 공식을 대입했을때 모든 경우의 수를 알아보자

  실제 왼쪽 자식($i$) 실제 오른쪽 자식($i$)
왼쪽 자식 공식 대입 결과 $x$ $x + 0.5$
오른쪽 자식 공식 대입 결과 $x - 0.5$ $x$

따라서 부모노드의 인덱스를 찾는 공식은 $x = \left\lfloor \frac{i - 1}{2} \right\rfloor$이다

여기서 $\left\lfloor x \right\rfloor$는 $x$를 내림한것을 의미한다

 

class PriorityQueue:

    def __init__(self, min_heap=True):
        self.elements = []
        self.min_heap=min_heap
        
    def push(self, priority):
        self.elements.append(priority) # 요소추가
        new_idx = len(self.elements) - 1 # 새로추가된 원소의 인덱스
        self.heapify_up(new_idx) # 상향힙화를 통한 재정렬
    
    def heapify_up(self, child_idx):
        if child_idx == 0: 
            return # new_idx가 루트노드가 되었다면 상향힙화 종료
        parent_idx=self.find_parent_idx(child_idx)
        child=self.elements[child_idx]
        parent=self.elements[parent_idx]

        if self.min_heap: # 최소힙인지 최대힙인지 파악
            condition = child < parent
        else:
            condition = child > parent

        if condition: # condition을 만족한다면 부모노드와 자식 노드의 자리를 바꾸고 비교 계속 진행, 그렇지 않을 경우 정렬 종료 
            self.elements[child_idx] = parent
            self.elements[parent_idx] = child
            child_idx=parent_idx
            return self.heapify_up(child_idx)
    
    def find_parent_idx(self, child_idx): # 부모 노드의 인덱스를 찾아주는 함수이다
        parent_idx=(child_idx - 1) // 2
        return parent_idx

    def pop(self): 
        if not self.elements: # queue가 비어있을경우 pop안함
            return "Queue is empty"
        popped_element = self.elements[0] # 제거될 요소 저장
        self.elements[0] = self.elements[-1] # 루트노드를 힙트리마지막 노드로 교체
        self.elements.pop() 
        if not self.elements: # pop후 queue가 빈경우 재정렬 안함
            return popped_element
        self.heapify_down(0) # 하향힙화를 통한 재정렬
        return popped_element

    def heapify_down(self, parent_idx):
        left_child_idx, right_child_idx=self.find_child_idx(parent_idx)
        
        parent=self.elements[parent_idx]
        
        # 부모노드에 양쪽 자식중 하나라도 없는경우, value비교를 위해 없는자식의 값을 inf또는 -inf값으로 대체 하였다
        if self.min_heap:
            left_child = self.elements[left_child_idx] if left_child_idx < len(self.elements) else float('inf')
            right_child = self.elements[right_child_idx] if right_child_idx < len(self.elements) else float('inf')
            value=min(left_child, right_child, parent)
        
        else:
            left_child = self.elements[left_child_idx] if left_child_idx < len(self.elements) else float('-inf')
            right_child = self.elements[right_child_idx] if right_child_idx < len(self.elements) else float('-inf')
            value=max(left_child, right_child, parent)
        
        if left_child==value: # 왼쪽 자식 노드와 부모 노드 교환
            self.elements[left_child_idx] = parent
            self.elements[parent_idx] = left_child
            parent_idx=left_child_idx
            return self.heapify_down(parent_idx)
        elif right_child == value: # 오른쪽 자식 노드와 부모노드 교환
            self.elements[right_child_idx] = parent
            self.elements[parent_idx] = right_child
            parent_idx = right_child_idx
            return self.heapify_down(parent_idx)
        else: # 힙의 속성을 만족한다면 정렬 종료                
            return

    def find_child_idx(self, parent_idx): # 자식노드의 인덱스를 찾아주는 함수이다
        left_child_idx=(2*parent_idx)+1
        right_child_idx=(2*parent_idx)+2
        return left_child_idx, right_child_idx
        

# 우선순위 큐 사용 예시
pq = PriorityQueue()

# 요소 삽입
pq.push(3)
pq.push(5)
pq.push(1)
pq.push(4)
pq.push(2)


# 가장 우선순위가 높은 요소 제거 및 반환
print(pq.pop())  # 1
print(pq.pop())  # 2
print(pq.pop())  # 3
print(pq.pop())  # 4
print(pq.pop())  # 5
print(pq.pop())  # Queue is empty

댓글