Slot and Block Monitoring

Monitor Solana slots and blocks with transaction details.

Slot and Block Monitoring with Yellowstone gRPC

This guide demonstrates how to use Yellowstone gRPC to monitor slots and blocks on Solana. You'll learn how to track slot updates, block creation, and block metadata with reliable reconnection handling.

TypeScript Implementation

import Client, { CommitmentLevel, SubscribeRequest } from "@triton-one/yellowstone-grpc";
import * as bs58 from 'bs58';

class GrpcStreamManager {
    private client: Client;
    private stream: any;
    private isConnected: boolean = false;
    private reconnectAttempts: number = 0;
    private readonly maxReconnectAttempts: number = 10;
    private readonly reconnectInterval: number = 5000; // 5 seconds
    private readonly dataHandler: (data: any) => void;

    constructor(
        endpoint: string,
        authToken: string,
        dataHandler: (data: any) => void
    ) {
        this.client = new Client(
            endpoint,
            authToken,
            { "grpc.max_receive_message_length": 64 * 1024 * 1024 }
        );
        this.dataHandler = dataHandler;
    }

    async connect(subscribeRequest: SubscribeRequest): Promise<void> {
        try {
            this.stream = await this.client.subscribe();
            this.isConnected = true;
            this.reconnectAttempts = 0;

            this.stream.on("data", this.handleData.bind(this));
            this.stream.on("error", this.handleError.bind(this));
            this.stream.on("end", () => this.handleDisconnect(subscribeRequest));
            this.stream.on("close", () => this.handleDisconnect(subscribeRequest));

            await this.write(subscribeRequest);
            this.startPing();
        } catch (error) {
            console.error("Connection error:", error);
            await this.reconnect(subscribeRequest);
        }
    }

    private async write(req: SubscribeRequest): Promise<void> {
        return new Promise((resolve, reject) => {
            this.stream.write(req, (err: any) => err ? reject(err) : resolve());
        });
    }

    private async reconnect(subscribeRequest: SubscribeRequest): Promise<void> {
        if (this.reconnectAttempts >= this.maxReconnectAttempts) {
            console.error("Max reconnection attempts reached");
            return;
        }

        this.reconnectAttempts++;
        console.log(`Reconnecting... Attempt ${this.reconnectAttempts}`);

        setTimeout(async () => {
            try {
                await this.connect(subscribeRequest);
            } catch (error) {
                console.error("Reconnection failed:", error);
                await this.reconnect(subscribeRequest);
            }
        }, this.reconnectInterval * Math.min(this.reconnectAttempts, 5));
    }

    private startPing(): void {
        setInterval(() => {
            if (this.isConnected) {
                this.write({
                    ping: { id: 1 },
                    accounts: {},
                    accountsDataSlice: [],
                    transactions: {},
                    blocks: {},
                    blocksMeta: {},
                    entry: {},
                    slots: {},
                    transactionsStatus: {},
                }).catch(console.error);
            }
        }, 30000);
    }

    private handleData(data: any): void {
        try {
            const processed = this.processBuffers(data);
            this.dataHandler(processed);
        } catch (error) {
            console.error("Error processing data:", error);
        }
    }

    private handleError(error: any): void {
        console.error("Stream error:", error);
        this.isConnected = false;
    }

    private handleDisconnect(subscribeRequest: SubscribeRequest): void {
        console.log("Stream disconnected");
        this.isConnected = false;
        this.reconnect(subscribeRequest);
    }

    private processBuffers(obj: any): any {
        if (!obj) return obj;
        if (Buffer.isBuffer(obj) || obj instanceof Uint8Array) {
            return bs58.default.encode(obj);
        }
        if (Array.isArray(obj)) {
            return obj.map(item => this.processBuffers(item));
        }
        if (typeof obj === 'object') {
            return Object.fromEntries(
                Object.entries(obj).map(([k, v]) => [k, this.processBuffers(v)])
            );
        }
        return obj;
    }
}

// Block monitoring implementation
async function monitorBlocks() {
    const manager = new GrpcStreamManager(
         "your-grpc-url:2053",
         "your-x-token",,
        handleBlockUpdate
    );

    const subscribeRequest: SubscribeRequest = {
        blocks: {
            blockSubscribe: {
                accountInclude: [
                    "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA",  // Token Program
                    "11111111111111111111111111111111",              // System Program
                    "So11111111111111111111111111111111111111112"   // Wrapped SOL
                ],
                includeTransactions: true,
                includeAccounts: true,
                includeEntries: false
            }
        },
        accounts: {},
        accountsDataSlice: [],
        slots: {},
        transactions: {},
        blocksMeta: {},
        entry: {},
        transactionsStatus: {},
        commitment: CommitmentLevel.CONFIRMED,
    };

    console.log('Starting block monitoring...');
    console.log('Monitoring blocks for Token Program, System Program, and Wrapped SOL activities...');
    await manager.connect(subscribeRequest);
}

function handleBlockUpdate(data: any): void {
    if (data?.block) {
        const block = data.block;
        console.log('\n=== Block Details ===');
        console.log(`Slot: ${block.slot}`);
        console.log(`Parent Slot: ${block.parentSlot}`);
        console.log(`Blockhash: ${block.blockhash}`);
        console.log(`Previous Blockhash: ${block.previousBlockhash}`);
        
        if (block.transactions?.length > 0) {
            console.log('\n=== Block Transactions ===');
            console.log(`Transaction Count: ${block.transactions.length}`);
            block.transactions.forEach((tx: any, index: number) => {
                if (tx.transaction?.signatures?.[0]) {
                    console.log(`\nTransaction ${index + 1}:`);
                    console.log(`  Signature: ${tx.transaction.signatures[0]}`);
                    
                    if (tx.meta?.err) {
                        console.log(`  Status: Failed`);
                        console.log(`  Error: ${JSON.stringify(tx.meta.err)}`);
                    } else {
                        console.log(`  Status: Success`);
                    }

                    if (tx.meta) {
                        console.log(`  Fee: ${tx.meta.fee} lamports`);
                        console.log(`  Compute Units: ${tx.meta.computeUnitsConsumed || 'N/A'}`);
                    }
                }
            });
        }

        if (block.rewards?.rewards?.length > 0) {
            console.log('\n=== Block Rewards ===');
            block.rewards.rewards.forEach((reward: any) => {
                console.log(`  Account: ${reward.pubkey}`);
                console.log(`  Lamports: ${reward.lamports}`);
                console.log(`  Post Balance: ${reward.postBalance}`);
                console.log(`  Reward Type: ${reward.rewardType}`);
                if (reward.commission !== undefined) {
                    console.log(`  Commission: ${reward.commission}`);
                }
                console.log('  ---');
            });
        }

        console.log('\n' + '='.repeat(50) + '\n');
    }
}

// Slot monitoring implementation
async function monitorSlots() {
    const manager = new GrpcStreamManager(
        "your-grpc-url:2053",
        "your-x-token",
        handleSlotUpdate
    );

    const subscribeRequest: SubscribeRequest = {
        slots: {
            slotSubscribe: {}
        },
        accounts: {},
        accountsDataSlice: [],
        blocks: {},
        transactions: {},
        blocksMeta: {},
        entry: {},
        transactionsStatus: {},
        commitment: CommitmentLevel.CONFIRMED,
    };

    console.log('Starting slot monitoring...');
    await manager.connect(subscribeRequest);
}

function handleSlotUpdate(data: any): void {
    if (data?.slot) {
        const slotInfo = data.slot;
        console.log('\n=== Slot Update ===');
        console.log(`Slot: ${slotInfo.slot}`);
        
        if (slotInfo.parent !== undefined) {
            console.log(`Parent Slot: ${slotInfo.parent}`);
        }
        
        if (slotInfo.status !== undefined) {
            // Status is the SlotStatus enum value (e.g. processed, confirmed, finalized)
            console.log(`Status: ${slotInfo.status}`);
        }

        if (slotInfo.deadError) {
            console.log(`Dead Error: ${slotInfo.deadError}`);
        }

        console.log('\n' + '='.repeat(50) + '\n');
    }
}

// Choose which monitoring to start
monitorBlocks().catch(console.error);
// monitorSlots().catch(console.error);

Subscription Options

The snippets below show only the relevant filter; keep the remaining filter maps as empty objects, as in the full requests above.

1. Monitor All Slots

const subscribeRequest: SubscribeRequest = {
    slots: {
        slotSubscribe: {
            // When true, only slot updates at the request's commitment level are streamed
            filterByCommitment: true,
        }
    },
    commitment: CommitmentLevel.CONFIRMED,
};

2. Monitor Blocks with Account Filtering

const subscribeRequest: SubscribeRequest = {
    blocks: {
        blockSubscribe: {
            accountInclude: ["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"],
            includeTransactions: true,
            includeAccounts: true,
        }
    },
    commitment: CommitmentLevel.CONFIRMED,
};

3. Monitor Block Metadata Only

const subscribeRequest: SubscribeRequest = {
    blocksMeta: {
        blockMetadata: {}
    },
    commitment: CommitmentLevel.CONFIRMED,
};
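
If you subscribe to block metadata only, updates arrive under blockMeta instead of block. The handler below is a minimal sketch: the field names follow the Yellowstone proto (blockTime and blockHeight are wrapper messages), so verify them against the types generated by your client version. Pass it as the dataHandler when constructing GrpcStreamManager, just as handleBlockUpdate is passed in monitorBlocks().

function handleBlockMetaUpdate(data: any): void {
    if (data?.blockMeta) {
        const meta = data.blockMeta;
        console.log('\n=== Block Metadata ===');
        console.log(`Slot: ${meta.slot}`);
        console.log(`Blockhash: ${meta.blockhash}`);
        console.log(`Parent Slot: ${meta.parentSlot}`);
        console.log(`Parent Blockhash: ${meta.parentBlockhash}`);
        console.log(`Transaction Count: ${meta.executedTransactionCount}`);
        console.log(`Entry Count: ${meta.entriesCount}`);

        if (meta.blockHeight) {
            console.log(`Block Height: ${meta.blockHeight.blockHeight}`);
        }
        if (meta.blockTime) {
            // blockTime.timestamp is a Unix timestamp in seconds
            console.log(`Block Time: ${new Date(Number(meta.blockTime.timestamp) * 1000).toISOString()}`);
        }

        console.log('\n' + '='.repeat(50) + '\n');
    }
}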

Data Structures

Each update type carries the following fields (approximate TypeScript shapes are sketched after this list):

  1. Slot Update:
    • Slot number
    • Parent slot
    • Commitment status

  2. Block Update:
    • Slot number
    • Blockhash
    • Parent information
    • Transaction count
    • Account updates
    • Block entries
    • Rewards
    • Timestamp
    • Block height

  3. Block Metadata:
    • Basic block information
    • Transaction count
    • Entry count
    • Parent information
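
As a rough reference, these shapes correspond to the TypeScript interfaces sketched below. This is an approximation of the Yellowstone proto messages rather than the exact generated types; 64-bit numeric fields may arrive as strings or BigInt depending on the client version and codegen settings, and the types shipped with @triton-one/yellowstone-grpc are authoritative.

// Illustrative payload shapes only (not the exact generated types).
interface SlotUpdate {
    slot: string;
    parent?: string;
    status: number;          // SlotStatus enum: processed, confirmed, finalized, ...
    deadError?: string;
}

interface BlockUpdate {
    slot: string;
    blockhash: string;
    parentSlot: string;
    parentBlockhash: string;
    blockTime?: { timestamp: string };
    blockHeight?: { blockHeight: string };
    executedTransactionCount: string;
    transactions: any[];     // SubscribeUpdateTransactionInfo, when includeTransactions is true
    accounts: any[];         // SubscribeUpdateAccountInfo, when includeAccounts is true
    entries: any[];          // SubscribeUpdateEntry, when includeEntries is true
    rewards?: { rewards: any[] };
}

interface BlockMetaUpdate {
    slot: string;
    blockhash: string;
    parentSlot: string;
    parentBlockhash: string;
    blockTime?: { timestamp: string };
    blockHeight?: { blockHeight: string };
    executedTransactionCount: string;
    entriesCount: string;
}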

Rust Implementation

use {
    anyhow::Result,
    futures::{sink::SinkExt, stream::StreamExt},
    hex,
    log::error,
    std::{collections::HashMap, sync::Arc, time::Duration},
    tokio::sync::Mutex,
    tonic::{metadata::errors::InvalidMetadataValue, transport::Endpoint},
    tonic_health::pb::health_client::HealthClient,
    yellowstone_grpc_client::{GeyserGrpcClient, InterceptorXToken},
    yellowstone_grpc_proto::{
        geyser::{
            geyser_client::GeyserClient, subscribe_update::UpdateOneof, CommitmentLevel,
            SubscribeRequest, SubscribeRequestFilterBlocks, SubscribeRequestFilterSlots,
            SubscribeUpdateBlock, SubscribeUpdateSlot,
        },
        prelude::SubscribeRequestPing,
    },
};

/// Manager for handling gRPC stream connections and updates
struct GrpcStreamManager {
    client: GeyserGrpcClient<InterceptorXToken>,
    is_connected: bool,
    reconnect_attempts: u32,
    max_reconnect_attempts: u32,
    reconnect_interval: Duration,
}

impl GrpcStreamManager {
    /// Creates a new GrpcStreamManager instance
    async fn new(endpoint: &str, x_token: &str) -> Result<Arc<Mutex<Self>>> {
        let interceptor = InterceptorXToken {
            x_token: Some(x_token.parse().map_err(|e: InvalidMetadataValue| anyhow::Error::from(e))?),
            x_request_snapshot: true,
        };

        let channel = Endpoint::from_shared(endpoint.to_string())?
            .connect_timeout(Duration::from_secs(10))
            .timeout(Duration::from_secs(10))
            .connect()
            .await
            .map_err(|e| anyhow::Error::from(e))?;

        let client = GeyserGrpcClient::new(
            HealthClient::with_interceptor(channel.clone(), interceptor.clone()),
            GeyserClient::with_interceptor(channel, interceptor),
        );

        Ok(Arc::new(Mutex::new(Self {
            client,
            is_connected: false,
            reconnect_attempts: 0,
            max_reconnect_attempts: 10,
            reconnect_interval: Duration::from_secs(5),
        })))
    }

    /// Handles block update messages from the gRPC stream
    fn handle_block_update(&self, block: &SubscribeUpdateBlock) {
        println!("\n=== Block Details ===");
        println!("Slot: {}", block.slot);
        println!("Parent Slot: {}", block.parent_slot);
        println!("Blockhash: {}", hex::encode(&block.blockhash));
        println!("Previous Blockhash: {}", hex::encode(&block.parent_blockhash));

        if !block.transactions.is_empty() {
            println!("\n=== Block Transactions ===");
            println!("Transaction Count: {}", block.transactions.len());

            for (index, tx) in block.transactions.iter().enumerate() {
                if let Some(transaction) = &tx.transaction {
                    println!("\nTransaction {}:", index + 1);
                    println!("  Signature: {}", hex::encode(&transaction.signatures[0]));

                    if let Some(meta) = &tx.meta {
                        println!("  Status: {}", if meta.err.is_none() { "Success" } else { "Failed" });
                        if let Some(err) = &meta.err {
                            println!("  Error: {:?}", err);
                        }
                        println!("  Fee: {} lamports", meta.fee);
                        println!("  Compute Units: {:?}", meta.compute_units_consumed);
                    }
                }
            }
        }

        if let Some(rewards) = &block.rewards {
            if !rewards.rewards.is_empty() {
                println!("\n=== Block Rewards ===");
                for reward in &rewards.rewards {
                    println!("  Account: {}", hex::encode(&reward.pubkey));
                    println!("  Lamports: {}", reward.lamports);
                    println!("  Post Balance: {}", reward.post_balance);
                    println!("  Reward Type: {}", reward.reward_type);
                    println!("  Commission: {}", reward.commission);
                    println!("  ---");
                }
            }
        }
        println!("\n{}\n", "=".repeat(50));
    }

    /// Handles slot update messages from the gRPC stream
    fn handle_slot_update(&self, slot: &SubscribeUpdateSlot) {
        println!("\n=== Slot Update ===");
        println!("Slot: {}", slot.slot);
        
        if let Some(parent) = slot.parent {
            println!("Parent Slot: {}", parent);
        }

        println!("Status: {}", slot.status);

        println!("\n{}\n", "=".repeat(50));
    }

    /// Establishes connection and handles the subscription stream
    async fn connect(&mut self, request: SubscribeRequest) -> Result<()> {
        let request = request.clone();
        let (mut subscribe_tx, mut stream) = self.client.subscribe_with_request(Some(request.clone())).await?;

        self.is_connected = true;
        self.reconnect_attempts = 0;

        while let Some(message) = stream.next().await {
            match message {
                Ok(msg) => {
                    match msg.update_oneof {
                        Some(UpdateOneof::Block(block)) => {
                            self.handle_block_update(&block);
                        }
                        Some(UpdateOneof::Slot(slot)) => {
                            self.handle_slot_update(&slot);
                        }
                        Some(UpdateOneof::Ping(_)) => {
                            subscribe_tx
                                .send(SubscribeRequest {
                                    ping: Some(SubscribeRequestPing { id: 1 }),
                                    ..Default::default()
                                })
                                .await?;
                        }
                        Some(UpdateOneof::Pong(_)) => {} // Ignore pong responses
                        _ => {
                            println!("Other update received: {:?}", msg);
                        }
                    }
                }
                Err(err) => {
                    error!("Error: {:?}", err);
                    self.is_connected = false;
                    Box::pin(self.reconnect(request.clone())).await?;
                    break;
                }
            }
        }

        Ok(())
    }

    /// Attempts to reconnect when the connection is lost
    async fn reconnect(&mut self, request: SubscribeRequest) -> Result<()> {
        if self.reconnect_attempts >= self.max_reconnect_attempts {
            println!("Max reconnection attempts reached");
            return Ok(());
        }

        self.reconnect_attempts += 1;
        println!("Reconnecting... Attempt {}", self.reconnect_attempts);

        let backoff = self.reconnect_interval * std::cmp::min(self.reconnect_attempts, 5);
        tokio::time::sleep(backoff).await;

        Box::pin(self.connect(request)).await
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    // Initialize gRPC stream manager
    let manager = GrpcStreamManager::new(
        "your-grpc-url:2053",
        "your-x-token",
    ).await?;

    let mut manager_lock = manager.lock().await;

    // Create subscription request for blocks and slots
    let request = SubscribeRequest {
        blocks: HashMap::from_iter(vec![(
            "blocks".to_string(),
            SubscribeRequestFilterBlocks {
                account_include: vec![
                    "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA".to_string(),  // Token Program
                    "11111111111111111111111111111111".to_string(),              // System Program
                    "So11111111111111111111111111111111111111112".to_string(),   // Wrapped SOL
                ],
                include_transactions: Some(true),
                include_accounts: Some(true),
                include_entries: Some(false),
            },
        )]),
        slots: HashMap::from_iter(vec![(
            "slots".to_string(),
            SubscribeRequestFilterSlots {
                filter_by_commitment: Some(true),
            },
        )]),
        commitment: Some(CommitmentLevel::Confirmed as i32),
        ..Default::default()
    };

    println!("Starting block and slot monitoring...");
    println!("Monitoring blocks for Token Program, System Program, and Wrapped SOL activities...");
    
    // Start the subscription
    let result = manager_lock.connect(request).await;
    if let Err(e) = &result {
        println!("Subscription error: {:?}", e);
    }
    result?;

    Ok(())
}

Go Implementation

package main

import (
	"context"
	"crypto/tls"
	"fmt"
	"log"
	"strings"
	"sync"
	"time"

	"github.com/mr-tron/base58"
	pb "github.com/rpcpool/yellowstone-grpc/examples/golang/proto"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/metadata"
)

type GrpcStreamManager struct {
	mu                   sync.Mutex
	conn                 *grpc.ClientConn
	client               pb.GeyserClient
	stream               pb.Geyser_SubscribeClient
	isConnected          bool
	reconnectAttempts    int
	maxReconnectAttempts int
	reconnectInterval    time.Duration
	dataHandler          func(*pb.SubscribeUpdate)
	xToken               string
}

func NewGrpcStreamManager(endpoint string, xToken string, dataHandler func(*pb.SubscribeUpdate)) (*GrpcStreamManager, error) {
	// Create gRPC connection with interceptor for x-token
	ctx := metadata.NewOutgoingContext(
		context.Background(),
		metadata.New(map[string]string{"x-token": xToken}),
	)

	// Configure TLS. InsecureSkipVerify disables certificate verification; avoid this in production.
	config := &tls.Config{
		InsecureSkipVerify: true,
	}

	conn, err := grpc.DialContext(ctx, endpoint,
		grpc.WithTransportCredentials(credentials.NewTLS(config)),
		grpc.WithInitialWindowSize(1<<30),
		grpc.WithInitialConnWindowSize(1<<30),
		grpc.WithDefaultCallOptions(
			grpc.MaxCallSendMsgSize(64*1024*1024),
			grpc.MaxCallRecvMsgSize(64*1024*1024),
		),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to connect: %v", err)
	}

	return &GrpcStreamManager{
		conn:                 conn,
		client:               pb.NewGeyserClient(conn),
		isConnected:          false,
		reconnectAttempts:    0,
		maxReconnectAttempts: 10,
		reconnectInterval:    5 * time.Second,
		dataHandler:          dataHandler,
		xToken:               xToken,
	}, nil
}

func (m *GrpcStreamManager) Connect(ctx context.Context, req *pb.SubscribeRequest) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	log.Println("Attempting to connect...")

	// Add x-token to context
	ctx = metadata.NewOutgoingContext(
		ctx,
		metadata.New(map[string]string{"x-token": m.xToken}),
	)

	stream, err := m.client.Subscribe(ctx)
	if err != nil {
		log.Printf("Failed to subscribe: %v", err)
		return m.reconnect(ctx, req)
	}

	if err := stream.Send(req); err != nil {
		log.Printf("Failed to send request: %v", err)
		return m.reconnect(ctx, req)
	}

	m.stream = stream
	m.isConnected = true
	m.reconnectAttempts = 0
	log.Println("Connection established")

	// Start ping goroutine
	go func() {
		ticker := time.NewTicker(30 * time.Second)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				if !m.isConnected {
					return
				}
				pingReq := &pb.SubscribeRequest{
					Ping: &pb.SubscribeRequestPing{Id: 1},
				}
				if err := m.stream.Send(pingReq); err != nil {
					log.Printf("Ping failed: %v", err)
					m.handleDisconnect(ctx, req)
					return
				}
			}
		}
	}()

	// Process updates
	go func() {
		for {
			update, err := m.stream.Recv()
			if err != nil {
				log.Printf("Stream error: %v", err)
				m.handleDisconnect(ctx, req)
				return
			}

			m.dataHandler(update)
		}
	}()

	return nil
}

func (m *GrpcStreamManager) reconnect(ctx context.Context, req *pb.SubscribeRequest) error {
	if m.reconnectAttempts >= m.maxReconnectAttempts {
		return fmt.Errorf("max reconnection attempts reached")
	}

	m.reconnectAttempts++
	log.Printf("Reconnecting... Attempt %d", m.reconnectAttempts)

	backoff := m.reconnectInterval * time.Duration(min(m.reconnectAttempts, 5))
	time.Sleep(backoff)

	return m.Connect(ctx, req)
}

func (m *GrpcStreamManager) handleDisconnect(ctx context.Context, req *pb.SubscribeRequest) {
	m.mu.Lock()
	defer m.mu.Unlock()

	if !m.isConnected {
		return
	}

	m.isConnected = false
	if err := m.reconnect(ctx, req); err != nil {
		log.Printf("Failed to reconnect: %v", err)
	}
}

func (m *GrpcStreamManager) Close() error {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.isConnected = false
	if m.conn != nil {
		return m.conn.Close()
	}
	return nil
}

func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}

func handleUpdate(update *pb.SubscribeUpdate) {
	switch u := update.GetUpdateOneof().(type) {
	case *pb.SubscribeUpdate_Block:
		handleBlockUpdate(u.Block)
	case *pb.SubscribeUpdate_Slot:
		handleSlotUpdate(u.Slot)
	case *pb.SubscribeUpdate_Transaction:
		handleTransactionUpdate(u.Transaction)
	}
}

func handleBlockUpdate(block *pb.SubscribeUpdateBlock) {
	log.Printf("\n=== Block Details ===\n")
	log.Printf("Slot: %d\n", block.GetSlot())
	log.Printf("Parent Slot: %d\n", block.GetParentSlot())
	log.Printf("Blockhash: %s\n", block.GetBlockhash())
	log.Printf("Previous Blockhash: %s\n", block.GetParentBlockhash())

	if txs := block.GetTransactions(); len(txs) > 0 {
		log.Printf("\n=== Block Transactions ===\n")
		log.Printf("Transaction Count: %d\n", len(txs))

		for i, tx := range txs {
			if txInfo := tx.GetTransaction(); txInfo != nil {
				log.Printf("\nTransaction %d:\n", i+1)
				if signatures := txInfo.GetSignatures(); len(signatures) > 0 {
					log.Printf("  Signature: %s\n", base58.Encode(signatures[0]))
				}

				if meta := tx.GetMeta(); meta != nil {
					log.Printf("  Status: %v\n", meta.GetErr() == nil)
					log.Printf("  Fee: %d lamports\n", meta.GetFee())
				}
			}
		}
	}

	if rewards := block.GetRewards(); rewards != nil {
		if rewardList := rewards.GetRewards(); len(rewardList) > 0 {
			log.Printf("\n=== Block Rewards ===\n")
			for _, reward := range rewardList {
				// GetPubkey() is already a base58-encoded string in the proto
				log.Printf("  Account: %s\n", reward.GetPubkey())
				log.Printf("  Lamports: %d\n", reward.GetLamports())
				log.Printf("  Post Balance: %d\n", reward.GetPostBalance())
				log.Printf("  Reward Type: %d\n", reward.GetRewardType())
				log.Printf("  Commission: %d\n", reward.GetCommission())
				log.Printf("  ---\n")
			}
		}
	}
	log.Printf("\n%s\n", strings.Repeat("=", 50))
}

func handleSlotUpdate(slot *pb.SubscribeUpdateSlot) {
	log.Printf("\n=== Slot Update ===\n")
	log.Printf("Slot: %d\n", slot.GetSlot())

	if parent := slot.GetParent(); parent > 0 {
		log.Printf("Parent Slot: %d\n", parent)
	}

	log.Printf("Status: %v\n", slot.GetStatus())

	if deadError := slot.GetDeadError(); deadError != "" {
		log.Printf("Dead Error: %s\n", deadError)
	}

	log.Printf("\n%s\n", strings.Repeat("=", 50))
}

func handleTransactionUpdate(tx *pb.SubscribeUpdateTransaction) {
	log.Printf("Transaction Update:\n")
	log.Printf("  Slot: %d\n", tx.GetSlot())

	txInfo := tx.GetTransaction()
	if txInfo != nil {
		// Print signature if available
		if len(txInfo.GetSignature()) > 0 {
			log.Printf("  Signature: %s\n", base58.Encode(txInfo.GetSignature()))
		}

		// Print if it's a vote transaction
		log.Printf("  Is Vote: %v\n", txInfo.GetIsVote())

		// Print transaction index
		log.Printf("  Transaction Index: %d\n", txInfo.GetIndex())

		// Print transaction details
		if tx := txInfo.GetTransaction(); tx != nil {
			if msg := tx.GetMessage(); msg != nil {
				if accounts := msg.GetAccountKeys(); len(accounts) > 0 {
					log.Printf("  Account Keys:\n")
					for _, acc := range accounts {
						if len(acc) > 0 {
							log.Printf("    - %s\n", base58.Encode(acc))
						}
					}
				}
			}
		}

		// Print status and metadata
		if meta := txInfo.GetMeta(); meta != nil {
			log.Printf("  Status: %v\n", meta.GetErr() == nil)
			log.Printf("  Fee: %d\n", meta.GetFee())

			// Print log messages if any
			if len(meta.GetLogMessages()) > 0 {
				log.Printf("  Log Messages:\n")
				for _, msg := range meta.GetLogMessages() {
					log.Printf("    - %s\n", msg)
				}
			}
		}
	}
}

func boolPtr(b bool) *bool {
	return &b
}

func main() {
	ctx := context.Background()

	// Create manager with data handler and x-token
	manager, err := NewGrpcStreamManager(
		"your-grpc-url:2053",
		"your-x-token",
		handleUpdate,
	)
	if err != nil {
		log.Fatal(err)
	}
	defer manager.Close()

	// Create subscription request for blocks and slots
	blocks := make(map[string]*pb.SubscribeRequestFilterBlocks)
	blocks["blocks"] = &pb.SubscribeRequestFilterBlocks{
		AccountInclude: []string{
			"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA", // Token Program
			"11111111111111111111111111111111",            // System Program
			"So11111111111111111111111111111111111111112", // Wrapped SOL
		},
		IncludeTransactions: boolPtr(true),
		IncludeAccounts:     boolPtr(true),
		IncludeEntries:      boolPtr(false),
	}

	slots := make(map[string]*pb.SubscribeRequestFilterSlots)
	slots["slots"] = &pb.SubscribeRequestFilterSlots{
		FilterByCommitment: boolPtr(true),
	}

	commitment := pb.CommitmentLevel_CONFIRMED
	req := &pb.SubscribeRequest{
		Blocks:     blocks,
		Slots:      slots,
		Commitment: &commitment,
	}

	log.Println("Starting block and slot monitoring...")
	log.Println("Monitoring blocks for Token Program, System Program, and Wrapped SOL activities...")

	// Connect and handle updates
	if err := manager.Connect(ctx, req); err != nil {
		log.Fatal(err)
	}

	// Keep the main goroutine running
	select {}
}
