Golang
Logrocket - How to use Go channels
---
Getting value from goroutine via pointer
package main
import (
"context"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"os"
"time"
)
// init configures the shared logrus logger before main runs.
func init() {
	// Plain-text entries that carry a full timestamp.
	formatter := &log.TextFormatter{FullTimestamp: true}
	log.SetFormatter(formatter)

	// Write log entries to stdout rather than the default stderr.
	log.SetOutput(os.Stdout)

	// Uncomment to log only the warning severity or above.
	//log.SetLevel(log.WarnLevel)
}
// main shows how to hand a result from a goroutine back to the caller
// through a pointer. The pointer is only read after the errgroup has
// finished waiting, so a nil pointer means the callback never ran.
func main() {
	log.Infof("Start...")

	timeoutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	group, ctx := errgroup.WithContext(timeoutCtx)

	// result stays nil unless the delayed callback gets a chance to run.
	var result *int
	storeResult := func() {
		value := 123
		result = &value
		log.Infof("Writing a value")
	}

	group.Go(func() error {
		// 20 or 2 sec: with 20 the 5 second timeout above wins and the
		// callback is skipped.
		SleepWithContext(ctx, 20*time.Second, storeResult)
		return nil
	})

	waitErr := group.Wait()
	log.Infof("Finished waiting")
	if waitErr != nil {
		log.Errorf("Error %v", waitErr)
	}

	if result == nil {
		log.Errorf("Ctx err %v", ctx.Err())
		return
	}
	log.Infof("We have a value %v", *result)
}
// SleepWithContext waits for d to elapse and then calls f. If ctx is done
// before the timer fires, it returns without calling f.
func SleepWithContext(ctx context.Context, d time.Duration, f func()) {
	t := time.NewTimer(d)

	select {
	case <-t.C:
		log.Infof("Timer - Tick!!!")
		f()
		return
	case <-ctx.Done():
	}

	log.Infof("Timer - Context is closed")
	if stopped := t.Stop(); stopped {
		return
	}
	// Stop reported false, so drain the channel to leave it empty.
	<-t.C
	log.Infof("Timer - already stopped")
}
---
Getting value from goroutine via channel
package main
import (
"context"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"os"
"time"
)
// init configures the shared logrus logger before main runs.
func init() {
	// Plain-text entries that carry a full timestamp.
	formatter := &log.TextFormatter{FullTimestamp: true}
	log.SetFormatter(formatter)

	// Write log entries to stdout rather than the default stderr.
	log.SetOutput(os.Stdout)

	// Uncomment to log only the warning severity or above.
	//log.SetLevel(log.WarnLevel)
}
// main shows how to hand a result from a goroutine back to the caller
// through a channel. The value is read with a non-blocking receive after the
// errgroup has finished, so a missing value is reported instead of hanging.
func main() {
	log.Infof("Start...")
	ct00, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

	// Capacity 1 lets the single send complete before anyone receives.
	totalUnfilteredChan := make(chan int, 1)

	g, ctx := errgroup.WithContext(ct00)
	g.Go(func() error {
		SleepWithContext(ctx, time.Second*2, func() { // 20 or 2 sec
			// The channel is closed only when the callback runs; if the
			// context ends first it stays open and empty.
			defer close(totalUnfilteredChan)
			totalUnfilteredChan <- 123
			log.Infof("Writing a value")
		})
		return nil
	})

	err0 := g.Wait()
	log.Infof("Finished waiting")
	if err0 != nil {
		log.Errorf("Error %v", err0)
	}

	// A plain blocking receive would hang here when the callback never ran,
	// because the channel is then neither written to nor closed. The default
	// case makes the receive non-blocking.
	var totalUnfiltered0 int
	select {
	case totalUnfiltered1, ok := <-totalUnfilteredChan:
		if !ok {
			log.Errorf("Sender: no values in channel %v\n", ctx.Err())
			return
		}
		totalUnfiltered0 = totalUnfiltered1
	default:
		log.Errorf("No value provided: %v", ctx.Err())
		// Return rather than break: break only leaves the select, and the
		// code below would then report the zero value as a real result.
		return
	}

	log.Infof("We have a value %v", totalUnfiltered0)
}
// SleepWithContext waits for d to elapse and then calls f. If ctx is done
// before the timer fires, it returns without calling f.
func SleepWithContext(ctx context.Context, d time.Duration, f func()) {
	t := time.NewTimer(d)

	select {
	case <-t.C:
		log.Infof("Timer - Tick!!!")
		f()
		return
	case <-ctx.Done():
	}

	log.Infof("Timer - Context is closed")
	if stopped := t.Stop(); stopped {
		return
	}
	// Stop reported false, so drain the channel to leave it empty.
	<-t.C
	log.Infof("Timer - already stopped")
}
---
Gather several results from goroutines
package main
import (
"context"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"os"
"sync/atomic"
"time"
)
// init configures the shared logrus logger before main runs.
func init() {
	// Plain-text entries that carry a full timestamp.
	formatter := &log.TextFormatter{FullTimestamp: true}
	log.SetFormatter(formatter)

	// Write log entries to stdout rather than the default stderr.
	log.SetOutput(os.Stdout)

	// Uncomment to log only the warning severity or above.
	//log.SetLevel(log.WarnLevel)
}
// main gathers one result from each of several worker goroutines through a
// shared channel. A third goroutine collects values until the channel is
// closed or the context is done.
func main() {
	log.Infof("Start...")
	ct00, cancel := context.WithTimeout(context.Background(), time.Second*10)
	defer cancel()

	const NumOfWorkers = 2
	// One buffer slot per worker, so a send never waits for the collector.
	totalUnfilteredChan := make(chan int, NumOfWorkers)
	g, ctx := errgroup.WithContext(ct00)

	// sends counts the workers that have not finished yet. Whichever worker
	// finishes last closes the channel. Every worker releases its slot on
	// exit, whether or not its timer fired, so the channel is also closed
	// when a worker is cancelled before sending.
	sends := int32(NumOfWorkers)
	release := func() {
		if atomic.AddInt32(&sends, -1) == 0 {
			close(totalUnfilteredChan)
		}
	}

	// worker 1
	g.Go(func() error {
		defer release()
		SleepWithContext(ctx, time.Second*2, func() { // 20 or 2 sec
			totalUnfilteredChan <- 123
			log.Infof("Writing a value 123")
		})
		return nil
	})
	// worker 2
	g.Go(func() error {
		defer release()
		SleepWithContext(ctx, time.Second*2, func() { // 20 or 2 sec
			totalUnfilteredChan <- 124
			log.Infof("Writing a value 124")
		})
		return nil
	})

	// collector: the slice is only read after g.Wait() returns.
	var totalUnfiltered0 []int
	g.Go(func() error {
		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case aValue, more := <-totalUnfilteredChan:
				if !more {
					// Channel closed: every worker has finished.
					return nil
				}
				totalUnfiltered0 = append(totalUnfiltered0, aValue)
			}
		}
	})

	err0 := g.Wait()
	log.Infof("Finished waiting")
	if err0 != nil {
		log.Errorf("ErrorWg %v", err0)
	}
	if len(totalUnfiltered0) != NumOfWorkers {
		log.Errorf("Wrong count %v", ctx.Err())
	}
	log.Infof("We have a value %v", totalUnfiltered0)
}
// SleepWithContext waits for d to elapse and then calls f. If ctx is done
// before the timer fires, it returns without calling f.
func SleepWithContext(ctx context.Context, d time.Duration, f func()) {
	t := time.NewTimer(d)

	select {
	case <-t.C:
		log.Infof("Timer - Tick!!!")
		f()
		return
	case <-ctx.Done():
	}

	log.Infof("Timer - Context is closed")
	if stopped := t.Stop(); stopped {
		return
	}
	// Stop reported false, so drain the channel to leave it empty.
	<-t.C
	log.Infof("Timer - already stopped")
}
---
Label Breaks In Go