InfoTalk Summer Workshop 2012(Amazon DynamoDB Hackathon)に行ってきました。
DynamoDBにさわるのは初めてでしたが、非常に有意義な時間でした。今回はチームに分かれてDynamoDBを使用したアプリを作ったりしました。
このDBの一番の特徴は読み書きのThroughputをコントロール出来る点です。これにより運用コストを低減することが出来ます。
但し、現状はThroughputのダウンが1日1回のみの制限がありますので、注意が必要です。
ハッカソンで作ったものではないですが、DynamoDBがどのように動くのかを確認する為に、
CouchDBに流し込んでいるPythonスクリプトを変更し、DynamoDBに入れ込むようにしてみました。
PythonからDynamoDBを扱う場合は、botoを使用します。なお、本スクリプトを実行前にテーブルは作成済みであることとします。
# -*- coding: utf-8 -*- import sys import tweepy import json import time import jsonpickle from datetime import datetime import re import boto CONSUMER_KEY = 'YOUR CONSUMERKEY' CONSUMER_SECRET = 'YOUR CONSUMER_SECRET' ACCESS_TOKEN = 'YOUR ACCESS_TOKEN' ACCESS_TOKEN_SECRET = 'YOUR ACCESS_TOKEN_SECRET' auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET) auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET) conn = boto.connect_dynamodb() table = conn.get_table('twitter') count = 0 class CustomStreamListener(tweepy.StreamListener): def on_status(self, status): results = {} try: if re.match(u"[ァ-ヶーぁ-ん]" , status.text): pickled = jsonpickle.encode(status) results = json.loads(pickled) del results['_api'] item = table.new_item( hash_key=int(results['id_str']), range_key=20120729, attrs={'user': results['author']['screen_name'], 'text': results['text']}) item.put() #print item except Exception, e: print >> sys.stderr, "Encountered Exception:", e pass def on_error(self, status_code): print >> sys.stderr, "Encountered error with status code:", status_code return True def on_timeout(self): print >> sys.stderr, "Timeout..." return True streaming_api = tweepy.streaming.Stream(auth, CustomStreamListener(), timeout=60) try: # sample(): streaming_api.sample() #streaming_api.filter(follow=None, locations=QUERY) #streaming_api.filter(follow=None, track=QUERY) streaming_api.sample() except Exception, e: print >> sys.stderr, "Error: '%s' '%s'" % (str(datetime.now()), str(e)) finally: print >> sys.stderr, "disconnecting..." streaming_api.disconnect()