글쓴이 보관물: litcoder

VNC SSH Tunneling

Local VNC server는 접속이 가능하지만 외부에서 접속이 불가능 하다면 SSH tunneling을 설정해서 연결을 시도할 수 있다.

VNC Client에서 SSH Tunneling을 지원하는 경우

SSH tunneling을 지원하는 VNC client인 Remmina와 같은 경우에는 다음과 같이 서버로 localhost:5901을 설정한다. 여기에 서버 IP가 아니라 localhost를 적어주는게 좀 이상해 보이긴 하지만 SSH tunneling을 설정한 상태에서는 localhost의 5901번이 remote server의 VNC service를 제공하도록 연결된다.

그 다음에는 SSH Tunnel 탭을 선택해서 다음과 같이 정보를 입력한다. Enable SSH tunnel을 클릭해서 활성화 하고, 서버의 IP와 SSH가 서비스되는 port번호인 22번을 적어준다(SERVER_IP:<SSH_PORT>). 로그인 방식은 설정에 따라 적어 준다. 나는 SSH key login을 선호하므로 SSH Identity file을 선택하고 private key를 설정해 주었다.

SSH Tunneling 설정이 없는 경우

TigerVNC와 같이 명시적으로 SSH tunneling을 지원하지 않는 경우에는 터미널을 하나 열어서 다음과 같이 SSH tunel을 하나 열어 준다.

다음은 리모트 서버의 5901 포트를 로컬 5999번에 tunelling하는 예이다.

ssh -L 5999:localhost:5901 <user_id>@<vnc_server_ip>

터미널을 그대로 열어둔 상태에서 VNC client를 열어서 로컬의 5999번 포트로 연결한다.

연결

gRPC를 이용한 Observer Pattern

Remote에서 제공하는 기능을 마치 local system의 function call 처럼 제공 한다는 gRPC의 개념 자체는 1990년대에도 있었던 것이기에 새로울 것은 없지만, 그 위에서 “Subject의 변경이 있을 때 subscriber에게 notify 해주는 Observer Pattern을 어떻게 구현 할 수 있을까?”하는 의문이 들었다.

이 포스팅에서는 gRPC를 이용한 observer pattern의 예제로 server에서 임의의 주식과 그 변동 가격을 client로 notify 해주고 이것을 화면에 출력하는 예제를 작성해 본다. 전체 코드는 https://github.com/litcoder/grpcobsr에서 확인 할 수 있다.

기본적인 gRPC는 client에서 필요한 정보를 server에게 요청해서 그 결과를 돌려받는다. 반면, Observer Pattern은 그 반대로 server 측에서 정보의 변경 사항이 있을 때 이것을 client 측에 알려 주어야 한다.

이를 가능하게 하는 것은 ServerWriteRectorClientReadReactor template인데, 이들은 client의 요청에 대해 server가 여러 개의 응답을 비동기적으로 전송하는 이른바 gRPC의 server-side streaming을 구현하는데 가장 핵심이 되는 요소들이다. 우리 예제의 경우 client는 server로 “주식 가격을 알려 주세요”라는 request를 전송하면 ClientReadReactor의 OnReadDone() 함수로 변경된 주식과 가격이 하나씩 들어오는 식이다.

Proto file

주식 정보를 제공하는 proto file은 다음과 같이 정의 한다. 클라이언트가 UpdateStockPrice()를 요청하면 서버가 StockPriceResponse를 여러 개 stream으로 전송해 주는데 그 안에는 각각의 주식의 종목(symbol)과 가격(price) 정보가 담겨져 있다.

syntax = "proto3";
import "google/protobuf/empty.proto";

message StockPriceResponse {
    string symbol = 1;
    double price = 2;
}

service StockService {
    rpc UpdateStockPrice(google.protobuf.Empty) returns (stream StockPriceResponse);
}

StockPriceWriteReactor

StockPriceWriteReactor는 서버에서 동작하는 reactor이다.

class StockPriceWriteReactor
    : public ::grpc::ServerWriteReactor<::StockPriceResponse>
{
public:
  StockPriceWriteReactor(int evCnt);

  void OnWriteDone(bool ok) override;
  void OnDone() override;
  void OnCancel() override;

private:
  void NextWrite();

  int _mReqEventCount;
  int _mCurEventCount;
  StockPriceResponse _mResp;
  StockRepository _mStockRepo;
};
  • NextWrite(): 클라이언트로 전송할 데이터를 생성하고 stream에 쓴다.
  • OnWriteDone(): NextWrite()에서 호출하는 StartWrite()에 의해 하나의 정보가 쓰여졌을 때 호출되는 callback이다. 이전의 전송이 성공적으로 되었는지 검사하고 다음 정보를 전송한다.
  • OnDone(): Stream 전송 완료를 의미하는 Finish()를 호출하면 불리는 callback이다. 해당 인스턴스의 사용이 종료되었다는 의미이므로 메모리를 해제한다.

Server 구현

class StockServiceImpl final : public StockService::CallbackService
{
public:
  ...
  ServerWriteReactor<::StockPriceResponse> *UpdateStockPrice(
      CallbackServerContext *context,
      const google::protobuf::Empty *empty) override
  {
    return new StockPriceWriteReactor(_mEventCount);
  }
  ...
};

gRPC 비동기 호출을 위한 서버는 StockService::Server가 아닌 StockService::CallbackService를 상속받아 구현한다. StockService::Server가 write stream에 전송할 내용을 쓰고 grpc::Status를 반환 하도록 하는 것과 달리 CallbackService는 앞서 정의한 ServerWriteReactor<StockPriceResponse>* type을 반환 하도록 정의 되어있다.

StockPriceReadReactor

StockPriceReadReactor는 클라이언트에서 동작하는 reactor이다.

class StockPriceReadReactor
    : public ::grpc::ClientReadReactor<::StockPriceResponse>
{
public:
  StockPriceReadReactor(
      std::shared_ptr<StockService::Stub> stub, std::shared_ptr<Publisher> pub);
  void OnReadDone(bool ok) override;
  void OnDone(const ::grpc::Status &s) override;
  ::grpc::Status Await();

private:
  std::shared_ptr<Publisher> _mPub;
  ::grpc::ClientContext _mContext;
  ::StockPriceResponse _mResp;

  std::mutex _mMtx;
  std::condition_variable _mCondVar;
  ::grpc::Status _mStatus;
  bool _mAllDone = false;
};
  • OnReadDone(): StartRead() 호출을 통해 서버로 부터 하나의 record인 주식 정보 업데이트를 받을 때 마다 호출되는 callback이다. 이것을 이용해 Observer pattern에서 event publisher가 자신에게 등록된 subscriber들에게 event를 notify하는 코드를 구현할 수 있다.
  • OnDone(): 서버로 부터 stream 종료를 받으면 호출되는 callback이다. Await()과 공유되는 condition_variable을 이용해 process가 종료 될 수 있도록 한다.
  • Await(): 비동기 호출은 multi threading을 전체 하므로, 이 함수는 서버로 부터 받는 stream이 종료될 때까지 main thread가 종료되지 않고 유지 되도록 해준다.
  • mutext와 condition_variable: 위에서 설명한 OnDone()과 Await()이 thread control을 할 수 있도록 해주는 동기화 변수 들이다.

Client 구현

class StockClient
{
public:
  ...
  void updateStockPrice()
  {
    StockPriceReadReactor reader(_mStub, _mPub);
    Status status = reader.Await();
    if (status.ok())
    {
      spdlog::info("PriceListing succeed.");
    }
    else
    {
      spdlog::error("Failed to get prices.");
      spdlog::error("{}({})", status.error_message(), status.error_code());
    }
  }
  ...
};

클라이언트 코드는 StockPriceReadReactor의 instance를 만들고 Await()을 호출해서 서버로 부터 전송이 완료되기를 기다린다. 그럼 서버쪽으로 “주식 가격 주세요”라는 request는 누가 날리냐고? StockPriceReadReactor의 생성자에 다음과 같이 stub에 UpdateStockPrice()를 호출하는 부분이 정의되어 있다.

StockPriceReadReactor::StockPriceReadReactor(
    std::shared_ptr<StockService::Stub> stub, std::shared_ptr<Publisher> pub)
    : _mPub(pub)
{

  ::google::protobuf::Empty empty;
  stub->async()->UpdateStockPrice(&_mContext, &empty, this);
  StartRead(&_mResp);
  StartCall();
}

동작 확인

Code repo: https://github.com/litcoder/grpcobsr

References

  • gRPC Long-lived Streaming using Observer Pattern: Java로 gRPC의 Observer pattern을 구현한 내용을 설명한 글이다. 사실 본 포스팅의 시작도 원래는 이 구현을 C++로 변경해 보고자 하는 의도였으나 안타깝게도 C++에서 사용할 수 없는 의존성 때문에 많은 부분을 새롭게 작성해야 했다.
  • Asynchronous Callback API Tutorial: gPRC에 대한 비동기 호출에 대해 예제를 포함해서 매우 자세히 설명한 글이다. Observer pattern을 직접 언급하고 있지는 않으나 활용도가 높은 Unary, Server-side streaming, Client-side streaming 그리고 Bidirectional streaming을 설명한다.
  • gRPC API reference: API들에 대한 설명을 찾아 볼 수 있다. 친절하게 설명된 문서는 아니지만 그래도 없는 것 보다는 뭐…

Union을 이용한 byte 단위 접근

Big-endian으로 주어진 byte들을 little-endian으로 변환해야 하는 문제가 생겼다. Byte들의 order를 거꾸로 만드는 것은 어렵지 않지만 그러기 위해서는 byte pointer가 가리키는 element들을 1 byte 단위로 접근해야 한다. 1 byte씩 뒤집은 다음에는 변환된 array를 원하는 크기의 타입으로 읽도록 type casting을 해주어야 한다.

Union을 이용하면 코드를 못생기게 만드는 pointer 직접 연산이나 type casting을 하지 않고 이를 구현할 수 있다. 즉 union은 선언된 element의 가장 큰 크기 만큼의 메모리가 할당 되므로 같은 크기의 두 element를 서로 다른 data type으로 선언하는 것이다.

union
{
  int32_t v;
  uint8_t b[4];
} value;

위와 같이 선언하면 value.v = 0xdeadbeef 같은 식으로 int32를 쓰거나 읽을 수 있고 value.b[0] 같이 각 메모리 index에 접근 할 수 있다.

Template으로 만들어서 여러 타입에 대응 할 수 있다.

    template <typename T> T readFromBigEndian(uint8_t *b)
    {
      union
      {
        T v;
        uint8_t b[sizeof(T)];
      } dest;

      union
      {
        T v;
        uint8_t *b;
      } src;

      src.b = b;
      for (int i = 0; i < sizeof(T); ++i)
      {
        dest.b[i] = src.b[sizeof(T) - i - 1];
      }
      return dest.v;
    }