1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.ooc.data;
20
21 import com.google.common.collect.Maps;
22 import com.google.common.collect.Sets;
23 import com.google.common.util.concurrent.AtomicDouble;
24 import org.apache.giraph.bsp.BspService;
25 import org.apache.giraph.ooc.OutOfCoreEngine;
26 import org.apache.giraph.worker.BspServiceWorker;
27 import org.apache.giraph.worker.WorkerProgress;
28 import org.apache.log4j.Logger;
29
30 import java.util.ArrayList;
31 import java.util.Iterator;
32 import java.util.List;
33 import java.util.Random;
34 import java.util.Set;
35 import java.util.concurrent.ConcurrentMap;
36 import java.util.concurrent.atomic.AtomicInteger;
37
38 import static com.google.common.base.Preconditions.checkState;
39
40
41
42
43
44 public class MetaPartitionManager {
45
46
47
48
/**
 * Sentinel returned by {@link #getNextPartition()} when the number of
 * processed partitions has reached the number of registered partitions.
 */
public static final int NO_PARTITION_TO_PROCESS = -1;

/** Class logger */
private static final Logger LOG =
    Logger.getLogger(MetaPartitionManager.class);
54
/**
 * Location of one piece of data (partition, current messages or incoming
 * messages). IN_TRANSIT is set by the startOffloading* methods and replaced
 * by ON_DISK in the matching doneOffloading* methods.
 * The declaration order must not change: ordinals are used as array indices
 * in MetaPartitionDictionary.
 */
private enum StorageState { IN_MEM, ON_DISK, IN_TRANSIT };
56
57
58
59
/**
 * Coarse storage state of a partition, derived from the state of the
 * partition data and of its current messages (see
 * MetaPartition#getPartitionStorageState()). Used to keep the in-memory and
 * partially-in-memory counters up to date.
 */
private enum PartitionStorageState
  { FULLY_IN_MEM, PARTIALLY_IN_MEM, FULLY_ON_DISK };
66
67
68
69
/**
 * Processing status of a partition. Partitions start as UNPROCESSED, are
 * marked IN_PROCESS when handed out for computation and PROCESSED once
 * {@link #setPartitionIsProcessed(int)} is called.
 * The declaration order must not change: ordinals are used as array indices
 * in MetaPartitionDictionary.
 */
private enum ProcessingState { PROCESSED, UNPROCESSED, IN_PROCESS };
71
72
73
74
/**
 * Number of partitions counted as fully in memory. Incremented when a
 * partition is added and adjusted on load/offload/message reset.
 */
private final AtomicInteger numInMemoryPartitions = new AtomicInteger(0);

/**
 * Number of partitions counted as partially in memory; each of them
 * contributes one half to {@link #getGraphFractionInMemory()}.
 */
private final AtomicInteger numPartiallyInMemoryPartitions =
    new AtomicInteger(0);
82
/** Meta information of every registered partition, keyed by partition id */
private final ConcurrentMap<Integer, MetaPartition> partitions =
    Maps.newConcurrentMap();

/**
 * One dictionary per IO thread; each holds the meta partitions owned by that
 * thread, grouped by their processing/storage states.
 */
private final List<MetaPartitionDictionary> perThreadPartitionDictionary;

/**
 * Per IO thread set of partition ids whose vertex/edge buffers are
 * candidates for offloading (filled from the partition store and the edge
 * store in {@link #getOffloadPartitionBufferId(int)}).
 */
private final List<Set<Integer>> perThreadVertexEdgeBuffers;

/**
 * Per IO thread set of partition ids whose message buffers are candidates
 * for offloading (filled from the incoming message store in
 * {@link #getOffloadMessageBufferId(int)}).
 */
private final List<Set<Integer>> perThreadMessageBuffers;

/** Out-of-core engine; queried for the current superstep and server data */
private final OutOfCoreEngine oocEngine;
100
101
102
103
/**
 * Number of partitions marked as processed since the last call to
 * {@link #resetPartitions()}.
 */
private final AtomicInteger numPartitionsProcessed = new AtomicInteger(0);

/**
 * Source of the random IO-thread index at which
 * {@link #getNextPartition()} starts its search.
 */
private final Random randomGenerator;

/**
 * Smallest value of {@link #getGraphFractionInMemory()} observed so far by
 * updateGraphFractionInMemory(); starts at 1 (everything in memory).
 */
private final AtomicDouble lowestGraphFractionInMemory =
    new AtomicDouble(1);

/**
 * Maps a partition id to the sequential index it received when it was
 * added. The owner IO thread of a partition is this index modulo the
 * number of IO threads.
 */
private final ConcurrentMap<Integer, Integer> partitionIndex =
    Maps.newConcurrentMap();

/** Generator of the sequential indices stored in partitionIndex */
private final AtomicInteger indexCounter = new AtomicInteger(0);

/** Number of IO threads (and therefore of per-thread structures) */
private final int numIOThreads;
135
136
137
138
139
140
141
142 public MetaPartitionManager(int numIOThreads, OutOfCoreEngine oocEngine) {
143 perThreadPartitionDictionary = new ArrayList<>(numIOThreads);
144 perThreadVertexEdgeBuffers = new ArrayList<>(numIOThreads);
145 perThreadMessageBuffers = new ArrayList<>(numIOThreads);
146 for (int i = 0; i < numIOThreads; ++i) {
147 perThreadPartitionDictionary.add(new MetaPartitionDictionary());
148 perThreadMessageBuffers.add(Sets.<Integer>newConcurrentHashSet());
149 perThreadVertexEdgeBuffers.add(Sets.<Integer>newConcurrentHashSet());
150 }
151 this.oocEngine = oocEngine;
152 this.randomGenerator = new Random();
153 this.numIOThreads = numIOThreads;
154 }
155
156
157
158
159 public int getNumInMemoryPartitions() {
160 return numInMemoryPartitions.get();
161 }
162
163
164
165
166 public int getNumPartiallyInMemoryPartitions() {
167 return numPartiallyInMemoryPartitions.get();
168 }
169
170
171
172
173
174
175 public int getNumPartitions() {
176 return partitions.size();
177 }
178
179
180
181
182
183
184
185
186 public double getGraphFractionInMemory() {
187 return (getNumInMemoryPartitions() +
188 getNumPartiallyInMemoryPartitions() / 2.0) / getNumPartitions();
189 }
190
191
192
193
194
195 private synchronized void updateGraphFractionInMemory() {
196 double graphInMemory = getGraphFractionInMemory();
197 if (graphInMemory < lowestGraphFractionInMemory.get()) {
198 lowestGraphFractionInMemory.set(graphInMemory);
199 WorkerProgress.get().updateLowestGraphPercentageInMemory(
200 (int) (graphInMemory * 100));
201 }
202 }
203
204
205
206
207
208
209
210
211
212
213
214 private void updateCounters(PartitionStorageState stateBefore,
215 PartitionStorageState stateAfter) {
216 numInMemoryPartitions.getAndAdd(
217 ((stateAfter == PartitionStorageState.FULLY_IN_MEM) ? 1 : 0) -
218 ((stateBefore == PartitionStorageState.FULLY_IN_MEM) ? 1 : 0));
219 numPartiallyInMemoryPartitions.getAndAdd(
220 ((stateAfter == PartitionStorageState.PARTIALLY_IN_MEM) ? 1 : 0) -
221 ((stateBefore == PartitionStorageState.PARTIALLY_IN_MEM) ? 1 : 0));
222 }
223
224
225
226
227
228
229
230 public boolean hasPartition(Integer partitionId) {
231 return partitions.containsKey(partitionId);
232 }
233
234
235
236
237
238
239 public Iterable<Integer> getPartitionIds() {
240 return partitions.keySet();
241 }
242
243
244
245
246
247
248
249 public int getOwnerThreadId(int partitionId) {
250 Integer index = partitionIndex.get(partitionId);
251 checkState(index != null);
252 return index % numIOThreads;
253 }
254
255
256
257
258
259
260 public void addPartition(int partitionId) {
261 MetaPartition meta = new MetaPartition(partitionId);
262 MetaPartition temp = partitions.putIfAbsent(partitionId, meta);
263
264 if (temp == null) {
265 int index = indexCounter.getAndIncrement();
266 checkState(partitionIndex.putIfAbsent(partitionId, index) == null);
267 int ownerThread = getOwnerThreadId(partitionId);
268 perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
269 numInMemoryPartitions.getAndIncrement();
270 }
271 }
272
273
274
275
276
277
278
279 public void removePartition(Integer partitionId) {
280 MetaPartition meta = partitions.remove(partitionId);
281 int ownerThread = getOwnerThreadId(partitionId);
282 perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
283 checkState(!meta.isOnDisk());
284 numInMemoryPartitions.getAndDecrement();
285 }
286
287
288
289
290
291
292
293
294 private static <T> T popFromSet(Set<T> set) {
295 if (!set.isEmpty()) {
296 Iterator<T> it = set.iterator();
297 T entry = it.next();
298 it.remove();
299 return entry;
300 }
301 return null;
302 }
303
304
305
306
307
308
309
310
311 private static <T> T peekFromSet(Set<T> set) {
312 if (!set.isEmpty()) {
313 return set.iterator().next();
314 }
315 return null;
316 }
317
318
319
320
321
322
323
324
325
326
327 public Integer getOffloadPartitionId(int threadId) {
328
329 MetaPartition meta = perThreadPartitionDictionary.get(threadId).lookup(
330 ProcessingState.PROCESSED,
331 StorageState.IN_MEM,
332 StorageState.ON_DISK,
333 null);
334 if (meta != null) {
335 return meta.getPartitionId();
336 }
337 meta = perThreadPartitionDictionary.get(threadId).lookup(
338 ProcessingState.PROCESSED,
339 StorageState.ON_DISK,
340 StorageState.IN_MEM,
341 null);
342 if (meta != null) {
343 return meta.getPartitionId();
344 }
345
346 meta = perThreadPartitionDictionary.get(threadId).lookup(
347 ProcessingState.PROCESSED,
348 StorageState.IN_MEM,
349 StorageState.IN_MEM,
350 null);
351 if (meta != null) {
352 return meta.getPartitionId();
353 }
354
355
356 meta = perThreadPartitionDictionary.get(threadId).lookup(
357 ProcessingState.UNPROCESSED,
358 StorageState.IN_MEM,
359 StorageState.ON_DISK,
360 null);
361 if (meta != null) {
362 return meta.getPartitionId();
363 }
364 meta = perThreadPartitionDictionary.get(threadId).lookup(
365 ProcessingState.UNPROCESSED,
366 StorageState.ON_DISK,
367 StorageState.IN_MEM,
368 null);
369 if (meta != null) {
370 return meta.getPartitionId();
371 }
372
373 meta = perThreadPartitionDictionary.get(threadId).lookup(
374 ProcessingState.UNPROCESSED,
375 StorageState.IN_MEM,
376 StorageState.IN_MEM,
377 null);
378 if (meta != null) {
379 return meta.getPartitionId();
380 }
381 return null;
382 }
383
384
385
386
387
388
389
390 public Integer getOffloadPartitionBufferId(int threadId) {
391 if (oocEngine.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP) {
392 Integer partitionId =
393 popFromSet(perThreadVertexEdgeBuffers.get(threadId));
394 if (partitionId == null) {
395 DiskBackedPartitionStore<?, ?, ?> partitionStore =
396 (DiskBackedPartitionStore<?, ?, ?>) (oocEngine.getServerData()
397 .getPartitionStore());
398 perThreadVertexEdgeBuffers.get(threadId)
399 .addAll(partitionStore.getCandidateBuffersToOffload(threadId));
400 DiskBackedEdgeStore<?, ?, ?> edgeStore =
401 (DiskBackedEdgeStore<?, ?, ?>) (oocEngine.getServerData())
402 .getEdgeStore();
403 perThreadVertexEdgeBuffers.get(threadId)
404 .addAll(edgeStore.getCandidateBuffersToOffload(threadId));
405 partitionId = popFromSet(perThreadVertexEdgeBuffers.get(threadId));
406 }
407 return partitionId;
408 }
409 return null;
410 }
411
412
413
414
415
416
417
418 public Integer getOffloadMessageBufferId(int threadId) {
419 if (oocEngine.getSuperstep() != BspServiceWorker.INPUT_SUPERSTEP) {
420 Integer partitionId =
421 popFromSet(perThreadMessageBuffers.get(threadId));
422 if (partitionId == null) {
423 DiskBackedMessageStore<?, ?> messageStore =
424 (DiskBackedMessageStore<?, ?>) (oocEngine.getServerData()
425 .getIncomingMessageStore());
426 if (messageStore != null) {
427 perThreadMessageBuffers.get(threadId)
428 .addAll(messageStore.getCandidateBuffersToOffload(threadId));
429 partitionId = popFromSet(perThreadMessageBuffers.get(threadId));
430 }
431 }
432 return partitionId;
433 }
434 return null;
435 }
436
437
438
439
440
441
442
443
444
445
446
447
448 public Integer getOffloadMessageId(int threadId) {
449 if (oocEngine.getSuperstep() == BspService.INPUT_SUPERSTEP) {
450 return null;
451 }
452 MetaPartition meta = perThreadPartitionDictionary.get(threadId).lookup(
453 ProcessingState.PROCESSED,
454 StorageState.ON_DISK,
455 null,
456 StorageState.IN_MEM);
457 if (meta != null) {
458 return meta.getPartitionId();
459 }
460 meta = perThreadPartitionDictionary.get(threadId).lookup(
461 ProcessingState.PROCESSED,
462 StorageState.IN_TRANSIT,
463 null,
464 StorageState.IN_MEM);
465 if (meta != null) {
466 return meta.getPartitionId();
467 }
468 meta = perThreadPartitionDictionary.get(threadId).lookup(
469 ProcessingState.UNPROCESSED,
470 StorageState.ON_DISK,
471 null,
472 StorageState.IN_MEM);
473 if (meta != null) {
474 return meta.getPartitionId();
475 }
476 meta = perThreadPartitionDictionary.get(threadId).lookup(
477 ProcessingState.UNPROCESSED,
478 StorageState.IN_TRANSIT,
479 null,
480 StorageState.IN_MEM);
481 if (meta != null) {
482 return meta.getPartitionId();
483 }
484 return null;
485 }
486
487
488
489
490
491
492
493
494
495 public Integer getLoadPartitionId(int threadId) {
496
497 MetaPartition meta = perThreadPartitionDictionary.get(threadId).lookup(
498 ProcessingState.UNPROCESSED,
499 StorageState.IN_MEM,
500 StorageState.ON_DISK,
501 null);
502 if (meta != null) {
503 return meta.getPartitionId();
504 }
505
506 meta = perThreadPartitionDictionary.get(threadId).lookup(
507 ProcessingState.UNPROCESSED,
508 StorageState.ON_DISK,
509 StorageState.IN_MEM,
510 null);
511 if (meta != null) {
512 return meta.getPartitionId();
513 }
514
515
516 meta = perThreadPartitionDictionary.get(threadId).lookup(
517 ProcessingState.UNPROCESSED,
518 StorageState.ON_DISK,
519 StorageState.ON_DISK,
520 null);
521 if (meta != null) {
522 return meta.getPartitionId();
523 }
524
525
526 meta = perThreadPartitionDictionary.get(threadId).lookup(
527 ProcessingState.PROCESSED,
528 StorageState.IN_MEM,
529 null,
530 StorageState.ON_DISK);
531 if (meta != null) {
532 return meta.getPartitionId();
533 }
534
535 meta = perThreadPartitionDictionary.get(threadId).lookup(
536 ProcessingState.PROCESSED,
537 StorageState.ON_DISK,
538 null,
539 StorageState.IN_MEM);
540 if (meta != null) {
541 return meta.getPartitionId();
542 }
543
544 meta = perThreadPartitionDictionary.get(threadId).lookup(
545 ProcessingState.PROCESSED,
546 StorageState.ON_DISK,
547 null,
548 StorageState.ON_DISK);
549 if (meta != null) {
550 return meta.getPartitionId();
551 }
552
553 return null;
554 }
555
556
557
558
559
560
561 public void markPartitionAsInProcess(int partitionId) {
562 MetaPartition meta = partitions.get(partitionId);
563 int ownerThread = getOwnerThreadId(partitionId);
564 synchronized (meta) {
565 perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
566 meta.setProcessingState(ProcessingState.IN_PROCESS);
567 perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
568 }
569 }
570
571
572
573
574
575
576
577 public boolean hasProcessedOnMemory() {
578 for (MetaPartitionDictionary dictionary : perThreadPartitionDictionary) {
579 if (dictionary.hasProcessedOnMemory()) {
580 return true;
581 }
582 }
583 return false;
584 }
585
586
587
588
589
590
591
592
593 public boolean isPartitionProcessed(Integer partitionId) {
594 MetaPartition meta = partitions.get(partitionId);
595 synchronized (meta) {
596 return meta.getProcessingState() == ProcessingState.PROCESSED;
597 }
598 }
599
600
601
602
603
604
605 public void setPartitionIsProcessed(int partitionId) {
606 MetaPartition meta = partitions.get(partitionId);
607 int ownerThread = getOwnerThreadId(partitionId);
608 synchronized (meta) {
609 perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
610 meta.setProcessingState(ProcessingState.PROCESSED);
611 perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
612 }
613 numPartitionsProcessed.getAndIncrement();
614 }
615
616
617
618
619
620
621
622
623
624 public boolean startLoadingPartition(int partitionId, long superstep) {
625 MetaPartition meta = partitions.get(partitionId);
626 synchronized (meta) {
627 boolean shouldLoad = meta.getPartitionState() == StorageState.ON_DISK;
628 if (superstep == oocEngine.getSuperstep()) {
629 shouldLoad |= meta.getCurrentMessagesState() == StorageState.ON_DISK;
630 } else {
631 shouldLoad |= meta.getIncomingMessagesState() == StorageState.ON_DISK;
632 }
633 return shouldLoad;
634 }
635 }
636
637
638
639
640
641
642
643
644 public void doneLoadingPartition(int partitionId, long superstep) {
645 MetaPartition meta = partitions.get(partitionId);
646 int owner = getOwnerThreadId(partitionId);
647 synchronized (meta) {
648 PartitionStorageState stateBefore = meta.getPartitionStorageState();
649 perThreadPartitionDictionary.get(owner).removePartition(meta);
650 meta.setPartitionState(StorageState.IN_MEM);
651 if (superstep == oocEngine.getSuperstep()) {
652 meta.setCurrentMessagesState(StorageState.IN_MEM);
653 } else {
654 meta.setIncomingMessagesState(StorageState.IN_MEM);
655 }
656 PartitionStorageState stateAfter = meta.getPartitionStorageState();
657 updateCounters(stateBefore, stateAfter);
658
659
660 if (meta.getProcessingState() == ProcessingState.PROCESSED) {
661 perThreadPartitionDictionary.get(owner).increaseNumPrefetch();
662 }
663 perThreadPartitionDictionary.get(owner).addPartition(meta);
664 }
665 updateGraphFractionInMemory();
666 }
667
668
669
670
671
672
673
674
675 public boolean startOffloadingMessages(int partitionId) {
676 MetaPartition meta = partitions.get(partitionId);
677 int ownerThread = getOwnerThreadId(partitionId);
678 synchronized (meta) {
679 if (meta.getIncomingMessagesState() == StorageState.IN_MEM) {
680 perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
681 meta.setIncomingMessagesState(StorageState.IN_TRANSIT);
682 perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
683 return true;
684 } else {
685 return false;
686 }
687 }
688 }
689
690
691
692
693
694
695
696
697 public void doneOffloadingMessages(int partitionId) {
698 MetaPartition meta = partitions.get(partitionId);
699 int ownerThread = getOwnerThreadId(partitionId);
700 synchronized (meta) {
701 perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
702 meta.setIncomingMessagesState(StorageState.ON_DISK);
703 perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
704 }
705 }
706
707
708
709
710
711
712
713
/**
 * Starts offloading the raw data buffers of a partition. No bookkeeping is
 * done here; the offload is always allowed.
 *
 * @param partitionId id of the partition whose buffers are offloaded
 *                    (currently unused)
 * @return always true
 */
public boolean startOffloadingBuffer(int partitionId) {
  // Nothing to record for buffers: unconditionally allow the offload.
  return true;
}
718
719
720
721
722
723
724
/**
 * Called when offloading the raw data buffers of a partition has finished.
 * Currently a no-op.
 *
 * @param partitionId id of the partition whose buffers were offloaded
 *                    (currently unused)
 */
public void doneOffloadingBuffer(int partitionId) {
  // Intentionally empty: no state is kept for buffers in this class.
}
728
729
730
731
732
733
734
735
736 public boolean startOffloadingPartition(int partitionId) {
737 MetaPartition meta = partitions.get(partitionId);
738 int owner = getOwnerThreadId(partitionId);
739 synchronized (meta) {
740 if (meta.getProcessingState() != ProcessingState.IN_PROCESS &&
741 (meta.getPartitionState() == StorageState.IN_MEM ||
742 meta.getCurrentMessagesState() == StorageState.IN_MEM)) {
743 perThreadPartitionDictionary.get(owner).removePartition(meta);
744
745
746
747
748 if (meta.getPartitionState() != StorageState.ON_DISK) {
749 meta.setPartitionState(StorageState.IN_TRANSIT);
750 }
751 if (meta.getCurrentMessagesState() != StorageState.ON_DISK) {
752 meta.setCurrentMessagesState(StorageState.IN_TRANSIT);
753 }
754 perThreadPartitionDictionary.get(owner).addPartition(meta);
755 return true;
756 } else {
757 return false;
758 }
759 }
760 }
761
762
763
764
765
766
767
768 public void doneOffloadingPartition(int partitionId) {
769 MetaPartition meta = partitions.get(partitionId);
770 int owner = getOwnerThreadId(partitionId);
771 synchronized (meta) {
772
773
774 if (meta.getCurrentMessagesState() == StorageState.IN_TRANSIT &&
775 meta.getPartitionState() == StorageState.IN_TRANSIT) {
776 numInMemoryPartitions.getAndDecrement();
777 } else {
778 numPartiallyInMemoryPartitions.getAndDecrement();
779 }
780 perThreadPartitionDictionary.get(owner).removePartition(meta);
781 meta.setPartitionState(StorageState.ON_DISK);
782 meta.setCurrentMessagesState(StorageState.ON_DISK);
783 perThreadPartitionDictionary.get(owner).addPartition(meta);
784 }
785 updateGraphFractionInMemory();
786 }
787
788
789
790
791
792 public void resetPartitions() {
793 for (MetaPartition meta : partitions.values()) {
794 int owner = getOwnerThreadId(meta.getPartitionId());
795 perThreadPartitionDictionary.get(owner).removePartition(meta);
796 meta.resetPartition();
797 perThreadPartitionDictionary.get(owner).addPartition(meta);
798 }
799 for (MetaPartitionDictionary dictionary : perThreadPartitionDictionary) {
800 dictionary.reset();
801 }
802 numPartitionsProcessed.set(0);
803 }
804
805
806
807
808
809 public void resetMessages() {
810 for (MetaPartition meta : partitions.values()) {
811 int owner = getOwnerThreadId(meta.getPartitionId());
812 perThreadPartitionDictionary.get(owner).removePartition(meta);
813 PartitionStorageState stateBefore = meta.getPartitionStorageState();
814 meta.resetMessages();
815 PartitionStorageState stateAfter = meta.getPartitionStorageState();
816 updateCounters(stateBefore, stateAfter);
817 perThreadPartitionDictionary.get(owner).addPartition(meta);
818 }
819 }
820
821
822
823
824
825
826
827
828 public Integer getNextPartition() {
829 if (numPartitionsProcessed.get() >= partitions.size()) {
830 return NO_PARTITION_TO_PROCESS;
831 }
832 int numThreads = perThreadPartitionDictionary.size();
833 int index = randomGenerator.nextInt(numThreads);
834 int startIndex = index;
835 MetaPartition meta;
836 do {
837
838
839
840
841
842
843 while (true) {
844 meta = perThreadPartitionDictionary.get(index).lookup(
845 ProcessingState.UNPROCESSED,
846 StorageState.IN_MEM,
847 StorageState.IN_MEM,
848 null);
849 if (meta != null) {
850
851
852
853
854 synchronized (meta) {
855 if (meta.getProcessingState() == ProcessingState.UNPROCESSED &&
856 meta.getPartitionState() == StorageState.IN_MEM &&
857 meta.getCurrentMessagesState() == StorageState.IN_MEM) {
858 perThreadPartitionDictionary.get(index).removePartition(meta);
859 meta.setProcessingState(ProcessingState.IN_PROCESS);
860 perThreadPartitionDictionary.get(index).addPartition(meta);
861 return meta.getPartitionId();
862 }
863 }
864 } else {
865 break;
866 }
867 }
868 index = (index + 1) % numThreads;
869 } while (index != startIndex);
870 return null;
871 }
872
873
874
875
876
877
878
879
880 public boolean isPartitionOnDisk(int partitionId) {
881 MetaPartition meta = partitions.get(partitionId);
882 synchronized (meta) {
883 return meta.isOnDisk();
884 }
885 }
886
887
888
889
890 private static class MetaPartition {
891
892 private int partitionId;
893
894 private StorageState incomingMessagesState;
895
896 private StorageState currentMessagesState;
897
898 private StorageState partitionState;
899
900 private ProcessingState processingState;
901
902
903
904
905
906
907 public MetaPartition(int partitionId) {
908 this.partitionId = partitionId;
909 this.processingState = ProcessingState.UNPROCESSED;
910 this.partitionState = StorageState.IN_MEM;
911 this.currentMessagesState = StorageState.IN_MEM;
912 this.incomingMessagesState = StorageState.IN_MEM;
913 }
914
915 @Override
916 public String toString() {
917 StringBuffer sb = new StringBuffer();
918 sb.append("\nMetaData: {");
919 sb.append("ID: " + partitionId + "; ");
920 sb.append("Partition: " + partitionState + "; ");
921 sb.append("Current Messages: " + currentMessagesState + "; ");
922 sb.append("Incoming Messages: " + incomingMessagesState + "; ");
923 sb.append("Processed? : " + processingState + "}");
924 return sb.toString();
925 }
926
927 public int getPartitionId() {
928 return partitionId;
929 }
930
931 public StorageState getIncomingMessagesState() {
932 return incomingMessagesState;
933 }
934
935 public void setIncomingMessagesState(StorageState incomingMessagesState) {
936 this.incomingMessagesState = incomingMessagesState;
937 }
938
939 public StorageState getCurrentMessagesState() {
940 return currentMessagesState;
941 }
942
943 public void setCurrentMessagesState(StorageState currentMessagesState) {
944 this.currentMessagesState = currentMessagesState;
945 }
946
947 public StorageState getPartitionState() {
948 return partitionState;
949 }
950
951 public void setPartitionState(StorageState state) {
952 this.partitionState = state;
953 }
954
955 public ProcessingState getProcessingState() {
956 return processingState;
957 }
958
959 public void setProcessingState(ProcessingState processingState) {
960 this.processingState = processingState;
961 }
962
963
964
965
966
967
968
969 public boolean isOnDisk() {
970 return partitionState == StorageState.ON_DISK ||
971 currentMessagesState == StorageState.ON_DISK;
972 }
973
974
975
976
977 public void resetPartition() {
978 processingState = ProcessingState.UNPROCESSED;
979 }
980
981
982
983
984 public void resetMessages() {
985 currentMessagesState = incomingMessagesState;
986 incomingMessagesState = StorageState.IN_MEM;
987 }
988
989
990
991
992 public PartitionStorageState getPartitionStorageState() {
993 if (partitionState == StorageState.ON_DISK &&
994 currentMessagesState == StorageState.ON_DISK) {
995 return PartitionStorageState.FULLY_ON_DISK;
996 } else if (partitionState == StorageState.IN_MEM &&
997 currentMessagesState == StorageState.IN_MEM) {
998 return PartitionStorageState.FULLY_IN_MEM;
999 } else {
1000 return PartitionStorageState.PARTIALLY_IN_MEM;
1001 }
1002 }
1003 }
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013 private static class MetaPartitionDictionary {
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024 private final Set<MetaPartition>[][][][] partitions =
1025 (Set<MetaPartition>[][][][]) new Set<?>[3][3][3][3];
1026
1027
1028
1029
1030 private final AtomicInteger numPrefetch = new AtomicInteger(0);
1031
1032
1033
1034
1035 public MetaPartitionDictionary() {
1036 for (int i = 0; i < 3; ++i) {
1037 for (int j = 0; j < 3; ++j) {
1038 for (int k = 0; k < 3; ++k) {
1039 for (int t = 0; t < 3; ++t) {
1040 partitions[i][j][k][t] = Sets.newLinkedHashSet();
1041 }
1042 }
1043 }
1044 }
1045 }
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055 private Set<MetaPartition> getSet(MetaPartition meta) {
1056 return partitions[meta.getProcessingState().ordinal()]
1057 [meta.getPartitionState().ordinal()]
1058 [meta.getCurrentMessagesState().ordinal()]
1059 [meta.getIncomingMessagesState().ordinal()];
1060 }
1061
1062
1063
1064
1065
1066
1067 public void addPartition(MetaPartition meta) {
1068 Set<MetaPartition> partitionSet = getSet(meta);
1069 synchronized (partitionSet) {
1070 partitionSet.add(meta);
1071 }
1072 }
1073
1074
1075
1076
1077
1078
1079 public void removePartition(MetaPartition meta) {
1080 Set<MetaPartition> partitionSet = getSet(meta);
1081 synchronized (partitionSet) {
1082 partitionSet.remove(meta);
1083 }
1084 }
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097 public MetaPartition lookup(ProcessingState processingState,
1098 StorageState partitionStorageState,
1099 StorageState currentMessagesState,
1100 StorageState incomingMessagesState) {
1101 int iStart =
1102 (processingState == null) ? 0 : processingState.ordinal();
1103 int iEnd =
1104 (processingState == null) ? 3 : (processingState.ordinal() + 1);
1105 int jStart =
1106 (partitionStorageState == null) ? 0 : partitionStorageState.ordinal();
1107 int jEnd = (partitionStorageState == null) ? 3 :
1108 (partitionStorageState.ordinal() + 1);
1109 int kStart =
1110 (currentMessagesState == null) ? 0 : currentMessagesState.ordinal();
1111 int kEnd = (currentMessagesState == null) ? 3 :
1112 (currentMessagesState.ordinal() + 1);
1113 int tStart =
1114 (incomingMessagesState == null) ? 0 : incomingMessagesState.ordinal();
1115 int tEnd = (incomingMessagesState == null) ? 3 :
1116 (incomingMessagesState.ordinal() + 1);
1117 for (int i = iStart; i < iEnd; ++i) {
1118 for (int j = jStart; j < jEnd; ++j) {
1119 for (int k = kStart; k < kEnd; ++k) {
1120 for (int t = tStart; t < tEnd; ++t) {
1121 Set<MetaPartition> partitionSet = partitions[i][j][k][t];
1122 synchronized (partitionSet) {
1123 MetaPartition meta = peekFromSet(partitionSet);
1124 if (meta != null) {
1125 return meta;
1126 }
1127 }
1128 }
1129 }
1130 }
1131 }
1132 return null;
1133 }
1134
1135
1136
1137
1138
1139
1140
1141 public boolean hasProcessedOnMemory() {
1142 int count = 0;
1143 for (int i = 0; i < 3; ++i) {
1144 for (int j = 0; j < 3; ++j) {
1145 Set<MetaPartition> partitionSet =
1146 partitions[ProcessingState.PROCESSED.ordinal()]
1147 [StorageState.IN_MEM.ordinal()][i][j];
1148 synchronized (partitionSet) {
1149 count += partitionSet.size();
1150 }
1151 }
1152 }
1153 return count - numPrefetch.get() != 0;
1154 }
1155
1156
1157 public void increaseNumPrefetch() {
1158 numPrefetch.getAndIncrement();
1159 }
1160
1161
1162
1163
1164
1165 public void reset() {
1166 numPrefetch.set(0);
1167 }
1168 }
1169 }