Transaction Monitoring
Track Solana transactions in real-time with program filtering and detailed execution data.
Last updated
Was this helpful?
Track Solana transactions in real-time with program filtering and detailed execution data.
Last updated
Was this helpful?
This guide demonstrates how to monitor transactions on Solana. You'll learn how to track transactions in real time with filtering options and reliable reconnection handling.
import Client, { CommitmentLevel, SubscribeRequest } from "@triton-one/yellowstone-grpc";
import * as bs58 from 'bs58';
/**
 * Owns a single Yellowstone gRPC subscription stream: opens it, forwards every
 * update to `dataHandler`, sends a keep-alive ping every 30 seconds and
 * re-subscribes with a capped linear backoff when the stream drops.
 *
 * At most one ping timer and one pending reconnect exist at any time, so the
 * paired "end"/"close" events of a dying stream cannot spawn duplicate streams.
 */
class GrpcStreamManager {
  private client: Client;
  private stream: any;
  private isConnected: boolean = false;
  private reconnectAttempts: number = 0;
  private readonly maxReconnectAttempts: number = 10;
  private readonly reconnectInterval: number = 5000; // 5 seconds
  private readonly dataHandler: (data: any) => void;
  // Handle of the active keep-alive interval, if any.
  private pingTimer: ReturnType<typeof setInterval> | undefined;
  // Handle of the pending reconnect timeout, if any.
  private reconnectTimer: ReturnType<typeof setTimeout> | undefined;
  // Set by close(); suppresses any further reconnect attempts.
  private closed: boolean = false;

  /**
   * @param endpoint gRPC endpoint passed to the Yellowstone client.
   * @param authToken x-token passed to the Yellowstone client.
   * @param dataHandler Called with each update after byte fields have been
   *   converted to base58 strings (see processBuffers).
   */
  constructor(
    endpoint: string,
    authToken: string,
    dataHandler: (data: any) => void
  ) {
    this.client = new Client(
      endpoint,
      authToken,
      { "grpc.max_receive_message_length": 64 * 1024 * 1024 }
    );
    this.dataHandler = dataHandler;
  }

  /**
   * Opens the stream and sends `subscribeRequest`. Never rejects: failures are
   * logged and handed to the reconnect logic.
   */
  async connect(subscribeRequest: SubscribeRequest): Promise<void> {
    this.closed = false;
    try {
      const stream = await this.client.subscribe();
      if (this.closed) {
        // close() was called while the stream was being opened.
        this.stream = stream;
        this.discardStream();
        return;
      }
      this.stream = stream;

      stream.on("data", this.handleData.bind(this));
      stream.on("error", this.handleError.bind(this));
      // "end" and "close" usually both fire for one disconnect; only events
      // from the stream that is still current may trigger a reconnect.
      const onDisconnect = (): void => {
        if (this.stream === stream) {
          this.handleDisconnect(subscribeRequest);
        }
      };
      stream.on("end", onDisconnect);
      stream.on("close", onDisconnect);

      await this.write(subscribeRequest);

      // Only a stream that accepted the subscription counts as connected, so
      // repeated write failures still run into maxReconnectAttempts.
      this.isConnected = true;
      this.reconnectAttempts = 0;
      this.startPing();
    } catch (error) {
      console.error("Connection error:", error);
      this.isConnected = false;
      this.discardStream();
      await this.reconnect(subscribeRequest);
    }
  }

  /**
   * Stops pinging, cancels any pending reconnect and cancels the stream.
   * The manager can be reused afterwards by calling connect() again.
   */
  close(): void {
    this.closed = true;
    this.isConnected = false;
    this.stopPing();
    if (this.reconnectTimer !== undefined) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = undefined;
    }
    this.discardStream();
  }

  /** Writes one request to the current stream, resolving once it is flushed. */
  private async write(req: SubscribeRequest): Promise<void> {
    return new Promise((resolve, reject) => {
      this.stream.write(req, (err: any) => err ? reject(err) : resolve());
    });
  }

  /**
   * Schedules a reconnect after `reconnectInterval * min(attempt, 5)` ms.
   * Does nothing if the manager was closed, a reconnect is already pending,
   * or the attempt budget is exhausted.
   */
  private async reconnect(subscribeRequest: SubscribeRequest): Promise<void> {
    if (this.closed || this.reconnectTimer !== undefined) {
      return;
    }
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error("Max reconnection attempts reached");
      return;
    }
    this.stopPing();
    this.reconnectAttempts++;
    console.log(`Reconnecting... Attempt ${this.reconnectAttempts}`);
    const delay = this.reconnectInterval * Math.min(this.reconnectAttempts, 5);
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = undefined;
      // connect() handles its own failures; the catch is a last-resort log.
      this.connect(subscribeRequest).catch((error: unknown) => {
        console.error("Reconnection failed:", error);
      });
    }, delay);
  }

  /** (Re)starts the 30 second keep-alive ping, replacing any previous timer. */
  private startPing(): void {
    this.stopPing();
    this.pingTimer = setInterval(() => {
      if (this.isConnected) {
        this.write({
          ping: { id: 1 },
          accounts: {},
          accountsDataSlice: [],
          transactions: {},
          blocks: {},
          blocksMeta: {},
          entry: {},
          slots: {},
          transactionsStatus: {},
        }).catch(console.error);
      }
    }, 30000);
  }

  /** Clears the keep-alive timer if one is running. */
  private stopPing(): void {
    if (this.pingTimer !== undefined) {
      clearInterval(this.pingTimer);
      this.pingTimer = undefined;
    }
  }

  /** Drops the current stream reference and cancels the call if possible. */
  private discardStream(): void {
    const stream = this.stream;
    this.stream = undefined;
    if (stream && typeof stream.cancel === "function") {
      try {
        stream.cancel();
      } catch {
        // The stream was already torn down; nothing left to release.
      }
    }
  }

  /** Normalizes an update and passes it on; handler errors are logged, not thrown. */
  private handleData(data: any): void {
    try {
      const processed = this.processBuffers(data);
      this.dataHandler(processed);
    } catch (error) {
      console.error("Error processing data:", error);
    }
  }

  /** Logs a stream error; the reconnect itself is driven by "end"/"close". */
  private handleError(error: any): void {
    console.error("Stream error:", error);
    this.isConnected = false;
  }

  /** Marks the stream as gone and asks for a reconnect. */
  private handleDisconnect(subscribeRequest: SubscribeRequest): void {
    console.log("Stream disconnected");
    this.isConnected = false;
    this.stopPing();
    // Forget the dead stream so its remaining events are ignored.
    this.stream = undefined;
    void this.reconnect(subscribeRequest);
  }

  /**
   * Recursively copies `obj`, replacing every Buffer/Uint8Array with its
   * base58 string. Falsy values and other primitives are returned unchanged.
   */
  private processBuffers(obj: any): any {
    if (!obj) return obj;
    if (Buffer.isBuffer(obj) || obj instanceof Uint8Array) {
      return bs58.default.encode(obj);
    }
    if (Array.isArray(obj)) {
      return obj.map(item => this.processBuffers(item));
    }
    if (typeof obj === 'object') {
      return Object.fromEntries(
        Object.entries(obj).map(([k, v]) => [k, this.processBuffers(v)])
      );
    }
    return obj;
  }
}
// Transaction monitoring implementation
/**
 * Subscribes to confirmed transactions that involve the SPL Token program and
 * routes every update to handleTransactionUpdate.
 */
async function monitorTransactions() {
  // Vote and failed transactions are filtered out by this filter.
  const tokenProgramFilter = {
    accountInclude: ["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"],
    accountExclude: [],
    accountRequired: [],
    vote: false,
    failed: false
  };

  // Every other subscription category is left empty.
  const subscribeRequest: SubscribeRequest = {
    commitment: CommitmentLevel.CONFIRMED,
    transactions: { client: tokenProgramFilter },
    transactionsStatus: {},
    accounts: {},
    accountsDataSlice: [],
    slots: {},
    blocks: {},
    blocksMeta: {},
    entry: {}
  };

  const manager = new GrpcStreamManager(
    "your-grpc-url:2053",
    "your-x-token",
    handleTransactionUpdate
  );
  await manager.connect(subscribeRequest);
}
/**
 * Pretty-prints one transaction update to the console.
 *
 * Accepts updates in either form: raw (byte fields are Uint8Array/Buffer) or
 * already normalized by GrpcStreamManager.processBuffers (byte fields are
 * base58 strings). Updates without `transaction.transaction` are ignored.
 *
 * @param data Subscription update as delivered to the data handler.
 */
function handleTransactionUpdate(data: any): void {
  // Check if we have transaction data in the correct structure
  const txInfo = data?.transaction?.transaction;
  if (!txInfo) {
    return;
  }
  const meta = txInfo.meta;
  const message = txInfo.transaction?.message;

  // A byte field is either still raw or was already base58-encoded upstream.
  const toBase58 = (value: unknown): string =>
    typeof value === 'string' ? value : bs58.default.encode(value as Uint8Array);

  // Instruction account lists are a byte string of indexes; undo the base58
  // encoding when the update was normalized before it reached this handler.
  const toIndexes = (value: unknown): number[] => {
    if (value == null) return [];
    if (typeof value === 'string') return Array.from(bs58.default.decode(value));
    return Array.from(value as ArrayLike<number>);
  };

  // Compiled instructions index into the static keys followed by the
  // addresses loaded from lookup tables (writable first, then readonly).
  const staticKeys: unknown[] = Array.isArray(message?.accountKeys) ? message.accountKeys : [];
  const allKeys: unknown[] = [
    ...staticKeys,
    ...(meta?.loadedWritableAddresses ?? []),
    ...(meta?.loadedReadonlyAddresses ?? []),
  ];
  const resolveKey = (index: number): string => {
    if (!Number.isInteger(index) || index < 0 || index >= allKeys.length) {
      throw new RangeError(`account index ${index} out of range`);
    }
    return toBase58(allKeys[index]);
  };

  // Builds the detail lines shared by top-level and inner instructions.
  const describeInstruction = (ix: any): string[] => {
    const lines = [
      ` Program: ${resolveKey(ix.programIdIndex)}`,
      ` Accounts: ${toIndexes(ix.accounts).map((idx) => resolveKey(idx)).join(',')}`,
    ];
    if (ix.data) {
      lines.push(` Data: ${toBase58(ix.data)}`);
    }
    return lines;
  };

  console.log('\n=== Transaction Details ===');
  console.log(`Signature: ${toBase58(txInfo.signature)}`);
  console.log(`Slot: ${data.transaction.slot}`);
  console.log(`Is Vote: ${txInfo.isVote}`);

  if (meta) {
    console.log(`Fee: ${meta.fee} lamports`);
    console.log(`Compute Units: ${meta.computeUnitsConsumed}`);
    console.log(`Status: ${meta.err ? 'Failed' : 'Success'}`);

    // Token balance changes
    const preBalances: any[] = meta.preTokenBalances ?? [];
    const postBalances: any[] = meta.postTokenBalances ?? [];
    if (preBalances.length > 0) {
      console.log('\n=== Token Balance Changes ===');
      // The pre/post lists are not guaranteed to line up by position, so
      // pair the entries through the account index they both carry.
      const postByAccount = new Map<unknown, any>();
      for (const balance of postBalances) {
        if (balance) postByAccount.set(balance.accountIndex, balance);
      }
      for (const preBalance of preBalances) {
        const postBalance = preBalance ? postByAccount.get(preBalance.accountIndex) : undefined;
        if (preBalance && postBalance) {
          console.log(`Mint: ${preBalance.mint}`);
          console.log(` Pre Balance: ${preBalance.uiTokenAmount?.uiAmount}`);
          console.log(` Post Balance: ${postBalance.uiTokenAmount?.uiAmount}`);
          if (preBalance.owner) {
            console.log(` Owner: ${preBalance.owner}`);
          }
        }
      }
    }

    // Log messages
    if (meta.logMessages?.length > 0) {
      console.log('\n=== Program Logs ===');
      meta.logMessages.forEach((log: string) => {
        console.log(` ${log}`);
      });
    }
  }

  // Transaction instructions
  if (message) {
    console.log('\n=== Account Keys ===');
    staticKeys.forEach((key, index) => {
      try {
        console.log(` ${index}: ${toBase58(key)}`);
      } catch (error) {
        console.log(` ${index}: [Unable to encode key]`);
      }
    });

    // Instructions
    if (Array.isArray(message.instructions)) {
      console.log('\n=== Instructions ===');
      message.instructions.forEach((ix: any, index: number) => {
        console.log(`\nInstruction ${index + 1}:`);
        try {
          describeInstruction(ix).forEach((line) => console.log(line));
        } catch (error) {
          console.log(` [Unable to decode instruction ${index + 1}]`);
        }
      });
    }

    // Process inner instructions
    if (meta?.innerInstructions?.length > 0) {
      console.log('\n=== Inner Instructions ===');
      meta.innerInstructions.forEach((inner: any, index: number) => {
        console.log(`\nInner Instruction Set ${index + 1}:`);
        (inner.instructions ?? []).forEach((ix: any, i: number) => {
          console.log(` Instruction ${i + 1}:`);
          try {
            describeInstruction(ix).forEach((line) => console.log(line));
          } catch (error) {
            console.log(` [Unable to decode instruction]`);
          }
        });
      });
    }
  }

  console.log('\n' + '='.repeat(50) + '\n');
}
// Kick off monitoring; anything that escapes monitorTransactions is logged.
monitorTransactions().catch((error) => console.error(error));
// Example: transactions that involve the Token program.
const subscribeRequest: SubscribeRequest = {
  transactions: {
    // `transactions` maps a filter name of your choice to its filter.
    client: {
      accountInclude: ["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"], // Token program
      accountExclude: [],
      accountRequired: [],
      vote: false,
      failed: false,
      signature: undefined
    }
  },
  commitment: CommitmentLevel.CONFIRMED,
  // Remaining subscription categories are unused in this example.
  accounts: {},
  accountsDataSlice: [],
  blocks: {},
  blocksMeta: {},
  entry: {},
  slots: {},
  transactionsStatus: {}
};
// Example: transactions that involve the Token or Associated Token program.
// NOTE(review): Yellowstone appears to treat `failed: true` as "only failed
// transactions" rather than "also failed" — confirm the intended behavior.
const subscribeRequest: SubscribeRequest = {
  transactions: {
    // `transactions` maps a filter name of your choice to its filter.
    client: {
      accountInclude: [
        "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA", // Token program
        "ATokenGPvbdGVxr1b2hvZbsiqW5xWH25efTNsLJA8knL", // Associated Token program
      ],
      accountExclude: [],
      accountRequired: [],
      vote: false,
      failed: true,
      signature: undefined
    }
  },
  commitment: CommitmentLevel.CONFIRMED,
  // Remaining subscription categories are unused in this example.
  accounts: {},
  accountsDataSlice: [],
  blocks: {},
  blocksMeta: {},
  entry: {},
  slots: {},
  transactionsStatus: {}
};
// Example: no account filter, with the vote and failed flags both set.
// NOTE(review): Yellowstone appears to treat `vote: true` / `failed: true` as
// "only vote" / "only failed" transactions — confirm the intended behavior;
// leaving both undefined may be what "all transactions" needs.
const subscribeRequest: SubscribeRequest = {
  transactions: {
    // `transactions` maps a filter name of your choice to its filter.
    client: {
      accountInclude: [],
      accountExclude: [],
      accountRequired: [],
      vote: true,
      failed: true,
      signature: undefined
    }
  },
  commitment: CommitmentLevel.CONFIRMED,
  // Remaining subscription categories are unused in this example.
  accounts: {},
  accountsDataSlice: [],
  blocks: {},
  blocksMeta: {},
  entry: {},
  slots: {},
  transactionsStatus: {}
};
Transaction Update:
Slot number (when transaction was processed)
Transaction signature (unique identifier)
Transaction message
Account keys (involved accounts)
Instructions (program calls and data)
Recent blockhash
Transaction metadata
Status (success/failure)
Fee amount
Compute units consumed
Log messages
Inner instructions
Balance changes
Instruction Details:
Program ID (program being called)
Account list (accounts used by instruction)
Instruction data (program-specific data)
Inner instructions (CPI calls)
use {
anyhow::Result,
futures::{stream::StreamExt, sink::SinkExt},
log::error,
std::{collections::HashMap, sync::Arc, time::Duration},
tokio::sync::Mutex,
tonic::{metadata::errors::InvalidMetadataValue, transport::Endpoint},
yellowstone_grpc_client::{GeyserGrpcClient, InterceptorXToken},
yellowstone_grpc_proto::{
geyser::{
geyser_client::GeyserClient,
SubscribeRequest,
SubscribeRequestFilterTransactions,
subscribe_update::UpdateOneof,
SubscribeUpdateTransaction,
},
prelude::{CommitmentLevel, SubscribeRequestPing},
},
tonic_health::pb::health_client::HealthClient,
solana_sdk::pubkey::Pubkey,
hex,
tokio::sync::mpsc,
};
/// Manager for handling gRPC stream connections and transaction updates
struct GrpcStreamManager {
    /// Geyser client used to open the subscription stream.
    client: GeyserGrpcClient<InterceptorXToken>,
    /// Set to `true` once a subscription is opened and to `false` on a stream error.
    is_connected: bool,
    /// Reconnects attempted so far; reset to 0 whenever a subscription is opened.
    reconnect_attempts: u32,
    /// Upper bound on `reconnect_attempts` before reconnecting is abandoned.
    max_reconnect_attempts: u32,
    /// Base backoff delay; multiplied by `min(reconnect_attempts, 5)` before each retry.
    reconnect_interval: Duration,
}
impl GrpcStreamManager {
/// Handles transaction update messages from the gRPC stream
/// This function can be customized based on your requirements:
/// - Store transactions in a database
/// - Trigger specific actions based on transaction contents
/// - Filter for specific types of transactions
/// - Transform data into your required format
///
/// # Arguments
/// * `transaction_update` - The transaction update containing all details
fn handle_transaction_update(&self, transaction_update: &SubscribeUpdateTransaction) {
if let Some(transaction) = &transaction_update.transaction {
println!("Transaction Update:");
println!(" Signature: {}", hex::encode(&transaction.signature));
if let Some(meta) = &transaction.meta {
println!(" Status: {}", if meta.err.is_none() { "Success" } else { "Failed" });
println!(" Fee: {:?}", meta.fee);
println!(" Compute Units: {:?}", meta.compute_units_consumed);
}
if let Some(transaction_message) = &transaction.transaction {
if let Some(message) = &transaction_message.message {
println!("Account Keys:");
for (i, key) in message.account_keys.iter().enumerate() {
if let Ok(pubkey) = Pubkey::try_from(key.as_slice()) {
println!(" {}: {}", i, pubkey);
} else {
println!(" {}: {}", i, hex::encode(key));
}
}
println!("Instructions:");
for (i, instruction) in message.instructions.iter().enumerate() {
println!("Instruction {}:", i + 1);
if let Some(program_key) = message.account_keys.get(instruction.program_id_index as usize) {
if let Ok(pubkey) = Pubkey::try_from(program_key.as_slice()) {
println!(" Program: {}", pubkey);
} else {
println!(" Program: {}", hex::encode(program_key));
}
}
println!(" Accounts:");
for &account_idx in &instruction.accounts {
if let Some(key) = message.account_keys.get(account_idx as usize) {
if let Ok(pubkey) = Pubkey::try_from(key.as_slice()) {
println!(" {}", pubkey);
} else {
println!(" {}", hex::encode(key));
}
}
}
println!(" Data: {}", hex::encode(&instruction.data));
}
if let Some(meta) = &transaction.meta {
if !meta.log_messages.is_empty() {
println!("Log Messages:");
for log in &meta.log_messages {
println!(" {}", log);
}
}
println!("Balance Changes:");
for i in 0..meta.pre_balances.len().min(meta.post_balances.len()) {
let pre = meta.pre_balances[i];
let post = meta.post_balances[i];
if pre != post {
if let Some(key) = message.account_keys.get(i) {
if let Ok(pubkey) = Pubkey::try_from(key.as_slice()) {
println!(" Account {}: {} -> {} (Δ{})",
pubkey,
pre, post, post.saturating_sub(pre));
} else {
println!(" Account {}: {} -> {} (Δ{})",
hex::encode(key),
pre, post, post.saturating_sub(pre));
}
}
}
}
}
}
}
}
}
/// Creates a new GrpcStreamManager instance
///
/// # Arguments
/// * `endpoint` - The gRPC endpoint URL
/// * `x_token` - Authentication token for the endpoint
async fn new(endpoint: &str, x_token: &str) -> Result<Arc<Mutex<GrpcStreamManager>>> {
let interceptor = InterceptorXToken {
x_token: Some(x_token.parse().map_err(|e: InvalidMetadataValue| anyhow::Error::from(e))?),
x_request_snapshot: true,
};
let channel = Endpoint::from_shared(endpoint.to_string())?
.connect_timeout(Duration::from_secs(10))
.timeout(Duration::from_secs(10))
.connect()
.await
.map_err(|e| anyhow::Error::from(e))?;
let client = GeyserGrpcClient::new(
HealthClient::with_interceptor(channel.clone(), interceptor.clone()),
GeyserClient::with_interceptor(channel, interceptor),
);
Ok(Arc::new(Mutex::new(GrpcStreamManager {
client,
is_connected: false,
reconnect_attempts: 0,
max_reconnect_attempts: 10,
reconnect_interval: Duration::from_secs(5),
})))
}
/// Establishes connection and handles the subscription stream
///
/// # Arguments
/// * `request` - The subscription request containing transaction filters and other parameters
async fn connect(&mut self, request: SubscribeRequest) -> Result<()> {
let request = request.clone();
let (mut subscribe_tx, mut stream) = self.client.subscribe_with_request(Some(request.clone())).await?;
self.is_connected = true;
self.reconnect_attempts = 0;
// Create a channel for sending ping requests
let (ping_sender, mut ping_receiver) = mpsc::channel(10);
// Add client-initiated ping mechanism
let ping_handle = tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(30));
loop {
interval.tick().await;
if let Err(e) = ping_sender.send(()).await {
error!("Failed to send ping signal: {:?}", e);
break;
}
}
});
// Process both stream messages and ping requests
loop {
tokio::select! {
Some(message) = stream.next() => {
match message {
Ok(msg) => {
match msg.update_oneof {
Some(UpdateOneof::Transaction(transaction)) => {
self.handle_transaction_update(&transaction);
}
Some(UpdateOneof::Ping(_)) => {
subscribe_tx
.send(SubscribeRequest {
ping: Some(SubscribeRequestPing { id: 1 }),
..Default::default()
})
.await?;
}
Some(UpdateOneof::Pong(_)) => {} // Ignore pong responses
_ => {
println!("Other update received: {:?}", msg);
}
}
}
Err(err) => {
error!("Error: {:?}", err);
self.is_connected = false;
ping_handle.abort(); // Cleanup ping task
Box::pin(self.reconnect(request.clone())).await?;
return Ok(());
}
}
}
Some(_) = ping_receiver.recv() => {
// Send ping when requested by the ping task
if let Err(e) = subscribe_tx
.send(SubscribeRequest {
ping: Some(SubscribeRequestPing { id: 1 }),
..Default::default()
})
.await
{
error!("Failed to send ping: {:?}", e);
break;
}
}
else => break,
}
}
ping_handle.abort(); // Cleanup ping task
Ok(())
}
/// Attempts to reconnect when the connection is lost
///
/// # Arguments
/// * `request` - The original subscription request to reestablish the connection
async fn reconnect(&mut self, request: SubscribeRequest) -> Result<()> {
if self.reconnect_attempts >= self.max_reconnect_attempts {
println!("Max reconnection attempts reached");
return Ok(());
}
self.reconnect_attempts += 1;
println!("Reconnecting... Attempt {}", self.reconnect_attempts);
let backoff = self.reconnect_interval * std::cmp::min(self.reconnect_attempts, 5);
tokio::time::sleep(backoff).await;
Box::pin(self.connect(request)).await
}
}
/// Subscribes to confirmed Token program transactions and processes the
/// stream until `connect` returns.
#[tokio::main]
async fn main() -> Result<()> {
    // Initialize gRPC stream manager
    let manager = GrpcStreamManager::new("your-grpc-url:2053", "your-x-token").await?;
    let mut guard = manager.lock().await;

    // Filter: Token program transactions, excluding vote and failed ones
    let token_filter = SubscribeRequestFilterTransactions {
        vote: Some(false),
        failed: Some(false),
        signature: None,
        account_include: vec!["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA".to_string()],
        account_exclude: vec![],
        account_required: vec![],
    };
    let mut transactions = HashMap::new();
    transactions.insert("transactions".to_string(), token_filter);

    let request = SubscribeRequest {
        transactions,
        commitment: Some(CommitmentLevel::Confirmed as i32),
        ..Default::default()
    };

    println!("Starting subscription for Token Program transactions");

    // Start the subscription; log a failure before handing it to the caller
    guard.connect(request).await.map_err(|e| {
        println!("Subscription error: {:?}", e);
        e
    })
}
package main
import (
"context"
"crypto/tls"
"fmt"
"log"
"sync"
"time"
"github.com/mr-tron/base58"
pb "github.com/rpcpool/yellowstone-grpc/examples/golang/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
)
// GrpcStreamManager holds the gRPC connection, the active subscription stream
// and the reconnect bookkeeping for one Geyser subscription.
type GrpcStreamManager struct {
	// mu is locked by Connect, handleDisconnect and Close.
	mu sync.Mutex
	// conn is the connection dialed in NewGrpcStreamManager and closed by Close.
	conn *grpc.ClientConn
	// client is the Geyser client created on top of conn.
	client pb.GeyserClient
	// stream is the subscription opened by the most recent successful Connect.
	stream pb.Geyser_SubscribeClient
	// isConnected is set by Connect and cleared by handleDisconnect and Close.
	isConnected bool
	// reconnectAttempts counts retries; Connect resets it to 0 on success.
	reconnectAttempts int
	// maxReconnectAttempts is the retry limit enforced by reconnect.
	maxReconnectAttempts int
	// reconnectInterval is the base delay, multiplied by min(attempts, 5).
	reconnectInterval time.Duration
	// dataHandler is called with every update received from the stream.
	dataHandler func(*pb.SubscribeUpdate)
	// xToken is sent as "x-token" metadata when subscribing.
	xToken string
}
// NewGrpcStreamManager dials endpoint over TLS and returns a manager that is
// ready for Connect.
//
// endpoint is the gRPC target (host:port), xToken is attached as "x-token"
// metadata to each subscription, and dataHandler receives every update read
// from the stream. It returns an error when dataHandler is nil or the
// connection cannot be set up.
func NewGrpcStreamManager(endpoint string, xToken string, dataHandler func(*pb.SubscribeUpdate)) (*GrpcStreamManager, error) {
	if dataHandler == nil {
		return nil, fmt.Errorf("failed to connect: dataHandler must not be nil")
	}

	// Verify the server certificate. Skipping verification would let anyone
	// on the network path impersonate the endpoint and capture the x-token.
	tlsConfig := &tls.Config{MinVersion: tls.VersionTLS12}

	// The x-token is per-call metadata and is attached in Connect, so the
	// dial itself needs no special context.
	conn, err := grpc.DialContext(context.Background(), endpoint,
		grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)),
		grpc.WithInitialWindowSize(1<<30),
		grpc.WithInitialConnWindowSize(1<<30),
		grpc.WithDefaultCallOptions(
			grpc.MaxCallSendMsgSize(64*1024*1024),
			grpc.MaxCallRecvMsgSize(64*1024*1024),
		),
	)
	if err != nil {
		// %w keeps the cause available to errors.Is / errors.As.
		return nil, fmt.Errorf("failed to connect: %w", err)
	}

	return &GrpcStreamManager{
		conn:                 conn,
		client:               pb.NewGeyserClient(conn),
		isConnected:          false,
		reconnectAttempts:    0,
		maxReconnectAttempts: 10,
		reconnectInterval:    5 * time.Second,
		dataHandler:          dataHandler,
		xToken:               xToken,
	}, nil
}
// Connect opens the subscription stream, sends req and starts the ping and
// receive goroutines. When opening or subscribing fails it retries through
// reconnect and returns that result.
//
// m.mu is held only while the shared fields are updated, never across
// network calls or backoff sleeps. Connect and reconnect call each other, so
// holding the non-reentrant mutex across those calls would deadlock.
//
// Connect is not meant to be called while a subscription is already active.
func (m *GrpcStreamManager) Connect(ctx context.Context, req *pb.SubscribeRequest) error {
	log.Println("Attempting to connect...")

	// Each stream gets its own cancellable context carrying the x-token, so a
	// failed stream can be torn down without cancelling the caller's context.
	streamCtx, cancel := context.WithCancel(metadata.NewOutgoingContext(
		ctx,
		metadata.New(map[string]string{"x-token": m.xToken}),
	))

	stream, err := m.client.Subscribe(streamCtx)
	if err != nil {
		cancel()
		log.Printf("Failed to subscribe: %v", err)
		return m.reconnect(ctx, req)
	}

	if err := stream.Send(req); err != nil {
		cancel()
		log.Printf("Failed to send request: %v", err)
		return m.reconnect(ctx, req)
	}

	m.mu.Lock()
	m.stream = stream
	m.isConnected = true
	m.reconnectAttempts = 0
	m.mu.Unlock()

	log.Println("Connection established")

	go m.pingLoop(ctx, streamCtx, cancel, stream, req)
	go m.receiveLoop(ctx, cancel, stream, req)

	return nil
}

// pingLoop sends a ping on stream every 30 seconds until the stream's context
// is cancelled or the stream is no longer the manager's active one.
func (m *GrpcStreamManager) pingLoop(ctx, streamCtx context.Context, cancel context.CancelFunc, stream pb.Geyser_SubscribeClient, req *pb.SubscribeRequest) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-streamCtx.Done():
			return
		case <-ticker.C:
			if !m.isActive(stream) {
				return
			}
			pingReq := &pb.SubscribeRequest{
				Ping: &pb.SubscribeRequestPing{Id: 1},
			}
			if err := stream.Send(pingReq); err != nil {
				log.Printf("Ping failed: %v", err)
				cancel()
				m.handleStreamFailure(ctx, req, stream)
				return
			}
		}
	}
}

// receiveLoop forwards every update read from stream to the data handler and
// triggers a reconnect when reading fails.
func (m *GrpcStreamManager) receiveLoop(ctx context.Context, cancel context.CancelFunc, stream pb.Geyser_SubscribeClient, req *pb.SubscribeRequest) {
	for {
		update, err := stream.Recv()
		if err != nil {
			log.Printf("Stream error: %v", err)
			cancel()
			m.handleStreamFailure(ctx, req, stream)
			return
		}
		m.dataHandler(update)
	}
}

// isActive reports whether stream is the manager's current, connected stream.
func (m *GrpcStreamManager) isActive(stream pb.Geyser_SubscribeClient) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.isConnected && m.stream == stream
}

// handleStreamFailure marks the manager disconnected and reconnects, but only
// for the first failure report of the currently active stream. Later reports
// for the same stream, or reports from a replaced stream, are ignored.
func (m *GrpcStreamManager) handleStreamFailure(ctx context.Context, req *pb.SubscribeRequest, stream pb.Geyser_SubscribeClient) {
	m.mu.Lock()
	stale := !m.isConnected || m.stream != stream
	if !stale {
		m.isConnected = false
	}
	m.mu.Unlock()

	if stale {
		return
	}
	if err := m.reconnect(ctx, req); err != nil {
		log.Printf("Failed to reconnect: %v", err)
	}
}

// reconnect waits reconnectInterval * min(attempt, 5) and calls Connect
// again. It returns an error once maxReconnectAttempts is reached or ctx is
// cancelled during the wait. The caller must not hold m.mu.
func (m *GrpcStreamManager) reconnect(ctx context.Context, req *pb.SubscribeRequest) error {
	m.mu.Lock()
	if m.reconnectAttempts >= m.maxReconnectAttempts {
		m.mu.Unlock()
		return fmt.Errorf("max reconnection attempts reached")
	}
	m.reconnectAttempts++
	attempt := m.reconnectAttempts
	m.mu.Unlock()

	log.Printf("Reconnecting... Attempt %d", attempt)
	backoff := m.reconnectInterval * time.Duration(min(attempt, 5))

	timer := time.NewTimer(backoff)
	defer timer.Stop()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-timer.C:
	}

	return m.Connect(ctx, req)
}

// handleDisconnect reports the current stream as failed and reconnects unless
// the manager is already disconnected.
func (m *GrpcStreamManager) handleDisconnect(ctx context.Context, req *pb.SubscribeRequest) {
	m.mu.Lock()
	stream := m.stream
	m.mu.Unlock()

	m.handleStreamFailure(ctx, req, stream)
}
// Close marks the manager as disconnected and closes the underlying gRPC
// connection, returning whatever error closing it produces. It returns nil
// when there is no connection.
func (m *GrpcStreamManager) Close() error {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.isConnected = false
	if m.conn == nil {
		return nil
	}
	return m.conn.Close()
}
// min returns the smaller of a and b.
func min(a, b int) int {
	if b <= a {
		return b
	}
	return a
}
// handleAccountUpdate logs the transaction carried by update: slot,
// signature, vote flag, index, account keys, status, fee and log messages.
// Updates that carry no transaction are ignored.
func handleAccountUpdate(update *pb.SubscribeUpdate) {
	txUpdate := update.GetTransaction()
	if txUpdate == nil {
		return
	}

	log.Printf("Transaction Update:\n")
	log.Printf(" Slot: %d\n", txUpdate.GetSlot())

	info := txUpdate.GetTransaction()
	if info == nil {
		return
	}

	// Signature is printed only when present
	if signature := info.GetSignature(); len(signature) > 0 {
		log.Printf(" Signature: %s\n", base58.Encode(signature))
	}
	log.Printf(" Is Vote: %v\n", info.GetIsVote())
	log.Printf(" Transaction Index: %d\n", info.GetIndex())

	// Account keys, skipping empty entries
	if inner := info.GetTransaction(); inner != nil {
		if message := inner.GetMessage(); message != nil {
			keys := message.GetAccountKeys()
			if len(keys) > 0 {
				log.Printf(" Account Keys:\n")
				for _, key := range keys {
					if len(key) == 0 {
						continue
					}
					log.Printf(" - %s\n", base58.Encode(key))
				}
			}
		}
	}

	// Status and metadata
	meta := info.GetMeta()
	if meta == nil {
		return
	}
	log.Printf(" Status: %v\n", meta.GetErr() == nil)
	log.Printf(" Fee: %d\n", meta.GetFee())

	logs := meta.GetLogMessages()
	if len(logs) == 0 {
		return
	}
	log.Printf(" Log Messages:\n")
	for _, line := range logs {
		log.Printf(" - %s\n", line)
	}
}
// boolPtr returns a pointer to a fresh copy of b, for optional bool fields.
func boolPtr(b bool) *bool {
	value := b
	return &value
}
// main subscribes to confirmed Token program transactions and then blocks
// forever while the manager's goroutines process updates.
func main() {
	// Create manager with data handler and x-token
	manager, err := NewGrpcStreamManager("your-grpc-url:2053", "your-x-token", handleAccountUpdate)
	if err != nil {
		log.Fatal(err)
	}
	defer manager.Close()

	// Subscription request for Token program transactions, excluding vote
	// and failed ones
	commitment := pb.CommitmentLevel_CONFIRMED
	req := &pb.SubscribeRequest{
		Commitment: &commitment,
		Transactions: map[string]*pb.SubscribeRequestFilterTransactions{
			"transactions": {
				Vote:            boolPtr(false),
				Failed:          boolPtr(false),
				AccountInclude:  []string{"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"},
				AccountExclude:  []string{},
				AccountRequired: []string{},
			},
		},
	}

	// Connect and handle updates
	if err := manager.Connect(context.Background(), req); err != nil {
		log.Fatal(err)
	}

	// Keep the main goroutine running
	select {}
}