RxJava的简单使用

RxJava已经火了一年多了,作为一个安卓开发再不去接触的话自己真的对不起自己的这份工作啊。在github中Rxjava是这样介绍的:RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.翻译过来意思指的是rxjava是java虚拟机实现:无扩展为组成的异步事件通过观测序列的基础程序库。从字面上看我就了解过异步与观察。所以我就从异步与观察入手慢慢的学习吧。

RxJava的异步实现是通过观察者模式实现的

其中主要实现过程:Observer(观察者)–>subscribe(订阅)–>Event(Observable某个状态),其中连接两者的桥梁便是subscribe订阅。通过订阅事件便可以及时收到事件改变的通知。在被观察者出现事件的某个状态的时候观察者通过订阅便会第一时间收到通知。

RxJava常用操作符的简单使用

just(创建操作):将一个或多个对象转换成发射这个或这些对象的一个Observable
Observable.just("1", "2", "3")
.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
Log.e(TAG, "complete");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, e.getMessage());
}
@Override
public void onNext(String s) {
Log.e(TAG, s);
}
});
//输出结果
1
2
3
complete
from(创建操作):将一个Iterable, 一个Future, 或者一个数组转换成一个Observable
Observable.from(new String[]{"1", "2", "3"})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e(TAG, s);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.e(TAG, throwable.getMessage());
}
}, new Action0() {
@Override
public void call() {
Log.e(TAG, "complete");
}
});
//输出结果
1
2
3
complete
create(创建操作):使用一个函数从头开始创建一个Observable
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
for (int i = 0; i < 3; i++) {
subscriber.onNext(String.valueOf(i));
}
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
}
}).subscribe(new Observer<String>() {
@Override
public void onCompleted() {
Log.e(TAG, "complete");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, e.getMessage());
}
@Override
public void onNext(String s) {
Log.e(TAG, s);
}
});
//输出结果
0
1
2
complete
timer(创建操作):创建在一个指定的延迟之后发射单个数据的Observable
Observable
.timer(2, TimeUnit.SECONDS, AndroidSchedulers.mainThread())
.map(new Func1<Long, Object>() {
@Override
public Object call(Long aLong) {
Log.e(TAG,"is time");//do something you want
return null;
}
}).subscribe();
interval(创建操作):创建一个定时发射整数序列的Observable
//创建一个Subscriber,10s后停止订阅
Subscriber<Long> mySubscriber = new Subscriber<Long>() {
@Override
public void onNext(Long s) {
Log.e(TAG, String.valueOf(s));
if (s.equals(10L))
mySubscriber.unsubscribe();
}
@Override
public void onCompleted() { }
@Override
public void onError(Throwable e) { }
};
Observable
.interval(0,1, TimeUnit.SECONDS)
.subscribe(mySubscriber);
map(变换操作):将一个或多个项转变为其他类型的数据类型
//通过imageview的((BitmapDrawable) drawable).getBitmap()方法,将其获取的bitmap映射出去
Observable
.just(imageView.getDrawable())
.map(new Func1<Drawable, Bitmap>() {
@Override
public Bitmap call(Drawable drawable) {
return ((BitmapDrawable) drawable).getBitmap();
}
})
.subscribe(new Observer<Bitmap>() {
@Override
public void onCompleted() {
Log.e(TAG,"complete");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Bitmap bitmap) {
Log.e(TAG, String.valueOf(bitmap));
}
});
flatmap(变换操作):flatMap将Observable的数据转换成一个或多个Observable
//打印出每个学生所需要修的所有课程的名称
List<Student> students = new ArrayList<>();
for (int i = 0; i < 3; i++) {
List<Course> courses = new ArrayList<>();
courses.add(new Course("c" + String.valueOf(i)));
courses.add(new Course("c" + String.valueOf(i+1)));
students.add(new Student("s" + String.valueOf(i), courses));
}
Observable
.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.courses);
}
})
.subscribe(new Observer<Course>() {
@Override
public void onCompleted() {
Log.e(TAG, "complete");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Course course) {
Log.e(TAG, course.name);
}
});
//输出结果
c0
c1
c1
c2
c2
c3
complete
filter(过滤操作)
//打印出每个学生所需要修的所有课程的名称,并过滤c2的课程
List<Student> students = new ArrayList<>();
for (int i = 0; i < 3; i++) {
List<Course> courses = new ArrayList<>();
courses.add(new Course("c" + String.valueOf(i)));
courses.add(new Course("c" + String.valueOf(i+1)));
students.add(new Student("s" + String.valueOf(i), courses));
}
Observable
.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.courses);
}
})
.subscribe(new Observer<Course>() {
@Override
public void onCompleted() {
Log.e(TAG, "complete");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Course course) {
Log.e(TAG, course.name);
}
});
//输出结果
c0
c1
c1
c3
complete
doOnSubscribe(辅助操作)
Observable
.create(new Observable.OnSubscribe<Boolean>() {
@Override
public void call(final Subscriber<? super Boolean> subscriber) {
Observable
.timer(3, TimeUnit.SECONDS)
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
subscriber.onNext(true);
}
});
}
})
.subscribeOn(Schedulers.io())// 指定subscribeOn在IO线程
.doOnSubscribe(new Action0() {
@Override
public void call() {
progressBar.setVisibility(View.VISIBLE);
}
})
.subscribeOn(AndroidSchedulers.mainThread())//让上面的doOnSubscribe在主线程执行
.observeOn(AndroidSchedulers.mainThread())//指定在主线成观察
.subscribe(new Action1<Boolean>() {
@Override
public void call(Boolean aBoolean) {
if (aBoolean)
progressBar.setVisibility(View.GONE);
}
});

RxJava的线程控制器

  • 当前线程:Schedulers.immediate()
  • 新线程:Schedulers.newThread()
  • IO线程:Schedulers.io()
  • UI线程:AndroidSchedulers.mainThread()
Observable
.just(1,2,3)
.subscribeOn(Schedulers.io())// 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread())// 指定回调 发生在 UI 线程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e(TAG, String.valueOf(integer));
}
});

RxJava实践

现在有这么一个需求:用户选择图片之后将图片裁剪成正方形,及时裁剪图片正中央的那一块,然后将路径回传给上一个页面。实现更换头像功能。在以前我们可以通过AsyncTask处理类似这样的需求,但是RxJava也是异步而且使用方法简单。我们就使用RxJava实现这个功能:首先我们获取图片的绝对路径通过转换为Bitmap再转换成绝对路径。中间过程在UI线程有个进度提示。下面是具体的主要代码实现。其中也有注释。有些方法你可以自己去实现就不贴代码了。

Observable
.just(avatarPath)//创建
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String s) {
return BitmapFactory.decodeFile(s);//将
}
})//将图片路径变换位Bitmap
.subscribeOn(Schedulers.io())//指定subscribeOn在IO线程
.doOnSubscribe(new Action0() {
@Override
public void call() {
showProressDialog();//进度提示
}
})//辅助操作
.observeOn(AndroidSchedulers.mainThread())//UI线程观察
.map(new Func1<Bitmap, String>() {
@Override
public String call(Bitmap bitmap) {
return BitmapUtils.centerSquareScaleBitmap(bitmap, mScreenWidth);//裁剪功能并返回裁剪后的图片路径
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
//回传绝对路径
hideProgressDialog();
Bundle bundle = new Bundle();
bundle.putBoolean(FROM_PHOTO, true);
bundle.putString(PATH, s);
backWithResultThenKill(bundle);
}
});

注意:
添加依赖:
compile ‘io.reactivex:rxandroid:1.2.1’
compile ‘io.reactivex:rxjava:1.1.6’

由于Rxjava功能太过强大了,以至于太多的功能不能一一列出来。后续补充。

坚持原创技术分享,您的支持将鼓励我继续创作!