In the previous chapter we looked at receiveConfigNotifications function that is responsible for receiving configuration notifications from the NDK. We saw that it starts with creating the notification stream - by calling a.StartConfigNotificationStream(ctx) - and then starts receiving notifications from it.
Let's see how the notification stream is created and how we receive notifications from it.
func(a*App)createNotificationStream(ctxcontext.Context)uint64{retry:=time.NewTicker(a.retryTimeout)for{// get subscription and streamIDnotificationResponse,err:=a.SDKMgrServiceClient.NotificationRegister(ctx,&ndk.NotificationRegisterRequest{Op:ndk.NotificationRegisterRequest_Create,})iferr!=nil||notificationResponse.GetStatus()!=ndk.SdkMgrStatus_kSdkMgrSuccess{a.logger.Printf("agent %q could not register for notifications: %v. Status: %s",a.Name,err,notificationResponse.GetStatus().String())a.logger.Printf("agent %q retrying in %s",a.Name,a.retryTimeout)<-retry.C// retry timercontinue}returnnotificationResponse.GetStreamId()}}
The function tries to create a notification stream for the greeter application with a retry timeout and returns the allocated Stream ID when it succeeds. The Stream ID is later used to request notification delivery of a specific type, which is in our case the Config Notification.
With the notification stream created, we now request the NDK to deliver updates of our app's configuration. These are the updates made to the config tree of the greeter app, and it has only one configurable field - name leaf.
func(a*App)addConfigSubscription(ctxcontext.Context,streamIDuint64){// create notification register request for Config service// using acquired stream IDnotificationRegisterReq:=&ndk.NotificationRegisterRequest{Op:ndk.NotificationRegisterRequest_AddSubscription,StreamId:streamID,SubscriptionTypes:&ndk.NotificationRegisterRequest_Config{// config serviceConfig:&ndk.ConfigSubscriptionRequest{},},}registerResp,err:=a.SDKMgrServiceClient.NotificationRegister(ctx,notificationRegisterReq)iferr!=nil||registerResp.GetStatus()!=ndk.SdkMgrStatus_kSdkMgrSuccess{a.logger.Printf("agent %s failed registering to notification with req=%+v: %v",a.Name,notificationRegisterReq,err)}}
To indicate that we want to receive config notifications over the created notification stream we have to craft the NotificationRegisterRequest. We populate it with the streamID received after creating the notification stream to specify the stream we want to receive the notifications on.
The SubscriptionTypes set to the &ndk.NotificationRegisterRequest_Config value indicates that we would like to receive updates of this specific type as they convey configuration updates.
And we pass the empty ConfigSubscriptionRequest request since we don't want to apply any filtering on the notifications we receive.
Executing NotificationRegister function of the SDKMgrServiceClient with notification Stream ID and NotificationRegisterRequest effectively tells NDK about our intention to receive Config messages.
The last bits in the StartConfigNotificationStream function create a Go channel1 of type NotificationStreamResponse and pass it to the startNotificationStream function that is started in its own goroutine. Here is the startNotificationStream function:
greeter/notification.go
func(a*App)startNotificationStream(ctxcontext.Context,streamIDuint64,subscTypestring,streamChanchan*ndk.NotificationStreamResponse,){deferclose(streamChan)a.logger.Info().Uint64("stream-id",streamID).Str("subscription-type",subscType).Msg("Starting streaming notifications")retry:=time.NewTicker(a.retryTimeout)streamClient:=a.getNotificationStreamClient(ctx,streamID)for{select{case<-ctx.Done():returndefault:streamResp,err:=streamClient.Recv()iferr==io.EOF{a.logger.Printf("agent %s received EOF for stream %v",a.Name,subscType)a.logger.Printf("agent %s retrying in %s",a.Name,a.retryTimeout)<-retry.C// retry timercontinue}iferr!=nil{a.logger.Printf("agent %s failed to receive notification: %v",a.Name,err)<-retry.C// retry timercontinue}streamChan<-streamResp}}}
Let's have a look at the two major parts of the function - creating the streaming client and receiving notifications.
The function starts with creating a Notification Stream Client with a.getNotificationStreamClient(ctx, req) function call. This client is a pure gRPC construct, it is automatically generated from the gRPC service proto file and facilitates the streaming of notifications.
greeter/notification.go
func(a*App)getNotificationStreamClient(ctxcontext.Context,streamIDuint64)ndk.SdkNotificationService_NotificationStreamClient{retry:=time.NewTicker(a.retryTimeout)for{streamClient,err:=a.NotificationServiceClient.NotificationStream(ctx,&ndk.NotificationStreamRequest{StreamId:streamID,})iferr!=nil{a.logger.Info().Msgf("agent %s failed creating stream client with stream ID=%d: %v",a.Name,streamID,err)a.logger.Printf("agent %s retrying in %s",a.Name,a.retryTimeout)time.Sleep(a.retryTimeout)<-retry.C// retry timercontinue}returnstreamClient}}
Coming back to our startNotificationStream function, we can see that it loops over the notifications received from the NDK until the parent context is cancelled. The streamClient.Recv() function call is a blocking call that waits for the next notification to be streamed from the NDK.
greeter/notification.go
func(a*App)startNotificationStream(ctxcontext.Context,streamIDuint64,subscTypestring,streamChanchan*ndk.NotificationStreamResponse,){deferclose(streamChan)a.logger.Info().Uint64("stream-id",streamID).Str("subscription-type",subscType).Msg("Starting streaming notifications")retry:=time.NewTicker(a.retryTimeout)streamClient:=a.getNotificationStreamClient(ctx,streamID)for{select{case<-ctx.Done():returndefault:streamResp,err:=streamClient.Recv()iferr==io.EOF{a.logger.Printf("agent %s received EOF for stream %v",a.Name,subscType)a.logger.Printf("agent %s retrying in %s",a.Name,a.retryTimeout)<-retry.C// retry timercontinue}iferr!=nil{a.logger.Printf("agent %s failed to receive notification: %v",a.Name,err)<-retry.C// retry timercontinue}streamChan<-streamResp}}}
When the notification is received, it is passed to the streamChan channel. On the receiving end of this channel is our app's Start function that starts the aggregateConfigNotifications function for each received notification.
Stream Response Type
If you wonder what type the notifications are, it solely depends on the type of subscriptions we added on the notification stream. In our case, we only added the Config subscription, so the notifications we receive will be backed by the ConfigNotification type.
Since the Notification Client can transport notifications of different types, the notification type is hidden behind the NotificationStreamResponse type. The NotificationStreamResponse embeds the Notification message that can be one of the following types: