ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Grpc Service의 호출오류가 Grpc Client 헬스체크 실패를 야기하는 문제 해결
    Spring 2024. 11. 19. 21:15

     

     

     

     

     

     

    하나의 서비스에 GrpcService를 구현하고 이제 이를 사용하는 별도 서비스의 GrpcClient에 구현된 기능을 호출하도록 구현하였는데, 

    GrpcService의 오류만으로도 GrpcClient 서버의 헬스체크가 실패하는 현상이 발생했다. 

    아니, Grpc 양반... 이게 무슨일이오

     

     

    해당 문제의 원인을 파악하고 문제를 해결한 글을 남긴다!

     

     

     

     

    문제 상황 

     

    서두에 말한것과 같이 별도의 서버에 구성된 GrpcClient, GrpcService가 존재한다. 두 서버는 별도의 서버로 서로에게 영향을 주어서는 안될 것으로 생각했는데, GrpcService가 장애 상황이거나 응답할 수 없는 상황에 장애가 전파되어 GrpcClient까지 헬스체크가 실패해서 livenessProbe, readinessProbe에 모두 503 응답으로 ServiceUnavailable 상황이 되었다. 

     

    단, 두번의 호출만으로 service unavailable 사태가 되어버린 grpc client
    pod의 health check 실패

     

    actuator 상태를 확인확인한 후 status가 OutOfService임을 확인할 수 있었다. 

    나는 왜 GrpcService 장애가 GrpcClient에 영향을 주는지 궁금했다...! 

     

     

     

    원인 파악 

    처음에는 blockingStub의 사용으로 인한 OOM이나 deadlock이 발생되었는가를 의심해보았지만 그정도의 트래픽을 주지 않았기도 했고, 메트릭을 보았을 때 특별하게 메모리나 cpu가 증가하지도 않았기 때문에 해당 문제로는 보이지 않았다. 

     

    그렇게 공식 문서를 확인하던중 다음의 항목을 찾게 되었다. 

     

    Configuration

    Spring Boot starter module for gRPC framework.

    yidongnan.github.io

    어라...?

     

    요약하자면 해당 서버는 net.devh의 grpc-spring-boot-starter를 사용중이었는데,(https://yidongnan.github.io/grpc-spring-boot-starter) 해당 라이브러리의  GrpcClientHealthAutoConfiguration이 actuator 지표중인 grpcChannel의 지표가 하나라도 TRANSIENT_FAILURE인 상태를 가진다면, 서비스 상태를 OUT_OF_SERVICE 상태로 바꿔버린다 는 것이었다. 

     

     

    그렇다면 GrpcClientHealthAutoConfiguration의 코드를 살펴보자. 

     

    grpc-spring/grpc-client-spring-boot-starter/src/main/java/net/devh/boot/grpc/client/autoconfigure/GrpcClientHealthAutoConfigurat

    Spring Boot starter module for gRPC framework. Contribute to grpc-ecosystem/grpc-spring development by creating an account on GitHub.

    github.com

    package net.devh.boot.grpc.client.autoconfigure;
    
    import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator;
    import org.springframework.boot.actuate.health.Health;
    import org.springframework.boot.actuate.health.HealthIndicator;
    import org.springframework.boot.autoconfigure.AutoConfigureAfter;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Lazy;
    
    import com.google.common.collect.ImmutableMap;
    
    import io.grpc.ConnectivityState;
    import net.devh.boot.grpc.client.channelfactory.GrpcChannelFactory;
    
    /**
     * Auto configuration class for Spring-Boot. This allows zero config client health status updates for gRPC services.
     *
     * @author Daniel Theuke (daniel.theuke@heuboe.de)
     */
    @Configuration(proxyBeanMethods = false)
    @AutoConfigureAfter(GrpcClientAutoConfiguration.class)
    @ConditionalOnEnabledHealthIndicator("grpcChannel")
    @ConditionalOnClass(name = "org.springframework.boot.actuate.health.HealthIndicator")
    public class GrpcClientHealthAutoConfiguration {
    
        /**
         * Creates a HealthIndicator based on the channels' {@link ConnectivityState}s from the underlying
         * {@link GrpcChannelFactory}.
         *
         * @param factory The factory to derive the connectivity states from.
         * @return A health indicator bean, that uses the following assumption
         *         <code>DOWN == states.contains(TRANSIENT_FAILURE)</code>.
         */
        @Bean
        @Lazy
        public HealthIndicator grpcChannelHealthIndicator(final GrpcChannelFactory factory) {
            return () -> {
                final ImmutableMap<String, ConnectivityState> states = ImmutableMap.copyOf(factory.getConnectivityState());
                final Health.Builder health;
                if (states.containsValue(ConnectivityState.TRANSIENT_FAILURE)) {
                    health = Health.outOfService();
                } else {
                    health = Health.up();
                }
                return health.withDetails(states)
                        .build();
            };
        }
    
    }

     

     

    코드에서 보인다시피, GrpcChannelFactory에서 연결상태가 하나라도 TRANSIENT_FAILURE상태라면 헬스 상태를 OUT_OF_SERVICE 상태로 바꿔버리는 것을 확인할 수 있다. 

     

     

    찾았다...!!!

     

     

    참고로 연결상태의 종류는 다음과 같다. 

     

    ConnectivityState (grpc-all 1.68.1 API)

    TRANSIENT_FAILURE public static final ConnectivityState TRANSIENT_FAILURE There has been some transient failure (such as a TCP 3-way handshake timing out or a socket error). Channels in this state will eventually switch to the CONNECTING state and try to

    grpc.github.io

    상태 설명 특징
    IDLE
    • 채널이 현재 사용되지 않고 있으며, 어떤 연결도 활성화되지 않은 상태
    • 이 상태는 채널이 처음 생성되었거나, 마지막 RPC가 종료된 후 일정 시간이 지나면 전환
    • 이 상태에서는 클라이언트가 새 RPC를 보내는 순간, 채널은 자동으로 CONNECTING 상태로 전환
    • 연결 유지 비용을 줄이기 위한 상태
    • 클라이언트가 아직 서버와 적극적으로 연결을 시도하지 않음
    CONNECTING
    • 채널이 서버와 연결을 설정하려고 시도 중인 상태
    • 새 RPC 요청이 도착하거나, 기존 연결이 끊어진 후에 다시 연결하려는 시도에서 해당 상태가 발생
    • 연결이 성공하면 READY로 전환
    • 실패 시 재시도 정책에 따라 재연결을 시도하거나 TRANSIENT_FAILURE로 전환
    READY
    • 채널이 활성 상태이고, 서버와의 연결이 성공적으로 이루어진 상태
    • 이 상태에서는 RPC 호출을 즉시 수행할 수 있음
    • 정상적인 작업 상태
    • gRPC 호출을 처리할 준비 완료
    TRANSIENT_FAILURE
    • 채널이 일시적인 장애를 겪고 있으며, 연결을 설정하거나 유지하는 데 문제가 발생한 상태
    • 이 상태는 오류가 치명적이지 않다고 간주되며, 클라이언트는 자동으로 재시도를 시도
    • 장애 발생 시 연결이 일시적으로 실패한 상태
    • 재연결을 시도하기 전에 대기 시간이 있을 수 있음
     
    SHUTDOWN
    • 채널이 종료된 상태로, 더 이상 사용이 불가능
    • 이 상태는 클라이언트가 채널을 명시적으로 닫거나(예: shutdown() 호출), 리소스가 해제될 때 발생
    • 모든 RPC 호출이 즉시 실패
    • 채널의 수명 주기가 종료됨

     

     

     

     

     

    문제 재현

    그렇다면 디버깅으로 상태를 확인해보자.

     

    문제재현 1)

    먼저 기존에 GrpcService 를 호출하지 않은 상태에서 GrpcClient가 뜨고 난 이후 서버는 actuator호출시 연결 상태가 IDLE을 확인할 수 있었다. 

     

    이후에 grpcService 호출에 실패(IO Exception 발생시) 했을때, 다음과 같이 TRANSIENT_FAILURE 상태로 변경되는 것을 확인할 수 있었다. 

     

    grpc-ui를 통해 grpcClient의 grpcService를 사용하는 관련 로직을 호출 했을때 받은 응답

     

     

     

     

     

    문제재현 2)

    그런데 GrpcService에 Deadline을 적용함으로서 문제가 해결이 되는 것처럼 보였다.

    그렇다면 Deadline을 적용하여 서비스를 호출하는 경우 어떻게 될까?

    다음과 같이 GrpcService에 Deadline을 적용해서 호출해보자. 

    나는 다음과 같이 확장함수를 구현하여 deadline을 적용한 서비스 호출이 가능하게 적용하였다. (pseudo 코드) 

        // deadline 적용 확장함수 구현 
        fun <T, S :AbstractStub<S>> S.call(
            deadline: Long,
            action: (S) -> T,
        ): T {
            return this.withDeadlineAfter(deadline, TimeUnit.MILLISECONDS).let(action)
        }
        
        
        @GrpcClient("grpc-service")
        private lateinit var grpcService: GrpcServiceGrpc.GrpcServiceBlockingStub
    
        val deadline = 3L
    
        fun test(): GrpcResponse? {
        	// deadline을 적용하여 호출 
            return grpcService.call(deadline) {
                it.test(
                    GrpcRequest.newBuilder().apply {
                        name = GrpcRequest.NameSpec.newBuilder().apply {
                            addName("test")
                        }.build()
                    }.build()
                ).firstOrNull()
    
            }
        }

     

    먼저 기존에 GrpcService 를 호출하지 않은 상태에서 GrpcClient가 뜨고 난 이후 서버는 actuator호출시 연결 상태가 IDLE을 확인할 수 있었다. 

     

    그 다음, 호출했을때 응답은 DEADLINE_EXCEEDED로 받으면서, ConnectivityState는 CONNECTING으로 상태 변경이 되면서 연결맺기를 시도하는 것을 확인할 수 있었다. 

     

     

     

     

     

     

     

     

    문제재현 3)

    그렇다면 먼저 연결이 된 상태에서는 상태가 어떻게 변하게 될까?

    먼저 데드라인이 적용되지 않은 상태에서 먼저 호출해서 성공한 이후 서비스 문제로 재호출에 실패하는 케이스를 테스트해보았다. 

    먼저 호출에 성공했을때는 다음과 같이 READY 상태로 응답하는 것을 확인할 수 있다. 

     

     

    이 상태에서 서비스 연결을 끊고 호출을 하게 되면 다음과 같은 UNAVAILABLE 응답을 받게되고, 연결 상태가 TRANSIENT_FAILURE로 변경되는 것을 확인할 수 있었다.

    io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
    	at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:271) ~[grpc-stub-1.46.1.jar:1.46.1]
    	at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:252) ~[grpc-stub-1.46.1.jar:1.46.1]
    	at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:165) ~[grpc-stub-1.46.1.jar:1.46.1]
    	at kr.socar.grpc.apis.accounts.member.v1.MemberServiceGrpc$MemberServiceBlockingStub.getMemberWithDriverLicense(MemberServiceGrpc.java:772) ~[main/:na]
    	at kr.socar.client.AccountsMemberGrpcClient.findByUserId(AccountsMemberGrpcClient.kt:34) ~[main/:na]
    	at kr.socar.dao.MemberGrpcDAO.findByUserId(MemberGrpcDAO.kt:18) ~[main/:na]
    	at kr.socar.dao.MemberGrpcDAO$$FastClassBySpringCGLIB$$8c0d7d21.invoke(<generated>) ~[main/:na]

     

     

    actuator/health 요청 응답으로 받은결과는 다음과 같다. 

    status가 OUT_OF_SERVICE이고, grpcChnnel항목에 TRANSIENT_FAILURE 항목을 가지고 있는 것을 확인할 수 있다. 

    {
        "status": "OUT_OF_SERVICE",
        "components": {
            "db": {
                "status": "UP",
                "components": {
                    "dataSource": {
                        "status": "UP",
                        "details": {
                            "database": "",
                            "validationQuery": "isValid()"
                        }
                    },
                    "masterDataSource": {
                        "status": "UP",
                        "details": {
                            "database": "",
                            "validationQuery": "isValid()"
                        }
                    },
                    "readonlyDataSource": {
                        "status": "UP",
                        "details": {
                            "database": "",
                            "validationQuery": "isValid()"
                        }
                    }
                }
            },
            "diskSpace": {
                "status": "UP",
                "details": {
                    "total": ,
                    "free": ,
                    "threshold": ,
                    "exists": true
                }
            },
            "grpcChannel": {
                "status": "OUT_OF_SERVICE",
                "details": {
                    "other-grpc-services": "IDLE",
                    "test-grpc-services": "TRANSIENT_FAILURE"
                }
            },
            "livenessState": {
                "status": "UP"
            },
            "ping": {
                "status": "UP"
            },
            "readinessState": {
                "status": "UP"
            },
            "refreshScope": {
                "status": "UP"
            }
        },
        "groups": [
            "liveness",
            "readiness"
        ]
    }

     

     

     

    문제재현 4)

    그렇다면 데드라인이 적용된 상태에서 먼저 호출해서 성공한 이후 서비스 문제로 재호출에 실패하면 어떻게 될까?

    먼저 호출에 성공했을때는 다음과 같이 READY 상태로 응답하는 것을 확인할 수 있다. 



    그 다음의 서비스의 DeadLine Exception 발생을 확인하였다. 

     

    상태가 READY인 상태로 유지되는 것을 확인할 수 있었다!

     

     

     

     

     

    참고) 💡
    Deadline Exception을 테스트하고 싶을때에는 terminal에서 kubectl로 라우팅하다가 ctrl+z를 통해서 포트는 연결되어있지만, 실제로는 동작하지 않는 상태로(suspend), IO Exception을 테스트하고 싶을때는 ctrl+c를 통해서 실제 서비스를 종료(termination) 시켜버렸다. 
    - https://askubuntu.com/questions/510811/what-is-the-difference-between-ctrl-z-and-ctrl-c-in-the-terminal




     

     

     

    문제 재현 결과 

    처음에 연결이 이미 맺어지지 않아 상태값이 IDLE 일때는

    1. IO Exception을 받는 경우 (UNAVAILABLE) TRANSIENT_FAILURE 상태로 변경
    2. 데드라인 설정이 되어있고 파드 자체의 문제가 아닌 DeadlineExcepiton을 받는 경우 CONNECTING 상태로 변경

    으로 변경되는 것을 확인할 수 있었다.

    이미 연결이 맺어져 상태값이 READY 일때는

    1. IO Exception을 받는 경우 (UNAVAILABLE) TRANSIENT_FAILURE 상태로 변경
    2. 데드라인 설정이 되어있고 파드 자체의 문제가 아닌 DeadlineExcepiton을 받는 경우 READY 상태 유지

    으로 변경되는 것을 확인할 수 있었다.

     

    결론적으로 GrpcService가 DeadlineException을 발생시키는 경우 (이미 서비스가 요청은 받았고 timeout이 발생되는 것으로 판단되는 경우)에는 /actuator/health 체크에 실패하지 않고 성공하지만, IOException을 발생시키는 경우 곧바로 TRANSIENT_FAILURE 상태로 변경되어 헬스체크에 실패하는 것을 확인할 수 있었다.

     

    참고로 Deadline의 경우 기본적으로 설정이 없으면 무한정 기다리기 때문에  설정을 안해두면 긴 시간동안 메모리 등의 리소스를 사용하게 되어 서비스가 지연되거나, 심지어 서비스가 사용 불가능한 상태가 될 수 있다. 꼭 설정해두도록 하자. 

     

    Deadlines

    Explains how deadlines can be used to effectively deal with unreliable backends.

    grpc.io

     

    Deadlines on the Client

    By default, gRPC does not set a deadline which means it is possible for a client to end up waiting for a response effectively forever …

     

     

     

     

     

    문제 해결 

     

    그렇다면 문제를 어떻게 해결할 수 있을까?

     

     

    해결방안 1) actuator의 grpcChannel 사용하지 않기 

    application.yaml 파일에 grpcChannel 항목을 사용하지 않도록 설정할 수 있다. 

     

    When grpc client is deployed in Kubernetes, what is the usecase of HealthIndicator (actuator/health) that reports the service as

    The context I have deployed microservice on Kubernetes that has configured liveness probe that uses the spring actuator's endpoint to check the health of microservice. If the livenesss probe return...

    github.com

    management:
      health:
        grpcChannel:
          enabled: false

     

    하지만 해당 방법의 경우 grpcChannel의 항목을 사용하지 않아 grpcChannel상태를 확인하기 어렵다. 

     

     

    해결방안 2) GrpcClientHealthAutoConfiguration을 사용하지 않기 

    다음의 방안을 적용하여 서비스 헬스 상태를 OutOfService로 만들어버리는 GrpcClientHealthAutoConfiguration을 사용하지 않도록 설정할 수 있다. 

    위 공식문서에서 보았듯이, 해당 GrpcClientHealthAutoConfiguration 설정을 제외함으로서 문제를 해결할 수 있다. 

    // 1. config 파일을 통해 적용  
    @Configuration
    @EnableAutoConfiguration(exclude = {GrpcClientHealthAutoConfiguration.class})
    
    // 2. SpringApplication과 분리되어있지 않다면 다음과 같이도 적용 가능 
    @SpringApplication(exclude = {...})

     

    하지만, 해당 방법 또한 grpcChannel상태를 확인할 수 없다는 것은 동일하다. 

     

    /actuator/health의 호출 결과는 다음과 같다. (grpcChannel 상태정보가 없음) 

     

     

     

    해결방안 3) Custom GrpcClientHealthConfiguration을 만들기

    서비스의 health상태를 변경하지 않으면서 grpcChannel의 상태를 확인하고 싶다면 다음과 같이 custom한 GrpcClientHealthConfiguration을 만들어 적용하여 문제를 확인할 수 있다. 

    package com.yunhalee.grpcclient.config.client
    
    import com.google.common.collect.ImmutableMap
    import net.devh.boot.grpc.client.autoconfigure.GrpcClientAutoConfiguration
    import net.devh.boot.grpc.client.channelfactory.GrpcChannelFactory
    import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator
    import org.springframework.boot.actuate.health.Health
    import org.springframework.boot.actuate.health.HealthIndicator
    import org.springframework.boot.autoconfigure.AutoConfigureAfter
    import org.springframework.boot.autoconfigure.condition.ConditionalOnClass
    import org.springframework.context.annotation.Bean
    import org.springframework.context.annotation.Configuration
    import org.springframework.context.annotation.Lazy
    
    @Configuration(proxyBeanMethods = false)
    @AutoConfigureAfter(GrpcClientAutoConfiguration::class)
    @ConditionalOnEnabledHealthIndicator("grpcChannel")
    @ConditionalOnClass(name = ["org.springframework.boot.actuate.health.HealthIndicator"])
    class GrpcClientHealthConfiguration {
    
        // 기존에 grpcChannel이 하나라도 TRANSIENT_FAILURE 상태일 경우 outOfService로 설정했던 것과 다르게, up인 상태에서 grpc channel의 state만 actuator 정보에 추가
        @Bean
        @Lazy
        fun grpcChannelHealthIndicator(factory: GrpcChannelFactory): HealthIndicator {
            return HealthIndicator {
                val states = ImmutableMap.copyOf(factory.connectivityState)
                val health: Health.Builder = Health.up()
                health.withDetails(states).build()
            }
        }
    }

     

    /actuator/health 호출시 다음과 같이 service의 status는 UP이면서 grpcChannel 상태 정보를 확인할 수 있는 것을 볼 수 있다. 

     

     

     

     

     

    문제해결 끝-!! ✨✨✨✨✨

     

     

     

     

    참고한 사이트 ) ✨



     

     

    추가 참고 내용 

    +) 그렇다면 grpc 채널 상태 정보는 어떻게 설정되는 것일까?

     

    grpc-spring-boot-starter/grpc-client-spring-boot-autoconfigure/src/main/java/net/devh/boot/grpc/client/channelfactory/InProcessO

    Spring Boot starter module for gRPC framework. Contribute to yidongnan/grpc-spring-boot-starter development by creating an account on GitHub.

    github.com

     

    참고 문서를 찾아 ChannelFactory의 상태값은 inProcessChannelFactoryGrpcChannelFactory를 통해 상태값이 활용되는 것을 찾았다.

        @Override
        public Map<String, ConnectivityState> getConnectivityState() {
            return ImmutableMap.<String, ConnectivityState>builder()
                    .putAll(inProcessChannelFactory.getConnectivityState())
                    .putAll(alternativeChannelFactory.getConnectivityState())
                    .build();
        }

     

     

    InProcessChannelFactory의 경우 AbstractChannelFactory를 exteneds하고 있고

     

    grpc-spring-boot-starter/grpc-client-spring-boot-autoconfigure/src/main/java/net/devh/boot/grpc/client/channelfactory/InProcessC

    Spring Boot starter module for gRPC framework. Contribute to yidongnan/grpc-spring-boot-starter development by creating an account on GitHub.

    github.com

     

     

    AbstractChannelFactory 에서는 채널 상태를 다음과 같이 관리하고 있었다.

     

    grpc-spring-boot-starter/grpc-client-spring-boot-autoconfigure/src/main/java/net/devh/boot/grpc/client/channelfactory/AbstractCh

    Spring Boot starter module for gRPC framework. Contribute to yidongnan/grpc-spring-boot-starter development by creating an account on GitHub.

    github.com

     

     

      private final Map<String, ConnectivityState> channelStates = new ConcurrentHashMap<>();
    
        
        
        @Override
        public Map<String, ConnectivityState> getConnectivityState() {
            return Collections.unmodifiableMap(this.channelStates);
        }
    
    
        /**
         * Watch the given channel for connectivity changes.
         *
         * @param name The name of the channel in the state overview.
         * @param channel The channel to watch the state of.
         */
        protected void watchConnectivityState(final String name, final ManagedChannel channel) {
            final ConnectivityState state = channel.getState(false);
            this.channelStates.put(name, state);
            if (state != ConnectivityState.SHUTDOWN) {
                channel.notifyWhenStateChanged(state, () -> watchConnectivityState(name, channel));
            }
        }
        
        /**
         * Closes this channel factory and the channels created by this instance. The shutdown happens in two phases, first
         * an orderly shutdown is initiated on all channels and then the method waits for all channels to terminate. If the
         * channels don't have terminated after 60 seconds then they will be forcefully shutdown.
         */
        @Override
        @PreDestroy
        public synchronized void close() {
            if (this.shutdown) {
                return;
            }
            this.shutdown = true;
            final List<ShutdownRecord> shutdownEntries = new ArrayList<>();
            for (final Entry<String, ManagedChannel> entry : this.channels.entrySet()) {
                final ManagedChannel channel = entry.getValue();
                channel.shutdown();
                final long gracePeriod = this.properties.getChannel(entry.getKey()).getShutdownGracePeriod().toMillis();
                shutdownEntries.add(new ShutdownRecord(entry.getKey(), channel, gracePeriod));
            }
            try {
                final long start = System.currentTimeMillis();
                shutdownEntries.sort(comparingLong(ShutdownRecord::getGracePeriod));
    
                for (final ShutdownRecord entry : shutdownEntries) {
                    if (!entry.channel.isTerminated()) {
                        log.debug("Awaiting channel termination: {}", entry.name);
    
                        final long waitedTime = System.currentTimeMillis() - start;
                        final long waitTime = entry.gracePeriod - waitedTime;
    
                        if (waitTime > 0) {
                            entry.channel.awaitTermination(waitTime, MILLISECONDS);
                        }
                        entry.channel.shutdownNow();
                    }
                    log.debug("Completed channel termination: {}", entry.name);
                }
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                log.debug("We got interrupted - Speeding up shutdown process");
            } finally {
                for (final ManagedChannel channel : this.channels.values()) {
                    if (!channel.isTerminated()) {
                        log.debug("Channel not terminated yet - force shutdown now: {} ", channel);
                        channel.shutdownNow();
                    }
                }
            }
            final int channelCount = this.channels.size();
            this.channels.clear();
            this.channelStates.clear();
            log.debug("GrpcChannelFactory closed (including {} channels)", channelCount);
        }

     

     

    그런데 ManagedChannel의 경우 getState가 Deprecated 되어있어서 

      /**
       * Gets the current connectivity state. Note the result may soon become outdated.
       *
       * <p>Note that the core library did not provide an implementation of this method until v1.6.1.
       *
       * @param requestConnection if {@code true}, the channel will try to make a connection if it is
       *        currently IDLE
       * @throws UnsupportedOperationException if not supported by implementation
       * @since 1.1.0
       */
      @ExperimentalApi("https://github.com/grpc/grpc-java/issues/4359")
      public ConnectivityState getState(boolean requestConnection) {
        throw new UnsupportedOperationException("Not implemented");
      }

     

     

    그래서 다시 grpc 코드로 찾아갔다.

    다시 grpc 코드로 보면 ConnectivityStateInfo 라는 클래스가 있고 여기서 state를 관리하는 것으로 확인하였다.

     

    grpc-java/api/src/main/java/io/grpc/ConnectivityStateInfo.java at master · grpc/grpc-java

    The Java gRPC implementation. HTTP/2 based RPC. Contribute to grpc/grpc-java development by creating an account on GitHub.

    github.com

     

    package io.grpc;
    
    import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
    
    import com.google.common.base.Preconditions;
    
    /**
     * A tuple of a {@link ConnectivityState} and its associated {@link Status}.
     *
     * <p>If the state is {@code TRANSIENT_FAILURE}, the status is never {@code OK}.  For other states,
     * the status is always {@code OK}.
     */
    @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771")
    public final class ConnectivityStateInfo {
      private final ConnectivityState state;
      private final Status status;
    
      /**
       * Returns an instance for a state that is not {@code TRANSIENT_FAILURE}.
       *
       * @throws IllegalArgumentException if {@code state} is {@code TRANSIENT_FAILURE}.
       */
      public static ConnectivityStateInfo forNonError(ConnectivityState state) {
        Preconditions.checkArgument(
            state != TRANSIENT_FAILURE,
            "state is TRANSIENT_ERROR. Use forError() instead");
        return new ConnectivityStateInfo(state, Status.OK);
      }
    
      /**
       * Returns an instance for {@code TRANSIENT_FAILURE}, associated with an error status.
       */
      public static ConnectivityStateInfo forTransientFailure(Status error) {
        Preconditions.checkArgument(!error.isOk(), "The error status must not be OK");
        return new ConnectivityStateInfo(TRANSIENT_FAILURE, error);
      }
    
      /**
       * Returns the state.
       */
      public ConnectivityState getState() {
        return state;
      }
    
      /**
       * Returns the status associated with the state.
       *
       * <p>If the state is {@code TRANSIENT_FAILURE}, the status is never {@code OK}.  For other
       * states, the status is always {@code OK}.
       */
      public Status getStatus() {
        return status;
      }
    
      @Override
      public boolean equals(Object other) {
        if (!(other instanceof ConnectivityStateInfo)) {
          return false;
        }
        ConnectivityStateInfo o = (ConnectivityStateInfo) other;
        return state.equals(o.state) && status.equals(o.status);
      }
    
      @Override
      public int hashCode() {
        return state.hashCode() ^ status.hashCode();
      }
    
      @Override
      public String toString() {
        if (status.isOk()) {
          return state.toString();
        }
        return state + "(" + status + ")";
      }
    
      private ConnectivityStateInfo(ConnectivityState state, Status status) {
        this.state = Preconditions.checkNotNull(state, "state is null");
        this.status = Preconditions.checkNotNull(status, "status is null");
      }
    }

     


    그럼 헬스체크에 관한 상태 변경은 누가할까?

    ManageChannelImpl를 살펴보았다.

     

    grpc-java/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java at e58c998a42d07a8745cfd5ed19f4e211a4400938 · grpc/grpc-j

    The Java gRPC implementation. HTTP/2 based RPC. Contribute to grpc/grpc-java development by creating an account on GitHub.

    github.com

     

     

    여기서 다음과 같이 idle 타임을 종료할 때, CONNECTING 상태로 변경하고 idle 타임을 시작할 때 IDLE 상태로 변경하고 서비스 종료시 SHUTDOWN 상태로 변경 및 로드밸런싱의 요청에 따라서 상태를 변경하는 것을 확인할 수 있었다.

     

      @VisibleForTesting
      void exitIdleMode() {
        syncContext.throwIfNotInThisSynchronizationContext();
        if (shutdown.get() || panicMode) {
          return;
        }
        if (inUseStateAggregator.isInUse()) {
          // Cancel the timer now, so that a racing due timer will not put Channel on idleness
          // when the caller of exitIdleMode() is about to use the returned loadBalancer.
          cancelIdleTimer(false);
        } else {
          // exitIdleMode() may be called outside of inUseStateAggregator.handleNotInUse() while
          // isInUse() == false, in which case we still need to schedule the timer.
          rescheduleIdleTimer();
        }
        if (lbHelper != null) {
          return;
        }
        channelLogger.log(ChannelLogLevel.INFO, "Exiting idle mode");
        LbHelperImpl lbHelper = new LbHelperImpl();
        lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
        // Delay setting lbHelper until fully initialized, since loadBalancerFactory is user code and
        // may throw. We don't want to confuse our state, even if we will enter panic mode.
        this.lbHelper = lbHelper;
    
        channelStateManager.gotoState(CONNECTING);
        NameResolverListener listener = new NameResolverListener(lbHelper, nameResolver);
        nameResolver.start(listener);
        nameResolverStarted = true;
      }
      
      
      // Must be run from syncContext
      private void enterIdleMode() {
        // nameResolver and loadBalancer are guaranteed to be non-null.  If any of them were null,
        // either the idleModeTimer ran twice without exiting the idle mode, or the task in shutdown()
        // did not cancel idleModeTimer, or enterIdle() ran while shutdown or in idle, all of
        // which are bugs.
        shutdownNameResolverAndLoadBalancer(true);
        delayedTransport.reprocess(null);
        channelLogger.log(ChannelLogLevel.INFO, "Entering IDLE state");
        channelStateManager.gotoState(IDLE);
        // If the inUseStateAggregator still considers pending calls to be queued up or the delayed
        // transport to be holding some we need to exit idle mode to give these calls a chance to
        // be processed.
        if (inUseStateAggregator.anyObjectInUse(pendingCallsInUseObject, delayedTransport)) {
          exitIdleMode();
        }
      }
    
    
      /**
       * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately
       * cancelled.
       */
      @Override
      public ManagedChannelImpl shutdown() {
        channelLogger.log(ChannelLogLevel.DEBUG, "shutdown() called");
        if (!shutdown.compareAndSet(false, true)) {
          return this;
        }
        final class Shutdown implements Runnable {
          @Override
          public void run() {
            channelLogger.log(ChannelLogLevel.INFO, "Entering SHUTDOWN state");
            channelStateManager.gotoState(SHUTDOWN);
          }
        }
    
        syncContext.execute(new Shutdown());
        realChannel.shutdown();
        final class CancelIdleTimer implements Runnable {
          @Override
          public void run() {
            cancelIdleTimer(/* permanent= */ true);
          }
        }
    
        syncContext.execute(new CancelIdleTimer());
        return this;
      }
    
    
      // Called from syncContext
      @VisibleForTesting
      void panic(final Throwable t) {
        if (panicMode) {
          // Preserve the first panic information
          return;
        }
        panicMode = true;
        try {
          cancelIdleTimer(/* permanent= */ true);
          shutdownNameResolverAndLoadBalancer(false);
        } finally {
          updateSubchannelPicker(new LoadBalancer.FixedResultPicker(PickResult.withDrop(
              Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t))));
          realChannel.updateConfigSelector(null);
          channelLogger.log(ChannelLogLevel.ERROR, "PANIC! Entering TRANSIENT_FAILURE");
          channelStateManager.gotoState(TRANSIENT_FAILURE);
        }
      }
      
      
      
      
        @Override
        public void updateBalancingState(
            final ConnectivityState newState, final SubchannelPicker newPicker) {
          syncContext.throwIfNotInThisSynchronizationContext();
          checkNotNull(newState, "newState");
          checkNotNull(newPicker, "newPicker");
          final class UpdateBalancingState implements Runnable {
            @Override
            public void run() {
              if (LbHelperImpl.this != lbHelper || panicMode) {
                return;
              }
              updateSubchannelPicker(newPicker);
              // It's not appropriate to report SHUTDOWN state from lb.
              // Ignore the case of newState == SHUTDOWN for now.
              if (newState != SHUTDOWN) {
                channelLogger.log(
                    ChannelLogLevel.INFO, "Entering {0} state with picker: {1}", newState, newPicker);
                channelStateManager.gotoState(newState);
              }
            }
          }
    
          syncContext.execute(new UpdateBalancingState());
        }

     

     

     

    위에서 상태를 업데이트하는 것updateBalancingState 함수를 사용하는 밸런서 구현체에 따라 달라지는 것으로 보여졌다.

     

    GitHub · Build and ship software on a single, collaborative platform

    Join the world's most widely adopted, AI-powered developer platform where millions of developers, businesses, and the largest open source community build software that advances humanity.

    github.com

     

    GrpcClient의 경우 따로 채널을 설정해주지 않으면 pick_first 방식을 사용하는것으로 확인하였다.

     

    Custom Load Balancing Policies

    Explains how custom load balancing policies can help optimize load balancing under unique circumstances.

    grpc.io

     

    PickFirstLoadBalancer는 다음과 같았다.

     

    grpc-java/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java at e58c998a42d07a8745cfd5ed19f4e211a4400938 · grpc/grp

    The Java gRPC implementation. HTTP/2 based RPC. Contribute to grpc/grpc-java development by creating an account on GitHub.

    github.com

    @Override
      public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
        List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();
        if (servers.isEmpty()) {
          Status unavailableStatus = Status.UNAVAILABLE.withDescription(
                  "NameResolver returned no usable address. addrs=" + resolvedAddresses.getAddresses()
                          + ", attrs=" + resolvedAddresses.getAttributes());
          handleNameResolutionError(unavailableStatus);
          return unavailableStatus;
        }
    
        // We can optionally be configured to shuffle the address list. This can help better distribute
        // the load.
        if (resolvedAddresses.getLoadBalancingPolicyConfig() instanceof PickFirstLoadBalancerConfig) {
          PickFirstLoadBalancerConfig config
              = (PickFirstLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
          if (config.shuffleAddressList != null && config.shuffleAddressList) {
            servers = new ArrayList<EquivalentAddressGroup>(servers);
            Collections.shuffle(servers,
                config.randomSeed != null ? new Random(config.randomSeed) : new Random());
          }
        }
    
        if (subchannel == null) {
          final Subchannel subchannel = helper.createSubchannel(
              CreateSubchannelArgs.newBuilder()
                  .setAddresses(servers)
                  .build());
          subchannel.start(new SubchannelStateListener() {
              @Override
              public void onSubchannelState(ConnectivityStateInfo stateInfo) {
                processSubchannelState(subchannel, stateInfo);
              }
            });
          this.subchannel = subchannel;
    
          // The channel state does not get updated when doing name resolving today, so for the moment
          // let LB report CONNECTION and call subchannel.requestConnection() immediately.
          updateBalancingState(CONNECTING, new Picker(PickResult.withSubchannel(subchannel)));
          subchannel.requestConnection();
        } else {
          subchannel.updateAddresses(servers);
        }
    
        return Status.OK;
      }
    
    
      @Override
      public void handleNameResolutionError(Status error) {
        if (subchannel != null) {
          subchannel.shutdown();
          subchannel = null;
        }
    
        // NB(lukaszx0) Whether we should propagate the error unconditionally is arguable. It's fine
        // for time being.
        updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error)));
      }
      
        private void updateBalancingState(ConnectivityState state, SubchannelPicker picker) {
        currentState = state;
        helper.updateBalancingState(state, picker);
      }

     

    또다른 예시로 rount_robinRoundRobinLoadBalancer에서 상태를 바꾸는 쪽은 다음과 같았다.

      /**
       * Updates picker with the list of active subchannels (state == READY).
       */
      @Override
      protected void updateOverallBalancingState() {
        List<ChildLbState> activeList = getReadyChildren();
        if (activeList.isEmpty()) {
          // No READY subchannels
    
          // RRLB will request connection immediately on subchannel IDLE.
          boolean isConnecting = false;
          for (ChildLbState childLbState : getChildLbStates()) {
            ConnectivityState state = childLbState.getCurrentState();
            if (state == CONNECTING || state == IDLE) {
              isConnecting = true;
              break;
            }
          }
    
          if (isConnecting) {
            updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult()));
          } else {
            updateBalancingState(TRANSIENT_FAILURE, createReadyPicker(getChildLbStates()));
          }
        } else {
          updateBalancingState(READY, createReadyPicker(activeList));
        }
      }
      
      
      private void updateBalancingState(ConnectivityState state, SubchannelPicker picker) {
        if (state != currentConnectivityState || !picker.equals(currentPicker)) {
          getHelper().updateBalancingState(state, picker);
          currentConnectivityState = state;
          currentPicker = picker;
        }
      }

     

     

     

     

     

    +) 추가적으로 GrpcChannel의 actuator metric 정보는 어떻게 받을까?

     

    Spring Boot Actuator Support

    Spring Boot starter module for gRPC framework.

    yidongnan.github.io

     

    GrpcClientMetricAutoConfigurationGrpcServerMetricAutoConfiguration 을 통해서 수집되고 있다.

    /**
     * Auto configuration class for Spring-Boot. This allows zero config client metrics for gRPC services.
     *
     * @author Daniel Theuke (daniel.theuke@heuboe.de)
     */
    @Configuration(proxyBeanMethods = false)
    @AutoConfigureAfter(CompositeMeterRegistryAutoConfiguration.class)
    @AutoConfigureBefore(GrpcClientAutoConfiguration.class)
    @ConditionalOnBean(MeterRegistry.class)
    public class GrpcClientMetricAutoConfiguration {
    
        /**
         * Creates a {@link ClientInterceptor} that collects metrics about incoming and outgoing requests and responses.
         *
         * @param registry The registry used to create the metrics.
         * @return The newly created MetricCollectingClientInterceptor bean.
         */
        @Bean
        @ConditionalOnMissingBean
        public MetricCollectingClientInterceptor metricCollectingClientInterceptor(final MeterRegistry registry) {
            return new MetricCollectingClientInterceptor(registry);
        }
    
    }
    
    
    /**
     * A gRPC client interceptor that will collect metrics for micrometer.
     *
     * @author Daniel Theuke (daniel.theuke@heuboe.de)
     */
    @GrpcGlobalClientInterceptor
    @Order(InterceptorOrder.ORDER_TRACING_METRICS)
    public class MetricCollectingClientInterceptor extends AbstractMetricCollectingInterceptor
            implements ClientInterceptor {
    
        /**
         * Creates a new gRPC client interceptor that will collect metrics into the given {@link MeterRegistry}.
         *
         * @param registry The registry to use.
         */
        @Autowired
        public MetricCollectingClientInterceptor(final MeterRegistry registry) {
            super(registry);
        }
    
        /**
         * Creates a new gRPC client interceptor that will collect metrics into the given {@link MeterRegistry} and uses the
         * given customizer to configure the {@link Counter}s and {@link Timer}s.
         *
         * @param registry The registry to use.
         * @param counterCustomizer The unary function that can be used to customize the created counters.
         * @param timerCustomizer The unary function that can be used to customize the created timers.
         * @param eagerInitializedCodes The status codes that should be eager initialized.
         */
        public MetricCollectingClientInterceptor(final MeterRegistry registry,
                final UnaryOperator<Counter.Builder> counterCustomizer,
                final UnaryOperator<Timer.Builder> timerCustomizer, final Code... eagerInitializedCodes) {
            super(registry, counterCustomizer, timerCustomizer, eagerInitializedCodes);
        }
    
        @Override
        protected Counter newRequestCounterFor(final MethodDescriptor<?, ?> method) {
            return this.counterCustomizer.apply(
                    prepareCounterFor(method,
                            METRIC_NAME_CLIENT_REQUESTS_SENT,
                            "The total number of requests sent"))
                    .register(this.registry);
        }
    
        @Override
        protected Counter newResponseCounterFor(final MethodDescriptor<?, ?> method) {
            return this.counterCustomizer.apply(
                    prepareCounterFor(method,
                            METRIC_NAME_CLIENT_RESPONSES_RECEIVED,
                            "The total number of responses received"))
                    .register(this.registry);
        }
    
        @Override
        protected Function<Code, Timer> newTimerFunction(final MethodDescriptor<?, ?> method) {
            return asTimerFunction(() -> this.timerCustomizer.apply(
                    prepareTimerFor(method,
                            METRIC_NAME_CLIENT_PROCESSING_DURATION,
                            "The total time taken for the client to complete the call, including network delay")));
        }
    
        @Override
        public <Q, A> ClientCall<Q, A> interceptCall(
                final MethodDescriptor<Q, A> methodDescriptor,
                final CallOptions callOptions,
                final Channel channel) {
    
            final MetricSet metrics = metricsFor(methodDescriptor);
            final Consumer<Code> processingDurationTiming = metrics.newProcessingDurationTiming(this.registry);
    
            return new MetricCollectingClientCall<>(
                    channel.newCall(methodDescriptor, callOptions),
                    metrics.getRequestCounter(),
                    metrics.getResponseCounter(),
                    processingDurationTiming);
        }
    
    }

     

    @Slf4j
    @Configuration(proxyBeanMethods = false)
    @AutoConfigureAfter(CompositeMeterRegistryAutoConfiguration.class)
    @AutoConfigureBefore(GrpcServerAutoConfiguration.class)
    @ConditionalOnBean(MeterRegistry.class)
    public class GrpcServerMetricAutoConfiguration {
    
        @Bean
        @ConditionalOnMissingBean
        public MetricCollectingServerInterceptor metricCollectingServerInterceptor(final MeterRegistry registry,
                final Collection<BindableService> services) {
            final MetricCollectingServerInterceptor metricCollector = new MetricCollectingServerInterceptor(registry);
            log.debug("Pre-Registering service metrics");
            for (final BindableService service : services) {
                log.debug("- {}", service);
                metricCollector.preregisterService(service);
            }
            return metricCollector;
        }
    
        @Bean
        @Lazy
        InfoContributor grpcInfoContributor(final GrpcServerProperties properties,
                final Collection<BindableService> grpcServices) {
            final Map<String, Object> details = new LinkedHashMap<>();
            details.put("port", properties.getPort());
    
            if (properties.isReflectionServiceEnabled()) {
                // Only expose services via web-info if we do the same via grpc.
                final Map<String, List<String>> services = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
                details.put("services", services);
                for (final BindableService grpcService : grpcServices) {
                    final ServiceDescriptor serviceDescriptor = grpcService.bindService().getServiceDescriptor();
    
                    final List<String> methods = collectMethodNamesForService(serviceDescriptor);
                    services.put(serviceDescriptor.getName(), methods);
                }
            }
    
            return new SimpleInfoContributor("grpc.server", details);
        }
    
        /**
         * Gets all method names from the given service descriptor.
         *
         * @param serviceDescriptor The service descriptor to get the names from.
         * @return The newly created and sorted list of the method names.
         */
        protected List<String> collectMethodNamesForService(final ServiceDescriptor serviceDescriptor) {
            final List<String> methods = new ArrayList<>();
            for (final MethodDescriptor<?, ?> grpcMethod : serviceDescriptor.getMethods()) {
                methods.add(extractMethodName(grpcMethod));
            }
            methods.sort(String.CASE_INSENSITIVE_ORDER);
            return methods;
        }
    
    }

     

     

     

    추가조사까지 끝 ✨✨✨

Designed by Tistory.