ETL (Extract, Transform, Load)
ArkitekturProces til at udtrække data fra kilder, transformere det, og loade det ind i et destination system.
Beskrivelse
ETL er en fundamental data integration proces bestående af tre faser: Extract (udtræk data fra multiple kilder), Transform (rens, konverter, aggreger data), og Load (indsæt i destination database eller data warehouse). ETL er kernen i moderne data pipelines og bruges til at konsolidere data fra forskellige systemer til analyse. Extraction kan være fra databaser, APIs, CSV filer, eller real-time streams. Transformation inkluderer data cleaning (fjern duplicates, fix formats), enrichment (join med reference data), aggregation (sum, average), og normalization. Loading kan være full load (helt dataset) eller incremental load (kun ændringer). Moderne varianter inkluderer ELT (load først, transform i destination) og real-time streaming ETL. Tools som Apache Airflow, Talend, Informatica og AWS Glue automatiserer ETL workflows. God ETL sikrer data quality, consistency og tilgængelighed for business intelligence.
Problem
Organisationer har data spredt på tværs af mange systemer - CRM, ERP, databases, APIs, filer. Data har forskellige formater, kvalitet, og opdateringsfrekvenser. Hvordan konsoliderer man dette til et samlet, rent, analyseklart dataset?
Løsning
ETL pipelines automatisk udtrækker data fra alle kilder, transformerer det til et konsistent format og kvalitet, og loader det ind i et central data warehouse hvor analysts kan lave queries og rapporter uden at bekymre sig om data sources.
Eksempel
// Simpelt ETL eksempel med Node.js
// ============ EXTRACT ============
async function extractFromSources() {
// Udtræk fra MySQL database
const dbCustomers = await mysqlClient.query(`
SELECT customer_id, name, email, created_at
FROM customers
WHERE updated_at > '2024-01-01'
`);
// Udtræk fra REST API
const apiOrders = await fetch('https://api.shop.dk/orders')
.then(res => res.json());
// Udtræk fra CSV fil
const csvProducts = await fs.readFile('products.csv', 'utf-8')
.then(data => parseCSV(data));
return { customers: dbCustomers, orders: apiOrders, products: csvProducts };
}
// ============ TRANSFORM ============
function transformData({ customers, orders, products }) {
// 1. Data Cleaning
const cleanCustomers = customers
.filter(c => c.email && c.email.includes('@')) // Remove invalid emails
.map(c => ({
...c,
email: c.email.toLowerCase().trim(), // Normalize
name: c.name.trim()
}));
// 2. Data Enrichment - Join orders with customers
const enrichedOrders = orders.map(order => {
const customer = cleanCustomers.find(c => c.customer_id === order.customer_id);
return {
order_id: order.id,
customer_name: customer?.name || 'Unknown',
customer_email: customer?.email,
order_total: order.total,
order_date: new Date(order.created_at)
};
});
// 3. Data Aggregation
const customerMetrics = cleanCustomers.map(customer => {
const customerOrders = enrichedOrders.filter(
o => o.customer_email === customer.email
);
return {
customer_id: customer.customer_id,
customer_email: customer.email,
total_orders: customerOrders.length,
total_spent: customerOrders.reduce((sum, o) => sum + o.order_total, 0),
avg_order_value: customerOrders.length > 0
? customerOrders.reduce((sum, o) => sum + o.order_total, 0) / customerOrders.length
: 0,
first_order_date: customerOrders.length > 0
? new Date(Math.min(...customerOrders.map(o => o.order_date)))
: null
};
});
return { customerMetrics, enrichedOrders, products };
}
// ============ LOAD ============
async function loadToWarehouse(transformedData) {
const { customerMetrics, enrichedOrders, products } = transformedData;
// Load til data warehouse (PostgreSQL)
await warehouseDb.transaction(async (trx) => {
// Truncate staging tables
await trx.raw('TRUNCATE staging.customer_metrics');
// Bulk insert
await trx('staging.customer_metrics').insert(customerMetrics);
await trx('staging.enriched_orders').insert(enrichedOrders);
// Merge til production tables (upsert)
await trx.raw(`
INSERT INTO prod.customers
SELECT * FROM staging.customer_metrics
ON CONFLICT (customer_id)
DO UPDATE SET
total_orders = EXCLUDED.total_orders,
total_spent = EXCLUDED.total_spent,
updated_at = NOW()
`);
});
console.log(`ETL completed: Loaded ${customerMetrics.length} customers`);
}
// ============ ORCHESTRATION ============
async function runETLPipeline() {
try {
console.log('Starting ETL pipeline...');
const rawData = await extractFromSources();
const transformedData = transformData(rawData);
await loadToWarehouse(transformedData);
console.log('ETL pipeline completed successfully');
} catch (error) {
console.error('ETL failed:', error);
// Send alert, log to monitoring
}
}
// Schedule: Run every day at 2 AM
cron.schedule('0 2 * * *', runETLPipeline);Fordele
- ✓Centraliseret data for analytics
- ✓Konsistent data quality og format
- ✓Automatisering af data workflows
- ✓Historisk data tracking
- ✓Separation of concerns (OLTP vs OLAP)
Udfordringer
- ⚠Kompleksitet ved mange data sources
- ⚠Data quality issues fra source systemer
- ⚠Performance ved store data volumes
- ⚠Schema changes i source systemer
- ⚠Monitoring og error handling
Anvendelsesområder
- •Data warehouse population
- •Business intelligence rapporter
- •Data migration mellem systemer
- •Aggregation af multi-source data
- •Master Data Management (MDM)
Eksempler fra den virkelige verden
- •E-commerce: Kombiner web, CRM, inventory data til sales reports
- •Banking: Aggregate transaktioner fra alle branches til central reporting
- •Healthcare: Konsolider patient data fra multiple systemer
- •Retail: Kombiner point-of-sale, inventory, customer data
- •Marketing: Samle data fra ads, website, CRM til attribution analysis