Hourly Pipeline
This pipeline is an example of a standard extract, transform, load (ETL) workflow. It is scheduled to run every 3 hours starting on 2024-04-25 at 05:45:00 (America/Halifax time). The pipeline performs the following steps:
- access an online-hosted CSV file
- perform light data wrangling
- write the file to local storage in Parquet format
This example is set up as a simple sequence of tasks, each creating an object that the next task uses. All components of the pipeline are within the pipeline_wildfire_hourly function, which has no parameters.
#' pipeline_wildfire_hourly maestro pipeline
#'
#' @maestroFrequency 3 hour
#' @maestroStartTime 2024-04-25 05:45:00
#' @maestroTz America/Halifax
pipeline_wildfire_hourly <- function() {

  # load libraries
  library(dplyr)
  library(readr)
  library(sf)
  library(sfarrow)

  # Access active wildfire data from hosted csv
  df <- readr::read_csv("https://cwfis.cfs.nrcan.gc.ca/downloads/activefires/activefires.csv")

  # Data wrangling
  df_geom <- df |>
    dplyr::mutate(insert_datetime = Sys.time()) |>
    sf::st_as_sf(coords = c("lon", "lat"), crs = 4326)

  # Write active wildfires to file
  basename <- paste("cdn_wildfire", as.integer(Sys.time()), sep = "_")

  df_geom |>
    sfarrow::write_sf_dataset(
      "~/data/wildfires",
      format = "parquet",
      basename_template = paste0(basename, "-{i}.parquet")
    )
}
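To make the schedule tags concrete, here is a minimal sketch in base R of the run times implied by a 3-hour frequency starting at 2024-04-25 05:45:00 in America/Halifax. It only illustrates the cadence; it is not how maestro itself evaluates the schedule, and the names start_time and run_times are hypothetical.

```r
# Illustrative only: expand the schedule tags into the first few run times.
# `start_time` and `run_times` are hypothetical names, not part of maestro.
start_time <- as.POSIXct("2024-04-25 05:45:00", tz = "America/Halifax")

# Step forward in 3-hour increments, matching `@maestroFrequency 3 hour`
run_times <- seq(start_time, by = "3 hours", length.out = 4)

# Show the run times in the schedule's own time zone
format(run_times, "%Y-%m-%d %H:%M")
```

Each element of run_times is one expected execution of pipeline_wildfire_hourly, so every run would write a new Parquet file whose name carries that run's timestamp.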
Daily Pipeline
This pipeline is an example of a standard extract, transform, load (ETL) workflow. It is scheduled to run every day starting on 2024-04-25 at 06:30:00 (America/Halifax time). The pipeline performs the following steps:
- submit a request to an API
- extract data from the API response
- add an insert datetime column
- write the file to local storage in Parquet format
This example defines a custom function that accesses and extracts the data from the API; its result is piped into the remaining tasks. All components of the pipeline are within the pipeline_climate_daily function, which has no parameters.
#' pipeline_climate_daily maestro pipeline
#'
#' @maestroFrequency 1 day
#' @maestroStartTime 2024-04-25 06:30:00
#' @maestroTz America/Halifax
pipeline_climate_daily <- function() {

  # load libraries
  library(dplyr)
  library(httr2)
  library(arrow)

  # Custom function for accessing api climate data
  get_hourly_climate_info <- function(station_id, request_limit = 24) {

    # Validate parameters (check length first so the `&&` comparison is scalar)
    stopifnot("`station_id` must be a length-one vector" = length(station_id) == 1)
    stopifnot("`station_id` must be a positive number" = is.numeric(station_id) && station_id > 0)

    # Access climate hourly via geomet api
    hourly_req <- httr2::request("https://api.weather.gc.ca/collections/climate-hourly/items") |>
      httr2::req_url_query(
        lang = "en-CA",
        offset = 0,
        CLIMATE_IDENTIFIER = station_id,
        LOCAL_DATE = Sys.Date() - 1,
        limit = request_limit
      )

    # Perform the request
    hourly_resp <- hourly_req |>
      httr2::req_perform()

    # Climate station response to data frame
    geomet_json <- hourly_resp |>
      httr2::resp_body_json(simplifyVector = TRUE)

    geomet_json$features
  }

  # Write climate hourly to file
  basename <- paste("climate_hourly", as.integer(Sys.time()), sep = "_")

  get_hourly_climate_info(8202251) |>
    dplyr::mutate(insert_datetime = Sys.time()) |>
    arrow::write_dataset(
      "~/data/climate",
      format = "parquet",
      # arrow requires "{i}" in the template; it is replaced by a file counter
      basename_template = paste0(basename, "-{i}.parquet")
    )
}
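The parameter checks in get_hourly_climate_info use named stopifnot() conditions, where the name of each condition becomes the error message (this requires R 4.0.0 or later). The following minimal standalone sketch is modelled on those checks; validate_station_id is a hypothetical helper that is not part of the pipeline, and it performs no API request.

```r
# Hypothetical helper modelled on the checks in `get_hourly_climate_info`
validate_station_id <- function(station_id) {
  # Check length first so the scalar comparison below is always well defined
  stopifnot("`station_id` must be a length-one vector" = length(station_id) == 1)
  stopifnot("`station_id` must be a positive number" = is.numeric(station_id) && station_id > 0)
  invisible(station_id)
}

# A valid identifier passes silently
validate_station_id(8202251)

# A character identifier fails the numeric check; capture the error message
err_type <- tryCatch(
  validate_station_id("8202251"),
  error = function(e) conditionMessage(e)
)

# A vector of identifiers fails the length check
err_length <- tryCatch(
  validate_station_id(c(8202251, 8202252)),
  error = function(e) conditionMessage(e)
)

err_type
err_length
```

Failing early like this means a malformed station identifier stops the pipeline before any request is sent, rather than surfacing later as an unclear API error.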