Coverage Report - org.apache.giraph.writable.kryo.HadoopKryo
 
Classes in this File Line Coverage Branch Coverage Complexity
HadoopKryo
0%
0/101
0%
0/14
0
HadoopKryo$1
0%
0/2
N/A
0
HadoopKryo$2
0%
0/2
N/A
0
HadoopKryo$3
0%
0/2
N/A
0
HadoopKryo$4
0%
0/14
0%
0/12
0
HadoopKryo$4$1
0%
0/5
N/A
0
HadoopKryo$5
0%
0/9
0%
0/2
0
HadoopKryo$6
0%
0/10
0%
0/2
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.writable.kryo;
 19  
 
 20  
 import java.io.DataInput;
 21  
 import java.io.DataOutput;
 22  
 import java.util.Arrays;
 23  
 import java.util.Collections;
 24  
 import java.util.LinkedHashMap;
 25  
 import java.util.Map;
 26  
 import java.util.Map.Entry;
 27  
 import java.util.Random;
 28  
 
 29  
 import com.esotericsoftware.kryo.util.DefaultClassResolver;
 30  
 import de.javakaffee.kryoserializers.guava.ImmutableMapSerializer;
 31  
 import org.apache.giraph.conf.GiraphConfigurationSettable;
 32  
 import com.esotericsoftware.kryo.ClassResolver;
 33  
 import com.esotericsoftware.kryo.ReferenceResolver;
 34  
 import com.esotericsoftware.kryo.util.MapReferenceResolver;
 35  
 import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
 36  
 import org.apache.giraph.types.ops.collections.BasicSet;
 37  
 import org.apache.giraph.writable.kryo.markers.NonKryoWritable;
 38  
 import org.apache.giraph.writable.kryo.markers.KryoIgnoreWritable;
 39  
 import org.apache.giraph.writable.kryo.serializers.ArraysAsListSerializer;
 40  
 import org.apache.giraph.writable.kryo.serializers.CollectionsNCopiesSerializer;
 41  
 import org.apache.giraph.writable.kryo.serializers.DirectWritableSerializer;
 42  
 import org.apache.giraph.writable.kryo.serializers.FastUtilSerializer;
 43  
 import org.apache.giraph.writable.kryo.serializers.ImmutableBiMapSerializerUtils;
 44  
 import org.apache.giraph.writable.kryo.serializers.ReusableFieldSerializer;
 45  
 import org.apache.hadoop.conf.Configurable;
 46  
 import org.apache.hadoop.conf.Configuration;
 47  
 import org.apache.hadoop.io.Writable;
 48  
 import org.apache.log4j.Logger;
 49  
 import org.objenesis.strategy.StdInstantiatorStrategy;
 50  
 
 51  
 import com.esotericsoftware.kryo.Kryo;
 52  
 import com.esotericsoftware.kryo.Serializer;
 53  
 import com.esotericsoftware.kryo.factories.SerializerFactory;
 54  
 import com.esotericsoftware.kryo.io.Input;
 55  
 import com.esotericsoftware.kryo.io.InputChunked;
 56  
 import com.esotericsoftware.kryo.io.Output;
 57  
 import com.esotericsoftware.kryo.io.OutputChunked;
 58  
 import com.esotericsoftware.kryo.pool.KryoCallback;
 59  
 import com.esotericsoftware.kryo.pool.KryoFactory;
 60  
 import com.esotericsoftware.kryo.pool.KryoPool;
 61  
 import com.esotericsoftware.kryo.serializers.ClosureSerializer;
 62  
 import com.esotericsoftware.kryo.serializers.FieldSerializer;
 63  
 import com.esotericsoftware.kryo.util.ObjectMap;
 64  
 import com.google.common.base.Preconditions;
 65  
 
 66  
 import de.javakaffee.kryoserializers.guava.ImmutableListSerializer;
 67  
 
 68  
 /**
 69  
  * Kryo instance that provides serialization through DataInput/DataOutput
 70  
  * that org.apache.hadoop.io.Writable uses.
 71  
  *
 72  
  * All public APIs are static.
 73  
  *
 74  
  * It extends Kryo to reuse KryoPool functionality, but have additional needed
 75  
  * objects cached as well. If we move to ThreadLocal or other caching
 76  
  * technique, we can use composition, instead of inheritance here.
 77  
  *
 78  
  * TODO: Refactor this class into two separate classes depending on
 79  
  * whether the reference tracking is enabled or disabled.
 80  
  */
 81  0
 public class HadoopKryo extends Kryo {
 82  
   /** Pool of reusable Kryo objects, since they are expensive to create */
 83  0
   private static final KryoPool KRYO_POOL = new KryoPool.Builder(
 84  0
       new KryoFactory() {
 85  
         @Override
 86  
         public Kryo create() {
 87  0
           return createKryo(true, true);
 88  
         }
 89  0
       }).build();
 90  
   /** Thread local HadoopKryo object */
 91  0
   private static final ThreadLocal<HadoopKryo> KRYO =
 92  0
     new ThreadLocal<HadoopKryo>() {
 93  
       @Override protected HadoopKryo initialValue() {
 94  0
         return  createKryo(false, false);
 95  
       }
 96  
     };
 97  
 
 98  
   /**
 99  
    * List of interfaces/parent classes that will not be allowed to be
 100  
    * serialized, together with explanation of why, that will be shown
 101  
    * when throwing such exception
 102  
    */
 103  
   private static final Map<Class<?>, String> NON_SERIALIZABLE;
 104  
 
 105  
   static {
 106  0
     NON_SERIALIZABLE = new LinkedHashMap<>();
 107  0
     NON_SERIALIZABLE.put(
 108  
         NonKryoWritable.class,
 109  
         "it is marked to not allow serialization, " +
 110  
         "look at the class for more details");
 111  0
     NON_SERIALIZABLE.put(
 112  
         KryoWritableWrapper.class, "recursion is disallowed");
 113  0
     NON_SERIALIZABLE.put(
 114  
         Configuration.class,
 115  
         "it cannot be supported since it contains ClassLoader");
 116  0
     NON_SERIALIZABLE.put(
 117  
         GiraphConfigurationSettable.class, "configuration cannot be set");
 118  0
     NON_SERIALIZABLE.put(
 119  
         Configurable.class, "configuration cannot be set");
 120  0
     NON_SERIALIZABLE.put(
 121  
         Random.class,
 122  
         "it should be rarely serialized, since it would create same stream " +
 123  
         "of numbers everywhere, use TransientRandom instead");
 124  0
     NON_SERIALIZABLE.put(
 125  
         Logger.class,
 126  
         "Logger must be a static field");
 127  0
   }
 128  
 
 129  
   /** Reusable Input object */
 130  
   private InputChunked input;
 131  
   /** Reusable Output object */
 132  
   private OutputChunked output;
 133  
 
 134  
   /** Reusable DataInput wrapper stream */
 135  
   private DataInputWrapperStream dataInputWrapperStream;
 136  
   /** Reusable DataOutput wrapper stream */
 137  
   private DataOutputWrapperStream dataOutputWrapperStream;
 138  
 
 139  
   /**
 140  
    * Map of already initialized serializers used
 141  
    * for readIntoObject/writeOutOfObject pair of methods
 142  
    */
 143  0
   private final ObjectMap<Class<?>, ReusableFieldSerializer<Object>>
 144  
   classToIntoSerializer = new ObjectMap<>();
 145  
 
 146  
   /** Hide constructor, so all access go through pool of cached objects */
 147  0
   private HadoopKryo() {
 148  0
   }
 149  
 
 150  
   /**
 151  
    * Constructor that takes custom class resolver and reference resolver.
 152  
    * @param classResolver Class resolver
 153  
    * @param referenceResolver Reference resolver
 154  
    */
 155  
   private HadoopKryo(ClassResolver classResolver,
 156  
                      ReferenceResolver referenceResolver) {
 157  0
     super(classResolver, referenceResolver);
 158  0
   }
 159  
 
 160  
   // Public API:
 161  
 
 162  
   /**
 163  
    * Write type of given object and the object itself to the output stream.
 164  
    * Inverse of readClassAndObj.
 165  
    *
 166  
    * @param out Output stream
 167  
    * @param object Object to write
 168  
    */
 169  
   public static void writeClassAndObj(
 170  
           final DataOutput out, final Object object) {
 171  0
     writeInternal(out, object, false);
 172  0
   }
 173  
 
 174  
   /**
 175  
    * Read object from the input stream, by reading first type of the object,
 176  
    * and then all of its fields.
 177  
    * Inverse of writeClassAndObject.
 178  
    *
 179  
    * @param in Input stream
 180  
    * @return Deserialized object
 181  
    * @param <T> Type of the object being read
 182  
    */
 183  
   public static <T> T readClassAndObj(DataInput in) {
 184  0
     return readInternal(in, null, false);
 185  
   }
 186  
 
 187  
   /**
 188  
    * Write an object to output, in a way that can be read by readIntoObject.
 189  
    *
 190  
    * @param out Output stream
 191  
    * @param object Object to be written
 192  
    */
 193  
   public static void writeOutOfObject(
 194  
       final DataOutput out, final Object object) {
 195  0
     writeInternal(out, object, true);
 196  0
   }
 197  
 
 198  
   /**
 199  
    * Reads an object, from input, into a given object,
 200  
    * allowing object reuse.
 201  
    * Inverse of writeOutOfObject.
 202  
    *
 203  
    * @param in Input stream
 204  
    * @param object Object to fill from input
 205  
    */
 206  
   public static void readIntoObject(DataInput in, Object object) {
 207  0
     readInternal(in, object, true);
 208  0
   }
 209  
 
 210  
   /**
 211  
    * Writes class and object to specified output stream with specified
 212  
    * Kryo object. It does not use interim buffers.
 213  
    * @param kryo Kryo object
 214  
    * @param out Output stream
 215  
    * @param object Object
 216  
    */
 217  
   public static void writeWithKryo(
 218  
           final HadoopKryo kryo, final Output out,
 219  
           final Object object) {
 220  0
     kryo.writeClassAndObject(out, object);
 221  0
     out.close();
 222  0
   }
 223  
 
 224  
   /**
 225  
    * Write out of object with given kryo
 226  
    * @param kryo Kryo object
 227  
    * @param out Output
 228  
    * @param object Object to write
 229  
    */
 230  
   public static void writeWithKryoOutOfObject(
 231  
           final HadoopKryo kryo, final Output out,
 232  
           final Object object) {
 233  0
     kryo.writeOutOfObject(out, object);
 234  0
     out.close();
 235  0
   }
 236  
 
 237  
   /**
 238  
    * Reads class and object from specified input stream with
 239  
    * specified kryo object.
 240  
    * it does not use interim buffers.
 241  
    * @param kryo Kryo object
 242  
    * @param in Input buffer
 243  
    * @param <T> Object type parameter
 244  
    * @return Object
 245  
    */
 246  
   public static <T> T readWithKryo(
 247  
           final HadoopKryo kryo, final Input in) {
 248  
     T object;
 249  0
     object = (T) kryo.readClassAndObject(in);
 250  0
     in.close();
 251  0
     return object;
 252  
   }
 253  
 
 254  
   /**
 255  
    * Read into object with given kryo.
 256  
    * @param kryo Kryo object
 257  
    * @param in Input
 258  
    * @param object Object to read into
 259  
    */
 260  
   public static void readWithKryoIntoObject(
 261  
           final HadoopKryo kryo, final Input in, Object object) {
 262  0
     kryo.readIntoObject(in, object);
 263  0
     in.close();
 264  0
   }
 265  
 
 266  
   /**
 267  
    * Create copy of the object, by magically recursively copying
 268  
    * all of its fields, keeping reference structures (like cycles)
 269  
    *
 270  
    * @param object Object to be copied
 271  
    * @return Copy of the object.
 272  
    * @param <T> Type of the object
 273  
    */
 274  
   public static <T> T createCopy(final T object) {
 275  0
     return KRYO_POOL.run(new KryoCallback<T>() {
 276  
       @Override
 277  
       public T execute(Kryo kryo) {
 278  0
         return kryo.copy(object);
 279  
       }
 280  
     });
 281  
   }
 282  
 
 283  
   /**
 284  
    * Returns a kryo which doesn't track objects, hence
 285  
    * serialization of recursive/nested objects is not
 286  
    * supported.
 287  
    *
 288  
    * Reference tracking significantly degrades the performance
 289  
    * since kryo has to store all serialized objects and search
 290  
    * the history to check if an object has been already serialized.
 291  
    *
 292  
    * @return Hadoop kryo which doesn't track objects.
 293  
    */
 294  
   public static HadoopKryo getNontrackingKryo() {
 295  0
     return KRYO.get();
 296  
   }
 297  
 
 298  
   // Private implementation:
 299  
 
 300  
   /**
 301  
    * Create new instance of HadoopKryo, properly initialized.
 302  
    * @param trackReferences if true, object references are tracked.
 303  
    * @param hasBuffer if true, an interim buffer is used.
 304  
    * @return new HadoopKryo instance
 305  
    */
 306  
   private static HadoopKryo createKryo(boolean trackReferences,
 307  
                                        boolean hasBuffer) {
 308  
     HadoopKryo kryo;
 309  0
     if (trackReferences) {
 310  0
       kryo = new HadoopKryo();
 311  
     } else {
 312  
       // Only use GiraphClassResolver if it is properly initialized.
 313  
       // This is to enable test cases which use KryoSimpleWrapper
 314  
       // but don't start ZK.
 315  0
       kryo = new HadoopKryo(
 316  0
               GiraphClassResolver.isInitialized() ? new GiraphClassResolver() :
 317  
                                                     new DefaultClassResolver(),
 318  
               new MapReferenceResolver());
 319  
     }
 320  
 
 321  
     try {
 322  0
       kryo.register(Class.forName("java.lang.invoke.SerializedLambda"));
 323  0
       kryo.register(Class.forName(
 324  
         "com.esotericsoftware.kryo.serializers.ClosureSerializer$Closure"),
 325  
         new ClosureSerializer());
 326  0
     } catch (ClassNotFoundException e) {
 327  0
       throw new IllegalStateException(
 328  
         "Trying to use Kryo on Java version " +
 329  0
           System.getProperty("java.version") +
 330  
           ", but unable to find needed classes", e);
 331  0
     }
 332  
 
 333  0
     kryo.register(Arrays.asList().getClass(), new ArraysAsListSerializer());
 334  0
     kryo.register(Collections.nCopies(1, new Object()).getClass(),
 335  
         new CollectionsNCopiesSerializer());
 336  
 
 337  0
     ImmutableListSerializer.registerSerializers(kryo);
 338  0
     ImmutableMapSerializer.registerSerializers(kryo);
 339  0
     ImmutableBiMapSerializerUtils.registerSerializers(kryo);
 340  
 
 341  
     // There are many fastutil classes, register them at the end,
 342  
     // so they don't use up small registration numbers
 343  0
     FastUtilSerializer.registerAll(kryo);
 344  
 
 345  0
     kryo.setInstantiatorStrategy(new DefaultInstantiatorStrategy(
 346  
         new StdInstantiatorStrategy()));
 347  
 
 348  0
     SerializerFactory customSerializerFactory = new SerializerFactory() {
 349  
       @SuppressWarnings("rawtypes")
 350  
       @Override
 351  
       public Serializer makeSerializer(Kryo kryo, final Class<?> type) {
 352  
         for (final Entry<Class<?>, String> entry :
 353  0
             NON_SERIALIZABLE.entrySet()) {
 354  0
           if (entry.getKey().isAssignableFrom(type)) {
 355  
             // Allow Class object to be serialized, but not a live instance.
 356  0
             return new Serializer() {
 357  
               @Override
 358  
               public Object read(Kryo kryo, Input input, Class type) {
 359  0
                 throw new RuntimeException("Cannot serialize " + type +
 360  
                     ". Objects being serialized cannot capture " +
 361  0
                     entry.getKey() + " because " + entry.getValue() +
 362  
                     ". Either remove field in question" +
 363  
                     ", or make it transient (so that it isn't serialized)");
 364  
               }
 365  
 
 366  
               @Override
 367  
               public void write(Kryo kryo, Output output, Object object) {
 368  0
                 throw new RuntimeException("Cannot serialize " + type +
 369  
                     ". Objects being serialized cannot capture " +
 370  0
                     entry.getKey() + " because " + entry.getValue() +
 371  
                     ". Either remove field in question" +
 372  
                     ", or make it transient (so that it isn't serialized)");
 373  
               }
 374  
             };
 375  
           }
 376  0
         }
 377  
 
 378  0
         if (Writable.class.isAssignableFrom(type) &&
 379  0
             !KryoIgnoreWritable.class.isAssignableFrom(type) &&
 380  
             // remove BasicSet, BasicArrayList and Basic2ObjectMap temporarily,
 381  
             // for lack of constructors
 382  0
             !BasicSet.class.isAssignableFrom(type) &&
 383  0
             !Basic2ObjectMap.class.isAssignableFrom(type)) {
 384  
           // use the Writable method defined by the type
 385  0
           DirectWritableSerializer serializer = new DirectWritableSerializer();
 386  0
           return serializer;
 387  
         } else {
 388  0
           FieldSerializer serializer = new FieldSerializer<>(kryo, type);
 389  0
           serializer.setIgnoreSyntheticFields(false);
 390  0
           return serializer;
 391  
         }
 392  
       }
 393  
     };
 394  
 
 395  0
     kryo.addDefaultSerializer(Writable.class, customSerializerFactory);
 396  0
     kryo.setDefaultSerializer(customSerializerFactory);
 397  
 
 398  0
     if (hasBuffer) {
 399  0
       kryo.input = new InputChunked(4096);
 400  0
       kryo.output = new OutputChunked(4096);
 401  0
       kryo.dataInputWrapperStream = new DataInputWrapperStream();
 402  0
       kryo.dataOutputWrapperStream = new DataOutputWrapperStream();
 403  
     }
 404  
 
 405  0
     if (!trackReferences) {
 406  0
       kryo.setReferences(false);
 407  
 
 408  
       // Auto reset can only be disabled if the GiraphClassResolver is
 409  
       // properly initialized.
 410  0
       if (GiraphClassResolver.isInitialized()) {
 411  0
         kryo.setAutoReset(false);
 412  
       }
 413  
     }
 414  0
     return kryo;
 415  
   }
 416  
 
 417  
   /**
 418  
    * Register serializer for class with class name
 419  
    *
 420  
    * @param kryo HadoopKryo
 421  
    * @param className Name of the class for which to register serializer
 422  
    * @param serializer Serializer to use
 423  
    */
 424  
   private static void registerSerializer(HadoopKryo kryo, String className,
 425  
       Serializer serializer) {
 426  
     try {
 427  0
       kryo.register(Class.forName(className), serializer);
 428  0
     } catch (ClassNotFoundException e) {
 429  0
       throw new IllegalStateException("Class " + className + " is missing", e);
 430  0
     }
 431  0
   }
 432  
 
 433  
   /**
 434  
    * Initialize reusable objects for reading from given DataInput.
 435  
    *
 436  
    * @param in Input stream
 437  
    */
 438  
   private void setDataInput(DataInput in) {
 439  0
     dataInputWrapperStream.setDataInput(in);
 440  0
     input.setInputStream(dataInputWrapperStream);
 441  0
   }
 442  
 
 443  
   /**
 444  
    * Initialize reusable objects for writing into given DataOutput.
 445  
    *
 446  
    *  @param out Output stream
 447  
    */
 448  
   private void setDataOutput(DataOutput out) {
 449  0
     dataOutputWrapperStream.setDataOutput(out);
 450  0
     output.setOutputStream(dataOutputWrapperStream);
 451  0
   }
 452  
 
 453  
   /**
 454  
    * Get or create reusable serializer for given class.
 455  
    *
 456  
    * @param type Type of the object
 457  
    * @return Serializer
 458  
    */
 459  
   private ReusableFieldSerializer<Object> getOrCreateReusableSerializer(
 460  
       Class<?> type) {
 461  0
     ReusableFieldSerializer<Object> serializer =
 462  0
         classToIntoSerializer.get(type);
 463  0
     if (serializer == null) {
 464  0
       serializer = new ReusableFieldSerializer<>(this, type);
 465  0
       classToIntoSerializer.put(type, serializer);
 466  
     }
 467  0
     return serializer;
 468  
   }
 469  
 
 470  
   /**
 471  
    * Internal write implementation, that reuses HadoopKryo objects
 472  
    * from the pool.
 473  
    *
 474  
    * @param out Output stream
 475  
    * @param object Object to be written
 476  
    * @param outOf whether we are writing reusable objects,
 477  
    *              or full objects with class name
 478  
    */
 479  
   private static void writeInternal(
 480  
       final DataOutput out, final Object object, final boolean outOf) {
 481  0
     KRYO_POOL.run(new KryoCallback<Void>() {
 482  
       @Override
 483  
       public Void execute(Kryo kryo) {
 484  0
         HadoopKryo hkryo = (HadoopKryo) kryo;
 485  0
         hkryo.setDataOutput(out);
 486  
 
 487  0
         if (outOf) {
 488  0
           hkryo.writeOutOfObject(hkryo.output, object);
 489  
         } else {
 490  0
           hkryo.writeClassAndObject(hkryo.output, object);
 491  
         }
 492  
 
 493  0
         hkryo.output.endChunks();
 494  0
         hkryo.output.close();
 495  
 
 496  0
         return null;
 497  
       }
 498  
     });
 499  0
   }
 500  
 
 501  
   /**
 502  
    * Internal read implementation, that reuses HadoopKryo objects
 503  
    * from the pool.
 504  
    *
 505  
    * @param in Input stream
 506  
    * @param outObject Object to fill from input (if not null)
 507  
    * @param into whether we are reading reusable objects,
 508  
    *             or full objects with class name
 509  
    * @return Read object (new one, or same passed in if we use reusable)
 510  
    * @param <T> Type of the object to read
 511  
    */
 512  
   @SuppressWarnings("unchecked")
 513  
   private static <T> T readInternal(
 514  
       final DataInput in, final T outObject, final boolean into) {
 515  0
     return KRYO_POOL.run(new KryoCallback<T>() {
 516  
       @Override
 517  
       public T execute(Kryo kryo) {
 518  0
         HadoopKryo hkryo = (HadoopKryo) kryo;
 519  0
         hkryo.setDataInput(in);
 520  
 
 521  
         T object;
 522  0
         if (into) {
 523  0
           hkryo.readIntoObject(hkryo.input, outObject);
 524  0
           object = outObject;
 525  
         } else {
 526  0
           object = (T) hkryo.readClassAndObject(hkryo.input);
 527  
         }
 528  0
         hkryo.input.nextChunks();
 529  
 
 530  0
         hkryo.input.close();
 531  0
         return object;
 532  
       }
 533  
     });
 534  
   }
 535  
 
 536  
   /**
 537  
    * Reads an object, from input, into a given object,
 538  
    * allowing object reuse.
 539  
    *
 540  
    * @param input Input stream
 541  
    * @param object Object to fill from input
 542  
    */
 543  
   private void readIntoObject(Input input, Object object) {
 544  0
     Preconditions.checkNotNull(object);
 545  
 
 546  0
     Class<?> type = object.getClass();
 547  0
     ReusableFieldSerializer<Object> serializer =
 548  0
         getOrCreateReusableSerializer(type);
 549  
 
 550  0
     serializer.setReadIntoObject(object);
 551  0
     Object result = readObject(input, type, serializer);
 552  
 
 553  0
     Preconditions.checkState(result == object);
 554  0
   }
 555  
 
 556  
   /**
 557  
    * Write an object to output, in a way that can be read
 558  
    * using readIntoObject.
 559  
    * @param output Output stream
 560  
    * @param object Object to be written
 561  
    */
 562  
   private void writeOutOfObject(Output output, Object object) {
 563  0
     ReusableFieldSerializer<Object> serializer =
 564  0
         getOrCreateReusableSerializer(object.getClass());
 565  0
     writeObject(output, object, serializer);
 566  0
   }
 567  
 
 568  
 }