This R package is a wrapper around, and an interface to, the Amazon Kinesis Client Library (KCL) MultiLangDaemon, which is part of the Amazon KCL for Java. This Java-based daemon communicates with the Kinesis API (e.g. to retrieve the status of streams and shards, and to fetch records from them) and also handles other useful tasks, such as checkpointing via Amazon DynamoDB, so that the R developer can concentrate on the stream processing algorithm.
A minimal stream processing script written in R looks something like:
library(logger)  # provides log_info()
AWR.Kinesis::kinesis_consumer(processRecords = function(records) {
    # log the received records as JSON
    log_info(jsonlite::toJSON(records))
})
This R script, executed by the MultiLangDaemon, reads records from the Kinesis stream and logs them as JSON in the application log, which by default is a temporary file. Note: it’s important not to write anything to stdout, as stdin and stdout are used by the package internals to communicate with the MultiLangDaemon. Because the package already imports and integrates the logger R package, it’s convenient to use the log functions for app logging at various log levels.
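To illustrate what logging at various levels looks like with the logger package, here is a minimal standalone sketch. It runs outside a consumer and points the log at a temporary file of its own. The file path and the messages are assumptions made for illustration, and inside a real consumer you would normally leave the appender as the package configured it.

```r
library(logger)

# Illustration only: send log output to a temporary file instead of the console
log_file <- tempfile(fileext = ".log")
log_appender(appender_file(log_file))

# Only messages at INFO level or more severe are written
log_threshold(INFO)

n <- 3
log_debug("This debug message is below the threshold and is skipped")
log_info("Processed {n} records")  # the default formatter interpolates {n}
log_warn("Something looks suspicious")

# Inspect what ended up in the log file
log_lines <- readLines(log_file)
print(log_lines)
```

Lowering the threshold (e.g. to `DEBUG`) while developing, and raising it again in production, is a common way to use these levels.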
Let’s see a more complex stream processing app:
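library(logger)  # provides log_info()
AWR.Kinesis::kinesis_consumer(
    # run once when the app starts
    initialize = function()
        log_info('Loading some data'),
    # run for every batch of records read from Kinesis
    processRecords = function(records)
        log_info('Received some records from Kinesis'),
    # run when the app stops
    shutdown = function()
        log_info('Bye'),
    # list(frequency in minutes, function) pairs
    updater = list(
        list(1, function()
            log_info('Updating some data every minute')),
        list(1/60, function()
            log_info('This is a high frequency updater call'))))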
This application takes multiple (anonymous) functions. Besides the processRecords argument, which the first application used to define a function that processes the records, there are also an initialize, a shutdown and two updater functions. The initialize and shutdown calls are straightforward: these functions run when the application starts and when it stops, e.g. when there are no further records to be read from a shard due to a shard merge operation.
The updater part starts a timer in the background and executes the defined functions at the given frequency (specified in minutes, so once a minute and once a second in the above example) before the processRecords calls.
So this application will log:
* Loading some data on app start,
* Received some records from Kinesis every time it reads from Kinesis,
* This is a high frequency updater call (almost) every second after a process records call,
* Updating some data every minute around once a minute,
* Bye when the app stops.
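The timing behaviour described above can be sketched in plain R. This is not the package's implementation, only an illustration of the idea under stated assumptions: each updater is a (frequency in minutes, function) pair, and due updaters are run around each batch of records. The helper names `make_updaters` and `run_due_updaters` are hypothetical.

```r
# Hypothetical helper: attach a "last run" timestamp to each updater spec
make_updaters <- function(specs, now = Sys.time()) {
    lapply(specs, function(spec) {
        list(every_secs = spec[[1]] * 60, fn = spec[[2]], last_run = now)
    })
}

# Hypothetical helper: run every updater whose interval has elapsed
run_due_updaters <- function(updaters, now = Sys.time()) {
    lapply(updaters, function(u) {
        elapsed <- as.numeric(difftime(now, u$last_run, units = "secs"))
        if (elapsed >= u$every_secs) {
            u$fn()
            u$last_run <- now
        }
        u
    })
}

# Simulate one batch of records every 20 seconds for two minutes
calls <- character()
start <- as.POSIXct("2024-01-01 00:00:00", tz = "UTC")
updaters <- make_updaters(list(
    list(1,    function() calls <<- c(calls, "minute")),
    list(1/60, function() calls <<- c(calls, "second"))
), now = start)

for (offset in seq(20, 120, by = 20)) {
    updaters <- run_due_updaters(updaters, now = start + offset)
    # ... processRecords(records) would run here ...
}

print(table(calls))
```

In this simulation the one-second updater fires on every batch (six times), because updaters are only checked when a batch arrives, while the one-minute updater fires twice. This mirrors the "(almost) every second" wording above: the effective frequency can never exceed the rate at which batches are processed.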
Use the initialize function to load/cache some data for the processRecords calls, then use the updater functions to refresh your cached data on a regular basis. To store credentials for databases, APIs etc., use the botor R package to interact with the AWS Key Management Service.
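One way to arrange such a cache is sketched below, assuming an R environment is used as shared mutable state. The loader `refresh_cache`, the cached values and the 10-minute refresh interval are all hypothetical, and `records` is treated as opaque because its structure is not described here. The callbacks are exercised by hand so the sketch runs without Kinesis.

```r
# Environment acting as a mutable, in-memory cache shared by the callbacks
cache <- new.env()

# Hypothetical loader; a real app might query a database or an API here
refresh_cache <- function() {
    cache$lookup <- c(a = "alpha", b = "beta")
    cache$refreshed_at <- Sys.time()
    invisible(NULL)
}

# Callbacks named after the kinesis_consumer() arguments used above
callbacks <- list(
    initialize     = refresh_cache,
    processRecords = function(records) {
        # `records` is not inspected; only the cached data is used
        length(cache$lookup)
    },
    updater        = list(list(10, refresh_cache))  # refresh every 10 minutes
)

# Dry run without Kinesis: call the callbacks by hand
callbacks$initialize()
n_cached <- callbacks$processRecords(records = NULL)
print(n_cached)

# In the real script, the same list could be handed to the consumer:
# do.call(AWR.Kinesis::kinesis_consumer, callbacks)
```

Keeping the loader in a single function means the initial load and the periodic refresh cannot drift apart.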
The R script has to be executable, so set the executable bit (chmod +x) and add a hashbang line (e.g. for Rscript or littler). Then define a configuration file for the MultiLangDaemon; for example, the content of app.properties could be as simple as:
executableName = ./demo_app.R
streamName = demo_stream
applicationName = demo_app
With this config file, the daemon looks for a demo_stream Kinesis stream in the default (US East) region, starts reading from the oldest available record (via TRIM_HORIZON), and runs the demo_app.R script to process the records, using the demo_app DynamoDB table for checkpointing. There are many other useful settings as well; see the example file of the Python client for more details.
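As a sketch, the same configuration can be generated from R with the defaults spelled out. The property names `regionName` and `initialPositionInStream` and the value `us-east-1` follow the KCL sample properties file as far as I know, but they are assumptions here and should be checked against the example file mentioned above. The file is written to a temporary folder for illustration.

```r
# Settings from the example above, plus two defaults made explicit
# (regionName and initialPositionInStream are assumed property names)
props <- c(
    executableName          = "./demo_app.R",
    streamName              = "demo_stream",
    applicationName         = "demo_app",
    regionName              = "us-east-1",
    initialPositionInStream = "TRIM_HORIZON"
)

# Render as "key = value" lines and write the properties file
prop_lines <- paste(names(props), props, sep = " = ")
prop_path <- file.path(tempdir(), "app.properties")
writeLines(prop_lines, prop_path)

cat(readLines(prop_path), sep = "\n")
```

Generating the file this way can be handy when the stream or application name differs between environments.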
Running the MultiLangDaemon with the above defined configuration file is easy, as the required jar files are bundled with the AWR and AWR.Kinesis packages. So first, identify where the jar files were installed:
sapply(c('AWR', 'AWR.Kinesis'), function(pkg) system.file('java', package = pkg))
This returns two folders that you should pass as custom classpaths to java, for example:
/usr/bin/java -cp `Rscript --vanilla -e "cat(paste(c(sapply(c('AWR', 'AWR.Kinesis'), function(pkg) file.path(system.file('java', package = pkg), '*')), './'), collapse = ':'))"` com.amazonaws.services.kinesis.multilang.MultiLangDaemon ./app.properties
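The classpath construction embedded in the command above can also be written out step by step. The following is a sketch for Unix-like systems (where `:` separates classpath entries); `build_classpath` is a hypothetical helper name, and the `system2()` call is left commented out because it needs Java, the installed packages and AWS access.

```r
# Hypothetical helper: turn jar folders into a Java classpath string
build_classpath <- function(java_dirs, extra = "./") {
    paste(c(file.path(java_dirs, "*"), extra), collapse = ":")
}

# Locate the bundled jar folders (empty strings if a package is not installed)
java_dirs <- sapply(c("AWR", "AWR.Kinesis"),
                    function(pkg) system.file("java", package = pkg))
classpath <- build_classpath(java_dirs)

# Arguments for the java executable; the classpath is quoted for the shell
args <- c("-cp", shQuote(classpath),
          "com.amazonaws.services.kinesis.multilang.MultiLangDaemon",
          "./app.properties")

# Starting the daemon would then be:
# system2("/usr/bin/java", args)

# Example with made-up folders, to show the resulting format
print(build_classpath(c("/a/java", "/b/java")))
```

Quoting the classpath keeps the shell from expanding the `*` wildcards, which Java needs to receive literally.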
Please note that you need AWS access to both Kinesis and DynamoDB to get the above examples working.
Again, this is just a wrapper around the MultiLangDaemon, so the related documentation will be extremely useful if you get stuck:
* AWS introduction into Kinesis
* AWS docs on using the KCL
* Java Kinesis Client
* Python Kinesis Client