Добро пожаловать в форум, Guest  >>   Войти | Регистрация | Поиск | Правила | В избранное | Подписаться
Все форумы / Java Новый топик    Ответить
 Подскажите по rxJava  [new]
bobo96
Member

Откуда:
Сообщений: 122
Всем привет, не могу разобраться с rxJava.
Элементарная (на первый взгляд) задача: пройтись по массиву с ip адресами и найти первый доступный.
Код:

        Subscription subscription = Observable
                .from(%ARRAY%)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onCompleted() {
                        Log.d("mylog", "onCompleted ");
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d("mylog", "onError " + e.getMessage());
                    }

                    @Override
                    public void onNext(String s) {
                        Log.d("mylog", "onNext " + s);

                        String[] serverData = s.split(":");
                        Socket socket = new Socket();

                        try {
                            socket.connect(new InetSocketAddress(
                                            InetAddress.getByName(serverData[0]),
                                            Integer.parseInt(serverData[1])), 2000);
                        } catch (IOException e) {
                        }
                    }
                });


В логах имеем:
D/mylog: onNext 192.168.0.103:7799
D/mylog: onError null

То есть доходит до первого недоступного и останавливается.
Интернеты говорят, что нужно вроде как использовать onExceptionResumeNext\onErrorResumeNext. Пробовал добавлять и .onExceptionResumeNext(Observable.from(%ARRAY%)) и .onErrorResumeNext(Observable.from(%ARRAY%)) и вместе - результат в логах такой же, как и без них.
Что я делаю не так ?

Сообщение было отредактировано: 9 июн 21, 11:51
9 июн 21, 11:52    [22333201]     Ответить | Цитировать Сообщить модератору
 Re: Подскажите по rxJava  [new]
chpasha
Member

Откуда:
Сообщений: 10838
я не специалист по rx от слова вообще, но по-моему оно в onError вообще не должно попадать по-хорошему, если ты все ошибки в onNext ловишь. Возможно у тебя там вываливается что-то окромя IOException, например в Integer.parseInt или прилетает null вместо строки или IndexOutOfBounds, если в адресах IP без порта. T.e. стремится нужно не к затыканию дыр в виде "как продолжить после ошибки", а к исправлению или учету этой конкретной ошибки

Кроме того из кода не видно, где тут "первый доступный"? Где остановка при успехе? По-хорошему проверка должна где-то в filter (или аналоге) происходить, а потом должно быть что-то типа findFirst. Но вот вопрос в том, как сделать в filter асинхронную проверку. Имхо RxJava тут либо вообще мимо кассы, либо нужно делать не так (как правильно на Rx я не знаю) - стартовать всю процедуру в отдельном потоке и уже в нем проверять каждый IP по очереди блокирующим запросом. На стримах я бы сделал так

@WorkerThread /*аннотация ничего не делает, только сигнализирует, что метод должен выполняться в фоне*/
Optional<String> findAvailable()
{
      Arrays.stream(%ARRAY%)
             .filter(/*здесь проверка из OnNext, возвращающая только рабочие IP*/)
             .findFirst()          
}


P. S. погуглил коротко, с rx нужно делать примерно так (правильный синтаксис может отличаться, но идея вроде понятна)

PublishSubject<Integer> stop = PublishSubject.create();

source
.takeUntil(stop)
.doOnNext(new Action1<Integer>() {
    int calls;
    @Override
    public void call(Integer t) {
        System.out.println("Saving " + t);
        if (++calls == 3) {
            stop.onNext(1);
        }
    }
})
.subscribe();
9 июн 21, 13:31    [22333275]     Ответить | Цитировать Сообщить модератору
 Re: Подскажите по rxJava  [new]
bobo96
Member

Откуда:
Сообщений: 122
chpasha,

Ок, будем вникать дальше, спасибо!
9 июн 21, 13:53    [22333287]     Ответить | Цитировать Сообщить модератору
 Re: Подскажите по rxJava  [new]
mayton
Member

Откуда: loopback
Сообщений: 51389
Возможно - долгий таймаут по недоступности. А добавь логгирование ошибок.

try {
                            socket.connect(new InetSocketAddress(
                                            InetAddress.getByName(serverData[0]),
                                            Integer.parseInt(serverData[1])), 2000);
                        } catch (IOException e) {
                            Log.error(.....);
                        }
9 июн 21, 13:54    [22333288]     Ответить | Цитировать Сообщить модератору
 Re: Подскажите по rxJava  [new]
Garrick
Member

Откуда: Москва
Сообщений: 3106
bobo96,

Самое первое - неправильно вот это

автор
 } catch (IOException e) {
 }


Поэтому ни фига и непонятно что не работает.
9 июн 21, 14:13    [22333311]     Ответить | Цитировать Сообщить модератору
 Re: Подскажите по rxJava  [new]
mayton
Member

Откуда: loopback
Сообщений: 51389
Возможно вот эта ловушка и не ловит никаких ошибок потому что не делается re-throw ошибк наверх.

                    public void onError(Throwable e) {
                        Log.d("mylog", "onError " + e.getMessage());
                    }
9 июн 21, 14:22    [22333323]     Ответить | Цитировать Сообщить модератору
 Re: Подскажите по rxJava  [new]
bobo96
Member

Откуда:
Сообщений: 122
Спасибо за ответы. В общем ситуация такая (сам я тоже с этой либой раньше не работал): изначально у меня была такая зависимость - implementation group: 'io.reactivex', name: 'rxjava', version: '1.3.8' (кодинг под андроид если что), потом, попав вроде как на офф. сайт проекта заимплементил другие либы - implementation 'io.reactivex.rxjava3:rxandroid:3.0.0', implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
С ними все ок, при ошибке остановки нет! Но проблема в другом: как остановиться, когда найден ip, который отвечает ?)))
Код немного другой, на всякий случай с зависимостями:

import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.Schedulers;

        Observable.fromArray(mKeeper.getConfig().getSERVERS())
                .subscribeOn(AndroidSchedulers.mainThread())
                .observeOn(Schedulers.newThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(@io.reactivex.rxjava3.annotations.NonNull Disposable d) {
                        Log.d("mylog", "onSubscribe ");
                    }

                    @Override
                    public void onNext(@io.reactivex.rxjava3.annotations.NonNull String s) {
                        String[] serverData = s.split(":");

                        Socket socket = new Socket();

                        try {
                            Log.d("mylog", "onNext " + s);
                            socket.connect(new InetSocketAddress(
                                            InetAddress.getByName(serverData[0]),
                                            Integer.parseInt(serverData[1])),
                                    mKeeper.getConfig().getSERVER_CONNECT_TIMEOUT());
                            Log.d("mylog", "Connect! ");
                        } catch (IOException e) {
                            e.printStackTrace();
                            Log.d("mylog", "catch " + e.getMessage());
                        } finally {
                            try {
                                socket.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }

                    @Override
                    public void onError(@io.reactivex.rxjava3.annotations.NonNull Throwable e) {
                        Log.d("mylog", "onError " + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.d("mylog", "onComplete ");
                    }
                });


Лог:
D/mylog: onSubscribe
D/mylog: onNext 192.168.43.21:7001
D/mylog: catch failed to connect to /192.168.43.21 (port 7001) from /192.168.43.1 (port 49872) after 2000ms: isConnected failed: ECONNREFUSED (Connection refused)
D/mylog: onNext 192.168.0.103:7001
D/mylog: catch failed to connect to /192.168.0.103 (port 7001) from /10.88.209.250 (port 45876) after 2000ms
D/mylog: onNext 85.12.240.55:7001
D/mylog: catch failed to connect to /85.12.xxx.xxx (port 7001) from /10.88.209.250 (port 39630) after 2000ms
D/mylog: onNext 109.195.107.114:7001
D/mylog: catch failed to connect to /109.195.xxx.xxx (port 7001) from /10.88.209.250 (port 42966) after 1999ms: isConnected failed: ECONNREFUSED (Connection refused)
D/mylog: onComplete
10 июн 21, 07:22    [22333565]     Ответить | Цитировать Сообщить модератору
 Re: Подскажите по rxJava  [new]
mayton
Member

Откуда: loopback
Сообщений: 51389
А "реактивность" вообще предполагает такое понятие как остановиться?
10 июн 21, 11:26    [22333651]     Ответить | Цитировать Сообщить модератору
 Re: Подскажите по rxJava  [new]
bobo96
Member

Откуда:
Сообщений: 122
mayton, без понятия, но логично предположить, что такая возможность должна быть.
10 июн 21, 12:54    [22333723]     Ответить | Цитировать Сообщить модератору
 Re: Подскажите по rxJava  [new]
mayton
Member

Откуда: loopback
Сообщений: 51389
Можно вызвать System.exit(). Это прервёт java-процесс и процесс операционки. Но можно просто
посмотреть в сторону отказа от реактивности как от шаблона подобных разработок. Я не имею
ничего против Р. но очевидно что данная задача имеет строго выраженое начало и конец. Массив - конечен.
10 июн 21, 12:59    [22333730]     Ответить | Цитировать Сообщить модератору
Все форумы / Java Ответить