Basic tutorial から一歩進むPythonでのgRPC
アソビュー! Advent Calendar 2021の11日目です。
こんにちは、アソビュー!SREチームのkirimaruです。
夏休みの宿題は追い込まれてからやるタイプです。(執筆日12月10日)
ただここから更に本番稼働させていく際に欲しい機能がいくつかあり、それらが上記に内包されていないので本記事ではそこに触れていこうと思います。
具体的にはserver側の実装で下記3つの機能になります。
- Server Reflection
- Health Check
- Graceful Shutdown
基本的なserverの実装は上記のtutorialにあるこちらを参考にします。
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
route_guide_pb2_grpc.add_RouteGuideServicer_to_server(
RouteGuideServicer(), server)
server.add_insecure_port('[::]:50051')
server.start()
server.wait_for_termination()
Server Reflection
gRPC Server Reflectionは、対象のgRPCサーバーの情報を参照することができるようになる機能です。これを有効にするとgRPCurl や Evans といったツールを使う際に手元にprotoファイルがなくてもリクエストを送ることができます。
PythonのgRPC Serverではデフォルトでこの機能が有効になっていないため、明示的に有効化する必要があります。
grpcio-reflectionで提供しているので、これを各サービスに対して有効化するだけで簡単に利用することができるようになります。
from grpc_reflection.v1alpha import reflectionservices = tuple(
service.full_name for service in
route_guide_pb2.DESCRIPTOR.services_by_name.values())
reflection.enable_server_reflection(services, server)
Health Check
アソビューではgRPCサーバーのロードバランシングにenvoyを使っているため、envoyからのHealth Checkを行う必要がありました。
他社様の記事で恐縮ですがこちらの記事と同様の構成になっています。
余談ですがkubernetesのlivenessやreadinessはtcpでも実施可能です。
別途httpサーバーを起動している場合はそちらでやるのもいいかもしれません。
livenessProbe:
tcpSocket:
port: 50051
readinessProbe:
tcpSocket:
port: 50051
実装はGoogleがHealth Check用のprotoを提供しているのでそちらを利用します。
まずはサービスの実装のインスタンス化とサーバーへの登録を行います。
from grpc_health.v1 import health
from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpchealth_servicer = health.HealthServicer(
experimental_non_blocking=True,
experimental_thread_pool=
futures.ThreadPoolExecutor(max_workers=100))health_pb2_grpc.add_HealthServicer_to_server(
health_servicer, server)
これで起動後にステータスを参照できるようになったので、サーバー起動前に起動時のステータスを設定します。先程のサービス一覧をとっている変数にreflectionとhealthのサービスも追加しておきます。
services = tuple(
service.full_name for service in
route_guide_pb2.DESCRIPTOR.services_by_name.values())
+ (reflection.SERVICE_NAME, health.SERVICE_NAME)for service in services:
health_servicer.set(
service, health_pb2.HealthCheckResponse.SERVING)
Graceful Shutdown
最後にデプロイ等に備えてGraceful Shutdownを実装していきます。wait_for_termination
だけではSIGTERMが送られた場合処理中の処理がすべて中断され、そのまま停止してしまいます。そこでこれに備えた処理を実装する必要があります。実装に関しては下記が非常に参考になりました。
done = threading.Event()
def on_done(signum, frame):
logger.info('Got signal {}, {}'.format(signum, frame))
done.set()
signal.signal(signal.SIGTERM, on_done)
done.wait()logger.info('Stopped RPC server, Waiting for RPCs to complete...')
server.stop(NUM_SECS_TO_WAIT)
server#stopでは指定した秒数で実行中の処理を待ってくれます。
ただこの処理にも注意点があり、PythonのgRPCサーバーはstop処理中でも新規コネクションを受け付けてしまいます。
そこで、Health Checkのステータスを明示的にNOT_SERVINGにして、Health Checkを行っている箇所から対象として除外させるようにします。
また、kubernetes環境で実行しているためserviceの対象から外れるまでNOT_SERVINGにした後に多少の待ち時間を入れています。
done = threading.Event()
def on_done(signum, frame):
logger.info('Got signal {}, {}'.format(signum, frame))
done.set()
signal.signal(signal.SIGTERM, on_done)
done.wait()health_servicer.enter_graceful_shutdown()
server.wait_for_termination(10)logger.info('Stopped RPC server, Waiting for RPCs to complete...')
server.stop(NUM_SECS_TO_WAIT)
まとめ
ここまでの実装をまとめたものが下記になります。
最初に比べると随分と長くなりました。他にも運用していく中で出てくることがあると思うので、出てきた段階で追記できればと思います。参考になれば幸いです。