ETL (Extract, Transform, Load) pipelines move data between systems — from operational databases to analytics warehouses, from vendor APIs to internal tables, from legacy systems to modern schemas. Shell scripts are an underrated ETL tool: no runtime dependency, composable with any data source, and trivially schedulable with cron.
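For instance, a single crontab entry is enough to run the daily pipeline from section 1 every night (a sketch: the 02:15 schedule is illustrative, the script path follows the PIPELINE_DIR used below, and cron.log is a hypothetical catch-all for anything printed before the script's own logging starts):
BASH
# m h dom mon dow  command
15 2 * * * /opt/myapp/etl/etl_sales_pipeline.sh >> /opt/myapp/etl/logs/cron.log 2>&1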
1. Extract-Transform-Load pipeline structure
BASH
#!/usr/bin/env bash
# etl_sales_pipeline.sh — Daily sales ETL: MySQL → analytics warehouse
set -euo pipefail
PIPELINE_DIR="/opt/myapp/etl"
LOG="${PIPELINE_DIR}/logs/etl-$(date +%Y%m%d).log"
SRC_CONF="/etc/myapp/mysql-prod.conf"
DST_CONF="/etc/myapp/mysql-analytics.conf"
STAGE_DIR="${PIPELINE_DIR}/staging"
mkdir -p "${STAGE_DIR}" "$(dirname "${LOG}")"
log() { echo "[$(date +%H:%M:%S)] $*" | tee -a "${LOG}"; }
src() { mysql --defaults-file="${SRC_CONF}" -BNs -D myapp -e "$1"; }
dst() { mysql --defaults-file="${DST_CONF}" -BNs -D analytics -e "$1"; }
YESTERDAY=$(date -d yesterday +%Y-%m-%d 2>/dev/null || date -v-1d +%Y-%m-%d)
log "=== ETL Pipeline start: ${YESTERDAY} ==="
# ── EXTRACT ───────────────────────────────────────────────
log "Extract: sales data for ${YESTERDAY}"
src "
SELECT
o.id AS order_id,
DATE(o.created_at) AS order_date,
u.id AS customer_id,
COALESCE(u.plan,'free') AS customer_plan,
COUNT(DISTINCT oi.product_id) AS product_count,
SUM(oi.quantity) AS items_sold,
o.total AS order_total,
o.status,
LEFT(u.email, INSTR(u.email,'@')-1) AS username
FROM orders o
JOIN users u ON u.id=o.user_id
JOIN order_items oi ON oi.order_id=o.id
WHERE DATE(o.created_at)='${YESTERDAY}'
GROUP BY o.id" > "${STAGE_DIR}/sales_${YESTERDAY}.tsv"
ROW_COUNT=$(wc -l < "${STAGE_DIR}/sales_${YESTERDAY}.tsv")
log " Extracted ${ROW_COUNT} rows"
# ── TRANSFORM ─────────────────────────────────────────────
log "Transform: normalise and enrich"
awk -F$'\t' '
BEGIN { OFS = "\t" }   # keep tab as the output separator when fields are rewritten
{
    # Normalise status to standard codes
    if ($8 == "completed") $8 = "COMPLETE"
    if ($8 == "cancelled") $8 = "CANCEL"
    if ($8 == "pending")   $8 = "PENDING"
    # Add revenue tier from order_total (column 7)
    tier = ($7 < 50) ? "LOW" : ($7 < 200) ? "MED" : "HIGH"
    print $0, tier
}' "${STAGE_DIR}/sales_${YESTERDAY}.tsv" > "${STAGE_DIR}/sales_${YESTERDAY}.transformed.tsv"
# ── LOAD ──────────────────────────────────────────────────
log "Load: insert into analytics warehouse"
dst "DELETE FROM fact_sales WHERE order_date='${YESTERDAY}'"
# LOAD DATA LOCAL needs local_infile enabled on the client (and permitted by the server)
mysql --defaults-file="${DST_CONF}" --local-infile=1 analytics << SQL
LOAD DATA LOCAL INFILE '${STAGE_DIR}/sales_${YESTERDAY}.transformed.tsv'
INTO TABLE fact_sales
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
(order_id, order_date, customer_id, customer_plan, product_count,
 items_sold, order_total, status, username, revenue_tier);
SQL
LOADED=$(dst "SELECT COUNT(*) FROM fact_sales WHERE order_date='${YESTERDAY}'")
log "Loaded: ${LOADED} rows"
log "=== ETL Pipeline complete ==="
2. Multi-source ETL with error handling
BASH
# ── ETL with retry and partial failure handling ────────────
run_etl_step() {
    local name="$1" cmd="$2"
    local attempts=0 max_attempts=3
    while (( attempts < max_attempts )); do
        attempts=$((attempts + 1))   # assignment form: safe under set -e, unlike (( attempts++ )) at 0
        log " Step: ${name} (attempt ${attempts})"
        if eval "${cmd}"; then
            log " PASS: ${name}"
            return 0
        fi
        if (( attempts < max_attempts )); then
            log " FAIL: ${name} - retrying in 30s"
            sleep 30
        fi
    done
    log " FATAL: ${name} failed after ${max_attempts} attempts"
    return 1
}
# ── Pipeline with multiple sources ────────────────────────
PIPELINE_OK=true
run_etl_step "extract_orders" "extract_orders ${YESTERDAY}" || PIPELINE_OK=false
run_etl_step "extract_events" "extract_events ${YESTERDAY}" || PIPELINE_OK=false
run_etl_step "transform_all" "transform_data ${YESTERDAY}" || PIPELINE_OK=false
run_etl_step "load_warehouse" "load_analytics ${YESTERDAY}" || PIPELINE_OK=false
run_etl_step "update_agg_tables" "rebuild_aggregates ${YESTERDAY}" || PIPELINE_OK=false
if "${PIPELINE_OK}"; then
log "Pipeline: SUCCESS"
else
log "Pipeline: PARTIAL FAILURE — check ${LOG}"
mail -s "ETL FAILURE: ${YESTERDAY}" dba@example.com < "${LOG}"
exit 1
fi
Always delete-before-insert for idempotent reloads: if the ETL fails halfway, a re-run produces clean results. Use staging files rather than direct DB-to-DB pipes; they allow inspection after failures. Log every step with row counts before and after. Retry transient failures (network timeouts) with backoff. Alert on partial failures: a partially loaded day is worse than a missing day.
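For the backoff advice, a variant of run_etl_step that doubles the wait between attempts is a small change. A sketch (the 15-second base delay and four attempts are arbitrary starting points; log is the same helper used by the pipeline):
BASH
# Retry with exponential backoff: wait 15s, 30s, 60s between failed attempts
run_etl_step_backoff() {
    local name="$1" cmd="$2"
    local attempts=0 max_attempts=4 delay=15
    while (( attempts < max_attempts )); do
        attempts=$((attempts + 1))
        log " Step: ${name} (attempt ${attempts})"
        if eval "${cmd}"; then
            log " PASS: ${name}"
            return 0
        fi
        if (( attempts < max_attempts )); then
            log " FAIL: ${name} - retrying in ${delay}s"
            sleep "${delay}"
            delay=$((delay * 2))
        fi
    done
    log " FATAL: ${name} failed after ${max_attempts} attempts"
    return 1
}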