astyanaxを使ってみる
astyanaxは主にNetflixが開発を行っている Cassandra Java Client でcassandraへの読み書きはもちろん、主に自動的にリングやノードの状態を見て負荷を平均になるようにコントロールできる新しいクライアントです
astyanaxのインストール
事前に準備しておくもの
- maven2
- git
- JDK6
まずgitからastyanaxを取得してビルドします
git clone https://github.com/Netflix/astyanax.git
mvnを使用してローカルリポジトリにおきたい場合は取得したastyanaxのディレクトリ上で
mvn clean install
jarファイルだけほしい場合は
mvn package
astyanaxの依存関係のライブラリを取得したい場合は以下のコマンドでtarget/dependency に出力される
mvn dependency:copy-dependencies
またgradlewを使用してビルドしたい場合は
gradlew build
astyanaxで適当にCassandraを使用してみる
Cassandra側でKeyspaceとColumn Familyを作成しておく
create keyspace ks
with placement_strategy = 'SimpleStrategy'
and strategy_options = {replication_factor:2}
create column family cf
with column_type = 'Standard'
and comparator = 'UTF8Type'
and default_validation_class = 'UTF8Type'
and key_validation_class = 'UTF8Type'
以下のような単体で動かせるようなソースコードを作成(ソース名ConnectionTest.java)
import org.apache.log4j.PropertyConfigurator;
import com.netflix.astyanax.*;
import com.netflix.astyanax.AstyanaxContext.Builder;
import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
import com.netflix.astyanax.thrift.ThriftFamilyFactory;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.ColumnList;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.Row;
import com.netflix.astyanax.model.Rows;
import com.netflix.astyanax.serializers.StringSerializer;
import com.netflix.astyanax.connectionpool.OperationResult;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
import com.netflix.astyanax.connectionpool.impl.FixedRetryBackoffStrategy;
import com.netflix.astyanax.connectionpool.impl.SmaLatencyScoreStrategyImpl;
import com.netflix.astyanax.connectionpool.impl.BadHostDetectorImpl;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolType;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
public class ConnectionTest
{
public static void main(String[] argv) throws ConnectionException{
//適当にlog4jの設定を引数で渡してやる
String filePath = argv[0];
PropertyConfigurator.configure(filePath);
//コネクション設定と開始
AstyanaxContext<Keyspace> context = new AstyanaxContext.Builder()
.forCluster("{クラスタ名}")
.forKeyspace("ks")
.withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
//.setDiscoveryType(NodeDiscoveryType.NONE) ノードの情報を意識しない場合の設定
.setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE) // リング内のノードを意識したアクセスの仕方に設定
.setConnectionPoolType(ConnectionPoolType.TOKEN_AWARE) // ノードのトークンを意識したアクセスの仕方に設定
)
.withConnectionPoolConfiguration(new ConnectionPoolConfigurationImpl("MyConnectionPool")
.setPort(9160) // 使用ポート
.setMaxConnsPerHost(1) // 限界のコネクション数
.setSeeds("{ホスト}:9160") //Seedsのアドレス
)
.withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
.buildKeyspace(ThriftFamilyFactory.getInstance());
context.start();
Keyspace keyspace = context.getEntity();
//使用するカラムファミリの定義
ColumnFamily<String, String> CF_TEST =
new ColumnFamily<String, String>(
"cf", // カラムファミリ名
StringSerializer.get(), //キーのシリアライザ
StringSerializer.get()); //カラムのシリアライザ
//データの書込み
MutationBatch m = keyspace.prepareMutationBatch();
m.withRow(CF_TEST, "0001")
.putColumn("value1", "hoge", null)
.putColumn("value2", "hogehoge", null);
m.withRow(CF_TEST, "0002")
.putColumn("value1", "huga", null)
.putColumn("value2", "hugahuga", null);
try {
OperationResult<Void> result = m.execute();
} catch (ConnectionException e) {
}
//データの取得
OperationResult<Rows<String, String>> result =
keyspace.prepareQuery(CF_TEST)
.getKeySlice("0001", "0002")
.execute();
//取得したデータの表示
for (Row<String, String> row : result.getResult()) {
System.out.println(row.getKey());
for (Column<String> column : row.getColumns()) {
System.out.println(column.getName() + " " + column.getStringValue());
}
}
}
}
ビルドして実行(lib内に各jar、第一引数に適当なlog4jの設定ファイルで実行)
$ javac -cp ./lib/*:./ ConnectionTest.java
$ java -cp ./lib/*:./ ConnectionTest ./log4j.properties
0002
value1 huga
value2 hugahuga
0001
value1 hoge
value2 hogehoge