当前位置:网站首页>Fink异步IO的实战(关联维表)
Fink异步IO的实战(关联维表)
2022-07-30 14:29:00 【顶尖高手养成计划】
简介
异步io实战
知识前提
线程池异步io
应用程序
public class ASyncIODemo {
public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.Source
//DataStreamSource[1,2,3,4,5]
DataStreamSource<CategoryInfo> categoryDS = env.addSource(new RichSourceFunction<CategoryInfo>() {
private Boolean flag = true;
@Override
public void run(SourceContext<CategoryInfo> ctx) throws Exception {
Integer[] ids = {1, 2, 3, 4, 5};
for (Integer id : ids) {
ctx.collect(new CategoryInfo(id, null));
}
}
@Override
public void cancel() {
this.flag = false;
}
});
//方式一:线程池模拟异步IO
//unorderedWait无序等待
SingleOutputStreamOperator<CategoryInfo> result2 = AsyncDataStream
.unorderedWait(categoryDS, new ASyncIOFunction2(), 1000, TimeUnit.SECONDS, 10);
//打印结果
result2.print();
env.execute();
}
}
/**
* 同步调用+线程池模拟异步IO
*/
class ASyncIOFunction2 extends RichAsyncFunction<CategoryInfo, CategoryInfo> {
private HashMap<Integer,String> dic_name=new HashMap<>();
private ExecutorService executorService;//线程池
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
dic_name.put(1,"手机");
dic_name.put(2,"电脑");
dic_name.put(3,"服装");
executorService = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
//异步发送请求
@Override
public void asyncInvoke(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception {
executorService.execute(new Runnable() {
@Override
public void run() {
//TODO 这里查询数据库
String resName = dic_name.get(input.getId());
input.setName(resName);
resultFuture.complete(Collections.singletonList(input));
}
});
}
@Override
public void close() throws Exception {
}
@Override
public void timeout(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception {
System.out.println("async call time out!");
input.setName("未知");
resultFuture.complete(Collections.singleton(input));
}
}输出结果
1> CategoryInfo(id=4, name=null)
2> CategoryInfo(id=5, name=null)
15> CategoryInfo(id=2, name=电脑)
14> CategoryInfo(id=1, name=手机)
16> CategoryInfo(id=3, name=服装)Java-vertx中提供的异步client实现异步IO
class ASyncIOFunction1 extends RichAsyncFunction<CategoryInfo, CategoryInfo> {
private transient SQLClient mySQLClient;
@Override
public void open(Configuration parameters) throws Exception {
JsonObject mySQLClientConfig = new JsonObject();
mySQLClientConfig
.put("driver_class", "com.mysql.jdbc.Driver")
.put("url", "jdbc:mysql://localhost:3306/bigdata")
.put("user", "root")
.put("password", "root")
.put("max_pool_size", 20);
VertxOptions options = new VertxOptions();
options.setEventLoopPoolSize(10);
options.setWorkerPoolSize(20);
Vertx vertx = Vertx.vertx(options);
//根据上面的配置参数获取异步请求客户端
mySQLClient = JDBCClient.createNonShared(vertx, mySQLClientConfig);
}
//使用异步客户端发送异步请求
@Override
public void asyncInvoke(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception {
mySQLClient.getConnection(new Handler<AsyncResult<SQLConnection>>() {
@Override
public void handle(AsyncResult<SQLConnection> sqlConnectionAsyncResult) {
if (sqlConnectionAsyncResult.failed()) {
return;
}
SQLConnection connection = sqlConnectionAsyncResult.result();
connection.query("select id,name from t_category where id = " +input.getId(), new Handler<AsyncResult<ResultSet>>() {
@Override
public void handle(AsyncResult<io.vertx.ext.sql.ResultSet> resultSetAsyncResult) {
if (resultSetAsyncResult.succeeded()) {
List<JsonObject> rows = resultSetAsyncResult.result().getRows();
for (JsonObject jsonObject : rows) {
CategoryInfo categoryInfo = new CategoryInfo(jsonObject.getInteger("id"), jsonObject.getString("name"));
resultFuture.complete(Collections.singletonList(categoryInfo));
}
}
}
});
}
});
}
@Override
public void close() throws Exception {
mySQLClient.close();
}
@Override
public void timeout(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception {
System.out.println("async call time out!");
input.setName("未知");
resultFuture.complete(Collections.singleton(input));
}
}异步IO读取Redis数据
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<String> lines = env.readTextFile("data/input/city.txt");
SingleOutputStreamOperator<String> result1 = AsyncDataStream.orderedWait(lines, new AsyncRedis(), 10, TimeUnit.SECONDS, 1);
SingleOutputStreamOperator<String> result2 = AsyncDataStream.orderedWait(lines, new AsyncRedisByVertx(), 10, TimeUnit.SECONDS, 1);
result1.print().setParallelism(1);
result2.print().setParallelism(1);
env.execute();
}
}
/**
* 使用异步的方式读取redis的数据
*/
class AsyncRedis extends RichAsyncFunction<String, String> {
//定义redis的连接池对象
private JedisPoolConfig config = null;
private static String ADDR = "localhost";
private static int PORT = 6379;
//等待可用连接的最大时间,单位是毫秒,默认是-1,表示永不超时,如果超过等待时间,则会抛出异常
private static int TIMEOUT = 10000;
//定义redis的连接池实例
private JedisPool jedisPool = null;
//定义连接池的核心对象
private Jedis jedis = null;
//初始化redis的连接
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//定义连接池对象属性配置
config = new JedisPoolConfig();
//初始化连接池对象
jedisPool = new JedisPool(config, ADDR, PORT, TIMEOUT);
//实例化连接对象(获取一个可用的连接)
jedis = jedisPool.getResource();
}
@Override
public void close() throws Exception {
super.close();
if(jedis.isConnected()){
jedis.close();
}
}
//异步调用redis
@Override
public void asyncInvoke(String input, ResultFuture<String> resultFuture) throws Exception {
System.out.println("input:"+input);
//发起一个异步请求,返回结果
CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
String[] arrayData = input.split(",");
String name = arrayData[1];
String value = jedis.hget("AsyncReadRedis", name);
System.out.println("output:"+value);
return value;
}
}).thenAccept((String dbResult)->{
//设置请求完成时的回调,将结果返回
resultFuture.complete(Collections.singleton(dbResult));
});
}
//连接超时的时候调用的方法,一般在该方法中输出连接超时的错误日志,如果不重新该方法,连接超时后会抛出异常
@Override
public void timeout(String input, ResultFuture<String> resultFuture) throws Exception {
System.out.println("redis connect timeout!");
}
}
/**
* 使用高性能异步组件vertx实现类似于连接池的功能,效率比连接池要高
* 1)在java版本中可以直接使用
* 2)如果在scala版本中使用的话,需要scala的版本是2.12+
*/
class AsyncRedisByVertx extends RichAsyncFunction<String,String> {
//用transient关键字标记的成员变量不参与序列化过程
private transient RedisClient redisClient;
//获取连接池的配置对象
private JedisPoolConfig config = null;
//获取连接池
JedisPool jedisPool = null;
//获取核心对象
Jedis jedis = null;
//Redis服务器IP
private static String ADDR = "localhost";
//Redis的端口号
private static int PORT = 6379;
//访问密码
private static String AUTH = "XXXXXX";
//等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException;
private static int TIMEOUT = 10000;
private static final Logger logger = LoggerFactory.getLogger(AsyncRedis.class);
//初始化连接
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
config = new JedisPoolConfig();
jedisPool = new JedisPool(config, ADDR, PORT, TIMEOUT);
jedis = jedisPool.getResource();
RedisOptions config = new RedisOptions();
config.setHost(ADDR);
config.setPort(PORT);
VertxOptions vo = new VertxOptions();
vo.setEventLoopPoolSize(10);
vo.setWorkerPoolSize(20);
Vertx vertx = Vertx.vertx(vo);
redisClient = RedisClient.create(vertx, config);
}
//数据异步调用
@Override
public void asyncInvoke(String input, ResultFuture<String> resultFuture) throws Exception {
System.out.println("input:"+input);
String[] split = input.split(",");
String name = split[1];
// 发起一个异步请求
redisClient.hget("AsyncReadRedis", name, res->{
if(res.succeeded()){
String result = res.result();
if(result== null){
resultFuture.complete(null);
return;
}
else {
// 设置请求完成时的回调: 将结果传递给 collector
resultFuture.complete(Collections.singleton(result));
}
}else if(res.failed()) {
resultFuture.complete(null);
return;
}
});
}
@Override
public void timeout(String input, ResultFuture resultFuture) throws Exception {
}
@Override
public void close() throws Exception {
super.close();
if (redisClient != null) {
redisClient.close(null);
}
}
边栏推荐
猜你喜欢

5. DOM

00 testers of seasoning after nearly a year, whether to change careers or to learn the software testing students summarized the following heart advice

我为何从开发人员转做测试,3年软件测试工程师,带你聊聊这其中的秘辛

Mac 中 MySQL 的安装与卸载

Still saying software testing doesn't have a midlife crisis?9 years of test engineers were eliminated

手把手教你写让人眼前一亮的软件测试简历,收不到面试邀请算我输
4位资深专家多年大厂经验分享出Flink技术内幕架构设计与实现原理

CVE-2022-33891 Apache Spark 命令注入复现

浅析显卡市场的未来走向:现在可以抄底了吗?

国内数字藏品的乱象与未来
随机推荐
【Vue.js 3.0源码】KeepAlive 组件:如何让组件在内存中缓存和调度?
Flask框架——Flask-SQLite数据库
5. DOM
ECCV 2022 | Towards Data Efficient Transformer Object Detectors
桌面软件开发框架大赏
浅析显卡市场的未来走向:现在可以抄底了吗?
[机缘参悟-53]:《素书》-3-修身养志[求人之志章第三]
2022年,目前大环境下还适合转行软件测试吗?
Desktop Software Development Framework Awards
开始学习C语言了
BI-SQL丨WHILE
DDS Arbitrary Waveform Output Based on FPGA
CS内网横向移动 模拟渗透实操 超详细
Why do software testing have to learn automation?Talk about the value of automated testing in my eyes
惊艳!京东T8纯手码的Redis核心原理手册,基础与源码齐下
00 testers of seasoning after nearly a year, whether to change careers or to learn the software testing students summarized the following heart advice
网站添加能换装可互动的live 2d看板娘
Metaverse Post Office AI space-themed series of digital collections will be launched at 10:00 on July 30th "Yuanyou Digital Collection"
A new generation of open source free terminal tools, so cool
基于5G的仓储信息化解决方案2022