autozane Ruby

Adding AWS SNS via Ruby SDK is Deadman Simple

I wanted to write up a quick post on how easy it was to add AWS Simple Notification Service messaging support to deadman-check, a monitoring application written in Ruby.

Deadman-check was put together to be an alerting service that fires when data isn't present or is stale, the classic deadman switch monitoring concept. Initially it supported alerting to Slack channels directly. That was great, but SNS support would be awesome! You may be wondering: why support SNS? In a nutshell, SNS is a broadcast point. With deadman-check we can encapsulate the alert into one message sent to one endpoint, and from there it is broadcast to an unlimited number of subscribers. These subscribers may be SMS, email, or AWS Lambda functions that all simultaneously receive the one message sent from deadman-check (I really like the last one). SNS-to-Lambda triggers open up a lot of event-driven alerting possibilities!

To add SNS support to Deadman-check I simply needed to initialize a Ruby client object via the AWS Ruby SDK, and only do so when the CLI flag requesting SNS alerting is present.

In my class initialization for the switch monitor I added the needed attr_accessors: alert_to_sns and alert_to_sns_region.

These new accessors hold the SNS topic ARN (the unique AWS identifier for the SNS topic to send the message to) and the AWS region the topic is in.

    def initialize(host, port, target, alert_to_slack, alert_to_sns,
                  alert_to_sns_region, recurse, daemon_sleep)
    --snip--
    end

During initialization I will create the connection to SNS unless the variables are nil.

    unless @alert_to_sns.nil?
      @sns = Aws::SNS::Client.new(
        region: @alert_to_sns_region
      )
    end

To be able to talk to SNS from the application, there are a few different ways you can authenticate:

1. ENV['AWS_ACCESS_KEY_ID'] and ENV['AWS_SECRET_ACCESS_KEY']
2. The shared credentials ini file at ~/.aws/credentials
3. From an instance profile when running on EC2

Depending on how you deploy the deadman-check service, one of the above methods should be easy to make available, whether on a running EC2 instance or a server in your datacenter.
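
If none of those are an option, the AWS SDK also accepts credentials handed to the client directly. Here is a minimal sketch (not part of deadman-check) with placeholder key values:

    # Minimal sketch: bypass the default credential chain by handing
    # credentials to the client explicitly. Key values are placeholders.
    require 'aws-sdk'

    creds = Aws::Credentials.new('AKIA_EXAMPLE_KEY_ID', 'example_secret_key')
    sns = Aws::SNS::Client.new(region: 'us-west-2', credentials: creds)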

Now that the client connection to AWS is taken care of, let's move on to the message publish function! This is the juicy part: generating the one message that gets sent to the SNS topic for broadcast.

    def sns_alert(alert_to_sns, target, epoch_diff)
      @sns.publish(
        target_arn: @alert_to_sns,
        message_structure: 'json',
        message: {
          :default => "Alert: Deadman Switch triggered for #{target}",
          :email => "Alert: Deadman Switch triggered for #{target}, with
          #{epoch_diff} seconds since last run",
          :sms => "Alert: Deadman Switch for #{target}"
        }.to_json
      )
    end

This function takes in some business logic from other parts of deadman-check and generates a message that is published to the topic (target). In the message: hash you can see we define a 'default' and then more specific messages based on the type of SNS subscriber. SMS messages can be shorter and more concise, while email can be wordier given the extra screen real estate to work with.
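
Subscribers themselves can be attached to the topic with the same SDK if you prefer that over the console. A minimal sketch with a made-up topic ARN and email address (deadman-check itself does not manage subscriptions):

    # Minimal sketch: attach an email subscriber to the topic so it
    # receives the broadcast above. The ARN and address are made up.
    @sns.subscribe(
      topic_arn: 'arn:aws:sns:us-west-2:123456789012:deadman-check-alerts',
      protocol: 'email',
      endpoint: 'oncall@example.com'
    )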

That is it! The application is now ready to publish to SNS. The sky's the limit.

Managing Terraform Versions on Jenkins

When leveraging Terraform to code your infrastructure you will notice that the release cadence for new Terraform versions is fast. In this post I will show you how to manage upgrading Terraform on a per-Jenkins-job basis. This lets you run multiple versions of Terraform on your Jenkins system and gives you the flexibility to control when to upgrade a given Terraform state to a newer version.

We will be leveraging this great open source wrapper: tfenv. Thank you kamatam41 for sharing this.

To get this installed on a Jenkins system I have provided a small Chef snippet.
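
A minimal sketch of what that recipe can look like, assuming an install path of /opt/tfenv, the kamatam41/tfenv repository, and the two Terraform versions used later in this post:

    # Sketch of a Chef recipe: clone tfenv, put it on the PATH, and
    # pre-install the Terraform versions the Jenkins jobs will use.
    git '/opt/tfenv' do
      repository 'https://github.com/kamatam41/tfenv.git'
      action :sync
    end

    link '/usr/local/bin/tfenv' do
      to '/opt/tfenv/bin/tfenv'
    end

    link '/usr/local/bin/terraform' do
      to '/opt/tfenv/bin/terraform'
    end

    %w(0.7.2 0.7.3).each do |version|
      execute "tfenv install #{version}" do
        command "/usr/local/bin/tfenv install #{version}"
        not_if { ::File.directory?("/opt/tfenv/versions/#{version}") }
      end
    end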

This will set up a couple of versions of Terraform for our job to work with. Here is the example Jenkins Terraform job. We will leverage some Terraform code I put on Github that will create an AWS S3 bucket, an S3 bucket policy to attach to the bucket, and an AWS VPC.

I want to leverage an AWS resource new to Terraform 0.7.3 called aws_s3_bucket_policy, so we add it to our Terraform code example.

Let's first try to 'plan' this Terraform code using 0.7.2. We should expect the plan to fail since the aws_s3_bucket_policy resource isn't available in that version. We control which Terraform version tfenv uses with the hidden file .terraform-version (containing just the version string) in the configuration directory where we run Terraform.

Note we see the failure:

aws_s3_bucket_policy.autozane_s3_policy: Provider doesn't support resource: aws_s3_bucket_policy

Ok, now let's commit a change to our .terraform-version file and call out the usage of 0.7.3 for this state. We should now see a successful 'plan' that includes the creation of the requested AWS resources (S3 bucket, S3 bucket policy, VPC).

Success! We were able to leverage tfenv inside of our Jenkins CI environment to upgrade to a newer version of Terraform. This upgrade didn't affect any other Terraform jobs on this Jenkins system that were perhaps using other Terraform versions.

Example Terraform and Jenkins config

tfenv on Github

Chef to install tfenv on Jenkins

Leave a message at the beep, I mean in the queue


Let's say you don't have an AWS Direct Connect or a VPN connection from your AWS account to your on-premise datacenter. But you have some processed data that needs to be sent back to your on-premise Hadoop cluster upon completion. This requires you to somehow initiate a process (in this example a Hadoop Distcp) from your on-premise datacenter. How do we know when the processed data is ready? How do we know when to start the data copy process? Just leave a message!

Here is an example of a small AWS SQS consumer application that works with Hadoop and AWS s3 to copy processed data from s3 to your local Hadoop cluster.

In this example scenario, we have Amazon EMR processing data that is outputted to s3. The last step in the EMR workflow is to post a message to an AWS SQS queue. The body of this message contains the s3 path and destination HDFS directory I want the data copied to in the data center.

Amazon EMR --> s3 --> SQS message <-- Ruby Consumer --> Local Hadoop HDFS

We are leaving a message for our simple consumer to pick up!

The message body the consumer sees is a comma-separated string made up of the s3 source path (including the bucket name) and the requested destination path in local HDFS:

bucket.name/path/to/s3/processed/data/,/path/to/local/hdfs/destination/
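
For reference, the producer side is tiny as well. A rough sketch of what that final EMR step could do with the Ruby SDK, reusing the queue name and example paths from this post:

require 'aws-sdk'

# Sketch of the producer side: post the comma-separated
# "s3_source,hdfs_destination" body for the consumer to pick up.
sqs = Aws::SQS::Client.new(region: 'us-west-2')
queue_url = sqs.get_queue_url(queue_name: 'sqs_queue_name').queue_url

sqs.send_message(
  queue_url: queue_url,
  message_body: 'bucket.name/path/to/s3/processed/data/,/path/to/local/hdfs/destination/'
)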

Here is the consumer code that will run in a loop acting on one message at a time.

require 'aws-sdk'
require 'logger'

@queue_name = ARGV[0].to_s
@aws_region = ARGV[1].to_s

@sqs = Aws::SQS::Client.new(region: @aws_region.to_s)
@log = Logger.new('/var/log/s3_to_hdfs/s3_to_hdfs.log')

def _retrieve_queue_url
  @sqs.get_queue_url(queue_name: @queue_name).to_h[:queue_url]
end

def _s3_path(message)
  message[:messages][0][:body].to_s.split(',')
end

def _retrieve_message_with_uri
  url = _retrieve_queue_url
  s3_path_result = nil
  message_receipt_handle = nil
  message = @sqs.receive_message(queue_url: url, max_number_of_messages: 1,
                                 wait_time_seconds: 1,
                                 visibility_timeout: 1).to_h
  unless message.empty?
    s3_path_result = _s3_path(message)
    message_receipt_handle = message[:messages][0][:receipt_handle]
  end
  [s3_path_result, message_receipt_handle]
end

def _delete_message(message_receipt_handle)
  url = _retrieve_queue_url
  message = @sqs.delete_message(queue_url: url,
                                receipt_handle: message_receipt_handle)
  @log.info message
end

def _run_hadoop_distcp
  s3_path_result, message_receipt_handle = _retrieve_message_with_uri
  if s3_path_result.nil?
    @log.info 'No SQS message presently'
  else
    # Distcp command to run using s3 input and hdfs local output
    # Assumes you have your AWS access keys available in core-site.xml
    # Must use s3a, available in Hadoop 2.x
    distcp_cmd = "hadoop distcp -overwrite " \
                 "s3a://#{s3_path_result[0].to_s.strip} " \
                 "#{s3_path_result[1].to_s.strip}"
    shellcmd_hadoop_distcp = `#{distcp_cmd}`
    # Remove message from SQS queue if distcp exits clean
    _delete_message(message_receipt_handle) if $?.success?
    @log.info shellcmd_hadoop_distcp
  end
end

def main
  # Run distcp function
  _run_hadoop_distcp
end

loop do
  main
  sleep(10)
end

This can be started by passing the AWS SQS queue name and region as 2 arguments.

# ruby s3_to_hdfs_copy.rb sqs_queue_name us-west-2

This consumer will perform the Hadoop Distcp with the provided s3 source and a destination on your on-premise Hadoop cluster. Once the Distcp finishes successfully, the receipt handle is used to delete the message from the AWS SQS queue. The consumer runs in a 10-second loop checking for messages on the specified SQS queue.

To make sure the consumer stays up and running we need to start it via a process management system. Here is an example using Upstart.

description "S3 to HDFS copy"

start on runlevel [2345]
stop on runlevel [!2345]
respawn

pre-start script

bash << "EOF"
  mkdir -p /var/log/s3_to_hdfs
  chown myuser:myuser /var/log/s3_to_hdfs
EOF

end script

exec su - myuser -c 'ruby /opt/s3_to_hdfs_copy/s3_to_hdfs_copy.rb sqs_queue_name us-west-2 &>>/var/log/s3_to_hdfs/s3_to_hdfs.log'
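
Drop this config into /etc/init/ (for example /etc/init/s3_to_hdfs_copy.conf, the name is up to you) and 'start s3_to_hdfs_copy' brings the consumer up; the respawn stanza restarts it if it ever dies.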

At this point we have a simple consumer working for us shuttling data from s3 to our local Hadoop cluster without any need for a VPN, Direct Connect or firewall rules. It is worth noting that data retrieval from s3 incurs bandwidth charges on your AWS account.

Happy message consuming... beep.