Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
DataCleaner (community edition) Copyright (C) 2014 Neopost - Customer Information Management This copyrighted material is made available to anyone wishing to use, modify, copy, or redistribute it subject to the terms and conditions of the GNU Lesser General Public License, as published by the Free Software Foundation. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this distribution; if not, write to: Free Software Foundation, Inc. 51 Franklin Street, Fifth Floor Boston, MA 02110-1301 USA
 
 package org.datacleaner.beans.writers;
 
 import java.io.File;
 import java.util.Date;
 import java.util.List;
 
 
 
 @Named("Insert into table")
 @Description("Insert records into a table in a registered datastore. This component allows you to map the values available in the flow with the columns of the target table, in order to insert these values into the table.")
 @Categorized(superCategory = WriteSuperCategory.class)
 @Concurrent(true)
 
     // Property name literal "Values". NOTE(review): no use of this constant
     // is visible in this listing; confirm where it is referenced.
     private static final String PROPERTY_NAME_VALUES = "Values";
 
     // Temp directory as returned by FileHelper.getTempDir().
     private static final File TEMP_DIR = FileHelper.getTempDir();
 
     // Presumably the header of the error-message column in the error log;
     // its uses are not visible in this listing -- TODO confirm.
     private static final String ERROR_MESSAGE_COLUMN_NAME = "insert_into_table_error_message";
 
     // SLF4J logger for this analyzer.
     private static final Logger logger = LoggerFactory.getLogger(InsertIntoTableAnalyzer.class);
 
     // Injected configuration properties.
     // NOTE(review): most field declarations are missing from this listing --
     // several annotation groups below are not followed by a field, and the
     // accessMode value of @FileProperty is absent. Restore them from the
     // original source before compiling.
     @Inject
     @Description("Values to write to the table")
     InputColumn<?>[] values;
 
     @Inject
    @Configured
    @Description("Names of columns in the target table.")
    @Inject
    @Configured
    @Description("Datastore to write to")
    @Inject
    @Configured(required = false)
    @Description("Schema name of target table")
    @Inject
    @Configured(required = false)
    @Description("Table to target (insert into)")
    @Inject
    @Configured
    @Description("Truncate table before inserting?")
    boolean truncateTable = false;
    @Inject
    @Configured("Buffer size")
    @Description("How much data to buffer before committing batches of data. Large batches often perform better, but require more memory.")
    @Inject
    @Configured(value = "How to handle insertion errors?")
    @Inject
    @Configured(value = "Error log file location", required = false)
    @Description("Directory or file path for saving erroneous records")
    @FileProperty(accessMode = ., extension = ".csv")
    @Inject
    @Configured(required = false)
    @Description("Additional values to write to error log")
    @Inject
    @Provided
    // Runtime state. NOTE(review): the assignments to these fields are not
    // legible in this listing (assignment targets are missing in init()).
    private Column[] _targetColumns;
    private WriteBuffer _writeBuffer;
    private AtomicInteger _errorRowCount;
    /**
     * Validation hook: throws when two lengths differ. According to the
     * exception message these are the lengths of 'Values' and 'Column names'.
     *
     * NOTE(review): the compared expressions are missing from this listing
     * (the condition reads {@code . != .}); restore them from the original
     * source.
     *
     * @throws IllegalStateException if the two lengths are not equal
     */
    @Validate
    public void validate() {
        if (. != .) {
            throw new IllegalStateException("Length of 'Values' (" + . + ") and 'Column names' ("
                    + . + ") must be equal");
        }
    }

    
    /**
     * Truncates the database table if necessary. This is NOT a distributable
     * initializer, since it can only happen once.
     */
    // Non-distributed initializer: inside a guard, opens a datastore
    // connection, resolves a table through the schema navigator and runs an
    // unconditioned deleteFrom(table) as an update script. The connection is
    // always closed in the finally block.
    // NOTE(review): the guard condition, the connection receiver, the
    // convertToTable() arguments and the logger references are missing from
    // this listing; the guard is presumably the truncateTable flag -- confirm.
    @Initialize(distributed = false)
    public void truncateIfNecesary() {
        if () {
            final UpdateableDatastoreConnection con = .openConnection();
            try {
                final SchemaNavigator schemaNavigator = con.getSchemaNavigator();
                final Table table = schemaNavigator.convertToTable();
                final UpdateableDataContext dc = con.getUpdateableDataContext();
                dc.executeUpdate(new UpdateScript() {
                    @Override
                    public void run(UpdateCallback callback) {
                        // A delete builder without any condition.
                        final RowDeletionBuilder delete = callback.deleteFrom(table);
                        if (.isInfoEnabled()) {
                            .info("Executing truncating DELETE operation: {}"delete.toSql());
                        }
                        delete.execute();
                    }
                });
            } finally {
                con.close();
            }
        }
    }
    /**
     * Initializer: creates two AtomicInteger counters, creates the error data
     * context (apparently inside a conditional), sizes and creates the write
     * buffer, and resolves the target columns through the schema navigator.
     *
     * NOTE(review): assignment targets, the opening line of the conditional
     * around createErrorDataContext(), the loop bounds and the array
     * references are missing from this listing; restore them from the
     * original source.
     *
     * @throws IllegalArgumentException if one or more columns could not be
     *             found, or if the two counts compared at the end differ
     */
    @Initialize
    public void init() throws IllegalArgumentException {
        if (.isDebugEnabled()) {
            .debug("At init() time, InputColumns are: {}", Arrays.toString());
        }
         = new AtomicInteger();
         = new AtomicInteger();
             = createErrorDataContext();
        }
        int bufferSize = .calculateBufferSize(.);
        .info("Row buffer size set to {}"bufferSize);
         = new WriteBuffer(bufferSizethis);
        final UpdateableDatastoreConnection con = .openConnection();
        try {
            final SchemaNavigator schemaNavigator = con.getSchemaNavigator();
             = schemaNavigator.convertToColumns();
            // Collect every entry that resolved to null so all of them can be
            // reported in a single exception.
            final List<StringcolumnsNotFound = new ArrayList<String>();
            for (int i = 0; i < .i++) {
                if ([i] == null) {
                    columnsNotFound.add([i]);
                }
            }
            if (!columnsNotFound.isEmpty()) {
                throw new IllegalArgumentException("Could not find column(s): " + columnsNotFound);
            }
        } finally {
            con.close();
        }
        // Final sanity check on the resolved target column count.
        if (. != .) {
            throw new IllegalArgumentException("Configuration yielded unexpected target column count (got "
                    + . + ", expected " + . + ")");
        }
    }
    /**
     * Builds a suggested label of the form "name - suffix".
     *
     * NOTE(review): the two operands checked for null and concatenated here
     * are missing from this listing.
     *
     * @return the label, or null when either required value is null
     */
    @Override
    public String getSuggestedLabel() {
        if ( == null ||  == null) {
            return null;
        }
        return .getName() + " - " + ;
    }
    /**
     * Verifies that the first table of an existing CSV error log has the
     * expected column headers. Does nothing when the default schema has no
     * tables.
     *
     * NOTE(review): the collections iterated over and the argument of the
     * final getColumnByName() call are missing from this listing.
     *
     * @param dc the CSV data context of the error log file
     * @throws IllegalStateException if a required column header is missing
     */
    private void validateCsvHeaders(CsvDataContext dc) {
        Schema schema = dc.getDefaultSchema();
        if (schema.getTableCount() == 0) {
            // nothing to worry about, we will create the table ourselves
            return;
        }
        Table table = schema.getTables()[0];
        // verify that table names correspond to what we need!
        for (String columnName : ) {
            Column column = table.getColumnByName(columnName);
            if (column == null) {
                throw new IllegalStateException("Error log file does not have required column header: " + columnName);
            }
        }
        // Optional extra columns; their names go through
        // translateAdditionalErrorLogColumnName() before the lookup.
        if ( != null) {
            for (InputColumn<?> inputColumn : ) {
                String columnName = translateAdditionalErrorLogColumnName(inputColumn.getName());
                Column column = table.getColumnByName(columnName);
                if (column == null) {
                    throw new IllegalStateException("Error log file does not have required column header: "
                            + columnName);
                }
            }
        }
        // One more required column, looked up last.
        Column column = table.getColumnByName();
        if (column == null) {
            throw new IllegalStateException("Error log file does not have required column: "
                    + );
        }
    }
    /**
     * Returns a column name for an additional error-log column, appending
     * "_add" (recursively, as often as needed) while the contains-check
     * matches.
     *
     * NOTE(review): only one argument of ArrayUtils.contains is visible in
     * this listing; the array being searched is missing.
     *
     * @param columnName the candidate column name
     * @return the candidate itself, or the candidate with one or more "_add"
     *         suffixes
     */
    private String translateAdditionalErrorLogColumnName(String columnName) {
        if (ArrayUtils.contains(columnName)) {
            return translateAdditionalErrorLogColumnName(columnName + "_add");
        }
        return columnName;
    }
        // NOTE(review): the method header is missing from this listing. Based
        // on the call in init() this is presumably the body of
        // createErrorDataContext(), returning a CsvDataContext -- confirm.
        //
        // Picks the error log file (a new temp file, a fixed file name when a
        // directory is given, or the configured value), then either validates
        // the headers of an existing non-empty file or creates the error
        // table in it.
        final File file;
        if ( == null || .equals()) {
            try {
                file = File.createTempFile("insertion_error"".csv");
            } catch (IOException e) {
                throw new IllegalStateException("Could not create new temp file"e);
            }
        } else if (.isDirectory()) {
            file = new File("insertion_error_log.csv");
        } else {
            file = ;
        }
        final CsvDataContext dc = new CsvDataContext(file);
        final Schema schema = dc.getDefaultSchema();
        if (file.exists() && file.length() > 0) {
            // Existing, non-empty file: it must already have the right headers.
            validateCsvHeaders(dc);
        } else {
            // create table if no table exists.
            dc.executeUpdate(new UpdateScript() {
                @Override
                public void run(UpdateCallback cb) {
                    TableCreationBuilder tableBuilder = cb.createTable(schema"error_table");
                    for (String columnName : ) {
                        tableBuilder = tableBuilder.withColumn(columnName);
                    }
                    if ( != null) {
                        for (InputColumn<?> inputColumn : ) {
                            String columnName = translateAdditionalErrorLogColumnName(inputColumn.getName());
                            tableBuilder = tableBuilder.withColumn(columnName);
                        }
                    }
                    tableBuilder = tableBuilder.withColumn();
                    tableBuilder.execute();
                }
            });
        }
        return dc;
    }
    /**
     * Processes one input row: copies its values into an Object array,
     * converts the leading entries with convertType(), and adds the array to
     * the buffer once per distinct count. If conversion throws a
     * RuntimeException, errorOccurred() is called once per distinct count
     * instead and nothing is buffered.
     *
     * NOTE(review): array references, loop bounds, receivers and the commas
     * between arguments are missing from this listing.
     *
     * @param row the input row to read values from
     * @param distinctCount how many times the row is added (or reported as
     *            an error)
     */
    @Override
    public void run(InputRow rowint distinctCount) {
        if (.isDebugEnabled()) {
            .debug("At run() time, InputColumns are: {}", Arrays.toString());
        }
        // The array is larger when the optional extra values are present.
        final Object[] rowData;
        if ( == null) {
            rowData = new Object[.];
        } else {
            rowData = new Object[. + .];
        }
        for (int i = 0; i < .i++) {
            rowData[i] = row.getValue([i]);
        }
        // Extra values are stored after the regular ones.
        if ( != null) {
            for (int i = 0; i < .i++) {
                Object value = row.getValue([i]);
                rowData[. + i] = value;
            }
        }
        try {
            // perform conversion in a separate loop, since it might crash and
            // the
            // error data will be more complete if first loop finished.
            for (int i = 0; i < .i++) {
                rowData[i] = convertType(rowData[i], [i]);
                if (.isDebugEnabled()) {
                    .debug("Value for {} set to: {}"[i], rowData[i]);
                }
            }
        } catch (RuntimeException e) {
            for (int i = 0; i < distinctCounti++) {
                errorOccurred(rowDatae);
            }
            return;
        }
        if (.isDebugEnabled()) {
            .debug("Adding row data to buffer: {}", Arrays.toString(rowData));
        }
        for (int i = 0; i < distinctCounti++) {
            .addToBuffer(rowData);
        }
    }
    /**
     * Converts a value to fit the type of the target column. Null stays
     * null. For literal column types, Number and Date values are turned into
     * their toString() form. For number column types the value goes through
     * ConvertToNumberTransformer, and for one further type through
     * ConvertToBooleanTransformer. Any other combination returns the value
     * unchanged.
     *
     * NOTE(review): in this listing the parameter list is garbled (missing
     * comma and closing parenthesis) and the constant compared against
     * {@code type} in the last branch is missing; that constant is
     * presumably the boolean column type -- confirm.
     *
     * @param value the value to convert, may be null
     * @param targetColumn the column whose type decides the conversion
     * @return the converted value, or null
     * @throws IllegalArgumentException if a number or boolean conversion
     *             yields null for a value that is not the empty string
     */
    private Object convertType(final Object valueColumn targetColumnthrows IllegalArgumentException {
        if (value == null) {
            return null;
        }
        Object result = value;
        ColumnType type = targetColumn.getType();
        if (type.isLiteral()) {
            // for strings, only convert some simple cases, since JDBC drivers
            // typically also do a decent job here (with eg. Clob types, char[]
            // types etc.)
            if (value instanceof Number || value instanceof Date) {
                result = value.toString();
            }
        } else if (type.isNumber()) {
            Number numberValue = ConvertToNumberTransformer.transformValue(value);
            // An empty string may convert to null without being an error.
            if (numberValue == null && !"".equals(value)) {
                throw new IllegalArgumentException("Could not convert " + value + " to number");
            }
            result = numberValue;
        } else if (type == .) {
            Boolean booleanValue = ConvertToBooleanTransformer.transformValue(value);
            // An empty string may convert to null without being an error.
            if (booleanValue == null && !"".equals(value)) {
                throw new IllegalArgumentException("Could not convert " + value + " to boolean");
            }
            result = booleanValue;
        }
        return result;
    }
    /**
     * Flushes the buffer and builds the result from a written-row counter, a
     * second counter and an optional CSV error datastore (null when the
     * checked reference is null).
     *
     * NOTE(review): the receivers of flushBuffer(), get() and getResource(),
     * the null-checked reference and part of the WriteDataResultImpl
     * argument list are missing from this listing.
     *
     * @return the write result
     */
    @Override
    public WriteDataResult getResult() {
        // Flush first, before the counters are read.
        .flushBuffer();
        final int writtenRowCount = .get();
        final FileDatastore errorDatastore;
        if ( != null) {
            Resource resource = .getResource();
            errorDatastore = new CsvDatastore(resource.getName(), resource);
        } else {
            errorDatastore = null;
        }
        return new WriteDataResultImpl(writtenRowCount.get(),
                errorDatastore);
    }

    
    /**
     * Method invoked when flushing the buffer.
     */
    // Writes a batch of buffered rows: opens a connection, resolves the
    // columns, and executes one BatchUpdateScript that inserts every row. A
    // RuntimeException on a single insert is passed to errorOccurred() and
    // the loop continues with the next row. The connection is always closed
    // in the finally block.
    // NOTE(review): the connection receiver, the convertToColumns()
    // arguments, the logger references, the counter being incremented and
    // the receiver of publishMessage() are missing from this listing.
    @Override
    public void run(final Iterable<Object[]> bufferthrows Exception {
        final UpdateableDatastoreConnection con = .openConnection();
        try {
            final Column[] columns = con.getSchemaNavigator().convertToColumns();
            if (.isDebugEnabled()) {
                .debug("Inserting into columns: {}", Arrays.toString(columns));
            }
            final UpdateableDataContext dc = con.getUpdateableDataContext();
            dc.executeUpdate(new BatchUpdateScript() {
                @Override
                public void run(UpdateCallback callback) {
                    int insertCount = 0;
                    for (Object[] rowData : buffer) {
                        // NOTE(review): columns[0] is read without checking
                        // that the array is non-empty -- confirm that callers
                        // guarantee at least one column.
                        RowInsertionBuilder insertBuilder = callback.insertInto(columns[0].getTable());
                        for (int i = 0; i < columns.lengthi++) {
                            insertBuilder = insertBuilder.value(columns[i], rowData[i]);
                        }
                        if (.isDebugEnabled()) {
                            .debug("Inserting: {}", Arrays.toString(rowData));
                        }
                        try {
                            insertBuilder.execute();
                            insertCount++;
                            .incrementAndGet();
                        } catch (final RuntimeException e) {
                            errorOccurred(rowDatae);
                        }
                    }
                    // Only publish a log message when something was inserted.
                    if (insertCount > 0) {
                        .publishMessage(new ExecutionLogMessage(insertCount + " inserts executed"));
                    }
                }
            });
        } finally {
            con.close();
        }
    }
    /**
     * Handles a failed row: in one branch the exception is rethrown; in the
     * other a warning is logged and the row data, any extra values and the
     * exception message are inserted into the first table of a data
     * context's default schema.
     *
     * NOTE(review): the opening {@code if} line that selects between the two
     * branches is missing from this listing, as are the data context
     * receiver, loop bounds, column references and the commas between
     * arguments. The condition presumably depends on the configured
     * error-handling option -- confirm.
     *
     * @param rowData the values of the failed row
     * @param e the exception that caused the failure
     */
    protected void errorOccurred(final Object[] rowDatafinal RuntimeException e) {
            throw e;
        } else {
            .warn("Error occurred while inserting record. Writing to error stream"e);
            .executeUpdate(new UpdateScript() {
                @Override
                public void run(UpdateCallback cb) {
                    RowInsertionBuilder insertBuilder = cb
                            .insertInto(.getDefaultSchema().getTables()[0]);
                    for (int i = 0; i < .i++) {
                        insertBuilder = insertBuilder.value([i], rowData[i]);
                    }
                    // Extra values are read from rowData at an offset past
                    // the regular values.
                    if ( != null) {
                        for (int i = 0; i < .i++) {
                            String columnName = translateAdditionalErrorLogColumnName([i]
                                    .getName());
                            Object value = rowData[. + i];
                            insertBuilder = insertBuilder.value(columnNamevalue);
                        }
                    }
                    // The exception message is written as the last value.
                    insertBuilder = insertBuilder.value(e.getMessage());
                    insertBuilder.execute();
                }
            });
        }
    }
New to GrepCode? Check out our FAQ X