-
gRPC ② gRPC + 자바 프로젝트 구성해보기네트워크 & 인프라 2022. 10. 5. 22:02
이번엔 gRPC를 이용한 자바 프로젝트를 예시로 gRPC를 이해해보자.
참고한 글
- 깃허브 : https://github.com/HomoEfficio/springcamp2017-grpc-java-server
- 해당 코드의 모든 출처는 https://github.com/HomoEfficio 입니다 🙌
참고한 사이트
1. gRPC 플러그인 설정
- 맥북이 m1칩을 사용하는 경우 다음과 같이 :osx-x86_64 로 명시
group 'homo.efficio' version '1.0-SNAPSHOT' apply plugin: 'java' // protobuf 플러그인 적용 apply plugin: 'com.google.protobuf' apply plugin: 'idea' sourceCompatibility = 1.8 repositories { mavenCentral() } buildscript { repositories { mavenCentral() } dependencies { classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.6' } } protobuf { protoc { artifact = "com.google.protobuf:protoc:3.6.1:osx-x86_64" // protoc compiler에 의해 생성될 소스코드 위치 설정 generatedFilesBaseDir = "$projectDir/gen-src" } plugins { grpc { artifact = 'io.grpc:protoc-gen-grpc-java:1.16.1:osx-x86_64' } } generateProtoTasks { all()*.plugins { grpc {} } } } // gRPC에 의해 생성될 소스 코드의 디렉토리 위치 추가 sourceSets { main { java { srcDirs 'gen-src/main/java' srcDirs 'gen-src/main/grpc' } } } dependencies { compile 'io.grpc:grpc-core:1.16.1' compile 'io.grpc:grpc-netty:1.16.1' compile 'io.grpc:grpc-protobuf:1.16.1' compile 'io.grpc:grpc-stub:1.16.1' compile 'com.google.protobuf:protobuf-java:3.6.1' testCompile group: 'junit', name: 'junit', version: '4.12' }
2. main / proto / hello-springcamp.proto 파일 작성
- gRPC의 경우 .proto 파일을 이용하여 gRPC Base Files을 자동 생성
- 로컬 stub은 gRPC Base Files를 호출하고 원격 stub은 gRPC BaseFiles를 구현
// 메타 정보 syntax = "proto3"; option java_multiple_files = true; option java_outer_classname = "HelloGrpcProto"; package homo.efficio.springcamp2017.grpc.hello; // 자료 구조 message HelloRequest { string clientName = 1; } message HelloResponse { string welcomeMessage = 1; } // 메서드 선언 service HelloSpringCamp { rpc unaryHello(HelloRequest) returns (HelloResponse) {} rpc serverStreamingHello(HelloRequest) returns (stream HelloResponse) {} rpc clientStreamingHello(stream HelloRequest) returns (HelloResponse) {} rpc biStreamingHello(stream HelloRequest) returns (stream HelloResponse) {} }
3. .proto에 의한 stub 자료구조 생성 확인
- gen-src / main / grpc : stub
- gen-src / main / java : 자료구조
4. gRPC 서버 구성 1) ServerService(ServiceImpl) 구현
- Async Unary 스트리밍
- 진행 순서
- gRPC 클라이언트 → gRPC 서버 : request, responseObserver 보냄
- gRPC 서버는 요청을 처리하는 동안 gRPC 클라이언트는 다른 할 일 진행 (블로킹이 아니기 때문)
- gRPC 클라이언트 ← gRPC 서버 : responseObserver.onNext(), responseObserver.onCompleted() 응답값 보냄
- 진행 순서
- Server Service에서 unaryHello()를 정의하면 Server에서는 ServerService를 서버로 추가해주고 ServerRunner는 main()을 통해서 이를 활용
package homo.efficio.springcamp2017.grpc.hello; import io.grpc.stub.StreamObserver; import java.util.logging.Level; import java.util.logging.Logger; public class HelloGrpcServerService extends HelloSpringCampGrpc.HelloSpringCampImplBase { private final Logger logger = Logger.getLogger(HelloGrpcServerService.class.getName()); // Unary 용 코드 @Override public void unaryHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) { logger.info("Unary message: " + request.getClientName()); HelloResponse helloResponse = HelloResponse.newBuilder().setWelcomeMessage("Unary Hello " + request.getClientName()).build(); responseObserver.onNext(helloResponse); responseObserver.onCompleted(); } // Async Unary 용 코드 // Async 동작 확인을 위해 Thread.sleep()으로 지연 시간을 준 것 외에는 Blocking Unary와 동일 @Override public void unaryHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) { logger.info("Unary 메시지 왔다: " + request.getClientName()); // 1초 동안 비즈니스 로직 처리 후에 응답한다고 가정 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } HelloResponse helloResponse = HelloResponse.newBuilder() .setWelcomeMessage("Unary Hello " + request.getClientName()) .build(); // 응답 시작 responseObserver.onNext(helloResponse); // 응답 시작 후 1초 후에 응답 완료된다고 가정 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } // 응답 완료 responseObserver.onCompleted(); } }
5. gRPC 서버 구성 2) Server에 ServerService 추가
package homo.efficio.springcamp2017.grpc.hello; import io.grpc.BindableService; import io.grpc.Server; import io.grpc.ServerBuilder; import java.io.IOException; import java.util.logging.Logger; public class HelloGrpcServer { private final Logger logger = Logger.getLogger(HelloGrpcServer.class.getName()); private final int port; private final Server server; public HelloGrpcServer(int port, BindableService service) throws IOException { this.port = port; this.server = ServerBuilder.forPort(port) .addService(**service**) .build(); } public void start() throws IOException, InterruptedException { this.server.start(); logger.info("gRPC Server Listening on port " + port); this.server.awaitTermination(); } public void shutdown() { System.err.println("gRPC 서버 종료.."); server.shutdown(); System.err.println("gRPC 서버 종료 완료"); } }
6. gRPC 서버 구성 3) ServerRunner에서 활용
package homo.efficio.springcamp2017.grpc.hello; import io.grpc.BindableService; import io.grpc.Server; import java.io.IOException; public class HelloGrpcServerRunner { public static void main(String[] args) throws IOException, InterruptedException { final int port = 54321; final BindableService helloService = new HelloGrpcServerService(); HelloGrpcServer server = new HelloGrpcServer(port, helloService); server.start(); Runtime.getRuntime().addShutdownHook( new Thread(() -> server.shutdown()) ); } }
- HelloGprcServerRunner를 실행하면 다음과 같이 확인이 가능
7. gRPC 클라이언트 구성 1) Client 구현
- 클라이언트도 서버와 동일하게 모두 구성
- Async Unary 스트리밍
- Client 에서 sendAsyncUnaryMessage()를 정의하면 ClientStubFactory에서는 blockingStub, asyncStub, futureStub을 정의하고 ClientRunner는 main()을 통해서 이를 활용
package homo.efficio.springcamp2017.grpc.hello; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import java.util.Iterator; import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; public class HelloGrpcClient { private final Logger logger = Logger.getLogger(HelloGrpcClient.class.getName()); private final HelloSpringCampGrpc.HelloSpringCampBlockingStub blockingStub; private final HelloSpringCampGrpc.HelloSpringCampStub asyncStub; private final HelloSpringCampGrpc.HelloSpringCampFutureStub futureStub; public HelloGrpcClient(HelloSpringCampGrpc.HelloSpringCampBlockingStub blockingStub, HelloSpringCampGrpc.HelloSpringCampStub asyncStub, HelloSpringCampGrpc.HelloSpringCampFutureStub futureStub) { this.blockingStub = blockingStub; this.asyncStub = asyncStub; this.futureStub = futureStub; } public void sendAsyncUnaryMessage(String clientName) { // 클라이언트 비즈니스 로직 수행 결과인 clientName으로 request 생성 HelloRequest request = HelloRequest.newBuilder().setClientName(clientName).build(); logger.info("Unary Hello 서비스 Async 호출, 메시지 [" + clientName + "]"); // 서버에 보낼 데이터를 담은 request와 // 비동기 방식으로 서버에서 호출될 콜백 객체도 함께 파라미터로 전달 // 로컬 메서드처럼 호출하는 RPC asyncStub.unaryHello( request, // 서버에 보낼 콜백 객체 new StreamObserver<HelloResponse>() { @Override public void onNext(HelloResponse response) { logger.info("Async Unary 서버로부터의 응답 " + response.getWelcomeMessage()); } @Override public void onError(Throwable t) { logger.log(Level.SEVERE, "Async Unary responseObserver.onError() 호출됨"); } @Override public void onCompleted() { logger.info("Async Unary 서버 응답 completed"); } } ); // 서버에서 응답이 올 때까지 기다리지 않고, 호출 결과에 상관없이 다른 작업 수행 가능 logger.info("(Nonblocking이면서)Async이니까 원격 메서드 호출 직후 바로 로그가 찍힌다."); } }
8. gRPC 클라이언트 구성 2) ClientStubFactory 구현
package homo.efficio.springcamp2017.grpc.hello; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; public class HelloGrpcClientStubFactory { private final Logger logger = Logger.getLogger(HelloGrpcClientStubFactory.class.getName()); private final ManagedChannel channel; private final HelloSpringCampGrpc.HelloSpringCampBlockingStub blockingStub; private final HelloSpringCampGrpc.HelloSpringCampStub asyncStub; private final HelloSpringCampGrpc.HelloSpringCampFutureStub futureStub; public HelloGrpcClientStubFactory(String host, int port) { this.channel = ManagedChannelBuilder.forAddress(host, port) .usePlaintext(true) .build(); this.blockingStub = HelloSpringCampGrpc.newBlockingStub(channel); this.asyncStub = HelloSpringCampGrpc.newStub(channel); this.futureStub = HelloSpringCampGrpc.newFutureStub(channel); } public void shutdownChannel() throws InterruptedException { logger.info("gRPC Channel shutdown..."); this.channel.shutdown().awaitTermination(2, TimeUnit.SECONDS); } public HelloSpringCampGrpc.HelloSpringCampBlockingStub getBlockingStub() { return blockingStub; } public HelloSpringCampGrpc.HelloSpringCampStub getAsyncStub() { return asyncStub; } public HelloSpringCampGrpc.HelloSpringCampFutureStub getFutureStub() { return futureStub; } }
9. gRPC 클라이언트 구성 2) ClientRunner 구현
package homo.efficio.springcamp2017.grpc.hello; import java.util.Arrays; public class HelloGrpcClientRunner { public static void main(String[] args) throws InterruptedException { String host = "localhost"; int port = 54321; HelloGrpcClientStubFactory clientStubFactory = new HelloGrpcClientStubFactory(host, port); HelloGrpcClient grpcClient = new HelloGrpcClient(clientStubFactory.getBlockingStub(), clientStubFactory.getAsyncStub(), clientStubFactory.getFutureStub()); // Async Unary grpcClient.sendAsyncUnaryMessage("Async Unary, gㅏ벼운 RPC, gRPC"); Thread.sleep(3000); // 예제이기 때문에 요청 후 채널을 닫지만, 실무에서는 채널을 닫지않고 재활용 가능 clientStubFactory.shutdownChannel(); }
- ClientRunner 실행시 다음과 같이 확인 가능
+ Thread와 gRPC
- Thread Safe (gRPC 라이브러리에서 제공)
- Stub
- Channel
- Thread not safe → 개발자가 Thread - safe 하게 구현해야 함
- StreamObserver
+ 4 스트리밍과 3 stub 구현해보기
- ServerService
public class HelloGrpcServerService extends HelloSpringCampGrpc.HelloSpringCampImplBase { private final Logger logger = Logger.getLogger(HelloGrpcServerService.class.getName()); // Async Unary 용 코드 // Async 동작 확인을 위해 Thread.sleep()으로 지연 시간을 준 것 외에는 Blocking Unary와 동일 @Override public void unaryHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) { logger.info("Unary 메시지 왔다: " + request.getClientName()); // 1초 동안 비즈니스 로직 처리 후에 응답한다고 가정 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } HelloResponse helloResponse = HelloResponse.newBuilder() .setWelcomeMessage("Unary Hello " + request.getClientName()) .build(); // 응답 시작 responseObserver.onNext(helloResponse); // 응답 시작 후 1초 후에 응답 완료된다고 가정 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } // 응답 완료 responseObserver.onCompleted(); } @Override public void serverStreamingHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) { logger.info("Server Streaming 메시지 왔다: " + request.getClientName()); // 1초 동안 비즈니스 로직 처리 후에 응답한다고 가정 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } HelloResponse response = HelloResponse.newBuilder() .setWelcomeMessage("Server Streaming Hello " + request.getClientName()) .build(); // 클라이언트에 의해 데이터는 한 번 들어오지만 Server Streaming이므로 responseObserver.onNext()를 여러번 호출 가능 responseObserver.onNext(response); responseObserver.onNext(response); responseObserver.onNext(response); // 응답 시작 후 1초 후에 응답 완료된다고 가정 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } // 응답 완료 responseObserver.onCompleted(); } @Override public StreamObserver<HelloRequest> clientStreamingHello(StreamObserver<HelloResponse> responseObserver) { return new StreamObserver<HelloRequest>() { StringBuilder sb = new StringBuilder(); @Override public void onNext(HelloRequest request) { // 클라이언트로부터 여러ㅓㅂㄴ의 onNext가 호출될 예정 logger.info("Client Streaming 메시지 왔다: " + request.getClientName()); sb.append("Client Streaming Hello " + request.getClientName()) .append("\\n============================\\n"); // 1초 동안 비즈니스 로직 처리 후에 응답한다고 가정 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void onError(Throwable t) { logger.log(Level.SEVERE, "Client Streaming requestObserver.onError() 호출"); } @Override public void onCompleted() { responseObserver.onNext(HelloResponse.newBuilder().setWelcomeMessage(sb.toString()).build()); // server는 Streaming이 아니므로 responseObserver.onNext()를 1회만 호출 가능 // 아래와 같이 2회 이상 호출하면 responseObserver.onError() 호출됨(여러번 응답시 biStream형식이 되기 때문) // responseObserver.onNext(HelloResponse.newBuilder().setWelcomeMessage(sb.toString()).build()); // 응답 시작 후 1초 후에 응답 완료된다고 가정 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } responseObserver.onCompleted(); } }; } @Override public StreamObserver<HelloRequest> biStreamingHello(StreamObserver<HelloResponse> responseObserver) { return new StreamObserver<HelloRequest>() { StringBuilder sb = new StringBuilder(); @Override public void onNext(HelloRequest request) { // 클라이언트로부터 데이터가 올 때마다 onNext호출 logger.info("Bidirectional Streaming 메시지 왔다: " + request.getClientName()); sb.append("Bidirectional Streaming Hello " + request.getClientName()) .append("\\n============================\\n"); // 1초 동안 비즈니스 로직 처리 후에 응답한다고 가정 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } responseObserver.onNext(HelloResponse.newBuilder().setWelcomeMessage(sb.toString()).build()); // BiDirectional Streaming 이므로 responseObserver.onNext()를 2회 이상 호출할 수 있음. // responseObserver.onNext(HelloResponse.newBuilder().setWelcomeMessage(sb.toString()).build()); } @Override public void onError(Throwable t) { logger.log(Level.SEVERE, "BiDirectional Streaming requestObserver.onError() 호출"); } @Override public void onCompleted() { // 응답 시작 후 1초 후에 응답 완료된다고 가정 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } responseObserver.onCompleted(); } }; } }
- Server
public class HelloGrpcServer { private final Logger logger = Logger.getLogger(HelloGrpcServer.class.getName()); private final int port; private final Server server; public HelloGrpcServer(int port, BindableService service) throws IOException { this.port = port; this.server = ServerBuilder.forPort(port) .addService(service) .build(); } public void start() throws IOException, InterruptedException { this.server.start(); logger.info("gRPC Server Listening on port " + port); this.server.awaitTermination(); } public void shutdown() { System.err.println("gRPC 서버 종료.."); server.shutdown(); System.err.println("gRPC 서버 종료 완료"); } }
- ServerRunner
public class HelloGrpcServerRunner { public static void main(String[] args) throws IOException, InterruptedException { final int port = 54321; final BindableService helloService = new HelloGrpcServerService(); HelloGrpcServer server = new HelloGrpcServer(port, helloService); server.start(); Runtime.getRuntime().addShutdownHook( new Thread(() -> server.shutdown()) ); } }
- Client
public class HelloGrpcClient { private final Logger logger = Logger.getLogger(HelloGrpcClient.class.getName()); private final HelloSpringCampGrpc.HelloSpringCampBlockingStub blockingStub; private final HelloSpringCampGrpc.HelloSpringCampStub asyncStub; private final HelloSpringCampGrpc.HelloSpringCampFutureStub futureStub; public HelloGrpcClient(HelloSpringCampGrpc.HelloSpringCampBlockingStub blockingStub, HelloSpringCampGrpc.HelloSpringCampStub asyncStub, HelloSpringCampGrpc.HelloSpringCampFutureStub futureStub) { this.blockingStub = blockingStub; this.asyncStub = asyncStub; this.futureStub = futureStub; } public void sendBlockingUnaryMessage(String clientName) { // 클라이언트 비즈니스 로직 수행 결과인 clientName으로 request 생성 HelloRequest request = HelloRequest.newBuilder().setClientName(clientName).build(); HelloResponse response; try { logger.info("Unary 서비스 호출, 메시지 [" + clientName + "]"); response = blockingStub.unaryHello(request); } catch (StatusRuntimeException e) { logger.log(Level.SEVERE, "Unary 서비스 호출 중 실패: " + e.getStatus()); return; } logger.info("Unary 서비스 응답: " + response.getWelcomeMessage()); } public void sendAsyncUnaryMessage(String clientName) { // 클라이언트 비즈니스 로직 수행 결과인 clientName으로 request 생성 HelloRequest request = HelloRequest.newBuilder().setClientName(clientName).build(); logger.info("Unary 서비스 Async 호출, 메시지 [" + clientName + "]"); // 서버에 보낼 데이터를 담은 request와 비동기로 응답을 받기 위해 서버에서 호출될 콜백 객체도 함께 파라미터로 전달 asyncStub.unaryHello( request, // 서버에 보낼 콜백 객체 new StreamObserver<HelloResponse>() { @Override public void onNext(HelloResponse response) { logger.info("Async Unary 서버로부터의 응답 " + response.getWelcomeMessage()); } @Override public void onError(Throwable t) { logger.log(Level.SEVERE, "Async Unary responseObserver.onError() 호출됨"); } @Override public void onCompleted() { logger.info("Async Unary 서버 응답 completed"); } } ); // 서버에서 응답이 올 때까지 기다리지 않고, 호출 결과에 상관없이 다른 작업 수행 가능 logger.info("Nonblocking + Async 이니까 원격 메서드 호출(rpc) 직후 바로 로그가 찍힘."); } public void sendBlockingServerStreamingMessage(String clientName) { // 클라이언트 비즈니스 로직 수행 결과인 clientName으로 request 생성 HelloRequest request = HelloRequest.newBuilder().setClientName(clientName).build(); Iterator<HelloResponse> responseIterator; // streaming이 완료될 때 까지 대기 try { logger.info("Server Streaming Hello 서비스 Blocking 호출, 메시지 [" + clientName + "]"); responseIterator = blockingStub.serverStreamingHello(request); } catch (StatusRuntimeException e) { logger.log(Level.SEVERE, "Server Streaming Hello 서비스 Blocking 호출 중 실패: " + e.getStatus()); return; } responseIterator.forEachRemaining( (response) -> logger.info("Server Streaming Hello 서비스 응답: " + response.getWelcomeMessage()) ); } public void sendAsyncServerStreamingMessage(String clientName) { // 클라이언트 비즈니스 로직 수행 결과인 clientName으로 request 생성 HelloRequest request = HelloRequest.newBuilder().setClientName(clientName).build(); Iterator<HelloResponse> responseIterator; logger.info("Server Streaming Hello 서비스 Async 호출, 메시지 [" + clientName + "]"); asyncStub.serverStreamingHello( request, new StreamObserver<HelloResponse>() { @Override public void onNext(HelloResponse response) { logger.info("Async Server Streaming 서버로부터의 응답 " + response.getWelcomeMessage()); } @Override public void onError(Throwable t) { logger.log(Level.SEVERE, "Async Server Streaming responseObserver.onError() 호출됨"); } @Override public void onCompleted() { logger.info("Async Server Streaming 서버 응답 completed"); } } ); // 서버에서 응답이 올 때까지 기다리지 않고, 호출 결과에 상관없이 다른 작업 수행 가능 logger.info("Nonblocking + Async 이니까 원격 메서드 호출 직후 바로 로그가 찍힘."); } // Client Streaming은 AsyncStub에서만 가능 public void sendAsyncClientStreamingMessage(List<String> messages) { // 서버에 보낼 콜백 객체(비동기로 응답 받을 객체 생성) StreamObserver<HelloResponse> responseObserver = new StreamObserver<HelloResponse>() { @Override public void onNext(HelloResponse value) { logger.info("Async Client Streaming 서버로부터의 응답\\n" + value.getWelcomeMessage()); } @Override public void onError(Throwable t) { logger.log(Level.SEVERE, "Async Clent Streaming responseObserver.onError() 호출됨"); } @Override public void onCompleted() { logger.info("Async Client Streaming 서버 응답 completed"); } }; // 비동기로 요청을 보낼 객체 생성 및 응답 받을 객체와 연결 StreamObserver<HelloRequest> requestObserver = asyncStub.clientStreamingHello(responseObserver); try { for (String msg: messages) { requestObserver.onNext(HelloRequest.newBuilder().setClientName(msg).build()); } } catch (Exception e) { requestObserver.onError(e); throw e; } // 서버에서 응답이 올 때까지 기다리지 않고, 호출 결과에 상관없이 다른 작업 수행 가능 logger.info("Nonblocking + Async이니까 원격 메서드 호출 직후 바로 로그가 찍힘."); // 데이터 전송 완료 시그널 requestObserver.onCompleted(); } // Bidirectional Streaming은 AsyncStub에서만 가능 public void sendBidirectionalStreamingMessage(List<String> messages) { // 서버에 보낼 콜백 객체(응답 받을 객체 생성) StreamObserver<HelloResponse> responseObserver = new StreamObserver<HelloResponse>() { @Override public void onNext(HelloResponse response) { logger.info("Bidirectional Streaming 서버로부터의 응답\\n" + response.getWelcomeMessage()); } @Override public void onError(Throwable t) { logger.log(Level.SEVERE, "Bidirectional Streaming responseObserver.onError() 호출됨"); } @Override public void onCompleted() { logger.info("Bidirectional Streaming 서버 응답 completed"); } }; // 요청 보낼 객체 생성 및 응답 객체와 연결 StreamObserver<HelloRequest> requestObserver = asyncStub.biStreamingHello(responseObserver); try { for (String msg: messages) { requestObserver.onNext(HelloRequest.newBuilder().setClientName(msg).build()); } } catch (Exception e) { requestObserver.onError(e); throw e; } // 서버에서 응답이 올 때까지 기다리지 않고, 호출 결과에 상관없이 다른 작업 수행 가능 logger.info("Nonblocking + Async이니까 원격 메서드 호출 직후 바로 로그가 찍힘"); requestObserver.onCompleted(); } }
- ClientStubFactory
public class HelloGrpcClientStubFactory { private final Logger logger = Logger.getLogger(HelloGrpcClientStubFactory.class.getName()); private final ManagedChannel channel; private final HelloSpringCampGrpc.HelloSpringCampBlockingStub blockingStub; private final HelloSpringCampGrpc.HelloSpringCampStub asyncStub; private final HelloSpringCampGrpc.HelloSpringCampFutureStub futureStub; public HelloGrpcClientStubFactory(String host, int port) { this.channel = ManagedChannelBuilder.forAddress(host, port) .usePlaintext(true) .build(); this.blockingStub = HelloSpringCampGrpc.newBlockingStub(channel); this.asyncStub = HelloSpringCampGrpc.newStub(channel); this.futureStub = HelloSpringCampGrpc.newFutureStub(channel); } public void shutdownChannel() throws InterruptedException { logger.info("gRPC Channel shutdown..."); this.channel.shutdown().awaitTermination(2, TimeUnit.SECONDS); } public HelloSpringCampGrpc.HelloSpringCampBlockingStub getBlockingStub() { return blockingStub; } public HelloSpringCampGrpc.HelloSpringCampStub getAsyncStub() { return asyncStub; } public HelloSpringCampGrpc.HelloSpringCampFutureStub getFutureStub() { return futureStub; } }
- ClientRunner
public class HelloGrpcClientRunner { public static void main(String[] args) throws InterruptedException { String host = "localhost"; int port = 54321; HelloGrpcClientStubFactory clientStubFactory = new HelloGrpcClientStubFactory(host, port); HelloGrpcClient grpcClient = new HelloGrpcClient(clientStubFactory.getBlockingStub(), clientStubFactory.getAsyncStub(), clientStubFactory.getFutureStub()); // Blocking Unary grpcClient.sendBlockingUnaryMessage("Blocking Unary, gㅏ벼운 RPC, gPRC"); clientStubFactory.shutdownChannel(); // Async Unary grpcClient.sendAsyncUnaryMessage("Async Unary, gㅏ벼운 RPC, gRPC"); Thread.sleep(3000); clientStubFactory.shutdownChannel(); // Blocking Server Streaming grpcClient.sendBlockingServerStreamingMessage("Blocking Server Streaming, gㅏ벼운 RPC, gPRC"); clientStubFactory.shutdownChannel(); // Async Server Streaming grpcClient.sendAsyncServerStreamingMessage("Async Server Streaming, gㅏ벼운 RPC, gRPC"); Thread.sleep(3000); clientStubFactory.shutdownChannel(); // Async Client Streaming grpcClient.sendAsyncClientStreamingMessage(Arrays.asList("Async Client Streaming,", "gㅏ벼운 RPC,", "gRPC")); Thread.sleep(3000); clientStubFactory.shutdownChannel(); // 아래와 같이 1초만 대기해서 서버 응답 전에 channel을 닫으면 // 클라이언트는 응답을 못 받으며, 서버 쪽에서도 에러가 발생하지 않는다. Thread.sleep(1000); clientStubFactory.shutdownChannel(); // Bidirectional Client Streaming grpcClient.sendBidirectionalStreamingMessage(Arrays.asList("Async Bidirectional Streaming,", "gㅏ벼운 RPC,", "gRPC")); Thread.sleep(3000); clientStubFactory.shutdownChannel(); }
'네트워크 & 인프라' 카테고리의 다른 글
쿠버네티스 ① 메인 K8s component (1) (1) 2022.10.08 gRPC ③ gRPC + 스프링부트 프로젝트 구성해보기 (0) 2022.10.05 gRPC ① gRPC란 ( + Kotlin 설정) (1) 2022.09.30 SSH 별칭으로 접속 시도시 RSA 공유키 충돌 문제 발생 (0) 2022.05.30 그림으로 공부하는 IT 인프라 구조 정리 (0) 2022.04.15