Kio Context
Table of contents
Overview
Kio context is a main entry point for the application. It allows us to define and execute pipelines. In general, the Kio context wraps Beam’s Pipeline
class in itself.
There are several ways to create the context:
- Context with a default configuration:
val kio = Kio.create()
- With the configuration from arguments:
val kio = Kio.fromArguments(args)
- From an existing
Pipeline
instance:val kio = Kio.fromPipeline(pipeline)
Arguments
In case when you create the context from arguments, you have access to additional ones (not related to the defined runner) using the arguments
property of the context. There are the following methods to work with these additional arguments:
Method | Description |
---|---|
required(key): String operator get(key): String | Get a value for required argument as string. If there are no arguments with the defined key, it will throw the IllegalArgumentException exception.Arguments: · key: String - the key of the argument to get the value |
optinal(key): String? | Get a value for defined argument as string or null if there are no arguments with the defined key. Arguments: · key: String - the key of the argument to get the value |
int(key): Int long(key): Long float(key): Float double(key): Double bool(key): Boolean | Methods to get a typed value for the argument with the defined key. If the value cannot be cast to the defined type, it will throw the IllegalArgumentException exception.Arguments: · key: String - the key of the argument to get the value |
int(key, default): Int long(key, default): Long float(key, default): Float double(key, default): Double bool(key, default): Boolean | Methods to get a typed value for the argument with the defined key. If there are no arguments with the defined key, the default value will be used. If the value cannot be cast to the defined type, it will throw the IllegalArgumentException exception.Arguments: · key: String - the key of the argument to get the value· default: () -> T - lambda to get the default value |
Creating a pipeline
A pipeline can be created using readers (see IO basics and Connectors).
Also, there is the parallelize
method to transform any collection to the pipeline:
val collection: PCollection<Int> = kio.parallelize(listOf(1, 2, 3, 4, 5))
This method can receive any implementations of the Iterable
interface as the argument value.
Running the pipeline
There is the execute
method to run the defined pipeline:
kio.execute()
It returns an ExecutionContext
instance with the following additional method:
Method | Description |
---|---|
getState(): State | Getting current state of executed pipeline. |
isCompleted(): Boolean | Returns true if the pipeline is in any terminal state, and false otherwise. |
waitUntilFinish(duration) | Makes the application wait until the pipeline is finished. Arguments: · duration: Duration (optional) - time period after which the job will be interrupted |
waitUntilDone(duration) | Makes the application wait until the pipeline is done. If the task is completed with a state other than DONE , it will throw an exception.Arguments: · duration: Duration (optional) - time period after which the job will be interrupted |