当前位置:网站首页>Fink异步IO的实战(关联维表)
Fink异步IO的实战(关联维表)
2022-07-30 14:29:00 【顶尖高手养成计划】
简介
异步io实战
知识前提
线程池异步io
应用程序
public class ASyncIODemo {
public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.Source
//DataStreamSource[1,2,3,4,5]
DataStreamSource<CategoryInfo> categoryDS = env.addSource(new RichSourceFunction<CategoryInfo>() {
private Boolean flag = true;
@Override
public void run(SourceContext<CategoryInfo> ctx) throws Exception {
Integer[] ids = {1, 2, 3, 4, 5};
for (Integer id : ids) {
ctx.collect(new CategoryInfo(id, null));
}
}
@Override
public void cancel() {
this.flag = false;
}
});
//方式一:线程池模拟异步IO
//unorderedWait无序等待
SingleOutputStreamOperator<CategoryInfo> result2 = AsyncDataStream
.unorderedWait(categoryDS, new ASyncIOFunction2(), 1000, TimeUnit.SECONDS, 10);
//打印结果
result2.print();
env.execute();
}
}
/**
* 同步调用+线程池模拟异步IO
*/
class ASyncIOFunction2 extends RichAsyncFunction<CategoryInfo, CategoryInfo> {
private HashMap<Integer,String> dic_name=new HashMap<>();
private ExecutorService executorService;//线程池
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
dic_name.put(1,"手机");
dic_name.put(2,"电脑");
dic_name.put(3,"服装");
executorService = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
//异步发送请求
@Override
public void asyncInvoke(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception {
executorService.execute(new Runnable() {
@Override
public void run() {
//TODO 这里查询数据库
String resName = dic_name.get(input.getId());
input.setName(resName);
resultFuture.complete(Collections.singletonList(input));
}
});
}
@Override
public void close() throws Exception {
}
@Override
public void timeout(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception {
System.out.println("async call time out!");
input.setName("未知");
resultFuture.complete(Collections.singleton(input));
}
}输出结果
1> CategoryInfo(id=4, name=null)
2> CategoryInfo(id=5, name=null)
15> CategoryInfo(id=2, name=电脑)
14> CategoryInfo(id=1, name=手机)
16> CategoryInfo(id=3, name=服装)Java-vertx中提供的异步client实现异步IO
class ASyncIOFunction1 extends RichAsyncFunction<CategoryInfo, CategoryInfo> {
private transient SQLClient mySQLClient;
@Override
public void open(Configuration parameters) throws Exception {
JsonObject mySQLClientConfig = new JsonObject();
mySQLClientConfig
.put("driver_class", "com.mysql.jdbc.Driver")
.put("url", "jdbc:mysql://localhost:3306/bigdata")
.put("user", "root")
.put("password", "root")
.put("max_pool_size", 20);
VertxOptions options = new VertxOptions();
options.setEventLoopPoolSize(10);
options.setWorkerPoolSize(20);
Vertx vertx = Vertx.vertx(options);
//根据上面的配置参数获取异步请求客户端
mySQLClient = JDBCClient.createNonShared(vertx, mySQLClientConfig);
}
//使用异步客户端发送异步请求
@Override
public void asyncInvoke(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception {
mySQLClient.getConnection(new Handler<AsyncResult<SQLConnection>>() {
@Override
public void handle(AsyncResult<SQLConnection> sqlConnectionAsyncResult) {
if (sqlConnectionAsyncResult.failed()) {
return;
}
SQLConnection connection = sqlConnectionAsyncResult.result();
connection.query("select id,name from t_category where id = " +input.getId(), new Handler<AsyncResult<ResultSet>>() {
@Override
public void handle(AsyncResult<io.vertx.ext.sql.ResultSet> resultSetAsyncResult) {
if (resultSetAsyncResult.succeeded()) {
List<JsonObject> rows = resultSetAsyncResult.result().getRows();
for (JsonObject jsonObject : rows) {
CategoryInfo categoryInfo = new CategoryInfo(jsonObject.getInteger("id"), jsonObject.getString("name"));
resultFuture.complete(Collections.singletonList(categoryInfo));
}
}
}
});
}
});
}
@Override
public void close() throws Exception {
mySQLClient.close();
}
@Override
public void timeout(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception {
System.out.println("async call time out!");
input.setName("未知");
resultFuture.complete(Collections.singleton(input));
}
}异步IO读取Redis数据
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<String> lines = env.readTextFile("data/input/city.txt");
SingleOutputStreamOperator<String> result1 = AsyncDataStream.orderedWait(lines, new AsyncRedis(), 10, TimeUnit.SECONDS, 1);
SingleOutputStreamOperator<String> result2 = AsyncDataStream.orderedWait(lines, new AsyncRedisByVertx(), 10, TimeUnit.SECONDS, 1);
result1.print().setParallelism(1);
result2.print().setParallelism(1);
env.execute();
}
}
/**
* 使用异步的方式读取redis的数据
*/
class AsyncRedis extends RichAsyncFunction<String, String> {
//定义redis的连接池对象
private JedisPoolConfig config = null;
private static String ADDR = "localhost";
private static int PORT = 6379;
//等待可用连接的最大时间,单位是毫秒,默认是-1,表示永不超时,如果超过等待时间,则会抛出异常
private static int TIMEOUT = 10000;
//定义redis的连接池实例
private JedisPool jedisPool = null;
//定义连接池的核心对象
private Jedis jedis = null;
//初始化redis的连接
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//定义连接池对象属性配置
config = new JedisPoolConfig();
//初始化连接池对象
jedisPool = new JedisPool(config, ADDR, PORT, TIMEOUT);
//实例化连接对象(获取一个可用的连接)
jedis = jedisPool.getResource();
}
@Override
public void close() throws Exception {
super.close();
if(jedis.isConnected()){
jedis.close();
}
}
//异步调用redis
@Override
public void asyncInvoke(String input, ResultFuture<String> resultFuture) throws Exception {
System.out.println("input:"+input);
//发起一个异步请求,返回结果
CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
String[] arrayData = input.split(",");
String name = arrayData[1];
String value = jedis.hget("AsyncReadRedis", name);
System.out.println("output:"+value);
return value;
}
}).thenAccept((String dbResult)->{
//设置请求完成时的回调,将结果返回
resultFuture.complete(Collections.singleton(dbResult));
});
}
//连接超时的时候调用的方法,一般在该方法中输出连接超时的错误日志,如果不重新该方法,连接超时后会抛出异常
@Override
public void timeout(String input, ResultFuture<String> resultFuture) throws Exception {
System.out.println("redis connect timeout!");
}
}
/**
* 使用高性能异步组件vertx实现类似于连接池的功能,效率比连接池要高
* 1)在java版本中可以直接使用
* 2)如果在scala版本中使用的话,需要scala的版本是2.12+
*/
class AsyncRedisByVertx extends RichAsyncFunction<String,String> {
//用transient关键字标记的成员变量不参与序列化过程
private transient RedisClient redisClient;
//获取连接池的配置对象
private JedisPoolConfig config = null;
//获取连接池
JedisPool jedisPool = null;
//获取核心对象
Jedis jedis = null;
//Redis服务器IP
private static String ADDR = "localhost";
//Redis的端口号
private static int PORT = 6379;
//访问密码
private static String AUTH = "XXXXXX";
//等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException;
private static int TIMEOUT = 10000;
private static final Logger logger = LoggerFactory.getLogger(AsyncRedis.class);
//初始化连接
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
config = new JedisPoolConfig();
jedisPool = new JedisPool(config, ADDR, PORT, TIMEOUT);
jedis = jedisPool.getResource();
RedisOptions config = new RedisOptions();
config.setHost(ADDR);
config.setPort(PORT);
VertxOptions vo = new VertxOptions();
vo.setEventLoopPoolSize(10);
vo.setWorkerPoolSize(20);
Vertx vertx = Vertx.vertx(vo);
redisClient = RedisClient.create(vertx, config);
}
//数据异步调用
@Override
public void asyncInvoke(String input, ResultFuture<String> resultFuture) throws Exception {
System.out.println("input:"+input);
String[] split = input.split(",");
String name = split[1];
// 发起一个异步请求
redisClient.hget("AsyncReadRedis", name, res->{
if(res.succeeded()){
String result = res.result();
if(result== null){
resultFuture.complete(null);
return;
}
else {
// 设置请求完成时的回调: 将结果传递给 collector
resultFuture.complete(Collections.singleton(result));
}
}else if(res.failed()) {
resultFuture.complete(null);
return;
}
});
}
@Override
public void timeout(String input, ResultFuture resultFuture) throws Exception {
}
@Override
public void close() throws Exception {
super.close();
if (redisClient != null) {
redisClient.close(null);
}
}
边栏推荐
- 深入浅出零钱兑换问题——背包问题的套壳
- Application of time series database in the field of ship risk management
- v-model组件化编程应用
- 【Vue.js 3.0源码】KeepAlive 组件:如何让组件在内存中缓存和调度?
- canal抓取数据
- 【回归预测-lssvm分类】基于最小二乘支持向量机lssvm实现数据分类代码
- 还在说软件测试没有中年危机?9年测试工程师惨遭淘汰
- 华为再发「天才少年」召集令!曾放弃360万年薪的他也来首秀
- JUC常见的线程池源码学习 02 ( ThreadPoolExecutor 线程池 )
- 5G-based Warehousing Informatization Solution 2022
猜你喜欢

Eight years of testing experience, why was the leader criticized: the test documents you wrote are not as good as those of fresh graduates

打破原则引入SQL,MongoDB到底想要干啥?

深入浅出零钱兑换问题——背包问题的套壳

机房布线的至高境界,美到窒息
4位资深专家多年大厂经验分享出Flink技术内幕架构设计与实现原理

Flink real-time data warehouse completed

Flask框架——Sijax

去腾讯面试,直接让人出门左拐 :幂等性都不知道!

JVM性能调优

还在说软件测试没有中年危机?9年测试工程师惨遭淘汰
随机推荐
71-page comprehensive overall solution for global tourism 2021 ppt
Eclipse connects to SQL server database "recommended collection"
[Enlightenment by Opportunity-53]: "Sushu"-3- Self-cultivation and Self-cultivation
Flask Framework - Flask-Mail Mail
JSON common annotations
吃透Chisel语言.28.Chisel进阶之有限状态机(二)——Mealy状态机及与Moore状态机的对比
What is the relationship between the construction of smart cities and 5G technology in the new era
CS内网横向移动 模拟渗透实操 超详细
The main content of terrain analysis (the special effect level of the wandering earth)
Flask框架——Sijax
Web消息推送之SSE
This editor actually claims to be as fast as lightning!
canal抓取数据
关于MySQL主从复制的数据同步延迟问题
关于华为应用市场审核App无法启动的问题
Lock wait timeout exceeded解决方案
3 years of software testing experience, the interview requires a monthly salary of 22K, obviously he has memorized a lot of interview questions...
【回归预测-lssvm分类】基于最小二乘支持向量机lssvm实现数据分类代码
三电系统集成技术杂谈
Application of time series database in the field of ship risk management