// NOTE(review): the lines that followed here ("Start line:", "Snippet Preview",
// "Snippet HTML Code", "Stack Overflow Questions") were GrepCode page chrome
// captured during web extraction; they are not part of the original Java source.
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.execution;
 
 
 
 import java.net.URI;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static com.facebook.presto.OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS;
 import static com.facebook.presto.spi.StandardErrorCode.INTERNAL_ERROR;
 import static com.facebook.presto.spi.StandardErrorCode.NO_NODES_AVAILABLE;
 import static com.facebook.presto.util.Failures.checkCondition;
 import static com.facebook.presto.util.ImmutableCollectors.toImmutableList;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Predicates.equalTo;
 import static com.google.common.collect.Iterables.any;
 import static com.google.common.collect.Sets.newConcurrentHashSet;
 import static io.airlift.concurrent.MoreFutures.getFutureValue;
 import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 public final class SqlStageExecution
 {
     private final PlanFragment fragment;
     private final Set<PlanNodeIdallSources;
     private final Map<PlanFragmentIdSqlStageExecutionsubStages;
 
     private final Multimap<NodeTaskIdlocalNodeTaskMap = HashMultimap.create();
     private final ConcurrentMap<TaskIdRemoteTasktasks = new ConcurrentHashMap<>();
 
     private final Optional<SplitSourcedataSource;
     private final RemoteTaskFactory remoteTaskFactory;
     private final int splitBatchSize;
 
     private final int initialHashPartitions;
    private final StageStateMachine stateMachine;
    private final Set<PlanNodeIdcompleteSources = newConcurrentHashSet();
    @GuardedBy("this")
    @GuardedBy("this")
    private final ExecutorService executor;
    private final NodeSelector nodeSelector;
    private final NodeTaskMap nodeTaskMap;
    // Note: atomic is needed to assure thread safety between constructor and scheduler thread
    private final AtomicReference<Multimap<PlanNodeIdURI>> exchangeLocations = new AtomicReference<>(ImmutableMultimap.<PlanNodeIdURI>of());
    public SqlStageExecution(QueryId queryId,
            LocationFactory locationFactory,
            StageExecutionPlan plan,
            NodeScheduler nodeScheduler,
            RemoteTaskFactory remoteTaskFactory,
            Session session,
            int splitBatchSize,
            int initialHashPartitions,
            ExecutorService executor,
            NodeTaskMap nodeTaskMap,
            OutputBuffers nextOutputBuffers)
    {
        this(
                queryId,
                new AtomicInteger(),
                locationFactory,
                plan,
                nodeScheduler,
                remoteTaskFactory,
                session,
                splitBatchSize,
                initialHashPartitions,
                executor,
                nodeTaskMap);
        // add a single output buffer
        this. = nextOutputBuffers;
    }
    private SqlStageExecution(
            QueryId queryId,
            AtomicInteger nextStageId,
            LocationFactory locationFactory,
            StageExecutionPlan plan,
            NodeScheduler nodeScheduler,
            RemoteTaskFactory remoteTaskFactory,
            Session session,
            int splitBatchSize,
            int initialHashPartitions,
            ExecutorService executor,
            NodeTaskMap nodeTaskMap)
    {
        checkNotNull(queryId"queryId is null");
        checkNotNull(nextStageId"nextStageId is null");
        checkNotNull(locationFactory"locationFactory is null");
        checkNotNull(plan"plan is null");
        checkNotNull(nodeScheduler"nodeScheduler is null");
        checkNotNull(remoteTaskFactory"remoteTaskFactory is null");
        checkNotNull(session"session is null");
        checkArgument(initialHashPartitions > 0, "initialHashPartitions must be greater than 0");
        checkNotNull(executor"executor is null");
        checkNotNull(nodeTaskMap"nodeTaskMap is null");
        StageId stageId = new StageId(queryId, String.valueOf(nextStageId.getAndIncrement()));
        try (SetThreadName ignored = new SetThreadName("Stage-%s"stageId)) {
            this. = plan.getFragment();
            this. = plan.getDataSource();
            this. = remoteTaskFactory;
            this. = splitBatchSize;
            this. = initialHashPartitions;
            this. = executor;
            this. = Stream.concat(
                    Stream.of(plan.getFragment().getPartitionedSource()),
                    plan.getFragment().getRemoteSourceNodes().stream()
                            .map(RemoteSourceNode::getId))
                    .filter(Objects::nonNull)
                    .collect(Collectors.toSet());
            ImmutableMap.Builder<PlanFragmentIdSqlStageExecutionsubStages = ImmutableMap.builder();
            for (StageExecutionPlan subStagePlan : plan.getSubStages()) {
                PlanFragmentId subStageFragmentId = subStagePlan.getFragment().getId();
                SqlStageExecution subStage = new SqlStageExecution(
                        queryId,
                        nextStageId,
                        locationFactory,
                        subStagePlan,
                        nodeScheduler,
                        remoteTaskFactory,
                        session,
                        splitBatchSize,
                        initialHashPartitions,
                        executor,
                        nodeTaskMap);
                subStages.put(subStageFragmentIdsubStage);
            }
            this. = subStages.build();
            String dataSourceName = .isPresent() ? .get().getDataSourceName() : null;
            this. = nodeScheduler.createNodeSelector(dataSourceName);
            this. = nodeTaskMap;
            this. = new StageStateMachine(stageIdlocationFactory.createStageLocation(stageId), sessionplan.getFragment(), executor);
        }
    }
    public void cancelStage(StageId stageId)
    {
        try (SetThreadName ignored = new SetThreadName("Stage-%s"stageId)) {
            if (stageId.equals(.getStageId())) {
                cancel();
            }
            else {
                for (SqlStageExecution subStage : .values()) {
                    subStage.cancelStage(stageId);
                }
            }
        }
    }
    public StageState getState()
    {
        return .getState();
    }
    public long getTotalMemoryReservation()
    {
        long memory = 0;
        for (RemoteTask task : .values()) {
            memory += task.getTaskInfo().getStats().getMemoryReservation().toBytes();
        }
        for (SqlStageExecution subStage : .values()) {
            memory += subStage.getTotalMemoryReservation();
        }
        return memory;
    }
    public StageInfo getStageInfo()
    {
        return .getStageInfo(
                () -> .values().stream()
                        .map(RemoteTask::getTaskInfo)
                        .collect(toImmutableList()),
                () -> .values().stream()
                        .map(SqlStageExecution::getStageInfo)
                        .collect(toImmutableList()));
    }
    // NOTE(review): the signature of this method was lost during extraction
    // (the scrape dropped hyperlinked identifiers). The body returns the
    // values() view of one of this class's map fields -- presumably the
    // sub-stages or tasks map. Recover the exact declaration (name, return
    // type, visibility) from the upstream Presto source before compiling.
    {
        return .values();
    }
    private synchronized void parentTasksAdded(List<TaskIdparentTasksboolean noMoreParentNodes)
    {
        checkNotNull(parentTasks"parentTasks is null");
        // get the current buffers
        OutputBuffers startingOutputBuffers =  != null ?  : ;
        // add new buffers
        OutputBuffers newOutputBuffers;
            ImmutableMap.Builder<TaskIdPagePartitionFunctionnewBuffers = ImmutableMap.builder();
            for (TaskId taskId : parentTasks) {
                newBuffers.put(taskIdnew UnpartitionedPagePartitionFunction());
            }
            newOutputBuffers = startingOutputBuffers.withBuffers(newBuffers.build());
            // no more flag
            if (noMoreParentNodes) {
                newOutputBuffers = newOutputBuffers.withNoMoreBufferIds();
            }
        }
        else if (.getOutputPartitioning() == .) {
            checkArgument(noMoreParentNodes"Hash partitioned output requires all parent nodes be added in a single call");
            ImmutableMap.Builder<TaskIdPagePartitionFunctionbuffers = ImmutableMap.builder();
            for (int nodeIndex = 0; nodeIndex < parentTasks.size(); nodeIndex++) {
                TaskId taskId = parentTasks.get(nodeIndex);
                buffers.put(taskIdnew HashPagePartitionFunction(nodeIndexparentTasks.size(), getPartitioningChannels(), getHashChannel(), .getTypes()));
            }
            newOutputBuffers = startingOutputBuffers
                    .withBuffers(buffers.build())
                    .withNoMoreBufferIds();
        }
        else {
            throw new UnsupportedOperationException("Unsupported output partitioning " + .getOutputPartitioning());
        }
        // only notify scheduler and tasks if the buffers changed
        if (newOutputBuffers.getVersion() != startingOutputBuffers.getVersion()) {
            this. = newOutputBuffers;
            this.notifyAll();
        }
    }
    private synchronized OutputBuffers getCurrentOutputBuffers()
    {
        return ;
    }
    private synchronized OutputBuffers updateToNextOutputBuffers()
    {
        if ( == null) {
            return ;
        }
         = null;
        this.notifyAll();
        return ;
    }
    public void addStateChangeListener(StateChangeListener<StageStatestateChangeListener)
    {
        .addStateChangeListener(stateChangeListener::stateChanged);
    }
    {
        Multimap<PlanNodeIdURIexchangeLocations = this..get();
        ImmutableMultimap.Builder<PlanNodeIdURInewExchangeLocations = ImmutableMultimap.builder();
        for (RemoteSourceNode remoteSourceNode : .getRemoteSourceNodes()) {
            for (PlanFragmentId planFragmentId : remoteSourceNode.getSourceFragmentIds()) {
                SqlStageExecution subStage = .get(planFragmentId);
                checkState(subStage != null"Unknown sub stage %s, known stages %s"planFragmentId.keySet());
                // add new task locations
                for (URI taskLocation : subStage.getTaskLocations()) {
                    if (!exchangeLocations.containsEntry(remoteSourceNode.getId(), taskLocation)) {
                        newExchangeLocations.putAll(remoteSourceNode.getId(), taskLocation);
                    }
                }
            }
        }
        return newExchangeLocations.build();
    }
    private synchronized List<URIgetTaskLocations()
    {
        try (SetThreadName ignored = new SetThreadName("Stage-%s".getStageId())) {
            ImmutableList.Builder<URIlocations = ImmutableList.builder();
            for (RemoteTask task : .values()) {
                locations.add(task.getTaskInfo().getSelf());
            }
            return locations.build();
        }
    }
    public List<RemoteTaskgetAllTasks()
    {
        return ImmutableList.copyOf(.values());
    }
    public List<RemoteTaskgetTasks(Node node)
    {
        return FluentIterable.from(.get(node)).transform(Functions.forMap()).toList();
    }
    public Future<?> start()
    {
        try (SetThreadName ignored = new SetThreadName("Stage-%s".getStageId())) {
            return scheduleStartTasks();
        }
    }
    private Future<?> scheduleStartTasks()
    {
        try (SetThreadName ignored = new SetThreadName("Stage-%s".getStageId())) {
            // start sub-stages (starts bottom-up)
            .values().forEach(SqlStageExecution::scheduleStartTasks);
            return .submit(this::startTasks);
        }
    }
    private void startTasks()
    {
        try (SetThreadName ignored = new SetThreadName("Stage-%s".getStageId())) {
            try {
                checkState(!Thread.holdsLock(this), "Can not start while holding a lock on this");
                // transition to scheduling
                if (!.transitionToScheduling()) {
                    // stage has already been started, has been canceled or has no tasks due to partition pruning
                    return;
                }
                // schedule tasks
                if (.getDistribution() == .) {
                    scheduleFixedNodeCount(1);
                }
                else if (.getDistribution() == .) {
                    scheduleFixedNodeCount();
                }
                else if (.getDistribution() == .) {
                    scheduleSourcePartitionedNodes();
                }
                else if (.getDistribution() == .) {
                    scheduleOnCurrentNode();
                }
                else {
                    throw new IllegalStateException("Unsupported partitioning: " + .getDistribution());
                }
                .transitionToScheduled();
                // add the missing exchanges output buffers
                updateNewExchangesAndBuffers(true);
            }
            catch (Throwable e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                if (.transitionToFailed(e)) {
                    throw Throwables.propagate(e);
                }
                // stage is already finished, so only throw if this is an error
                Throwables.propagateIfInstanceOf(eError.class);
            }
            finally {
                doUpdateState();
            }
        }
    }
    private void scheduleFixedNodeCount(int nodeCount)
    {
        // create tasks on "nodeCount" random nodes
        List<Nodenodes = .selectRandomNodes(nodeCount);
        checkCondition(!nodes.isEmpty(), "No worker nodes available");
        ImmutableList.Builder<TaskIdtasks = ImmutableList.builder();
        for (int taskId = 0; taskId < nodes.size(); taskId++) {
            Node node = nodes.get(taskId);
            RemoteTask task = scheduleTask(taskIdnode);
            tasks.add(task.getTaskInfo().getTaskId());
        }
        // tell sub stages about all nodes and that there will not be more nodes
        for (SqlStageExecution subStage : .values()) {
            subStage.parentTasksAdded(tasks.build(), true);
        }
    }
    private void scheduleOnCurrentNode()
    {
        // create task on current node
        Node node = .selectCurrentNode();
        RemoteTask task = scheduleTask(0, node);
        // tell sub stages about all nodes and that there will not be more nodes
        for (SqlStageExecution subStage : .values()) {
            subStage.parentTasksAdded(ImmutableList.of(task.getTaskInfo().getTaskId()), true);
        }
    }
    private void scheduleSourcePartitionedNodes()
            throws InterruptedException
    {
        AtomicInteger nextTaskId = new AtomicInteger(0);
        try (SplitSource splitSource = this..get()) {
            while (!splitSource.isFinished()) {
                // if query has been canceled, exit cleanly; query will never run regardless
                if (getState().isDone()) {
                    break;
                }
                long start = System.nanoTime();
                Set<SplitpendingSplits = ImmutableSet.copyOf(getFutureValue(splitSource.getNextBatch()));
                .recordGetSplitTime(start);
                while (!pendingSplits.isEmpty() && !getState().isDone()) {
                    Multimap<NodeSplitsplitAssignment = .computeAssignments(pendingSplits.values());
                    pendingSplits = ImmutableSet.copyOf(Sets.difference(pendingSplits, ImmutableSet.copyOf(splitAssignment.values())));
                    assignSplits(nextTaskIdsplitAssignment);
                    if (!pendingSplits.isEmpty()) {
                        waitForFreeNode(nextTaskId);
                    }
                }
            }
        }
        for (RemoteTask task : .values()) {
            task.noMoreSplits(.getPartitionedSource());
        }
        // tell sub stages there will be no more output buffers
        setNoMoreStageNodes();
    }
    private void assignSplits(AtomicInteger nextTaskIdMultimap<NodeSplitsplitAssignment)
    {
        for (Entry<NodeCollection<Split>> taskSplits : splitAssignment.asMap().entrySet()) {
            long scheduleSplitStart = System.nanoTime();
            Node node = taskSplits.getKey();
            TaskId taskId = Iterables.getOnlyElement(.get(node), null);
            RemoteTask task = taskId != null ? .get(taskId) : null;
            if (task == null) {
                RemoteTask remoteTask = scheduleTask(nextTaskId.getAndIncrement(), node.getPartitionedSource(), taskSplits.getValue());
                // tell the sub stages to create a buffer for this task
                addStageNode(remoteTask.getTaskInfo().getTaskId());
                .recordScheduleTaskTime(scheduleSplitStart);
            }
            else {
                task.addSplits(.getPartitionedSource(), taskSplits.getValue());
                .recordAddSplit(scheduleSplitStart);
            }
        }
    }
    private void waitForFreeNode(AtomicInteger nextTaskId)
    {
        // if we have sub stages...
        if (!.isEmpty()) {
            // before we block, we need to create all possible output buffers on the sub stages, or they can deadlock
            // waiting for the "noMoreBuffers" call
            .lockDownNodes();
            for (Node node : Sets.difference(new HashSet<>(.allNodes()), .keySet())) {
                RemoteTask remoteTask = scheduleTask(nextTaskId.getAndIncrement(), node);
                // tell the sub stages to create a buffer for this task
                addStageNode(remoteTask.getTaskInfo().getTaskId());
            }
            // tell sub stages there will be no more output buffers
            setNoMoreStageNodes();
        }
        synchronized (this) {
            // otherwise wait for some tasks to complete
            try {
                // todo this adds latency: replace this wait with an event listener
                ..timedWait(this, 100);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw Throwables.propagate(e);
            }
        }
        updateNewExchangesAndBuffers(false);
    }
    private void addStageNode(TaskId task)
    {
        for (SqlStageExecution subStage : .values()) {
            subStage.parentTasksAdded(ImmutableList.of(task), false);
        }
    }
    private void setNoMoreStageNodes()
    {
        for (SqlStageExecution subStage : .values()) {
            subStage.parentTasksAdded(ImmutableList.<TaskId>of(), true);
        }
    }
    private RemoteTask scheduleTask(int idNode node)
    {
        return scheduleTask(idnodenull, ImmutableList.<Split>of());
    }
    private RemoteTask scheduleTask(int idNode nodePlanNodeId sourceIdIterable<? extends SplitsourceSplits)
    {
        // before scheduling a new task update all existing tasks with new exchanges and output buffers
        addNewExchangesAndBuffers();
        TaskId taskId = new TaskId(.getStageId(), String.valueOf(id));
        ImmutableMultimap.Builder<PlanNodeIdSplitinitialSplits = ImmutableMultimap.builder();
        for (Split sourceSplit : sourceSplits) {
            initialSplits.put(sourceIdsourceSplit);
        }
        for (Entry<PlanNodeIdURIentry : .get().entries()) {
            initialSplits.put(entry.getKey(), createRemoteSplitFor(taskIdentry.getValue()));
        }
                taskId,
                node,
                ,
                initialSplits.build(),
                getCurrentOutputBuffers());
        task.addStateChangeListener(taskInfo -> doUpdateState());
        // create and update task
        task.start();
        // record this task
        .put(task.getTaskInfo().getTaskId(), task);
        .put(nodetask.getTaskInfo().getTaskId());
        .addTask(nodetask);
        // check whether the stage finished while we were scheduling this task
        if (.getState().isDone()) {
            task.cancel();
        }
        // update in case task finished before listener was registered
        doUpdateState();
        return task;
    }
    private void updateNewExchangesAndBuffers(boolean waitUntilFinished)
    {
        checkState(!Thread.holdsLock(this), "Can not add exchanges or buffers to tasks while holding a lock on this");
        while (!getState().isDone()) {
            boolean finished = addNewExchangesAndBuffers();
            if (finished || !waitUntilFinished) {
                return;
            }
            synchronized (this) {
                // wait for a state change
                //
                // NOTE this must be a wait with a timeout since there is no notification
                // for new exchanges from the child stages
                try {
                    ..timedWait(this, 100);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw Throwables.propagate(e);
                }
            }
        }
    }
    private boolean addNewExchangesAndBuffers()
    {
        // get new exchanges and update exchange state
        Set<PlanNodeIdcompleteSources = updateCompleteSources();
        boolean allSourceComplete = completeSources.containsAll();
        Multimap<PlanNodeIdURInewExchangeLocations = getNewExchangeLocations();
        .set(ImmutableMultimap.<PlanNodeIdURI>builder()
                .putAll(.get())
                .putAll(newExchangeLocations)
                .build());
        // get new output buffer and update output buffer state
        OutputBuffers outputBuffers = updateToNextOutputBuffers();
        // finished state must be decided before update to avoid race conditions
        boolean finished = allSourceComplete && outputBuffers.isNoMoreBufferIds();
        // update tasks
        try (SetThreadName ignored = new SetThreadName("SqlStageExecution-%s".getStageId())) {
            for (RemoteTask task : .values()) {
                for (Entry<PlanNodeIdURIentry : newExchangeLocations.entries()) {
                    Split remoteSplit = createRemoteSplitFor(task.getTaskInfo().getTaskId(), entry.getValue());
                    task.addSplits(entry.getKey(), ImmutableList.of(remoteSplit));
                }
                task.setOutputBuffers(outputBuffers);
                completeSources.forEach(task::noMoreSplits);
            }
        }
        return finished;
    }
    {
        for (RemoteSourceNode remoteSourceNode : .getRemoteSourceNodes()) {
            if (!.contains(remoteSourceNode.getId())) {
                boolean exchangeFinished = true;
                for (PlanFragmentId planFragmentId : remoteSourceNode.getSourceFragmentIds()) {
                    SqlStageExecution subStage = .get(planFragmentId);
                    switch (subStage.getState()) {
                        case :
                        case :
                            exchangeFinished = false;
                            break;
                    }
                }
                if (exchangeFinished) {
                    .add(remoteSourceNode.getId());
                }
            }
        }
        return ;
    }
    @SuppressWarnings("NakedNotify")
    private void doUpdateState()
    {
        checkState(!Thread.holdsLock(this), "Can not doUpdateState while holding a lock on this");
        try (SetThreadName ignored = new SetThreadName("Stage-%s".getStageId())) {
            synchronized (this) {
                // wake up worker thread waiting for state changes
                this.notifyAll();
                StageState initialState = getState();
                if (initialState.isDone()) {
                    return;
                }
                List<TaskInfotaskInfos = .values().stream()
                        .map(RemoteTask::getTaskInfo)
                        .collect(toImmutableList());
                List<TaskStatetaskStates = taskInfos.stream()
                        .map(TaskInfo::getState)
                        .collect(toImmutableList());
                if (any(taskStatesequalTo(.))) {
                    RuntimeException failure = taskInfos.stream()
                            .map(taskInfo -> Iterables.getFirst(taskInfo.getFailures(), null))
                            .filter(Objects::nonNull)
                            .findFirst()
                            .map(ExecutionFailureInfo::toException)
                            .orElse(new PrestoException(."A task failed for an unknown reason"));
                    .transitionToFailed(failure);
                }
                else if (taskStates.stream().anyMatch(.::equals)) {
                    // A task should only be in the aborted state if the STAGE is done (ABORTED or FAILED)
                    .transitionToFailed(new PrestoException(."A task is in the ABORTED state but stage is " + initialState));
                }
                else if (initialState != . && initialState != .) {
                    // all tasks are now scheduled, so we can check the finished state
                    if (taskStates.stream().allMatch(TaskState::isDone)) {
                        .transitionToFinished();
                    }
                    else if (taskStates.stream().anyMatch(.::equals)) {
                        .transitionToRunning();
                    }
                }
            }
            // if this stage is now finished, cancel all work
            if (getState().isDone()) {
                cancel();
            }
        }
    }
    public void cancel()
    {
        checkState(!Thread.holdsLock(this), "Can not cancel while holding a lock on this");
        try (SetThreadName ignored = new SetThreadName("Stage-%s".getStageId())) {
            // check if the stage already completed naturally
            doUpdateState();
            .transitionToCanceled();
            // cancel all tasks
            .values().forEach(RemoteTask::cancel);
            // propagate cancel to sub-stages
            .values().forEach(SqlStageExecution::cancel);
        }
    }
    public void abort()
    {
        checkState(!Thread.holdsLock(this), "Can not abort while holding a lock on this");
        try (SetThreadName ignored = new SetThreadName("Stage-%s".getStageId())) {
            // transition to aborted state, only if not already finished
            doUpdateState();
            .transitionToAborted();
            // abort all tasks
            .values().forEach(RemoteTask::abort);
            // propagate abort to sub-stages
            .values().forEach(SqlStageExecution::abort);
        }
    }
    private static Split createRemoteSplitFor(TaskId taskIdURI taskLocation)
    {
        URI splitLocation = uriBuilderFrom(taskLocation).appendPath("results").appendPath(taskId.toString()).build();
        return new Split("remote"new RemoteSplit(splitLocation));
    }
    @Override
    public String toString()
    {
        return .toString();
    }
    private static Optional<IntegergetHashChannel(PlanFragment fragment)
    {
        return fragment.getHash().map(symbol -> fragment.getOutputLayout().indexOf(symbol));
    }
    private static List<IntegergetPartitioningChannels(PlanFragment fragment)
    {
        checkState(fragment.getOutputPartitioning() == ."fragment is not hash partitioned");
        // We can convert the symbols directly into channels, because the root must be a sink and therefore the layout is fixed
        return fragment.getPartitionBy().stream()
                .map(symbol -> fragment.getOutputLayout().indexOf(symbol))
                .collect(toImmutableList());
    }
// NOTE(review): trailing GrepCode page chrome ("New to GrepCode? Check out our FAQ")
// removed -- not part of the original Java source. The class's closing brace is
// missing from this extract and must be restored from upstream.