Golang

Logrocket - How to use Go channels

---

Getting a value from a goroutine via a pointer

package main

import (

"context"

log "github.com/sirupsen/logrus"

"golang.org/x/sync/errgroup"

"os"

"time"

)

// init configures the logrus logger used by this program: text-formatted
// entries with full timestamps, written to stdout.
func init() {
	// Plain-text output (not JSON), with the full timestamp on every entry.
	formatter := &log.TextFormatter{FullTimestamp: true}
	log.SetFormatter(formatter)

	// Send log output to stdout rather than the default stderr; any
	// io.Writer may be passed here.
	log.SetOutput(os.Stdout)

	// Uncomment to log only entries of warning severity or above.
	//log.SetLevel(log.WarnLevel)
}

// main starts one errgroup goroutine that, once its timer fires, publishes a
// value through a shared pointer. After waiting for the group it logs either
// the value or the context error explaining why none was written.
func main() {
	log.Infof("Start...")

	timeoutCtx, stop := context.WithTimeout(context.Background(), 5*time.Second)
	defer stop()

	// Stays nil unless the goroutine's callback runs.
	var result *int

	group, ctx := errgroup.WithContext(timeoutCtx)
	group.Go(func() error {
		publish := func() {
			v := 123
			result = &v
			log.Infof("Writing a value")
		}
		SleepWithContext(ctx, 20*time.Second, publish) // 20 or 2 sec
		return nil
	})

	waitErr := group.Wait()
	log.Infof("Finished waiting")
	if waitErr != nil {
		log.Errorf("Error %v", waitErr)
	}

	// The pointer is read only after Wait has returned, i.e. after the
	// goroutine that may have written it has finished.
	if result == nil {
		log.Errorf("Ctx err %v", ctx.Err())
		return
	}
	log.Infof("We have a value %v", *result)
}

// SleepWithContext starts a timer for d and then waits for either the timer
// or ctx. If the timer case is taken, f is called. If the ctx.Done() case is
// taken, the timer is stopped and f is not called.
func SleepWithContext(ctx context.Context, d time.Duration, f func()) {
	t := time.NewTimer(d)

	select {
	case <-t.C:
		log.Infof("Timer - Tick!!!")
		f()

	case <-ctx.Done():
		log.Infof("Timer - Context is closed")
		if stopped := t.Stop(); stopped {
			return
		}
		// Stop reported that the timer was no longer running: receive from
		// its channel so that it is left empty.
		<-t.C
		log.Infof("Timer - already stopped")
	}
}

---

Getting a value from a goroutine via a channel

package main

import (

"context"

log "github.com/sirupsen/logrus"

"golang.org/x/sync/errgroup"

"os"

"time"

)

// init configures the logrus logger used by this program: text-formatted
// entries with full timestamps, written to stdout.
func init() {
	// Plain-text output (not JSON), with the full timestamp on every entry.
	formatter := &log.TextFormatter{FullTimestamp: true}
	log.SetFormatter(formatter)

	// Send log output to stdout rather than the default stderr; any
	// io.Writer may be passed here.
	log.SetOutput(os.Stdout)

	// Uncomment to log only entries of warning severity or above.
	//log.SetLevel(log.WarnLevel)
}

// main starts one errgroup goroutine that, once its timer fires, sends a
// value over a buffered channel and closes it. After waiting for the group it
// logs either the received value or the reason why no value arrived.
func main() {
	log.Infof("Start...")

	ct00, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

	// Capacity 1 lets the goroutine send before anyone is receiving.
	var totalUnfilteredChan = make(chan int, 1)

	g, ctx := errgroup.WithContext(ct00)

	g.Go(func() error {
		SleepWithContext(ctx, time.Second*2, func() { // 20 or 2 sec
			defer close(totalUnfilteredChan)
			totalUnfilteredChan <- 123
			log.Infof("Writing a value")
		})
		return nil
	})

	err0 := g.Wait()
	log.Infof("Finished waiting")
	if err0 != nil {
		log.Errorf("Error %v", err0)
	}

	// The channel is closed only inside the callback, so when the callback
	// never ran a plain blocking receive would hang here. The goroutine has
	// already finished at this point, so a non-blocking receive is enough.
	select {
	case totalUnfiltered0, ok := <-totalUnfilteredChan:
		if !ok {
			log.Errorf("Sender: no values in channel %v\n", ctx.Err())
			return
		}
		log.Infof("We have a value %v", totalUnfiltered0)
	default:
		// Nothing was sent: report it and do not claim to have a value.
		log.Errorf("No value provided: %v", ctx.Err())
	}
}

// SleepWithContext starts a timer for d and then waits for either the timer
// or ctx. If the timer case is taken, f is called. If the ctx.Done() case is
// taken, the timer is stopped and f is not called.
func SleepWithContext(ctx context.Context, d time.Duration, f func()) {
	t := time.NewTimer(d)

	select {
	case <-t.C:
		log.Infof("Timer - Tick!!!")
		f()

	case <-ctx.Done():
		log.Infof("Timer - Context is closed")
		if stopped := t.Stop(); stopped {
			return
		}
		// Stop reported that the timer was no longer running: receive from
		// its channel so that it is left empty.
		<-t.C
		log.Infof("Timer - already stopped")
	}
}

---

Gather several results from goroutines

package main

import (

"context"

log "github.com/sirupsen/logrus"

"golang.org/x/sync/errgroup"

"os"

"sync/atomic"

"time"

)

// init configures the logrus logger used by this program: text-formatted
// entries with full timestamps, written to stdout.
func init() {
	// Plain-text output (not JSON), with the full timestamp on every entry.
	formatter := &log.TextFormatter{FullTimestamp: true}
	log.SetFormatter(formatter)

	// Send log output to stdout rather than the default stderr; any
	// io.Writer may be passed here.
	log.SetOutput(os.Stdout)

	// Uncomment to log only entries of warning severity or above.
	//log.SetLevel(log.WarnLevel)
}

// main starts NumOfWorkers producer goroutines that each send one value
// after a delay, plus one collector goroutine that gathers the values into a
// slice. After waiting for the whole group it logs what was collected.
func main() {
	log.Infof("Start...")

	ct00, cancel := context.WithTimeout(context.Background(), time.Second*10)
	defer cancel()

	const NumOfWorkers = 2

	// One buffer slot per worker: each worker sends at most once, so a send
	// never blocks even if the collector is not receiving yet.
	var totalUnfilteredChan = make(chan int, NumOfWorkers)

	g, ctx := errgroup.WithContext(ct00)

	// Number of workers that have not finished yet. The worker that brings
	// it to zero closes the channel.
	sends := int32(NumOfWorkers)

	// worker builds a producer that sends value once its timer fires. Every
	// worker accounts for itself in the same way, whether or not it sent, so
	// the channel is always closed once all workers are done.
	worker := func(value int) func() error {
		return func() error {
			defer func() {
				if atomic.AddInt32(&sends, -1) == 0 {
					close(totalUnfilteredChan)
				}
			}()

			sent := false
			SleepWithContext(ctx, time.Second*2, func() { // 20 or 2 sec
				totalUnfilteredChan <- value
				log.Infof("Writing a value %d", value)
				sent = true
			})
			if !sent {
				// The callback did not run, so the context ended first;
				// surface that to g.Wait().
				return ctx.Err()
			}
			return nil
		}
	}

	g.Go(worker(123))
	g.Go(worker(124))

	var totalUnfiltered0 []int

	// Collector: the last worker always closes the channel, so ranging until
	// close terminates and never leaves buffered values behind.
	g.Go(func() error {
		for aValue := range totalUnfilteredChan {
			totalUnfiltered0 = append(totalUnfiltered0, aValue)
		}
		return nil
	})

	err0 := g.Wait()
	log.Infof("Finished waiting")
	if err0 != nil {
		log.Errorf("ErrorWg %v", err0)
	}

	// The slice is read only after Wait has returned, i.e. after the
	// collector goroutine has finished.
	if len(totalUnfiltered0) != NumOfWorkers {
		log.Errorf("Wrong count %v", ctx.Err())
	}
	log.Infof("We have a value %v", totalUnfiltered0)
}

// SleepWithContext starts a timer for d and then waits for either the timer
// or ctx. If the timer case is taken, f is called. If the ctx.Done() case is
// taken, the timer is stopped and f is not called.
func SleepWithContext(ctx context.Context, d time.Duration, f func()) {
	t := time.NewTimer(d)

	select {
	case <-t.C:
		log.Infof("Timer - Tick!!!")
		f()

	case <-ctx.Done():
		log.Infof("Timer - Context is closed")
		if stopped := t.Stop(); stopped {
			return
		}
		// Stop reported that the timer was no longer running: receive from
		// its channel so that it is left empty.
		<-t.C
		log.Infof("Timer - already stopped")
	}
}

---

Label Breaks In Go