Apache Spark component

The Apache Spark component is available starting from Camel 2.17.
This documentation page covers the Apache Spark component for Apache Camel. The main purpose of the Spark integration with Camel is to provide a bridge between Camel connectors and Spark tasks. In particular, the Camel connector provides a way to route messages from various transports, dynamically choose a task to execute, use the incoming message as input data for that task, and finally deliver the results of the execution back to the Camel pipeline.

Supported architectural styles

The Spark component can be used as a driver application deployed into an application server (or executed as a fat jar). It can also be submitted as a job directly to the Spark cluster. While the Spark component is primarily designed to work as a long-running job serving as a bridge between the Spark cluster and other endpoints, you can also use it as a fire-once short job.

Running Spark in OSGi servers

Currently the Spark component doesn't support execution in an OSGi container. Spark has been designed to be executed as a fat jar, usually submitted as a job to a cluster. For those reasons, running Spark in an OSGi server is at least challenging, and it is not supported by Camel either.

URI format

Currently the Spark component supports only producers: it is intended to invoke a Spark job and return the results. You can call an RDD, DataFrame or Hive SQL job.
Spark URI format

    spark:{rdd|dataframe|hive}
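To make the URI shape concrete, here is a minimal sketch that splits a Spark endpoint URI into its job type and options using only the JDK. It is not how Camel itself parses endpoint URIs; SparkUriSketch and its members are hypothetical names introduced for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of camel-spark: it only illustrates the URI shape.
final class SparkUriSketch {

    // The three job types named in the URI format.
    static final List<String> JOB_TYPES = Arrays.asList("rdd", "dataframe", "hive");

    final String jobType;
    final Map<String, String> options;

    private SparkUriSketch(String jobType, Map<String, String> options) {
        this.jobType = jobType;
        this.options = options;
    }

    static SparkUriSketch parse(String uri) {
        if (!uri.startsWith("spark:")) {
            throw new IllegalArgumentException("Not a spark URI: " + uri);
        }
        String rest = uri.substring("spark:".length());
        int query = rest.indexOf('?');
        String jobType = query < 0 ? rest : rest.substring(0, query);
        if (!JOB_TYPES.contains(jobType)) {
            throw new IllegalArgumentException("Unknown job type: " + jobType);
        }
        // Options keep their declaration order; values such as "#name" are kept verbatim.
        Map<String, String> options = new LinkedHashMap<>();
        if (query >= 0 && query + 1 < rest.length()) {
            for (String pair : rest.substring(query + 1).split("&")) {
                int eq = pair.indexOf('=');
                if (eq < 0) {
                    options.put(pair, "");
                } else {
                    options.put(pair.substring(0, eq), pair.substring(eq + 1));
                }
            }
        }
        return new SparkUriSketch(jobType, options);
    }

    public static void main(String[] args) {
        SparkUriSketch uri = parse("spark:rdd?rdd=#testFileRdd&rddCallback=#transformation");
        System.out.println(uri.jobType + " " + uri.options);
    }
}
```

Option values that start with # are references to beans in the Camel registry, which is why the sketch leaves them untouched instead of interpreting them.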
RDD jobs
To invoke an RDD job, use the following URI:

Spark RDD producer

    spark:rdd?rdd=#testFileRdd&rddCallback=#transformation

Where the rdd option refers to the name of an RDD instance in the Camel registry, and the rddCallback option refers to a registry bean implementing the following interface:

Spark RDD callback

    public interface RddCallback<T> {
        T onRdd(JavaRDDLike rdd, Object... payloads);
    }

The following snippet demonstrates how to send a message as input to the job and return the results:

Calling spark job

    String pattern = "job input";
    long linesCount = producerTemplate.requestBody("spark:rdd?rdd=#myRdd&rddCallback=#countLinesContaining", pattern, long.class);

The RDD callback for the snippet above, registered as a Spring bean, could look as follows:

Spark RDD callback

    @Bean
    RddCallback<Long> countLinesContaining() {
        return new RddCallback<Long>() {
            @Override
            public Long onRdd(JavaRDDLike rdd, Object... payloads) {
                // The message body arrives as the first payload.
                String pattern = (String) payloads[0];
                // filter() is declared on JavaRDD rather than JavaRDDLike, hence the cast.
                JavaRDD<String> lines = (JavaRDD<String>) rdd;
                return lines.filter(line -> line.contains(pattern)).count();
            }
        };
    }

The RDD definition in Spring could look as follows:

Spark RDD definition

    @Bean
    JavaRDDLike myRdd(JavaSparkContext sparkContext) {
        return sparkContext.textFile("testrdd.txt");
    }
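The flow described in this section (resolve the #-prefixed references, then hand the message body to the callback as a payload) can be modelled without Camel or Spark on the classpath. The sketch below is only that model: a List<String> stands in for the RDD, a Map stands in for the Camel registry, and RddJobSketch, LinesCallback, lookup, call and sampleRegistry are all hypothetical names.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Dependency-free model of an RDD job call; none of these names exist in camel-spark.
final class RddJobSketch {

    // Mirrors the shape of RddCallback<T>, with List<String> in place of JavaRDDLike.
    interface LinesCallback<T> {
        T onLines(List<String> lines, Object... payloads);
    }

    // Resolves a "#name" option value against the stand-in registry.
    static Object lookup(Map<String, Object> registry, String reference) {
        if (!reference.startsWith("#")) {
            throw new IllegalArgumentException("Expected a #reference: " + reference);
        }
        Object bean = registry.get(reference.substring(1));
        if (bean == null) {
            throw new IllegalArgumentException("No bean named " + reference.substring(1));
        }
        return bean;
    }

    // Looks up the data set and the callback, then passes the body as the single payload.
    @SuppressWarnings("unchecked")
    static <T> T call(Map<String, Object> registry, String rddRef, String callbackRef, Object body) {
        List<String> lines = (List<String>) lookup(registry, rddRef);
        LinesCallback<T> callback = (LinesCallback<T>) lookup(registry, callbackRef);
        return callback.onLines(lines, body);
    }

    // Registers sample data and a callback under the bean names used in the snippet above.
    static Map<String, Object> sampleRegistry() {
        Map<String, Object> registry = new HashMap<>();
        registry.put("myRdd", Arrays.asList("job input one", "something else", "job input two"));
        LinesCallback<Long> countLinesContaining = (lines, payloads) -> {
            String pattern = (String) payloads[0];
            return lines.stream().filter(line -> line.contains(pattern)).count();
        };
        registry.put("countLinesContaining", countLinesContaining);
        return registry;
    }

    public static void main(String[] args) {
        Long count = call(sampleRegistry(), "#myRdd", "#countLinesContaining", "job input");
        System.out.println(count); // prints 2
    }
}
```

In a real route the lookups and the callback invocation are done by the Spark producer; the point of the model is only to show which piece of the URI maps to which object.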
RDD jobs options
Void RDD callbacks

If your RDD callback doesn't return any value back to the Camel pipeline, you can either return null or extend the VoidRddCallback base class:

Spark RDD callback

    @Bean
    RddCallback<Void> rddCallback() {
        return new VoidRddCallback() {
            @Override
            public void doOnRdd(JavaRDDLike rdd, Object... payloads) {
                // output is assumed to be a java.io.File defined elsewhere
                rdd.saveAsTextFile(output.getAbsolutePath());
            }
        };
    }

Converting RDD callbacks

If you know what type of input data will be sent to the RDD callback, you can use ConvertingRddCallback and let Camel convert the incoming payloads before they are passed to the callback:

Spark RDD callback

    @Bean
    RddCallback<Long> rddCallback(CamelContext context) {
        return new ConvertingRddCallback<Long>(context, int.class, int.class) {
            @Override
            public Long doOnRdd(JavaRDDLike rdd, Object... payloads) {
                return rdd.count() * (int) payloads[0] * (int) payloads[1];
            }
        };
    }

Annotated RDD callbacks

Probably the easiest way to work with RDD callbacks is to provide a class with a method marked with the @RddCallback annotation:

Annotated RDD callback definition

    import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback;

    @Bean
    RddCallback<Long> rddCallback() {
        return annotatedRddCallback(new MyTransformation());
    }

    ...

    import org.apache.camel.component.spark.annotations.RddCallback;

    public class MyTransformation {

        @RddCallback
        long countLines(JavaRDD<String> textFile, int first, int second) {
            return textFile.count() * first * second;
        }

    }

If you pass a CamelContext to the annotated RDD callback factory method, the created callback will be able to convert incoming payloads to match the parameters of the annotated method:

Body conversions for annotated RDD callbacks

    import static org.apache.camel.component.spark.annotations.AnnotatedRddCallback.annotatedRddCallback;

    @Bean
    RddCallback<Long> rddCallback(CamelContext camelContext) {
        return annotatedRddCallback(new MyTransformation(), camelContext);
    }

    ...

    import org.apache.camel.component.spark.annotations.RddCallback;

    public class MyTransformation {

        @RddCallback
        long countLines(JavaRDD<String> textFile, int first, int second) {
            return textFile.count() * first * second;
        }

    }

    ...

    // Convert String "10" to integer
    long result = producerTemplate.requestBody("spark:rdd?rdd=#rdd&rddCallback=#rddCallback", Arrays.asList(10, "10"), long.class);
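The idea behind annotated callbacks with body conversion can be illustrated with plain JDK reflection: find the annotated method, convert each payload to the declared parameter type, then invoke it. This is a sketch of the technique only, not the camel-spark implementation. The @Callback annotation, the convert helper and the use of a List<String> in place of JavaRDD<String> are assumptions made so that the example runs without Camel or Spark.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of an annotated callback; none of these names exist in camel-spark.
final class AnnotatedCallbackSketch {

    // Stand-in for the @RddCallback annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Callback {
    }

    static class MyTransformation {
        @Callback
        long countLines(List<String> textFile, int first, int second) {
            return (long) textFile.size() * first * second;
        }
    }

    // Minimal converter: only int targets are handled, everything else passes through.
    static Object convert(Object value, Class<?> target) {
        if (target == int.class || target == Integer.class) {
            return value instanceof Number
                    ? ((Number) value).intValue()
                    : Integer.parseInt(value.toString());
        }
        return value;
    }

    // Finds the annotated method, converts the payloads to its parameter types and calls it.
    static Object invoke(Object target, List<String> data, List<?> payloads) {
        for (Method method : target.getClass().getDeclaredMethods()) {
            if (!method.isAnnotationPresent(Callback.class)) {
                continue;
            }
            Class<?>[] types = method.getParameterTypes();
            if (types.length != payloads.size() + 1) {
                throw new IllegalArgumentException("Payload count does not match " + method);
            }
            Object[] args = new Object[types.length];
            args[0] = data; // the first parameter always receives the data set
            for (int i = 1; i < types.length; i++) {
                args[i] = convert(payloads.get(i - 1), types[i]);
            }
            try {
                method.setAccessible(true);
                return method.invoke(target, args);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(e);
            }
        }
        throw new IllegalArgumentException("No @Callback method on " + target.getClass());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a", "b", "c");
        // The String "10" is converted to the int 10 before the call: 3 * 10 * 10.
        System.out.println(invoke(new MyTransformation(), lines, Arrays.asList(10, "10"))); // prints 300
    }
}
```

In Camel the conversion step is driven by the CamelContext passed to the factory method, which is why the context has to be supplied for mixed payloads such as 10 and "10" to work.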
DataFrame jobs
Instead of working with RDDs, the Spark component can work with DataFrames as well. To invoke a DataFrame job, use the following URI:

Spark DataFrame producer

    spark:dataframe?dataFrame=#testDataFrame&dataFrameCallback=#transformation

Where the dataFrame option refers to the name of a DataFrame instance in the Camel registry, and the dataFrameCallback option refers to a registry bean implementing the following interface:

Spark DataFrame callback

    public interface DataFrameCallback<T> {
        T onDataFrame(DataFrame dataFrame, Object... payloads);
    }

The following snippet demonstrates how to send a message as input to a job and return the results:

Calling spark job

    String model = "Micra";
    long carsCount = producerTemplate.requestBody("spark:dataframe?dataFrame=#cars&dataFrameCallback=#findCarWithModel", model, long.class);

The DataFrame callback for the snippet above, registered as a Spring bean, could look as follows:

Spark DataFrame callback

    @Bean
    DataFrameCallback<Long> findCarWithModel() {
        return new DataFrameCallback<Long>() {
            @Override
            public Long onDataFrame(DataFrame dataFrame, Object... payloads) {
                // The message body arrives as the first payload.
                String model = (String) payloads[0];
                return dataFrame.where(dataFrame.col("model").eqNullSafe(model)).count();
            }
        };
    }

The DataFrame definition in Spring could look as follows:

Spark DataFrame definition

    @Bean
    DataFrame cars(HiveContext hiveContext) {
        DataFrame jsonCars = hiveContext.read().json("/var/data/cars.json");
        jsonCars.registerTempTable("cars");
        return jsonCars;
    }
DataFrame jobs options
Hive jobs

Instead of working with RDDs or DataFrames, the Spark component can also receive Hive SQL queries as payloads. To send a Hive query to the Spark component, use the following URI:

Spark Hive producer

    spark:hive

The following snippet demonstrates how to send a message as input to a job and return the results:

Calling spark job

    long carsCount = template.requestBody("spark:hive?collect=false", "SELECT * FROM cars", Long.class);
    List<Row> cars = template.requestBody("spark:hive", "SELECT * FROM cars", List.class);

The table we want to query should be registered in a HiveContext before we query it. For example, in Spring such a registration could look as follows:

Spark DataFrame definition

    @Bean
    DataFrame cars(HiveContext hiveContext) {
        DataFrame jsonCars = hiveContext.read().json("/var/data/cars.json");
        jsonCars.registerTempTable("cars");
        return jsonCars;
    }
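The two calls in the Hive snippet differ only in the collect option, and that option changes the type of the result: a row count when collect=false, the rows themselves otherwise. The sketch below models just that switch. HiveCollectSketch and its result method are hypothetical, and a List<String> stands in for the rows of the query result.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the result handling of a Hive job; not camel-spark code.
final class HiveCollectSketch {

    // collect=true returns the rows themselves; collect=false returns only the row count.
    static Object result(List<String> rows, boolean collect) {
        return collect ? rows : (Object) Long.valueOf(rows.size());
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("Micra", "Fiesta", "Golf");
        System.out.println(result(rows, false)); // prints 3
        System.out.println(result(rows, true));  // prints [Micra, Fiesta, Golf]
    }
}
```

This is why the first call in the snippet asks for Long.class while the second asks for List.class.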
Hive jobs options