Streams

What is a Stream?

A stream is a sequence of objects that supports various methods which can be pipelined to produce the desired result A stream is not a data structure instead it takes input from the Collections, Arrays or I/O channels

What is a Stream?

Processing a stream

  1. Create a stream
  2. Specify intermediate(/transformation) operations (e.g. filter, map)
  3. Apply terminal operation to produce a result (e.g. toSet, reduce)

Stream

You can only apply terminal operation once.

Stream Creation

Array strategy #1

/** @param stringArray source array to create stream
 *  @return stream representation of this array */
public Stream<String> fromArray1(String[] stringArray) {
    Stream<String> stringStream = Arrays.stream(stringArray);
    return stringStream;
}

Stream Creation

Array strategy #2

/** @param stringArray source array to create stream
 *  @return stream representation of this array */
public Stream<String> fromArray2(String[] stringArray) {
    Stream<String> stringStream = Stream.of(stringArray);
    return stringStream;
}

Stream Creation

From varargs

/** @return stream representation of this array */
public Stream<String> fromVarargs() {
    Stream<String> stringStream1 = Stream.of("The");
    Stream<String> stringStream2 = Stream.of("The", "Quick", "Brown");

    return stringStream2;
}

Stream Creation

List

/** @param stringList source list to create stream
*   @return stream representation of this List */
public Stream<String> fromList(List<String> stringList) {
    Stream<String> stringStream = stringList.stream();
    return stringStream;
}

Stream Creation

.empty

public Stream<String> fromEmpty() {
	return Stream.empty();
}

Extracting substreams

public Stream<String> getSubStream(String[] stringArray, int startIndex, int endIndex) {
	return Arrays.stream(stringArray, startIndex, endIndex);
}

public Stream<String> getSubStream(String[] stringArray, int endIndex) {
	return Arrays.stream(stringArray).limit(endIndex);
}

Combining substreams

Stream.concat concatenates two streams

public Stream<String> combineStreams(String[] array1, String[] array2) {
    Stream<String> stream1 = Arrays.stream(array1);
    Stream<String> stream2 = Arrays.stream(array2);

    return Stream.concat(stream1, stream2);
}

Method References

Method References ::

Instance method of a class

Stream<String> words = Stream.of("The", "Quick", "Brown", "Fox");
words.forEach(System.out::print);

Method References ::

.generate - creates an infinite stream by calling a static function

/** @return endless stream */
public Stream<Double> fromGenerator() {
    Stream<Double> randoms = Stream.generate(Math::random);
    return randoms;
}

Method References ::

.generate - creates an infinite stream by calling a instance function

/** @return endless stream */
public Stream<Double> fromGenerator() {
    Stream<Double> echos = Stream.generate(this::echo);
    return echos;
}

public String echo() {
    return "Echo";
}


Functional Interface

Instance method of a class

class SquareMaker {
     public double square(double num){
        return Math.pow(num , 2);
    }
}

class DemoSquareMaker {
	public static void main(String[] args) {
		SquareMaker squareMaker = new SquareMaker();
		Function<Double, Double> squareMethod = squareMaker::square;
		double ans = squareMethod.apply(23.0);
	}
}

Transformations

Filter, Map, and FlatMap

Transformations - Filter

filter yields a new stream with
elements that match the specified criteria

public Stream<String> getStringsLongerThan(String[] arr, int length) {
    return Arrays
            .stream(stringArray)
            .filter(this::stringIsLongerThan4);
}

public boolean stringIsLongerThan4(String str) {
    return str.length() > 4;
}

Map

map takes an argument of a function, applies the function to each element, and returns the respective stream

public List<Integer> mapArrayLengths(String[] arr) {
    return Arrays
            .stream(arr)
            .map(this::length)
            .collect(Collectors.toList());
}

public Integer length(String x) {
    return x.length();
}

FlatMap

flatMap prevents nested stream structures like Stream<Stream<String>> to Stream<String>

public Stream<String> letters(String someWord) {
    String[] characters = someWord.split("");
    return Stream.of(characters);
}

public Stream<String> wordsFlatMap(String[] stringArray) {
    return Arrays.stream(stringArray).flatMap(this::letters);
}

Distinct

distinct yields a new stream with duplicates removed

public Stream<String> uniqueWords(String... words) {
	return Stream.of(words).distinct();
}

Sorted

.sorted will call the compareTo method on the object to sort Object must implements Comparable

public Stream<String> sort(String[] words) {
    return Arrays.stream(words).sorted();
}

or you must supply a Comparator

public Stream<String> sort(String[] words) {
    return Arrays.stream(words).sorted(this::compareStringsLength);
}

public int compareStringsLength(String str1, String str2) {
    return str1.length() - str2.length();
}

Simple Reductions

Optional<T>

Reductions

.count

/** @return number of elements in an array using a stream */
public long getCount(String[] stringArray) {
    return Arrays.stream(stringArray).count();
}

Reductions

.min, .max

/** @return longest String object in an array using a stream */
public Optional<String> getMax(String[] stringArray) {
    return Arrays.stream(stringArray).max(String::compareToIgnoreCase);
}

/** @return longest String object in an array using a stream */
public Optional<String> getMin(String[] stringArray) {
    return Arrays.stream(stringArray).min(String::compareToIgnoreCase);
}

Reductions

.findFirst, .findAny

/** @return get first String from an array using a stream */
public Optional<String> getFirst(String[] stringArray) {
    return Arrays.stream(stringArray).findFirst();
}

/** @return a random string in an array using a stream */
public Optional<String> getRandom(String[] stringArray) {
    return Arrays.stream(stringArray).findAny();
}

Reductions

public Integer sum(Integer[] numbers) {
    Optional<Integer> result = Stream.of(numbers).reduce(Integer::sum);
    Integer sum = result.get();
    return sum;
}

Reductions

public Integer sum(Integer[] numbers) {
  Integer sum = Stream.of(numbers).reduce(10, Integer::sum);
  return sum;
}

Collecting Results

Collecting Results

Stream<String> words = Stream.of("The", "Quick", "Brown", "Fox");
String[] array = words.toArray(String[]::new);

Collecting Results

.collect() to a List

Stream<String> words = Stream.of("The", "Quick", "Brown", "Fox");
List<String> list = words.collect(Collectors.toList());

Collecting Results

.collect() to a Set

Stream<String> words = Stream.of("The", "Quick", "Brown", "Fox");
Set<String> list = words.collect(Collectors.toSet());

Collecting Results

.collect() to a Map

Stream<String> words = Stream.of("The", "Quick", "Brown", "Fox");
Map<Integer, String> map = words.collect(Collectors.toMap(String::hashCode, String::toString));

Grouping and Partitioning

Grouping

.groupingBy() groups values with the same characteristic

public Map<String, List<Locale>> groupingByDemo() {
    Stream<Locale> locales = LocaleFactory.createLocaleStream(999);
    return locales.collect(Collectors.groupingBy(Locale::getCountry));
}

Partitioning

.partitioningBy() yields a Map that contains two groups, one for true values and another for false values.

public Map<Boolean, List<String>> partitionedStream() {
    return Stream
            .of("The", "Quick", "Brown", "Fox")
            .collect(Collectors.partitioningBy(this::lengthIsGreaterThan4));
}

public boolean lengthIsGreaterThan4(String x) {
    return x.length() > 4;
}

Relevant Functional Interfaces

Downstream Collectors

class Demo {
	public Map<String, Set<Locale>> demoDownstreamCollectors1() {
	    Stream<Locale> locales = LocaleFactory.createLocaleStream();
	    Map<String, Set<Locale>> countryToLocaleSet = locales.collect(
	            groupingBy(Locale::getCountry, Collectors.toSet()));

	    return countryToLocaleSet;
	}
}

Downstream Collectors

class Demo {
    public Map<String, Long> demoDownstreamCollectors2() {
        Stream<Locale> locales = LocaleFactory.createLocaleStream();
        Map<String, Long> countryToLocaleSet = locales.collect(
                groupingBy(Locale::getCountry, counting()));

        return countryToLocaleSet;
    }
}

Primitive Type Streams

Populate IntStream

class PrimitiveStreams {
    public IntStream demoOf() {
        IntStream intStream = IntStream.of(1, 3, 5, 8, 13);
        return intStream;
    }
}

Generate IntStream

class PrimitiveStreams {
    public DoubleStream demoGenerate() {
        DoubleStream doubles = DoubleStream.generate(Math::random);
    }
}

Create exclusive range IntStream

class PrimitiveStreams {
    /** upper bound is excluded
     *  @param min value to generate
     *  @param max value to generate
     *  @return range of numbers betwen min and max */
    public IntStream demoRange(int min, int max) {
        return IntStream.range(min, max);
    }
}

Create inclusive range IntStream

class PrimitiveStreams {
    /** upper bound is included
     *  @param min value to generate
     *  @param max value to generate
     *  @return range of numbers betwen min and max */
    public IntStream demoRange(int min, int max) {
        return IntStream.rangeClosed(min, max);
    }
}

Primitive to Object stream

public class PrimitiveStreams {
    public Stream<Integer> demoBoxStream() {
        return IntStream.of(0, 1, 2).boxed();
    }
}

Composing Optional Value Functions
With .flatMap

Chaining Method Calls

Parallel Streams

Improper usage

class Demo {
  public void demo(List<String> words) {
      int[] shortWords = new int[12];
      words.parallelStream().forEach(s -> {
          if (s.length() < 12) {
              shortWords[s.length()]++;
          }});
      // Error - race condition!
      System.out.println(Arrays.toString(shortWords));
  }
}

Using parallel stream

class Demo {
	public Map<Integer, Long> wordCountMap(Stream<String> words) {
		return words
            .paralellStream()
			.filter(s -> s.length() < 10)
			.collect(groupingBy(String::length, counting()));
	}
}