ETL (Extract, Transform, Load)
ArkitekturProces til at udtrække data fra kilder, transformere det, og loade det ind i et destination system.
Beskrivelse
ETL er en fundamental dataintegrations-proces bestående af tre faser: Extract (udtræk data fra flere kilder), Transform (rens, konverter, aggreger data), og Load (indsæt i destinationsdatabase eller data warehouse). ETL er kernen i moderne data-pipelines og bruges til at konsolidere data fra forskellige systemer til analyse. Ekstraktion kan være fra databaser, API'er, CSV-filer eller realtids-streams. Transformation inkluderer datarensning (fjern dubletter, ret formater), berigelse (join med referencedata), aggregering (sum, gennemsnit) og normalisering. Loading kan være fuld load (hele datasættet) eller inkrementel load (kun ændringer). Moderne varianter inkluderer ELT (load først, transformer i destination) og realtids-streaming ETL. Værktøjer som Apache Airflow, Talend, Informatica og AWS Glue automatiserer ETL-workflows. God ETL sikrer datakvalitet, konsistens og tilgængelighed for business intelligence.
Problem
Organisationer har data spredt på tværs af mange systemer - CRM, ERP, databases, APIs, filer. Data har forskellige formater, kvalitet, og opdateringsfrekvenser. Hvordan konsoliderer man dette til et samlet, rent, analyseklart dataset?
Løsning
ETL pipelines automatisk udtrækker data fra alle kilder, transformerer det til et konsistent format og kvalitet, og loader det ind i et central data warehouse hvor analysts kan lave queries og rapporter uden at bekymre sig om data sources.
Eksempel
// Simpelt ETL eksempel med Node.js
// ============ EXTRACT ============
async function extractFromSources() {
// Udtræk fra MySQL database
const dbCustomers = await mysqlClient.query(`
SELECT customer_id, name, email, created_at
FROM customers
WHERE updated_at > '2024-01-01'
`);
// Udtræk fra REST API
const apiOrders = await fetch('https://api.shop.dk/orders')
.then(res => res.json());
// Udtræk fra CSV fil
const csvProducts = await fs.readFile('products.csv', 'utf-8')
.then(data => parseCSV(data));
return { customers: dbCustomers, orders: apiOrders, products: csvProducts };
}
// ============ TRANSFORM ============
function transformData({ customers, orders, products }) {
// 1. Data Cleaning
const cleanCustomers = customers
.filter(c => c.email && c.email.includes('@')) // Remove invalid emails
.map(c => ({
...c,
email: c.email.toLowerCase().trim(), // Normalize
name: c.name.trim()
}));
// 2. Data Enrichment - Join orders with customers
const enrichedOrders = orders.map(order => {
const customer = cleanCustomers.find(c => c.customer_id === order.customer_id);
return {
order_id: order.id,
customer_name: customer?.name || 'Unknown',
customer_email: customer?.email,
order_total: order.total,
order_date: new Date(order.created_at)
};
});
// 3. Data Aggregation
const customerMetrics = cleanCustomers.map(customer => {
const customerOrders = enrichedOrders.filter(
o => o.customer_email === customer.email
);
return {
customer_id: customer.customer_id,
customer_email: customer.email,
total_orders: customerOrders.length,
total_spent: customerOrders.reduce((sum, o) => sum + o.order_total, 0),
avg_order_value: customerOrders.length > 0
? customerOrders.reduce((sum, o) => sum + o.order_total, 0) / customerOrders.length
: 0,
first_order_date: customerOrders.length > 0
? new Date(Math.min(...customerOrders.map(o => o.order_date)))
: null
};
});
return { customerMetrics, enrichedOrders, products };
}
// ============ LOAD ============
async function loadToWarehouse(transformedData) {
const { customerMetrics, enrichedOrders, products } = transformedData;
// Load til data warehouse (PostgreSQL)
await warehouseDb.transaction(async (trx) => {
// Truncate staging tables
await trx.raw('TRUNCATE staging.customer_metrics');
// Bulk insert
await trx('staging.customer_metrics').insert(customerMetrics);
await trx('staging.enriched_orders').insert(enrichedOrders);
// Merge til production tables (upsert)
await trx.raw(`
INSERT INTO prod.customers
SELECT * FROM staging.customer_metrics
ON CONFLICT (customer_id)
DO UPDATE SET
total_orders = EXCLUDED.total_orders,
total_spent = EXCLUDED.total_spent,
updated_at = NOW()
`);
});
console.log(`ETL completed: Loaded ${customerMetrics.length} customers`);
}
// ============ ORCHESTRATION ============
async function runETLPipeline() {
try {
console.log('Starting ETL pipeline...');
const rawData = await extractFromSources();
const transformedData = transformData(rawData);
await loadToWarehouse(transformedData);
console.log('ETL pipeline completed successfully');
} catch (error) {
console.error('ETL failed:', error);
// Send alert, log to monitoring
}
}
// Schedule: Run every day at 2 AM
cron.schedule('0 2 * * *', runETLPipeline);Fordele
- ✓Centraliseret data til analyse
- ✓Konsistent datakvalitet og format
- ✓Automatisering af data-workflows
- ✓Historisk datasporing
- ✓Adskillelse af ansvar (OLTP vs OLAP)
Udfordringer
- ⚠Kompleksitet ved mange datakilder
- ⚠Datakvalitetsproblemer fra kildesystemer
- ⚠Ydeevne ved store datamængder
- ⚠Skemaændringer i kildesystemer
- ⚠Overvågning og fejlhåndtering
Anvendelsesområder
- •Befolkning af data warehouse
- •Business intelligence-rapporter
- •Datamigrering mellem systemer
- •Aggregering af data fra flere kilder
- •Master Data Management (MDM)
Eksempler fra den virkelige verden
- •E-handel: Kombiner web, CRM, lagerdata til salgsrapporter
- •Bankvirksomhed: Aggreger transaktioner fra alle filialer til central rapportering
- •Sundhedsvæsen: Konsolider patientdata fra flere systemer
- •Detailhandel: Kombiner point-of-sale, lager, kundedata
- •Marketing: Samle data fra annoncer, website, CRM til attributionsanalyse