In our last blog post here we saw what are Akka stream , how they are initiated, what is Source etc. a very basic introduction. In this post we will go further ahead and check what are some basic operations we can do with stream flow.
Scan Operation
Scan operation can be used to performa user specific operation over the stream of Integers who’s sequence we defined in Source
public Source<Integer, NotUsed> getIntegerRangeSource(){
return Source.range(1, 10);
}
The Scan operation over Source takes 2 arguments first is the initial value of the operation or computation we want to perform and second is function. This function takes 2 arguments first is accumulator which we will carry forward to next operation , second is next element from our stream and function that performs the computation. Who’s result is added to accumulator. Syntax for this is
public <T> Source<T, Mat> scan(final T zero, final Function2<T, Out, T> f)
Here is the Example to perform the addition operation over all the elements from the Source Stream.
Source<BigInteger, NotUsed> addition = source.scan(BigInteger.ZERO, (accumulator, next) -> acc.add(BigInteger.valueOf(next)));
.scan() function takes BigInt Zero as first argument and function tat performs addition of the 2 integers.
How Computation happens here.
The Source stream is from 1 to 10 . The initial value of the Scan Operation is 0 (zero). So on the first function call would have output
0(accumulator) + 1(next) -> 1 (assigned to accumulator)
Since this is stream operation the Next value would be 2 and accumulator value is already 1 so the result would be 3 which is assigned to accumulator. In this way the remaining operations are as follows
1(accumulator) + 2(next) -> 3(assigned to accumulator)
3(accumulator) + 3(next) -> 6(assigned to accumulator)
6(accumulator) + 4(next) -> 10(assigned to accumulator)
10(accumulator) + 5(next) -> 15(assigned to accumulator)
In this way the computation will continue till the Source Stream reached its end at 10.
Of Course as this is Stream operation we will have to to use runWith() to sink the stream , and we can use map() to print our results in sequence.
public void addingStreamValues() { Source<Integer, NotUsed> source = getIntegerRangeSource(); Source<BigInteger, NotUsed> addition = source.scan(BigInteger.ZERO, (accumulator, next) -> accumulator.add(BigInteger.valueOf(next))); addition .map(val -> { System.out.println(val); return val; }) .runWith(Sink.seq(), actorSystem); }
Factorial in Akka Streams Using Scan Operation
As we have seen in previous example the the addition of the numbers is performed using only one initial value which is 0 and all the elements of the stream. Now Instead of doing addition if we perform multiplication that would be factorial operation. Now only difference is instead of keeping initial accumulator value as 0 we will keep it 1 .
public void factorialInStream() {
Source<Integer, NotUsed> source = getIntegerRangeSource();
Source<BigInteger, NotUsed> multiplication = source
.take(5)
.scan(BigInteger.ONE,
(accumulator, next) -> accumulator.multiply(BigInteger.valueOf(next)));
multiplication
.map(val -> {
System.out.println(val);
return val;
})
.runWith(Sink.seq(), actorSystem);
}
Here we have taken only first 5 elements from the Source Stream and perform operations on those only and result is printed.
Zip Operation
Zip operation is used to combine two Streams using Pair. Consider we have 2 Streams one is from Range 1 to 10 and another is of Range 11 to 20 . The Zip operation will perform on Source 1 and Source 2 will result into Pairing Each element from each Stream to another like this.
public void zipOperation(){ Source<Integer, NotUsed> source = getIntegerRangeSource(); Source<Integer, NotUsed> source2 = getIntegerRangeSource2(); source .zip(source2) .map(val -> { System.out.println(val); return val; }) .runWith(Sink.seq(), actorSystem); }
Output of the above code is
Pair(1,11)
Pair(2,12)
Pair(3,13)
Pair(4,14)
Pair(5,15)
Pair(6,16)
Pair(7,17)
Pair(8,18)
Pair(9,19)
Pair(10,20)
The Pair Object is from Akka Actor library , and each element from this Zip operation can be accessed using this Pair object.
One thing to notice here this operation will be performed only till the number of elements from both streams are equal , that means if the source 2 has 15 elements then for remaining 5 elements which does not pair with source 1 will be skipped as source 1 has only 10 elements.
If you want to perform Zip operation even if the stream elements wont match then use ZipAll function like below
public void zipAllOperation(){
Source<Integer, NotUsed> source = getIntegerRangeSource();
Source<Integer, NotUsed> source2 = getIntegerRangeSource2();
source
.zipAll(source2, -1 , 0)
.map(val -> {
val.first();
System.out.println(val);
return val;})
.runWith(Sink.seq(), actorSystem);
}
output will be
Pair(1,11)
Pair(2,12)
Pair(3,13)
Pair(4,14)
Pair(5,15)
Pair(6,16)
Pair(7,17)
Pair(8,18)
Pair(9,19)
Pair(10,20)
Pair(-1,21)
Pair(-1,22)
Pair(-1,23)
Pair(-1,24)
Pair(-1,25)
Here you can see when source 1 does not match with any element it prints as -1 which we have given in
.zipAll(source2, -1 , 0)
ZipWith Operation
ZipWith function is used to combine 2 streams with function that performs operations on this elements.
public void zipWithOperation(){
Source<Integer, NotUsed> source = getIntegerRangeSource();
Source<Integer, NotUsed> source2 = getIntegerRangeSource2();
source
.zipWith(source2, (elem1, elem2) -> elem1*elem2)
.map(val -> {
System.out.println(val);
return val;})
.runWith(Sink.seq(), actorSystem);
}
Here is above code sample we have performed ZipWith using multiplication operation. Please note in this case as well just like Zip function the elements that does not have any corresponding element in another stream wont be called.
Conclusion
In this blog post we saw some more useful operations that we can do using akka streams also we saw one simple use case of calculating Factorial using the Akka Stream. You can check all code form this blog alone here on github.