Add channel class and task method for parallel processing in mruby #1336

Merged
merged 41 commits into from
Nov 30, 2017

naritta
Contributor
@naritta naritta commented Jun 22, 2017

This PR is for #1329

TODO:

  • add queue for when ch.push is called before ch.shift
  • add tests
  • use script to create embedded code

Member
@kazuho kazuho left a comment

Thank you for starting your work on the issue.

I am impressed by the fact that you have already succeeded in extending the asynchronous operations that we make. Nice work!

I have left a design comment in-line, please let me know what you think (it's a rough idea and I might be wrong).

@@ -0,0 +1,124 @@
/*
* Copyright (c) 2015-2016 DeNA Co., Ltd., Kazuho Oku
Member

Please change the name to yours, and adjust the copyright year to the year when you wrote or modified the code (in this case, 2017).

I would like to have your name printed on the source code (as well as on the README if your work gets merged).

Contributor Author

I appreciate your kindness.
I fixed it here: 60f1d0e

@@ -177,10 +182,18 @@ mrb_value h2o_mruby_http_join_response_callback(h2o_mruby_context_t *ctx, mrb_va
int *next_action);
mrb_value h2o_mruby_http_fetch_chunk_callback(h2o_mruby_context_t *ctx, mrb_value receiver, mrb_value input,
int *next_action);
mrb_value h2o_mruby_channel_shift_callback(h2o_mruby_context_t *ctx, mrb_value receiver, mrb_value input,
int *next_action);
Member

How about retaining the queue in mruby side, while restricting use of the C code to when signalling between fibers is necessary?

I believe that doing so would simplify the code and will give us more flexibility in extending the object (for example, it would be easier to add a peek method).

class Channel
  def initialize()
    @queue = []
  end
  def push(o)
    @queue << o
    self._notify
  end
  def shift
    while true
      if !@queue.empty?
        return @queue.shift
      end
      self._wait
    end
  end
  # _notify and _wait are C methods that are called via fiber switching
end
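To make the proposed split concrete, here is a minimal sketch in plain CRuby, assuming ordinary Fibers stand in for h2o's C-level fiber switching. `SketchChannel`, its private `_wait`/`_notify` helpers, and `peek` are hypothetical illustrations, not h2o's API; as an assumption, only one fiber may wait at a time and a second waiter raises.

```ruby
require 'fiber' # Fiber.current / Fiber#alive? on older Rubies

# Hypothetical sketch: the queue lives in Ruby, and only the wait/notify
# signalling (done in C in h2o) is modelled here with plain CRuby Fibers.
class SketchChannel
  def initialize
    @queue = []
    @waiter = nil # at most one fiber blocked in #shift
  end

  def push(obj)
    @queue << obj
    _notify
    self
  end

  # Returns the oldest item, suspending the calling fiber while the queue is empty.
  def shift
    while true
      return @queue.shift unless @queue.empty?
      _wait
    end
  end

  # Keeping the queue in Ruby makes extensions such as peek trivial.
  def peek
    @queue.first
  end

  private

  # Stand-in for the C-side _wait: park the current fiber until notified.
  def _wait
    raise 'another fiber is already waiting' if @waiter
    @waiter = Fiber.current
    Fiber.yield
  end

  # Stand-in for the C-side _notify: resume the parked fiber, if any.
  def _notify
    waiter, @waiter = @waiter, nil
    waiter.resume if waiter
  end
end

ch = SketchChannel.new
got = []
consumer = Fiber.new { got << ch.shift; got << ch.shift }
consumer.resume # queue empty: consumer parks inside shift
ch.push(:a)     # wakes consumer, which takes :a and parks again
ch.push(:b)     # consumer takes :b and finishes
```

Calling `shift` from the root fiber on an empty queue is not supported in this sketch, because there is nothing to yield back to.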

Contributor Author

I agree with this idea, because handling the queue in C is a little complex.
So I changed it as follows (I'll add an .rb file and use a script to generate the embedded code soon):
064a44b

・For _h2o__channel_wait: it seems we can't call Ruby code that yields the fiber from C, so I made it call _h2o__channel_wait directly.
・For initializing the Channel object: there may be a better or simpler way, and I'm happy to change it.

@naritta
Contributor Author
naritta commented Jun 27, 2017

@kazuho Thank you for the comments. I'm really grateful to be involved with such a great product.

I added the task method in b4b6ede; it works as follows.

・When task is called with a block, the block runs first.

・Inside the block, when an async function is called, the fiber yields in the task method, which passes the resumer and the object to the context. The application then continues running after the task call until it calls an async function itself, outside of task.

・When the event fires, the fiber is resumed from mruby_run_fiber as usual, and its output is returned to mruby_run_fiber. (From the second async call in the block onward, register_receiver doesn't need to be called, because the fiber is already driven by mruby_run_fiber as usual.)

・When the fiber ends, it returns 0 to mruby_run_fiber, which finishes running it.

It can probably be improved, so please let me know if you notice anything. I'm happy to fix it.

@naritta naritta changed the title [WIP] Add http request callback function and channel class for parallel http response processing Add http request callback function and channel class for parallel http response processing Jul 1, 2017
@naritta naritta changed the title Add http request callback function and channel class for parallel http response processing Add channel class and task method for parallel processing in mruby Jul 11, 2017
@naritta
Contributor Author
naritta commented Jul 11, 2017

@kazuho @i110
I've implemented the Channel class and the task method.
Could you review it when you have time?

@i110
Contributor
i110 commented Jul 17, 2017

@naritta Sorry for the delay. I'll review this on Wednesday.

@naritta
Contributor Author
naritta commented Jul 31, 2017

@i110 @kazuho
Sorry to bother you when you are so busy, but I would be grateful if you could review this, because I want to use h2o with this feature in my company's product.

@tagomoris
Contributor
tagomoris commented Aug 18, 2017

Is there any progress on this change?

@i110
Contributor
i110 commented Sep 5, 2017

@naritta @tagomoris Sorry for my long, long absence! I reviewed this and added some inline comments.
By the way, when the discussion gets complicated, please feel free to use Japanese to clarify it.

@naritta
Contributor Author
naritta commented Sep 5, 2017

@i110 I can't see your review comments anywhere. Where can I see them?

/* handler/mruby/channel.c */
void h2o_mruby_channel_init_context(h2o_mruby_shared_context_t *ctx);

mrb_value h2o_mruby_channel_shift_callback(h2o_mruby_context_t *ctx, mrb_value receiver, mrb_value input,
Contributor

duplicated definition with L185?

Contributor Author

I deleted the duplicated definition.
792f3f1

return mrb_nil_value();
}

static mrb_value create_channel_method(mrb_state *mrb, mrb_value self)
Contributor

How about defining a normal initialize method and moving this code into it? Then you can simply call H2O::Channel.new instead of defining and calling an _init method. If you still want a handy create_channel method under Kernel, you can define it using H2O::Channel.new (but IMO it's not that useful).
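A minimal Ruby sketch of this suggestion, assuming a hypothetical `SketchH2O::Channel` class (not h2o's real `H2O::Channel`): all setup happens in `initialize`, so `.new` works directly, and the optional Kernel helper is only a thin wrapper around it.

```ruby
# Hypothetical sketch: setup lives in a normal initialize method, so callers
# can use Channel.new directly instead of a separate _init call.
module SketchH2O
  class Channel
    attr_reader :queue

    def initialize
      @queue = [] # state that a separate _init call would otherwise set up
    end
  end
end

# Optional convenience helper under Kernel, defined in terms of .new.
module Kernel
  def create_channel
    SketchH2O::Channel.new
  end
end

a = SketchH2O::Channel.new
b = create_channel
```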

Contributor Author

I changed it to define a normal initialize method for initializing the channel.
4c17ebd

h2o_mruby_run_fiber(ctx->ctx, detach_receiver(ctx), mrb_nil_value(), NULL);
h2o_mruby_shared_context_t *shared_ctx = mrb->ud;
/* When it's called in task, retrieve current_context for next action in task */
shared_ctx->current_context = ctx->ctx;
Contributor

Could you elaborate on why this is needed? I haven't understood its purpose clearly, but it seems wrong to me.

Contributor Author
@naritta naritta Oct 12, 2017

When creating a task like this:
task { http_request(req_url).join; ch.push "hello"; http_request(req_url).join; }

channel_notify_method calls h2o_mruby_run_fiber, which sets current_context to NULL.
After h2o_mruby_run_fiber returns, processing of the task block continues, but current_context is still NULL, so http_request_method cannot get current_context.

Another way would be to not set current_context to NULL when h2o_mruby_run_fiber is called by channel_notify_method.
If there is a better way, please tell me and I'll fix it.

Contributor
@i110 i110 Oct 21, 2017

The most straightforward solution might be doing like this in h2o_mruby_run_fiber:

h2o_mruby_context_t *old = ctx->shared->current_context;
ctx->shared->current_context = ctx;
/* ... */
ctx->shared->current_context = old;

But in my opinion, it's better to eliminate the nested h2o_mruby_run_fiber call. How about using yield to switch fibers, and using the run loop in h2o_mruby_run_fiber instead of nested calls?
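A Ruby model of the save/restore pattern above, assuming a single mutable `current_context` slot. The `SharedContext` struct and the `run_with_context` helper are hypothetical stand-ins for the C code in `h2o_mruby_run_fiber`; using `ensure` also restores the slot when the inner work raises.

```ruby
# Hypothetical model of h2o's shared context with a single current_context slot.
SharedContext = Struct.new(:current_context)

# Install ctx as the current context for the duration of the block and
# restore the previous value afterwards, even if the block raises, so a
# nested run cannot leave the caller with a nil (NULL) context.
def run_with_context(shared, ctx)
  old = shared.current_context
  shared.current_context = ctx
  yield
ensure
  shared.current_context = old
end

shared = SharedContext.new(nil)
seen = []
run_with_context(shared, :task_ctx) do
  run_with_context(shared, :notify_ctx) { seen << shared.current_context }
  seen << shared.current_context # still :task_ctx after the nested run
end
```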

Contributor Author

I tried using yield to switch fibers with the run loop in h2o_mruby_run_fiber, but mruby raises:
mruby raised: (eval):5: can't cross C function boundary (FiberError)
It seems impossible to yield across an mruby -> C -> mruby call chain.
If my understanding is incorrect, please tell me.
Otherwise, I'll switch to the straightforward solution above.

if !@queue.empty?
return @queue.shift
end
_h2o__channel_wait(self)
Contributor
@i110 i110 Sep 5, 2017

Sorry for the very trivial point, but I feel it's clearer and safer to remove the while loop.
After the fiber is resumed, @queue is guaranteed not to be empty, so how about the following code?

def shift
  if @queue.empty?
    _h2o__channel_wait(self)
  end
  raise 'oops' if @queue.empty?
  @queue.shift
end

Contributor Author

I removed the needless loop.
82fa91d

block.call
# For when it's called in h2o_mruby_run_fiber and return output,
# or block doesn't have asynchronous callback
Fiber.yield([0, nil, nil])
Contributor

How about simply returning [0, nil, nil] instead of yielding it?

Contributor Author

I changed it to simply return [0, nil, nil].
ea711f5
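For reference, plain CRuby Fibers illustrate the difference: both forms hand `[0, nil, nil]` to the resumer, but only the `Fiber.yield` version leaves a suspended fiber behind. This is a generic Ruby illustration, not h2o's embedded code.

```ruby
require 'fiber' # Fiber#alive? on older Rubies

# The value a fiber's block returns is handed to the resumer, just like a
# Fiber.yield argument, but the fiber then terminates instead of staying suspended.
with_yield  = Fiber.new { Fiber.yield([0, nil, nil]) }
with_return = Fiber.new { [0, nil, nil] }

yielded  = with_yield.resume  # fiber stays suspended after handing back the value
returned = with_return.resume # fiber finishes after handing back the value
```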

klass = fiber_res[2][0]
# This should be called only one time.
# After that, the fiber is called in mruby_run_fiber and it register receiver in it.
klass.register_receiver(receiver, klass)
Contributor
@i110 i110 Sep 5, 2017

Please let me ask some questions around here:

  • What does klass mean? For example, in channel usage, fiber_res[2][0] seems to be an instance of H2O::Channel, not a class.
  • Is it guaranteed that klass.respond_to?(:register_receiver) is true? Many asynchronous functions can be called in a task block, and HTTP response body streaming is one example. So if I understand correctly, http_request(..).join[2].join fails (sorry, I haven't tried it).
  • When the block yields (calls a defined callback method), there may be extra args (i.e. fiber_res[2][1], fiber_res[2][2], ...). How should those be handled?

Contributor Author

What does klass mean? For example, in channel usage, fiber_res[2][0] seems to be H2O::Channel's instance, not class.

I think klass is used as a name for instances in Ruby usage. If another name is better, I'll change it.
https://stackoverflow.com/questions/4299289/what-is-the-difference-between-class-and-klass-in-ruby

Contributor Author

Is it guaranteed that the klass object respond_to? :register_receiver ? I mean that many asynchronous functions can be called in task block, and HTTP response body streaming is one of the examples. So if I understand correctly, http_request(..).join[2].join fails (but sorry I haven't tried)

It's not guaranteed: we must define register_receiver whenever we create a new class with an async function. (I forgot to add it for HttpInputStream, so I added it here.)
aa53e90

Contributor Author

When the block yields (calls defined callback method), there may be some args (i.e. fiber_res[2][1], fiber_res[2][2]..) How to handle that?

Sorry, I realized that h2o_mruby_http_fetch_chunk_callback now uses args other than self in the new code.
How about this idea?

  1. move the parts that use args into a new function with a reserved name, such as process_args(args)
  2. if an async function receives values in args other than self (index 0), call process_args(args)
  3. do the same in the task function by calling process_args(args)

For example, in the case of _h2o__http_fetch_chunk:

  1. move this (https://github.com/h2o/h2o/blob/master/lib/handler/mruby/http_request.c#L463-L471) from h2o_mruby_http_fetch_chunk_callback into a new function process_args, and call process_args
  2. in the task function, call the commonly named process_args function if it exists.

This way, we don't have to add branches to the task function as in mruby_run_fiber, because every class uses the same function name; but we have to be careful whenever we add a function that uses args.
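A hypothetical Ruby sketch of the proposed `process_args` convention, to show how the generic side could dispatch without per-class branches. `SketchChunkStream` and `dispatch_callback` are illustrative names only, not h2o code.

```ruby
# Hypothetical sketch: a class whose async callback needs extra args defines
# process_args, and the generic task-side code calls it by name instead of
# branching per callback type.
class SketchChunkStream
  attr_reader :chunks

  def initialize
    @chunks = []
  end

  # args[0] is the receiver itself; the rest are callback-specific values.
  def process_args(args)
    @chunks.concat(args.drop(1))
  end
end

# Generic dispatch shared by every async class.
def dispatch_callback(args)
  obj = args[0]
  obj.process_args(args) if args.length > 1 && obj.respond_to?(:process_args)
  obj
end

stream = SketchChunkStream.new
dispatch_callback([stream, 'chunk-1'])
dispatch_callback([stream]) # nothing beyond self: process_args is skipped
```

The trade-off named in the comment shows up here too: dispatch stays generic, but every new class that receives extra args must remember to define `process_args`.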

struct st_h2o_mruby_http_request_context_t *ctx;

mrb_value receiver;
mrb_value self_obj;
Contributor

Is there some reason you can't use self?

Contributor Author

That was my mistake; I changed it to use self.
6316078

@i110
Contributor
i110 commented Sep 5, 2017

@naritta Sorry I forgot to submit reviews.

@naritta
Contributor Author
naritta commented Sep 5, 2017

Thank you for reviewing, I'll fix it.

@i110 i110 mentioned this pull request Sep 7, 2017
@naritta
Contributor Author
naritta commented Oct 12, 2017

@i110 Thank you for reviewing, and sorry for the delay; I fixed them.
Could you review it when you have time?

@i110
Contributor
i110 commented Oct 21, 2017

@naritta Thank you for working on this.

Regarding the current design, I feel it's quite tedious to define a register_receiver function for every asynchronous feature (like redis, sleep, etc.), but otherwise the feature can't be used in the task method. It is also an implicit convention that implementors must be aware of.

How about doing it like this? I think this is much simpler than the current implementation, because everything the task feature requires can be done by this small patch, without introducing any new C functions or conventions!

diff --git a/include/h2o/mruby_.h b/include/h2o/mruby_.h
index 37fa4ada..d045faef 100644
--- a/include/h2o/mruby_.h
+++ b/include/h2o/mruby_.h
@@ -128,6 +128,7 @@ typedef struct st_h2o_mruby_generator_t {
 #define H2O_MRUBY_CALLBACK_ID_HTTP_FETCH_CHUNK -6
 #define H2O_MRUBY_CALLBACK_ID_REDIS_JOIN_REPLY -7
 #define H2O_MRUBY_CALLBACK_ID_SLEEP -999
+#define H2O_MRUBY_CALLBACK_ID_RUN_CHILD_FIBER -777

 #define h2o_mruby_assert(mrb)                                                                                                      \
     if (mrb->exc != NULL)                                                                                                          \
diff --git a/lib/handler/mruby.c b/lib/handler/mruby.c
index 0f3361b6..003e35e9 100644
--- a/lib/handler/mruby.c
+++ b/lib/handler/mruby.c
@@ -759,6 +759,8 @@ void h2o_mruby_run_fiber(h2o_mruby_context_t *ctx, mrb_value receiver, mrb_value
     mrb_int status;
     h2o_mruby_generator_t *generator = NULL;

+    mrb_value resumers = mrb_ary_new(mrb); /* TODO this array should be re-used, protected from gc, and asserted being empty here*/
+
     while (1) {
         /* send input to fiber */
         output = mrb_funcall_argv(mrb, receiver, ctx->shared->symbols.sym_call, 1, &input);
@@ -775,7 +777,7 @@ void h2o_mruby_run_fiber(h2o_mruby_context_t *ctx, mrb_value receiver, mrb_value
         status = mrb_fixnum(v);

         /* if no special actions were necessary, then the output is a rack response */
-        if (status >= 0)
+        if (status > 0)
             break;

         /* take special action depending on the status code */
@@ -832,13 +834,24 @@ void h2o_mruby_run_fiber(h2o_mruby_context_t *ctx, mrb_value receiver, mrb_value
             case H2O_MRUBY_CALLBACK_ID_SLEEP:
                 input = h2o_mruby_sleep_callback(ctx, receiver, args, &run_again);
                 break;
+            case H2O_MRUBY_CALLBACK_ID_RUN_CHILD_FIBER: {
+                mrb_value resumer = mrb_ary_entry(args, 0);
+                mrb_ary_push(mrb, resumers, resumer);
+                input = mrb_nil_value();
+                run_again = 1;
+                break;
+            }
             default:
                 input = mrb_exc_new_str_lit(mrb, E_RUNTIME_ERROR, "unexpected callback id sent from rack app");
                 run_again = 1;
                 break;
             }
-            if (run_again == 0)
-                goto Exit;
+            if (run_again == 0) {
+                if (RARRAY_LEN(resumers) == 0)
+                    goto Exit;
+                input = mrb_nil_value();
+                receiver = mrb_ary_pop(mrb, resumers);
+            }

         } else {
             input = mrb_exc_new_str_lit(mrb, E_RUNTIME_ERROR, "callback from rack app did not receive an array arg");
diff --git a/lib/handler/mruby/embedded/core.rb b/lib/handler/mruby/embedded/core.rb
index e0f3a5c7..17655caf 100644
--- a/lib/handler/mruby/embedded/core.rb
+++ b/lib/handler/mruby/embedded/core.rb
@@ -29,6 +29,15 @@ module Kernel
   H2O_CALLBACK_ID_EXCEPTION_RAISED = -1
   H2O_CALLBACK_ID_CONFIGURING_APP  = -2
   H2O_CALLBACK_ID_CONFIGURED_APP   = -3
+  H2O_CALLBACK_ID_RUN_CHILD_FIBER  = -777
+
+  def task(&block)
+    fiber = Fiber.new {
+      block.call
+      [0]
+    }
+    Fiber.yield([H2O_CALLBACK_ID_RUN_CHILD_FIBER, proc { fiber.resume }, [_h2o_create_resumer()]])
+  end

   def _h2o_define_callback(name, id)
     Kernel.define_method(name) do |*args|

for example:

        mruby.handler: |
          puts "# before defining task.."
          task {
            3.times {|i|
              puts "# before http_request (#{i}).."
              status, headers, body = http_request('http://example.com/').join
              puts "# after http_request (#{i}) (status + #{status})"
            }
          }
          puts "# after defining task, will finish configuration phase!"

          proc {|env|
            [200, {}, ["hello\n"]]
          }

outputs the following.

# before defining task..
# before http_request (0)..
# after defining task, will finish configuration phase!
# after http_request (0) (status + 200)
# before http_request (1)..
# after http_request (1) (status + 200)
# before http_request (2)..
# after http_request (2) (status + 200)

But sorry, I had no time to verify my idea and implementation, so I would appreciate it if you could verify it and make it trustworthy. Of course, before that, please let me know what you think of it 🍣

And please merge the latest master when you get a chance!
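A plain-CRuby model of the behaviour in the example output above, assuming a toy event loop: `fake_async`, `drain_events`, and the `PENDING` list are hypothetical stand-ins for `http_request(...).join` and h2o's event loop, and `task` here only mirrors the idea of running the block in a child fiber until it first blocks, then returning control to the caller.

```ruby
require 'fiber' # Fiber.current on older Rubies

# Hypothetical list of fibers parked on a fake async operation: [fiber, label].
PENDING = []

# Stand-in for an async call such as http_request(...).join: park the
# current fiber and return whatever value the event loop resumes it with.
def fake_async(label)
  PENDING << [Fiber.current, label]
  Fiber.yield
end

# Sketch of task: run the block in a child fiber until it first blocks,
# then return so the caller keeps running.
def task(&block)
  Fiber.new(&block).resume
  nil
end

# Stand-in for the event loop: complete pending operations in order.
def drain_events
  until PENDING.empty?
    fiber, label = PENDING.shift
    fiber.resume("done:#{label}")
  end
end

log = []
log << :before_task
task do
  3.times do |i|
    log << "before #{i}"
    log << fake_async(i)
  end
end
log << :after_task
drain_events
```

As in the YAML example, the first request starts before `task` returns, the parent's next line runs while it is pending, and the remaining iterations proceed as each completion is delivered.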

if @queue.empty?
_h2o__channel_wait(self)
end
begin
Contributor

Isn't this begin ~ rescue unnecessary?

Contributor Author

I deleted begin ~ rescue
7a05215

h2o_mruby_shared_context_t *shared_ctx = mrb->ud;

struct st_h2o_mruby_channel_context_t *ctx;
ctx = h2o_mem_alloc(sizeof(*ctx));
Contributor

This seems to leak. You have to free it in on_gc_dispose_channel.

Contributor Author

I changed it to free ctx and to check its members for null.
05790cd

@@ -121,6 +125,7 @@ typedef struct st_h2o_mruby_generator_t {
#define H2O_MRUBY_CALLBACK_ID_SEND_CHUNKED_EOS -4
#define H2O_MRUBY_CALLBACK_ID_HTTP_JOIN_RESPONSE -5
#define H2O_MRUBY_CALLBACK_ID_HTTP_FETCH_CHUNK -6
#define H2O_MRUBY_CALLBACK_ID_CHANNEL_SHIFT -7
Contributor

Might H2O_MRUBY_CALLBACK_ID_CHANNEL_WAIT be a better name?

Contributor Author

I renamed it to H2O_MRUBY_CALLBACK_ID_CHANNEL_WAIT.
36f718e

static mrb_value channel_notify_method(mrb_state *mrb, mrb_value self)
{
struct st_h2o_mruby_channel_context_t *ctx;
ctx = DATA_PTR(self);
Contributor

It's better to use mrb_data_check_get_ptr instead of the DATA_PTR macro, or at least to be consistent.

Contributor Author

I changed it to use mrb_data_check_get_ptr.
69a4baf

is $stdout, 1;
};

plan skip_all => 'mruby support is off'
Contributor

JFYI, you don't have to call skip_all multiple times

Contributor Author

I deleted the needless skip_all.
8f4c9bf

/:
mruby.handler: |
Proc.new do |env|
ch = create_channel
Contributor

H2O::Channel.new?

Contributor Author

I changed it to use new for creating instances.
7edafbc

t/50mruby-task.t
is $stdout, "123456";
};

done_testing();
Contributor

I would appreciate it if you could add tests that join the response body :D

Contributor Author

I added a test for that.
250acdd

@naritta
Contributor Author
naritta commented Nov 2, 2017

@i110 Thank you for sharing the great idea of removing register_receiver. I confirmed that it works well.
So I switched to it and deleted register_receiver.
25373b4

I also fixed the other points. Could you review them when you have time?
(I also rebased onto the master branch.)

@i110 i110 mentioned this pull request Nov 20, 2017
@i110
Contributor
i110 commented Nov 20, 2017

@naritta @tagomoris Sorry for my belated response. I filed naritta#1 against your fork; it merges the latest master and contains some fixes. Once it's merged, the changes will appear here :D

@naritta
Contributor Author
naritta commented Nov 21, 2017

@i110 Thank you for reviewing and fixing. I confirmed that it works well.


mrb_gc_protect(mrb, receiver);
mrb_gc_protect(mrb, input);
}

if (status == 0) {
Contributor

Sorry I forgot to remove this check and goto. I introduced _h2o__finish_child_fiber, so this is no longer needed. Could you remove this?

@i110
Contributor
i110 commented Nov 21, 2017

From my viewpoint, this PR seems ready to be merged. @kazuho Any thoughts?

Member
@kazuho kazuho left a comment

I am fascinated by how cleanly this big new feature is implemented. Thank you for working on it. I believe that the code is very near to getting merged.

I have left my comments in-line. Please let me know what you think.

" def task(&block)\n" \
" fiber = Fiber.new do\n" \
" block.call\n" \
" _h2o__finish_child_fiber()\n" \
Member

Would you mind elaborating on what happens if block.call raises an exception? My expectation is that _h2o__finish_child_fiber() will not be called in that case.

If calling _h2o__finish_child_fiber() makes any difference, then I believe we should call it even if block.call raises an error. Or, if _h2o__finish_child_fiber() is a no-op, can we eliminate the function?

Contributor Author
@naritta naritta Nov 29, 2017

As it stood, the entire application would stop when block.call raised.
So I changed it to call _h2o__send_error, and intentionally not to call _h2o__finish_child_fiber.
52ea223
With this, when the fiber in a task raises, the task raises an error and only the task dies; the parent application does not stop.
If this design is wrong and the parent application should stop when a task raises, I'll fix it.
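A minimal CRuby model of the behaviour described above, assuming the task block runs in its own fiber. The `errors` array is a hypothetical stand-in for `_h2o__send_error`; the point is only that the exception is caught and reported inside the child fiber, so the parent keeps running.

```ruby
# Hypothetical model: run the task block in a child fiber and turn an
# exception into a reported error instead of letting it reach the parent.
def task(errors, &block)
  fiber = Fiber.new do
    begin
      block.call
    rescue StandardError => e
      errors << e # report the failure; do not re-raise into the parent
    end
  end
  fiber.resume
  nil
end

errors = []
parent_log = []
task(errors) { raise 'boom in task' }
parent_log << :parent_still_running
```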

Contributor

perfect! :D


static void attach_receiver(struct st_h2o_mruby_channel_context_t *ctx, mrb_value receiver)
{
assert(mrb_nil_p(ctx->receiver));
Member

Could you please elaborate on how we ensure that a Channel has at most one listener?

I couldn't find code that provides such a guarantee, and I wonder what happens if two fibers call shift on the same channel object.

Member

PS. Please note that I am not suggesting that we should support multiple readers. Supporting only one reader is fine, but an error in an mruby script should be reported as an mruby exception, not as an abrupt termination of the server process.

Contributor Author

I changed it to raise an error when shift is called while a receiver is already registered.
2cf40c2

Contributor

@naritta Would you mind supporting multiple listeners (shifters)? We discussed it with @kazuho today; it may be useful in minor cases and can be implemented easily.

To implement that, simply do:

  • change receiver to receivers (Array)
  • use mrb_ary_push instead of attach_receiver
  • use mrb_ary_shift instead of detach_receiver

How do you feel?

Contributor Author

Sounds cool. I changed it to support multiple listeners as described above.
916cab1
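A minimal sketch of the multi-listener version in plain CRuby, assuming Fibers stand in for h2o's receivers. `SketchMultiChannel` is hypothetical, and its Array operations only mirror the `mrb_ary_push`/`mrb_ary_shift` idea from the list above: each push wakes the oldest waiting fiber.

```ruby
require 'fiber' # Fiber.current / Fiber#alive? on older Rubies

# Hypothetical sketch of the multi-listener change: waiting fibers are kept
# in an Array (receivers) and woken in FIFO order, one per pushed item.
class SketchMultiChannel
  def initialize
    @queue = []
    @receivers = [] # fibers blocked in #shift, oldest first
  end

  def push(obj)
    @queue << obj
    receiver = @receivers.shift # mirrors mrb_ary_shift in place of detach_receiver
    receiver.resume if receiver
    self
  end

  def shift
    if @queue.empty?
      @receivers.push(Fiber.current) # mirrors mrb_ary_push in place of attach_receiver
      Fiber.yield
    end
    raise 'oops' if @queue.empty?
    @queue.shift
  end
end

ch = SketchMultiChannel.new
got = []
first  = Fiber.new { got << [:first, ch.shift] }
second = Fiber.new { got << [:second, ch.shift] }
first.resume  # parks first
second.resume # parks second behind first
ch.push(1)    # wakes first
ch.push(2)    # wakes second
```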

{
struct st_h2o_mruby_channel_context_t *ctx = _ctx;
assert(ctx != NULL); /* ctx can only be disposed by gc, so data binding has been never removed */
free(ctx);
Member

Do we need to call mrb_gc_unregister(mrb, ctx->receiver)?

I believe the question boils down to whether GC can collect the channel object while a receiver is attached to it; to me, something like the following seems to trigger such a condition:

task {
  H2O::Channel.new.shift
}

Contributor Author

I added the call to mrb_gc_unregister.
da33b13

@naritta
Contributor Author
naritta commented Nov 29, 2017

@kazuho Thank you for reviewing. I fixed the issues and replied to your comments. Could you check?

{
struct st_h2o_mruby_channel_context_t *ctx = _ctx;
assert(ctx != NULL); /* ctx can only be disposed by gc, so data binding has been never removed */
mrb_gc_unregister(mrb, ctx->receiver);
Contributor

Could you add a check that ensures ctx->receiver is not nil? i.e. if (!mrb_nil_p(ctx->receiver))

Contributor Author

I added a check that ctx->receiver is not nil before calling mrb_gc_unregister.
4312b53

Contributor

At this point we always set receivers during initialization, so the nil check you just added is no longer needed. Sorry for the confusion 😵

Contributor Author

Sorry, I forgot to check it. I deleted it.
03c0a42

@naritta
Contributor Author
naritta commented Nov 30, 2017

@i110 Thank you for reviewing. I changed it to support multiple listeners; could you check it?

@@ -73,7 +56,7 @@ static mrb_value channel_initialize_method(mrb_state *mrb, mrb_value self)
memset(ctx, 0, sizeof(*ctx));
assert(shared_ctx->current_context != NULL);
ctx->ctx = shared_ctx->current_context;
ctx->receiver = mrb_nil_value();
ctx->receivers = mrb_ary_new(mrb);
Contributor

Should receivers be registered with mrb_gc_register?

Contributor Author

Thanks, I see. Do you think it should be mrb_gc_register or mrb_gc_protect after shited also?

Contributor

Sorry, I don't understand your question. Do you mean which of mrb_gc_register and mrb_gc_protect should be used? If so, it should be register here (with protect, the object gets GCed once the GC arena index is restored).
Or are you asking whether the shifted object should also be protected? (shited => shifted?)

Contributor Author

Sorry for the confusion.
My question is whether the shifted object should also be protected.

Contributor Author
@naritta naritta Nov 30, 2017

Is it enough, since mrb_gc_arena_save and mrb_gc_arena_restore are called here?
https://github.com/naritta/h2o/blob/916cab15f8f88cfb608b769379f6af4139fb8612/lib/handler/mruby/channel.c#L74-L77

Contributor
@i110 i110 Nov 30, 2017

Ah, I think you have to call mrb_gc_protect(mrb, shifted_receiver) immediately after shifting it from the receivers array. Otherwise it can be GCed inside mrb_funcall before the call method of that Proc is invoked.

Contributor Author

Thanks, I see. I added the mrb_gc_protect call here.
8926f7d

@i110 i110 merged commit d99c4be into h2o:master Nov 30, 2017
@i110
Contributor
i110 commented Nov 30, 2017

Thank you so much for your effort!

@naritta
Contributor Author
naritta commented Nov 30, 2017

Thank you for the review and this great opportunity!
