Computing Moving-Average of Stocks in Hadoop HIVE

General Sense of Moving Average:

A moving average (MA) is a widely used indicator in technical analysis that smooths price action by filtering out the “noise” of random price fluctuations. Because it is computed from past prices, an MA is a trend-following, or lagging, indicator.

The two most commonly used MAs are the simple moving average (SMA), which is the arithmetic mean of a security's price over a defined number of time periods, and the exponential moving average (EMA), which gives greater weight to more recent prices. The most common applications of MAs are to identify the trend direction and to determine support and resistance levels. While MAs are useful on their own, they also form the basis for other indicators such as the Moving Average Convergence Divergence (MACD).
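
To make the "greater weight to more recent prices" idea concrete, here is a minimal Java sketch of an EMA. It is not part of the original tutorial: the class name EmaSketch is hypothetical, and the smoothing factor alpha = 2 / (periods + 1) and the choice to seed the series with the first price are assumed conventions (others exist).

```java
import java.util.Arrays;

/** Hypothetical helper for illustration; not part of the original tutorial. */
final class EmaSketch {
    private EmaSketch() {}

    /**
     * Computes an exponential moving average for each price.
     * Assumptions: alpha = 2 / (periods + 1), and the first price is
     * used as the seed value.
     */
    static double[] exponentialMovingAverage(double[] prices, int periods) {
        if (periods < 1) {
            throw new IllegalArgumentException("periods must be >= 1");
        }
        double alpha = 2.0 / (periods + 1);
        double[] ema = new double[prices.length];
        for (int i = 0; i < prices.length; i++) {
            // Each new price gets weight alpha; the previous EMA, which
            // summarizes all older prices, gets the remaining weight.
            ema[i] = (i == 0)
                    ? prices[0]
                    : alpha * prices[i] + (1.0 - alpha) * ema[i - 1];
        }
        return ema;
    }

    public static void main(String[] args) {
        double[] closes = {20, 22, 24, 25, 23};
        System.out.println(Arrays.toString(exponentialMovingAverage(closes, 3)));
    }
}
```

Because each step multiplies the previous EMA by (1 - alpha), the influence of an old price decays geometrically instead of dropping out abruptly as it does in an SMA.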

As an SMA example, consider a security with the following closing prices over 15 days:

Week 1 (5 days) – 20, 22, 24, 25, 23

Week 2 (5 days) – 26, 28, 26, 29, 27

Week 3 (5 days) – 28, 30, 27, 29, 28

A 10-day SMA uses the average of the closing prices for the first 10 days as its first data point, so days 1 through 9 have no value. Each subsequent data point drops the earliest price, adds the next day's price (day 11 for the second point), and takes the average again, as shown below.

[table]

[tr][th]Day[/th] [th]Closing Price[/th] [th]10 Day SMA[/th] [th]Values Used for SMA[/th][/tr]

[tr][td]1[/td] [td]20[/td] [td][/td] [td][/td][/tr]

[tr][td]2[/td] [td]22[/td] [td][/td] [td][/td][/tr]

[tr][td]3[/td] [td]24[/td] [td][/td] [td][/td][/tr]

[tr][td]4[/td] [td]25[/td] [td][/td] [td][/td][/tr]

[tr][td]5[/td] [td]23[/td] [td][/td] [td][/td][/tr]

[tr][td]6[/td] [td]26[/td] [td][/td] [td][/td][/tr]

[tr][td]7[/td] [td]28[/td] [td][/td] [td][/td][/tr]

[tr][td]8[/td] [td]26[/td] [td][/td] [td][/td][/tr]

[tr][td]9[/td] [td]29[/td] [td][/td] [td][/td][/tr]

[tr][td]10[/td] [td]27[/td] [td]25[/td] [td]Average of Day 1 through 10[/td][/tr]

[tr][td]11[/td] [td]28[/td] [td]25.8[/td] [td]Average of Day 2 through 11[/td][/tr]

[tr][td]12[/td] [td]30[/td] [td]26.6[/td] [td]Average of Day 3 through 12[/td][/tr]

[tr][td]13[/td] [td]27[/td] [td]26.9[/td] [td]Average of Day 4 through 13[/td][/tr]

[tr][td]14[/td] [td]29[/td] [td]27.3[/td] [td]Average of Day 5 through 14[/td][/tr]

[tr][td]15[/td] [td]28[/td] [td]27.8[/td] [td]Average of Day 6 through 15[/td][/tr]

[/table]
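
The drop-the-oldest, add-the-newest procedure in the table can be sketched in Java with a rolling sum. This is an illustrative sketch only: the class name SmaSketch is hypothetical, and marking days with too little history as NaN is an assumption chosen to mirror the blank cells in the table.

```java
import java.util.Locale;

/** Hypothetical helper for illustration; not part of the original tutorial. */
final class SmaSketch {
    private SmaSketch() {}

    /**
     * Returns the simple moving average for each day. Days with fewer
     * than `window` prices available are reported as Double.NaN.
     */
    static double[] simpleMovingAverage(double[] prices, int window) {
        if (window < 1) {
            throw new IllegalArgumentException("window must be >= 1");
        }
        double[] sma = new double[prices.length];
        double rollingSum = 0.0;
        for (int i = 0; i < prices.length; i++) {
            rollingSum += prices[i];               // add the newest price
            if (i >= window) {
                rollingSum -= prices[i - window];  // drop the oldest price
            }
            sma[i] = (i >= window - 1) ? rollingSum / window : Double.NaN;
        }
        return sma;
    }

    public static void main(String[] args) {
        double[] closes = {20, 22, 24, 25, 23, 26, 28, 26, 29, 27, 28, 30, 27, 29, 28};
        double[] sma = simpleMovingAverage(closes, 10);
        for (int day = 10; day <= closes.length; day++) {
            System.out.println(String.format(Locale.ROOT, "Day %d: %.1f", day, sma[day - 1]));
        }
    }
}
```

Run against the 15 closing prices above, the values for days 10 through 15 reproduce the 10 Day SMA column of the table.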

As noted earlier, MAs lag current price action because they are based on past prices; the longer the time period for the MA, the greater the lag. Thus a 200-day MA will have a much greater degree of lag than a 20-day MA because it contains prices for the past 200 days. The length of the MA to use depends on the trading objectives, with shorter MAs used for short-term trading and longer-term MAs more suited for long-term investors. The 200-day MA is widely followed by investors and traders, with breaks above and below this moving average considered to be important trading signals.
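
The effect of the period length on lag can be illustrated with a small Java sketch. The price series here is synthetic (it rises by exactly 1 per day from 100) and is an assumption made purely for illustration; the class and method names are hypothetical.

```java
import java.util.Locale;

/** Hypothetical helper for illustration; not part of the original tutorial. */
final class LagSketch {
    private LagSketch() {}

    /** Average of the most recent `window` prices. */
    static double trailingAverage(double[] prices, int window) {
        if (window < 1 || window > prices.length) {
            throw new IllegalArgumentException("window must be in [1, prices.length]");
        }
        double sum = 0.0;
        for (int i = prices.length - window; i < prices.length; i++) {
            sum += prices[i];
        }
        return sum / window;
    }

    /** Synthetic series (assumption): starts at 100 and rises by 1 per day. */
    static double[] risingSeries(int days) {
        double[] prices = new double[days];
        for (int i = 0; i < days; i++) {
            prices[i] = 100.0 + i;
        }
        return prices;
    }

    public static void main(String[] args) {
        double[] prices = risingSeries(250);
        double latest = prices[prices.length - 1];
        for (int window : new int[] {20, 200}) {
            double gap = latest - trailingAverage(prices, window);
            System.out.println(String.format(Locale.ROOT,
                    "%d-day MA trails the latest price by %.1f", window, gap));
        }
    }
}
```

On a steady trend like this one, the gap between the latest price and a w-day SMA is (w - 1) / 2 days' worth of price change, so the 200-day average trails far more than the 20-day average.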

Implementing Moving Average User Defined Function (UDF) in HIVE:

Step 1: Run the following HQL script (create_load_stocks.hql) to create the ‘stocks’ table and load stock exchange data into it:

[training@localhost ~]$ /usr/bin/hive -f create_load_stocks.hql

-- Create and load stocks data into partitioned HIVE table.
CREATE DATABASE IF NOT EXISTS training;
USE training;
-- We'll demonstrate the use of two features, external (vs. managed or
-- internal) tables and partitioning the table to speed up performance.
-- You can also use partitioning with managed tables.
-- We'll use historical stock price data from Infochimps.com:
-- NASDAQ: infochimps_dataset_4777_download_16185
-- NYSE: infochimps_dataset_4778_download_16677
-- The EXTERNAL keyword tells Hive that the table storage will
-- be "external" to Hive, rather than the default internal
-- storage. We'll specify where the storage exists below.
-- We'll also partition the table by the exchange and the
-- stock symbol, which will speed up queries selecting on either
-- field, because Hive will know it can skip partitions that
-- don't match the specified query values!
CREATE EXTERNAL TABLE IF NOT EXISTS stocks (
ymd               STRING,
price_open        FLOAT,
price_high        FLOAT,
price_low         FLOAT,
price_close       FLOAT,
volume            INT,
price_adj_close   FLOAT
)
PARTITIONED BY (exchange STRING, symbol STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
-- We don't have any partitions yet:
SHOW PARTITIONS stocks;
-- For EXTERNAL, partitioned tables, you use ALTER TABLE to add each
-- partition and specify a unique directory for its data.
-- We'll just add data for four stocks:
ALTER TABLE stocks ADD PARTITION(exchange = 'NASDAQ', symbol = 'AAPL')
LOCATION '/user/training/stocks/input/plain-text/NASDAQ/AAPL';
ALTER TABLE stocks ADD PARTITION(exchange = 'NASDAQ', symbol = 'INTC')
LOCATION '/user/training/stocks/input/plain-text/NASDAQ/INTC';
ALTER TABLE stocks ADD PARTITION(exchange = 'NYSE', symbol = 'GE')
LOCATION '/user/training/stocks/input/plain-text/NYSE/GE';
ALTER TABLE stocks ADD PARTITION(exchange = 'NYSE', symbol = 'IBM')
LOCATION '/user/training/stocks/input/plain-text/NYSE/IBM';
SHOW PARTITIONS stocks;

Step 2: We’ll use a Custom UDF implemented in Java (MovingAverageUDF) to calculate the moving average of stocks.

package com.training.hiveudfs;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.io.Text;
/**
* A UDF that computes the moving average of the last "numberOfUnits" of
* the input "value" associated with the key.
* @return the moving average for that key.
*/
@Description(name = "moving_avg",
   value = "_FUNC_(n, key, value) - Return the average of the last n values for the specified key",
   extended = "Example:\n"
   + " > SELECT _FUNC_(50, symbol, price_close) FROM stocks LIMIT 100;\n"
   + " 25.73\n")
public final class MovingAverageUDF extends UDF {
    private Map<Text,LinkedList<Float>> map = new HashMap<Text,LinkedList<Float>>();
    public float evaluate(
            final int numberOfUnits, final Text key, final float value) {
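        LinkedList<Float> list = map.get(key);
        if (list == null) {
            list = new LinkedList<Float>();
            // Store a copy of the key: Hive may reuse the same Text
            // instance for later rows, which would corrupt the map.
            map.put(new Text(key), list);
        }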
        list.add(value);
        if (list.size() > numberOfUnits) {
            list.removeFirst();
        }
        if (numberOfUnits == 0) {
            return 0.0f;
        } else {
            int size = list.size();
            int n = size < numberOfUnits ? size : numberOfUnits;
            return sum(list) / (1.0f * n);
        }
    }
    private float sum(LinkedList<Float> list) {
        float result = 0.0F;
        for (float f: list) {
            result += f;
        }
        return result;
    }
}
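
To see how the per-key window behaves without a Hive installation, the same logic can be sketched as a plain Java class. This is an illustrative stand-in, not the UDF itself: the class name KeyedMovingAverage is hypothetical, String keys replace Hadoop's Text, and a zero or negative window simply returns 0 up front.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the UDF's windowing logic; no Hive dependencies. */
final class KeyedMovingAverage {
    // One sliding window of recent values per key (for example, per stock symbol).
    private final Map<String, Deque<Float>> windows = new HashMap<>();

    float evaluate(int numberOfUnits, String key, float value) {
        if (numberOfUnits <= 0) {
            return 0.0f;
        }
        Deque<Float> window = windows.computeIfAbsent(key, k -> new ArrayDeque<>());
        window.addLast(value);
        if (window.size() > numberOfUnits) {
            window.removeFirst(); // keep only the last numberOfUnits values
        }
        float sum = 0.0f;
        for (float v : window) {
            sum += v;
        }
        // Until the window fills, this averages over the values seen so far.
        return sum / window.size();
    }

    public static void main(String[] args) {
        float[] closes = {20, 22, 24, 25, 23, 26, 28, 26, 29, 27, 28, 30, 27, 29, 28};
        KeyedMovingAverage avg = new KeyedMovingAverage();
        for (int day = 1; day <= closes.length; day++) {
            System.out.println("Day " + day + ": " + avg.evaluate(10, "AAPL", closes[day - 1]));
        }
    }
}
```

Note one difference from the table in the first section: like the UDF, this sketch returns a partial average for the first rows of each key (for example, the average of two values on the second row) instead of leaving them blank.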

Step 3: Run the following HQL script (custom_udf.hql) to compute the moving average of stock prices:

[training@localhost ~]$ /usr/bin/hive -f custom_udf.hql

-- Custom UDFs
CREATE DATABASE IF NOT EXISTS training;
USE training;
-- We'll use a Custom UDF implemented in Java to calculate the moving
-- average of stocks.
-- Build this Java code into a jar. The jar that you incorporate using
-- the ADD JAR command is named "moving-avg-udf.jar". Let's now add this jar.
-- Note the relative path to the jar used below; we assume you started the
-- CLI in this directory, that is, the directory that holds this HQL script!
-- The following command copies the jar file into the HIVE library directory.
-- NOTE: An exclamation mark (!) placed before a command makes the HIVE CLI
-- run it as a shell command instead of as a HIVE statement. Like any other
-- command in the script, it must end with a semicolon.
!sudo cp /home/training/moving-avg-udf.jar /usr/lib/hive/lib;
ADD JAR moving-avg-udf.jar;
CREATE TEMPORARY FUNCTION moving_avg AS 'com.training.hiveudfs.MovingAverageUDF';
-- Verify it appears in...
SHOW FUNCTIONS;
DESCRIBE FUNCTION EXTENDED moving_avg;
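-- NOTE: moving_avg averages rows in the order Hive reads them, so the
-- results are only meaningful if each symbol's rows are stored in date order.
SELECT ymd, symbol, price_close, moving_avg(50, symbol, price_close)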
FROM stocks
WHERE symbol = 'AAPL' LIMIT 20;
INSERT OVERWRITE LOCAL DIRECTORY '/tmp/apple_ibm'
SELECT ymd, symbol, price_close, moving_avg(50, symbol, price_close)
FROM stocks
WHERE symbol = 'AAPL' OR symbol = 'IBM';
-- When you're done with the directory, you can delete it.

You have successfully implemented a custom UDF in HIVE!