blob: 24f1e8ce0b41e73185b684deb9cb91765905c99f [file] [log] [blame] [edit]
package session
import (
"context"
"encoding/json"
"fmt"
"io"
"time"
"github.com/mafredri/cdp"
"github.com/mafredri/cdp/internal/errors"
"github.com/mafredri/cdp/protocol/target"
"github.com/mafredri/cdp/rpcc"
)
// session represents a session connection to a target. It serves as the
// rpcc.Codec of its own *rpcc.Conn: outgoing requests are handed to send,
// and incoming messages travel from Write to ReadResponse over recvC.
type session struct {
	// ID is the session ID returned when dial attached to the target.
	ID target.SessionID

	// TargetID is the ID of the target that dial attached to.
	TargetID target.ID

	// recvC carries raw messages from Write to ReadResponse.
	recvC chan []byte

	// send transmits an encoded request; it is called by WriteRequest.
	send func([]byte) error

	// init is closed by dial once conn has been assigned. It protects
	// conn from early read.
	init chan struct{}

	// conn is the connection that uses this session as its codec.
	conn *rpcc.Conn
}
// Compile-time check that *session implements rpcc.Codec.
var _ rpcc.Codec = (*session)(nil)
// WriteRequest implements rpcc.Codec. The request is encoded as JSON and
// passed to the session's send function. If encoding fails, the encoding
// error is returned and nothing is sent.
func (s *session) WriteRequest(r *rpcc.Request) error {
	payload, err := json.Marshal(r)
	if err == nil {
		err = s.send(payload)
	}
	return err
}
// ReadResponse implements rpcc.Codec. It waits for the next message
// delivered via Write and decodes it into r. If the connection's context
// is done first, the context error is returned instead.
func (s *session) ReadResponse(r *rpcc.Response) error {
	// s.conn must not be touched until dial has finished setting it up.
	<-s.init

	ctx := s.conn.Context()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case msg := <-s.recvC:
		return json.Unmarshal(msg, r)
	}
}
// Conn exposes the *rpcc.Conn for which this session acts as codec.
func (s *session) Conn() *rpcc.Conn {
	return s.conn
}
// Write forwards a target message to the session connection, where it is
// consumed by ReadResponse. It blocks until the message has been queued
// or the connection's context is done; in the latter case the context
// error is returned. When Write returns an error, the session is closed.
func (s *session) Write(data []byte) error {
	ctx := s.conn.Context()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case s.recvC <- data:
		return nil
	}
}
// Close closes the underlying *rpcc.Conn and reports the error, if any,
// returned by that close.
func (s *session) Close() error {
	return s.Conn().Close()
}
var (
	// sessionDetachConn returns a dial option whose dialer never opens a
	// real connection: it yields a closeConn that runs detach on Close.
	// We only handle Close on conn to detach the session. The codec
	// handles the actual transport (Read / Write) in this case.
	sessionDetachConn = func(detach func() error) rpcc.DialOption {
		dialer := func(context.Context, string) (io.ReadWriteCloser, error) {
			return &closeConn{close: detach}, nil
		}
		return rpcc.WithDialer(dialer)
	}

	// sessionCodec returns a dial option that installs s as the codec,
	// ignoring the io.ReadWriter offered by the connection.
	sessionCodec = func(s *session) rpcc.DialOption {
		newCodec := func(io.ReadWriter) rpcc.Codec {
			return s
		}
		return rpcc.WithCodec(newCodec)
	}
)
// dial attaches to the target identified by id via the provided
// *cdp.Client and creates a lightweight RPC connection to the target.
// Communication is done via the underlying *rpcc.Conn for the provided
// *cdp.Client. The detach callback installed on the session connection
// gives up after detachTimeout.
//
// On success the returned session is fully initialised (its init channel
// is closed). On failure a nil session and the error are returned.
func dial(ctx context.Context, id target.ID, tc *cdp.Client, detachTimeout time.Duration) (s *session, err error) {
	reply, err := tc.Target.AttachToTarget(ctx, target.NewAttachToTargetArgs(id))
	if err != nil {
		return nil, err
	}

	s = &session{
		ID:       reply.SessionID,
		TargetID: id,
		init:     make(chan struct{}),
		recvC:    make(chan []byte, 1),
	}

	// send wraps each outgoing message in a SendMessageToTarget call
	// made through the parent client.
	s.send = func(data []byte) error {
		// s.conn is only assigned further down; wait until it is ready.
		<-s.init

		// TODO(maf): Use async invocation.
		connCtx := s.conn.Context()
		msg := target.NewSendMessageToTargetArgs(string(data)).SetSessionID(s.ID)
		return tc.Target.SendMessageToTarget(connCtx, msg)
	}

	// detach asks the parent client to detach this session. It uses a
	// fresh background context bounded by detachTimeout rather than the
	// dial context.
	detach := func() error {
		detachCtx, cancel := context.WithTimeout(context.Background(), detachTimeout)
		defer cancel()

		args := target.NewDetachFromTargetArgs().SetSessionID(s.ID)
		cause := cdp.ErrorCause(tc.Target.DetachFromTarget(detachCtx, args))
		if cause == context.DeadlineExceeded {
			return fmt.Errorf("session: detach timed out for session %s", s.ID)
		}
		return errors.Wrapf(cause, "session: detach failed for session %s", s.ID)
	}

	if s.conn, err = rpcc.DialContext(ctx, "", sessionDetachConn(detach), sessionCodec(s)); err != nil {
		return nil, err
	}

	// conn is in place: release anything waiting in send / ReadResponse.
	close(s.init)

	return s, nil
}