Skip to content

Commit

Permalink
fix message synchronization in app
Browse files Browse the repository at this point in the history
  • Loading branch information
Sven Fuchs committed Mar 20, 2011
1 parent c39bcf3 commit 030560e
Show file tree
Hide file tree
Showing 14 changed files with 297 additions and 109 deletions.
33 changes: 28 additions & 5 deletions app/controllers/builds_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,27 @@ def create

def update
build.update_attributes!(params[:build])
if build.matrix_expanded?

if build.was_started?
trigger('build:started')
elsif build.matrix_expanded?
build.matrix.each { |child| enqueue!(child) }
elsif build.finished?
elsif build.was_finished?
trigger('build:finished')
finished_email.deliver
end

render :nothing => true
end

def log
build.append_log!(params[:build][:log], params[:msg_id])
id, msg_id, log = params[:build].values_at(*%w(id msg_id log))

Travis::Synchronizer.receive(id, msg_id) do
build.append_log!(log)
trigger('build:log', 'log' => log, 'msg_id' => msg_id)
end

render :nothing => true
end

Expand All @@ -49,12 +60,24 @@ def payload
end

def enqueue!(build)
job = Travis::Builder.enqueue('build' => build.as_json(:for => :job), 'repository' => build.repository.as_json(:for => :job))
Pusher['jobs'].trigger('build:queued', 'build' => build.as_json(:for => :'build:queued'), 'repository' => build.repository.as_json(:for => :'build:queued'))
job = Travis::Builder.enqueue(json_for(:job))
build.update_attributes!(:job_id => job.meta_id)
trigger('build:queued')
end

def finished_email
BuildMailer.finished_email(build)
end

def trigger(event, data = {})
push(event, json_for(event).merge(data))
end

def json_for(event)
{ 'build' => build.as_json(:for => event.to_sym), 'repository' => repository.as_json(:for => event.to_sym) }
end

def push(event, data)
Pusher[event == 'build:queued' ? 'jobs' : 'repositories'].trigger(event, data)
end
end
28 changes: 6 additions & 22 deletions app/models/build.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'core_ext/array/flatten_once'
require 'core_ext/hash/compact'

class Build < ActiveRecord::Base
belongs_to :repository
Expand All @@ -11,9 +12,7 @@ class Build < ActiveRecord::Base
serialize :config

before_save :expand_matrix!, :if => :expand_matrix?

after_save :denormalize_to_repository, :if => :denormalize_to_repository?
after_save :sync_changes

class << self
def create_from_github_payload(data)
Expand All @@ -39,15 +38,14 @@ def started
end
end

attr_accessor :log_appended, :msg_id
attr_accessor :log_appended

def log_appended?
log_appended.present?
end

def append_log!(chars, msg_id)
def append_log!(chars)
self.log_appended = chars
self.msg_id = msg_id
update_attributes!(:log => [self.log, chars].join)
end

Expand All @@ -56,15 +54,15 @@ def started?
end

def was_started?
started? && started_at_changed?
started? && @previously_changed.keys.include?('started_at')
end

def finished?
finished_at.present?
end

def was_finished?
finished? && finished_at_changed?
finished? && @previously_changed.keys.include?('finished_at')
end

def pending?
Expand Down Expand Up @@ -103,7 +101,7 @@ def as_json(options = nil)
options ||= {}
json = super(:only => JSON_ATTRS[options[:for] || :default])
json.merge!(:matrix => matrix.as_json(:for => :'build:started')) if matrix?
json
json.compact
end

protected
Expand Down Expand Up @@ -161,18 +159,4 @@ def denormalize_to_repository
:last_build_finished_at => finished_at
)
end

def sync_changes
if was_started?
push 'build:started', 'build' => as_json(:for => :'build:started'), 'repository' => repository.as_json(:for => :'build:started')
elsif log_appended?
push 'build:log', 'build' => as_json(:for => :'build:log'), 'repository' => repository.as_json(:for => :'build:log'), 'log' => log_appended, 'msg_id' => msg_id
elsif was_finished?
push 'build:finished', 'build' => as_json(:for => :'build:finished'), 'repository' => repository.as_json(:for => :'build:finished')
end
end

def push(event, data)
Pusher['repositories'].trigger(event, data) # if Travis.pusher
end
end
3 changes: 2 additions & 1 deletion app/models/repository.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'uri'
require 'core_ext/hash/compact'

class Repository < ActiveRecord::Base
has_many :builds, :dependent => :delete_all, :conditions => 'parent_id IS null'
Expand Down Expand Up @@ -40,7 +41,7 @@ def human_status_by_name(name)

def as_json(options = nil)
options ||= {} # ActiveSupport seems to pass nil here?
super(:only => JSON_ATTRS[options[:for] || :default])
super(:only => JSON_ATTRS[options[:for] || :default]) #.compact
end


Expand Down
12 changes: 12 additions & 0 deletions lib/core_ext/hash/compact.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
class Hash
def compact
dup.compact!
end

def compact!
keys.each do |key|
delete(key) if self[key].nil?
end
self
end
end unless {}.respond_to?(:compact)
7 changes: 4 additions & 3 deletions lib/travis.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
module Travis
autoload :Buildable, 'travis/buildable'
autoload :Builder, 'travis/builder'
autoload :Config, 'travis/config'
autoload :Buildable, 'travis/buildable'
autoload :Builder, 'travis/builder'
autoload :Config, 'travis/config'
autoload :Synchronizer, 'travis/synchronizer'

class << self
attr_accessor :pusher
Expand Down
4 changes: 2 additions & 2 deletions lib/travis/builder/rails.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ def on_log(chars)

def on_finish
super
post('log' => log, 'status' => result, 'finished_at' => Time.now)
end
post('status' => result, 'finished_at' => Time.now) # 'log' => log,
end

protected
def msg_id
Expand Down
64 changes: 64 additions & 0 deletions lib/travis/synchronizer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
module Travis
class Synchronizer < Array
class << self
def timeout
@@timeout ||= 2
end

def timeout=(timeout)
@@timeout = timeout
end

def synchronizers
@@synchronizers ||= {}
end

def receive(id, msg_id, &forward)
synchronizers[id] ||= new(id)
synchronizers[id].receive(msg_id, &forward)
end
end

attr_reader :id, :last_id, :finalizer

def initialize(id)
@id = id
@last_id = 0
end

def receive(msg_id, &forward)
if msg_id
synchronize(msg_id, &forward)
else
forward.call
end
end

def synchronize(msg_id, &forward)
with_finalizer do
push([msg_id.to_i, forward])
sort!
while !empty? && last_id == self[0][0].to_i - 1
@last_id = self[0][0].to_i
shift[1].call
end
end
end

def with_finalizer
EM.cancel_timer(finalizer)
yield
@finalizer = EM.add_timer(self.class.timeout) { finalize }
end

def finalize
shift[1].call until empty?
self.class.synchronizers.delete(id)
end

def sort!
super { |lft, rgt| lft[0] <=> rgt[0] }
end
end
end

14 changes: 14 additions & 0 deletions play/sync_messages.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
require 'rubygems'
require 'eventmachine'
require 'travis'

EM.run do
Travis::Synchronizer.timeout = 0.2

Travis::Synchronizer.receive(1, 2) { p 2 }
Travis::Synchronizer.receive(1, 1) { p 1 }
Travis::Synchronizer.receive(1, 4) { p 4 }

EM.add_timer(0.3) { EM.stop }
end

2 changes: 1 addition & 1 deletion test/functional/travis/builder/rails_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def work!
end

test 'updates the build record on finish' do
builder.expects(:post).with('log' => '', 'status' => nil, 'finished_at' => Time.now)
builder.expects(:post).with('status' => nil, 'finished_at' => Time.now) # 'log' => '',
work!
end
end
Loading

0 comments on commit 030560e

Please sign in to comment.