Contact Us

Contact Us



Pleas confirm by checkbox


App DevelopmentTechnical

Part-3: Building a bidirectional-streaming gRPC service using Golang

Author_img
By Toran Sahu April 22, 2021

If you have been through part-2 of this blog series, then you know that the gRPC framework has got support for uni-directional streaming RPCs. But that is not the end. gRPC has support for bi-directional RPCs as well. Being said that, a gRPC client and a gRPC server can stream requests and responses simultaneously utilizing the same TCP connection.

Objective

In this blog, you’ll get to know what bi-directional streaming RPCs are. How to implement, test, and run them using a live, fully functional example.

Previously in part-2 of this blog series, we’ve learned the basics of uni-directional streaming gRPCs, how to implement those gRPC, how to write unit tests, how to launch the server & client. Part-2 walks you through a step-by-step guide to implement a Stack Machine server & client leveraging the uni-directional streaming RPC.

If you’ve missed that, I would recommend you to go through it to get familiar with the basics of the gRPC framework & streaming RPCs.

Introduction

Let’s understand how bi-directional streaming RPCs work at a very high level.

Bidirectional streaming RPCs where:

    • both sides send a sequence of messages using a read-write stream
    • the two streams operate independently, so clients and servers can read and write in whatever order they like
    • for example, the server could wait to receive all the client messages before writing its responses, or it could alternately read a message then write a message, or some other combination of reads and writes

The best thing is, the order of messages in each stream is preserved.

Now let’s improve the “Stack Machine” server & client codes to support bidirectional streaming.

Implementing Bidirectional Streaming RPC

We already have Server-streaming RPC ServerStreamingExecute() to handle FIB operation which streams the numbers from Fibonacci series, and Client-streaming RPC Execute() to handle the stream of instructions to perform basic ADD/SUB/MUL/DIV operations and return a single response.

In this blog we’ll merge both the functionality to make the Execute() RPC a bidirectional streaming one.

Update the protobuf

Let’s update the machine/machine.proto to make Execute() a bi-directional (server & client streaming) RPC, so that the client can stream the instructions rather than sending a set of instructions to the server & the server can respond with a stream of results. Doing so, we’re getting rid of InstructionSet and ServerStreamingExecute().

The updated machine/machine.proto now looks like:

service Machine {

-     rpc Execute(stream Instruction) returns (Result) {}

-     rpc ServerStreamingExecute(InstructionSet) returns (stream Result) {}

+     rpc Execute(stream Instruction) returns (stream Result) {}

 }

 

source: machine/machine.proto

Notice the stream keyword at two places in the Execute() RPC declaration.

Generating the updated client and server interface Go code

Now let’s generate an updated golang code from the machine/machine.proto by running:

~/disk/E/workspace/grpc-eg-go

$ SRC_DIR=./

$ DST_DIR=$SRC_DIR

$ protoc \

  -I=$SRC_DIR \

  --go_out=plugins=grpc:$DST_DIR \

  $SRC_DIR/machine/machine.proto

 

You’ll notice that declaration of ServerStreamingExecute() is not there in MachineServer & MachineClient interfaces. However, the signature of Execute() is intact.

...


 type MachineServer interface {

    Execute(Machine_ExecuteServer) error

-   ServerStreamingExecute(*InstructionSet, Machine_ServerStreamingExecuteServer) error

 }


...


 type MachineClient interface {

    Execute(ctx context.Context, opts ...grpc.CallOption) (Machine_ExecuteClient, error)

-   ServerStreamingExecute(ctx context.Context, in *InstructionSet, opts ...grpc.CallOption) (Machine_ServerStreamingExecuteClient, error)

 }

...

 

source: machine/machine.pb.go

Update the Server

Now we need to update the server code to make Execute() a bi-directional (server & client streaming) RPC so that it should be able to accept stream of the instructions from the client and, at the same time, it can respond with a stream of results.

func (s *MachineServer) Execute(stream machine.Machine_ExecuteServer) error {

    var stack stack.Stack

    for {

        instruction, err := stream.Recv()

        if err == io.EOF {

            log.Println("EOF")

            return nil

        }

        if err != nil {

            return err

        }



        operand := instruction.GetOperand()

        operator := instruction.GetOperator()

        op_type := OperatorType(operator)


        fmt.Printf("Operand: %v, Operator: %v\n", operand, operator)


        switch op_type {

        case PUSH:

            stack.Push(float32(operand))

        case POP:

            stack.Pop()

        case ADD, SUB, MUL, DIV:

            item2, popped := stack.Pop()

            item1, popped := stack.Pop()


            if !popped {

                return status.Error(codes.Aborted, "Invalid sets of instructions. Execution aborted")

            }


            var res float32

            if op_type == ADD {

                res = item1 + item2

            } else if op_type == SUB {

                res = item1 - item2

            } else if op_type == MUL {

                res = item1 * item2

            } else if op_type == DIV {

                res = item1 / item2

            }


            stack.Push(res)

            if err := stream.Send(&machine.Result{Output: float32(res)}); err != nil {

                return err

            }

        case FIB:

            n, popped := stack.Pop()


            if !popped {

                return status.Error(codes.Aborted, "Invalid sets of instructions. Execution aborted")

            }


            if op_type == FIB {

                for f := range utils.FibonacciRange(int(n)) {

                    if err := stream.Send(&machine.Result{Output: float32(f)}); err != nil {

                        return err

                    }

                }

            }

        default:

            return status.Errorf(codes.Unimplemented, "Operation '%s' not implemented yet", operator)

        }

    }

}

 

source: server/machine.go

Update the Client

Let’s update the client code to make client.Execute() a bi-directional streaming RPC so that the client can stream the instructions to the server and can receive a stream of results at the same time.

func runExecute(client machine.MachineClient, instructions []*machine.Instruction) {

    log.Printf("Streaming %v", instructions)

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

    defer cancel()

    stream, err := client.Execute(ctx)

    if err != nil {

        log.Fatalf("%v.Execute(ctx) = %v, %v: ", client, stream, err)

    }

    waitc := make(chan struct{})

    go func() {

        for {

            result, err := stream.Recv()

            if err == io.EOF {

                log.Println("EOF")

                close(waitc)

                return

            }

            if err != nil {

                log.Printf("Err: %v", err)

            }

            log.Printf("output: %v", result.GetOutput())

        }

    }()


    for _, instruction := range instructions {

        if err := stream.Send(instruction); err != nil {

            log.Fatalf("%v.Send(%v) = %v: ", stream, instruction, err)

        }

        time.Sleep(500 * time.Millisecond)

    }

    if err := stream.CloseSend(); err != nil {

        log.Fatalf("%v.CloseSend() got error %v, want %v", stream, err, nil)

    }

    <-waitc

}

 

source: client/machine.go

Test

Before we start updating the unit test, let’s generate mocks for MachineClient, Machine_ExecuteClient, and Machine_ExecuteServer interfaces to mock the stream type while testing the bidirectional streaming RPC Execute().

~/disk/E/workspace/grpc-eg-go

$ mockgen github.com/toransahu/grpc-eg-go/machine MachineClient,Machine_ExecuteServer,Machine_ExecuteClient > mock_machine/machine_mock.go

 

The updated mock_machine/machine_mock.go should look like this.

Now, we’re good to write a unit test for bidirectional streaming RPC Execute().

Server

Let’s update the unit test to test the server-side logic of Execute() RPC for bidirectional streaming using mock:

func TestExecute(t *testing.T) {

    s := MachineServer{}


    ctrl := gomock.NewController(t)

    defer ctrl.Finish()

    mockServerStream := mock_machine.NewMockMachine_ExecuteServer(ctrl)


    mockResults := []*machine.Result{}

    callRecv1 := mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operand: 1, Operator: "PUSH"}, nil)

    callRecv2 := mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operand: 2, Operator: "PUSH"}, nil).After(callRecv1)

    callRecv3 := mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operator: "MUL"}, nil).After(callRecv2)

    callRecv4 := mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operand: 3, Operator: "PUSH"}, nil).After(callRecv3)

    callRecv5 := mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operator: "ADD"}, nil).After(callRecv4)

    callRecv6 := mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operator: "FIB"}, nil).After(callRecv5)

    mockServerStream.EXPECT().Recv().Return(nil, io.EOF).After(callRecv6)

    mockServerStream.EXPECT().Send(gomock.Any()).DoAndReturn(

        func(result *machine.Result) error {

            mockResults = append(mockResults, result)

            return nil

        }).AnyTimes()

    wants := []float32{2, 5, 0, 1, 1, 2, 3, 5}


    err := s.Execute(mockServerStream)

    if err != nil {

        t.Errorf("Execute(%v) got unexpected error: %v", mockServerStream, err)

    }

    for i, result := range mockResults {

        got := result.GetOutput()

        want := wants[i]

        if got != want {

            t.Errorf("got %v, want %v", got, want)

        }

    }

}

 

Please refer to the server/machine_test.go for detailed content.

Let’s run the unit test:

~/disk/E/workspace/grpc-eg-go

$ go test server/machine.go server/machine_test.go

ok      command-line-arguments  0.004s

 

Client

Now, add unit test to test client-side logic of Execute() RPC for bidirectional streaming using mock:

func testExecute(t *testing.T, client machine.MachineClient) {

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

    defer cancel()


    instructions := []*machine.Instruction{}

    instructions = append(instructions, &machine.Instruction{Operand: 5, Operator: "PUSH"})

    instructions = append(instructions, &machine.Instruction{Operand: 6, Operator: "PUSH"})

    instructions = append(instructions, &machine.Instruction{Operator: "MUL"})


    stream, err := client.Execute(ctx)

    if err != nil {

        log.Fatalf("%v.Execute(%v) = _, %v: ", client, ctx, err)

    }

    for _, instruction := range instructions {

        if err := stream.Send(instruction); err != nil {

            log.Fatalf("%v.Send(%v) = %v: ", stream, instruction, err)

        }

    }

    result, err := stream.Recv()

    if err != nil {

        log.Fatalf("%v.Recv() got error %v, want %v", stream, err, nil)

    }


    got := result.GetOutput()

    want := float32(30)

    if got != want {

        t.Errorf("got %v, want %v", got, want)

    }

}


func TestExecute(t *testing.T) {

    ctrl := gomock.NewController(t)

    defer ctrl.Finish()

    mockMachineClient := mock_machine.NewMockMachineClient(ctrl)


    mockClientStream := mock_machine.NewMockMachine_ExecuteClient(ctrl)

    mockClientStream.EXPECT().Send(gomock.Any()).Return(nil).AnyTimes()

    mockClientStream.EXPECT().Recv().Return(&machine.Result{Output: 30}, nil)


    mockMachineClient.EXPECT().Execute(

        gomock.Any(), // context

    ).Return(mockClientStream, nil)


    testExecute(t, mockMachineClient)

}

 

Please refer to the mock_machine/machine_mock_test.go for detailed content.

Let’s run the unit test:

~/disk/E/workspace/grpc-eg-go

$ go test mock_machine/machine_mock.go mock_machine/machine_mock_test.go

ok      command-line-arguments  0.003s

 

Run

Unit tests assure us that the business logic of the server & client codes is working as expected. Let’s try running the server and communicating to it via our client code.

Server

To spin up the server we need to run the previously created cmd/run_machine_server.go file.

~/disk/E/workspace/grpc-eg-go

$ go run cmd/run_machine_server.go

 

Client

Now, let’s run the client code client/machine.go.

~/disk/E/workspace/grpc-eg-go

$ go run client/machine.go

Streaming [operator:"PUSH" operand:1  operator:"PUSH" operand:2  operator:"ADD"  operator:"PUSH" operand:3  operator:"DIV"  operator:"PUSH" operand:4  operator:"MUL"  operator:"FIB"  operator:"PUSH" operand:5  operator:"PUSH" operand:6  operator:"SUB" ]

output: 3

output: 1

output: 4

output: 0

output: 1

output: 1

output: 2

output: 3

output: -1

EOF

 

Bonus

There are situations when one has to choose between mocking a dependency versus incorporating the dependencies into the test environment & running them live.

The decision – whether to mock or not could be made based on:

    1. how many dependencies are there
    2. which are the essential & most used dependencies
    3. is it feasible to install dependencies on the test (and even the developer’s) environment, etc.

To one extreme we can mock everything. But the mocking effort should pay us off.

For the gRPC framework, we can run the gRPC server live & write client codes to test against the business logic.

But spinning up a server from the test file can lead to unintended consequences that may require you to allocate a TCP port (parallel runs, multiple runs under the same CI server).

To solve this, gRPC community has introduced a package called bufconn under gRPC’s testing package. bufconn is a package that provides a Listener object that implements net.Conn. We can substitute this listener in a gRPC server – allowing us to spin up a server that acts as a full-fledged server that can be used for testing that talks over an in-memory buffer instead of a real port.

As bufconn already comes with the grpc go module – which we already have installed, we don’t need to install it explicitly.

So, let’s create a new test file server/machine_live_test.go write the following test code to launch the gRPC server live using bufconn, and write a client to test the bidirectional RPC Execute().

const bufSize = 1024 * 1024


var lis *bufconn.Listener


func init() {

    lis = bufconn.Listen(bufSize)

    s := grpc.NewServer()

    machine.RegisterMachineServer(s, &MachineServer{})

    go func() {

        if err := s.Serve(lis); err != nil {

            log.Fatalf("Server exited with error: %v", err)

        }

    }()

}


func bufDialer(context.Context, string) (net.Conn, error) {

    return lis.Dial()

}


func testExecute_Live(t *testing.T, client machine.MachineClient, instructions []*machine.Instruction, wants []float32) {

    log.Printf("Streaming %v", instructions)

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

    defer cancel()

    stream, err := client.Execute(ctx)

    if err != nil {

        log.Fatalf("%v.Execute(ctx) = %v, %v: ", client, stream, err)

    }

    waitc := make(chan struct{})

    go func() {

        i := 0

        for {

            result, err := stream.Recv()

            if err == io.EOF {

                log.Println("EOF")

                close(waitc)

                return

            }

            if err != nil {

                log.Printf("Err: %v", err)

            }

            log.Printf("output: %v", result.GetOutput())

            got := result.GetOutput()

            want := wants[i]

            if got != want {

                t.Errorf("got %v, want %v", got, want)

            }

            i++

        }

    }()


    for _, instruction := range instructions {

        if err := stream.Send(instruction); err != nil {

            log.Fatalf("%v.Send(%v) = %v: ", stream, instruction, err)

        }

    }

    if err := stream.CloseSend(); err != nil {

        log.Fatalf("%v.CloseSend() got error %v, want %v", stream, err, nil)

    }

    <-waitc

}


func TestExecute_Live(t *testing.T) {

    ctx := context.Background()

    conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithInsecure())

    if err != nil {

        t.Fatalf("Failed to dial bufnet: %v", err)

    }

    defer conn.Close()

    client := machine.NewMachineClient(conn)



    // try Execute()

    instructions := []*machine.Instruction{

        {Operand: 1, Operator: "PUSH"},

        {Operand: 2, Operator: "PUSH"},

        {Operator: "ADD"},

        {Operand: 3, Operator: "PUSH"},

        {Operator: "DIV"},

        {Operand: 4, Operator: "PUSH"},

        {Operator: "MUL"},

        {Operator: "FIB"},

        {Operand: 5, Operator: "PUSH"},

        {Operand: 6, Operator: "PUSH"},

        {Operator: "SUB"},

    }

    wants := []float32{3, 1, 4, 0, 1, 1, 2, 3, -1}

    testExecute_Live(t, client, instructions, wants)

}

 

source: server/machine_live_test.go

Let’s run the unit test:

~/disk/E/workspace/grpc-eg-go 

$ go test server/machine.go server/machine_live_test.go 

ok      command-line-arguments  0.005s

 

Fantastic!!! Everything worked as expected.

At the end of this blog, we’ve learned:

    • How to define an interface for bi-directional streaming RPC using protobuf
    • How to write gRPC server & client logic for bi-directional streaming RPC
    • How to write and run the unit test for bi-directional streaming RPC
    • How to write and run the unit test for bi-directional streaming RPC by running the server live leveraging the bufconn package
    • How to run the gRPC server and a client can communicate to it

The source code of this example is available at toransahu/grpc-eg-go.

Feel free to write your queries or opinions. Let’s have a healthy discussion.

Related posts
Angular — How to render HTML containing Angular Components dynamically at run-time
App Development

Angular — How to render HTML containing Angular Components dynamically at run-time

By kulwinder.singh May 05, 2021
A Step-by-Step Guide to Easy Android in-App Review Setup.
App Development

A Step-by-Step Guide to Easy Android in-App Review Setup.

By kulwinder.singh April 16, 2021
Part -2: Building a unidirectional-streaming gRPC service using Golang
App Development

Part -2: Building a unidirectional-streaming gRPC service using Golang

By kulwinder.singh April 15, 2021
How to Integrate Firebase Authentication for Google Sign-in Functionality?
App Development

How to Integrate Firebase Authentication for Google Sign-in Functionality?

By kulwinder.singh April 09, 2021
Part-1: A Quick Overview of gRPC in Golang
App Development

Part-1: A Quick Overview of gRPC in Golang

By kulwinder.singh April 07, 2021
Publish Your Android Library on JitPack for Better Reachability
App Development

Publish Your Android Library on JitPack for Better Reachability

By kulwinder.singh April 02, 2021
How to Use Firebase Remote Config Efficiently?
App Development

How to Use Firebase Remote Config Efficiently?

By kulwinder.singh March 26, 2021
How to simplify Android app distribution with Fastlane and improve workflow?
App Development

How to simplify Android app distribution with Fastlane and improve workflow?

By kulwinder.singh March 18, 2021
Google Play Instant Run Integration
App Development

Google Play Instant Run Integration

By kulwinder.singh September 09, 2019
Data Auditing using Javers
App Development

Data Auditing using Javers

By kulwinder.singh August 28, 2019

Stay updated

Get the latest creative news from Fubiz about art, design and pop-culture.