
Side streams

Table of contents

  1. Side inputs
  2. Side outputs

Side inputs

In addition to the main collection, you can pass additional inputs to a transformation as side inputs. See the Beam documentation for more information about side inputs.

To use side inputs, create a PCollectionWithSideInput<T> by calling the withSideInputs method of PCollection<T>. This method takes one or more PCollectionView arguments, each representing a side input.

The following methods create views from collections for use as side inputs:

PCollection<T>

Method Description
asIterableView()
↪️ PCollectionView<Iterable<T>>
Takes the collection as input and produces a view mapping each window to an Iterable of the values in that window.
asListView()
↪️ PCollectionView<List<T>>
Takes the input collection and returns a view mapping each window to a list containing all elements in the window.
asSingletonView()
↪️ PCollectionView<T>
Takes the collection with a single value per window as input and produces a view that returns the value in the main input window when read as a side input.

PCollection<KV<K, V>>

Method Description
asMapView()
↪️ PCollectionView<Map<K, V>>
Takes the input collection and produces a view mapping each window to a map of key-value pairs in that window.
asMultiMapView()
↪️ PCollectionView<Map<K, Iterable<V>>>
Takes the input collection and produces a view mapping each window to its contents as a Map<K, Iterable<V>>.

After that, you can call the map and flatMap methods on the created PCollectionWithSideInput<T> instance:

Method Description
flatMap(f)
↪️ PCollection<U>
Takes each element with context, produces zero, one, or more elements, and returns a new PCollection<U> with all of them.

Arguments:
· f: (T, ProcessContext) -> Iterable<U> - a function to transform the element (here you can use side inputs from the context)
map(f)
↪️ PCollection<U>
Takes each element with context, transforms it into a new one, and returns a new PCollection<U> with all of them.

Arguments:
· f: (T, ProcessContext) -> U - a function to transform the element (here you can use side inputs from the context)
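
The steps above can be put together roughly as follows. This is a minimal sketch, not a definitive implementation: it assumes the context passed to the lambda is Beam's DoFn ProcessContext (whose sideInput(view) accessor is part of the Beam SDK), and that withSideInputs, asSingletonView and map are imported from this library, whose package name is not given on this page. The function name and the threshold collection are hypothetical.

```kotlin
import org.apache.beam.sdk.values.PCollection

// Assumption: the extension functions asSingletonView, withSideInputs and map
// described above are imported from this library (package name not shown here).

// Hypothetical helper: upper-cases every word longer than a threshold that is
// supplied as a single-value side input.
fun markLongWords(
    words: PCollection<String>,
    threshold: PCollection<Int>   // expected to hold one value per window
): PCollection<String> {
    // Turn the single-value collection into a view usable as a side input.
    val thresholdView = threshold.asSingletonView()

    return words
        .withSideInputs(thresholdView)
        .map { word, context ->
            // Read the side input through the context (Beam's sideInput accessor).
            val limit = context.sideInput(thresholdView)
            if (word.length > limit) word.uppercase() else word
        }
}
```

The same view can be read inside flatMap in the same way when an element should produce zero or several results.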

Side outputs

Besides consuming side inputs, a transformation of the input collection can produce any number of side outputs. Create an instance of PCollectionWithSideOutput<T> from the input collection using the withSideOutputs method, defining a set of tags for the side outputs. After that, you can use the map and flatMap methods:

Method Description
flatMap(f)
↪️ Pair<PCollection<U>, PCollectionTuple>
Takes each element with context, produces zero, one, or more elements, and returns a new PCollection<U> with all of them, paired with the PCollectionTuple that holds the side outputs.

Arguments:
· f: (T, ProcessContext) -> Iterable<U> - a function to transform the element (here you can use the context to send results into a side output by the tag)
map(f)
↪️ Pair<PCollection<U>, PCollectionTuple>
Takes each element with context, transforms it into a new one, and returns a new PCollection<U> with all of them, paired with the PCollectionTuple that holds the side outputs.

Arguments:
· f: (T, ProcessContext) -> U - a function to transform the element (here you can use the context to send results into a side output by the tag)
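
A possible use of flatMap with a side output is sketched below. It is written under several assumptions: that withSideOutputs accepts the tag directly (the exact parameter form is not shown on this page), that the context is Beam's DoFn ProcessContext with its output(tag, value) method, and that the library's extension functions are imported from a package not named here. The function name and the tag are hypothetical.

```kotlin
import org.apache.beam.sdk.values.PCollection
import org.apache.beam.sdk.values.TupleTag

// Assumption: withSideOutputs and flatMap described above are imported from
// this library (package name not shown here).

// Hypothetical helper: parses lines into integers and routes unparsable lines
// to a side output instead of dropping them.
fun parseNumbers(lines: PCollection<String>): Pair<PCollection<Int>, PCollection<String>> {
    // Tag identifying the side output; the anonymous subclass keeps type information.
    val invalidTag = object : TupleTag<String>() {}

    // Assumption: withSideOutputs takes the tag(s) as arguments.
    val (numbers, sideOutputs) = lines
        .withSideOutputs(invalidTag)
        .flatMap { line, context ->
            val n = line.trim().toIntOrNull()
            if (n == null) {
                // Send the raw line to the side output by its tag.
                context.output(invalidTag, line)
                emptyList<Int>()
            } else {
                listOf(n)
            }
        }

    // PCollectionTuple.get(tag) returns the side output collection for that tag.
    return numbers to sideOutputs.get(invalidTag)
}
```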

These methods return a pair consisting of the main output collection and a PCollectionTuple instance, from which you can get the side output collections by their tags. There are also built-in methods that use side outputs:

Method Description
splitBy(p)
↪️ Pair<PCollection<T>, PCollection<T>>
Takes the input collection and produces two collections: one with all elements that match the predicate and another with the remaining elements.

Arguments:
· p: (T) -> Boolean - a function to check the element
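
The behavior of splitBy resembles the standard Kotlin partition function on in-memory lists. The sketch below is only an analogy using plain lists, not a PCollection; splitList is a hypothetical helper, and it assumes the first element of the pair holds the matching elements, as the order in the description suggests.

```kotlin
// Plain-Kotlin analogy: partition plays the role of splitBy on an in-memory list.
fun <T> splitList(items: List<T>, p: (T) -> Boolean): Pair<List<T>, List<T>> =
    items.partition(p)

fun main() {
    // Destructure the pair: elements matching the predicate first, the rest second.
    val (even, odd) = splitList(listOf(1, 2, 3, 4, 5)) { it % 2 == 0 }
    println(even) // [2, 4]
    println(odd)  // [1, 3, 5]
}
```

With a PCollection, the pair returned by splitBy could be destructured in the same way.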