原创文章,欢迎转载,转载请注明出处:http://blog.csdn.net/jmppok/article/details/16840231
参考1:
参考2:
参考3:
在Strom中使用C++开发Bolt总结
参考4:
Storm DRPC 使用
方法:
1)根据[参考2]介绍,实现C++的Bolt; 其中storm.h 和 storm.cpp可从其中下载。JsonCPP可从官方下载编译。这里提供一个编译好的:jsoncpp 0.6.0rc2 代码+ubuntu下gcc编译结果
SplitSentence.h修改后的代码
#ifndef SPLIT_SENTENCE_H
#define SPLIT_SENTENCE_H
#include <string>
#include <vector>
#include "Storm.h"
#include "json/json.h"
using namespace std;
namespace storm
{
class SplitSentence : public BasicBolt
{
public:
void Initialize(Json::Value conf, Json::Value context) { }
void Process(Tuple &tuple)
{
// first is args
std::string s = tuple.GetValues()[0].asString();
s+="-------------";
Json::Value j_token;
j_token.append(s);
j_token.append(tuple.GetValues()[1].asString());
Tuple t(j_token);
Emit(t);
}
};
}
#endif
Test.cpp代码
#include <iostream>
#include <stdexcept>
#include "SplitSentence.h"
using namespace storm;
using namespace std;
int main(int argc, char *argv[])
{
SplitSentence b;
b.Run();
return 0;
}
编译生成 my_app:
g++ -o my_app Test.cpp Storm.cpp -I ../../jsoncpp/jsoncpp/include -L ../../jsoncpp/jsoncpp/libs/linux-gcc-4.6/ -ljson_linux-gcc-4.6_libmt
需要注意Storm.h和Storm.cpp 以及Jsoncpp的位置。
编译完成后将其拷贝到Java工程的resources目录下,如果有依赖的动态库,同样考过去。保证在运行my_app时能够找到所有的依赖库。
2) 实现Java的壳,CppBolt.java
package drpc;
import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.ShellBolt;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class CppBolt extends ShellBolt implements IRichBolt/* implements IBasicBolt */{
public CppBolt()
{
super("/bin/sh","start.sh");
}
public void prepare(Map conf, TopologyContext context) {
}
/*
public void execute(Tuple tuple, BasicOutputCollector collector) {
String input = tuple.getString(1);
collector.emit(new Values(tuple.getValue(0), input + " @ " + getLocalIP()+":"+getPid()+":"+getThreadId()));
}
*/
public void cleanup() {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "result"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
}
注意在CppBolt构造函数中需要使用sh进行中转,不能直接运行my_app,(会没有权限),见[参考3]中的说明。
3)创建Topology
TopologyBuilder builder = new TopologyBuilder();
DRPCSpout drpcSpout = new DRPCSpout("drpc-query");
builder.setSpout("drpc-input", drpcSpout,5);
//我们的CPPBolt
builder.setBolt("cpp", new CppBolt(), 5)
.noneGrouping("drpc-input");
builder.setBolt("return", new ReturnResults(),5)
.noneGrouping("cpp");
Config conf = new Config();
conf.setDebug(false);
conf.setMaxTaskParallelism(3);
try
{
StormSubmitter.submitTopology("drpc-q", conf,builder.createTopology());
}
catch (Exception e)
{
e.printStackTrace();
}
4.将所有程序打包成一个jar包。
5.通过storm jar提交到storm Cluster中;
需要注意drpc server的配置问题,并启动之。见[参考4]
6.客户端访问,见[参考4]
分享到:
相关推荐
storm DRPC简单例程,服务器端是运行在集群环境中的,客户端去调用DRPC服务
这是storm中drpc应用的一个例子。
storm之drpc操作demo示例
Storm-drpc节点适用于Node.js的Apache Storm DRPC客户端受启发,但不同之处在于可以选择将其设置为保持活动状态,它不需要在每个execute()调用中都创建连接,并且可以喜欢的传统方式或promise方式使用它。...
• BasicBolt • Storm 批处理 • Storm TOPN • Storm 流程聚合 • Storm DRPC • Storm executor、worker、task之间的关系和调优 • Storm异常解决
01.Storm基础知识02.Storm集群安装-1-new .avi.baiduyun.p05.Storm配置文件配置项讲解07.Storm基本API介绍08.Storm Topology的并发度09.Strom消息机制原理讲解10.Storm DRPC实战讲解
内容概要: • Storm 记录级容错原理 • Storm 配置详解 • Storm 批处理 • Storm TOPN • Storm 流程聚合 • Storm DRPC • Storm executor、worker、task之间的关系和调优 • Storm异常解决
2、注重实践,对较抽象难懂的技术点如Grouping策略、并发度及线程安全、批处理事务、DRPC、Storm Trident均结合企业场景开发案例进行讲解,简单易懂; 3、分享积累的经验和技巧,从架构的角度剖析场景和设计实现...
linux实验环境,storm搭建完毕后的开发。eclipse开发环境,大数据界hello world——wordcount详解,bolt、分组机制、storm DRPC详解
《Storm实战:构建大数据实时计算 》一共分为10章:第1章全面介绍了Storm的特性、能解决什么问题,以及和其他流计算系统的对比;第2章通过实际运行一个简单的例子,以及介绍本地环境和集群环境的搭建,让读者对...
3、注重实践,对较抽象难懂的技术点如Grouping策略、并发度及线程安全、批处理事务、DRPC、Storm Trident均结合企业场景开发案例进行讲解,让学员觉得简单易懂; 4、每个技术均采用最新稳定版本,学完后会员可以从...
【Storm篇】--Storm中的同步服务DRPC 【Storm篇】--Storm从初始到分布式搭建 【Storm篇】--Storm 容错机制 【Storm篇】--Storm并发机制 【Storm篇】--Storm分组策略 【Storm篇】--Storm基础概念
dRPC 一些描述。要求移液器> = 0.1.4安装pip install dRPC特征一些功能。文献资料例子一些描述。 # Some code 新分支测试
插入式轻量级 gRPC 替代品。 链接 强调 很简单,只有几千。 。 go.mod 中只有 3 个要求,还有 9 行go mod graph ! 兼容的。 适用于许多 gRPC 用例!... DRPC 具有闪电般的... 例如,可以针对drpchttp包使用Twirp客户
风暴Debian包装 用于分布式实时计算系统的... 提供以下服务的软件包: storm-drpc storm-logviewer storm-nimbus storm-supervisor storm-ui 还有storm-common软件包,它是服务软件包的依赖项。 还有一个storm软件包,
DRPC:简单的Discord RPC程序
目前包括几个服务: storm-mesos - Nimbus(Mesos 调度程序) storm-ui - Web 界面(默认端口为8080 ) storm-drpc - DRPC 守护进程storm-log - 用于在 Web UI 中显示日志的服务(从计算节点获取日志) 请注意,不...
分布式RPC(distributedRPC,DRPC)用于对Storm上大量的函数调用进行并行计算过程。对于每一次函数调用,Storm集群上运行的拓扑接收调用函数的参数信息作为输入流,并将计算结果作为输出流发射出去。DRPC本身算不上...