Skip to content

jjeffcaii/reactor-go

Folders and files

Name
Last commit message
Last commit date

Latest commit

5248ec1 · Jan 20, 2025
Jun 1, 2022
Nov 11, 2020
Jun 1, 2022
Jul 19, 2022
Nov 22, 2020
Jun 1, 2022
Jan 20, 2025
Mar 14, 2022
Dec 13, 2020
Sep 22, 2020
Mar 7, 2019
Mar 14, 2022
Jun 27, 2022
Dec 13, 2020
Dec 13, 2020
Jun 1, 2022
Jun 1, 2022
Dec 7, 2020
Nov 22, 2020
Nov 8, 2020
Nov 8, 2020
Nov 22, 2020
Dec 7, 2020
Dec 13, 2020
Jun 1, 2022

Repository files navigation

reactor-go 🚀🚀🚀

GitHub Workflow Status codecov GoDoc Go Report Card License GitHub Release

A Go implementation of Reactive Streams.

Install

go get -u github.com/jjeffcaii/reactor-go

Example

Mono

package mono_test

import (
	"context"
	"fmt"

	"github.com/jjeffcaii/reactor-go"
	"github.com/jjeffcaii/reactor-go/mono"
)

func Example() {
	gen := func(ctx context.Context, sink mono.Sink) {
		sink.Success("World")
	}
	mono.
		Create(gen).
		Map(func(input reactor.Any) (output reactor.Any, err error) {
			output = "Hello " + input.(string) + "!"
			return
		}).
		DoOnNext(func(v reactor.Any) error {
			fmt.Println(v)
			return nil
		}).
		Subscribe(context.Background())
}

// Should print:
// Hello World!

Flux

package flux_test

import (
	"context"
	"fmt"

	"github.com/jjeffcaii/reactor-go"
	"github.com/jjeffcaii/reactor-go/flux"
	"github.com/jjeffcaii/reactor-go/scheduler"
)

// Example pushes the integers 0 through 9 into a Flux, keeps the even ones,
// formats each as "#HELLO_NNNN", and prints them. The subscriber calls
// Request(1) on subscription and again after each received value, and the
// function blocks until the completion callback fires.
func Example() {
	finished := make(chan struct{})

	// produce feeds 0..9 into the sink and then signals completion.
	produce := func(_ context.Context, out flux.Sink) {
		for n := 0; n < 10; n++ {
			out.Next(n)
		}
		out.Complete()
	}
	// isEven keeps only values divisible by two; it expects int items.
	isEven := func(item interface{}) bool {
		return item.(int)%2 == 0
	}
	// label renders an int as a zero-padded, four-digit tag.
	label := func(item reactor.Any) (reactor.Any, error) {
		return fmt.Sprintf("#HELLO_%04d", item.(int)), nil
	}

	pipeline := flux.Create(produce).
		Filter(isEven).
		Map(label).
		SubscribeOn(scheduler.Elastic())

	// sub is captured on subscription so the OnNext callback can ask for
	// the following element.
	var sub reactor.Subscription
	onSubscribe := reactor.OnSubscribe(func(s reactor.Subscription) {
		sub = s
		s.Request(1)
	})
	onNext := reactor.OnNext(func(item reactor.Any) error {
		fmt.Println("next:", item)
		sub.Request(1)
		return nil
	})
	onComplete := reactor.OnComplete(func() {
		close(finished)
	})

	pipeline.Subscribe(context.Background(), onSubscribe, onNext, onComplete)

	// Wait for the completion callback before returning.
	<-finished
}
// Should print:
// next: #HELLO_0000
// next: #HELLO_0002
// next: #HELLO_0004
// next: #HELLO_0006
// next: #HELLO_0008