/** Copyright (c) 2007 Ricebridge. BSD License. */ package com.ricebridge.example.updown; import java.io.InputStream; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.util.ArrayList; import java.util.Arrays; import java.util.Properties; import com.ricebridge.csvman.CsvLoader; import com.ricebridge.csvman.CsvManager; import com.ricebridge.csvman.CsvSpec; /** Insert data into a database table from a CSV file. */ public class CsvUploader extends UploaderSupport { /** Read the CSV data from an InputStream, and insert or update the * corresponding database table rows. */ public void process( InputStream pInputStream, Properties pProperties ) throws Exception { mNumLines = 0; mInputErrors = new ArrayList(); mDatabaseErrors = new ArrayList(); // use CSV Manager to parse the CSV file CsvSpec csvspec = makeCsvSpec( pProperties ); CsvManager csvman = new CsvManager( csvspec ); CsvLoader csvldr = csvman.makeLoader( pInputStream ); ResultSet rs = null; PreparedStatement selectsql = null; PreparedStatement insertsql = null; PreparedStatement updatesql = null; try { // the column names are the first line of the CSV file String[] colnames = null; csvldr.begin(); if( csvldr.hasNext() ) { colnames = csvldr.next(); } else { throw new Exception("CSV file is empty."); } // find the identity column index // this is used to check for existing table rows int idcol = -1; for( int colI = 0; colI < colnames.length; colI++ ) { if( mIdentityColumn.equals(colnames[colI]) ) { idcol = colI; break; } } if( -1 == idcol ) { throw new Exception("Identity column '" +mIdentityColumn+"' not found in column headers."); } Converter conv = new Converter( colnames, mTableName, mIdentityColumn, mConnection ); selectsql = makeSelect( mTableName, mIdentityColumn, mConnection ); insertsql = makeInsert( colnames, mTableName, mIdentityColumn, mConnection ); updatesql = makeUpdate( colnames, mTableName, mIdentityColumn, mConnection ); // process each CSV line while( csvldr.hasNext() ) { String[] data = csvldr.next(); try { // check if row already exists, using identity column conv.setData( 1, idcol, data[idcol], selectsql ); rs = selectsql.executeQuery(); boolean exists = rs.next(); rs.close(); // if row exists, update, else insert if( exists ) { conv.setData( data, updatesql ); conv.setData( data.length+1, idcol, data[idcol], updatesql ); updatesql.executeUpdate(); } else { conv.setData( data, insertsql ); insertsql.executeUpdate(); } } catch( Exception e ) { mDatabaseErrors.add(e); // keep processing the rest of the data } } } finally { csvldr.end(); mNumLines = csvman.getLineCount(); mInputErrors.addAll( csvman.getBadLines() ); close(rs,selectsql,null); close(null,insertsql,null); close(null,updatesql,mConnection); } } /** The particular CSV format to use is specified by a set of properties. */ public CsvSpec makeCsvSpec( Properties pProperties ) { CsvSpec csvspec = CsvSpecHandler.makeCsvSpec(pProperties); return csvspec; } /** Create SQL statement to find existing rows using the identity column. */ public PreparedStatement makeSelect( String pTableName, String pIdentityColumn, Connection pConnection ) throws Exception { String sql = "SELECT "+pIdentityColumn+" FROM "+pTableName +" WHERE "+pIdentityColumn+" = ?;"; return pConnection.prepareStatement( sql ); } /** Create SQL statement to insert new rows. */ public PreparedStatement makeInsert( String[] pColNames, String pTableName, String pIdentityColumn, Connection pConnection ) throws Exception { StringBuffer sql = new StringBuffer( "INSERT INTO "+pTableName+" VALUES( "); for( int colI = 0; colI < pColNames.length; colI++ ) { sql.append( 0 == colI ? "?":", ?" ); } sql.append(" );"); return pConnection.prepareStatement( sql.toString() ); } /** Create SQL statement to update existing rows using the identity * column. */ public PreparedStatement makeUpdate( String[] pColNames, String pTableName, String pIdentityColumn, Connection pConnection ) throws Exception { StringBuffer sql = new StringBuffer( "UPDATE "+pTableName+" SET "); for( int colI = 0; colI < pColNames.length; colI++ ) { sql.append( 0 == colI ? "":", " ); sql.append( pColNames[colI]+" = ?" ); } sql.append(" WHERE "+pIdentityColumn+" = ?;"); return pConnection.prepareStatement( sql.toString() ); } }