Transaction Monitoring

Track Solana transactions in real-time with program filtering and detailed execution data.

Transaction Monitoring with Yellowstone gRPC

This guide demonstrates how to use Yellowstone gRPC to monitor transactions on Solana. You'll learn how to track transactions in real-time with filtering options and reliable reconnection handling.

TypeScript Implementation

import Client, { CommitmentLevel, SubscribeRequest } from "@triton-one/yellowstone-grpc";
import * as bs58 from 'bs58';

class GrpcStreamManager {
    private client: Client;
    private stream: any;
    private isConnected: boolean = false;
    private reconnectAttempts: number = 0;
    private readonly maxReconnectAttempts: number = 10;
    private readonly reconnectInterval: number = 5000; // 5 seconds
    private readonly dataHandler: (data: any) => void;

    constructor(
        endpoint: string,
        authToken: string,
        dataHandler: (data: any) => void
    ) {
        this.client = new Client(
            endpoint,
            authToken,
            { "grpc.max_receive_message_length": 64 * 1024 * 1024 }
        );
        this.dataHandler = dataHandler;
    }

    async connect(subscribeRequest: SubscribeRequest): Promise<void> {
        try {
            this.stream = await this.client.subscribe();
            this.isConnected = true;
            this.reconnectAttempts = 0;

            this.stream.on("data", this.handleData.bind(this));
            this.stream.on("error", this.handleError.bind(this));
            this.stream.on("end", () => this.handleDisconnect(subscribeRequest));
            this.stream.on("close", () => this.handleDisconnect(subscribeRequest));

            await this.write(subscribeRequest);
            this.startPing();
        } catch (error) {
            console.error("Connection error:", error);
            await this.reconnect(subscribeRequest);
        }
    }

    private async write(req: SubscribeRequest): Promise<void> {
        return new Promise((resolve, reject) => {
            this.stream.write(req, (err: any) => err ? reject(err) : resolve());
        });
    }

    private async reconnect(subscribeRequest: SubscribeRequest): Promise<void> {
        if (this.reconnectAttempts >= this.maxReconnectAttempts) {
            console.error("Max reconnection attempts reached");
            return;
        }

        this.reconnectAttempts++;
        console.log(`Reconnecting... Attempt ${this.reconnectAttempts}`);

        setTimeout(async () => {
            try {
                await this.connect(subscribeRequest);
            } catch (error) {
                console.error("Reconnection failed:", error);
                await this.reconnect(subscribeRequest);
            }
        }, this.reconnectInterval * Math.min(this.reconnectAttempts, 5));
    }

    private startPing(): void {
        setInterval(() => {
            if (this.isConnected) {
                this.write({
                    ping: { id: 1 },
                    accounts: {},
                    accountsDataSlice: [],
                    transactions: {},
                    blocks: {},
                    blocksMeta: {},
                    entry: {},
                    slots: {},
                    transactionsStatus: {},
                }).catch(console.error);
            }
        }, 30000);
    }

    private handleData(data: any): void {
        try {
            const processed = this.processBuffers(data);
            this.dataHandler(processed);
        } catch (error) {
            console.error("Error processing data:", error);
        }
    }

    private handleError(error: any): void {
        console.error("Stream error:", error);
        this.isConnected = false;
    }

    private handleDisconnect(subscribeRequest: SubscribeRequest): void {
        console.log("Stream disconnected");
        this.isConnected = false;
        this.reconnect(subscribeRequest);
    }

    private processBuffers(obj: any): any {
        if (!obj) return obj;
        if (Buffer.isBuffer(obj) || obj instanceof Uint8Array) {
            return bs58.default.encode(obj);
        }
        if (Array.isArray(obj)) {
            return obj.map(item => this.processBuffers(item));
        }
        if (typeof obj === 'object') {
            return Object.fromEntries(
                Object.entries(obj).map(([k, v]) => [k, this.processBuffers(v)])
            );
        }
        return obj;
    }
}

// Transaction monitoring implementation
async function monitorTransactions() {
    const manager = new GrpcStreamManager(
        "your-grpc-url:2053",
        "your-x-token",
        handleTransactionUpdate
    );

    // Create subscription request for monitoring program transactions
    const subscribeRequest: SubscribeRequest = {
        transactions: {
            accountInclude: ["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"], // Token program
            accountExclude: [],
            accountRequired: [],
            vote: false,
            failed: false,
            signature: undefined
        },
        commitment: CommitmentLevel.CONFIRMED,
    };

    await manager.connect(subscribeRequest);
}

function handleTransactionUpdate(data: any): void {
    // Check if we have transaction data in the correct structure
    if (data?.transaction?.transaction) {
        const txInfo = data.transaction.transaction;
        console.log('\n=== Transaction Details ===');
        console.log(`Signature: ${txInfo.signature}`);
        console.log(`Slot: ${data.transaction.slot}`);
        console.log(`Is Vote: ${txInfo.isVote}`);
        
        if (txInfo.meta) {
            console.log(`Fee: ${txInfo.meta.fee} lamports`);
            console.log(`Compute Units: ${txInfo.meta.computeUnitsConsumed}`);
            console.log(`Status: ${txInfo.meta.err ? 'Failed' : 'Success'}`);

            // Token balance changes
            if (txInfo.meta.preTokenBalances?.length > 0) {
                console.log('\n=== Token Balance Changes ===');
                txInfo.meta.preTokenBalances.forEach((preBalance: any, index: number) => {
                    const postBalance = txInfo.meta.postTokenBalances[index];
                    if (preBalance && postBalance) {
                        console.log(`Token Account: ${preBalance.mint}`);
                        console.log(`  Pre Balance: ${preBalance.uiTokenAmount?.uiAmount}`);
                        console.log(`  Post Balance: ${postBalance.uiTokenAmount?.uiAmount}`);
                        if (preBalance.owner) {
                            console.log(`  Owner: ${preBalance.owner}`);
                        }
                    }
                });
            }

            // Log messages
            if (txInfo.meta.logMessages?.length > 0) {
                console.log('\n=== Program Logs ===');
                txInfo.meta.logMessages.forEach((log: string) => {
                    console.log(`  ${log}`);
                });
            }
        }

        // Transaction instructions
        if (txInfo.transaction?.message) {
            const message = txInfo.transaction.message;
            
            console.log('\n=== Account Keys ===');
            if (Array.isArray(message.accountKeys)) {
                message.accountKeys.forEach((key: Uint8Array, index: number) => {
                    try {
                        console.log(`  ${index}: ${bs58.default.encode(key)}`);
                    } catch (error) {
                        console.log(`  ${index}: [Unable to encode key]`);
                    }
                });
            }

            // Instructions
            // TODO: Add instructions processing logic
            if (Array.isArray(message.instructions)) {
                console.log('\n=== Instructions ===');
                message.instructions.forEach((ix: any, index: number) => {
                    try {
                        console.log(`\nInstruction ${index + 1}:`);
                        console.log(`  Program: ${bs58.default.encode(message.accountKeys[ix.programIdIndex])}`);
                        console.log(`  Accounts: ${ix.accounts.map((idx: number) => 
                            bs58.default.encode(message.accountKeys[idx])
                        )}`);
                        if (ix.data) {
                            console.log(`  Data: ${bs58.default.encode(ix.data)}`);
                        }
                    } catch (error) {
                        console.log(`  [Unable to decode instruction ${index + 1}]`);
                    }
                });
            }

            // Process inner instructions
            // TODO: Add inner instructions processing logic
            if (txInfo.meta?.innerInstructions?.length > 0) {
                console.log('\n=== Inner Instructions ===');
                txInfo.meta.innerInstructions.forEach((inner: any, index: number) => {
                    console.log(`\nInner Instruction Set ${index + 1}:`);
                    inner.instructions.forEach((ix: any, i: number) => {
                        try {
                            console.log(`  Instruction ${i + 1}:`);
                            console.log(`    Program: ${bs58.default.encode(message.accountKeys[ix.programIdIndex])}`);
                            console.log(`    Accounts: ${ix.accounts.map((idx: number) => 
                                bs58.default.encode(message.accountKeys[idx])
                            )}`);
                            if (ix.data) {
                                console.log(`    Data: ${bs58.default.encode(ix.data)}`);
                            }
                        } catch (error) {
                            console.log(`    [Unable to decode instruction]`);
                        }
                    });
                });
            }
        }

        console.log('\n' + '='.repeat(50) + '\n');
    }
}

// Start monitoring
monitorTransactions().catch(console.error);

Transaction Subscription Options

1. Monitor Program Transactions

const subscribeRequest: SubscribeRequest = {
    transactions: {
        accountInclude: ["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"], // Token program
        accountExclude: [],
        accountRequired: [],
        vote: false,
        failed: false,
        signature: undefined
    },
    commitment: CommitmentLevel.CONFIRMED,
};

2. Monitor Multiple Programs

const subscribeRequest: SubscribeRequest = {
    transactions: {
        accountInclude: [
            "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA", // Token program
            "ATokenGPvbdGVxr1b2hvZbsiqW5xWH25efTNsLJA8knL", // Associated Token program
        ],
        accountExclude: [],
        accountRequired: [],
        vote: false,
        failed: true,
        signature: undefined
    },
    commitment: CommitmentLevel.CONFIRMED,
};

3. Monitor All Transactions

const subscribeRequest: SubscribeRequest = {
    transactions: {
        accountInclude: [],
        accountExclude: [],
        accountRequired: [],
        vote: true,
        failed: true,
        signature: undefined
    },
    commitment: CommitmentLevel.CONFIRMED,
};

Data Structures

  1. Transaction Update:

    • Slot number (when transaction was processed)

    • Transaction signature (unique identifier)

    • Transaction message

      • Account keys (involved accounts)

      • Instructions (program calls and data)

      • Recent blockhash

    • Transaction metadata

      • Status (success/failure)

      • Fee amount

      • Compute units consumed

      • Log messages

      • Inner instructions

      • Balance changes

  2. Instruction Details:

    • Program ID (program being called)

    • Account list (accounts used by instruction)

    • Instruction data (program-specific data)

    • Inner instructions (CPI calls)

Rust Implementation

use {
    anyhow::Result,
    futures::{stream::StreamExt, sink::SinkExt},
    log::error,
    std::{collections::HashMap, sync::Arc, time::Duration},
    tokio::sync::Mutex,
    tonic::{metadata::errors::InvalidMetadataValue, transport::Endpoint},
    yellowstone_grpc_client::{GeyserGrpcClient, InterceptorXToken},
    yellowstone_grpc_proto::{
        geyser::{
            geyser_client::GeyserClient,
            SubscribeRequest,
            SubscribeRequestFilterTransactions,
            subscribe_update::UpdateOneof,
            SubscribeUpdateTransaction,
        },
        prelude::{CommitmentLevel, SubscribeRequestPing},
    },
    tonic_health::pb::health_client::HealthClient,
    solana_sdk::pubkey::Pubkey,
    hex,
};

/// Manager for handling gRPC stream connections and transaction updates
struct GrpcStreamManager {
    client: GeyserGrpcClient<InterceptorXToken>,
    is_connected: bool,
    reconnect_attempts: u32,
    max_reconnect_attempts: u32,
    reconnect_interval: Duration,
}

impl GrpcStreamManager {
    /// Handles transaction update messages from the gRPC stream
    /// This function can be customized based on your requirements:
    /// - Store transactions in a database
    /// - Trigger specific actions based on transaction contents
    /// - Filter for specific types of transactions
    /// - Transform data into your required format
    /// 
    /// # Arguments
    /// * `transaction_update` - The transaction update containing all details
    fn handle_transaction_update(&self, transaction_update: &SubscribeUpdateTransaction) {
        if let Some(transaction) = &transaction_update.transaction {
            println!("Transaction Update:");
            println!("  Signature: {}", hex::encode(&transaction.signature));
            
            if let Some(meta) = &transaction.meta {
                println!("  Status: {}", if meta.err.is_none() { "Success" } else { "Failed" });
                println!("  Fee: {:?}", meta.fee);
                println!("  Compute Units: {:?}", meta.compute_units_consumed);
            }

            if let Some(transaction_message) = &transaction.transaction {
                if let Some(message) = &transaction_message.message {
                    println!("Account Keys:");
                    for (i, key) in message.account_keys.iter().enumerate() {
                        if let Ok(pubkey) = Pubkey::try_from(key.as_slice()) {
                            println!("  {}: {}", i, pubkey);
                        } else {
                            println!("  {}: {}", i, hex::encode(key));
                        }
                    }

                    println!("Instructions:");
                    for (i, instruction) in message.instructions.iter().enumerate() {
                        println!("Instruction {}:", i + 1);
                        if let Some(program_key) = message.account_keys.get(instruction.program_id_index as usize) {
                            if let Ok(pubkey) = Pubkey::try_from(program_key.as_slice()) {
                                println!("  Program: {}", pubkey);
                            } else {
                                println!("  Program: {}", hex::encode(program_key));
                            }
                        }
                        
                        println!("  Accounts:");
                        for &account_idx in &instruction.accounts {
                            if let Some(key) = message.account_keys.get(account_idx as usize) {
                                if let Ok(pubkey) = Pubkey::try_from(key.as_slice()) {
                                    println!("    {}", pubkey);
                                } else {
                                    println!("    {}", hex::encode(key));
                                }
                            }
                        }
                        println!("  Data: {}", hex::encode(&instruction.data));
                    }

                    if let Some(meta) = &transaction.meta {
                        if !meta.log_messages.is_empty() {
                            println!("Log Messages:");
                            for log in &meta.log_messages {
                                println!("  {}", log);
                            }
                        }

                        println!("Balance Changes:");
                        for i in 0..meta.pre_balances.len().min(meta.post_balances.len()) {
                            let pre = meta.pre_balances[i];
                            let post = meta.post_balances[i];
                            if pre != post {
                                if let Some(key) = message.account_keys.get(i) {
                                    if let Ok(pubkey) = Pubkey::try_from(key.as_slice()) {
                                        println!("  Account {}: {} -> {} (Δ{})",
                                            pubkey,
                                            pre, post, post.saturating_sub(pre));
                                    } else {
                                        println!("  Account {}: {} -> {} (Δ{})",
                                            hex::encode(key),
                                            pre, post, post.saturating_sub(pre));
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    /// Creates a new GrpcStreamManager instance
    /// 
    /// # Arguments
    /// * `endpoint` - The gRPC endpoint URL
    /// * `x_token` - Authentication token for the endpoint
    async fn new(endpoint: &str, x_token: &str) -> Result<Arc<Mutex<GrpcStreamManager>>> {
        let interceptor = InterceptorXToken {
            x_token: Some(x_token.parse().map_err(|e: InvalidMetadataValue| anyhow::Error::from(e))?),
            x_request_snapshot: true,
        };

        let channel = Endpoint::from_shared(endpoint.to_string())?
            .connect_timeout(Duration::from_secs(10))
            .timeout(Duration::from_secs(10))
            .connect()
            .await
            .map_err(|e| anyhow::Error::from(e))?;

        let client = GeyserGrpcClient::new(
            HealthClient::with_interceptor(channel.clone(), interceptor.clone()),
            GeyserClient::with_interceptor(channel, interceptor),
        );

        Ok(Arc::new(Mutex::new(GrpcStreamManager {
            client,
            is_connected: false,
            reconnect_attempts: 0,
            max_reconnect_attempts: 10,
            reconnect_interval: Duration::from_secs(5),
        })))
    }

    /// Establishes connection and handles the subscription stream
    /// 
    /// # Arguments
    /// * `request` - The subscription request containing transaction filters and other parameters
    async fn connect(&mut self, request: SubscribeRequest) -> Result<()> {
        let request = request.clone();
        let (mut subscribe_tx, mut stream) = self.client.subscribe_with_request(Some(request.clone())).await?;

        self.is_connected = true;
        self.reconnect_attempts = 0;

        while let Some(message) = stream.next().await {
            match message {
                Ok(msg) => {
                    match msg.update_oneof {
                        Some(UpdateOneof::Transaction(transaction)) => {
                            self.handle_transaction_update(&transaction);
                        }
                        Some(UpdateOneof::Ping(_)) => {
                            subscribe_tx
                                .send(SubscribeRequest {
                                    ping: Some(SubscribeRequestPing { id: 1 }),
                                    ..Default::default()
                                })
                                .await?;
                        }
                        Some(UpdateOneof::Pong(_)) => {} // Ignore pong responses
                        _ => {
                            println!("Other update received: {:?}", msg);
                        }
                    }
                }
                Err(err) => {
                    error!("Error: {:?}", err);
                    self.is_connected = false;
                    Box::pin(self.reconnect(request.clone())).await?;
                    break;
                }
            }
        }

        Ok(())
    }

    /// Attempts to reconnect when the connection is lost
    /// 
    /// # Arguments
    /// * `request` - The original subscription request to reestablish the connection
    async fn reconnect(&mut self, request: SubscribeRequest) -> Result<()> {
        if self.reconnect_attempts >= self.max_reconnect_attempts {
            println!("Max reconnection attempts reached");
            return Ok(());
        }

        self.reconnect_attempts += 1;
        println!("Reconnecting... Attempt {}", self.reconnect_attempts);

        let backoff = self.reconnect_interval * std::cmp::min(self.reconnect_attempts, 5);
        tokio::time::sleep(backoff).await;

        Box::pin(self.connect(request)).await
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    // Initialize gRPC stream manager
    let manager = GrpcStreamManager::new(
        "your-grpc-url:2053",
        "your-x-token",
    ).await?;

    let mut manager_lock = manager.lock().await;

    // Create subscription request for token program transactions
    let request = SubscribeRequest {
        transactions: HashMap::from_iter(vec![(
            "transactions".to_string(),
            SubscribeRequestFilterTransactions {
                vote: Some(false),
                failed: Some(false),
                signature: None,
                account_include: vec!["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA".to_string()],
                account_exclude: vec![],
                account_required: vec![],
            },
        )]),
        commitment: Some(CommitmentLevel::Confirmed as i32),
        ..Default::default()
    };

    println!("Starting subscription for Token Program transactions");
    
    // Start the subscription
    let result = manager_lock.connect(request).await;
    if let Err(e) = &result {
        println!("Subscription error: {:?}", e);
    }
    result?;

    Ok(())
}

Go Implementation

package main

import (
	"context"
	"crypto/tls"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/mr-tron/base58"
	pb "github.com/rpcpool/yellowstone-grpc/examples/golang/proto"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/metadata"
)

type GrpcStreamManager struct {
	mu                   sync.Mutex
	conn                 *grpc.ClientConn
	client               pb.GeyserClient
	stream               pb.Geyser_SubscribeClient
	isConnected          bool
	reconnectAttempts    int
	maxReconnectAttempts int
	reconnectInterval    time.Duration
	dataHandler          func(*pb.SubscribeUpdate)
	xToken               string
}

func NewGrpcStreamManager(endpoint string, xToken string, dataHandler func(*pb.SubscribeUpdate)) (*GrpcStreamManager, error) {
	// Create gRPC connection with interceptor for x-token
	ctx := metadata.NewOutgoingContext(
		context.Background(),
		metadata.New(map[string]string{"x-token": xToken}),
	)

	// Configure TLS
	config := &tls.Config{
		InsecureSkipVerify: true,
	}

	conn, err := grpc.DialContext(ctx, endpoint,
		grpc.WithTransportCredentials(credentials.NewTLS(config)),
		grpc.WithInitialWindowSize(1<<30),
		grpc.WithInitialConnWindowSize(1<<30),
		grpc.WithDefaultCallOptions(
			grpc.MaxCallSendMsgSize(64*1024*1024),
			grpc.MaxCallRecvMsgSize(64*1024*1024),
		),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to connect: %v", err)
	}

	return &GrpcStreamManager{
		conn:                 conn,
		client:               pb.NewGeyserClient(conn),
		isConnected:          false,
		reconnectAttempts:    0,
		maxReconnectAttempts: 10,
		reconnectInterval:    5 * time.Second,
		dataHandler:          dataHandler,
		xToken:               xToken,
	}, nil
}

func (m *GrpcStreamManager) Connect(ctx context.Context, req *pb.SubscribeRequest) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	log.Println("Attempting to connect...")

	// Add x-token to context
	ctx = metadata.NewOutgoingContext(
		ctx,
		metadata.New(map[string]string{"x-token": m.xToken}),
	)

	stream, err := m.client.Subscribe(ctx)
	if err != nil {
		log.Printf("Failed to subscribe: %v", err)
		return m.reconnect(ctx, req)
	}

	if err := stream.Send(req); err != nil {
		log.Printf("Failed to send request: %v", err)
		return m.reconnect(ctx, req)
	}

	m.stream = stream
	m.isConnected = true
	m.reconnectAttempts = 0
	log.Println("Connection established")

	// Start ping goroutine
	go func() {
		ticker := time.NewTicker(30 * time.Second)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				if !m.isConnected {
					return
				}
				pingReq := &pb.SubscribeRequest{
					Ping: &pb.SubscribeRequestPing{Id: 1},
				}
				if err := m.stream.Send(pingReq); err != nil {
					log.Printf("Ping failed: %v", err)
					m.handleDisconnect(ctx, req)
					return
				}
			}
		}
	}()

	// Process updates
	go func() {
		for {
			update, err := m.stream.Recv()
			if err != nil {
				log.Printf("Stream error: %v", err)
				m.handleDisconnect(ctx, req)
				return
			}

			m.dataHandler(update)
		}
	}()

	return nil
}

func (m *GrpcStreamManager) reconnect(ctx context.Context, req *pb.SubscribeRequest) error {
	if m.reconnectAttempts >= m.maxReconnectAttempts {
		return fmt.Errorf("max reconnection attempts reached")
	}

	m.reconnectAttempts++
	log.Printf("Reconnecting... Attempt %d", m.reconnectAttempts)

	backoff := m.reconnectInterval * time.Duration(min(m.reconnectAttempts, 5))
	time.Sleep(backoff)

	return m.Connect(ctx, req)
}

func (m *GrpcStreamManager) handleDisconnect(ctx context.Context, req *pb.SubscribeRequest) {
	m.mu.Lock()
	defer m.mu.Unlock()

	if !m.isConnected {
		return
	}

	m.isConnected = false
	if err := m.reconnect(ctx, req); err != nil {
		log.Printf("Failed to reconnect: %v", err)
	}
}

func (m *GrpcStreamManager) Close() error {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.isConnected = false
	if m.conn != nil {
		return m.conn.Close()
	}
	return nil
}

func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}

func handleAccountUpdate(update *pb.SubscribeUpdate) {
	if tx := update.GetTransaction(); tx != nil {
		log.Printf("Transaction Update:\n")
		log.Printf("  Slot: %d\n", tx.GetSlot())

		txInfo := tx.GetTransaction()
		if txInfo != nil {
			// Print signature if available
			if len(txInfo.GetSignature()) > 0 {
				log.Printf("  Signature: %s\n", base58.Encode(txInfo.GetSignature()))
			}

			// Print if it's a vote transaction
			log.Printf("  Is Vote: %v\n", txInfo.GetIsVote())

			// Print transaction index
			log.Printf("  Transaction Index: %d\n", txInfo.GetIndex())

			// Print transaction details
			if tx := txInfo.GetTransaction(); tx != nil {
				if msg := tx.GetMessage(); msg != nil {
					if accounts := msg.GetAccountKeys(); len(accounts) > 0 {
						log.Printf("  Account Keys:\n")
						for _, acc := range accounts {
							if len(acc) > 0 {
								log.Printf("    - %s\n", base58.Encode(acc))
							}
						}
					}
				}
			}

			// Print status and metadata
			if meta := txInfo.GetMeta(); meta != nil {
				log.Printf("  Status: %v\n", meta.GetErr() == nil)
				log.Printf("  Fee: %d\n", meta.GetFee())

				// Print log messages if any
				if len(meta.GetLogMessages()) > 0 {
					log.Printf("  Log Messages:\n")
					for _, msg := range meta.GetLogMessages() {
						log.Printf("    - %s\n", msg)
					}
				}
			}
		}
	}
}

func boolPtr(b bool) *bool {
	return &b
}

func main() {
	ctx := context.Background()

	// Create manager with data handler and x-token
	manager, err := NewGrpcStreamManager(
		"your-grpc-url:2053",
        	"your-x-token",
		handleAccountUpdate,
	)
	if err != nil {
		log.Fatal(err)
	}
	defer manager.Close()

	// Create subscription request for token program transactions
	transactions := make(map[string]*pb.SubscribeRequestFilterTransactions)
	transactions["transactions"] = &pb.SubscribeRequestFilterTransactions{
		Vote:            boolPtr(false),
		Failed:          boolPtr(false),
		AccountInclude:  []string{"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"},
		AccountExclude:  []string{},
		AccountRequired: []string{},
	}

	commitment := pb.CommitmentLevel_CONFIRMED
	req := &pb.SubscribeRequest{
		Transactions: transactions,
		Commitment:   &commitment,
	}

	// Connect and handle updates
	if err := manager.Connect(ctx, req); err != nil {
		log.Fatal(err)
	}

	// Keep the main goroutine running
	select {}
}

Last updated

Was this helpful?