Kio Context
Table of contents
Overview
Kio context is a main entry point for the application. It allows us to define and execute pipelines. In general, the Kio context wraps Beam’s Pipeline class in itself.
There are several ways to create the context:
- Context with a default configuration:
val kio = Kio.create() - With the configuration from arguments:
val kio = Kio.fromArguments(args) - From an existing
Pipelineinstance:val kio = Kio.fromPipeline(pipeline)
Arguments
In case when you create the context from arguments, you have access to additional ones (not related to the defined runner) using the arguments property of the context. There are the following methods to work with these additional arguments:
| Method | Description |
|---|---|
required(key): Stringoperator get(key): String | Get a value for required argument as string. If there are no arguments with the defined key, it will throw the IllegalArgumentException exception.Arguments: · key: String - the key of the argument to get the value |
optinal(key): String? | Get a value for defined argument as string or null if there are no arguments with the defined key. Arguments: · key: String - the key of the argument to get the value |
int(key): Intlong(key): Longfloat(key): Floatdouble(key): Doublebool(key): Boolean | Methods to get a typed value for the argument with the defined key. If the value cannot be cast to the defined type, it will throw the IllegalArgumentException exception.Arguments: · key: String - the key of the argument to get the value |
int(key, default): Intlong(key, default): Longfloat(key, default): Floatdouble(key, default): Doublebool(key, default): Boolean | Methods to get a typed value for the argument with the defined key. If there are no arguments with the defined key, the default value will be used. If the value cannot be cast to the defined type, it will throw the IllegalArgumentException exception.Arguments: · key: String - the key of the argument to get the value· default: () -> T - lambda to get the default value |
Creating a pipeline
A pipeline can be created using readers (see IO basics and Connectors).
Also, there is the parallelize method to transform any collection to the pipeline:
val collection: PCollection<Int> = kio.parallelize(listOf(1, 2, 3, 4, 5))
This method can receive any implementations of the Iterable interface as the argument value.
Running the pipeline
There is the execute method to run the defined pipeline:
kio.execute()
It returns an ExecutionContext instance with the following additional method:
| Method | Description |
|---|---|
getState(): State | Getting current state of executed pipeline. |
isCompleted(): Boolean | Returns true if the pipeline is in any terminal state, and false otherwise. |
waitUntilFinish(duration) | Makes the application wait until the pipeline is finished. Arguments: · duration: Duration (optional) - time period after which the job will be interrupted |
waitUntilDone(duration) | Makes the application wait until the pipeline is done. If the task is completed with a state other than DONE, it will throw an exception.Arguments: · duration: Duration (optional) - time period after which the job will be interrupted |