ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • gRPC ② gRPC + 자바 프로젝트 구성해보기
    네트워크 & 인프라 2022. 10. 5. 22:02

     

     

     

     

    이번엔 gRPC를 이용한 자바 프로젝트를 예시로 gRPC를 이해해보자. 

     

     

     

     

    참고한 글 
     

    gRPC 사용법, gRPC 예제 코드 실행해보기, 원리는 몰라도 gRPC 입문은 가능하다 (grpc java example)

    이 포스트는 springcamp2017에서 grpc발표를 하신 오명운님의 발표 자료 및 github소스를 참고해서 작성한 것입니다. gRPC의 장점 service 정의가 단순하다 여러 프로그래밍 언어나 플랫폼에서 사용이 가

    jeong-pro.tistory.com

     

    GitHub - HomoEfficio/springcamp2017-grpc-java-server: SpringCamp 2017에서 사용할 gRPC Java 예제 서버

    SpringCamp 2017에서 사용할 gRPC Java 예제 서버. Contribute to HomoEfficio/springcamp2017-grpc-java-server development by creating an account on GitHub.

    github.com

     

     

     

    참고한 사이트 
     

    Getting Started

    Spring Boot starter module for gRPC framework.

    yidongnan.github.io

     

     

     

     

    1. gRPC 플러그인 설정
    • 맥북이 m1칩을 사용하는 경우 다음과 같이 :osx-x86_64 로 명시
    group 'homo.efficio'
    version '1.0-SNAPSHOT'
    
    apply plugin: 'java'
    // protobuf 플러그인 적용 
    apply plugin: 'com.google.protobuf'
    apply plugin: 'idea'
    
    sourceCompatibility = 1.8
    
    repositories {
        mavenCentral()
    }
    
    buildscript {
    
        repositories {
            mavenCentral()
        }
    
        dependencies {
            classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.6'
        }
    }
    
    protobuf {
        protoc {
            artifact = "com.google.protobuf:protoc:3.6.1:osx-x86_64"
            // protoc compiler에 의해 생성될 소스코드 위치 설정 
    				generatedFilesBaseDir = "$projectDir/gen-src"
        }
        plugins {
            grpc {
                artifact = 'io.grpc:protoc-gen-grpc-java:1.16.1:osx-x86_64'
            }
        }
        generateProtoTasks {
            all()*.plugins {
                grpc {}
            }
        }
    }
    
    // gRPC에 의해 생성될 소스 코드의 디렉토리 위치 추가 
    sourceSets {
        main {
            java {
                srcDirs 'gen-src/main/java'
                srcDirs 'gen-src/main/grpc'
            }
        }
     }
    
    dependencies {
        compile 'io.grpc:grpc-core:1.16.1'
        compile 'io.grpc:grpc-netty:1.16.1'
        compile 'io.grpc:grpc-protobuf:1.16.1'
        compile 'io.grpc:grpc-stub:1.16.1'
        compile 'com.google.protobuf:protobuf-java:3.6.1'
    
        testCompile group: 'junit', name: 'junit', version: '4.12'
    }

     

     

     

     

    2. main / proto / hello-springcamp.proto 파일 작성
    • gRPC의 경우 .proto 파일을 이용하여 gRPC Base Files을 자동 생성
    • 로컬 stub은 gRPC Base Files를 호출하고 원격 stub은 gRPC BaseFiles를 구현
    // 메타 정보 
    syntax = "proto3";
    
    option java_multiple_files = true;
    option java_outer_classname = "HelloGrpcProto";
    
    package homo.efficio.springcamp2017.grpc.hello;
    
    // 자료 구조 
    message HelloRequest {
        string clientName = 1;
    }
    
    message HelloResponse {
        string welcomeMessage = 1;
    }
    
    // 메서드 선언 
    service HelloSpringCamp {
    
        rpc unaryHello(HelloRequest) returns (HelloResponse) {}
    
        rpc serverStreamingHello(HelloRequest) returns (stream HelloResponse) {}
    
        rpc clientStreamingHello(stream HelloRequest) returns (HelloResponse) {}
    
        rpc biStreamingHello(stream HelloRequest) returns (stream HelloResponse) {}
    }

     

     

     

     

     

     

    3. .proto에 의한 stub 자료구조 생성 확인
    • gen-src / main / grpc : stub
    • gen-src / main / java : 자료구조

     

     

     

     

     

     

     

    4. gRPC 서버 구성 1) ServerService(ServiceImpl) 구현
    • Async Unary 스트리밍
      • 진행 순서
        1. gRPC 클라이언트 → gRPC 서버 : request, responseObserver 보냄
        2. gRPC 서버는 요청을 처리하는 동안 gRPC 클라이언트는 다른 할 일 진행 (블로킹이 아니기 때문)
        3. gRPC 클라이언트 ← gRPC 서버 : responseObserver.onNext(), responseObserver.onCompleted() 응답값 보냄
    • Server Service에서 unaryHello()를 정의하면 Server에서는 ServerService를 서버로 추가해주고 ServerRunner는 main()을 통해서 이를 활용
    package homo.efficio.springcamp2017.grpc.hello;
    
    import io.grpc.stub.StreamObserver;
    
    import java.util.logging.Level;
    import java.util.logging.Logger;
    
    public class HelloGrpcServerService extends HelloSpringCampGrpc.HelloSpringCampImplBase {
    
        private final Logger logger = Logger.getLogger(HelloGrpcServerService.class.getName());
    
    		// Unary 용 코드 
        @Override
        public void unaryHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
            logger.info("Unary message: " + request.getClientName());
            HelloResponse helloResponse = HelloResponse.newBuilder().setWelcomeMessage("Unary Hello " + request.getClientName()).build();
            responseObserver.onNext(helloResponse);
            responseObserver.onCompleted();
        }
    
        // Async Unary 용 코드
        // Async 동작 확인을 위해 Thread.sleep()으로 지연 시간을 준 것 외에는 Blocking Unary와 동일
        @Override
        public void unaryHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
            logger.info("Unary 메시지 왔다: " + request.getClientName());
    
            // 1초 동안 비즈니스 로직 처리 후에 응답한다고 가정
            try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
    
            HelloResponse helloResponse =
                    HelloResponse.newBuilder()
                                 .setWelcomeMessage("Unary Hello " + request.getClientName())
                                 .build();
    
            // 응답 시작
            responseObserver.onNext(helloResponse);
    
            // 응답 시작 후 1초 후에 응답 완료된다고 가정
            try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
    
            // 응답 완료
            responseObserver.onCompleted();
        }
    }

     

     

     

     

     

    5. gRPC 서버 구성 2) Server에 ServerService 추가
    package homo.efficio.springcamp2017.grpc.hello;
    
    import io.grpc.BindableService;
    import io.grpc.Server;
    import io.grpc.ServerBuilder;
    
    import java.io.IOException;
    import java.util.logging.Logger;
    
    public class HelloGrpcServer {
    
        private final Logger logger = Logger.getLogger(HelloGrpcServer.class.getName());
    
        private final int port;
        private final Server server;
    
        public HelloGrpcServer(int port, BindableService service) throws IOException {
            this.port = port;
            this.server = ServerBuilder.forPort(port)
                                       .addService(**service**)
                                       .build();
        }
    
        public void start() throws IOException, InterruptedException {
            this.server.start();
            logger.info("gRPC Server Listening on port " + port);
            this.server.awaitTermination();
        }
    
        public void shutdown() {
            System.err.println("gRPC 서버 종료..");
            server.shutdown();
            System.err.println("gRPC 서버 종료 완료");
        }
    }

     

     

     

     

    6. gRPC 서버 구성 3) ServerRunner에서 활용
    package homo.efficio.springcamp2017.grpc.hello;
    
    import io.grpc.BindableService;
    import io.grpc.Server;
    
    import java.io.IOException;
    
    public class HelloGrpcServerRunner {
    
        public static void main(String[] args) throws IOException, InterruptedException {
    
            final int port = 54321;
            final BindableService helloService = new HelloGrpcServerService();
    
            HelloGrpcServer server = new HelloGrpcServer(port, helloService);
    
            server.start();
    
            Runtime.getRuntime().addShutdownHook(
                    new Thread(() -> server.shutdown())
            );
        }
    }
    • HelloGprcServerRunner를 실행하면 다음과 같이 확인이 가능

     

     

     

     

    7. gRPC 클라이언트 구성 1) Client 구현
    • 클라이언트도 서버와 동일하게 모두 구성
    • Async Unary 스트리밍
    • Client 에서 sendAsyncUnaryMessage()를 정의하면 ClientStubFactory에서는 blockingStub, asyncStub, futureStub을 정의하고 ClientRunner는 main()을 통해서 이를 활용
    package homo.efficio.springcamp2017.grpc.hello;
    
    import io.grpc.StatusRuntimeException;
    import io.grpc.stub.StreamObserver;
    
    import java.util.Iterator;
    import java.util.List;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    
    public class HelloGrpcClient {
    
        private final Logger logger = Logger.getLogger(HelloGrpcClient.class.getName());
    
        private final HelloSpringCampGrpc.HelloSpringCampBlockingStub blockingStub;
        private final HelloSpringCampGrpc.HelloSpringCampStub asyncStub;
        private final HelloSpringCampGrpc.HelloSpringCampFutureStub futureStub;
    
        public HelloGrpcClient(HelloSpringCampGrpc.HelloSpringCampBlockingStub blockingStub,
                               HelloSpringCampGrpc.HelloSpringCampStub asyncStub,
                               HelloSpringCampGrpc.HelloSpringCampFutureStub futureStub) {
            this.blockingStub = blockingStub;
            this.asyncStub = asyncStub;
            this.futureStub = futureStub;
        }
    
        public void sendAsyncUnaryMessage(String clientName) {
    
            // 클라이언트 비즈니스 로직 수행 결과인 clientName으로 request 생성
            HelloRequest request = HelloRequest.newBuilder().setClientName(clientName).build();
            logger.info("Unary Hello 서비스 Async 호출, 메시지 [" + clientName + "]");
    
            // 서버에 보낼 데이터를 담은 request와
            // 비동기 방식으로 서버에서 호출될 콜백 객체도 함께 파라미터로 전달
    				// 로컬 메서드처럼 호출하는 RPC
            asyncStub.unaryHello(
                    request,
                    // 서버에 보낼 콜백 객체
                    new StreamObserver<HelloResponse>() {
                        @Override
                        public void onNext(HelloResponse response) {
                            logger.info("Async Unary 서버로부터의 응답 " + response.getWelcomeMessage());
                        }
    
                        @Override
                        public void onError(Throwable t) {
                            logger.log(Level.SEVERE, "Async Unary responseObserver.onError() 호출됨");
                        }
    
                        @Override
                        public void onCompleted() {
                            logger.info("Async Unary 서버 응답 completed");
                        }
                    }
            );
    
            // 서버에서 응답이 올 때까지 기다리지 않고, 호출 결과에 상관없이 다른 작업 수행 가능
            logger.info("(Nonblocking이면서)Async이니까 원격 메서드 호출 직후 바로 로그가 찍힌다.");
        }
    }

     

     

     

     

     

    8. gRPC 클라이언트 구성 2) ClientStubFactory 구현
    package homo.efficio.springcamp2017.grpc.hello;
    
    import io.grpc.ManagedChannel;
    import io.grpc.ManagedChannelBuilder;
    
    import java.util.concurrent.TimeUnit;
    import java.util.logging.Logger;
    
    public class HelloGrpcClientStubFactory {
    
        private final Logger logger = Logger.getLogger(HelloGrpcClientStubFactory.class.getName());
    
        private final ManagedChannel channel;
        private final HelloSpringCampGrpc.HelloSpringCampBlockingStub blockingStub;
        private final HelloSpringCampGrpc.HelloSpringCampStub asyncStub;
        private final HelloSpringCampGrpc.HelloSpringCampFutureStub futureStub;
    
        public HelloGrpcClientStubFactory(String host, int port) {
            this.channel = ManagedChannelBuilder.forAddress(host, port)
                                                .usePlaintext(true)
                                                .build();
            this.blockingStub = HelloSpringCampGrpc.newBlockingStub(channel);
            this.asyncStub = HelloSpringCampGrpc.newStub(channel);
            this.futureStub = HelloSpringCampGrpc.newFutureStub(channel);
        }
    
        public void shutdownChannel() throws InterruptedException {
            logger.info("gRPC Channel shutdown...");
            this.channel.shutdown().awaitTermination(2, TimeUnit.SECONDS);
        }
    
        public HelloSpringCampGrpc.HelloSpringCampBlockingStub getBlockingStub() {
            return blockingStub;
        }
    
        public HelloSpringCampGrpc.HelloSpringCampStub getAsyncStub() {
            return asyncStub;
        }
    
        public HelloSpringCampGrpc.HelloSpringCampFutureStub getFutureStub() {
            return futureStub;
        }
    }

     

     

     

     

    9. gRPC 클라이언트 구성 2) ClientRunner 구현
    package homo.efficio.springcamp2017.grpc.hello;
    
    import java.util.Arrays;
    
    public class HelloGrpcClientRunner {
    
        public static void main(String[] args) throws InterruptedException {
    
            String host = "localhost";
            int port = 54321;
    
            HelloGrpcClientStubFactory clientStubFactory =
                    new HelloGrpcClientStubFactory(host, port);
    
            HelloGrpcClient grpcClient =
                    new HelloGrpcClient(clientStubFactory.getBlockingStub(),
                                        clientStubFactory.getAsyncStub(),
                                        clientStubFactory.getFutureStub());
    
            // Async Unary
            grpcClient.sendAsyncUnaryMessage("Async Unary, gㅏ벼운 RPC, gRPC");
            Thread.sleep(3000);
    				// 예제이기 때문에 요청 후 채널을 닫지만, 실무에서는 채널을 닫지않고 재활용 가능 
            clientStubFactory.shutdownChannel();
    }
    • ClientRunner 실행시 다음과 같이 확인 가능

     

     

     

     

     

    + Thread와 gRPC
    • Thread Safe (gRPC 라이브러리에서 제공)
      • Stub
      • Channel
    • Thread not safe → 개발자가 Thread - safe 하게 구현해야 함
      • StreamObserver

     

     

     

     

     

    + 4 스트리밍과 3 stub 구현해보기
    • ServerService
    public class HelloGrpcServerService extends HelloSpringCampGrpc.HelloSpringCampImplBase {
    
        private final Logger logger = Logger.getLogger(HelloGrpcServerService.class.getName());
    
        // Async Unary 용 코드
        // Async 동작 확인을 위해 Thread.sleep()으로 지연 시간을 준 것 외에는 Blocking Unary와 동일
        @Override
        public void unaryHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
            logger.info("Unary 메시지 왔다: " + request.getClientName());
    
            // 1초 동안 비즈니스 로직 처리 후에 응답한다고 가정
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            HelloResponse helloResponse =
                    HelloResponse.newBuilder()
                            .setWelcomeMessage("Unary Hello " + request.getClientName())
                            .build();
    
            // 응답 시작
            responseObserver.onNext(helloResponse);
    
            // 응답 시작 후 1초 후에 응답 완료된다고 가정
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            // 응답 완료
            responseObserver.onCompleted();
        }
    
        @Override
        public void serverStreamingHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
            logger.info("Server Streaming 메시지 왔다: " + request.getClientName());
    
            // 1초 동안 비즈니스 로직 처리 후에 응답한다고 가정
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            HelloResponse response =
                    HelloResponse.newBuilder()
                            .setWelcomeMessage("Server Streaming Hello " + request.getClientName())
                            .build();
    
            // 클라이언트에 의해 데이터는 한 번 들어오지만 Server Streaming이므로 responseObserver.onNext()를 여러번 호출 가능
            responseObserver.onNext(response);
            responseObserver.onNext(response);
            responseObserver.onNext(response);
    
            // 응답 시작 후 1초 후에 응답 완료된다고 가정
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            // 응답 완료
            responseObserver.onCompleted();
        }
    
        @Override
        public StreamObserver<HelloRequest> clientStreamingHello(StreamObserver<HelloResponse> responseObserver) {
            return new StreamObserver<HelloRequest>() {
                StringBuilder sb = new StringBuilder();
    
                @Override
                public void onNext(HelloRequest request) {
                    // 클라이언트로부터 여러ㅓㅂㄴ의 onNext가 호출될 예정
                    logger.info("Client Streaming 메시지 왔다: " + request.getClientName());
    
                    sb.append("Client Streaming Hello " + request.getClientName())
                            .append("\\n============================\\n");
    
                    // 1초 동안 비즈니스 로직 처리 후에 응답한다고 가정
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
    
                @Override
                public void onError(Throwable t) {
                    logger.log(Level.SEVERE, "Client Streaming requestObserver.onError() 호출");
                }
    
                @Override
                public void onCompleted() {
                    responseObserver.onNext(HelloResponse.newBuilder().setWelcomeMessage(sb.toString()).build());
                    // server는 Streaming이 아니므로 responseObserver.onNext()를 1회만 호출 가능
                    // 아래와 같이 2회 이상 호출하면 responseObserver.onError() 호출됨(여러번 응답시 biStream형식이 되기 때문)
    //                responseObserver.onNext(HelloResponse.newBuilder().setWelcomeMessage(sb.toString()).build());
    
                    // 응답 시작 후 1초 후에 응답 완료된다고 가정
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    responseObserver.onCompleted();
                }
            };
        }
    
        @Override
        public StreamObserver<HelloRequest> biStreamingHello(StreamObserver<HelloResponse> responseObserver) {
            return new StreamObserver<HelloRequest>() {
                StringBuilder sb = new StringBuilder();
    
                @Override
                public void onNext(HelloRequest request) {
                    // 클라이언트로부터 데이터가 올 때마다 onNext호출
                    logger.info("Bidirectional Streaming 메시지 왔다: " + request.getClientName());
                    sb.append("Bidirectional Streaming Hello " + request.getClientName())
                            .append("\\n============================\\n");
    
                    // 1초 동안 비즈니스 로직 처리 후에 응답한다고 가정
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    responseObserver.onNext(HelloResponse.newBuilder().setWelcomeMessage(sb.toString()).build());
                    // BiDirectional Streaming 이므로 responseObserver.onNext()를 2회 이상 호출할 수 있음.
    //                responseObserver.onNext(HelloResponse.newBuilder().setWelcomeMessage(sb.toString()).build());
                }
    
                @Override
                public void onError(Throwable t) {
                    logger.log(Level.SEVERE, "BiDirectional Streaming requestObserver.onError() 호출");
                }
    
                @Override
                public void onCompleted() {
                    // 응답 시작 후 1초 후에 응답 완료된다고 가정
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    responseObserver.onCompleted();
                }
            };
        }
    }
    • Server
    public class HelloGrpcServer {
    
        private final Logger logger = Logger.getLogger(HelloGrpcServer.class.getName());
    
        private final int port;
        private final Server server;
    
        public HelloGrpcServer(int port, BindableService service) throws IOException {
            this.port = port;
            this.server = ServerBuilder.forPort(port)
                                       .addService(service)
                                       .build();
        }
    
        public void start() throws IOException, InterruptedException {
            this.server.start();
            logger.info("gRPC Server Listening on port " + port);
            this.server.awaitTermination();
        }
    
        public void shutdown() {
            System.err.println("gRPC 서버 종료..");
            server.shutdown();
            System.err.println("gRPC 서버 종료 완료");
        }
    }
    • ServerRunner
    public class HelloGrpcServerRunner {
    
        public static void main(String[] args) throws IOException, InterruptedException {
    
            final int port = 54321;
            final BindableService helloService = new HelloGrpcServerService();
    
            HelloGrpcServer server = new HelloGrpcServer(port, helloService);
    
            server.start();
    
            Runtime.getRuntime().addShutdownHook(
                    new Thread(() -> server.shutdown())
            );
        }
    }
    • Client
    public class HelloGrpcClient {
    
        private final Logger logger = Logger.getLogger(HelloGrpcClient.class.getName());
    
        private final HelloSpringCampGrpc.HelloSpringCampBlockingStub blockingStub;
        private final HelloSpringCampGrpc.HelloSpringCampStub asyncStub;
        private final HelloSpringCampGrpc.HelloSpringCampFutureStub futureStub;
    
        public HelloGrpcClient(HelloSpringCampGrpc.HelloSpringCampBlockingStub blockingStub,
                               HelloSpringCampGrpc.HelloSpringCampStub asyncStub,
                               HelloSpringCampGrpc.HelloSpringCampFutureStub futureStub) {
            this.blockingStub = blockingStub;
            this.asyncStub = asyncStub;
            this.futureStub = futureStub;
        }
    
        public void sendBlockingUnaryMessage(String clientName) {
            // 클라이언트 비즈니스 로직 수행 결과인 clientName으로 request 생성
            HelloRequest request = HelloRequest.newBuilder().setClientName(clientName).build();
            HelloResponse response;
    
            try {
                logger.info("Unary 서비스 호출, 메시지 [" + clientName + "]");
                response = blockingStub.unaryHello(request);
            } catch (StatusRuntimeException e) {
                logger.log(Level.SEVERE, "Unary 서비스 호출 중 실패: " + e.getStatus());
                return;
            }
    
            logger.info("Unary 서비스 응답: " + response.getWelcomeMessage());
        }
    
        public void sendAsyncUnaryMessage(String clientName) {
            // 클라이언트 비즈니스 로직 수행 결과인 clientName으로 request 생성
            HelloRequest request = HelloRequest.newBuilder().setClientName(clientName).build();
            logger.info("Unary 서비스 Async 호출, 메시지 [" + clientName + "]");
    
            // 서버에 보낼 데이터를 담은 request와 비동기로 응답을 받기 위해 서버에서 호출될 콜백 객체도 함께 파라미터로 전달
            asyncStub.unaryHello(
                    request,
                    // 서버에 보낼 콜백 객체
                    new StreamObserver<HelloResponse>() {
                        @Override
                        public void onNext(HelloResponse response) {
                            logger.info("Async Unary 서버로부터의 응답 " + response.getWelcomeMessage());
                        }
    
                        @Override
                        public void onError(Throwable t) {
                            logger.log(Level.SEVERE, "Async Unary responseObserver.onError() 호출됨");
                        }
    
                        @Override
                        public void onCompleted() {
                            logger.info("Async Unary 서버 응답 completed");
                        }
                    }
            );
    
            // 서버에서 응답이 올 때까지 기다리지 않고, 호출 결과에 상관없이 다른 작업 수행 가능
            logger.info("Nonblocking + Async 이니까 원격 메서드 호출(rpc) 직후 바로 로그가 찍힘.");
        }
    
        public void sendBlockingServerStreamingMessage(String clientName) {
            // 클라이언트 비즈니스 로직 수행 결과인 clientName으로 request 생성
            HelloRequest request = HelloRequest.newBuilder().setClientName(clientName).build();
            Iterator<HelloResponse> responseIterator;
            // streaming이 완료될 때 까지 대기
            try {
                logger.info("Server Streaming Hello 서비스 Blocking 호출, 메시지 [" + clientName + "]");
                responseIterator = blockingStub.serverStreamingHello(request);
            } catch (StatusRuntimeException e) {
                logger.log(Level.SEVERE, "Server Streaming Hello 서비스 Blocking 호출 중 실패: " + e.getStatus());
                return;
            }
            responseIterator.forEachRemaining(
                    (response) -> logger.info("Server Streaming Hello 서비스 응답: " + response.getWelcomeMessage())
            );
        }
    
        public void sendAsyncServerStreamingMessage(String clientName) {
            // 클라이언트 비즈니스 로직 수행 결과인 clientName으로 request 생성
            HelloRequest request = HelloRequest.newBuilder().setClientName(clientName).build();
            Iterator<HelloResponse> responseIterator;
            logger.info("Server Streaming Hello 서비스 Async 호출, 메시지 [" + clientName + "]");
            asyncStub.serverStreamingHello(
                    request,
                    new StreamObserver<HelloResponse>() {
                        @Override
                        public void onNext(HelloResponse response) {
                            logger.info("Async Server Streaming 서버로부터의 응답 " + response.getWelcomeMessage());
                        }
    
                        @Override
                        public void onError(Throwable t) {
                            logger.log(Level.SEVERE, "Async Server Streaming responseObserver.onError() 호출됨");
                        }
    
                        @Override
                        public void onCompleted() {
                            logger.info("Async Server Streaming 서버 응답 completed");
                        }
                    }
            );
    
            // 서버에서 응답이 올 때까지 기다리지 않고, 호출 결과에 상관없이 다른 작업 수행 가능
            logger.info("Nonblocking + Async 이니까 원격 메서드 호출 직후 바로 로그가 찍힘.");
        }
    
        // Client Streaming은 AsyncStub에서만 가능
        public void sendAsyncClientStreamingMessage(List<String> messages) {
            // 서버에 보낼 콜백 객체(비동기로 응답 받을 객체 생성)
            StreamObserver<HelloResponse> responseObserver = new StreamObserver<HelloResponse>() {
                @Override
                public void onNext(HelloResponse value) {
                    logger.info("Async Client Streaming 서버로부터의 응답\\n" + value.getWelcomeMessage());
                }
    
                @Override
                public void onError(Throwable t) {
                    logger.log(Level.SEVERE, "Async Clent Streaming responseObserver.onError() 호출됨");
                }
    
                @Override
                public void onCompleted() {
                    logger.info("Async Client Streaming 서버 응답 completed");
                }
            };
    
            // 비동기로 요청을 보낼 객체 생성 및 응답 받을 객체와 연결
            StreamObserver<HelloRequest> requestObserver = asyncStub.clientStreamingHello(responseObserver);
            try {
                for (String msg: messages) {
                    requestObserver.onNext(HelloRequest.newBuilder().setClientName(msg).build());
                }
            } catch (Exception e) {
                requestObserver.onError(e);
                throw e;
            }
    
            // 서버에서 응답이 올 때까지 기다리지 않고, 호출 결과에 상관없이 다른 작업 수행 가능
            logger.info("Nonblocking + Async이니까 원격 메서드 호출 직후 바로 로그가 찍힘.");
            // 데이터 전송 완료 시그널
            requestObserver.onCompleted();
        }
    
        // Bidirectional Streaming은 AsyncStub에서만 가능
        public void sendBidirectionalStreamingMessage(List<String> messages) {
    
            // 서버에 보낼 콜백 객체(응답 받을 객체 생성)
            StreamObserver<HelloResponse> responseObserver = new StreamObserver<HelloResponse>() {
                @Override
                public void onNext(HelloResponse response) {
                    logger.info("Bidirectional Streaming 서버로부터의 응답\\n" + response.getWelcomeMessage());
                }
    
                @Override
                public void onError(Throwable t) {
                    logger.log(Level.SEVERE, "Bidirectional Streaming responseObserver.onError() 호출됨");
                }
    
                @Override
                public void onCompleted() {
                    logger.info("Bidirectional Streaming 서버 응답 completed");
                }
            };
    
            // 요청 보낼 객체 생성 및 응답 객체와 연결
            StreamObserver<HelloRequest> requestObserver = asyncStub.biStreamingHello(responseObserver);
            try {
                for (String msg: messages) {
                    requestObserver.onNext(HelloRequest.newBuilder().setClientName(msg).build());
                }
            } catch (Exception e) {
                requestObserver.onError(e);
                throw e;
            }
    
            // 서버에서 응답이 올 때까지 기다리지 않고, 호출 결과에 상관없이 다른 작업 수행 가능
            logger.info("Nonblocking + Async이니까 원격 메서드 호출 직후 바로 로그가 찍힘");
            requestObserver.onCompleted();
        }
    }
    • ClientStubFactory
    public class HelloGrpcClientStubFactory {
    
        private final Logger logger = Logger.getLogger(HelloGrpcClientStubFactory.class.getName());
    
        private final ManagedChannel channel;
        private final HelloSpringCampGrpc.HelloSpringCampBlockingStub blockingStub;
        private final HelloSpringCampGrpc.HelloSpringCampStub asyncStub;
        private final HelloSpringCampGrpc.HelloSpringCampFutureStub futureStub;
    
        public HelloGrpcClientStubFactory(String host, int port) {
            this.channel = ManagedChannelBuilder.forAddress(host, port)
                                                .usePlaintext(true)
                                                .build();
            this.blockingStub = HelloSpringCampGrpc.newBlockingStub(channel);
            this.asyncStub = HelloSpringCampGrpc.newStub(channel);
            this.futureStub = HelloSpringCampGrpc.newFutureStub(channel);
        }
    
        public void shutdownChannel() throws InterruptedException {
            logger.info("gRPC Channel shutdown...");
            this.channel.shutdown().awaitTermination(2, TimeUnit.SECONDS);
        }
    
        public HelloSpringCampGrpc.HelloSpringCampBlockingStub getBlockingStub() {
            return blockingStub;
        }
    
        public HelloSpringCampGrpc.HelloSpringCampStub getAsyncStub() {
            return asyncStub;
        }
    
        public HelloSpringCampGrpc.HelloSpringCampFutureStub getFutureStub() {
            return futureStub;
        }
    }
    • ClientRunner
    public class HelloGrpcClientRunner {
    
        public static void main(String[] args) throws InterruptedException {
    
            String host = "localhost";
            int port = 54321;
    
            HelloGrpcClientStubFactory clientStubFactory =
                    new HelloGrpcClientStubFactory(host, port);
    
            HelloGrpcClient grpcClient =
                    new HelloGrpcClient(clientStubFactory.getBlockingStub(),
                                        clientStubFactory.getAsyncStub(),
                                        clientStubFactory.getFutureStub());
    
            // Blocking Unary
            grpcClient.sendBlockingUnaryMessage("Blocking Unary, gㅏ벼운 RPC, gPRC");
            clientStubFactory.shutdownChannel();
    
            // Async Unary
            grpcClient.sendAsyncUnaryMessage("Async Unary, gㅏ벼운 RPC, gRPC");
            Thread.sleep(3000);
            clientStubFactory.shutdownChannel();
    
            // Blocking Server Streaming
            grpcClient.sendBlockingServerStreamingMessage("Blocking Server Streaming, gㅏ벼운 RPC, gPRC");
            clientStubFactory.shutdownChannel();
    
            // Async Server Streaming
            grpcClient.sendAsyncServerStreamingMessage("Async Server Streaming, gㅏ벼운 RPC, gRPC");
            Thread.sleep(3000);
            clientStubFactory.shutdownChannel();
    
            // Async Client Streaming
            grpcClient.sendAsyncClientStreamingMessage(Arrays.asList("Async Client Streaming,", "gㅏ벼운 RPC,", "gRPC"));
            Thread.sleep(3000);
            clientStubFactory.shutdownChannel();
            // 아래와 같이 1초만 대기해서 서버 응답 전에 channel을 닫으면
            // 클라이언트는 응답을 못 받으며, 서버 쪽에서도 에러가 발생하지 않는다.
            Thread.sleep(1000);
            clientStubFactory.shutdownChannel();
    
            // Bidirectional Client Streaming
            grpcClient.sendBidirectionalStreamingMessage(Arrays.asList("Async Bidirectional Streaming,", "gㅏ벼운 RPC,", "gRPC"));
            Thread.sleep(3000);
            clientStubFactory.shutdownChannel();
        }

     

     

     

     

     

Designed by Tistory.