View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.reducers.collect;
20  import org.apache.giraph.block_app.framework.api.BlockMasterApi;
21  import org.apache.giraph.block_app.framework.api.CreateReducersApi;
22  import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
23  import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
24  import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
25  import org.apache.giraph.block_app.reducers.array.ArrayOfHandles;
26  import org.apache.giraph.function.Supplier;
27  import org.apache.giraph.master.MasterGlobalCommUsage;
28  import org.apache.giraph.reducers.ReduceOperation;
29  import org.apache.giraph.worker.WorkerBroadcastUsage;
30  import org.apache.giraph.writable.kryo.KryoWritableWrapper;
31  import org.apache.giraph.writable.kryo.TransientRandom;
33  /**
34   * Reducing values into a list of reducers, randomly,
35   * and getting the results of all reducers together
36   *
37   * @param <S> Single value type
38   * @param <R> Reduced value type
39   */
40  public abstract class ShardedReducerHandle<S, R>
41      implements ReducerHandle<S, R> {
42    // Use a prime number for number of reducers, large enough to make sure
43    // request sizes are within expected size (0.5MB)
44    protected static final int REDUCER_COUNT = 39989;
46    protected final TransientRandom random = new TransientRandom();
48    protected ArrayOfHandles.ArrayOfReducers<S, KryoWritableWrapper<R>> reducers;
50    public final void register(final CreateReducersApi reduceApi) {
51      reducers = new ArrayOfHandles.ArrayOfReducers<>(REDUCER_COUNT,
52          new Supplier<ReducerHandle<S, KryoWritableWrapper<R>>>() {
53            @Override
54            public ReducerHandle<S, KryoWritableWrapper<R>> get() {
55              return reduceApi.createLocalReducer(createReduceOperation());
56            }
57          });
58    }
60    @Override
61    public final void reduce(S value) {
62      reducers.get(random.nextInt(REDUCER_COUNT)).reduce(value);
63    }
65    @Override
66    public final R getReducedValue(MasterGlobalCommUsage master) {
67      KryoWritableWrapper<R> ret = new KryoWritableWrapper<>(
68          createReduceResult(master));
69      ReduceOperation<S, KryoWritableWrapper<R>> reduceOperation =
70          createReduceOperation();
71      for (int i = 0; i < REDUCER_COUNT; i++) {
72        reduceOperation.reduceMerge(ret,
73            reducers.get(i).getReducedValue(master));
74      }
75      return ret.get();
76    }
78    public abstract ReduceOperation<S, KryoWritableWrapper<R>>
79    createReduceOperation();
81    public R createReduceResult(MasterGlobalCommUsage master) {
82      return createReduceOperation().createInitialValue().get();
83    }
85    public BroadcastHandle<R> createBroadcastHandle(
86        BroadcastArrayHandle<KryoWritableWrapper<R>> broadcasts) {
87      return new ShardedBroadcastHandle(broadcasts);
88    }
90    @Override
91    public final BroadcastHandle<R> broadcastValue(BlockMasterApi masterApi) {
92      return createBroadcastHandle(reducers.broadcastValue(masterApi));
93    }
95    /**
96     * Broadcast for ShardedReducerHandle
97     */
98    public class ShardedBroadcastHandle implements BroadcastHandle<R> {
99      protected final BroadcastArrayHandle<KryoWritableWrapper<R>> broadcasts;
101     public ShardedBroadcastHandle(
102         BroadcastArrayHandle<KryoWritableWrapper<R>> broadcasts) {
103       this.broadcasts = broadcasts;
104     }
106     public R createBroadcastResult(WorkerBroadcastUsage worker) {
107       return createReduceOperation().createInitialValue().get();
108     }
110     @Override
111     public final R getBroadcast(WorkerBroadcastUsage worker) {
112       KryoWritableWrapper<R> ret = new KryoWritableWrapper<>(
113           createBroadcastResult(worker));
114       ReduceOperation<S, KryoWritableWrapper<R>> reduceOperation =
115           createReduceOperation();
116       for (int i = 0; i < REDUCER_COUNT; i++) {
117         reduceOperation.reduceMerge(ret,
118             broadcasts.get(i).getBroadcast(worker));
119       }
120       return ret.get();
121     }
122   }
123 }