Streams

Mix.install([
  {:youtube, github: "brooklinjazz/youtube"},
  {:hidden_cell, github: "brooklinjazz/hidden_cell"},
  {:tested_cell, github: "brooklinjazz/tested_cell"},
  {:benchee, "~> 1.1"},
  {:utils, path: "#{__DIR__}/../utils"}
])


Setup

Ensure you type the ea keyboard shortcut to evaluate all Elixir cells before starting. Alternatively, you can evaluate the Elixir cells as you read.

Enum Vs Stream

So far, we have typically used the Enum module to work with collections. The Enum module uses eager evaluation, which means any instruction we provide executes immediately. Each step builds its entire result in memory before the next step starts.

1..10
|> Enum.map(fn each -> each * 2 end)
|> Enum.filter(fn each -> each <= 10 end)
|> Enum.take(4)

Each Enum function stores a new copy of the collection it creates, and the functions execute one after another, as this animation shows.

defmodule Boxes do
  def space, do: box("", %{background: "none", border: "none"})
  def spaces(0), do: []
  def spaces(integer), do: Enum.map(1..integer, fn _ -> space() end)
  def green_box(text), do: box(text, %{background: "#D5E8D4", border: "#82B366"})
  def green_boxes(range), do: Enum.map(range, &Boxes.green_box/1)
  def grey_box(text), do: box(text, %{background: "#F5F5F5", border: "#666666"})
  def grey_boxes(range), do: Enum.map(range, &Boxes.grey_box/1)

  def box(text, style) do
    "<div style=\"margin: 0 10px; font-size: 24px; font-weight: bold; height: 50px; width: 50px; background-color: #{style.background}; border: #{style.border} solid 1px; display: flex;  align-items: center; justify-content: center;\">#{text}</div>"
  end

  def row(title, items) do
    "<div style=\"height: 80px; display: flex; width: 100%; align-items: center;\">
    <p style=\"font-weight: bold; font-size: 24px; margin: 0; width: 10%;\">#{title}</p>
    <div style=\"display: flex; width: 90%;\">#{items}</div>
  </div>"
  end

  def sequence do
    [
      "
  #{row("1..10", Enum.map(1..10, &Boxes.green_box/1))}
  #{row("map", [])}
  #{row("filter", [])}
  #{row("take", [])}
",
      "
  #{row("1..10", Enum.map(1..10, &Boxes.green_box/1))}
  #{row("map", Enum.map(2..20//2, &Boxes.green_box/1))}
  #{row("filter", [])}
  #{row("take", [])}
",
      "
  #{row("1..10", Enum.map(1..10, &Boxes.green_box/1))}
  #{row("map", Enum.map(2..20//2, &Boxes.green_box/1))}
  #{row("filter", Enum.map(2..10//2, &Boxes.green_box/1) ++ Enum.map(12..20//2, &Boxes.grey_box/1))}
  #{row("take", [])}
",
      "
  #{row("1..10", Enum.map(1..10, &Boxes.green_box/1))}
  #{row("map", Enum.map(2..20//2, &Boxes.green_box/1))}
  #{row("filter", Enum.map(2..10//2, &Boxes.green_box/1) ++ Enum.map(12..20//2, &Boxes.grey_box/1))}
  #{row("take", Enum.map(2..8//2, &Boxes.green_box/1) ++ Enum.map(10..20//2, fn _ -> Boxes.space() end))}
  "
    ]
  end
end

Kino.animate(2000, 0, fn i ->
  md = Kino.Markdown.new(Enum.at(Boxes.sequence(), i))
  {:cont, md, rem(i + 1, length(Boxes.sequence()))}
end)

The operation above is a bit contrived to keep the example simple, but notice that by the end of the execution Enum.take/2 only needs four elements, even though Enum.map/2 and Enum.filter/2 each processed all ten.

It's a waste of resources to run every operation on every element. It would be far more efficient to build up a series of functions and run them on each element one at a time, only as needed.

Kino.animate(500, {0, 0}, fn {current_row, current_column} ->
  current_element = current_column + 1
  range = Boxes.green_boxes(1..current_element) ++ Boxes.grey_boxes((current_element + 1)..10)

  maybe_display = fn expected_row, display ->
    if current_row === expected_row, do: display, else: []
  end

  indent = Boxes.spaces(current_column)

  md =
    Kino.Markdown.new("""
    #{Boxes.row("1..10", range)}
    #{Boxes.row("map", maybe_display.(1, indent ++ [Boxes.green_box(current_element * 2)]))}
    #{Boxes.row("filter", maybe_display.(2, indent ++ [Boxes.green_box(current_element * 2)]))}
    #{Boxes.row("take", Boxes.green_boxes(2..(current_element * 2 - 2)//2) ++ maybe_display.(3, [Boxes.green_box(current_element * 2)]))}
    """)

  next_row = rem(current_row + 1, 4)
  next_column = rem((current_row === 3 && current_column + 1) || current_column, 4)
  {:cont, md, {next_row, next_column}}
end)

We can't accomplish this with the Enum module, but we can with the Stream module. Streams are composable, lazy enumerables. Lazy means they do no work until their result is needed, and then they process the elements one at a time. Composable means that we can chain functions together to build up the work to perform on each element in the stream.

Notice that the following code does not execute any of these functions. Instead, it builds up future work and returns a Stream struct that holds the list of functions to apply.

1..10
|> Stream.map(fn each -> each * 2 end)
|> Stream.filter(fn each -> each <= 10 end)
|> Stream.take(4)

The stream stores the enumerable and the list of functions to call on it. The stream is only evaluated when it's passed to an eager function from the Enum module. For example:

1..10
|> Stream.map(fn each -> each * 2 end)
|> Stream.filter(fn each -> each <= 10 end)
|> Stream.take(4)
|> Enum.to_list()
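To make the difference concrete, here is a sketch that counts how many times the map function runs in the stream pipeline above versus the equivalent Enum pipeline. The counting uses Erlang's :counters module, which is our own addition for illustration and not part of the original example.

```elixir
# Erlang :counters refs act as simple mutable counters, so we can count
# how many times each pipeline calls its map function.
stream_calls = :counters.new(1, [])
enum_calls = :counters.new(1, [])

stream_result =
  1..10
  |> Stream.map(fn each ->
    :counters.add(stream_calls, 1, 1)
    each * 2
  end)
  |> Stream.filter(fn each -> each <= 10 end)
  |> Stream.take(4)
  |> Enum.to_list()

enum_result =
  1..10
  |> Enum.map(fn each ->
    :counters.add(enum_calls, 1, 1)
    each * 2
  end)
  |> Enum.filter(fn each -> each <= 10 end)
  |> Enum.take(4)

%{
  stream_result: stream_result,
  enum_result: enum_result,
  stream_map_calls: :counters.get(stream_calls, 1),
  enum_map_calls: :counters.get(enum_calls, 1)
}
```

Both pipelines return the same four elements, but the stream's map runs only four times because Stream.take/2 halts the pipeline once it has four elements, while Enum.map/2 runs for all ten.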

We can see this more clearly by using IO.puts/1 to trigger a side effect. Notice that the following code does not print anything.

Stream.each(1..10, &IO.puts/1)

IO.puts/1 is only called when we run the stream with Stream.run/1 or when we use an Enum function.

1..10
|> Stream.each(&IO.puts/1)
|> Stream.run()

1..10
|> Stream.each(&IO.puts/1)
|> Enum.to_list()
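Laziness also changes the order in which the work happens. The sketch below records each step in an Agent so we can compare when each step runs in a stream pipeline versus an Enum pipeline. The log, record, and read_and_reset names are hypothetical helpers used only for this illustration.

```elixir
# Hypothetical helpers: an Agent holds a log of {step, value} entries.
{:ok, log} = Agent.start_link(fn -> [] end)
record = fn entry -> Agent.update(log, &[entry | &1]) end
# Returns the log in the order entries were recorded, then clears it.
read_and_reset = fn -> Agent.get_and_update(log, &{Enum.reverse(&1), []}) end

1..3
|> Stream.map(fn each ->
  record.({:map, each})
  each * 2
end)
|> Stream.each(fn each -> record.({:each, each}) end)
|> Stream.run()

stream_order = read_and_reset.()

1..3
|> Enum.map(fn each ->
  record.({:map, each})
  each * 2
end)
|> Enum.each(fn each -> record.({:each, each}) end)

enum_order = read_and_reset.()

%{stream_order: stream_order, enum_order: enum_order}
```

The stream finishes both steps for one element before touching the next, so its log alternates between map and each. The Enum pipeline maps every element before the first each call runs.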

As the collection grows, the performance benefits become clearer. Let's compare the same pipeline written with the Stream module and with the Enum module.

Performance Comparison

Benchee.run(
  %{
    "enum" => fn ->
      1..200
      |> Enum.map(fn each -> each * 2 end)
      |> Enum.filter(fn each -> each <= 10 end)
      |> Enum.take(4)
    end,
    "stream" => fn ->
      1..200
      |> Stream.map(fn each -> each * 2 end)
      |> Stream.filter(fn each -> each <= 10 end)
      |> Stream.take(4)
      |> Enum.to_list()
    end
  },
  memory_time: 2
)

Exact results will vary by machine, but you should see that the stream runs faster and consumes less memory. For example:

Name             ips        average  deviation         median         99th %
stream      172.86 K        5.79 μs   ±418.69%           4 μs       18.50 μs
enum          9.64 K      103.76 μs    ±21.83%       98.70 μs      190.13 μs

Comparison: 
stream      172.86 K
enum          9.64 K - 17.94x slower +97.97 μs

Memory usage statistics:

Name      Memory usage
stream         8.54 KB
enum         181.20 KB - 21.22x memory usage +172.66 KB

**All measurements for memory usage were the same**

Your Turn

In the Elixir cell below, try refactoring this code using the Stream module.

1..100
|> Enum.map(fn each -> div(each, 2) end)
|> Enum.map(fn each -> each * 3 end)
|> Enum.filter(fn each -> rem(each, 2) === 0 end)
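Here is one possible example solution, written as a sketch: each Enum call becomes its Stream counterpart, and a single Enum.to_list/1 at the end runs the pipeline. The Enum version is repeated so the block can check that both produce the same list.

```elixir
# The original eager pipeline from the exercise.
enum_result =
  1..100
  |> Enum.map(fn each -> div(each, 2) end)
  |> Enum.map(fn each -> each * 3 end)
  |> Enum.filter(fn each -> rem(each, 2) === 0 end)

# One possible lazy refactor: nothing runs until Enum.to_list/1.
stream_result =
  1..100
  |> Stream.map(fn each -> div(each, 2) end)
  |> Stream.map(fn each -> each * 3 end)
  |> Stream.filter(fn each -> rem(each, 2) === 0 end)
  |> Enum.to_list()

stream_result == enum_result
```

Whether the stream version is actually faster here is worth measuring; as the next section explains, streams don't always win.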

Stream Gotchas

While the Stream module can improve performance, it's worth mentioning that it's not a silver bullet. For example, a single Stream.map is not faster than Enum.map.

{stream_time, _result} = :timer.tc(fn -> Stream.map(1..1000, &(&1 * 2)) |> Enum.to_list() end)
{enum_time, _result} = :timer.tc(fn -> Enum.map(1..1000, &(&1 * 2)) end)

%{stream_time: stream_time, enum_time: enum_time}

That's because streams only improve performance when processing elements one at a time reduces the amount of computation or memory required. A single map must visit every element either way, and the stream adds the overhead of building and running its lazy pipeline.

Notice that even when we chain multiple maps together, the Enum module is typically just as fast as, or even faster than, the Stream module.

stream_fn = fn ->
  1..1000
  |> Stream.map(&(&1 * 2))
  |> Stream.map(&(&1 * 2))
  |> Stream.map(&(&1 * 2))
  |> Enum.to_list()
end

enum_fn = fn -> 1..1000 |> Enum.map(&(&1 * 2)) |> Enum.map(&(&1 * 2)) |> Enum.map(&(&1 * 2)) end

{stream_time, _result} = :timer.tc(stream_fn)
{enum_time, _result} = :timer.tc(enum_fn)

%{stream_time: stream_time, enum_time: enum_time}

In terms of improving performance, Streams will generally provide the greatest benefits when operations reduce the number of elements in the enumerable. Whenever optimizing, use benchmarks to verify your assumptions rather than relying on theoretical performance gains.
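The pipelines above only map, so the stream has nothing to skip. For contrast, here is a sketch in the same :timer.tc style where the pipeline does shrink the data: we only need the first three doubled values that are divisible by 3. The range size and filter condition are our own choices for illustration, and timings will vary by machine.

```elixir
# The stream stops as soon as Enum.take/2 has three elements, while the
# Enum version maps and filters all one million elements first.
stream_fn = fn ->
  1..1_000_000
  |> Stream.map(&(&1 * 2))
  |> Stream.filter(&(rem(&1, 3) == 0))
  |> Enum.take(3)
end

enum_fn = fn ->
  1..1_000_000
  |> Enum.map(&(&1 * 2))
  |> Enum.filter(&(rem(&1, 3) == 0))
  |> Enum.take(3)
end

{stream_time, stream_result} = :timer.tc(stream_fn)
{enum_time, enum_result} = :timer.tc(enum_fn)

%{stream_time: stream_time, enum_time: enum_time, same_result?: stream_result == enum_result}
```

Here the stream only maps the first nine elements, so you would expect it to come out well ahead; confirm it with your own measurements.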

Stream

As you've seen, streams process elements one by one and can improve performance. They are often beneficial when working with large data, because they only consume the data each operation needs. On the other hand, Enum creates an entire copy of the collection for each intermediate step.

flowchart
  subgraph Enum
    direction LR
    E1[Enum] --> EO1[Operation] --> L1[Collection] --> EO2[Operation] --> L2[Collection]
  end
  subgraph Stream
    S1[Stream] --> SE1[Enum] --> SO1[Operation] --> SR[Collection]
  end

The Stream module provides equivalents of many functions from the Enum module. You've already seen map/2, filter/2, and take/2.

Stream also has equivalents of functions such as with_index/1, drop/2, chunk_every/2, and each/2. The examples below use pattern matching to show that the Enum and Stream versions of with_index, drop, and chunk_every produce the same results.

[{"a", 0}, {"b", 1}, {"c", 2}] = Enum.with_index(["a", "b", "c"])
[{"a", 0}, {"b", 1}, {"c", 2}] = Stream.with_index(["a", "b", "c"]) |> Enum.to_list()

["b", "c"] = Enum.drop(["a", "b", "c"], 1)
["b", "c"] = Stream.drop(["a", "b", "c"], 1) |> Enum.to_list()

[[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]] = Enum.chunk_every(1..10, 2)
[[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]] = Stream.chunk_every(1..10, 2) |> Enum.to_list()
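The pattern-matching examples above cover with_index, drop, and chunk_every. Here is a sketch in the same spirit for each/2, where the Enum and Stream versions differ in what they return.

```elixir
# Enum.each/2 runs the side effect right away and returns :ok.
enum_result = Enum.each(["a", "b", "c"], &IO.puts/1)

# Stream.each/2 returns a lazy stream. The side effect runs only when the
# stream is enumerated, and the original elements pass through unchanged.
stream_result = Stream.each(["a", "b", "c"], &IO.puts/1) |> Enum.to_list()

{enum_result, stream_result}
```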

Streams As Generators

However, Streams are more than a performant alternative to Enum. Streams are also used to generate a (potentially infinite) amount of data.

Because streams only perform the computation they need, we can have a theoretically infinite stream, such as 1, 2, 3, 1, 2, 3, 1, 2, 3, ..., that continues forever. However, we'll only ever generate the number of elements we actually use.

flowchart LR
1 --> 2 --> 3 --> 1

Here's that stream built with Stream.cycle/1, which cycles through an enumerable forever.

Stream.cycle([1, 2, 3])
|> Enum.take(10)
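An infinite stream like this can be composed with other stream functions, or paired with a finite collection, as long as something limits how many elements we consume. In the sketch below, the players list and team names are hypothetical values chosen for illustration.

```elixir
# Stream.cycle/1 composes with other Stream functions before we take elements.
tens = Stream.cycle([1, 2, 3]) |> Stream.map(fn each -> each * 10 end) |> Enum.take(5)

# Zipping a finite list with an infinite cycle stops when the list runs out,
# so each (hypothetical) player gets an alternating team.
players = ["a", "b", "c", "d"]
teams = Enum.zip(players, Stream.cycle([:red, :blue]))

{tens, teams}
```

Enum.zip/2 finishes as soon as either side is exhausted, which is what makes pairing with an infinite cycle safe here.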

Lazy Evaluation

We've been using lazy evaluation for a while now! A range doesn't store its elements; it only produces each element when needed, so we can create a massive range without any noticeable performance impact.

When we pass that massive range to an Enum function such as Enum.to_list/1, we pay the computational price!

{creation_time, _result} = :timer.tc(fn -> 1..10_000_000 end)
{execution_time, _result} = :timer.tc(fn -> Enum.to_list(1..10_000_000) end)

%{creation_time: creation_time, execution_time: execution_time}
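To see why creating the range is cheap, here is a sketch showing that a range is only a small struct. Some Enum functions, such as Enum.count/1 and Enum.member?/2, can answer questions about a range without generating its elements, and piping a range into a stream generates only the elements we take.

```elixir
range = 1..10_000_000

# A range is a small struct that stores only its first, last, and step values.
%Range{first: 1, last: 10_000_000, step: 1} = range

# Range implements count and membership checks arithmetically, so these
# calls don't generate ten million elements.
count = Enum.count(range)
has_value = Enum.member?(range, 9_999_999)

# Streaming over the range only generates the three elements we take.
first_three_doubled = range |> Stream.map(fn each -> each * 2 end) |> Enum.take(3)

%{count: count, has_value: has_value, first_three_doubled: first_three_doubled}
```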

Stream.iterate/2

Stream.iterate/2 builds a stream by repeatedly applying a function to an accumulator. For example, we could start at 0 and then add 1 to it for every iteration.

flowchart LR
  1 --> 2 --> 3 --> 4 --> 5
  n1[n] --> n2[n + 1] --> n3[n + 2] --> n4[n + 3] --> n5[n + 4]
Stream.iterate(0, fn accumulator -> accumulator + 1 end) |> Enum.take(5)

The initial value of the accumulator is 0, and that initial value is also the first element of the stream. The next value of the accumulator is accumulator + 1, which is 0 + 1. This pattern continues, so taking five elements creates [0, 1, 2, 3, 4].

flowchart LR
Accumulator --> Function --> A2[Next Accumulator] --> Function
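The same accumulator pattern works with any function. As a small sketch, doubling the accumulator on each iteration produces powers of two:

```elixir
# Each element is the previous one doubled; the starting value 1 is emitted first.
powers_of_two = Stream.iterate(1, fn accumulator -> accumulator * 2 end) |> Enum.take(6)
```

As with the counting example, the initial accumulator is the first element of the stream.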

You can build more complicated sequences this way too. For example, consider the Collatz Conjecture, also known as the simplest math problem that no one can solve.

YouTube.new("https://www.youtube.com/watch?v=094y1Z2wpJg")

The Collatz Conjecture states that if you take any positive integer and repeatedly apply the following two rules:

  1. If the number is odd, multiply the number by three and add 1
  2. If the number is even, divide by two.

Then, eventually, no matter the starting number, the sequence will repeat 4, 2, 1 forever.

flowchart LR
integer --> even
integer --> odd
  even --> a[x / 2]
  odd --> b[3x + 1]
  a --> 4
  b --> 4
  4 --> 2 --> 1

For example, let's start with the number 5 to see how quickly it reaches the repeating 4, 2, 1 cycle.

flowchart LR
  5 -- 5 * 3 + 1 --> 16 --16 / 2--> 8 -- 8 / 2 --> 4 -- 4 / 2--> 2 -- 2 / 2 --> 1
  -- 1 * 3 + 1--> a[4] --4 / 2--> b[2] --2 / 2--> c[1]

Your Turn

Use Stream.iterate/2 to implement the Collatz Conjecture, using a starting_integer such as 20 as the initial accumulator.

  1. If the number is odd, multiply the number by three and add 1
  2. If the number is even, divide by two.

Use Enum.take_while/2 to take elements from the stream until it reaches 4, which shows that the sequence eventually repeats 4, 2, 1 forever. Try changing your starting_integer to check this for larger numbers.

Example Solution
starting_integer = 20

Stream.iterate(starting_integer, fn int ->
  if rem(int, 2) == 0 do
    div(int, 2)
  else
    int * 3 + 1
  end
end)
|> Enum.take_while(fn integer -> integer != 4 end)

Stream.unfold/2

While Stream.iterate/2 treats the accumulator as the value, Stream.unfold/2 separates the accumulator from the emitted value. The function returns a {value, next_accumulator} tuple, so you can carry the accumulator forward and emit a separate value derived from it.

flowchart LR
  Accumulator --> Function --> Value
  Function --> Accumulator
Stream.unfold(5, fn accumulator ->
  value = "value: #{accumulator}"
  next_accumulator = accumulator + 1
  {value, next_accumulator}
end)
|> Enum.take(5)

You can use Stream.unfold/2 with Enum.to_list/1 as long as you specify when the stream should end. Otherwise, the stream would be infinite and run forever. To end the stream, return nil.

You can do this with a separate function clause, as in the following example, or with some other control flow.

Stream.unfold(0, fn
  10 ->
    nil

  accumulator ->
    value = "value: #{accumulator}"
    next_accumulator = accumulator + 1
    {value, next_accumulator}
end)
|> Enum.to_list()
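Separating the accumulator from the emitted value is what makes sequences with more state convenient. As a sketch, the Fibonacci sequence keeps a {current, next} tuple as the accumulator but emits only current:

```elixir
# The accumulator is a tuple {current, next}; only `current` is emitted.
fibonacci =
  Stream.unfold({0, 1}, fn {current, next} ->
    {current, {next, current + next}}
  end)

first_ten = Enum.take(fibonacci, 10)
```

This stream never returns nil, so it is infinite, and Enum.take/2 decides how many elements to generate.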

Your Turn

Use Stream.unfold/2 to generate a stream of cubed numbers. Use Enum.take/2 to take the first 20 cubed numbers.

That is, $1^3, 2^3, 3^3, 4^3, \dots$, which would be [1, 8, 27, 64] and so on.

Example Solution
1
|> Stream.unfold(fn int ->
  {int ** 3, int + 1}
end)
|> Enum.take(20)

Commit Your Progress

Run the following in your command line from the beta_curriculum folder to track and save your progress in a Git commit.

$ git add .
$ git commit -m "finish streams section"
