Bulk data imports are where most data pipelines fail — insufficient memory, transaction lock timeouts, charset encoding issues, and partial imports that leave the database in a broken state. Shell scripts that wrap bulk imports with validation, batching, and rollback capability make the difference between reliable and fragile data pipelines.
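The rollback piece is the part most scripts skip. A minimal sketch of one way to get it, using a bash EXIT trap that truncates the staging table whenever the script dies partway through; the file name, config path, and staging table are illustrative, mirroring the script below:
BASH
#!/usr/bin/env bash
# rollback_on_failure.sh (illustrative): clean up a partial import on any failure
set -euo pipefail

STAGING_TABLE="import_staging"   # hypothetical staging table
IMPORT_OK=0

cleanup() {
  # Runs on every exit; only rolls back if the import never finished
  if (( IMPORT_OK == 0 )); then
    echo "Import failed, truncating ${STAGING_TABLE}" >&2
    mysql --defaults-file=/etc/myapp/mysql.conf -D myapp \
      -e "TRUNCATE TABLE ${STAGING_TABLE}" || true
  fi
}
trap cleanup EXIT

# ... load, validate, and upsert steps go here ...

IMPORT_OK=1   # reached only if every step above succeeded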
1. MySQL LOAD DATA and chunked inserts
BASH
#!/usr/bin/env bash
# bulk_import.sh — Safe chunked CSV import into MySQL
set -euo pipefail
CSV_FILE="${1:?Usage: bulk_import.sh FILE}"
DB_CONF="/etc/myapp/mysql.conf"
TABLE="import_staging"
BATCH_SIZE=10000
# db(): run one SQL statement against the myapp database
# --local-infile=1 is required for the LOAD DATA LOCAL INFILE step below
db() { mysql --defaults-file="${DB_CONF}" --local-infile=1 -D myapp -e "$1"; }
# ── Step 1: Validate CSV before importing ─────────────────
LINE_COUNT=$(wc -l < "${CSV_FILE}")
echo " Lines in file: ${LINE_COUNT}"
# Check header matches expected columns
HEADER=$(head -1 "${CSV_FILE}")
EXPECTED="id,name,email,created_at"
[[ "${HEADER}" == "${EXPECTED}" ]] || { echo "ERROR: Header mismatch"; exit 1; }
# ── Step 2: Load into staging table ───────────────────────
db "TRUNCATE TABLE ${TABLE}"
# LOAD DATA INFILE — fastest MySQL bulk import method
db "LOAD DATA LOCAL INFILE '${CSV_FILE}'
INTO TABLE ${TABLE}
FIELDS TERMINATED BY ','
OPTIONALLY ENCLOSED BY '"'
LINES TERMINATED BY '
'
IGNORE 1 LINES -- skip header
(id, name, email, created_at)"
IMPORTED=$(db "SELECT COUNT(*) FROM ${TABLE}" | tail -1)
echo " Imported: ${IMPORTED} rows"
# ── Step 3: Validate staging data ─────────────────────────
NULLS=$(db "SELECT COUNT(*) FROM ${TABLE} WHERE email IS NULL" | tail -1)
(( NULLS > 0 )) && { echo "ERROR: ${NULLS} rows with NULL email"; exit 1; }
DUPES=$(db "SELECT COUNT(*) FROM (SELECT email,COUNT(*) c FROM ${TABLE} GROUP BY email HAVING c>1) d" | tail -1)
(( DUPES > 0 )) && echo "WARN: ${DUPES} duplicate emails in import"
# ── Step 4: Upsert from staging to production ─────────────
echo " Upserting to production..."
db "INSERT INTO users (id,name,email,created_at)
SELECT id,name,email,created_at FROM ${TABLE}
ON DUPLICATE KEY UPDATE
name=VALUES(name),
email=VALUES(email)"
echo " Import complete: ${IMPORTED} rows"
2. PostgreSQL COPY and SQLite bulk import patterns
BASH
# ── PostgreSQL COPY — fastest bulk load ───────────────────
# From stdin (no server filesystem required)
cat data.csv | PGPASSWORD="${DB_PASS}" psql -h "${DB_HOST}" -U "${DB_USER}" -d myapp -c "COPY users (id,name,email,created_at) FROM STDIN CSV HEADER"
# ── From file (server filesystem — very fast) ─────────────
PGPASSWORD="${DB_PASS}" psql -h "${DB_HOST}" -U "${DB_USER}" -d myapp -c "COPY users FROM '/tmp/users_import.csv' CSV HEADER"
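# ── \copy: client-side load when the server can't read the file ──
# Sketch: psql's \copy meta-command streams the file from the client,
# so it needs no superuser or pg_read_server_files rights
PGPASSWORD="${DB_PASS}" psql -h "${DB_HOST}" -U "${DB_USER}" -d myapp -c "\copy users (id,name,email,created_at) FROM 'data.csv' CSV HEADER"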
# ── Chunked import with progress ──────────────────────────
TOTAL=$(( $(wc -l < data.csv) - 1 ))   # data rows, excluding header
CHUNK=50000
DONE=0
# Strip the header so every chunk is pure data, then split into fixed-size pieces
tail -n +2 data.csv | split -l "${CHUNK}" - /tmp/import_chunk_
for chunk in /tmp/import_chunk_*; do
  PGPASSWORD="${DB_PASS}" psql -h "${DB_HOST}" -U "${DB_USER}" -d myapp -c "COPY users FROM STDIN CSV" < "${chunk}"
  DONE=$(( DONE + $(wc -l < "${chunk}") ))
  printf " Progress: %d/%d (%d%%)\r" "${DONE}" "${TOTAL}" "$(( DONE * 100 / TOTAL ))"
done
rm -f /tmp/import_chunk_*
echo ""
# ── SQLite bulk import from CSV ────────────────────────────
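# .import creates users_staging if it does not already exist, using the CSV header row as column names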
sqlite3 mydb.db << SQL
.mode csv
.import data.csv users_staging
INSERT OR REPLACE INTO users SELECT * FROM users_staging;
DROP TABLE users_staging;
SQL
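Before trusting any of these loads, compare what landed in the table with what was in the file. A sketch for the PostgreSQL path; it assumes the target table was empty before the load (or that you count the staging table instead):
BASH
# Compare source rows (minus header) against rows now in the table
EXPECTED=$(( $(wc -l < data.csv) - 1 ))
ACTUAL=$(PGPASSWORD="${DB_PASS}" psql -h "${DB_HOST}" -U "${DB_USER}" -d myapp -tAc "SELECT COUNT(*) FROM users")
if (( ACTUAL != EXPECTED )); then
  echo "ERROR: expected ${EXPECTED} rows, found ${ACTUAL}" >&2
  exit 1
fi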
✔ Bulk import tips: Always import into a staging/temporary table first, validate, then upsert/swap to production. For MySQL, LOAD DATA LOCAL INFILE is 10-50x faster than row-by-row INSERT. For PostgreSQL, COPY FROM STDIN is the fastest path. For empty-table loads, drop or disable indexes on the target table during the bulk import and rebuild them afterwards. Validate that the row count matches the source file before committing.
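On the index point: PostgreSQL has no DISABLE KEYS, so the empty-table pattern is to drop secondary indexes, COPY, then recreate them (for InnoDB in MySQL, ALTER TABLE ... DISABLE KEYS has no effect; SET unique_checks=0 and foreign_key_checks=0 for the session is the usual speedup instead). A sketch for the PostgreSQL side; the index name and definition are illustrative:
BASH
# Drop secondary indexes, bulk load, rebuild, then refresh planner stats
RUN=(psql -h "${DB_HOST}" -U "${DB_USER}" -d myapp)
PGPASSWORD="${DB_PASS}" "${RUN[@]}" -c "DROP INDEX IF EXISTS users_email_idx"   # hypothetical index
PGPASSWORD="${DB_PASS}" "${RUN[@]}" -c "COPY users FROM STDIN CSV HEADER" < data.csv
PGPASSWORD="${DB_PASS}" "${RUN[@]}" -c "CREATE INDEX users_email_idx ON users (email)"
PGPASSWORD="${DB_PASS}" "${RUN[@]}" -c "ANALYZE users"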