Friday, September 30, 2011

Reusing Talend ETL Jobs

The question of how to reuse Talend jobs comes up in the forums all the time. I will demonstrate here, with a proof of concept, how this can be achieved using lightweight JSON-over-HTTP requests.

You can find the source code here.

If you prefer not to hack into the code then just keep on reading for an explanation.

Build a job (retrieve_stock) like the one below, which uses a Yahoo API to retrieve data about a security symbol:

The job uses a tFileInputJson:
//Basic Settings
context.url + "%22" + context.symbol + "%22"

... and a tJavaFlex:
//Advanced Settings/Import
import org.json.simple.JSONObject;
import org.json.simple.JSONArray;
import org.json.simple.parser.JSONParser;
//Basic Settings
//./Start Code
JSONParser parser = new JSONParser();
//./Main Code
// row1.json carries the raw JSON string read by tFileInputJson
JSONObject json = (JSONObject) parser.parse(row1.json);

As you may have noticed, you need to declare a context with two variables like the below (default values shown):
url: "*"
symbol: "AAPL"

For those who like screenshots ;-)

All we are doing is getting the JSON from the remote URL (built out of context parameters) and writing it to stdout. The URL will remain fixed in this example, but I use a context variable to show that it could change in the future.

Export the job as "Autonomous", uncompress the zip and run the wrapper shell/batch script. I am working on Windows this time, so we will need to modify the batch file by adding @ECHO ON at the beginning of the file.

We can now run our Talend job from the command line for a different symbol, like Google (GOOG):
C:\etl\releases\retrieve_stock_0.1\retrieve_stock>retrieve_stock_run.bat --context_param symbol=GOOG

Here is a typical response (abridged; the full payload is one long JSON object, and parts of it were clipped here):
{"symbol":"GOOG","Name":"Google Inc.","LastTradePriceOnly":"527.50","Change":"-1.34","PercentChange":"-0.25%","DaysRange":"519.41 - 537.30","YearRange":"473.02 - 642.96","YearHigh":"642.96","YearLow":"473.02","MarketCapitalization":"170.3B","PERatio":"19.08","PEGRatio":"0.79","EarningsShare":"27.719","PriceBook":"3.28","PriceSales":"5.12","AverageDailyVolume":"3820260","TwoHundreddayMovingAverage":"543.247","OneyrTargetPrice":"719.68","ShortRatio":"1.60","LastTradeWithTime":"4:00pm - <b>527.50<\/b>", ... }

This job is meant to be reused by other jobs. If the way the symbol is retrieved ever changes, we will change the logic in just one place. After that we just need to export the retrieve_stock job and deploy it to our server.

We could reuse the retrieve_stock job using a tSystem component to invoke the command, but I am going to go a step further and propose something else.

It is well known that Java uses fork() to invoke an external command, which means the whole JVM address space is duplicated (at least as copy-on-write pages) when the child process runs. This is of course not efficient. I have posted a solution to this issue before.

So the proposal here is to run a local server that executes local Talend command-line scripts. The output will be JSON. As you have probably figured out, I am advocating JSON as a lightweight data structure to maintain the communication channel between different jobs.

A node.js server is a better fit than Jetty or any other Java container for just running shell commands: in my tests the memory footprint of node.js is really low, and the performance is similar to that of a Java container.

After you set up your server, you should get the same response after hitting a URL like the one below:

What happens if the amount of data generated by one job is too big? In that case I recommend using a shared resource like a file or a DB (or both, as in the case of SQLite ;-). If a job works by generating a file or DB tables, the interface will of course need to be documented, but you can still invoke the job via a REST call as explained here, returning the results of its execution to the caller (the parent job).
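One way to keep the interface uniform in that case is to have the job return a small JSON envelope that points at the shared resource instead of embedding the data. The field names here are my own convention, not anything Talend defines:

```javascript
// Hypothetical response envelope for a job whose output is too big for HTTP:
// the payload stays in a shared file or database and the response carries a pointer.
var response = {
  job: 'retrieve_stock',
  status: 'ok',
  rows: 125000,
  output: { type: 'sqlite', path: 'C:\\etl\\shared\\stocks.db', table: 'quotes' }
};
console.log(JSON.stringify(response));
```

The parent job then parses this envelope and opens the file or DB itself, so the HTTP channel only ever carries small JSON messages.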

The job we will use to invoke retrieve_stock actually uses the same pattern, as you can see below:

You will be able to hit a URL like the one below, and your job will return the AAPL price:

As you can see, in the end a job calls external jobs using the same JSON response strategy.

I am not including any transformations in these examples because the whole purpose of this post is to discuss alternatives when it comes to inter-job communication in Talend. Here is what this architecture is allowing me to do:
  1. Projects can be isolated and so better maintained in version control systems using the (free and open source) TOS version
  2. There is a good separation of concerns when it comes to building an ETL. You do not need to be a Java developer (Talend also supports Perl), and even if you build Java projects the language is only used in a scripting fashion, so there is no overhead of unneeded OOP for ETL. Many developers are tempted to push data logic into their Java code once they have the luxury of working from Java with Talend jobs written in Java. The ETL developer can test absolutely everything without needing to merge jars, troubleshoot class-loading issues, etc.
  3. Reusability.
  4. Deployment is easy, just uncompressing a zip file.
  5. Releasing is not hard if you keep the versioned zip file in a repository. Of course this could be much better, but unfortunately the open source version still does not provide Maven integration, for example. It would be great to be able to run 'mvn release' and get the project tagged.
  6. Interoperability: for example, a job written in version 4 could interact with others written in version 5, as the only interface between them is an HTTP JSON service call.


You are working with shell commands, which can be pretty destructive, so you must ensure the JSON server runs only locally, bound to a loopback IP (in our case 127.0.0.1). You will need a more robust server implementation if you violate that rule, or if you cannot guarantee that your server will not host any other service where a different user could run malicious code.


In case you did not notice, the below two lines are equivalent; the second is URL encoded. We need that to pass the whole command as a parameter in a query string or a POST request.
C:\etl\releases\retrieve_stock_0.1\retrieve_stock\retrieve_stock_run.bat --context_param symbol=AAPL
C%3A%5Cetl%5Creleases%5Cretrieve_stock_0.1%5Cretrieve_stock%5Cretrieve_stock_run.bat%20--context_param%20symbol%3DAAPL
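The encoded form can be produced mechanically with JavaScript's built-in encodeURIComponent, which is handy when the caller is the node.js side:

```javascript
// URL-encode the full command so it can travel as a query-string or POST parameter.
var cmd = 'C:\\etl\\releases\\retrieve_stock_0.1\\retrieve_stock\\retrieve_stock_run.bat' +
          ' --context_param symbol=AAPL';
var encoded = encodeURIComponent(cmd);
console.log(encoded);
// C%3A%5Cetl%5Creleases%5Cretrieve_stock_0.1%5Cretrieve_stock%5Cretrieve_stock_run.bat%20--context_param%20symbol%3DAAPL
```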
