
(1)

THIS TALK IS ABOUT REFERENCE DATA
(2)

LEON STEIN

current: chief architect, @decide.com

before that: farecast.com (now http://www.bing.com/travel)

before that: amazon

(3)

TWO KINDS OF DATA

                         user data                       reference data

nicknames:               “user”, “transactional”         “reference”, “non-transactional”

examples:                •  user accounts                •  product/offer catalogs
                         •  shopping cart/orders         •  service catalogs
                         •  user messages                •  static geolocation data
                         •  …                            •  dictionaries
                                                         •  …

created/modified by:     users                           business (you)

sensitivity to
staleness:               high                            low

plan for growth:         hard                            easy

access optimization:     read/write                      mostly read

(4)

[diagram: reference data vs. user data]
(5)

[diagram: reference data vs. user data]
(6)

[diagram: reference data vs. user data]
(7)

REFERENCE DATA: NEED FOR SPEED

§ everything needs reference data all the time, often in bulk:

§ the site

§ search index builder

§ internal applications/dashboards

§ internal reporting

§ internal analytics

§ speed of your site <= speed of your reference data

(8)

WHAT IS FAST?

main memory read 0.0001 ms (100 ns)

network roundtrip 0.5 ms (500,000 ns)

disk seek 10 ms (10,000,000 ns)

(9)

HOW IT ALL STARTED

[diagram: webapps behind a load balancer → services → BIG DATABASE, fed by a data loader]

availability problems

performance problems

scalability problems

(10)

REPLICATION!

[diagram: webapps behind a load balancer → services → REPLICA of the BIG DATABASE, fed by a data loader]

operational overhead
availability problems
performance problems
scalability problems
(11)

REMOTE STORE RETRIEVAL LATENCY

[diagram: client → network → remote store → network → client]

TCP request: 0.5 ms
lookup/write response: 0.5 ms
TCP response: 0.5 ms
read/parse response: 0.25 ms

total time to retrieve a single value:

•  from remote store: 1.75 ms

•  from memory: 0.001 ms (10 main memory reads)

sequential access of 1 million random keys:

•  from remote store: 1,000,000 × 1.75 ms ≈ 30 minutes

(12)

LOCAL CACHING!

[diagram: webapps behind a load balancer → services, each with a local cache → REPLICA of the BIG DATABASE, fed by a data loader]

operational overhead
performance problems
scalability problems
long tail performance problems
consistency problems
(13)

LONG TAIL PROBLEM

example:

•  80% of requests query 10% of entries (head)
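As a rough worked example (my numbers, reusing the latencies from the earlier slides): if 80% of lookups hit the local cache at ~0.001 ms and the remaining 20% go to the remote store at ~1.75 ms, the average lookup is 0.8 × 0.001 + 0.2 × 1.75 ≈ 0.35 ms — still hundreds of times slower than pure in-memory access, and any request that touches many entries is dominated by the cache misses in the long tail.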

(14)

BIG CACHE!

[diagram: webapps behind a load balancer → services → preloaded BIG CACHE in front of the replica/database, fed by a data loader]

operational overhead
performance problems
scalability problems
long tail performance problems
consistency problems
(15)

BIG CACHE PROBLEMS

§ additional hardware
§ additional configuration    } operational overhead
§ additional monitoring

§ extra network hop

§ slow scanning

§ additional deserialization
(16)

HELLO NOSQL!

[diagram: webapps behind a load balancer → services → REPLICA + BIG NoSQL DATABASE, fed by a data loader]

some operational overhead
some performance problems
some scalability problems
(17)

THE TRUTH ABOUT DATABASES:

FAST AS LONG AS THE DATA FITS IN MEMORY

MongoDB

“What I'm going to call as the hot data cliff: As the size of your hot data set (data frequently read at sustained rates above disk I/O capacity) approaches available memory, write operation bursts that exceeds disk write I/O capacity can create a trashing death spiral where hot disk pages that MongoDB desperately needs are evicted from disk cache by the OS as it consumes more buffer space to hold the writes in memory.”

http://www.quora.com/Is-MongoDB-a-good-replacement-for-Memcached

Redis

“Redis is an in-memory but persistent on disk database, so it represents a different trade off where very high write and read speed is achieved with the limitation of data sets that can't be larger than memory.”

(18)

CAN YOU KEEP IT IN MEMORY YOURSELF?

[diagram: webapps behind a load balancer → services, each holding a full in-memory cache built by its own full data loader from the BIG DATABASE]

consistency problems
operational relief!
performance gain!
scales infinitely!

(19)

FIXING CONSISTENCY: DEPLOYMENT “CELLS”, STICKY USER SESSIONS

[diagram: load balancer with sticky user sessions → deployment cells, each containing a webapp plus services with full in-memory caches, all loading from the BIG DATABASE]

consistency problems
operational relief!
performance gain!
scales infinitely!
(20)
(21)

"Programmers waste enormous amounts of time thinking about, or worrying about, the speed of noncritical parts of their programs, and these attempts at efficiency actually have a strong negative impact when debugging and maintenance are considered.

We should forget about small efficiencies, say about 97% of the

time: premature optimization is the root of all evil. Yet we should not pass up our opportunities in that critical 3%.”

Donald Knuth "Programmers waste enormous amounts of time thinking about, or worrying about, the speed of noncritical parts of their programs, and these attempts at efficiency actually have a strong negative impact when debugging and maintenance are considered.

We should forget about small efficiencies, say about 97% of the

time: premature optimization is the root of all evil. Yet we should not pass up our opportunities in that critical 3%.”

Donald Knuth

"Programmers waste enormous amounts of time thinking about, or worrying about, the speed of noncritical parts of their programs, and these attempts at efficiency actually have a strong negative impact when debugging and maintenance are considered.

We should forget about small efficiencies, say about 97% of the

time: premature optimization is the root of all evil. Yet we should not pass up our opportunities in that critical 3%.”

(22)

HOW TO FIT ALL THAT DATA IN MEMORY?

1. design your domain model sensibly

2. optimize collections

3. optimize numeric data

4. compress text

(23)

1. DOMAIN MODEL DESIGN

“Domain Layer (or Model Layer): Responsible for representing concepts of the business, information about the business situation, and business rules. State that reflects the business situation is controlled and used here, even though the technical details of storing it are delegated to the infrastructure. This layer is the heart of business software.”

Eric Evans, Domain-Driven Design

(24)

1. DOMAIN MODEL DESIGN

§ keep it immutable

§ use independent hierarchies of smaller entities

(25)

INTERN() YOUR IMMUTABLES!

[diagram: two object graphs V1 and V2 — before interning they hold duplicate copies of sub-objects B, C, D, E; after interning they share the same canonical immutable instances]
(26)

INTERN() YOUR IMMUTABLES!

// requires: java.lang.ref.WeakReference, java.util.Map, java.util.WeakHashMap,
// java.util.concurrent.ConcurrentHashMap, static java.util.Collections.synchronizedMap

private final Map<Class<?>, Map<Object, WeakReference<Object>>> cache =
    new ConcurrentHashMap<Class<?>, Map<Object, WeakReference<Object>>>();

public <T> T intern(T o) {
  if (o == null)
    return null;
  Class<?> c = o.getClass();
  Map<Object, WeakReference<Object>> m = cache.get(c);
  if (m == null)
    cache.put(c, m = synchronizedMap(new WeakHashMap<Object, WeakReference<Object>>()));
  WeakReference<Object> r = m.get(o);               // existing canonical instance, if any
  @SuppressWarnings("unchecked")
  T v = (r == null) ? null : (T) r.get();
  if (v == null) {                                  // nothing cached (or already collected): cache this one
    v = o;
    m.put(v, new WeakReference<Object>(v));
  }
  return v;
}
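For instance, a hypothetical value object (Money is my illustration, not from the deck) shows the effect: as long as equals()/hashCode() are value-based, interning makes every entity holding the same value point at one canonical instance.

// hypothetical value type, shown only to illustrate interning; equals()/hashCode() are value-based
final class Money implements java.io.Serializable {
  final String currency;
  final int cents;
  Money(String currency, int cents) { this.currency = currency; this.cents = cents; }
  @Override public boolean equals(Object o) {
    return o instanceof Money && ((Money) o).cents == cents && ((Money) o).currency.equals(currency);
  }
  @Override public int hashCode() { return 31 * currency.hashCode() + cents; }
}

// usage: millions of offers with "free shipping" end up sharing one canonical Money instance
Money freeShipping = intern(new Money("USD", 0));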

(27)

USE INDEPENDENT HIERARCHIES

[diagram: one big Product aggregate (id, title, Offers, Specifications, Description, Reviews, Rumors, Model History) split into independent top-level entities — Product Summary, Offers, Specifications, Description, Reviews, Rumors, Model History, Product Info — each keyed by productId]
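A minimal sketch of what the split could look like in code (class and field names are mine, not the deck's): each facet becomes its own small immutable entity keyed by productId, so facets can live in separate caches and be loaded and swapped on independent schedules.

// hypothetical facet entities; each one is cached and refreshed independently (java.util.Map/HashMap)
final class ProductSummary {
  final long productId;
  final String title;
  ProductSummary(long productId, String title) { this.productId = productId; this.title = title; }
}

final class ProductOffers {
  final long productId;
  final long[] offerIds;                // primitive array, see collection optimization below
  ProductOffers(long productId, long[] offerIds) { this.productId = productId; this.offerIds = offerIds; }
}

// separate caches instead of one giant Product object graph
final Map<Long, ProductSummary> summaries = new HashMap<Long, ProductSummary>();
final Map<Long, ProductOffers> offers = new HashMap<Long, ProductOffers>();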

(28)

2. COLLECTION OPTIMIZATION

§

leverage primitive keys/values

(29)

LEVERAGE PRIMITIVE KEYS/VALUES

Trove (“High Performance Collections for Java”)

collection with 10,000 elements [0 .. 9,999]        size in memory

java.util.ArrayList<Integer>                        200K
java.util.HashSet<Integer>                          546K
gnu.trove.list.array.TIntArrayList                   40K
gnu.trove.set.hash.TIntHashSet
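A minimal sketch of the swap (assuming Trove 3.x on the classpath, package names as in the table above): the primitive-specialized set stores ints directly in backing arrays instead of boxed Integer objects, which is where the memory savings come from.

import gnu.trove.set.hash.TIntHashSet;   // Trove 3.x
import java.util.HashSet;
import java.util.Set;

// boxed: one Integer object per element, plus per-entry overhead inside the HashSet
Set<Integer> boxed = new HashSet<Integer>();
for (int i = 0; i < 10000; i++) boxed.add(i);

// primitive: values live in backing int[] arrays, no per-element objects
TIntHashSet unboxed = new TIntHashSet();
for (int i = 0; i < 10000; i++) unboxed.add(i);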

(30)

OPTIMIZE SMALL IMMUTABLE COLLECTIONS

§ java.util.HashMap: 128 bytes + 32 bytes per entry

§ compact immutable map: 24 bytes + 8 bytes per entry

class ImmutableMap<K, V> implements Map<K, V>, Serializable { … }

class MapN<K, V> extends ImmutableMap<K, V> {
  final K k1, k2, …, kN;
  final V v1, v2, …, vN;

  @Override public boolean containsKey(Object key) {
    if (eq(key, k1)) return true;
    if (eq(key, k2)) return true;
    …
    return false;
  }
  …
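To make the layout concrete, a hypothetical two-entry specialization of the sketch above (remaining Map methods elided exactly as in MapN): no Entry objects and no table array, just four reference fields next to the object header.

// sketch: fixed-arity immutable map; lookups are plain field comparisons
final class Map2<K, V> extends ImmutableMap<K, V> {
  final K k1, k2;
  final V v1, v2;
  Map2(K k1, V v1, K k2, V v2) { this.k1 = k1; this.v1 = v1; this.k2 = k2; this.v2 = v2; }
  @Override public V get(Object key) {
    if (eq(key, k1)) return v1;
    if (eq(key, k2)) return v2;
    return null;
  }
  @Override public int size() { return 2; }
  private static boolean eq(Object a, Object b) { return a == b || (a != null && a.equals(b)); }
  …
}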

(31)

3. NUMERIC DATA OPTIMIZATION

§ use primitives!

(32)

EXAMPLE: PRICE HISTORY

Problem:

store daily prices for 1M products, 2 offers per product
average price history length: 2 years

Price points:

(33)
(34)

PRICE HISTORY: FIRST ATTEMPT

 

TreeMap<Date,  Double>  

(35)

KNOW YOUR DATA: TYPICAL SHOPPING PRICE HISTORY

[chart: price vs. days (ticks at 0, 20, 60, 70, 90, 100, 120, 121; $100 level marked) — a typical shopping price history holds the same price for long runs with occasional step changes]
(36)

PRICE HISTORY: OPTIMIZATION

keep it simple: Run-length encoding

•  positive: price (adjusted to scale)

•  negative: run length (precedes price)

•  zero: unavailable

drop pennies, store prices in primitive short (use scale factor to represent prices greater than Short.MAX_VALUE)

memory: 15 shorts × 2 bytes + 16 (array) + 24 (start date) + 4 (scale factor) = 74 bytes

§  reduction compared to TreeMap<Date, Double>: 155 times

§  estimated memory for 2 billion price points: 1.2 GB (real-life price history data compresses much better than that, since tail offers are pretty flat)

(37)

PRICE HISTORY: MODEL

public class PriceHistory {

  private final Date startDate;      // or use org.joda.time.LocalDate
  private final short[] encoded;
  private final int scaleFactor;

  public PriceHistory(SortedMap<Date, Double> prices) { … }        // encode
  public SortedMap<Date, Double> getPricesByDate() { … }           // decode
  public Date getStartDate() { return startDate; }

  // computations below are implemented directly against the encoded data
  public Date getEndDate() { … }
  public Double getMinPrice() { … }
  public int getNumChanges(double minChangeAmt, double minChangePct, boolean abs) { … }
  public PriceHistory trim(Date startDate, Date endDate) { … }
  public PriceHistory interpolate() { … }
  …
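For illustration, a decode loop consistent with the encoding rules above could look like this — my sketch under the stated convention (a negative entry is a run length for the price that follows, zero means unavailable; uses java.util.List/ArrayList):

// sketch: expand the run-length-encoded shorts back into one price per day (null = unavailable)
static List<Double> decode(short[] encoded, int scaleFactor) {
  List<Double> daily = new ArrayList<Double>();
  int i = 0;
  while (i < encoded.length) {
    int run = 1;
    if (encoded[i] < 0) { run = -encoded[i]; i++; }               // negative: run length precedes the price
    short p = encoded[i++];
    Double price = (p == 0) ? null : (double) p * scaleFactor;    // zero: offer unavailable that day
    for (int d = 0; d < run; d++) daily.add(price);
  }
  return daily;
}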

(38)

4. STRING COMPRESSION: BYTE ARRAYS

§  Java stores strings internally as UTF-16

§  converting to a byte array in UTF-8 encoding preserves non-ASCII chars, while reducing the storage required for ASCII chars by half:

static Charset UTF8 = Charset.forName("UTF-8");   // java.nio.charset.Charset

String s = "The quick brown fox jumps over the lazy dog";                  // 43 chars, 136 bytes
byte[] b = "The quick brown fox jumps over the lazy dog".getBytes(UTF8);   // 64 bytes

String s1 = "Hello";                   // 5 chars, 64 bytes
byte[] b1 = "Hello".getBytes(UTF8);    // 24 bytes

byte[] toBytes(String s) { return s == null ? null : s.getBytes(UTF8); }
String toString(byte[] b) { return b == null ? null : new String(b, UTF8); }

(39)

STRING COMPRESSION: SHARED PREFIX

§ works on similar URLs, etc.

§ recursively reuses prefixes

§ use the interning technique to look up shared prefixes

public class PrefixedString {

  private PrefixedString prefix;
  private byte[] suffix;
  …

  @Override public int hashCode() { … }
  @Override public boolean equals(Object o) { … }
}
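One way the shared-prefix lookup could be wired up (my sketch: the constructor, the split point, and the reuse of the intern() helper and UTF8 constant from earlier slides are all assumptions): split each URL at its last '/', intern the directory part so every URL under the same path shares one PrefixedString, and keep only the distinct tail bytes per entry. Applying the same split to the directory part again gives the recursive prefix reuse the slide mentions.

// sketch: one level of prefix sharing for URLs
PrefixedString fromUrl(String url) {
  int cut = url.lastIndexOf('/') + 1;                                         // prefix keeps the trailing '/'
  PrefixedString dir = intern(new PrefixedString(null, url.substring(0, cut).getBytes(UTF8)));
  return new PrefixedString(dir, url.substring(cut).getBytes(UTF8));
}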

(40)

STRING COMPRESSION: SHORT ALPHANUMERIC CASE-INSENSITIVE STRINGS

public abstract class AlphaNumericString {

  public static AlphaNumericString make(String s) {
    try {
      return new Numeric(Long.parseLong(s, Character.MAX_RADIX));
    } catch (NumberFormatException e) {
      return new Alpha(s.getBytes(UTF8));
    }
  }

  protected abstract String value();

  @Override public String toString() { return value(); }

  private static class Numeric extends AlphaNumericString {
    long value;
    Numeric(long value) { this.value = value; }
    @Override protected String value() { return Long.toString(value, Character.MAX_RADIX); }
    @Override public int hashCode() { … }
    @Override public boolean equals(Object o) { … }
  }

  private static class Alpha extends AlphaNumericString {
    byte[] value;
    Alpha(byte[] value) { this.value = value; }
    @Override protected String value() { return new String(value, UTF8); }
    @Override public int hashCode() { … }
    @Override public boolean equals(Object o) { … }
  }
}

(41)

STRING COMPRESSION: LARGE STRINGS

§ gzip – fast

§ bzip2 – better compression, slower

§ larger strings (MBs) of natural text compress better (down to 15-25% of the original size)

§ for smaller strings (2-3K) good compression rates are around 50% of the original size
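A minimal gzip round-trip sketch using the JDK's java.util.zip (bzip2 would need a third-party library such as Apache Commons Compress):

import java.io.*;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

static byte[] gzip(String s) throws IOException {
  ByteArrayOutputStream bos = new ByteArrayOutputStream();
  GZIPOutputStream gz = new GZIPOutputStream(bos);
  gz.write(s.getBytes("UTF-8"));
  gz.close();                                    // finish the stream so the gzip trailer is written
  return bos.toByteArray();
}

static String gunzip(byte[] compressed) throws IOException {
  GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(compressed));
  ByteArrayOutputStream bos = new ByteArrayOutputStream();
  byte[] buf = new byte[8192];
  for (int n; (n = gz.read(buf)) != -1; ) bos.write(buf, 0, n);
  return new String(bos.toByteArray(), "UTF-8");
}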

(42)

5. JVM TUNING

§ make sure compressed pointers are used (-XX:+UseCompressedOops)

§ use low-pause GC (Concurrent Mark Sweep, G1)

§ heap sizing

  § overprovision total heap by ~30%

  § adjust generation sizes/ratios

§ print garbage collection

§ if GC pauses are still prohibitive (usually as heap grows over 4-5 GB), consider partitioning the data across multiple JVMs
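For illustration, a HotSpot command line of that era combining these settings might look roughly like this (heap sizes, ratios, and the jar name are my assumptions, not recommendations from the deck):

java -Xms12g -Xmx12g \
     -XX:+UseCompressedOops \
     -XX:+UseConcMarkSweepGC -XX:+UseParNewGC \
     -XX:NewRatio=3 \
     -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps \
     -jar reference-data-service.jar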

(43)

CACHE LOADING

[diagram: webapps behind a load balancer → services, each with a full in-memory cache and its own loader; the loaders pull “cooked” datasets from a reliable file store (S3)]

(44)

CACHE LOADING

§  “cooked” datasets are placed as (compressed) flat files into a reliable store (e.g. S3)

§  keep the format simple: CSV, JSON (can you read them a year from now?)

§  each cache loader polls independently for new datasets

§  poll frequency ~ data inconsistency (if you poll every 5 minutes, data in the caches can be up to 5 minutes stale)
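A minimal sketch of such an independent poller (the store/cache collaborators and dataset path are mine; the deck only prescribes the polling idea): each service instance periodically checks the store for a dataset newer than the one it has loaded and swaps it in, and keeps serving the current data if a poll fails.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// hypothetical collaborators: 'store' can list/open datasets, 'cache' knows how to build itself from one
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleWithFixedDelay(new Runnable() {
  @Override public void run() {
    try {
      String latest = store.latestDatasetVersion("/tax-rates");      // e.g. "date=2012-07-01"
      if (!latest.equals(cache.loadedVersion())) {
        cache.load(store.open("/tax-rates/" + latest));              // build the new cache and swap it in
      }
    } catch (Exception e) {
      // keep serving the current (possibly stale) cache; log/alert here
    }
  }
}, 0, 5, TimeUnit.MINUTES);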

(45)

CACHE LOADING: TIME SENSITIVITY

§ low: infrequently changed data (tax rates by zip code, IP address geolocation mapping)

§ medium: product information

(46)

CACHE LOADING: LOW TIME SENSITIVITY DATA

§ full feeds are generated as new data is available

/tax-rates
  /date=2012-05-01
    tax-rates.2012-05-01.csv.gz
  /date=2012-06-01
    tax-rates.2012-06-01.csv.gz
  /date=2012-07-01
    tax-rates.2012-07-01.csv.gz

(47)

CACHE LOADING: MEDIUM/HIGH TIME SENSITIVITY

§ full feeds are generated daily

§ incremental feeds are generated as new data is available

/price-obs
  /full
    /date=2012-07-01
      price-obs.2012-07-01.csv.gz
    /date=2012-07-02
  /inc
    /date=2012-07-01
      2012-07-01T00-10-00.csv.gz
      2012-07-01T00-20-00.csv.gz

(48)

CACHE LOADING STRATEGY: SWAP

§ build the new cache on the side and swap (see the sketch after this list)

§ breaking down your domain model into independent caches pays off

§ cache is essentially immutable, no locking needed for concurrent access

§ works well for infrequently updated datasets and/or datasets that are fully refreshed on each update

§ containers

  § Trove maps for primitive keys
  § HashMap otherwise
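A minimal sketch of the swap itself (reusing the hypothetical ProductSummary from the earlier sketch; the deck only states the strategy): build a complete map off to the side, then publish it with a single atomic reference swap so readers never see a partially loaded cache and never need locks.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// readers always see either the old complete snapshot or the new complete snapshot
final AtomicReference<Map<Long, ProductSummary>> current =
    new AtomicReference<Map<Long, ProductSummary>>(Collections.<Long, ProductSummary>emptyMap());

void reload(Iterable<ProductSummary> feed) {
  Map<Long, ProductSummary> fresh = new HashMap<Long, ProductSummary>();
  for (ProductSummary p : feed) fresh.put(p.productId, p);        // build on the side
  current.set(Collections.unmodifiableMap(fresh));                // swap: a single volatile write
}

ProductSummary get(long productId) {
  return current.get().get(productId);                            // lock-free read
}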

(49)

CACHE LOADING STRATEGY: CRUD

§ trickier to keep track of deletions

§ avoid full synchronization (as in Collections.synchronizedMap)

§ containers

  § Trove maps + ReentrantReadWriteLock for primitive keys

  § otherwise, use ConcurrentHashMap – virtually no synchronization, but takes more memory

§ consider loading the cache in small batches, locking/unlocking for each key, with short pauses (100 ms) between batches to unblock readers and allow GC

§ use separate container per partition to improve concurrency and

(50)

CONCURRENT LOCKING WITH TROVE MAP

import gnu.trove.map.TLongObjectMap;
import gnu.trove.map.hash.TLongObjectHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LongCache<V> {

  private TLongObjectMap<V> map = new TLongObjectHashMap<V>();
  private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private Lock r = lock.readLock(), w = lock.writeLock();

  public V get(long k) {
    r.lock();
    try { return map.get(k); } finally { r.unlock(); }
  }

  public V update(long k, V v) {
    w.lock();
    try { return map.put(k, v); } finally { w.unlock(); }
  }

  public V remove(long k) {
    w.lock();
    try { return map.remove(k); } finally { w.unlock(); }
  }
}
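To illustrate the batched-loading advice from the previous slide through LongCache (the 100 ms pause is the slide's suggestion; the batch size, feed type, and ProductSummary are my assumptions):

// sketch: apply an incremental feed in small batches so readers are never blocked for long (java.util.List)
void applyIncrement(LongCache<ProductSummary> cache, List<ProductSummary> changes) throws InterruptedException {
  int batchSize = 1000;                                  // assumption, tune for your workload
  for (int i = 0; i < changes.size(); i++) {
    ProductSummary p = changes.get(i);
    cache.update(p.productId, p);                        // takes/releases the write lock per key
    if ((i + 1) % batchSize == 0) Thread.sleep(100);     // brief pause: let readers in, let GC breathe
  }
}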

(51)

CACHE LOADING: OPTIMIZATIONS

§ keep local feed copies (may be faster to read, reduces dependency on remote storage)

§ generate periodic serialized data/state dumps (faster restarts)

(52)

DEPENDENT CACHES

[diagram: a service instance holding caches A, B, C, D with dependencies between them; a status aggregator servlet rolls up their status and is polled by the load balancer's health check]

§ compute derived data

§ compute statistics/metrics

§ use observer/observable pattern

§ aggregate “ready” status
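A minimal sketch of the “ready” roll-up (servlet and cache-interface names are assumptions): the health-check endpoint reports healthy only when every registered cache has finished loading, so the load balancer will not route traffic to an instance that is still warming up.

// hypothetical readiness contract implemented by each cache
interface ReadyAware { boolean isReady(); }

// aggregator servlet: 200 when all caches are ready, 503 otherwise
public class StatusServlet extends javax.servlet.http.HttpServlet {
  private final java.util.List<ReadyAware> caches;
  public StatusServlet(java.util.List<ReadyAware> caches) { this.caches = caches; }

  @Override protected void doGet(javax.servlet.http.HttpServletRequest req,
                                 javax.servlet.http.HttpServletResponse resp)
      throws java.io.IOException {
    for (ReadyAware cache : caches) {
      if (!cache.isReady()) {
        resp.sendError(503, "cache not ready");        // load balancer keeps this node out of the pool
        return;
      }
    }
    resp.getWriter().write("OK");
  }
}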

(53)

DEPLOYMENT CELL STATUS

§ similarly aggregate status across all services in a deployment cell before joining the load balancer pool

[diagram: deployment cell — webapp and per-service status aggregators report (via HTTP or JMX) to a cell status aggregator, which the load balancer polls as its health check]

(54)
(55)

REASONS TO PARTITION

1. data doesn’t fit in the heap (not enough RAM)

(56)

CHOOSING PARTITIONING GRAIN

look for opportunities to leverage natural partitioning:

  product categories (in shopping)
  airfare markets (in travel)
  geographical divisions

keeping partitions at natural granularity avoids the complexity of data rebalancing

(57)

PARTITIONING DECISION TREE

does my data fit in a single VM?
  yes → don’t partition
  no  → can I partition statically?
          yes → use fixed partitioning
          no  → use dynamic partitioning (harder)

(58)

FIXED PARTITIONING

partitions are assigned to nodes through configuration

when to consider:

  small number of partitions (up to a few hundred)
  small number of nodes (up to a few tens)

(59)

FIXED PARTITION ASSIGNMENT

[diagram: deployment cell — two webapps behind a load balancer, with partitions p1–p4 statically assigned to the nodes]
(60)

DYNAMIC PARTITIONING

partitions are dynamically assigned to nodes at runtime

when to consider:

  1000s of (natural) partitions
  100s of nodes
  partitions are constantly added/removed
  nodes are constantly added/removed

(61)

DYNAMIC PARTITIONING

[diagram: webapps behind load balancers routing to nodes that each hold primary and secondary copies of partitions p1–p9 — the extra network hop is back]
(62)

DYNAMIC PARTITIONING: REMOVING EXTRA HOP

[diagram: webapps route requests directly to the node that owns each partition (primary/secondary copies of p1–p9), removing the extra network hop]

(63)

DYNAMIC PARTITION ASSIGNMENT

each partition lives on at least 2 nodes: primary/secondary

primary/secondary/… nodes for each partition are determined from the node/partition sets via a simple computation

“leader” node is the authority for:
  1. member node set

(64)

DYNAMIC PARTITION ASSIGNMENT

member nodes get the “active state” (node/partition sets) from the leader node. as nodes/partitions are added/removed:

1. leader is (re)elected (e.g. node with lowest PID on the machine with the lowest IP)
2. member nodes learn the “target state” from the leader
3. all nodes load partitions to achieve the “target state”
4. leader tracks rebalancing progress
5. once the “target state” is achieved, the “target state” becomes the “active state”
6. nodes drop partitions they are no longer responsible for
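The “simple computation” mentioned on the previous slide isn't spelled out in the deck; one choice it could be is rendezvous (highest-random-weight) hashing, sketched here with assumed names — every member node can derive the same primary/secondary assignment from just the node set and the partition id (uses java.util.*).

// sketch: deterministic owner selection per partition via rendezvous hashing
static List<String> ownersFor(String partitionId, Collection<String> nodes, int replicas) {
  List<String> sorted = new ArrayList<String>(nodes);
  final String p = partitionId;
  Collections.sort(sorted, new Comparator<String>() {
    @Override public int compare(String a, String b) {
      return Long.compare(weight(p, b), weight(p, a));             // highest weight first
    }
  });
  return sorted.subList(0, Math.min(replicas, sorted.size()));     // [primary, secondary, ...]
}

static long weight(String partitionId, String node) {
  return (partitionId + "@" + node).hashCode() * 2654435761L;      // cheap stand-in for a real hash
}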

(65)

AD-HOC CACHE QUERYING

§ scanning in-memory data is very fast (seconds to scan millions of records)

§ evaluating domain model methods directly can be very powerful (compared to database queries)

  § compute statistics
  § compute metrics
  § generate reports
  § troubleshooting queries

(66)

AD-HOC CACHE QUERYING: LANGUAGES

§ can use scripting languages/query frameworks for ad-hoc scanning

  § Groovy
  § JRuby
  § MVEL (http://mvel.codehaus.org)

(67)

AD-HOC CACHE QUERYING: ORGANIZING QUERY

§ break the query into the following parts (see the sketch after this list):

  § extractor expressions (like the SQL “SELECT” clause)
  § filter (evaluates to boolean, like the SQL “WHERE” clause)
  § sorter (like the SQL “ORDER BY” clause)
  § limit (like the SQL “LIMIT” clause)

§ simplifies query evaluation

§ simplifies supporting parallel query execution (a la map/reduce)

§ parallel execution speeds up execution on multi-core or partitioned
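A sketch of what such a decomposed query might look like in Java (interface names are mine; the deck only names the four parts), including a per-partition evaluation step that can be fanned out:

import java.util.Comparator;
import java.util.List;

// a query = filter + extractor + sorter + limit, evaluated against one cache/partition
interface Filter<T> { boolean accept(T item); }                 // WHERE
interface Extractor<T, R> { R extract(T item); }                // SELECT

class Query<T, R> {
  final Filter<T> filter;
  final Extractor<T, R> extractor;
  final Comparator<T> sorter;                                   // ORDER BY
  final int limit;                                              // LIMIT
  Query(Filter<T> filter, Extractor<T, R> extractor, Comparator<T> sorter, int limit) {
    this.filter = filter; this.extractor = extractor; this.sorter = sorter; this.limit = limit;
  }
}

// evaluate against one partition: filter, sort, cut to limit, then extract
static <T, R> List<R> run(Query<T, R> q, List<T> partition) {
  List<T> matched = new java.util.ArrayList<T>();
  for (T item : partition) if (q.filter.accept(item)) matched.add(item);
  java.util.Collections.sort(matched, q.sorter);
  List<R> out = new java.util.ArrayList<R>();
  for (T item : matched.subList(0, Math.min(q.limit, matched.size()))) out.add(q.extractor.extract(item));
  return out;
}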

(68)

PARALLEL (FAN-OUT) QUERY EXECUTION

[diagram: prune partitions up front; each partition 1…N runs filter → sort → limit → extract to produce intermediate results; a final sort + limit merges the intermediates into the final results]
(69)

PARALLEL (FAN-OUT) QUERY: MULTI-LEVEL REDUCTION

[diagram: partitions p1…pN reduce into intermediate reducers r1…r3, which reduce into r4 for the final result]
(70)

OPTIMIZING FAN-OUT QUERIES FOR NETWORK TOPOLOGY WITH MULTI-LEVEL REDUCTION

[diagram: partitions p1–p6, p7–p12, p13–p18 grouped into locations 1, 2, 3 — reduce within each location over the faster/higher-capacity links first, and ship only the reduced intermediates across the slower/lower-capacity links between locations]

(71)

JOSQL

execute SQL-like queries on collections of Java objects

§ supports grouping, variables, custom functions

§ doesn’t support joins

example (from the JoSQL site):

SELECT *
FROM   java.io.File
WHERE  name $LIKE "%.html"
AND    toDate (lastModified)

(72)

JOSQL QUERY EXAMPLE

find the average number of significant price changes and the average price history by product category, for a specific seller and time period (first half of 2012)

select @priceChangeRate, product, @avgPriceHistory
from products
where offers.lowestPrice >= 100
  and offer(offers, 12).priceHistory.startDate <= date('2012-01-01')
  and offer(offers, 12).priceHistory.endDate >= date('2012-06-01')
  and decideRank.rank < 200
group by category(product.primaryCategoryId).name
group by order 1
execute on group_by_results
  sum(getNumberOfDailyPriceChanges(
      trimPriceHistory(offer(offers, 12).priceHistory, '2012-01-01', '2012-06-01'),
      offer(offers, 12).priceHistory.startDate, 15, 5)) / count(1) as priceChangeRate,
  avgPriceHistory(trimPriceHistory(offer(offers, 12).priceHistory, '2012-01-01', '2012-06-01'))

(73)
(74)

MONITORING

cache readiness:
  § rolled up status

aggregate metrics:
  § staleness
  § total counts
  § counts by partition
  § counts by attribute
(75)
