View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework.internal;
20  import java.util.ArrayList;
21  import java.util.Arrays;
22  import java.util.Objects;
24  import org.apache.giraph.block_app.framework.api.BlockApiHandle;
25  import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
26  import org.apache.giraph.conf.DefaultMessageClasses;
27  import org.apache.giraph.conf.GiraphConstants;
28  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
29  import org.apache.giraph.conf.MessageClasses;
30  import org.apache.giraph.counters.CustomCounter;
31  import org.apache.giraph.counters.CustomCounters;
32  import org.apache.giraph.factories.DefaultMessageValueFactory;
33  import org.apache.giraph.master.MasterCompute;
34  import org.apache.giraph.types.NoMessage;
35  import org.apache.giraph.utils.UnsafeReusableByteArrayInput;
36  import org.apache.giraph.utils.WritableUtils;
37  import org.apache.giraph.worker.WorkerGlobalCommUsage;
38  import org.apache.giraph.writable.kryo.KryoWritableWrapper;
39  import org.apache.hadoop.io.IntWritable;
40  import org.apache.hadoop.io.Writable;
41  import org.apache.log4j.Logger;
43  /**
44   * Pair of pieces to be executed on workers in a superstep
45   *
46   * @param <S> Execution stage type
47   */
48  @SuppressWarnings({ "rawtypes", "unchecked" })
49  public class BlockWorkerPieces<S> {
50    private static final Logger LOG = Logger.getLogger(BlockWorkerPieces.class);
52    /** Aggregator holding next worker computation */
53    private static final
54    String NEXT_WORKER_PIECES = "giraph.blocks.next_worker_pieces";
56    /** Passed worker stats counter group */
57    private static final String PASSED_WORKER_STATS_GROUP = "PassedWorker Stats";
59    /** Total serialised size counter name */
60    private static final String TOTAL_SERIALISED_SIZE_NAME =
61            "total serialized size";
63    /** Split parts counter name */
64    private static final String SPLIT_PARTS_NAME = "split parts";
66    private final PairedPieceAndStage<S> receiver;
67    private final PairedPieceAndStage<S> sender;
68    private final BlockApiHandle blockApiHandle;
70    public BlockWorkerPieces(
71        PairedPieceAndStage<S> receiver, PairedPieceAndStage<S> sender,
72        BlockApiHandle blockApiHandle) {
73      this.receiver = receiver;
74      this.sender = sender;
75      this.blockApiHandle = blockApiHandle;
76    }
78    public PairedPieceAndStage<S> getReceiver() {
79      return receiver;
80    }
82    public PairedPieceAndStage<S> getSender() {
83      return sender;
84    }
86    public BlockApiHandle getBlockApiHandle() {
87      return blockApiHandle;
88    }
90    public MessageClasses getOutgoingMessageClasses(
91        ImmutableClassesGiraphConfiguration conf) {
92      MessageClasses messageClasses;
93      if (sender == null || sender.getPiece() == null) {
94        messageClasses = new DefaultMessageClasses(
95            NoMessage.class,
96            DefaultMessageValueFactory.class,
97            null,
98            MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION);
99      } else {
100       messageClasses = sender.getPiece().getMessageClasses(conf);
101     }
103     messageClasses.verifyConsistent(conf);
104     return messageClasses;
105   }
107   @Override
108   public String toString() {
109     return "[receiver=" + receiver + ",sender=" + sender + "]";
110   }
112   public String toStringShort() {
113     String receiverString =
114         Objects.toString(receiver != null ? receiver.getPiece() : null);
115     String senderString =
116         Objects.toString(sender != null ? sender.getPiece() : null);
117     if (receiverString.equals(senderString)) {
118       return "[receiver&sender=" + receiverString + "]";
119     } else {
120       return "[receiver=" + receiverString + ",sender=" + senderString + "]";
121     }
122   }
124   /**
125    * Sets which WorkerComputation is going to be executed in the next superstep.
126    */
127   public static <S> void setNextWorkerPieces(
128       MasterCompute master, BlockWorkerPieces<S> nextWorkerPieces) {
129     Writable toBroadcast = new KryoWritableWrapper<>(nextWorkerPieces);
130     byte[] data = WritableUtils.toByteArrayUnsafe(toBroadcast);
132     // TODO: extract splitting logic into common utility
133     int overhead = 4096;
134     int singleSize = Math.max(
135         overhead,
136         GiraphConstants.MAX_MSG_REQUEST_SIZE.get(master.getConf()) - overhead);
138     ArrayList<byte[]> splittedData = new ArrayList<>();
139     if (data.length < singleSize) {
140       splittedData.add(data);
141     } else {
142       for (int start = 0; start < data.length; start += singleSize) {
143         splittedData.add(Arrays.copyOfRange(
144             data, start, Math.min(data.length, start + singleSize)));
145       }
146     }
148"Next worker piece - total serialized size: " + data.length +
149         ", split into " + splittedData.size());
150     CustomCounters.addCustomCounter(PASSED_WORKER_STATS_GROUP,
151             TOTAL_SERIALISED_SIZE_NAME, CustomCounter.Aggregation.SUM);
152     master.getContext().getCounter(PASSED_WORKER_STATS_GROUP,
154         .increment(data.length);
155     CustomCounters.addCustomCounter(PASSED_WORKER_STATS_GROUP,
156             SPLIT_PARTS_NAME, CustomCounter.Aggregation.SUM);
157     master.getContext().getCounter(PASSED_WORKER_STATS_GROUP, SPLIT_PARTS_NAME)
158         .increment(splittedData.size());
160     master.broadcast(NEXT_WORKER_PIECES, new IntWritable(splittedData.size()));
162     for (int i = 0; i < splittedData.size(); i++) {
163       master.broadcast(NEXT_WORKER_PIECES + "_part_" + i,
164           KryoWritableWrapper.wrapIfNeeded(splittedData.get(i)));
165     }
167     master.setOutgoingMessageClasses(
168         nextWorkerPieces.getOutgoingMessageClasses(master.getConf()));
169   }
171   public static <S> BlockWorkerPieces<S> getNextWorkerPieces(
172       WorkerGlobalCommUsage worker) {
173     int splits = worker.<IntWritable>getBroadcast(NEXT_WORKER_PIECES).get();
175     int totalLength = 0;
176     ArrayList<byte[]> splittedData = new ArrayList<>();
177     for (int i = 0; i < splits; i++) {
178       byte[] cur = KryoWritableWrapper.<byte[]>unwrapIfNeeded(
179           worker.getBroadcast(NEXT_WORKER_PIECES + "_part_" + i));
180       splittedData.add(cur);
181       totalLength += cur.length;
182     }
184     byte[] merged;
185     if (splits == 1) {
186       merged = splittedData.get(0);
187     } else {
188       merged = new byte[totalLength];
189       int index = 0;
190       for (int i = 0; i < splits; i++) {
191         System.arraycopy(
192             splittedData.get(i), 0, merged, index, splittedData.get(i).length);
193         index += splittedData.get(i).length;
194       }
195     }
197     KryoWritableWrapper<BlockWorkerPieces<S>> wrapper =
198         new KryoWritableWrapper<>();
199     WritableUtils.fromByteArrayUnsafe(
200         merged, wrapper, new UnsafeReusableByteArrayInput());
201     return wrapper.get();
202   }
203 }