
Kio Context

Table of contents

  1. Overview
  2. Arguments
  3. Creating a pipeline
  4. Running the pipeline

Overview

The Kio context is the main entry point of an application. It is used to define and execute pipelines, and internally it wraps Beam’s Pipeline class.

There are several ways to create the context:

  1. Context with a default configuration:
    val kio = Kio.create()
    
  2. With the configuration from arguments:
    val kio = Kio.fromArguments(args)
    
  3. From an existing Pipeline instance:
    val kio = Kio.fromPipeline(pipeline)
    

Arguments

When you create the context from arguments, any arguments that are not related to the configured runner are available through the arguments property of the context. The following methods work with these additional arguments:

required(key): String
operator get(key): String
    Gets the value of a required argument as a string. Throws IllegalArgumentException if there is no argument with the given key.

    Arguments:
    · key: String - the key of the argument to get the value

optional(key): String?
    Gets the value of an argument as a string, or null if there is no argument with the given key.

    Arguments:
    · key: String - the key of the argument to get the value

int(key): Int
long(key): Long
float(key): Float
double(key): Double
bool(key): Boolean
    Get a typed value for the argument with the given key. Throw IllegalArgumentException if the value cannot be converted to the requested type.

    Arguments:
    · key: String - the key of the argument to get the value

int(key, default): Int
long(key, default): Long
float(key, default): Float
double(key, default): Double
bool(key, default): Boolean
    Get a typed value for the argument with the given key. If there is no argument with the given key, the default value is used. Throw IllegalArgumentException if the value cannot be converted to the requested type.

    Arguments:
    · key: String - the key of the argument to get the value
    · default: () -> T - lambda to get the default value
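
The semantics of the accessors described above can be illustrated with a small self-contained sketch. It is not Kio's implementation: the SketchArguments class and its parse helper are hypothetical, the --key=value input format is an assumption, and only the Int variants are shown.

```kotlin
// Hypothetical stand-in for the arguments accessor; NOT Kio's implementation.
class SketchArguments(private val values: Map<String, String>) {

    // Returns the value for a required key or throws IllegalArgumentException.
    fun required(key: String): String =
        values[key] ?: throw IllegalArgumentException("Missing required argument: $key")

    // Enables index syntax: arguments["key"].
    operator fun get(key: String): String = required(key)

    // Returns the value, or null when the key is absent.
    fun optional(key: String): String? = values[key]

    // Typed access: a missing key or an unparsable value raises IllegalArgumentException.
    fun int(key: String): Int = parseInt(key, required(key))

    // Typed access with a lazily evaluated default that is used only for a missing key.
    fun int(key: String, default: () -> Int): Int =
        optional(key)?.let { parseInt(key, it) } ?: default()

    private fun parseInt(key: String, raw: String): Int =
        raw.toIntOrNull()
            ?: throw IllegalArgumentException("Argument '$key' is not an Int: $raw")

    companion object {
        // Assumption: arguments arrive as --key=value tokens; other tokens are ignored.
        fun parse(args: Array<String>): SketchArguments =
            SketchArguments(
                args.filter { it.startsWith("--") && it.contains('=') }
                    .associate {
                        it.removePrefix("--").substringBefore('=') to it.substringAfter('=')
                    }
            )
    }
}
```

Because the default is a lambda, it is evaluated only when the key is missing. A present but malformed value still fails instead of silently falling back to the default.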

Creating a pipeline

A pipeline can be created using readers (see IO basics and Connectors).

In addition, the parallelize method turns any in-memory collection into a PCollection:

val collection: PCollection<Int> = kio.parallelize(listOf(1, 2, 3, 4, 5))

This method accepts any implementation of the Iterable interface as its argument.
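
To make the Iterable requirement concrete, the sketch below uses a hypothetical acceptIterable function with the same parameter type. It is a local stand-in rather than a call into Kio, and it only materializes the elements, so the accepted input types can be checked without a pipeline.

```kotlin
// Hypothetical stand-in taking an Iterable, the parameter type described for parallelize.
fun <T> acceptIterable(values: Iterable<T>): List<T> = values.toList()

val fromList = acceptIterable(listOf(1, 2, 3))
val fromSet = acceptIterable(setOf("a", "b"))
val fromRange = acceptIterable(1..5) // IntRange implements Iterable<Int>

// A Kotlin Sequence is not an Iterable, so it has to be converted first.
val fromSequence = acceptIterable(generateSequence(1) { it * 2 }.take(4).asIterable())
```

Lists, sets, and ranges can be passed directly, while a Sequence needs asIterable() (or toList()) first.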

Running the pipeline

The execute method runs the defined pipeline:

kio.execute()

It returns an ExecutionContext instance with the following additional methods:

getState(): State
    Returns the current state of the executed pipeline.

isCompleted(): Boolean
    Returns true if the pipeline is in any terminal state, and false otherwise.

waitUntilFinish(duration)
    Makes the application wait until the pipeline is finished.

    Arguments:
    · duration: Duration (optional) - time period after which the job will be interrupted

waitUntilDone(duration)
    Makes the application wait until the pipeline is done. If the pipeline completes with a state other than DONE, it throws an exception.

    Arguments:
    · duration: Duration (optional) - time period after which the job will be interrupted
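
The difference between a terminal state and the DONE state can be sketched as follows. The SketchState enum is a local stand-in modeled on Apache Beam's PipelineResult.State. Whether Kio's State has the same values is an assumption, and so is the exception type, since the text above only says that an exception is thrown.

```kotlin
// Local stand-in modeled on Beam's PipelineResult.State; NOT Kio's own type.
enum class SketchState(val isTerminal: Boolean) {
    UNKNOWN(false), STOPPED(false), RUNNING(false),
    DONE(true), FAILED(true), CANCELLED(true), UPDATED(true)
}

// Mirrors the described isCompleted(): true for any terminal state.
fun isCompleted(state: SketchState): Boolean = state.isTerminal

// Mirrors the described waitUntilDone() check: any final state other than DONE is an error.
// Assumption: IllegalStateException is used here only for illustration.
fun requireDone(state: SketchState): SketchState {
    check(state == SketchState.DONE) { "Pipeline finished with state $state" }
    return state
}
```

Under these assumptions a FAILED pipeline counts as completed, but it does not pass the DONE check, which is the distinction between waitUntilFinish and waitUntilDone.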