Skip to content

Notification Stream#

In the previous chapter we looked at receiveConfigNotifications function that is responsible for receiving configuration notifications from the NDK. We saw that it starts with creating the notification stream - by calling a.StartConfigNotificationStream(ctx) - and then starts receiving notifications from it.

Let's see how the notification stream is created and how we receive notifications from it.

greeter/notification.go
func (a *App) StartConfigNotificationStream(ctx context.Context) chan *ndk.NotificationStreamResponse {
    streamID := a.createNotificationStream(ctx)

    a.logger.Info().
        Uint64("stream-id", streamID).
        Msg("Notification stream created")

    a.addConfigSubscription(ctx, streamID)

    streamChan := make(chan *ndk.NotificationStreamResponse)
    go a.startNotificationStream(ctx, streamID,
        "config", streamChan)

    return streamChan
}

The StartConfigNotificationStream performs three major tasks:

  1. Create the notification stream and associated Stream ID
  2. Add the Config subscription to the allocated notification stream
  3. Creates the streaming client and starts sending received notifications to the streamChan channel

Wouldn't hurt to have a look at each of these tasks in more detail.

Creating Notification Stream#

First, on line 2, we create a notification stream as explained in the Creating Notification Stream section.

greeter/notification.go
func (a *App) createNotificationStream(ctx context.Context) uint64 {
    retry := time.NewTicker(a.retryTimeout)

    for {
        // get subscription and streamID
        notificationResponse, err := a.SDKMgrServiceClient.NotificationRegister(ctx,
            &ndk.NotificationRegisterRequest{
                Op: ndk.NotificationRegisterRequest_Create,
            })
        if err != nil || notificationResponse.GetStatus() != ndk.SdkMgrStatus_kSdkMgrSuccess {
            a.logger.Printf("agent %q could not register for notifications: %v. Status: %s",
                a.Name, err, notificationResponse.GetStatus().String())
            a.logger.Printf("agent %q retrying in %s", a.Name, a.retryTimeout)

            <-retry.C // retry timer
            continue
        }

        return notificationResponse.GetStreamId()
    }
}

The function tries to create a notification stream for the greeter application with a retry timeout and returns the allocated Stream ID when it succeeds. The Stream ID is later used to request notification delivery of a specific type, which is in our case the Config Notification.

Adding Config Subscription#

With the notification stream created, we now request the NDK to deliver updates of our app's configuration. These are the updates made to the config tree of the greeter app, and it has only one configurable field - name leaf.

This is done in the a.addConfigSubscription(ctx, streamID) function.

greeter/notification.go
func (a *App) addConfigSubscription(ctx context.Context, streamID uint64) {
    // create notification register request for Config service
    // using acquired stream ID
    notificationRegisterReq := &ndk.NotificationRegisterRequest{
        Op:       ndk.NotificationRegisterRequest_AddSubscription,
        StreamId: streamID,
        SubscriptionTypes: &ndk.NotificationRegisterRequest_Config{ // config service
            Config: &ndk.ConfigSubscriptionRequest{},
        },
    }

    registerResp, err := a.SDKMgrServiceClient.NotificationRegister(ctx, notificationRegisterReq)
    if err != nil || registerResp.GetStatus() != ndk.SdkMgrStatus_kSdkMgrSuccess {
        a.logger.Printf("agent %s failed registering to notification with req=%+v: %v",
            a.Name, notificationRegisterReq, err)
    }
}

To indicate that we want to receive config notifications over the created notification stream we have to craft the NotificationRegisterRequest. We populate it with the streamID received after creating the notification stream to specify the stream we want to receive the notifications on.

The SubscriptionTypes set to the &ndk.NotificationRegisterRequest_Config value indicates that we would like to receive updates of this specific type as they convey configuration updates.

And we pass the empty ConfigSubscriptionRequest request since we don't want to apply any filtering on the notifications we receive.

Executing NotificationRegister function of the SDKMgrServiceClient with notification Stream ID and NotificationRegisterRequest effectively tells NDK about our intention to receive Config messages.

It is time to start the notification stream.

Starting Notification Stream#

The last bits in the StartConfigNotificationStream function create a Go channel1 of type NotificationStreamResponse and pass it to the startNotificationStream function that is started in its own goroutine. Here is the startNotificationStream function:

greeter/notification.go
func (a *App) startNotificationStream(ctx context.Context,
    streamID uint64,
    subscType string,
    streamChan chan *ndk.NotificationStreamResponse,
) {
    defer close(streamChan)

    a.logger.Info().
        Uint64("stream-id", streamID).
        Str("subscription-type", subscType).
        Msg("Starting streaming notifications")

    retry := time.NewTicker(a.retryTimeout)
    streamClient := a.getNotificationStreamClient(ctx, streamID)

    for {
        select {
        case <-ctx.Done():
            return
        default:
            streamResp, err := streamClient.Recv()
            if err == io.EOF {
                a.logger.Printf("agent %s received EOF for stream %v", a.Name, subscType)
                a.logger.Printf("agent %s retrying in %s", a.Name, a.retryTimeout)

                <-retry.C // retry timer
                continue
            }
            if err != nil {
                a.logger.Printf("agent %s failed to receive notification: %v", a.Name, err)

                <-retry.C // retry timer
                continue
            }
            streamChan <- streamResp
        }
    }
}

Let's have a look at the two major parts of the function - creating the streaming client and receiving notifications.

Stream Client#

The function starts with creating a Notification Stream Client with a.getNotificationStreamClient(ctx, req) function call. This client is a pure gRPC construct, it is automatically generated from the gRPC service proto file and facilitates the streaming of notifications.

greeter/notification.go
func (a *App) getNotificationStreamClient(ctx context.Context, streamID uint64) ndk.SdkNotificationService_NotificationStreamClient {
    retry := time.NewTicker(a.retryTimeout)

    for {
        streamClient, err := a.NotificationServiceClient.NotificationStream(ctx,
            &ndk.NotificationStreamRequest{
                StreamId: streamID,
            })
        if err != nil {
            a.logger.Info().Msgf("agent %s failed creating stream client with stream ID=%d: %v", a.Name, streamID, err)
            a.logger.Printf("agent %s retrying in %s", a.Name, a.retryTimeout)
            time.Sleep(a.retryTimeout)

            <-retry.C // retry timer
            continue
        }

        return streamClient
    }
}

Receiving Notifications#

Coming back to our startNotificationStream function, we can see that it loops over the notifications received from the NDK until the parent context is cancelled. The streamClient.Recv() function call is a blocking call that waits for the next notification to be streamed from the NDK.

greeter/notification.go
func (a *App) startNotificationStream(ctx context.Context,
    streamID uint64,
    subscType string,
    streamChan chan *ndk.NotificationStreamResponse,
) {
    defer close(streamChan)

    a.logger.Info().
        Uint64("stream-id", streamID).
        Str("subscription-type", subscType).
        Msg("Starting streaming notifications")

    retry := time.NewTicker(a.retryTimeout)
    streamClient := a.getNotificationStreamClient(ctx, streamID)

    for {
        select {
        case <-ctx.Done():
            return
        default:
            streamResp, err := streamClient.Recv()
            if err == io.EOF {
                a.logger.Printf("agent %s received EOF for stream %v", a.Name, subscType)
                a.logger.Printf("agent %s retrying in %s", a.Name, a.retryTimeout)

                <-retry.C // retry timer
                continue
            }
            if err != nil {
                a.logger.Printf("agent %s failed to receive notification: %v", a.Name, err)

                <-retry.C // retry timer
                continue
            }
            streamChan <- streamResp
        }
    }
}

When the notification is received, it is passed to the streamChan channel. On the receiving end of this channel is our app's Start function that starts the aggregateConfigNotifications function for each received notification.

Stream Response Type

If you wonder what type the notifications are, it solely depends on the type of subscriptions we added on the notification stream. In our case, we only added the Config subscription, so the notifications we receive will be backed by the ConfigNotification type.

Since the Notification Client can transport notifications of different types, the notification type is hidden behind the NotificationStreamResponse type. The NotificationStreamResponse embeds the Notification message that can be one of the following types:

message Notification
{
    uint64 sub_id                              = 1;  /* Subscription identifier */
    oneof subscription_types
    {
        InterfaceNotification intf             = 10;  // Interface details
        NetworkInstanceNotification nw_inst    = 11;  // Network instance details
        LldpNeighborNotification lldp_neighbor = 12;  // LLDP neighbor details
        ConfigNotification config              = 13;  // Configuration notification
        BfdSessionNotification bfd_session     = 14;  // BFD session details
        IpRouteNotification route              = 15;  // IP route details
        AppIdentNotification appid             = 16;  // App identification details
        NextHopGroupNotification nhg           = 17;  // Next-hop group details
    }
}

See the ConfigNotification type? This is what we expect to receive in our app.

Now our configuration notifications are streamed from the NDK to our app. Let's see how we process them to update the app's configuration.


  1. Here is where Go channels come really handy because we can use them to deliver the notifications to our app. 

Comments