
Shell Scripting, Database-Centric: ETL Pipelines in Shell

Build reliable Extract-Transform-Load pipelines entirely in bash: extract from MySQL with SQL aggregation, transform with AWK, load into the analytics warehouse with LOAD DATA, and add retry handling and pipeline orchestration.

ETL (Extract, Transform, Load) pipelines move data between systems — from operational databases to analytics warehouses, from vendor APIs to internal tables, from legacy systems to modern schemas. Shell scripts are an underrated ETL tool: no runtime dependency, composable with any data source, and trivially schedulable with cron.
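
For example, one crontab entry is enough to run the pipeline nightly; the 02:15 schedule and log path below are illustrative assumptions, not something the script requires:

BASH
# Hypothetical crontab entry (crontab -e): run the daily ETL at 02:15,
# appending stdout and stderr to a cron-specific log.
15 2 * * * /opt/myapp/etl/etl_sales_pipeline.sh >> /opt/myapp/etl/logs/cron.log 2>&1

The pipeline itself: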

BASH
#!/usr/bin/env bash
# etl_sales_pipeline.sh — Daily sales ETL: MySQL → analytics warehouse

set -euo pipefail

PIPELINE_DIR="/opt/myapp/etl"
LOG="${PIPELINE_DIR}/logs/etl-$(date +%Y%m%d).log"
SRC_CONF="/etc/myapp/mysql-prod.conf"
DST_CONF="/etc/myapp/mysql-analytics.conf"
STAGE_DIR="${PIPELINE_DIR}/staging"
mkdir -p "${STAGE_DIR}" "$(dirname "${LOG}")"

log() { echo "[$(date +%H:%M:%S)] $*" | tee -a "${LOG}"; }
src() { mysql --defaults-file="${SRC_CONF}" -BNs -D myapp -e "$1"; }
dst() { mysql --defaults-file="${DST_CONF}" -BNs -D analytics -e "$1"; }
YESTERDAY=$(date -d yesterday +%Y-%m-%d 2>/dev/null || date -v-1d +%Y-%m-%d)

log "=== ETL Pipeline start: ${YESTERDAY} ==="

# ── EXTRACT ───────────────────────────────────────────────
log "Extract: sales data for ${YESTERDAY}"
src "
SELECT
  o.id                               AS order_id,
  DATE(o.created_at)                 AS order_date,
  u.id                               AS customer_id,
  COALESCE(u.plan,'free')            AS customer_plan,
  COUNT(DISTINCT oi.product_id)      AS product_count,
  SUM(oi.quantity)                   AS items_sold,
  o.total                            AS order_total,
  o.status,
  LEFT(u.email, INSTR(u.email,'@')-1) AS username
FROM orders o
JOIN users u ON u.id=o.user_id
JOIN order_items oi ON oi.order_id=o.id
WHERE DATE(o.created_at)='${YESTERDAY}'
GROUP BY o.id" > "${STAGE_DIR}/sales_${YESTERDAY}.tsv"

ROW_COUNT=$(wc -l < "${STAGE_DIR}/sales_${YESTERDAY}.tsv")
log "  Extracted ${ROW_COUNT} rows"

# ── TRANSFORM ─────────────────────────────────────────────
log "Transform: normalise and enrich"
awk -F$'\t' '
OFS="\t"
{
  # Normalise status to standard codes
  if ($8 == "completed") $8 = "COMPLETE"
  if ($8 == "cancelled") $8 = "CANCEL"
  if ($8 == "pending")   $8 = "PENDING"
  # Add revenue tier
  tier = ($7 < 50) ? "LOW" : ($7 < 200) ? "MED" : "HIGH"
  print $0, tier
}' "${STAGE_DIR}/sales_${YESTERDAY}.tsv" > "${STAGE_DIR}/sales_${YESTERDAY}.transformed.tsv"

# ── LOAD ──────────────────────────────────────────────────
log "Load: insert into analytics warehouse"
dst "DELETE FROM fact_sales WHERE order_date='${YESTERDAY}'"

# Recent MySQL clients disable LOCAL INFILE by default; --local-infile=1
# re-enables it on the client side (the server must also allow local_infile).
mysql --defaults-file="${DST_CONF}" --local-infile=1 analytics << SQL
LOAD DATA LOCAL INFILE '${STAGE_DIR}/sales_${YESTERDAY}.transformed.tsv'
INTO TABLE fact_sales
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
(order_id, order_date, customer_id, customer_plan, product_count,
 items_sold, order_total, status, username, revenue_tier);
SQL

LOADED=$(dst "SELECT COUNT(*) FROM fact_sales WHERE order_date='${YESTERDAY}'")
log "Loaded: ${LOADED} rows"
log "=== ETL Pipeline complete ==="
BASH
# ── ETL with retry and partial failure handling ────────────
run_etl_step() {
  local name="$1" cmd="$2"
  local attempts=0 max_attempts=3

  while (( attempts < max_attempts )); do
    (( ++attempts ))   # pre-increment: (( attempts++ )) evaluates to 0 on the first pass and returns failure
    log "  Step: ${name} (attempt ${attempts})"
    if eval "${cmd}"; then
      log "  PASS: ${name}"
      return 0
    fi
    log "  FAIL: ${name} — retrying in 30s"
    sleep 30
  done

  log "  FATAL: ${name} failed after ${max_attempts} attempts"
  return 1
}

# ── Pipeline with multiple sources ────────────────────────
# (extract_orders, extract_events, transform_data, load_analytics and
#  rebuild_aggregates are assumed to be defined earlier in this script)
PIPELINE_OK=true

run_etl_step "extract_orders"   "extract_orders    ${YESTERDAY}" || PIPELINE_OK=false
run_etl_step "extract_events"   "extract_events    ${YESTERDAY}" || PIPELINE_OK=false
run_etl_step "transform_all"    "transform_data    ${YESTERDAY}" || PIPELINE_OK=false
run_etl_step "load_warehouse"   "load_analytics    ${YESTERDAY}" || PIPELINE_OK=false
run_etl_step "update_agg_tables" "rebuild_aggregates ${YESTERDAY}" || PIPELINE_OK=false

if "${PIPELINE_OK}"; then
  log "Pipeline: SUCCESS"
else
  log "Pipeline: PARTIAL FAILURE — check ${LOG}"
  mail -s "ETL FAILURE: ${YESTERDAY}" dba@example.com < "${LOG}"
  exit 1
fi
✔ ETL pipeline tips — Always delete-before-insert for idempotent reloads (if the ETL fails halfway, a re-run produces clean results). Use staging files rather than direct DB-to-DB pipes — they allow inspection after failures. Log every step with row counts before and after. Retry transient failures (network timeouts) with backoff. Alert on partial failures — a partially-loaded day is worse than a missing day.
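
The fixed 30-second sleep in run_etl_step treats every failure the same; the "retry with backoff" tip usually means spacing attempts out progressively. A minimal sketch with the same interface, assuming a 30s starting delay that doubles after each failed attempt:

BASH
# run_etl_step with exponential backoff (sketch). Same name/command interface
# as above; the 30s/60s/120s delay schedule is an assumption.
run_etl_step() {
  local name="$1" cmd="$2"
  local attempts=0 max_attempts=3 delay=30

  while (( attempts < max_attempts )); do
    (( ++attempts ))
    log "  Step: ${name} (attempt ${attempts})"
    if eval "${cmd}"; then
      log "  PASS: ${name}"
      return 0
    fi
    if (( attempts < max_attempts )); then
      log "  FAIL: ${name}, retrying in ${delay}s"
      sleep "${delay}"
      (( delay *= 2 ))
    fi
  done

  log "  FATAL: ${name} failed after ${max_attempts} attempts"
  return 1
}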