Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements. See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership. The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.marmotta.platform.core.services.importer;
 
 import java.io.File;
 import java.util.Map;
 
 
 
Implementation for watching import directory. This service watches the import directory (see getImportRoot()) for (new) files and imports them.

Author(s):
Sergio Fernández
Jakob Frank <jakob@apache.org>
 
 public class ImportWatchServiceImpl implements ImportWatchService {
 
     private static final String CONFIG_PREFIX = "file-import.";
     private static final String CONFIG_KEY_LOCK_FILE =  + "lockFile";
     private static final String CONFIG_KEY_CONF_FILE =  + "dirConfigFile";
     private static final String CONFIG_KEY_IMPORT_DELAY =  + "importDelay";
     private static final String CONFIG_KEY_DELETE_AFTER_IMPORT =  + "deleteAfterImport";
     private static final String CONFIG_KEY_SERVICE_ENABLED =  + "enabled";
 
     private static final String TASK_GROUP = "Import Watch";
 
     private static final String TASK_DETAIL_PATH = "path";
     private static final String TASK_DETAIL_QUEUE = "import queue";
 
     @Inject
     private Logger log;
 
     @Inject
 
    @Inject
    @Inject
    private ImportService importService;
    @Inject
    @Inject
    private UserService userService;
    private ImportWatcher importWatcher = null;

    
Initialize and start the watcher service.
    @Override
    public void startup() {
             = new ImportWatcher(getImportRoot());
            new Thread().start();
        }
    }

    
The import root. all files put into this directory (and any subdir) will be imported. Directories containing a file called "lock" (configurable, see CONFIG_KEY_LOCK_FILE) are ignored.
    @Override
    public Path getImportRoot() {
    }

    
Shutdown the directory.
    @Override
    public void shutdown() {
        if ( != null) {
            try {
                .shutdown();
            } catch (IOException e) {
                .error("Exception while shutting down import watcher: {}\n{}"e.getMessage(), e);
            }
             = null;
        }
    }
        if (event.containsChangedKeyWithPrefix()) {
            if (event.containsChangedKey()) {
                shutdown();
                startup();
            } else if ( != null) {
            }
        }
    }
    protected void onSystemStartupEvent(@Observes SystemStartupEvent event) {
        shutdown();
        startup();
    }

    
Import the given file.

    @Override
    public boolean importFile(File filethrows MarmottaImportException {
        return importFile(file.toPath());
    }

    
Import the given file. The format of the input file is detected based on the filename, as is an optional compression of the file (known formats: GZip and BZip2)

Parameters:
file the file to import
Throws:
org.apache.marmotta.platform.core.exception.io.MarmottaImportException if the import failed due to various reasons.
    @Override
    public boolean importFile(Path filethrows MarmottaImportException {
        try {
            URI context;
            try {
                context = getTargetContext(file);
            } catch (URISyntaxException e) {
                .warn("Could not build context for file {}: {}"filee.getMessage());
                context = null;
            }
            String format = detectFormat(file);
            InputStream is = openStream(file);
            URI user = .getAdminUser();
            .importData(isformatusercontext);
            is.close();
            return true;
        } catch (IOException e) {
            throw new MarmottaImportException("Could not read input file " + file.toFile().getAbsolutePath(), e);
        }
    }

    
Detect the import format of the given file (mime-type)

Parameters:
file the file to check
Returns:
the mime-type
Throws:
org.apache.marmotta.platform.core.exception.io.MarmottaImportException
    private String detectFormat(Path filethrows MarmottaImportException {
        String format = null;
        final String fileName = file.toFile().getName();
        final Path config = file.getParent().resolve(.getStringConfiguration("config"));
        if (Files.isReadable(config)) {
            Properties prop = loadConfigFile(file);
            final String fmt = prop.getProperty("format");
            if (fmt != null) {
                RDFFormat rdfFormat = Rio.getParserFormatForMIMEType(fmt);
                if (rdfFormat != null) {
                    format = rdfFormat.getDefaultMIMEType();
                    .debug("Using format {} from config file {}"formatconfig);
                } else {
                    .debug("Unknown format {} in config file {}, ignoring"fmtconfig);
                }
            } else {
                .trace("No format defined in {}"config);
            }
        }
        // mimetype detection based on file-extension
        if (format == null) {
            // FIXME: Maybe use GzipUtils and BZip2Utils instead?
            RDFFormat rdfFormat = Rio.getParserFormatForFileName(fileName.replaceFirst("\\.(gz|bz2)$",""));
            if (rdfFormat != null) {
                format = rdfFormat.getDefaultMIMEType();
                .trace("Using format {} based on file-name {}"formatfileName);
            }
        }
        if (format == null || !.getAcceptTypes().contains(format)) {
            throw new MarmottaImportException("Suitable RDF parser not found");
        }
        // encoding detection
        // FIXME: is this required?
        try (BufferedInputStream bis = new BufferedInputStream(openStream(file))) {
            CharsetDetector cd = new CharsetDetector();
            cd.setText(bis);
            CharsetMatch cm = cd.detect();
            if (cm != null) {
                .trace("Detected charset {} in {}"cm.getName(), file);
                format += "; charset=" + cm.getName();
            }
            bis.close();
        } catch (IOException e) {
            .error("Error detecting charset for '{}': {}"fileNamee.getMessage());
        }
        return format;
    }
    private InputStream openStream(Path filethrows IOException {
        final String fName = file.getFileName().toString();
        final FileInputStream fis = new FileInputStream(file.toFile());
        
        if (GzipUtils.isCompressedFilename(fName)) {
            .trace("{} looks GZIP compressed,"file);
            return new GZIPInputStream(fis);
        } else if (BZip2Utils.isCompressedFilename(fName)) {
            .trace("{} looks BZ2 compressed"file);
            return new BZip2CompressorInputStream(fis);
        } else {
            return fis;
        }
    }
    private Properties loadConfigFile(Path importFile) {
        // Check for a configFile
        final Path config = importFile.getParent().resolve(.getStringConfiguration("config"));
        if (Files.isReadable(config)) {
            try {
                Properties prop = new Properties();
                final FileInputStream inStream = new FileInputStream(config.toFile());
                prop.load(inStream);
                inStream.close();
                return prop;
            } catch (IOException e) {
                .warn("could not read dirConfigFile {}: {}"confige.getMessage());
            }
        }
        return null;
    }

    
Get the target context. The algorithm is as follows:
  1. check for a file "conf" (configurable, see CONFIG_KEY_CONF_FILE) which specifies the target content using java.util.Properties syntax (key context), then use is; or
  2. check if the sub-directory is a url-encoded URI, then use it; or
  3. construct the context by using org.apache.marmotta.platform.core.api.config.ConfigurationService.getBaseContext() and the relative sub-dirs and use it; or
  4. use the default context as a general fallback.

Parameters:
file the file
Returns:
the context URI
Throws:
java.net.URISyntaxException
    private URI getTargetContext(Path filethrows URISyntaxException {
        // Check for a configFile
        final Path config = file.getParent().resolve(.getStringConfiguration("config"));
        if (Files.isReadable(config)) {
            Properties prop = loadConfigFile(file);
            final String _c = prop.getProperty("context");
            if (_c != null) {
                try {
                    URI context = .createContext(_c);
                    .debug("using context {} from config file {}"contextconfig);
                    return context;
                } catch (URISyntaxException e) {
                    .warn("invalid context {} in config file {}, ignoring"_cconfig);
                }
            } else {
                .trace("no context defined in config file {}"config);
            }
        }
        // Check for url-encoded directory
        Path subDir = getImportRoot().relativize(file.getParent());
        if (StringUtils.isBlank(subDir.toString())) {
            .trace("using default context for file {}"file);
            return .getDefaultContext();
        } else if (StringUtils.startsWith(subDir.toString(), "http%3A%2F%2F")){
            .debug("using url-encoded context {} for import of {}"subDirfile);
            try {
                return .createContext(URLDecoder.decode(subDir.toString(), "UTF-8"));
            } catch (UnsupportedEncodingException e) {
                .error("Error url-decoding context name '{}', so using the default one: {}"subDire.getMessage());
                return .getDefaultContext();
            }
        } else {
            final String _c = String.format("%s/%s".getBaseContext().replaceFirst("/$"""), subDir);
            final URI context = .createContext(_c);
            .debug("using context {} based on relative subdir {} for file {}"contextsubDirfile);
            return context;
        }
    }
    private class ImportWatcher extends SimpleTreeWatcher {
        private String dirConfigFileName = null;
        private boolean deleteAfterImport = false;
        private int importDelay = 2500;
        private String lockFile = null;
        private final ScheduledThreadPoolExecutor executor;
        private final Map<PathScheduledFuture<?>> fileSchedules;
        private final Task task;
        public ImportWatcher(Path target) {
            super(targettrue);
             = new ScheduledThreadPoolExecutor(1);
            .setMaximumPoolSize(1);
             = new HashMap<>();
             = .createTask("Import Watch");
            .updateMessage("off");
            .updateDetailMessage(target.toAbsolutePath().toString());
        }
        public void setLockFile(String lockFile) {
            this. = lockFile;
        }
        public void setDirConfigFileName(String configFileName) {
            this. = configFileName;
        }
        public void setDeleteAfterImport(boolean deleteAfterImport) {
            this. = deleteAfterImport;
        }

        
Wait for some time before actually starting the import.

Parameters:
importDelay the delay in milliseconds.
        public void setImportDelay(int importDelay) {
            this. = importDelay;
        }
        @Override
        public void run() {
            .updateMessage("waiting for new files");
            scheduleDirectoryRecursive();
            super.run();
        }
        @Override
        public void shutdown() throws IOException {
            try {
                .updateMessage("shutting down");
                super.shutdown();
                .shutdownNow();
            } finally {
                .endTask();
            }
        }
        @Override
        public void onChildDeleted(final Path parentPath child) {
            // if the lockfile is deleted, import the full directory
            if ( != null && child.endsWith()) {
                scheduleDirectory(parent);
            } else {
                // otherwise remove a potential scheduled import
                final ScheduledFuture<?> scheduled = .remove(child);
                if (scheduled != null) {
                    scheduled.cancel(true);
                    updateQueueSizeMonitor();
                }
            }
        }
        private void scheduleDirectory(Path dir) {
            if (!isLocked(dir)) {
                try {
                    Files.walkFileTree(dir, EnumSet.noneOf(FileVisitOption.class), 1, new SimpleFileVisitor<Path>() {
                        @Override
                        public FileVisitResult visitFile(Path file,
                                BasicFileAttributes attrsthrows IOException {
                            if (!Files.isDirectory(file)) {
                                scheduleFile(file);
                            }
                            return .;
                        }
                    });
                } catch (IOException e) {
                    .warn("Could not schedule directory {} for import: {}"dire.getMessage());
                }
            }
        }
        private boolean isLocked(Path dir) {
            if ( == null) {
                return false;
            } else {
                return Files.exists(dir.resolve());
            }
        }
        private void scheduleFile(final Path file) {
            // ignore directories
            if (Files.isDirectory(file)) {
                .trace("not scheduling directory {}"file);
                return;
            }
            
            // if the dir is locked, do not schedule
            if (isLocked(file.getParent())) {
                .trace("not scheduling {} because {} is locked"filefile.getParent());
                return;
            }
            // do not schedule a config file
            if ( != null && file.endsWith()) {
                .trace("not scheduling {} because it is a config-file"file);
                return;
            }
            // schedule the import
            final ScheduledFuture<?> prevSchedule = .put(file.schedule(new Runnable() {
                @Override
                public void run() {
                    final String threadName = Thread.currentThread().getName();
                    Thread.currentThread().setName(String.format("%sWorker for %s"ImportWatcher.class.getSimpleName(), file));
                    try {
                        .updateMessage("importing " + file);
                        if (importFile(file)) {
                            .remove(file);
                            updateQueueSizeMonitor();
                            if () {
                                Files.delete(file);
                            }
                        }
                    } catch (IOException e) {
                        .warn("Could not delete file {} after successful import: {}"filee.getMessage());
                    } catch (MarmottaImportException e) {
                        .warn("importing {} failed: {}"filee.getMessage());
                    } catch (final Throwable t) {
                        .error("{} during file-import: {}"t.getClass().getSimpleName(), t.getMessage());
                        throw t;
                    } finally {
                        .updateMessage("waiting for new files");
                        Thread.currentThread().setName(threadName);
                    }
                }
            }, .));
            // cancel any previously scheduled import for this file.
            if (prevSchedule != null) {
                prevSchedule.cancel(true);
                .trace("rescheduled {} for import"file);
            } else {
                .trace("scheduled {} for import"file);
            }
            updateQueueSizeMonitor();
        }
        private void updateQueueSizeMonitor() {
            .updateDetailMessage(.getQueue().size() + " files");
        }
        @Override
        public void onFileCreated(Path createdFile) {
            scheduleFile(createdFile);
        }
        @Override
        public void onFileModified(Path modifiedFile) {
            scheduleFile(modifiedFile);
        }
        @Override
        public void onDirectoryCreated(Path createdDir) {
            scheduleDirectoryRecursive(createdDir);
        }
        private void scheduleDirectoryRecursive(Path directory) {
            try {
                Files.walkFileTree(directorynew SimpleFileVisitor<Path> () {
                    @Override
                    public FileVisitResult preVisitDirectory(Path dir,
                            BasicFileAttributes attrsthrows IOException {
                        scheduleDirectory(dir);
                        return .;
                    }
                });
            } catch (IOException e) {
                .warn("Could not schedule directory {} for import: {}"directorye.getMessage());
            }
        }
    }
New to GrepCode? Check out our FAQ X