
Let's say you don't have an AWS Direct Connect or a VPN connection from your AWS account to your on-premise datacenter, but you have some processed data that needs to be sent back to your on-premise Hadoop cluster upon completion. This requires you to somehow initiate a process (in this example a Hadoop Distcp) from the on-premise side. How do we know when the processed data is ready? How do we know when to start the data copy process? Just leave a message!

Here is an example of a small AWS SQS consumer application that works with Hadoop and AWS s3 to copy processed data from s3 to your local Hadoop cluster.

In this example scenario, we have Amazon EMR processing data that is output to s3. The last step in the EMR workflow is to post a message to an AWS SQS queue. The body of this message contains the s3 path and the destination HDFS directory we want the data copied to in the data center.

Amazon EMR --> s3 --> SQS message <-- Ruby Consumer --> Local Hadoop HDFS

We are leaving a message for our simple consumer to pick up!

What the consumer will see in the message body is a comma-separated string containing the s3 source path (including the bucket name) and the requested destination path in the local HDFS:

bucket.name/path/to/s3/processed/data/,/path/to/local/hdfs/destination/
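
For context, the producing side (in this case the final EMR step) only needs to send that string as the message body. Here is a minimal sketch of what that could look like using the same aws-sdk gem; the queue name and paths are placeholders:

require 'aws-sdk'

# Hypothetical producer-side snippet: post the comma-separated
# "s3_source,hdfs_destination" string to the queue the consumer watches.
sqs = Aws::SQS::Client.new(region: 'us-west-2')
queue_url = sqs.get_queue_url(queue_name: 'sqs_queue_name').queue_url
sqs.send_message(
  queue_url: queue_url,
  message_body: 'bucket.name/path/to/s3/processed/data/,/path/to/local/hdfs/destination/'
)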

Here is the consumer code that will run in a loop acting on one message at a time.

require 'aws-sdk'
require 'logger'

@queue_name = ARGV[0].to_s
@aws_region = ARGV[1].to_s

@sqs = Aws::SQS::Client.new(region: @aws_region.to_s)
@log = Logger.new('/var/log/s3_to_hdfs/s3_to_hdfs.log')

def _retrieve_queue_url
  @sqs.get_queue_url(queue_name: @queue_name).to_h[:queue_url]
end

def _s3_path(message)
  message[:messages][0][:body].to_s.split(',')
end

def _retrieve_message_with_uri
  url = _retrieve_queue_url
  s3_path_result = nil
  message_receipt_handle = nil
  message = @sqs.receive_message(queue_url: url, max_number_of_messages: 1,
                                 wait_time_seconds: 1,
                                 visibility_timeout: 1).to_h
  unless message[:messages].nil? || message[:messages].empty?
    s3_path_result = _s3_path(message)
    message_receipt_handle = message[:messages][0][:receipt_handle]
  end
  [s3_path_result, message_receipt_handle]
end

def _delete_message(message_receipt_handle)
  url = _retrieve_queue_url
  message = @sqs.delete_message(queue_url: url,
                                receipt_handle: message_receipt_handle)
  @log.info message
end

def _run_hadoop_distcp
  s3_path_result, message_receipt_handle = _retrieve_message_with_uri
  if s3_path_result.nil?
    @log.info 'No SQS message presently'
  else
    # Distcp command to run using s3 input and hdfs local output
    # Assumes you have your AWS access keys available in core-site.xml
    # Must use s3a, available in Hadoop 2.x
    distcp_cmd = "hadoop distcp -overwrite s3a://#{s3_path_result[0].to_s.strip} " \
                 "#{s3_path_result[1].to_s.strip}"
    shellcmd_hadoop_distcp = `#{distcp_cmd}`
    # Remove message from SQS queue if distcp exits clean
    _delete_message(message_receipt_handle) if $?.success?
    @log.info shellcmd_hadoop_distcp
  end
end

def main
  # Run distcp function
  _run_hadoop_distcp
end

loop do
  main
  sleep(10)
end
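
The distcp call above assumes the AWS credentials for the s3a connector are available to Hadoop on the local cluster, typically in core-site.xml. A minimal sketch of those properties, using the standard s3a keys with placeholder values:

<property>
  <name>fs.s3a.access.key</name>
  <value>YOUR_AWS_ACCESS_KEY_ID</value>
</property>
<property>
  <name>fs.s3a.secret.key</name>
  <value>YOUR_AWS_SECRET_ACCESS_KEY</value>
</property>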

This can be started by passing the AWS SQS queue name and region as 2 arguments.

# ruby s3_to_hdfs_copy.rb sqs_queue_name us-west-2

This consumer will perform the Hadoop Distcp from the provided s3 source to the destination on your on-premise Hadoop cluster. Once the Distcp finishes cleanly, the message's receipt handle is used to delete the message from the AWS SQS queue. The consumer runs in a 10 second loop checking for messages on the specified SQS queue.
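
With the example message body from earlier, the command the consumer shells out to would look like this:

hadoop distcp -overwrite s3a://bucket.name/path/to/s3/processed/data/ /path/to/local/hdfs/destination/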

To make sure the consumer stays up and running, we need to start it via a process management system. Here is an example using Upstart.

description "S3 to HDFS copy"

start on runlevel [2345]
stop on runlevel [!2345]
respawn

pre-start script
  mkdir -p /var/log/s3_to_hdfs
  chown myuser:myuser /var/log/s3_to_hdfs
end script

exec su - myuser -c 'ruby /opt/s3_to_hdfs_copy/s3_to_hdfs_copy.rb sqs_queue_name us-west-2 &>>/var/log/s3_to_hdfs/s3_to_hdfs.log'
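
Assuming the job file is saved as /etc/init/s3_to_hdfs_copy.conf, the consumer can then be brought up with:

# start s3_to_hdfs_copy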

At this point we have a simple consumer working for us shuttling data from s3 to our local Hadoop cluster without any need for a VPN, Direct Connect or firewall rules. It is worth noting that data retrieval from s3 incurs bandwidth charges on your AWS account.

Happy message consuming... beep.