Account Monitoring

Real-time monitoring of Solana account updates with filtering options and reliable reconnection handling.

Account Monitoring with Yellowstone gRPC

This guide demonstrates how to use Yellowstone gRPC to monitor account updates on Solana. You'll learn to use different account filtering options and process account data with reliable reconnection handling.

TypeScript Implementation

import Client, { CommitmentLevel, SubscribeRequest } from "@triton-one/yellowstone-grpc";
import * as bs58 from 'bs58';

/**
 * Manages one Yellowstone gRPC subscription stream.
 *
 * Responsibilities:
 *  - open the stream and send the subscription request,
 *  - forward every update (with binary fields base58-encoded) to `dataHandler`,
 *  - send a keep-alive ping every 30 seconds while connected,
 *  - reconnect with a capped linear backoff when the stream drops,
 *  - release timers and the stream when `close()` is called.
 */
class GrpcStreamManager {
    private client: Client;
    private stream: any;
    private isConnected: boolean = false;
    private reconnectAttempts: number = 0;
    private readonly maxReconnectAttempts: number = 10;
    private readonly reconnectInterval: number = 5000; // 5 seconds
    private readonly dataHandler: (data: any) => void;
    // Handle of the single active keep-alive interval (undefined when not pinging).
    private pingTimer: ReturnType<typeof setInterval> | undefined;
    // Handle of the single pending reconnect timeout (undefined when none is scheduled).
    private reconnectTimer: ReturnType<typeof setTimeout> | undefined;
    // Set by close(); once true the manager never connects or reconnects again.
    private closed: boolean = false;

    /**
     * @param endpoint    gRPC endpoint passed to the Yellowstone client.
     * @param authToken   x-token passed to the Yellowstone client.
     * @param dataHandler Callback invoked with every processed stream update.
     */
    constructor(
        endpoint: string,
        authToken: string,
        dataHandler: (data: any) => void
    ) {
        this.client = new Client(
            endpoint,
            authToken,
            { "grpc.max_receive_message_length": 64 * 1024 * 1024 }
        );
        this.dataHandler = dataHandler;
    }

    /**
     * Opens the stream, sends `subscribeRequest` and starts the keep-alive ping.
     * Failures are logged and trigger a scheduled reconnect instead of rejecting.
     * Does nothing after `close()` has been called.
     */
    async connect(subscribeRequest: SubscribeRequest): Promise<void> {
        if (this.closed) {
            return;
        }

        let stream: any;
        try {
            stream = await this.client.subscribe();
            if (this.closed) {
                // close() was called while subscribe() was pending.
                this.endStream(stream);
                return;
            }
            this.stream = stream;

            // Set up stream handlers
            stream.on("data", this.handleData.bind(this));
            stream.on("error", this.handleError.bind(this));
            // "end" and "close" are both emitted for one disconnect; the handler
            // receives the originating stream so duplicates and stale streams
            // can be ignored.
            const onDisconnect = () => this.handleDisconnect(subscribeRequest, stream);
            stream.on("end", onDisconnect);
            stream.on("close", onDisconnect);

            // Send initial subscription request
            await this.write(subscribeRequest);
            if (this.stream !== stream) {
                // The stream dropped (or close() ran) while the write was in flight.
                return;
            }

            // Only a fully established subscription counts as a successful
            // connection and resets the backoff counter.
            this.isConnected = true;
            this.reconnectAttempts = 0;

            // Start keep-alive ping
            this.startPing();
        } catch (error) {
            console.error("Connection error:", error);
            this.isConnected = false;
            if (stream !== undefined && this.stream === stream) {
                this.stream = undefined;
            }
            await this.reconnect(subscribeRequest);
        }
    }

    /**
     * Stops pinging, cancels any pending reconnect and ends the current stream.
     * After this call the manager stays closed.
     */
    close(): void {
        this.closed = true;
        this.isConnected = false;
        this.stopPing();
        if (this.reconnectTimer !== undefined) {
            clearTimeout(this.reconnectTimer);
            this.reconnectTimer = undefined;
        }
        const stream = this.stream;
        this.stream = undefined;
        this.endStream(stream);
    }

    /** Writes one request to the current stream; rejects if there is no stream or the write fails. */
    private write(req: SubscribeRequest): Promise<void> {
        const stream = this.stream;
        return new Promise((resolve, reject) => {
            if (!stream) {
                reject(new Error("Stream is not open"));
                return;
            }
            stream.write(req, (err: any) => err ? reject(err) : resolve());
        });
    }

    /**
     * Schedules a single reconnect attempt after `reconnectInterval * min(attempt, 5)` ms.
     * Does nothing when closed, when a reconnect is already pending, or when the
     * attempt limit has been reached.
     */
    private async reconnect(subscribeRequest: SubscribeRequest): Promise<void> {
        if (this.closed || this.reconnectTimer !== undefined) {
            return;
        }
        if (this.reconnectAttempts >= this.maxReconnectAttempts) {
            console.error("Max reconnection attempts reached");
            return;
        }

        this.reconnectAttempts++;
        console.log(`Reconnecting... Attempt ${this.reconnectAttempts}`);

        const delay = this.reconnectInterval * Math.min(this.reconnectAttempts, 5);
        this.reconnectTimer = setTimeout(async () => {
            // Clear the handle first so a failure inside connect() can schedule the next attempt.
            this.reconnectTimer = undefined;
            try {
                await this.connect(subscribeRequest);
            } catch (error) {
                console.error("Reconnection failed:", error);
                await this.reconnect(subscribeRequest);
            }
        }, delay);
    }

    /** Starts the 30-second keep-alive ping, replacing any interval left from an earlier connection. */
    private startPing(): void {
        this.stopPing();
        this.pingTimer = setInterval(() => {
            if (this.isConnected) {
                this.write({
                    ping: { id: 1 },
                    accounts: {},
                    accountsDataSlice: [],
                    transactions: {},
                    blocks: {},
                    blocksMeta: {},
                    entry: {},
                    slots: {},
                    transactionsStatus: {},
                }).catch(console.error);
            }
        }, 30000);
    }

    /** Clears the keep-alive interval if one is running. */
    private stopPing(): void {
        if (this.pingTimer !== undefined) {
            clearInterval(this.pingTimer);
            this.pingTimer = undefined;
        }
    }

    /** Ends `stream` if it exposes an `end()` method and is not already destroyed. */
    private endStream(stream: any): void {
        if (!stream || stream.destroyed || typeof stream.end !== "function") {
            return;
        }
        try {
            stream.end();
        } catch (error) {
            console.error("Error closing stream:", error);
        }
    }

    /** Converts binary fields and hands the update to the user callback; handler errors are logged, not rethrown. */
    private handleData(data: any): void {
        try {
            const processed = this.processBuffers(data);
            this.dataHandler(processed);
        } catch (error) {
            console.error("Error processing data:", error);
        }
    }

    /** Logs a stream error and marks the connection as down. */
    private handleError(error: any): void {
        console.error("Stream error:", error);
        this.isConnected = false;
    }

    /**
     * Reacts to the stream ending or closing by scheduling a reconnect.
     *
     * @param source The stream that emitted the event. Events from a stream that
     *               is no longer the current one are ignored.
     */
    private handleDisconnect(subscribeRequest: SubscribeRequest, source?: unknown): void {
        if (this.closed) {
            return;
        }
        if (source !== undefined && source !== this.stream) {
            return;
        }
        console.log("Stream disconnected");
        this.isConnected = false;
        this.stream = undefined;
        this.stopPing();
        void this.reconnect(subscribeRequest);
    }

    /**
     * Recursively replaces every Buffer/Uint8Array in `obj` with its base58 string;
     * arrays and plain objects are rebuilt, other values are returned unchanged.
     */
    private processBuffers(obj: any): any {
        if (!obj) return obj;
        if (Buffer.isBuffer(obj) || obj instanceof Uint8Array) {
            return bs58.default.encode(obj);
        }
        if (Array.isArray(obj)) {
            return obj.map(item => this.processBuffers(item));
        }
        if (typeof obj === 'object') {
            return Object.fromEntries(
                Object.entries(obj).map(([k, v]) => [k, this.processBuffers(v)])
            );
        }
        return obj;
    }
}

// Account monitoring implementation
/**
 * Subscribes to updates of the USDC mint account at CONFIRMED commitment and
 * routes every update to `handleAccountUpdate`.
 */
async function monitorAccounts(): Promise<void> {
    const streamManager = new GrpcStreamManager(
        "your-dedicated-node-url:2053",
        "x-token",
        handleAccountUpdate
    );

    // Single account filter: watch the USDC mint by address, no owner or data filters.
    const usdcMintFilter = {
        account: ["EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"], // USDC mint account
        owner: [],
        filters: []
    };

    // Every other stream type is left empty, so only account updates are requested.
    const request: SubscribeRequest = {
        accounts: { accountSubscribe: usdcMintFilter },
        accountsDataSlice: [],
        commitment: CommitmentLevel.CONFIRMED,
        slots: {},
        transactions: {},
        transactionsStatus: {},
        blocks: {},
        blocksMeta: {},
        entry: {}
    };

    await streamManager.connect(request);
}

/**
 * Logs a summary of an account update.
 *
 * Updates that carry no account payload (for example pings, or a null update)
 * are ignored instead of throwing.
 *
 * "Data Length" is reported in bytes: when the update went through the stream
 * manager its `data` field is a base58 string, so it is decoded before being
 * measured; raw `Uint8Array`/`Buffer` data is measured directly.
 *
 * @param data A stream update, typically already processed by the stream manager.
 */
function handleAccountUpdate(data: any): void {
    const accountInfo = data && data.account ? data.account.account : undefined;
    if (!accountInfo) {
        return;
    }

    const rawData: unknown = accountInfo.data;
    let dataLength = 0;
    if (typeof rawData === "string") {
        // base58 text: the character count is not the byte count.
        dataLength = bs58.default.decode(rawData).length;
    } else if (rawData instanceof Uint8Array) {
        dataLength = rawData.length;
    }

    console.log(`Account Update:
            Pubkey: ${accountInfo.pubkey}
            Owner: ${accountInfo.owner}
            Lamports: ${accountInfo.lamports}
            Executable: ${accountInfo.executable}
            Data Length: ${dataLength}
        `);
}

// Entry point: start monitoring and log any rejection that escapes monitorAccounts().
monitorAccounts().catch(console.error);

Account Subscription Options

1. Monitor Specific Accounts

// Watch specific accounts by address. The remaining stream maps are left empty
// so the object is a complete SubscribeRequest, as in the full example.
const subscribeRequest: SubscribeRequest = {
    accounts: {
        accountSubscribe: {
            account: ["EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"],
            owner: [],
            filters: []
        }
    },
    accountsDataSlice: [],
    commitment: CommitmentLevel.CONFIRMED,
    slots: {},
    transactions: {},
    transactionsStatus: {},
    blocks: {},
    blocksMeta: {},
    entry: {}
};

2. Monitor by Program (Owner)

// Watch every account owned by a program (here: the SPL Token program).
// The remaining stream maps are left empty so the object is a complete
// SubscribeRequest, as in the full example.
const subscribeRequest: SubscribeRequest = {
    accounts: {
        accountSubscribe: {
            account: [],
            owner: ["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"],
            filters: []
        }
    },
    accountsDataSlice: [],
    commitment: CommitmentLevel.CONFIRMED,
    slots: {},
    transactions: {},
    transactionsStatus: {},
    blocks: {},
    blocksMeta: {},
    entry: {}
};

3. Monitor with Data Filters

// Watch accounts selected by data filters.
// NOTE(review): depending on the client version, uint64 fields such as `offset`
// and `datasize` may be typed as strings ("0", "165") — confirm against the
// installed @triton-one/yellowstone-grpc typings.
const subscribeRequest: SubscribeRequest = {
    accounts: {
        accountSubscribe: {
            account: [],
            owner: [],
            filters: [
                {
                    // The pattern is supplied base58-encoded, so it goes in the
                    // filter's `base58` field rather than the raw `bytes` field.
                    memcmp: {
                        offset: 0,
                        base58: bs58.default.encode(Buffer.from("your-data-pattern"))
                    }
                },
                {
                    datasize: 165 // Specific account data size
                }
            ]
        }
    },
    accountsDataSlice: [],
    commitment: CommitmentLevel.CONFIRMED,
    slots: {},
    transactions: {},
    transactionsStatus: {},
    blocks: {},
    blocksMeta: {},
    entry: {}
};

4. Monitor with Data Slices

// Watch every account owned by the SPL Token program, but receive only a slice
// of each account's data. The remaining stream maps are left empty so the
// object is a complete SubscribeRequest, as in the full example.
const subscribeRequest: SubscribeRequest = {
    accounts: {
        accountSubscribe: {
            account: [],
            owner: ["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"],
            filters: []
        }
    },
    accountsDataSlice: [
        {
            offset: 0,
            length: 32 // Get first 32 bytes only
        }
    ],
    commitment: CommitmentLevel.CONFIRMED,
    slots: {},
    transactions: {},
    transactionsStatus: {},
    blocks: {},
    blocksMeta: {},
    entry: {}
};

Account Data Structure

The account update data includes:

  1. Basic Information:

    • Pubkey (account address)

    • Owner (program ID)

    • Lamports (balance)

    • Executable flag

    • Rent epoch

  2. Account Data:

    • Raw data (base58-encoded by the TypeScript example; the Rust and Go examples print it as hex)

    • Data length

    • Write version

  3. Update Metadata:

    • Slot number

    • Startup flag (indicates initial load)

Rust Implementation

use {
    anyhow::Result,
    futures::{stream::StreamExt, sink::SinkExt},
    log::error,
    std::{collections::HashMap, sync::Arc, time::Duration},
    tokio::sync::Mutex,
    tonic::{metadata::errors::InvalidMetadataValue, transport::Endpoint},
    yellowstone_grpc_client::{GeyserGrpcClient, InterceptorXToken},
    yellowstone_grpc_proto::{
        geyser::{
            geyser_client::GeyserClient,
            SubscribeRequest,
            SubscribeRequestFilterAccounts,
            subscribe_update::UpdateOneof,
        },
        prelude::{CommitmentLevel, SubscribeRequestPing},
    },
    tonic_health::pb::health_client::HealthClient,
    solana_sdk::pubkey::Pubkey,
    hex,
};

/// Manager for handling gRPC stream connections and account updates.
///
/// Holds the Geyser client together with the bookkeeping used to retry a
/// dropped subscription.
struct GrpcStreamManager {
    /// Geyser client used to open the subscription stream.
    client: GeyserGrpcClient<InterceptorXToken>,
    /// Set to `true` once a subscription is open and to `false` when the stream errors.
    is_connected: bool,
    /// Number of reconnection attempts made since the last successful connection.
    reconnect_attempts: u32,
    /// Upper bound on `reconnect_attempts`; no further attempt is made once it is reached.
    max_reconnect_attempts: u32,
    /// Base delay between attempts; multiplied by the attempt number (capped at 5).
    reconnect_interval: Duration,
}

impl GrpcStreamManager {
    /// Handles account update messages from the gRPC stream
    /// This function can be customized based on your requirements:
    /// - Store updates in a database
    /// - Trigger specific actions based on account changes
    /// - Filter for specific types of updates
    /// - Transform data into your required format
    /// 
    /// # Arguments
    /// * `slot` - The slot number when the update occurred
    /// * `account_info` - The account information containing all update details
    fn handle_account_update(&self, slot: u64, account_info: &yellowstone_grpc_proto::geyser::SubscribeUpdateAccountInfo) {
        println!(
            "ACCOUNT UPDATE RECEIVED:\nSlot: {}\nPubkey: {}\nLamports: {}\nOwner: {}\nData length: {}\nExecutable: {}\nWrite version: {}\n",
            slot,
            Pubkey::try_from(account_info.pubkey.clone()).expect("valid pubkey"),
            account_info.lamports,
            Pubkey::try_from(account_info.owner.clone()).expect("valid pubkey"),
            account_info.data.len(),
            account_info.executable,
            account_info.write_version
        );
        if !account_info.data.is_empty() {
            println!("Data (hex): {}", hex::encode(&account_info.data));
        }
    }

    /// Creates a new GrpcStreamManager instance
    /// 
    /// # Arguments
    /// * `endpoint` - The gRPC endpoint URL
    /// * `x_token` - Authentication token for the endpoint
    async fn new(endpoint: &str, x_token: &str) -> Result<Arc<Mutex<GrpcStreamManager>>> {
        let interceptor = InterceptorXToken {
            x_token: Some(x_token.parse().map_err(|e: InvalidMetadataValue| anyhow::Error::from(e))?),
            x_request_snapshot: true,
        };

        let channel = Endpoint::from_shared(endpoint.to_string())?
            .connect_timeout(Duration::from_secs(10))
            .timeout(Duration::from_secs(10))
            .connect()
            .await
            .map_err(|e| anyhow::Error::from(e))?;

        let client = GeyserGrpcClient::new(
            HealthClient::with_interceptor(channel.clone(), interceptor.clone()),
            GeyserClient::with_interceptor(channel, interceptor),
        );

        Ok(Arc::new(Mutex::new(GrpcStreamManager {
            client,
            is_connected: false,
            reconnect_attempts: 0,
            max_reconnect_attempts: 10,
            reconnect_interval: Duration::from_secs(5),
        })))
    }

    /// Establishes connection and handles the subscription stream
    /// 
    /// # Arguments
    /// * `request` - The subscription request containing account filters and other parameters
    async fn connect(&mut self, request: SubscribeRequest) -> Result<()> {
        let request = request.clone();
        let (mut subscribe_tx, mut stream) = self.client.subscribe_with_request(Some(request.clone())).await?;

        self.is_connected = true;
        self.reconnect_attempts = 0;

        while let Some(message) = stream.next().await {
            match message {
                Ok(msg) => {
                    match msg.update_oneof {
                        Some(UpdateOneof::Account(account)) => {
                            if let Some(account_info) = account.account {
                                self.handle_account_update(account.slot, &account_info);
                            }
                        }
                        Some(UpdateOneof::Ping(_)) => {
                            subscribe_tx
                                .send(SubscribeRequest {
                                    ping: Some(SubscribeRequestPing { id: 1 }),
                                    ..Default::default()
                                })
                                .await?;
                        }
                        Some(UpdateOneof::Pong(_)) => {} // Ignore pong responses
                        _ => {
                            println!("Other update received: {:?}", msg);
                        }
                    }
                }
                Err(err) => {
                    error!("Error: {:?}", err);
                    self.is_connected = false;
                    Box::pin(self.reconnect(request.clone())).await?;
                    break;
                }
            }
        }

        Ok(())
    }

    /// Attempts to reconnect when the connection is lost
    /// 
    /// # Arguments
    /// * `request` - The original subscription request to reestablish the connection
    async fn reconnect(&mut self, request: SubscribeRequest) -> Result<()> {
        if self.reconnect_attempts >= self.max_reconnect_attempts {
            println!("Max reconnection attempts reached");
            return Ok(());
        }

        self.reconnect_attempts += 1;
        println!("Reconnecting... Attempt {}", self.reconnect_attempts);

        let backoff = self.reconnect_interval * std::cmp::min(self.reconnect_attempts, 5);
        tokio::time::sleep(backoff).await;

        Box::pin(self.connect(request)).await
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    // Build the stream manager and take exclusive access to it for the whole run.
    let manager = GrpcStreamManager::new(
        "your-grpc-url:2053",
        "your-x-token",
    ).await?;
    let mut manager_lock = manager.lock().await;

    // One account filter, registered under the label "client": the USDC mint.
    let usdc_mint_filter = SubscribeRequestFilterAccounts {
        account: vec!["EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v".to_string()],
        owner: vec![],
        filters: vec![],
    };
    let request = SubscribeRequest {
        accounts: HashMap::from_iter(vec![("client".to_string(), usdc_mint_filter)]),
        commitment: Some(CommitmentLevel::Confirmed as i32),
        ..Default::default()
    };

    // Run the subscription; report a failure before handing it back to the runtime.
    match manager_lock.connect(request).await {
        Ok(()) => Ok(()),
        Err(e) => {
            println!("Subscription error: {:?}", e);
            Err(e)
        }
    }
}

Go Implementation

package main

import (
	"context"
	"crypto/tls"
	"encoding/hex"
	"fmt"
	"log"
	"sync"
	"time"

	pb "github.com/rpcpool/yellowstone-grpc/examples/golang/proto"

	"github.com/mr-tron/base58"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/metadata"
)

// GrpcStreamManager owns one gRPC connection to a Geyser endpoint together
// with the subscription stream opened on it and the bookkeeping used to
// reconnect when that stream fails.
type GrpcStreamManager struct {
	// mu is locked by Connect, handleDisconnect and Close.
	mu                   sync.Mutex
	// conn is the underlying client connection; Close closes it.
	conn                 *grpc.ClientConn
	// client is the Geyser client created on conn.
	client               pb.GeyserClient
	// stream is the subscription stream set by Connect.
	stream               pb.Geyser_SubscribeClient
	// isConnected is set once the subscription request has been sent and
	// cleared on disconnect or Close.
	isConnected          bool
	// reconnectAttempts counts attempts since the last successful connection.
	reconnectAttempts    int
	// maxReconnectAttempts is the limit after which reconnect returns an error.
	maxReconnectAttempts int
	// reconnectInterval is the base backoff, multiplied by the attempt number (capped at 5).
	reconnectInterval    time.Duration
	// dataHandler is called with every update received from the stream.
	dataHandler          func(*pb.SubscribeUpdate)
	// xToken is sent as the "x-token" metadata entry when subscribing.
	xToken               string
}

// NewGrpcStreamManager creates a manager for the given endpoint.
//
// The connection uses TLS with normal certificate verification, because the
// x-token travels over it; an endpoint with a self-signed certificate needs
// its CA added to the trust store rather than having verification switched
// off. The token itself is attached per call when subscribing, so it is only
// stored here.
//
// dataHandler is invoked for every update received and must not be nil.
func NewGrpcStreamManager(endpoint string, xToken string, dataHandler func(*pb.SubscribeUpdate)) (*GrpcStreamManager, error) {
	if dataHandler == nil {
		return nil, fmt.Errorf("dataHandler must not be nil")
	}

	// Configure TLS: verify the server certificate and require TLS 1.2+.
	config := &tls.Config{
		MinVersion: tls.VersionTLS12,
	}

	conn, err := grpc.DialContext(context.Background(), endpoint,
		grpc.WithTransportCredentials(credentials.NewTLS(config)),
		grpc.WithInitialWindowSize(1<<30),
		grpc.WithInitialConnWindowSize(1<<30),
		grpc.WithDefaultCallOptions(
			grpc.MaxCallSendMsgSize(64*1024*1024),
			grpc.MaxCallRecvMsgSize(64*1024*1024),
		),
	)
	if err != nil {
		// %w keeps the cause available to errors.Is / errors.As.
		return nil, fmt.Errorf("failed to connect: %w", err)
	}

	return &GrpcStreamManager{
		conn:                 conn,
		client:               pb.NewGeyserClient(conn),
		isConnected:          false,
		reconnectAttempts:    0,
		maxReconnectAttempts: 10,
		reconnectInterval:    5 * time.Second,
		dataHandler:          dataHandler,
		xToken:               xToken,
	}, nil
}

// Connect opens the subscription stream, sends req and starts the background
// goroutines that receive updates and send keep-alive pings. Failed attempts
// are retried with backoff; an error is returned once the attempt limit is
// reached, ctx is cancelled, or the manager has been closed.
//
// Connect is intended to be called once per manager; later reconnects are
// performed internally.
func (m *GrpcStreamManager) Connect(ctx context.Context, req *pb.SubscribeRequest) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	return m.connectLocked(ctx, req)
}

// connectLocked is the retry loop behind Connect. The caller must hold m.mu.
// It never calls Connect, because sync.Mutex is not reentrant and locking it
// again from the same call chain would deadlock.
func (m *GrpcStreamManager) connectLocked(ctx context.Context, req *pb.SubscribeRequest) error {
	for {
		if m.conn == nil {
			return fmt.Errorf("stream manager is closed")
		}

		log.Println("Attempting to connect...")
		if err := m.openStreamLocked(ctx, req); err == nil {
			log.Println("Connection established")
			return nil
		}

		if err := m.backoffLocked(ctx); err != nil {
			return err
		}
		if m.isConnected {
			// Another caller connected while the lock was released for the backoff.
			return nil
		}
	}
}

// openStreamLocked makes a single subscribe attempt. On success it records
// the stream and starts its goroutines. The caller must hold m.mu.
func (m *GrpcStreamManager) openStreamLocked(ctx context.Context, req *pb.SubscribeRequest) error {
	// Each stream gets its own cancellable context carrying the x-token, so
	// its goroutines can be stopped without affecting a later stream.
	streamCtx, cancel := context.WithCancel(metadata.NewOutgoingContext(
		ctx,
		metadata.New(map[string]string{"x-token": m.xToken}),
	))

	stream, err := m.client.Subscribe(streamCtx)
	if err != nil {
		cancel()
		log.Printf("Failed to subscribe: %v", err)
		return err
	}

	if err := stream.Send(req); err != nil {
		cancel()
		log.Printf("Failed to send request: %v", err)
		return err
	}

	m.stream = stream
	m.isConnected = true
	m.reconnectAttempts = 0

	go m.pingLoop(streamCtx, cancel, stream)
	go m.receiveLoop(ctx, cancel, req, stream)

	return nil
}

// pingLoop sends a keep-alive ping on stream every 30 seconds until the
// stream's context ends. After the initial request it is the only sender on
// the stream. A failed ping cancels the stream, which makes the receive loop
// fail and perform the reconnect, so there is a single reconnect path.
func (m *GrpcStreamManager) pingLoop(streamCtx context.Context, cancel context.CancelFunc, stream pb.Geyser_SubscribeClient) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-streamCtx.Done():
			return
		case <-ticker.C:
			pingReq := &pb.SubscribeRequest{
				Ping: &pb.SubscribeRequestPing{Id: 1},
			}
			if err := stream.Send(pingReq); err != nil {
				log.Printf("Ping failed: %v", err)
				cancel()
				return
			}
		}
	}
}

// receiveLoop delivers updates from stream to the data handler until Recv
// fails, then stops the ping loop and triggers a reconnect.
func (m *GrpcStreamManager) receiveLoop(ctx context.Context, cancel context.CancelFunc, req *pb.SubscribeRequest, stream pb.Geyser_SubscribeClient) {
	defer cancel()

	for {
		update, err := stream.Recv()
		if err != nil {
			log.Printf("Stream error: %v", err)
			m.handleDisconnect(ctx, req, stream)
			return
		}

		m.dataHandler(update)
	}
}

// reconnect waits for the backoff delay and then connects again. The caller
// must hold m.mu.
func (m *GrpcStreamManager) reconnect(ctx context.Context, req *pb.SubscribeRequest) error {
	if err := m.backoffLocked(ctx); err != nil {
		return err
	}
	if m.isConnected {
		// Another caller connected while the lock was released for the backoff.
		return nil
	}
	return m.connectLocked(ctx, req)
}

// backoffLocked accounts for one reconnection attempt and waits
// reconnectInterval * min(attempt, 5), or until ctx is cancelled. The caller
// must hold m.mu; the lock is released while waiting, so Close is not blocked,
// and re-acquired before returning.
func (m *GrpcStreamManager) backoffLocked(ctx context.Context) error {
	if m.reconnectAttempts >= m.maxReconnectAttempts {
		return fmt.Errorf("max reconnection attempts reached")
	}

	m.reconnectAttempts++
	log.Printf("Reconnecting... Attempt %d", m.reconnectAttempts)

	backoff := m.reconnectInterval * time.Duration(min(m.reconnectAttempts, 5))

	m.mu.Unlock()
	defer m.mu.Lock()

	timer := time.NewTimer(backoff)
	defer timer.Stop()

	select {
	case <-timer.C:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// handleDisconnect reacts to a failure of stream by reconnecting. It does
// nothing when the manager is already disconnected or closed, or when stream
// is no longer the current stream.
func (m *GrpcStreamManager) handleDisconnect(ctx context.Context, req *pb.SubscribeRequest, stream pb.Geyser_SubscribeClient) {
	m.mu.Lock()
	defer m.mu.Unlock()

	if !m.isConnected || m.stream != stream {
		return
	}

	m.isConnected = false
	m.stream = nil
	if err := m.reconnect(ctx, req); err != nil {
		log.Printf("Failed to reconnect: %v", err)
	}
}

// Close marks the manager as closed and closes the underlying connection,
// which also ends the current stream. Calling Close more than once is safe.
func (m *GrpcStreamManager) Close() error {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.isConnected = false
	m.stream = nil
	if m.conn == nil {
		return nil
	}

	// A nil conn doubles as the "closed" marker checked by connectLocked.
	conn := m.conn
	m.conn = nil
	return conn.Close()
}

// min returns the smaller of two ints; when they are equal it returns that value.
func min(a, b int) int {
	if b <= a {
		return b
	}
	return a
}

// handleAccountUpdate logs the fields of an account update. Updates that carry
// no account payload are ignored.
func handleAccountUpdate(update *pb.SubscribeUpdate) {
	account := update.GetAccount()
	if account == nil {
		return
	}

	// The generated getters are used throughout, as in the rest of this example.
	info := account.GetAccount()
	data := info.GetData()
	pubkey := base58.Encode(info.GetPubkey())
	owner := base58.Encode(info.GetOwner())

	log.Printf("Account Update:\n")
	log.Printf("  Pubkey: %s\n", pubkey)
	log.Printf("  Owner: %s\n", owner)
	log.Printf("  Lamports: %d\n", info.GetLamports())
	log.Printf("  Executable: %v\n", info.GetExecutable())
	log.Printf("  Data Length: %d\n", len(data))
	if len(data) > 0 {
		log.Printf("  Data (hex): %s\n", hex.EncodeToString(data))
	}
}

// main subscribes to updates of the USDC mint account and then blocks forever
// while the manager's goroutines deliver updates to handleAccountUpdate.
func main() {
	// Create manager with data handler and x-token
	manager, err := NewGrpcStreamManager("your-grpc-url:2053", "your-x-token", handleAccountUpdate)
	if err != nil {
		log.Fatal(err)
	}
	defer manager.Close()

	// Subscription request: one account filter, labelled "client", for the
	// USDC mint at confirmed commitment.
	commitment := pb.CommitmentLevel_CONFIRMED
	req := &pb.SubscribeRequest{
		Accounts: map[string]*pb.SubscribeRequestFilterAccounts{
			"client": {
				Account: []string{"EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"},
			},
		},
		Commitment: &commitment,
	}

	// Connect and handle updates
	if err := manager.Connect(context.Background(), req); err != nil {
		log.Fatal(err)
	}

	// Keep the main goroutine running
	select {}
}

Last updated

Was this helpful?