글쓴이 보관물: litcoder

gRPC를 이용한 Observer Pattern

Remote에서 제공하는 기능을 마치 local system의 function call 처럼 제공 한다는 gRPC의 개념 자체는 1990년대에도 있었던 것이기에 새로울 것은 없지만, 그 위에서 “Subject의 변경이 있을 때 subscriber에게 notify 해주는 Observer Pattern을 어떻게 구현 할 수 있을까?”하는 의문이 들었다.

이 포스팅에서는 gRPC를 이용한 observer pattern의 예제로 server에서 임의의 주식과 그 변동 가격을 client로 notify 해주고 이것을 화면에 출력하는 예제를 작성해 본다. 전체 코드는 https://github.com/litcoder/grpcobsr에서 확인 할 수 있다.

기본적인 gRPC는 client에서 필요한 정보를 server에게 요청해서 그 결과를 돌려받는다. 반면, Observer Pattern은 그 반대로 server 측에서 정보의 변경 사항이 있을 때 이것을 client 측에 알려 주어야 한다.

이를 가능하게 하는 것은 ServerWriteRectorClientReadReactor template인데, 이들은 client의 요청에 대해 server가 여러 개의 응답을 비동기적으로 전송하는 이른바 gRPC의 server-side streaming을 구현하는데 가장 핵심이 되는 요소들이다. 우리 예제의 경우 client는 server로 “주식 가격을 알려 주세요”라는 request를 전송하면 ClientReadReactor의 OnReadDone() 함수로 변경된 주식과 가격이 하나씩 들어오는 식이다.

Proto file

주식 정보를 제공하는 proto file은 다음과 같이 정의 한다. 클라이언트가 UpdateStockPrice()를 요청하면 서버가 StockPriceResponse를 여러 개 stream으로 전송해 주는데 그 안에는 각각의 주식의 종목(symbol)과 가격(price) 정보가 담겨져 있다.

syntax = "proto3";
import "google/protobuf/empty.proto";

message StockPriceResponse {
    string symbol = 1;
    double price = 2;
}

service StockService {
    rpc UpdateStockPrice(google.protobuf.Empty) returns (stream StockPriceResponse);
}

StockPriceWriteReactor

StockPriceWriteReactor는 서버에서 동작하는 reactor이다.

class StockPriceWriteReactor
    : public ::grpc::ServerWriteReactor<::StockPriceResponse>
{
public:
  StockPriceWriteReactor(int evCnt);

  void OnWriteDone(bool ok) override;
  void OnDone() override;
  void OnCancel() override;

private:
  void NextWrite();

  int _mReqEventCount;
  int _mCurEventCount;
  StockPriceResponse _mResp;
  StockRepository _mStockRepo;
};
  • NextWrite(): 클라이언트로 전송할 데이터를 생성하고 stream에 쓴다.
  • OnWriteDone(): NextWrite()에서 호출하는 StartWrite()에 의해 하나의 정보가 쓰여졌을 때 호출되는 callback이다. 이전의 전송이 성공적으로 되었는지 검사하고 다음 정보를 전송한다.
  • OnDone(): Stream 전송 완료를 의미하는 Finish()를 호출하면 불리는 callback이다. 해당 인스턴스의 사용이 종료되었다는 의미이므로 메모리를 해제한다.

Server 구현

class StockServiceImpl final : public StockService::CallbackService
{
public:
  ...
  ServerWriteReactor<::StockPriceResponse> *UpdateStockPrice(
      CallbackServerContext *context,
      const google::protobuf::Empty *empty) override
  {
    return new StockPriceWriteReactor(_mEventCount);
  }
  ...
};

gRPC 비동기 호출을 위한 서버는 StockService::Server가 아닌 StockService::CallbackService를 상속받아 구현한다. StockService::Server가 write stream에 전송할 내용을 쓰고 grpc::Status를 반환 하도록 하는 것과 달리 CallbackService는 앞서 정의한 ServerWriteReactor<StockPriceResponse>* type을 반환 하도록 정의 되어있다.

StockPriceReadReactor

StockPriceReadReactor는 클라이언트에서 동작하는 reactor이다.

class StockPriceReadReactor
    : public ::grpc::ClientReadReactor<::StockPriceResponse>
{
public:
  StockPriceReadReactor(
      std::shared_ptr<StockService::Stub> stub, std::shared_ptr<Publisher> pub);
  void OnReadDone(bool ok) override;
  void OnDone(const ::grpc::Status &s) override;
  ::grpc::Status Await();

private:
  std::shared_ptr<Publisher> _mPub;
  ::grpc::ClientContext _mContext;
  ::StockPriceResponse _mResp;

  std::mutex _mMtx;
  std::condition_variable _mCondVar;
  ::grpc::Status _mStatus;
  bool _mAllDone = false;
};
  • OnReadDone(): StartRead() 호출을 통해 서버로 부터 하나의 record인 주식 정보 업데이트를 받을 때 마다 호출되는 callback이다. 이것을 이용해 Observer pattern에서 event publisher가 자신에게 등록된 subscriber들에게 event를 notify하는 코드를 구현할 수 있다.
  • OnDone(): 서버로 부터 stream 종료를 받으면 호출되는 callback이다. Await()과 공유되는 condition_variable을 이용해 process가 종료 될 수 있도록 한다.
  • Await(): 비동기 호출은 multi threading을 전체 하므로, 이 함수는 서버로 부터 받는 stream이 종료될 때까지 main thread가 종료되지 않고 유지 되도록 해준다.
  • mutext와 condition_variable: 위에서 설명한 OnDone()과 Await()이 thread control을 할 수 있도록 해주는 동기화 변수 들이다.

Client 구현

class StockClient
{
public:
  ...
  void updateStockPrice()
  {
    StockPriceReadReactor reader(_mStub, _mPub);
    Status status = reader.Await();
    if (status.ok())
    {
      spdlog::info("PriceListing succeed.");
    }
    else
    {
      spdlog::error("Failed to get prices.");
      spdlog::error("{}({})", status.error_message(), status.error_code());
    }
  }
  ...
};

클라이언트 코드는 StockPriceReadReactor의 instance를 만들고 Await()을 호출해서 서버로 부터 전송이 완료되기를 기다린다. 그럼 서버쪽으로 “주식 가격 주세요”라는 request는 누가 날리냐고? StockPriceReadReactor의 생성자에 다음과 같이 stub에 UpdateStockPrice()를 호출하는 부분이 정의되어 있다.

StockPriceReadReactor::StockPriceReadReactor(
    std::shared_ptr<StockService::Stub> stub, std::shared_ptr<Publisher> pub)
    : _mPub(pub)
{

  ::google::protobuf::Empty empty;
  stub->async()->UpdateStockPrice(&_mContext, &empty, this);
  StartRead(&_mResp);
  StartCall();
}

동작 확인

Code repo: https://github.com/litcoder/grpcobsr

References

  • gRPC Long-lived Streaming using Observer Pattern: Java로 gRPC의 Observer pattern을 구현한 내용을 설명한 글이다. 사실 본 포스팅의 시작도 원래는 이 구현을 C++로 변경해 보고자 하는 의도였으나 안타깝게도 C++에서 사용할 수 없는 의존성 때문에 많은 부분을 새롭게 작성해야 했다.
  • Asynchronous Callback API Tutorial: gPRC에 대한 비동기 호출에 대해 예제를 포함해서 매우 자세히 설명한 글이다. Observer pattern을 직접 언급하고 있지는 않으나 활용도가 높은 Unary, Server-side streaming, Client-side streaming 그리고 Bidirectional streaming을 설명한다.
  • gRPC API reference: API들에 대한 설명을 찾아 볼 수 있다. 친절하게 설명된 문서는 아니지만 그래도 없는 것 보다는 뭐…

Union을 이용한 byte 단위 접근

Big-endian으로 주어진 byte들을 little-endian으로 변환해야 하는 문제가 생겼다. Byte들의 order를 거꾸로 만드는 것은 어렵지 않지만 그러기 위해서는 byte pointer가 가리키는 element들을 1 byte 단위로 접근해야 한다. 1 byte씩 뒤집은 다음에는 변환된 array를 원하는 크기의 타입으로 읽도록 type casting을 해주어야 한다.

Union을 이용하면 코드를 못생기게 만드는 pointer 직접 연산이나 type casting을 하지 않고 이를 구현할 수 있다. 즉 union은 선언된 element의 가장 큰 크기 만큼의 메모리가 할당 되므로 같은 크기의 두 element를 서로 다른 data type으로 선언하는 것이다.

union
{
  int32_t v;
  uint8_t b[4];
} value;

위와 같이 선언하면 value.v = 0xdeadbeef 같은 식으로 int32를 쓰거나 읽을 수 있고 value.b[0] 같이 각 메모리 index에 접근 할 수 있다.

Template으로 만들어서 여러 타입에 대응 할 수 있다.

    template <typename T> T readFromBigEndian(uint8_t *b)
    {
      union
      {
        T v;
        uint8_t b[sizeof(T)];
      } dest;

      union
      {
        T v;
        uint8_t *b;
      } src;

      src.b = b;
      for (int i = 0; i < sizeof(T); ++i)
      {
        dest.b[i] = src.b[sizeof(T) - i - 1];
      }
      return dest.v;
    }

일본 출장 로그 (23 Sep – 28 Sep)

2024년 추석이 끝난 다음 주로 일본 출장이 잡혔다. 처음으로 가보는 일본인데 관광이 아닌 업무가 첫 방문 목적이되었다.

1일차: 입국과 첫 인상

일본 입국은 머릿속으로 걱정하며 여러번 그렸던것에 비해서는 비교적 순탄했다. 미리 Visit Japan Web으로 세관신고와 입국수속을 해둔 덕에 대기하면서 서류를 작성해야 하는 일도 없었고 파파고에 의지해서 일본어를 더듬거리며 내 입국 목적을 설명해야 하거나 핸드캐리해 온 물건들이 무엇인지 설명해야 하는 일도 다행히 없었다. 시키는대로 줄서 있다가 지문 스캔하고 얼굴 사진찍고 여권 스캔하고 하다보니 여권에 90일짜리 체류 도장이 찍혔다. 다만, 나리타 공항에서 도쿄로 가는 고속열차 예약을 너무 여유있게 해둔 나머지 수속을 마치고 나서도 시간이 꽤나 남아서 일정을 변경하려했더니 Klook을 통해 프로모션으로 싸게 구매한 티켓이어서 변경도 안되고 환불시간 마저도 지나버려서 그냥 표 하나를 버리고 새로 구매해야 했다. 일정이 불확실할 때는 할인 안되는 한이 있더라도 변경가능한 티켓을 샀어야 했는데 이 부분은 잘 못 생각한 것 같다. Narita express 티켓을 새로 사려 판매기를 찾아 갔는데 친절한 알바 청년이 자세히 안내해 주어서 어렵지 않게 한국 신용 카드로 20분 정도 남은 열차의 티켓을 살 수 있었다. 일본인 동료가 설명해 주었는데, 고속전철은 수요가 많은 일본의 휴가시즌이 아니라면 굳이 미리 사 둘 필요는 없다고 한다.

비행편이 만석이라 저가항공을 타게 되었는데, 음식은 물론, 음료수도 돈 내고 사먹어야 했기에 열차 플랫폼에 도착했을 때 즈음에는 목이 말랐다. 마침 자판기가 있어서 물을 사먹으려고 한국에서 환전해 온 1000엔짜리 지폐를 넣었는데 자꾸만 뱉어냈다. 다른 지폐로 바꾸거나 옷에 문지르는 트릭도 전혀 먹지 않았다. 그때 몇몇 사람들이 자판기를 사용하는 것을 보니 현금을 쓰는 사람들은 없고 모두가 핸드폰으로 결제를 하는 것을 보고 인터넷에서 읽은 Mobile Suica 카드가 생각났다. 부랴부랴 앱을 깔고 애플 월렛에 추가한 다음 한국 신용카드로 1000엔을 충전해서 갖다 댔더니 드디어 자판기에서 물을 사먹을 수 있었다. 최근에 일본 화폐가 변경되었다고 하니 혹시나 그것과 관련하여 자판기가 아직 준비 되어 있지 않은게 아닐까 추측이 든다. Narita Express를 타고 익숙해 보이는 풍경의 들판과 산들을 지나 마침내 일본에 도착한 듯 보이는 건물들이 나타나기 시작하더니 도쿄역에 도착했다. 마침 이 날은 일본의 휴일이어서 차들의 출입을 막고 도로가 보행자들에게 개방되어 있었다.

출장 기간 동안 호텔 수요가 많아서 가격이 무척 비쌌다. 겨우 긴자의 한쪽 골목 안에 있는 3성급의 Ibis Styles Hotel에 자리가 있다고 해서 4박을 묵었는데 비지니스 호텔이라 그런지 아무런 부대 시설이 없고 그냥 잠을 자고 씻을 수 있는 좁은 방에 동경 사무실 까지 걸어서 대략 20분 정도 걸리는 먼 거리에 자리하고 있었다.

도쿄 첫 식사

호텔에 짐을 푼 다음 도쿄에서 나의 첫 식사는 라멘이었다. 야마노테선이 지나다니는 철길의 아랫쪽으로 몇몇 가게들이 들어서 있었는데 그 중 하나에 들어갔다.

안쪽에는 식권 자판기가 있어서 여기 까지는 수월했는데, 라면 식권을 사서 알바생에게 내밀자 뭐라뭐라 물어 봤다. 일본어 알못인 나는 무슨말인지 몰라 당황했고 알바생도 그런 나를 보고 당황했는데 주방 쪽에 있던 주방장 아저씨가 믿음이 가는 톤으로 걱정 말라는 듯 말하며 “… 노마루(normal)”라고 했다. 눈치로 짐작 컨데 면을 어느정도 익힐지 물어 본것 같고 중간정도로 익혀 주겠다고 한 것 같았다. 도코츠라멘 이었는데 맛있었다.

2일차 – 4일차

도쿄 사무실이 있는 고쿠사이 빌딩 앞에는 하나은행 동경 지점이 있었다. 한국 사무실은 하나은행 본사에 입주해 있는데 도쿄 사무실 앞에서 저 로고를 또 보니 기분이 묘했다. 한국어로 된 광고가 잔뜩 있는 걸 보니 현지인 보다는 관광객을 상대로 영업하는 것 같아 보인다.

사무실 안에는 노트북을 올려놓고 사이클을 할 수 있는 기이한 물건이 있었는데 이게 일본 제품인지는 모르겠지만 사무실에 이런 걸 둔다는게 왠지 모르게 일본문화와 어울린다는 생각이 들었다.

펜트리에는 공짜 음료수와 스낵들이 있었는데 멋 모르고 저 초록색 팩에 담긴 녹차를 너무 많이 먹었다가 밤에 잠을 못자서 다음날 피로에 쩔어 고생을 좀 했다.

2일차 부터 4일차 동안은 출장 온 목적을 달성하느라 정신 없이 달렸는데 하루는 저녁 밥도 못먹고 밤 11시 30분경에 퇴근하게 되어서 숙소로 돌아가는 길에 편의점에서 나의 일본 여행 첫 스시를 사다가 먹었다. 신라면과 함께…

5일차: 도쿄 마지막 밤

모든일을 끝내고 지금까지 머물던 숙소가 연장이 안되서 마지막 하룻밤은 긴자에 있는 5성급인 Imperial Hotel(제국호텔)에 머물게 되었다. 일본 제국 시절에 외국 대사들이 머물던 곳이었다고 하는데 그래서 인지 지금은 히비야 공원이된 도쿠카와 이에야스의 옛 성터의 근방에 위치해 있었다.

6일차: 도쿄에서의 마지막 식사와 나리타 공항 행

호텔 조식은 출장비에 포함시킬 수도 있었으나 의미 없이 비싸기만 한 것 같아서 마지막 식사는 근방에 있는 마츠야에서 규동을 먹기로 했다. 가게 밖에서 기계로 주문하고 안에서 전광판에 식권 번호가 뜨면 식사를 가져가는 시스템 이었는데, 기계에서는 한국어도 지원해서 주문을 수월하게 할 수 있었다.

호텔에서는 나리타 공항으로 가는 셔틀 버스편을 판매 하고 있었는데 올 때 처럼 도쿄역에서 Narita Express를 탈 수도 있었으나 무거운 짐을 가지고 엘리베이터 시설도 잘 안되어 있는 역을 오르내리는게 쉽지 않을 것 같아서 시간이 조금 더 걸리기는 하지만 호텔에서 바로 탑승 할 수 있는 버스를 구매 했다. 호텔 직원들이 버스로 안내해 주었고 짐도 실어준 다음 버스가 떠날 때는 줄서서 고개를 숙여 인사까지 했다. 부담스러…

달리는 버스 안에서 잠깐 잠들었다가 나리타 공항 3터미널에 가까워 지자 기사 아저씨가 방송을 시작 했는데 일본어로 이야기 한 다음 영어로 이야기 하며 “프리즈 돈트 포겟 아니씨잉”하며 끝맺자 승객중 몇몇이 특이한 엑센트가 재미 있었는지 매 터미널에 아저씨가 방송할 때 마다 마지막 부분을 함께 따라 했다. 무례해 보일 수도 있는 행동 이었지만 버스 아저씨가 함께 웃으며 받아주는 모습을 보니 여유 있어 보였다.

소회

아마도 관광이었다면 도쿄를 택하지는 않았을 것 같았다. 서울에서 매일 보는것과 비슷한 광경을 보기 위해서 굳이 그렇게 돈을 들일 필요가 없다고 생각 했을 것 같기 때문이다. 하지만 운이 좋게도 내 친절한 일본인 동료가 점심시간마다 가게들을 바꿔가며 새로운 음식들을 소개해 주고 이곳 저곳들을 보여준 덕에 관광객으로 방문했으면 놓쳤을지도 모를 큰 길가의 대형 건물들 사이의 골목들에서 공존하는 진짜 사람들의 세상을 조금이나마 경험할 수 있었다. 그러고 나서 보니 왠지 서울의 그것과는 다른 감흥이 들었다. 어딘가에서 읽은 “일본은 혹은 일본 사람들은 어떻다더라” 하는 것과는 전혀 다른 경험을 할 수 있는 기회가 있었던 것에 감사한다.