카테고리 보관물: Programming

Rust 프로그래밍에서 map() 활용

Python이 그러하듯 Rust도 declarative programming(선언형 프로그래밍)을 지원한다. 그 중 map()은 이러한 코딩스타일의 대표처럼 사용되고는 하는데, 이것을 이용하면 길고 장황한 코드를 간결하게 나타낼 수 있다.

기본적 사용법

1 부터 10까지 1씩 증가하는 값을 가진 i32 형의 10칸짜리 배열 arr이 있다고 할 때, 그 안에 있는 각 원소들에 2를 곱하는 코드이다.

// 배열의 각 요소들에 x2를 수행하고 결과를 출력.
// 실행결과:
// Doubled array: [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]

fn main() {
    // i32 type 배열 선언.
    let arr: [i32; 10] = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

    // map()을 사용해서 각 element들에 x2를 수행.
    let d_arr = arr.map(|x| x * 2);
    println!("Doubled array: {:?}", d_arr);
}

위의 코드는 Python과 같은 다른 언어에서도 사용하는 형식을 가지고 있어서 직관적이고 그 내용을 이해하기도 비교적 쉽다. 그렇다면 vector에 대해 같은 코드를 작성하면 어떨까? 10칸짜리 vector, vec를 만들고 동일한 동작을 수행해 보도록 하자.

// 벡터의 각 요소들에 x2를 수행하고 결괄르 출력.
// 실행결과:
// Doubled vector: [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]

fn main() {
    // Vector를 선언하고 1 부터 10까지 값들로 초기화.
    let vec = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

    // map()을 사용해서 각 element들에 x2를 수행.
    let d_vec = vec.iter().map(|x| x * 2).collect::<Vec<_>>();
    println!("Doubled vector: {:?}", d_vec);
}

앞에서의 배열에 대한 map() 연산과 달리 뭔가 복잡해 졌다. 10번째 줄을 보면, map()을 호출하기 전에 iter()를 통해서 먼저 iterator(반복자)를 받고, 그 다음에 map()의 내용을 lambda로 수행한 다음에 collect::<Vec<_>>()라는 기괴한 구문으로 끝을 맺고 있다.

The “Lazy Pipeline”

이와 같이 data soruce로 부터 반복자를 받아서 map()을 통해 형태를 변경하고 다시 collect()로 넘겨 최종 처리하는 패턴(Create – Transform – Consume)이 Rust에서 자주 사용되는데 이것을 “Iterator Bridge” 패턴 혹은 “Lazy Pipeline”이라고 부른다.

이렇게 복잡한 pipeline을 거치는 이유는 크게 두가지 정도를 생각해 볼 수 있는데, 첫번째는 원하는 목적에 따라 각 단계의 함수(메소드)들을 다른 종류로 대체할 수 있기 때문이고 두번째는 앞서 살펴본 array와 달리 컴파일 시간에 그 크기가 정해지지 않는 동적 타입에 대해 실행시간을 최대한 미루는 게으른 처리(lazy evaluation)을 이용하여 성능을 향상시키기 위함이다.

각 함수(메소드)들의 대체

위의 예제에서는 Vector type의 source에 대하여 x2라는 transformation을 수행하고, collect::<Vec<_>>()로 consume해서 새로운 vector를 생성해 냈다. 반복자를 호출하는 방법에도 소유권을 넘겨주는 지 혹은 빌려오는지에 따라 iter_into() 혹은 iter()와 같은 다양한 종류의 반복자 반환 메소드를 사용할 수 있고, transformation을 수행하는 메소드도 map() 뿐만 아니라 특정한 조건을 만족하는 원소를 걸러내는 filter()같은 메소드들을 사용할 수 있으며, 최종단의 consumer 역시도 새로운 container를 만들어 내는 collect()외에도 요소의 갯수를 반환하는 count() 나 요소의 합을 계산하는 sum() 같은 다양한 메소드들로 목적에 맞게 파이프라인을 구성할 수 있다.

동일한 패턴을 유지하면서 vector내부에 있는 짝수의 총 합을 계산하는 다음 코드를 한번 보자.

// 벡터안의 짝수만 걸러서(filter) 합을 구하기
// 실행결과:
// Sum of all even numbers: 30

fn main() {
    let vec = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

    let sum = vec.iter().filter(|x| *x % 2 == 0).sum::<i32>();
    println!("Sum of all even numbers: {}", sum);
}

게으른 처리 (Lazy Evaluation)

Adapter인 map() 혹은 filter()로 호출한 구문은 consumer(collect(), fold(), sum(), count(), for loop 등)을 만나기 전까지는 아무런 수행도 하지 않았다가 consumer를 만나고 나서야 동작을 수행하게 된다. 이러한 동작은 pipeline이 구성된 후에 수행되기 때문에 불필요하게 중간상태를 보관하기 위한 메모리를 할당할 필요 없이 최적화된 코드 수행을 할 수 있도록 해준다. 실행시간에 크기가 변화하는 Vector와 같은 container를 감안할 때, 코드 수행단계마다 처리해서 임시 메모리에 쌓아두는 것 보다, 필요 할 때까지 미루어 뒀다가 최종 상태를 감안해서 처리하는 게으른 처리는 메모리 소모와 성능면에서 좋은 전략이 될 수 있다.

Option과 Result에 대한 map() 사용

언뜻 보면 일관성이 없어 보이기도 하지만, Option과 Result에도 map()을 사용할 수 있는데, 이는 Option의 Some()이나 Result의 Ok() 상태 처럼 정상동작한 경우에 대해 수행할 동작을 정의하고자 할 때 사용할 수 있어서, if let 구문이나 match 구문을 대체하여 코드를 간결하게 유지할 수 있도록 해준다.

// 실행결과:
//the value is SOME: 'some value'
//the result is OK: Result was good

use std::io::{Error, ErrorKind};

fn main() {
    // Option, map()
    let opt = Some("some value");
    //let opt: Option<String> = None;
    opt.map(|x| println!("{}", format!("the value is SOME: '{x}'")));

    // Result, map()
    let res: Result<String, Error> = Ok("Result was good".to_string());
    //let res: Result<String, Error> = Err(Error::new(ErrorKind::Other, "Some error"));
    let _ = res.map(|x| println!("{}", format!("the result is OK: {x}")));
}

Conclusion

Rust에서 사용되는 map()의 활용에 대해 살펴 보았다. Container에 대해 map()을 사용하는 것은 비교적 우리에게 익숙하지만, Option / Result에 map()을 사용하는 개념은 상대적으로 그렇지 않은데, 많은 설명들에 따르면, 이것은 “상자 속에 들어 있는” 어떤 값에 대해 그 내부를 직접 들여다 보지 않고 적용한다는 점에서 container와 Option, Result 공통으로 적용할 수 있는 선언형 프로그래밍의 철학이 적용된 것이라고 한다.

[Tip] Emacs: rust-analyzer-tramp가 계속 죽는 문제

Remote server에 tramp mode로 rust file을 읽어서 rust-mode에 진입한 후에 미니버퍼에 아래와 같은 경고가 뜨는데, ‘y’를 입력해서 rust-analyzer-tramp를 재실행 시켜도 계속해서 죽어서 같은 오류가 보이는 문제가 생겼다.

rust-analyzer 설치 확인

Remote server측에 rust-analyzer가 설치되어 있지 않으면 이와 같은 문제가 생길 수 있으니 다음의 명령어로 rust-analyzer를 update해본다. 만약 설치 되어 있지 않다면 이 과정에서 설치될 것이다.

rustup component add rust-analyzer

Remote path 사용

rust-analyzer가 이미 설치 되어 있음에도 문제가 발생 한다면 서버측의 path가 제대로 설정되고 있는지 확인해 보자. Trump mode는 서버측의 PATH 환경변수 값 을 읽지 않으므로 경로정보를 서버의 것으로 유지 하도록 설정해 주어야 한다.

 (require 'tramp)
 (setq tramp-default-method "ssh")
 ;; Respect remote path.
 (with-eval-after-load 'tramp
   (add-to-list 'tramp-remote-path 'tramp-own-remote-path))

OpenVINO Object Detection Model의 전처리와 후처리를 간단하게

AI model들은 입출력 layer의 구조가 다르므로 서로 다른 pre/post processing을 필요로 한다. Vision model을 예로 들면 어떤 모델은 입력을 416×416으로 받게 되어 있어서 입력 전에 원본 크기로 부터 resizing을 해주어야 하고, 어떤 모델은 resizing layer을 포함하고 있어서 그냥 입력해도 잘 동작하기도 한다. 또한 어떤 모델은 입력값을 정규화 해서 입력해 주어야 하고, 어떤 모델은 정규화가 필요 없기도 하다.

입력층 뿐 아니라 출력층도 제각각이다. Object detection을 수행하는 모델들을 보면 YOLO같은 모델은 score값을 bounding box와 함께 전달하고, D-Fine같은 모델은 bounding box, score, label을 각각 따로 출력하기도 한다.

어떤 모델의 입출력 형태의 차이를 보고 싶으면 읽어들인 모델의 inputs와 outputs 변수의 내용을 살펴보면 된다.

def print_model_io_layer(ovcore: Core, model_path: str, model_name: str):
    read_model = ovcore.read_model(model_path)
    print(f"[{model_name}]")

    # Display input layer
    for idx, input_layer in enumerate(read_model.inputs):
        print(f"Input({idx+1}):")
        print("  any_name     :", input_layer.get_any_name())
        print("  names        :", input_layer.get_names())
        print("  shape        :", input_layer.get_partial_shape())
        print("  element type :", input_layer.get_element_type())

    # Display output layer
    for idx, output_layer in enumerate(read_model.outputs):
        print(f"Output({idx+1}):")
        print("  any_name     :", output_layer.get_any_name())
        print("  names        :", output_layer.get_names())
        print("  shape        :", output_layer.get_partial_shape())
        print("  element type :", output_layer.get_element_type())

이렇게 다양한 모델들의 입출력을 직접 처리하는 것은 여간 번거로운 일이 아니다. OpenVINO에서는 모델에 전/후처리를 사용자가 지정할 수 있는 PrePostProcessor를 지원하기는 하지만 이 마저도 모델별로 상이한 부분을 직접 수정해 주어야 하기 때문에 번거로운 것은 마찬가지다.

OpenVINO Model API 사용법

예를 들어 ATSS, YOLOX-S, YOLOX-Tiny, D-Fine 네개의 OpenVINO model과 각각에 대한 ONNX model들까지 8개의 서로 다른 세개의 서로 다른 모델들이 있다고 할 때, 8개의 서로 다른 입출력 결과를 처리하는 것은 아주 많은 노력을 필요로 할 것이다.

openvino-model-api를 사용하면 이러한 부분을 비교적 간단하게 처리 할 수 있다. 사용하려면 다음과 같이 필요한 패키지들을 설치해 준다.

pip install openvino openvino-model-api onnx

설치가 끝나면 먼저 서로 다른 입출력 계층을 맞춰 주는 모델을 생성한다. 추론에 사용하고자 하는 모델 파일로 OpenvinoAdapter를 만들어서 create_model()에 넘겨 준다. 이렇게 생성된 모델은 입력을 위한 전처리 과정을 자동으로 수행한다.

from openvino import Core
from model_api.adapters import OpenvinoAdapter
from model_api.models import Model


ovcore = Core()

# Adapter를 생성하고 create_model()에 넘겨준다.
ovadapter = OpenvinoAdapter(ovcore, model_path)
model = Model.create_model(ovadapter, preload=True)

그리고 나서 추론을 실행하면 그 결과는 DetectionResult 타입으로 정리되어 반환된다. 따라서 모델별로 서로다른 후처리(post processing) 과정 필요 없이 DetectionResult으로 반환되는 결과를 처리해주면 된다.

# 추론 실행
detection_result = model(test_image)

결과물 처리를 위해서 detection_result를 원본 이미지에 overlay하는 함수로 만들었다(overlay_detection_result). 이 함수는 Threshold값을 넘는 결과물을 원본 이미지 위에 bounding box와 label로 표시하고 detect된 object의 갯수와 함께 반환해 준다.

DetectionResult에서 참조되는 주요한 멤버변수는 Socre값을 나타내는 .score, bounding box를 나타내는 .xmin, .ymin, .xmax, .ymax 그리고 label을 가지고 있는 .str_label이다. Bounding box 좌표도 입력 이미지의 크기에 맞게 이미 scaling되어 있으므로 좌표를 그대로 가져다 쓰면 된다.

from model_api.models.utils import DetectionResult


def overlay_detection_result(
    original_image: np.ndarray, detection_result: DetectionResult, threshold: float
) -> Tuple[np.ndarray, int]:
    num_detected = 0
    processed_image = original_image.copy()

    for det in detection_result[0]:
        score = det.score
        if score >= threshold:
            num_detected += 1

            # Bounding box
            x1 = int(det.xmin)
            y1 = int(det.ymin)
            x2 = int(det.xmax)
            y2 = int(det.ymax)

            # Clamping
            x1 = max(0, min(x1, original_image.shape[1]))
            y1 = max(0, min(y1, original_image.shape[0]))
            x2 = max(0, min(x2, original_image.shape[1]))
            y2 = max(0, min(y2, original_image.shape[0]))

            # Draw BBox
            cv2.rectangle(processed_image, (x1, y1), (x2, y2), BBOX_COLOR, 2)

            # Draw label.
            display_text = (
                f"{det.str_label} {score:.2f}" if {det.str_label} else f"{score:.2f}"
            )
            cv2.putText(
                processed_image,
                display_text,
                (x1, y1 - 10),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.5,
                LABEL_COLOR,
                2,
            )
    return processed_image, num_detected

8개의 모델에 시험해 보자

사용하려는 8개의 모델들은 다음과 같다.

atss_model_file = os.path.join(MODEL_PATH, "atss_fp32/model.xml")
atss_onnx_model_file = os.path.join(MODEL_PATH, "atss_onnx_fp32/model.onnx")
yolos_model_file = os.path.join(MODEL_PATH, "yolos_fp32/model.xml")
yolos_onnx_model_file = os.path.join(MODEL_PATH, "yolos_onnx_fp32/model.onnx")
yolotiny_model_file = os.path.join(MODEL_PATH, "yolotiny_fp32/model.xml")
yolotiny_onnx_model_file = os.path.join(MODEL_PATH, "yolotiny_onnx_fp32/model.onnx")
dfinex_model_file = os.path.join(MODEL_PATH, "dfinex_fp32/model.xml")
dfinex_onnx_model_file = os.path.join(MODEL_PATH, "dfinex_onnx_fp32/model.onnx")

이것들을 all_models라는 list에 넣고 한번에 돌린다. 즉, 각 모델들의 서로 다른 입력과 출력 계층에 대한 처리를 단일한 코드로 수행하는 것이다.

ovcore = Core()


# Test image
test_image = cv2.imread("./data/test_image.jpg")

threshold = 0.8
for m in all_models:
    model_path = m[0]
    model_name = m[1]
    ovadapter = OpenvinoAdapter(ovcore, model_path)
    model = Model.create_model(ovadapter, preload=True)
    detection_result = model(test_image)
    proc_image, num_det = overlay_detection_result(
        test_image, detection_result, threshold
    )

    print(f"{model_name} :: {num_det} objects detected (threshold: {threshold}).")
    cv2.imwrite(f"output_{model_name}.jpg", proc_image)

전체 소스코드