
Part-2: Building a unidirectional-streaming gRPC service using Golang

By Toran Sahu April 15, 2021

While building a REST API, have you ever wished the server could stream responses over the same TCP connection? Or, the other way round, that the client could stream requests to the server? Either capability would save the cost of bringing up another service (such as a WebSocket endpoint) just to meet that requirement.

For such cases, REST isn’t the only API architecture available. gRPC is an increasingly common alternative, and its unidirectional-streaming RPCs fit these requirements well.

Objective

In this blog, you’ll get to know what client streaming & server streaming uni-directional RPCs are. I will also discuss how to implement, test, and run them using a live, fully functional example.

Previously, in Part-1 of this blog series, we’ve learned the basics of gRPC, how to implement a Simple/Unary gRPC, how to write unit tests, how to launch the server & client. Part-1 is a step-by-step guide to implement a Stack Machine server & client leveraging Simple/Unary RPC.

If you’ve missed that, it is highly recommended to go through it to get familiar with the basics of the gRPC framework.

Introduction

Let’s understand how Client streaming & Server streaming RPCs work at a very high level.

In a client streaming RPC:

    • the client writes a sequence of messages and sends them to the server using a provided stream
    • once the client has finished writing the messages, it waits for the server to read them and return its response

In a server streaming RPC:

    • the client sends a request to the server and gets a stream to read a sequence of messages back
    • the client reads from the returned stream until there are no more messages

The best thing is gRPC guarantees message ordering within an individual RPC call.
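To make the two shapes concrete before touching gRPC, here is a minimal sketch that models them with plain Go channels. It is only an analogy: the function names serverStream and clientStream are made up for this illustration, and no gRPC API is involved.

```go
package main

import "fmt"

// serverStream models a server-streaming call: one request in, many
// responses out. The returned channel plays the role of the stream, and
// closing it plays the role of "no more messages".
func serverStream(n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			out <- i * i // values are received in the order they are sent
		}
	}()
	return out
}

// clientStream models a client-streaming call: many requests in, one
// response out. It reads until the sender closes the channel, then replies.
func clientStream(in <-chan int) int {
	sum := 0
	for v := range in {
		sum += v
	}
	return sum
}

func main() {
	for v := range serverStream(3) {
		fmt.Println("server-streamed:", v)
	}

	in := make(chan int)
	go func() {
		defer close(in)
		for _, v := range []int{1, 2, 3} {
			in <- v
		}
	}()
	fmt.Println("client-streamed sum:", clientStream(in))
}
```

In both directions the receiver simply reads until the sender signals the end, which is the same contract the generated gRPC stream types expose through Recv() and io.EOF.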

Now let’s improve the “Stack Machine” server & client code to support unidirectional streaming.

Implementing Server Streaming RPC

We’ll see an example of Server Streaming first by implementing the FIB operation.

The FIB operation will:

    • take an integer N as its input (popped from the top of the stack)
    • generate the Fibonacci series up to the Nth position, counting from 0
    • respond with those numbers as a stream of results

And later we’ll see how Client Streaming can be implemented so that a client can input a stream of instructions to the Stack Machine in real-time rather than sending a single request consisting of a set of instructions.

Update Protobuf

We already have defined the gRPC service Machine and a Simple (Unary) RPC method Execute inside our service definition in part-1 of the blog series. Now, let’s update the service definition to add one server streaming RPC called ServerStreamingExecute.

    • In a server streaming RPC, the client sends a request to the server using the stub and waits for the response to come back as a stream of results
    • To specify a server-side streaming method, place the stream keyword before the response type

// ServerStreamingExecute accepts a set of Instructions from client and returns a stream of Result.

rpc ServerStreamingExecute(InstructionSet) returns (stream Result) {}

source: machine/machine.proto

Generating Updated Client & Server Interface Go Code

We need to generate the gRPC client and server interfaces from our machine/machine.proto service definition.

~/disk/E/workspace/grpc-eg-go

$ SRC_DIR=./

$ DST_DIR=$SRC_DIR

$ protoc \

  -I=$SRC_DIR \

  --go_out=plugins=grpc:$DST_DIR \

  $SRC_DIR/machine/machine.proto

 

You can observe that the declaration of ServerStreamingExecute() in the MachineClient and MachineServer interface has been auto-generated:

...

 type MachineClient interface {

    Execute(ctx context.Context, in *InstructionSet, opts ...grpc.CallOption) (*Result, error)

+   ServerStreamingExecute(ctx context.Context, in *InstructionSet, opts ...grpc.CallOption) (Machine_ServerStreamingExecuteClient, error)

 }

...

 type MachineServer interface {

    Execute(context.Context, *InstructionSet) (*Result, error)

+   ServerStreamingExecute(*InstructionSet, Machine_ServerStreamingExecuteServer) error

 }

 

source: machine/machine.pb.go

Update the Server

In case you’re wondering what happens if your service doesn’t implement one of the RPCs declared in the machine.pb.go file: the code will not compile, and you’ll encounter the following error while launching your gRPC server.

~/disk/E/workspace/grpc-eg-go

$ go run cmd/run_machine_server.go

# command-line-arguments

cmd/run_machine_server.go:32:44: cannot use &server.MachineServer literal (type *server.MachineServer) as type machine.MachineServer in argument to machine.RegisterMachineServer:

        *server.MachineServer does not implement machine.MachineServer (missing ServerStreamingExecute method)

 

So it is good practice to keep your service in sync with the service definition, i.e. machine/machine.proto & machine/machine.pb.go. If you do not want to support a particular RPC, or its implementation is not ready yet, just respond with an Unimplemented error status. Example:

// ServerStreamingExecute runs the set of instructions given and streams a sequence of Results.

func (s *MachineServer) ServerStreamingExecute(instructions *machine.InstructionSet, stream machine.Machine_ServerStreamingExecuteServer) error {

    return status.Error(codes.Unimplemented, "ServerStreamingExecute() not implemented yet")

}

 

source: server/machine.go
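A related habit that surfaces this kind of mismatch close to the implementation is a compile-time interface assertion. The sketch below is self-contained and uses stand-ins: machineServer is a hypothetical, simplified substitute for the generated machine.MachineServer interface, and errUnimplemented substitutes for a gRPC Unimplemented status.

```go
package main

import (
	"errors"
	"fmt"
)

// machineServer is a hypothetical stand-in for the generated
// machine.MachineServer interface; the real one lives in machine.pb.go.
type machineServer interface {
	Execute(instructions []string) (float32, error)
	ServerStreamingExecute(instructions []string, send func(float32) error) error
}

// errUnimplemented stands in for a gRPC Unimplemented status in this sketch.
var errUnimplemented = errors.New("not implemented yet")

type server struct{}

func (s *server) Execute(instructions []string) (float32, error) {
	return 0, errUnimplemented
}

func (s *server) ServerStreamingExecute(instructions []string, send func(float32) error) error {
	return errUnimplemented
}

// Compile-time check: the build fails on this line, with a "missing method"
// message, if *server ever stops satisfying machineServer.
var _ machineServer = (*server)(nil)

func main() {
	var s machineServer = &server{}
	err := s.ServerStreamingExecute(nil, nil)
	fmt.Println(errors.Is(err, errUnimplemented)) // prints "true"
}
```

In the real project the same one-line assertion could sit next to the MachineServer struct in server/machine.go, with the generated interface on the left-hand side.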

Before we implement the ServerStreamingExecute() RPC, let’s write a Fibonacci series generator called FibonacciRange().

package utils

func FibonacciRange(n int) <-chan int {

    ch := make(chan int)

    fn := make([]int, n+2) // at least two slots, so fn[1] is valid even when n == 0

    fn[0] = 0

    fn[1] = 1

    go func() {

        defer close(ch)

        for i := 0; i <= n; i++ {

            var f int

            if i < 2 {

                f = fn[i]

            } else {

                f = fn[i-1] + fn[i-2]

            }

            fn[i] = f

            ch <- f

        }

    }()

    return ch

}

source: utils/fibonacci.go

The blog series assumes that you’re familiar with Golang basics and its concurrency concepts, such as channels. You can read more about channels in the official Go documentation.

This function yields the Fibonacci numbers from position 0 through position N, so it produces N+1 values in total.
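As a quick worked example of that contract, the sketch below uses a variant named fibRange (not part of the repository) that keeps only the last two values instead of a slice, plus a small collect helper, also hypothetical, that drains a channel into a slice.

```go
package main

import "fmt"

// fibRange streams the Fibonacci numbers at positions 0..n and then closes
// the channel. It is a simplified variant written for this illustration.
func fibRange(n int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		a, b := 0, 1
		for i := 0; i <= n; i++ {
			ch <- a
			a, b = b, a+b
		}
	}()
	return ch
}

// collect drains a channel into a slice.
func collect(ch <-chan int) []int {
	var out []int
	for v := range ch {
		out = append(out, v)
	}
	return out
}

func main() {
	fmt.Println(collect(fibRange(0))) // [0]
	fmt.Println(collect(fibRange(6))) // [0 1 1 2 3 5 8]
}
```

Note that the consumer must read until the channel is closed; abandoning the loop early would leave the producing goroutine blocked on its next send.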

Let’s also add a small unit test to validate the FibonacciRange() generator.

package utils

import (

    "testing"

)

func TestFibonacciRange(t *testing.T) {

    fibOf5 := []int{0, 1, 1, 2, 3, 5}

    i := 0

    for f := range FibonacciRange(5) {

        if f != fibOf5[i] {

            t.Errorf("got %d, want %d", f, fibOf5[i])

        }

        i++

    }

}

source: utils/fibonacci_test.go

Let’s implement ServerStreamingExecute() to handle the basic instructions PUSH, POP, and FIB with proper error handling. For a FIB instruction, it should POP N from the Stack and stream each number of the Fibonacci series back to the client as a Result object.

func (s *MachineServer) ServerStreamingExecute(instructions *machine.InstructionSet, stream machine.Machine_ServerStreamingExecuteServer) error {

    if len(instructions.GetInstructions()) == 0 {

        return status.Error(codes.InvalidArgument, "No valid instructions received")

    }

    var stack stack.Stack

    for _, instruction := range instructions.GetInstructions() {

        operand := instruction.GetOperand()

        operator := instruction.GetOperator()

        op_type := OperatorType(operator)

        log.Printf("Operand: %v, Operator: %v\n", operand, operator)

        switch op_type {

        case PUSH:

            stack.Push(float32(operand))

        case POP:

            stack.Pop()

        case FIB:

            n, popped := stack.Pop()

            if !popped {

                return status.Error(codes.Aborted, "Invalid sets of instructions. Execution aborted")

            }

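            // Remember the first send error but keep draining the channel,

            // so the FibonacciRange goroutine is not left blocked forever.

            var sendErr error

            for f := range utils.FibonacciRange(int(n)) {

                if sendErr != nil {

                    continue

                }

                log.Println(float32(f))

                sendErr = stream.Send(&machine.Result{Output: float32(f)})

            }

            if sendErr != nil {

                return sendErr

            }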

        default:

            return status.Errorf(codes.Unimplemented, "Operation '%s' not implemented yet", operator)

        }

    }

    return nil

}

 

source: server/machine.go

Update the Client

Now, update the client code to call ServerStreamingExecute(), where the client will receive the numbers of the Fibonacci series through the stream and print them.

func runServerStreamingExecute(client machine.MachineClient, instructions *machine.InstructionSet) {

    log.Printf("Executing %v", instructions)

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

    defer cancel()

    stream, err := client.ServerStreamingExecute(ctx, instructions)

    if err != nil {

        log.Fatalf("%v.ServerStreamingExecute(_) = _, %v", client, err)

    }

    for {

        result, err := stream.Recv()

        if err == io.EOF {

            log.Println("EOF")

            break

        }

        if err != nil {

            log.Printf("Err: %v", err)

            break

        }

        log.Printf("output: %v", result.GetOutput())

    }

    log.Println("DONE!")

}

source: client/machine.go

Test

To write the unit tests, we’ll have to generate mocks for several interfaces.

mockgen, the code generator that ships with gomock, is a ready-to-use mocking tool for Golang, so we’ll be leveraging it in our unit tests.

Server

As we’ve updated our interface i.e. machine/machine.pb.go, let’s update the mock for MachineClient interface. And as we’ve introduced a new RPC ServerStreamingExecute(), let’s generate the mock for ServerStream interface Machine_ServerStreamingExecuteServer as well.

~/disk/E/workspace/grpc-eg-go

$ mockgen github.com/toransahu/grpc-eg-go/machine MachineClient,Machine_ServerStreamingExecuteServer > mock_machine/machine_mock.go

The updated mock_machine/machine_mock.go should look like this.

Now we’re ready to write a unit test for the server-side streaming RPC ServerStreamingExecute():

func TestServerStreamingExecute(t *testing.T) {

    s := MachineServer{}


    // set up test table

    tests := []struct {

        instructions []*machine.Instruction

        want         []float32

    }{

        {

            instructions: []*machine.Instruction{

                {Operand: 5, Operator: "PUSH"},

                {Operator: "FIB"},

            },

            want: []float32{0, 1, 1, 2, 3, 5},

        },

        {

            instructions: []*machine.Instruction{

                {Operand: 6, Operator: "PUSH"},

                {Operator: "FIB"},

            },

            want: []float32{0, 1, 1, 2, 3, 5, 8},

        },

    }


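    for _, tt := range tests {

        // Use a fresh controller and mock stream for every case. With a shared

        // mock, the first AnyTimes() expectation keeps matching, so later

        // cases would record nothing and pass without checking anything.

        ctrl := gomock.NewController(t)

        mockServerStream := mock_machine.NewMockMachine_ServerStreamingExecuteServer(ctrl)

        mockResults := []*machine.Result{}

        mockServerStream.EXPECT().Send(gomock.Any()).DoAndReturn(

            func(result *machine.Result) error {

                mockResults = append(mockResults, result)

                return nil

            }).AnyTimes()


        req := &machine.InstructionSet{Instructions: tt.instructions}


        err := s.ServerStreamingExecute(req, mockServerStream)

        if err != nil {

            t.Errorf("ServerStreamingExecute(%v) got unexpected error: %v", req, err)

        }

        // Compare lengths first so a short or empty stream cannot pass unnoticed.

        if len(mockResults) != len(tt.want) {

            t.Fatalf("got %d results, want %d", len(mockResults), len(tt.want))

        }

        for i, result := range mockResults {

            got := result.GetOutput()

            want := tt.want[i]

            if got != want {

                t.Errorf("got %v, want %v", got, want)

            }

        }

        ctrl.Finish()

    }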

}
 

Please refer to the server/machine_test.go for detailed content.

Let’s run the unit test:

~/disk/E/workspace/grpc-eg-go

$ go test server/machine.go server/machine_test.go

ok      command-line-arguments  0.003s
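For a stream interface this small, a hand-written fake can be an alternative to a generated mock. The sketch below is an illustration under assumptions, not the repository's test code: resultSender is a hypothetical one-method view of the server stream, recorder is the fake, and streamFib stands in for the FIB branch of the handler.

```go
package main

import (
	"errors"
	"fmt"
)

// resultSender is a hypothetical one-method view of the server stream.
type resultSender interface {
	Send(v float32) error
}

var errBroken = errors.New("stream broken")

// recorder is a hand-written fake that remembers everything sent to it and
// can be told to fail after a number of successful sends.
type recorder struct {
	sent      []float32
	failAfter int // negative means never fail
}

func (r *recorder) Send(v float32) error {
	if r.failAfter >= 0 && len(r.sent) >= r.failAfter {
		return errBroken
	}
	r.sent = append(r.sent, v)
	return nil
}

// streamFib sends the Fibonacci numbers at positions 0..n and stops at the
// first send error.
func streamFib(n int, s resultSender) error {
	a, b := 0, 1
	for i := 0; i <= n; i++ {
		if err := s.Send(float32(a)); err != nil {
			return err
		}
		a, b = b, a+b
	}
	return nil
}

func main() {
	ok := &recorder{failAfter: -1}
	fmt.Println(streamFib(5, ok), ok.sent) // <nil> [0 1 1 2 3 5]

	bad := &recorder{failAfter: 2}
	fmt.Println(streamFib(5, bad), bad.sent) // stream broken [0 1]
}
```

A fake like this also makes the failure path easy to exercise, which is harder to express with a single AnyTimes() expectation.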

 

Client

For our new RPC ServerStreamingExecute(), let’s add the mock for ClientStream interface Machine_ServerStreamingExecuteClient as well.

~/disk/E/workspace/grpc-eg-go

$ mockgen github.com/toransahu/grpc-eg-go/machine MachineClient,Machine_ServerStreamingExecuteServer,Machine_ServerStreamingExecuteClient > mock_machine/machine_mock.go

 

source: mock_machine/machine_mock.go

Let’s add a unit test for the client-side logic of the server-side streaming RPC ServerStreamingExecute(), using the mock MockMachine_ServerStreamingExecuteClient:

func TestServerStreamingExecute(t *testing.T) {

    instructions := []*machine.Instruction{}

    instructions = append(instructions, &machine.Instruction{Operand: 1, Operator: "PUSH"})

    instructions = append(instructions, &machine.Instruction{Operator: "FIB"})

    instructionSet := &machine.InstructionSet{Instructions: instructions}


    ctrl := gomock.NewController(t)

    defer ctrl.Finish()

    mockMachineClient := mock_machine.NewMockMachineClient(ctrl)

    clientStream := mock_machine.NewMockMachine_ServerStreamingExecuteClient(ctrl)


    clientStream.EXPECT().Recv().Return(&machine.Result{Output: 0}, nil)


    mockMachineClient.EXPECT().ServerStreamingExecute(

        gomock.Any(),   // context

        instructionSet, // rpc unary message

    ).Return(clientStream, nil)


    if err := testServerStreamingExecute(t, mockMachineClient, instructionSet); err != nil {

        t.Fatalf("Test failed: %v", err)

    }

}

Please refer to the mock_machine/machine_mock_test.go for detailed content.

Let’s run the unit test:

~/disk/E/workspace/grpc-eg-go

$ go test mock_machine/machine_mock.go mock_machine/machine_mock_test.go

ok      command-line-arguments  0.003s

 

Run

Now that the unit tests assure us that the business logic of the server & client code is working as expected, let’s try running the server and communicating with it via our client code.

Server

To start the server we need to run the previously created cmd/run_machine_server.go file.

~/disk/E/workspace/grpc-eg-go

$ go run cmd/run_machine_server.go

 

Client

Now, let’s run the client code client/machine.go.

~/disk/E/workspace/grpc-eg-go

$ go run client/machine.go

Executing instructions:<operator:"PUSH" operand:5 > instructions:<operator:"PUSH" operand:6 > instructions:<operator:"MUL" >

output:30

Executing instructions:<operator:"PUSH" operand:6 > instructions:<operator:"FIB" >

output: 0

output: 1

output: 1

output: 2

output: 3

output: 5

output: 8

EOF

DONE!

 

Awesome! A Server Streaming RPC has been successfully implemented.

Implementing Client Streaming RPC

We have learned how to implement a Server Streaming RPC. Now it’s time to explore the Client Streaming RPC.

To do so, we’ll not introduce another RPC. Instead, we’ll update the existing Execute() RPC to accept a stream of Instructions from the client in real-time, rather than a single request that comprises a set of Instructions.

Update the protobuf

So, let’s update the interface:

service Machine {

-     rpc Execute(InstructionSet) returns (Result) {}

+     rpc Execute(stream Instruction) returns (Result) {}

      rpc ServerStreamingExecute(InstructionSet) returns (stream Result) {}

 }

 

source: machine/machine.proto

Generating the updated client and server interface Go code

Now let’s generate the updated Go code from machine/machine.proto by running:

~/disk/E/workspace/grpc-eg-go

$ SRC_DIR=./

$ DST_DIR=$SRC_DIR

$ protoc \

  -I=$SRC_DIR \

  --go_out=plugins=grpc:$DST_DIR \

  $SRC_DIR/machine/machine.proto

 

You’ll notice that the declaration of Execute() has been updated in both the MachineServer & MachineClient interfaces.

type MachineServer interface {

-   Execute(context.Context, *InstructionSet) (*Result, error)

+   Execute(Machine_ExecuteServer) error

    ServerStreamingExecute(*InstructionSet, Machine_ServerStreamingExecuteServer) error

 }

 type MachineClient interface {

-    Execute(ctx context.Context, in *InstructionSet, opts ...grpc.CallOption) (*Result, error)

+    Execute(ctx context.Context, opts ...grpc.CallOption) (Machine_ExecuteClient, error)

     ServerStreamingExecute(ctx context.Context, in *InstructionSet, opts ...grpc.CallOption) (Machine_ServerStreamingExecuteClient, error)

 }

 

source: machine/machine.pb.go

Update the Server

Let’s update the server code to make Execute() a client streaming uni-directional RPC, so that it can accept a stream of instructions from the client and respond with a single Result struct.

func (s *MachineServer) Execute(stream machine.Machine_ExecuteServer) error {

    var stack stack.Stack

    for {

        instruction, err := stream.Recv()

        if err == io.EOF {

            log.Println("EOF")

            output, popped := stack.Pop()

            if !popped {

                return status.Error(codes.Aborted, "Invalid sets of instructions. Execution aborted")

            }


            if err := stream.SendAndClose(&machine.Result{

                Output: output,

            }); err != nil {

                return err

            }


            return nil

        }

        if err != nil {

            return err

        }


        operand := instruction.GetOperand()

        operator := instruction.GetOperator()

        op_type := OperatorType(operator)


        fmt.Printf("Operand: %v, Operator: %v\n", operand, operator)


        switch op_type {

        case PUSH:

            stack.Push(float32(operand))

        case POP:

            stack.Pop()

        case ADD, SUB, MUL, DIV:

            item2, popped := stack.Pop()

            item1, popped := stack.Pop()


            if !popped {

                return status.Error(codes.Aborted, "Invalid sets of instructions. Execution aborted")

            }


            if op_type == ADD {

                stack.Push(item1 + item2)

            } else if op_type == SUB {

                stack.Push(item1 - item2)

            } else if op_type == MUL {

                stack.Push(item1 * item2)

            } else if op_type == DIV {

                stack.Push(item1 / item2)

            }


        default:

            return status.Errorf(codes.Unimplemented, "Operation '%s' not implemented yet", operator)

        }

    }

}

source: server/machine.go
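The control flow of this handler (receive until io.EOF, then answer once) can be tried without gRPC. The following is a minimal sketch under stated assumptions: the instruction and scripted types and the run function are hypothetical stand-ins for the generated message, the server stream, and the handler, and only PUSH, ADD, and MUL are covered.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// instruction mirrors the two fields used in the article; the type itself
// is local to this sketch.
type instruction struct {
	Operator string
	Operand  float32
}

// scripted hands out instructions one by one and then reports io.EOF, the
// way a client-streaming Recv does once the client closes its side.
type scripted struct{ queue []instruction }

func (s *scripted) Recv() (instruction, error) {
	if len(s.queue) == 0 {
		return instruction{}, io.EOF
	}
	in := s.queue[0]
	s.queue = s.queue[1:]
	return in, nil
}

var errInvalid = errors.New("invalid set of instructions")

// run consumes the stream, applies each instruction to a stack, and returns
// the top of the stack once the stream ends.
func run(s *scripted) (float32, error) {
	var stack []float32
	for {
		in, err := s.Recv()
		if errors.Is(err, io.EOF) {
			if len(stack) == 0 {
				return 0, errInvalid
			}
			return stack[len(stack)-1], nil
		}
		if err != nil {
			return 0, err
		}
		switch in.Operator {
		case "PUSH":
			stack = append(stack, in.Operand)
		case "ADD", "MUL":
			if len(stack) < 2 {
				return 0, errInvalid
			}
			a, b := stack[len(stack)-2], stack[len(stack)-1]
			stack = stack[:len(stack)-2]
			if in.Operator == "ADD" {
				stack = append(stack, a+b)
			} else {
				stack = append(stack, a*b)
			}
		default:
			return 0, fmt.Errorf("operation %q not implemented", in.Operator)
		}
	}
}

func main() {
	out, err := run(&scripted{queue: []instruction{{"PUSH", 5}, {"PUSH", 6}, {"MUL", 0}}})
	fmt.Println(out, err) // 30 <nil>
}
```

The single result is produced only after the end-of-stream signal, which is why the real handler calls SendAndClose() inside the io.EOF branch.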

Update the Client

Now update the client code to make client.Execute() a uni-directional streaming RPC, so that the client can stream the instructions to the server and can receive a Result struct once the streaming completes.

func runExecute(client machine.MachineClient, instructions *machine.InstructionSet) {

    log.Printf("Streaming %v", instructions)

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

    defer cancel()

    stream, err := client.Execute(ctx)

    if err != nil {

        log.Fatalf("%v.Execute(ctx) = %v, %v: ", client, stream, err)

    }

    for _, instruction := range instructions.GetInstructions() {

        if err := stream.Send(instruction); err != nil {

            log.Fatalf("%v.Send(%v) = %v: ", stream, instruction, err)

        }

    }

    result, err := stream.CloseAndRecv()

    if err != nil {

        log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)

    }

    log.Println(result)

}

 

source: client/machine.go

Test

Generate mocks for the Machine_ExecuteClient and Machine_ExecuteServer interfaces to test the client-streaming RPC Execute():

~/disk/E/workspace/grpc-eg-go

$ mockgen github.com/toransahu/grpc-eg-go/machine MachineClient,Machine_ServerStreamingExecuteClient,Machine_ServerStreamingExecuteServer,Machine_ExecuteServer,Machine_ExecuteClient > mock_machine/machine_mock.go

 

The updated mock_machine/machine_mock.go should look like this.

Server

Let’s update the unit test to cover the server-side logic of the client streaming Execute() RPC using the mock:

func TestExecute(t *testing.T) {

    s := MachineServer{}

    ctrl := gomock.NewController(t)

    defer ctrl.Finish()

    mockServerStream := mock_machine.NewMockMachine_ExecuteServer(ctrl)

    mockResult := &machine.Result{}

    callRecv1 := mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operand: 5, Operator: "PUSH"}, nil)

    callRecv2 := mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operand: 6, Operator: "PUSH"}, nil).After(callRecv1)

    callRecv3 := mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operator: "MUL"}, nil).After(callRecv2)

    mockServerStream.EXPECT().Recv().Return(nil, io.EOF).After(callRecv3)

    mockServerStream.EXPECT().SendAndClose(gomock.Any()).DoAndReturn(

        func(result *machine.Result) error {

            mockResult = result

            return nil

        })



    err := s.Execute(mockServerStream)

    if err != nil {

        t.Errorf("Execute(%v) got unexpected error: %v", mockServerStream, err)

    }

    got := mockResult.GetOutput()

    want := float32(30)

    if got != want {

        t.Errorf("got %v, wanted %v", got, want)

    }

}

 

Please refer to the server/machine_test.go for detailed content.

Let’s run the unit test:

~/disk/E/workspace/grpc-eg-go

$ go test server/machine.go server/machine_test.go

ok      command-line-arguments  0.003s

 

Client

Now, add a unit test for the client-side logic of the client streaming Execute() RPC using the mock:

func TestExecute(t *testing.T) {

    ctrl := gomock.NewController(t)

    defer ctrl.Finish()

    mockMachineClient := mock_machine.NewMockMachineClient(ctrl)


    mockClientStream := mock_machine.NewMockMachine_ExecuteClient(ctrl)

    mockClientStream.EXPECT().Send(gomock.Any()).Return(nil).AnyTimes()

    mockClientStream.EXPECT().CloseAndRecv().Return(&machine.Result{Output: 30}, nil)


    mockMachineClient.EXPECT().Execute(

        gomock.Any(), // context

    ).Return(mockClientStream, nil)


    testExecute(t, mockMachineClient)

}

Please refer to the mock_machine/machine_mock_test.go for detailed content.

Let’s run the unit test:

~/disk/E/workspace/grpc-eg-go

$ go test mock_machine/machine_mock.go mock_machine/machine_mock_test.go

ok      command-line-arguments  0.003s

 

Run all the unit tests at once:

~/disk/E/workspace/grpc-eg-go

$ go test ./...

?       github.com/toransahu/grpc-eg-go/client  [no test files]

?       github.com/toransahu/grpc-eg-go/cmd     [no test files]

?       github.com/toransahu/grpc-eg-go/machine [no test files]

ok      github.com/toransahu/grpc-eg-go/mock_machine    (cached)

ok      github.com/toransahu/grpc-eg-go/server  (cached)

ok      github.com/toransahu/grpc-eg-go/utils   (cached)

?       github.com/toransahu/grpc-eg-go/utils/stack     [no test files]

 

Run

Now we are assured through unit tests that the business logic of the server & client codes is working as expected. Let’s try running the server and communicating with it via our client code.

Server

To launch the server we need to run the previously created cmd/run_machine_server.go file.

~/disk/E/workspace/grpc-eg-go

$ go run cmd/run_machine_server.go

 

Client

Now, let’s run the client code client/machine.go.

~/disk/E/workspace/grpc-eg-go

$ go run client/machine.go

Streaming instructions:<operator:"PUSH" operand:5 > instructions:<operator:"PUSH" operand:6 > instructions:<operator:"MUL" >

output:30

Executing instructions:<operator:"PUSH" operand:6 > instructions:<operator:"FIB" >

output: 0

output: 1

output: 1

output: 2

output: 3

output: 5

output: 8

EOF

DONE!

 

Awesome!! We have successfully transformed a Unary RPC into a Client Streaming RPC.

By the end of this blog, we’ve learned how to:

      • define an interface for uni-directional streaming RPCs using protobuf
      • write gRPC server & client logic for uni-directional streaming RPCs
      • write and run unit tests for server-streaming & client-streaming RPCs
      • run the gRPC server and communicate with it from a client

The source code of this example is available at toransahu/grpc-eg-go.

You can also git checkout to this commit SHA for Part-2(a) and to this commit SHA for Part-2(b).

See you in the next part of this blog series.

 
