// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package bindings

import (
	"fmt"
	"sync"

	"mojo/public/go/system"
)

// MessageReadResult contains information returned after reading and parsing
// a message: a non-nil error of a valid message.
type MessageReadResult struct {
	Message *Message
	Error   error
}

// routeRequest is a request sent from Router to routerWorker.
type routeRequest struct {
	// The outgoing message with non-zero request id.
	message *Message
	// The channel to send respond for the message.
	responseChan chan<- MessageReadResult
}

// routerWorker sends messages that require a response and and routes responses
// to appropriate receivers. The work is done on a separate go routine.
type routerWorker struct {
	// The message pipe handle to send requests and receive responses.
	handle system.MessagePipeHandle
	// Map from request id to response channel.
	responders map[uint64]chan<- MessageReadResult
	// The channel of incoming requests that require responses.
	requestChan <-chan routeRequest
	// The channel that indicates that the worker should terminate.
	done <-chan struct{}
	// Implementation of async waiter.
	waiter   AsyncWaiter
	waitChan chan WaitResponse
	waitId   AsyncWaitId
}

// readOutstandingMessages reads and dispatches available messages in the
// message pipe until the messages is empty or there are no waiting responders.
// If the worker is currently waiting on the message pipe, returns immediately
// without an error.
func (w *routerWorker) readAndDispatchOutstandingMessages() error {
	if w.waitId != 0 {
		// Still waiting for a new message in the message pipe.
		return nil
	}
	for len(w.responders) > 0 {
		result, bytes, handles := w.handle.ReadMessage(system.MOJO_READ_MESSAGE_FLAG_NONE)
		if result == system.MOJO_RESULT_SHOULD_WAIT {
			w.waitId = w.waiter.AsyncWait(w.handle, system.MOJO_HANDLE_SIGNAL_READABLE, w.waitChan)
			return nil
		}
		if result != system.MOJO_RESULT_OK {
			return fmt.Errorf("error reading message: %v", result)
		}
		message, err := ParseMessage(bytes, handles)
		if err != nil {
			return err
		}
		id := message.Header.RequestId
		w.responders[id] <- MessageReadResult{message, nil}
		delete(w.responders, id)
	}
	return nil
}

func (w *routerWorker) cancelIfWaiting() {
	if w.waitId != 0 {
		w.waiter.CancelWait(w.waitId)
		w.waitId = 0
	}
}

// runLoop is the main run loop of the worker. It processes incoming requests
// from Router and waits on a message pipe for new messages.
// Returns an error describing the cause of stopping.
func (w *routerWorker) runLoop() error {
	for {
		select {
		case waitResponse := <-w.waitChan:
			w.waitId = 0
			if waitResponse.Result != system.MOJO_RESULT_OK {
				return fmt.Errorf("error waiting for message: %v", waitResponse.Result)
			}
		case request := <-w.requestChan:
			if err := WriteMessage(w.handle, request.message); err != nil {
				return err
			}
			if request.responseChan != nil {
				w.responders[request.message.Header.RequestId] = request.responseChan
			}
		case <-w.done:
			return fmt.Errorf("message pipe is closed")
		}
		// Returns immediately without an error if still waiting for
		// a new message.
		if err := w.readAndDispatchOutstandingMessages(); err != nil {
			return err
		}
	}
}

// Router sends messages to a message pipe and routes responses back to senders
// of messages with non-zero request ids. The caller should issue unique request
// ids for each message given to the router.
type Router struct {
	// Mutex protecting requestChan from new requests in case the router is
	// closed and the handle.
	mu sync.Mutex
	// The message pipe handle to send requests and receive responses.
	handle system.MessagePipeHandle
	// Channel to communicate with worker.
	requestChan chan<- routeRequest

	// Channel to stop the worker.
	done chan<- struct{}
}

// NewRouter returns a new Router instance that sends and receives messages
// from a provided message pipe handle.
func NewRouter(handle system.MessagePipeHandle, waiter AsyncWaiter) *Router {
	requestChan := make(chan routeRequest, 10)
	doneChan := make(chan struct{})
	router := &Router{
		handle:      handle,
		requestChan: requestChan,
		done:        doneChan,
	}
	router.runWorker(&routerWorker{
		handle,
		make(map[uint64]chan<- MessageReadResult),
		requestChan,
		doneChan,
		waiter,
		make(chan WaitResponse, 1),
		0,
	})
	return router
}

// Close closes the router and the underlying message pipe. All new incoming
// requests are returned with an error. Panics if you try to close the router
// more than once.
func (r *Router) Close() {
	close(r.done)
}

// Accept sends a message to the message pipe. The message should have a
// zero request id in header.
func (r *Router) Accept(message *Message) error {
	if message.Header.RequestId != 0 {
		return fmt.Errorf("message header should have a zero request ID")
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	// This can also mean that the router is closed.
	if !r.handle.IsValid() {
		return fmt.Errorf("can't write a message to an invalid handle")
	}
	r.requestChan <- routeRequest{message, nil}
	return nil
}

func (r *Router) runWorker(worker *routerWorker) {
	// Run worker on a separate go routine.
	go func() {
		// Get the reason why the worker stopped. The error means that
		// either the router is closed or there was an error reading
		// or writing to a message pipe. In both cases it will be
		// the reason why we can't process any more requests.
		err := worker.runLoop()
		worker.cancelIfWaiting()
		// Respond to all pending requests.
		for _, responseChan := range worker.responders {
			responseChan <- MessageReadResult{nil, err}
		}
		// Respond to incoming requests until we make sure that all
		// new requests return with an error before sending request
		// to responseChan.
		go func() {
			for responder := range worker.requestChan {
				responder.responseChan <- MessageReadResult{nil, err}
			}
		}()
		r.mu.Lock()
		r.handle.Close()
		// If we acquire the lock then no other go routine is waiting
		// to write to responseChan. All go routines that acquire the
		// lock after us will return before sending to responseChan as
		// the underlying handle is invalid (already closed).
		// We can safely close the requestChan.
		close(r.requestChan)
		r.mu.Unlock()
	}()
}

// AcceptWithResponse sends a message to the message pipe and returns a channel
// that will stream the result of reading corresponding response. The message
// should have a non-zero request id in header. It is responsibility of the
// caller to issue unique request ids for all given messages.
func (r *Router) AcceptWithResponse(message *Message) <-chan MessageReadResult {
	responseChan := make(chan MessageReadResult, 1)
	if message.Header.RequestId == 0 {
		responseChan <- MessageReadResult{nil, fmt.Errorf("message header should have a request ID")}
		return responseChan
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	// Return an error before sending a request to requestChan if the router
	// is closed so that we can safely close responseChan once we close the
	// router.
	if !r.handle.IsValid() {
		responseChan <- MessageReadResult{nil, fmt.Errorf("can't write a message to an invalid handle")}
		return responseChan
	}
	r.requestChan <- routeRequest{message, responseChan}
	return responseChan
}
